import logging from odoo import api, fields, models, _ from odoo.osv import expression _logger = logging.getLogger(__name__) class StockLocation(models.Model): _inherit = 'stock.location' def _get_allowed_locations(self): """ Server-side lookup for allowed locations with direct database priority. """ ctx = self.env.context # 1. HIGHEST PRIORITY: Direct Manufacturing Order Database Lookup # This bypasses incomplete UI data by querying the MO directly via context IDs. mo_id = (ctx.get('active_mo_id') or ctx.get('default_mo_id') or ctx.get('default_production_id') or ctx.get('mo_id') or ctx.get('production_id')) if mo_id: # sudo() ensures we can read the MO and its allowed locations regardless of field access mo = self.env['mrp.production'].sudo().browse(mo_id) if mo.exists() and mo.allowed_source_location_ids: allowed_ids = mo.allowed_source_location_ids.ids _logger.error(f"DEBUG_RESTRICT: Found IDs via Direct MO Lookup ({mo.name}): {allowed_ids}") return allowed_ids # 2. FALLBACK 1: Explicit IDs passed in context (from Views/JS) target_keys = ['allowed_source_location_ids', 'default_allowed_source_location_ids'] for key in target_keys: val = ctx.get(key) if not val: continue # Simple IDs or Commands if isinstance(val, list): if val and all(isinstance(x, int) for x in val): _logger.error(f"DEBUG_RESTRICT: Found IDs in context key '{key}': {val}") return val col_ids = [] for entry in val: if isinstance(entry, (list, tuple)): if entry[0] == 6: return entry[2] # SET if entry[0] == 4: col_ids.append(entry[1]) # LINK if col_ids: _logger.error(f"DEBUG_RESTRICT: Extracted IDs from context command '{key}': {col_ids}") return col_ids if isinstance(val, int): _logger.error(f"DEBUG_RESTRICT: Found single ID in context key '{key}': {val}") return [val] # 3. FALLBACK 2: Operation Type (Picking Type) Manual Lookup picking_type_id = (ctx.get('default_picking_type_id') or ctx.get('picking_type_id') or ctx.get('active_picking_type_id')) if picking_type_id: pt = self.env['stock.picking.type'].sudo().browse(picking_type_id) if pt.exists(): if pt.default_location_src_ids: _logger.error(f"DEBUG_RESTRICT: Found IDs via Picking Type {pt.display_name}: {pt.default_location_src_ids.ids}") return pt.default_location_src_ids.ids if pt.default_location_src_id: _logger.error(f"DEBUG_RESTRICT: Found ID via Picking Type {pt.display_name} (M21): {pt.default_location_src_id.id}") return [pt.default_location_src_id.id] # 4. FINAL FALLBACK: Current Default Source Location if ctx.get('default_location_id'): _logger.error(f"DEBUG_RESTRICT: Falling back to default_location_id: {ctx.get('default_location_id')}") return [ctx.get('default_location_id')] return False class StockQuant(models.Model): _inherit = 'stock.quant' @api.model def web_search_read(self, domain, specification, offset=0, limit=None, order=None, count_limit=None): """ UI Override with DOMAIN STRIPPING to prevent Odoo's native JS from blocking us. """ ctx = self.env.context if not ctx.get('skip_location_restriction') and ctx.get('uid'): allowed_ids = self.env['stock.location']._get_allowed_locations() if allowed_ids: # STRIP any existing native location_id filters from the domain. # Odoo's JS often adds its own ["location_id", "child_of", single_id]. # By stripping them, we ensure our multi-location list takes precedence. stripped_domain = [] for leaf in domain: if isinstance(leaf, (list, tuple)) and len(leaf) == 3 and leaf[0] == 'location_id': continue stripped_domain.append(leaf) # Re-apply our multi-location filter domain = expression.AND([stripped_domain, [('location_id', 'child_of', allowed_ids)]]) _logger.error(f"DEBUG_RESTRICT: SEARCH QUANT - Allowed: {allowed_ids} - Final Domain: {domain}") return super().web_search_read(domain, specification, offset=offset, limit=limit, order=order, count_limit=count_limit) class StockLot(models.Model): _inherit = 'stock.lot' @api.model def name_search(self, name='', args=None, operator='ilike', limit=100): ctx = self.env.context if not ctx.get('skip_location_restriction') and ctx.get('uid'): allowed_ids = self.env['stock.location']._get_allowed_locations() if allowed_ids: quant_domain = [('location_id', 'child_of', allowed_ids), ('quantity', '>', 0), ('lot_id', '!=', False)] if ctx.get('default_product_id'): quant_domain.append(('product_id', '=', ctx.get('default_product_id'))) quants = self.env['stock.quant'].with_context(skip_location_restriction=True).sudo().search(quant_domain) args = expression.AND([args or [], [('id', 'in', quants.mapped('lot_id').ids)]]) return super().name_search(name, args=args, operator=operator, limit=limit) @api.model def web_search_read(self, domain, specification, offset=0, limit=None, order=None, count_limit=None): ctx = self.env.context if not ctx.get('skip_location_restriction') and ctx.get('uid'): allowed_ids = self.env['stock.location']._get_allowed_locations() if allowed_ids: quant_domain = [('location_id', 'child_of', allowed_ids), ('quantity', '>', 0), ('lot_id', '!=', False)] if ctx.get('default_product_id'): quant_domain.append(('product_id', '=', ctx.get('default_product_id'))) quants = self.env['stock.quant'].with_context(skip_location_restriction=True).sudo().search(quant_domain) domain = expression.AND([domain, [('id', 'in', quants.mapped('lot_id').ids)]]) return super().web_search_read(domain, specification, offset=offset, limit=limit, order=order, count_limit=count_limit)