Configure Go Clients for OAuth/OIDC on Confluent Cloud

Prerequisites

Before you begin, ensure you have the following:

Language and tooling

  • Go 1.18+
  • confluent-kafka-go: v2.0.0 or later
  • Confluent Platform: 7.2.1 or later, or 7.1.3 or later on the 7.1.x release line

Confluent Cloud environment

  • OAuth configured in Confluent Cloud - see Configure OAuth authentication

Credentials and identifiers

  • Client ID - Your application’s identifier (like a username)
  • Client secret - Your application’s password for OAuth
  • Token endpoint URL - Where to request access tokens
  • Scopes - Permissions your application needs (for example, kafka:read kafka:write)
  • Cluster ID (lkc-xxxxx) - Your Kafka cluster identifier
  • Identity pool ID (pool-xxxxx) - If using identity pools

Client library

  • Install the latest version with OAuth support:

    go get github.com/confluentinc/confluent-kafka-go/v2/kafka
    

Configure Kafka Go clients

Go Kafka clients can authenticate to Confluent Cloud clusters using the OAuth 2.0 protocol. The Go client uses the confluent-kafka-go library, which is built on librdkafka.

Configuration approach

The Go client is configured through a kafka.ConfigMap that combines the standard SASL_SSL connection properties with OAuth-specific sasl.oauth.* parameters and an oauth_cb callback function that retrieves access tokens from your identity provider.

Required parameters

Parameter                      | Description                                            | Required Value
sasl.mechanism                 | Sets the authentication method for OAuth.              | OAUTHBEARER
sasl.oauth.token.endpoint.uri  | OAuth token endpoint URL from your identity provider.  | (Provided by IdP)
sasl.oauth.client.id           | OAuth client ID from your identity provider.           | (Provided by IdP)
sasl.oauth.client.secret       | OAuth client secret from your identity provider.       | (Provided by IdP)
sasl.oauth.scope               | OAuth scopes for the token request.                    | (Optional)
sasl.oauth.logical.cluster     | Logical cluster ID for Confluent Cloud.                | lkc-xxxxx
sasl.oauth.identity.pool.id    | Identity pool ID for Confluent Cloud.                  | (Optional)
sasl.oauth.connect.timeout.ms  | Connection timeout for token requests.                 | 10000 (default)
sasl.oauth.read.timeout.ms     | Read timeout for token requests.                       | 10000 (default)
oauth_cb                       | Go function for OAuth token retrieval.                 | Your callback function
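
As a quick orientation, the following minimal sketch shows how the parameters in the table map onto a kafka.ConfigMap. The endpoint, credentials, and cluster ID are placeholders; the full runnable example appears later in this section:

// Minimal mapping of the required parameters onto a ConfigMap.
// All values shown are placeholders.
config := &kafka.ConfigMap{
    "bootstrap.servers":             "your-cluster-endpoint:9092",
    "security.protocol":             "SASL_SSL",
    "sasl.mechanism":                "OAUTHBEARER",
    "sasl.oauth.token.endpoint.uri": "https://your-oauth-provider.com/oauth2/token",
    "sasl.oauth.client.id":          "your-client-id",
    "sasl.oauth.client.secret":      "your-client-secret",
    "sasl.oauth.logical.cluster":    "lkc-xxxxx",
    "oauth_cb":                      oauthCallback, // defined in the next section
}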

OAuth callback function

The Go client requires an OAuth callback function with the following signature:

func oauthCallback(oauthConfig string) (string, error) {
    // Implementation here
    return "", nil
}

The callback function receives a JSON string containing the OAuth configuration parameters from librdkafka. This includes all the sasl.oauth.* parameters that were set in the client configuration. The function should parse this JSON, make the OAuth token request, and return the access token as a string.

Configuration example

package main

import (
    "encoding/json"
    "fmt"
    "io"
    "net/http"
    "net/url"
    "strings"
    "time"

    "github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

type OAuthConfig struct {
    TokenEndpoint string `json:"token_endpoint"`
    ClientID      string `json:"client_id"`
    ClientSecret  string `json:"client_secret"`
    Scope         string `json:"scope"`
}

type TokenResponse struct {
    AccessToken string `json:"access_token"`
    TokenType   string `json:"token_type"`
    ExpiresIn   int    `json:"expires_in"`
}

func oauthCallback(oauthConfig string) (string, error) {
    // Parse OAuth configuration from JSON string
    var config OAuthConfig
    if err := json.Unmarshal([]byte(oauthConfig), &config); err != nil {
        return "", fmt.Errorf("failed to parse OAuth config: %v", err)
    }

    // Prepare the token request
    data := url.Values{}
    data.Set("grant_type", "client_credentials")
    data.Set("client_id", config.ClientID)
    data.Set("client_secret", config.ClientSecret)

    if config.Scope != "" {
        data.Set("scope", config.Scope)
    }

    // Create HTTP request
    req, err := http.NewRequest("POST", config.TokenEndpoint, strings.NewReader(data.Encode()))
    if err != nil {
        return "", fmt.Errorf("failed to create request: %v", err)
    }

    req.Header.Set("Content-Type", "application/x-www-form-urlencoded")

    // Send request with timeout
    client := &http.Client{
        Timeout: 10 * time.Second,
    }

    resp, err := client.Do(req)
    if err != nil {
        return "", fmt.Errorf("failed to send request: %v", err)
    }
    defer resp.Body.Close()

    // Read the response body once so it can be reused for error reporting and parsing
    body, err := io.ReadAll(resp.Body)
    if err != nil {
        return "", fmt.Errorf("failed to read response body: %v", err)
    }

    if resp.StatusCode != http.StatusOK {
        // Use the body we already read for the error message
        return "", fmt.Errorf("OAuth request failed with status %d: %s", resp.StatusCode, string(body))
    }

    // Parse the body we already read
    var tokenResp TokenResponse
    if err := json.Unmarshal(body, &tokenResp); err != nil {
        return "", fmt.Errorf("failed to parse token response: %v", err)
    }

    return tokenResp.AccessToken, nil
}

// Enhanced OAuth callback with retry logic
func oauthCallbackWithRetry(oauthConfig string) (string, error) {
    maxRetries := 3
    baseDelay := time.Second

    for attempt := 1; attempt <= maxRetries; attempt++ {
        token, err := oauthCallback(oauthConfig)
        if err == nil {
            return token, nil
        }

        fmt.Printf("OAuth token request failed (attempt %d/%d): %v\n", attempt, maxRetries, err)

        if attempt < maxRetries {
            delay := time.Duration(float64(baseDelay) * float64(1<<(attempt-1))) // Exponential backoff
            time.Sleep(delay)
        }
    }

    return "", fmt.Errorf("failed to obtain OAuth token after %d attempts", maxRetries)
}

func main() {
    // Producer configuration with integrated retry logic
    producerConfig := &kafka.ConfigMap{
        "bootstrap.servers":             "your-cluster-endpoint:9092",
        "security.protocol":             "SASL_SSL",
        "sasl.mechanism":                "OAUTHBEARER",
        "sasl.oauth.token.endpoint.uri": "https://your-oauth-provider.com/oauth2/token",
        "sasl.oauth.client.id":          "your-client-id",
        "sasl.oauth.client.secret":      "your-client-secret",
        "sasl.oauth.scope":              "kafka:read kafka:write",
        "sasl.oauth.logical.cluster":    "lkc-xxxxx",
        "sasl.oauth.identity.pool.id":   "pool-xxxxx",
        "sasl.oauth.connect.timeout.ms": 10000,
        "sasl.oauth.read.timeout.ms":    10000,
        "oauth_cb":                      oauthCallbackWithRetry, // Enhanced callback with retry
    }

    // Create producer
    producer, err := kafka.NewProducer(producerConfig)
    if err != nil {
        panic(fmt.Sprintf("Failed to create producer: %v", err))
    }
    defer producer.Close()

    // Consumer configuration with integrated retry logic
    consumerConfig := &kafka.ConfigMap{
        "bootstrap.servers":             "your-cluster-endpoint:9092",
        "security.protocol":             "SASL_SSL",
        "sasl.mechanism":                "OAUTHBEARER",
        "sasl.oauth.token.endpoint.uri": "https://your-oauth-provider.com/oauth2/token",
        "sasl.oauth.client.id":          "your-client-id",
        "sasl.oauth.client.secret":      "your-client-secret",
        "sasl.oauth.scope":              "kafka:read kafka:write",
        "sasl.oauth.logical.cluster":    "lkc-xxxxx",
        "sasl.oauth.identity.pool.id":   "pool-xxxxx",
        "oauth_cb":                      oauthCallbackWithRetry, // Enhanced callback with retry
        "group.id":                      "your-consumer-group",
        "auto.offset.reset":             "earliest",
    }

    // Create consumer
    consumer, err := kafka.NewConsumer(consumerConfig)
    if err != nil {
        panic(fmt.Sprintf("Failed to create consumer: %v", err))
    }
    defer consumer.Close()
}

Test your configuration

  1. Enable debug logging to troubleshoot OAuth issues:

    // Add to your configuration
    config := &kafka.ConfigMap{
        // ... your OAuth config
        "debug": "security,protocol,broker",
    }
    
  2. Test with a simple producer:

    // Test producer
    producer, err := kafka.NewProducer(config)
    if err != nil {
        log.Fatal("Failed to create producer: ", err)
    }
    defer producer.Close()
    
    topic := "test-topic"
    message := "test-message"
    
    // Produce the test message asynchronously
    err = producer.Produce(&kafka.Message{
        TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
        Value:          []byte(message),
    }, nil)
    if err != nil {
        log.Fatal("Failed to enqueue message: ", err)
    }
    
    // Wait for delivery
    for e := range producer.Events() {
        switch ev := e.(type) {
        case *kafka.Message:
            if ev.TopicPartition.Error != nil {
                fmt.Printf("Delivery failed: %v\n", ev.TopicPartition.Error)
            } else {
                fmt.Printf("Message delivered to %s [%d] - OAuth is working!\n", *ev.TopicPartition.Topic, ev.TopicPartition.Partition)
            }
            return
        }
    }
    
  3. Check for common errors:

     • “SASL authentication failed” - Check your OAuth credentials and endpoint
     • “Invalid token” - Verify your callback function is returning a valid token
     • “Connection timeout” - Check your bootstrap servers and network connectivity

  4. Verify token refresh - The client should automatically refresh tokens as they approach expiry; a logging wrapper you can use to spot-check refreshes is sketched below
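
To make token refreshes visible during testing, you can wrap whichever callback you pass as oauth_cb so that every token request is logged. This is a minimal sketch that reuses the oauthCallbackWithRetry function from the configuration example; it only adds logging and changes no other behavior:

// loggingOAuthCallback wraps an existing OAuth callback and logs each
// invocation, making token refreshes visible in the application output.
func loggingOAuthCallback(next func(string) (string, error)) func(string) (string, error) {
    return func(oauthConfig string) (string, error) {
        start := time.Now()
        token, err := next(oauthConfig)
        if err != nil {
            fmt.Printf("OAuth token request failed after %v: %v\n", time.Since(start), err)
            return "", err
        }
        fmt.Printf("OAuth token refreshed at %s (took %v)\n", start.Format(time.RFC3339), time.Since(start))
        return token, nil
    }
}

Pass it in the configuration as "oauth_cb": loggingOAuthCallback(oauthCallbackWithRetry).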

Custom OAuth implementations

For advanced use cases, implement custom token providers:

import (
    "crypto/rsa"
    "crypto/x509"
    "encoding/json"
    "encoding/pem"
    "fmt"
    "io"
    "net/http"
    "net/url"
    "os"
    "strings"
    "time"

    "github.com/golang-jwt/jwt/v4"
)

type JWTCustomTokenProvider struct {
    privateKey *rsa.PrivateKey
    clientID   string
    tokenEndpoint string
}

func NewJWTCustomTokenProvider(privateKeyPath, clientID, tokenEndpoint string) (*JWTCustomTokenProvider, error) {
    // Load private key
    keyBytes, err := os.ReadFile(privateKeyPath)
    if err != nil {
        return nil, fmt.Errorf("failed to read private key: %v", err)
    }

    block, _ := pem.Decode(keyBytes)
    if block == nil {
        return nil, fmt.Errorf("failed to decode PEM block")
    }

    privateKey, err := x509.ParsePKCS1PrivateKey(block.Bytes)
    if err != nil {
        return nil, fmt.Errorf("failed to parse private key: %v", err)
    }

    return &JWTCustomTokenProvider{
        privateKey:   privateKey,
        clientID:     clientID,
        tokenEndpoint: tokenEndpoint,
    }, nil
}

func (p *JWTCustomTokenProvider) CreateJwtAssertion(audience, issuer string) (string, error) {
    now := time.Now()

    claims := jwt.MapClaims{
        "iss": issuer,
        "sub": p.clientID,
        "aud": audience,
        "iat": now.Unix(),
        "exp": now.Add(time.Hour).Unix(),
        "jti": fmt.Sprintf("jwt-%d", now.Unix()),
    }

    token := jwt.NewWithClaims(jwt.SigningMethodRS256, claims)
    return token.SignedString(p.privateKey)
}

func (p *JWTCustomTokenProvider) GetToken(audience, issuer string) (string, error) {
    assertion, err := p.CreateJwtAssertion(audience, issuer)
    if err != nil {
        return "", fmt.Errorf("failed to create JWT assertion: %v", err)
    }

    data := url.Values{}
    data.Set("grant_type", "urn:ietf:params:oauth:grant-type:jwt-bearer")
    data.Set("assertion", assertion)

    req, err := http.NewRequest("POST", p.tokenEndpoint, strings.NewReader(data.Encode()))
    if err != nil {
        return "", fmt.Errorf("failed to create request: %v", err)
    }

    req.Header.Set("Content-Type", "application/x-www-form-urlencoded")

    client := &http.Client{Timeout: 10 * time.Second}
    resp, err := client.Do(req)
    if err != nil {
        return "", fmt.Errorf("failed to send request: %v", err)
    }
    defer resp.Body.Close()

    // Read the response body once so it can be reused for error reporting and parsing
    body, err := io.ReadAll(resp.Body)
    if err != nil {
        return "", fmt.Errorf("failed to read response body: %v", err)
    }

    if resp.StatusCode != http.StatusOK {
        return "", fmt.Errorf("token request failed with status %d: %s", resp.StatusCode, string(body))
    }

    var tokenResp TokenResponse
    if err := json.Unmarshal(body, &tokenResp); err != nil {
        return "", fmt.Errorf("failed to parse token response: %v", err)
    }

    return tokenResp.AccessToken, nil
}

// NewCustomOAuthCallback creates and returns a Kafka OAuth callback function
// that uses a JWT provider for token retrieval.
func NewCustomOAuthCallback(privateKeyPath, clientID, tokenEndpoint string) (func(string) (string, error), error) {
    jwtProvider, err := NewJWTCustomTokenProvider(
        privateKeyPath,
        clientID,
        tokenEndpoint,
    )
    if err != nil {
        return nil, fmt.Errorf("failed to create JWT provider: %v", err)
    }

    // Return a closure that has access to the jwtProvider
    return func(oauthConfig string) (string, error) {
        return jwtProvider.GetToken(
            "http://your-oauth-provider.com", // audience
            "your-client-id",                // issuer
        )
    }, nil
}

// --- In your main application setup ---

// 1. Create the callback function once
customCallback, err := NewCustomOAuthCallback(
    "/path/to/private_key.pem",
    "your-client-id",
    "http://your-oauth-provider.com/oauth2/token",
)
if err != nil {
    panic(err)
}

// 2. Use it in the Kafka configuration
producerConfig := &kafka.ConfigMap{
    // ... other properties
    "oauth_cb": customCallback,
}

Google OIDC integration

For Google OIDC integration with Go clients:

import (
    "context"
    "fmt"
    "os"

    "golang.org/x/oauth2/google"
)

func googleOAuthCallback(oauthConfig string) (string, error) {
    ctx := context.Background()

    // Load service account credentials from the key file
    keyJSON, err := os.ReadFile("path/to/service-account-key.json")
    if err != nil {
        return "", fmt.Errorf("failed to read service account key: %v", err)
    }

    credentials, err := google.CredentialsFromJSON(
        ctx,
        keyJSON,
        "https://www.googleapis.com/auth/cloud-platform",
    )
    if err != nil {
        return "", fmt.Errorf("failed to load credentials: %v", err)
    }

    // Request an access token from the credential's token source
    token, err := credentials.TokenSource.Token()
    if err != nil {
        return "", fmt.Errorf("failed to get token: %v", err)
    }

    return token.AccessToken, nil
}

// Google OIDC configuration
googleConfig := &kafka.ConfigMap{
    "bootstrap.servers":           "your-cluster-endpoint:9092",
    "security.protocol":           "SASL_SSL",
    "sasl.mechanism":              "OAUTHBEARER",
    "sasl.oauth.logical.cluster":  "lkc-xxxxx",
    "sasl.oauth.identity.pool.id": "pool-xxxxx",
    "oauth_cb":                    googleOAuthCallback,
}

Note

The googleOAuthCallback function loads credentials directly from the service account JSON file and ignores the oauthConfig string passed by the client. For this reason, properties like sasl.oauth.token.endpoint.uri are not needed in the ConfigMap when using this callback.

Configure Schema Registry Go clients

The Schema Registry Go client authenticates using a built-in OAuth mechanism. You only need to provide the correct configuration parameters, and the client handles the token request automatically. It does not use the oauth_cb callback.

Required parameters

The following parameters must be included in the Schema Registry client configuration:

Parameter                        | Description
bearer.auth.credentials.source   | Set to OAUTHBEARER for OAuth authentication.
bearer.auth.issuer.endpoint.url  | OAuth token endpoint URL from your identity provider.
bearer.auth.client.id            | OAuth client ID from your identity provider.
bearer.auth.client.secret        | OAuth client secret from your identity provider.
bearer.auth.scope                | OAuth scopes for the token request (optional).
bearer.auth.logical.cluster      | Logical cluster ID for Schema Registry (lsrc-xxxxx).
bearer.auth.identity.pool.id     | Identity pool ID for Confluent Cloud (optional).

Configuration example

import (
    "github.com/confluentinc/confluent-kafka-go/schemaregistry"
)

// Schema Registry configuration with OAuth
schemaRegistryConfig := map[string]string{
    "url": "http://<your-schema-registry-endpoint>",
    "bearer.auth.credentials.source": "OAUTHBEARER",
    "bearer.auth.issuer.endpoint.url": "http://<your-idp.com>/oauth2/token",
    "bearer.auth.client.id": "<your-client-id>",
    "bearer.auth.client.secret": "<your-client-secret>",
    "bearer.auth.scope": "schema:read",
    "bearer.auth.logical.cluster": "<lsrc-xxxxx>",
    "bearer.auth.identity.pool.id": "<pool-yyyyy>",
}

// Create Schema Registry client
schemaRegistryClient, err := schemaregistry.NewClient(schemaRegistryConfig)
if err != nil {
    panic(fmt.Sprintf("Failed to create Schema Registry client: %v", err))
}

Inherit OAuth configuration from Kafka client

You can also inherit the OAuth configuration from your Kafka client:

// Schema Registry configuration inheriting from Kafka client
schemaRegistryConfig := map[string]string{
    "url": "http://<your-schema-registry-endpoint>",
    "bearer.auth.credentials.source": "SASL_OAUTHBEARER_INHERIT",
    "bearer.auth.logical.cluster": "<lsrc-xxxxx>",
    "bearer.auth.identity.pool.id": "<pool-yyyyy>",
}

Custom token provider

For custom OAuth implementations, use the following configuration:

schemaRegistryConfig := map[string]string{
    "url": "http://<your-schema-registry-endpoint>",
    "bearer.auth.credentials.source": "CUSTOM",
    "bearer.auth.custom.provider.class": "<fully-qualified-class-name>",
    "bearer.auth.logical.cluster": "<lsrc-resource-id>",
    "bearer.auth.identity.pool.id": "<identity-pool-id>",
}

Troubleshoot Go OAuth clients

Common issues and solutions for Go OAuth clients:

Authentication failures

  • Verify client ID and secret are correct
  • Check that the token endpoint URL is accessible; you can test the token request directly with the sketch after this list
  • Ensure logical cluster ID is valid
  • Validate identity pool ID if used
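
If authentication keeps failing, it often helps to test the token request outside of Kafka entirely. The following sketch reuses the oauthCallback function and OAuthConfig struct from the configuration example above; the endpoint, credentials, and scope are placeholders:

// checkOAuthCredentials requests a token directly from the identity provider,
// independent of any Kafka producer or consumer.
func checkOAuthCredentials() error {
    cfg := OAuthConfig{
        TokenEndpoint: "https://your-oauth-provider.com/oauth2/token",
        ClientID:      "your-client-id",
        ClientSecret:  "your-client-secret",
        Scope:         "kafka:read kafka:write",
    }

    // Marshal the config into the same JSON shape the callback expects.
    raw, err := json.Marshal(cfg)
    if err != nil {
        return err
    }

    token, err := oauthCallback(string(raw))
    if err != nil {
        return fmt.Errorf("token request failed: %v", err)
    }

    fmt.Printf("received access token (%d characters)\n", len(token))
    return nil
}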

Network issues

  • Confirm network connectivity to OAuth provider
  • Check firewall rules allow OAuth traffic
  • Verify SSL certificate validation (a TLS handshake check is sketched after this list)
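
To check TLS connectivity and certificate validation against the OAuth provider, a short standard-library handshake test is usually enough. A minimal sketch; the host name is a placeholder and the crypto/tls and fmt packages are required:

// checkTLS opens a TLS connection to the given host on port 443 and reports
// whether the certificate chain validates against the system trust store.
func checkTLS(host string) error {
    conn, err := tls.Dial("tcp", host+":443", &tls.Config{})
    if err != nil {
        return fmt.Errorf("TLS handshake with %s failed: %v", host, err)
    }
    defer conn.Close()

    state := conn.ConnectionState()
    fmt.Printf("TLS OK for %s (certificate subject: %s)\n", host, state.PeerCertificates[0].Subject)
    return nil
}

// Example: checkTLS("your-oauth-provider.com")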

Configuration issues

  • Ensure all required parameters are provided
  • Validate the OAuth callback function signature (a compile-time check is sketched after this list)
  • Check timeout values are reasonable
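
A cheap way to catch a mismatched callback signature is a compile-time assertion. This sketch assumes your callback is named oauthCallback:

// Compilation fails if oauthCallback does not match the
// func(string) (string, error) signature expected for oauth_cb.
var _ func(string) (string, error) = oauthCallback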

Debug logging

Enable librdkafka’s internal debug logs for troubleshooting OAuth issues by setting the debug property in your ConfigMap. This provides far more detail than standard Go logging for client-broker communication.

producerConfig := &kafka.ConfigMap{
    // ... other properties
    "debug": "security,protocol,broker",
}

Common Go-specific issues

  • Import errors: Ensure confluent-kafka-go is properly installed
  • Callback function: Verify the OAuth callback function signature matches requirements
  • Token refresh: Implement proper token refresh logic with error handling
  • Thread safety: Ensure OAuth callback is thread-safe for multi-threaded applications

Performance considerations

  • Implement token caching to avoid repeated OAuth requests (a cached, thread-safe callback is sketched after this list)
  • Use connection pooling for HTTP requests to OAuth provider
  • Consider implementing token refresh before expiration
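
A simple pattern that covers caching, early refresh, and thread safety is to wrap the token request in a small mutex-guarded cache. This is a minimal sketch rather than built-in library behavior; it requires the sync and time packages, and the five-minute lifetime is an assumption you should replace with the expires_in value returned by your identity provider:

// tokenCache caches the most recent access token and refreshes it shortly
// before an assumed expiry. Safe to call from multiple goroutines.
type tokenCache struct {
    mu        sync.Mutex
    token     string
    expiresAt time.Time
    fetch     func(string) (string, error) // for example, oauthCallbackWithRetry
}

func (c *tokenCache) callback(oauthConfig string) (string, error) {
    c.mu.Lock()
    defer c.mu.Unlock()

    // Reuse the cached token while it is still comfortably valid.
    if c.token != "" && time.Until(c.expiresAt) > 30*time.Second {
        return c.token, nil
    }

    token, err := c.fetch(oauthConfig)
    if err != nil {
        return "", err
    }

    c.token = token
    // Assumed lifetime; derive this from the token response in real code.
    c.expiresAt = time.Now().Add(5 * time.Minute)
    return token, nil
}

Usage: create cache := &tokenCache{fetch: oauthCallbackWithRetry} and set "oauth_cb": cache.callback in the ConfigMap.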