# slaeforms/slaeforms/app.py
#
# 558 lines
# 22 KiB
# Python

import sys
import json
import random
import base64
from flask import Flask, redirect, url_for, request, session, make_response, jsonify, send_from_directory
from flask import render_template
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy import Integer, String, Column, Float
from datetime import datetime
import uuid
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy import select, join
from sqlalchemy.orm import DeclarativeBase
from flask_wtf.csrf import CSRFProtect
import os
import csv
# NOTE(review): not referenced anywhere else in the visible part of this
# module; stimulus ordering is driven by each block's "order" setting instead.
random_order = True
# activate environment: cd C:\Users\Jan\Google Drive\Master Stuff\Code\SLAEForms Testing\.venv\Scripts\
# then this: activate

# SETUP--------------------------------------------------
# Set up sqlalchemy
class Base(DeclarativeBase):
    """Declarative base handed to Flask-SQLAlchemy as ``model_class``."""


db = SQLAlchemy(model_class=Base)

# create the app
app = Flask(__name__)
# configure the database, give it a path (it will be in the instances folder)
app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///database.db"
app.config["PERMANENT_SESSION_LIFETIME"] = 10800  # 3 hours (10800 seconds)
db.init_app(app)

# Key used to sign the session cookie. A deployment should provide its own
# value through the SLAEFORMS_SECRET_KEY environment variable; the literal
# below is only the development fallback and must not be relied on in
# production because it is checked into the source.
app.secret_key = os.environ.get(
    "SLAEFORMS_SECRET_KEY",
    b"29fe9e8edd407c5491d4f1c05632d9fa33e26ed8734a3f5e080ebac3772a555a",
)

UPLOAD_FOLDER = 'uploads'  # recorded videos are stored here
EXPORT_FOLDER = 'exports'  # CSV exports are written here
#csrf = CSRFProtect(app) #enable CSRF protection globally
#------------------------------------------------------------------------
# Setting up DB Models
# This table is always created, tracks all users
class User(db.Model):
    """A participant run; one row is inserted each time the start page is accepted."""

    # Identifier generated for every new run.
    user_id = db.Column("user_id", db.UUID(as_uuid=True), primary_key=True, nullable=False)
    # Identifier remembered in the browser session across runs.
    device_id = db.Column("device_id", db.UUID(as_uuid=True), nullable=False)
    # String form of the per-block stimulus order chosen for this run.
    question_order = db.Column("question_order", db.String(60))
    # Pass the callable itself: ``datetime.today()`` would be evaluated once at
    # import time and every defaulted row would share that single timestamp.
    date_created = db.Column("date_created", db.DateTime, default=datetime.today)

    def __repr__(self) -> str:
        return "<User %r>" % self.user_id
# First schema pass: create the tables of the models declared so far
# (tables that already exist are not overwritten).
try:
    with app.app_context():
        db.create_all()
except SQLAlchemyError as error:
    print(f"Error occurred during database creation: {error}")
#-----------------------------------------------------------------------------
# Open and parse the JSON file that describes the questionnaire blocks.
# The context manager closes the file even when parsing fails.
with open("default.json", encoding='utf-8') as configfile:  # todo replace with other name
    # convert it to dict
    config = json.load(configfile)

db_tables = {}  # contains all dynamically created tables, key = table/classname
#TODO insert code to create all tables
def create_json_tables():
    """Build one model class per distinct ``database_table`` declared in ``config``.

    Every block of the configuration that carries a ``database_table`` entry
    gets a model created by ``create_model_class`` and stored in ``db_tables``
    under its table name. A table name that is already registered is skipped.
    """
    print(config)
    for block in config.values():
        if "database_table" not in block:
            continue
        table_schema = block["database_table"]
        table_name = table_schema["table_name"]
        if table_name in db_tables:
            continue
        print(f"New table: \n {table_name}")
        db_tables[table_name] = create_model_class(table_schema)
        print(f"created table: {db_tables[table_name]}")
    print(f"tables in db_tables: \n {db_tables}")
