# -*- coding: utf-8 -*- from odoo import api, fields, models, _ from odoo.exceptions import UserError, AccessError class AccountJournal(models.Model): _inherit = 'account.journal' @api.model def _search(self, domain, offset=0, limit=None, order=None, **kwargs): """ Override _search to filter journals based on the allowed_journal_ids configuration on res.users. This provides a secure, recursive-safe, and context-aware filtering mechanism. """ user = self.env.user # Determine if we should bypass the journal visibility restrictions: # 1. Superuser / system context (env.su) is always bypassed. # 2. Skip if user has no allowed journals configured (empty means access to all). # 3. Explicit bypass requested in the context (bypass_allowed_journal). # 4. Point of Sale contexts: # - pos_session_id or pos_config_id is present. # - pos_last_server_date is present (POS frontend loading data). # 5. During standard module installations/upgrades (install_mode). if not self.env.su and user.sudo().allowed_journal_ids: bypass = ( self.env.context.get('bypass_allowed_journal') or self.env.context.get('pos_session_id') or self.env.context.get('pos_config_id') or 'pos_last_server_date' in self.env.context or self.env.context.get('install_mode') ) if not bypass: allowed_ids = list(user.sudo().allowed_journal_ids.ids) # When Odoo's fetch() verifies a record it just loaded, it re-runs # _search with [('id', 'in', [X])]. The record was already cleared # by _check_access/check_access_rule, so we must honour the explicit # IDs present in the domain and add them to the allow-list so they # survive the sudo search. for leaf in domain: if ( isinstance(leaf, (list, tuple)) and len(leaf) == 3 and leaf[0] == 'id' and leaf[1] in ('=', 'in') ): value = leaf[2] if isinstance(value, (list, tuple, set)): allowed_ids.extend(value) elif isinstance(value, int): allowed_ids.append(value) domain = [('id', 'in', allowed_ids)] + list(domain) # Run the search as sudo to bypass standard multi-company rules return self.sudo()._search(domain, offset=offset, limit=limit, order=order, **kwargs) return super(AccountJournal, self)._search(domain, offset=offset, limit=limit, order=order, **kwargs) def _check_access(self, operation): """ Overridden to bypass multi-company record rules for allowed journals, and enforce the allowed journals restriction for write, create, and delete operations. """ if self.env.su: return super(AccountJournal, self)._check_access(operation) user = self.env.user allowed_journals = user.sudo().allowed_journal_ids if allowed_journals: # Enforce restriction for write, create, and delete operations. if operation in ('write', 'create', 'unlink'): forbidden = self.filtered(lambda j: j.id not in allowed_journals.ids) if forbidden: import functools return forbidden, functools.partial(AccessError, _("You do not have access to this journal.")) # Read operations are allowed to prevent access errors when loading # documents (payments/moves) referencing other journals. if operation == 'read': return None # If all records in self are allowed, bypass standard record rules. if all(j.id in allowed_journals.ids for j in self): return None return super(AccountJournal, self)._check_access(operation) def check_access_rule(self, operation): """ Overridden to bypass multi-company record rules for allowed journals, and enforce the allowed journals restriction for write, create, and delete operations. """ if self.env.su: return super(AccountJournal, self).check_access_rule(operation) user = self.env.user allowed_journals = user.sudo().allowed_journal_ids if allowed_journals: # Enforce restriction for write, create, and delete operations. # Read operations are allowed to prevent access errors when loading # documents (payments/moves) referencing other journals. if operation in ('write', 'create', 'unlink'): if not all(j.id in allowed_journals.ids for j in self): raise AccessError(_("You do not have access to this journal.")) if operation == 'read': return # If all records in self are allowed, bypass standard record rules. if all(j.id in allowed_journals.ids for j in self): return return super(AccountJournal, self).check_access_rule(operation) def write(self, vals): """ Restrict write access to allowed journals only. """ if not self.env.su: user = self.env.user if user.sudo().allowed_journal_ids: bypass = ( self.env.context.get('bypass_allowed_journal') or self.env.context.get('pos_session_id') or self.env.context.get('pos_config_id') ) if not bypass: allowed_ids = user.sudo().allowed_journal_ids.ids unallowed = self.filtered(lambda j: j.id not in allowed_ids) if unallowed: raise UserError(_("You are not allowed to modify the following journal(s): %s") % ", ".join(unallowed.mapped('name'))) return super(AccountJournal, self).write(vals) def unlink(self): """ Restrict delete access to allowed journals only. """ if not self.env.su: user = self.env.user if user.sudo().allowed_journal_ids: bypass = ( self.env.context.get('bypass_allowed_journal') or self.env.context.get('pos_session_id') or self.env.context.get('pos_config_id') ) if not bypass: allowed_ids = user.sudo().allowed_journal_ids.ids unallowed = self.filtered(lambda j: j.id not in allowed_ids) if unallowed: raise UserError(_("You are not allowed to delete the following journal(s): %s") % ", ".join(unallowed.mapped('name'))) return super(AccountJournal, self).unlink()