refactor: replace background thread report updates with synchronous execution to prevent data race conditions

This commit is contained in:
Suherdy Yacob 2026-05-07 16:58:34 +07:00
parent 243ebdb4d5
commit 1caeebb7a9

View File

@ -1,26 +1,20 @@
import threading
import logging import logging
import time
import random
from odoo import models, fields, api from odoo import models, fields, api
from odoo.addons.pos_enterprise.utils.date_utils import compute_seconds_since
_logger = logging.getLogger(__name__) _logger = logging.getLogger(__name__)
def _threaded_report_update(registry, uid, context, pdis_state_id):
"""
Final Optimized Background Worker.
Moves all logic (including stage lookups) to the background.
"""
max_retries = 3
for attempt in range(max_retries):
try:
# Staggered start
time.sleep(random.uniform(0.1, 0.3))
with registry.cursor() as new_cr: class PosPreparationState(models.Model):
new_env = api.Environment(new_cr, uid, context) _inherit = 'pos.prep.state'
pdis_state = new_env['pos.prep.state'].browse(pdis_state_id)
if not pdis_state.exists(): def _update_kds_report(self, pdis_state, old_last_stage_change=None):
"""
Synchronous KDS report update.
Computes preparation/service time directly from stage timestamps,
avoiding race conditions from background threads reading uncommitted data.
"""
if not pdis_state.prep_line_id:
return return
order_line = pdis_state.prep_line_id.pos_order_line_id order_line = pdis_state.prep_line_id.pos_order_line_id
@ -28,16 +22,14 @@ def _threaded_report_update(registry, uid, context, pdis_state_id):
return return
order = order_line.order_id order = order_line.order_id
order_name = order.name or order.pos_reference or "Order"
stage = pdis_state.stage_id stage = pdis_state.stage_id
if not stage: if not stage:
return return
display = stage.prep_display_id display = stage.prep_display_id
display_id = display.id if not display:
order_id = order.id return
# Fetch stage info inside the thread
stage_ids = display.stage_ids.ids stage_ids = display.stage_ids.ids
if not stage_ids: if not stage_ids:
return return
@ -46,6 +38,7 @@ def _threaded_report_update(registry, uid, context, pdis_state_id):
last_stage_id = stage_ids[-1] last_stage_id = stage_ids[-1]
second_last_stage_id = stage_ids[-2] if len(stage_ids) > 1 else False second_last_stage_id = stage_ids[-2] if len(stage_ids) > 1 else False
# Determine if this is a completion or a reset
is_completed = False is_completed = False
if len(stage_ids) > 1: if len(stage_ids) > 1:
if stage.id == last_stage_id: if stage.id == last_stage_id:
@ -61,16 +54,14 @@ def _threaded_report_update(registry, uid, context, pdis_state_id):
if not (is_completed or is_reset): if not (is_completed or is_reset):
return return
KdsLineReport = new_env['pos.kds.report.line'].sudo() display_id = display.id
KdsOrderReport = new_env['pos.kds.report.order'].sudo() order_id = order.id
order_name = order.name or order.pos_reference or "Order"
# Row Locking KdsLineReport = self.env['pos.kds.report.line'].sudo()
new_cr.execute(""" KdsOrderReport = self.env['pos.kds.report.order'].sudo()
SELECT id FROM pos_kds_report_order
WHERE pos_order_id = %s AND prep_display_id = %s
FOR UPDATE
""", (order_id, display_id))
try:
if is_reset: if is_reset:
line_report = KdsLineReport.search([ line_report = KdsLineReport.search([
('pos_order_line_id', '=', order_line.id), ('pos_order_line_id', '=', order_line.id),
@ -94,12 +85,24 @@ def _threaded_report_update(registry, uid, context, pdis_state_id):
order_report.unlink() order_report.unlink()
else: else:
order_report.write({'completion_time': max_comp_time}) order_report.write({'completion_time': max_comp_time})
new_cr.commit()
return return
# If completed # --- Completed ---
# Read directly from the in-memory ORM record (set by super() in the same transaction)
prep_time = max(0, order_line.preparation_time) prep_time = max(0, order_line.preparation_time)
svc_time = max(0, order_line.service_time) svc_time = max(0, order_line.service_time)
# Fallback: if the base module hasn't set the value yet (still -1),
# compute it ourselves from the last_stage_change timestamp
if order_line.preparation_time == -1:
if old_last_stage_change:
prep_time = int(compute_seconds_since(old_last_stage_change))
else:
prep_time = int(compute_seconds_since(pdis_state.last_stage_change))
if order_line.service_time == -1:
svc_time = 0 # Service time may not apply if there's no service stage
comp_time = prep_time + svc_time comp_time = prep_time + svc_time
vals = { vals = {
@ -145,38 +148,13 @@ def _threaded_report_update(registry, uid, context, pdis_state_id):
else: else:
KdsOrderReport.create(order_vals) KdsOrderReport.create(order_vals)
new_cr.commit()
_logger.info("POS_KDS_PERF: Background report update SUCCESS for order %s", order_name)
return
except Exception as e: except Exception as e:
if "could not serialize access" in str(e) or "concurrent update" in str(e): _logger.error("KDS reporting failed for order %s: %s", order_name, e)
continue
else:
_logger.error("POS_KDS_PERF: Background KDS reporting failed: %s", e)
break
else:
_logger.error("POS_KDS_PERF: Failed to update report after retries")
class PosPreparationState(models.Model):
_inherit = 'pos.prep.state'
def _update_kds_report_async(self, pdis_state):
"""Ultra-fast hook that only starts the thread."""
if not pdis_state.prep_line_id:
return
thread = threading.Thread(
target=_threaded_report_update,
args=(self.env.registry, self.env.uid, self.env.context, pdis_state.id)
)
thread.daemon = True
thread.start()
def _record_status_change_prep_time(self, pdis_state): def _record_status_change_prep_time(self, pdis_state):
super()._record_status_change_prep_time(pdis_state) super()._record_status_change_prep_time(pdis_state)
self._update_kds_report_async(pdis_state) self._update_kds_report(pdis_state)
def _record_stage_change_prep_time(self, pdis_state, old_last_stage_change, prep_order_completion_time): def _record_stage_change_prep_time(self, pdis_state, old_last_stage_change, prep_order_completion_time):
super()._record_stage_change_prep_time(pdis_state, old_last_stage_change, prep_order_completion_time) super()._record_stage_change_prep_time(pdis_state, old_last_stage_change, prep_order_completion_time)
self._update_kds_report_async(pdis_state) self._update_kds_report(pdis_state, old_last_stage_change=old_last_stage_change)