def create_model_class(schema):
    """Create a ``db.Model`` subclass from a ``database_table`` schema.

    Args:
        schema: Dict with a ``"table_name"`` string and a ``"fields"`` dict
            mapping each column name to its description. A description has a
            ``"type"`` (``"integer"``, ``"string"`` or ``"float"``), a
            ``"size"`` for strings and an optional ``"nullable"`` flag
            (treated as ``True`` when absent).

    Returns:
        The new model class, named after the capitalized table name.

    Raises:
        ValueError: If a field declares a type other than the three above.
    """
    class_name = schema["table_name"].capitalize()
    print("creating table class: {class_name}".format(class_name=class_name))

    # Columns every response table gets, regardless of the configured fields.
    attributes = {
        "__tablename__": schema["table_name"],
        "id": Column("id", db.UUID(as_uuid=True), primary_key=True, nullable=False),
        "user_id": Column("user_id", db.UUID(as_uuid=True), nullable=False),
        "date_created": db.Column("date_created", db.DateTime),
        "stimulus_name": db.Column("stimulus_name", db.String(30)),
    }

    for column_name, column_info in schema["fields"].items():
        field_type = column_info["type"]
        if field_type == "integer":
            column_type = Integer
        elif field_type == "string":
            column_type = String(int(column_info["size"]))
        elif field_type == "float":
            column_type = Float
        else:
            # Without this branch an unknown type would silently reuse the
            # type of the previous column (or fail with a NameError).
            raise ValueError(
                "Unsupported type {type!r} for column {column!r} of table {table!r}".format(
                    type=field_type, column=column_name, table=schema["table_name"]
                )
            )
        attributes[column_name] = Column(
            column_name, column_type, nullable=column_info.get("nullable", True)
        )

    print("attributes of the table: ", attributes)
    # Create the model class
    return type(class_name, (db.Model,), attributes)
create_json_tables()

# Second schema pass: create the tables of the models generated from the
# configuration (tables that already exist are not overwritten).
print("try to create tables")
try:
    with app.app_context():
        db.create_all()
except SQLAlchemyError as error:
    print(f"Error occurred during database creation: {error}")
else:
    print("successfully created all tables")
