From 5d3d0df9b721116c6dfeffeac84cdd64af75b4c5 Mon Sep 17 00:00:00 2001 From: Tomas Repel Date: Tue, 28 May 2024 12:52:00 +0200 Subject: [PATCH] Use uma_well_known endpoints in Rego Policy --- .../opa/test_authorization_services.py | 84 +++++++++---------- 1 file changed, 41 insertions(+), 43 deletions(-) diff --git a/testsuite/tests/kuadrant/authorino/authorization/opa/test_authorization_services.py b/testsuite/tests/kuadrant/authorino/authorization/opa/test_authorization_services.py index 92b77551..289b20c3 100644 --- a/testsuite/tests/kuadrant/authorino/authorization/opa/test_authorization_services.py +++ b/testsuite/tests/kuadrant/authorino/authorization/opa/test_authorization_services.py @@ -19,9 +19,41 @@ pytestmark = [pytest.mark.authorino] -# pylint: disable=line-too-long @pytest.fixture(scope="module") -def rego_policy(keycloak): +def resource_owner_auth(keycloak): + """ + Auth for user who owns the protected resource, a.k.a. "owner" user. + The "uma_protection" client role is assigned to the user so that they are allowed to create protected resources. + """ + owner = keycloak.realm.create_user("owner", "owner") + role = keycloak.realm.admin.get_client_role(client_id=keycloak.client.client_id, role_name="uma_protection") + keycloak.realm.admin.assign_client_role(user_id=owner.user_id, client_id=keycloak.client.client_id, roles=[role]) + return HttpxOidcClientAuth.from_user(keycloak.get_token(owner.username, owner.password), owner) + + +@pytest.fixture(scope="module") +def requester_auth(keycloak): + """Auth for user who requests the access to the protected resource, a.k.a. "requester" user""" + requester = keycloak.realm.create_user("requester", "requester") + return HttpxOidcClientAuth.from_user(keycloak.get_token(requester.username, requester.password), requester) + + +@pytest.fixture(scope="module") +def owner_uma(keycloak, resource_owner_auth): + """UMA client used to create a protected resource and assign permissions for "requester" to access it.""" + keycloak_connection = KeycloakOpenIDConnection( + server_url=keycloak.server_url, + client_id=keycloak.client_name, + client_secret_key=keycloak.client.secret, + username=resource_owner_auth.username, + password=resource_owner_auth.password, + realm_name=keycloak.realm_name, + ) + return KeycloakUMA(keycloak_connection) + + +@pytest.fixture(scope="module") +def rego_policy(owner_uma): """ Complex OPA REGO policy that implements the UMA authorization flow. See https://www.keycloak.org/docs/latest/authorization_services/index.html#_service_uma_authorization_process @@ -30,14 +62,13 @@ def rego_policy(keycloak): under the used scope (HTTP GET) this REGO policy authorizes the request. """ return f"""\ -pat := http.send({{"url":"{keycloak.server_url}realms/{keycloak.realm.name}/protocol/openid-connect/token",\ +pat := http.send({{"url": "{owner_uma.uma_well_known["token_endpoint"]}",\ "method": "post","headers":{{"Content-Type":"application/x-www-form-urlencoded"}},\ -"raw_body":"grant_type=client_credentials&client_id={keycloak.client_name}&client_secret={keycloak.client.secret}"}})\ -.body.access_token +"raw_body":"grant_type=client_credentials&client_id={owner_uma.connection.client_id}\ +&client_secret={owner_uma.connection.client_secret_key}"}}).body.access_token -resource_id := http.send({{"url":concat("",["{keycloak.server_url}realms/{keycloak.realm.name}/authz/protection/\ -resource_set?uri=",input.context.request.http.path]),"method":"get", "headers":\ -{{"Authorization":concat(" ",["Bearer ",pat])}}}}).body[0] +resource_id := http.send({{"url":concat("",["{owner_uma.uma_well_known["resource_registration_endpoint"]}?uri=",\ +input.context.request.http.path]),"method":"get", "headers":{{"Authorization":concat(" ",["Bearer ",pat])}}}}).body[0] scope := lower(input.context.request.http.method) access_token := trim_prefix(input.context.request.http.headers.authorization, "Bearer ") @@ -45,12 +76,12 @@ def rego_policy(keycloak): rpt = access_token {{ object.get(input.auth.identity, "authorization", {{}}).permissions }} else = rpt_str {{ - ticket := http.send({{"url":"{keycloak.server_url}realms/{keycloak.realm.name}/authz/protection/permission",\ + ticket := http.send({{"url":"{owner_uma.uma_well_known["permission_endpoint"]}",\ "method":"post","headers":{{"Authorization":concat(" ",["Bearer ",pat]),"Content-Type":"application/json"}},\ "raw_body":concat("",["[{{\\"resource_id\\":\\"",resource_id,"\\",\\"resource_scopes\\":[\\"",scope,"\\"]}}]"\ ])}}).body.ticket - rpt_str := object.get(http.send({{"url":"{keycloak.server_url}realms/{keycloak.realm.name}/protocol/openid-connect/token",\ + rpt_str := object.get(http.send({{"url":"{owner_uma.uma_well_known["token_endpoint"]}",\ "method":"post","headers":{{"Authorization":concat(" ",\ ["Bearer ",access_token]),"Content-Type":"application/x-www-form-urlencoded"}},"raw_body":concat("",\ ["grant_type=urn:ietf:params:oauth:grant-type:uma-ticket&ticket=",ticket,"&submit_request=true"])}})\ @@ -84,39 +115,6 @@ def authorization(authorization, rego_policy): return authorization -@pytest.fixture(scope="module") -def resource_owner_auth(keycloak): - """ - Auth for user who owns the protected resource, a.k.a. "owner" user. - The "uma_protection" client role is assigned to the user so that they are allowed to create protected resources. - """ - owner = keycloak.realm.create_user("owner", "owner") - role = keycloak.realm.admin.get_client_role(client_id=keycloak.client.client_id, role_name="uma_protection") - keycloak.realm.admin.assign_client_role(user_id=owner.user_id, client_id=keycloak.client.client_id, roles=[role]) - return HttpxOidcClientAuth.from_user(keycloak.get_token(owner.username, owner.password), owner) - - -@pytest.fixture(scope="module") -def requester_auth(keycloak): - """Auth for user who requests the access to the protected resource, a.k.a. "requester" user""" - requester = keycloak.realm.create_user("requester", "requester") - return HttpxOidcClientAuth.from_user(keycloak.get_token(requester.username, requester.password), requester) - - -@pytest.fixture(scope="module") -def owner_uma(keycloak, resource_owner_auth): - """UMA client used to create a protected resource and assign permissions for "requester" to access it.""" - keycloak_connection = KeycloakOpenIDConnection( - server_url=keycloak.server_url, - client_id=keycloak.client_name, - client_secret_key=keycloak.client.secret, - username=resource_owner_auth.username, - password=resource_owner_auth.password, - realm_name=keycloak.realm_name, - ) - return KeycloakUMA(keycloak_connection) - - @pytest.fixture(scope="module") def protected_resource(owner_uma, resource_owner_auth): """