Skip to content

Commit

Permalink
Merge pull request #414 from trepel/use-uma-well-known-endpoints
Browse files Browse the repository at this point in the history
Use uma_well_known endpoints in Rego Policy
  • Loading branch information
averevki authored May 28, 2024
2 parents aea8897 + 5d3d0df commit 669fd89
Showing 1 changed file with 41 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,41 @@
pytestmark = [pytest.mark.authorino]


# pylint: disable=line-too-long
@pytest.fixture(scope="module")
def rego_policy(keycloak):
def resource_owner_auth(keycloak):
"""
Auth for user who owns the protected resource, a.k.a. "owner" user.
The "uma_protection" client role is assigned to the user so that they are allowed to create protected resources.
"""
owner = keycloak.realm.create_user("owner", "owner")
role = keycloak.realm.admin.get_client_role(client_id=keycloak.client.client_id, role_name="uma_protection")
keycloak.realm.admin.assign_client_role(user_id=owner.user_id, client_id=keycloak.client.client_id, roles=[role])
return HttpxOidcClientAuth.from_user(keycloak.get_token(owner.username, owner.password), owner)


@pytest.fixture(scope="module")
def requester_auth(keycloak):
"""Auth for user who requests the access to the protected resource, a.k.a. "requester" user"""
requester = keycloak.realm.create_user("requester", "requester")
return HttpxOidcClientAuth.from_user(keycloak.get_token(requester.username, requester.password), requester)


@pytest.fixture(scope="module")
def owner_uma(keycloak, resource_owner_auth):
"""UMA client used to create a protected resource and assign permissions for "requester" to access it."""
keycloak_connection = KeycloakOpenIDConnection(
server_url=keycloak.server_url,
client_id=keycloak.client_name,
client_secret_key=keycloak.client.secret,
username=resource_owner_auth.username,
password=resource_owner_auth.password,
realm_name=keycloak.realm_name,
)
return KeycloakUMA(keycloak_connection)


@pytest.fixture(scope="module")
def rego_policy(owner_uma):
"""
Complex OPA REGO policy that implements the UMA authorization flow.
See https://www.keycloak.org/docs/latest/authorization_services/index.html#_service_uma_authorization_process
Expand All @@ -30,27 +62,26 @@ def rego_policy(keycloak):
under the used scope (HTTP GET) this REGO policy authorizes the request.
"""
return f"""\
pat := http.send({{"url":"{keycloak.server_url}realms/{keycloak.realm.name}/protocol/openid-connect/token",\
pat := http.send({{"url": "{owner_uma.uma_well_known["token_endpoint"]}",\
"method": "post","headers":{{"Content-Type":"application/x-www-form-urlencoded"}},\
"raw_body":"grant_type=client_credentials&client_id={keycloak.client_name}&client_secret={keycloak.client.secret}"}})\
.body.access_token
"raw_body":"grant_type=client_credentials&client_id={owner_uma.connection.client_id}\
&client_secret={owner_uma.connection.client_secret_key}"}}).body.access_token
resource_id := http.send({{"url":concat("",["{keycloak.server_url}realms/{keycloak.realm.name}/authz/protection/\
resource_set?uri=",input.context.request.http.path]),"method":"get", "headers":\
{{"Authorization":concat(" ",["Bearer ",pat])}}}}).body[0]
resource_id := http.send({{"url":concat("",["{owner_uma.uma_well_known["resource_registration_endpoint"]}?uri=",\
input.context.request.http.path]),"method":"get", "headers":{{"Authorization":concat(" ",["Bearer ",pat])}}}}).body[0]
scope := lower(input.context.request.http.method)
access_token := trim_prefix(input.context.request.http.headers.authorization, "Bearer ")
default rpt = ""
rpt = access_token {{ object.get(input.auth.identity, "authorization", {{}}).permissions }}
else = rpt_str {{
ticket := http.send({{"url":"{keycloak.server_url}realms/{keycloak.realm.name}/authz/protection/permission",\
ticket := http.send({{"url":"{owner_uma.uma_well_known["permission_endpoint"]}",\
"method":"post","headers":{{"Authorization":concat(" ",["Bearer ",pat]),"Content-Type":"application/json"}},\
"raw_body":concat("",["[{{\\"resource_id\\":\\"",resource_id,"\\",\\"resource_scopes\\":[\\"",scope,"\\"]}}]"\
])}}).body.ticket
rpt_str := object.get(http.send({{"url":"{keycloak.server_url}realms/{keycloak.realm.name}/protocol/openid-connect/token",\
rpt_str := object.get(http.send({{"url":"{owner_uma.uma_well_known["token_endpoint"]}",\
"method":"post","headers":{{"Authorization":concat(" ",\
["Bearer ",access_token]),"Content-Type":"application/x-www-form-urlencoded"}},"raw_body":concat("",\
["grant_type=urn:ietf:params:oauth:grant-type:uma-ticket&ticket=",ticket,"&submit_request=true"])}})\
Expand Down Expand Up @@ -84,39 +115,6 @@ def authorization(authorization, rego_policy):
return authorization


@pytest.fixture(scope="module")
def resource_owner_auth(keycloak):
"""
Auth for user who owns the protected resource, a.k.a. "owner" user.
The "uma_protection" client role is assigned to the user so that they are allowed to create protected resources.
"""
owner = keycloak.realm.create_user("owner", "owner")
role = keycloak.realm.admin.get_client_role(client_id=keycloak.client.client_id, role_name="uma_protection")
keycloak.realm.admin.assign_client_role(user_id=owner.user_id, client_id=keycloak.client.client_id, roles=[role])
return HttpxOidcClientAuth.from_user(keycloak.get_token(owner.username, owner.password), owner)


@pytest.fixture(scope="module")
def requester_auth(keycloak):
"""Auth for user who requests the access to the protected resource, a.k.a. "requester" user"""
requester = keycloak.realm.create_user("requester", "requester")
return HttpxOidcClientAuth.from_user(keycloak.get_token(requester.username, requester.password), requester)


@pytest.fixture(scope="module")
def owner_uma(keycloak, resource_owner_auth):
"""UMA client used to create a protected resource and assign permissions for "requester" to access it."""
keycloak_connection = KeycloakOpenIDConnection(
server_url=keycloak.server_url,
client_id=keycloak.client_name,
client_secret_key=keycloak.client.secret,
username=resource_owner_auth.username,
password=resource_owner_auth.password,
realm_name=keycloak.realm_name,
)
return KeycloakUMA(keycloak_connection)


@pytest.fixture(scope="module")
def protected_resource(owner_uma, resource_owner_auth):
"""
Expand Down

0 comments on commit 669fd89

Please sign in to comment.