#------------------------------------------------------------------------------
#actual page logic with start, form and send
@app.route("/start", methods=["GET", "POST"])
def startpage():
session.permanent = False
if not "slaeform_device_id" in session:
# If this device was not seen, remember it.
new_device_id = uuid.uuid4()
session["slaeform_device_id"] = new_device_id
session["agreed_to_tos"] = False
if request.method == "POST":
#right now if a user that has an active session goes to the startpage again and accepts tos
#it will just start another session and discard the previous session
#config is the dict with the info
#get the block names
block_names = config.keys()
session["block_names"] = list(block_names)
session["block_order"] = {} # only for templates, for each block the list of keys for stimuli
session["current_block_index"] = 0
session["current_stimulus_index"] = 0
session["current_block_name"] = session["block_names"][session["current_block_index"]]
session["number_of_blocks"] = len(session["block_names"])
current_block = config[session["current_block_name"]]
session["number_of_stimuli"] = 0
# if the block has stimuli, get how many
if "stimuli" in current_block:
if current_block["stimuli"]["type"] == "single_video" or current_block["stimuli"]["type"] == "empty":
session["number_of_stimuli"] = len(list(current_block["stimuli"]["list"]))
elif current_block["stimuli"]["type"] == "double_video":
session["number_of_stimuli"] = len(list(current_block["stimuli"]["list_1"]))
print("number of blocks: ",len(session["block_names"]))
print("Startpage post")
print(session["block_names"])
for name in block_names:
if config[name]["type"] == "TaskTemplate" and ("stimuli" in current_block):
match config[name]["stimuli"]["type"]:
case "single_video":
order = list(config[name]["stimuli"]["list"]) # order = list of simuli keys
print("order: ",order)
if config[name]["stimuli"]["order"] == "random":
random.shuffle(order) #in random order
session["block_order"][name] = order
case "double_video":
order = [] # order = list of stimuli keys
list_1 = list(current_block["stimuli"]["list_1"])
list_2 = list(current_block["stimuli"]["list_2"])
for i in range(len(list(current_block["stimuli"]["list_1"]))):
order.append((list_1[i], list_2[i]))
print("order: ",order)
#TODO random is not implemented here
session["block_order"][name] = order
case "empty":
order = list(config[name]["stimuli"]["list"]) # order = list of simuli keys
print("order: ",order)
session["block_order"][name] = order
if "stimuli" in current_block:
#get the name of the current stimulus
session["current_stimulus_name"] = session["block_order"][session["current_block_name"]][session["current_stimulus_index"]]
#save the new user to the database and the session
session["agreed_to_tos"] = True
new_user_id = uuid.uuid4()
session["slaeform_user_id"] = new_user_id
device_id = session["slaeform_device_id"]
user_id = new_user_id
question_order = str(session["block_order"])
date = datetime.today()
new_user = User(user_id=user_id, device_id=device_id,question_order=question_order,date_created = date) #,question_order=question_order
db.session.add(new_user)
db.session.commit()
print("block order: {order}".format(order=session["block_order"]))
try:
db.session.add(new_user)
db.session.commit()
except:
return "There was a problem while adding the user to the Database"
return redirect("/form")
return render_template(
"teststartpage.html"
)
@app.route("/form")
def form():
#user is not yet registered and should not be here
if not "slaeform_user_id" in session:
return redirect("/start") #TODO replace this later with actual startpage
current_block = config[session["current_block_name"]]
print("form")
print("current_block_name: {current_block_name}".format(current_block_name=session["current_block_name"]))
print("current_block_index: {current_block_order}".format(current_block_order=session["current_block_index"]))
print("current_stimulus: {current_stimulus}".format(current_stimulus=session["current_stimulus_index"]))
#print("current Blockname: {blockname}, current block index: {blockindex}, current stim index: {stimulusindex}".format(blockname=session["current_block_name"],
# blockindex=session["current_block_index"],
# stimulusindex=session["current_stimulus_index"] ) )
# erster Fall: SinglePage
if current_block["type"] == "SinglePage":
return render_template(current_block["template"])
#zweiter Fall, empty TaskTemplate
if current_block["type"] == "TaskTemplate" and current_block["stimuli"]["type"] == "empty":
current_block_order = session["block_order"][session["current_block_name"]]
current_block_stimuli = current_block["stimuli"]
current_stimulus = current_block_order[session["current_stimulus_index"]]
stimulus_type=current_block["stimuli"]["type"]
return render_template(
"standard_template.html",
stimuli=current_block_stimuli,
stimulus_type=stimulus_type,
current_stimulus=current_stimulus,
questions=current_block["questions"]
)
# ansonsten, templates:
current_block_order = session["block_order"][session["current_block_name"]]
current_block_stimuli = current_block["stimuli"]
current_stimulus = current_block_order[session["current_stimulus_index"]]
stimulus_type=current_block["stimuli"]["type"]
stimulus_configuration = current_block["stimuli"]["configuration"] # dict with the config
if current_block["type"] == "TaskTemplate":
print("case: TaskTemplate")
match stimulus_type:
case "single_video":
stimulus_configuration["video_url"] = config[session["current_block_name"]]["stimuli"]["list"][current_stimulus]
print("-------------videourl: ", stimulus_configuration["video_url"])
case "double_video":
stimulus_configuration["video_url1"] = config[session["current_block_name"]]["stimuli"]["list_1"][current_stimulus[0]]
stimulus_configuration["video_url2"] = config[session["current_block_name"]]["stimuli"]["list_2"][current_stimulus[1]]
return render_template(
"standard_template.html",
stimuli=current_block_stimuli,
stimulus_type=stimulus_type,
current_stimulus=current_stimulus,
stimulus_configuration=stimulus_configuration,
questions=current_block["questions"]
)
return "Error, none of the Blocks triggered"
@app.route("/send", methods=["POST"])
def sendpage():
    """Store the submitted answers of the current block and advance the run.

    Blocks without a ``database_table`` only advance. Otherwise a row of the
    block's table is filled from the posted form fields, an uploaded
    ``recordedVideo`` file (if any) is saved to ``UPLOAD_FOLDER``, and the
    session moves on to the next stimulus or block.

    Returns:
        A redirect to ``/form`` (or to ``/start`` without an active run), or
        an error message with status 500 when the row could not be stored.
    """
    print("send")
    # Without an active run there is no block to attribute the data to.
    if "slaeform_user_id" not in session:
        return redirect("/start")

    block_name = session["current_block_name"]
    block = config[block_name]
    if "database_table" not in block:
        # it has no database field, so nothing to receive; just move on
        print("no database table")
        update_session()
        return redirect("/form")

    # find out which table we need to write to
    table_name = block["database_table"]["table_name"]
    model = db_tables[table_name]
    print("Form posted: {rqform}".format(rqform=request.form))
    print("Writing to table: {table_name}".format(table_name=table_name))

    session_user_id = session["slaeform_user_id"]
    date = datetime.today()
    # May be unset when the run started on a block without stimuli.
    stimulus = session.get("current_stimulus_name", "")
    new_entry = model(
        id=uuid.uuid4(),
        user_id=session_user_id,
        date_created=date,
        stimulus_name=str(stimulus),
    )

    # Only accept form fields that are real columns of the table, and never
    # let the client overwrite the columns the server fills in itself.
    server_columns = {"id", "user_id", "date_created", "stimulus_name"}
    writable_columns = set(model.__table__.columns.keys()) - server_columns
    for key, value in request.form.items():
        if key in writable_columns:
            setattr(new_entry, key, value)
        else:
            print("ignored form field: ", key)
    print("entry: ", new_entry)

    try:
        db.session.add(new_entry)
        db.session.commit()
    except Exception as e:
        db.session.rollback()  # leave the session usable for the next request
        print("Error occurred: {e}".format(e=str(e)))
        return "There was a problem while adding the response to the Database", 500

    # handle possible Video that was send
    if 'recordedVideo' in request.files:
        video = request.files['recordedVideo']
        formatted_date = date.strftime("%Y.%m.%d %H-%M-%S")
        print("date: ", date)
        # Double-video stimuli are named by a pair of keys; join them so the
        # file name can be built for every stimulus type.
        if isinstance(stimulus, (list, tuple)):
            stimulus_label = "-".join(str(part) for part in stimulus)
        else:
            stimulus_label = str(stimulus)
        video_name = f"{session_user_id}_{block_name}_{stimulus_label}_{formatted_date}.webm"
        path = os.path.join(UPLOAD_FOLDER, video_name)
        print("path: ", path)
        os.makedirs(UPLOAD_FOLDER, exist_ok=True)
        video.save(path)

    # Now move to the next stimulus or block
    update_session()
    print("now redirect and reload the page")
    return redirect("/form")
