import hashlib
import json
import os
import shutil
import subprocess
import tempfile
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, List, Optional, Tuple

import fitz  # PyMuPDF
import gradio as gr
from docx import Document
from groq import Groq
from llama_index import (
    # Aliased so it cannot shadow (or be shadowed by) python-docx's Document,
    # which is the one _process_word relies on.
    Document as LlamaDocument,
    SimpleDirectoryReader,
    StorageContext,
    VectorStoreIndex,
)
from llama_index.embeddings import OpenAIEmbedding
from llama_index.vector_stores import SimpleVectorStore
from PIL import Image, ImageDraw, ImageFont, ImageOps


@dataclass
class Question:
    """A single multiple-choice question."""

    question: str
    options: List[str]
    # Index into ``options`` of the correct choice.
    correct_answer: int


@dataclass
class QuizFeedback:
    """Per-question result shown to the participant after submission."""

    is_correct: bool
    selected: Optional[str]
    correct_answer: str


class DocumentProcessor:
    """Extract text from uploaded documents and index it.

    Extracted text is written to a private temporary directory; the vector
    index built from it is persisted under ``storage_dir``.
    """

    def __init__(self, storage_dir="./storage"):
        self.storage_dir = storage_dir
        self.temp_dir = tempfile.mkdtemp()
        self.index = None
        os.makedirs(storage_dir, exist_ok=True)

    def process_file(self, file_path: str) -> str:
        """Extract the text of an uploaded file into the temp directory.

        Args:
            file_path: Path to a ``.pdf``, ``.doc`` or ``.docx`` file.

        Returns:
            Path of the text file that was written.

        Raises:
            ValueError: If the file extension is not supported.
            Exception: If the underlying extraction fails.
        """
        file_ext = os.path.splitext(file_path)[1].lower()

        if file_ext == '.pdf':
            return self._process_pdf(file_path)
        if file_ext in ('.doc', '.docx'):
            return self._process_word(file_path)
        raise ValueError(f"Unsupported file type: {file_ext}")

    def _save_text(self, text: str) -> str:
        """Write extracted text to ``processed.txt`` in the temp directory."""
        output_path = os.path.join(self.temp_dir, "processed.txt")
        with open(output_path, "w", encoding="utf-8") as f:
            f.write(text)
        return output_path

    def _process_pdf(self, file_path: str) -> str:
        """Extract text from a PDF file and return the path of the saved text."""
        try:
            doc = fitz.open(file_path)
            try:
                text = "".join(page.get_text() for page in doc)
            finally:
                # Release the file handle even if a page fails to extract.
                doc.close()
            return self._save_text(text)
        except Exception as e:
            raise Exception(f"Error processing PDF: {str(e)}") from e

    def _process_word(self, file_path: str) -> str:
        """Extract text from a Word file and return the path of the saved text."""
        try:
            doc = Document(file_path)
            text = "\n".join(paragraph.text for paragraph in doc.paragraphs)
            return self._save_text(text)
        except Exception as e:
            raise Exception(f"Error processing Word file: {str(e)}") from e

    def create_index(self) -> bool:
        """Build a vector index from the processed text and persist it.

        Returns:
            True on success, False if anything failed (the error is printed).
        """
        try:
            # Load documents from temp directory
            documents = SimpleDirectoryReader(self.temp_dir).load_data()

            # Build against a fresh in-memory store. Passing ``persist_dir``
            # to ``from_defaults`` would try to *load* existing stores from
            # that directory, so saving is done explicitly afterwards.
            storage_context = StorageContext.from_defaults(
                vector_store=SimpleVectorStore()
            )
            self.index = VectorStoreIndex.from_documents(
                documents,
                storage_context=storage_context,
            )
            self.index.storage_context.persist(persist_dir=self.storage_dir)
            return True
        except Exception as e:
            print(f"Error creating index: {str(e)}")
            return False

    def clear_index(self):
        """Delete the persisted index and the extracted text.

        Returns:
            A human-readable status message (success or error text).
        """
        try:
            # Reset the storage directory to an empty one.
            if os.path.exists(self.storage_dir):
                shutil.rmtree(self.storage_dir)
            os.makedirs(self.storage_dir, exist_ok=True)

            # Swap the temp directory for a fresh one.
            if os.path.exists(self.temp_dir):
                shutil.rmtree(self.temp_dir)
            self.temp_dir = tempfile.mkdtemp()

            self.index = None
            return "Index cleared successfully"
        except Exception as e:
            return f"Error clearing index: {str(e)}"

    def cleanup(self):
        """Remove the temporary directory if it still exists."""
        if os.path.exists(self.temp_dir):
            shutil.rmtree(self.temp_dir)


