742 lines
28 KiB
Python
742 lines
28 KiB
Python
import sys
|
|
import json
|
|
import random
|
|
import base64
|
|
from flask import Flask, redirect, url_for, request, session, make_response, jsonify, send_file
|
|
from flask import render_template
|
|
from flask_sqlalchemy import SQLAlchemy
|
|
from sqlalchemy import Integer, String, Column, Float
|
|
from datetime import datetime
|
|
import uuid
|
|
from sqlalchemy.exc import SQLAlchemyError
|
|
from sqlalchemy import select, join
|
|
from sqlalchemy.orm import DeclarativeBase
|
|
#from flask_wtf.csrf import CSRFProtect
|
|
import os
|
|
import csv
|
|
from zipfile import ZipFile
|
|
import hashlib
|
|
|
|
# activate environment: cd C:\...\...\....\...\Code\SLAEForms Testing\.venv\Scripts\
|
|
# then this: activate
|
|
|
|
#SETUP--------------------------------------------------
|
|
|
|
#Set up sqlalchemy
|
|
class Base(DeclarativeBase):
    """Declarative base class passed to Flask-SQLAlchemy as its model class."""
|
|
|
|
db = SQLAlchemy(model_class=Base)

# Create the app.
app = Flask(__name__)

# Configure the database and give it a path (the file ends up in the instance folder).
app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///database.db"
app.config["PERMANENT_SESSION_LIFETIME"] = 10800  # 3 hours, in seconds
app.config['MAX_CONTENT_LENGTH'] = 22 * 1000 * 1000  # try and fix video upload not working
db.init_app(app)

# Secret key used to sign the session cookie. It can be overridden through the
# SLAEFORM_SECRET_KEY environment variable so a deployment does not have to
# ship the value committed in this file; the committed value stays the default.
app.secret_key = os.environ.get(
    "SLAEFORM_SECRET_KEY",
    "29fe9e8edd407c5491d4f1c05632d9fa33e26ed8734a3f5e080ebac3772a555a",
).encode("utf-8")

UPLOAD_FOLDER = 'uploads'
EXPORT_FOLDER = 'exports'

# SHA-256 hex digest of the admin password (compared in the login view).
# Overridable through SLAEFORM_PASSWORD_HASH for the same reason as the secret key.
PASSWORD = os.environ.get(
    "SLAEFORM_PASSWORD_HASH",
    'd5aff9fc14d1f20f4ccddaa8b4f2c1765228b74ed0b1dfb868bf1064e0d655e2',
)

# Study configuration file that is loaded at import time.
CONFIGFILE = 'userstudy1.json'
# CONFIGFILE = 'test.json'
# CONFIGFILE = 'default.json'

#csrf = CSRFProtect(app) #enable CSRF protection globally
|
|
|
|
|
|
#------------------------------------------------------------------------
|
|
# Setting up DB Models
|
|
|
|
# This table is always created, tracks all users
|
|
class User(db.Model):
    """One participant run of the form; this table is always created."""

    # Primary key, generated per run when the start page is submitted.
    user_id = db.Column("user_id", db.UUID(as_uuid=True), primary_key=True, nullable=False)
    # Identifier remembered in the browser session across runs.
    device_id = db.Column("device_id", db.UUID(as_uuid=True), nullable=False)
    # String form of the per-block stimulus order chosen for this run.
    question_order = db.Column("question_order", db.String(60))
    # Pass the callable (not its result) so the timestamp is taken at insert
    # time instead of once when the module is imported.
    date_created = db.Column("date_created", db.DateTime, default=datetime.today)
    # Set to True once the last block has been submitted.
    form_completed = db.Column("form_completed", db.Boolean, default=False)

    def __repr__(self) -> str:
        return "<User %r>" % self.user_id
|
|
|
|
# create the table (existing tables are not overwritten)
|
|
# Create the tables known so far (existing tables are not overwritten).
try:
    with app.app_context():
        db.create_all()
except SQLAlchemyError as creation_error:
    print("Error occurred during database creation:", creation_error)
