Skip to content

Commit

Permalink
Added supoort for oauth2 access token for service to service (#63)
Browse files Browse the repository at this point in the history
  • Loading branch information
maniskum authored Feb 12, 2024
1 parent 5a2dbf1 commit a050275
Show file tree
Hide file tree
Showing 12 changed files with 363 additions and 5 deletions.
42 changes: 41 additions & 1 deletion DEVELOPER_GUIDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ Use the command below to set up a Sink connector to a Authenticated Streaming Co
}' http://localhost:8083/connectors
```
2. Using jwt_token
2. Using jwt_token **[DEPRECATED]**
- Convert private.key from adobe console to PKCS8 private using command
```shell
openssl pkcs8 -topk8 -inform PEM -outform DER -in private.key -out private-pkcs8.key -nocrypt
Expand Down Expand Up @@ -272,6 +272,46 @@ Use the command below to set up a Sink connector to a Authenticated Streaming Co
1. key: `x-adobe-flow-id`, value: `341fd4f0-cdec-4912-1ab6-fb54aeb41286`
2. key: `x-adobe-dataset-id`, value: `3096fbfd5978431948af3ba3`
Use config -
```json
"aep.connection.endpoint.headers": "{\"x-adobe-flow-id\":\"341fd4f0-cdec-4912-1ab6-fb54aeb41286\", \"x-adobe-dataset-id\": \"3096fbfd5978431948af3ba3\"}"
```
#### note : jwt_token authentication is deprecated
3. Using oauth2_access_token
- Create http connector
```shell
curl -s -X POST \
-H "Content-Type: application/json" \
--data '{
"name": "aep-auth-sink-connector",
"config": {
"connector.class": "com.adobe.platform.streaming.sink.impl.AEPSinkConnector",
"topics": "connect-test",
"tasks.max": 1,
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"aep.endpoint": "https://dcs.adobedc.net/collection/{DATA_INLET_ID}",
"aep.flush.interval.seconds": 1,
"aep.flush.bytes.kb": 4,
"aep.connection.auth.enabled": true,
"aep.connection.auth.token.type": "oauth2_access_token",
"aep.connection.auth.client.id": "<client_id>",
"aep.connection.auth.client.secret": "<client_secret>"
"aep.connection.auth.endpoint": "<ims-url>",
"aep.connection.endpoint.headers": "<optional-header-that-needs-to-be-passed-to-AEP>"
}
}' http://localhost:8083/connectors
```
Note - `aep.connection.endpoint.headers` format should be JSON-encoded.
Example: To send below 2 HTTP headers -
1. key: `x-adobe-flow-id`, value: `341fd4f0-cdec-4912-1ab6-fb54aeb41286`
2. key: `x-adobe-dataset-id`, value: `3096fbfd5978431948af3ba3`
Use config -
```json
"aep.connection.endpoint.headers": "{\"x-adobe-flow-id\":\"341fd4f0-cdec-4912-1ab6-fb54aeb41286\", \"x-adobe-dataset-id\": \"3096fbfd5978431948af3ba3\"}"
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ task copyDependencies(type: Copy) {
"delivery_guarantee": [ "at_least_once"],
"kafka_connect_api": true,
"single_message_transforms": true,
"supported_encodings": [ "json" ]
"supported_encodings": [ "any" ]
},
"license": [
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@
public enum TokenType {

ACCESS_TOKEN("access_token"),
JWT_TOKEN("jwt_token");
JWT_TOKEN("jwt_token"),
OAUTH2_ACCESS_TOKEN("oauth2_access_token");

private static final Map<String, TokenType> TOKEN_TYPES = ImmutableMap.<String, TokenType>builder()
.put(ACCESS_TOKEN.name, ACCESS_TOKEN)
.put(JWT_TOKEN.name, JWT_TOKEN)
.put(OAUTH2_ACCESS_TOKEN.name, OAUTH2_ACCESS_TOKEN)
.build();

private String name;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ public static AuthProvider getAuthProvider(TokenType tokenType, Map<String, Stri
case JWT_TOKEN:
return getJWTAuthProvider(authProperties, authProxyConfiguration);

case OAUTH2_ACCESS_TOKEN:
return getOAuth2IMSProvider(authProperties, authProxyConfiguration);

default:
throw new AuthException("Invalid token type to get auth provider");
}
Expand All @@ -61,6 +64,20 @@ private static AuthProvider getIMSAuthProvider(Map<String, String> authPropertie
new IMSTokenProvider(endpoint, clientId, clientCode, clientSecret, authProxyConfiguration);
}

private static AuthProvider getOAuth2IMSProvider(final Map<String, String> authProperties,
final AuthProxyConfiguration authProxyConfiguration) {
final String clientId = authProperties.get(AuthUtils.AUTH_CLIENT_ID);
final String clientSecret = authProperties.get(AuthUtils.AUTH_CLIENT_SECRET);

Preconditions.checkNotNull(clientId, "Invalid client Id");
Preconditions.checkNotNull(clientSecret, "Invalid client secret");

final String endpoint = authProperties.get(AuthUtils.AUTH_ENDPOINT);
return StringUtils.isEmpty(endpoint) ?
new OAuth2IMSTokenProvider(clientId, clientSecret, authProxyConfiguration) :
new OAuth2IMSTokenProvider(endpoint, clientId, clientSecret, authProxyConfiguration);
}

private static AuthProvider getJWTAuthProvider(Map<String, String> authProperties,
AuthProxyConfiguration authProxyConfiguration) {
final String clientId = authProperties.get(AuthUtils.AUTH_CLIENT_ID);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
/**
* @author Adobe Inc.
*/
@Deprecated
public class JWTTokenProvider extends AbstractAuthProvider {

private static final Logger LOG = LoggerFactory.getLogger(JWTTokenProvider.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright 2024 Adobe. All rights reserved.
* This file is licensed to you under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. You may obtain a copy
* of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under
* the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR REPRESENTATIONS
* OF ANY KIND, either express or implied. See the License for the specific language
* governing permissions and limitations under the License.
*/

package com.adobe.platform.streaming.auth.impl;

import com.adobe.platform.streaming.auth.AbstractAuthProvider;
import com.adobe.platform.streaming.auth.AuthException;
import com.adobe.platform.streaming.auth.AuthUtils;
import com.adobe.platform.streaming.auth.TokenResponse;
import com.adobe.platform.streaming.http.HttpException;
import com.adobe.platform.streaming.http.HttpProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* @author Adobe Inc.
*/
public class OAuth2IMSTokenProvider extends AbstractAuthProvider {

private static final Logger LOG = LoggerFactory.getLogger(OAuth2IMSTokenProvider.class);
private static final String IMS_ENDPOINT_PATH = "/ims/token/v3";
private static final String GRANT_TYPE = "client_credentials";
private static final String SCOPE = "openid,AdobeID,read_organizations,additional_info.projectedProductContext," +
"session";

private String endpoint = System.getenv(AuthUtils.AUTH_ENDPOINT);

private final String clientId;
private final String clientSecret;
private HttpProducer httpProducer;

OAuth2IMSTokenProvider(final String clientId, final String clientSecret,
final AuthProxyConfiguration authProxyConfiguration) {
this.clientId = clientId;
this.clientSecret = clientSecret;
this.httpProducer = HttpProducer.newBuilder(endpoint)
.withProxyHost(authProxyConfiguration.getProxyHost())
.withProxyPort(authProxyConfiguration.getProxyPort())
.withProxyUser(authProxyConfiguration.getProxyUsername())
.withProxyPassword(authProxyConfiguration.getProxyPassword())
.build();
}

OAuth2IMSTokenProvider(final String endpoint, final String clientId, final String clientSecret,
final AuthProxyConfiguration authProxyConfiguration) {
this(clientId, clientSecret, authProxyConfiguration);
this.endpoint = endpoint;
this.httpProducer = HttpProducer.newBuilder(endpoint)
.withProxyHost(authProxyConfiguration.getProxyHost())
.withProxyPort(authProxyConfiguration.getProxyPort())
.withProxyUser(authProxyConfiguration.getProxyUsername())
.withProxyPassword(authProxyConfiguration.getProxyPassword())
.build();
}

@Override
protected TokenResponse getTokenResponse() throws AuthException {
LOG.debug("refreshing expired oauth2 accessToken: {}", clientId);
StringBuilder params = new StringBuilder()
.append("grant_type=").append(GRANT_TYPE)
.append("&client_id=").append(clientId)
.append("&client_secret=").append(clientSecret)
.append("&scope=").append(SCOPE);

try {
final TokenResponse tokenResponse = httpProducer.post(IMS_ENDPOINT_PATH, params.toString().getBytes(),
getContentHandler());
// As the expiresIn time we get from the API is in seconds we need to convert this into milliseconds
final long expiresInMilliSecond = tokenResponse.getExpiresIn() * 1000;
final TokenResponse updatedTokenResponse = new TokenResponse(tokenResponse.getTokenType(), expiresInMilliSecond,
tokenResponse.getRefreshToken(), tokenResponse.getAccessToken());
return updatedTokenResponse;
} catch (HttpException httpException) {
throw new AuthException("Exception while fetching oauth2 access token", httpException);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright 2024 Adobe. All rights reserved.
* This file is licensed to you under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. You may obtain a copy
* of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under
* the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR REPRESENTATIONS
* OF ANY KIND, either express or implied. See the License for the specific language
* governing permissions and limitations under the License.
*/

package com.adobe.platform.streaming.auth.impl;

import com.adobe.platform.streaming.auth.AuthException;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertThrows;

/**
* @author Adobe Inc.
*/
class OAuth2IMSTokenProviderTest {

private static final String TEST_ENDPOINT = "https://ims-na1.adobelogin.com";
private static final String TEST_CLIENT_ID = "testClientId";
private static final String TEST_CLIENT_SECRET = "testClientSecret";

@Test
void testGetTokenInvalidClientId() {
OAuth2IMSTokenProvider imsTokenProvider = new OAuth2IMSTokenProvider(TEST_ENDPOINT, null,
TEST_CLIENT_SECRET, AuthProxyConfiguration.builder().build());
assertThrows(AuthException.class, imsTokenProvider::getToken);
}

@Test
void testGetTokenInvalidSecret() {
OAuth2IMSTokenProvider imsTokenProvider = new OAuth2IMSTokenProvider(TEST_ENDPOINT, TEST_CLIENT_ID, null,
AuthProxyConfiguration.builder().build());
assertThrows(AuthException.class, imsTokenProvider::getToken);
}

@Test
void testGetToken() {
OAuth2IMSTokenProvider imsTokenProvider = new OAuth2IMSTokenProvider(
TEST_ENDPOINT,
TEST_CLIENT_ID,
TEST_CLIENT_SECRET,
AuthProxyConfiguration.builder().build()
);
assertThrows(AuthException.class, imsTokenProvider::getTokenResponse);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,25 @@ private AuthProvider getAuthProvider(Map<String, String> props) {
.equals(AEP_CONNECTION_AUTH_ENABLED_VALUE);
LOG.info("Auth Enabled for DCS Published: {}", isAuthEnabled);
if (isAuthEnabled) {
TokenType tokenType = TokenType.getTokenType(props.getOrDefault(
LOG.info("Auth Token Type = {%s}", AEP_CONNECTION_AUTH_TOKEN_TYPE);
final TokenType tokenType = TokenType.getTokenType(props.getOrDefault(
AEP_CONNECTION_AUTH_TOKEN_TYPE,
TokenType.JWT_TOKEN.getName())
);

return tokenType == TokenType.ACCESS_TOKEN ? getIMSTokenProvider(props) : getJWTTokenProvider(props);
switch (tokenType) {
case ACCESS_TOKEN:
return getIMSTokenProvider(props);

case JWT_TOKEN:
return getJWTTokenProvider(props);

case OAUTH2_ACCESS_TOKEN:
return getOAuth2IMSTokenProvider(props);

default:
throw new AuthException("Invalid token type to get auth provider");
}
}
} catch (AuthException authException) {
throw new IllegalArgumentException("Exception while instantiating the auth provider", authException);
Expand All @@ -140,6 +153,22 @@ private AuthProvider getIMSTokenProvider(Map<String, String> props) throws AuthE
.build());
}

private AuthProvider getOAuth2IMSTokenProvider(final Map<String, String> props) throws AuthException {
LOG.info("Get auth token for type oauth2_access_token");

return AuthProviderFactory.getAuthProvider(TokenType.OAUTH2_ACCESS_TOKEN, ImmutableMap.<String, String>builder()
.put(AuthUtils.AUTH_CLIENT_ID, props.get(AEP_CONNECTION_AUTH_CLIENT_ID))
.put(AuthUtils.AUTH_CLIENT_SECRET, props.get(AEP_CONNECTION_AUTH_CLIENT_SECRET))
.put(AuthUtils.AUTH_ENDPOINT, props.get(AEP_CONNECTION_AUTH_ENDPOINT))
.build(), AuthProxyConfiguration.builder()
.proxyHost(SinkUtils.getProperty(props, AEP_CONNECTION_PROXY_HOST, null))
.proxyPort(SinkUtils.getProperty(props, AEP_CONNECTION_PROXY_PORT, 443))
.proxyUsername(SinkUtils.getProperty(props, AEP_CONNECTION_PROXY_USER, null))
.proxyPassword(SinkUtils.getProperty(props, AEP_CONNECTION_PROXY_PASSWORD, null))
.build());
}

@Deprecated
private AuthProvider getJWTTokenProvider(Map<String, String> props) throws AuthException {
return AuthProviderFactory.getAuthProvider(TokenType.JWT_TOKEN, ImmutableMap.<String, String>builder()
.put(AuthUtils.AUTH_CLIENT_ID, props.get(AEP_CONNECTION_AUTH_CLIENT_ID))
Expand Down
Loading

0 comments on commit a050275

Please sign in to comment.