stock_inventory_backdate/models/stock_inventory_backdate.py

553 lines
22 KiB
Python
Executable File

from odoo import api, fields, models, _
from odoo.exceptions import UserError, ValidationError
import logging
_logger = logging.getLogger(__name__)
class StockInventoryBackdate(models.Model):
_name = 'stock.inventory.backdate'
_description = 'Backdated Inventory Adjustment'
_inherit = ['mail.thread', 'mail.activity.mixin']
_order = 'backdate_datetime desc, id desc'
name = fields.Char(string='Reference', required=True, default='New', readonly=True, tracking=True)
backdate_datetime = fields.Datetime(
string='Adjustment Date & Time',
required=True,
default=fields.Datetime.now,
help="The date and time for the backdated inventory adjustment",
tracking=True
)
location_id = fields.Many2one(
'stock.location',
string='Location',
required=True,
domain="[('usage', '=', 'internal')]",
tracking=True
)
company_id = fields.Many2one(
'res.company',
string='Company',
required=True,
default=lambda self: self.env.company
)
state = fields.Selection([
('draft', 'Draft'),
('done', 'Done'),
('cancel', 'Cancelled')
], string='Status', default='draft', readonly=True, tracking=True)
line_ids = fields.One2many(
'stock.inventory.backdate.line',
'inventory_id',
string='Inventory Lines'
)
notes = fields.Text(string='Notes')
@api.model
def create(self, vals):
if vals.get('name', 'New') == 'New':
vals['name'] = self.env['ir.sequence'].next_by_code('stock.inventory.backdate') or 'New'
return super(StockInventoryBackdate, self).create(vals)
def action_load_products(self):
"""Load products with current inventory at the location"""
self.ensure_one()
if self.state != 'draft':
raise UserError(_('You can only load products in draft state.'))
if not self.location_id:
raise UserError(_('Please select a location first.'))
# Get all quants at this location
quants = self.env['stock.quant'].search([
('location_id', '=', self.location_id.id),
('company_id', '=', self.company_id.id),
('quantity', '!=', 0)
])
if not quants:
raise UserError(_('No products found at this location. You can add products manually.'))
# Get existing product IDs to avoid duplicates
existing_product_ids = self.line_ids.mapped('product_id').ids
lines = []
for quant in quants:
# Skip if already in lines
if quant.product_id.id in existing_product_ids:
continue
# Get inventory position at the backdate
historical_qty = self._get_historical_quantity(
quant.product_id,
quant.location_id,
quant.lot_id,
quant.package_id,
quant.owner_id,
self.backdate_datetime
)
lines.append((0, 0, {
'product_id': quant.product_id.id,
'lot_id': quant.lot_id.id,
'package_id': quant.package_id.id,
'owner_id': quant.owner_id.id,
'theoretical_qty': historical_qty,
'counted_qty': historical_qty,
'difference_qty': 0.0,
}))
if lines:
self.line_ids = [(0, 0, line[2]) for line in lines]
return {
'type': 'ir.actions.client',
'tag': 'display_notification',
'params': {
'title': _('Products Loaded'),
'message': _('%s product(s) loaded successfully.') % len(lines),
'type': 'success',
'sticky': False,
}
}
else:
return {
'type': 'ir.actions.client',
'tag': 'display_notification',
'params': {
'title': _('No New Products'),
'message': _('All products are already in the list.'),
'type': 'info',
'sticky': False,
}
}
def _get_historical_quantity(self, product, location, lot, package, owner, date):
"""Calculate inventory quantity at a specific date"""
base_domain = [
('product_id', '=', product.id),
('state', '=', 'done'),
('date', '<=', date),
]
if lot:
base_domain.append(('lot_id', '=', lot.id))
if package:
base_domain.append(('package_id', '=', package.id))
if owner:
base_domain.append(('owner_id', '=', owner.id))
# Get all incoming moves (destination = our location)
domain_in = base_domain + [('location_dest_id', '=', location.id)]
moves_in = self.env['stock.move.line'].search(domain_in)
# Get all outgoing moves (source = our location)
domain_out = base_domain + [('location_id', '=', location.id)]
moves_out = self.env['stock.move.line'].search(domain_out)
qty_in = sum(moves_in.mapped('quantity'))
qty_out = sum(moves_out.mapped('quantity'))
return qty_in - qty_out
def _get_historical_cost(self, product, date, company_id):
"""
Calculate historical unit cost at a specific date.
- Standard Price: Returns current standard price.
- AVCO/FIFO: Calculates historical weighted average.
"""
cost_method = product.categ_id.property_cost_method
if cost_method == 'standard':
return product.standard_price
# Optimized Key: Use SQL for aggregation instead of loading all objects
sql = """
SELECT SUM(quantity), SUM(value)
FROM stock_valuation_layer
WHERE product_id = %s
AND create_date <= %s
AND company_id = %s
"""
self.env.cr.execute(sql, (product.id, date, company_id.id))
result = self.env.cr.fetchone()
quantity = result[0] or 0.0
value = result[1] or 0.0
if quantity != 0:
return value / quantity
# Fallback: Try to find the last valid unit_cost from an incoming layer
# This helps if current stock is 0 but we want the 'last known cost'
last_in_sql = """
SELECT value, quantity
FROM stock_valuation_layer
WHERE product_id = %s
AND create_date <= %s
AND company_id = %s
AND quantity > 0
ORDER BY create_date DESC, id DESC
LIMIT 1
"""
self.env.cr.execute(last_in_sql, (product.id, date, company_id.id))
last_in = self.env.cr.fetchone()
if last_in and last_in[1] != 0:
return last_in[0] / last_in[1]
return product.standard_price
def action_validate(self):
"""Validate and create backdated stock moves"""
self.ensure_one()
if self.state != 'draft':
raise UserError(_('Only draft adjustments can be validated.'))
if not self.line_ids:
raise UserError(_('Please add at least one inventory line.'))
# Create stock moves for each line with differences
for line in self.line_ids:
if line.difference_qty != 0:
line._create_stock_move()
self.write({'state': 'done'})
return True
def action_cancel(self):
"""Cancel the adjustment"""
self.ensure_one()
if self.state == 'done':
raise UserError(_('Cannot cancel a validated adjustment.'))
self.write({'state': 'cancel'})
return True
def action_draft(self):
"""Reset to draft"""
self.ensure_one()
self.write({'state': 'draft'})
return True
class StockInventoryBackdateLine(models.Model):
_name = 'stock.inventory.backdate.line'
_description = 'Backdated Inventory Adjustment Line'
_sql_constraints = [
('unique_product_per_inventory',
'unique(inventory_id, product_id, lot_id, package_id, owner_id)',
'You cannot have duplicate products with the same lot/package/owner in the same adjustment!')
]
inventory_id = fields.Many2one(
'stock.inventory.backdate',
string='Inventory Adjustment',
required=True,
ondelete='cascade'
)
product_id = fields.Many2one(
'product.product',
string='Product',
required=True,
domain="[('type', '=', 'product')]"
)
lot_id = fields.Many2one('stock.lot', string='Lot/Serial Number')
package_id = fields.Many2one('stock.quant.package', string='Package')
owner_id = fields.Many2one('res.partner', string='Owner')
theoretical_qty = fields.Float(
string='Theoretical Quantity',
readonly=True,
help="Quantity at the backdated time (can be negative if there was negative stock)"
)
counted_qty = fields.Float(
string='Counted Quantity',
required=True,
default=0.0
)
difference_qty = fields.Float(
string='Adjustment Qty (+/-)',
default=0.0,
help="Positive value adds stock, negative value removes stock."
)
unit_cost = fields.Float(
string='Unit Cost',
digits='Product Price',
help="Custom unit cost for this backdated adjustment. If left 0, it may use standard price."
)
product_uom_id = fields.Many2one(
'uom.uom',
string='Unit of Measure',
related='product_id.uom_id',
readonly=True
)
state = fields.Selection(related='inventory_id.state', string='Status')
has_negative_theoretical = fields.Boolean(
string='Has Negative Theoretical',
compute='_compute_has_negative_theoretical',
help="Indicates if theoretical quantity is negative"
)
@api.onchange('counted_qty')
def _onchange_counted_qty(self):
for line in self:
line.difference_qty = line.counted_qty - line.theoretical_qty
@api.onchange('difference_qty')
def _onchange_difference_qty(self):
for line in self:
line.counted_qty = line.theoretical_qty + line.difference_qty
@api.depends('theoretical_qty')
def _compute_has_negative_theoretical(self):
for line in self:
line.has_negative_theoretical = line.theoretical_qty < 0
@api.onchange('product_id', 'lot_id', 'package_id', 'owner_id')
def _onchange_product_id(self):
"""Auto-calculate theoretical quantity when product is selected"""
if self.product_id and self.inventory_id.location_id and self.inventory_id.backdate_datetime:
self.theoretical_qty = self.inventory_id._get_historical_quantity(
self.product_id,
self.inventory_id.location_id,
self.lot_id,
self.package_id,
self.owner_id,
self.inventory_id.backdate_datetime
)
# Set counted_qty to theoretical_qty by default (Adjustment 0)
if not self.counted_qty and not self.difference_qty:
self.counted_qty = self.theoretical_qty
self.difference_qty = 0.0
# Calculate historical unit cost
if self.inventory_id.backdate_datetime:
limit_date = self.inventory_id.backdate_datetime
self.unit_cost = self.inventory_id._get_historical_cost(
self.product_id,
limit_date,
self.inventory_id.company_id
)
else:
self.unit_cost = self.product_id.standard_price
def _create_stock_move(self):
"""Create backdated stock move for this line"""
self.ensure_one()
if self.difference_qty == 0:
return
# Find inventory adjustment location
inventory_location = self.env['stock.location'].search([
('usage', '=', 'inventory'),
('company_id', 'in', [self.inventory_id.company_id.id, False])
], limit=1)
if not inventory_location:
raise UserError(_('Inventory adjustment location not found. Please check your stock configuration.'))
# Determine source and destination based on difference
if self.difference_qty > 0:
# Increase inventory
location_id = inventory_location.id
location_dest_id = self.inventory_id.location_id.id
qty = self.difference_qty
else:
# Decrease inventory
location_id = self.inventory_id.location_id.id
location_dest_id = inventory_location.id
qty = abs(self.difference_qty)
# Create stock move with backdated datetime
backdate = self.inventory_id.backdate_datetime
move_vals = {
'name': _('Backdated Inventory Adjustment: %s') % self.inventory_id.name,
'product_id': self.product_id.id,
'product_uom': self.product_uom_id.id,
'product_uom_qty': qty,
'location_id': location_id,
'location_dest_id': location_dest_id,
'company_id': self.inventory_id.company_id.id,
'is_inventory': True,
'origin': self.inventory_id.name,
'date': backdate,
}
move = self.env['stock.move'].create(move_vals)
move._action_confirm()
# Check if move line was already created by _action_confirm (e.g. reservation)
move_line = move.move_line_ids.filtered(lambda ml: ml.product_id.id == self.product_id.id)
move_line_vals = {
'product_id': self.product_id.id,
'product_uom_id': self.product_uom_id.id,
'quantity': qty,
'location_id': location_id,
'location_dest_id': location_dest_id,
'lot_id': self.lot_id.id if self.lot_id else False,
'package_id': self.package_id.id if self.package_id else False,
'owner_id': self.owner_id.id if self.owner_id else False,
'date': backdate,
}
if move_line:
# Update existing line
move_line = move_line[0]
move_line.write(move_line_vals)
_logger.info(f"Updated existing move line {move_line.id} with quantity={qty}")
else:
# Create new line if none exists
move_line_vals['move_id'] = move.id
move_line = self.env['stock.move.line'].create(move_line_vals)
_logger.info(f"Created new move line {move_line.id} with quantity={qty}")
_logger.info(f"Created move line {move_line.id} with quantity_done={qty}")
# Log product valuation settings
product = self.product_id
_logger.info(f"Product: {product.name}, Category: {product.categ_id.name}")
_logger.info(f"Valuation: {product.categ_id.property_valuation}, Cost Method: {product.categ_id.property_cost_method}")
_logger.info(f"Product Cost: {product.standard_price}")
# Mark as picked (required for Odoo 17 _action_done)
move.picked = True
for ml in move.move_line_ids:
ml.picked = True
# Mark as done
_logger.info(f"Move state before _action_done: {move.state}")
result = move._action_done()
_logger.info(f"Move state after _action_done: {move.state}")
_logger.info(f"_action_done returned: {result}")
# Refresh move to get latest data
move = self.env['stock.move'].browse(move.id)
_logger.info(f"Move state after refresh: {move.state}")
# CRITICAL: Update dates via direct SQL after _action_done
# The _action_done method overwrites dates, so we must update after
_logger.info(f"Backdating move {move.id} to {backdate}")
# Flush all pending ORM operations to DB before running raw SQL
self.env.flush_all()
# Update stock move
self.env.cr.execute(
"UPDATE stock_move SET date = %s WHERE id = %s",
(backdate, move.id)
)
_logger.info(f"Updated stock_move {move.id}, rows affected: {self.env.cr.rowcount}")
# Update stock move lines
self.env.cr.execute(
"UPDATE stock_move_line SET date = %s WHERE move_id = %s",
(backdate, move.id)
)
_logger.info(f"Updated stock_move_line for move {move.id}, rows affected: {self.env.cr.rowcount}")
# Update stock valuation layer
# Check if valuation layer exists
svl = self.env['stock.valuation.layer'].search([('stock_move_id', '=', move.id)], limit=1)
# svl_count = self.env['stock.valuation.layer'].search_count([('stock_move_id', '=', move.id)])
_logger.info(f"Found stock valuation layer for move {move.id}: {svl}")
if svl:
new_val_layer_vals = {}
# Update Date
new_val_layer_vals['create_date'] = backdate
# --- COST ADJUSTMENT LOGIC ---
# If unit_cost is provided, we override the value on the layer
# and potentially update the account move
if self.unit_cost and self.unit_cost > 0:
original_value = svl.value
new_value = self.unit_cost * qty
if self.difference_qty < 0:
new_value = -abs(new_value) # Ensure negative if qty is negative
if abs(new_value - original_value) > 0.01:
_logger.info(f"Overriding SVL Value from {original_value} to {new_value} (Unit Cost: {self.unit_cost})")
# Update SVL via SQL to avoid constraint issues or re-triggering logic
# Also need to update remaining_value if applicable?
# For incoming (positive diff), remaining_value should match value.
# For outgoing, usually remaining_value is 0 on the layer itself (it consumes others).
update_sql = "UPDATE stock_valuation_layer SET create_date = %s, value = %s, unit_cost = %s"
params = [backdate, new_value, self.unit_cost]
if self.difference_qty > 0:
# Incoming: update remaining_value too
update_sql += ", remaining_value = %s"
params.append(new_value)
update_sql += " WHERE id = %s"
params.append(svl.id)
self.env.cr.execute(update_sql, tuple(params))
# Check Account Move and update amount
if move.account_move_ids:
for am in move.account_move_ids:
# Update lines
# We need to find which line is Debit/Credit and scale them?
# Or just assuming 2 lines, simpler to match the total?
# Let's check amounts.
# Usually Inventory Adjustment creates:
# Dr Stock, Cr Inventory Adjustment (for Gain)
# Dr Inventory Adjustment, Cr Stock (for Loss)
# We just need to replace the absolute amount on all lines that matched the old absolute amount?
# Risky if multiple lines.
# Better: Iterate lines. If line amount matches old abs(value), update to new abs(value).
for line in am.line_ids:
if abs(line.debit - abs(original_value)) < 0.01:
self.env.cr.execute("UPDATE account_move_line SET debit = %s WHERE id = %s", (abs(new_value), line.id))
if abs(line.credit - abs(original_value)) < 0.01:
self.env.cr.execute("UPDATE account_move_line SET credit = %s WHERE id = %s", (abs(new_value), line.id))
_logger.info(f"Updated account_move {am.id} amounts to match new value {new_value}")
else:
# just update date
self.env.cr.execute(
"UPDATE stock_valuation_layer SET create_date = %s WHERE id = %s",
(backdate, svl.id)
)
else:
# No custom cost, just update date
self.env.cr.execute(
"UPDATE stock_valuation_layer SET create_date = %s WHERE id = %s",
(backdate, svl.id)
)
_logger.info(f"Updated stock_valuation_layer for move {move.id}")
else:
_logger.warning(f"No stock valuation layer found for move {move.id}. Product may not use real-time valuation or cost is zero.")
# Update account moves dates if they exist (Run unconditionally as date fix is always needed)
# Refresh move to get account_move_ids
move = self.env['stock.move'].browse(move.id)
if move.account_move_ids:
account_date = backdate.date()
for account_move in move.account_move_ids:
self.env.cr.execute(
"UPDATE account_move SET date = %s WHERE id = %s",
(account_date, account_move.id)
)
self.env.cr.execute(
"UPDATE account_move_line SET date = %s WHERE move_id = %s",
(account_date, account_move.id)
)
_logger.info(f"Updated account_move {account_move.id} and lines to {account_date}")
# Invalidate cache to ensure ORM reloads data from DB
self.env.invalidate_all()
# Invalidate cache
self.env.cache.invalidate()
return move