Spaces:
Runtime error
Runtime error
import threading | |
import random | |
from typing import Tuple | |
import re | |
import copy | |
import os | |
import json | |
import openai | |
import time | |
# OpenAI credentials are read from the environment at import time; a missing
# OPENAI_KEY variable raises KeyError here.
openai.api_key = os.environ["OPENAI_KEY"]

# Board is SIZE x SIZE cells.
SIZE = 15
# Number of clue slots each game keeps active at once.
RIDDLE_COUNT = 3
# Shortest candidate pattern find_clue will try to fill.
MIN_WORD_SIZE = 4
# NOTE(review): not referenced in the visible code -- presumably the longest
# word length allowed in words.json; confirm.
MAX_WORD_SIZE = 10
# NOTE(review): not referenced in the visible code -- presumably a cap on the
# size of the word list; confirm.
WORD_LIMIT = 7500
# Seconds of inactivity after which a completed game is dropped.
COMPLETE_GAME_TIMEOUT = 60 * 60
# Seconds of inactivity after which any game is dropped.
INCOMPLETE_GAME_TIMEOUT = 60 * 60 * 24
# Seconds a clue may stay active before it is marked timed out and replaced.
GUESS_TIMEOUT = 30

# Candidate answers; used with random.choice and "\n".join below, so this is
# presumably a JSON array of strings -- confirm against words.json.
with open("words.json", "r") as f:
    words = json.loads(f.read())
# Prompt template; get_riddle fills it with str.format(answer).
with open("prompt.txt", "r") as f:
    RIDDLE_PROMPT = f.read()
# One word per line so find_clue can match whole words with ^...$ in
# re.MULTILINE mode.
search_text = "\n".join(words)
# Registry of live games keyed by room name (filled by Game.__init__).
games = {}
# Held by new_game while it looks up or creates a room.
all_games_lock = threading.Lock()
def get_riddle(answer):
    """Ask the OpenAI completion endpoint for a riddle about ``answer``.

    ``RIDDLE_PROMPT`` is formatted with ``answer`` as its only positional
    argument and sent to the ``text-davinci-003`` engine.  Any exception
    raised by the request is printed and the request is retried after a
    half-second pause; the loop has no retry limit, so this call blocks
    until a completion is obtained.

    Returns the raw text of the first completion choice.
    """
    riddle_prompt = RIDDLE_PROMPT.format(answer)
    while True:
        try:
            response = openai.Completion.create(
                engine="text-davinci-003",
                prompt=riddle_prompt,
                max_tokens=200,
                n=1,
                temperature=0.5,
            )
            return response["choices"][0]["text"]
        except Exception as error:
            print("OpenAI Error", error, "Retrying...")
            time.sleep(0.5)
class Clue:
    """A single crossword entry together with its riddle and solve state.

    Args:
        answer: The word to be guessed.
        location: ``(row, column)`` of the first letter on the grid.
        across: True when the word runs along a row, False when it runs
            down a column.
        solved: Whether the clue has already been solved.
    """

    def __init__(self, answer, location, across, solved):
        self.answer: str = answer
        self.location: Tuple[int, int] = location
        self.across: bool = across
        self.solved: bool = solved
        # Riddle text; empty until one has been generated for this clue.
        self.riddle: str = ""
        # Always initialised so that reading it on a fresh Clue cannot raise
        # AttributeError; Game.replace_clue overwrites it when the clue is
        # activated.
        self.create_time: float = time.time()
        # Name of the player who solved the clue, or None while unsolved.
        self.solver: "str | None" = None
        self.timed_out: bool = False

    def __repr__(self):
        direction = "Across" if self.across else "Down"
        status = "Solved" if self.solved else "Unsolved"
        return f"{self.answer}: {self.location}, {direction}, {status}"