|
|
|
|
|
|
#-----------------------------------------------------------------------------
|
|
#open, parse and execute json file
|
|
|
|
#open the json file with the config
|
|
# Load the JSON study configuration into a dict. The context manager closes
# the file even when parsing fails.
with open(CONFIGFILE, encoding='utf-8') as configfile:
    config = json.load(configfile)

db_tables = {}  # contains all dynamically created tables, key = table/classname
|
|
|
|
|
|
# Create one model class per database table declared in the config
|
|
def create_json_tables():
    """Register a model class in ``db_tables`` for every table named in the config.

    Blocks without a "database_table" entry are skipped, and a table name that
    is already registered is not created a second time.
    """
    for block_content in config.values():
        if "database_table" not in block_content:
            continue

        schema = block_content["database_table"]
        table_name = schema["table_name"]
        if table_name in db_tables:
            continue

        print(f"New table: \n {table_name}")
        db_tables[table_name] = create_model_class(schema)
        print(f"created table: {db_tables[table_name]}")

    print(f"tables in db_tables: \n {db_tables}")
|
|
|
|
def create_model_class(schema):
    """Build a SQLAlchemy model class from a "database_table" config entry.

    Args:
        schema: dict with "table_name" and "fields"; each field maps a column
            name to a dict with "type" ("integer", "string" or "float"),
            "nullable", and for strings "size".

    Returns:
        A new ``db.Model`` subclass named after the capitalized table name.

    Raises:
        ValueError: if a field declares a type other than the three above.
    """
    class_name = schema["table_name"].capitalize()

    # Standard columns every generated table gets: key, owner, date, stimulus.
    attributes = {
        "__tablename__": schema["table_name"],
        "id": Column("id", db.UUID(as_uuid=True), primary_key=True, nullable=False),
        "user_id": Column("user_id", db.UUID(as_uuid=True), nullable=False),
        "date_created": db.Column("date_created", db.DateTime),
        "stimulus_name": db.Column("stimulus_name", db.String(30)),
    }

    for column_name, column_info in schema["fields"].items():
        field_type = column_info["type"]
        if field_type == "integer":
            column_type = Integer
        elif field_type == "string":
            column_type = String(int(column_info["size"]))
        elif field_type == "float":
            column_type = Float
        else:
            # Previously an unknown type silently reused the type of the
            # preceding column (or failed with an unbound name).
            raise ValueError(
                "Unsupported column type {type!r} for column {column!r} in table {table!r}".format(
                    type=field_type, column=column_name, table=schema["table_name"]
                )
            )

        attributes[column_name] = Column(column_name, column_type, nullable=column_info["nullable"])

    # Create the model class dynamically.
    return type(class_name, (db.Model,), attributes)
|
|
|
|
|
|
|
|
# Register the config-defined models, then create their tables
# (existing tables are not overwritten).
create_json_tables()

try:
    print("try to create tables")
    with app.app_context():
        db.create_all()
    print("successfully created all tables")
except SQLAlchemyError as creation_error:
    print("Error occurred during database creation:", creation_error)
|
|
|
|
|
|
#------------------------------------------------------------------------------
|
|
#actual page logic with start, form and send
|
|
|
|
#@app.route("/popuptest", methods=["GET"])
|
|
def popuptest():
    """Render the popup test page (its route decorator is commented out)."""
    return render_template("popuptest.html")