def _count_stimuli(block):
    """Return the number of stimuli ``block`` presents (0 when it has none)."""
    stimuli = block.get("stimuli")
    if not stimuli:
        return 0
    if stimuli["type"] == "double_video":
        # Double-video blocks keep their keys in list_1 / list_2, not "list".
        return len(stimuli["list_1"])
    return len(stimuli.get("list", ()))


def update_session():
    """Advance the session after the current stimulus/page has been completed.

    Moves to the next stimulus of the current block if one is left, otherwise
    to the first stimulus of the next block, refreshing ``number_of_stimuli``
    and ``current_stimulus_name`` on every block change. When the last item of
    the last block has been completed, ``slaeform_user_id`` is removed from
    the session, which ends the run.
    """
    block_name = session["current_block_name"]
    stimulus_index = session["current_stimulus_index"]
    stimuli_left = (
        "stimuli" in config[block_name]
        and stimulus_index < session["number_of_stimuli"] - 1
    )

    if stimuli_left:
        # keep going through the stimuli of this block
        session["current_stimulus_index"] = stimulus_index + 1
        session["current_stimulus_name"] = session["block_order"][block_name][stimulus_index + 1]
    elif session["current_block_index"] < session["number_of_blocks"] - 1:
        # this block is done, go to the next one
        next_index = session["current_block_index"] + 1
        next_name = session["block_names"][next_index]
        session["current_block_index"] = next_index
        session["current_block_name"] = next_name
        session["current_stimulus_index"] = 0
        session["number_of_stimuli"] = _count_stimuli(config[next_name])
        next_order = session["block_order"].get(next_name)
        # Blocks without stimuli have no entry in block_order.
        session["current_stimulus_name"] = next_order[0] if next_order else ""
    else:
        # The last item of the last block was completed: the run is over.
        session.pop("slaeform_user_id", None)

    print("---Session updated---")
    print("current_block_index / number_of_blocks: {current_block_index} / {number_of_blocks}".format(
        current_block_index=session["current_block_index"],
        number_of_blocks=session["number_of_blocks"]))
    print("current_block_name: ", session["current_block_name"])
    print("current_stimulus_index: ", session["current_stimulus_index"])
    print("Current number_of_stimuli: ", session["number_of_stimuli"])
