survey_custom_certificate_t.../models/survey_user_input.py
2025-11-29 08:46:04 +07:00

303 lines
13 KiB
Python

# -*- coding: utf-8 -*-
import logging
import base64
import time
from odoo import models, api
# Module-level logger named after this module; used by every method below.
_logger = logging.getLogger(__name__)
class SurveyUserInput(models.Model):
    """Extend ``survey.user_input`` so completed responses get a custom certificate.

    After the standard completion workflow has run, every response whose
    survey has certification enabled and a custom certificate template
    configured gets a PDF produced by ``survey._generate_custom_certificate``
    and stored as an ``ir.attachment`` linked to the response.
    """

    _inherit = 'survey.user_input'

    def _mark_done(self):
        """Run the standard completion workflow, then generate certificates.

        This method works on the records in ``self`` (certificates are
        generated per record), so it is a plain recordset method and must not
        be decorated with ``@api.model``.

        Returns:
            Whatever the parent ``_mark_done`` returns.
        """
        result = super()._mark_done()
        # Best-effort step: failures are logged by the callee and are not
        # meant to interrupt survey completion.
        self._generate_and_store_certificate()
        return result

    def _generate_and_store_certificate(self):
        """Generate and store a custom certificate for each response in ``self``.

        A response is skipped when it has no survey, when its survey has no
        custom certificate template configured, or when certification is
        disabled on the survey. Any error raised while handling one response
        is logged and processing continues with the next response, so that a
        certificate failure does not break the survey completion workflow.
        """
        # Imported lazily to avoid circular imports; the structured logger is
        # optional and plain logging is used when it cannot be imported.
        try:
            from ..services.certificate_logger import CertificateLogger
        except ImportError:
            CertificateLogger = None
            _logger.warning('CertificateLogger not available, using standard logging')

        for user_input in self:
            try:
                self._custom_cert_process_input(user_input, CertificateLogger)
            except Exception as e:
                # Last-resort guard: one broken response must not prevent the
                # remaining ones from being processed.
                _logger.exception(
                    'Critical error in certificate generation loop for user_input %s: %s',
                    user_input.id, str(e)
                )

    def _custom_cert_process_input(self, user_input, certificate_logger):
        """Generate, validate and store the certificate of a single response.

        Args:
            user_input: survey.user_input record to process
            certificate_logger: the ``CertificateLogger`` class, or None when
                that optional service could not be imported
        """
        # Monotonic clock: the measured duration cannot be skewed by system
        # clock adjustments.
        started = time.monotonic()

        survey = user_input.survey_id
        if not survey:
            _logger.debug('User input %s has no associated survey, skipping', user_input.id)
            return
        if not survey.has_custom_certificate or not survey.custom_cert_template:
            _logger.debug(
                'Survey %s (ID: %s) does not have custom certificate configured, skipping',
                survey.title, survey.id
            )
            return
        if not survey.certification:
            _logger.debug(
                'Certification not enabled for survey %s (ID: %s), skipping',
                survey.title, survey.id
            )
            return

        try:
            if certificate_logger:
                certificate_logger.log_certificate_generation_start(
                    survey.id,
                    survey.title,
                    user_input.id,
                    user_input.partner_id.name if user_input.partner_id else None,
                )
            # The exception is swallowed below, so run inside a savepoint: a
            # database error is rolled back instead of leaving the whole
            # transaction in an aborted state.
            with self.env.cr.savepoint():
                pdf_content = survey._generate_custom_certificate(user_input.id)
        except Exception as e:
            self._custom_cert_report_generation_failure(
                user_input, survey, e, certificate_logger
            )
            return

        if not pdf_content:
            _logger.warning(
                'Certificate generation returned no content for user_input %s. '
                'This may be due to missing template, mappings, or LibreOffice unavailability.',
                user_input.id
            )
            return
        # Empty content was handled above, so only the type is left to check.
        if not isinstance(pdf_content, bytes):
            _logger.error(
                'Invalid PDF content for user_input %s: expected bytes, got %s',
                user_input.id, type(pdf_content)
            )
            return

        try:
            attachment = self._store_certificate_attachment(user_input, pdf_content)
            if not attachment:
                # The storage method reports failure by returning None rather
                # than raising; turn that into an error so success is never
                # recorded for a certificate that was not stored.
                raise RuntimeError('certificate attachment was not created')
        except Exception as e:
            _logger.exception(
                'Failed to store certificate attachment for user_input %s: %s',
                user_input.id, str(e)
            )
            if certificate_logger:
                certificate_logger.log_certificate_generation_failure(
                    survey.id,
                    user_input.id,
                    e,
                    error_type='attachment_storage'
                )
            return

        duration_ms = (time.monotonic() - started) * 1000
        if certificate_logger:
            certificate_logger.log_certificate_generation_success(
                survey.id,
                user_input.id,
                len(pdf_content),
                round(duration_ms, 2)
            )
        # Report the success so the notifier can reset its failure count.
        try:
            from ..services.admin_notifier import AdminNotifier
            AdminNotifier.track_generation_success(survey.id)
        except ImportError:
            # AdminNotifier is an optional service.
            _logger.debug('AdminNotifier not available, success not tracked')

    def _custom_cert_report_generation_failure(self, user_input, survey, error,
                                               certificate_logger):
        """Log a generation failure and notify administrators when possible.

        Args:
            user_input: survey.user_input record whose certificate failed
            survey: survey record of that response
            error: exception raised during generation
            certificate_logger: the ``CertificateLogger`` class, or None
        """
        if certificate_logger:
            certificate_logger.log_certificate_generation_failure(
                survey.id,
                user_input.id,
                error,
                error_type='generation'
            )
        else:
            _logger.error(
                'Failed to generate certificate for user_input %s: %s',
                user_input.id, str(error), exc_info=error
            )

        try:
            from ..services.admin_notifier import AdminNotifier
            error_str = str(error)
            # Notification errors are swallowed below, so isolate any database
            # work the notifier does in a savepoint.
            with self.env.cr.savepoint():
                if 'LibreOffice' in error_str or 'PDF conversion' in error_str:
                    # Conversion backend problem: report it as such.
                    AdminNotifier.notify_libreoffice_unavailable(
                        self.env,
                        error_str,
                        {
                            'survey_id': survey.id,
                            'survey_title': survey.title,
                            'user_input_id': user_input.id
                        }
                    )
                else:
                    # Any other failure is counted as a general generation failure.
                    AdminNotifier.track_generation_failure(
                        self.env,
                        survey.id,
                        survey.title,
                        error_str
                    )
        except ImportError:
            # AdminNotifier is an optional service.
            _logger.debug('AdminNotifier not available, failure not tracked')
        except Exception as notify_error:
            _logger.warning(
                'Failed to send admin notification: %s',
                str(notify_error)
            )

    @staticmethod
    def _custom_cert_filename_part(value, default):
        """Return ``value`` reduced to filename-safe characters.

        Only alphanumeric characters, spaces, hyphens and underscores are
        kept. ``default`` is returned when ``value`` is empty or when nothing
        but whitespace survives the filtering.
        """
        if not value:
            return default
        kept = ''.join(c for c in str(value) if c.isalnum() or c in (' ', '-', '_'))
        return kept.strip() or default

    def _store_certificate_attachment(self, user_input, pdf_content):
        """Store the generated certificate as an attachment.

        Failures are logged and reported through the return value; this method
        does not raise.

        Args:
            user_input: survey.user_input record
            pdf_content: Binary PDF content

        Returns:
            ir.attachment record if successful, None otherwise
        """
        try:
            if not user_input:
                _logger.error('Cannot store certificate: user_input is None')
                return None
            if not pdf_content or not isinstance(pdf_content, bytes):
                _logger.error(
                    'Cannot store certificate: invalid pdf_content (type: %s)',
                    type(pdf_content)
                )
                return None

            # Human-readable names for the filename and description. An empty
            # field value falls back to the default as well, so the text never
            # ends up containing "False".
            try:
                survey_title = (
                    user_input.survey_id.title if user_input.survey_id else None
                ) or 'Survey'
                partner_name = (
                    user_input.partner_id.name if user_input.partner_id else None
                ) or 'Participant'
            except Exception as e:
                _logger.warning('Failed to get survey/partner names: %s. Using defaults.', str(e))
                survey_title = 'Survey'
                partner_name = 'Participant'

            safe_survey_title = self._custom_cert_filename_part(survey_title, 'Survey')
            safe_partner_name = self._custom_cert_filename_part(partner_name, 'Participant')
            filename = f"Certificate_{safe_survey_title}_{safe_partner_name}.pdf"

            # The 'datas' field of ir.attachment takes base64-encoded content.
            encoded_content = base64.b64encode(pdf_content)

            try:
                # The exception is swallowed, so create inside a savepoint: a
                # failed INSERT is rolled back and the surrounding transaction
                # stays usable.
                with self.env.cr.savepoint():
                    attachment = self.env['ir.attachment'].create({
                        'name': filename,
                        'type': 'binary',
                        'datas': encoded_content,
                        'res_model': 'survey.user_input',
                        'res_id': user_input.id,
                        'mimetype': 'application/pdf',
                        'description': f'Custom certificate for survey: {survey_title}',
                    })
            except Exception as e:
                _logger.exception(
                    'Failed to create attachment for user_input %s: %s',
                    user_input.id, str(e)
                )
                return None

            # Logging happens after (and outside) the creation guard: a logging
            # problem must not turn a stored attachment into a reported failure.
            try:
                from ..services.certificate_logger import CertificateLogger
                CertificateLogger.log_attachment_creation(
                    user_input.id,
                    attachment.id,
                    filename,
                    len(pdf_content)
                )
            except ImportError:
                _logger.info(
                    'Created certificate attachment %s (ID: %s) for user_input %s',
                    filename, attachment.id, user_input.id
                )
            except Exception as log_error:
                _logger.warning(
                    'Failed to log attachment creation for user_input %s: %s',
                    user_input.id, str(log_error)
                )
            return attachment
        except Exception as e:
            _logger.exception(
                'Unexpected error storing certificate attachment: %s',
                str(e)
            )
            return None