252 lines
8.7 KiB
Python
252 lines
8.7 KiB
Python
import re
|
|
from abc import ABC, abstractmethod
|
|
from typing import Optional
|
|
from warnings import warn
|
|
|
|
from django.urls import reverse
|
|
from django.urls.exceptions import NoReverseMatch
|
|
|
|
from axes.conf import settings
|
|
from axes.helpers import (
|
|
get_failure_limit,
|
|
is_client_ip_address_blacklisted,
|
|
is_client_ip_address_whitelisted,
|
|
is_client_method_whitelisted,
|
|
is_user_attempt_whitelisted,
|
|
)
|
|
|
|
|
|
class AbstractAxesHandler(ABC):
|
|
"""
|
|
Contract that all handlers need to follow
|
|
"""
|
|
|
|
@abstractmethod
|
|
def user_login_failed(self, sender, credentials: dict, request=None, **kwargs):
|
|
"""
|
|
Handles the Django ``django.contrib.auth.signals.user_login_failed`` authentication signal.
|
|
"""
|
|
raise NotImplementedError("user_login_failed should be implemented")
|
|
|
|
@abstractmethod
|
|
def user_logged_in(self, sender, request, user, **kwargs):
|
|
"""
|
|
Handles the Django ``django.contrib.auth.signals.user_logged_in`` authentication signal.
|
|
"""
|
|
raise NotImplementedError("user_logged_in should be implemented")
|
|
|
|
@abstractmethod
|
|
def user_logged_out(self, sender, request, user, **kwargs):
|
|
"""
|
|
Handles the Django ``django.contrib.auth.signals.user_logged_out`` authentication signal.
|
|
"""
|
|
raise NotImplementedError("user_logged_out should be implemented")
|
|
|
|
@abstractmethod
|
|
def get_failures(self, request, credentials: Optional[dict] = None) -> int:
|
|
"""
|
|
Checks the number of failures associated to the given request and credentials.
|
|
|
|
This is a virtual method that needs an implementation in the handler subclass
|
|
if the ``settings.AXES_LOCK_OUT_AT_FAILURE`` flag is set to ``True``.
|
|
"""
|
|
raise NotImplementedError("get_failures should be implemented")
|
|
|
|
|
|
class AxesBaseHandler: # pylint: disable=unused-argument
|
|
"""
|
|
Handler API definition for implementations that are used by the ``AxesProxyHandler``.
|
|
|
|
If you wish to specialize your own handler class, override the necessary methods
|
|
and configure the class for use by setting ``settings.AXES_HANDLER = 'module.path.to.YourClass'``.
|
|
Make sure that new the handler is compliant with AbstractAxesHandler and make sure it extends from this mixin.
|
|
Refer to `AxesHandler` for an example.
|
|
|
|
The default implementation that is actually used by Axes is ``axes.handlers.database.AxesDatabaseHandler``.
|
|
|
|
.. note:: This is a virtual class and **can not be used without specialization**.
|
|
"""
|
|
|
|
def is_allowed(self, request, credentials: Optional[dict] = None) -> bool:
|
|
"""
|
|
Checks if the user is allowed to access or use given functionality such as a login view or authentication.
|
|
|
|
This method is abstract and other backends can specialize it as needed, but the default implementation
|
|
checks if the user has attempted to authenticate into the site too many times through the
|
|
Django authentication backends and returns ``False`` if user exceeds the configured Axes thresholds.
|
|
|
|
This checker can implement arbitrary checks such as IP whitelisting or blacklisting,
|
|
request frequency checking, failed attempt monitoring or similar functions.
|
|
|
|
Please refer to the ``axes.handlers.database.AxesDatabaseHandler`` for the default implementation
|
|
and inspiration on some common checks and access restrictions before writing your own implementation.
|
|
"""
|
|
|
|
if settings.AXES_ONLY_ADMIN_SITE and not self.is_admin_request(request):
|
|
return True
|
|
|
|
if self.is_blacklisted(request, credentials):
|
|
return False
|
|
|
|
if self.is_whitelisted(request, credentials):
|
|
return True
|
|
|
|
if self.is_locked(request, credentials):
|
|
return False
|
|
|
|
return True
|
|
|
|
def is_blacklisted(self, request, credentials: Optional[dict] = None) -> bool:
|
|
"""
|
|
Checks if the request or given credentials are blacklisted from access.
|
|
"""
|
|
|
|
if is_client_ip_address_blacklisted(request):
|
|
return True
|
|
|
|
return False
|
|
|
|
def is_whitelisted(self, request, credentials: Optional[dict] = None) -> bool:
|
|
"""
|
|
Checks if the request or given credentials are whitelisted for access.
|
|
"""
|
|
|
|
if is_user_attempt_whitelisted(request, credentials):
|
|
return True
|
|
|
|
if is_client_ip_address_whitelisted(request):
|
|
return True
|
|
|
|
if is_client_method_whitelisted(request):
|
|
return True
|
|
|
|
return False
|
|
|
|
def is_locked(self, request, credentials: Optional[dict] = None) -> bool:
|
|
"""
|
|
Checks if the request or given credentials are locked.
|
|
"""
|
|
|
|
if settings.AXES_LOCK_OUT_AT_FAILURE:
|
|
# get_failures will have to be implemented by each specialized handler
|
|
return self.get_failures( # type: ignore
|
|
request, credentials
|
|
) >= get_failure_limit(request, credentials)
|
|
|
|
return False
|
|
|
|
def get_admin_url(self) -> Optional[str]:
|
|
"""
|
|
Returns admin url if exists, otherwise returns None
|
|
"""
|
|
try:
|
|
return reverse("admin:index")
|
|
except NoReverseMatch:
|
|
return None
|
|
|
|
def is_admin_request(self, request) -> bool:
|
|
"""
|
|
Checks that request located under admin site
|
|
"""
|
|
if hasattr(request, "path"):
|
|
admin_url = self.get_admin_url()
|
|
return (
|
|
admin_url is not None
|
|
and re.match(f"^{admin_url}", request.path) is not None
|
|
)
|
|
|
|
return False
|
|
|
|
def is_admin_site(self, request) -> bool:
|
|
"""
|
|
Checks if the request is NOT for admin site
|
|
if `settings.AXES_ONLY_ADMIN_SITE` is True.
|
|
"""
|
|
warn(
|
|
(
|
|
"This method is deprecated and will be removed in future versions. "
|
|
"If you looking for method that checks if `request.path` located under "
|
|
"admin site, use `is_admin_request` instead."
|
|
),
|
|
DeprecationWarning,
|
|
)
|
|
if settings.AXES_ONLY_ADMIN_SITE and hasattr(request, "path"):
|
|
try:
|
|
admin_url = reverse("admin:index")
|
|
except NoReverseMatch:
|
|
return True
|
|
return not re.match(f"^{admin_url}", request.path)
|
|
|
|
return False
|
|
|
|
def reset_attempts(
|
|
self,
|
|
*,
|
|
ip_address: Optional[str] = None,
|
|
username: Optional[str] = None,
|
|
ip_or_username: bool = False,
|
|
) -> int:
|
|
"""
|
|
Resets access attempts that match the given IP address or username.
|
|
|
|
This method makes more sense for the DB backend, but as it is used by the ProxyHandler
|
|
(via inherent), it needs to be defined here, so we get compliant with all proxy methods.
|
|
|
|
Please overwrite it on each specialized handler as needed.
|
|
"""
|
|
return 0
|
|
|
|
def reset_logs(self, *, age_days: Optional[int] = None) -> int:
|
|
"""
|
|
Resets access logs that are older than given number of days.
|
|
|
|
This method makes more sense for the DB backend, but as it is used by the ProxyHandler
|
|
(via inherent), it needs to be defined here, so we get compliant with all proxy methods.
|
|
|
|
Please overwrite it on each specialized handler as needed.
|
|
"""
|
|
return 0
|
|
|
|
def reset_failure_logs(self, *, age_days: Optional[int] = None) -> int:
|
|
"""
|
|
Resets access failure logs that are older than given number of days.
|
|
|
|
This method makes more sense for the DB backend, but as it is used by the ProxyHandler
|
|
(via inherent), it needs to be defined here, so we get compliant with all proxy methods.
|
|
|
|
Please overwrite it on each specialized handler as needed.
|
|
"""
|
|
return 0
|
|
|
|
def remove_out_of_limit_failure_logs(
|
|
self, *, username: str, limit: Optional[int] = None
|
|
) -> int:
|
|
"""Remove access failure logs that are over
|
|
AXES_ACCESS_FAILURE_LOG_PER_USER_LIMIT for user username.
|
|
|
|
This method makes more sense for the DB backend, but as it is used by the ProxyHandler
|
|
(via inherent), it needs to be defined here, so we get compliant with all proxy methods.
|
|
|
|
Please overwrite it on each specialized handler as needed.
|
|
|
|
"""
|
|
return 0
|
|
|
|
|
|
class AxesHandler(AbstractAxesHandler, AxesBaseHandler):
|
|
"""
|
|
Signal bare handler implementation without any storage backend.
|
|
"""
|
|
|
|
def user_login_failed(self, sender, credentials: dict, request=None, **kwargs):
|
|
pass
|
|
|
|
def user_logged_in(self, sender, request, user, **kwargs):
|
|
pass
|
|
|
|
def user_logged_out(self, sender, request, user, **kwargs):
|
|
pass
|
|
|
|
def get_failures(self, request, credentials: Optional[dict] = None) -> int:
|
|
return 0
|