From 1caeebb7a9ee14bb2d64d0d58276fd9ac55a003b Mon Sep 17 00:00:00 2001 From: Suherdy Yacob Date: Thu, 7 May 2026 16:58:34 +0700 Subject: [PATCH] refactor: replace background thread report updates with synchronous execution to prevent data race conditions --- models/pos_prep_state.py | 308 ++++++++++++++++++--------------------- 1 file changed, 143 insertions(+), 165 deletions(-) diff --git a/models/pos_prep_state.py b/models/pos_prep_state.py index 5a2549b..28d38ae 100644 --- a/models/pos_prep_state.py +++ b/models/pos_prep_state.py @@ -1,182 +1,160 @@ -import threading import logging -import time -import random from odoo import models, fields, api +from odoo.addons.pos_enterprise.utils.date_utils import compute_seconds_since _logger = logging.getLogger(__name__) -def _threaded_report_update(registry, uid, context, pdis_state_id): - """ - Final Optimized Background Worker. - Moves all logic (including stage lookups) to the background. - """ - max_retries = 3 - for attempt in range(max_retries): - try: - # Staggered start - time.sleep(random.uniform(0.1, 0.3)) - - with registry.cursor() as new_cr: - new_env = api.Environment(new_cr, uid, context) - pdis_state = new_env['pos.prep.state'].browse(pdis_state_id) - if not pdis_state.exists(): - return - - order_line = pdis_state.prep_line_id.pos_order_line_id - if not order_line: - return - - order = order_line.order_id - order_name = order.name or order.pos_reference or "Order" - stage = pdis_state.stage_id - if not stage: - return - - display = stage.prep_display_id - display_id = display.id - order_id = order.id - - # Fetch stage info inside the thread - stage_ids = display.stage_ids.ids - if not stage_ids: - return - - first_stage_id = stage_ids[0] - last_stage_id = stage_ids[-1] - second_last_stage_id = stage_ids[-2] if len(stage_ids) > 1 else False - - is_completed = False - if len(stage_ids) > 1: - if stage.id == last_stage_id: - is_completed = True - elif second_last_stage_id and stage.id == second_last_stage_id and not pdis_state.todo: - is_completed = True - else: - if not pdis_state.todo: - is_completed = True - - is_reset = (stage.id == first_stage_id and pdis_state.todo) - - if not (is_completed or is_reset): - return - - KdsLineReport = new_env['pos.kds.report.line'].sudo() - KdsOrderReport = new_env['pos.kds.report.order'].sudo() - - # Row Locking - new_cr.execute(""" - SELECT id FROM pos_kds_report_order - WHERE pos_order_id = %s AND prep_display_id = %s - FOR UPDATE - """, (order_id, display_id)) - - if is_reset: - line_report = KdsLineReport.search([ - ('pos_order_line_id', '=', order_line.id), - ('prep_display_id', '=', display_id) - ], limit=1) - if line_report: - line_report.unlink() - - order_report = KdsOrderReport.search([ - ('pos_order_id', '=', order_id), - ('prep_display_id', '=', display_id) - ], limit=1) - - if order_report: - res = KdsLineReport._read_group( - [('pos_order_id', '=', order_id), ('prep_display_id', '=', display_id)], - aggregates=['completion_time:max'] - ) - max_comp_time = res[0][0] if res else 0 - if max_comp_time == 0: - order_report.unlink() - else: - order_report.write({'completion_time': max_comp_time}) - new_cr.commit() - return - - # If completed - prep_time = max(0, order_line.preparation_time) - svc_time = max(0, order_line.service_time) - comp_time = prep_time + svc_time - - vals = { - 'pos_order_id': order_id, - 'pos_order_line_id': order_line.id, - 'product_id': order_line.product_id.id, - 'prep_display_id': display_id, - 'preparation_time': prep_time, - 'service_time': svc_time, - 'completion_time': comp_time, - 'completion_datetime': fields.Datetime.now(), - } - - line_report = KdsLineReport.search([ - ('pos_order_line_id', '=', order_line.id), - ('prep_display_id', '=', display_id) - ], limit=1) - - if line_report: - line_report.write(vals) - else: - KdsLineReport.create(vals) - - order_report = KdsOrderReport.search([ - ('pos_order_id', '=', order_id), - ('prep_display_id', '=', display_id) - ], limit=1) - - res = KdsLineReport._read_group( - [('pos_order_id', '=', order_id), ('prep_display_id', '=', display_id)], - aggregates=['completion_time:max'] - ) - max_comp_time = res[0][0] if res else comp_time - - order_vals = { - 'pos_order_id': order_id, - 'prep_display_id': display_id, - 'completion_time': max_comp_time, - 'completion_datetime': fields.Datetime.now(), - } - if order_report: - order_report.write(order_vals) - else: - KdsOrderReport.create(order_vals) - - new_cr.commit() - _logger.info("POS_KDS_PERF: Background report update SUCCESS for order %s", order_name) - return - - except Exception as e: - if "could not serialize access" in str(e) or "concurrent update" in str(e): - continue - else: - _logger.error("POS_KDS_PERF: Background KDS reporting failed: %s", e) - break - else: - _logger.error("POS_KDS_PERF: Failed to update report after retries") class PosPreparationState(models.Model): _inherit = 'pos.prep.state' - def _update_kds_report_async(self, pdis_state): - """Ultra-fast hook that only starts the thread.""" + def _update_kds_report(self, pdis_state, old_last_stage_change=None): + """ + Synchronous KDS report update. + Computes preparation/service time directly from stage timestamps, + avoiding race conditions from background threads reading uncommitted data. + """ if not pdis_state.prep_line_id: return - - thread = threading.Thread( - target=_threaded_report_update, - args=(self.env.registry, self.env.uid, self.env.context, pdis_state.id) - ) - thread.daemon = True - thread.start() + + order_line = pdis_state.prep_line_id.pos_order_line_id + if not order_line: + return + + order = order_line.order_id + stage = pdis_state.stage_id + if not stage: + return + + display = stage.prep_display_id + if not display: + return + + stage_ids = display.stage_ids.ids + if not stage_ids: + return + + first_stage_id = stage_ids[0] + last_stage_id = stage_ids[-1] + second_last_stage_id = stage_ids[-2] if len(stage_ids) > 1 else False + + # Determine if this is a completion or a reset + is_completed = False + if len(stage_ids) > 1: + if stage.id == last_stage_id: + is_completed = True + elif second_last_stage_id and stage.id == second_last_stage_id and not pdis_state.todo: + is_completed = True + else: + if not pdis_state.todo: + is_completed = True + + is_reset = (stage.id == first_stage_id and pdis_state.todo) + + if not (is_completed or is_reset): + return + + display_id = display.id + order_id = order.id + order_name = order.name or order.pos_reference or "Order" + + KdsLineReport = self.env['pos.kds.report.line'].sudo() + KdsOrderReport = self.env['pos.kds.report.order'].sudo() + + try: + if is_reset: + line_report = KdsLineReport.search([ + ('pos_order_line_id', '=', order_line.id), + ('prep_display_id', '=', display_id) + ], limit=1) + if line_report: + line_report.unlink() + + order_report = KdsOrderReport.search([ + ('pos_order_id', '=', order_id), + ('prep_display_id', '=', display_id) + ], limit=1) + + if order_report: + res = KdsLineReport._read_group( + [('pos_order_id', '=', order_id), ('prep_display_id', '=', display_id)], + aggregates=['completion_time:max'] + ) + max_comp_time = res[0][0] if res else 0 + if max_comp_time == 0: + order_report.unlink() + else: + order_report.write({'completion_time': max_comp_time}) + return + + # --- Completed --- + # Read directly from the in-memory ORM record (set by super() in the same transaction) + prep_time = max(0, order_line.preparation_time) + svc_time = max(0, order_line.service_time) + + # Fallback: if the base module hasn't set the value yet (still -1), + # compute it ourselves from the last_stage_change timestamp + if order_line.preparation_time == -1: + if old_last_stage_change: + prep_time = int(compute_seconds_since(old_last_stage_change)) + else: + prep_time = int(compute_seconds_since(pdis_state.last_stage_change)) + + if order_line.service_time == -1: + svc_time = 0 # Service time may not apply if there's no service stage + + comp_time = prep_time + svc_time + + vals = { + 'pos_order_id': order_id, + 'pos_order_line_id': order_line.id, + 'product_id': order_line.product_id.id, + 'prep_display_id': display_id, + 'preparation_time': prep_time, + 'service_time': svc_time, + 'completion_time': comp_time, + 'completion_datetime': fields.Datetime.now(), + } + + line_report = KdsLineReport.search([ + ('pos_order_line_id', '=', order_line.id), + ('prep_display_id', '=', display_id) + ], limit=1) + + if line_report: + line_report.write(vals) + else: + KdsLineReport.create(vals) + + order_report = KdsOrderReport.search([ + ('pos_order_id', '=', order_id), + ('prep_display_id', '=', display_id) + ], limit=1) + + res = KdsLineReport._read_group( + [('pos_order_id', '=', order_id), ('prep_display_id', '=', display_id)], + aggregates=['completion_time:max'] + ) + max_comp_time = res[0][0] if res else comp_time + + order_vals = { + 'pos_order_id': order_id, + 'prep_display_id': display_id, + 'completion_time': max_comp_time, + 'completion_datetime': fields.Datetime.now(), + } + if order_report: + order_report.write(order_vals) + else: + KdsOrderReport.create(order_vals) + + except Exception as e: + _logger.error("KDS reporting failed for order %s: %s", order_name, e) def _record_status_change_prep_time(self, pdis_state): super()._record_status_change_prep_time(pdis_state) - self._update_kds_report_async(pdis_state) + self._update_kds_report(pdis_state) def _record_stage_change_prep_time(self, pdis_state, old_last_stage_change, prep_order_completion_time): super()._record_stage_change_prep_time(pdis_state, old_last_stage_change, prep_order_completion_time) - self._update_kds_report_async(pdis_state) + self._update_kds_report(pdis_state, old_last_stage_change=old_last_stage_change)