# blueprint.provider.franz
Blueprint Kafka client built on franz-go, providing high-performance Kafka operations with native batch processing, transactions, and async production.
## Features

- High-performance: Built on franz-go for optimal throughput and latency
- Batch processing: Native batch consumption with `ConsumeBatches` and `ConsumeFetches`
- Async production: Non-blocking production with per-record callbacks
- Transactions: Full support for Kafka transactions with automatic commit/abort
- Multiple authentication methods: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, AWS MSK IAM, OAuth/OIDC
- Secure credentials: In-memory encryption for passwords and secrets
- TLS support: Full TLS configuration for secure connections
- Fluent API: Builder pattern for record creation
- Admin operations: Topic and consumer group management
## Authentication Types

| Type | Constant | Description |
|---|---|---|
| None | `AuthTypeNone` | No authentication |
| PLAIN | `AuthTypePlain` | SASL PLAIN authentication |
| SCRAM-256 | `AuthTypeScram256` | SCRAM-SHA-256 authentication |
| SCRAM-512 | `AuthTypeScram512` | SCRAM-SHA-512 authentication |
| AWS MSK IAM | `AuthTypeAWSMSKIAM` | AWS MSK IAM authentication |
| OAuth | `AuthTypeOAuth` | OAuth/OIDC OAUTHBEARER |
## Producer

### Configuration

```go
type ProducerConfig struct {
    BaseConfig
    DefaultTopic string // Default topic for records without explicit topic

    // Batching
    BatchMaxRecords int           // Max records per batch (default: 10000)
    BatchMaxBytes   int           // Max bytes per batch (default: 1MB)
    Linger          time.Duration // Time to wait for batch fill (default: 0)

    // Reliability
    Acks            string // none, leader, all (default: leader)
    Idempotent      bool   // Enable idempotent producer
    TransactionalID string // For transactional producer

    // Compression
    Compression string // none, gzip, snappy, lz4, zstd
}
```
### Creating a Producer

```go
package main

import (
    "time"

    "github.com/oddbit-project/blueprint/crypt/secure"
    "github.com/oddbit-project/blueprint/log"
    "github.com/oddbit-project/blueprint/provider/franz"
    tlsProvider "github.com/oddbit-project/blueprint/provider/tls"
)

func main() {
    logger := log.New("kafka-producer")

    cfg := &franz.ProducerConfig{
        BaseConfig: franz.BaseConfig{
            Brokers:  "localhost:9092,localhost:9093",
            AuthType: franz.AuthTypeScram256,
            Username: "producer-user",
            DefaultCredentialConfig: secure.DefaultCredentialConfig{
                Password: "producer-password",
                // Or use environment variables:
                // PasswordEnvVar: "KAFKA_PASSWORD",
                // Or use files:
                // PasswordFile: "/run/secrets/kafka-password",
            },
            ClientConfig: tlsProvider.ClientConfig{
                TLSEnable: true,
                TLSCA:     "/path/to/ca.crt",
            },
            DialTimeout:    30 * time.Second,
            RequestTimeout: 30 * time.Second,
        },
        DefaultTopic:    "my-topic",
        BatchMaxRecords: 10000,
        BatchMaxBytes:   1048576,
        Acks:            franz.AcksAll,
        Compression:     franz.CompressionSnappy,
    }

    producer, err := franz.NewProducer(cfg, logger)
    if err != nil {
        logger.Fatal(err, "Failed to create producer")
    }
    defer producer.Close()

    // Producer is ready to use
}
```
### Synchronous Production

```go
ctx := context.Background()

// Create a record using the fluent builder
record := franz.NewRecord([]byte("Hello, Kafka!")).
    WithKey([]byte("user-123")).
    WithHeader("trace-id", []byte("abc123"))

// Produce synchronously
results, err := producer.Produce(ctx, record)
if err != nil {
    logger.Error(err, "Failed to produce")
    return
}

for _, result := range results {
    if result.Err != nil {
        logger.Error(result.Err, "Record failed", log.KV{
            "partition": result.Partition,
        })
    } else {
        logger.Info("Record sent", log.KV{
            "partition": result.Partition,
            "offset":    result.Offset,
        })
    }
}
```
### Asynchronous Production

```go
// Produce asynchronously with callback
err := producer.ProduceAsync(ctx, record, func(result franz.ProduceResult) {
    if result.Err != nil {
        logger.Error(result.Err, "Async produce failed")
    } else {
        logger.Info("Async produce succeeded", log.KV{
            "partition": result.Partition,
            "offset":    result.Offset,
        })
    }
})
if err != nil {
    logger.Error(err, "Failed to queue record")
}

// Wait for all buffered records to be sent
if err := producer.Flush(ctx); err != nil {
    logger.Error(err, "Flush failed")
}
```
### JSON Production

```go
type Event struct {
    ID        string    `json:"id"`
    Type      string    `json:"type"`
    Timestamp time.Time `json:"timestamp"`
}

event := Event{
    ID:        "evt-123",
    Type:      "user.created",
    Timestamp: time.Now(),
}

// Synchronous JSON production
result, err := producer.ProduceJSON(ctx, event, []byte("user-123"))
if err != nil {
    logger.Error(err, "Failed to produce JSON")
}

// Asynchronous JSON production
err = producer.ProduceJSONAsync(ctx, event, []byte("user-123"), func(result franz.ProduceResult) {
    if result.Err != nil {
        logger.Error(result.Err, "Async JSON produce failed")
    }
})
```
### Producing to Multiple Topics

```go
// Records can specify their own topic
records := []*franz.Record{
    franz.NewRecord([]byte("to topic A")).WithTopic("topic-a"),
    franz.NewRecord([]byte("to topic B")).WithTopic("topic-b"),
    franz.NewRecord([]byte("to default topic")), // Uses DefaultTopic
}

results, err := producer.Produce(ctx, records...)
```
## Consumer

### Configuration

```go
type ConsumerConfig struct {
    BaseConfig
    Topics []string // Topics to consume
    Group  string   // Consumer group (required for group consumption)

    // Consumer behavior
    StartOffset    string // start, end (default: end)
    IsolationLevel string // uncommitted, committed (default: committed)

    // Group settings
    SessionTimeout    time.Duration // Default: 45s
    RebalanceTimeout  time.Duration // Default: 60s
    HeartbeatInterval time.Duration // Default: 3s

    // Fetch settings
    FetchMinBytes int           // Default: 1
    FetchMaxBytes int           // Default: 50MB
    FetchMaxWait  time.Duration // Default: 5s

    // Offset management
    AutoCommit         bool          // Default: true
    AutoCommitInterval time.Duration // Default: 5s
}
```
### Creating a Consumer

```go
package main

import (
    "time"

    "github.com/oddbit-project/blueprint/crypt/secure"
    "github.com/oddbit-project/blueprint/log"
    "github.com/oddbit-project/blueprint/provider/franz"
)

func main() {
    logger := log.New("kafka-consumer")

    cfg := &franz.ConsumerConfig{
        BaseConfig: franz.BaseConfig{
            Brokers:  "localhost:9092",
            AuthType: franz.AuthTypeScram256,
            Username: "consumer-user",
            DefaultCredentialConfig: secure.DefaultCredentialConfig{
                Password: "consumer-password",
            },
        },
        Topics:             []string{"my-topic", "another-topic"},
        Group:              "my-consumer-group",
        StartOffset:        franz.OffsetEnd,
        IsolationLevel:     franz.IsolationReadCommitted,
        SessionTimeout:     45 * time.Second,
        AutoCommit:         true,
        AutoCommitInterval: 5 * time.Second,
    }

    consumer, err := franz.NewConsumer(cfg, logger)
    if err != nil {
        logger.Fatal(err, "Failed to create consumer")
    }
    defer consumer.Close()

    // Consumer is ready to use
}
```
### Record-by-Record Consumption

```go
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Process records one at a time
err := consumer.Consume(ctx, func(ctx context.Context, record franz.ConsumedRecord) error {
    logger.Info("Received record", log.KV{
        "topic":     record.Topic,
        "partition": record.Partition,
        "offset":    record.Offset,
        "key":       string(record.Key),
    })

    // Process the record
    if err := processRecord(record); err != nil {
        return err // Stops consumption on error
    }
    return nil
})
if err != nil {
    logger.Error(err, "Consumption error")
}
```
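The handler is also the natural place to decode JSON payloads written by `ProduceJSON`. A minimal sketch with the standard library's `encoding/json`, assuming `ConsumedRecord` exposes the raw payload as a `Value` field (mirroring `Key` above):

```go
// Decode JSON payloads inside a Consume handler (sketch; assumes
// ConsumedRecord.Value holds the raw record payload)
err := consumer.Consume(ctx, func(ctx context.Context, record franz.ConsumedRecord) error {
    var event Event // the Event type from the JSON Production example
    if err := json.Unmarshal(record.Value, &event); err != nil {
        // Skip malformed payloads instead of halting consumption
        logger.Error(err, "Failed to decode event")
        return nil
    }
    logger.Info("Decoded event", log.KV{"id": event.ID, "type": event.Type})
    return nil
})
```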
### Batch Consumption

Batch consumption groups records by topic/partition for more efficient processing:

```go
err := consumer.ConsumeBatches(ctx, func(ctx context.Context, batch franz.Batch) error {
    logger.Info("Received batch", log.KV{
        "topic":       batch.Topic,
        "partition":   batch.Partition,
        "recordCount": len(batch.Records),
    })

    // Process all records in the batch
    for _, record := range batch.Records {
        if err := processRecord(record); err != nil {
            return err
        }
    }

    // Optionally commit the batch manually (if AutoCommit is disabled)
    // return consumer.CommitBatch(ctx, batch)
    return nil
})
```
### Fetch-Level Consumption

For maximum control and throughput, process entire fetch results:

```go
err := consumer.ConsumeFetches(ctx, func(ctx context.Context, result *franz.FetchResult) error {
    if result.HasErrors() {
        for _, fetchErr := range result.Errors {
            logger.Error(fetchErr.Err, "Fetch error", log.KV{
                "topic":     fetchErr.Topic,
                "partition": fetchErr.Partition,
            })
        }
        // Decide how to handle errors
    }

    logger.Info("Processing fetch", log.KV{
        "batchCount":  len(result.Batches),
        "recordCount": result.RecordCount(),
    })

    for _, batch := range result.Batches {
        for _, record := range batch.Records {
            processRecord(record)
        }
    }
    return nil
})
```
### Channel-Based Consumption

Send records to a channel for processing in separate goroutines:

```go
records := make(chan franz.ConsumedRecord, 100)

// Start the consumer in a goroutine
go func() {
    err := consumer.ConsumeChannel(ctx, records)
    if err != nil {
        logger.Error(err, "Channel consumption error")
    }
    close(records)
}()

// Process records from the channel
for record := range records {
    processRecord(record)
}
```
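Because the channel is closed once consumption stops, the same pattern extends to a pool of workers draining the channel in parallel. A minimal sketch using the standard library's `sync.WaitGroup` (the worker count is arbitrary):

```go
var wg sync.WaitGroup
for i := 0; i < 4; i++ { // four workers; tune for your workload
    wg.Add(1)
    go func() {
        defer wg.Done()
        // Each worker drains the shared channel until it is closed
        for record := range records {
            processRecord(record)
        }
    }()
}
wg.Wait() // returns once ConsumeChannel stops and the channel is closed
```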
### Manual Offset Commits

When AutoCommit is disabled, commit offsets manually:

```go
cfg := &franz.ConsumerConfig{
    // ...
    AutoCommit: false,
}
consumer, _ := franz.NewConsumer(cfg, logger)

err := consumer.Consume(ctx, func(ctx context.Context, record franz.ConsumedRecord) error {
    if err := processRecord(record); err != nil {
        return err
    }
    // Commit this specific record's offset
    return consumer.CommitRecord(ctx, record)
})

// Or commit all consumed offsets at once
if err := consumer.CommitOffsets(ctx); err != nil {
    logger.Error(err, "Failed to commit offsets")
}
```
### Pause and Resume

Temporarily pause consumption of specific topics or partitions:

```go
// Pause specific topics
consumer.Pause("topic-a", "topic-b")

// Resume topics
consumer.Resume("topic-a", "topic-b")

// Pause specific partitions
consumer.PausePartitions(map[string][]int32{
    "topic-a": {0, 1},
    "topic-b": {2},
})

// Resume specific partitions
consumer.ResumePartitions(map[string][]int32{
    "topic-a": {0, 1},
})
```
### Polling Records

For more control, poll records directly:

```go
// Poll up to 100 records
records, err := consumer.PollRecords(ctx, 100)
if err != nil {
    logger.Error(err, "Poll failed")
    return
}
for _, record := range records {
    processRecord(record)
}

// Or poll the entire fetch result
result, err := consumer.Poll(ctx)
if err != nil {
    logger.Error(err, "Poll failed")
    return
}
if !result.IsEmpty() {
    for _, record := range result.Records() {
        processRecord(record)
    }
}
```
## Transactions

Transactions ensure atomic production of multiple records.

### Configuration

```go
cfg := &franz.ProducerConfig{
    BaseConfig: franz.BaseConfig{
        Brokers: "localhost:9092",
    },
    TransactionalID: "my-transactional-producer",
    Acks:            franz.AcksAll, // Required for transactions
}
```
### Using the Transact Helper

The `Transact` method handles begin, commit, and abort automatically:

```go
err := producer.Transact(ctx, func(tx *franz.Transaction) error {
    // Add records to the transaction
    tx.Produce(franz.NewRecord([]byte("record 1")).WithTopic("topic-a"))
    tx.Produce(franz.NewRecord([]byte("record 2")).WithTopic("topic-b"))

    // If this function returns an error, the transaction is aborted;
    // if it returns nil, the transaction is committed
    return nil
})
if err != nil {
    logger.Error(err, "Transaction failed")
}
```
### Convenience Method for Multiple Records

```go
records := []*franz.Record{
    franz.NewRecord([]byte("record 1")),
    franz.NewRecord([]byte("record 2")),
    franz.NewRecord([]byte("record 3")),
}

err := producer.TransactRecords(ctx, records...)
```
### Manual Transaction Control

For more control over the transaction lifecycle:

```go
tx, err := producer.BeginTransaction(ctx)
if err != nil {
    logger.Error(err, "Failed to begin transaction")
    return
}

// Add records
tx.Produce(franz.NewRecord([]byte("record 1")))
tx.ProduceMany(
    franz.NewRecord([]byte("record 2")),
    franz.NewRecord([]byte("record 3")),
)

// Check state
logger.Info("Transaction state", log.KV{
    "recordCount": tx.RecordCount(),
    "isAborted":   tx.IsAborted(),
})

// Commit or abort based on some condition
if shouldCommit {
    if err := tx.Commit(); err != nil {
        logger.Error(err, "Failed to commit transaction")
    }
} else {
    if err := tx.Abort(); err != nil {
        logger.Error(err, "Failed to abort transaction")
    }
}
```
## Admin Client

### Creating an Admin Client

```go
cfg := &franz.AdminConfig{
    BaseConfig: franz.BaseConfig{
        Brokers:  "localhost:9092",
        AuthType: franz.AuthTypeNone,
    },
}

admin, err := franz.NewAdmin(cfg, logger)
if err != nil {
    logger.Fatal(err, "Failed to create admin client")
}
defer admin.Close()
```
### Topic Management

```go
ctx := context.Background()

// Create a topic
topicCfg := franz.NewTopicConfig("my-new-topic", 6, 3).
    WithConfig("retention.ms", "86400000"). // 1 day
    WithConfig("cleanup.policy", "delete")

if err := admin.CreateTopics(ctx, topicCfg); err != nil {
    logger.Error(err, "Failed to create topic")
}

// Create multiple topics
err := admin.CreateTopics(ctx,
    franz.NewTopicConfig("topic-1", 3, 1),
    franz.NewTopicConfig("topic-2", 6, 2),
)

// List all topics
topics, err := admin.ListTopics(ctx)
if err != nil {
    logger.Error(err, "Failed to list topics")
}
for _, topic := range topics {
    logger.Info("Topic", log.KV{
        "name":       topic.Name,
        "partitions": len(topic.Partitions),
        "internal":   topic.Internal,
    })
}

// Check if a topic exists
exists, err := admin.TopicExists(ctx, "my-topic")

// Describe specific topics
details, err := admin.DescribeTopics(ctx, "topic-1", "topic-2")

// Delete topics
if err := admin.DeleteTopics(ctx, "old-topic-1", "old-topic-2"); err != nil {
    logger.Error(err, "Failed to delete topics")
}
```
### Broker Information

```go
brokers, err := admin.ListBrokers(ctx)
if err != nil {
    logger.Error(err, "Failed to list brokers")
}
for _, broker := range brokers {
    logger.Info("Broker", log.KV{
        "id":   broker.ID,
        "host": broker.Host,
        "port": broker.Port,
        "rack": broker.Rack,
    })
}
```
### Consumer Group Management

```go
// List all consumer groups
groups, err := admin.ListGroups(ctx)
if err != nil {
    logger.Error(err, "Failed to list groups")
}
for _, group := range groups {
    logger.Info("Consumer group", log.KV{"name": group})
}

// Describe specific groups
details, err := admin.DescribeGroups(ctx, "group-1", "group-2")
for _, group := range details {
    logger.Info("Group details", log.KV{
        "name":        group.Name,
        "state":       group.State,
        "protocol":    group.Protocol,
        "memberCount": len(group.Members),
    })
}

// Delete consumer groups (groups must be empty)
if err := admin.DeleteGroups(ctx, "old-group"); err != nil {
    logger.Error(err, "Failed to delete group")
}
```
## Authentication Examples

### SCRAM Authentication

```go
cfg := &franz.BaseConfig{
    Brokers:  "localhost:9092",
    AuthType: franz.AuthTypeScram512, // or AuthTypeScram256
    Username: "kafka-user",
    DefaultCredentialConfig: secure.DefaultCredentialConfig{
        // Direct password
        Password: "secret-password",
        // Or from environment variable
        // PasswordEnvVar: "KAFKA_PASSWORD",
        // Or from file
        // PasswordFile: "/run/secrets/kafka-password",
    },
}
```
### AWS MSK IAM Authentication

```go
cfg := &franz.BaseConfig{
    Brokers:   "b-1.msk-cluster.region.amazonaws.com:9098",
    AuthType:  franz.AuthTypeAWSMSKIAM,
    AWSRegion: "us-east-1",

    // Option 1: Use an IAM role (recommended for EKS, EC2, Lambda);
    // leave AWSAccessKey empty to use the default credential chain.

    // Option 2: Explicit credentials
    AWSAccessKey: "AKIAIOSFODNN7EXAMPLE",
    AwsCredentialConfig: franz.AwsCredentialConfig{
        AwsSecret: secure.DefaultCredentialConfig{
            Password: "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
            // Or use an environment variable:
            // PasswordEnvVar: "AWS_SECRET_ACCESS_KEY",
        },
    },
    ClientConfig: tlsProvider.ClientConfig{
        TLSEnable: true, // Required for MSK IAM
    },
}
```
### OAuth/OIDC Authentication

```go
cfg := &franz.BaseConfig{
    Brokers:       "kafka.example.com:9092",
    AuthType:      franz.AuthTypeOAuth,
    OAuthTokenURL: "https://auth.example.com/oauth/token",
    OAuthClientID: "kafka-client",
    OAuthScope:    "kafka:read kafka:write", // Optional
    OAuthCredentialConfig: franz.OAuthCredentialConfig{
        OAuthSecret: secure.DefaultCredentialConfig{
            Password: "client-secret",
            // Or from environment:
            // PasswordEnvVar: "OAUTH_CLIENT_SECRET",
        },
    },
}
```
## Record Builder

The fluent builder pattern makes record creation clean and readable:

```go
// Simple record
record := franz.NewRecord([]byte("Hello"))

// With key
record := franz.NewRecord([]byte("Hello")).
    WithKey([]byte("user-123"))

// With topic override
record := franz.NewRecord([]byte("Hello")).
    WithTopic("specific-topic")

// With partition
record := franz.NewRecord([]byte("Hello")).
    WithPartition(0)

// With timestamp
record := franz.NewRecord([]byte("Hello")).
    WithTimestamp(time.Now())

// With headers
record := franz.NewRecord([]byte("Hello")).
    WithHeader("trace-id", []byte("abc123")).
    WithHeader("source", []byte("api"))

// With multiple headers at once
record := franz.NewRecord([]byte("Hello")).
    WithHeaders(
        franz.Header{Key: "trace-id", Value: []byte("abc123")},
        franz.Header{Key: "source", Value: []byte("api")},
    )

// Full example
record := franz.NewRecord([]byte(`{"event":"user.created"}`)).
    WithKey([]byte("user-123")).
    WithTopic("events").
    WithHeader("content-type", []byte("application/json")).
    WithHeader("trace-id", []byte("abc123"))
```
## Compression

Available compression types:

| Constant | Description |
|---|---|
| `CompressionNone` | No compression |
| `CompressionGzip` | Gzip compression |
| `CompressionSnappy` | Snappy compression |
| `CompressionLz4` | LZ4 compression |
| `CompressionZstd` | Zstandard compression |

```go
cfg := &franz.ProducerConfig{
    // ...
    Compression: franz.CompressionZstd, // Best compression ratio
    // or:
    // Compression: franz.CompressionSnappy, // Fast, good for real-time
}
```
## Acks Configuration

| Constant | Description |
|---|---|
| `AcksNone` | No acknowledgment (fire and forget) |
| `AcksLeader` | Wait for leader acknowledgment |
| `AcksAll` | Wait for all in-sync replicas |
## Offset Configuration

| Constant | Description |
|---|---|
| `OffsetStart` | Start consuming from the earliest offset |
| `OffsetEnd` | Start consuming from the latest offset (default) |
## Isolation Levels

| Constant | Description |
|---|---|
| `IsolationReadUncommitted` | Read all messages, including uncommitted |
| `IsolationReadCommitted` | Only read committed messages (default) |
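Both offset and isolation constants are plain fields on `ConsumerConfig` (see the Consumer section above); for example, a consumer that replays a topic from the beginning while still hiding uncommitted transactional writes:

```go
cfg := &franz.ConsumerConfig{
    // ...
    StartOffset:    franz.OffsetStart,            // replay from the earliest offset
    IsolationLevel: franz.IsolationReadCommitted, // skip uncommitted transactional records
}
```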
## Accessing Underlying Clients

For advanced use cases, access the underlying franz-go clients:

```go
// Producer
kgoClient := producer.Client() // *kgo.Client

// Consumer
kgoClient := consumer.Client() // *kgo.Client

// Admin
kgoClient := admin.Client()       // *kgo.Client
kadmClient := admin.AdminClient() // *kadm.Client
```
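This opens up the full franz-go API; for example, `kgo.Client` exposes a `Ping` method that can serve as a connectivity check. A sketch (the `Ping` call is franz-go's API, not part of this package):

```go
// Verify that at least one broker is reachable via the raw client
if err := producer.Client().Ping(ctx); err != nil {
    logger.Error(err, "Kafka connectivity check failed")
}
```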
## Error Constants

```go
ErrNilConfig            // config is nil
ErrMissingBrokers       // brokers address is required
ErrMissingTopic         // topic is required
ErrMissingGroup         // consumer group is required for group consumption
ErrClientClosed         // client is closed
ErrInvalidAuthType      // invalid authentication type
ErrInvalidAcks          // invalid acks value
ErrInvalidCompression   // invalid compression type
ErrInvalidOffset        // invalid start offset value
ErrInvalidIsolation     // invalid isolation level
ErrTransactionAborted   // transaction was aborted
ErrNoTransactionalID    // transactional ID required for transactions
ErrNilHandler           // handler function is nil
ErrNilContext           // context is nil
ErrMissingAWSRegion     // AWS region is required for MSK IAM authentication
ErrMissingOAuthTokenURL // OAuth token URL is required for OAuth authentication
```
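Assuming these are exported sentinel errors in the `franz` package, they can be matched with the standard library's `errors.Is`; a sketch:

```go
producer, err := franz.NewProducer(cfg, logger)
if err != nil {
    switch {
    case errors.Is(err, franz.ErrMissingBrokers):
        logger.Fatal(err, "No broker addresses configured")
    case errors.Is(err, franz.ErrInvalidAuthType):
        logger.Fatal(err, "Unsupported authentication type")
    default:
        logger.Fatal(err, "Failed to create producer")
    }
}
```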
## Best Practices

- Use batch consumption for high-throughput scenarios to reduce per-record overhead
- Enable compression (snappy or zstd) to reduce network bandwidth
- Set appropriate acks: use `AcksAll` for critical data, `AcksLeader` for lower latency
- Use transactions when you need atomic writes across multiple records or topics
- Tune fetch settings: increase `FetchMaxBytes` and `FetchMaxWait` for throughput
- Handle errors gracefully: check `FetchResult.HasErrors()` when using fetch-level consumption
- Use secure credentials: prefer environment variables or files over hardcoded passwords
- Enable TLS in production environments
- Use consumer groups for scalable consumption across multiple instances
- Set appropriate timeouts: balance between responsiveness and stability (see the shutdown sketch below)
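Several of these practices meet in shutdown handling: cancel the consume loop on SIGINT/SIGTERM so offsets are committed and the group rebalances cleanly. A sketch using the standard library's `signal.NotifyContext`, with `handler` standing in for one of the consume handlers shown above (whether `Consume` returns `context.Canceled` on cancellation is an assumption here):

```go
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer stop()

// Blocks until the context is cancelled or the handler returns an error
if err := consumer.Consume(ctx, handler); err != nil && !errors.Is(err, context.Canceled) {
    logger.Error(err, "Consumption stopped with error")
}

// Leave the group cleanly so partitions rebalance promptly
consumer.Close()
```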