import logging from odoo import api, fields, models, _ from odoo.osv import expression _logger = logging.getLogger(__name__) class StockLocation(models.Model): _inherit = 'stock.location' def _get_allowed_locations(self): """ Server-side lookup for allowed locations with direct database priority. """ ctx = self.env.context # 1. HIGHEST PRIORITY: Direct Manufacturing Order Database Lookup # This bypasses incomplete UI data by querying the MO directly via context IDs. mo_id = (ctx.get('active_mo_id') or ctx.get('default_production_id') or ctx.get('production_id') or ctx.get('active_id') if ctx.get('active_model') == 'mrp.production' else None) if mo_id: # sudo() ensures we can read the MO and its allowed locations regardless of field access mo = self.env['mrp.production'].sudo().browse(mo_id) if mo.exists() and mo.allowed_source_location_ids: allowed_ids = mo.allowed_source_location_ids.ids _logger.error(f"DEBUG_RESTRICT: Found IDs via Direct MO Lookup ({mo.name}): {allowed_ids}") return allowed_ids # 2. FALLBACK 1: Explicit IDs passed in context (from Views/JS) target_keys = ['allowed_source_location_ids', 'default_allowed_source_location_ids'] for key in target_keys: val = ctx.get(key) if not val: continue # Simple IDs or Commands if isinstance(val, list): if val and all(isinstance(x, int) for x in val): _logger.error(f"DEBUG_RESTRICT: Found IDs in context key '{key}': {val}") return val col_ids = [] for entry in val: if isinstance(entry, (list, tuple)): if entry[0] == 6: return entry[2] # SET if entry[0] == 4: col_ids.append(entry[1]) # LINK if col_ids: _logger.error(f"DEBUG_RESTRICT: Extracted IDs from context command '{key}': {col_ids}") return col_ids if isinstance(val, int): _logger.error(f"DEBUG_RESTRICT: Found single ID in context key '{key}': {val}") return [val] # 3. FALLBACK 2: Operation Type (Picking Type) Manual Lookup picking_type_id = (ctx.get('default_picking_type_id') or ctx.get('picking_type_id') or ctx.get('active_picking_type_id')) if picking_type_id: pt = self.env['stock.picking.type'].sudo().browse(picking_type_id) if pt.exists(): if pt.default_location_src_ids: _logger.error(f"DEBUG_RESTRICT: Found IDs via Picking Type {pt.display_name}: {pt.default_location_src_ids.ids}") return pt.default_location_src_ids.ids if pt.default_location_src_id: _logger.error(f"DEBUG_RESTRICT: Found ID via Picking Type {pt.display_name} (M21): {pt.default_location_src_id.id}") return [pt.default_location_src_id.id] # 4. FINAL FALLBACK: Current Default Source Location if ctx.get('default_location_id'): _logger.error(f"DEBUG_RESTRICT: Falling back to default_location_id: {ctx.get('default_location_id')}") return [ctx.get('default_location_id')] return False class StockQuant(models.Model): _inherit = 'stock.quant' @api.model def web_search_read(self, domain, specification, offset=0, limit=None, order=None, count_limit=None): """ UI Override with ROBUST DOMAIN STRIPPING to fix RPC_ERROR/ValueError. """ ctx = self.env.context if not ctx.get('skip_location_restriction') and ctx.get('uid'): allowed_ids = self.env['stock.location']._get_allowed_locations() if allowed_ids: # STRIP both native location_id filters AND their orphaned operators. # The safest way is to extract only the non-location tuples (leaves). # expression.AND(list_of_tuples) reconstruction a perfectly valid # Polish notation with implicit 'AND' between our leaves. clean_leaves = [] for leaf in domain: if isinstance(leaf, (list, tuple)): if len(leaf) == 3 and leaf[0] == 'location_id': continue clean_leaves.append(leaf) # Re-apply our multi-location filter using expression.AND. # This treats clean_leaves + [location_id filter] as a fresh, valid domain. domain = expression.AND([clean_leaves, [('location_id', 'child_of', allowed_ids)]]) _logger.error(f"DEBUG_RESTRICT: SEARCH QUANT - Allowed: {allowed_ids} - Final Domain: {domain}") return super().web_search_read(domain, specification, offset=offset, limit=limit, order=order, count_limit=count_limit) class StockLot(models.Model): _inherit = 'stock.lot' @api.model def name_search(self, name='', args=None, operator='ilike', limit=100): ctx = self.env.context if not ctx.get('skip_location_restriction') and ctx.get('uid'): allowed_ids = self.env['stock.location']._get_allowed_locations() if allowed_ids: quant_domain = [('location_id', 'child_of', allowed_ids), ('quantity', '>', 0), ('lot_id', '!=', False)] if ctx.get('default_product_id'): quant_domain.append(('product_id', '=', ctx.get('default_product_id'))) quants = self.env['stock.quant'].with_context(skip_location_restriction=True).sudo().search(quant_domain) args = expression.AND([args or [], [('id', 'in', quants.mapped('lot_id').ids)]]) return super().name_search(name, args=args, operator=operator, limit=limit) @api.model def web_search_read(self, domain, specification, offset=0, limit=None, order=None, count_limit=None): ctx = self.env.context if not ctx.get('skip_location_restriction') and ctx.get('uid'): allowed_ids = self.env['stock.location']._get_allowed_locations() if allowed_ids: quant_domain = [('location_id', 'child_of', allowed_ids), ('quantity', '>', 0), ('lot_id', '!=', False)] if ctx.get('default_product_id'): quant_domain.append(('product_id', '=', ctx.get('default_product_id'))) quants = self.env['stock.quant'].with_context(skip_location_restriction=True).sudo().search(quant_domain) domain = expression.AND([domain, [('id', 'in', quants.mapped('lot_id').ids)]]) return super().web_search_read(domain, specification, offset=offset, limit=limit, order=order, count_limit=count_limit)