forschungstage-2023/hp/hp.py
2023-06-16 12:32:22 +02:00

136 lines
3.6 KiB
Python
Executable File

#!/bin/python3
import sys

# The corpus file name is taken from the last command-line argument.
if len(sys.argv) < 2:
    print(sys.argv)
    # sys.exit is always available; the bare `exit` helper is only injected
    # by the `site` module and is missing under `python -S` or in frozen apps.
    sys.exit("Please provide a filename as argument")
filename = sys.argv[-1]
def colorize(word):
    """Return *word* wrapped in ANSI escape codes (SGR 0;33, then reset)."""
    highlight_on = "\u001b[0;33m"
    highlight_off = "\u001b[0m"
    return f"{highlight_on}{word}{highlight_off}"
def softmax(distr, temperature=1.5):
    """Return the temperature-scaled softmax of *distr* as a new float array.

    Each entry x is mapped to exp(x / temperature) and the result is
    normalised to sum to 1.

    Args:
        distr: Sequence (list or array) of real-valued scores. It is not
            modified.
        temperature: Non-zero scaling factor; larger values flatten the
            resulting distribution. Defaults to 1.5.

    Returns:
        A one-dimensional numpy float array with the same length as *distr*.
        An empty input yields an empty array; an input whose entries are all
        equal yields the uniform distribution.
    """
    scores = np.asarray(distr, dtype=float) / temperature
    if scores.size == 0:
        return scores
    # Shifting by the maximum does not change the mathematical result but
    # keeps np.exp from overflowing (large scores) or producing 0/0 (when
    # every score is hugely negative).
    weights = np.exp(scores - scores.max())
    return weights / weights.sum()
import re
# Lines that begin with the literal word CHAPTER.
chapter_pattern = re.compile(r"^CHAPTER")
# Lines consisting only of digits (presumably page numbers). The raw string
# avoids the invalid-escape warning that "\d" triggers in a normal literal.
page_pattern = re.compile(r"^\d+$")
# Read the whole corpus into memory, one list entry per line.
with open(filename) as f:
    lines = f.readlines()

print("Stripping excess data")
truelines = []
# Processing starts at the second line: the first line of the file is never
# looked at (the original loop began at index 1).
remaining = iter(lines[1:])
for raw_line in remaining:
    line = raw_line.strip()
    if chapter_pattern.match(line):
        # Drop the chapter heading together with the single line after it.
        next(remaining, None)
        continue
    if page_pattern.match(line):
        # Drop lines that contain nothing but digits.
        continue
    truelines.append(line)
print("Parsing lines")
corpus = " ".join(truelines)
#with open("hp_parsed.txt", "w") as f:
#    f.writelines(corpus)
# Splitting on a single space keeps empty strings as tokens wherever the
# corpus contains consecutive spaces (e.g. from blank input lines).
tokens = corpus.split(" ")
words = set(tokens)
# A set has no defined order, so ids are arbitrary; the reverse map is built
# from id_to_word itself so the two mappings always agree.
id_to_word = dict(enumerate(words))
word_to_id = {word: idx for idx, word in id_to_word.items()}
N = len(words)
from numpy import matrix as M, array, zeros
#m = M( [ [ 0 for _ in range(len(words)) ] for _ in range(len(words)) ] )
print("allocating array")
# m[i, j] counts how often word j directly follows word i.
m = zeros((N, N), dtype=int)
# M[i, j] holds the corresponding probabilities. This rebinds M, which until
# here referred to the numpy.matrix alias imported above.
M = zeros((N, N), dtype=float)
print("processing bigrams")
# Pair every token with its direct successor and bump the matching counter.
bigrams = zip(tokens[:-1], tokens[1:])
for current_word, following_word in bigrams:
    row = word_to_id[current_word]
    col = word_to_id[following_word]
    m[row, col] += 1
print("normalizing matrix")
for i in range(N):
    counts = m[i]
    # ndarray.sum runs in native code; the builtin sum() would iterate the
    # row element by element in Python.
    total = counts.sum()
    if total == 0:
        # Words that are never followed by anything keep an all-zero row.
        continue
    # True division (rather than multiplying by 1.0/total) guarantees that a
    # word with a single successor gets exactly 1.0, which the `== 1` check
    # in the random walker relies on (49 * (1.0/49) is 0.9999999999999999).
    M[i] = counts / total
