# stock_restrict_source_location/models/stock_location.py
#
# 167 lines
# 7.5 KiB
# Python

from odoo import api, fields, models
from odoo.exceptions import UserError
from odoo.fields import Domain
import logging
_logger = logging.getLogger(__name__)

# Import-time banner: makes it visible in the server log that this module
# has started loading.
_logger.info("=" * 80)
_logger.info(
    "STOCK_RESTRICT_SOURCE_LOCATION: stock_location.py module is being loaded!"
)
class StockQuant(models.Model):
    """``stock.quant`` extension that narrows quant lookups to the allowed
    source locations advertised through the environment context."""

    _inherit = 'stock.quant'

    def _get_allowed_locations(self):
        """Return the allowed source location IDs found via the context.

        Sources are tried in order and the first non-empty one wins:

        1. ``allowed_source_location_ids`` /
           ``default_allowed_source_location_ids`` given as a list of IDs,
           a single ID, or a list whose first item is a ``(6, 0, ids)``
           command.
        2. ``allowed_source_location_ids`` of the move named by
           ``active_move_id`` / ``default_move_id``.
        3. ``allowed_source_location_ids`` of the manufacturing order named
           by ``active_mo_id`` / ``default_raw_material_production_id``.
        4. ``default_location_src_ids`` of the picking type named by
           ``default_picking_type_id``.

        Returns:
            list: location IDs, or an empty list when no restriction applies
            (including when ``default_location_id`` points at a non-internal
            location).
        """
        ctx = self.env.context

        # When picking from a non-internal location (Transit, Supplier,
        # Customer, ...), the stock MUST come from that exact location, so
        # the internal-location restriction is not applied at all.
        loc_id = ctx.get('default_location_id')
        if loc_id:
            loc = self.env['stock.location'].sudo().browse(loc_id)
            if loc.exists() and loc.usage != 'internal':
                return []

        # 1. Explicit keys often passed by the UI or by patches.
        raw_ids = (ctx.get('allowed_source_location_ids')
                   or ctx.get('default_allowed_source_location_ids'))
        allowed_location_ids = []
        if isinstance(raw_ids, list) and raw_ids:
            first = raw_ids[0]
            # The length guard avoids an IndexError on an empty or truncated
            # first item; such input falls through to the plain-ID filter.
            if (isinstance(first, (list, tuple))
                    and len(first) >= 3
                    and first[0] == 6):
                allowed_location_ids = list(first[2] or [])
            else:
                # bool is a subclass of int but is never a valid record ID.
                allowed_location_ids = [
                    item for item in raw_ids
                    if isinstance(item, int) and not isinstance(item, bool)
                ]
        elif isinstance(raw_ids, int) and not isinstance(raw_ids, bool):
            allowed_location_ids = [raw_ids]
        if allowed_location_ids:
            return allowed_location_ids

        # 2-4. Record-based fallbacks, tried in this order: active move,
        # active manufacturing order, then the picking type in context.
        fallbacks = (
            ('stock.move',
             ('active_move_id', 'default_move_id'),
             'allowed_source_location_ids'),
            ('mrp.production',
             ('active_mo_id', 'default_raw_material_production_id'),
             'allowed_source_location_ids'),
            ('stock.picking.type',
             ('default_picking_type_id',),
             'default_location_src_ids'),
        )
        for model_name, ctx_keys, field_name in fallbacks:
            record_id = next(
                filter(None, (ctx.get(key) for key in ctx_keys)), None)
            if not record_id:
                # The model is only looked up when its key is present, so a
                # missing model does not matter for unrelated contexts.
                continue
            record = self.env[model_name].sudo().browse(record_id)
            if record.exists() and record[field_name]:
                return record[field_name].ids

        return []

    @api.model
    def _get_gather_domain(self, product_id, location_id, lot_id=None, package_id=None, owner_id=None, strict=False):
        """Override to apply location restrictions during reservation (gather).

        The parent domain is returned untouched when ``location_id`` is a
        non-internal location or when the context yields no allowed
        locations; otherwise it is AND-ed with
        ``('location_id', 'in', allowed_location_ids)``.
        """
        result_domain = super()._get_gather_domain(
            product_id, location_id, lot_id, package_id, owner_id, strict)

        # Reserving from a non-internal location: do NOT apply the internal
        # restrictions.
        if location_id and location_id.usage != 'internal':
            return result_domain

        allowed_location_ids = self._get_allowed_locations()
        if not allowed_location_ids:
            return result_domain
        return Domain.AND(
            [result_domain, [('location_id', 'in', allowed_location_ids)]])

    def _search(self, domain, offset=0, limit=None, order=None, *args, **kwargs):
        """Override to apply location restrictions during search
        (e.g. catalog or list selection).

        When the context yields allowed locations, every
        ``('location_id', 'child_of', ...)`` leaf in ``domain`` is replaced
        by ``('location_id', 'in', allowed_location_ids)``. If no such leaf
        exists, the restriction is AND-ed onto the domain instead.
        """
        allowed_location_ids = self._get_allowed_locations()
        if allowed_location_ids:
            restriction = ('location_id', 'in', allowed_location_ids)

            # Swap rather than AND an existing child_of restriction, so it
            # cannot collide with a 'child_of <source location>' leaf that
            # is already in the incoming domain.
            found_collision = False
            new_domain = []
            for leaf in domain:
                is_child_of_location = (
                    isinstance(leaf, (list, tuple))
                    and len(leaf) == 3
                    and leaf[0] == 'location_id'
                    and leaf[1] == 'child_of'
                )
                if is_child_of_location:
                    new_domain.append(restriction)
                    found_collision = True
                else:
                    new_domain.append(leaf)

            if found_collision:
                domain = new_domain
            else:
                # Standard AND merge when there is nothing to swap.
                domain = Domain.AND([domain, [restriction]])

        # Forward offset/limit/order positionally: passing them as keywords
        # ahead of *args would raise "multiple values for argument" as soon
        # as a caller supplied any extra positional argument.
        return super()._search(domain, offset, limit, order, *args, **kwargs)
