Spaces:
Sleeping
Sleeping
from fastapi import FastAPI, Response, Request, Header, Form, UploadFile, Body, BackgroundTasks | |
from pydantic import BaseModel | |
from fastapi.responses import FileResponse | |
from starlette.middleware.cors import CORSMiddleware | |
from starlette.responses import RedirectResponse | |
import subprocess | |
from cardtagger import cardtagger | |
import io | |
from PIL import Image | |
import os, time | |
# ASGI application object; the CORS middleware below is registered on it.
app = FastAPI()

# NOTE(review): wildcard origins/methods/headers combined with
# allow_credentials=True is discouraged by the FastAPI CORS documentation;
# confirm whether credentialed cross-origin requests are really needed and,
# if so, list the allowed origins explicitly.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Allows all origins
    allow_credentials=True,
    allow_methods=["*"],  # Allows all methods
    allow_headers=["*"],  # Allows all headers
)

# Shared secret that check_auth() compares against the Authorization header.
# The API_TOKEN environment variable takes precedence so the secret can be
# supplied (and rotated) without editing the source. The literal fallback only
# keeps deployments that do not set the variable working as before.
# TODO(review): rotate this token and remove the literal from the repository.
API_TOKEN = os.environ.get("API_TOKEN", '34dsadfF$$%#$TGREGEFGE%Q*)(*&%')

# Module-level flag read by check_gpu_usage() to decide whether to keep polling.
card_is_running = True
# --------------------------------------------------------------------------- | |
# - Token auth method - | |
# --------------------------------------------------------------------------- | |
def check_auth(authorization_header):
    """Return True when *authorization_header* equals the configured API_TOKEN.

    Args:
        authorization_header: Raw value of the ``Authorization`` request
            header, or ``None`` when the header is absent.

    Returns:
        bool: ``True`` on an exact match, ``False`` otherwise (including when
        the header is missing or is not a string).
    """
    # Imported locally so this function does not depend on a new file-level import.
    import hmac

    # A missing header arrives as None; anything that is not a string can
    # never match the token, and compare_digest would raise on it.
    if not isinstance(authorization_header, str):
        return False

    # Constant-time comparison avoids leaking how many leading characters of
    # the secret matched. Encoding to bytes lets compare_digest accept
    # non-ASCII input instead of raising TypeError.
    return hmac.compare_digest(
        authorization_header.encode("utf-8"),
        API_TOKEN.encode("utf-8"),
    )
# --------------------------------------------------------------------------- | |
# - Delete cache files after 14 days -
# --------------------------------------------------------------------------- | |
def delete_old_files(path: str = "./tmp", max_age_days: float = 14) -> None:
    """Delete regular files in *path* that have not been modified recently.

    Args:
        path: Directory to clean. Defaults to ``"./tmp"``.
        max_age_days: Files whose modification time is more than this many
            days in the past are removed. Defaults to 14.

    Returns:
        None. The name of every file selected for removal is printed.

    A missing directory is treated as "nothing to clean". Sub-directories are
    skipped, and files that vanish while the scan is running (for example,
    removed by another worker process) are ignored.
    """
    # The cutoff does not change during the scan, so compute it once.
    cutoff = time.time() - max_age_days * 86400

    try:
        with os.scandir(path) as scan:
            entries = list(scan)
    except FileNotFoundError:
        # The cache directory has not been created yet.
        return

    for entry in entries:
        try:
            # os.remove() cannot delete directories, so only touch plain files.
            if not entry.is_file():
                continue
            if entry.stat().st_mtime >= cutoff:
                continue
            print(entry.name)
            os.remove(entry.path)
        except FileNotFoundError:
            # Another process removed the file between listing and deletion.
            continue
# @app.on_event("startup") | |
# def startup_event(background_tasks: BackgroundTasks): | |
# ''' | |
# Create out dir | |
# ''' | |
# if not os.path.exists("/tmp"): | |
# os.makedirs("/tmp") | |
# | |
# background_tasks.add_task(check_gpu_usage) | |
async def root():
    """Answer with a redirect to the ``/docs`` URL."""
    # NOTE(review): no route decorator is visible on this handler in this
    # view of the file; confirm it is registered on ``app``.
    return RedirectResponse(url='/docs')
async def resolution(request: Request, background_tasks: BackgroundTasks, response: Response, data: UploadFile = Form(...)):
    """Run ``cardtagger`` on an uploaded file and return its result.

    Args:
        request: Incoming request (its headers carry the Authorization token).
        background_tasks: Used to schedule cache cleanup after the response.
        response: Response object whose status is set to 401 on auth failure.
        data: Uploaded file whose raw bytes are passed to ``cardtagger``.

    Returns:
        Whatever ``cardtagger`` returns for the uploaded bytes, or the
        ``response`` object with status 401 when the token check fails.
    """
    # Without this declaration the assignments below would create a local
    # variable and check_gpu_usage() would never observe the change.
    global card_is_running

    # NOTE(review): token verification is currently bypassed, so every caller
    # is accepted. To enforce it, replace the line below with
    #     check_token = check_auth(request.headers.get("Authorization", None))
    # Left as-is here because enabling it would reject existing clients.
    check_token = True
    if not check_token:
        response.status_code = 401
        return response

    card_is_running = True
    try:
        # The upload bytes are handed over directly; wrapping them in BytesIO
        # only to read them back out again produced the same bytes.
        payload = await data.read()
        result = cardtagger(payload)
    finally:
        # Always clear the flag, even when reading or tagging raises.
        card_is_running = False

    # Prune stale cache files once the response has been sent.
    background_tasks.add_task(delete_old_files)
    return result
def check_gpu_usage():
    """Print ``nvidia-smi`` output repeatedly while ``card_is_running`` is true.

    Returns:
        None when the loop ends because ``card_is_running`` became false, or
        the error message string when ``nvidia-smi`` is missing or exits with
        a non-zero status.
    """
    # NOTE(review): the loop polls back-to-back with no pause between calls;
    # consider adding a sleep if this is wired back into request handling.
    try:
        while card_is_running:
            # Argument list, no shell: nothing is interpreted by /bin/sh.
            output = subprocess.check_output(["nvidia-smi"]).decode("utf-8")
            print(output)
        return
    except (subprocess.CalledProcessError, FileNotFoundError):
        # Without a shell a missing binary raises FileNotFoundError instead of
        # a non-zero exit status, so both are mapped to the same message.
        message = "Nvidia-smi command not found, please check if the NVIDIA drivers are installed."
        print(message)
        return message
#uvicorn server:app --host 0.0.0.0 --port 5050 --workers 4 |