survey_custom_certificate_t.../services/certificate_generator.py
2025-11-29 08:46:04 +07:00

723 lines
28 KiB
Python

# -*- coding: utf-8 -*-
import re
import logging
import subprocess
import tempfile
import os
import time
import hashlib
from io import BytesIO
from typing import Dict, List, Optional
from functools import lru_cache
try:
from docx import Document
from docx.opc.exceptions import PackageNotFoundError
except ImportError:
Document = None
PackageNotFoundError = Exception
from zipfile import BadZipFile
from .certificate_logger import CertificateLogger
from .admin_notifier import AdminNotifier
_logger = logging.getLogger(__name__)
class CertificateGenerator:
    """
    Service class for generating personalized certificates from DOCX templates.

    The generator replaces placeholder text in a DOCX template with
    participant data and converts the result to PDF by running the
    ``libreoffice`` command-line program.

    Class-level state (shared by every instance):
    - The outcome of the LibreOffice availability probe is memoised.
    - Parsed templates are kept in a size-bounded dictionary keyed by the
      SHA-256 digest of the template bytes.
    """
    # Memoised result of the LibreOffice availability probe:
    # None = not probed yet, True / False = outcome of the probe.
    _libreoffice_available = None
    # Error description stored alongside a failed probe (None otherwise)
    _libreoffice_check_error = None
    # Class-level cache of parsed templates, bounded by
    # _template_cache_max_size; when full, the entry at the front of the
    # dict (the oldest one) is evicted first.
    _template_cache = {}
    _template_cache_max_size = 50
def __init__(self):
    """
    Create a generator after verifying that python-docx was importable.

    Raises:
        ImportError: If the module-level ``Document`` name is ``None``,
            i.e. the ``docx`` import at the top of the file failed.
    """
    docx_missing = Document is None
    if docx_missing:
        message = (
            "python-docx library is required. "
            "Install it with: pip install python-docx"
        )
        raise ImportError(message)
@classmethod
def check_libreoffice_availability(cls) -> tuple:
"""
Check if LibreOffice is available on the system.
This method checks if LibreOffice can be executed and caches the result
to avoid repeated system calls.
Returns:
tuple: (is_available: bool, error_message: str)
- is_available: True if LibreOffice is available, False otherwise
- error_message: Empty string if available, error description if not
"""
# Return cached result if available
if cls._libreoffice_available is not None:
return cls._libreoffice_available, cls._libreoffice_check_error or ''
try:
# Try to execute LibreOffice with --version flag
result = subprocess.run(
['libreoffice', '--version'],
capture_output=True,
text=True,
timeout=10,
check=False
)
if result.returncode == 0:
version_info = result.stdout.strip()
_logger.info(f'LibreOffice is available: {version_info}')
cls._libreoffice_available = True
cls._libreoffice_check_error = None
return True, ''
else:
error_msg = (
'LibreOffice is installed but returned an error. '
f'Exit code: {result.returncode}'
)
_logger.warning(error_msg)
cls._libreoffice_available = False
cls._libreoffice_check_error = error_msg
return False, error_msg
except FileNotFoundError:
error_msg = (
'LibreOffice is not installed or not found in system PATH. '
'PDF conversion will not be available. '
'Please install LibreOffice to enable certificate generation.'
)
_logger.error(error_msg)
cls._libreoffice_available = False
cls._libreoffice_check_error = error_msg
return False, error_msg
except subprocess.TimeoutExpired:
error_msg = 'LibreOffice version check timed out after 10 seconds'
_logger.error(error_msg)
cls._libreoffice_available = False
cls._libreoffice_check_error = error_msg
return False, error_msg
except Exception as e:
error_msg = f'Unexpected error checking LibreOffice availability: {str(e)}'
_logger.error(error_msg, exc_info=True)
cls._libreoffice_available = False
cls._libreoffice_check_error = error_msg
return False, error_msg
@classmethod
def reset_libreoffice_check(cls):
    """
    Forget the memoised LibreOffice availability result.

    After this call the next ``check_libreoffice_availability`` runs the
    probe again, e.g. once LibreOffice has been installed.
    """
    for attribute_name in ('_libreoffice_available', '_libreoffice_check_error'):
        setattr(cls, attribute_name, None)
    _logger.info('LibreOffice availability check cache cleared')