class StockLocation(models.Model):
    """``stock.location`` extension that limits name searches to the allowed
    source locations advertised through the environment context."""

    _inherit = 'stock.location'

    @api.model
    def _name_search(self, name='', domain=None, operator='ilike', limit=None, order=None):
        """Override to restrict locations based on context.

        The allowed IDs come from ``stock.quant``'s
        ``_get_allowed_locations``; when that list is non-empty the incoming
        domain is AND-ed with ``('id', 'in', allowed_location_ids)``.
        """
        # NOTE(review): this file imports ``odoo.fields.Domain``; confirm the
        # targeted Odoo release still defines and calls ``_name_search``
        # (newer releases may route name searches through a different hook).

        # Lazy %-style arguments: the context is only rendered to text when
        # the INFO level is actually enabled for this logger.
        _logger.info("LOCATION SEARCH: name=%s, context=%s", name, self.env.context)

        domain = domain or []
        allowed_location_ids = self.env['stock.quant']._get_allowed_locations()
        if allowed_location_ids:
            domain = Domain.AND([domain, [('id', 'in', allowed_location_ids)]])
            _logger.info("LOCATION SEARCH: Filtered to %s", allowed_location_ids)

        return super()._name_search(
            name=name, domain=domain, operator=operator, limit=limit, order=order)
class StockLot(models.Model):
    """``stock.lot`` extension that limits lot searches to lots physically
    present in the picking's source location."""

    _inherit = 'stock.lot'

    @api.model
    def _search(self, domain, offset=0, limit=None, order=None, *args, **kwargs):
        """Restrict lot searches to lots with stock in the source location.

        The filter applies only when the context carries both
        ``active_picking_id`` and ``default_location_id`` and that location
        exists with a usage other than ``'supplier'``. In that case the
        domain is AND-ed with the IDs of lots that have a positive-quantity
        quant in the location or one of its children, narrowed to
        ``default_product_id`` when the context provides it.
        """
        ctx = self.env.context

        # Only filter when the user is in a picking using lot_id directly
        # (like Receive operations).
        active_picking_id = ctx.get('active_picking_id')
        loc_id = ctx.get('default_location_id')
        if active_picking_id and loc_id:
            loc = self.env['stock.location'].sudo().browse(loc_id)
            # Every non-supplier source restricts the dropdown to lots
            # actually present there. Supplier sources are left unfiltered
            # because brand new lots may be received from them.
            if loc.exists() and loc.usage != 'supplier':
                quant_domain = [
                    ('location_id', 'child_of', loc.id),
                    ('quantity', '>', 0),
                    ('lot_id', '!=', False),
                ]
                # Narrow the quant query to the product being edited, when
                # it is known.
                product_id = ctx.get('default_product_id')
                if product_id:
                    quant_domain.append(('product_id', '=', product_id))

                quants = self.env['stock.quant'].sudo().search(quant_domain)
                # mapped() on a relational field yields a recordset, which
                # already holds each lot once; no extra set() is needed.
                lot_ids = quants.mapped('lot_id').ids
                domain = Domain.AND([domain, [('id', 'in', lot_ids)]])

        # Forward offset/limit/order positionally: passing them as keywords
        # ahead of *args would raise "multiple values for argument" as soon
        # as a caller supplied any extra positional argument.
        return super()._search(domain, offset, limit, order, *args, **kwargs)
# Import-time banner: reached only after every class above has been defined,
# so it marks the end of a successful module load in the server log.
_logger.info("=" * 80)
_logger.info(
    "STOCK_RESTRICT_SOURCE_LOCATION: stock_location.py module loaded successfully!"
)
_logger.info("=" * 80)