"""Export per-day visitor counts from an InfluxDB bucket into CSV files.

Configuration is read from the environment (optionally via ``.env`` and
``secret.env``): ``URL``, ``TOKEN``, ``ORG`` and ``START_DAY``. One file per
day is written following ``OUTPUT_FILE``; days whose file already exists are
skipped.
"""
import os
import csv
from datetime import datetime, timedelta, timezone

import requests
from dotenv import load_dotenv

TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
OUTPUT_FILE = "out/{year:04d}-{month:02d}-{day:02d}.csv"
ACCURACY = 1  # Minutes

load_dotenv()
load_dotenv("secret.env")


def expect_env(key: str) -> str:
    """Return the value of environment variable *key*.

    Raises:
        Exception: if the variable is not set.
    """
    value = os.environ.get(key)
    if value is None:
        raise Exception(f"{key} was not provided!")
    return value


url = expect_env("URL")
token = expect_env("TOKEN")
org = expect_env("ORG")


def request_day(year: int, month: int, day: int):
    """Query one day of ``visitor_count`` samples and write them to a CSV file.

    The query covers ``[day 00:00, next day 00:00)``; both bounds are sent
    with a ``Z`` suffix (see ``TIME_FORMAT``). Only rows whose timestamp
    minute is a multiple of ``ACCURACY`` are kept. Each output row is
    ``(_time, _value)`` and the file has no header line.

    Raises:
        RuntimeError: if the server does not answer with HTTP 200.
        KeyError: if a response row lacks the ``_time`` or ``_value`` column.
    """
    start_time = datetime(year, month, day, 0, 0)
    stop_time = start_time + timedelta(days=1)

    query = f'from(bucket: "kba") \
        |> range(start: {start_time.strftime(TIME_FORMAT)}, \
                 stop: {stop_time.strftime(TIME_FORMAT)}) \
        |> filter(fn: (r) => r["_measurement"] == "kba") \
        |> filter(fn: (r) => r["_field"] == "visitor_count")'

    response = requests.post(
        f"{url}/api/v2/query?orgID={org}",
        headers={
            "Authorization": f"Token {token}",
            "Accept": "application/csv",
            "Content-type": "application/vnd.flux",
        },
        data=query,
    )
    # Explicit raise instead of ``assert``: asserts vanish under ``python -O``.
    if response.status_code != 200:
        raise RuntimeError(f"Got response code {response.status_code}")

    # Parse everything BEFORE touching the output file, so a malformed
    # response cannot leave behind a truncated file that later runs would
    # treat as "already exported".
    rows = []
    for row in csv.DictReader(response.text.splitlines()):
        time = row["_time"]
        value = row["_value"]
        # time[:-1] drops the last character (presumably the trailing "Z",
        # which datetime.fromisoformat rejects before Python 3.11).
        if datetime.fromisoformat(time[:-1]).minute % ACCURACY == 0:
            rows.append((time, value))

    path = OUTPUT_FILE.format(year=year, month=month, day=day)
    directory = os.path.dirname(path)
    if directory:
        os.makedirs(directory, exist_ok=True)
    # newline="" is what the csv module requires to avoid blank lines on
    # platforms that translate "\n".
    with open(path, "w", newline="") as f:
        csv.writer(f).writerows(rows)


# Normalise to midnight so a time component in START_DAY cannot shift the
# day boundaries used below.
day = datetime.fromisoformat(expect_env("START_DAY")).replace(
    hour=0, minute=0, second=0, microsecond=0
)
# Query bounds are sent with a "Z" suffix, so compare against naive UTC "now".
now = datetime.now(timezone.utc).replace(tzinfo=None)
one_day = timedelta(days=1)
count = 0

# Only export days that have completely elapsed: existing files are never
# refreshed, so a partial day written today would stay partial forever.
while day + one_day <= now:
    count += 1
    if not os.path.isfile(
        OUTPUT_FILE.format(year=day.year, month=day.month, day=day.day)
    ):
        print(f"requesting {day.isoformat()}")
        request_day(day.year, day.month, day.day)
    day += one_day

print(f"{count} days")