# Spaces: Sleeping
import logging | |
from fastapi import FastAPI, HTTPException, Request | |
from fastapi.responses import JSONResponse | |
from pydantic import BaseModel | |
from transformers import pipeline | |
# Emit INFO-and-above records formatted as "<timestamp> - <level> - <message>".
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Module-level logger used by the code in this file.
logger = logging.getLogger(__name__)

# Application object; it is handed to uvicorn when the file runs as a script.
app = FastAPI()
# Build the text-generation pipeline once, while the module is being imported.
# A failure is logged and then re-raised, so the import itself fails rather
# than leaving `pipe` undefined.
try:
    pipe = pipeline(
        "text-generation",
        model="defog/sqlcoder-7b-2",
        pad_token_id=2,
    )
except Exception as load_error:
    logger.error(f"Failed to load the model: {load_error!s}")
    raise
else:
    logger.info("Model loaded successfully")
class QueryRequest(BaseModel):
    """Payload accepted by ``generate``: the request to turn into SQL."""

    # Free-form request text; ``generate`` interpolates it into the model prompt.
    text: str
# Register the handler on the application; without a route decorator the
# function is never reachable over HTTP.
# NOTE(review): "/" is assumed to be the intended path - confirm with clients.
@app.get("/")
def home():
    """Report that the server is up.

    Returns:
        A dict with a single ``message`` key holding a fixed status string.
    """
    return {"message": "SQL Generation Server is running"}
# Statement verbs a generated query is allowed to begin with.
_ALLOWED_SQL_VERBS = ("select", "show", "describe", "insert", "update", "delete")


def _starts_with_allowed_verb(sql: str) -> bool:
    """Return True when *sql* begins with an allowed verb as a whole word."""
    lowered = sql.lower()
    for verb in _ALLOWED_SQL_VERBS:
        if lowered.startswith(verb):
            following = lowered[len(verb):len(verb) + 1]
            # Require a word boundary so "selection ..." is not taken for SELECT.
            if not (following.isalnum() or following == "_"):
                return True
    return False


def _extract_sql_query(generated_text: str) -> str:
    """Pull the first SQL statement out of the model output and sanity-check it.

    Raises:
        ValueError: if the extracted text does not start with an allowed verb.
    """
    # Keep whatever follows the last "SQL query:" marker; this also discards
    # the prompt when the model output repeats it.
    candidate = generated_text.split("SQL query:")[-1].strip()
    # Only the first statement is kept; anything after a ';' is dropped.
    candidate = candidate.split(";")[0].strip()
    # Individual tokens are deliberately NOT matched against a keyword list:
    # table names, column names, literals and '*' are legitimate tokens that
    # no keyword list can enumerate, so such a check rejects valid queries.
    if not _starts_with_allowed_verb(candidate):
        raise ValueError("Generated text is not a valid SQL query")
    return candidate


# NOTE(review): "/generate" is assumed from the handler name - confirm the
# path that clients actually call.
@app.post("/generate")
async def generate(request: QueryRequest):
    """Generate a SQL query for the text in *request*.

    Args:
        request: Payload whose ``text`` field is placed into the model prompt.

    Returns:
        A dict ``{"output": <sql>}`` holding the first generated statement.

    Raises:
        HTTPException: 400 when the model output does not look like a SQL
            statement; 500 when running the model fails.
    """
    text = request.text
    logger.info("Received request: %s", text)
    prompt = f"Generate a valid SQL query for the following request. Only return the SQL query, nothing else:\n\n{text}\n\nSQL query:"

    # Inference failures are server errors. They are handled separately from
    # validation so an internal ValueError is never reported as a 400.
    # NOTE(review): pipe() is called synchronously inside an async handler, so
    # it presumably blocks the event loop while the model runs - confirm
    # whether that is acceptable for this deployment.
    try:
        output = pipe(prompt, max_new_tokens=100)
        generated_text = output[0]['generated_text']
    except Exception as e:
        logger.error("Error in generate endpoint: %s", e, exc_info=True)
        raise HTTPException(
            status_code=500,
            detail="An error occurred while generating the SQL query",
        ) from e

    try:
        sql_query = _extract_sql_query(generated_text)
    except ValueError as ve:
        logger.warning("Validation error: %s", ve)
        raise HTTPException(status_code=400, detail=str(ve)) from ve

    logger.info("Generated SQL query: %s", sql_query)
    return {"output": sql_query}
# Register the handler with the application; without this decorator it is
# never invoked and HTTPExceptions fall through to the framework default.
@app.exception_handler(HTTPException)
async def http_exception_handler(request: Request, exc: HTTPException) -> JSONResponse:
    """Render an ``HTTPException`` as a JSON body of the form ``{"message": ...}``.

    Args:
        request: The request being handled (unused).
        exc: The raised exception; its ``status_code`` and ``detail`` are
            copied into the response.

    Returns:
        A ``JSONResponse`` carrying the exception's status code and detail.
    """
    return JSONResponse(
        status_code=exc.status_code,
        content={"message": exc.detail},
        # Forward any headers attached to the exception instead of dropping them.
        headers=getattr(exc, "headers", None),
    )
# Script entry point: serve the application on all interfaces, port 7860.
if __name__ == "__main__":
    # Imported here so uvicorn is only required when run as a script.
    import uvicorn

    uvicorn.run(
        app,
        port=7860,
        host="0.0.0.0",
    )