Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

122 generate unique UUID #132

Merged
merged 13 commits into from
Feb 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 41 additions & 26 deletions importer/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,18 +95,21 @@ def handle_request(request_type, payload, url):
# it also adds the user to the provided keycloak group
# and sets the user password
def create_user(user):
(firstName, lastName, username, email, id, userType, _, keycloakGroupID,
keycloakGroupName, applicationID, password) = user

with open("json_payloads/keycloak_user_payload.json") as json_file:
payload_string = json_file.read()

obj = json.loads(payload_string)
obj["firstName"] = user[0]
obj["lastName"] = user[1]
obj["username"] = user[2]
obj["email"] = user[3]
obj["attributes"]["fhir_core_app_id"][0] = user[9]
obj["firstName"] = firstName
obj["lastName"] = lastName
obj["username"] = username
obj["email"] = email
obj["attributes"]["fhir_core_app_id"][0] = applicationID

final_string = json.dumps(obj)
logging.info("Creating user: " + user[2])
logging.info("Creating user: " + username)
r = handle_request("POST", final_string, config.keycloak_url + "/users")

if r.status_code == 201:
Expand All @@ -115,14 +118,14 @@ def create_user(user):
user_id = (new_user_location.split("/"))[-1]

# add user to group
payload = '{"id": "' + user[7] + '", "name": "' + user[8] + '"}'
group_endpoint = user_id + "/groups/" + user[7]
payload = '{"id": "' + keycloakGroupID + '", "name": "' + keycloakGroupName + '"}'
group_endpoint = user_id + "/groups/" + keycloakGroupID
url = config.keycloak_url + "/users/" + group_endpoint
logging.info("Adding user to Keycloak group: " + user[8])
logging.info("Adding user to Keycloak group: " + keycloakGroupName)
r = handle_request("PUT", payload, url)

# set password
payload = '{"temporary":false,"type":"password","value":"' + user[10] + '"}'
payload = '{"temporary":false,"type":"password","value":"' + password + '"}'
password_endpoint = user_id + "/reset-password"
url = config.keycloak_url + "/users/" + password_endpoint
logging.info("Setting user password")
Expand All @@ -137,14 +140,27 @@ def create_user(user):
# new user and posts them to the FHIR api for creation
def create_user_resources(user_id, user):
logging.info("Creating user resources")
(firstName, lastName, username, email, id, userType,
_, keycloakGroupID, keycloakGroupName, _, password) = user

# generate uuids
if len(str(user[4]).strip()) == 0:
practitioner_uuid = str(uuid.uuid4())
if len(str(id).strip()) == 0:
practitioner_uuid = str(
uuid.uuid5(
uuid.NAMESPACE_DNS, username + keycloakGroupID + "practitioner_uuid"
)
)
else:
practitioner_uuid = user[4]
practitioner_uuid = id

group_uuid = str(uuid.uuid4())
practitioner_role_uuid = str(uuid.uuid4())
group_uuid = str(
uuid.uuid5(uuid.NAMESPACE_DNS, username + keycloakGroupID + "group_uuid")
)
practitioner_role_uuid = str(
uuid.uuid5(
uuid.NAMESPACE_DNS, username + keycloakGroupID + "practitioner_role_uuid"
)
)

# get payload and replace strings
initial_string = """{"resourceType": "Bundle","type": "transaction","meta": {"lastUpdated": ""},"entry": """
Expand All @@ -155,15 +171,15 @@ def create_user_resources(user_id, user):
ff = (
payload_string.replace("$practitioner_uuid", practitioner_uuid)
.replace("$keycloak_user_uuid", user_id)
.replace("$firstName", user[0])
.replace("$lastName", user[1])
.replace("$email", user[3])
.replace("$firstName", firstName)
.replace("$lastName", lastName)
.replace("$email", email)
.replace("$group_uuid", group_uuid)
.replace("$practitioner_role_uuid", practitioner_role_uuid)
)