|
|
|
|
|
|
@app.route("/start", methods=["GET", "POST"])
|
|
def startpage():
|
|
session.permanent = False
|
|
if not "slaeform_device_id" in session:
|
|
# If this device was not seen, remember it.
|
|
new_device_id = uuid.uuid4()
|
|
session["slaeform_device_id"] = new_device_id
|
|
session["agreed_to_tos"] = False
|
|
|
|
|
|
|
|
if request.method == "POST":
|
|
#right now if a user that has an active session goes to the startpage again and accepts tos
|
|
#it will just start another session and discard the previous session
|
|
|
|
#config is the dict with the info
|
|
#get the block names
|
|
block_names = config.keys()
|
|
session["block_names"] = list(block_names)
|
|
session["block_order"] = {} # only for templates, for each block the list of keys for stimuli
|
|
session["current_block_index"] = 0
|
|
session["current_stimulus_index"] = 0
|
|
session["current_block_name"] = session["block_names"][session["current_block_index"]]
|
|
session["number_of_blocks"] = len(session["block_names"])
|
|
current_block = config[session["current_block_name"]]
|
|
session["number_of_stimuli"] = 0
|
|
|
|
# if the block has stimuli, get how many
|
|
if "stimuli" in current_block:
|
|
if current_block["stimuli"]["type"] == "single_video" or current_block["stimuli"]["type"] == "empty":
|
|
session["number_of_stimuli"] = len(list(current_block["stimuli"]["list"]))
|
|
elif current_block["stimuli"]["type"] == "double_video":
|
|
session["number_of_stimuli"] = len(list(current_block["stimuli"]["list_1"]))
|
|
|
|
print("number of blocks: ",len(session["block_names"]))
|
|
|
|
print("Startpage post")
|
|
print(session["block_names"])
|
|
|
|
for name in block_names:
|
|
print("block: ",name)
|
|
if config[name]["type"] == "TaskTemplate" and ("stimuli" in config[name]):
|
|
match config[name]["stimuli"]["type"]:
|
|
case "single_video":
|
|
order = list(config[name]["stimuli"]["list"]) # order = list of simuli keys
|
|
print("order: ",order)
|
|
if "order" in config[name]["stimuli"]:
|
|
if config[name]["stimuli"]["order"] == "random":
|
|
random.shuffle(order) #in random order
|
|
session["block_order"][name] = order
|
|
case "double_video":
|
|
order = [] # order = list of stimuli keys
|
|
list_1 = list(config[name]["stimuli"]["list_1"])
|
|
list_2 = list(config[name]["stimuli"]["list_2"])
|
|
for i in range(len(list(config[name]["stimuli"]["list_1"]))):
|
|
order.append((list_1[i], list_2[i]))
|
|
print("order: ",order)
|
|
#TODO random is not implemented here
|
|
session["block_order"][name] = order
|
|
case "empty":
|
|
order = list(config[name]["stimuli"]["list"]) # order = list of simuli keys
|
|
print("order: ",order)
|
|
session["block_order"][name] = order
|
|
|
|
|
|
if "stimuli" in current_block:
|
|
#get the name of the current stimulus
|
|
session["current_stimulus_name"] = session["block_order"][session["current_block_name"]][session["current_stimulus_index"]]
|
|
|
|
#save the new user to the database and the session
|
|
session["agreed_to_tos"] = True
|
|
new_user_id = uuid.uuid4()
|
|
session["slaeform_user_id"] = new_user_id
|
|
device_id = session["slaeform_device_id"]
|
|
user_id = new_user_id
|
|
question_order = str(session["block_order"])
|
|
date = datetime.today()
|
|
new_user = User(user_id=user_id, device_id=device_id,question_order=question_order,date_created = date,form_completed=False) #,question_order=question_order
|
|
|
|
db.session.add(new_user)
|
|
db.session.commit()
|
|
|
|
|
|
print("block order: {order}".format(order=session["block_order"]))
|
|
try:
|
|
db.session.add(new_user)
|
|
db.session.commit()
|
|
|
|
except:
|
|
return "There was a problem while adding the user to the Database"
|
|
return redirect("/form")
|
|
|
|
return render_template(
|
|
"startpage.html"
|
|
)
|
|
|
|
@app.route("/endpage")
def endpage():
    """Render the closing page shown after the last block was submitted."""
    print("Form is done, sent to endpage")
    return render_template("endpage.html")
|
|
|
|
@app.route("/datenschutz")
def datenschutz():
    """Render the "datenschutz.html" template."""
    return render_template("datenschutz.html")
|
|
|
|
@app.route("/impressum")
def impressum():
    """Render the "impressum.html" template."""
    return render_template("impressum.html")
|
|
|
|
@app.route("/studytest")
def studytest():
    """Render the "studytest.html" template."""
    return render_template("studytest.html")
