Django_Basic_Manufacturing_3/venv/Lib/site-packages/modeltranslation/translator.py
2025-08-22 17:05:22 +07:00

751 lines
29 KiB
Python

from __future__ import annotations
from textwrap import dedent
from copy import deepcopy
from functools import partial
from typing import Any, Callable, ClassVar, cast
from collections.abc import Collection, Iterable, Sequence
from django.core.exceptions import ImproperlyConfigured
from django.db.models import (
Field,
ForeignKey,
Manager,
ManyToManyField,
Model,
OneToOneField,
options,
)
from django.db.models import UniqueConstraint
from django.db.models.base import ModelBase
from django.db.models.signals import post_init
from django.utils.functional import cached_property
from modeltranslation.utils import get_translation_fields
from modeltranslation import settings as mt_settings
from modeltranslation.fields import (
NONE,
TranslatedManyToManyDescriptor,
TranslatedRelationIdDescriptor,
TranslationFieldDescriptor,
TranslationField,
create_translation_field,
)
from modeltranslation.manager import (
MultilingualManager,
MultilingualQuerysetManager,
rewrite_lookup_key,
)
from modeltranslation.thread_context import auto_populate_mode
from modeltranslation.utils import (
build_localized_fieldname,
parse_field,
get_language,
)
# Re-export the decorator for convenience
from modeltranslation.decorators import register
from ._compat import is_hidden, build_refresh_from_db
from ._typing import _ListOrTuple
__all__ = [
"AlreadyRegistered",
"DescendantRegistered",
"NotRegistered",
"TranslationOptions",
"Translator",
"register",
"translator",
]
class AlreadyRegistered(Exception):
pass
class NotRegistered(Exception):
pass
class DescendantRegistered(Exception):
pass
class FieldsAggregationMetaClass(type):
"""
Metaclass to handle custom inheritance of fields between classes.
"""
fields: Sequence[str]
def __new__(cls, name: str, bases: tuple[type, ...], attrs: dict[str, Any]) -> type:
fields = attrs.get("fields", ())
if isinstance(fields, str):
raise ImproperlyConfigured(
dedent(f"""`fields` property should be list or tuple, received `str`.
Hint: Replace with:
fields = ("{fields}", )
""")
)
else:
attrs["fields"] = set(fields)
for base in bases:
if isinstance(base, FieldsAggregationMetaClass):
attrs["fields"].update(base.fields)
attrs["fields"] = tuple(attrs["fields"])
return super().__new__(cls, name, bases, attrs)
class TranslationOptions(metaclass=FieldsAggregationMetaClass):
"""
Translatable fields are declared by registering a model using
``TranslationOptions`` class with appropriate ``fields`` attribute.
Model-specific fallback values and languages can also be given as class
attributes.
Options instances hold info about translatable fields for a model and its
superclasses. The ``local_fields`` and ``all_fields`` attributes are mappings
from fields to sets of their translation fields; ``local_fields`` contains
only those fields that are handled in the model's database table (those
inherited from abstract superclasses, unless there is a concrete superclass
in between in the inheritance chain), while ``all_fields`` also includes fields
inherited from concrete supermodels (giving all translated fields available
on a model).
``related`` attribute inform whether this model is related part of some relation
with translated model. This model may be not translated itself.
``related_fields`` contains names of reverse lookup fields.
"""
required_languages: ClassVar[_ListOrTuple[str] | dict[str, _ListOrTuple[str]]] = (
mt_settings.REQUIRED_LANGUAGES
)
def __init__(self, model: type[Model]) -> None:
"""
Create fields dicts without any translation fields.
"""
self.model = model
self.registered = False
self.related = False
self.local_fields: dict[str, set[TranslationField]] = {f: set() for f in self.fields}
self.all_fields: dict[str, set[TranslationField]] = {f: set() for f in self.fields}
self.related_fields: list[str] = []
def validate(self) -> None:
"""
Perform options validation.
"""
# TODO: at the moment only required_languages is validated.
# Maybe check other options as well?
if value := self.required_languages:
if isinstance(value, (tuple, list)):
self._check_languages(value)
else:
self._check_languages(value.keys(), extra=("default",))
for fieldnames in value.values():
if any(f not in self.all_fields for f in fieldnames):
raise ImproperlyConfigured(
"Fieldname in required_languages which is not in fields option."
)
def _check_languages(
self,
languages: Collection[str],
extra: tuple[str, ...] = (),
) -> None:
correct = list(mt_settings.AVAILABLE_LANGUAGES) + list(extra)
if any(lang not in correct for lang in languages):
raise ImproperlyConfigured(
"Language in required_languages which is not in AVAILABLE_LANGUAGES."
)
def update(self, other: TranslationOptions):
"""
Update with options from a superclass.
"""
if other.model._meta.abstract:
self.local_fields.update(other.local_fields)
self.all_fields.update(other.all_fields)
def add_translation_field(self, field: str, translation_field):
"""
Add a new translation field to both fields dicts.
"""
self.local_fields[field].add(translation_field)
self.all_fields[field].add(translation_field)
def get_field_names(self) -> list[str]:
"""
Return name of all fields that can be used in filtering.
"""
return list(self.all_fields.keys()) + self.related_fields
def __str__(self) -> str:
local = tuple(self.local_fields.keys())
inherited = tuple(set(self.all_fields.keys()) - set(local))
return "%s: %s + %s" % (self.__class__.__name__, local, inherited)
class MultilingualOptions(options.Options):
@cached_property
def base_manager(self):
manager = super().base_manager
patch_manager_class(manager)
return manager
def add_translation_fields(model: type[Model], opts: TranslationOptions) -> None:
"""
Monkey patches the original model class to provide additional fields for
every language.
Adds newly created translation fields to the given translation options.
"""
model_empty_values = getattr(opts, "empty_values", NONE)
for field_name in opts.local_fields.keys():
field_empty_value = parse_field(model_empty_values, field_name, NONE)
for lang in mt_settings.AVAILABLE_LANGUAGES:
# Create a dynamic translation field
translation_field = create_translation_field(
model=model, field_name=field_name, lang=lang, empty_value=field_empty_value
)
# Construct the name for the localized field
localized_field_name = build_localized_fieldname(field_name, lang)
# Check if the model already has a field by that name
if hasattr(model, localized_field_name):
# Check if are not dealing with abstract field inherited.
for cls in model.__mro__:
if hasattr(cls, "_meta") and cls.__dict__.get(localized_field_name, None):
cls_opts = translator._get_options_for_model(cls)
if not cls._meta.abstract or field_name not in cls_opts.local_fields:
raise ValueError(
"Error adding translation field. Model '%s' already"
" contains a field named '%s'."
% (model._meta.object_name, localized_field_name)
)
# This approach implements the translation fields as full valid
# django model fields and therefore adds them via add_to_class
model.add_to_class(localized_field_name, translation_field)
opts.add_translation_field(field_name, translation_field)
# Rebuild information about parents fields. If there are opts.local_fields, field cache would be
# invalidated (by model._meta.add_field() function). Otherwise, we need to do it manually.
if len(opts.local_fields) == 0:
model._meta._expire_cache()
model._meta.get_fields()
def patch_manager_class(manager):
if isinstance(manager, MultilingualManager):
return
if manager.__class__ is Manager:
manager.__class__ = MultilingualManager
else:
class NewMultilingualManager(
MultilingualManager, manager.__class__, MultilingualQuerysetManager
):
_old_module = manager.__module__
_old_class = manager.__class__.__name__
def deconstruct(self):
return (
False, # as_manager
"%s.%s" % (self._old_module, self._old_class), # manager_class
None, # qs_class
self._constructor_args[0], # args
self._constructor_args[1], # kwargs
)
def __hash__(self):
return id(self)
def __eq__(self, other):
if isinstance(other, NewMultilingualManager):
return (
self._old_module == other._old_module
and self._old_class == other._old_class
)
if hasattr(other, "__module__") and hasattr(other, "__class__"):
return (
self._old_module == other.__module__
and self._old_class == other.__class__.__name__
)
return False
manager.__class__ = NewMultilingualManager
def add_manager(model: type[Model]) -> None:
"""
Monkey patches the original model to use MultilingualManager instead of
default managers (not only ``objects``, but also every manager defined and inherited).
Custom managers are merged with MultilingualManager.
"""
if model._meta.abstract:
return
# Make all managers local for this model to fix patching parent model managers
added = set(model._meta.managers) - set(model._meta.local_managers)
model._meta.local_managers = model._meta.managers # type: ignore[assignment]
for current_manager in model._meta.local_managers:
prev_class = current_manager.__class__
patch_manager_class(current_manager)
if current_manager in added:
# Since default_manager is fetched by order of creation, any manager
# moved from parent class to child class needs to receive a new creation_counter
# in order to be ordered after the original local managers
current_manager._set_creation_counter()
if model._default_manager.__class__ is prev_class:
# Normally model._default_manager is a reference to one of model's managers
# (and would be patched by the way).
# However, in some rare situations (mostly proxy models)
# model._default_manager is not the same instance as one of managers, but it
# share the same class.
model._default_manager.__class__ = current_manager.__class__
model._meta.__class__ = MultilingualOptions
model._meta._expire_cache()
def patch_constructor(model: type[Model]) -> None:
"""
Monkey patches the original model to rewrite fields names in __init__
"""
old_init = model.__init__
def new_init(self, *args, **kwargs):
self._mt_init = True
populate_translation_fields(self.__class__, kwargs)
for key, val in list(kwargs.items()):
new_key = rewrite_lookup_key(model, key)
# Old key is intentionally left in case old_init wants to play with it
kwargs.setdefault(new_key, val)
old_init(self, *args, **kwargs)
model.__init__ = new_init
def patch_constraints(model: type[Model], opts: TranslationOptions) -> None:
def add_unique_together():
for constraint in model._meta.unique_together:
for field_name in opts.fields:
if field_name in constraint:
for translated_name in get_translation_fields(field_name):
new_constraint = list(constraint)
new_constraint[constraint.index(field_name)] = translated_name
yield new_constraint
def add_constraints():
for c in model._meta.constraints:
if isinstance(c, UniqueConstraint):
for field_name in opts.fields:
if field_name in c.fields:
for translated_name in get_translation_fields(field_name):
new_constraint = deepcopy(c)
new_fields = list(new_constraint.fields)
new_fields[new_fields.index(field_name)] = translated_name
new_constraint.name += f"-{translated_name}"
new_constraint.fields = new_fields
yield new_constraint
model._meta.unique_together += tuple(add_unique_together()) # type: ignore[operator]
model._meta.constraints += tuple(add_constraints())
def delete_mt_init(sender: type[Model], instance: Model, **kwargs: Any) -> None:
if hasattr(instance, "_mt_init"):
del instance._mt_init
def patch_clean_fields(model: type[Model]):
"""
Patch clean_fields method to handle different form types submission.
"""
old_clean_fields = model.clean_fields
def new_clean_fields(self, exclude: Collection[str] | None = None) -> None:
if hasattr(self, "_mt_form_pending_clear") and exclude is not None:
# Some form translation fields has been marked as clearing value.
# Check if corresponding translated field was also saved (not excluded):
# - if yes, it seems like form for MT-unaware app. Ignore clearing (left value from
# translated field unchanged), as if field was omitted from form
# - if no, then proceed as normally: clear the field
for field_name, value in self._mt_form_pending_clear.items():
field = self._meta.get_field(field_name)
orig_field_name = field.translated_field.name
if orig_field_name in exclude:
field.save_form_data(self, value, check=False)
delattr(self, "_mt_form_pending_clear")
try:
setattr(self, "_mt_disable", True)
old_clean_fields(self, exclude)
finally:
setattr(self, "_mt_disable", False)
model.clean_fields = new_clean_fields
def patch_get_deferred_fields(model: type[Model]) -> None:
"""
Django >= 1.8: patch detecting deferred fields. Crucial for only/defer to work.
"""
if not hasattr(model, "get_deferred_fields"):
return
old_get_deferred_fields = model.get_deferred_fields
def new_get_deferred_fields(self) -> set[str]:
sup = old_get_deferred_fields(self)
if hasattr(self, "_fields_were_deferred"):
sup.update(self._fields_were_deferred)
return sup
model.get_deferred_fields = new_get_deferred_fields
def patch_refresh_from_db(model: type[Model]) -> None:
"""
Django >= 1.10: patch refreshing deferred fields. Crucial for only/defer to work.
"""
if not hasattr(model, "refresh_from_db"):
return
model.refresh_from_db = build_refresh_from_db(model.refresh_from_db)
def delete_cache_fields(model: type[Model]) -> None:
opts = model._meta
cached_attrs = (
"_field_cache",
"_field_name_cache",
"_name_map",
"fields",
"concrete_fields",
"local_concrete_fields",
)
for attr in cached_attrs:
try:
delattr(opts, attr)
except AttributeError:
pass
if hasattr(model._meta, "_expire_cache"):
model._meta._expire_cache()
def populate_translation_fields(sender: type[Model], kwargs: Any):
"""
When models are created or loaded from fixtures, replicates values
provided for translatable fields to some / all empty translation fields,
according to the current population mode.
Population is performed only on keys (field names) present in kwargs.
Nothing is returned, but passed kwargs dictionary is altered.
With ``mode`` set to:
-- ``all``: fills all translation fields, skipping just those for
which a translated value is also provided;
-- ``default``: fills only the default translation (unless it is
additionally provided);
-- ``required``: like ``default``, but only if the original field is
non-nullable;
At least the ``required`` mode should be used when loading untranslated
fixtures to keep the database consistent (note that Django management
commands are normally forced to run with hardcoded ``en-us`` language
active). The ``default`` mode is useful if you need to ensure fallback
values are available, and ``all`` if you need to have all translations
defined (for example to make lookups / filtering without resorting to
query fallbacks).
"""
populate = auto_populate_mode()
if not populate:
return
if populate is True:
# What was meant by ``True`` is now called ``all``.
populate = "all"
opts = translator.get_options_for_model(sender)
for key, val in list(kwargs.items()):
if key in opts.all_fields:
if populate == "all":
# Set the value for every language.
for translation_field in opts.all_fields[key]:
kwargs.setdefault(translation_field.name, val)
elif populate == "default":
default = build_localized_fieldname(key, mt_settings.DEFAULT_LANGUAGE)
kwargs.setdefault(default, val)
elif populate == "required":
default = build_localized_fieldname(key, mt_settings.DEFAULT_LANGUAGE)
if not sender._meta.get_field(key).null: # type: ignore[union-attr]
kwargs.setdefault(default, val)
else:
raise AttributeError("Unknown population mode '%s'." % populate)
def patch_related_object_descriptor_caching(ro_descriptor):
"""
Patch SingleRelatedObjectDescriptor or ReverseSingleRelatedObjectDescriptor to use
language-aware caching.
"""
class NewRelated(ro_descriptor.related.__class__):
def get_cache_name(self) -> str:
"""
Used in django > 2.x
"""
return self.cache_name
@property
def cache_name(self):
"""
Used in django >= 5.1
"""
return build_localized_fieldname(self.get_accessor_name(), get_language())
ro_descriptor.related.__class__ = NewRelated
class Translator:
"""
A Translator object encapsulates an instance of a translator. Models are
registered with the Translator using the register() method.
"""
def __init__(self) -> None:
# All seen models (model class -> ``TranslationOptions`` instance).
self._registry: dict[type[Model], TranslationOptions] = {}
# List of funcs to execute after all imports are done.
self._lazy_operations: list[Callable[..., Any]] = []
def register(
self,
model_or_iterable: type[Model] | Iterable[type[Model]],
opts_class: type[TranslationOptions] | None = None,
**options: Any,
) -> None:
"""
Registers the given model(s) with the given translation options.
The model(s) should be Model classes, not instances.
Fields declared for translation on a base class are inherited by
subclasses. If the model or one of its subclasses is already
registered for translation, this will raise an exception.
"""
if isinstance(model_or_iterable, ModelBase):
model_or_iterable = [model_or_iterable]
for model in model_or_iterable:
# Ensure that a base is not registered after a subclass (_registry
# is closed with respect to taking bases, so we can just check if
# we've seen the model).
if model in self._registry:
if self._registry[model].registered:
raise AlreadyRegistered(
'Model "%s" is already registered for translation' % model.__name__
)
else:
descendants = [
d.__name__
for d in self._registry.keys()
if issubclass(d, model) and d != model
]
if descendants:
raise DescendantRegistered(
'Model "%s" cannot be registered after its subclass'
' "%s"' % (model.__name__, descendants[0])
)
# Find inherited fields and create options instance for the model.
opts = self._get_options_for_model(model, opts_class, **options)
# If an exception is raised during registration, mark model as not-registered
try:
self._register_single_model(model, opts)
except Exception:
self._registry[model].registered = False
raise
def _register_single_model(self, model: type[Model], opts: TranslationOptions) -> None:
# Now, when all fields are initialized and inherited, validate configuration.
opts.validate()
# Mark the object explicitly as registered -- registry caches
# options of all models, registered or not.
opts.registered = True
# Add translation fields to the model.
if model._meta.proxy:
delete_cache_fields(model)
else:
add_translation_fields(model, opts)
# Delete all fields cache for related model (parent and children)
related = (
f
for f in model._meta.get_fields()
if (f.one_to_many or f.one_to_one) and f.auto_created
)
for related_obj in related:
delete_cache_fields(related_obj.model)
# Set MultilingualManager
add_manager(model)
# Patch __init__ to rewrite fields
patch_constructor(model)
# Patch constraints to correctly handle new fields
patch_constraints(model, opts)
# Connect signal for model
post_init.connect(delete_mt_init, sender=model)
# Patch clean_fields to verify form field clearing
patch_clean_fields(model)
# Patch __metaclass__ and other methods to allow deferring to work
patch_get_deferred_fields(model)
patch_refresh_from_db(model)
# Substitute original field with descriptor
model_fallback_languages = getattr(opts, "fallback_languages", None)
model_fallback_values = getattr(opts, "fallback_values", NONE)
model_fallback_undefined = getattr(opts, "fallback_undefined", NONE)
for field_name in opts.local_fields.keys():
field = cast(Field, model._meta.get_field(field_name))
field_fallback_value = parse_field(model_fallback_values, field_name, NONE)
field_fallback_undefined = parse_field(model_fallback_undefined, field_name, NONE)
descriptor = TranslationFieldDescriptor(
field,
fallback_languages=model_fallback_languages,
fallback_value=field_fallback_value,
fallback_undefined=field_fallback_undefined,
)
setattr(model, field_name, descriptor)
if isinstance(field, (ForeignKey, ManyToManyField)):
# We need to use a special descriptor so that
# _id fields on translated ForeignKeys work
# as expected.
desc_class = (
TranslatedManyToManyDescriptor
if isinstance(field, ManyToManyField)
else TranslatedRelationIdDescriptor
)
desc = desc_class(field_name, model_fallback_languages)
setattr(model, field.get_attname(), desc)
# Set related field names on other model
if not is_hidden(field.remote_field):
other_opts = self._get_options_for_model(field.remote_field.model)
other_opts.related = True
other_opts.related_fields.append(field.related_query_name())
# Add manager in case of non-registered model
add_manager(field.remote_field.model)
if isinstance(field, OneToOneField):
# Fix translated_field caching for SingleRelatedObjectDescriptor
sro_descriptor = getattr(
field.remote_field.model,
field.remote_field.get_accessor_name(),
)
patch_related_object_descriptor_caching(sro_descriptor)
def unregister(self, model_or_iterable: type[Model] | Iterable[type[Model]]) -> None:
"""
Unregisters the given model(s).
If a model isn't registered, this will raise NotRegistered. If one of
its subclasses is registered, DescendantRegistered will be raised.
"""
if isinstance(model_or_iterable, ModelBase):
model_or_iterable = [model_or_iterable]
for model in model_or_iterable:
# Check if the model is actually registered (``get_options_for_model``
# throws an exception if it's not).
self.get_options_for_model(model)
# Invalidate all submodels options and forget about
# the model itself.
for desc, desc_opts in list(self._registry.items()):
if not issubclass(desc, model):
continue
if model != desc and desc_opts.registered:
# Allowing to unregister a base would necessitate
# repatching all submodels.
raise DescendantRegistered(
'You need to unregister descendant "%s" before'
' unregistering its base "%s"' % (desc.__name__, model.__name__)
)
del self._registry[desc]
def get_registered_models(self, abstract: bool = True) -> list[type[Model]]:
"""
Returns a list of all registered models, or just concrete
registered models.
"""
return [
model
for (model, opts) in self._registry.items()
if opts.registered and (not model._meta.abstract or abstract)
]
def _get_options_for_model(
self, model: type[Model], opts_class: type[TranslationOptions] | None = None, **options: Any
) -> TranslationOptions:
"""
Returns an instance of translation options with translated fields
defined for the ``model`` and inherited from superclasses.
"""
if model not in self._registry:
# Create a new type for backwards compatibility.
opts = type(
"%sTranslationOptions" % model.__name__,
(opts_class or TranslationOptions,),
options,
)(model)
# Fields for translation may be inherited from abstract
# superclasses, so we need to look at all parents.
for base in model.__bases__:
if not hasattr(base, "_meta"):
# Things without _meta aren't functional models, so they're
# uninteresting parents.
continue
opts.update(self._get_options_for_model(base))
# Cache options for all models -- we may want to compute options
# of registered subclasses of unregistered models.
self._registry[model] = opts
return self._registry[model]
def get_options_for_model(self, model: type[Model]) -> TranslationOptions:
"""
Thin wrapper around ``_get_options_for_model`` to preserve the
semantic of throwing exception for models not directly registered.
"""
opts = self._get_options_for_model(model)
if not opts.registered and not opts.related:
raise NotRegistered('The model "%s" is not registered for translation' % model.__name__)
return opts
def execute_lazy_operations(self) -> None:
while self._lazy_operations:
self._lazy_operations.pop(0)(translator=self)
def lazy_operation(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
self._lazy_operations.append(partial(func, *args, **kwargs))
# This global object represents the singleton translator object
translator = Translator()