mensamap/mensaar-map-bot.py
2019-11-12 13:44:41 +01:00

140 lines
5.0 KiB
Python
Executable File

#!/usr/bin/python
import requests
import re
import sys
import json
from datetime import datetime
from json.decoder import JSONDecodeError
import os
POLLING_TIMEOUT = 15 * 60 # 15 minutes
token = os.environ["MENSA_BOT_TOKEN"] # the bot's token as obtained by the BotFather
pattern = re.compile("[a-zA-ZäöüÄÖÜ]{2}\s*@mensamap")
table_indices = {
"a": [chr(i) for i in range(ord("a"), ord("o") + 1)],
"b": [chr(i) for i in range(ord("a"), ord("t") + 1)],
"c": [chr(i) for i in range(ord("a"), ord("z") + 1)] + ["ä", "ö", "ü"],
"d": [chr(i) for i in range(ord("a"), ord("o") + 1)],
"e": [chr(i) for i in range(ord("a"), ord("s") + 1)],
"f": [chr(i) for i in range(ord("a"), ord("m") + 1)],
"g": [chr(i) for i in range(ord("a"), ord("s") + 1)],
"h": [chr(i) for i in range(ord("a"), ord("h") + 1)],
"i": [chr(i) for i in range(ord("a"), ord("f") + 1)]
}
def log(text, log_file, prepend_time=False):
if not log_file:
return
if prepend_time:
log_file.write(str(datetime.now()) + ':\n')
log_file.flush()
log_file.buffer.write(text.encode("utf-8"))
log_file.flush()
def send_image(path, chat_id, log_file):
try:
for old, new in [('ä', 'ae'), ('ö', 'oe'), ('ü', 'ue')]:
path = path.replace(old, new)
url = f"https://api.telegram.org/bot{token}/sendPhoto";
data = {'chat_id' : chat_id, 'photo': f'http://leafbla.de/mensa/img/{path}'}
log(f" SENDING IMAGE:\n file name = {path}\n chat id = {chat_id}\n", log_file)
r = requests.post(url, data=data)
except:
pass
def send_map(message, chat_id, log_file):
try:
m = pattern.search(message)
command = m.group()
loc = command[:2].lower()
grp = loc[0]
idx = loc[1]
log(f" FOUND COMMAND:\n text = \"{command}\"\n group = {grp}\n table = {idx}\n", log_file)
if idx in table_indices[grp]:
send_image(f"{loc}.png", chat_id, log_file)
except (AttributeError, IndexError, KeyError):
pass
def get_inline_query_result(s):
for old, new in [('ä', 'ae'), ('ö', 'oe'), ('ü', 'ue')]:
s = s.replace(old, new)
return {
"type": "photo",
"id": s,
"photo_url": "http://leafbla.de/mensa/img/" + s + ".png",
"thumb_url": "http://leafbla.de/mensa/thumbs/" + s + ".png",
}
def answer_inline_query(query_id, request, log_file):
print("answer_inline_query")
print(query_id, request, log_file)
if len(request) == 0:
print("handle empty request")
results = json.dumps(
{
"type": "photo",
"id": 1,
"photo_url": "https://gitea.leafbla.de/img/gitea-lg.png",
"thumb_url": "https://gitea.leafbla.de/img/gitea-lg.png",
}
)
if len(request) == 1 and request in table_indices:
results = json.dumps([get_inline_query_result(request + s1) for s1 in table_indices[request]])
elif len(request) == 2 and request[0] in table_indices and request[1] in table_indices[request[0]]:
results = json.dumps([get_inline_query_result(request)])
else:
results = "[]"
data = {
"inline_query_id": query_id,
"results": results
}
log(f" ANSWERING INLINE QUERY:\n request = {request}\n query id = {query_id}\n", log_file)
requests.post(f"https://api.telegram.org/bot{token}/answerInlineQuery", data=data)
def run(log_file=None):
update_id = None
while True:
url = f"https://api.telegram.org/bot{token}/getUpdates"
data = {'timeout': POLLING_TIMEOUT}
if update_id is not None:
data['offset'] = update_id
log(f" REQUESTING UPDATE:\n timeout = {data['timeout']}\n offset = {update_id}\n", log_file, prepend_time=True)
response = requests.post(url, data=data)
log(" RECEIVED RESPONSE:\n", log_file, prepend_time=True)
try:
j = response.json()
update_id = j["result"][-1]["update_id"] + 1
result = j["result"][-1]
if "message" in result:
msg = result["message"]
chat_id = msg["chat"]["id"]
text = msg["text"]
log(f" update id = {update_id}\n chat id = {chat_id}\n text = \"{text}\"\n", log_file)
send_map(text, chat_id, log_file)
elif "inline_query" in result:
msg = result["inline_query"]
query_id = msg["id"]
request = msg["query"]
answer_inline_query(query_id, request, log_file)
except (IndexError, KeyError, JSONDecodeError):
log(" no new messages\n", log_file)
if __name__ == "__main__":
try:
path = sys.argv[1] if len(sys.argv) > 1 else None
if path == "stdout":
run(log_file=sys.stdout)
elif path:
run(log_file=open(path, 'a'))
else:
run()
except KeyboardInterrupt:
pass