class QuizGenerator:
    """Generate multiple-choice questions from text via the Groq chat API."""

    def __init__(self, api_key: str):
        self.client = Groq(api_key=api_key)

    def generate_questions(self, text: str, num_questions: int) -> List[Question]:
        """Ask the model for questions about ``text`` and validate the reply.

        Args:
            text: Source material the questions are based on.
            num_questions: Number of questions requested; also the maximum
                number returned.

        Returns:
            The validated questions (possibly fewer than requested).

        Raises:
            QuizGenerationError: If the request, parsing or validation fails.
        """
        prompt = self._create_prompt(text, num_questions)

        # Bound before the try so the handler can inspect it even when the
        # API call itself is what raised.
        response = None
        try:
            response = self.client.chat.completions.create(
                messages=[
                    {
                        "role": "system",
                        "content": (
                            "You are a quiz generator. Create clear questions "
                            "with concise answer options. Always return valid "
                            "JSON array of questions. \n"
                            "Format answers as short, clear phrases."
                        ),
                    },
                    {
                        "role": "user",
                        "content": prompt,
                    },
                ],
                model="llama-3.2-3b-preview",
                temperature=0,
                max_tokens=2048,
            )

            questions = self._parse_response(response.choices[0].message.content)
            return self._validate_questions(questions, num_questions)

        except Exception as e:
            content = 'No response'
            if response is not None and getattr(response, "choices", None):
                content = response.choices[0].message.content
            print(f"Error in generate_questions: {str(e)}")
            print(f"Response content: {content}")
            raise QuizGenerationError(f"Failed to generate questions: {str(e)}") from e

    def _create_prompt(self, text: str, num_questions: int) -> str:
        """Build the user prompt that specifies the required JSON format."""
        return (
            f"Generate exactly {num_questions} multiple choice questions "
            "based on the following text. Follow these rules strictly: "
            "1. Each question must be clear and focused "
            "2. Provide exactly 4 options for each question "
            "3. Mark the correct answer with index (0-3) "
            "4. Keep all options concise (under 10 words) "
            "5. Return ONLY a JSON array with this exact format: "
            '[ { "question": "Clear question text here?", '
            '"options": [ "Brief option 1", "Brief option 2", '
            '"Brief option 3", "Brief option 4" ], '
            '"correct_answer": 0 } ] '
            f"Text to generate questions from: {text} "
            "Important: Return only the JSON array, no other text or formatting."
        )

    def _parse_response(self, response_text: str) -> List[Dict]:
        """Extract and decode the JSON array contained in a model reply.

        Markdown code fences are stripped and everything outside the
        outermost ``[`` ... ``]`` pair is ignored.

        Raises:
            ValueError: If no array is found or it is not valid JSON.
        """
        try:
            # Clean up the response text
            cleaned_text = response_text.strip()
            cleaned_text = cleaned_text.replace('```json', '').replace('```', '').strip()

            # Find the JSON array
            start_idx = cleaned_text.find('[')
            end_idx = cleaned_text.rfind(']')

            # ``end_idx < start_idx`` covers a stray ']' before the first '['.
            if start_idx == -1 or end_idx == -1 or end_idx < start_idx:
                raise ValueError("No valid JSON array found in response")

            json_text = cleaned_text[start_idx:end_idx + 1]

            try:
                return json.loads(json_text)
            except json.JSONDecodeError as e:
                print(f"JSON Parse Error: {str(e)}")
                print(f"Attempted to parse: {json_text}")
                raise

        except Exception as e:
            print(f"Error parsing response: {str(e)}")
            print(f"Original response: {response_text}")
            raise ValueError(f"Failed to parse response: {str(e)}") from e

    def _validate_questions(self, questions: List[Dict], num_questions: int) -> List[Question]:
        """Convert raw dicts to ``Question`` objects, dropping malformed ones.

        Raises:
            ValueError: If no question survives validation.
        """
        validated = []

        for q in questions:
            try:
                if not self._is_valid_question(q):
                    print(f"Invalid question format: {q}")
                    continue

                validated.append(Question(
                    question=q["question"].strip(),
                    # Options are trimmed and capped at 100 characters.
                    options=[str(opt).strip()[:100] for opt in q["options"]],
                    correct_answer=int(q["correct_answer"]) % 4,
                ))
            except Exception as e:
                print(f"Error validating question: {str(e)}")
                continue

        if not validated:
            raise ValueError("No valid questions after validation")

        return validated[:num_questions]

    def _is_valid_question(self, question: Dict) -> bool:
        """Return True if ``question`` has the expected keys, types and ranges."""
        try:
            return (
                isinstance(question, dict)
                and all(key in question for key in ["question", "options", "correct_answer"])
                and isinstance(question["question"], str)
                and isinstance(question["options"], list)
                and len(question["options"]) == 4
                and all(isinstance(opt, str) for opt in question["options"])
                # bool is a subclass of int; a JSON true/false is not an index.
                and not isinstance(question["correct_answer"], bool)
                and isinstance(question["correct_answer"], (int, str))
                and int(question["correct_answer"]) in range(4)
            )
        except Exception as e:
            print(f"Question validation error: {str(e)}")
            return False