# Database stuff------------------------------------------------------------------------------
def create_csv(model, filename):
    """Dump every row of ``model``'s table to ``EXPORT_FOLDER/<filename>.csv``.

    Args:
        model: Mapped model class whose table is exported.
        filename: Target file name without the ``.csv`` extension.

    The first CSV row holds the column names; each following row holds the
    values of one database row in the same column order.
    """
    filename = filename + ".csv"
    # Query all data from the table
    data = db.session.query(model).all()
    # Get the column names from the model
    column_names = [column.name for column in model.__table__.columns]

    # The export folder is not created anywhere else, so make sure it exists.
    os.makedirs(EXPORT_FOLDER, exist_ok=True)
    path = os.path.join(EXPORT_FOLDER, filename)
    # Fixed encoding so the export does not depend on the platform default.
    with open(path, 'w', newline='', encoding='utf-8') as file:
        writer = csv.writer(file)
        writer.writerow(column_names)  # Write header
        writer.writerows(
            [getattr(row, column) for column in column_names] for row in data
        )
# export CSV
@app.route("/export_csv")
def export_csv():
    """Export the ``User`` table via ``create_csv`` and redirect to ``/``."""
    create_csv(User, "usertable")
    # The string literal below is never used or executed; it only parks
    # scratch query code.
    """
meta = db.metadata
tables = meta.tables.keys()
print("tables: ",tables)
print("testquerys:")
qtable = meta.tables["default_demographic_test"]
query1 = select(qtable).where(qtable.c.alter == 78)
print("Query 1: ", query1)
print("Query 1 result: ")
result = db.session.execute(query1)
print("Columns: ", result.columns)
for row in result:
print(row)
"""
    return redirect("/")
# the contents of all tables
@app.route("/table_contents")
def table_contents():
    """Render ``table_contents.html`` with the columns and rows of every known table.

    Only tables registered in ``db.metadata`` are listed. A table whose query
    fails is shown without rows instead of breaking the whole page.
    """
    meta = db.metadata
    #meta.reflect(db.engine) # Uncomment this to also get the hidden tables, but this crashes rn
    print("tables: ", meta.tables.keys())

    # Testing querys for exporting etc. Only run when the table and column the
    # experiment targets actually exist, so other configurations do not crash.
    qtable = meta.tables.get("default_demographic_test")
    if qtable is not None and "alter" in qtable.c:
        print("testquerys:")
        query1 = select(qtable).where(qtable.c.alter == 78)
        print("Query 1: ", query1)
        print("Query 1 result: ")
        result = db.session.execute(query1)
        print("Columns: ", result.keys())
        for row in result:
            print(row)
    #Test End

    contents = {}
    for table_name, table in meta.tables.items():
        try:
            rows = db.session.query(table).all()
        except Exception as e:
            db.session.rollback()  # keep the session usable for the next table
            print("Error occurred: {e}".format(e=str(e)))
            print("Values:\n Table = {table}\n Tabletype = {tabletype}\n".format(table=table, tabletype=type(table)))
            rows = []
        contents[table_name] = {
            'columns': table.columns.keys(),
            'rows': rows,
        }

    return render_template(
        "table_contents.html",
        table_contents=contents,
    )