|
|
|
|
|
|
@app.route("/form")
|
|
def form():
|
|
#user is not yet registered and should not be here
|
|
if not "slaeform_user_id" in session:
|
|
return redirect("/start") #TODO replace this later with actual startpage
|
|
|
|
|
|
current_block = config[session["current_block_name"]]
|
|
print("form")
|
|
print("current_block_name: {current_block_name}".format(current_block_name=session["current_block_name"]))
|
|
print("current_block_index: {current_block_order}".format(current_block_order=session["current_block_index"]))
|
|
print("current_stimulus: {current_stimulus}".format(current_stimulus=session["current_stimulus_index"]))
|
|
|
|
#print("current Blockname: {blockname}, current block index: {blockindex}, current stim index: {stimulusindex}".format(blockname=session["current_block_name"],
|
|
# blockindex=session["current_block_index"],
|
|
# stimulusindex=session["current_stimulus_index"] ) )
|
|
infovideo = None
|
|
if "infovideo" in current_block:
|
|
infovideo = current_block["infovideo"]
|
|
|
|
# erster Fall: SinglePage
|
|
if current_block["type"] == "SinglePage":
|
|
return render_template(current_block["template"])
|
|
|
|
|
|
#zweiter Fall, empty TaskTemplate
|
|
if current_block["type"] == "TaskTemplate" and current_block["stimuli"]["type"] == "empty":
|
|
current_block_order = session["block_order"][session["current_block_name"]]
|
|
current_block_stimuli = current_block["stimuli"]
|
|
current_stimulus = current_block_order[session["current_stimulus_index"]]
|
|
stimulus_type=current_block["stimuli"]["type"]
|
|
return render_template(
|
|
"standard_template.html",
|
|
stimuli=current_block_stimuli,
|
|
stimulus_type=stimulus_type,
|
|
current_stimulus=current_stimulus,
|
|
questions=current_block["questions"],
|
|
infovideo=infovideo
|
|
)
|
|
|
|
|
|
# ansonsten, templates:
|
|
current_block_order = session["block_order"][session["current_block_name"]]
|
|
current_block_stimuli = current_block["stimuli"]
|
|
current_stimulus = current_block_order[session["current_stimulus_index"]]
|
|
stimulus_type=current_block["stimuli"]["type"]
|
|
stimulus_configuration = current_block["stimuli"]["configuration"] # dict with the config
|
|
|
|
|
|
|
|
if current_block["type"] == "TaskTemplate":
|
|
print("case: TaskTemplate")
|
|
match stimulus_type:
|
|
case "single_video":
|
|
stimulus_configuration["video_url"] = config[session["current_block_name"]]["stimuli"]["list"][current_stimulus]
|
|
print("-------------videourl: ", stimulus_configuration["video_url"])
|
|
case "double_video":
|
|
stimulus_configuration["video_url1"] = config[session["current_block_name"]]["stimuli"]["list_1"][current_stimulus[0]]
|
|
stimulus_configuration["video_url2"] = config[session["current_block_name"]]["stimuli"]["list_2"][current_stimulus[1]]
|
|
|
|
return render_template(
|
|
"standard_template.html",
|
|
stimuli=current_block_stimuli,
|
|
stimulus_type=stimulus_type,
|
|
current_stimulus=current_stimulus,
|
|
stimulus_configuration=stimulus_configuration,
|
|
questions=current_block["questions"],
|
|
infovideo=infovideo
|
|
)
|
|
|
|
|
|
|
|
|
|
return "Error, none of the Blocks triggered"
|
|
|
|
|
|
# Columns the server fills in itself; submitted form fields must never overwrite them.
_SERVER_MANAGED_COLUMNS = frozenset({"id", "user_id", "date_created", "stimulus_name"})


