140 lines
5.0 KiB
Python
Executable File
140 lines
5.0 KiB
Python
Executable File
#!/usr/bin/python
|
|
|
|
import requests
|
|
import re
|
|
import sys
|
|
import json
|
|
from datetime import datetime
|
|
from json.decoder import JSONDecodeError
|
|
import os
|
|
|
|
|
|
POLLING_TIMEOUT = 15 * 60 # 15 minutes
|
|
|
|
token = os.environ["MENSA_BOT_TOKEN"] # the bot's token as obtained by the BotFather
|
|
|
|
pattern = re.compile("[a-zA-ZäöüÄÖÜ]{2}\s*@mensamap")
|
|
|
|
table_indices = {
|
|
"a": [chr(i) for i in range(ord("a"), ord("o") + 1)],
|
|
"b": [chr(i) for i in range(ord("a"), ord("t") + 1)],
|
|
"c": [chr(i) for i in range(ord("a"), ord("z") + 1)] + ["ä", "ö", "ü"],
|
|
"d": [chr(i) for i in range(ord("a"), ord("o") + 1)],
|
|
"e": [chr(i) for i in range(ord("a"), ord("s") + 1)],
|
|
"f": [chr(i) for i in range(ord("a"), ord("m") + 1)],
|
|
"g": [chr(i) for i in range(ord("a"), ord("s") + 1)],
|
|
"h": [chr(i) for i in range(ord("a"), ord("h") + 1)],
|
|
"i": [chr(i) for i in range(ord("a"), ord("f") + 1)]
|
|
}
|
|
|
|
def log(text, log_file, prepend_time=False):
|
|
if not log_file:
|
|
return
|
|
if prepend_time:
|
|
log_file.write(str(datetime.now()) + ':\n')
|
|
log_file.flush()
|
|
log_file.buffer.write(text.encode("utf-8"))
|
|
log_file.flush()
|
|
|
|
def send_image(path, chat_id, log_file):
|
|
try:
|
|
for old, new in [('ä', 'ae'), ('ö', 'oe'), ('ü', 'ue')]:
|
|
path = path.replace(old, new)
|
|
url = f"https://api.telegram.org/bot{token}/sendPhoto";
|
|
data = {'chat_id' : chat_id, 'photo': f'http://leafbla.de/mensa/img/{path}'}
|
|
log(f" SENDING IMAGE:\n file name = {path}\n chat id = {chat_id}\n", log_file)
|
|
r = requests.post(url, data=data)
|
|
except:
|
|
pass
|
|
|
|
def send_map(message, chat_id, log_file):
|
|
try:
|
|
m = pattern.search(message)
|
|
command = m.group()
|
|
loc = command[:2].lower()
|
|
grp = loc[0]
|
|
idx = loc[1]
|
|
log(f" FOUND COMMAND:\n text = \"{command}\"\n group = {grp}\n table = {idx}\n", log_file)
|
|
if idx in table_indices[grp]:
|
|
send_image(f"{loc}.png", chat_id, log_file)
|
|
except (AttributeError, IndexError, KeyError):
|
|
pass
|
|
|
|
def get_inline_query_result(s):
|
|
for old, new in [('ä', 'ae'), ('ö', 'oe'), ('ü', 'ue')]:
|
|
s = s.replace(old, new)
|
|
|
|
return {
|
|
"type": "photo",
|
|
"id": s,
|
|
"photo_url": "http://leafbla.de/mensa/img/" + s + ".png",
|
|
"thumb_url": "http://leafbla.de/mensa/thumbs/" + s + ".png",
|
|
}
|
|
|
|
def answer_inline_query(query_id, request, log_file):
|
|
print("answer_inline_query")
|
|
print(query_id, request, log_file)
|
|
if len(request) == 0:
|
|
print("handle empty request")
|
|
results = json.dumps(
|
|
{
|
|
"type": "photo",
|
|
"id": 1,
|
|
"photo_url": "https://gitea.leafbla.de/img/gitea-lg.png",
|
|
"thumb_url": "https://gitea.leafbla.de/img/gitea-lg.png",
|
|
}
|
|
)
|
|
if len(request) == 1 and request in table_indices:
|
|
results = json.dumps([get_inline_query_result(request + s1) for s1 in table_indices[request]])
|
|
elif len(request) == 2 and request[0] in table_indices and request[1] in table_indices[request[0]]:
|
|
results = json.dumps([get_inline_query_result(request)])
|
|
else:
|
|
results = "[]"
|
|
|
|
data = {
|
|
"inline_query_id": query_id,
|
|
"results": results
|
|
}
|
|
log(f" ANSWERING INLINE QUERY:\n request = {request}\n query id = {query_id}\n", log_file)
|
|
requests.post(f"https://api.telegram.org/bot{token}/answerInlineQuery", data=data)
|
|
|
|
def run(log_file=None):
|
|
update_id = None
|
|
while True:
|
|
url = f"https://api.telegram.org/bot{token}/getUpdates"
|
|
data = {'timeout': POLLING_TIMEOUT}
|
|
if update_id is not None:
|
|
data['offset'] = update_id
|
|
log(f" REQUESTING UPDATE:\n timeout = {data['timeout']}\n offset = {update_id}\n", log_file, prepend_time=True)
|
|
response = requests.post(url, data=data)
|
|
log(" RECEIVED RESPONSE:\n", log_file, prepend_time=True)
|
|
try:
|
|
j = response.json()
|
|
update_id = j["result"][-1]["update_id"] + 1
|
|
result = j["result"][-1]
|
|
if "message" in result:
|
|
msg = result["message"]
|
|
chat_id = msg["chat"]["id"]
|
|
text = msg["text"]
|
|
log(f" update id = {update_id}\n chat id = {chat_id}\n text = \"{text}\"\n", log_file)
|
|
send_map(text, chat_id, log_file)
|
|
elif "inline_query" in result:
|
|
msg = result["inline_query"]
|
|
query_id = msg["id"]
|
|
request = msg["query"]
|
|
answer_inline_query(query_id, request, log_file)
|
|
except (IndexError, KeyError, JSONDecodeError):
|
|
log(" no new messages\n", log_file)
|
|
|
|
if __name__ == "__main__":
|
|
try:
|
|
path = sys.argv[1] if len(sys.argv) > 1 else None
|
|
if path == "stdout":
|
|
run(log_file=sys.stdout)
|
|
elif path:
|
|
run(log_file=open(path, 'a'))
|
|
else:
|
|
run()
|
|
except KeyboardInterrupt:
|
|
pass
|