obj = json.loads(ff)
if user[5].strip() == "Supervisor":
if userType.strip() == "Supervisor":
obj[2]["resource"]["code"] = {
"coding": [
{
Expand All @@ -173,7 +189,7 @@ def create_user_resources(user_id, user):
}
]
}
elif user[5].strip() == "Practitioner":
elif userType.strip() == "Practitioner":
obj[2]["resource"]["code"] = {
"coding": [
{
Expand All @@ -188,7 +204,7 @@ def create_user_resources(user_id, user):
ff = json.dumps(obj, indent=4)

payload = initial_string + ff + "}"
handle_request("POST", payload, config.fhir_base_url)
return payload


# custom extras for organizations
Expand Down Expand Up @@ -259,9 +275,7 @@ def location_extras(resource, payload_string):
"$pt_display", "Jurisdiction"
)
else:
logging.error(
"Unsupported location physical type provided for " + name
)
logging.error("Unsupported location physical type provided for " + name)
obj = json.loads(payload_string)
del obj["resource"]["type"]
payload_string = json.dumps(obj, indent=4)
Expand Down Expand Up @@ -437,7 +451,7 @@ def build_org_affiliation(resources, resource_list):

for key in resources:
rp = ""
unique_uuid = str(uuid.uuid4())
unique_uuid = str(uuid.uuid5(uuid.NAMESPACE_DNS, key))
org_name = get_org_name(key, resource_list)

rp = (
Expand Down Expand Up @@ -872,7 +886,8 @@ def main(
# check practitioner
practitioner_exists = confirm_practitioner(user, user_id)
if not practitioner_exists:
create_user_resources(user_id, user)
payload = create_user_resources(user_id, user)
handle_request("POST", payload, config.fhir_base_url)
logging.info("Processing complete!")
elif resource_type == "locations":
logging.info("Processing locations")
Expand Down
140 changes: 139 additions & 1 deletion importer/test_main.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import json
import unittest
from jsonschema import validate
from main import read_csv, build_payload, build_org_affiliation, extract_matches
from main import read_csv, build_payload, build_org_affiliation, extract_matches, create_user_resources


class TestMain(unittest.TestCase):
Expand Down Expand Up @@ -187,6 +187,144 @@ def test_build_org_affiliation(self):
self.assertEqual(payload_obj["resourceType"], "Bundle")
self.assertEqual(len(payload_obj["entry"]), 2)

def test_uuid_generated_in_creating_user_resources_is_unique_and_repeatable(self):
users = [
[
"Jane",
"Doe",
"Janey",
"[email protected]",
"",
"Practitioner",
"TRUE",
"a715b562-27f2-432a-b1ba-e57db35e0f93",
"test",
"demo",
"pa$$word",
],
[
"John",
"Doe",
"Janey",
"[email protected]",
"",
"Practitioner",
"TRUE",
"a715b562-27f2-432a-b1ba-e57db35e0f93",
"test",
"demo",
"pa$$word",
],
[
"Janice",
"Doe",
"Jenn",
"[email protected]",
"99d54e3c-c26f-4500-a7f9-3f4cb788673f",
"Supervisor",
"TRUE",
"a715b562-27f2-432a-b1ba-e57db35e0f93",
"test",
"demo",
"pa$$word",
],
]

users_uuids = {}
for user_id, user in enumerate(users):
payload = create_user_resources(user[4], user)
payload_obj = json.loads(payload)
practitioner_uuid = payload_obj["entry"][0]["resource"]["id"]
group_uuid = payload_obj["entry"][1]["resource"]["id"]
practitioner_role_uuid = payload_obj["entry"][2]["resource"]["id"]
users_uuids[user_id] = [
practitioner_uuid,
group_uuid,
practitioner_role_uuid,
]

# practitioner_uuid
self.assertEqual(users_uuids[0][0], users_uuids[1][0])
self.assertNotEqual(users_uuids[1][0], users_uuids[2][0])

# group_uuid
self.assertEqual(users_uuids[0][1], users_uuids[1][1])
self.assertNotEqual(users_uuids[1][1], users_uuids[2][1])

# practitioner_role_uuid
self.assertEqual(users_uuids[0][2], users_uuids[1][2])
self.assertNotEqual(users_uuids[1][2], users_uuids[2][2])

def test_uuid_generated_for_locations_is_unique_and_repeatable(self):
resources = [
[
"City1",
"active",
"create",
"",
"test location-1",
"18fcbc2e-4240-4a84-a270-7a444523d7b6",
"jurisdiction",
"jurisdiction",
],
[
"Building1",
"active",
"create",
"",
"test location-1",
"18fcbc2e-4240-4a84-a270-7a444523d7b6",
"building",
"building",
],
[
"City1",
"active",
"create",
"",
"test location-1",
"18fcbc2e-4240-4a84-a270-7a444523d7b6",
"jurisdiction",
"jurisdiction",
],
]

payload = build_payload(
"locations", resources, "json_payloads/locations_payload.json"
)
payload_obj = json.loads(payload)
location1 = payload_obj["entry"][0]["resource"]["id"]
location2 = payload_obj["entry"][1]["resource"]["id"]
location3 = payload_obj["entry"][2]["resource"]["id"]
print(location1, location2, location3)

self.assertNotEqual(location1, location2)
self.assertEqual(location1, location3)

def test_uuid_generated_in_build_org_affiliation_is_unique_and_repeatable(self):
resource_list = [
["HealthyU", "a9137781-eb94-4d5f-8d39-471a92aec9f2", "World", "138396"],
["HealthyU", "a9137781-eb94-4d5f-8d39-471a92aec9f2", "Kenya", "54876"],
["HealthyU", "a9137781-eb94-4d5f-8d39-471a92aec9f2", "Nairobi", "105167"],
["One Org", "8342dd77-aecd-48ab-826b-75c7c33039ed", "World", "138396"],
]

resources = extract_matches(resource_list)
payload = build_org_affiliation(resources, resource_list)
payload_obj = json.loads(payload)
organization_affiliation1 = payload_obj["entry"][0]["resource"]["id"]
organization_affiliation2 = payload_obj["entry"][1]["resource"]["id"]

self.assertNotEqual(organization_affiliation1, organization_affiliation2)

payload2 = build_org_affiliation(resources, resource_list)
payload2_obj = json.loads(payload2)
organization_affiliation3 = payload2_obj["entry"][0]["resource"]["id"]
organization_affiliation4 = payload2_obj["entry"][1]["resource"]["id"]

self.assertEqual(organization_affiliation1, organization_affiliation3)
self.assertEqual(organization_affiliation2, organization_affiliation4)


if __name__ == "__main__":
unittest.main()
Loading