152 lines
7.0 KiB
Python
152 lines
7.0 KiB
Python
# -*- coding: utf-8 -*-
|
|
from odoo import api, fields, models, _
|
|
from odoo.exceptions import UserError, AccessError
|
|
|
|
class AccountJournal(models.Model):
|
|
_inherit = 'account.journal'
|
|
|
|
@api.model
|
|
def _search(self, domain, offset=0, limit=None, order=None, **kwargs):
|
|
"""
|
|
Override _search to filter journals based on the allowed_journal_ids configuration on res.users.
|
|
This provides a secure, recursive-safe, and context-aware filtering mechanism.
|
|
"""
|
|
user = self.env.user
|
|
|
|
# Determine if we should bypass the journal visibility restrictions:
|
|
# 1. Superuser / system context (env.su) is always bypassed.
|
|
# 2. Skip if user has no allowed journals configured (empty means access to all).
|
|
# 3. Explicit bypass requested in the context (bypass_allowed_journal).
|
|
# 4. Point of Sale contexts:
|
|
# - pos_session_id or pos_config_id is present.
|
|
# - pos_last_server_date is present (POS frontend loading data).
|
|
# 5. During standard module installations/upgrades (install_mode).
|
|
if not self.env.su and user.sudo().allowed_journal_ids:
|
|
bypass = (
|
|
self.env.context.get('bypass_allowed_journal') or
|
|
self.env.context.get('pos_session_id') or
|
|
self.env.context.get('pos_config_id') or
|
|
'pos_last_server_date' in self.env.context or
|
|
self.env.context.get('install_mode')
|
|
)
|
|
if not bypass:
|
|
allowed_ids = list(user.sudo().allowed_journal_ids.ids)
|
|
|
|
# When Odoo's fetch() verifies a record it just loaded, it re-runs
|
|
# _search with [('id', 'in', [X])]. The record was already cleared
|
|
# by _check_access/check_access_rule, so we must honour the explicit
|
|
# IDs present in the domain and add them to the allow-list so they
|
|
# survive the sudo search.
|
|
for leaf in domain:
|
|
if (
|
|
isinstance(leaf, (list, tuple))
|
|
and len(leaf) == 3
|
|
and leaf[0] == 'id'
|
|
and leaf[1] in ('=', 'in')
|
|
):
|
|
value = leaf[2]
|
|
if isinstance(value, (list, tuple, set)):
|
|
allowed_ids.extend(value)
|
|
elif isinstance(value, int):
|
|
allowed_ids.append(value)
|
|
|
|
domain = [('id', 'in', allowed_ids)] + list(domain)
|
|
# Run the search as sudo to bypass standard multi-company rules
|
|
return self.sudo()._search(domain, offset=offset, limit=limit, order=order, **kwargs)
|
|
|
|
return super(AccountJournal, self)._search(domain, offset=offset, limit=limit, order=order, **kwargs)
|
|
|
|
def _check_access(self, operation):
|
|
"""
|
|
Overridden to bypass multi-company record rules for allowed journals,
|
|
and enforce the allowed journals restriction for write, create, and delete operations.
|
|
"""
|
|
if self.env.su:
|
|
return super(AccountJournal, self)._check_access(operation)
|
|
|
|
user = self.env.user
|
|
allowed_journals = user.sudo().allowed_journal_ids
|
|
if allowed_journals:
|
|
# Enforce restriction for write, create, and delete operations.
|
|
if operation in ('write', 'create', 'unlink'):
|
|
forbidden = self.filtered(lambda j: j.id not in allowed_journals.ids)
|
|
if forbidden:
|
|
import functools
|
|
return forbidden, functools.partial(AccessError, _("You do not have access to this journal."))
|
|
|
|
# Read operations are allowed to prevent access errors when loading
|
|
# documents (payments/moves) referencing other journals.
|
|
if operation == 'read':
|
|
return None
|
|
|
|
# If all records in self are allowed, bypass standard record rules.
|
|
if all(j.id in allowed_journals.ids for j in self):
|
|
return None
|
|
|
|
return super(AccountJournal, self)._check_access(operation)
|
|
|
|
def check_access_rule(self, operation):
|
|
"""
|
|
Overridden to bypass multi-company record rules for allowed journals,
|
|
and enforce the allowed journals restriction for write, create, and delete operations.
|
|
"""
|
|
if self.env.su:
|
|
return super(AccountJournal, self).check_access_rule(operation)
|
|
|
|
user = self.env.user
|
|
allowed_journals = user.sudo().allowed_journal_ids
|
|
if allowed_journals:
|
|
# Enforce restriction for write, create, and delete operations.
|
|
# Read operations are allowed to prevent access errors when loading
|
|
# documents (payments/moves) referencing other journals.
|
|
if operation in ('write', 'create', 'unlink'):
|
|
if not all(j.id in allowed_journals.ids for j in self):
|
|
raise AccessError(_("You do not have access to this journal."))
|
|
|
|
if operation == 'read':
|
|
return
|
|
|
|
# If all records in self are allowed, bypass standard record rules.
|
|
if all(j.id in allowed_journals.ids for j in self):
|
|
return
|
|
|
|
return super(AccountJournal, self).check_access_rule(operation)
|
|
|
|
def write(self, vals):
|
|
"""
|
|
Restrict write access to allowed journals only.
|
|
"""
|
|
if not self.env.su:
|
|
user = self.env.user
|
|
if user.sudo().allowed_journal_ids:
|
|
bypass = (
|
|
self.env.context.get('bypass_allowed_journal') or
|
|
self.env.context.get('pos_session_id') or
|
|
self.env.context.get('pos_config_id')
|
|
)
|
|
if not bypass:
|
|
allowed_ids = user.sudo().allowed_journal_ids.ids
|
|
unallowed = self.filtered(lambda j: j.id not in allowed_ids)
|
|
if unallowed:
|
|
raise UserError(_("You are not allowed to modify the following journal(s): %s") % ", ".join(unallowed.mapped('name')))
|
|
return super(AccountJournal, self).write(vals)
|
|
|
|
def unlink(self):
|
|
"""
|
|
Restrict delete access to allowed journals only.
|
|
"""
|
|
if not self.env.su:
|
|
user = self.env.user
|
|
if user.sudo().allowed_journal_ids:
|
|
bypass = (
|
|
self.env.context.get('bypass_allowed_journal') or
|
|
self.env.context.get('pos_session_id') or
|
|
self.env.context.get('pos_config_id')
|
|
)
|
|
if not bypass:
|
|
allowed_ids = user.sudo().allowed_journal_ids.ids
|
|
unallowed = self.filtered(lambda j: j.id not in allowed_ids)
|
|
if unallowed:
|
|
raise UserError(_("You are not allowed to delete the following journal(s): %s") % ", ".join(unallowed.mapped('name')))
|
|
return super(AccountJournal, self).unlink()
|