Spaces:
Sleeping
Sleeping
File size: 3,284 Bytes
60a68a6 398a38a efbc12f 60a68a6 4603dd2 60a68a6 afd885c 4603dd2 60a68a6 398a38a 0dbd16b 398a38a 60a68a6 afd885c 60a68a6 398a38a 60a68a6 a3a7599 971ea86 60a68a6 4603dd2 971ea86 60a68a6 f9adf74 60a68a6 0ab5cef efbc12f f9adf74 0ab5cef 6a53fd5 0ab5cef efbc12f 01c1ecc efbc12f 60a68a6 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 |
from fastapi import FastAPI, Response, Request, Header, Form, UploadFile, Body, BackgroundTasks
from pydantic import BaseModel
from fastapi.responses import FileResponse
from starlette.middleware.cors import CORSMiddleware
from starlette.responses import RedirectResponse
import subprocess
from cardtagger import cardtagger
import io
from PIL import Image
import os, time
app = FastAPI()
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # Allows all origins
allow_credentials=True,
allow_methods=["*"], # Allows all methods
allow_headers=["*"], # Allows all headers
)
# API_TOKEN = os.environ["API_TOKEN"]
API_TOKEN = '34dsadfF$$%#$TGREGEFGE%Q*)(*&%'
card_is_running = True
# ---------------------------------------------------------------------------
# - Token auth method -
# ---------------------------------------------------------------------------
def check_auth(authorization_header):
if authorization_header == API_TOKEN :
return True
else:
return False
# ---------------------------------------------------------------------------
# - Delete cache files after 30 days -
# ---------------------------------------------------------------------------
def delete_old_files():
path = "./tmp"
now = time.time()
for filename in os.listdir(path):
filestamp = os.stat(os.path.join(path, filename)).st_mtime
filecompare = now - 14 * 86400
if filestamp < filecompare:
print(filename)
os.remove(os.path.join(path, filename))
# @app.on_event("startup")
# def startup_event(background_tasks: BackgroundTasks):
# '''
# Create out dir
# '''
# if not os.path.exists("/tmp"):
# os.makedirs("/tmp")
#
# background_tasks.add_task(check_gpu_usage)
@app.get("/")
async def root():
response = RedirectResponse(url='/docs')
return response
@app.post("/cardtagger")
async def resolution(request: Request, background_tasks: BackgroundTasks, response: Response, data: UploadFile = Form(...)):
authorization_token = request.headers.get("Authorization", None)
# check_token = check_auth(authorization_token)
check_token = True
if check_token:
card_is_running = True
# background_tasks.add_task(check_gpu_usage)
image = io.BytesIO(await data.read())
image.seek(0)
# background_tasks.add_task(check_gpu_usage)
result = cardtagger(image.read())
card_is_running = False
background_tasks.add_task(delete_old_files)
return result
else :
response.status_code = 401
return response
def check_gpu_usage():
try:
while card_is_running:
output = subprocess.check_output("nvidia-smi", shell=True).decode("utf-8")
# lines = output.split('\n')
#
# for line in lines:
# if '%' in line:
# print(line.strip().split(' ')[-2])
print(output)
return
except subprocess.CalledProcessError:
print("Nvidia-smi command not found, please check if the NVIDIA drivers are installed.")
return "Nvidia-smi command not found, please check if the NVIDIA drivers are installed."
#uvicorn server:app --host 0.0.0.0 --port 5050 --workers 4 |