Initial Commit

This commit is contained in:
Dominic Zimmer 2019-08-08 09:50:07 +02:00
commit e4aa416bfb
16 changed files with 4406 additions and 0 deletions

6
.gitignore vendored Normal file
View File

@ -0,0 +1,6 @@
.idea/**/*
*.swp
*.session
*/**/__pycache__/
__pycache__/*
*.session-journal

17
README.md Normal file
View File

@ -0,0 +1,17 @@
# TTTC, the telegram client
TTTC is an unofficial [Telegram](https://telegram.org/) commandline client.
It aims to provide a user experience similar to that of [VIM](https://www.vim.org/).
## Requirements
TTTC uses the `curses` and `telethon` python libraries. Curses is usually shipped with
your python installation, but `telethon` can easily be installed via `pip`.
In order to use TTTC, you will need your own Telegram `api_id` and `api_hash`.
You can read more about how to get them [here](https://core.telegram.org/api/obtaining_api_id).
Once you obtained your own api key and hash, you need to set them as your environment variables
`TTTC_API_ID` and `TTTC_API_HASH`, respectively.
The client can be run with `python3 tttc.py`.

80
authview.py Normal file
View File

@ -0,0 +1,80 @@
from asyncio import Condition
import telethon
import resources
import curses
from tttcutils import debug, show_stacktrace
def bla(scr):
lines = resources.tttc_logo
tttw,ttth = len(lines[4]), len(lines)
w, h = curses.COLS, curses.LINES
yoff = h//2 - ttth//2
xoff = w//2 - tttw//2
for a in range(len(lines)):
scr.addstr(yoff + a, xoff, lines[a])
scr.refresh()
class AuthView():
def __init__(self, client, stdscr):
self.stdscr = stdscr
self.client = client
self.inputevent = Condition()
self.inputs = ""
self.w, self.h = curses.COLS, curses.LINES
self.fin = False
async def textinput(self):
self.stdscr.addstr("\n> ")
self.stdscr.refresh()
self.inputs = ""
with await self.inputevent:
await self.inputevent.wait()
out = self.inputs
self.inputs = ""
self.stdscr.addstr("\n")
self.stdscr.refresh()
return out
async def run(self):
await self.client.connect()
self.stdscr.addstr("connected")
self.auth = await self.client.is_user_authorized()
bla(self.stdscr)
if not self.auth:
while True:
self.stdscr.addstr("Please enter your phone number: ")
self.stdscr.refresh()
self.phone = await self.textinput()
try:
response = await self.client.send_code_request(self.phone)
if not response.phone_registered:
self.stdscr.addstr("This phone number is not registered in telegram. ")
self.stdscr.refresh()
else:
break
except telethon.errors.rpcerrorlist.FloodWaitError as err:
self.stdscr.addstr(f"The telegram servers blocked you for too many retries ({err.seconds}s remaining). ")
self.stdscr.refresh()
except Exception as e:
self.stdscr.addstr("Incorrect phone number. ")
self.stdscr.refresh()
self.stdscr.addstr("auth with code now.")
self.stdscr.refresh()
self.code = await self.textinput()
await self.client.sign_in(self.phone, self.code)
self.stdscr.addstr(f"auth successful. ")
self.stdscr.refresh()
async def handle_key(self, key):
if key == "RETURN":
with await self.inputevent:
self.inputevent.notify()
elif key == "BACKSPACE":
self.inputs = self.inputs[0:-1]
else:
self.inputs += key
self.stdscr.addstr(20, 50, self.inputs)
self.stdscr.clrtoeol()
self.stdscr.refresh()

167
commandline.py Normal file
View File

@ -0,0 +1,167 @@
import argparse
import sys
import commandline
import re
import curses
from telethon import sync, TelegramClient
import os
import tttcutils
def handle():
parser = argparse.ArgumentParser(description="Run with no arguments to start interactive mode")
parser.add_argument("--verbose", "-v", action="store_true", help="Be verbose")
parser.add_argument("--colortest", action="store_true", help="Test the used curses color pallet")
contacts = parser.add_argument_group("contacts")
contacts.add_argument("--list", "-l", action="store_true", help="List available dialogs with chat ids")
contacts.add_argument("--startswith", "-s", metavar="S",
help="Search for a contact starting with S (case sensitive). Can be used with messaging options if result is unique.")
contacts.add_argument("--contains", "-c", metavar="C",
help="Search for a contact containing S (case sensitive). Can be used with messaging options if result is unique.")
contacts.add_argument("--matches", "-x", metavar="M",
help="Search for a contact matching regular expression M. Can be used with messaging options if result is unique.")
messaging = parser.add_argument_group("messaging")
messaging.add_argument("--me", action="store_true", help="Send the message to yourself")
messaging.add_argument("--target", "-t", metavar="chatid", type=int, help="Send the message to the given chat")
messaging.add_argument("--message", "--msg", "-m", help="Provide a message.")
messaging.add_argument("--stdin", "-i", action="store_true", help="Read a message from stdin.")
parsed = parser.parse_args()
global debug
if parsed.verbose:
def debug(*args, **kwargs):
print(*args, **kwargs)
else:
def debug(*args, **kwargs):
pass
interactive = len(sys.argv) == 1
if interactive:
return False
elif parsed.colortest:
with ColorSample() as c:
c.show()
return True
else:
api_id, api_hash = tttcutils.assert_environment()
client = TelegramClient("tttc", api_id, api_hash)
debug("Connecting...", file=sys.stderr)
client.connect()
debug("Connected.", file=sys.stderr)
if not client.is_user_authorized():
print("Please use the interactive client first to authorize and create a tttc.session file. Aborting.", file=sys.stderr)
exit(1)
debug("Client is authorized.", file=sys.stderr)
if parsed.startswith or parsed.list or parsed.matches or parsed.contains:
global chats
debug("Fetching chats...", file=sys.stderr)
chats = client.get_dialogs()
filtered = chats if parsed.list else filter_chats((parsed.startswith, parsed.contains, parsed.matches))
unique = None
if filtered:
if len(filtered) == 0:
print("No matching chats found.")
elif len(filtered) > 1:
for result in reversed(filtered):
print(str(result.id).rjust(16) + " "*4 + result.name)
else:
if not (parsed.message or parsed.stdin):
for result in reversed(filtered):
print(str(result.id).rjust(16) + " "*4 + result.name)
exit()
unique = filtered[0].id
if not (parsed.message or parsed.stdin):
exit() # we are done here
recipient = unique or parsed.target
if not recipient and parsed.me:
recipient = client.get_me().id
if not recipient:
if len(filtered) > 1:
print("Recipient ambiguous. Aborting.")
else:
print("No recipient provided.")
exit()
#try:
# pass
# #recipient = client.get_input_entity(recipient)
# #client.get_entity(recipient)
#except ValueError:
# print("Illegal entity id. Aborting.", file=sys.stderr)
# exit(1)
if parsed.message or parsed.stdin:
send_message(client, recipient, message=parsed.message)
return True
def send_message(client, chat_id, message):
#print(f"call to {client} {chat_id} {message}")
try:
recipient = client.get_input_entity(chat_id)
except ValueError:
print("Could not find the entity for this entity id. Aborting.", file=sys.stderr)
exit(1)
debug("Chat exists. Sending message.", file=sys.stderr)
if message is None:
debug("No message specified. Reading from stdin.", file=sys.stderr)
message = "".join([line for line in sys.stdin])
if message.strip() == "":
print("The message must not be empty.", file=sys.stderr)
exit()
client.send_message(chat_id, message)
def filter_chats(filt):
if filt == (None, None, None):
return None
starts, contains, matches = filt
if starts:
return [ chat for chat in chats if chat.name.startswith(starts) ]
elif contains:
return [ chat for chat in chats if contains in chat.name ]
else:
reg = re.compile(matches)
return [ chat for chat in chats if reg.match(chat.name) ]
class ColorSample:
def __init__(self):
pass
def __enter__(self):
self.stdscr = curses.initscr()
curses.start_color()
curses.use_default_colors()
curses.noecho()
curses.cbreak()
self.stdscr.keypad(1)
self.stdscr.refresh()
return self
def __exit__(self, *args):
curses.nocbreak()
self.stdscr.keypad(0)
curses.echo()
curses.endwin()
return False
def show(self):
self.stdscr.addstr("These are all the colors your terminal supports (and their codes):\n")
for i in range(curses.COLORS):
curses.init_pair(i, i, -1);
self.stdscr.addstr(f" {i} ", curses.color_pair(i))
self.stdscr.addstr(f" {i} ", curses.A_STANDOUT | curses.color_pair(i))
self.stdscr.addstr("\n\n")
self.stdscr.addstr("Press any key to continue.")
self.stdscr.getch()
self.stdscr.refresh()
self.stdscr.clear()
self.stdscr.addstr("TTTC uses these colors. Refer to the previous page for their color codes. You can adjust them in the config:\n", curses.color_pair(0))
from config import colors
for (i, (k, v)) in enumerate(colors.colors.items()):
f, b = v
curses.init_pair(i+1, f, b);
self.stdscr.addstr(f"{k.ljust(25)} {v}\n", curses.color_pair(i+1))
self.stdscr.refresh()
self.stdscr.refresh()
self.stdscr.getch()

0
config/__init__.py Normal file
View File

29
config/colors.py Normal file
View File

@ -0,0 +1,29 @@
import curses
colors_256 = {
"default": (255, -1),
"default_highlight": (255, 8),
"primary": (14, -1),
"secondary": (10, -1),
"ternary": (11, -1),
"standout": (0, 3),
"error": (9, -1),
"accent": (237, -1)
}
colors_8 = {
"default": (7, -1),
"default_highlight": (0, 7),
"primary": (6, -1),
"secondary": (2, -1),
"ternary": (5, -1),
"standout": (7, 3),
"error": (1, -1),
"accent": (7, -1)
}
colors = colors_256 if curses.COLORS >= 256 else colors_8
def get_colors():
for (i, (k, v)) in enumerate(colors.items()):
f, b = v
curses.init_pair(i+1, f, b);
return { k: curses.color_pair(i+1) for (i, (k,v)) in enumerate(colors.items()) }

69
config/tttcrc.py Normal file
View File

@ -0,0 +1,69 @@
# Colors
# How much space should the contacts area use?
split_ratio = 0.3
# set to None for transparency
background = None
primary = curses.COLOR_CYAN
highlight = curses.COLOR_RED
# General
bootscreen_duration = 0.4
contacts_scroll_offset = 0
messages_scroll_offset = 0
## Contacts
pinned_group = True
pinned_symbol = ""
# indicate the amount of unread messages next to the contacts name
new_messages = True
# characters to indicate chat type
symbol_read = ""
symbol_channel = "C"
symbol_group = "G"
symbol_supergroup = "S"
symbol_private = "P"
time_today = "%I:%M %p"
# 6 days to prevent confusion with last weeks <this weekday>
# ie if today is monday "Mon" would refer to last weeks monday instead of today. change to 7*86400 if you're fine with that
time_lastweek = "%a"
lastweek = 6*86400
# anything that's not withing lastweek is considered longtimeago
time_longtimeago = "%d.%m.%y"
## Messages
message_edited = "edited"
message_forwarded = "<author> via <from>"
# when author and from match
message_forwarded_self = "via <author>"
# Keys
tttc_quit = "q"
vimline_open = ":"
contacts_search = "/"
contacts_top = "gg"
contacts_bottom = "G"
contacts_next = "c"
contacts_prev = "C"
# mind the escape sequence
messages_search = "\\"
messages_compose = "m"
# use xdg-open to open file/image
messages_visual_link = "l"
messages_visual_open = "o"
messages_visual_reply = "r"
messages_visual_next = "n"
messages_visual_prev = "N"
messages_visual_toggle_select = "space"
messages_visual_forward = "w"
compose_send = "y"
compose_edit = "e"
# add a file to the message, the message itself then becomes the caption
compose_file = "f"
# add image to the message, the message itself then becomes the caption
compose_image = "i"
# after having composed a message you can still select it as a reply to another message
compose_select_reply ="r"

335
drawtool.py Normal file
View File

@ -0,0 +1,335 @@
import curses
import math
from telethon.utils import get_display_name
import emojis
import os
import datetime
import time
import config
import textwrap
from tttcutils import show_stacktrace, debug
class Drawtool():
def __init__(self, main_view):
self.client = main_view.client
self.main_view = main_view
self.stdscr = main_view.stdscr
self.chat_ratio = 0.3
self.H, self.W = self.stdscr.getmaxyx()
self.recompute_dimensions()
self.chat_rows = 5
self.chat_offset_fraction = 0.3
self.single_chat_fraction = 0.3
self.dialog_fraction = 0.25
self.show_indices = False
def recompute_dimensions(self):
self.min_input_lines = int(0.1 * self.H)
self.max_input_lines = int(0.3 * self.H)
try:
self.input_lines = min(self.max_input_lines, max(len(self._get_input_lines(self.main_view.inputs, width = self.W - 4)), self.min_input_lines))
except:
show_stacktrace()
self.chats_height = self.H - self.input_lines - 3
self.chats_width = int(self.W * self.chat_ratio)
self.chats_num = self.chats_height // 3
async def resize(self):
self.H, self.W = self.stdscr.getmaxyx()
self.recompute_dimensions()
await self.redraw()
def _get_input_lines(self, s, width = 50):
# in order to preserve user made linebreaks:
wrapper = textwrap.TextWrapper()
wrapper.width = width
wrapper.replace_whitespace = False
wrapper.drop_whitespace = False
lines = s.split("\n")
newlines = []
for line in lines:
if line:
newlines += wrapper.wrap(line)
else:
newlines += [""]
return newlines
#return textwrap.wrap(s, width = width)
def _get_cursor_position(self, s, width = 50):
lines = self._get_input_lines(s, width = width)[-self.input_lines:]
if not lines:
return (0, 0)
x = len(lines[-1])
y = len(lines) - 1
return y, x
async def redraw(self):
self.recompute_dimensions()
self.stdscr.clear()
self.draw_chats()
await self.draw_messages()
if self.main_view.mode == "search":
if self.main_view.search_result == []:
self.stdscr.addstr(self.H - 1, 0, "/" + self.main_view.search_box, self.main_view.colors["error"])
else:
self.stdscr.addstr(self.H - 1, 0, "/" + self.main_view.search_box)
elif self.main_view.mode == "vimmode":
self.stdscr.addstr(self.H - 1, 0, ":" + self.main_view.vimline_box)
else:
self.stdscr.addstr(self.H - 1, 0, "--" + self.main_view.mode.upper() + "--")
self.stdscr.addstr(self.H - 1, int(self.W * 2/3), self.main_view.command_box[:8])
for index, line in enumerate(self._get_input_lines(self.main_view.inputs, width = self.W - 4)[-self.input_lines:]):
self.stdscr.addstr(self.H - self.input_lines - 2 + index, 2, f"{line}")
if self.main_view.mode == "insert":
curses.curs_set(1)
y, x = self._get_cursor_position(self.main_view.inputs, width = self.W - 4)
self.stdscr.move(self.H - self.input_lines - 2 + y, 2 + x)
elif self.main_view.mode == "search":
curses.curs_set(1)
self.stdscr.move(self.H - 1, 1 + len(self.main_view.search_box))
elif self.main_view.mode == "vimmode":
curses.curs_set(1)
self.stdscr.move(self.H - 1, 1 + len(self.main_view.vimline_box))
else:
curses.curs_set(0)
self.stdscr.refresh()
def format(self, text, attributes = None, width = None, alignment = "left", inner_alignment = "left", truncation = "..."):
if attributes == None:
attributes = self.main_view.colors["default"]
if width == None:
width = len(text)
return {
"text": text,
"attributes": attributes,
"width": width,
"alignment": alignment,
"inner_alignment": inner_alignment,
"truncation": truncation
}
def _datestring(self, date):
now = datetime.datetime.now().astimezone()
today = datetime.date.today()
if (now - date).total_seconds() < 20*3600:
out = date.strftime(f"%I:%M %p")
return out
if (now - date).total_seconds() < 6*86400:
return date.strftime("%A")
return date.strftime("%d.%m.%y")
def draw_frame(self, yoff, xoff, h, w, chars = "││──┌┐└┘", attributes = 0):
for i in range(h):
self.stdscr.addstr(yoff + i, xoff, chars[0], attributes)
for i in range(h):
self.stdscr.addstr(yoff + i, xoff + w, chars[1], attributes)
for i in range(w):
self.stdscr.addstr(yoff, xoff + i, chars[2], attributes)
for i in range(w):
self.stdscr.addstr(yoff + h, xoff + i, chars[3], attributes)
self.stdscr.addstr(yoff, xoff, chars[4], attributes)
self.stdscr.addstr(yoff, xoff + w, chars[5], attributes)
self.stdscr.addstr(yoff + h, xoff, chars[6], attributes)
self.stdscr.addstr(yoff + h, xoff + w, chars[7], attributes)
def draw_chats(self):
selected_chat_index = self.main_view.selected_chat - self.main_view.selected_chat_offset
offset = self.main_view.selected_chat_offset
try:
self.draw_frame(0,0, self.chats_height , self.chats_width)
index = 0
y = 1
for index in range(self.chats_num):
dialog = self.main_view.dialogs[index + offset]
message = dialog["messages"][0] if "messages" in dialog else dialog["dialog"].message
message_string = message.text if message.text else "[Non-text object]"
if self.main_view.text_emojis:
message_string = emojis.decode(message_string)
chat_name = get_display_name(dialog["dialog"].entity)
from_string = get_display_name(message.sender)
unread = dialog["unread_count"]
unread_string = f"({unread} new)" if unread else ""
date = dialog["dialog"].date
date = date.astimezone()
date_string = self._datestring(date)
pinned = "* " if dialog["dialog"].pinned else " "
selected = selected_chat_index == index
self.draw_text(
[
self.format("o" if dialog["online"] else " ", attributes = self.main_view.colors["secondary"]),
self.format(chat_name, attributes = self.main_view.colors["primary"] | curses.A_STANDOUT if selected else curses.A_BOLD, width = int(0.5 * self.chats_width)),
self.format(f" {str(index)} " if self.show_indices else "", attributes = self.main_view.colors["standout"]),
self.format(unread_string, attributes = self.main_view.colors["error"], alignment = "right"),
self.format(date_string, alignment = "right", attributes = self.main_view.colors["primary"]),
],
y, 2, maxwidth = self.chats_width - 2)
self.draw_text(
[
self.format(f"{from_string}:"),
self.format(message_string, width = self.chats_width - len(f"{from_string}: ") - 3)
],
y + 1, 2, maxwidth = self.chats_width - 2)
y += 3
index += 1
except Exception:
show_stacktrace()
def draw_text(self, format_dicts, y_off = 0, x_off = 0, screen = None, maxwidth = None, separator = " "):
if maxwidth == None:
maxwidth = sum(format_dict["width"] for format_dict in format_dicts)
if screen == None:
screen = self.stdscr
left = [ x for x in format_dicts if x["alignment"] == "left" ]
right = list(reversed([ x for x in format_dicts if x["alignment"] == "right" ]))
center = [ x for x in format_dicts if x["alignment"] == "center" ]
entries = [ (x, "left") for x in left ] + [ (x, "right") for x in right ] + [ (x, "center") for x in center ]
x_left = 0
x_right = maxwidth -1
for (format_dict, alignment) in entries:
text = format_dict["text"]
text = text.replace("\n", "")
attributes = format_dict["attributes"]
width = format_dict["width"]
inner_alignment = format_dict["inner_alignment"]
truncation = format_dict["truncation"]
# TODO: make this split preferrably at spaces and not show linebreaks
display_text = text.split("\n")[0]
if len(display_text) > width:
if truncation:
# TODO: make this split preferrably at spaces and not show linebreaks
display_text = display_text[:width - len(truncation)] + truncation
else:
display_text = display_text[:width]
rljust = " " * (width - len(display_text))
if alignment == "left":
if inner_alignment == "left":
screen.addstr(y_off, x_off + x_left, display_text, attributes)
x_left += len(display_text)
if rljust:
screen.addstr(y_off, x_off + x_left, rljust)
x_left += len(rljust)
elif inner_alignment == "right":
if rljust:
screen.addstr(y_off, x_off + x_left, rljust)
x_left += len(rljust)
screen.addstr(y_off, x_off + x_left, display_text, attributes)
x_left += len(display_text)
if left and format_dict != left[-1]:
screen.addstr(y_off, x_off + x_left, separator)
x_left += len(separator)
elif alignment == "right":
if inner_alignment == "left":
x_right -= len(text)
self.stdscr.addstr(y_off, x_off + x_right, text, attributes)
x_right -= len(rljust)
self.stdscr.addstr(y_off, x_off + x_right, rljust)
elif inner_alignment == "right":
x_right -= len(rljust)
self.stdscr.addstr(y_off, x_off + x_right, rljust)
x_right -= len(text)
self.stdscr.addstr(y_off, x_off + x_right, text, attributes)
if right and format_dict != right[-1]:
x_right -= len(separator)
self.stdscr.addstr(y_off, x_off + x_right, separator)
elif alignment == "center":
self.stdscr.addstr(y_off, maxwidth // 2 - len(display_text) // 2, display_text, attributes)
def draw_message(self, main_view, message, chat_idx):
maxtextwidth = int(self.single_chat_fraction * self.W) - 2
lines = []
if message.text:
message_lines = message.text.split("\n")
for message_line in message_lines:
if main_view.text_emojis:
message_line = emojis.decode(message_line)
if message_line == "":
lines += [""]
else:
lines += [
message_line[maxtextwidth * i: maxtextwidth*i+maxtextwidth]
for i in range(int(math.ceil(len(message_line)/maxtextwidth)))
]
if message.media:
media_type = message.media.to_dict()["_"]
if media_type == "MessageMediaPhoto":
media_type = "Photo"
elif media_type == "MessageMediaDocument":
atts = message.media.document.attributes
filename = [ x for x in atts if x.to_dict()["_"] == "DocumentAttributeFilename" ]
if filename:
filename = filename[0].to_dict()["file_name"]
media_type = f"Document ({filename})"
else:
media_type = f"Document ({message.media.document.mime_type})"
lines += [ f"[{media_type}]" ]
reply = ""
if message.is_reply:
reply_id = message.reply_to_msg_id
reply = " r?? "
for idx2, message2 in enumerate(main_view.dialogs[main_view.selected_chat]["messages"]):
if message2.id == reply_id:
reply = f"r{idx2:02d}"
break
from_message = message
from_user = "You" if message.out else get_display_name(from_message.sender)
via_user = f" via {get_display_name(from_message.forward.sender)}" if message.forward else ""
user_string = f"{from_user}{via_user} "
out = []
if message.out:
out.append(f"{chat_idx} {user_string}{self._datestring(message.date.astimezone())}".rjust(maxtextwidth))
for idx, text in enumerate(lines):
out.append(text.rjust(maxtextwidth - 4))
#out.append(f"{chat_idx} {message.date.hour}:{message.date.minute:02d}".rjust(maxtextwidth) + ".")
if message.is_reply:
out.append(reply)
else:
out.append(f"{chat_idx} {user_string}{self._datestring(message.date.astimezone())}")
for idx, text in enumerate(lines):
out.append(" " + text)
if message.is_reply:
out.append(reply)
return (out, message)
async def load_messages(self, chat_index):
main_view = self.main_view
index = chat_index
if not "messages" in main_view.dialogs[index]:
temp = await main_view.client.get_messages(main_view.dialogs[index]["dialog"], 50)
main_view.dialogs[index]["messages"] = [ message for message in temp ]
async def draw_messages(self, offset = 0):
main_view = self.main_view
await self.load_messages(main_view.selected_chat)
messages = main_view.dialogs[main_view.selected_chat]["messages"]
max_rows = self.H - self.input_lines - 3 - 1
lines = []
chat_count = 0
while len(lines) < max_rows + offset and chat_count < len(messages):
text, message = self.draw_message(main_view, messages[chat_count], chat_count)
for line in reversed(text):
lines.append((line, message))
lines.append(("",message))
chat_count += 1
for i in range(min(len(lines)-offset, max_rows)):
text, message = lines[i + offset]
if message.out:
self.stdscr.addstr(max_rows - i, int(self.W * (1-self.single_chat_fraction) - 4) + 2, text)
else:
self.stdscr.addstr(max_rows - i, int(self.W * self.chat_offset_fraction) + 2, text)
self.draw_frame(0, self.chats_width + 1, self.chats_height, self.W - self.chats_width - 2)

3025
emojis.py Normal file

File diff suppressed because it is too large Load Diff

13
functest.py Normal file
View File

@ -0,0 +1,13 @@
def f():
print("very secret variable leaked")
def g():
#h = lambda : (print("hi"), print("hello"))
yn(f, "leak secret? ")
def yn(task, question):
x = input(question)
if x == "y":
task()
g()

14
interactive.py Normal file
View File

@ -0,0 +1,14 @@
from telethon import sync, TelegramClient
import os
if not ("TTTC_API_ID" in os.environ or "TTTC_API_HASH" in os.environ):
print("Please set your environment variables \"TTTC_API_ID\" and \"TTTC_API_HASH\" accordingly.")
print("Please consult https://core.telegram.org/api/obtaining_api_id on how to get your own API id and hash.")
quit(1)
api_id = os.environ["TTTC_API_ID"]
api_hash = os.environ["TTTC_API_HASH"]
client = TelegramClient("tttc", api_id, api_hash)
client.connect()
chats = client.get_dialogs()
print("client: TelegramClient chats: [Dialog]")

451
mainview.py Normal file
View File

@ -0,0 +1,451 @@
from asyncio import Condition
import telethon
from telethon import events
import resources
import os
import curses
from subprocess import call
import drawtool
import emojis
import shlex
import sqlite3
from telethon.utils import get_display_name
import datetime
import re
from tttcutils import debug, show_stacktrace
import subprocess
class MainView():
def __init__(self, client, stdscr):
self.stdscr = stdscr
self.client = client
self.inputevent = Condition()
self.client.add_event_handler(self.on_message, events.NewMessage)
self.client.add_event_handler(self.on_user_update, events.UserUpdate)
# TODO
# self.client.add_event_handler(self.on_read, events.MessageRead)
self.text_emojis = True
self.inputs = ""
self.inputs_cursor = 0
self.drawtool = drawtool.Drawtool(self)
self.fin = False
from config import colors as colorconfig
self.colors = colorconfig.get_colors()
self.ready = False
self.search_result = None
self.search_index = None
self.search_box = ""
self.vimline_box = ""
self.command_box = ""
# index corresponds to the index in self.dialogs
self.selected_chat = 0
# index offset
self.selected_chat_offset = 0
self.selected_message = None
self.mode = "normal"
async def quit(self):
self.fin = True
with await self.inputevent:
self.inputevent.notify()
async def on_user_update(self, event):
user_id = event.user_id
if event.online != None:
for dialog in self.dialogs:
if event.online == True:
dialog["online_until"] = event.until
elif dialog["online_until"]:
now = datetime.datetime.now().astimezone()
until = dialog["online_until"].astimezone()
if (now - until).seconds > 0:
dialog["online_until"] = None
dialog["online"] = False
if dialog["dialog"].entity.id == user_id:
dialog["online"] = event.online
async def on_message(self, event):
# move chats with news up
for idx, dialog in enumerate(self.dialogs):
if dialog["dialog"].id == event.chat_id:
# stuff to do upon arriving messages
newmessage = await self.client.get_messages(dialog["dialog"], 1)
dialog["messages"].insert(0, newmessage[0])
if not event.out:
dialog["unread_count"] += 1
front = self.dialogs.pop(idx)
self.dialogs = [front] + self.dialogs
break
# auto adjust relative replys to match shifted message offsets
if event.chat_id == self.dialogs[self.selected_chat]["dialog"].id:
if self.inputs.startswith("r"):
num = self.inputs[1:].split()[0]
try:
# num = int(s[1:].split()[0])
number = int(num)
msg = self.inputs.replace("r" + num, "r" + str(number+1))
self.inputs = msg
except:
pass
# dont switch the dialoge upon arriving messages
if idx == self.selected_chat:
self.selected_chat = 0
elif idx > self.selected_chat:
self.selected_chat += 1
elif idx < self.selected_chat:
pass
await self.drawtool.redraw()
async def textinput(self):
self.inputs = ""
with await self.inputevent:
await self.inputevent.wait()
if self.fin:
return ""
out = self.inputs
self.inputs = ""
return out
async def run(self):
try:
chats = await self.client.get_dialogs()
except sqlite3.OperationalError:
self.stdscr.addstr("Database is locked. Cannot connect with this session. Aborting")
self.stdscr.refresh()
await self.textinput()
await self.quit()
self.dialogs = [
{
"dialog": dialog,
"unread_count": dialog.unread_count,
"online": dialog.entity.status.to_dict()["_"] == "UserStatusOnline" if hasattr(dialog.entity, "status") and dialog.entity.status else None,
"online_until": None,
# "last_seen": dialog.entity.status.to_dict()["was_online"] if online == False else None,
} for dialog in chats ]
await self.drawtool.redraw()
self.ready = True
while True:
s = await self.textinput()
if self.fin:
return
if s.startswith("r"):
try:
num = int(s[1:].split()[0])
except:
continue
s = s.replace("r" + str(num) + " ", "")
reply_msg = self.dialogs[self.selected_chat]["messages"][num]
s = emojis.encode(s)
reply = await reply_msg.reply(s)
await self.on_message(reply)
elif s.startswith("media"):
try:
num = int(s[5:].split()[0])
except:
continue
message = self.dialogs[self.selected_chat]["messages"][num]
if message.media:
os.makedirs("/tmp/tttc/", exist_ok=True)
path = await self.client.download_media(message.media, "/tmp/tttc/")
# TODO mute calls
if message.media.photo:
sizes = message.media.photo.sizes
w, h = sizes[0].w, sizes[0].h
# w, h
basesize = 1500
w3m_command=f"0;1;0;0;{basesize};{int(basesize*h/w)};;;;;{path}\n4;\n3;"
W3MIMGDISPLAY="/usr/lib/w3m/w3mimgdisplay"
os.system(f"echo -e '{w3m_command}' | {W3MIMGDISPLAY} & disown")
await self.textinput()
else:
subprocess.call(["xdg-open", "{shlex.quote(path)}"], stdout = subprocess.DEVNULL, stderr = subprocess.DEVNULL)
#os.system(f"(xdg-open {shlex.quote(path)} 2>&1 > /dev/null) & disown")
else:
s = emojis.encode(s)
outgoing_message = await self.dialogs[self.selected_chat]["dialog"].send_message(s)
await self.on_message(outgoing_message)
await self.drawtool.redraw()
def select_next_chat(self):
# if wrapping not allowed:
# self.selected_chat = min(self.selected_chat + 1, len(self.dialogs) - 1)
self.selected_chat = (self.selected_chat + 1) % (len(self.dialogs))
self.center_selected_chat()
def select_prev_chat(self):
# if wrapping not allowed:
# self.selected_chat = max(self.selected_chat - 1, 0)
self.selected_chat = (self.selected_chat - 1) % (len(self.dialogs))
self.center_selected_chat()
def center_selected_chat(self):
if self.selected_chat < self.drawtool.chats_num // 2:
self.selected_chat_offset = 0
elif self.selected_chat > len(self.dialogs) - self.drawtool.chats_num // 2:
self.selected_chat_offset = len(self.dialogs) - self.drawtool.chats_num
else:
self.selected_chat_offset = self.selected_chat - self.drawtool.chats_num // 2
def select_chat(self, index):
if index < -1 or index >= len(self.dialogs):
return
if index == -1:
index = len(self.dialogs) - 1
while index < self.selected_chat:
self.select_prev_chat()
else:
while index > self.selected_chat:
self.select_next_chat()
def is_subsequence(self, xs, ys):
xs = list(xs)
for y in ys:
if xs and xs[0] == y:
xs.pop(0)
return not xs
def search_chats(self, query = None):
if query is None:
query = self.search_box
if query is None:
return # we dont search for ""
filter_function = self.is_subsequence
filter_function = lambda x, y: x in y
self.search_result = [ idx for (idx, dialog) in enumerate(self.dialogs)
if filter_function(query.lower(), get_display_name(dialog["dialog"].entity).lower())]
self.search_index = -1
def search_next(self):
if not self.search_result:
return
if self.search_index == -1:
import bisect
self.search_index = bisect.bisect_left(self.search_result, self.selected_chat)
self.select_chat(self.search_result[self.search_index % len(self.search_result)])
self.center_selected_chat()
return
self.search_index = (self.search_index + 1) % len(self.search_result)
index = self.search_result[self.search_index]
self.select_chat(index)
self.center_selected_chat()
def search_prev(self):
if not self.search_result:
return
if self.search_index == -1:
import bisect
self.search_index = bisect.bisect_right(self.search_result, self.selected_chat)
self.select_chat(self.search_result[self.search_index])
self.center_selected_chat()
return
self.search_index = (self.search_index - 1) % len(self.search_result)
self.select_chat(self.search_result[self.search_index % len(self.search_result)])
self.center_selected_chat()
async def call_command(self):
command = self.vimline_box
if command == "q":
await self.quit()
elif command == "pfd":
m = ""
for i in range(len(self.inputs)):
m += self.inputs[i].lower() if i%2==0 else self.inputs[i].lower().swapcase()
self.inputs = m
async def send_message(self):
if not self.inputs:
return
s = self.inputs
s = emojis.encode(s)
outgoing_message = await self.dialogs[self.selected_chat]["dialog"].send_message(s)
await self.on_message(outgoing_message)
await self.mark_read()
self.center_selected_chat()
self.inputs = ""
async def mark_read(self):
chat = self.dialogs[self.selected_chat]
dialog = chat["dialog"]
lastmessage = chat["messages"][0]
await self.client.send_read_acknowledge(dialog, lastmessage)
self.dialogs[self.selected_chat]["unread_count"] = 0
async def show_media(self, num = None):
if not num:
return
message = self.dialogs[self.selected_chat]["messages"][num]
if message.media:
os.makedirs("/tmp/tttc/", exist_ok=True)
# TODO test if file exists, ask for confirmation to replace or download again
path = await self.client.download_media(message.media, "/tmp/tttc/")
if hasattr(message.media, "photo") and False:
sizes = message.media.photo.sizes
w, h = sizes[0].w, sizes[0].h
# w, h
basesize = 300
w3m_command=f"0;1;0;0;{basesize};{int(basesize*h/w)};;;;;{path}\n4;\n3;"
W3MIMGDISPLAY="/usr/lib/w3m/w3mimgdisplay"
echo_sp = subprocess.Popen(["echo", "-e", f"{w3m_command}"], stdout = subprocess.PIPE)
w3m_sp = subprocess.Popen([f"{W3MIMGDISPLAY}"], stdin = echo_sp.stdout)
else:
subprocess.Popen(["xdg-open", f"{path}"], stdout = subprocess.DEVNULL, stderr = subprocess.DEVNULL)
async def handle_key(self, key):
if not self.ready:
return
if key == "RESIZE":
await self.drawtool.resize()
return
if self.mode == "search":
if key == "ESCAPE" or key == "RETURN":
self.mode = "normal"
elif key == "BACKSPACE":
if self.search_box == "":
self.mode = "normal"
else:
self.search_box = self.search_box[0:-1]
self.search_chats()
self.search_next()
else:
self.search_box += key
self.search_chats()
self.search_next()
elif self.mode == "vimmode":
if key == "ESCAPE":
self.mode = "normal"
elif key == "RETURN":
await self.call_command()
self.vimline_box = ""
self.mode = "normal"
elif key == "BACKSPACE":
if self.vimline_box == "":
self.mode = "normal"
else:
self.vimline_box = self.vimline_box[0:-1]
else:
self.vimline_box += key
elif self.mode == "normal":
num = None
try:
num = int(key)
except:
pass
if num is not None:
self.command_box += str(num)
await self.drawtool.redraw()
return
elif key == ":":
self.mode = "vimmode"
self.vimline_box = ""
elif key == "RETURN" or key == "y":
await self.send_message()
elif key == "q":
await self.quit()
#elif key == "D":
# for i in range(10):
# self.select_prev_chat()
#elif key == "d":
# for i in range(10):
# self.select_next_chat()
elif key == "C":
self.select_prev_chat()
elif key == "c":
self.select_next_chat()
elif key == "e":
self.text_emojis ^= True
elif key == "R":
await self.mark_read()
elif key == "d":
if self.command_box:
try:
n = int(self.command_box)
except:
return
if n >= len(self.dialogs[self.selected_chat]["messages"]):
#TODO: alert user
return
to_delete = self.dialogs[self.selected_chat]["messages"][n]
await to_delete.delete()
self.dialogs[self.selected_chat]["messages"].pop(n)
self.command_box = ""
await self.drawtool.redraw()
elif key == "r":
if self.command_box:
try:
n = int(self.command_box)
except:
return
reply_to = self.dialogs[self.selected_chat]["messages"][n]
s = emojis.encode(self.inputs)
reply = await reply_to.reply(s)
await self.on_message(reply)
self.command_box = ""
self.inputs = ""
elif key == "m":
if self.command_box:
try:
n = int(self.command_box)
except:
return
self.command_box = ""
await self.show_media(n)
elif key == "M":
self.center_selected_chat()
elif key == "HOME" or key == "g":
self.select_chat(0)
elif key == "END" or key == "G":
self.select_chat(-1)
elif key == "i":
self.mode = "insert"
elif key == "n":
self.search_next()
elif key == "N":
self.search_prev()
elif key == "/":
self.mode = "search"
self.search_box = ""
elif key == " ":
self.drawtool.show_indices ^= True
elif self.mode == "insert":
if key == "ESCAPE":
self.mode = "normal"
elif key == "LEFT":
self.insert_move_left()
elif key == "RIGHT":
self.insert_move_right()
elif key == "BACKSPACE":
self.inputs = self.inputs[0:-1]
elif key == "RETURN":
self.inputs += "\n"
else:
self.inputs += key
self.command_box = ""
await self.drawtool.redraw()
def insert_move_left(self):
self.inputs_cursor = max(0, self.cursor - 1)
def insert_move_right(self):
self.inputs_cursor = min(len(self.inputs), self.cursor + 1)
async def handle_key_old(self, key):
if key == "RETURN":
with await self.inputevent:
self.inputevent.notify()
elif key == "":
chat = self.dialogs[self.selected_chat]["dialog"]
last_message = self.dialogs[self.selected_chat]["messages"][0]
await self.client.send_read_acknowledge(chat, max_id=last_message.id)
self.dialogs[self.selected_chat]["unread_count"] = 0
else:
self.inputs += key
self.drawtool.redraw()

45
resources.py Normal file
View File

@ -0,0 +1,45 @@
key_mapping = {
"\n": "RETURN",
"\x1b":"ESCAPE",
"\t":"TAB",
343: "NUM_RETURN",
263: "BACKSPACE",
"": "BACKSPACE",
330: "DEL",
331: "INSERT",
262: "HOME",
360: "END",
338: "PGDOWN",
339: "PGUP",
410: "RESIZE",
258: "DOWN",
259: "UP",
260: "LEFT",
261: "RIGHT"
}
tttc_logo= \
""" ____.
____.----' ,##/ _ _ _
____.----' ,##" / | |_ | |_ | |_ ___
_----' ,###" / | __| | __| | __| / __|
-_ .#####" / | |_ | |_ | |_ | (__
'-._ .#######" / \__| \__| \__| \___|
'-..#######" /
\#####" /
\##/, /
\/ '-, /
'-, /
' """.split("\n")
auth_text = {
0: ["", "Please enter your full phone number:", " %phone%^"],
1: ["An error occured trying to sign you in.", "Please enter your full phone number:", " %phone%^"],
2: ["", "Sending authentification code..."],
3: ["", "You have been sent a message with", "an activation code. Please provide said code:", " %code%^"],
4: ["Incorrect code.", "You have been sent a message with", "an activation code. Please provide said code:", " %code%^"],
5: ["", "Signing in..."],
6: ["", "Two factor authentification is enabled.", "Please provide your password: %pass%^"],
7: ["Incorrect password.", "Two factor authentification is enabled.", "Please provide your password: %pass%^"],
8: ["", "Signing in..."]
}

76
tttc.py Executable file
View File

@ -0,0 +1,76 @@
#!/bin/python
from authview import AuthView
from functools import partial
from mainview import MainView
from queue import Queue
from telethon import TelegramClient
from telethon import events
from time import sleep
import argparse
import asyncio
import commandline
import concurrent
import curses
import resources
import sys
import os
from tttcutils import debug, show_stacktrace
import tttcutils
class Display:
def __init__(self, loop):
self.loop = loop
api_id, api_hash = tttcutils.assert_environment()
self.client = TelegramClient("tttc", api_id, api_hash, loop=self.loop)
def __enter__(self):
self.stdscr = curses.initscr()
curses.start_color()
curses.use_default_colors()
for i in range(curses.COLORS):
curses.init_pair(i, i, -1);
curses.noecho()
curses.cbreak()
self.stdscr.keypad(1)
self.stdscr.refresh()
return self
def __exit__(self, *args):
curses.nocbreak()
self.stdscr.keypad(0)
curses.echo()
curses.endwin()
async def main(self):
tasks = [
self.run(),
self.get_ch()
]
await asyncio.wait(tasks)
async def run(self):
self.view = AuthView(self.client, self.stdscr)
await self.view.run()
self.view = MainView(self.client, self.stdscr)
await self.view.run()
async def get_ch(self):
while True:
pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
a = await self.loop.run_in_executor(pool, self.stdscr.get_wch)
out = resources.key_mapping.get(a, str(a))
if self.view:
await self.view.handle_key(out)
if self.view.fin:
return
if commandline.handle():
exit()
elif __name__ == '__main__':
loop = asyncio.get_event_loop()
with Display(loop) as display:
loop.run_until_complete(display.main())

19
tttcutils.py Normal file
View File

@ -0,0 +1,19 @@
import os
import shlex
import traceback
def debug(x):
with open("/tmp/tttc.log", "a") as f:
f.write(str(x) + "\n")
os.system(f"notify-send {shlex.quote(str(x))}")
def show_stacktrace():
a = traceback.format_exc()
os.system(f"notify-send {shlex.quote(a)}")
def assert_environment():
if not ("TTTC_API_ID" in os.environ or "TTTC_API_HASH" in os.environ):
print("Please set your environment variables \"TTTC_API_ID\" and \"TTTC_API_HASH\" accordingly.")
print("Please consult https://core.telegram.org/api/obtaining_api_id on how to get your own API id and hash.")
exit(1)
return os.environ["TTTC_API_ID"], os.environ["TTTC_API_HASH"]

60
vimbindings.md Normal file
View File

@ -0,0 +1,60 @@
A a append/append
B b back/BACK
C c change
D d delete
E e end/END
F f find/Find
G g go/GO
H h LEFT
I i INSERT
J j DOWN
K k UP
L l RIGHT
M m
N n
O o insert line
P p put
Q q quit
R r nReply, vReply/replace
S s
T t till
U u
V v visual mode
v - text visual
V - message visual
vV - text line visual
W w word
X x del
Y y yank
Z z
0 $ goto 0 $
? help
RETURN send??
^A
^B
^C kill
^D
^E
^F
^G
^H
^I
^J
^K
^L
^M
^N
^O
^P
^Q
^R
^S
^T
^U
^V
^W
^X
^Y
^Z