@classmethod
def _get_template_cache_key(cls, template_binary: bytes) -> str:
"""
Generate a cache key for a template based on its content.
Args:
template_binary: Binary content of the template
Returns:
str: SHA256 hash of the template content
"""
return hashlib.sha256(template_binary).hexdigest()
@classmethod
def _get_cached_template(cls, cache_key: str) -> Optional[Document]:
"""
Retrieve a cached template if available.
Args:
cache_key: Cache key for the template
Returns:
Document: Cached template document, or None if not cached
"""
cached = cls._template_cache.get(cache_key)
if cached:
_logger.debug(f'Template cache hit for key: {cache_key[:16]}...')
return cached
@classmethod
def _cache_template(cls, cache_key: str, template_doc: Document):
"""
Cache a parsed template document.
Implements LRU eviction when cache is full.
Args:
cache_key: Cache key for the template
template_doc: Parsed template document
"""
# Implement simple LRU: remove oldest entry if cache is full
if len(cls._template_cache) >= cls._template_cache_max_size:
# Remove the first (oldest) entry
oldest_key = next(iter(cls._template_cache))
del cls._template_cache[oldest_key]
_logger.debug(f'Evicted oldest template from cache: {oldest_key[:16]}...')
cls._template_cache[cache_key] = template_doc
_logger.debug(f'Cached template with key: {cache_key[:16]}... (cache size: {len(cls._template_cache)})')
@classmethod
def clear_template_cache(cls):
    """
    Drop every entry from the class-level template cache.

    Useful to release memory or after templates have been changed.
    """
    templates = cls._template_cache
    removed_count = len(templates)
    templates.clear()
    _logger.info(f'Cleared template cache ({removed_count} entries removed)')
def generate_certificate(
    self,
    template_binary: bytes,
    mappings: Dict,
    data: Dict,
    use_cache: bool = True
) -> Optional[bytes]:
    """
    Generate a PDF certificate from a DOCX template with placeholder replacement.

    This is the main entry point for certificate generation. It:
    1. Parses the template DOCX
    2. Replaces placeholders with actual data
    3. Saves the result to a temporary DOCX file and converts it to PDF

    Args:
        template_binary: Binary content of the DOCX template
        mappings: Dictionary containing placeholder mappings
            Format: {'placeholders': [{'key': '...', 'value_type': '...', ...}]}
        data: Dictionary containing actual data for replacement
        use_cache: When True (default) a pristine parse of the template is
            registered in the class-level template cache. The document
            that is filled in is always a private parse, because
            placeholder replacement modifies the document in place.

    Returns:
        bytes: PDF certificate content

    Raises:
        ValueError: If an argument is missing or invalid, if the template
            is not a valid DOCX file, or if any later step fails (the
            original exception is attached as ``__cause__``)
    """
    if not template_binary:
        raise ValueError("Template binary data is required")
    if not isinstance(template_binary, bytes):
        raise ValueError("Template must be provided as binary data")
    if not mappings or 'placeholders' not in mappings:
        raise ValueError("Mappings must contain 'placeholders' key")
    if not data:
        raise ValueError("Data dictionary is required")

    temp_docx_path = None
    try:
        if use_cache:
            cache_key = self._get_template_cache_key(template_binary)
            if self._get_cached_template(cache_key) is None:
                # Cache a separate, untouched parse: the working document
                # below is filled with participant data in place and must
                # never end up in the shared cache.
                self._cache_template(cache_key, Document(BytesIO(template_binary)))

        # Every certificate works on its own freshly parsed document
        document = Document(BytesIO(template_binary))

        # Replace placeholders with actual data
        document = self.replace_placeholders(document, mappings, data)

        # Reserve a temporary file name; delete=False because the file is
        # written by python-docx and read by LibreOffice after this
        # handle is closed.
        with tempfile.NamedTemporaryFile(suffix='.docx', delete=False) as temp_docx:
            temp_docx_path = temp_docx.name
        document.save(temp_docx_path)

        # Convert to PDF
        return self.convert_to_pdf(temp_docx_path)
    except (PackageNotFoundError, BadZipFile) as e:
        error_msg = "Template file is not a valid DOCX file or is corrupted"
        _logger.error(f"{error_msg}: {e}")
        raise ValueError(error_msg) from e
    except Exception as e:
        error_msg = f"Failed to generate certificate: {str(e)}"
        _logger.error(error_msg, exc_info=True)
        raise ValueError(error_msg) from e
    finally:
        # Clean up temporary DOCX file
        if temp_docx_path and os.path.exists(temp_docx_path):
            try:
                os.unlink(temp_docx_path)
            except Exception as e:
                _logger.warning(
                    f"Failed to delete temporary file {temp_docx_path}: {e}"
                )