@app.route('/show_tables')
def show_tables():
    """Reflect the database into ``db.metadata`` and render its tables."""
    metadata = db.metadata
    metadata.reflect(db.engine)
    return render_template('show_tables.html', tables=metadata.tables)
# Control Panel ---------------------------------------------------------
@app.route("/upload_configs")
def upload_configs():
    """Render ``all_links.html`` with every parameter-free GET route of the app."""
    # Keep only rules a browser can navigate to without supplying parameters.
    links = [
        (url_for(rule.endpoint, **(rule.defaults or {})), rule.endpoint)
        for rule in app.url_map.iter_rules()
        if "GET" in rule.methods and has_no_empty_params(rule)
    ]
    return render_template("all_links.html", links=links)
# Root page -----------------------------
def has_no_empty_params(rule):
    """Return True when ``rule`` has at least as many defaults as arguments."""
    default_count = 0 if rule.defaults is None else len(rule.defaults)
    argument_count = 0 if rule.arguments is None else len(rule.arguments)
    return argument_count <= default_count
@app.route("/")
def all_links():
    """Index page: list every GET route that can be opened without parameters."""
    navigable_rules = (
        rule
        for rule in app.url_map.iter_rules()
        # Skip rules a browser cannot navigate to and rules that need parameters.
        if "GET" in rule.methods and has_no_empty_params(rule)
    )
    links = [
        (url_for(rule.endpoint, **(rule.defaults or {})), rule.endpoint)
        for rule in navigable_rules
    ]
    return render_template("all_links.html", links=links)
# delete all tables as last link --------------------------
# Route to delete all entries
@app.route('/delete_json_tables', methods=['GET'])
def delete_json_tables():
    """Delete the rows of every table except ``user`` and ``response``.

    The database is reflected first, so tables that are not declared as models
    are emptied as well. Returns a success message, or the error text with
    status 500 after rolling back.
    """
    kept_tables = ("user", "response")
    try:
        metadata = db.metadata
        metadata.reflect(db.engine)
        # Reverse dependency order, to handle foreign key constraints.
        for table in reversed(metadata.sorted_tables):
            if table.name in kept_tables:
                continue
            print(f"Deleting Table: {table.name}")
            db.session.execute(table.delete())
        db.session.commit()
        return 'All entries deleted successfully'
    except Exception as e:
        # Rollback changes if any error occurs
        db.session.rollback()
        return f'Error occurred: {e}', 500
    finally:
        # Close the session
        db.session.close()
# Route to delete all entries
@app.route('/delete_all_entries', methods=['GET'])
def delete_all_entries():
    """Delete the rows of every table registered in ``db.metadata``.

    Returns a success message, or the error text with status 500 after
    rolling back.
    """
    # A "drop_all()" (the counterpart of create_all) combined with a reflect
    # could be used here instead to drop the tables themselves.
    try:
        # Reverse dependency order, to handle foreign key constraints.
        for table in reversed(db.metadata.sorted_tables):
            db.session.execute(table.delete())
        db.session.commit()
        return 'All entries deleted successfully'
    except Exception as e:
        # Rollback changes if any error occurs
        db.session.rollback()
        return f'Error occurred: {e}', 500
    finally:
        # Close the session
        db.session.close()
# Start the Flask app only when this file is executed directly.
if __name__ == '__main__':
    app.run()