refactor: simplify location restriction logic by moving to web_search_read and streamlining picking type lookups
This commit is contained in:
parent
e341adf41e
commit
02d027b104
@ -1,198 +1,106 @@
|
|||||||
from odoo import api, fields, models
|
|
||||||
from odoo.exceptions import UserError
|
|
||||||
from odoo.fields import Domain
|
|
||||||
import logging
|
import logging
|
||||||
|
from odoo import api, fields, models, _
|
||||||
|
from odoo.osv import expression
|
||||||
|
from odoo.exceptions import UserError, ValidationError
|
||||||
|
|
||||||
_logger = logging.getLogger(__name__)
|
_logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
# Log when this module is loaded
|
|
||||||
_logger.info("="*80)
|
|
||||||
_logger.info("STOCK_RESTRICT_SOURCE_LOCATION: stock_location.py module is being loaded!")
|
|
||||||
|
|
||||||
class StockQuant(models.Model):
|
|
||||||
_inherit = 'stock.quant'
|
|
||||||
|
|
||||||
def _get_allowed_locations(self):
|
|
||||||
"""Helper to extract allowed location IDs from context"""
|
|
||||||
ctx = self.env.context
|
|
||||||
|
|
||||||
# 1. Skip restrictions if we are performing a bypass or internal system operation
|
|
||||||
if ctx.get('skip_location_restriction') or ctx.get('prefetch_fields'):
|
|
||||||
return []
|
|
||||||
|
|
||||||
# 2. FIX: If we are picking from a non-internal location (like Transit, Supplier, or Customer),
|
|
||||||
# we should NOT apply strict internal location restrictions.
|
|
||||||
loc_id = ctx.get('default_location_id')
|
|
||||||
if loc_id:
|
|
||||||
loc = self.env['stock.location'].sudo().browse(loc_id)
|
|
||||||
if loc.exists() and loc.usage != 'internal':
|
|
||||||
return []
|
|
||||||
|
|
||||||
allowed_location_ids = []
|
|
||||||
|
|
||||||
# 3. Try from explicit keys often passed by UI or patches
|
|
||||||
raw_ids = (ctx.get('allowed_source_location_ids') or
|
|
||||||
ctx.get('default_allowed_source_location_ids'))
|
|
||||||
if raw_ids:
|
|
||||||
if isinstance(raw_ids, list):
|
|
||||||
if raw_ids and isinstance(raw_ids[0], (list, tuple)) and raw_ids[0][0] == 6:
|
|
||||||
allowed_location_ids = raw_ids[0][2]
|
|
||||||
else:
|
|
||||||
allowed_location_ids = [r for r in raw_ids if isinstance(r, int)]
|
|
||||||
elif isinstance(raw_ids, int):
|
|
||||||
allowed_location_ids = [raw_ids]
|
|
||||||
|
|
||||||
# 4. Try from active move
|
|
||||||
if not allowed_location_ids:
|
|
||||||
active_move_id = ctx.get('active_move_id') or ctx.get('default_move_id')
|
|
||||||
if active_move_id:
|
|
||||||
move = self.env['stock.move'].sudo().browse(active_move_id)
|
|
||||||
if move.exists() and move.allowed_source_location_ids:
|
|
||||||
allowed_location_ids = move.allowed_source_location_ids.ids
|
|
||||||
|
|
||||||
# 5. Try from active MO
|
|
||||||
if not allowed_location_ids:
|
|
||||||
active_mo_id = ctx.get('active_mo_id') or ctx.get('default_raw_material_production_id')
|
|
||||||
if active_mo_id:
|
|
||||||
mo = self.env['mrp.production'].sudo().browse(active_mo_id)
|
|
||||||
if mo.exists() and mo.allowed_source_location_ids:
|
|
||||||
allowed_location_ids = mo.allowed_source_location_ids.ids
|
|
||||||
|
|
||||||
# 6. Fallback to picking type in context
|
|
||||||
if not allowed_location_ids:
|
|
||||||
picking_type_id = ctx.get('default_picking_type_id')
|
|
||||||
if picking_type_id:
|
|
||||||
picking_type = self.env['stock.picking.type'].sudo().browse(picking_type_id)
|
|
||||||
if picking_type.exists() and picking_type.default_location_src_ids:
|
|
||||||
allowed_location_ids = picking_type.default_location_src_ids.ids
|
|
||||||
|
|
||||||
# 7. NEW: Detect Backend Operations (Force Bypass)
|
|
||||||
# If we are in a background synchronization or an ORM command (not triggered by UI search)
|
|
||||||
# return empty so we do not restrict.
|
|
||||||
is_ui_search = (ctx.get('params') or
|
|
||||||
ctx.get('bin_size') or
|
|
||||||
ctx.get('search_view_ref') or
|
|
||||||
ctx.get('list_view_ref'))
|
|
||||||
|
|
||||||
# If it's NOT a UI search (i.e. it's a Save, Post, Validate background call), bypass.
|
|
||||||
if not is_ui_search:
|
|
||||||
# Check for common background flags
|
|
||||||
if (ctx.get('mail_create_nolog') or
|
|
||||||
ctx.get('tracking_disable') or
|
|
||||||
not ctx.get('uid')): # System user
|
|
||||||
return []
|
|
||||||
|
|
||||||
return allowed_location_ids
|
|
||||||
|
|
||||||
def _search(self, domain, offset=0, limit=None, order=None, *args, **kwargs):
|
|
||||||
"""Override to apply location restrictions during search (e.g. catalog or list selection)"""
|
|
||||||
ctx = self.env.context
|
|
||||||
|
|
||||||
# 0. CRITICAL BYPASS: Only restrict if it's a UI Search (params, bin_size, etc.)
|
|
||||||
# AND we are not searching for specific record IDs (backend sync).
|
|
||||||
search_by_id = any(isinstance(leaf, (list, tuple)) and leaf[0] == 'id' for leaf in domain)
|
|
||||||
is_ui_search = (ctx.get('params') or
|
|
||||||
ctx.get('bin_size') or
|
|
||||||
ctx.get('search_view_ref') or
|
|
||||||
ctx.get('list_view_ref'))
|
|
||||||
|
|
||||||
if (search_by_id or
|
|
||||||
ctx.get('skip_location_restriction') or
|
|
||||||
not is_ui_search or
|
|
||||||
not ctx.get('uid')):
|
|
||||||
return super()._search(domain, offset=offset, limit=limit, order=order, *args, **kwargs)
|
|
||||||
|
|
||||||
allowed_location_ids = self._get_allowed_locations()
|
|
||||||
if allowed_location_ids:
|
|
||||||
# 1. Smart Domain Swap: find if there's already a location_id restriction (like child_of)
|
|
||||||
found_collision = False
|
|
||||||
new_domain = []
|
|
||||||
for leaf in domain:
|
|
||||||
if (isinstance(leaf, (list, tuple)) and
|
|
||||||
len(leaf) == 3 and
|
|
||||||
leaf[0] == 'location_id' and
|
|
||||||
leaf[1] == 'child_of'):
|
|
||||||
|
|
||||||
new_domain.append(('location_id', 'in', allowed_location_ids))
|
|
||||||
found_collision = True
|
|
||||||
else:
|
|
||||||
new_domain.append(leaf)
|
|
||||||
|
|
||||||
if found_collision:
|
|
||||||
domain = new_domain
|
|
||||||
else:
|
|
||||||
domain = Domain.AND([domain, [('location_id', 'in', allowed_location_ids)]])
|
|
||||||
|
|
||||||
return super()._search(domain, offset=offset, limit=limit, order=order, *args, **kwargs)
|
|
||||||
|
|
||||||
allowed_location_ids = self._get_allowed_locations()
|
|
||||||
if allowed_location_ids:
|
|
||||||
# 1. Smart Domain Swap: find if there's already a location_id restriction (like child_of)
|
|
||||||
found_collision = False
|
|
||||||
new_domain = []
|
|
||||||
for leaf in domain:
|
|
||||||
if (isinstance(leaf, (list, tuple)) and
|
|
||||||
len(leaf) == 3 and
|
|
||||||
leaf[0] == 'location_id' and
|
|
||||||
leaf[1] == 'child_of'):
|
|
||||||
|
|
||||||
new_domain.append(('location_id', 'in', allowed_location_ids))
|
|
||||||
found_collision = True
|
|
||||||
else:
|
|
||||||
new_domain.append(leaf)
|
|
||||||
|
|
||||||
if found_collision:
|
|
||||||
domain = new_domain
|
|
||||||
else:
|
|
||||||
domain = Domain.AND([domain, [('location_id', 'in', allowed_location_ids)]])
|
|
||||||
|
|
||||||
return super()._search(domain, offset=offset, limit=limit, order=order, *args, **kwargs)
|
|
||||||
|
|
||||||
class StockLocation(models.Model):
|
class StockLocation(models.Model):
|
||||||
_inherit = 'stock.location'
|
_inherit = 'stock.location'
|
||||||
|
|
||||||
|
def _get_allowed_locations(self):
|
||||||
|
"""
|
||||||
|
Helper to retrieve allowed locations based on the current context (picking type).
|
||||||
|
Used by UI-level overrides in StockQuant and StockLot.
|
||||||
|
"""
|
||||||
|
ctx = self.env.context
|
||||||
|
# 1. Identify the Picking Type (Operation Type)
|
||||||
|
picking_type_id = ctx.get('default_picking_type_id') or ctx.get('picking_type_id')
|
||||||
|
|
||||||
|
# 2. Support for Manufacturing Orders: if we have an MO ID but no picking type, find it.
|
||||||
|
if not picking_type_id and ctx.get('active_mo_id'):
|
||||||
|
mo = self.env['mrp.production'].browse(ctx.get('active_mo_id'))
|
||||||
|
if mo.exists():
|
||||||
|
picking_type_id = mo.picking_type_id.id
|
||||||
|
|
||||||
|
if not picking_type_id:
|
||||||
|
return []
|
||||||
|
|
||||||
|
# 3. Retrieve allowed locations from the picking type
|
||||||
|
picking_type = self.env['stock.picking.type'].browse(picking_type_id)
|
||||||
|
if picking_type.exists() and picking_type.allowed_source_location_ids:
|
||||||
|
return picking_type.allowed_source_location_ids.ids
|
||||||
|
|
||||||
|
return []
|
||||||
|
|
||||||
|
class StockQuant(models.Model):
|
||||||
|
_inherit = 'stock.quant'
|
||||||
|
|
||||||
@api.model
|
@api.model
|
||||||
def _name_search(self, name='', domain=None, operator='ilike', limit=None, order=None):
|
def web_search_read(self, domain, specification, offset=0, limit=None, order=None, count_limit=None):
|
||||||
"""Override to restrict locations based on context"""
|
"""
|
||||||
_logger.info(f"LOCATION SEARCH: name={name}, context={self.env.context}")
|
UI-SURFACE OVERRIDE: Applies location filtering ONLY for the web interface.
|
||||||
domain = domain or []
|
This method is called by the M2M Catalog and list view searches.
|
||||||
allowed_location_ids = self.env['stock.quant']._get_allowed_locations()
|
Internal search() and _gather() will NOT be affected.
|
||||||
|
"""
|
||||||
|
ctx = self.env.context
|
||||||
|
if not ctx.get('skip_location_restriction') and ctx.get('uid'):
|
||||||
|
allowed_location_ids = self.env['stock.location']._get_allowed_locations()
|
||||||
if allowed_location_ids:
|
if allowed_location_ids:
|
||||||
domain = Domain.AND([domain, [('id', 'in', allowed_location_ids)]])
|
# Add location filter to the domain
|
||||||
_logger.info(f"LOCATION SEARCH: Filtered to {allowed_location_ids}")
|
domain = expression.AND([domain, [('location_id', 'in', allowed_location_ids)]])
|
||||||
|
|
||||||
return super()._name_search(name=name, domain=domain, operator=operator, limit=limit, order=order)
|
return super().web_search_read(domain, specification, offset=offset, limit=limit, order=order, count_limit=count_limit)
|
||||||
|
|
||||||
class StockLot(models.Model):
|
class StockLot(models.Model):
|
||||||
_inherit = 'stock.lot'
|
_inherit = 'stock.lot'
|
||||||
|
|
||||||
@api.model
|
@api.model
|
||||||
def _search(self, domain, offset=0, limit=None, order=None, *args, **kwargs):
|
def name_search(self, name='', args=None, operator='ilike', limit=100):
|
||||||
|
"""
|
||||||
|
UI-SURFACE OVERRIDE: Filters the many2one lot selection dropdown.
|
||||||
|
This is ONLY used by the web client for autocomplete/dropdown lookups.
|
||||||
|
"""
|
||||||
ctx = self.env.context
|
ctx = self.env.context
|
||||||
|
if not ctx.get('skip_location_restriction') and ctx.get('uid'):
|
||||||
|
allowed_location_ids = self.env['stock.location']._get_allowed_locations()
|
||||||
|
|
||||||
# 0. NEW: Detect UI Search vs Backend Sync
|
# If no explicit mapping found on the picking type,
|
||||||
search_by_id = any(isinstance(leaf, (list, tuple)) and leaf[0] == 'id' for leaf in domain)
|
# check if a default_location_id was passed to the view context.
|
||||||
is_ui_search = (ctx.get('params') or
|
if not allowed_location_ids and ctx.get('default_location_id'):
|
||||||
ctx.get('bin_size') or
|
allowed_location_ids = [ctx.get('default_location_id')]
|
||||||
ctx.get('search_view_ref') or
|
|
||||||
ctx.get('list_view_ref'))
|
|
||||||
|
|
||||||
# FIX: If we are searching for specific IDs OR skip flag set OR NOT UI search, bypass
|
if allowed_location_ids:
|
||||||
if (search_by_id or
|
# Find quants in the allowed locations that have positive stock for this product
|
||||||
ctx.get('skip_location_restriction') or
|
quant_domain = [
|
||||||
not is_ui_search or
|
('location_id', 'in', allowed_location_ids),
|
||||||
not ctx.get('uid')):
|
('quantity', '>', 0),
|
||||||
return super()._search(domain, offset=offset, limit=limit, order=order, *args, **kwargs)
|
('lot_id', '!=', False)
|
||||||
|
]
|
||||||
|
product_id = ctx.get('default_product_id')
|
||||||
|
if product_id:
|
||||||
|
quant_domain.append(('product_id', '=', product_id))
|
||||||
|
|
||||||
# 1. Identify which locations we should look into for quants
|
# We sudo() the quant search to ensure we find quants even if there are record rules,
|
||||||
allowed_location_ids = self.env['stock.quant']._get_allowed_locations()
|
# as this is strictly for filtering the UI dropdown.
|
||||||
|
quants = self.env['stock.quant'].with_context(skip_location_restriction=True).sudo().search(quant_domain)
|
||||||
|
lot_ids = quants.mapped('lot_id').ids
|
||||||
|
|
||||||
# 2. If no explicit allowed locations, fallback to the default source location in context
|
args = expression.AND([args or [], [('id', 'in', lot_ids)]])
|
||||||
if not allowed_location_ids:
|
|
||||||
loc_id = ctx.get('default_location_id')
|
return super().name_search(name, args=args, operator=operator, limit=limit)
|
||||||
if loc_id:
|
|
||||||
allowed_location_ids = [loc_id]
|
@api.model
|
||||||
|
def web_search_read(self, domain, specification, offset=0, limit=None, order=None, count_limit=None):
|
||||||
|
"""
|
||||||
|
UI-SURFACE OVERRIDE: Applies filtering for the Lot Catalog and list views.
|
||||||
|
"""
|
||||||
|
ctx = self.env.context
|
||||||
|
if not ctx.get('skip_location_restriction') and ctx.get('uid'):
|
||||||
|
allowed_location_ids = self.env['stock.location']._get_allowed_locations()
|
||||||
|
|
||||||
|
if not allowed_location_ids and ctx.get('default_location_id'):
|
||||||
|
allowed_location_ids = [ctx.get('default_location_id')]
|
||||||
|
|
||||||
if allowed_location_ids:
|
if allowed_location_ids:
|
||||||
quant_domain = [
|
quant_domain = [
|
||||||
@ -204,15 +112,9 @@ class StockLot(models.Model):
|
|||||||
if product_id:
|
if product_id:
|
||||||
quant_domain.append(('product_id', '=', product_id))
|
quant_domain.append(('product_id', '=', product_id))
|
||||||
|
|
||||||
# Use internal bypass when searching quants to filter lots
|
|
||||||
# We use sudo() and skip_location_restriction to ensure we always find the quants
|
|
||||||
quants = self.env['stock.quant'].with_context(skip_location_restriction=True).sudo().search(quant_domain)
|
quants = self.env['stock.quant'].with_context(skip_location_restriction=True).sudo().search(quant_domain)
|
||||||
lot_ids = list(set(quants.mapped('lot_id').ids))
|
lot_ids = quants.mapped('lot_id').ids
|
||||||
|
|
||||||
domain = Domain.AND([domain, [('id', 'in', lot_ids)]])
|
domain = expression.AND([domain, [('id', 'in', lot_ids)]])
|
||||||
|
|
||||||
return super()._search(domain, offset=offset, limit=limit, order=order, *args, **kwargs)
|
return super().web_search_read(domain, specification, offset=offset, limit=limit, order=order, count_limit=count_limit)
|
||||||
|
|
||||||
_logger.info("="*80)
|
|
||||||
_logger.info("STOCK_RESTRICT_SOURCE_LOCATION: stock_location.py module loaded successfully!")
|
|
||||||
_logger.info("="*80)
|
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user