from datetime import datetime, timedelta from jose import JWTError, jwt from typing import Any, Union from tinydb import TinyDB, Query from tinydb.storages import MemoryStorage # Secret key to encode JWT tokens. In production, use a more secure key and keep it secret! SECRET_KEY = "a_very_secret_key" ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = 30 # The expiration time for the access token db = TinyDB(storage=MemoryStorage) tokens_table = db.table('tokens') def insert_token(user_id: str, token: str, expires_in: timedelta): expiration = datetime.utcnow() + expires_in tokens_table.insert({'user_id': user_id, 'token': token, 'expires_at': expiration.isoformat()}) def validate_token(user_id: str, token: str) -> bool: User = Query() result = tokens_table.search((User.user_id == user_id) & (User.token == token)) if not result: return False # Check token expiration expires_at = datetime.fromisoformat(result[0]['expires_at']) if datetime.utcnow() > expires_at: return False return True def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None) -> str: """ Creates a JWT access token. :param data: A dictionary of claims (e.g., {"sub": user_id}) to include in the token. :param expires_delta: A timedelta object representing how long the token is valid. :return: A JWT token as a string. """ to_encode = data.copy() if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt # Additional functions can be added here for verifying tokens, decoding tokens, etc.