# -*- coding: utf-8 -*-
"""Generate PDF certificates from DOCX templates via python-docx and LibreOffice."""
import re
import logging
import subprocess
import tempfile
import os
import time
import hashlib
from io import BytesIO
from typing import Dict, List, Optional, Tuple
from functools import lru_cache

try:
    from docx import Document
    from docx.opc.exceptions import PackageNotFoundError
except ImportError:
    # python-docx is optional at import time; CertificateGenerator.__init__
    # raises a descriptive ImportError when it is actually needed.
    Document = None
    PackageNotFoundError = Exception

from zipfile import BadZipFile

from .certificate_logger import CertificateLogger
from .admin_notifier import AdminNotifier

_logger = logging.getLogger(__name__)


class CertificateGenerator:
    """
    Service class for generating personalized certificates from DOCX templates.

    The generator replaces placeholders in a DOCX template with participant
    data and converts the result to PDF by running LibreOffice in headless
    mode.

    Class-level state (shared by every instance):
    - The result of the LibreOffice availability probe is cached until
      ``reset_libreoffice_check`` is called.
    - Parsed templates are kept in a bounded LRU cache keyed by the SHA-256
      of the template bytes.
    """

    # Cached outcome of the LibreOffice probe: None means "not checked yet".
    _libreoffice_available = None
    _libreoffice_check_error = None

    # Parsed templates keyed by content hash. A plain dict preserves insertion
    # order, so the first key is always the least recently used entry.
    _template_cache = {}
    _template_cache_max_size = 50

    def __init__(self):
        """
        Initialize the certificate generator.

        Raises:
            ImportError: If python-docx could not be imported.
        """
        if Document is None:
            raise ImportError(
                "python-docx library is required. "
                "Install it with: pip install python-docx"
            )

    @classmethod
    def _store_libreoffice_status(cls, available: bool, error_message: Optional[str]) -> Tuple[bool, str]:
        """Record the probe outcome on the class and return it as ``(available, message)``."""
        cls._libreoffice_available = available
        cls._libreoffice_check_error = error_message
        return available, error_message or ''

    @classmethod
    def check_libreoffice_availability(cls) -> Tuple[bool, str]:
        """
        Check if LibreOffice is available on the system.

        Runs ``libreoffice --version`` once and caches the outcome (success or
        failure) on the class, so later calls do not spawn a process. Use
        ``reset_libreoffice_check`` to force a new probe.

        Returns:
            tuple: (is_available: bool, error_message: str)
                - is_available: True if LibreOffice is available, False otherwise
                - error_message: Empty string if available, error description if not
        """
        if cls._libreoffice_available is not None:
            return cls._libreoffice_available, cls._libreoffice_check_error or ''

        try:
            result = subprocess.run(
                ['libreoffice', '--version'],
                capture_output=True,
                text=True,
                timeout=10,
                check=False
            )
        except FileNotFoundError:
            error_msg = (
                'LibreOffice is not installed or not found in system PATH. '
                'PDF conversion will not be available. '
                'Please install LibreOffice to enable certificate generation.'
            )
            _logger.error(error_msg)
        except subprocess.TimeoutExpired:
            error_msg = 'LibreOffice version check timed out after 10 seconds'
            _logger.error(error_msg)
        except Exception as e:
            error_msg = f'Unexpected error checking LibreOffice availability: {str(e)}'
            _logger.error(error_msg, exc_info=True)
        else:
            if result.returncode == 0:
                _logger.info('LibreOffice is available: %s', result.stdout.strip())
                return cls._store_libreoffice_status(True, None)
            error_msg = (
                'LibreOffice is installed but returned an error. '
                f'Exit code: {result.returncode}'
            )
            _logger.warning(error_msg)

        return cls._store_libreoffice_status(False, error_msg)

    @classmethod
    def reset_libreoffice_check(cls):
        """
        Reset the cached LibreOffice availability check.

        Call this after LibreOffice has been installed, or whenever the probe
        should be re-run on the next availability check.
        """
        cls._libreoffice_available = None
        cls._libreoffice_check_error = None
        _logger.info('LibreOffice availability check cache cleared')

    @classmethod
    def _get_template_cache_key(cls, template_binary: bytes) -> str:
        """
        Generate a cache key for a template based on its content.

        Args:
            template_binary: Binary content of the template

        Returns:
            str: SHA256 hex digest of the template content
        """
        return hashlib.sha256(template_binary).hexdigest()

    @classmethod
    def _get_cached_template(cls, cache_key: str) -> Optional[Document]:
        """
        Retrieve a cached template if available.

        A hit also marks the entry as most recently used so that eviction in
        ``_cache_template`` really is least-recently-used.

        Args:
            cache_key: Cache key for the template

        Returns:
            Document: Cached template document, or None if not cached
        """
        cached = cls._template_cache.pop(cache_key, None)
        if cached is None:
            return None
        # Re-inserting moves the key to the end of the dict (most recent).
        cls._template_cache[cache_key] = cached
        _logger.debug('Template cache hit for key: %s...', cache_key[:16])
        return cached

    @classmethod
    def _cache_template(cls, cache_key: str, template_doc: Document):
        """
        Cache a parsed template document, evicting least recently used entries.

        Args:
            cache_key: Cache key for the template
            template_doc: Parsed template document
        """
        if cls._template_cache_max_size <= 0:
            # Caching is effectively disabled; never grow the dict.
            return

        # Drop any previous entry for this key first so that refreshing an
        # existing template does not evict an unrelated one.
        cls._template_cache.pop(cache_key, None)

        while len(cls._template_cache) >= cls._template_cache_max_size:
            oldest_key = next(iter(cls._template_cache))
            del cls._template_cache[oldest_key]
            _logger.debug('Evicted oldest template from cache: %s...', oldest_key[:16])

        cls._template_cache[cache_key] = template_doc
        _logger.debug(
            'Cached template with key: %s... (cache size: %d)',
            cache_key[:16], len(cls._template_cache)
        )

    @classmethod
    def clear_template_cache(cls):
        """
        Clear the template cache.

        This can be called to free memory or when templates are updated.
        """
        cache_size = len(cls._template_cache)
        cls._template_cache.clear()
        _logger.info('Cleared template cache (%d entries removed)', cache_size)

    def generate_certificate(
        self,
        template_binary: bytes,
        mappings: Dict,
        data: Dict,
        use_cache: bool = True
    ) -> Optional[bytes]:
        """
        Generate a PDF certificate from a DOCX template with placeholder replacement.

        This is the main entry point for certificate generation. It:
        1. Loads the template DOCX
        2. Replaces placeholders with actual data
        3. Writes the result to a temporary DOCX file and converts it to PDF

        The working document is always parsed fresh from ``template_binary``
        because ``replace_placeholders`` edits the document in place. When
        ``use_cache`` is true, a separate untouched parse of the template is
        stored in the class-level cache on first use, so the cache never
        holds participant data.

        Args:
            template_binary: Binary content of the DOCX template
            mappings: Dictionary containing placeholder mappings
                Format: {'placeholders': [{'key': '...', 'value_type': '...', ...}]}
            data: Dictionary containing actual data for replacement
            use_cache: Whether to use template caching (default: True)

        Returns:
            bytes: PDF certificate content. Failures are reported by raising,
            not by returning None.

        Raises:
            ValueError: If an argument is missing or invalid, if the template
                is not a readable DOCX file, or if any later step (including
                PDF conversion) fails. The original exception is chained as
                ``__cause__``.
        """
        if not template_binary:
            raise ValueError("Template binary data is required")

        if not isinstance(template_binary, bytes):
            raise ValueError("Template must be provided as binary data")

        if not mappings or 'placeholders' not in mappings:
            raise ValueError("Mappings must contain 'placeholders' key")

        if not data:
            raise ValueError("Data dictionary is required")

        temp_docx_path = None

        try:
            if use_cache:
                cache_key = self._get_template_cache_key(template_binary)
                if self._get_cached_template(cache_key) is None:
                    # Cache a pristine parse; the working copy below is a
                    # different object, so the cached one is never modified.
                    self._cache_template(cache_key, Document(BytesIO(template_binary)))

            document = Document(BytesIO(template_binary))
            document = self.replace_placeholders(document, mappings, data)

            # delete=False: the file must outlive the handle so LibreOffice
            # can open it by path; it is removed in the finally block.
            with tempfile.NamedTemporaryFile(suffix='.docx', delete=False) as temp_docx:
                temp_docx_path = temp_docx.name

            document.save(temp_docx_path)

            return self.convert_to_pdf(temp_docx_path)

        except (PackageNotFoundError, BadZipFile) as e:
            error_msg = "Template file is not a valid DOCX file or is corrupted"
            _logger.error("%s: %s", error_msg, e)
            raise ValueError(error_msg) from e
        except Exception as e:
            error_msg = f"Failed to generate certificate: {str(e)}"
            _logger.error(error_msg, exc_info=True)
            raise ValueError(error_msg) from e
        finally:
            if temp_docx_path and os.path.exists(temp_docx_path):
                try:
                    os.unlink(temp_docx_path)
                except Exception as e:
                    _logger.warning(
                        "Failed to delete temporary file %s: %s", temp_docx_path, e
                    )

    def replace_placeholders(
        self,
        template_doc: Document,
        mappings: Dict,
        data: Dict
    ) -> Document:
        """
        Replace all placeholders in the document with actual values.

        Processes the document body and the default header and footer of every
        section. In each of those, paragraphs and tables are handled, including
        tables nested inside table cells.

        Args:
            template_doc: python-docx Document object (modified in place)
            mappings: Dictionary containing placeholder mappings
            data: Dictionary containing actual data for replacement

        Returns:
            Document: The same document object, with placeholders replaced
        """
        replacements = self._build_replacement_dict(mappings, data)

        self._replace_in_container(template_doc, replacements)

        for section in template_doc.sections:
            self._replace_in_container(section.header, replacements)
            self._replace_in_container(section.footer, replacements)

        return template_doc

    def _replace_in_container(self, container, replacements: Dict[str, str]):
        """
        Replace placeholders in every paragraph and table of a container.

        Args:
            container: Any object exposing ``paragraphs`` and ``tables``
                (used here for the document, headers, footers and table cells)
            replacements: Dictionary mapping placeholder keys to replacement values
        """
        for paragraph in container.paragraphs:
            self._replace_in_paragraph(paragraph, replacements)

        for table in container.tables:
            for row in table.rows:
                for cell in row.cells:
                    # Cells are containers too, which covers nested tables.
                    self._replace_in_container(cell, replacements)

    @staticmethod
    def _is_blank_value(value) -> bool:
        """
        Tell whether a replacement value should be rendered as an empty string.

        None, False and other falsy values (empty strings, empty containers)
        count as blank. Numeric zero does not, so a value such as a score of 0
        is rendered as "0" rather than silently dropped.
        """
        if value is None or value is False:
            return True
        return not value and not isinstance(value, (int, float))

    def _build_replacement_dict(
        self,
        mappings: Dict,
        data: Dict
    ) -> Dict[str, str]:
        """
        Build a dictionary mapping placeholder keys to their replacement values.

        For each entry of ``mappings['placeholders']``:
        - entries without a ``key`` are skipped;
        - ``value_type == 'custom_text'`` uses the entry's ``custom_text``;
        - otherwise the value is read from ``data[value_field]``.

        Missing data never raises: blank values (see ``_is_blank_value``)
        become empty strings, a mapping that cannot be processed is logged and
        skipped, and an unexpected failure of the whole build returns ``{}``.

        Args:
            mappings: Dictionary containing placeholder mappings
            data: Dictionary containing actual data

        Returns:
            Dict[str, str]: Dictionary mapping placeholder keys to replacement values
        """
        replacements = {}

        try:
            placeholders = mappings.get('placeholders', [])

            if not placeholders:
                _logger.warning('No placeholders found in mappings')
                return replacements

            for mapping in placeholders:
                # Deliberately broad: one malformed mapping must not prevent
                # the remaining placeholders from being filled in.
                try:
                    placeholder_key = mapping.get('key', '')
                    if not placeholder_key:
                        _logger.warning('Skipping mapping with empty placeholder key')
                        continue

                    if mapping.get('value_type', '') == 'custom_text':
                        replacement_value = mapping.get('custom_text', '')
                    else:
                        value_field = mapping.get('value_field', '')
                        if not value_field:
                            _logger.warning(
                                'No value_field specified for placeholder %s, using empty string',
                                placeholder_key
                            )
                            replacement_value = ''
                        else:
                            replacement_value = data.get(value_field, '')
                            if self._is_blank_value(replacement_value):
                                _logger.debug(
                                    'No data found for field %s (placeholder %s), using empty string',
                                    value_field, placeholder_key
                                )

                    try:
                        if self._is_blank_value(replacement_value):
                            replacements[placeholder_key] = ''
                        else:
                            replacements[placeholder_key] = str(replacement_value)
                    except Exception as e:
                        _logger.warning(
                            'Failed to convert value for placeholder %s to string: %s. Using empty string.',
                            placeholder_key, str(e)
                        )
                        replacements[placeholder_key] = ''

                except Exception as e:
                    _logger.error(
                        'Error processing mapping: %s. Skipping this placeholder.',
                        str(e), exc_info=True
                    )
                    continue

            _logger.debug(
                'Built replacement dictionary with %d placeholders (%d with values)',
                len(replacements),
                sum(1 for v in replacements.values() if v)
            )

        except Exception as e:
            _logger.error(
                'Error building replacement dictionary: %s',
                str(e), exc_info=True
            )
            # Return empty dict rather than failing
            return {}

        return replacements

    def _replace_in_paragraph(self, paragraph, replacements: Dict[str, str]):
        """
        Replace placeholders in a paragraph while preserving formatting.

        Works in two passes:
        1. Placeholders fully contained in one run are replaced inside that
           run, so only the run's text changes.
        2. Placeholders whose text is spread over several consecutive runs
           are replaced as well: the replacement is written into the first
           run involved and the remaining pieces are removed from the
           following runs.

        Args:
            paragraph: python-docx Paragraph object
            replacements: Dictionary mapping placeholder keys to replacement values
        """
        runs = paragraph.runs
        if not runs or not replacements:
            return

        for run in runs:
            original_text = run.text
            if not original_text:
                continue

            updated_text = original_text
            for placeholder_key, replacement_value in replacements.items():
                if placeholder_key and placeholder_key in updated_text:
                    updated_text = updated_text.replace(placeholder_key, replacement_value)

            # Only assign when something changed, leaving other runs untouched.
            if updated_text != original_text:
                run.text = updated_text

        for placeholder_key, replacement_value in replacements.items():
            if placeholder_key:
                self._replace_across_runs(runs, placeholder_key, replacement_value)

    @staticmethod
    def _replace_across_runs(runs, placeholder_key: str, replacement_value: str):
        """
        Replace occurrences of a placeholder that span several runs.

        The concatenated text of ``runs`` is searched for ``placeholder_key``.
        For each match, the replacement is inserted into the first run that
        overlaps the match and the matched characters are cut out of every
        overlapping run. Runs without text are never assigned to.

        Args:
            runs: Sequence of run objects exposing a read/write ``text`` attribute
            placeholder_key: Non-empty placeholder text to look for
            replacement_value: Text to insert in place of the placeholder
        """
        search_from = 0
        while True:
            texts = [run.text for run in runs]
            start = ''.join(texts).find(placeholder_key, search_from)
            if start < 0:
                return
            end = start + len(placeholder_key)

            offset = 0
            value_pending = True
            for run, text in zip(runs, texts):
                run_start = offset
                offset += len(text)
                if not text or offset <= start or run_start >= end:
                    continue

                cut_from = max(start, run_start) - run_start
                cut_to = min(end, offset) - run_start
                inserted = replacement_value if value_pending else ''
                value_pending = False
                run.text = text[:cut_from] + inserted + text[cut_to:]

            # Resume after the inserted value so a replacement that itself
            # contains the placeholder text cannot cause an endless loop.
            search_from = start + len(replacement_value)

    def convert_to_pdf(self, docx_path: str, max_retries: int = 2, cleanup_on_error: bool = True) -> bytes:
        """
        Convert a DOCX file to PDF using LibreOffice, retrying on failure.

        LibreOffice is run in headless mode and writes the PDF into a fresh
        temporary directory. The first attempt may take up to 45 seconds,
        later attempts up to 30 seconds. Before each retry the method sleeps
        ``min(2 ** attempt, 5)`` seconds. An attempt counts as failed when the
        process times out, exits with a non-zero code, or produces no PDF or
        an empty PDF.

        Args:
            docx_path: Path to the DOCX file to convert
            max_retries: Total number of conversion attempts (default: 2).
                Values below 1 are treated as 1.
            cleanup_on_error: When True (default) the temporary directory and
                everything in it is removed before returning or raising. When
                False the generated PDF file, if any, is left in place.

        Returns:
            bytes: PDF file content

        Raises:
            ValueError: If ``docx_path`` does not exist
            RuntimeError: If LibreOffice is not available or every attempt fails
        """
        if not os.path.exists(docx_path):
            raise ValueError(f"DOCX file not found: {docx_path}")

        # Cached probe, so this normally does not spawn a process.
        is_available, error_message = self.check_libreoffice_availability()
        if not is_available:
            CertificateLogger.log_libreoffice_unavailable(
                error_message,
                {'docx_path': docx_path}
            )
            raise RuntimeError(
                f'PDF conversion is not available: {error_message}\n\n'
                'Please contact your system administrator to install LibreOffice.'
            )

        total_attempts = max(1, max_retries)
        temp_dir = tempfile.mkdtemp()
        last_error = None
        pdf_path = None

        try:
            for attempt in range(total_attempts):
                attempt_number = attempt + 1

                if attempt > 0:
                    delay = min(2 ** attempt, 5)  # Max 5 seconds delay
                    _logger.debug('Waiting %ss before retry attempt %s', delay, attempt_number)
                    time.sleep(delay)

                # Shorter timeout for retries so repeated failures end sooner.
                timeout = 45 if attempt == 0 else 30
                start_time = time.monotonic()

                try:
                    CertificateLogger.log_libreoffice_call_start(
                        docx_path,
                        attempt=attempt_number,
                        max_attempts=total_attempts
                    )

                    # --headless: run without GUI
                    # --convert-to pdf: target format
                    # --outdir: output directory
                    # --norestore: don't restore a previous session
                    # --nofirststartwizard: skip the first start wizard
                    cmd = [
                        'libreoffice',
                        '--headless',
                        '--convert-to', 'pdf',
                        '--outdir', temp_dir,
                        '--norestore',
                        '--nofirststartwizard',
                        docx_path
                    ]

                    result = subprocess.run(
                        cmd,
                        capture_output=True,
                        text=True,
                        timeout=timeout,
                        check=True
                    )

                    _logger.debug("LibreOffice stdout: %s", result.stdout)
                    if result.stderr:
                        _logger.debug("LibreOffice stderr: %s", result.stderr)

                    # The expected output is the input's base name with a
                    # .pdf extension, inside the output directory.
                    docx_filename = os.path.basename(docx_path)
                    pdf_filename = os.path.splitext(docx_filename)[0] + '.pdf'
                    pdf_path = os.path.join(temp_dir, pdf_filename)

                    if not os.path.exists(pdf_path):
                        last_error = RuntimeError(f"PDF file was not generated: {pdf_path}")
                        CertificateLogger.log_libreoffice_call_failure(
                            docx_path,
                            last_error,
                            attempt=attempt_number,
                            max_attempts=total_attempts,
                            stdout=result.stdout,
                            stderr=result.stderr
                        )
                        continue  # Retry

                    with open(pdf_path, 'rb') as pdf_file:
                        pdf_content = pdf_file.read()

                    if not pdf_content:
                        last_error = RuntimeError("Generated PDF file is empty")
                        CertificateLogger.log_libreoffice_call_failure(
                            docx_path,
                            last_error,
                            attempt=attempt_number,
                            max_attempts=total_attempts,
                            stdout=result.stdout,
                            stderr=result.stderr
                        )
                        continue  # Retry

                    duration_ms = (time.monotonic() - start_time) * 1000
                    CertificateLogger.log_libreoffice_call_success(
                        docx_path,
                        len(pdf_content),
                        attempt=attempt_number,
                        duration_ms=round(duration_ms, 2)
                    )

                    return pdf_content

                except FileNotFoundError as e:
                    error_msg = (
                        "LibreOffice is not installed or not found in PATH. "
                        "Please install LibreOffice to enable PDF conversion."
                    )
                    CertificateLogger.log_libreoffice_unavailable(
                        error_msg,
                        {'docx_path': docx_path}
                    )
                    # Retrying cannot help when the executable is missing.
                    raise RuntimeError(error_msg) from e

                except subprocess.TimeoutExpired as e:
                    last_error = RuntimeError(
                        f"PDF conversion timed out after {timeout} seconds"
                    )
                    CertificateLogger.log_libreoffice_call_failure(
                        docx_path,
                        last_error,
                        attempt=attempt_number,
                        max_attempts=total_attempts,
                        stdout=getattr(e, 'stdout', None),
                        stderr=getattr(e, 'stderr', None)
                    )
                    continue

                except subprocess.CalledProcessError as e:
                    last_error = RuntimeError(
                        f"LibreOffice conversion failed with exit code {e.returncode}"
                    )
                    CertificateLogger.log_libreoffice_call_failure(
                        docx_path,
                        last_error,
                        attempt=attempt_number,
                        max_attempts=total_attempts,
                        stdout=e.stdout,
                        stderr=e.stderr,
                        exit_code=e.returncode
                    )
                    continue

                except Exception as e:
                    last_error = RuntimeError(f"Unexpected error during PDF conversion: {str(e)}")
                    CertificateLogger.log_libreoffice_call_failure(
                        docx_path,
                        last_error,
                        attempt=attempt_number,
                        max_attempts=total_attempts
                    )
                    continue

            # Reaching this point means every attempt failed.
            if last_error:
                _logger.error(
                    "PDF conversion failed after %s attempts. Last error: %s",
                    total_attempts, last_error
                )
                raise last_error

            error_msg = f"PDF conversion failed after {total_attempts} attempts with unknown error"
            _logger.error(error_msg)
            raise RuntimeError(error_msg)

        finally:
            self._cleanup_temp_directory(temp_dir, pdf_path if not cleanup_on_error else None)

    @staticmethod
    def _cleanup_temp_directory(temp_dir: str, preserve_file: Optional[str] = None):
        """
        Remove the files in a temporary directory, then the directory itself.

        Only regular files directly inside ``temp_dir`` are deleted. The
        directory is removed when no file is preserved or when it ended up
        empty. Problems are logged as warnings and never raised.

        Args:
            temp_dir: Path to temporary directory
            preserve_file: Optional file path to preserve during cleanup
        """
        if not os.path.exists(temp_dir):
            return

        try:
            for filename in os.listdir(temp_dir):
                file_path = os.path.join(temp_dir, filename)

                if preserve_file and file_path == preserve_file:
                    continue

                if os.path.isfile(file_path):
                    try:
                        os.unlink(file_path)
                    except OSError as e:
                        _logger.warning("Failed to delete temporary file %s: %s", file_path, e)

            try:
                if not preserve_file or not os.listdir(temp_dir):
                    os.rmdir(temp_dir)
            except OSError as e:
                _logger.warning("Failed to remove temporary directory %s: %s", temp_dir, e)

        except Exception as e:
            _logger.warning("Error during cleanup of %s: %s", temp_dir, e)