// Before (kafka)
//
//	cfg := &kafka.ProducerConfig{
//		Brokers:  "localhost:9092",
//		Topic:    "my-topic",
//		AuthType: kafka.AuthTypePlain,
//		Username: "user",
//		DefaultCredentialConfig: secure.DefaultCredentialConfig{
//			Password: "password",
//		},
//		ClientConfig: tlsProvider.ClientConfig{
//			TLSEnable: true,
//			TLSCA:     "/path/to/ca.crt",
//		},
//		ProducerOptions: kafka.ProducerOptions{
//			BatchSize:    100,
//			BatchTimeout: 1000,
//			RequiredAcks: "all",
//			Async:        true,
//		},
//	}
//
// After (franz)
//
//	cfg := &franz.ProducerConfig{
//		BaseConfig: franz.BaseConfig{
//			Brokers:  "localhost:9092",
//			AuthType: franz.AuthTypePlain,
//			Username: "user",
//			DefaultCredentialConfig: secure.DefaultCredentialConfig{
//				Password: "password",
//			},
//			ClientConfig: tlsProvider.ClientConfig{
//				TLSEnable: true,
//				TLSCA:     "/path/to/ca.crt",
//			},
//		},
//		DefaultTopic:    "my-topic",
//		BatchMaxRecords: 100,
//		Linger:          time.Second,
//		Acks:            franz.AcksAll,
//		// Note: Async is handled differently - use ProduceAsync() method
//	}
// Before (kafka)
//
//	producer, err := kafka.NewProducer(cfg, logger)
//	if err != nil {
//		// handle error
//	}
//	defer producer.Disconnect()
//
// After (franz)
//
//	producer, err := franz.NewProducer(cfg, logger)
//	if err != nil {
//		// handle error
//	}
//	defer producer.Close()
// Before (kafka) - Simple write
//
//	err := producer.Write(ctx, []byte("Hello"))
//
// After (franz) - Using Record builder
//
//	record := franz.NewRecord([]byte("Hello"))
//	results, err := producer.Produce(ctx, record)
//
// Before (kafka) - Write with key
//
//	err := producer.WriteWithKey(ctx, []byte("Hello"), []byte("key-1"))
//
// After (franz)
//
//	record := franz.NewRecord([]byte("Hello")).WithKey([]byte("key-1"))
//	results, err := producer.Produce(ctx, record)
//
// Before (kafka) - Write with headers
//
//	headers := []kafka.Header{
//		{Key: "trace-id", Value: []byte("abc123")},
//	}
//	err := producer.WriteWithHeaders(ctx, []byte("Hello"), []byte("key"), headers)
//
// After (franz)
//
//	record := franz.NewRecord([]byte("Hello")).
//		WithKey([]byte("key")).
//		WithHeader("trace-id", []byte("abc123"))
//	results, err := producer.Produce(ctx, record)
//
// Before (kafka) - Write multiple messages
//
//	err := producer.WriteMulti(ctx, []byte("msg1"), []byte("msg2"), []byte("msg3"))
//
// After (franz)
//
//	records := []*franz.Record{
//		franz.NewRecord([]byte("msg1")),
//		franz.NewRecord([]byte("msg2")),
//		franz.NewRecord([]byte("msg3")),
//	}
//	results, err := producer.Produce(ctx, records...)
//
// Before (kafka) - Write JSON
//
//	err := producer.WriteJson(ctx, myStruct, []byte("optional-key"))
//
// After (franz)
//
//	result, err := producer.ProduceJSON(ctx, myStruct, []byte("optional-key"))
// Before (kafka) - Async was a config option
//
//	cfg := &kafka.ProducerConfig{
//		// ...
//		ProducerOptions: kafka.ProducerOptions{
//			Async: true,
//		},
//	}
//	producer, _ := kafka.NewProducer(cfg, logger)
//	producer.Write(ctx, []byte("Hello")) // Non-blocking due to Async: true
//
// After (franz) - Explicit async method with callback
//
//	producer, _ := franz.NewProducer(cfg, logger)
//	err := producer.ProduceAsync(ctx, franz.NewRecord([]byte("Hello")), func(result franz.ProduceResult) {
//		if result.Err != nil {
//			log.Error(result.Err, "Failed to produce")
//		} else {
//			log.Info("Produced", log.KV{
//				"partition": result.Partition,
//				"offset":    result.Offset,
//			})
//		}
//	})
//
// Wait for all async records to complete
//
//	producer.Flush(ctx)
// Before (kafka)
//
//	cfg := &kafka.ConsumerConfig{
//		Brokers:  "localhost:9092",
//		Topic:    "my-topic",
//		Group:    "my-group",
//		AuthType: kafka.AuthTypeScram256,
//		Username: "user",
//		DefaultCredentialConfig: secure.DefaultCredentialConfig{
//			Password: "password",
//		},
//		ConsumerOptions: kafka.ConsumerOptions{
//			StartOffset:       "first",
//			IsolationLevel:    "committed",
//			SessionTimeout:    30000, // ms
//			HeartbeatInterval: 3000,  // ms
//			CommitInterval:    5000,  // ms
//		},
//	}
//
// After (franz)
//
//	cfg := &franz.ConsumerConfig{
//		BaseConfig: franz.BaseConfig{
//			Brokers:  "localhost:9092",
//			AuthType: franz.AuthTypeScram256,
//			Username: "user",
//			DefaultCredentialConfig: secure.DefaultCredentialConfig{
//				Password: "password",
//			},
//		},
//		Topics:             []string{"my-topic"},
//		Group:              "my-group",
//		StartOffset:        franz.OffsetStart, // "start" instead of "first"
//		IsolationLevel:     franz.IsolationReadCommitted,
//		SessionTimeout:     30 * time.Second,
//		HeartbeatInterval:  3 * time.Second,
//		AutoCommitInterval: 5 * time.Second,
//		AutoCommit:         true,
//	}
// Before (kafka)
//
//	ch := make(chan kafka.Message, 100)
//	go consumer.ChannelSubscribe(ctx, ch)
//	for msg := range ch {
//		processMessage(msg)
//	}
//
// After (franz)
//
//	ch := make(chan franz.ConsumedRecord, 100)
//	go consumer.ConsumeChannel(ctx, ch)
//	for record := range ch {
//		processRecord(record)
//	}
// Before (kafka) - SubscribeWithOffsets handled this
//
//	err := consumer.SubscribeWithOffsets(ctx, func(ctx context.Context, msg kafka.Message) error {
//		// Process message
//		// Commit happens automatically after handler returns
//		return nil
//	})
//
// After (franz) - More control over commits
//
//	cfg := &franz.ConsumerConfig{
//		// ...
//		AutoCommit: false, // Disable auto-commit
//	}
//	consumer, _ := franz.NewConsumer(cfg, logger)
//	err := consumer.Consume(ctx, func(ctx context.Context, record franz.ConsumedRecord) error {
//		// Process record
//		if err := processRecord(record); err != nil {
//			return err
//		}
//		// Explicitly commit this record
//		return consumer.CommitRecord(ctx, record)
//	})
//
// Or commit all at once
//
//	consumer.CommitOffsets(ctx)
// Before (kafka)
//
//	msg, err := consumer.ReadMessage(ctx)
//
// After (franz)
//
//	records, err := consumer.PollRecords(ctx, 1)
//	if err != nil {
//		// handle error
//	}
//	if len(records) > 0 {
//		record := records[0]
//		// process record
//	}
// Before (kafka)
//
//	err := consumer.Rewind() // Must call before Connect()
//
// After (franz) - Configure in ConsumerConfig
//
//	cfg := &franz.ConsumerConfig{
//		// ...
//		StartOffset: franz.OffsetStart, // Start from beginning
//	}
// Before (kafka)
//
//	cfg := &kafka.AdminConfig{
//		Brokers:  "localhost:9092",
//		AuthType: kafka.AuthTypePlain,
//		Username: "admin",
//		DefaultCredentialConfig: secure.DefaultCredentialConfig{
//			Password: "password",
//		},
//	}
//
// After (franz) - Same structure
//
//	cfg := &franz.AdminConfig{
//		BaseConfig: franz.BaseConfig{
//			Brokers:  "localhost:9092",
//			AuthType: franz.AuthTypePlain,
//			Username: "admin",
//			DefaultCredentialConfig: secure.DefaultCredentialConfig{
//				Password: "password",
//			},
//		},
//	}
// Before (kafka)
//
//	topics, err := admin.ListTopics(ctx) // Returns []string
//	exists, err := admin.TopicExists(ctx, "my-topic")
//	err := admin.CreateTopic(ctx, "new-topic", 3, 1)
//	err := admin.DeleteTopic(ctx, "old-topic")
//
// After (franz)
//
//	topics, err := admin.ListTopics(ctx) // Returns []franz.TopicInfo
//	exists, err := admin.TopicExists(ctx, "my-topic")
//	err := admin.CreateTopics(ctx, franz.NewTopicConfig("new-topic", 3, 1))
//	err := admin.DeleteTopics(ctx, "old-topic")
// Before (kafka) - Basic topic creation
//
//	err := admin.CreateTopic(ctx, "my-topic", 6, 3)
//
// After (franz) - With configuration options
//
//	topicCfg := franz.NewTopicConfig("my-topic", 6, 3).
//		WithConfig("retention.ms", "86400000").
//		WithConfig("cleanup.policy", "compact")
//	err := admin.CreateTopics(ctx, topicCfg)
// Transactional producer example: configures a producer with a
// TransactionalID and produces two records, to "topic-a" and "topic-b",
// inside a single Transact callback.
cfg := &franz.ProducerConfig{
	BaseConfig: franz.BaseConfig{
		Brokers: "localhost:9092",
	},
	TransactionalID: "my-transactional-producer",
	Acks:            franz.AcksAll,
}

// NOTE(review): the NewProducer error is discarded here; check it in real code.
producer, _ := franz.NewProducer(cfg, logger)

err := producer.Transact(ctx, func(tx *franz.Transaction) error {
	tx.Produce(franz.NewRecord([]byte("record 1")).WithTopic("topic-a"))
	tx.Produce(franz.NewRecord([]byte("record 2")).WithTopic("topic-b"))
	return nil // Commits on success, aborts on error
})
// Pause specific topics
//
//	consumer.Pause("topic-a", "topic-b")
//
// Resume topics
//
//	consumer.Resume("topic-a", "topic-b")
//
// Pause specific partitions
//
//	consumer.PausePartitions(map[string][]int32{
//		"topic-a": {0, 1},
//	})