1
0
forked from Mapan/odoo17e
odoo17e-kedaikipas58/addons/account_reports/models/account_sales_report.py
2024-12-10 09:04:09 +07:00

397 lines
20 KiB
Python

# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
from collections import defaultdict
from odoo import _, api, fields, models
from odoo.tools import get_lang
class ECSalesReportCustomHandler(models.AbstractModel):
_name = 'account.ec.sales.report.handler'
_inherit = 'account.report.custom.handler'
_description = 'EC Sales Report Custom Handler'
def _get_custom_display_config(self):
return {
'components': {
'AccountReportFilters': 'account_reports.SalesReportFilters',
},
}
def _dynamic_lines_generator(self, report, options, all_column_groups_expression_totals, warnings=None):
"""
Generate the dynamic lines for the report in a vertical style (one line per tax per partner).
"""
lines = []
totals_by_column_group = {
column_group_key: {
'balance': 0.0,
'goods': 0.0,
'triangular': 0.0,
'services': 0.0,
'vat_number': '',
'country_code': '',
'sales_type_code': '',
}
for column_group_key in options['column_groups']
}
operation_categories = options['sales_report_taxes'].get('operation_category', {})
ec_tax_filter_selection = {v.get('id'): v.get('selected') for v in options.get('ec_tax_filter_selection', [])}
for partner, results in self._query_partners(report, options, warnings):
for tax_ec_category in ('goods', 'triangular', 'services'):
if not ec_tax_filter_selection[tax_ec_category]:
# Skip the line if the tax is not selected
continue
partner_values = defaultdict(dict)
country_specific_code = operation_categories.get(tax_ec_category)
has_found_a_line = False
for col_grp_key in options['column_groups']:
partner_sum = results.get(col_grp_key, {})
partner_values[col_grp_key]['vat_number'] = partner_sum.get('vat_number', 'UNKNOWN')
partner_values[col_grp_key]['country_code'] = partner_sum.get('country_code', 'UNKNOWN')
partner_values[col_grp_key]['sales_type_code'] = []
partner_values[col_grp_key]['balance'] = partner_sum.get(tax_ec_category, 0.0)
totals_by_column_group[col_grp_key]['balance'] += partner_sum.get(tax_ec_category, 0.0)
for i, operation_id in enumerate(partner_sum.get('tax_element_id', [])):
if operation_id in options['sales_report_taxes'][tax_ec_category]:
has_found_a_line = True
partner_values[col_grp_key]['sales_type_code'] += [
country_specific_code or
(partner_sum.get('sales_type_code') and partner_sum.get('sales_type_code')[i])
or None]
partner_values[col_grp_key]['sales_type_code'] = ', '.join(set(partner_values[col_grp_key]['sales_type_code']))
if has_found_a_line:
lines.append((0, self._get_report_line_partner(report, options, partner, partner_values, markup=tax_ec_category)))
# Report total line.
if lines:
lines.append((0, self._get_report_line_total(report, options, totals_by_column_group)))
return lines
def _caret_options_initializer(self):
"""
Add custom caret option for the report to link to the partner and allow cleaner overrides.
"""
return {
'ec_sales': [
{'name': _("View Partner"), 'action': 'caret_option_open_record_form'}
],
}
def _custom_options_initializer(self, report, options, previous_options=None):
"""
Add the invoice lines search domain that is specific to the country.
Typically, the taxes tag_ids relative to the country for the triangular, sale of goods or services
:param dict options: Report options
:param dict previous_options: Previous report options
"""
super()._custom_options_initializer(report, options, previous_options=previous_options)
self._init_core_custom_options(report, options, previous_options)
options.update({
'sales_report_taxes': {
'goods': tuple(self.env['account.tax'].search([
*self.env['account.tax']._check_company_domain(self.env.company),
('amount', '=', 0.0),
('amount_type', '=', 'percent'),
('type_tax_use', '=', 'sale'),
]).ids),
'services': tuple(),
'triangular': tuple(),
'use_taxes_instead_of_tags': True,
# We can't use tags as we don't have a country tax report correctly set, 'use_taxes_instead_of_tags'
# should never be used outside this case
}
})
country_ids = self.env['res.country'].search([
('code', 'in', tuple(self._get_ec_country_codes(options)))
]).ids
other_country_ids = tuple(set(country_ids) - {self.env.company.account_fiscal_country_id.id})
options.setdefault('forced_domain', []).extend([
'|',
('move_id.partner_shipping_id.country_id', 'in', other_country_ids),
'&',
('move_id.partner_shipping_id', '=', False),
('partner_id.country_id', 'in', other_country_ids),
])
report._init_options_journals(options, previous_options=previous_options)
self._enable_export_buttons_for_common_vat_groups_in_branches(options)
def _init_core_custom_options(self, report, options, previous_options=None):
"""
Add the invoice lines search domain that is common to all countries.
:param dict options: Report options
:param dict previous_options: Previous report options
"""
default_tax_filter = [
{'id': 'goods', 'name': _('Goods'), 'selected': True},
{'id': 'triangular', 'name': _('Triangular'), 'selected': True},
{'id': 'services', 'name': _('Services'), 'selected': True},
]
ec_tax_filter_selection = (previous_options or {}).get('ec_tax_filter_selection', default_tax_filter)
# In case we have a EC sale list report with more ec_tax_filter_selection the previous options will have extra
# item we just keep the default ones, and we let variant extend the function to add the ones they need
if ec_tax_filter_selection != default_tax_filter:
filtered_ec_tax_filter_selection = [item for item in ec_tax_filter_selection if item['id'] in {item['id'] for item in default_tax_filter}]
options['ec_tax_filter_selection'] = filtered_ec_tax_filter_selection
else:
options['ec_tax_filter_selection'] = ec_tax_filter_selection
def _get_report_line_partner(self, report, options, partner, partner_values, markup=''):
"""
Convert the partner values to a report line.
:param dict options: Report options
:param recordset partner: the corresponding res.partner record
:param dict partner_values: Dictionary of values for the report line
:return dict: Return a dict with the values for the report line.
"""
column_values = []
for column in options['columns']:
value = partner_values[column['column_group_key']].get(column['expression_label'])
column_values.append(report._build_column_dict(value, column, options=options))
return {
'id': report._get_generic_line_id('res.partner', partner.id, markup=markup),
'name': partner is not None and (partner.name or '')[:128] or _('Unknown Partner'),
'columns': column_values,
'level': 2,
'trust': partner.trust if partner else None,
'caret_options': 'ec_sales',
}
def _get_report_line_total(self, report, options, totals_by_column_group):
"""
Convert the total values to a report line.
:param dict options: Report options
:param dict totals_by_column_group: Dictionary of values for the total line
:return dict: Return a dict with the values for the report line.
"""
column_values = []
for column in options['columns']:
col_value = totals_by_column_group[column['column_group_key']].get(column['expression_label'])
col_value = col_value if column['figure_type'] == 'monetary' else ''
column_values.append(report._build_column_dict(col_value, column, options=options))
return {
'id': report._get_generic_line_id(None, None, markup='total'),
'name': _('Total'),
'class': 'total',
'level': 1,
'columns': column_values,
}
def _query_partners(self, report, options, warnings=None):
''' Execute the queries, perform all the computation, then
returns a lists of tuple (partner, fetched_values) sorted by the table's model _order:
- partner is a res.parter record.
- fetched_values is a dictionary containing:
- sums by operation type: {'goods': float,
'triangular': float,
'services': float,
- tax identifiers: 'tax_element_id': list[int], > the tag_id in almost every case
'sales_type_code': list[str],
- partner identifier elements: 'vat_number': str,
'full_vat_number': str,
'country_code': str}
:param options: The report options.
:return: (accounts_values, taxes_results)
'''
groupby_partners = {}
def assign_sum(row):
"""
Assign corresponding values from the SQL querry row to the groupby_partners dictionary.
If the line balance isn't 0, find the tax tag_id and check in which column/report line the SQL line balance
should be displayed.
The tricky part is to allow for the report to be displayed in vertical or horizontal format.
In vertical, you have up to 3 lines per partner (one for each operation type).
In horizontal, you have one line with 3 columns per partner (one for each operation type).
Add then the more straightforward data (vat number, country code, ...)
:param dict row:
"""
if not company_currency.is_zero(row['balance']):
groupby_partners.setdefault(row['groupby'], defaultdict(lambda: defaultdict(float)))
groupby_partners_keyed = groupby_partners[row['groupby']][row['column_group_key']]
if row['tax_element_id'] in options['sales_report_taxes']['goods']:
groupby_partners_keyed['goods'] += row['balance']
elif row['tax_element_id'] in options['sales_report_taxes']['triangular']:
groupby_partners_keyed['triangular'] += row['balance']
elif row['tax_element_id'] in options['sales_report_taxes']['services']:
groupby_partners_keyed['services'] += row['balance']
groupby_partners_keyed.setdefault('tax_element_id', []).append(row['tax_element_id'])
groupby_partners_keyed.setdefault('sales_type_code', []).append(row['sales_type_code'])
vat = row['vat_number'] or ''
vat_country_code = vat[:2] if vat[:2].isalpha() else None
groupby_partners_keyed.setdefault('vat_number', vat if not vat_country_code else vat[2:])
groupby_partners_keyed.setdefault('full_vat_number', vat)
groupby_partners_keyed.setdefault('country_code', vat_country_code or row.get('country_code'))
if warnings is not None:
if row['country_code'] not in self._get_ec_country_codes(options):
warnings['account_reports.sales_report_warning_non_ec_country'] = {'alert_type': 'warning'}
elif not row.get('vat_number'):
warnings['account_reports.sales_report_warning_missing_vat'] = {'alert_type': 'warning'}
if row.get('same_country') and row['country_code']:
warnings['account_reports.sales_report_warning_same_country'] = {'alert_type': 'warning'}
company_currency = self.env.company.currency_id
# Execute the queries and dispatch the results.
query, params = self._get_query_sums(report, options)
self._cr.execute(query, params)
dictfetchall = self._cr.dictfetchall()
for res in dictfetchall:
assign_sum(res)
if groupby_partners:
partners = self.env['res.partner'].with_context(active_test=False).browse(groupby_partners.keys())
else:
partners = self.env['res.partner']
return [(partner, groupby_partners[partner.id]) for partner in partners.sorted()]
def _get_query_sums(self, report, options):
''' Construct a query retrieving all the aggregated sums to build the report. It includes:
- sums for all partners.
- sums for the initial balances.
:param options: The report options.
:return: (query, params)
'''
params = []
queries = []
# Create the currency table.
ct_query = report._get_query_currency_table(options)
allowed_ids = self._get_tag_ids_filtered(options)
# In the case of the generic report, we don't have a country defined. So no reliable tax report whose
# tag_ids can be used. So we have a fallback to tax_ids.
lang = self.env.user.lang or get_lang(self.env).code
if options.get('sales_report_taxes', {}).get('use_taxes_instead_of_tags'):
tax_elem_table = 'account_tax'
aml_rel_table = 'account_move_line_account_tax_rel'
tax_elem_table_name = f"COALESCE(account_tax.name->>'{lang}', account_tax.name->>'en_US')" if \
self.pool['account.tax'].name.translate else 'account_tax.name'
else:
tax_elem_table = 'account_account_tag'
aml_rel_table = 'account_account_tag_account_move_line_rel'
tax_elem_table_name = f"COALESCE(account_account_tag.name->>'{lang}', account_account_tag.name->>'en_US')" if \
self.pool['account.account.tag'].name.translate else 'account_account_tag.name'
for column_group_key, column_group_options in report._split_options_per_column_group(options).items():
tables, where_clause, where_params = report._query_get(column_group_options, 'strict_range')
params.append(column_group_key)
params += where_params
if allowed_ids:
where_clause += f" AND {tax_elem_table}.id IN %s" # Add the tax element filter.
params.append(tuple(allowed_ids))
queries.append(f"""
SELECT
%s AS column_group_key,
account_move_line.partner_id AS groupby,
res_partner.vat AS vat_number,
res_country.code AS country_code,
-SUM(account_move_line.balance) AS balance,
{tax_elem_table_name} AS sales_type_code,
{tax_elem_table}.id AS tax_element_id,
(comp_partner.country_id = res_partner.country_id) AS same_country
FROM {tables}
JOIN {ct_query} ON currency_table.company_id = account_move_line.company_id
JOIN {aml_rel_table} ON {aml_rel_table}.account_move_line_id = account_move_line.id
JOIN {tax_elem_table} ON {aml_rel_table}.{tax_elem_table}_id = {tax_elem_table}.id
JOIN res_partner ON account_move_line.partner_id = res_partner.id
JOIN res_country ON res_partner.country_id = res_country.id
JOIN res_company ON res_company.id = account_move_line.company_id
JOIN res_partner comp_partner ON comp_partner.id = res_company.partner_id
WHERE {where_clause}
GROUP BY {tax_elem_table}.id, {tax_elem_table}.name, account_move_line.partner_id,
res_partner.vat, res_country.code, comp_partner.country_id, res_partner.country_id
""")
return ' UNION ALL '.join(queries), params
@api.model
def _get_tag_ids_filtered(self, options):
"""
Helper function to get all the tag_ids concerned by the report for the given options.
:param dict options: Report options
:return tuple: tag_ids untyped after filtering
"""
allowed_taxes = set()
for operation_type in options.get('ec_tax_filter_selection', []):
if operation_type.get('selected'):
allowed_taxes.update(options['sales_report_taxes'][operation_type.get('id')])
return allowed_taxes
@api.model
def _get_ec_country_codes(self, options):
"""
Return the list of country codes for the EC countries.
:param dict options: Report options
:return set: List of country codes for a given date (UK case)
"""
rslt = {'AT', 'BE', 'BG', 'HR', 'CY', 'CZ', 'DK', 'EE', 'FI', 'FR', 'DE', 'GR', 'HU',
'IE', 'IT', 'LV', 'LT', 'LU', 'MT', 'NL', 'PL', 'PT', 'RO', 'SK', 'SI', 'ES', 'SE', 'XI'}
# GB left the EU on January 1st 2021. But before this date, it's still to be considered as a EC country
if fields.Date.from_string(options['date']['date_from']) < fields.Date.from_string('2021-01-01'):
rslt.add('GB')
# Monaco is treated as part of France for VAT purposes (but should not be displayed within FR context)
if self.env.company.account_fiscal_country_id.code != 'FR':
rslt.add('MC')
return rslt
def get_warning_act_window(self, options, params):
act_window = {'type': 'ir.actions.act_window', 'context': {}}
if params['type'] == 'no_vat':
aml_domains = [
('partner_id.vat', '=', None),
('partner_id.country_id.code', 'in', tuple(self._get_ec_country_codes(options))),
]
act_window.update({
'name': _("Entries with partners with no VAT"),
'context': {'search_default_group_by_partner': 1, 'expand': 1}
})
elif params['type'] == 'non_ec_country':
aml_domains = [('partner_id.country_id.code', 'not in', tuple(self._get_ec_country_codes(options)))]
act_window['name'] = _("EC tax on non EC countries")
else:
aml_domains = [('partner_id.country_id.code', '=', options.get('same_country_warning'))]
act_window['name'] = _("EC tax on same country")
use_taxes_instead_of_tags = options.get('sales_report_taxes', {}).get('use_taxes_instead_of_tags')
tax_or_tag_field = 'tax_ids.id' if use_taxes_instead_of_tags else 'tax_tag_ids.id'
amls = self.env['account.move.line'].search([
*aml_domains,
*self.env['account.report']._get_options_date_domain(options, 'strict_range'),
(tax_or_tag_field, 'in', tuple(self._get_tag_ids_filtered(options)))
])
if params['model'] == 'move':
act_window.update({
'views': [[self.env.ref('account.view_move_tree').id, 'list'], (False, 'form')],
'res_model': 'account.move',
'domain': [('id', 'in', amls.move_id.ids)],
})
else:
act_window.update({
'views': [(False, 'list'), (False, 'form')],
'res_model': 'res.partner',
'domain': [('id', 'in', amls.move_id.partner_id.ids)],
})
return act_window