account_allowed_journal/models/account_journal.py

101 lines
4.7 KiB
Python

# -*- coding: utf-8 -*-
from odoo import api, fields, models, _
from odoo.exceptions import UserError, AccessError
class AccountJournal(models.Model):
_inherit = 'account.journal'
@api.model
def _search(self, domain, offset=0, limit=None, order=None, **kwargs):
"""
Override _search to filter journals based on the allowed_journal_ids configuration on res.users.
This provides a secure, recursive-safe, and context-aware filtering mechanism.
"""
user = self.env.user
# Determine if we should bypass the journal visibility restrictions:
# 1. Superuser / system context (env.su) is always bypassed.
# 2. Skip if user has no allowed journals configured (empty means access to all).
# 3. Explicit bypass requested in the context (bypass_allowed_journal).
# 4. Point of Sale contexts:
# - pos_session_id or pos_config_id is present.
# - pos_last_server_date is present (POS frontend loading data).
# 5. During standard module installations/upgrades (install_mode).
if not self.env.su and user.sudo().allowed_journal_ids:
bypass = (
self.env.context.get('bypass_allowed_journal') or
self.env.context.get('pos_session_id') or
self.env.context.get('pos_config_id') or
'pos_last_server_date' in self.env.context or
self.env.context.get('install_mode')
)
if not bypass:
allowed_ids = user.sudo().allowed_journal_ids.ids
domain = [('id', 'in', allowed_ids)] + list(domain)
# Run the search as sudo to bypass standard multi-company rules
return self.sudo()._search(domain, offset=offset, limit=limit, order=order, **kwargs)
return super(AccountJournal, self)._search(domain, offset=offset, limit=limit, order=order, **kwargs)
def check_access_rule(self, operation):
"""
Overridden to bypass multi-company record rules for allowed journals,
and enforce the allowed journals restriction for write, create, and delete operations.
"""
if self.env.su:
return super(AccountJournal, self).check_access_rule(operation)
user = self.env.user
allowed_journals = user.sudo().allowed_journal_ids
if allowed_journals:
# Enforce restriction for write, create, and delete operations.
# Read operations are allowed to prevent access errors when loading
# documents (payments/moves) referencing other journals.
if operation in ('write', 'create', 'unlink'):
if not all(j.id in allowed_journals.ids for j in self):
raise AccessError(_("You do not have access to this journal."))
# If all records in self are allowed, bypass standard record rules.
if all(j.id in allowed_journals.ids for j in self):
return
return super(AccountJournal, self).check_access_rule(operation)
def write(self, vals):
"""
Restrict write access to allowed journals only.
"""
if not self.env.su:
user = self.env.user
if user.sudo().allowed_journal_ids:
bypass = (
self.env.context.get('bypass_allowed_journal') or
self.env.context.get('pos_session_id') or
self.env.context.get('pos_config_id')
)
if not bypass:
allowed_ids = user.sudo().allowed_journal_ids.ids
unallowed = self.filtered(lambda j: j.id not in allowed_ids)
if unallowed:
raise UserError(_("You are not allowed to modify the following journal(s): %s") % ", ".join(unallowed.mapped('name')))
return super(AccountJournal, self).write(vals)
def unlink(self):
"""
Restrict delete access to allowed journals only.
"""
if not self.env.su:
user = self.env.user
if user.sudo().allowed_journal_ids:
bypass = (
self.env.context.get('bypass_allowed_journal') or
self.env.context.get('pos_session_id') or
self.env.context.get('pos_config_id')
)
if not bypass:
allowed_ids = user.sudo().allowed_journal_ids.ids
unallowed = self.filtered(lambda j: j.id not in allowed_ids)
if unallowed:
raise UserError(_("You are not allowed to delete the following journal(s): %s") % ", ".join(unallowed.mapped('name')))
return super(AccountJournal, self).unlink()