303 lines
13 KiB
Python
303 lines
13 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
import logging
|
|
import base64
|
|
import time
|
|
from odoo import models, api
|
|
|
|
_logger = logging.getLogger(__name__)
|
|
|
|
|
|
class SurveyUserInput(models.Model):
|
|
_inherit = 'survey.user_input'
|
|
|
|
@api.model
|
|
def _mark_done(self):
|
|
"""
|
|
Override the survey completion method to generate custom certificates.
|
|
|
|
This method is called when a survey is marked as complete. We hook into
|
|
this workflow to automatically generate and store custom certificates
|
|
if configured for the survey.
|
|
"""
|
|
# Call the parent method to ensure normal completion workflow
|
|
result = super(SurveyUserInput, self)._mark_done()
|
|
|
|
# Generate custom certificate if configured
|
|
self._generate_and_store_certificate()
|
|
|
|
return result
|
|
|
|
def _generate_and_store_certificate(self):
|
|
"""
|
|
Generate and store custom certificate for completed survey responses.
|
|
|
|
This method checks if the survey has a custom certificate configured,
|
|
generates the certificate, and stores it as an attachment linked to
|
|
the survey response.
|
|
|
|
This method includes comprehensive error handling to ensure that
|
|
certificate generation failures do not break the survey completion workflow.
|
|
"""
|
|
# Import logger here to avoid circular imports
|
|
try:
|
|
from ..services.certificate_logger import CertificateLogger
|
|
except ImportError:
|
|
CertificateLogger = None
|
|
_logger.warning('CertificateLogger not available, using standard logging')
|
|
|
|
for user_input in self:
|
|
start_time = time.time()
|
|
|
|
try:
|
|
# Skip if survey doesn't exist or isn't configured for custom certificates
|
|
if not user_input.survey_id:
|
|
_logger.debug('User input %s has no associated survey, skipping', user_input.id)
|
|
continue
|
|
|
|
survey = user_input.survey_id
|
|
|
|
# Check if custom certificate is configured
|
|
if not survey.has_custom_certificate or not survey.custom_cert_template:
|
|
_logger.debug(
|
|
'Survey %s (ID: %s) does not have custom certificate configured, skipping',
|
|
survey.title, survey.id
|
|
)
|
|
continue
|
|
|
|
# Check if certification is enabled for this survey
|
|
if not survey.certification:
|
|
_logger.debug(
|
|
'Certification not enabled for survey %s (ID: %s), skipping',
|
|
survey.title, survey.id
|
|
)
|
|
continue
|
|
|
|
try:
|
|
# Log generation start
|
|
partner_name = user_input.partner_id.name if user_input.partner_id else None
|
|
if CertificateLogger:
|
|
CertificateLogger.log_certificate_generation_start(
|
|
survey.id,
|
|
survey.title,
|
|
user_input.id,
|
|
partner_name
|
|
)
|
|
|
|
# Generate the certificate
|
|
pdf_content = survey._generate_custom_certificate(user_input.id)
|
|
|
|
if not pdf_content:
|
|
_logger.warning(
|
|
'Certificate generation returned no content for user_input %s. '
|
|
'This may be due to missing template, mappings, or LibreOffice unavailability.',
|
|
user_input.id
|
|
)
|
|
continue
|
|
|
|
# Validate PDF content
|
|
if not isinstance(pdf_content, bytes) or len(pdf_content) == 0:
|
|
_logger.error(
|
|
'Invalid PDF content for user_input %s: expected bytes, got %s',
|
|
user_input.id, type(pdf_content)
|
|
)
|
|
continue
|
|
|
|
# Store the certificate as an attachment
|
|
try:
|
|
self._store_certificate_attachment(user_input, pdf_content)
|
|
|
|
# Calculate duration and log success
|
|
duration_ms = (time.time() - start_time) * 1000
|
|
if CertificateLogger:
|
|
CertificateLogger.log_certificate_generation_success(
|
|
survey.id,
|
|
user_input.id,
|
|
len(pdf_content),
|
|
round(duration_ms, 2)
|
|
)
|
|
|
|
# Track success to reset failure count
|
|
try:
|
|
from ..services.admin_notifier import AdminNotifier
|
|
AdminNotifier.track_generation_success(survey.id)
|
|
except ImportError:
|
|
pass # AdminNotifier not available
|
|
|
|
except Exception as e:
|
|
_logger.error(
|
|
'Failed to store certificate attachment for user_input %s: %s',
|
|
user_input.id, str(e), exc_info=True
|
|
)
|
|
if CertificateLogger:
|
|
CertificateLogger.log_certificate_generation_failure(
|
|
survey.id,
|
|
user_input.id,
|
|
e,
|
|
error_type='attachment_storage'
|
|
)
|
|
continue
|
|
|
|
except Exception as e:
|
|
# Log the error but don't break the survey completion workflow
|
|
if CertificateLogger:
|
|
CertificateLogger.log_certificate_generation_failure(
|
|
survey.id,
|
|
user_input.id,
|
|
e,
|
|
error_type='generation'
|
|
)
|
|
else:
|
|
_logger.error(
|
|
'Failed to generate certificate for user_input %s: %s',
|
|
user_input.id, str(e), exc_info=True
|
|
)
|
|
|
|
# Track failure and notify admins if threshold reached
|
|
try:
|
|
from ..services.admin_notifier import AdminNotifier
|
|
|
|
# Check if this is a LibreOffice error
|
|
error_str = str(e)
|
|
if 'LibreOffice' in error_str or 'PDF conversion' in error_str:
|
|
# Notify about LibreOffice unavailability
|
|
AdminNotifier.notify_libreoffice_unavailable(
|
|
self.env,
|
|
error_str,
|
|
{
|
|
'survey_id': survey.id,
|
|
'survey_title': survey.title,
|
|
'user_input_id': user_input.id
|
|
}
|
|
)
|
|
else:
|
|
# Track general generation failure
|
|
AdminNotifier.track_generation_failure(
|
|
self.env,
|
|
survey.id,
|
|
survey.title,
|
|
error_str
|
|
)
|
|
except ImportError:
|
|
pass # AdminNotifier not available
|
|
except Exception as notify_error:
|
|
_logger.warning(
|
|
'Failed to send admin notification: %s',
|
|
str(notify_error)
|
|
)
|
|
|
|
# Continue processing other user inputs
|
|
continue
|
|
|
|
except Exception as e:
|
|
# Outer exception handler to catch any errors in the loop
|
|
_logger.error(
|
|
'Critical error in certificate generation loop for user_input %s: %s',
|
|
user_input.id if hasattr(user_input, 'id') else 'unknown',
|
|
str(e), exc_info=True
|
|
)
|
|
# Continue to next user input
|
|
continue
|
|
|
|
def _store_certificate_attachment(self, user_input, pdf_content):
|
|
"""
|
|
Store the generated certificate as an attachment.
|
|
|
|
This method includes error handling to ensure attachment creation
|
|
failures are logged but don't crash the system.
|
|
|
|
Args:
|
|
user_input: survey.user_input record
|
|
pdf_content: Binary PDF content
|
|
|
|
Returns:
|
|
ir.attachment record if successful, None otherwise
|
|
"""
|
|
try:
|
|
# Validate inputs
|
|
if not user_input:
|
|
_logger.error('Cannot store certificate: user_input is None')
|
|
return None
|
|
|
|
if not pdf_content or not isinstance(pdf_content, bytes):
|
|
_logger.error(
|
|
'Cannot store certificate: invalid pdf_content (type: %s)',
|
|
type(pdf_content)
|
|
)
|
|
return None
|
|
|
|
# Generate a meaningful filename
|
|
try:
|
|
survey_title = user_input.survey_id.title if user_input.survey_id else 'Survey'
|
|
partner_name = user_input.partner_id.name if user_input.partner_id else 'Participant'
|
|
except Exception as e:
|
|
_logger.warning('Failed to get survey/partner names: %s. Using defaults.', str(e))
|
|
survey_title = 'Survey'
|
|
partner_name = 'Participant'
|
|
|
|
# Sanitize filename (remove special characters)
|
|
try:
|
|
safe_survey_title = ''.join(c for c in survey_title if c.isalnum() or c in (' ', '-', '_'))
|
|
safe_partner_name = ''.join(c for c in partner_name if c.isalnum() or c in (' ', '-', '_'))
|
|
|
|
# Ensure we have at least some text in the filename
|
|
if not safe_survey_title:
|
|
safe_survey_title = 'Survey'
|
|
if not safe_partner_name:
|
|
safe_partner_name = 'Participant'
|
|
|
|
filename = f"Certificate_{safe_survey_title}_{safe_partner_name}.pdf"
|
|
except Exception as e:
|
|
_logger.warning('Failed to sanitize filename: %s. Using default.', str(e))
|
|
filename = f"Certificate_{user_input.id}.pdf"
|
|
|
|
# Encode PDF content
|
|
try:
|
|
encoded_content = base64.b64encode(pdf_content)
|
|
except Exception as e:
|
|
_logger.error('Failed to encode PDF content: %s', str(e))
|
|
return None
|
|
|
|
# Create the attachment
|
|
try:
|
|
attachment = self.env['ir.attachment'].create({
|
|
'name': filename,
|
|
'type': 'binary',
|
|
'datas': encoded_content,
|
|
'res_model': 'survey.user_input',
|
|
'res_id': user_input.id,
|
|
'mimetype': 'application/pdf',
|
|
'description': f'Custom certificate for survey: {survey_title}',
|
|
})
|
|
|
|
# Log attachment creation
|
|
try:
|
|
from ..services.certificate_logger import CertificateLogger
|
|
CertificateLogger.log_attachment_creation(
|
|
user_input.id,
|
|
attachment.id,
|
|
filename,
|
|
len(pdf_content)
|
|
)
|
|
except ImportError:
|
|
_logger.info(
|
|
'Created certificate attachment %s (ID: %s) for user_input %s',
|
|
filename, attachment.id, user_input.id
|
|
)
|
|
|
|
return attachment
|
|
|
|
except Exception as e:
|
|
_logger.error(
|
|
'Failed to create attachment for user_input %s: %s',
|
|
user_input.id, str(e), exc_info=True
|
|
)
|
|
return None
|
|
|
|
except Exception as e:
|
|
_logger.error(
|
|
'Unexpected error storing certificate attachment: %s',
|
|
str(e), exc_info=True
|
|
)
|
|
return None
|