# Part of Odoo. See LICENSE file for full copyright and licensing details.

import ast
import logging
import re
import timeit

from dateutil.relativedelta import relativedelta
from psycopg2 import ProgrammingError, errorcodes

from odoo import models, api, fields, _
from odoo.exceptions import UserError, ValidationError
from odoo.osv.expression import get_unaccent_wrapper
from odoo.tools import SQL

_logger = logging.getLogger(__name__)


# Merge a list of lists based on their common elements.
# Input:  [['a', 'b'], ['b', 'c'], ['d', 'e']]
# Output: [{'a', 'b', 'c'}, {'d', 'e'}] (the merged groups are returned as sets)
# https://stackoverflow.com/a/9112588
def merge_common_lists(lsts):
    sets = [set(lst) for lst in lsts if lst]
    merged = True
    while merged:
        merged = False
        results = []
        while sets:
            common, rest = sets[0], sets[1:]
            sets = []
            for x in rest:
                if x.isdisjoint(common):
                    # No element in common: keep it for the next pass
                    sets.append(x)
                else:
                    # Overlapping group: fold it into the current one
                    merged = True
                    common |= x
            results.append(common)
        sets = results
    return sets
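

# Illustrative example (hypothetical values): overlapping pairs collapse
# transitively, and the merged groups come back as sets.
#   merge_common_lists([[1, 2], [2, 3], [4, 5]])  # -> [{1, 2, 3}, {4, 5}]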


class DataMergeModel(models.Model):
    _name = 'data_merge.model'
    _description = 'Deduplication Model'
    _order = 'name'

    name = fields.Char(string='Name', readonly=False, store=True, required=True, copy=False, compute='_compute_name')
    active = fields.Boolean(default=True)

    res_model_id = fields.Many2one('ir.model', string='Model', required=True, ondelete='cascade')
    res_model_name = fields.Char(related='res_model_id.model', string='Model Name', readonly=True, store=True)
    domain = fields.Char(string='Domain', help='Records eligible for the deduplication process')
    removal_mode = fields.Selection([
        ('archive', 'Archive'),
        ('delete', 'Delete')], string='Duplicate Removal', default='archive')
    merge_mode = fields.Selection([
        ('manual', 'Manual'),
        ('automatic', 'Automatic')], string='Merge Mode', default='manual')
    custom_merge_method = fields.Boolean(compute='_compute_custom_merge_method')

    rule_ids = fields.One2many('data_merge.rule', 'model_id', string="Deduplication Rules", help='Suggest to merge records matching at least one of these rules')
    records_to_merge_count = fields.Integer(compute='_compute_records_to_merge_count')

    mix_by_company = fields.Boolean('Cross-Company', default=False, help="When enabled, duplicates across different companies will be suggested")

    ### User Notifications for Manual merge
    notify_user_ids = fields.Many2many('res.users', string='Notify Users',
        help='List of users to notify when there are new records to merge',
        domain=lambda self: [('groups_id', 'in', self.env.ref('base.group_system').id)])
    notify_frequency = fields.Integer(string='Notify', default=1)
    notify_frequency_period = fields.Selection([
        ('days', 'Days'),
        ('weeks', 'Weeks'),
        ('months', 'Months')], string='Notify Frequency Period', default='weeks')
    last_notification = fields.Datetime(readonly=True)

    ### Similarity Threshold for Automatic merge
    merge_threshold = fields.Integer(string='Similarity Threshold', default=75, help='Records with a similarity percentage above this threshold will be automatically merged')
    create_threshold = fields.Integer(string='Suggestion Threshold', default=0, help='Duplicates with a similarity below this threshold will not be suggested', groups='base.group_no_one')
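    # Note: data_merge.group.similarity is a ratio in [0, 1], while the two
    # thresholds above are percentages; find_duplicates() compares
    # group.similarity * 100 against them.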

    ### Contextual menu action
    is_contextual_merge_action = fields.Boolean(string='Merge action attached', help='If True, this record is used for the contextual "Merge" menu action on the target model.')

    _sql_constraints = [
        ('uniq_name', 'UNIQUE(name)', 'This name is already taken'),
        ('check_notif_freq', 'CHECK(notify_frequency > 0)', 'The notification frequency should be greater than 0'),
    ]

    @api.depends('res_model_id')
    def _compute_name(self):
        for dm_model in self:
            dm_model.name = dm_model.res_model_id.name if dm_model.res_model_id else ''

    @api.onchange('res_model_id')
    def _onchange_res_model_id(self):
        self._check_prevent_merge()
        if self.res_model_name:
            self.env[self.res_model_name].check_access_rights('read')
        if any(rule.field_id.model_id != self.res_model_id for rule in self.rule_ids):
            # The existing rules no longer match the selected model: clear them
            self.rule_ids = [(5, 0, 0)]

    def _compute_records_to_merge_count(self):
        # _read_group returns one (data_merge.model record, count) pair per group
        count_data = self.env['data_merge.record'].with_context(data_merge_model_ids=tuple(self.ids))._read_group([('model_id', 'in', self.ids)], ['model_id'], ['__count'])
        counts = {model.id: count for model, count in count_data}
        for dm_model in self:
            dm_model.records_to_merge_count = counts.get(dm_model.id, 0)

    @api.depends('res_model_name')
    def _compute_custom_merge_method(self):
        for dm_model in self:
            if dm_model.res_model_name:
                # A model provides its own merge logic by defining a _merge_method
                dm_model.custom_merge_method = hasattr(self.env[dm_model.res_model_name], '_merge_method')
            else:
                dm_model.custom_merge_method = False

    ############################
    ### Cron / find duplicates
    ############################
    def _notify_new_duplicates(self):
        """
        Notify the configured users when new duplicate records are found.

        This method is called after the identification process and notifies
        the users based on the configured frequency.
        """
        for dm_model in self.env['data_merge.model'].search([('merge_mode', '=', 'manual')]):
            if not dm_model.notify_user_ids or not dm_model.notify_frequency:
                continue

            if dm_model.notify_frequency_period == 'days':
                delta = relativedelta(days=dm_model.notify_frequency)
            elif dm_model.notify_frequency_period == 'weeks':
                delta = relativedelta(weeks=dm_model.notify_frequency)
            else:
                delta = relativedelta(months=dm_model.notify_frequency)
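
            # e.g. notify_frequency=2 with notify_frequency_period='weeks'
            # (illustrative values) notifies at most once every two weeks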
            if not dm_model.last_notification or (dm_model.last_notification + delta) < fields.Datetime.now():
                dm_model.last_notification = fields.Datetime.now()
                dm_model._send_notification(delta)

    def _send_notification(self, delta):
        """
        Notify the users if duplicates were created since today minus `delta`.

        :param delta: relativedelta representing the notification frequency
        """
        self.ensure_one()
        last_date = fields.Date.today() - delta
        num_records = self.env['data_merge.record'].search_count([
            ('model_id', '=', self.id),
            ('create_date', '>=', last_date),
        ])
        if num_records:
            partner_ids = self.notify_user_ids.partner_id.ids
            menu_id = self.env.ref('data_recycle.menu_data_cleaning_root').id
            self.env['mail.thread'].sudo().message_notify(
                body=self.env['ir.qweb']._render(
                    'data_merge.data_merge_duplicate',
                    dict(
                        num_records=num_records,
                        res_model_label=self.res_model_id.name,
                        model_id=self.id,
                        menu_id=menu_id,
                    )
                ),
                model=self._name,
                notify_author=True,
                partner_ids=partner_ids,
                res_id=self.id,
                subject=_('Duplicates to Merge'),
            )

    def _cron_find_duplicates(self):
        """
        Identify duplicate records for each active model and either notify
        the users or automatically merge the duplicates.
        """
        self.env['data_merge.model'].sudo().search([]).find_duplicates(batch_commits=True)
        self._notify_new_duplicates()

    def find_duplicates(self, batch_commits=False):
        """
        Search for duplicate records and create the data_merge.group along with its data_merge.record

        :param bool batch_commits: If set, commit after every 1000 groups created
        """
        unaccent = get_unaccent_wrapper(self.env.cr)
        self.env.flush_all()
        for dm_model in self:
            t1 = timeit.default_timer()
            ids = []
            res_model = self.env[dm_model.res_model_name]
            table = res_model._table

            for rule in dm_model.rule_ids:
                domain = ast.literal_eval(dm_model.domain or '[]')
                query = res_model._where_calc(domain)
                sql_field = res_model._field_to_sql(table, rule.field_id.name, query)
                if rule.field_id.relation:
                    # Relational field: join on the comodel and match on its
                    # display name field (_rec_name) instead of the raw ID
                    related_model = self.env[rule.field_id.relation]
                    lhs_alias, lhs_column = re.findall(r'"([^"]+)"', sql_field.code)
                    rhs_alias = query.join(lhs_alias, lhs_column, related_model._table, 'id', lhs_column)
                    sql_field = related_model._field_to_sql(rhs_alias, related_model._rec_name, query)

                if rule.match_mode == 'accent':
                    # unaccent() is case-sensitive, so wrap the field in lower()
                    # to make the match case-insensitive as well
                    sql_field = unaccent(SQL('lower(%s)', sql_field))

                sql_group_by = SQL()
                company_field = res_model._fields.get('company_id')
                if company_field and not dm_model.mix_by_company:
                    # Unless cross-company matching is enabled, also group by
                    # company so duplicates are only suggested within a company
                    if company_field.store:
                        sql_group_by = SQL(', %s', res_model._field_to_sql(table, 'company_id', query))
                    elif company_field.related and company_field.related_field.store:
                        sql_group_by = SQL(', %s', res_model._field_to_sql(table, company_field.related, query))

                # Get all the rows matching the rule defined
                # (e.g. exact match of the name) having at least 2 records.
                # Each row contains the matched value and an array of matching records:
                # | value matched | {array of record IDs matching the field}
                sql = SQL(
                    """
                    SELECT %(field)s AS group_field_name,
                        array_agg(%(table_id)s ORDER BY %(table_id)s ASC)
                    FROM %(tables)s
                    WHERE length(%(field)s) > 0 AND %(where_clause)s
                    GROUP BY group_field_name %(group_by)s
                    HAVING COUNT(%(field)s) > 1
                    """,
                    field=sql_field,
                    table_id=SQL.identifier(table, 'id'),
                    tables=query.from_clause,
                    where_clause=query.where_clause or SQL("TRUE"),
                    group_by=sql_group_by,
                )
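
                # Illustrative example (hypothetical rule): an exact match on
                # res.partner's "name" with no extra domain renders roughly as:
                #   SELECT "res_partner"."name" AS group_field_name,
                #          array_agg("res_partner"."id" ORDER BY "res_partner"."id" ASC)
                #   FROM "res_partner"
                #   WHERE length("res_partner"."name") > 0 AND TRUE
                #   GROUP BY group_field_name, "res_partner"."company_id"
                #   HAVING COUNT("res_partner"."name") > 1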

                try:
                    self._cr.execute(sql)
                except ProgrammingError as e:
                    if e.pgcode == errorcodes.UNDEFINED_FUNCTION:
                        raise UserError(_('Missing required PostgreSQL extension: unaccent'))
                    raise

                rows = self._cr.fetchall()
                ids += [row[1] for row in rows]

            # Fetch the IDs of all the records that already matched (and are
            # not merged yet), as well as the discarded ones.
            # This prevents creating the same groups twice.
            self._cr.execute("""
                SELECT
                    ARRAY_AGG(res_id ORDER BY res_id ASC)
                FROM data_merge_record
                WHERE model_id = %s
                GROUP BY group_id""", [dm_model.id])
            done_groups_res_ids = [set(x[0]) for x in self._cr.fetchall()]

            _logger.info('Query identification done after %s', timeit.default_timer() - t1)
            t1 = timeit.default_timer()
            if ast.literal_eval(self.env['ir.config_parameter'].get_param('data_merge.merge_lists', 'True')):
                merge_list = merge_common_lists
            else:
                merge_list = lambda x: x
            groups_to_create = [set(r) for r in merge_list(ids) if len(r) > 1]
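            # Illustrative example: with merging enabled, overlapping matches
            # such as [[1, 2], [2, 3]] collapse into a single group {1, 2, 3};
            # with it disabled, they would remain two separate suggestions.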
            _logger.info('Merging lists done after %s', timeit.default_timer() - t1)
            t1 = timeit.default_timer()
            _logger.info('Record creation started at %s', t1)
            groups_created = 0
            groups_to_create_count = len(groups_to_create)
            for group_to_create in groups_to_create:
                groups_created += 1
                if groups_created % 100 == 0:
                    _logger.info('Created groups %s / %s', groups_created, groups_to_create_count)

                # Check if the IDs of the group to create are already part of an existing group
                # e.g.
                # The group with records A B C already exists:
                # 1/ If group_to_create equals A B, do not create a new group
                # 2/ If group_to_create equals A D, create the new group (A D is not a subset of A B C)
                if any(group_to_create <= x for x in done_groups_res_ids):
                    continue

                group = self.env['data_merge.group'].with_context(prefetch_fields=False).create({'model_id': dm_model.id})
                record_values = [{'group_id': group.id, 'res_id': rec} for rec in group_to_create]
                self.env['data_merge.record'].with_context(prefetch_fields=False).create(record_values)

                if groups_created % 1000 == 0 and batch_commits:
                    self.env.cr.commit()

                group._elect_master_record()

                # Discard the group if it falls below the suggestion threshold
                if dm_model.create_threshold > 0 and group.similarity * 100 <= dm_model.create_threshold:
                    group.unlink()
                    continue

                # In automatic mode, merge right away when similar enough
                if dm_model.merge_mode == 'automatic':
                    if group.similarity * 100 >= dm_model.merge_threshold:
                        group.merge_records()
                        group.unlink()

            _logger.info('Record creation done after %s', timeit.default_timer() - t1)

    ##############
    ### Overrides
    ##############
    @api.constrains('res_model_id')
    def _check_prevent_merge(self):
        models = set(self.res_model_id.mapped('model'))
        for model_name in models:
            if model_name and hasattr(self.env[model_name], '_prevent_merge') and self.env[model_name]._prevent_merge:
                raise ValidationError(_('Deduplication is forbidden on the model: %s', model_name))

    def copy(self, default=None):
        self.ensure_one()
        default = default or {}
        if not default.get('name'):
            default['name'] = _('%s (copy)', self.name)
        return super().copy(default)

    def write(self, vals):
        if 'active' in vals and not vals['active']:
            # Deactivating a model drops all of its pending duplicate groups
            self.env['data_merge.group'].search([('model_id', 'in', self.ids)]).unlink()

        if 'create_threshold' in vals and vals['create_threshold']:
            # Drop existing groups that fall below the new suggestion threshold
            self.env['data_merge.group'].search([('model_id', 'in', self.ids), ('similarity', '<=', vals['create_threshold'] / 100)]).unlink()

        return super().write(vals)

    def unlink(self):
        if self.ids:
            self.env["mail.message"].search(
                [("model", "=", "data_merge.model"), ("res_id", "in", self.ids)]
            ).sudo().unlink()
        return super().unlink()

    #############
    ### Actions
    #############
    def open_records(self):
        self.ensure_one()

        action = self.env["ir.actions.actions"]._for_xml_id("data_merge.action_data_merge_record")
        action['context'] = dict(ast.literal_eval(action.get('context')), searchpanel_default_model_id=self.id)
        return action

    def action_find_duplicates(self):
        self.sudo().find_duplicates()
        return self.open_records()