class FontManager:
    """Manages font installation and loading for the certificate generator"""

    @staticmethod
    def install_fonts():
        """Try to install the certificate fonts with ``apt-get``.

        Failures (including ``apt-get`` being unavailable) are reported as
        warnings and never raised.
        """
        try:
            subprocess.run(["apt-get", "update", "-y"], check=True)
            subprocess.run(
                [
                    "apt-get", "install", "-y",
                    "fonts-liberation",   # Liberation Sans fonts
                    "fontconfig",         # Font configuration
                    "fonts-dejavu-core",  # DejaVu fonts as fallback
                ],
                check=True,
            )

            # Clear font cache
            subprocess.run(["fc-cache", "-f"], check=True)
            print("Fonts installed successfully")
        except subprocess.CalledProcessError as e:
            print(f"Warning: Could not install fonts: {e}")
        except Exception as e:
            print(f"Warning: Unexpected error installing fonts: {e}")

    @staticmethod
    def get_font_paths() -> Dict[str, Optional[str]]:
        """Locate a regular and a bold sans-serif font file.

        Known filenames are searched for under the standard font
        directories first; ``fc-match`` is used for any style still missing.

        Returns:
            Mapping with keys ``'regular'`` and ``'bold'``; a value is None
            when no font could be found for that style.
        """
        standard_paths = [
            "/usr/share/fonts",
            "/usr/local/share/fonts",
            "/usr/share/fonts/truetype",
            "~/.fonts",
        ]

        font_paths: Dict[str, Optional[str]] = {
            'regular': None,
            'bold': None,
        }

        # Common font filenames to try, in order of preference
        fonts_to_try = {
            'regular': [
                'LiberationSans-Regular.ttf',
                'DejaVuSans.ttf',
                'FreeSans.ttf',
            ],
            'bold': [
                'LiberationSans-Bold.ttf',
                'DejaVuSans-Bold.ttf',
                'FreeSans-Bold.ttf',
            ],
        }

        def find_font(font_name: str) -> Optional[str]:
            """Search for a font file in standard locations"""
            for base_path in standard_paths:
                for root, _, files in os.walk(os.path.expanduser(base_path)):
                    if font_name in files:
                        return os.path.join(root, font_name)
            return None

        # Try to find each font
        for style in ['regular', 'bold']:
            for font_name in fonts_to_try[style]:
                font_path = find_font(font_name)
                if font_path:
                    font_paths[style] = font_path
                    break

        # If any style is still missing, ask fontconfig
        if not all(font_paths.values()):
            try:
                for style in ['regular', 'bold']:
                    if not font_paths[style]:
                        result = subprocess.run(
                            ['fc-match', '-f', '%{file}', 'sans-serif:style=' + style],
                            capture_output=True,
                            text=True,
                        )
                        if result.returncode == 0 and result.stdout.strip():
                            font_paths[style] = result.stdout.strip()
            except Exception as e:
                print(f"Warning: Could not use fc-match to find fonts: {e}")

        return font_paths


class QuizGenerationError(Exception):
    """Exception raised for errors in quiz generation"""


