from odoo import api, fields, models from odoo.exceptions import UserError from odoo.fields import Domain import logging _logger = logging.getLogger(__name__) # Log when this module is loaded _logger.info("="*80) _logger.info("STOCK_RESTRICT_SOURCE_LOCATION: stock_location.py module is being loaded!") class StockQuant(models.Model): _inherit = 'stock.quant' def _get_allowed_locations(self): """Helper to extract allowed location IDs from context""" ctx = self.env.context # FIX: If we are picking from a non-internal location (like Transit, Supplier, or Customer), # we should NOT apply strict internal location restrictions, because the stock MUST come # from that exact external/transit location. loc_id = ctx.get('default_location_id') if loc_id: loc = self.env['stock.location'].sudo().browse(loc_id) if loc.exists() and loc.usage != 'internal': return [] allowed_location_ids = [] # 1. Try from explicit keys often passed by UI or patches raw_ids = (ctx.get('allowed_source_location_ids') or ctx.get('default_allowed_source_location_ids')) if raw_ids: if isinstance(raw_ids, list): if raw_ids and isinstance(raw_ids[0], (list, tuple)) and raw_ids[0][0] == 6: allowed_location_ids = raw_ids[0][2] else: allowed_location_ids = [r for r in raw_ids if isinstance(r, int)] elif isinstance(raw_ids, int): allowed_location_ids = [raw_ids] # 2. Try from active move if not allowed_location_ids: active_move_id = ctx.get('active_move_id') or ctx.get('default_move_id') if active_move_id: move = self.env['stock.move'].sudo().browse(active_move_id) if move.exists() and move.allowed_source_location_ids: allowed_location_ids = move.allowed_source_location_ids.ids # 3. Try from active MO if not allowed_location_ids: active_mo_id = ctx.get('active_mo_id') or ctx.get('default_raw_material_production_id') if active_mo_id: mo = self.env['mrp.production'].sudo().browse(active_mo_id) if mo.exists() and mo.allowed_source_location_ids: allowed_location_ids = mo.allowed_source_location_ids.ids # 4. Fallback to picking type in context # 4. Fallback to picking type in context if not allowed_location_ids: picking_type_id = ctx.get('default_picking_type_id') if picking_type_id: picking_type = self.env['stock.picking.type'].sudo().browse(picking_type_id) if picking_type.exists() and picking_type.default_location_src_ids: allowed_location_ids = picking_type.default_location_src_ids.ids return allowed_location_ids @api.model def _get_gather_domain(self, product_id, location_id, lot_id=None, package_id=None, owner_id=None, strict=False): """Override to apply location restrictions during reservation (gather)""" result_domain = super()._get_gather_domain(product_id, location_id, lot_id, package_id, owner_id, strict) # FIX: If reserving from a non-internal location, DO NOT apply internal restrictions. if location_id and location_id.usage != 'internal': return result_domain allowed_location_ids = self._get_allowed_locations() if allowed_location_ids: result_domain = Domain.AND([result_domain, [('location_id', 'in', allowed_location_ids)]]) return result_domain def _search(self, domain, offset=0, limit=None, order=None, *args, **kwargs): """Override to apply location restrictions during search (e.g. catalog or list selection)""" allowed_location_ids = self._get_allowed_locations() if allowed_location_ids: # 1. Smart Domain Swap: find if there's already a location_id restriction (like child_of) # and REPLACE it instead of AND-ing it. This prevents collisions with Odoo's # default 'child_of location_src_id' behavior. found_collision = False new_domain = [] for leaf in domain: if (isinstance(leaf, (list, tuple)) and len(leaf) == 3 and leaf[0] == 'location_id' and leaf[1] == 'child_of'): new_domain.append(('location_id', 'in', allowed_location_ids)) found_collision = True else: new_domain.append(leaf) if found_collision: domain = new_domain else: # 2. Standard AND merge if no collision found domain = Domain.AND([domain, [('location_id', 'in', allowed_location_ids)]]) return super()._search(domain, offset=offset, limit=limit, order=order, *args, **kwargs) class StockLocation(models.Model): _inherit = 'stock.location' @api.model def _name_search(self, name='', domain=None, operator='ilike', limit=None, order=None): """Override to restrict locations based on context""" _logger.info(f"LOCATION SEARCH: name={name}, context={self.env.context}") domain = domain or [] allowed_location_ids = self.env['stock.quant']._get_allowed_locations() if allowed_location_ids: domain = Domain.AND([domain, [('id', 'in', allowed_location_ids)]]) _logger.info(f"LOCATION SEARCH: Filtered to {allowed_location_ids}") return super()._name_search(name=name, domain=domain, operator=operator, limit=limit, order=order) class StockLot(models.Model): _inherit = 'stock.lot' @api.model def _search(self, domain, offset=0, limit=None, order=None, *args, **kwargs): ctx = self.env.context # We only want to filter if the user is in a picking using lot_id directly (like Receive operations) active_picking_id = ctx.get('active_picking_id') loc_id = ctx.get('default_location_id') if active_picking_id and loc_id: loc = self.env['stock.location'].sudo().browse(loc_id) # If the source is an internal or transit location, restrict the dropdown to lots actually present there. # If the source is a supplier, we do not filter (they could be receiving brand new lots). if loc.exists() and loc.usage != 'supplier': quant_domain = [ ('location_id', 'child_of', loc.id), ('quantity', '>', 0), ('lot_id', '!=', False) ] # Highly optimized query: only search quants for the specific product product_id = ctx.get('default_product_id') if product_id: quant_domain.append(('product_id', '=', product_id)) quants = self.env['stock.quant'].sudo().search(quant_domain) lot_ids = list(set(quants.mapped('lot_id').ids)) domain = Domain.AND([domain, [('id', 'in', lot_ids)]]) return super()._search(domain, offset=offset, limit=limit, order=order, *args, **kwargs) _logger.info("="*80) _logger.info("STOCK_RESTRICT_SOURCE_LOCATION: stock_location.py module loaded successfully!") _logger.info("="*80)