OAuth Configuration Reference for Confluent Cloud Clients

This section lists the OAuth configuration parameters for each Confluent Cloud client type (Java, Python, .NET, Go, and Schema Registry), followed by validation, templating, deployment, and testing examples.

Java client parameters

Parameter | Description | Default | Required
sasl.mechanism | SASL mechanism for OAuth authentication | OAUTHBEARER | Yes
sasl.jaas.config | JAAS configuration for OAuth login module | None | Yes
sasl.login.callback.handler.class | OAuth callback handler class | io.confluent.kafka.clients.plugins.auth.token.TokenUserLoginCallbackHandler | Yes
sasl.login.connect.timeout.ms | Connection timeout for token requests (ms) | 10000 | No
sasl.login.read.timeout.ms | Read timeout for token requests (ms) | 10000 | No
sasl.login.retry.backoff.ms | Initial retry backoff for failed requests (ms) | 100 | No
sasl.login.retry.backoff.max.ms | Maximum retry backoff for failed requests (ms) | 10000 | No
oauth.token.endpoint.uri | OAuth token endpoint URL | None | Yes
oauth.client.id | OAuth client ID | None | Yes
oauth.client.secret | OAuth client secret | None | Yes
oauth.scope | OAuth scopes for token request | None | No
oauth.logical.cluster | Logical cluster ID for Confluent Cloud | None | Yes
oauth.identity.pool.id | Identity pool ID for Confluent Cloud | None | No
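
A minimal sketch of how the parameters in the table above might be assembled into a `java.util.Properties` object. The class and method names (`OAuthClientProps`, `build`) and the choice to set the `oauth.*` keys as top-level properties are assumptions for illustration; the property names and default values come from the table.

```java
import java.util.Properties;

final class OAuthClientProps {

    // Builds client properties from the required parameters. Optional timeouts
    // are set explicitly to the defaults listed in the table.
    static Properties build(String jaasConfig, String tokenEndpoint, String clientId,
                            String clientSecret, String logicalCluster) {
        Properties props = new Properties();
        props.setProperty("sasl.mechanism", "OAUTHBEARER");
        props.setProperty("sasl.jaas.config", jaasConfig);
        props.setProperty("sasl.login.callback.handler.class",
            "io.confluent.kafka.clients.plugins.auth.token.TokenUserLoginCallbackHandler");
        props.setProperty("oauth.token.endpoint.uri", tokenEndpoint);
        props.setProperty("oauth.client.id", clientId);
        props.setProperty("oauth.client.secret", clientSecret);
        props.setProperty("oauth.logical.cluster", logicalCluster);

        // Optional settings, shown with their documented defaults
        props.setProperty("sasl.login.connect.timeout.ms", "10000");
        props.setProperty("sasl.login.read.timeout.ms", "10000");
        props.setProperty("sasl.login.retry.backoff.ms", "100");
        props.setProperty("sasl.login.retry.backoff.max.ms", "10000");
        return props;
    }
}
```

Optional parameters such as `oauth.scope` and `oauth.identity.pool.id` can be added to the returned object in the same way when they are needed.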

JAAS configuration parameters

Parameter | Description | Default | Required
oauth.token.endpoint.uri | OAuth token endpoint URL | None | Yes
oauth.client.id | OAuth client ID | None | Yes
oauth.client.secret | OAuth client secret | None | Yes
oauth.scope | OAuth scopes for token request | None | No
oauth.logical.cluster | Logical cluster ID for Confluent Cloud | None | Yes
oauth.identity.pool.id | Identity pool ID for Confluent Cloud | None | No
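
As an illustration of how these options could be combined into a single `sasl.jaas.config` value, the sketch below joins option names and values into the `LoginModule required key="value" ...;` form. The helper name `JaasConfigBuilder` is hypothetical, and the escaping of backslashes and double quotes is an assumption about what a quoted JAAS option value needs; check it against the client version you use.

```java
import java.util.Map;

final class JaasConfigBuilder {

    private static final String LOGIN_MODULE =
        "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule";

    // Joins the options into one JAAS entry, in the iteration order of the map.
    static String build(Map<String, String> options) {
        StringBuilder sb = new StringBuilder(LOGIN_MODULE).append(" required");
        for (Map.Entry<String, String> option : options.entrySet()) {
            // Escape backslashes first, then double quotes, inside the quoted value
            String value = option.getValue()
                .replace("\\", "\\\\")
                .replace("\"", "\\\"");
            sb.append(' ').append(option.getKey())
              .append("=\"").append(value).append('"');
        }
        return sb.append(';').toString();
    }
}
```

Passing a `LinkedHashMap` keeps the options in insertion order, which makes the resulting string easier to compare in tests.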

Python client parameters

Parameter | Description | Default | Required
sasl.mechanism | SASL mechanism for OAuth authentication | OAUTHBEARER | Yes
sasl.oauth.token.endpoint.uri | OAuth token endpoint URL | None | Yes
sasl.oauth.client.id | OAuth client ID | None | Yes
sasl.oauth.client.secret | OAuth client secret | None | Yes
sasl.oauth.scope | OAuth scopes for token request | None | No
sasl.oauth.logical.cluster | Logical cluster ID for Confluent Cloud | None | Yes
sasl.oauth.identity.pool.id | Identity pool ID for Confluent Cloud | None | No
sasl.oauth.connect.timeout.ms | Connection timeout for token requests (ms) | 10000 | No
sasl.oauth.read.timeout.ms | Read timeout for token requests (ms) | 10000 | No

.NET client parameters

Parameter | Description | Default | Required
sasl.mechanism | SASL mechanism for OAuth authentication | OAUTHBEARER | Yes
sasl.oauth.token.endpoint.uri | OAuth token endpoint URL | None | Yes
sasl.oauth.client.id | OAuth client ID | None | Yes
sasl.oauth.client.secret | OAuth client secret | None | Yes
sasl.oauth.scope | OAuth scopes for token request | None | No
sasl.oauth.logical.cluster | Logical cluster ID for Confluent Cloud | None | Yes
sasl.oauth.identity.pool.id | Identity pool ID for Confluent Cloud | None | No
sasl.oauth.connect.timeout.ms | Connection timeout for token requests (ms) | 10000 | No
sasl.oauth.read.timeout.ms | Read timeout for token requests (ms) | 10000 | No

Go client parameters

Parameter | Description | Default | Required
sasl.mechanism | SASL mechanism for OAuth authentication | OAUTHBEARER | Yes
sasl.oauth.token.endpoint.uri | OAuth token endpoint URL | None | Yes
sasl.oauth.client.id | OAuth client ID | None | Yes
sasl.oauth.client.secret | OAuth client secret | None | Yes
sasl.oauth.scope | OAuth scopes for token request | None | No
sasl.oauth.logical.cluster | Logical cluster ID for Confluent Cloud | None | Yes
sasl.oauth.identity.pool.id | Identity pool ID for Confluent Cloud | None | No
sasl.oauth.connect.timeout.ms | Connection timeout for token requests (ms) | 10000 | No
sasl.oauth.read.timeout.ms | Read timeout for token requests (ms) | 10000 | No

Schema Registry client parameters

Parameter | Description | Default | Required
bearer.auth.credentials.source | Authentication method. Use OAUTHBEARER for OAuth | None | Yes
bearer.auth.issuer.endpoint.url | OAuth token endpoint URL | None | Yes
bearer.auth.client.id | OAuth client ID | None | Yes
bearer.auth.client.secret | OAuth client secret | None | Yes
bearer.auth.scope | OAuth scopes for token request | None | No
bearer.auth.logical.cluster | Schema Registry logical cluster ID (lsrc-xxxxx) | None | Yes
bearer.auth.identity.pool.id | Identity pool ID for Confluent Cloud | None | No

Configuration validation

When configuring OAuth authentication, validate your configuration:

  • Required parameters: Ensure all required parameters are provided.
  • URL format: Verify that token endpoint URLs are well formed.
  • Credentials: Validate that the client ID and secret are correct.
  • Network access: Confirm network connectivity to the OAuth provider.
  • Permissions: Verify that the client has the appropriate scopes and permissions.
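
// Configuration validation example
import java.util.Properties;

public final class OAuthConfigValidator {

    public static void validateOAuthConfig(Properties props) {
        // Check that every required parameter is present and non-blank
        String[] required = {
            "sasl.mechanism",
            "sasl.jaas.config",
            "oauth.token.endpoint.uri",
            "oauth.client.id",
            "oauth.client.secret",
            "oauth.logical.cluster"
        };

        for (String param : required) {
            String value = props.getProperty(param);
            if (value == null || value.trim().isEmpty()) {
                throw new IllegalArgumentException("Missing required parameter: " + param);
            }
        }

        // Validate timeout values
        int connectTimeout = parseTimeout(props, "sasl.login.connect.timeout.ms");
        int readTimeout = parseTimeout(props, "sasl.login.read.timeout.ms");

        if (connectTimeout < 1000 || readTimeout < 1000) {
            throw new IllegalArgumentException("Timeout values must be at least 1000ms");
        }
    }

    // Parse a timeout property, falling back to the documented default of 10000 ms
    private static int parseTimeout(Properties props, String key) {
        try {
            return Integer.parseInt(props.getProperty(key, "10000").trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Invalid value for " + key, e);
        }
    }
}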
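
The "URL format" check from the list above could be sketched as follows. The class name `TokenEndpointCheck` is hypothetical, and requiring the `https` scheme is an assumption made here because the request carries a client secret; relax it if your environment differs.

```java
import java.net.URI;
import java.net.URISyntaxException;

final class TokenEndpointCheck {

    // Returns true when the value parses as an absolute https URL with a host.
    static boolean isValidTokenEndpoint(String value) {
        if (value == null || value.trim().isEmpty()) {
            return false;
        }
        try {
            URI uri = new URI(value.trim());
            return "https".equalsIgnoreCase(uri.getScheme()) && uri.getHost() != null;
        } catch (URISyntaxException e) {
            // Malformed input, for example a value containing spaces
            return false;
        }
    }
}
```

A check like this only catches malformed values; it does not confirm that the endpoint is reachable or that the credentials are accepted.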

Configuration templates

Use configuration templates for consistent setup:

# oauth-config-template.yaml
kafka:
  oauth:
    token_endpoint: ${OAUTH_TOKEN_ENDPOINT}
    client_id: ${OAUTH_CLIENT_ID}
    client_secret: ${OAUTH_CLIENT_SECRET}
    scope: ${OAUTH_SCOPE}
    logical_cluster: ${LOGICAL_CLUSTER_ID}
    identity_pool: ${IDENTITY_POOL_ID}
    timeouts:
      connect: 10000
      read: 10000
      retry_backoff: 100
      retry_max: 10000
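
The `${NAME}` placeholders in the template have to be resolved before the values reach a client. One possible way to do that, assuming the placeholders are filled from environment variables, is sketched below; `TemplateResolver` is a hypothetical helper, not part of any Confluent library.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class TemplateResolver {

    // Matches ${NAME} where NAME looks like an environment variable name
    private static final Pattern PLACEHOLDER =
        Pattern.compile("\\$\\{([A-Za-z_][A-Za-z0-9_]*)\\}");

    // Replaces every placeholder with its value, failing fast on unset variables.
    static String resolve(String template, Map<String, String> env) {
        Matcher matcher = PLACEHOLDER.matcher(template);
        StringBuilder out = new StringBuilder();
        while (matcher.find()) {
            String name = matcher.group(1);
            String value = env.get(name);
            if (value == null) {
                throw new IllegalArgumentException("Unset variable: " + name);
            }
            // quoteReplacement keeps '$' and '\' in the value literal
            matcher.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        matcher.appendTail(out);
        return out.toString();
    }
}
```

In an application, `TemplateResolver.resolve(templateText, System.getenv())` would apply the process environment. Failing on an unset variable is a design choice that surfaces missing secrets at startup instead of at the first token request.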

Environment-specific configurations

Development environment

# Development OAuth configuration
sasl.oauthbearer.token.endpoint.url=https://dev-auth.example.com/oauth2/token
sasl.oauthbearer.client.id=dev-kafka-client
sasl.oauthbearer.client.secret=${DEV_CLIENT_SECRET}
sasl.oauthbearer.scope=kafka:dev

Staging environment

# Staging OAuth configuration
sasl.oauthbearer.token.endpoint.url=https://staging-auth.example.com/oauth2/token
sasl.oauthbearer.client.id=staging-kafka-client
sasl.oauthbearer.client.secret=${STAGING_CLIENT_SECRET}
sasl.oauthbearer.scope=kafka:staging

Production environment

# Production OAuth configuration
sasl.oauthbearer.token.endpoint.url=https://prod-auth.example.com/oauth2/token
sasl.oauthbearer.client.id=prod-kafka-client
sasl.oauthbearer.client.secret=${PROD_CLIENT_SECRET}
sasl.oauthbearer.scope=kafka:prod

Advanced configuration options

Custom token providers

For advanced use cases, implement custom token providers:

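import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.confluent.kafka.schemaregistry.client.security.bearerauth.BearerAuthCredentialProvider;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Map;

public class CustomTokenProvider implements BearerAuthCredentialProvider {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    private final String tokenEndpoint;
    private final String clientId;
    private final String clientSecret;
    private final Map<String, String> additionalHeaders;

    public CustomTokenProvider(String tokenEndpoint, String clientId,
                             String clientSecret, Map<String, String> additionalHeaders) {
        this.tokenEndpoint = tokenEndpoint;
        this.clientId = clientId;
        this.clientSecret = clientSecret;
        this.additionalHeaders = additionalHeaders;
    }

    // alias() and configure() are assumed to be declared by the interface in the
    // Schema Registry client version in use; verify against your version.
    @Override
    public String alias() {
        return "CUSTOM";
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // Settings are supplied through the constructor
    }

    @Override
    public String getBearerToken(URL url) {
        // Wrap the checked exception so the method declares none
        try {
            return requestToken();
        } catch (IOException e) {
            throw new UncheckedIOException("Token request failed", e);
        }
    }

    // Custom token retrieval logic
    private String requestToken() throws IOException {
        HttpURLConnection connection = (HttpURLConnection) new URL(tokenEndpoint).openConnection();
        try {
            connection.setRequestMethod("POST");
            connection.setConnectTimeout(10_000);
            connection.setReadTimeout(10_000);
            connection.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");

            // Add custom headers
            if (additionalHeaders != null) {
                for (Map.Entry<String, String> header : additionalHeaders.entrySet()) {
                    connection.setRequestProperty(header.getKey(), header.getValue());
                }
            }

            // Set up HTTP Basic authentication with the client credentials
            String auth = clientId + ":" + clientSecret;
            String encodedAuth = Base64.getEncoder()
                .encodeToString(auth.getBytes(StandardCharsets.UTF_8));
            connection.setRequestProperty("Authorization", "Basic " + encodedAuth);

            // Send the client-credentials grant request
            connection.setDoOutput(true);
            try (OutputStream os = connection.getOutputStream()) {
                os.write("grant_type=client_credentials".getBytes(StandardCharsets.UTF_8));
            }

            int status = connection.getResponseCode();
            if (status != HttpURLConnection.HTTP_OK) {
                throw new IOException("Token endpoint returned HTTP " + status);
            }

            // Parse the JSON response and extract the access token
            try (InputStream in = connection.getInputStream()) {
                JsonNode token = MAPPER.readTree(in).get("access_token");
                if (token == null || token.isNull()) {
                    throw new IOException("Token response has no access_token field");
                }
                return token.asText();
            }
        } finally {
            connection.disconnect();
        }
    }
}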
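
The provider above requests a new token on every call. Because an OAuth token response normally carries an `expires_in` value, one option is to cache the token and refresh it shortly before it expires. The sketch below shows that idea in isolation; `CachedToken`, its nested `Token` type, and the refresh margin are all hypothetical names and choices, and the fetcher is assumed to wrap whatever token request you already have.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.function.Supplier;

final class CachedToken {

    // A token value together with the instant at which it stops being valid
    static final class Token {
        final String value;
        final Instant expiresAt;

        Token(String value, Instant expiresAt) {
            this.value = value;
            this.expiresAt = expiresAt;
        }
    }

    private final Supplier<Token> fetcher;
    private final Clock clock;
    private final Duration refreshMargin;
    private Token current;

    CachedToken(Supplier<Token> fetcher, Clock clock, Duration refreshMargin) {
        this.fetcher = fetcher;
        this.clock = clock;
        this.refreshMargin = refreshMargin;
    }

    // Returns the cached token, fetching a new one when none is cached or when
    // the cached one is within refreshMargin of its expiry.
    synchronized String get() {
        Instant now = clock.instant();
        if (current == null || !now.isBefore(current.expiresAt.minus(refreshMargin))) {
            current = fetcher.get();
        }
        return current.value;
    }
}
```

Injecting a `Clock` keeps the expiry logic testable without sleeping; production code would typically pass `Clock.systemUTC()`.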

Retry and circuit breaker patterns

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

public class CircuitBreaker {
    private final AtomicInteger failureCount = new AtomicInteger(0);
    private final AtomicLong lastFailureTime = new AtomicLong(0);
    private final int threshold;
    private final long timeout;

    public CircuitBreaker(int threshold, long timeout) {
        this.threshold = threshold;
        this.timeout = timeout;
    }

    public boolean isOpen() {
        long now = System.currentTimeMillis();
        if (failureCount.get() >= threshold) {
            if (now - lastFailureTime.get() < timeout) {
                return true; // Circuit is open
            } else {
                // Reset after timeout
                failureCount.set(0);
                return false;
            }
        }
        return false;
    }

    public void recordFailure() {
        failureCount.incrementAndGet();
        lastFailureTime.set(System.currentTimeMillis());
    }

    public void recordSuccess() {
        failureCount.set(0);
    }
}

import java.util.function.Supplier;

public class RetryTemplate {
    private final int maxRetries;
    private final long backoffMs;

    public RetryTemplate(int maxRetries, long backoffMs) {
        this.maxRetries = maxRetries;
        this.backoffMs = backoffMs;
    }

    public <T> T execute(Supplier<T> operation) throws Exception {
        int attempts = 0;
        while (attempts < maxRetries) {
            try {
                return operation.get();
            } catch (Exception e) {
                attempts++;
                if (attempts >= maxRetries) {
                    throw e;
                }
                Thread.sleep(backoffMs * attempts);
            }
        }
        throw new RuntimeException("Max retries exceeded");
    }
}
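
`RetryTemplate` above grows its delay linearly (`backoffMs * attempts`). The parameter tables list an initial backoff (100 ms) and a maximum backoff (10000 ms), which suggests a capped schedule. The sketch below shows one such schedule, doubling the delay per attempt up to the cap. It is an illustration of capped exponential backoff under that assumption, not necessarily the algorithm the Kafka client uses internally, and `Backoff` is a hypothetical helper.

```java
final class Backoff {

    // Returns the delay before retry number `attempt` (0-based): the initial
    // delay doubled once per attempt, never exceeding maxMs.
    static long delayMs(long initialMs, long maxMs, int attempt) {
        if (initialMs <= 0 || maxMs <= 0 || attempt < 0) {
            throw new IllegalArgumentException("Arguments must be positive, attempt >= 0");
        }
        long delay = Math.min(initialMs, maxMs);
        for (int i = 0; i < attempt; i++) {
            // Stop doubling once another doubling would pass the cap
            if (delay > maxMs / 2) {
                return maxMs;
            }
            delay *= 2;
        }
        return delay;
    }
}
```

With the documented defaults, `Backoff.delayMs(100, 10000, attempt)` yields 100, 200, 400, and so on, and stays at 10000 from the eighth retry onward.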

Deployment considerations

Container deployment

# Dockerfile for OAuth-enabled Kafka client
FROM openjdk:11-jre-slim

# Copy application
COPY target/oauth-kafka-client.jar /app/

# Set environment variables (placeholders only; inject real credentials at
# runtime instead of baking them into the image)
ENV OAUTH_TOKEN_ENDPOINT=https://your-oauth-provider.com/oauth2/token
ENV OAUTH_CLIENT_ID=your-client-id
ENV OAUTH_CLIENT_SECRET=your-client-secret
ENV OAUTH_SCOPE="kafka:read kafka:write"
ENV LOGICAL_CLUSTER_ID=lkc-xxxxx
ENV IDENTITY_POOL_ID=pool-xxxxx

# Run application
CMD ["java", "-jar", "/app/oauth-kafka-client.jar"]

Kubernetes deployment

# kubernetes-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: oauth-kafka-client
spec:
  replicas: 3
  selector:
    matchLabels:
      app: oauth-kafka-client
  template:
    metadata:
      labels:
        app: oauth-kafka-client
    spec:
      containers:
      - name: kafka-client
        image: your-registry/oauth-kafka-client:latest
        env:
        - name: OAUTH_TOKEN_ENDPOINT
          valueFrom:
            secretKeyRef:
              name: oauth-secrets
              key: token-endpoint
        - name: OAUTH_CLIENT_ID
          valueFrom:
            secretKeyRef:
              name: oauth-secrets
              key: client-id
        - name: OAUTH_CLIENT_SECRET
          valueFrom:
            secretKeyRef:
              name: oauth-secrets
              key: client-secret
        - name: OAUTH_SCOPE
          value: "kafka:read kafka:write"
        - name: LOGICAL_CLUSTER_ID
          value: "lkc-xxxxx"
        - name: IDENTITY_POOL_ID
          value: "pool-xxxxx"
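
Inside the container, the application still has to turn these environment variables into client settings. A minimal sketch of that step follows. The mapping from variable names to `oauth.*` property names is an assumption for illustration, as is the helper name `EnvOAuthSettings`; the environment is passed in as a map so the logic can be tested without touching the real process environment.

```java
import java.util.List;
import java.util.Map;
import java.util.Properties;

final class EnvOAuthSettings {

    // Variables that must be set for the client to authenticate
    private static final List<String> REQUIRED = List.of(
        "OAUTH_TOKEN_ENDPOINT", "OAUTH_CLIENT_ID", "OAUTH_CLIENT_SECRET", "LOGICAL_CLUSTER_ID");

    static Properties fromEnv(Map<String, String> env) {
        for (String name : REQUIRED) {
            String value = env.get(name);
            if (value == null || value.isEmpty()) {
                throw new IllegalStateException("Missing environment variable: " + name);
            }
        }
        Properties props = new Properties();
        props.setProperty("oauth.token.endpoint.uri", env.get("OAUTH_TOKEN_ENDPOINT"));
        props.setProperty("oauth.client.id", env.get("OAUTH_CLIENT_ID"));
        props.setProperty("oauth.client.secret", env.get("OAUTH_CLIENT_SECRET"));
        props.setProperty("oauth.logical.cluster", env.get("LOGICAL_CLUSTER_ID"));

        // Optional variables are copied only when present
        if (env.get("OAUTH_SCOPE") != null) {
            props.setProperty("oauth.scope", env.get("OAUTH_SCOPE"));
        }
        if (env.get("IDENTITY_POOL_ID") != null) {
            props.setProperty("oauth.identity.pool.id", env.get("IDENTITY_POOL_ID"));
        }
        return props;
    }
}
```

At startup the application would call `EnvOAuthSettings.fromEnv(System.getenv())` and merge the result into its client configuration.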

CI/CD integration

# .github/workflows/deploy.yml
name: Deploy OAuth Kafka Client

on:
  push:
    branches: [main]

jobs:
  deploy:
    runs-on: ubuntu-latest
    steps:
    - uses: actions/checkout@v4

    - name: Set up JDK
      uses: actions/setup-java@v4
      with:
        distribution: 'temurin'
        java-version: '11'

    - name: Build with Maven
      run: mvn clean package

    - name: Build Docker image
      run: docker build -t oauth-kafka-client .

    - name: Deploy to Kubernetes
      run: |
        kubectl apply -f kubernetes-deployment.yaml
        kubectl rollout restart deployment/oauth-kafka-client

Integration testing

Mock OAuth server

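@Test
public void testOAuthIntegration() {
    // Start mock OAuth server (MockOAuthServer is a test helper you provide)
    MockOAuthServer mockServer = new MockOAuthServer();
    mockServer.start();

    try {
        // Configure client with mock server
        Properties props = new Properties();
        // Placeholder broker address and protocol; match your test broker's listener
        props.put("bootstrap.servers", "localhost:9092");
        props.put("security.protocol", "SASL_SSL");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("sasl.mechanism", "OAUTHBEARER");
        props.put("sasl.jaas.config",
            "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required " +
            "oauth.token.endpoint.uri=\"" + mockServer.getTokenEndpoint() + "\" " +
            "oauth.client.id=\"test-client\" " +
            "oauth.client.secret=\"test-secret\";");

        // Test authentication; try-with-resources closes the producer
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // ... test operations
        }

    } finally {
        mockServer.stop();
    }
}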
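
The test above relies on a `MockOAuthServer` class that is not shown. One way such a helper could be built, using only the JDK's built-in `com.sun.net.httpserver.HttpServer`, is sketched below. The class name `MockTokenEndpoint`, the `/oauth2/token` path, and the response body are assumptions. The token it returns is an opaque string, so a client that parses or validates the token as a JWT would need a properly formed token instead.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;

final class MockTokenEndpoint implements AutoCloseable {

    private final HttpServer server;

    // Starts an HTTP server on a free local port that answers every request
    // to /oauth2/token with a fixed token response.
    MockTokenEndpoint() {
        try {
            // Port 0 asks the operating system for any free port
            server = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        server.createContext("/oauth2/token", exchange -> {
            byte[] body = ("{\"access_token\":\"test-token\","
                + "\"token_type\":\"Bearer\",\"expires_in\":3600}")
                .getBytes(StandardCharsets.UTF_8);
            exchange.getResponseHeaders().add("Content-Type", "application/json");
            exchange.sendResponseHeaders(200, body.length);
            try (OutputStream os = exchange.getResponseBody()) {
                os.write(body);
            }
        });
        server.start();
    }

    String getTokenEndpoint() {
        return "http://127.0.0.1:" + server.getAddress().getPort() + "/oauth2/token";
    }

    @Override
    public void close() {
        server.stop(0);
    }
}
```

Because the class implements `AutoCloseable`, a test can open it in a try-with-resources block and the server is stopped even when an assertion fails.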

Performance testing

@Test
public void testOAuthPerformance() {
    // Measure token retrieval performance.
    // oauthProvider is a fixture initialized elsewhere in the test class.
    long startTime = System.nanoTime();

    for (int i = 0; i < 100; i++) {
        // Perform OAuth token request
        String token = oauthProvider.getToken();
        assertNotNull(token);
    }

    long durationMs = (System.nanoTime() - startTime) / 1_000_000;

    // Assert performance requirements: 100 token requests in under 5 seconds
    assertTrue("OAuth performance below threshold", durationMs < 5000);
}