From b9d635f5115d0a5a3da3909cb45d785f401e1652 Mon Sep 17 00:00:00 2001 From: FrankApiyo Date: Thu, 19 Sep 2024 12:21:42 +0300 Subject: [PATCH] Add ability to check if multiple fields are unique --- README.md | 2 +- oidc/settings.py | 2 +- oidc/viewsets.py | 40 +++++++++++---------- tests/test_viewsets.py | 81 +++++++++++++++++++++++++++++++++++++++++- 4 files changed, 104 insertions(+), 21 deletions(-) diff --git a/README.md b/README.md index 6454932..fe89aa0 100644 --- a/README.md +++ b/README.md @@ -40,7 +40,7 @@ OPENID_CONNECT_VIEWSET_CONFIG = { "preferred_username": "username", }, "SPLIT_NAME_CLAIM": False, # Whether to split the `name` claim into first_name & last_name if present - "USER_UNIQUE_FILTER_FIELD": "username", + "USER_UNIQUE_FILTER_FIELDS": ["username", "email"], "USE_SSO_COOKIE": True, "SSO_COOKIE_DATA": "email", "SSO_COOKIE_MAX_AGE": None, diff --git a/oidc/settings.py b/oidc/settings.py index fcf24e0..e72eeaf 100644 --- a/oidc/settings.py +++ b/oidc/settings.py @@ -13,7 +13,7 @@ }, "SPLIT_NAME_CLAIM": False, "USE_EMAIL_USERNAME": False, - "USER_UNIQUE_FILTER_FIELD": "username", + "USER_UNIQUE_FILTER_FIELDS": ["username", "email"], "SSO_COOKIE_DATA": "email", "JWT_ALGORITHM": "HS256", "FIELD_VALIDATION_REGEX": { diff --git a/oidc/viewsets.py b/oidc/viewsets.py index e994069..eff99dd 100644 --- a/oidc/viewsets.py +++ b/oidc/viewsets.py @@ -82,9 +82,9 @@ def __init__(self, *args, **kwargs): self.auth_backend = config.get( "AUTH_BACKEND", "django.contrib.auth.backends.ModelBackend" ) - self.unique_user_filter_field = ( - config.get("USER_UNIQUE_FILTER_FIELD") - or default_config["USER_UNIQUE_FILTER_FIELD"] + self.unique_user_filter_fields = ( + config.get("USER_UNIQUE_FILTER_FIELDS") + or default_config["USER_UNIQUE_FILTER_FIELDS"] ) self.replaceable_username_characters = config.get( "REPLACE_USERNAME_CHARACTERS", default_config["REPLACE_USERNAME_CHARACTERS"] @@ -130,18 +130,20 @@ def logout(self, request: HttpRequest, **kwargs: dict) -> HttpResponse: _("Unable to process OpenID connect logout request."), ) - def _check_user_uniqueness(self, user_data: dict) -> bool: + def _check_user_uniqueness(self, user_data: dict) -> Optional[str]: """ Helper function that checks if the supplied user data is unique. If user_data does not contain the unique user field the assumption is that the user exists. """ - if user_data.get(self.unique_user_filter_field): - unique_field_value = user_data.get(self.unique_user_filter_field) - unique_field = self.unique_user_filter_field + "__iexact" - filter_kwargs = {unique_field: unique_field_value} - return not self.user_model.objects.filter(**filter_kwargs).count() > 0 - return False + for user_field in self.unique_user_filter_fields: + if user_data.get(user_field): + unique_field_value = user_data.get(user_field) + unique_field = user_field + "__iexact" + filter_kwargs = {unique_field: unique_field_value} + if self.user_model.objects.filter(**filter_kwargs).count() > 0: + return user_field + return None def generate_successful_response( self, request, user, redirect_after=None @@ -334,14 +336,16 @@ def callback(self, request: HttpRequest, **kwargs: dict) -> HttpResponse: # noq status=status.HTTP_400_BAD_REQUEST, template_name="oidc/oidc_unrecoverable_error.html", ) - elif not self._check_user_uniqueness(user_data): - data = { - "id_token": id_token, - "error": f"{self.unique_user_filter_field.capitalize()} field is already in use.", - } - return Response( - data, template_name="oidc/oidc_user_data_entry.html" - ) + else: + field = self._check_user_uniqueness(user_data) + if field: + data = { + "id_token": id_token, + "error": f"{field.capitalize()} field is already in use.", + } + return Response( + data, template_name="oidc/oidc_user_data_entry.html" + ) self.validate_fields(user_data) diff --git a/tests/test_viewsets.py b/tests/test_viewsets.py index e9c3cc5..c94e1c0 100644 --- a/tests/test_viewsets.py +++ b/tests/test_viewsets.py @@ -2,6 +2,8 @@ Tests for the OpenID Client """ +import json + from django.contrib.auth import get_user_model from django.test import TestCase from django.test.utils import override_settings @@ -41,7 +43,7 @@ }, "SPLIT_NAME_CLAIM": True, "USE_EMAIL_USERNAME": True, - "USER_UNIQUE_FILTER_FIELD": "email", + "USER_UNIQUE_FILTER_FIELDS": ["email", "username"], "SSO_COOKIE_DATA": "email", "JWT_ALGORITHM": "HS256", "JWT_SECRET_KEY": "abc", @@ -85,6 +87,83 @@ def test_returns_data_entry_template_on_missing_username_claim(self): self.assertEqual(response.status_code, 200) self.assertEqual(response.template_name, "oidc/oidc_user_data_entry.html") + @override_settings(OPENID_CONNECT_VIEWSET_CONFIG=OPENID_CONNECT_VIEWSET_CONFIG) + def test_recreating_already_existing_user(self): + """ + Trying to create a user that already exists will ask you to chose a different username + """ + view = UserModelOpenIDConnectViewset.as_view({"post": "callback"}) + with patch( + "oidc.viewsets.OpenIDClient.verify_and_decode_id_token" + ) as mock_func: + mock_func.return_value = { + "family_name": "Frankline", + "given_name": "Benjamin", + "username": "bfrank", + "email": "bfrank@example.com", + } + + data = {"id_token": "sadsdaio3209lkasdlkas0d.sdojdsiad.iosdadia"} + request = self.factory.post("/", data=data) + response = view(request, auth_server="default") + # Creating the user for the first time will work ok + self.assertEqual(response.status_code, 302) + user = User.objects.get(username="bfrank") + self.assertEqual(user.email, "bfrank@example.com") + + with patch( + "oidc.viewsets.OpenIDClient.verify_and_decode_id_token" + ) as mock_func: + mock_func.return_value = { + "family_name": "Frankline", + "given_name": "Benjamin", + "username": "bfrank", + "email": "bfrank@ona.io", + } + + data = {"id_token": "sadsdaio3209lkasdlkas0d.sdojdsiad.iosdadia"} + request = self.factory.post("/", data=data) + response = view(request, auth_server="default") + # Creating the user for the second time will not work ok + self.assertEqual(response.status_code, 200) + + response_data = json.loads(response.rendered_content.decode("utf-8")) + self.assertEqual( + "Username field is already in use.", response_data["error"] + ) + self.assertEqual(response.template_name, "oidc/oidc_user_data_entry.html") + + # Original user with original email address still exists + user = User.objects.get(username="bfrank") + self.assertEqual(user.email, "bfrank@example.com") + + # Try creating the same user in uppercase + with patch( + "oidc.viewsets.OpenIDClient.verify_and_decode_id_token" + ) as mock_func: + mock_func.return_value = { + "family_name": "Frankline", + "given_name": "Benjamin", + "username": "BFRANK", + "email": "bfrank@ona.io", + } + + data = {"id_token": "sadsdaio3209lkasdlkas0d.sdojdsiad.iosdadia"} + request = self.factory.post("/", data=data) + response = view(request, auth_server="default") + # Creating the user for the second time will not work ok + self.assertEqual(response.status_code, 200) + + response_data = json.loads(response.rendered_content.decode("utf-8")) + self.assertEqual( + "Username field is already in use.", response_data["error"] + ) + self.assertEqual(response.template_name, "oidc/oidc_user_data_entry.html") + + # Original user with original email address still exists + user = User.objects.get(username="bfrank") + self.assertEqual(user.email, "bfrank@example.com") + @override_settings(OPENID_CONNECT_VIEWSET_CONFIG=OPENID_CONNECT_VIEWSET_CONFIG) def test_user_created_successfully_when_email_has_a_valid_username(self): """