Skip to content

Commit

Permalink
Merge pull request #70 from onaio/add-ability-to-check-that-multiple-…
Browse files Browse the repository at this point in the history
…fields-are-unique

Add ability to check if multiple fields are unique
  • Loading branch information
FrankApiyo authored Sep 19, 2024
2 parents 7577b15 + b9d635f commit b4fa7b3
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 21 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ OPENID_CONNECT_VIEWSET_CONFIG = {
"preferred_username": "username",
},
"SPLIT_NAME_CLAIM": False, # Whether to split the `name` claim into first_name & last_name if present
"USER_UNIQUE_FILTER_FIELD": "username",
"USER_UNIQUE_FILTER_FIELDS": ["username", "email"],
"USE_SSO_COOKIE": True,
"SSO_COOKIE_DATA": "email",
"SSO_COOKIE_MAX_AGE": None,
Expand Down
2 changes: 1 addition & 1 deletion oidc/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
},
"SPLIT_NAME_CLAIM": False,
"USE_EMAIL_USERNAME": False,
"USER_UNIQUE_FILTER_FIELD": "username",
"USER_UNIQUE_FILTER_FIELDS": ["username", "email"],
"SSO_COOKIE_DATA": "email",
"JWT_ALGORITHM": "HS256",
"FIELD_VALIDATION_REGEX": {
Expand Down
40 changes: 22 additions & 18 deletions oidc/viewsets.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,9 @@ def __init__(self, *args, **kwargs):
self.auth_backend = config.get(
"AUTH_BACKEND", "django.contrib.auth.backends.ModelBackend"
)
self.unique_user_filter_field = (
config.get("USER_UNIQUE_FILTER_FIELD")
or default_config["USER_UNIQUE_FILTER_FIELD"]
self.unique_user_filter_fields = (
config.get("USER_UNIQUE_FILTER_FIELDS")
or default_config["USER_UNIQUE_FILTER_FIELDS"]
)
self.replaceable_username_characters = config.get(
"REPLACE_USERNAME_CHARACTERS", default_config["REPLACE_USERNAME_CHARACTERS"]
Expand Down Expand Up @@ -130,18 +130,20 @@ def logout(self, request: HttpRequest, **kwargs: dict) -> HttpResponse:
_("Unable to process OpenID connect logout request."),
)

def _check_user_uniqueness(self, user_data: dict) -> bool:
def _check_user_uniqueness(self, user_data: dict) -> Optional[str]:
"""
Helper function that checks if the supplied user data is unique. If user_data does not
contain the unique user field the assumption is that the user
exists.
"""
if user_data.get(self.unique_user_filter_field):
unique_field_value = user_data.get(self.unique_user_filter_field)
unique_field = self.unique_user_filter_field + "__iexact"
filter_kwargs = {unique_field: unique_field_value}
return not self.user_model.objects.filter(**filter_kwargs).count() > 0
return False
for user_field in self.unique_user_filter_fields:
if user_data.get(user_field):
unique_field_value = user_data.get(user_field)
unique_field = user_field + "__iexact"
filter_kwargs = {unique_field: unique_field_value}
if self.user_model.objects.filter(**filter_kwargs).count() > 0:
return user_field
return None

def generate_successful_response(
self, request, user, redirect_after=None
Expand Down Expand Up @@ -334,14 +336,16 @@ def callback(self, request: HttpRequest, **kwargs: dict) -> HttpResponse: # noq
status=status.HTTP_400_BAD_REQUEST,
template_name="oidc/oidc_unrecoverable_error.html",
)
elif not self._check_user_uniqueness(user_data):
data = {
"id_token": id_token,
"error": f"{self.unique_user_filter_field.capitalize()} field is already in use.",
}
return Response(
data, template_name="oidc/oidc_user_data_entry.html"
)
else:
field = self._check_user_uniqueness(user_data)
if field:
data = {
"id_token": id_token,
"error": f"{field.capitalize()} field is already in use.",
}
return Response(
data, template_name="oidc/oidc_user_data_entry.html"
)

self.validate_fields(user_data)

Expand Down
81 changes: 80 additions & 1 deletion tests/test_viewsets.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
Tests for the OpenID Client
"""

import json

from django.contrib.auth import get_user_model
from django.test import TestCase
from django.test.utils import override_settings
Expand Down Expand Up @@ -41,7 +43,7 @@
},
"SPLIT_NAME_CLAIM": True,
"USE_EMAIL_USERNAME": True,
"USER_UNIQUE_FILTER_FIELD": "email",
"USER_UNIQUE_FILTER_FIELDS": ["email", "username"],
"SSO_COOKIE_DATA": "email",
"JWT_ALGORITHM": "HS256",
"JWT_SECRET_KEY": "abc",
Expand Down Expand Up @@ -85,6 +87,83 @@ def test_returns_data_entry_template_on_missing_username_claim(self):
self.assertEqual(response.status_code, 200)
self.assertEqual(response.template_name, "oidc/oidc_user_data_entry.html")

@override_settings(OPENID_CONNECT_VIEWSET_CONFIG=OPENID_CONNECT_VIEWSET_CONFIG)
def test_recreating_already_existing_user(self):
"""
Trying to create a user that already exists will ask you to chose a different username
"""
view = UserModelOpenIDConnectViewset.as_view({"post": "callback"})
with patch(
"oidc.viewsets.OpenIDClient.verify_and_decode_id_token"
) as mock_func:
mock_func.return_value = {
"family_name": "Frankline",
"given_name": "Benjamin",
"username": "bfrank",
"email": "[email protected]",
}

data = {"id_token": "sadsdaio3209lkasdlkas0d.sdojdsiad.iosdadia"}
request = self.factory.post("/", data=data)
response = view(request, auth_server="default")
# Creating the user for the first time will work ok
self.assertEqual(response.status_code, 302)
user = User.objects.get(username="bfrank")
self.assertEqual(user.email, "[email protected]")

with patch(
"oidc.viewsets.OpenIDClient.verify_and_decode_id_token"
) as mock_func:
mock_func.return_value = {
"family_name": "Frankline",
"given_name": "Benjamin",
"username": "bfrank",
"email": "[email protected]",
}

data = {"id_token": "sadsdaio3209lkasdlkas0d.sdojdsiad.iosdadia"}
request = self.factory.post("/", data=data)
response = view(request, auth_server="default")
# Creating the user for the second time will not work ok
self.assertEqual(response.status_code, 200)

response_data = json.loads(response.rendered_content.decode("utf-8"))
self.assertEqual(
"Username field is already in use.", response_data["error"]
)
self.assertEqual(response.template_name, "oidc/oidc_user_data_entry.html")

# Original user with original email address still exists
user = User.objects.get(username="bfrank")
self.assertEqual(user.email, "[email protected]")

# Try creating the same user in uppercase
with patch(
"oidc.viewsets.OpenIDClient.verify_and_decode_id_token"
) as mock_func:
mock_func.return_value = {
"family_name": "Frankline",
"given_name": "Benjamin",
"username": "BFRANK",
"email": "[email protected]",
}

data = {"id_token": "sadsdaio3209lkasdlkas0d.sdojdsiad.iosdadia"}
request = self.factory.post("/", data=data)
response = view(request, auth_server="default")
# Creating the user for the second time will not work ok
self.assertEqual(response.status_code, 200)

response_data = json.loads(response.rendered_content.decode("utf-8"))
self.assertEqual(
"Username field is already in use.", response_data["error"]
)
self.assertEqual(response.template_name, "oidc/oidc_user_data_entry.html")

# Original user with original email address still exists
user = User.objects.get(username="bfrank")
self.assertEqual(user.email, "[email protected]")

@override_settings(OPENID_CONNECT_VIEWSET_CONFIG=OPENID_CONNECT_VIEWSET_CONFIG)
def test_user_created_successfully_when_email_has_a_valid_username(self):
"""
Expand Down

0 comments on commit b4fa7b3

Please sign in to comment.