Django_Basic_Manufacturing_3/venv/Lib/site-packages/axes/handlers/proxy.py
2025-08-22 17:05:22 +07:00

144 lines
5.0 KiB
Python

# pylint: disable=arguments-differ
# pylint reports false positives for the proxy class method overrides
from logging import getLogger
from typing import Optional
from django.utils.module_loading import import_string
from django.utils.timezone import now
from axes.conf import settings
from axes.handlers.base import AxesBaseHandler, AbstractAxesHandler, AxesHandler
from axes.helpers import (
get_client_ip_address,
get_client_user_agent,
get_client_path_info,
get_client_http_accept,
toggleable,
)
log = getLogger(__name__)
class AxesProxyHandler(AbstractAxesHandler, AxesBaseHandler):
    """
    Proxy interface for the configurable Axes signal handler class.

    Every public method here is a ``classmethod`` (or ``staticmethod``) that
    forwards to one lazily created instance of the class named by the
    ``settings.AXES_HANDLER`` dotted path. To customize behaviour, point that
    setting at a class that implements a compatible interface and methods.

    NOTE(review): the default value of ``settings.AXES_HANDLER`` is not
    visible in this module (``settings`` comes from ``axes.conf``); check
    there to see which handler is used when the setting is not overridden.
    """

    # Memoized handler instance shared by the whole class; created on first
    # use by ``get_implementation``.
    implementation = None  # type: AxesHandler

    @classmethod
    def get_implementation(cls, force: bool = False) -> AxesHandler:
        """
        Fetch and initialize the configured handler implementation and memoize it.

        The handler class is imported from ``settings.AXES_HANDLER`` and
        instantiated without arguments. The instance is stored on the class
        so that later calls reuse it. This method can be called multiple
        times, e.g. from the Django application loader.

        :param force: when true, re-import and re-instantiate the handler even
            if an instance has already been memoized.
        :return: the memoized handler instance.
        """
        # Compare against None explicitly so that a handler instance which
        # happens to be falsy (defines __bool__/__len__) is not rebuilt on
        # every call.
        if force or cls.implementation is None:
            cls.implementation = import_string(settings.AXES_HANDLER)()
        return cls.implementation

    @classmethod
    def reset_attempts(
        cls,
        *,
        ip_address: Optional[str] = None,
        username: Optional[str] = None,
        ip_or_username: bool = False,
    ) -> int:
        """
        Forward ``reset_attempts`` to the configured handler.

        All arguments are keyword-only and are passed through unchanged.

        :return: whatever the implementation returns (annotated as ``int``).
        """
        return cls.get_implementation().reset_attempts(
            ip_address=ip_address, username=username, ip_or_username=ip_or_username
        )

    @classmethod
    def reset_logs(cls, *, age_days: Optional[int] = None) -> int:
        """
        Forward ``reset_logs`` to the configured handler.

        :param age_days: passed through unchanged as a keyword argument.
        :return: whatever the implementation returns (annotated as ``int``).
        """
        return cls.get_implementation().reset_logs(age_days=age_days)

    @classmethod
    def reset_failure_logs(cls, *, age_days: Optional[int] = None) -> int:
        """
        Forward ``reset_failure_logs`` to the configured handler.

        :param age_days: passed through unchanged as a keyword argument.
        :return: whatever the implementation returns (annotated as ``int``).
        """
        return cls.get_implementation().reset_failure_logs(age_days=age_days)

    @classmethod
    def remove_out_of_limit_failure_logs(
        cls, *, username: str, limit: Optional[int] = None
    ) -> int:
        """
        Forward ``remove_out_of_limit_failure_logs`` to the configured handler.

        :param username: passed through unchanged as a keyword argument.
        :param limit: optional limit; forwarded only when the caller supplies
            a value, so that with ``limit=None`` the implementation applies
            its own default.
        :return: whatever the implementation returns (annotated as ``int``).
        """
        implementation = cls.get_implementation()
        if limit is None:
            # Leave the keyword out entirely rather than passing None, which
            # would override whatever default the implementation declares.
            return implementation.remove_out_of_limit_failure_logs(
                username=username
            )
        return implementation.remove_out_of_limit_failure_logs(
            username=username, limit=limit
        )

    @staticmethod
    def update_request(request):
        """
        Update request attributes before passing them into the selected handler class.

        If ``request`` is ``None`` an error is logged and nothing else is
        done. Otherwise, unless the request already has an ``axes_updated``
        attribute, the ``axes_*`` attributes below are set on it. The guard
        means repeated calls for the same request object do not overwrite
        values set by the first call.

        :param request: the request object to annotate, or ``None``.
        """
        if request is None:
            log.error(
                "AXES: AxesProxyHandler.update_request can not set request attributes to a None request"
            )
            return

        if not hasattr(request, "axes_updated"):
            # Keep an existing lockout flag if something set it before us.
            if not hasattr(request, "axes_locked_out"):
                request.axes_locked_out = False

            request.axes_attempt_time = now()
            request.axes_ip_address = get_client_ip_address(request)
            request.axes_user_agent = get_client_user_agent(request)
            request.axes_path_info = get_client_path_info(request)
            request.axes_http_accept = get_client_http_accept(request)
            request.axes_failures_since_start = None
            request.axes_updated = True
            request.axes_credentials = None

    @classmethod
    def is_locked(cls, request, credentials: Optional[dict] = None) -> bool:
        """
        Annotate ``request`` via ``update_request`` and forward ``is_locked``.

        :return: whatever the implementation returns (annotated as ``bool``).
        """
        cls.update_request(request)
        return cls.get_implementation().is_locked(request, credentials)

    @classmethod
    def is_allowed(cls, request, credentials: Optional[dict] = None) -> bool:
        """
        Annotate ``request`` via ``update_request`` and forward ``is_allowed``.

        :return: whatever the implementation returns (annotated as ``bool``).
        """
        cls.update_request(request)
        return cls.get_implementation().is_allowed(request, credentials)

    @classmethod
    def get_failures(cls, request, credentials: Optional[dict] = None) -> int:
        """
        Annotate ``request`` via ``update_request`` and forward ``get_failures``.

        :return: whatever the implementation returns (annotated as ``int``).
        """
        cls.update_request(request)
        return cls.get_implementation().get_failures(request, credentials)

    # The handlers below are wrapped with ``toggleable`` from axes.helpers.
    # NOTE(review): presumably this turns them into no-ops when Axes is
    # disabled in settings -- confirm against axes.helpers.

    @classmethod
    @toggleable
    def user_login_failed(cls, sender, credentials: dict, request=None, **kwargs):
        """
        Annotate ``request`` and forward ``user_login_failed`` to the handler.

        ``request`` may be ``None``; in that case ``update_request`` logs an
        error and the call is still forwarded with ``request=None``.
        """
        cls.update_request(request)
        return cls.get_implementation().user_login_failed(
            sender, credentials, request, **kwargs
        )

    @classmethod
    @toggleable
    def user_logged_in(cls, sender, request, user, **kwargs):
        """
        Annotate ``request`` and forward ``user_logged_in`` to the handler.
        """
        cls.update_request(request)
        return cls.get_implementation().user_logged_in(sender, request, user, **kwargs)

    @classmethod
    @toggleable
    def user_logged_out(cls, sender, request, user, **kwargs):
        """
        Annotate ``request`` and forward ``user_logged_out`` to the handler.
        """
        cls.update_request(request)
        return cls.get_implementation().user_logged_out(sender, request, user, **kwargs)

    @classmethod
    @toggleable
    def post_save_access_attempt(cls, instance, **kwargs):
        """
        Forward ``post_save_access_attempt`` to the configured handler.
        """
        return cls.get_implementation().post_save_access_attempt(instance, **kwargs)

    @classmethod
    @toggleable
    def post_delete_access_attempt(cls, instance, **kwargs):
        """
        Forward ``post_delete_access_attempt`` to the configured handler.
        """
        return cls.get_implementation().post_delete_access_attempt(instance, **kwargs)