class Game:
    """State of one crossword room: the board, the active clues and scores.

    Constructing a Game registers it in the module-level ``games`` dict under
    ``room_name``.

    Args:
        room_name: Key under which the game is stored in ``games``.
        competitive: Flag stored on the instance; not otherwise used here.
        init_word: Word placed across the middle row to seed the board.
            It is expected to fit within ``SIZE`` columns.
    """

    def __init__(self, room_name, competitive, init_word):
        self.room_name = room_name
        self.competitive = competitive
        # Player name -> number of clues solved.
        self.player_scores = {}
        # SIZE x SIZE board; None marks an empty cell.
        self.grid = [[None for i in range(SIZE)] for j in range(SIZE)]
        # Seed word goes on the middle row, horizontally centred.
        self.grid = place_on_grid(
            self.grid, init_word, (SIZE // 2, SIZE // 2 - len(init_word) // 2), True
        )
        # Active clue slots; None means the slot has not been filled yet.
        self.clues = [None] * RIDDLE_COUNT
        # Clues that have been retired from a slot.
        self.previous_clues = []
        self.lock = threading.Lock()
        self.complete = False
        self.last_update_index = 0
        self.last_update_time = time.time()
        self.last_riddle_update_time = None
        # True while a riddle is being generated; guesses are ignored then.
        self.pending_request = False
        games[room_name] = self

    def update(self):
        """Bump the change counter and refresh the last-activity timestamp."""
        self.last_update_index += 1
        self.last_update_time = time.time()

    def replace_clue(self, index):
        """Retire the clue in slot ``index`` and try to fill it with a new one.

        The new clue is searched for on a copy of the board that also
        contains the answers of the other active clues, so that it cannot
        collide with them.  When no new clue can be found the game is marked
        complete and the slot is left unchanged.
        """
        clue_grid = copy.deepcopy(self.grid)
        for j, clue in enumerate(self.clues):
            if clue and index != j:
                clue_grid = place_on_grid(clue_grid, clue.answer, clue.location, clue.across)

        retired = self.clues[index]
        # The slot keeps its old clue when no replacement exists, so this
        # method can be called repeatedly for the same clue; archive it once.
        if retired is not None and not any(retired is old for old in self.previous_clues):
            self.previous_clues.append(retired)

        clue = find_clue(clue_grid)
        if clue is None:
            self.complete = True
            return

        clue.create_time = time.time()
        self.pending_request = True
        try:
            clue.riddle = get_riddle(clue.answer)
        finally:
            # Always clear the flag, otherwise a failure here would make
            # player_guess ignore every future guess.
            self.pending_request = False
        self.last_riddle_update_time = time.time()
        self.clues[index] = clue

    def add_player(self, player_name):
        """Register ``player_name`` with a score of 0 if not already known."""
        with self.lock:
            self.update()
            if player_name not in self.player_scores:
                self.player_scores[player_name] = 0

    def player_guess(self, player_name, guess):
        """Apply a player's guess to the active clues.

        The guess is lower-cased and compared with the answer of every
        unsolved active clue.  Each matching clue is marked solved, credited
        to ``player_name``, written onto the board, and earns one point.

        Returns:
            False when no unsolved clue matches the guess; None otherwise
            (both when the guess is ignored because a riddle request is
            pending and when the guess succeeds).
        """
        guess = guess.lower()
        if self.pending_request:
            return None
        with self.lock:
            self.update()
            # Slots may still be None (never filled); skip those.
            matched_clues = [
                clue
                for clue in self.clues
                if clue is not None and not clue.solved and clue.answer == guess
            ]
            if not matched_clues:
                return False
            for clue in matched_clues:
                clue.solved = True
                clue.solver = player_name
                place_on_grid(self.grid, clue.answer, clue.location, clue.across)
                # Tolerate players that were never registered via add_player.
                self.player_scores[player_name] = (
                    self.player_scores.get(player_name, 0) + 1
                )
        return None
def place_on_grid(grid, word, location, across):
    """Write ``word`` onto ``grid`` in place and return the same grid.

    ``location`` is the ``(row, column)`` of the first letter.  With
    ``across`` true the letters fill consecutive columns of that row (via
    slice assignment); otherwise they fill consecutive rows of that column.
    """
    row, col = location
    if not across:
        for offset, letter in enumerate(word):
            grid[row + offset][col] = letter
        return grid
    grid[row][col : col + len(word)] = list(word)
    return grid
def _line_candidates(line, blocked, anchor):
    """Return ``(start, cells)`` spans along one row or column through ``anchor``.

    Args:
        line: The cells of the row/column in order (a letter or None each).
        blocked: Parallel list of booleans; True where a cell is empty but
            touches a filled cell in the perpendicular direction, so a new
            word may not pass through it.
        anchor: Index of the filled cell every span must contain.  The
            caller guarantees the cells directly before and after it are
            empty or off the board.

    A span may start at any index reachable backwards from the anchor
    without crossing a blocked cell, provided the cell before it is empty or
    off the board; it may end at the anchor or at any index reachable
    forwards likewise, provided the cell after it is empty or off the board.
    Spans shorter than ``MIN_WORD_SIZE`` are dropped.  Results are ordered by
    end index ascending, then by start index descending.
    """
    size = len(line)

    starts = []
    for k in range(anchor, -1, -1):
        if blocked[k]:
            break
        if k == 0 or line[k - 1] is None:
            starts.append(k)

    ends = [anchor]
    for k in range(anchor + 1, size):
        if blocked[k]:
            break
        ends.append(k)

    return [
        (start, line[start : end + 1])
        for end in ends
        if end == size - 1 or line[end + 1] is None
        for start in starts
        if end - start + 1 >= MIN_WORD_SIZE
    ]


def find_clue(grid) -> Clue:
    """Pick a random new word that can be attached to the letters on ``grid``.

    Filled cells are visited in random order.  For a cell with no horizontal
    neighbours, horizontal spans through it are considered; otherwise, for a
    cell with no vertical neighbours, vertical spans are considered.  Each
    span is turned into a pattern (existing letters fixed, empty cells as
    wildcards) and matched against the word list in ``search_text``.

    Returns:
        An unsolved ``Clue`` for the chosen word, or None when no span on
        the board yields a usable word.
    """
    filled_cells = [
        (i, j) for i in range(SIZE) for j in range(SIZE) if grid[i][j] is not None
    ]
    random.shuffle(filled_cells)

    for i, j in filled_cells:
        candidates = []
        if (j == 0 or grid[i][j - 1] is None) and (
            j + 1 == SIZE or grid[i][j + 1] is None
        ):
            row = [grid[i][k] for k in range(SIZE)]
            blocked = [
                grid[i][k] is None
                and (
                    (i != 0 and grid[i - 1][k] is not None)
                    or (i != SIZE - 1 and grid[i + 1][k] is not None)
                )
                for k in range(SIZE)
            ]
            candidates = [
                ((i, start), cells, True)
                for start, cells in _line_candidates(row, blocked, j)
            ]
        elif (i == 0 or grid[i - 1][j] is None) and (
            i + 1 == SIZE or grid[i + 1][j] is None
        ):
            column = [grid[k][j] for k in range(SIZE)]
            blocked = [
                grid[k][j] is None
                and (
                    (j != 0 and grid[k][j - 1] is not None)
                    or (j != SIZE - 1 and grid[k][j + 1] is not None)
                )
                for k in range(SIZE)
            ]
            candidates = [
                ((start, j), cells, False)
                for start, cells in _line_candidates(column, blocked, i)
            ]

        random.shuffle(candidates)
        for loc, cells, across in candidates:
            # Escape board characters so that a non-letter on the grid is
            # matched literally instead of being read as regex syntax.
            pattern = "".join("." if not cell else re.escape(cell) for cell in cells)
            matches = re.findall("^" + pattern + "$", search_text, re.MULTILINE)
            # NOTE(review): a pattern with exactly one matching word is
            # skipped (> 1, not >= 1) -- confirm this is intentional.
            if len(matches) > 1:
                random.shuffle(matches)
                return Clue(matches[0], loc, across, False)
    return None
def new_game(room_name):
    """Return the game for ``room_name``, creating it if necessary.

    An empty ``room_name`` starts a non-competitive game: a random numeric
    room name not already in ``games`` is generated and the board is seeded
    with a random word.  A non-empty name starts (or rejoins) a competitive
    game whose board is seeded with the room name itself, unless the name is
    longer than the board is wide, in which case a random word is used.

    The lookup and creation happen under ``all_games_lock``.
    """
    competitive = room_name != ""
    with all_games_lock:
        if room_name in games:
            return games[room_name]
        if not competitive:
            while room_name == "" or room_name in games:
                room_name = str(random.randint(0, 999999))
            init_word = random.choice(words)
        elif len(room_name) > SIZE:
            # A seed longer than the board would make the centred start
            # column negative and corrupt the middle row when placed.
            init_word = random.choice(words)
        else:
            init_word = room_name
        return Game(room_name, competitive, init_word)
def game_thread():
    """Background loop that expires idle games and refreshes clue slots.

    Every pass (roughly ten per second, plus time spent generating riddles):

    * a game idle for longer than ``INCOMPLETE_GAME_TIMEOUT``, or complete
      and idle for longer than ``COMPLETE_GAME_TIMEOUT``, is removed from
      ``games``;
    * in every other game, each clue slot that is empty, solved, or older
      than ``GUESS_TIMEOUT`` seconds is replaced; timed-out clues are
      flagged via ``timed_out`` first.

    Never returns.
    """
    while True:
        now = time.time()
        # Iterate over a snapshot: removing entries here, or new_game adding
        # one from another thread, would otherwise raise "dictionary changed
        # size during iteration" and kill this thread.
        with all_games_lock:
            snapshot = list(games.items())
        for room_name, game in snapshot:
            idle_time = now - game.last_update_time
            if (game.complete and idle_time > COMPLETE_GAME_TIMEOUT) or (
                idle_time > INCOMPLETE_GAME_TIMEOUT
            ):
                with all_games_lock:
                    # Only drop the instance that was inspected above.
                    if games.get(room_name) is game:
                        del games[room_name]
                continue
            for i, clue in enumerate(game.clues):
                timed_out = clue is not None and now - clue.create_time > GUESS_TIMEOUT
                if timed_out:
                    clue.timed_out = True
                if clue is None or clue.solved or timed_out:
                    game.replace_clue(i)
        time.sleep(0.1)
# Start the maintenance loop at import time.  It runs as a daemon thread so
# it does not keep the interpreter alive on shutdown.
thread = threading.Thread(target=game_thread, daemon=True)
thread.start()