class CertificateGenerator:
    """Render a completion certificate as a PNG image."""

    def __init__(self):
        self.certificate_size = (1200, 800)
        self.background_color = '#FFFFFF'
        self.border_color = '#1C1D1F'

        # Install fonts only if needed: apt-get is slow and usually needs
        # root, so it is skipped when usable fonts are already present.
        self.font_paths = FontManager.get_font_paths()
        if not all(self.font_paths.values()):
            FontManager.install_fonts()
            self.font_paths = FontManager.get_font_paths()

    def _load_fonts(self) -> Dict[str, ImageFont.FreeTypeFont]:
        """Load the four certificate fonts, falling back to PIL's default.

        Returns:
            Mapping with keys ``'title'``, ``'subtitle'``, ``'text'`` and
            ``'name'``.
        """
        fonts = {}
        try:
            if self.font_paths['regular'] and self.font_paths['bold']:
                fonts['title'] = ImageFont.truetype(self.font_paths['bold'], 36)
                fonts['subtitle'] = ImageFont.truetype(self.font_paths['regular'], 14)
                fonts['text'] = ImageFont.truetype(self.font_paths['regular'], 20)
                fonts['name'] = ImageFont.truetype(self.font_paths['bold'], 32)
            else:
                raise ValueError("No suitable fonts found")
        except Exception as e:
            print(f"Font loading error: {e}. Using default font.")
            default = ImageFont.load_default()
            fonts = {
                'title': default,
                'subtitle': default,
                'text': default,
                'name': default,
            }
        return fonts

    def _add_professional_border(self, draw: ImageDraw.ImageDraw):
        """Draw a single thin border inset 40px from the certificate edges."""
        padding = 40
        draw.rectangle(
            [
                (padding, padding),
                (self.certificate_size[0] - padding, self.certificate_size[1] - padding),
            ],
            outline=self.border_color,
            width=2,
        )

    @staticmethod
    def _stable_code(text: str) -> int:
        """Map ``text`` to a number in 0..9999 that is stable across runs.

        The builtin ``hash()`` is salted per process for strings, so it
        cannot be used for identifiers that must be reproducible.
        """
        digest = hashlib.sha256(text.encode("utf-8")).hexdigest()
        return int(digest, 16) % 10000

    @staticmethod
    def _wrap_text(draw: ImageDraw.ImageDraw, text: str, font, max_width: float) -> str:
        """Greedily break ``text`` into lines no wider than ``max_width``.

        A single word wider than ``max_width`` is kept on its own line.
        """
        lines = []
        current_line: List[str] = []
        current_width = 0

        for word in text.split():
            word_width = draw.textlength(word + " ", font)
            if current_width + word_width <= max_width:
                current_line.append(word)
                current_width += word_width
            else:
                # Skip the flush when nothing is pending (over-long first
                # word); otherwise an empty leading line would be emitted.
                if current_line:
                    lines.append(" ".join(current_line))
                current_line = [word]
                current_width = word_width

        if current_line:
            lines.append(" ".join(current_line))
        return "\n".join(lines)

    def _add_content(
        self,
        draw: ImageDraw.ImageDraw,
        fonts: Dict[str, ImageFont.FreeTypeFont],
        name: str,
        course_name: str,
        score: float
    ):
        """Draw all text on the certificate.

        Args:
            draw: Drawing context of the certificate image.
            fonts: Fonts as returned by ``_load_fonts``.
            name: Participant name; blank falls back to "Participant".
            course_name: Course title; blank falls back to "Assessment".
            score: Score in percent.
        """
        # Add "CERTIFICATE OF COMPLETION" text
        draw.text((60, 140), "CERTIFICATE OF COMPLETION", font=fonts['subtitle'], fill='#666666')

        # Add course name (large and bold), wrapped if it exceeds the margins
        course_name = course_name.strip() or "Assessment"
        max_width = self.certificate_size[0] - 120  # Leave margins
        if draw.textlength(course_name, fonts['title']) > max_width:
            course_name = self._wrap_text(draw, course_name, fonts['title'], max_width)

        draw.multiline_text((60, 200), course_name, font=fonts['title'], fill='#1C1D1F', spacing=10)

        # Add instructor info
        draw.text((60, 300), "Instructor", font=fonts['subtitle'], fill='#666666')
        draw.text((60, 330), "CertifyMe AI", font=fonts['text'], fill='#1C1D1F')

        # Add participant name (large)
        name = name.strip() or "Participant"
        draw.text((60, 420), name, font=fonts['name'], fill='#1C1D1F')

        # One timestamp for both date strings so they cannot disagree.
        now = datetime.now()
        date_str = now.strftime("%b. %d, %Y")

        # Date section
        draw.text((60, 500), "Date", font=fonts['subtitle'], fill='#666666')
        draw.text((60, 530), date_str, font=fonts['text'], fill='#1C1D1F')

        # Score section
        draw.text((300, 500), "Score", font=fonts['subtitle'], fill='#666666')
        draw.text((300, 530), f"{float(score):.1f}%", font=fonts['text'], fill='#1C1D1F')

        # Footer section with certificate number and reference
        certificate_id = (
            f"Certificate no: {now.strftime('%Y%m%d')}-{self._stable_code(name):04d}"
        )
        ref_number = f"Reference Number: {self._stable_code(name + date_str):04d}"

        # Draw footer text aligned to left and right
        draw.text((60, 720), certificate_id, font=fonts['subtitle'], fill='#666666')
        draw.text((1140, 720), ref_number, font=fonts['subtitle'], fill='#666666', anchor="ra")

    def _add_logo(self, certificate: Image.Image, logo_path: str):
        """Paste the company logo in the top-left corner (best effort)."""
        try:
            with Image.open(logo_path) as source:
                # RGBA gives every input mode (including palette images with
                # transparency) a usable alpha channel for the paste mask.
                logo = source.convert("RGBA")
            # Resize logo to appropriate size
            logo.thumbnail((150, 80))
            # Position in top-left corner with padding
            certificate.paste(logo, (60, 50), mask=logo)
        except Exception as e:
            print(f"Error adding logo: {e}")

    def _add_photo(self, certificate: Image.Image, photo_path: str):
        """Paste a circular participant photo in the top-right corner (best effort)."""
        try:
            size = (100, 100)

            # Create circular mask
            mask = Image.new('L', size, 0)
            ImageDraw.Draw(mask).ellipse((0, 0, size[0], size[1]), fill=255)

            with Image.open(photo_path) as source:
                # Crop-to-fill so non-square photos cover the whole circle
                # instead of leaving part of it empty.
                photo = ImageOps.fit(source.convert("RGBA"), size)
            photo.putalpha(mask)

            # Position in top-right corner with padding
            certificate.paste(photo, (1000, 50), mask=photo)
        except Exception as e:
            print(f"Error adding photo: {e}")

    def generate(
        self,
        score: float,
        name: str,
        course_name: str,
        company_logo: Optional[str] = None,
        participant_photo: Optional[str] = None
    ) -> Optional[str]:
        """Render a certificate and save it as a PNG.

        Args:
            score: Score in percent.
            name: Participant name.
            course_name: Course title.
            company_logo: Optional path to a logo image.
            participant_photo: Optional path to a participant photo.

        Returns:
            Path of the generated PNG, or None if generation failed (the
            error is printed).
        """
        try:
            certificate = self._create_base_certificate()
            draw = ImageDraw.Draw(certificate)

            # Add professional border
            self._add_professional_border(draw)

            fonts = self._load_fonts()
            self._add_content(draw, fonts, str(name), str(course_name), float(score))

            if company_logo:
                self._add_logo(certificate, company_logo)

            if participant_photo:
                self._add_photo(certificate, participant_photo)

            return self._save_certificate(certificate)
        except Exception as e:
            print(f"Error generating certificate: {e}")
            return None

    def _create_base_certificate(self) -> Image.Image:
        """Create the blank certificate canvas."""
        return Image.new('RGB', self.certificate_size, self.background_color)

    def _save_certificate(self, certificate: Image.Image) -> str:
        """Save the certificate to a new temporary ``.png`` file and return its path."""
        # Only the unique name is needed; close the handle before PIL writes
        # to the path so the descriptor is not leaked.
        with tempfile.NamedTemporaryFile(delete=False, suffix='.png') as temp_file:
            output_path = temp_file.name
        certificate.save(output_path, 'PNG')
        return output_path


