Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.net.URL;
import java.time.Clock;
import java.time.Duration;
import org.apache.pulsar.client.api.Authentication;

/**
Expand All @@ -31,9 +32,9 @@ public final class AuthenticationFactoryOAuth2 {
/**
* Authenticate with client credentials.
*
* @param issuerUrl the issuer URL
* @param issuerUrl the issuer URL
* @param credentialsUrl the credentials URL
* @param audience An optional field. The audience identifier used by some Identity Providers, like Auth0.
* @param audience An optional field. The audience identifier used by some Identity Providers, like Auth0.
* @return an Authentication object
*/
public static Authentication clientCredentials(URL issuerUrl, URL credentialsUrl, String audience) {
Expand All @@ -43,23 +44,144 @@ public static Authentication clientCredentials(URL issuerUrl, URL credentialsUrl
/**
* Authenticate with client credentials.
*
* @param issuerUrl the issuer URL
* @param issuerUrl the issuer URL
* @param credentialsUrl the credentials URL
* @param audience An optional field. The audience identifier used by some Identity Providers, like Auth0.
* @param scope An optional field. The value of the scope parameter is expressed as a list of space-delimited,
* case-sensitive strings. The strings are defined by the authorization server.
* If the value contains multiple space-delimited strings, their order does not matter,
* and each string adds an additional access range to the requested scope.
* From here: https://datatracker.ietf.org/doc/html/rfc6749#section-4.4.2
* @param audience An optional field. The audience identifier used by some Identity Providers, like Auth0.
* @param scope An optional field. The value of the scope parameter is expressed as a list of
* space-delimited,
* case-sensitive strings. The strings are defined by the authorization server.
* If the value contains multiple space-delimited strings, their order does not matter,
* and each string adds an additional access range to the requested scope.
* From here: https://datatracker.ietf.org/doc/html/rfc6749#section-4.4.2
* @return an Authentication object
*/
public static Authentication clientCredentials(URL issuerUrl, URL credentialsUrl, String audience, String scope) {
ClientCredentialsFlow flow = ClientCredentialsFlow.builder()
.issuerUrl(issuerUrl)
.privateKey(credentialsUrl.toExternalForm())
.audience(audience)
.scope(scope)
.build();
return new AuthenticationOAuth2(flow, Clock.systemDefaultZone());
return clientCredentialsBuilder().issuerUrl(issuerUrl).credentialsUrl(credentialsUrl).audience(audience)
.scope(scope).build();
}

/**
* A builder to create an authentication with client credentials.
*
* @return the builder
*/
public static ClientCredentialsBuilder clientCredentialsBuilder() {
return new ClientCredentialsBuilder();
}

public static class ClientCredentialsBuilder {

private URL issuerUrl;
private URL credentialsUrl;
private String audience;
private String scope;
private Duration connectTimeout;
private Duration readTimeout;
private String trustCertsFilePath;

private ClientCredentialsBuilder() {
}

/**
* Required issuer URL.
*
* @param issuerUrl the issuer URL
* @return the builder
*/
public ClientCredentialsBuilder issuerUrl(URL issuerUrl) {
this.issuerUrl = issuerUrl;
return this;
}

/**
* Required credentials URL.
*
* @param credentialsUrl the credentials URL
* @return the builder
*/
public ClientCredentialsBuilder credentialsUrl(URL credentialsUrl) {
this.credentialsUrl = credentialsUrl;
return this;
}

/**
* Optional audience identifier used by some Identity Providers, like Auth0.
*
* @param audience the audiance
* @return the builder
*/
public ClientCredentialsBuilder audience(String audience) {
this.audience = audience;
return this;
}

/**
* Optional scope expressed as a list of space-delimited, case-sensitive strings.
* The strings are defined by the authorization server.
* If the value contains multiple space-delimited strings, their order does not matter,
* and each string adds an additional access range to the requested scope.
* From here: https://datatracker.ietf.org/doc/html/rfc6749#section-4.4.2
*
* @param scope the scope
* @return the builder
*/
public ClientCredentialsBuilder scope(String scope) {
this.scope = scope;
return this;
}

/**
* Optional HTTP connection timeout.
*
* @param connectTimeout the connect timeout
* @return the builder
*/
public ClientCredentialsBuilder connectTimeout(Duration connectTimeout) {
this.connectTimeout = connectTimeout;
return this;
}

/**
* Optional HTTP read timeout.
*
* @param readTimeout the read timeout
* @return the builder
*/
public ClientCredentialsBuilder readTimeout(Duration readTimeout) {
this.readTimeout = readTimeout;
return this;
}

/**
* Optional path to the file containing the trusted certificate(s) of the token issuer.
*
* @param trustCertsFilePath the path to the file containing the trusted certificate(s)
* @return the builder
*/
public ClientCredentialsBuilder trustCertsFilePath(String trustCertsFilePath) {
this.trustCertsFilePath = trustCertsFilePath;
return this;
}

/**
* Authenticate with client credentials.
*
* @return an Authentication object
*/
public Authentication build() {
ClientCredentialsFlow flow = ClientCredentialsFlow.builder()
.issuerUrl(issuerUrl)
.privateKey(credentialsUrl == null ? null : credentialsUrl.toExternalForm())
.audience(audience)
.scope(scope)
.connectTimeout(connectTimeout)
.readTimeout(readTimeout)
.trustCertsFilePath(trustCertsFilePath)
.build();
return new AuthenticationOAuth2(flow, Clock.systemDefaultZone());
}

}


}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.net.URL;
import java.net.URLConnection;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Map;
import lombok.Builder;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -60,62 +61,17 @@ class ClientCredentialsFlow extends FlowBase {
private boolean initialized = false;

@Builder
public ClientCredentialsFlow(URL issuerUrl, String audience, String privateKey, String scope) {
super(issuerUrl);
public ClientCredentialsFlow(URL issuerUrl, String audience, String privateKey, String scope,
Duration connectTimeout, Duration readTimeout, String trustCertsFilePath) {
super(issuerUrl, connectTimeout, readTimeout, trustCertsFilePath);
this.audience = audience;
this.privateKey = privateKey;
this.scope = scope;
}

@Override
public void initialize() throws PulsarClientException {
super.initialize();
assert this.metadata != null;

URL tokenUrl = this.metadata.getTokenEndpoint();
this.exchanger = new TokenClient(tokenUrl);
initialized = true;
}

public TokenResult authenticate() throws PulsarClientException {
// read the private key from storage
KeyFile keyFile;
try {
keyFile = loadPrivateKey(this.privateKey);
} catch (IOException e) {
throw new PulsarClientException.AuthenticationException("Unable to read private key: " + e.getMessage());
}

// request an access token using client credentials
ClientCredentialsExchangeRequest req = ClientCredentialsExchangeRequest.builder()
.clientId(keyFile.getClientId())
.clientSecret(keyFile.getClientSecret())
.audience(this.audience)
.scope(this.scope)
.build();
TokenResult tr;
if (!initialized) {
initialize();
}
try {
tr = this.exchanger.exchangeClientCredentials(req);
} catch (TokenExchangeException | IOException e) {
throw new PulsarClientException.AuthenticationException("Unable to obtain an access token: "
+ e.getMessage());
}

return tr;
}

@Override
public void close() throws Exception {
if (exchanger != null) {
exchanger.close();
}
}

/**
* Constructs a {@link ClientCredentialsFlow} from configuration parameters.
*
* @param params
* @return
*/
Expand All @@ -125,16 +81,24 @@ public static ClientCredentialsFlow fromParameters(Map<String, String> params) {
// These are optional parameters, so we only perform a get
String scope = params.get(CONFIG_PARAM_SCOPE);
String audience = params.get(CONFIG_PARAM_AUDIENCE);
Duration connectTimeout = parseParameterDuration(params, CONFIG_PARAM_CONNECT_TIMEOUT);
Duration readTimeout = parseParameterDuration(params, CONFIG_PARAM_READ_TIMEOUT);
String trustCertsFilePath = params.get(CONFIG_PARAM_TRUST_CERTS_FILE_PATH);

return ClientCredentialsFlow.builder()
.issuerUrl(issuerUrl)
.audience(audience)
.privateKey(privateKeyUrl)
.scope(scope)
.connectTimeout(connectTimeout)
.readTimeout(readTimeout)
.trustCertsFilePath(trustCertsFilePath)
.build();
}

/**
* Loads the private key from the given URL.
*
* @param privateKeyURL
* @return
* @throws IOException
Expand Down Expand Up @@ -162,4 +126,52 @@ private static KeyFile loadPrivateKey(String privateKeyURL) throws IOException {
throw new IOException("Invalid privateKey format", e);
}
}

@Override
public void initialize() throws PulsarClientException {
super.initialize();
assert this.metadata != null;

URL tokenUrl = this.metadata.getTokenEndpoint();
this.exchanger = new TokenClient(tokenUrl, httpClient);
initialized = true;
}

public TokenResult authenticate() throws PulsarClientException {
// read the private key from storage
KeyFile keyFile;
try {
keyFile = loadPrivateKey(this.privateKey);
} catch (IOException e) {
throw new PulsarClientException.AuthenticationException("Unable to read private key: " + e.getMessage());
}

// request an access token using client credentials
ClientCredentialsExchangeRequest req = ClientCredentialsExchangeRequest.builder()
.clientId(keyFile.getClientId())
.clientSecret(keyFile.getClientSecret())
.audience(this.audience)
.scope(this.scope)
.build();
TokenResult tr;
if (!initialized) {
initialize();
}
try {
tr = this.exchanger.exchangeClientCredentials(req);
} catch (TokenExchangeException | IOException e) {
throw new PulsarClientException.AuthenticationException("Unable to obtain an access token: "
+ e.getMessage());
}

return tr;
}

@Override
public void close() throws Exception {
super.close();
if (exchanger != null) {
exchanger.close();
}
}
}
Loading
Loading