@app.route("/send", methods=["POST"])
def sendpage():
    """Store the submitted answers for the current block and move on.

    Returns:
        A redirect to /start when the session has no registered user, to
        /endpage after the final stimulus of the final block, to /form
        otherwise, or an error string when a database write fails.
    """
    print("send")

    # Without a registered user there is nothing to attribute the answers to.
    if "slaeform_user_id" not in session:
        return redirect("/start")

    block_name = session["current_block_name"]

    # A block without a database table has nothing to store, so just move on.
    if "database_table" not in config[block_name]:
        print("no database table")
        update_session()
        return redirect("/form")

    # Find out which table to write to.
    table_name = config[block_name]["database_table"]["table_name"]
    print("Form posted: {rqform}".format(rqform=request.form))
    print("Writing to table: {table_name}".format(table_name=table_name))

    session_user_id = session["slaeform_user_id"]
    date = datetime.today()
    stimulus_name = str(session["current_stimulus_name"])

    model = db_tables[table_name]
    new_entry = model(
        id=uuid.uuid4(),
        user_id=session_user_id,
        date_created=date,
        stimulus_name=stimulus_name,
    )

    # Handle a video that may have been sent along.
    if 'recordedVideo' in request.files:
        video = request.files['recordedVideo']
        formatted_date = date.strftime("%Y.%m.%d %H-%M-%S")
        print("date: ", date)
        video_name = str(session_user_id) + "_" + block_name + "_" + stimulus_name + "_" + str(formatted_date) + ".webm"
        path = os.path.join(UPLOAD_FOLDER, video_name)
        print("path: ", path)
        os.makedirs(UPLOAD_FOLDER, exist_ok=True)
        video.save(path)
        if hasattr(new_entry, "video_upload"):
            new_entry.video_upload = video_name

    # Only real columns of this table may be set from the form, and never the
    # server-managed ones. The original accepted any key for which hasattr()
    # was true, which let a client overwrite id/user_id or other attributes.
    assignable = set(model.__table__.columns.keys()) - _SERVER_MANAGED_COLUMNS
    for key in request.form:
        if key not in assignable:
            continue
        # Multiple-choice fields arrive as repeated keys; store them as one
        # comma-separated string. A single value is stored unchanged.
        value = ','.join(request.form.getlist(key))
        print("key exists: ", key)
        setattr(new_entry, key, value)
        print("setattr value: ", value)
    print("entry: ", new_entry)

    try:
        db.session.add(new_entry)
        db.session.commit()
    except Exception as e:
        db.session.rollback()
        print("Error occurred: {e}".format(e=str(e)))
        return "There was a problem while adding the response to the Database"

    is_last_block = session["current_block_index"] == session["number_of_blocks"] - 1
    is_last_stimulus = session["current_stimulus_index"] >= session["number_of_stimuli"] - 1
    if is_last_block and is_last_stimulus:
        # Update the database entry, the form is completed.
        try:
            user = db.session.query(User).filter_by(user_id=session_user_id).one()
            user.form_completed = True
            db.session.commit()
        except Exception as e:
            db.session.rollback()
            print("Error occurred: {e}".format(e=str(e)))
            return "There was a problem while updating the user"
        print("updated db entry for form_completed")
        # This user is done; drop it from the session only after the commit succeeded.
        session.pop("slaeform_user_id")
        return redirect("/endpage")

    # Now move to the next stimulus or block.
    update_session()
    print("now redirect and reload the page")
    return redirect("/form")
|
|
|
|
def update_session():
    """Advance the session to the next stimulus, or to the next block.

    Reads and writes the navigation keys in the session:
    current_block_index, current_block_name, current_stimulus_index,
    current_stimulus_name and number_of_stimuli.
    """
    print("update session")

    def advance_block():
        # Step to the next block when one exists; report whether we moved.
        if session["current_block_index"] < session["number_of_blocks"] - 1:
            session["current_block_index"] += 1
            session["current_block_name"] = session["block_names"][session["current_block_index"]]
            return True
        return False

    block_has_stimuli = "stimuli" in config[session["current_block_name"]]

    if block_has_stimuli and session["current_stimulus_index"] < session["number_of_stimuli"] - 1:
        # There are still stimuli left in this block: go to the next one.
        print("there are still stimuli left")
        session["current_stimulus_index"] += 1
        session["current_stimulus_name"] = session["block_order"][session["current_block_name"]][session["current_stimulus_index"]]
    elif block_has_stimuli:
        # This block's stimuli are used up: go to the next block if possible.
        print("here are no stimuli left")
        session["number_of_stimuli"] = 0
        if advance_block():
            session["current_stimulus_index"] = 0
            if "stimuli" in config[session["current_block_name"]]:
                session["number_of_stimuli"] = len(session["block_order"][session["current_block_name"]])
    else:
        # The block has no stimuli at all: go to the next block.
        session["number_of_stimuli"] = 0
        session["current_stimulus_index"] = 0
        advance_block()

    # Set the values for the block we are on now.
    current_block = config[session["current_block_name"]]
    if "stimuli" in current_block:
        session["current_stimulus_name"] = session["block_order"][session["current_block_name"]][session["current_stimulus_index"]]

        # The number of stimuli comes from "list" or "list_1" depending on the type.
        stimuli = current_block["stimuli"]
        list_key = {"single_video": "list", "empty": "list", "double_video": "list_1"}.get(stimuli["type"])
        if list_key is not None:
            session["number_of_stimuli"] = len(stimuli[list_key])

    print("---Session updated-----------------------------------------------")
    print(f"current_block_index / number_of_blocks: {session['current_block_index']} / {session['number_of_blocks']}")
    print("current_block_name: ", session["current_block_name"])
    print("current_stimulus_index: ", session["current_stimulus_index"])
    print("Current number_of_stimuli: ", session["number_of_stimuli"])
