diff --git a/models/stock_location.py b/models/stock_location.py index b947322..91e8ba7 100644 --- a/models/stock_location.py +++ b/models/stock_location.py @@ -1,218 +1,120 @@ -from odoo import api, fields, models -from odoo.exceptions import UserError -from odoo.fields import Domain import logging +from odoo import api, fields, models, _ +from odoo.osv import expression +from odoo.exceptions import UserError, ValidationError _logger = logging.getLogger(__name__) -# Log when this module is loaded -_logger.info("="*80) -_logger.info("STOCK_RESTRICT_SOURCE_LOCATION: stock_location.py module is being loaded!") - -class StockQuant(models.Model): - _inherit = 'stock.quant' - - def _get_allowed_locations(self): - """Helper to extract allowed location IDs from context""" - ctx = self.env.context - - # 1. Skip restrictions if we are performing a bypass or internal system operation - if ctx.get('skip_location_restriction') or ctx.get('prefetch_fields'): - return [] - - # 2. FIX: If we are picking from a non-internal location (like Transit, Supplier, or Customer), - # we should NOT apply strict internal location restrictions. - loc_id = ctx.get('default_location_id') - if loc_id: - loc = self.env['stock.location'].sudo().browse(loc_id) - if loc.exists() and loc.usage != 'internal': - return [] - - allowed_location_ids = [] - - # 3. Try from explicit keys often passed by UI or patches - raw_ids = (ctx.get('allowed_source_location_ids') or - ctx.get('default_allowed_source_location_ids')) - if raw_ids: - if isinstance(raw_ids, list): - if raw_ids and isinstance(raw_ids[0], (list, tuple)) and raw_ids[0][0] == 6: - allowed_location_ids = raw_ids[0][2] - else: - allowed_location_ids = [r for r in raw_ids if isinstance(r, int)] - elif isinstance(raw_ids, int): - allowed_location_ids = [raw_ids] - - # 4. Try from active move - if not allowed_location_ids: - active_move_id = ctx.get('active_move_id') or ctx.get('default_move_id') - if active_move_id: - move = self.env['stock.move'].sudo().browse(active_move_id) - if move.exists() and move.allowed_source_location_ids: - allowed_location_ids = move.allowed_source_location_ids.ids - - # 5. Try from active MO - if not allowed_location_ids: - active_mo_id = ctx.get('active_mo_id') or ctx.get('default_raw_material_production_id') - if active_mo_id: - mo = self.env['mrp.production'].sudo().browse(active_mo_id) - if mo.exists() and mo.allowed_source_location_ids: - allowed_location_ids = mo.allowed_source_location_ids.ids - - # 6. Fallback to picking type in context - if not allowed_location_ids: - picking_type_id = ctx.get('default_picking_type_id') - if picking_type_id: - picking_type = self.env['stock.picking.type'].sudo().browse(picking_type_id) - if picking_type.exists() and picking_type.default_location_src_ids: - allowed_location_ids = picking_type.default_location_src_ids.ids - - # 7. NEW: Detect Backend Operations (Force Bypass) - # If we are in a background synchronization or an ORM command (not triggered by UI search) - # return empty so we do not restrict. - is_ui_search = (ctx.get('params') or - ctx.get('bin_size') or - ctx.get('search_view_ref') or - ctx.get('list_view_ref')) - - # If it's NOT a UI search (i.e. it's a Save, Post, Validate background call), bypass. - if not is_ui_search: - # Check for common background flags - if (ctx.get('mail_create_nolog') or - ctx.get('tracking_disable') or - not ctx.get('uid')): # System user - return [] - - return allowed_location_ids - - def _search(self, domain, offset=0, limit=None, order=None, *args, **kwargs): - """Override to apply location restrictions during search (e.g. catalog or list selection)""" - ctx = self.env.context - - # 0. CRITICAL BYPASS: Only restrict if it's a UI Search (params, bin_size, etc.) - # AND we are not searching for specific record IDs (backend sync). - search_by_id = any(isinstance(leaf, (list, tuple)) and leaf[0] == 'id' for leaf in domain) - is_ui_search = (ctx.get('params') or - ctx.get('bin_size') or - ctx.get('search_view_ref') or - ctx.get('list_view_ref')) - - if (search_by_id or - ctx.get('skip_location_restriction') or - not is_ui_search or - not ctx.get('uid')): - return super()._search(domain, offset=offset, limit=limit, order=order, *args, **kwargs) - - allowed_location_ids = self._get_allowed_locations() - if allowed_location_ids: - # 1. Smart Domain Swap: find if there's already a location_id restriction (like child_of) - found_collision = False - new_domain = [] - for leaf in domain: - if (isinstance(leaf, (list, tuple)) and - len(leaf) == 3 and - leaf[0] == 'location_id' and - leaf[1] == 'child_of'): - - new_domain.append(('location_id', 'in', allowed_location_ids)) - found_collision = True - else: - new_domain.append(leaf) - - if found_collision: - domain = new_domain - else: - domain = Domain.AND([domain, [('location_id', 'in', allowed_location_ids)]]) - - return super()._search(domain, offset=offset, limit=limit, order=order, *args, **kwargs) - - allowed_location_ids = self._get_allowed_locations() - if allowed_location_ids: - # 1. Smart Domain Swap: find if there's already a location_id restriction (like child_of) - found_collision = False - new_domain = [] - for leaf in domain: - if (isinstance(leaf, (list, tuple)) and - len(leaf) == 3 and - leaf[0] == 'location_id' and - leaf[1] == 'child_of'): - - new_domain.append(('location_id', 'in', allowed_location_ids)) - found_collision = True - else: - new_domain.append(leaf) - - if found_collision: - domain = new_domain - else: - domain = Domain.AND([domain, [('location_id', 'in', allowed_location_ids)]]) - - return super()._search(domain, offset=offset, limit=limit, order=order, *args, **kwargs) - class StockLocation(models.Model): _inherit = 'stock.location' + def _get_allowed_locations(self): + """ + Helper to retrieve allowed locations based on the current context (picking type). + Used by UI-level overrides in StockQuant and StockLot. + """ + ctx = self.env.context + # 1. Identify the Picking Type (Operation Type) + picking_type_id = ctx.get('default_picking_type_id') or ctx.get('picking_type_id') + + # 2. Support for Manufacturing Orders: if we have an MO ID but no picking type, find it. + if not picking_type_id and ctx.get('active_mo_id'): + mo = self.env['mrp.production'].browse(ctx.get('active_mo_id')) + if mo.exists(): + picking_type_id = mo.picking_type_id.id + + if not picking_type_id: + return [] + + # 3. Retrieve allowed locations from the picking type + picking_type = self.env['stock.picking.type'].browse(picking_type_id) + if picking_type.exists() and picking_type.allowed_source_location_ids: + return picking_type.allowed_source_location_ids.ids + + return [] + +class StockQuant(models.Model): + _inherit = 'stock.quant' + @api.model - def _name_search(self, name='', domain=None, operator='ilike', limit=None, order=None): - """Override to restrict locations based on context""" - _logger.info(f"LOCATION SEARCH: name={name}, context={self.env.context}") - domain = domain or [] - allowed_location_ids = self.env['stock.quant']._get_allowed_locations() - - if allowed_location_ids: - domain = Domain.AND([domain, [('id', 'in', allowed_location_ids)]]) - _logger.info(f"LOCATION SEARCH: Filtered to {allowed_location_ids}") - - return super()._name_search(name=name, domain=domain, operator=operator, limit=limit, order=order) + def web_search_read(self, domain, specification, offset=0, limit=None, order=None, count_limit=None): + """ + UI-SURFACE OVERRIDE: Applies location filtering ONLY for the web interface. + This method is called by the M2M Catalog and list view searches. + Internal search() and _gather() will NOT be affected. + """ + ctx = self.env.context + if not ctx.get('skip_location_restriction') and ctx.get('uid'): + allowed_location_ids = self.env['stock.location']._get_allowed_locations() + if allowed_location_ids: + # Add location filter to the domain + domain = expression.AND([domain, [('location_id', 'in', allowed_location_ids)]]) + + return super().web_search_read(domain, specification, offset=offset, limit=limit, order=order, count_limit=count_limit) class StockLot(models.Model): _inherit = 'stock.lot' @api.model - def _search(self, domain, offset=0, limit=None, order=None, *args, **kwargs): + def name_search(self, name='', args=None, operator='ilike', limit=100): + """ + UI-SURFACE OVERRIDE: Filters the many2one lot selection dropdown. + This is ONLY used by the web client for autocomplete/dropdown lookups. + """ ctx = self.env.context - - # 0. NEW: Detect UI Search vs Backend Sync - search_by_id = any(isinstance(leaf, (list, tuple)) and leaf[0] == 'id' for leaf in domain) - is_ui_search = (ctx.get('params') or - ctx.get('bin_size') or - ctx.get('search_view_ref') or - ctx.get('list_view_ref')) - - # FIX: If we are searching for specific IDs OR skip flag set OR NOT UI search, bypass - if (search_by_id or - ctx.get('skip_location_restriction') or - not is_ui_search or - not ctx.get('uid')): - return super()._search(domain, offset=offset, limit=limit, order=order, *args, **kwargs) - - # 1. Identify which locations we should look into for quants - allowed_location_ids = self.env['stock.quant']._get_allowed_locations() - - # 2. If no explicit allowed locations, fallback to the default source location in context - if not allowed_location_ids: - loc_id = ctx.get('default_location_id') - if loc_id: - allowed_location_ids = [loc_id] - - if allowed_location_ids: - quant_domain = [ - ('location_id', 'in', allowed_location_ids), - ('quantity', '>', 0), - ('lot_id', '!=', False) - ] - product_id = ctx.get('default_product_id') - if product_id: - quant_domain.append(('product_id', '=', product_id)) + if not ctx.get('skip_location_restriction') and ctx.get('uid'): + allowed_location_ids = self.env['stock.location']._get_allowed_locations() - # Use internal bypass when searching quants to filter lots - # We use sudo() and skip_location_restriction to ensure we always find the quants - quants = self.env['stock.quant'].with_context(skip_location_restriction=True).sudo().search(quant_domain) - lot_ids = list(set(quants.mapped('lot_id').ids)) - - domain = Domain.AND([domain, [('id', 'in', lot_ids)]]) + # If no explicit mapping found on the picking type, + # check if a default_location_id was passed to the view context. + if not allowed_location_ids and ctx.get('default_location_id'): + allowed_location_ids = [ctx.get('default_location_id')] + + if allowed_location_ids: + # Find quants in the allowed locations that have positive stock for this product + quant_domain = [ + ('location_id', 'in', allowed_location_ids), + ('quantity', '>', 0), + ('lot_id', '!=', False) + ] + product_id = ctx.get('default_product_id') + if product_id: + quant_domain.append(('product_id', '=', product_id)) - return super()._search(domain, offset=offset, limit=limit, order=order, *args, **kwargs) + # We sudo() the quant search to ensure we find quants even if there are record rules, + # as this is strictly for filtering the UI dropdown. + quants = self.env['stock.quant'].with_context(skip_location_restriction=True).sudo().search(quant_domain) + lot_ids = quants.mapped('lot_id').ids + + args = expression.AND([args or [], [('id', 'in', lot_ids)]]) -_logger.info("="*80) -_logger.info("STOCK_RESTRICT_SOURCE_LOCATION: stock_location.py module loaded successfully!") -_logger.info("="*80) + return super().name_search(name, args=args, operator=operator, limit=limit) + + @api.model + def web_search_read(self, domain, specification, offset=0, limit=None, order=None, count_limit=None): + """ + UI-SURFACE OVERRIDE: Applies filtering for the Lot Catalog and list views. + """ + ctx = self.env.context + if not ctx.get('skip_location_restriction') and ctx.get('uid'): + allowed_location_ids = self.env['stock.location']._get_allowed_locations() + + if not allowed_location_ids and ctx.get('default_location_id'): + allowed_location_ids = [ctx.get('default_location_id')] + + if allowed_location_ids: + quant_domain = [ + ('location_id', 'in', allowed_location_ids), + ('quantity', '>', 0), + ('lot_id', '!=', False) + ] + product_id = ctx.get('default_product_id') + if product_id: + quant_domain.append(('product_id', '=', product_id)) + + quants = self.env['stock.quant'].with_context(skip_location_restriction=True).sudo().search(quant_domain) + lot_ids = quants.mapped('lot_id').ids + + domain = expression.AND([domain, [('id', 'in', lot_ids)]]) + + return super().web_search_read(domain, specification, offset=offset, limit=limit, order=order, count_limit=count_limit)