controlpanel/server/server/db.py

37 lines
922 B
Python

from datetime import datetime, timezone
from typing import Any

from bson import ObjectId
import motor.motor_asyncio as motor

from .settings import settings
# Module-level Motor client, constructed once at import time from the
# URI held in ``settings.mongo_uri``.
client: motor.AsyncIOMotorClient = motor.AsyncIOMotorClient(settings.mongo_uri)
# Handle to the "controlpanel" database on that client.
db = client["controlpanel"]
# Handle to the "events" collection; ``create_event`` writes to and reads from it.
events: motor.AsyncIOMotorCollection = db["events"]
async def create_event(value: Any):
    """Insert a new event document and return it as read back from MongoDB.

    The stored document has two fields besides ``_id``: ``timestamp`` (the
    time of the call) and ``value`` (the argument, stored unchanged).

    Args:
        value: Payload to store under the ``"value"`` key. It is passed to
            the driver as-is, so it must be something the driver can encode.

    Returns:
        The result of ``events.find_one`` for the newly inserted ``_id``.
    """
    event = {
        # Use a timezone-aware UTC timestamp: the driver encodes naive
        # datetimes as if they were already UTC, so a naive local-time
        # ``datetime.now()`` would be stored shifted by the host's UTC offset.
        "timestamp": datetime.now(timezone.utc),
        "value": value,
    }
    insert_result = await events.insert_one(event)
    # Read the document back by its generated id so the caller receives it
    # exactly as the database returns it.
    return await events.find_one({"_id": insert_result.inserted_id})
class PyObjectId(ObjectId):
    """``ObjectId`` subclass carrying validation and schema hooks.

    NOTE(review): ``__get_validators__`` / ``__modify_schema__`` look like the
    pydantic v1 custom-type protocol; confirm against the models that use
    this type.
    """

    @classmethod
    def __get_validators__(cls):
        """Yield the validator callables for this type (only ``validate``)."""
        yield cls.validate

    @classmethod
    def validate(cls, v):
        """Return ``v`` converted to an ``ObjectId``.

        Raises:
            ValueError: If ``ObjectId.is_valid(v)`` is false.
        """
        if ObjectId.is_valid(v):
            # Returns a plain ObjectId, not an instance of this subclass.
            return ObjectId(v)
        raise ValueError("Invalid objectid")

    @classmethod
    def __modify_schema__(cls, field_schema):
        """Mark the field as a ``"string"`` in the given schema mapping."""
        field_schema.update({"type": "string"})