|
|
|
|
#@app.route("/update")
|
|
def update():
    """Mark the session's user as having completed the form.

    The route decorator for this helper is commented out. Returns a status
    string in both the success and the failure case.
    """
    print("Current Session: ", session)

    try:
        current_user = db.session.query(User).filter_by(user_id=session["slaeform_user_id"]).one()
        current_user.form_completed = True
        db.session.commit()
    except Exception as error:
        print(f"Error occurred: {error}")
        return "There was a problem while updating the user"

    return "db entry updated!"
|
|
|
|
|
|
|
|
# Database stuff------------------------------------------------------------------------------
|
|
|
|
def zipdir(path, ziph):
    """Add every file below ``path`` to the open zip handle ``ziph``.

    Archive names are relative to the parent of ``path``, so they start with
    the directory's own name.
    """
    archive_root = os.path.join(path, '..')
    for root, _dirs, files in os.walk(path):
        for file_name in files:
            full_path = os.path.join(root, file_name)
            ziph.write(full_path, os.path.relpath(full_path, archive_root))
|
|
|
|
#create all csvs
|
|
def create_all_csvs():
    """Write one CSV file for every table registered in the metadata."""
    for table_name, table in db.metadata.tables.items():
        create_csv(table, table_name)
|
|
|
|
|
|
# export CSV
|
|
@app.route("/export_all_tables")
def export_all_tables():
    """Export every table as CSV and send the CSVs back as one zip file.

    Requires a logged-in session; otherwise redirects to /login.
    """
    if not session.get("logged_in"):
        return redirect("/login")

    create_all_csvs()

    # The archive directory is not created anywhere else, so make sure it exists.
    os.makedirs('zip_exports', exist_ok=True)
    with ZipFile('zip_exports/all_tables.zip', 'w') as zipf:  # no compression, need to add zipfile.ZIP_DEFLATED for compression
        zipdir('exports/', zipf)

    return send_file("zip_exports/all_tables.zip", as_attachment=False, download_name="all_tables.zip")
|
|
|
|
# export Database
|
|
@app.route("/export_db")
def export_db():
    """Zip the contents of the instance folder and send the archive back.

    Requires a logged-in session; otherwise redirects to /login.
    """
    if not session.get("logged_in"):
        return redirect("/login")

    # The archive directory is not created anywhere else, so make sure it exists.
    os.makedirs('zip_exports', exist_ok=True)
    with ZipFile('zip_exports/db.zip', 'w') as zipf:  # no compression, need to add zipfile.ZIP_DEFLATED for compression
        zipdir('instance/', zipf)

    return send_file("zip_exports/db.zip", as_attachment=False, download_name="db.zip")
|
|
|
|
# export all uploaded videos as one zip archive
|
|
@app.route("/export_all_videos")
def export_all_videos():
    """Zip the contents of the uploads folder and send the archive back.

    Requires a logged-in session; otherwise redirects to /login.
    """
    if not session.get("logged_in"):
        return redirect("/login")

    # The archive directory is not created anywhere else, so make sure it exists.
    os.makedirs('zip_exports', exist_ok=True)
    with ZipFile('zip_exports/all_videos.zip', 'w') as zipf:  # no compression, need to add zipfile.ZIP_DEFLATED for compression
        zipdir('uploads/', zipf)

    # The download name used to be "all_tables.zip" (copied from the table export).
    return send_file("zip_exports/all_videos.zip", as_attachment=False, download_name="all_videos.zip")
