303 lines
16 KiB
Python
Executable File
303 lines
16 KiB
Python
Executable File
# -*- coding: utf-8 -*-
|
|
import base64
|
|
import io
|
|
import os
|
|
import time
|
|
from dateutil.relativedelta import relativedelta
|
|
from datetime import timedelta
|
|
|
|
from reportlab.lib.utils import ImageReader
|
|
from reportlab.pdfbase import pdfmetrics
|
|
from reportlab.pdfbase.ttfonts import TTFont
|
|
from reportlab.rl_config import TTFSearchPath
|
|
from reportlab.pdfgen import canvas
|
|
from reportlab.platypus import Paragraph
|
|
from reportlab.lib.styles import ParagraphStyle
|
|
from reportlab.pdfbase.pdfmetrics import stringWidth
|
|
from PIL import UnidentifiedImageError
|
|
|
|
from odoo import api, fields, models, _, Command
|
|
from odoo.exceptions import UserError, ValidationError
|
|
from odoo.tools.pdf import PdfFileReader, PdfFileWriter, PdfReadError, reshape_text
|
|
|
|
import logging
|
|
_logger = logging.getLogger(__name__)
|
|
|
|
# Helper function copied from sign/models/sign_request.py
|
|
def _fix_image_transparency(image):
|
|
pixels = image.load()
|
|
for x in range(image.size[0]):
|
|
for y in range(image.size[1]):
|
|
if pixels[x, y] == (0, 0, 0, 0):
|
|
pixels[x, y] = (255, 255, 255, 0)
|
|
|
|
class SignRequest(models.Model):
|
|
_inherit = "sign.request"
|
|
|
|
def _sign(self):
|
|
""" Override to generate sequence numbers when request is signed """
|
|
# We perform sequence generation BEFORE calling super()._sign()
|
|
# because super()._sign() triggers _send_completed_document() immediately.
|
|
# We need the sequence values to be ready before the document is generated and sent.
|
|
|
|
# We only generate if we are about to complete the signing process.
|
|
# The logic in _post_fill_request_item calls _sign only when all items are completed.
|
|
|
|
for request in self:
|
|
# Generate sequence numbers for sequence items
|
|
for sign_item in request.template_id.sign_item_ids:
|
|
if sign_item.type_id.item_type == 'sequence' and sign_item.sequence_id:
|
|
# Find the responsible request item (signer)
|
|
# Use role_id to match.
|
|
# We check if there is a signer for this role in this request.
|
|
request_items = request.request_item_ids.filtered(lambda r: r.role_id == sign_item.responsible_id)
|
|
if not request_items:
|
|
continue
|
|
|
|
# We only generate if it hasn't been generated yet.
|
|
existing_value = request.env['sign.request.item.value'].search([
|
|
('sign_request_item_id', 'in', request_items.ids),
|
|
('sign_item_id', '=', sign_item.id)
|
|
], limit=1)
|
|
|
|
# If it exists but value is empty/false OR it equals the placeholder "Sequence Number"
|
|
# we must regenerate it.
|
|
if not existing_value or not existing_value.value or existing_value.value == "Sequence Number":
|
|
new_seq = sign_item.sequence_id.next_by_id()
|
|
if new_seq:
|
|
if existing_value:
|
|
# Update existing empty record
|
|
existing_value.write({'value': new_seq})
|
|
else:
|
|
# Create value for the first matching signer
|
|
request.env['sign.request.item.value'].create({
|
|
'sign_request_item_id': request_items[0].id,
|
|
'sign_item_id': sign_item.id,
|
|
'value': new_seq
|
|
})
|
|
|
|
# Rename the document if requested
|
|
# User asked to put sequence in prefix of document name.
|
|
# request.reference is the document name.
|
|
# Check if already renamed to avoid double prefixing
|
|
if not request.reference.startswith(f"{new_seq} - "):
|
|
request.write({'reference': f"{new_seq} - {request.reference}"})
|
|
|
|
return super(SignRequest, self)._sign()
|
|
|
|
def _generate_completed_document(self, password=""):
|
|
# We need to override this entire method to handle 'sequence' and 'image' item types
|
|
# as the original method doesn't have hooks or generic handling.
|
|
_logger.info("Starting _generate_completed_document for request %s", self.id)
|
|
self.ensure_one()
|
|
if self.state != 'signed':
|
|
raise UserError(_("The completed document cannot be created because the sign request is not fully signed"))
|
|
if not self.template_id.sign_item_ids:
|
|
_logger.info("No sign items, copying original attachment")
|
|
self.completed_document = self.template_id.attachment_id.datas
|
|
else:
|
|
try:
|
|
_logger.info("Reading original PDF")
|
|
old_pdf = PdfFileReader(io.BytesIO(base64.b64decode(self.template_id.attachment_id.datas)), strict=False, overwriteWarnings=False)
|
|
num_pages = old_pdf.getNumPages()
|
|
_logger.info("Original PDF has %s pages", num_pages)
|
|
except Exception as e:
|
|
_logger.error("Failed to read PDF: %s", e)
|
|
raise ValidationError(_("ERROR: Invalid PDF file!"))
|
|
|
|
isEncrypted = old_pdf.isEncrypted
|
|
if isEncrypted and not old_pdf.decrypt(password):
|
|
# password is not correct
|
|
return
|
|
|
|
font = self._get_font()
|
|
normalFontSize = self._get_normal_font_size()
|
|
|
|
packet = io.BytesIO()
|
|
can = canvas.Canvas(packet, pagesize=self.get_page_size(old_pdf))
|
|
itemsByPage = self.template_id._get_sign_items_by_page()
|
|
items_ids = [id for items in itemsByPage.values() for id in items.ids]
|
|
_logger.info("Fetching item values for %s items", len(items_ids))
|
|
values_dict = self.env['sign.request.item.value']._read_group(
|
|
[('sign_item_id', 'in', items_ids), ('sign_request_id', '=', self.id)],
|
|
groupby=['sign_item_id'],
|
|
aggregates=['value:array_agg', 'frame_value:array_agg', 'frame_has_hash:array_agg']
|
|
)
|
|
values = {
|
|
sign_item.id : {
|
|
'value': v[0] if v else None,
|
|
'frame': f[0] if f else None,
|
|
'frame_has_hash': h[0] if h else None,
|
|
}
|
|
for sign_item, v, f, h in values_dict
|
|
}
|
|
_logger.info("Retrieved %s item values", len(values))
|
|
|
|
for p in range(0, old_pdf.getNumPages()):
|
|
page = old_pdf.getPage(p)
|
|
# Absolute values are taken as it depends on the MediaBox template PDF metadata, they may be negative
|
|
width = float(abs(page.mediaBox.getWidth()))
|
|
height = float(abs(page.mediaBox.getHeight()))
|
|
|
|
# Set page orientation (either 0, 90, 180 or 270)
|
|
rotation = page['/Rotate'] if '/Rotate' in page else 0
|
|
if rotation and isinstance(rotation, int):
|
|
can.rotate(rotation)
|
|
# Translate system so that elements are placed correctly
|
|
# despite of the orientation
|
|
if rotation == 90:
|
|
width, height = height, width
|
|
can.translate(0, -height)
|
|
elif rotation == 180:
|
|
can.translate(-width, -height)
|
|
elif rotation == 270:
|
|
width, height = height, width
|
|
can.translate(-width, 0)
|
|
|
|
items = itemsByPage[p + 1] if p + 1 in itemsByPage else []
|
|
for item in items:
|
|
value_dict = values.get(item.id)
|
|
if not value_dict:
|
|
continue
|
|
# only get the 1st
|
|
value = value_dict['value']
|
|
frame = value_dict['frame']
|
|
|
|
if frame:
|
|
try:
|
|
image_reader = ImageReader(io.BytesIO(base64.b64decode(frame[frame.find(',')+1:])))
|
|
except UnidentifiedImageError:
|
|
raise ValidationError(_("There was an issue downloading your document. Please contact an administrator."))
|
|
_fix_image_transparency(image_reader._image)
|
|
can.drawImage(
|
|
image_reader,
|
|
width*item.posX,
|
|
height*(1-item.posY-item.height),
|
|
width*item.width,
|
|
height*item.height,
|
|
'auto',
|
|
True
|
|
)
|
|
|
|
# LOGIC FOR SEQUENCE TYPE (Same as text)
|
|
if item.type_id.item_type == "sequence":
|
|
if value:
|
|
value = reshape_text(value)
|
|
can.setFont(font, height*item.height*0.8)
|
|
if item.alignment == "left":
|
|
can.drawString(width*item.posX, height*(1-item.posY-item.height*0.9), value)
|
|
elif item.alignment == "right":
|
|
can.drawRightString(width*(item.posX+item.width), height*(1-item.posY-item.height*0.9), value)
|
|
else:
|
|
can.drawCentredString(width*(item.posX+item.width/2), height*(1-item.posY-item.height*0.9), value)
|
|
|
|
elif item.type_id.item_type == "text":
|
|
if value:
|
|
value = reshape_text(value)
|
|
can.setFont(font, height*item.height*0.8)
|
|
if item.alignment == "left":
|
|
can.drawString(width*item.posX, height*(1-item.posY-item.height*0.9), value)
|
|
elif item.alignment == "right":
|
|
can.drawRightString(width*(item.posX+item.width), height*(1-item.posY-item.height*0.9), value)
|
|
else:
|
|
can.drawCentredString(width*(item.posX+item.width/2), height*(1-item.posY-item.height*0.9), value)
|
|
|
|
elif item.type_id.item_type == "selection":
|
|
if value:
|
|
content = []
|
|
for option in item.option_ids:
|
|
if option.id != int(value):
|
|
content.append("<strike>%s</strike>" % (option.value))
|
|
else:
|
|
content.append(option.value)
|
|
font_size = height * normalFontSize * 0.8
|
|
text = " / ".join(content)
|
|
string_width = stringWidth(text.replace("<strike>", "").replace("</strike>", ""), font, font_size)
|
|
p = Paragraph(text, ParagraphStyle(name='Selection Paragraph', fontName=font, fontSize=font_size, leading=12))
|
|
posX = width * (item.posX + item.width * 0.5) - string_width // 2
|
|
posY = height * (1 - item.posY - item.height * 0.5) - p.wrap(width, height)[1] // 2
|
|
p.drawOn(can, posX, posY)
|
|
|
|
elif item.type_id.item_type == "textarea":
|
|
if value:
|
|
font_size = height * normalFontSize * 0.8
|
|
can.setFont(font, font_size)
|
|
lines = value.split('\n')
|
|
y = (1-item.posY)
|
|
for line in lines:
|
|
empty_space = width * item.width - can.stringWidth(line, font, font_size)
|
|
x_shift = 0
|
|
if item.alignment == 'center':
|
|
x_shift = empty_space / 2
|
|
elif item.alignment == 'right':
|
|
x_shift = empty_space
|
|
y -= normalFontSize * 0.9
|
|
line = reshape_text(line)
|
|
can.drawString(width * item.posX + x_shift, height * y, line)
|
|
y -= normalFontSize * 0.1
|
|
|
|
elif item.type_id.item_type == "checkbox":
|
|
can.setFont(font, height*item.height*0.8)
|
|
value = 'X' if value == 'on' else ''
|
|
can.drawString(width*item.posX, height*(1-item.posY-item.height*0.9), value)
|
|
elif item.type_id.item_type == "radio":
|
|
x = width * item.posX
|
|
y = height * (1 - item.posY)
|
|
w = item.width * width
|
|
h = item.height * height
|
|
# Calculate the center of the sign item rectangle.
|
|
c_x = x + w * 0.5
|
|
c_y = y - h * 0.5
|
|
# Draw the outer empty circle.
|
|
can.circle(c_x, c_y, h * 0.5)
|
|
if value == "on":
|
|
# Draw the inner filled circle.
|
|
can.circle(x_cen=c_x, y_cen=c_y, r=h * 0.5 * 0.75, fill=1)
|
|
elif item.type_id.item_type in ["signature", "initial", "image"]:
|
|
if value:
|
|
try:
|
|
image_reader = ImageReader(io.BytesIO(base64.b64decode(value[value.find(',')+1:])))
|
|
except UnidentifiedImageError:
|
|
raise ValidationError(_("There was an issue downloading your document. Please contact an administrator."))
|
|
_fix_image_transparency(image_reader._image)
|
|
can.drawImage(image_reader, width*item.posX, height*(1-item.posY-item.height), width*item.width, height*item.height, 'auto', True)
|
|
|
|
can.showPage()
|
|
|
|
can.save()
|
|
|
|
item_pdf = PdfFileReader(packet, overwriteWarnings=False)
|
|
new_pdf = PdfFileWriter()
|
|
|
|
for p in range(0, old_pdf.getNumPages()):
|
|
page = old_pdf.getPage(p)
|
|
page.mergePage(item_pdf.getPage(p))
|
|
new_pdf.addPage(page)
|
|
|
|
if isEncrypted:
|
|
new_pdf.encrypt(password)
|
|
|
|
try:
|
|
output = io.BytesIO()
|
|
new_pdf.write(output)
|
|
self.completed_document = base64.b64encode(output.getvalue())
|
|
except PdfReadError:
|
|
raise ValidationError(_("There was an issue downloading your document. Please contact an administrator."))
|
|
|
|
_logger.info("Completed document generated, size: %s bytes", len(self.completed_document))
|
|
output.close()
|
|
|
|
# Odoo 18 logic to create attachment and link it
|
|
# This matches enterprise/sign/models/sign_request.py:797+
|
|
_logger.info("Creating ir.attachment for completed document")
|
|
attachment = self.env['ir.attachment'].create({
|
|
'name': "%s.pdf" % self.reference if self.reference.split('.')[-1] != 'pdf' else self.reference,
|
|
'datas': self.completed_document,
|
|
'type': 'binary',
|
|
'res_model': self._name,
|
|
'res_id': self.id,
|
|
})
|
|
self.completed_document_attachment_ids = [Command.set([attachment.id])]
|
|
_logger.info("Attachment created: %s", attachment.id)
|
|
|