print("Done preparing")
# random walker
from time import sleep
from numpy.random import random as uniform
import numpy as np
#word = id_to_word[0]
# Large finite stand-in for infinity: the walkers replace zero probabilities
# by -infty before calling softmax, so that exp() of those entries underflows
# to 0 and impossible successors stay impossible.
infty = 1_000_000
def simulate(softmax_enabled = True):
    """Print an endless random walk over the bigram model, one word at a time.

    The walk starts at word id 0 and repeatedly samples the next word from
    row `word_id` of the module-level transition matrix `M`. Words whose
    sampling probability was exactly 1 are printed plainly; every other word
    is passed through `colorize`.

    Args:
        softmax_enabled: If true, the successor probabilities are reshaped
            with `softmax` first; zero entries are replaced by `-infty` so
            they keep (numerically) zero probability.

    Returns:
        None. The walk ends on Ctrl-C (KeyboardInterrupt) or when the current
        word has no recorded successor.
    """
    word_id = 0
    try:
        while True:
            # Row lookup; equal to multiplying a one-hot vector with M but
            # without the O(N^2) matrix product on every step.
            next_distr = M[word_id]
            if not next_distr.any():
                # Dead end: nothing ever followed this word in the corpus, so
                # there is no distribution to sample from.
                print()
                print("No known successor for this word, stopping.")
                return
            if softmax_enabled:
                next_distr = softmax([ v if v != 0 else -infty for v in next_distr ])
            prefixsum = np.cumsum(next_distr)
            # Scale the draw by the actual total so rounding error in the
            # cumulative sum can never leave the draw above the last entry.
            U = uniform() * prefixsum[-1]
            # First index whose cumulative probability reaches U.
            word_id = int(np.searchsorted(prefixsum, U))
            word = id_to_word[word_id]
            print(word if next_distr[word_id] == 1 else colorize(word), end=" ", flush=True)
    except KeyboardInterrupt:
        print()
        print("I'm dying!")
def interactive(softmax_enabled = True):
    """Build a text word by word, letting the user choose among sampled words.

    Starting at word id 0, each round draws five samples from the successor
    distribution of the current word (row `word_id` of the module-level
    matrix `M`), shows the distinct candidates with their probabilities and
    reads the index of the chosen candidate from standard input. Input that
    is not a valid index selects the first candidate. The text so far is
    printed after every choice.

    Args:
        softmax_enabled: If true, the successor probabilities are reshaped
            with `softmax` first; zero entries are replaced by `-infty` so
            they keep (numerically) zero probability.

    Returns:
        None. The session ends on Ctrl-C (KeyboardInterrupt) or when the
        current word has no recorded successor.
    """
    word_id = 0
    text = []
    try:
        while True:
            # Row lookup; equal to multiplying a one-hot vector with M but
            # without the O(N^2) matrix product on every round.
            next_distr = M[word_id]
            if not next_distr.any():
                # Dead end: nothing ever followed this word in the corpus, so
                # there are no candidates to offer.
                print()
                print("No known successor for this word, stopping.")
                return
            if softmax_enabled:
                next_distr = softmax([ v if v != 0 else -infty for v in next_distr ])
            prefixsum = np.cumsum(next_distr)
            # Draws are scaled by the actual total so rounding error in the
            # cumulative sum can never leave a draw above the last entry.
            total = prefixsum[-1]
            # Five draws, de-duplicated; each is the first index whose
            # cumulative probability reaches the draw.
            choices = list({
                int(np.searchsorted(prefixsum, uniform() * total))
                for _ in range(5)
            })
            print("Pick from")
            for i, v in enumerate(choices):
                print(f" {i}: {id_to_word[v].ljust(10)} ({next_distr[v]*100:.2f}%)")
            try:
                word_id = choices[int(input())]
            except (ValueError, IndexError):
                # Anything that is not a valid index falls back to the first
                # candidate.
                word_id = choices[0]
            word = id_to_word[word_id]
            text.append(word)
            print()
            print(" ".join(text))
    except KeyboardInterrupt:
        print()
        print("I'm dying!")