|
|
|
|
|
|
def create_csv(table, filename):
    """Write all rows of ``table`` to ``<EXPORT_FOLDER>/<filename>.csv``.

    Args:
        table: the SQLAlchemy table to export.
        filename: file name without extension; ".csv" is appended.
    """
    filename = filename + ".csv"

    # Query all data from the table.
    data = db.session.query(table).all()

    # The header row is the list of column names.
    column_names = [column.name for column in table.columns]

    # The export folder is not created anywhere else, so make sure it exists.
    os.makedirs(EXPORT_FOLDER, exist_ok=True)
    path = os.path.join(EXPORT_FOLDER, filename)

    # Write UTF-8 explicitly so the export does not depend on the platform's
    # default encoding (the config file is read as UTF-8 as well).
    with open(path, 'w', newline='', encoding='utf-8') as file:
        writer = csv.writer(file)
        writer.writerow(column_names)
        for row in data:
            writer.writerow([getattr(row, column) for column in column_names])
|
|
|
|
|
|
# export CSV
|
|
@app.route("/export_csv/<table_name>")
def export_csv(table_name):
    """Write the CSV export of one table to the export folder.

    Args:
        table_name: name of a table registered in the metadata.

    Returns:
        A redirect to /login when not logged in, a redirect to "/" on success,
        or an error message with status 500 (also for unknown table names).
    """
    # Same access check as every other export and admin route.
    if not session.get("logged_in"):
        return redirect("/login")

    meta = db.metadata

    try:
        create_csv(meta.tables[table_name], table_name)
    except Exception as e:
        return f'Error occurred: {str(e)}', 500

    return redirect("/")
|
|
|
|
# the contents of all tables
|
|
@app.route("/table_contents")
def table_contents():
    """Render the columns and rows of every table registered in the metadata.

    Requires a logged-in session; otherwise redirects to /login.
    """
    if not session.get("logged_in"):
        return redirect("/login")

    meta = db.metadata
    #meta.reflect(db.engine) # Uncomment this to also get the hidden tables, but this crashes rn
    contents = {}

    for table_name, table in meta.tables.items():
        try:
            rows = db.session.query(table).all()
        except Exception as e:
            print("Error occurred: {e}".format(e=str(e)))
            print("Values:\n Table = {table}\n Tabletype = {tabletype}\n".format(table=table, tabletype=type(table)))
            # Show the table as empty. The original left the unexecuted query
            # in `rows`, so the template would run the failing query again.
            db.session.rollback()
            rows = []

        contents[table_name] = {
            'columns': table.columns.keys(),
            'rows': rows,
        }

    return render_template(
        "table_contents.html",
        table_contents=contents,
    )
|
|
|
|
|
|
|
|
@app.route('/show_tables')
def show_tables():
    """Render the list of tables after reflecting the database into the metadata.

    Requires a logged-in session; otherwise redirects to /login.
    """
    if not session.get("logged_in"):
        return redirect("/login")

    # NOTE(review): reflect() adds the reflected tables to the shared
    # db.metadata, which the other views read as well.
    db.metadata.reflect(db.engine)
    return render_template('show_tables.html', tables=db.metadata.tables)
|
|
|
|
@app.route("/manage_uploads")
def manage_uploads():
    """List the entries in the uploads folder.

    Requires a logged-in session; otherwise redirects to /login.
    """
    if not session.get("logged_in"):
        return redirect("/login")

    videodir = "uploads/"
    # The folder is only created on the first upload; show an empty list until then.
    videolist = os.listdir(videodir) if os.path.isdir(videodir) else []
    num_videos = len(videolist)

    return render_template("manage_uploads.html", videolist=videolist, num_videos=num_videos)
|
|
|
|
|
|
|
|
@app.route("/deleteuploads", methods=["POST"])
def deleteuploads():
    """Delete every file in the uploads folder, then go back to the link list.

    Requires a logged-in session; otherwise redirects to /login.
    """
    if not session.get("logged_in"):
        return redirect("/login")

    print("deleting all videos")

    videodir = "uploads/"
    # The folder is only created on the first upload, so it may not exist yet.
    if os.path.isdir(videodir):
        for video in os.listdir(videodir):
            video_path = os.path.join(videodir, video)
            # os.remove() fails on directories; only files are deleted.
            if os.path.isfile(video_path):
                os.remove(video_path)

    print("videos deleted")

    return redirect("/all_links")
