97 lines
2.1 KiB
Python
97 lines
2.1 KiB
Python
from contextlib import asynccontextmanager
|
|
from typing import Literal
|
|
|
|
import aiohttp
|
|
from pydantic import BaseModel, Field
|
|
|
|
API_URL = "https://api.n2yo.com/rest/v1/satellite"
|
|
|
|
Compass = Literal[
|
|
"N",
|
|
"NNE",
|
|
"NE",
|
|
"ENE",
|
|
"E",
|
|
"ESE",
|
|
"SE",
|
|
"SSE",
|
|
"S",
|
|
"SSW",
|
|
"SW",
|
|
"WSW",
|
|
"W",
|
|
"WNW",
|
|
"NW",
|
|
"NNW",
|
|
]
|
|
|
|
|
|
class PassInfo(BaseModel):
|
|
sat_id: int = Field(alias="satid")
|
|
sat_name: str = Field(alias="satname")
|
|
transaction_count: int = Field(alias="transactionscount")
|
|
pass_count: int = Field(alias="passescount")
|
|
|
|
|
|
class RadioPass(BaseModel):
|
|
start_az: float = Field(alias="startAz")
|
|
start_az_compass: Compass = Field(alias="startAzCompass")
|
|
start_utc: int = Field(alias="startUTC")
|
|
|
|
max_az: float = Field(alias="maxAz")
|
|
max_az_compass: Compass = Field(alias="maxAzCompass")
|
|
max_el: float = Field(alias="maxEl")
|
|
max_utc: int = Field(alias="maxUTC")
|
|
|
|
end_az: float = Field(alias="endAz")
|
|
end_az_compass: Compass = Field(alias="endAzCompass")
|
|
end_utc: int = Field(alias="endUTC")
|
|
|
|
|
|
class RadioPasses(BaseModel):
|
|
info: PassInfo
|
|
passes: list[RadioPass] | None
|
|
|
|
|
|
class N2YO:
|
|
def __init__(self, api_key: str):
|
|
self.api_key = api_key
|
|
self.client = aiohttp.ClientSession()
|
|
|
|
async def get_radio_passes(
|
|
self,
|
|
norad: int,
|
|
latitude: float,
|
|
longitude: float,
|
|
altitude: float,
|
|
days: int,
|
|
min_elevation: int,
|
|
):
|
|
url = "/".join(
|
|
str(s)
|
|
for s in [
|
|
API_URL,
|
|
"radiopasses",
|
|
norad,
|
|
latitude,
|
|
longitude,
|
|
altitude,
|
|
days,
|
|
min_elevation,
|
|
]
|
|
)
|
|
|
|
async with self.client.get(url, params={"apiKey": self.api_key}) as request:
|
|
response = await request.json()
|
|
|
|
return RadioPasses.parse_obj(response)
|
|
|
|
|
|
@asynccontextmanager
|
|
async def n2yo_api(key: str):
|
|
api = N2YO(key)
|
|
try:
|
|
yield api
|
|
finally:
|
|
await api.client.close()
|