feat: implement retry logic and row locking for threaded KDS report updates to prevent concurrency conflicts
This commit is contained in:
parent
c6aa68fe39
commit
88571bc418
@ -1,104 +1,138 @@
|
||||
import threading
|
||||
import logging
|
||||
import time
|
||||
import random
|
||||
from odoo import models, fields, api
|
||||
|
||||
_logger = logging.getLogger(__name__)
|
||||
|
||||
def _threaded_report_update(registry, uid, context, pdis_state_id, is_reset, is_completed):
|
||||
"""Background worker to update KDS reports without blocking the POS transaction."""
|
||||
try:
|
||||
with registry.cursor() as new_cr:
|
||||
new_env = api.Environment(new_cr, uid, context)
|
||||
pdis_state = new_env['pos.prep.state'].browse(pdis_state_id)
|
||||
if not pdis_state.exists():
|
||||
return
|
||||
"""
|
||||
Background worker with Retry Logic and Row Locking.
|
||||
"""
|
||||
max_retries = 5
|
||||
for attempt in range(max_retries):
|
||||
try:
|
||||
# Staggered start to avoid simultaneous collisions
|
||||
time.sleep(random.uniform(0.05, 0.2))
|
||||
|
||||
order_line = pdis_state.prep_line_id.pos_order_line_id
|
||||
stage = pdis_state.stage_id
|
||||
display_id = stage.prep_display_id.id
|
||||
with registry.cursor() as new_cr:
|
||||
new_env = api.Environment(new_cr, uid, context)
|
||||
pdis_state = new_env['pos.prep.state'].browse(pdis_state_id)
|
||||
if not pdis_state.exists():
|
||||
return
|
||||
|
||||
KdsLineReport = new_env['pos.kds.report.line'].sudo()
|
||||
KdsOrderReport = new_env['pos.kds.report.order'].sudo()
|
||||
order_line = pdis_state.prep_line_id.pos_order_line_id
|
||||
if not order_line:
|
||||
return
|
||||
|
||||
order = order_line.order_id
|
||||
order_name = order.name or order.pos_reference or "Unknown Order"
|
||||
stage = pdis_state.stage_id
|
||||
display_id = stage.prep_display_id.id
|
||||
order_id = order.id
|
||||
|
||||
KdsLineReport = new_env['pos.kds.report.line'].sudo()
|
||||
KdsOrderReport = new_env['pos.kds.report.order'].sudo()
|
||||
|
||||
# --- ROW LOCKING ---
|
||||
# Attempt to lock the existing order report if it exists
|
||||
new_cr.execute("""
|
||||
SELECT id FROM pos_kds_report_order
|
||||
WHERE pos_order_id = %s AND prep_display_id = %s
|
||||
FOR UPDATE
|
||||
""", (order_id, display_id))
|
||||
# -------------------
|
||||
|
||||
if is_reset:
|
||||
line_report = KdsLineReport.search([
|
||||
('pos_order_line_id', '=', order_line.id),
|
||||
('prep_display_id', '=', display_id)
|
||||
], limit=1)
|
||||
if line_report:
|
||||
line_report.unlink()
|
||||
|
||||
order_report = KdsOrderReport.search([
|
||||
('pos_order_id', '=', order_id),
|
||||
('prep_display_id', '=', display_id)
|
||||
], limit=1)
|
||||
|
||||
if order_report:
|
||||
res = KdsLineReport._read_group(
|
||||
[('pos_order_id', '=', order_id), ('prep_display_id', '=', display_id)],
|
||||
aggregates=['completion_time:max']
|
||||
)
|
||||
max_comp_time = res[0][0] if res else 0
|
||||
if max_comp_time == 0:
|
||||
order_report.unlink()
|
||||
else:
|
||||
order_report.write({'completion_time': max_comp_time})
|
||||
new_cr.commit()
|
||||
_logger.info("POS_KDS_PERF: Reset report SUCCESS for %s", order_name)
|
||||
return
|
||||
|
||||
# If completed
|
||||
prep_time = max(0, order_line.preparation_time)
|
||||
svc_time = max(0, order_line.service_time)
|
||||
comp_time = prep_time + svc_time
|
||||
|
||||
vals = {
|
||||
'pos_order_id': order_id,
|
||||
'pos_order_line_id': order_line.id,
|
||||
'product_id': order_line.product_id.id,
|
||||
'prep_display_id': display_id,
|
||||
'preparation_time': prep_time,
|
||||
'service_time': svc_time,
|
||||
'completion_time': comp_time,
|
||||
'completion_datetime': fields.Datetime.now(),
|
||||
}
|
||||
|
||||
if is_reset:
|
||||
line_report = KdsLineReport.search([
|
||||
('pos_order_line_id', '=', order_line.id),
|
||||
('prep_display_id', '=', display_id)
|
||||
], limit=1)
|
||||
|
||||
if line_report:
|
||||
line_report.unlink()
|
||||
line_report.write(vals)
|
||||
else:
|
||||
KdsLineReport.create(vals)
|
||||
|
||||
# Update order-level report
|
||||
order_report = KdsOrderReport.search([
|
||||
('pos_order_id', '=', order_line.order_id.id),
|
||||
('pos_order_id', '=', order_id),
|
||||
('prep_display_id', '=', display_id)
|
||||
], limit=1)
|
||||
|
||||
res = KdsLineReport._read_group(
|
||||
[('pos_order_id', '=', order_id), ('prep_display_id', '=', display_id)],
|
||||
aggregates=['completion_time:max']
|
||||
)
|
||||
max_comp_time = res[0][0] if res else comp_time
|
||||
|
||||
order_vals = {
|
||||
'pos_order_id': order_id,
|
||||
'prep_display_id': display_id,
|
||||
'completion_time': max_comp_time,
|
||||
'completion_datetime': fields.Datetime.now(),
|
||||
}
|
||||
if order_report:
|
||||
res = KdsLineReport._read_group(
|
||||
[('pos_order_id', '=', order_line.order_id.id), ('prep_display_id', '=', display_id)],
|
||||
aggregates=['completion_time:max']
|
||||
)
|
||||
max_comp_time = res[0][0] if res else 0
|
||||
if max_comp_time == 0:
|
||||
order_report.unlink()
|
||||
else:
|
||||
order_report.write({'completion_time': max_comp_time})
|
||||
order_report.write(order_vals)
|
||||
else:
|
||||
KdsOrderReport.create(order_vals)
|
||||
|
||||
new_cr.commit()
|
||||
return
|
||||
_logger.info("POS_KDS_PERF: Background report update SUCCESS for order %s", order_name)
|
||||
return # Exit loop on success
|
||||
|
||||
# If completed
|
||||
prep_time = max(0, order_line.preparation_time)
|
||||
svc_time = max(0, order_line.service_time)
|
||||
comp_time = prep_time + svc_time
|
||||
|
||||
vals = {
|
||||
'pos_order_id': order_line.order_id.id,
|
||||
'pos_order_line_id': order_line.id,
|
||||
'product_id': order_line.product_id.id,
|
||||
'prep_display_id': display_id,
|
||||
'preparation_time': prep_time,
|
||||
'service_time': svc_time,
|
||||
'completion_time': comp_time,
|
||||
'completion_datetime': fields.Datetime.now(),
|
||||
}
|
||||
|
||||
line_report = KdsLineReport.search([
|
||||
('pos_order_line_id', '=', order_line.id),
|
||||
('prep_display_id', '=', display_id)
|
||||
], limit=1)
|
||||
|
||||
if line_report:
|
||||
line_report.write(vals)
|
||||
except Exception as e:
|
||||
if "could not serialize access" in str(e) or "concurrent update" in str(e):
|
||||
_logger.warning("POS_KDS_PERF: Concurrent update detected, retrying (%s/%s)...", attempt + 1, max_retries)
|
||||
continue
|
||||
else:
|
||||
KdsLineReport.create(vals)
|
||||
|
||||
# Update order-level report
|
||||
order_report = KdsOrderReport.search([
|
||||
('pos_order_id', '=', order_line.order_id.id),
|
||||
('prep_display_id', '=', display_id)
|
||||
], limit=1)
|
||||
|
||||
res = KdsLineReport._read_group(
|
||||
[('pos_order_id', '=', order_line.order_id.id), ('prep_display_id', '=', display_id)],
|
||||
aggregates=['completion_time:max']
|
||||
)
|
||||
max_comp_time = res[0][0] if res else comp_time
|
||||
|
||||
order_vals = {
|
||||
'pos_order_id': order_line.order_id.id,
|
||||
'prep_display_id': display_id,
|
||||
'completion_time': max_comp_time,
|
||||
'completion_datetime': fields.Datetime.now(),
|
||||
}
|
||||
if order_report:
|
||||
order_report.write(order_vals)
|
||||
else:
|
||||
KdsOrderReport.create(order_vals)
|
||||
|
||||
new_cr.commit()
|
||||
except Exception as e:
|
||||
_logger.error("Background KDS reporting failed: %s", e)
|
||||
_logger.error("POS_KDS_PERF: Background KDS reporting failed: %s", e)
|
||||
break
|
||||
else:
|
||||
_logger.error("POS_KDS_PERF: Failed to update report after %s retries", max_retries)
|
||||
|
||||
class PosPreparationState(models.Model):
|
||||
_inherit = 'pos.prep.state'
|
||||
@ -111,7 +145,6 @@ class PosPreparationState(models.Model):
|
||||
if not stage:
|
||||
return
|
||||
|
||||
# Determine status synchronously to decide if we need a thread
|
||||
if not hasattr(self.env, '_kds_display_cache'):
|
||||
self.env._kds_display_cache = {}
|
||||
|
||||
@ -146,8 +179,6 @@ class PosPreparationState(models.Model):
|
||||
if not (is_completed or is_reset):
|
||||
return
|
||||
|
||||
# Start background thread to handle the reporting DB operations
|
||||
# This makes the main POS sync call nearly instant
|
||||
thread = threading.Thread(
|
||||
target=_threaded_report_update,
|
||||
args=(self.env.registry, self.env.uid, self.env.context, pdis_state.id, is_reset, is_completed)
|
||||
|
||||
Loading…
Reference in New Issue
Block a user