|
|
|
|
@app.route("/task3")
def task3():
    """Render the "task3.html" template."""
    return render_template("task3.html")
|
|
|
|
|
|
|
|
# Root page -----------------------------
|
|
|
|
@app.route("/login", methods=["GET","POST"])
def login():
    """Show the login form; on POST check the password and log the session in.

    The submitted password is hashed with SHA-256 and compared to PASSWORD.
    A wrong password shows the login form again.
    """
    if request.method != "POST":
        return render_template("login.html")

    submitted = request.form["password"].encode('utf-8')
    if hashlib.sha256(submitted).hexdigest() != PASSWORD:
        return render_template("login.html")

    session["logged_in"] = True
    return redirect(url_for("all_links"))
|
|
|
|
|
|
@app.route("/logout")
def logout():
    """Mark the session as logged out and redirect to the root page."""
    session["logged_in"] = False
    return redirect("/")
|
|
|
|
def has_no_empty_params(rule):
    """Return True when ``rule`` has at least as many defaults as arguments."""
    default_count = len(rule.defaults or ())
    argument_count = len(rule.arguments or ())
    return default_count >= argument_count
|
|
|
|
@app.route("/")
def root():
    """Send visitors of the root URL to the start page."""
    return redirect("/start")
|
|
|
|
@app.route("/all_links")
def all_links():
    """Render a page listing every route reachable by a plain GET.

    Rules that do not accept GET, or that need parameters without defaults,
    are left out because they cannot be opened directly in a browser.
    """
    links = [
        (url_for(rule.endpoint, **(rule.defaults or {})), rule.endpoint)
        for rule in app.url_map.iter_rules()
        if "GET" in rule.methods and has_no_empty_params(rule)
    ]
    return render_template("all_links.html", links=links)
|
|
|
|
# delete all tables as last link --------------------------
|
|
|
|
# Delete the entries of every table except "user" and "response" (route currently disabled)
|
|
#@app.route('/delete_json_tables', methods=['GET'])
|
|
def delete_json_tables():
    """Delete the rows of every table except "user" and "response".

    The route decorator for this helper is commented out. Returns a success
    string, or an error message with status 500 after rolling back.
    """
    kept_tables = ("user", "response")
    try:
        metadata = db.metadata
        metadata.reflect(db.engine)
        # Reverse dependency order so foreign key constraints are respected.
        for table in reversed(metadata.sorted_tables):
            if table.name in kept_tables:
                continue
            print(f"Deleting Table: {table.name}")
            db.session.execute(table.delete())

        db.session.commit()
        return 'All entries deleted successfully'
    except Exception as error:
        # Roll back the changes if any error occurs.
        db.session.rollback()
        return f'Error occurred: {str(error)}', 500
    finally:
        # Close the session in every case.
        db.session.close()
|
|
|
|
|
|
# Route to delete all entries
|
|
@app.route('/delete_all_entries', methods=['GET'])
def delete_all_entries():
    """Delete the rows of every table registered in the metadata.

    Requires a logged-in session; otherwise redirects to /login. Returns a
    success string, or an error message with status 500 after rolling back.
    """
    if not session.get("logged_in"):
        return redirect("/login")

    # A drop_all() (the counterpart of create_all()) combined with reflect()
    # could drop the tables themselves instead of only emptying them.
    try:
        # Reverse dependency order so foreign key constraints are respected.
        for table in reversed(db.metadata.sorted_tables):
            db.session.execute(table.delete())
        db.session.commit()
        return 'All entries deleted successfully'
    except Exception as error:
        # Roll back the changes if any error occurs.
        db.session.rollback()
        return f'Error occurred: {str(error)}', 500
    finally:
        # Close the session in every case.
        db.session.close()
|
|
|
|
def create_app():
    """Return the module-level Flask application."""
    return app
|
|
|
|
# Start the Flask development server when the file is run as a script.
if __name__ == "__main__":
    app.run()
|