class QuizApp:
    """Application facade tying together documents, quiz and certificates."""

    def __init__(self, api_key: str):
        self.quiz_generator = QuizGenerator(api_key)
        self.certificate_generator = CertificateGenerator()
        self.current_questions: List[Question] = []
        self.document_processor = DocumentProcessor()

    def process_uploaded_file(self, file_path: str) -> str:
        """Process uploaded file and return status message"""
        try:
            if not file_path:
                return "⚠️ Please upload a file first."

            self.document_processor.process_file(file_path)
            return "✅ File processed successfully! Click 'Create Index' to continue."
        except Exception as e:
            return f"❌ Error processing file: {str(e)}"

    def create_index(self) -> str:
        """Create vector store index and return status message"""
        success = self.document_processor.create_index()
        if success:
            return "✅ Index created successfully!"
        return "❌ Failed to create index. Please try again."
# Add this new method def clear_index(self) -> str: """Clear vector store index and return status message""" return self.document_processor.clear_index() def generate_questions(self, text: str, num_questions: int) -> Tuple[bool, List[Question]]: """ Generate quiz questions using the QuizGenerator Returns (success, questions) tuple """ try: questions = self.quiz_generator.generate_questions(text, num_questions) self.current_questions = questions return True, questions except Exception as e: print(f"Error generating questions: {e}") return False, [] def calculate_score(self, answers: List[Optional[str]]) -> Tuple[float, bool, List[QuizFeedback]]: """ Calculate the quiz score and generate feedback Returns (score, passed, feedback) tuple """ if not answers or not self.current_questions: return 0, False, [] feedback = [] correct = 0 for question, answer in zip(self.current_questions, answers): if answer is None: feedback.append(QuizFeedback(False, None, question.options[question.correct_answer])) continue try: selected_index = question.options.index(answer) is_correct = selected_index == question.correct_answer if is_correct: correct += 1 feedback.append(QuizFeedback( is_correct, answer, question.options[question.correct_answer] )) except ValueError: feedback.append(QuizFeedback(False, answer, question.options[question.correct_answer])) score = (correct / len(self.current_questions)) * 100 return score, score >= 80, feedback def update_questions(self, text: str, num_questions: int) -> Tuple[gr.update, gr.update, List[gr.update], List[Question], gr.update]: """ Event handler for generating new questions """ if not text.strip(): return ( gr.update(value=""), gr.update(value="⚠️ Please enter some text content to generate questions."), *[gr.update(visible=False, choices=[]) for _ in range(5)], [], gr.update(selected=1) ) success, questions = self.generate_questions(text, num_questions) if not success or not questions: return ( gr.update(value=""), gr.update(value="❌ Failed 
to generate questions. Please try again."), *[gr.update(visible=False, choices=[]) for _ in range(5)], [], gr.update(selected=1) ) # Create question display questions_html = "# 📝 Assessment Questions\n\n" questions_html += "> Please select one answer for each question.\n\n" # Update radio buttons updates = [] for i, q in enumerate(questions): questions_html += f"### Question {i+1}\n{q.question}\n\n" updates.append(gr.update( visible=True, choices=q.options, value=None, label=f"Select your answer:" )) # Hide unused radio buttons for i in range(len(questions), 5): updates.append(gr.update(visible=False, choices=[])) return ( gr.update(value=questions_html), gr.update(value=""), *updates, questions, gr.update(selected=1) ) def submit_quiz(self, q1: Optional[str], q2: Optional[str], q3: Optional[str], q4: Optional[str], q5: Optional[str], questions: List[Question] ) -> Tuple[gr.update, List[gr.update], float, str, gr.update]: """ Event handler for quiz submission """ answers = [q1, q2, q3, q4, q5][:len(questions)] if not all(a is not None for a in answers): return ( gr.update(value="⚠️ Please answer all questions before submitting."), *[gr.update() for _ in range(5)], 0, "", gr.update(selected=1) ) score, passed, feedback = self.calculate_score(answers) # Create feedback HTML feedback_html = "# Assessment Results\n\n" for i, (q, f) in enumerate(zip(self.current_questions, feedback)): color = "green" if f.is_correct else "red" symbol = "✅" if f.is_correct else "❌" feedback_html += f""" ### Question {i+1} {q.question}
You passed the assessment with a score of {score:.1f}%
Your certificate has been generated.
Your score: {score:.1f}%
You need 80% or higher to pass and receive a certificate.