def replace_placeholders(
self,
template_doc: Document,
mappings: Dict,
data: Dict
) -> Document:
"""
Replace all placeholders in the document with actual values.
This method processes all paragraphs, tables, headers, and footers
in the document, replacing placeholders according to the mappings.
Args:
template_doc: python-docx Document object
mappings: Dictionary containing placeholder mappings
data: Dictionary containing actual data for replacement
Returns:
Document: Modified document with placeholders replaced
"""
# Build a replacement dictionary from mappings and data
replacements = self._build_replacement_dict(mappings, data)
# Replace in paragraphs
for paragraph in template_doc.paragraphs:
self._replace_in_paragraph(paragraph, replacements)
# Replace in tables
for table in template_doc.tables:
for row in table.rows:
for cell in row.cells:
for paragraph in cell.paragraphs:
self._replace_in_paragraph(paragraph, replacements)
# Replace in headers and footers
for section in template_doc.sections:
# Header
for paragraph in section.header.paragraphs:
self._replace_in_paragraph(paragraph, replacements)
# Footer
for paragraph in section.footer.paragraphs:
self._replace_in_paragraph(paragraph, replacements)
return template_doc
def _build_replacement_dict(
self,
mappings: Dict,
data: Dict
) -> Dict[str, str]:
"""
Build a dictionary mapping placeholder keys to their replacement values.
This method handles missing data gracefully by using empty strings for
unmapped or unavailable placeholders, preventing generation failures.
Args:
mappings: Dictionary containing placeholder mappings
data: Dictionary containing actual data
Returns:
Dict[str, str]: Dictionary mapping placeholder keys to replacement values
Missing or unmapped values are represented as empty strings
"""
replacements = {}
try:
placeholders = mappings.get('placeholders', [])
if not placeholders:
_logger.warning('No placeholders found in mappings')
return replacements
for mapping in placeholders:
try:
placeholder_key = mapping.get('key', '')
if not placeholder_key:
_logger.warning('Skipping mapping with empty placeholder key')
continue
value_type = mapping.get('value_type', '')
# Determine the replacement value based on value_type
if value_type == 'custom_text':
# Use custom text directly
replacement_value = mapping.get('custom_text', '')
else:
# Use dynamic data from the data dictionary
value_field = mapping.get('value_field', '')
if not value_field:
_logger.warning(
'No value_field specified for placeholder %s, using empty string',
placeholder_key
)
replacement_value = ''
else:
# Get value from data, default to empty string if not found
replacement_value = data.get(value_field, '')
if not replacement_value:
_logger.debug(
'No data found for field %s (placeholder %s), using empty string',
value_field, placeholder_key
)
# Store the replacement (use empty string if no value found)
# Convert to string to handle non-string values safely
try:
replacements[placeholder_key] = str(replacement_value) if replacement_value else ''
except Exception as e:
_logger.warning(
'Failed to convert value for placeholder %s to string: %s. Using empty string.',
placeholder_key, str(e)
)
replacements[placeholder_key] = ''
except Exception as e:
_logger.error(
'Error processing mapping: %s. Skipping this placeholder.',
str(e), exc_info=True
)
continue
_logger.debug(
'Built replacement dictionary with %d placeholders (%d with values)',
len(replacements),
sum(1 for v in replacements.values() if v)
)
except Exception as e:
_logger.error(
'Error building replacement dictionary: %s',
str(e), exc_info=True
)
# Return empty dict rather than failing
return {}
return replacements
def _replace_in_paragraph(self, paragraph, replacements: Dict[str, str]):
"""
Replace placeholders in a paragraph while preserving formatting.
This method works at the run level to preserve text formatting.
It replaces placeholders within individual runs, maintaining their
formatting attributes (bold, italic, font size, color, etc.).
Args:
paragraph: python-docx Paragraph object
replacements: Dictionary mapping placeholder keys to replacement values
"""
# Replace placeholders within each run to preserve formatting
# This approach maintains the run structure and all formatting attributes
for run in paragraph.runs:
# Check if this run contains any placeholders
run_text = run.text
# Perform replacements within this run
for placeholder_key, replacement_value in replacements.items():
if placeholder_key in run_text:
# Replace the placeholder while keeping the run's formatting
run_text = run_text.replace(placeholder_key, replacement_value)
# Update the run's text if it changed
if run_text != run.text:
run.text = run_text
def convert_to_pdf(self, docx_path: str, max_retries: int = 2, cleanup_on_error: bool = True) -> bytes:
    """
    Convert a DOCX file to PDF using LibreOffice in headless mode.

    The PDF is written into a fresh temporary directory. The conversion
    is attempted up to ``max_retries`` times (always at least once);
    before each re-attempt the method sleeps ``min(2 ** attempt, 5)``
    seconds. The first attempt may run for 45 seconds, later attempts
    for 30 seconds.

    Args:
        docx_path: Path to the DOCX file to convert
        max_retries: Maximum number of conversion attempts (default: 2);
            values below 1 are treated as 1
        cleanup_on_error: When True (default) every file in the temporary
            output directory is deleted before returning; when False the
            generated PDF file, if any, is left in place

    Returns:
        bytes: PDF file content

    Raises:
        ValueError: If ``docx_path`` does not exist
        RuntimeError: If LibreOffice is not available or every attempt fails
    """
    if not os.path.exists(docx_path):
        raise ValueError(f"DOCX file not found: {docx_path}")

    # Check LibreOffice availability before attempting conversion (cached)
    is_available, error_message = self.check_libreoffice_availability()
    if not is_available:
        CertificateLogger.log_libreoffice_unavailable(
            error_message,
            {'docx_path': docx_path}
        )
        raise RuntimeError(
            f'PDF conversion is not available: {error_message}\n\n'
            'Please contact your system administrator to install LibreOffice.'
        )

    # At least one attempt is always made, even for max_retries <= 0
    max_attempts = max(1, max_retries)

    # Create a temporary directory for the PDF output
    temp_dir = tempfile.mkdtemp()
    last_error = None
    pdf_path = None

    # LibreOffice command for conversion (identical for every attempt)
    # --headless: Run without GUI
    # --convert-to pdf: Convert to PDF format
    # --outdir: Output directory
    # --norestore: Don't restore previous session
    # --nofirststartwizard: Skip first start wizard
    cmd = [
        'libreoffice',
        '--headless',
        '--convert-to',
        'pdf',
        '--outdir',
        temp_dir,
        '--norestore',
        '--nofirststartwizard',
        docx_path
    ]

    try:
        for attempt in range(max_attempts):
            attempt_number = attempt + 1

            # Back off before a re-attempt
            if attempt > 0:
                delay = min(2 ** attempt, 5)  # Max 5 seconds delay
                _logger.debug(f'Waiting {delay}s before retry attempt {attempt_number}')
                time.sleep(delay)

            # Started after the back-off so the reported duration covers
            # the conversion only.
            start_time = time.time()
            # Shorter timeout for retries
            timeout = 45 if attempt == 0 else 30

            try:
                CertificateLogger.log_libreoffice_call_start(
                    docx_path,
                    attempt=attempt_number,
                    max_attempts=max_attempts
                )
                result = subprocess.run(
                    cmd,
                    capture_output=True,
                    text=True,
                    timeout=timeout,
                    check=True
                )
                _logger.debug(f"LibreOffice stdout: {result.stdout}")
                if result.stderr:
                    _logger.debug(f"LibreOffice stderr: {result.stderr}")

                # LibreOffice names the output after the input file
                docx_filename = os.path.basename(docx_path)
                pdf_filename = os.path.splitext(docx_filename)[0] + '.pdf'
                pdf_path = os.path.join(temp_dir, pdf_filename)

                if not os.path.exists(pdf_path):
                    last_error = RuntimeError(f"PDF file was not generated: {pdf_path}")
                    CertificateLogger.log_libreoffice_call_failure(
                        docx_path,
                        last_error,
                        attempt=attempt_number,
                        max_attempts=max_attempts,
                        stdout=result.stdout,
                        stderr=result.stderr
                    )
                    continue  # Retry

                with open(pdf_path, 'rb') as pdf_file:
                    pdf_content = pdf_file.read()

                if not pdf_content:
                    last_error = RuntimeError("Generated PDF file is empty")
                    CertificateLogger.log_libreoffice_call_failure(
                        docx_path,
                        last_error,
                        attempt=attempt_number,
                        max_attempts=max_attempts,
                        stdout=result.stdout,
                        stderr=result.stderr
                    )
                    continue  # Retry

                duration_ms = (time.time() - start_time) * 1000
                CertificateLogger.log_libreoffice_call_success(
                    docx_path,
                    len(pdf_content),
                    attempt=attempt_number,
                    duration_ms=round(duration_ms, 2)
                )
                return pdf_content
            except FileNotFoundError as e:
                error_msg = (
                    "LibreOffice is not installed or not found in PATH. "
                    "Please install LibreOffice to enable PDF conversion."
                )
                CertificateLogger.log_libreoffice_unavailable(
                    error_msg,
                    {'docx_path': docx_path}
                )
                # Don't retry for missing LibreOffice
                raise RuntimeError(error_msg) from e
            except subprocess.TimeoutExpired as e:
                # Report the timeout that was actually applied
                last_error = RuntimeError(f"PDF conversion timed out after {timeout} seconds")
                CertificateLogger.log_libreoffice_call_failure(
                    docx_path,
                    last_error,
                    attempt=attempt_number,
                    max_attempts=max_attempts,
                    stdout=getattr(e, 'stdout', None),
                    stderr=getattr(e, 'stderr', None)
                )
                continue
            except subprocess.CalledProcessError as e:
                last_error = RuntimeError(
                    f"LibreOffice conversion failed with exit code {e.returncode}"
                )
                CertificateLogger.log_libreoffice_call_failure(
                    docx_path,
                    last_error,
                    attempt=attempt_number,
                    max_attempts=max_attempts,
                    stdout=e.stdout,
                    stderr=e.stderr,
                    exit_code=e.returncode
                )
                continue
            except Exception as e:
                last_error = RuntimeError(f"Unexpected error during PDF conversion: {str(e)}")
                CertificateLogger.log_libreoffice_call_failure(
                    docx_path,
                    last_error,
                    attempt=attempt_number,
                    max_attempts=max_attempts
                )
                continue

        # If we get here, all attempts failed
        if last_error:
            _logger.error(
                f"PDF conversion failed after {max_attempts} attempts. "
                f"Last error: {str(last_error)}"
            )
            raise last_error
        error_msg = f"PDF conversion failed after {max_attempts} attempts with unknown error"
        _logger.error(error_msg)
        raise RuntimeError(error_msg)
    finally:
        # Remove the temporary output; the PDF is kept only when the
        # caller passed cleanup_on_error=False.
        self._cleanup_temp_directory(temp_dir, pdf_path if not cleanup_on_error else None)
@staticmethod
def _cleanup_temp_directory(temp_dir: str, preserve_file: Optional[str] = None):
"""
Efficiently clean up temporary directory and files.
Args:
temp_dir: Path to temporary directory
preserve_file: Optional file path to preserve during cleanup
"""
if not os.path.exists(temp_dir):
return
try:
# List all files once for efficiency
files = os.listdir(temp_dir)
for filename in files:
file_path = os.path.join(temp_dir, filename)
# Skip preserved file
if preserve_file and file_path == preserve_file:
continue
# Delete file
if os.path.isfile(file_path):
try:
os.unlink(file_path)
except OSError as e:
_logger.warning(f"Failed to delete temporary file {file_path}: {e}")
# Remove directory if empty or no files preserved
try:
if not preserve_file or not os.listdir(temp_dir):
os.rmdir(temp_dir)
except OSError as e:
_logger.warning(f"Failed to remove temporary directory {temp_dir}: {e}")
except Exception as e:
_logger.warning(f"Error during cleanup of {temp_dir}: {e}")