survey_upload_image/models/survey_user_input.py

171 lines
7.3 KiB
Python
Executable File

# -*- coding: utf-8 -*-
##############################################################################
#
# Cybrosys Technologies Pvt. Ltd.
#
# Copyright (C) 2025-TODAY Cybrosys Technologies(<https://www.cybrosys.com>)
# Author: Mohammed Dilshad Tk (odoo@cybrosys.com)
#
# You can modify it under the terms of the GNU LESSER
# GENERAL PUBLIC LICENSE (LGPL v3), Version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU LESSER GENERAL PUBLIC LICENSE (LGPL v3) for more details.
#
# You should have received a copy of the GNU LESSER GENERAL PUBLIC LICENSE
# (LGPL v3) along with this program.
# If not, see <http://www.gnu.org/licenses/>.
#
##############################################################################
from odoo import models
class SurveyUserInput(models.Model):
"""
This class extends the 'survey.user_input' model to add custom
functionality for saving user answers.
Methods:
_save_lines: Save the user's answer for the given question
_save_line_file:Save the user's file upload answer for the given
question
_get_line_answer_file_upload_values:
Get the values to use when creating or updating a user input line
for a file upload answer
"""
_inherit = "survey.user_input"
def _save_lines(self, question, answer, comment=None,
overwrite_existing=False):
"""Save the user's answer for the given question."""
old_answers = self.env['survey.user_input.line'].search([
('user_input_id', '=', self.id),
('question_id', '=', question.id), ])
if question.question_type == 'upload_file':
res = self._save_line_simple_answers(question, old_answers, answer)
elif not question.question_type:
# Fallback for missing question type: silently return empty recordset
# raising AttributeError causes RPC_ERROR for the user
return self.env['survey.user_input.line']
else:
res = super(SurveyUserInput, self)._save_lines(question, answer, comment,
overwrite_existing)
return res
def _save_line_simple_answers(self, question, old_answers, answer):
""" Save the user's file upload answer for the given question."""
vals = self._get_line_answer_file_upload_values(question,
'upload_file', answer)
if old_answers:
old_answers.write(vals)
return old_answers
else:
return self.env['survey.user_input.line'].create(vals)
def _get_line_answer_file_upload_values(self, question, answer_type,
answer):
"""Get the values to use when creating or updating a user input line
for a file upload answer.
Auto-compress images if they are uploaded."""
vals = {
'user_input_id': self.id,
'question_id': question.id,
'skipped': False,
'answer_type': answer_type,
}
if answer_type == 'upload_file':
# ---------------------------------------------------------
# RETRIEVAL LOGIC: Same as in validate_question
# If standard controller failed to pass the answer (because key mismatch),
# we try to fetch it from request.params ourselves.
# ---------------------------------------------------------
import logging
_logger = logging.getLogger(__name__)
if not answer or answer == '[]':
from odoo.http import request
if request:
keys_to_check = [f'upload_{question.id}', str(question.id), '']
for key in keys_to_check:
val = request.params.get(key)
if val and val != '[]':
answer = val
break
if isinstance(answer, str):
import json
try:
answer = json.loads(answer)
except Exception as e:
_logger.error(f"SAVE QUESTION {question.id}: JSON Decode Error: {e}")
pass
if not answer or not isinstance(answer, list) or len(answer) < 2:
return vals # Skipped=False, but no files.
file_data = answer[0]
file_name = answer[1]
attachment_ids = []
for i in range(len(answer[1])):
data = file_data[i]
fname = file_name[i]
# Validation: Restrict to PNG, JPG, JPEG
if not fname.lower().endswith(('.png', '.jpg', '.jpeg')):
from odoo.exceptions import UserError
raise UserError(f"Only image files (PNG, JPG, JPEG) are allowed. Invalid file: {fname}")
# Auto-compression logic
try:
# Check if file is an image based on extension or simple check
if fname.lower().endswith(('.png', '.jpg', '.jpeg')):
import base64
import io
from PIL import Image
# Decode base64
image_stream = io.BytesIO(base64.b64decode(data))
img = Image.open(image_stream)
# Convert to RGB if RGBA/P to avoid issues saving as JPEG
if img.mode in ('RGBA', 'P'):
img = img.convert('RGB')
# Resize if too large (max 1024x1024)
max_size = (1024, 1024)
img.thumbnail(max_size, Image.Resampling.LANCZOS)
# Compress with lower quality
output_stream = io.BytesIO()
img.save(output_stream, format='JPEG', quality=50, optimize=True)
data = base64.b64encode(output_stream.getvalue())
except Exception as e:
_logger.error(f"SAVE QUESTION {question.id}: Compression Error: {e}")
pass
attachment = self.env['ir.attachment'].create({
'name': fname,
'type': 'binary',
'datas': data,
})
attachment_ids.append(attachment.id)
# Use Command tuples for Many2many relationship
# (6, 0, ids) replaces all existing records with the new list
vals['value_file_data_ids'] = [(6, 0, attachment_ids)]
return vals
def action_delete_uploaded_files(self):
"""Action to delete uploaded files for selected surveys."""
for record in self:
file_answers = record.user_input_line_ids.filtered(
lambda l: l.answer_type == 'upload_file' and l.value_file_data_ids
)
for line in file_answers:
# Unlink attachments
line.value_file_data_ids.unlink()