blob: 75025f25e6b3a720110ed673c9147787ee8fdb0f [file] [log] [blame]
Scott Baker2c1c4822019-10-16 11:02:41 -07001/*
Joey Armstrong9cdee9f2024-01-03 04:56:14 -05002* Copyright 2018-2024 Open Networking Foundation (ONF) and the ONF Contributors
Scott Baker2c1c4822019-10-16 11:02:41 -07003
Joey Armstrong7f8436c2023-07-09 20:23:27 -04004* Licensed under the Apache License, Version 2.0 (the "License");
5* you may not use this file except in compliance with the License.
6* You may obtain a copy of the License at
Scott Baker2c1c4822019-10-16 11:02:41 -07007
Joey Armstrong7f8436c2023-07-09 20:23:27 -04008* http://www.apache.org/licenses/LICENSE-2.0
Scott Baker2c1c4822019-10-16 11:02:41 -07009
Joey Armstrong7f8436c2023-07-09 20:23:27 -040010* Unless required by applicable law or agreed to in writing, software
11* distributed under the License is distributed on an "AS IS" BASIS,
12* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13* See the License for the specific language governing permissions and
14* limitations under the License.
Scott Baker2c1c4822019-10-16 11:02:41 -070015 */
Akash Reddy Kankanala05aff182025-05-06 12:57:32 +053016//nolint:staticcheck
Scott Baker2c1c4822019-10-16 11:02:41 -070017package kafka
18
19import (
Scott Bakerfa2f6ee2019-11-19 14:53:14 -080020 "context"
Scott Baker2c1c4822019-10-16 11:02:41 -070021 "errors"
22 "fmt"
serkant.uluderyab38671c2019-11-01 09:35:38 -070023 "strings"
24 "sync"
25 "time"
26
Abhay Kumar40252eb2025-10-13 13:25:53 +000027 "github.com/IBM/sarama"
Scott Bakerfa2f6ee2019-11-19 14:53:14 -080028 "github.com/eapache/go-resiliency/breaker"
Scott Baker2c1c4822019-10-16 11:02:41 -070029 "github.com/golang/protobuf/proto"
30 "github.com/google/uuid"
khenaidoo26721882021-08-11 17:42:52 -040031 "github.com/opencord/voltha-lib-go/v7/pkg/log"
Scott Baker2c1c4822019-10-16 11:02:41 -070032)
33
Scott Baker2c1c4822019-10-16 11:02:41 -070034// consumerChannels represents one or more consumers listening on a kafka topic. Once a message is received on that
35// topic, the consumer(s) broadcasts the message to all the listening channels. The consumer can be a partition
Joey Armstrong7f8436c2023-07-09 20:23:27 -040036// consumer or a group consumer
Scott Baker2c1c4822019-10-16 11:02:41 -070037type consumerChannels struct {
38 consumers []interface{}
khenaidoo26721882021-08-11 17:42:52 -040039 channels []chan proto.Message
Scott Baker2c1c4822019-10-16 11:02:41 -070040}
41
Kent Hagermanccfa2132019-12-17 13:29:34 -050042// static check to ensure SaramaClient implements Client
43var _ Client = &SaramaClient{}
44
Scott Baker2c1c4822019-10-16 11:02:41 -070045// SaramaClient represents the messaging proxy
46type SaramaClient struct {
47 cAdmin sarama.ClusterAdmin
Neha Sharmadd9af392020-04-28 09:03:57 +000048 KafkaAddress string
Scott Baker2c1c4822019-10-16 11:02:41 -070049 producer sarama.AsyncProducer
50 consumer sarama.Consumer
Abhay Kumar40252eb2025-10-13 13:25:53 +000051 groupConsumers map[string]sarama.ConsumerGroup
Scott Baker2c1c4822019-10-16 11:02:41 -070052 lockOfGroupConsumers sync.RWMutex
53 consumerGroupPrefix string
54 consumerType int
55 consumerGroupName string
56 producerFlushFrequency int
57 producerFlushMessages int
58 producerFlushMaxmessages int
59 producerRetryMax int
60 producerRetryBackOff time.Duration
61 producerReturnSuccess bool
62 producerReturnErrors bool
63 consumerMaxwait int
64 maxProcessingTime int
65 numPartitions int
66 numReplicas int
67 autoCreateTopic bool
68 doneCh chan int
Scott Baker84a55ce2020-04-17 10:11:30 -070069 metadataCallback func(fromTopic string, timestamp time.Time)
Scott Baker2c1c4822019-10-16 11:02:41 -070070 topicToConsumerChannelMap map[string]*consumerChannels
71 lockTopicToConsumerChannelMap sync.RWMutex
72 topicLockMap map[string]*sync.RWMutex
73 lockOfTopicLockMap sync.RWMutex
74 metadataMaxRetry int
Scott Baker104b67d2019-10-29 15:56:27 -070075 alive bool
David K. Bainbridge5edd7fb2020-07-29 19:30:48 -070076 livenessMutex sync.Mutex
Scott Baker104b67d2019-10-29 15:56:27 -070077 liveness chan bool
78 livenessChannelInterval time.Duration
79 lastLivenessTime time.Time
80 started bool
David K. Bainbridge5edd7fb2020-07-29 19:30:48 -070081 healthinessMutex sync.Mutex
Scott Baker0fef6982019-12-12 09:49:42 -080082 healthy bool
83 healthiness chan bool
Scott Baker2c1c4822019-10-16 11:02:41 -070084}
85
86type SaramaClientOption func(*SaramaClient)
87
Neha Sharmadd9af392020-04-28 09:03:57 +000088func Address(address string) SaramaClientOption {
Scott Baker2c1c4822019-10-16 11:02:41 -070089 return func(args *SaramaClient) {
Neha Sharmadd9af392020-04-28 09:03:57 +000090 args.KafkaAddress = address
Scott Baker2c1c4822019-10-16 11:02:41 -070091 }
92}
93
94func ConsumerGroupPrefix(prefix string) SaramaClientOption {
95 return func(args *SaramaClient) {
96 args.consumerGroupPrefix = prefix
97 }
98}
99
100func ConsumerGroupName(name string) SaramaClientOption {
101 return func(args *SaramaClient) {
102 args.consumerGroupName = name
103 }
104}
105
106func ConsumerType(consumer int) SaramaClientOption {
107 return func(args *SaramaClient) {
108 args.consumerType = consumer
109 }
110}
111
112func ProducerFlushFrequency(frequency int) SaramaClientOption {
113 return func(args *SaramaClient) {
114 args.producerFlushFrequency = frequency
115 }
116}
117
118func ProducerFlushMessages(num int) SaramaClientOption {
119 return func(args *SaramaClient) {
120 args.producerFlushMessages = num
121 }
122}
123
124func ProducerFlushMaxMessages(num int) SaramaClientOption {
125 return func(args *SaramaClient) {
126 args.producerFlushMaxmessages = num
127 }
128}
129
130func ProducerMaxRetries(num int) SaramaClientOption {
131 return func(args *SaramaClient) {
132 args.producerRetryMax = num
133 }
134}
135
136func ProducerRetryBackoff(duration time.Duration) SaramaClientOption {
137 return func(args *SaramaClient) {
138 args.producerRetryBackOff = duration
139 }
140}
141
142func ProducerReturnOnErrors(opt bool) SaramaClientOption {
143 return func(args *SaramaClient) {
144 args.producerReturnErrors = opt
145 }
146}
147
148func ProducerReturnOnSuccess(opt bool) SaramaClientOption {
149 return func(args *SaramaClient) {
150 args.producerReturnSuccess = opt
151 }
152}
153
154func ConsumerMaxWait(wait int) SaramaClientOption {
155 return func(args *SaramaClient) {
156 args.consumerMaxwait = wait
157 }
158}
159
160func MaxProcessingTime(pTime int) SaramaClientOption {
161 return func(args *SaramaClient) {
162 args.maxProcessingTime = pTime
163 }
164}
165
166func NumPartitions(number int) SaramaClientOption {
167 return func(args *SaramaClient) {
168 args.numPartitions = number
169 }
170}
171
172func NumReplicas(number int) SaramaClientOption {
173 return func(args *SaramaClient) {
174 args.numReplicas = number
175 }
176}
177
178func AutoCreateTopic(opt bool) SaramaClientOption {
179 return func(args *SaramaClient) {
180 args.autoCreateTopic = opt
181 }
182}
183
184func MetadatMaxRetries(retry int) SaramaClientOption {
185 return func(args *SaramaClient) {
186 args.metadataMaxRetry = retry
187 }
188}
189
Scott Baker104b67d2019-10-29 15:56:27 -0700190func LivenessChannelInterval(opt time.Duration) SaramaClientOption {
191 return func(args *SaramaClient) {
192 args.livenessChannelInterval = opt
193 }
194}
195
Scott Baker2c1c4822019-10-16 11:02:41 -0700196func NewSaramaClient(opts ...SaramaClientOption) *SaramaClient {
197 client := &SaramaClient{
Neha Sharmadd9af392020-04-28 09:03:57 +0000198 KafkaAddress: DefaultKafkaAddress,
Scott Baker2c1c4822019-10-16 11:02:41 -0700199 }
200 client.consumerType = DefaultConsumerType
201 client.producerFlushFrequency = DefaultProducerFlushFrequency
202 client.producerFlushMessages = DefaultProducerFlushMessages
203 client.producerFlushMaxmessages = DefaultProducerFlushMaxmessages
204 client.producerReturnErrors = DefaultProducerReturnErrors
205 client.producerReturnSuccess = DefaultProducerReturnSuccess
206 client.producerRetryMax = DefaultProducerRetryMax
207 client.producerRetryBackOff = DefaultProducerRetryBackoff
208 client.consumerMaxwait = DefaultConsumerMaxwait
209 client.maxProcessingTime = DefaultMaxProcessingTime
210 client.numPartitions = DefaultNumberPartitions
211 client.numReplicas = DefaultNumberReplicas
212 client.autoCreateTopic = DefaultAutoCreateTopic
213 client.metadataMaxRetry = DefaultMetadataMaxRetry
Scott Baker104b67d2019-10-29 15:56:27 -0700214 client.livenessChannelInterval = DefaultLivenessChannelInterval
Scott Baker2c1c4822019-10-16 11:02:41 -0700215
216 for _, option := range opts {
217 option(client)
218 }
219
Abhay Kumar40252eb2025-10-13 13:25:53 +0000220 client.groupConsumers = make(map[string]sarama.ConsumerGroup)
Scott Baker2c1c4822019-10-16 11:02:41 -0700221
222 client.lockTopicToConsumerChannelMap = sync.RWMutex{}
223 client.topicLockMap = make(map[string]*sync.RWMutex)
224 client.lockOfTopicLockMap = sync.RWMutex{}
225 client.lockOfGroupConsumers = sync.RWMutex{}
Scott Baker104b67d2019-10-29 15:56:27 -0700226
Scott Baker0fef6982019-12-12 09:49:42 -0800227 // healthy and alive until proven otherwise
Scott Baker104b67d2019-10-29 15:56:27 -0700228 client.alive = true
Scott Baker0fef6982019-12-12 09:49:42 -0800229 client.healthy = true
Scott Baker104b67d2019-10-29 15:56:27 -0700230
Scott Baker2c1c4822019-10-16 11:02:41 -0700231 return client
232}
233
Neha Sharma94f16a92020-06-26 04:17:55 +0000234func (sc *SaramaClient) Start(ctx context.Context) error {
235 logger.Info(ctx, "Starting-kafka-sarama-client")
Scott Baker2c1c4822019-10-16 11:02:41 -0700236
237 // Create the Done channel
238 sc.doneCh = make(chan int, 1)
239
240 var err error
241
242 // Add a cleanup in case of failure to startup
243 defer func() {
244 if err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000245 sc.Stop(ctx)
Scott Baker2c1c4822019-10-16 11:02:41 -0700246 }
247 }()
248
249 // Create the Cluster Admin
Neha Sharma94f16a92020-06-26 04:17:55 +0000250 if err = sc.createClusterAdmin(ctx); err != nil {
251 logger.Errorw(ctx, "Cannot-create-cluster-admin", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700252 return err
253 }
254
255 // Create the Publisher
Neha Sharma94f16a92020-06-26 04:17:55 +0000256 if err := sc.createPublisher(ctx); err != nil {
257 logger.Errorw(ctx, "Cannot-create-kafka-publisher", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700258 return err
259 }
260
261 if sc.consumerType == DefaultConsumerType {
262 // Create the master consumers
Neha Sharma94f16a92020-06-26 04:17:55 +0000263 if err := sc.createConsumer(ctx); err != nil {
264 logger.Errorw(ctx, "Cannot-create-kafka-consumers", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700265 return err
266 }
267 }
268
269 // Create the topic to consumers/channel map
270 sc.topicToConsumerChannelMap = make(map[string]*consumerChannels)
271
Neha Sharma94f16a92020-06-26 04:17:55 +0000272 logger.Info(ctx, "kafka-sarama-client-started")
Scott Baker2c1c4822019-10-16 11:02:41 -0700273
Scott Baker104b67d2019-10-29 15:56:27 -0700274 sc.started = true
275
Scott Baker2c1c4822019-10-16 11:02:41 -0700276 return nil
277}
278
Neha Sharma94f16a92020-06-26 04:17:55 +0000279func (sc *SaramaClient) Stop(ctx context.Context) {
280 logger.Info(ctx, "stopping-sarama-client")
Scott Baker2c1c4822019-10-16 11:02:41 -0700281
Scott Baker104b67d2019-10-29 15:56:27 -0700282 sc.started = false
283
Scott Baker2c1c4822019-10-16 11:02:41 -0700284 //Send a message over the done channel to close all long running routines
285 sc.doneCh <- 1
286
287 if sc.producer != nil {
288 if err := sc.producer.Close(); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000289 logger.Errorw(ctx, "closing-producer-failed", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700290 }
291 }
292
293 if sc.consumer != nil {
294 if err := sc.consumer.Close(); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000295 logger.Errorw(ctx, "closing-partition-consumer-failed", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700296 }
297 }
298
299 for key, val := range sc.groupConsumers {
Neha Sharma94f16a92020-06-26 04:17:55 +0000300 logger.Debugw(ctx, "closing-group-consumer", log.Fields{"topic": key})
Scott Baker2c1c4822019-10-16 11:02:41 -0700301 if err := val.Close(); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000302 logger.Errorw(ctx, "closing-group-consumer-failed", log.Fields{"error": err, "topic": key})
Scott Baker2c1c4822019-10-16 11:02:41 -0700303 }
304 }
305
306 if sc.cAdmin != nil {
307 if err := sc.cAdmin.Close(); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000308 logger.Errorw(ctx, "closing-cluster-admin-failed", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700309 }
310 }
311
312 //TODO: Clear the consumers map
313 //sc.clearConsumerChannelMap()
314
Neha Sharma94f16a92020-06-26 04:17:55 +0000315 logger.Info(ctx, "sarama-client-stopped")
Scott Baker2c1c4822019-10-16 11:02:41 -0700316}
317
Joey Armstrong7f8436c2023-07-09 20:23:27 -0400318// createTopic is an internal function to create a topic on the Kafka Broker. No locking is required as
Scott Baker2c1c4822019-10-16 11:02:41 -0700319// the invoking function must hold the lock
Neha Sharma94f16a92020-06-26 04:17:55 +0000320func (sc *SaramaClient) createTopic(ctx context.Context, topic *Topic, numPartition int, repFactor int) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700321 // Set the topic details
322 topicDetail := &sarama.TopicDetail{}
323 topicDetail.NumPartitions = int32(numPartition)
324 topicDetail.ReplicationFactor = int16(repFactor)
325 topicDetail.ConfigEntries = make(map[string]*string)
326 topicDetails := make(map[string]*sarama.TopicDetail)
327 topicDetails[topic.Name] = topicDetail
328
329 if err := sc.cAdmin.CreateTopic(topic.Name, topicDetail, false); err != nil {
kesavandd85e52b2022-03-15 16:38:08 +0530330 switch typedErr := err.(type) {
331 case *sarama.TopicError:
332 if typedErr.Err == sarama.ErrTopicAlreadyExists {
333 err = nil
334 }
Scott Baker2c1c4822019-10-16 11:02:41 -0700335 }
kesavandd85e52b2022-03-15 16:38:08 +0530336 if err != nil {
337 logger.Errorw(ctx, "create-topic-failure", log.Fields{"error": err})
338 return err
339 }
Scott Baker2c1c4822019-10-16 11:02:41 -0700340 }
341 // TODO: Wait until the topic has been created. No API is available in the Sarama clusterAdmin to
342 // do so.
Neha Sharma94f16a92020-06-26 04:17:55 +0000343 logger.Debugw(ctx, "topic-created", log.Fields{"topic": topic, "numPartition": numPartition, "replicationFactor": repFactor})
Scott Baker2c1c4822019-10-16 11:02:41 -0700344 return nil
345}
346
Joey Armstrong7f8436c2023-07-09 20:23:27 -0400347// CreateTopic is a public API to create a topic on the Kafka Broker. It uses a lock on a specific topic to
Scott Baker2c1c4822019-10-16 11:02:41 -0700348// ensure no two go routines are performing operations on the same topic
Neha Sharma94f16a92020-06-26 04:17:55 +0000349func (sc *SaramaClient) CreateTopic(ctx context.Context, topic *Topic, numPartition int, repFactor int) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700350 sc.lockTopic(topic)
351 defer sc.unLockTopic(topic)
352
Neha Sharma94f16a92020-06-26 04:17:55 +0000353 return sc.createTopic(ctx, topic, numPartition, repFactor)
Scott Baker2c1c4822019-10-16 11:02:41 -0700354}
355
Joey Armstrong7f8436c2023-07-09 20:23:27 -0400356// DeleteTopic removes a topic from the kafka Broker
Neha Sharma94f16a92020-06-26 04:17:55 +0000357func (sc *SaramaClient) DeleteTopic(ctx context.Context, topic *Topic) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700358 sc.lockTopic(topic)
359 defer sc.unLockTopic(topic)
360
361 // Remove the topic from the broker
362 if err := sc.cAdmin.DeleteTopic(topic.Name); err != nil {
363 if err == sarama.ErrUnknownTopicOrPartition {
364 // Not an error as does not exist
Neha Sharma94f16a92020-06-26 04:17:55 +0000365 logger.Debugw(ctx, "topic-not-exist", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700366 return nil
367 }
Neha Sharma94f16a92020-06-26 04:17:55 +0000368 logger.Errorw(ctx, "delete-topic-failed", log.Fields{"topic": topic, "error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700369 return err
370 }
371
372 // Clear the topic from the consumer channel. This will also close any consumers listening on that topic.
Neha Sharma94f16a92020-06-26 04:17:55 +0000373 if err := sc.clearTopicFromConsumerChannelMap(ctx, *topic); err != nil {
374 logger.Errorw(ctx, "failure-clearing-channels", log.Fields{"topic": topic, "error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700375 return err
376 }
377 return nil
378}
379
380// Subscribe registers a caller to a topic. It returns a channel that the caller can use to receive
381// messages from that topic
khenaidoo26721882021-08-11 17:42:52 -0400382func (sc *SaramaClient) Subscribe(ctx context.Context, topic *Topic, kvArgs ...*KVArg) (<-chan proto.Message, error) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700383 sc.lockTopic(topic)
384 defer sc.unLockTopic(topic)
385
Neha Sharma94f16a92020-06-26 04:17:55 +0000386 logger.Debugw(ctx, "subscribe", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700387
388 // If a consumers already exist for that topic then resuse it
389 if consumerCh := sc.getConsumerChannel(topic); consumerCh != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000390 logger.Debugw(ctx, "topic-already-subscribed", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700391 // Create a channel specific for that consumers and add it to the consumers channel map
khenaidoo26721882021-08-11 17:42:52 -0400392 ch := make(chan proto.Message)
Neha Sharma94f16a92020-06-26 04:17:55 +0000393 sc.addChannelToConsumerChannelMap(ctx, topic, ch)
Scott Baker2c1c4822019-10-16 11:02:41 -0700394 return ch, nil
395 }
396
397 // Register for the topic and set it up
khenaidoo26721882021-08-11 17:42:52 -0400398 var consumerListeningChannel chan proto.Message
Scott Baker2c1c4822019-10-16 11:02:41 -0700399 var err error
400
401 // Use the consumerType option to figure out the type of consumer to launch
402 if sc.consumerType == PartitionConsumer {
403 if sc.autoCreateTopic {
Neha Sharma94f16a92020-06-26 04:17:55 +0000404 if err = sc.createTopic(ctx, topic, sc.numPartitions, sc.numReplicas); err != nil {
405 logger.Errorw(ctx, "create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700406 return nil, err
407 }
408 }
Neha Sharma94f16a92020-06-26 04:17:55 +0000409 if consumerListeningChannel, err = sc.setupPartitionConsumerChannel(ctx, topic, getOffset(kvArgs...)); err != nil {
410 logger.Warnw(ctx, "create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700411 return nil, err
412 }
413 } else if sc.consumerType == GroupCustomer {
414 // TODO: create topic if auto create is on. There is an issue with the sarama cluster library that
415 // does not consume from a precreated topic in some scenarios
416 //if sc.autoCreateTopic {
417 // if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000418 // logger.Errorw(ctx, "create-topic-failure", logger.Fields{"error": err, "topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700419 // return nil, err
420 // }
421 //}
422 //groupId := sc.consumerGroupName
423 groupId := getGroupId(kvArgs...)
424 // Include the group prefix
425 if groupId != "" {
426 groupId = sc.consumerGroupPrefix + groupId
427 } else {
428 // Need to use a unique group Id per topic
429 groupId = sc.consumerGroupPrefix + topic.Name
430 }
Neha Sharma94f16a92020-06-26 04:17:55 +0000431 if consumerListeningChannel, err = sc.setupGroupConsumerChannel(ctx, topic, groupId, getOffset(kvArgs...)); err != nil {
432 logger.Warnw(ctx, "create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
Scott Baker2c1c4822019-10-16 11:02:41 -0700433 return nil, err
434 }
435
436 } else {
Neha Sharma94f16a92020-06-26 04:17:55 +0000437 logger.Warnw(ctx, "unknown-consumer-type", log.Fields{"consumer-type": sc.consumerType})
Scott Baker2c1c4822019-10-16 11:02:41 -0700438 return nil, errors.New("unknown-consumer-type")
439 }
440
441 return consumerListeningChannel, nil
442}
443
Joey Armstrong7f8436c2023-07-09 20:23:27 -0400444// UnSubscribe unsubscribe a consumer from a given topic
khenaidoo26721882021-08-11 17:42:52 -0400445func (sc *SaramaClient) UnSubscribe(ctx context.Context, topic *Topic, ch <-chan proto.Message) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700446 sc.lockTopic(topic)
447 defer sc.unLockTopic(topic)
448
Neha Sharma94f16a92020-06-26 04:17:55 +0000449 logger.Debugw(ctx, "unsubscribing-channel-from-topic", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700450 var err error
Neha Sharma94f16a92020-06-26 04:17:55 +0000451 if err = sc.removeChannelFromConsumerChannelMap(ctx, *topic, ch); err != nil {
452 logger.Errorw(ctx, "failed-removing-channel", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700453 }
Neha Sharma94f16a92020-06-26 04:17:55 +0000454 if err = sc.deleteFromGroupConsumers(ctx, topic.Name); err != nil {
455 logger.Errorw(ctx, "failed-deleting-group-consumer", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700456 }
457 return err
458}
459
Neha Sharma94f16a92020-06-26 04:17:55 +0000460func (sc *SaramaClient) SubscribeForMetadata(ctx context.Context, callback func(fromTopic string, timestamp time.Time)) {
Kent Hagermanccfa2132019-12-17 13:29:34 -0500461 sc.metadataCallback = callback
462}
463
Neha Sharma94f16a92020-06-26 04:17:55 +0000464func (sc *SaramaClient) updateLiveness(ctx context.Context, alive bool) {
Scott Baker104b67d2019-10-29 15:56:27 -0700465 // Post a consistent stream of liveness data to the channel,
466 // so that in a live state, the core does not timeout and
467 // send a forced liveness message. Production of liveness
468 // events to the channel is rate-limited by livenessChannelInterval.
David K. Bainbridge5edd7fb2020-07-29 19:30:48 -0700469 sc.livenessMutex.Lock()
470 defer sc.livenessMutex.Unlock()
Scott Baker104b67d2019-10-29 15:56:27 -0700471 if sc.liveness != nil {
472 if sc.alive != alive {
Neha Sharma94f16a92020-06-26 04:17:55 +0000473 logger.Info(ctx, "update-liveness-channel-because-change")
Scott Baker104b67d2019-10-29 15:56:27 -0700474 sc.liveness <- alive
475 sc.lastLivenessTime = time.Now()
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800476 } else if time.Since(sc.lastLivenessTime) > sc.livenessChannelInterval {
Akash Soni7b911a02024-06-20 13:06:21 +0530477 logger.Debugf(ctx, "update-liveness-channel-because-interval")
Scott Baker104b67d2019-10-29 15:56:27 -0700478 sc.liveness <- alive
479 sc.lastLivenessTime = time.Now()
480 }
481 }
482
483 // Only emit a log message when the state changes
484 if sc.alive != alive {
Neha Sharma94f16a92020-06-26 04:17:55 +0000485 logger.Info(ctx, "set-client-alive", log.Fields{"alive": alive})
Scott Baker104b67d2019-10-29 15:56:27 -0700486 sc.alive = alive
487 }
488}
489
Scott Baker0fef6982019-12-12 09:49:42 -0800490// Once unhealthy, we never go back
Neha Sharma94f16a92020-06-26 04:17:55 +0000491func (sc *SaramaClient) setUnhealthy(ctx context.Context) {
Scott Baker0fef6982019-12-12 09:49:42 -0800492 sc.healthy = false
David K. Bainbridge5edd7fb2020-07-29 19:30:48 -0700493 sc.healthinessMutex.Lock()
494 defer sc.healthinessMutex.Unlock()
Scott Baker0fef6982019-12-12 09:49:42 -0800495 if sc.healthiness != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000496 logger.Infow(ctx, "set-client-unhealthy", log.Fields{"healthy": sc.healthy})
Scott Baker0fef6982019-12-12 09:49:42 -0800497 sc.healthiness <- sc.healthy
498 }
499}
500
Neha Sharma94f16a92020-06-26 04:17:55 +0000501func (sc *SaramaClient) isLivenessError(ctx context.Context, err error) bool {
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800502 // Sarama producers and consumers encapsulate the error inside
503 // a ProducerError or ConsumerError struct.
504 if prodError, ok := err.(*sarama.ProducerError); ok {
505 err = prodError.Err
506 } else if consumerError, ok := err.(*sarama.ConsumerError); ok {
507 err = consumerError.Err
508 }
509
510 // Sarama-Cluster will compose the error into a ClusterError struct,
511 // which we can't do a compare by reference. To handle that, we the
512 // best we can do is compare the error strings.
513
514 switch err.Error() {
515 case context.DeadlineExceeded.Error():
Neha Sharma94f16a92020-06-26 04:17:55 +0000516 logger.Info(ctx, "is-liveness-error-timeout")
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800517 return true
518 case sarama.ErrOutOfBrokers.Error(): // "Kafka: client has run out of available brokers"
Neha Sharma94f16a92020-06-26 04:17:55 +0000519 logger.Info(ctx, "is-liveness-error-no-brokers")
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800520 return true
521 case sarama.ErrShuttingDown.Error(): // "Kafka: message received by producer in process of shutting down"
Neha Sharma94f16a92020-06-26 04:17:55 +0000522 logger.Info(ctx, "is-liveness-error-shutting-down")
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800523 return true
524 case sarama.ErrControllerNotAvailable.Error(): // "Kafka: controller is not available"
Neha Sharma94f16a92020-06-26 04:17:55 +0000525 logger.Info(ctx, "is-liveness-error-not-available")
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800526 return true
527 case breaker.ErrBreakerOpen.Error(): // "circuit breaker is open"
Neha Sharma94f16a92020-06-26 04:17:55 +0000528 logger.Info(ctx, "is-liveness-error-circuit-breaker-open")
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800529 return true
530 }
531
532 if strings.HasSuffix(err.Error(), "connection refused") { // "dial tcp 10.244.1.176:9092: connect: connection refused"
Neha Sharma94f16a92020-06-26 04:17:55 +0000533 logger.Info(ctx, "is-liveness-error-connection-refused")
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800534 return true
535 }
536
Scott Baker718bee02020-01-07 09:52:02 -0800537 if strings.HasSuffix(err.Error(), "i/o timeout") { // "dial tcp 10.244.1.176:9092: i/o timeout"
Neha Sharma94f16a92020-06-26 04:17:55 +0000538 logger.Info(ctx, "is-liveness-error-io-timeout")
Scott Baker718bee02020-01-07 09:52:02 -0800539 return true
540 }
541
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800542 // Other errors shouldn't trigger a loss of liveness
543
Neha Sharma94f16a92020-06-26 04:17:55 +0000544 logger.Infow(ctx, "is-liveness-error-ignored", log.Fields{"err": err})
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800545
546 return false
547}
548
Scott Baker2c1c4822019-10-16 11:02:41 -0700549// send formats and sends the request onto the kafka messaging bus.
Neha Sharma94f16a92020-06-26 04:17:55 +0000550func (sc *SaramaClient) Send(ctx context.Context, msg interface{}, topic *Topic, keys ...string) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700551
552 // Assert message is a proto message
553 var protoMsg proto.Message
554 var ok bool
555 // ascertain the value interface type is a proto.Message
556 if protoMsg, ok = msg.(proto.Message); !ok {
Neha Sharma94f16a92020-06-26 04:17:55 +0000557 logger.Warnw(ctx, "message-not-proto-message", log.Fields{"msg": msg})
David K. Bainbridge7c75cac2020-02-19 08:53:46 -0800558 return fmt.Errorf("not-a-proto-msg-%s", msg)
Scott Baker2c1c4822019-10-16 11:02:41 -0700559 }
560
561 var marshalled []byte
562 var err error
563 // Create the Sarama producer message
564 if marshalled, err = proto.Marshal(protoMsg); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000565 logger.Errorw(ctx, "marshalling-failed", log.Fields{"msg": protoMsg, "error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700566 return err
567 }
568 key := ""
569 if len(keys) > 0 {
570 key = keys[0] // Only the first key is relevant
571 }
572 kafkaMsg := &sarama.ProducerMessage{
573 Topic: topic.Name,
574 Key: sarama.StringEncoder(key),
575 Value: sarama.ByteEncoder(marshalled),
576 }
577
578 // Send message to kafka
579 sc.producer.Input() <- kafkaMsg
580 // Wait for result
581 // TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
582 select {
583 case ok := <-sc.producer.Successes():
Neha Sharma94f16a92020-06-26 04:17:55 +0000584 logger.Debugw(ctx, "message-sent", log.Fields{"status": ok.Topic})
585 sc.updateLiveness(ctx, true)
Scott Baker2c1c4822019-10-16 11:02:41 -0700586 case notOk := <-sc.producer.Errors():
Neha Sharma94f16a92020-06-26 04:17:55 +0000587 logger.Debugw(ctx, "error-sending", log.Fields{"status": notOk})
588 if sc.isLivenessError(ctx, notOk) {
589 sc.updateLiveness(ctx, false)
Scott Baker104b67d2019-10-29 15:56:27 -0700590 }
591 return notOk
592 }
593 return nil
594}
595
596// Enable the liveness monitor channel. This channel will report
597// a "true" or "false" on every publish, which indicates whether
598// or not the channel is still live. This channel is then picked up
599// by the service (i.e. rw_core / ro_core) to update readiness status
600// and/or take other actions.
Neha Sharma94f16a92020-06-26 04:17:55 +0000601func (sc *SaramaClient) EnableLivenessChannel(ctx context.Context, enable bool) chan bool {
602 logger.Infow(ctx, "kafka-enable-liveness-channel", log.Fields{"enable": enable})
Scott Baker104b67d2019-10-29 15:56:27 -0700603 if enable {
David K. Bainbridge5edd7fb2020-07-29 19:30:48 -0700604 sc.livenessMutex.Lock()
605 defer sc.livenessMutex.Unlock()
Scott Baker104b67d2019-10-29 15:56:27 -0700606 if sc.liveness == nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000607 logger.Info(ctx, "kafka-create-liveness-channel")
Scott Baker104b67d2019-10-29 15:56:27 -0700608 // At least 1, so we can immediately post to it without blocking
609 // Setting a bigger number (10) allows the monitor to fall behind
610 // without blocking others. The monitor shouldn't really fall
611 // behind...
612 sc.liveness = make(chan bool, 10)
khenaidoo26721882021-08-11 17:42:52 -0400613 // post initial state to the channel
Scott Baker104b67d2019-10-29 15:56:27 -0700614 sc.liveness <- sc.alive
615 }
616 } else {
617 // TODO: Think about whether we need the ability to turn off
618 // liveness monitoring
619 panic("Turning off liveness reporting is not supported")
620 }
621 return sc.liveness
622}
623
Scott Baker0fef6982019-12-12 09:49:42 -0800624// Enable the Healthiness monitor channel. This channel will report "false"
625// if the kafka consumers die, or some other problem occurs which is
626// catastrophic that would require re-creating the client.
Neha Sharma94f16a92020-06-26 04:17:55 +0000627func (sc *SaramaClient) EnableHealthinessChannel(ctx context.Context, enable bool) chan bool {
628 logger.Infow(ctx, "kafka-enable-healthiness-channel", log.Fields{"enable": enable})
Scott Baker0fef6982019-12-12 09:49:42 -0800629 if enable {
David K. Bainbridge5edd7fb2020-07-29 19:30:48 -0700630 sc.healthinessMutex.Lock()
631 defer sc.healthinessMutex.Unlock()
Scott Baker0fef6982019-12-12 09:49:42 -0800632 if sc.healthiness == nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000633 logger.Info(ctx, "kafka-create-healthiness-channel")
Scott Baker0fef6982019-12-12 09:49:42 -0800634 // At least 1, so we can immediately post to it without blocking
635 // Setting a bigger number (10) allows the monitor to fall behind
636 // without blocking others. The monitor shouldn't really fall
637 // behind...
638 sc.healthiness = make(chan bool, 10)
khenaidoo26721882021-08-11 17:42:52 -0400639 // post initial state to the channel
Scott Baker0fef6982019-12-12 09:49:42 -0800640 sc.healthiness <- sc.healthy
641 }
642 } else {
643 // TODO: Think about whether we need the ability to turn off
644 // liveness monitoring
645 panic("Turning off healthiness reporting is not supported")
646 }
647 return sc.healthiness
648}
649
Scott Baker104b67d2019-10-29 15:56:27 -0700650// send an empty message on the liveness channel to check whether connectivity has
651// been restored.
Neha Sharma94f16a92020-06-26 04:17:55 +0000652func (sc *SaramaClient) SendLiveness(ctx context.Context) error {
Scott Baker104b67d2019-10-29 15:56:27 -0700653 if !sc.started {
654 return fmt.Errorf("SendLiveness() called while not started")
655 }
656
657 kafkaMsg := &sarama.ProducerMessage{
658 Topic: "_liveness_test",
659 Value: sarama.StringEncoder(time.Now().Format(time.RFC3339)), // for debugging / informative use
660 }
661
662 // Send message to kafka
663 sc.producer.Input() <- kafkaMsg
664 // Wait for result
665 // TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
666 select {
667 case ok := <-sc.producer.Successes():
Neha Sharma94f16a92020-06-26 04:17:55 +0000668 logger.Debugw(ctx, "liveness-message-sent", log.Fields{"status": ok.Topic})
669 sc.updateLiveness(ctx, true)
Scott Baker104b67d2019-10-29 15:56:27 -0700670 case notOk := <-sc.producer.Errors():
Neha Sharma94f16a92020-06-26 04:17:55 +0000671 logger.Debugw(ctx, "liveness-error-sending", log.Fields{"status": notOk})
672 if sc.isLivenessError(ctx, notOk) {
673 sc.updateLiveness(ctx, false)
Scott Baker104b67d2019-10-29 15:56:27 -0700674 }
Scott Baker2c1c4822019-10-16 11:02:41 -0700675 return notOk
676 }
677 return nil
678}
679
680// getGroupId returns the group id from the key-value args.
681func getGroupId(kvArgs ...*KVArg) string {
682 for _, arg := range kvArgs {
683 if arg.Key == GroupIdKey {
684 return arg.Value.(string)
685 }
686 }
687 return ""
688}
689
690// getOffset returns the offset from the key-value args.
691func getOffset(kvArgs ...*KVArg) int64 {
692 for _, arg := range kvArgs {
693 if arg.Key == Offset {
694 return arg.Value.(int64)
695 }
696 }
697 return sarama.OffsetNewest
698}
699
Neha Sharma94f16a92020-06-26 04:17:55 +0000700func (sc *SaramaClient) createClusterAdmin(ctx context.Context) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700701 config := sarama.NewConfig()
702 config.Version = sarama.V1_0_0_0
703
704 // Create a cluster Admin
705 var cAdmin sarama.ClusterAdmin
706 var err error
Neha Sharmadd9af392020-04-28 09:03:57 +0000707 if cAdmin, err = sarama.NewClusterAdmin([]string{sc.KafkaAddress}, config); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000708 logger.Errorw(ctx, "cluster-admin-failure", log.Fields{"error": err, "broker-address": sc.KafkaAddress})
Scott Baker2c1c4822019-10-16 11:02:41 -0700709 return err
710 }
711 sc.cAdmin = cAdmin
712 return nil
713}
714
715func (sc *SaramaClient) lockTopic(topic *Topic) {
716 sc.lockOfTopicLockMap.Lock()
717 if _, exist := sc.topicLockMap[topic.Name]; exist {
718 sc.lockOfTopicLockMap.Unlock()
719 sc.topicLockMap[topic.Name].Lock()
720 } else {
721 sc.topicLockMap[topic.Name] = &sync.RWMutex{}
722 sc.lockOfTopicLockMap.Unlock()
723 sc.topicLockMap[topic.Name].Lock()
724 }
725}
726
727func (sc *SaramaClient) unLockTopic(topic *Topic) {
728 sc.lockOfTopicLockMap.Lock()
729 defer sc.lockOfTopicLockMap.Unlock()
730 if _, exist := sc.topicLockMap[topic.Name]; exist {
731 sc.topicLockMap[topic.Name].Unlock()
732 }
733}
734
735func (sc *SaramaClient) addTopicToConsumerChannelMap(id string, arg *consumerChannels) {
736 sc.lockTopicToConsumerChannelMap.Lock()
737 defer sc.lockTopicToConsumerChannelMap.Unlock()
738 if _, exist := sc.topicToConsumerChannelMap[id]; !exist {
739 sc.topicToConsumerChannelMap[id] = arg
740 }
741}
742
Scott Baker2c1c4822019-10-16 11:02:41 -0700743func (sc *SaramaClient) getConsumerChannel(topic *Topic) *consumerChannels {
744 sc.lockTopicToConsumerChannelMap.RLock()
745 defer sc.lockTopicToConsumerChannelMap.RUnlock()
746
747 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
748 return consumerCh
749 }
750 return nil
751}
752
khenaidoo26721882021-08-11 17:42:52 -0400753func (sc *SaramaClient) addChannelToConsumerChannelMap(ctx context.Context, topic *Topic, ch chan proto.Message) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700754 sc.lockTopicToConsumerChannelMap.Lock()
755 defer sc.lockTopicToConsumerChannelMap.Unlock()
756 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
757 consumerCh.channels = append(consumerCh.channels, ch)
758 return
759 }
Neha Sharma94f16a92020-06-26 04:17:55 +0000760 logger.Warnw(ctx, "consumers-channel-not-exist", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700761}
762
Joey Armstrong7f8436c2023-07-09 20:23:27 -0400763// closeConsumers closes a list of sarama consumers. The consumers can either be a partition consumers or a group consumers
Neha Sharma94f16a92020-06-26 04:17:55 +0000764func closeConsumers(ctx context.Context, consumers []interface{}) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700765 var err error
766 for _, consumer := range consumers {
767 // Is it a partition consumers?
768 if partionConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
769 if errTemp := partionConsumer.Close(); errTemp != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000770 logger.Debugw(ctx, "partition!!!", log.Fields{"err": errTemp})
Scott Baker2c1c4822019-10-16 11:02:41 -0700771 if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
772 // This can occur on race condition
773 err = nil
774 } else {
775 err = errTemp
776 }
777 }
Abhay Kumar40252eb2025-10-13 13:25:53 +0000778 } else if groupConsumer, ok := consumer.(sarama.ConsumerGroup); ok {
Scott Baker2c1c4822019-10-16 11:02:41 -0700779 if errTemp := groupConsumer.Close(); errTemp != nil {
780 if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
781 // This can occur on race condition
782 err = nil
783 } else {
784 err = errTemp
785 }
786 }
787 }
788 }
789 return err
790}
791
khenaidoo26721882021-08-11 17:42:52 -0400792func (sc *SaramaClient) removeChannelFromConsumerChannelMap(ctx context.Context, topic Topic, ch <-chan proto.Message) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700793 sc.lockTopicToConsumerChannelMap.Lock()
794 defer sc.lockTopicToConsumerChannelMap.Unlock()
795 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
796 // Channel will be closed in the removeChannel method
Neha Sharma94f16a92020-06-26 04:17:55 +0000797 consumerCh.channels = removeChannel(ctx, consumerCh.channels, ch)
Scott Baker2c1c4822019-10-16 11:02:41 -0700798 // If there are no more channels then we can close the consumers itself
799 if len(consumerCh.channels) == 0 {
Neha Sharma94f16a92020-06-26 04:17:55 +0000800 logger.Debugw(ctx, "closing-consumers", log.Fields{"topic": topic})
801 err := closeConsumers(ctx, consumerCh.consumers)
Scott Baker2c1c4822019-10-16 11:02:41 -0700802 //err := consumerCh.consumers.Close()
803 delete(sc.topicToConsumerChannelMap, topic.Name)
804 return err
805 }
806 return nil
807 }
Neha Sharma94f16a92020-06-26 04:17:55 +0000808 logger.Warnw(ctx, "topic-does-not-exist", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700809 return errors.New("topic-does-not-exist")
810}
811
Neha Sharma94f16a92020-06-26 04:17:55 +0000812func (sc *SaramaClient) clearTopicFromConsumerChannelMap(ctx context.Context, topic Topic) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700813 sc.lockTopicToConsumerChannelMap.Lock()
814 defer sc.lockTopicToConsumerChannelMap.Unlock()
815 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
816 for _, ch := range consumerCh.channels {
817 // Channel will be closed in the removeChannel method
Neha Sharma94f16a92020-06-26 04:17:55 +0000818 removeChannel(ctx, consumerCh.channels, ch)
Scott Baker2c1c4822019-10-16 11:02:41 -0700819 }
Neha Sharma94f16a92020-06-26 04:17:55 +0000820 err := closeConsumers(ctx, consumerCh.consumers)
Scott Baker2c1c4822019-10-16 11:02:41 -0700821 //if err == sarama.ErrUnknownTopicOrPartition {
822 // // Not an error
823 // err = nil
824 //}
825 //err := consumerCh.consumers.Close()
826 delete(sc.topicToConsumerChannelMap, topic.Name)
827 return err
828 }
Neha Sharma94f16a92020-06-26 04:17:55 +0000829 logger.Debugw(ctx, "topic-does-not-exist", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700830 return nil
831}
832
Joey Armstrong7f8436c2023-07-09 20:23:27 -0400833// createPublisher creates the publisher which is used to send a message onto kafka
Neha Sharma94f16a92020-06-26 04:17:55 +0000834func (sc *SaramaClient) createPublisher(ctx context.Context) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700835 // This Creates the publisher
836 config := sarama.NewConfig()
Himani Chawlaf87a6a92021-04-01 17:44:16 +0530837 config.Version = sarama.V1_0_0_0
kesavandd85e52b2022-03-15 16:38:08 +0530838 config.Producer.Partitioner = sarama.NewHashPartitioner
Scott Baker2c1c4822019-10-16 11:02:41 -0700839 config.Producer.Flush.Frequency = time.Duration(sc.producerFlushFrequency)
840 config.Producer.Flush.Messages = sc.producerFlushMessages
841 config.Producer.Flush.MaxMessages = sc.producerFlushMaxmessages
842 config.Producer.Return.Errors = sc.producerReturnErrors
843 config.Producer.Return.Successes = sc.producerReturnSuccess
844 //config.Producer.RequiredAcks = sarama.WaitForAll
845 config.Producer.RequiredAcks = sarama.WaitForLocal
846
Neha Sharmadd9af392020-04-28 09:03:57 +0000847 brokers := []string{sc.KafkaAddress}
Scott Baker2c1c4822019-10-16 11:02:41 -0700848
849 if producer, err := sarama.NewAsyncProducer(brokers, config); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000850 logger.Errorw(ctx, "error-starting-publisher", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700851 return err
852 } else {
853 sc.producer = producer
854 }
Neha Sharma94f16a92020-06-26 04:17:55 +0000855 logger.Info(ctx, "Kafka-publisher-created")
Scott Baker2c1c4822019-10-16 11:02:41 -0700856 return nil
857}
858
Neha Sharma94f16a92020-06-26 04:17:55 +0000859func (sc *SaramaClient) createConsumer(ctx context.Context) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700860 config := sarama.NewConfig()
Himani Chawlaf87a6a92021-04-01 17:44:16 +0530861 config.Version = sarama.V1_0_0_0
Scott Baker2c1c4822019-10-16 11:02:41 -0700862 config.Consumer.Return.Errors = true
863 config.Consumer.Fetch.Min = 1
864 config.Consumer.MaxWaitTime = time.Duration(sc.consumerMaxwait) * time.Millisecond
865 config.Consumer.MaxProcessingTime = time.Duration(sc.maxProcessingTime) * time.Millisecond
866 config.Consumer.Offsets.Initial = sarama.OffsetNewest
867 config.Metadata.Retry.Max = sc.metadataMaxRetry
Neha Sharmadd9af392020-04-28 09:03:57 +0000868 brokers := []string{sc.KafkaAddress}
Scott Baker2c1c4822019-10-16 11:02:41 -0700869
870 if consumer, err := sarama.NewConsumer(brokers, config); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000871 logger.Errorw(ctx, "error-starting-consumers", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700872 return err
873 } else {
874 sc.consumer = consumer
875 }
Neha Sharma94f16a92020-06-26 04:17:55 +0000876 logger.Info(ctx, "Kafka-consumers-created")
Scott Baker2c1c4822019-10-16 11:02:41 -0700877 return nil
878}
879
880// createGroupConsumer creates a consumers group
Abhay Kumar40252eb2025-10-13 13:25:53 +0000881func (sc *SaramaClient) createGroupConsumer(ctx context.Context, topic *Topic, groupId string, initialOffset int64) (sarama.ConsumerGroup, error) {
882 config := sarama.NewConfig()
Himani Chawlaf87a6a92021-04-01 17:44:16 +0530883 config.Version = sarama.V1_0_0_0
Scott Baker2c1c4822019-10-16 11:02:41 -0700884 config.ClientID = uuid.New().String()
Abhay Kumar40252eb2025-10-13 13:25:53 +0000885 config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
Scott Baker104b67d2019-10-29 15:56:27 -0700886 config.Consumer.Group.Heartbeat.Interval, _ = time.ParseDuration("1s")
887 config.Consumer.Return.Errors = true
Scott Baker2c1c4822019-10-16 11:02:41 -0700888 config.Consumer.Offsets.Initial = initialOffset
Abhay Kumar40252eb2025-10-13 13:25:53 +0000889
Neha Sharmadd9af392020-04-28 09:03:57 +0000890 brokers := []string{sc.KafkaAddress}
Abhay Kumar40252eb2025-10-13 13:25:53 +0000891 // topics := []string{topic.Name}
892 var consumerGroup sarama.ConsumerGroup
Scott Baker2c1c4822019-10-16 11:02:41 -0700893 var err error
Abhay Kumar40252eb2025-10-13 13:25:53 +0000894 if consumerGroup, err = sarama.NewConsumerGroup(brokers, groupId, config); err != nil {
895 logger.Errorw(ctx, "create-group-consumer-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
Scott Baker2c1c4822019-10-16 11:02:41 -0700896 return nil, err
897 }
Scott Baker2c1c4822019-10-16 11:02:41 -0700898
Abhay Kumar40252eb2025-10-13 13:25:53 +0000899 logger.Debugw(ctx, "create-group-consumer-success", log.Fields{"topic": topic.Name, "groupId": groupId})
900 //sc.groupConsumers[topic.Name] = consumerGroup
901 sc.addToGroupConsumers(topic.Name, consumerGroup)
902 return consumerGroup, nil
Scott Baker2c1c4822019-10-16 11:02:41 -0700903}
904
905// dispatchToConsumers sends the intercontainermessage received on a given topic to all subscribers for that
906// topic via the unique channel each subscriber received during subscription
khenaidoo26721882021-08-11 17:42:52 -0400907func (sc *SaramaClient) dispatchToConsumers(consumerCh *consumerChannels, protoMessage proto.Message, fromTopic string, ts time.Time) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700908 // Need to go over all channels and publish messages to them - do we need to copy msg?
909 sc.lockTopicToConsumerChannelMap.RLock()
Scott Baker2c1c4822019-10-16 11:02:41 -0700910 for _, ch := range consumerCh.channels {
khenaidoo26721882021-08-11 17:42:52 -0400911 go func(c chan proto.Message) {
Scott Baker2c1c4822019-10-16 11:02:41 -0700912 c <- protoMessage
913 }(ch)
914 }
Kent Hagermanccfa2132019-12-17 13:29:34 -0500915 sc.lockTopicToConsumerChannelMap.RUnlock()
916
917 if callback := sc.metadataCallback; callback != nil {
khenaidoo26721882021-08-11 17:42:52 -0400918 callback(fromTopic, ts)
Kent Hagermanccfa2132019-12-17 13:29:34 -0500919 }
Scott Baker2c1c4822019-10-16 11:02:41 -0700920}
921
Neha Sharma94f16a92020-06-26 04:17:55 +0000922func (sc *SaramaClient) consumeFromAPartition(ctx context.Context, topic *Topic, consumer sarama.PartitionConsumer, consumerChnls *consumerChannels) {
923 logger.Debugw(ctx, "starting-partition-consumption-loop", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700924startloop:
925 for {
926 select {
927 case err, ok := <-consumer.Errors():
928 if ok {
Neha Sharma94f16a92020-06-26 04:17:55 +0000929 if sc.isLivenessError(ctx, err) {
930 sc.updateLiveness(ctx, false)
931 logger.Warnw(ctx, "partition-consumers-error", log.Fields{"error": err})
cbabud4978652019-12-04 08:04:21 +0100932 }
Scott Baker2c1c4822019-10-16 11:02:41 -0700933 } else {
934 // Channel is closed
935 break startloop
936 }
937 case msg, ok := <-consumer.Messages():
Neha Sharma94f16a92020-06-26 04:17:55 +0000938 //logger.Debugw(ctx, "message-received", logger.Fields{"msg": msg, "receivedTopic": msg.Topic})
Scott Baker2c1c4822019-10-16 11:02:41 -0700939 if !ok {
940 // channel is closed
941 break startloop
942 }
943 msgBody := msg.Value
Neha Sharma94f16a92020-06-26 04:17:55 +0000944 sc.updateLiveness(ctx, true)
945 logger.Debugw(ctx, "message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
khenaidoo26721882021-08-11 17:42:52 -0400946 var protoMsg proto.Message
947 if err := proto.Unmarshal(msgBody, protoMsg); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000948 logger.Warnw(ctx, "partition-invalid-message", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700949 continue
950 }
khenaidoo26721882021-08-11 17:42:52 -0400951 go sc.dispatchToConsumers(consumerChnls, protoMsg, msg.Topic, msg.Timestamp)
Scott Baker2c1c4822019-10-16 11:02:41 -0700952 case <-sc.doneCh:
Neha Sharma94f16a92020-06-26 04:17:55 +0000953 logger.Infow(ctx, "partition-received-exit-signal", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700954 break startloop
955 }
956 }
Neha Sharma94f16a92020-06-26 04:17:55 +0000957 logger.Infow(ctx, "partition-consumer-stopped", log.Fields{"topic": topic.Name})
958 sc.setUnhealthy(ctx)
Scott Baker2c1c4822019-10-16 11:02:41 -0700959}
960
Abhay Kumar40252eb2025-10-13 13:25:53 +0000961func (sc *SaramaClient) consumeGroupMessages(ctx context.Context, topic *Topic, consumerGroup sarama.ConsumerGroup, consumerChnls *consumerChannels) {
Neha Sharma94f16a92020-06-26 04:17:55 +0000962 logger.Debugw(ctx, "starting-group-consumption-loop", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -0700963
Abhay Kumar40252eb2025-10-13 13:25:53 +0000964 go func() {
965 for {
966 err := consumerGroup.Consume(ctx, []string{topic.Name}, &groupConsumerHandler{
967 consumerChnls: consumerChnls,
968 sc: sc,
969 topic: topic,
970 })
971 if err != nil {
972 logger.Warnw(ctx, "group-consumer-error", log.Fields{"topic": topic.Name, "error": err})
Neha Sharma94f16a92020-06-26 04:17:55 +0000973 if sc.isLivenessError(ctx, err) {
974 sc.updateLiveness(ctx, false)
Scott Bakerfa2f6ee2019-11-19 14:53:14 -0800975 }
Scott Baker2c1c4822019-10-16 11:02:41 -0700976 }
Abhay Kumar40252eb2025-10-13 13:25:53 +0000977 select {
978 case <-sc.doneCh:
979 logger.Infow(ctx, "group-received-exit-signal", log.Fields{"topic": topic.Name})
980 return
981 default:
Scott Baker2c1c4822019-10-16 11:02:41 -0700982 }
Scott Baker2c1c4822019-10-16 11:02:41 -0700983 }
Abhay Kumar40252eb2025-10-13 13:25:53 +0000984 }()
985}
986
987type groupConsumerHandler struct {
988 consumerChnls *consumerChannels
989 sc *SaramaClient
990 topic *Topic
991}
992
993func (h *groupConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error {
994 return nil
995}
996
997func (h *groupConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error {
998 return nil
999}
1000
1001func (h *groupConsumerHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
1002 for msg := range claim.Messages() {
1003 h.sc.updateLiveness(context.Background(), true)
1004 logger.Debugw(context.Background(), "message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
1005 var protoMsg proto.Message
1006 if err := proto.Unmarshal(msg.Value, protoMsg); err != nil {
1007 logger.Warnw(context.Background(), "invalid-message", log.Fields{"error": err})
1008 continue
1009 }
1010 go h.sc.dispatchToConsumers(h.consumerChnls, protoMsg, msg.Topic, msg.Timestamp)
1011 session.MarkMessage(msg, "")
Scott Baker2c1c4822019-10-16 11:02:41 -07001012 }
Abhay Kumar40252eb2025-10-13 13:25:53 +00001013 return nil
Scott Baker2c1c4822019-10-16 11:02:41 -07001014}
1015
Neha Sharma94f16a92020-06-26 04:17:55 +00001016func (sc *SaramaClient) startConsumers(ctx context.Context, topic *Topic) error {
1017 logger.Debugw(ctx, "starting-consumers", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -07001018 var consumerCh *consumerChannels
1019 if consumerCh = sc.getConsumerChannel(topic); consumerCh == nil {
Neha Sharma94f16a92020-06-26 04:17:55 +00001020 logger.Errorw(ctx, "consumers-not-exist", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -07001021 return errors.New("consumers-not-exist")
1022 }
1023 // For each consumer listening for that topic, start a consumption loop
1024 for _, consumer := range consumerCh.consumers {
1025 if pConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
Neha Sharma94f16a92020-06-26 04:17:55 +00001026 go sc.consumeFromAPartition(ctx, topic, pConsumer, consumerCh)
Abhay Kumar40252eb2025-10-13 13:25:53 +00001027 } else if gConsumer, ok := consumer.(sarama.ConsumerGroup); ok {
Neha Sharma94f16a92020-06-26 04:17:55 +00001028 go sc.consumeGroupMessages(ctx, topic, gConsumer, consumerCh)
Scott Baker2c1c4822019-10-16 11:02:41 -07001029 } else {
Neha Sharma94f16a92020-06-26 04:17:55 +00001030 logger.Errorw(ctx, "invalid-consumer", log.Fields{"topic": topic})
Scott Baker2c1c4822019-10-16 11:02:41 -07001031 return errors.New("invalid-consumer")
1032 }
1033 }
1034 return nil
1035}
1036
Joey Armstrong7f8436c2023-07-09 20:23:27 -04001037// // setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
1038// // for that topic. It also starts the routine that listens for messages on that topic.
khenaidoo26721882021-08-11 17:42:52 -04001039func (sc *SaramaClient) setupPartitionConsumerChannel(ctx context.Context, topic *Topic, initialOffset int64) (chan proto.Message, error) {
Scott Baker2c1c4822019-10-16 11:02:41 -07001040 var pConsumers []sarama.PartitionConsumer
1041 var err error
1042
Neha Sharma94f16a92020-06-26 04:17:55 +00001043 if pConsumers, err = sc.createPartitionConsumers(ctx, topic, initialOffset); err != nil {
1044 logger.Errorw(ctx, "creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -07001045 return nil, err
1046 }
1047
1048 consumersIf := make([]interface{}, 0)
1049 for _, pConsumer := range pConsumers {
1050 consumersIf = append(consumersIf, pConsumer)
1051 }
1052
1053 // Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
1054 // unbuffered to verify race conditions.
khenaidoo26721882021-08-11 17:42:52 -04001055 consumerListeningChannel := make(chan proto.Message)
Scott Baker2c1c4822019-10-16 11:02:41 -07001056 cc := &consumerChannels{
1057 consumers: consumersIf,
khenaidoo26721882021-08-11 17:42:52 -04001058 channels: []chan proto.Message{consumerListeningChannel},
Scott Baker2c1c4822019-10-16 11:02:41 -07001059 }
1060
1061 // Add the consumers channel to the map
1062 sc.addTopicToConsumerChannelMap(topic.Name, cc)
1063
1064 //Start a consumers to listen on that specific topic
David K. Bainbridge7c75cac2020-02-19 08:53:46 -08001065 go func() {
Neha Sharma94f16a92020-06-26 04:17:55 +00001066 if err := sc.startConsumers(ctx, topic); err != nil {
1067 logger.Errorw(ctx, "start-consumers-failed", log.Fields{
David K. Bainbridge7c75cac2020-02-19 08:53:46 -08001068 "topic": topic,
1069 "error": err})
1070 }
1071 }()
Scott Baker2c1c4822019-10-16 11:02:41 -07001072
1073 return consumerListeningChannel, nil
1074}
1075
1076// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
1077// for that topic. It also starts the routine that listens for messages on that topic.
khenaidoo26721882021-08-11 17:42:52 -04001078func (sc *SaramaClient) setupGroupConsumerChannel(ctx context.Context, topic *Topic, groupId string, initialOffset int64) (chan proto.Message, error) {
Abhay Kumar40252eb2025-10-13 13:25:53 +00001079 var consumerGroup sarama.ConsumerGroup
Scott Baker2c1c4822019-10-16 11:02:41 -07001080 var err error
Abhay Kumar40252eb2025-10-13 13:25:53 +00001081 if consumerGroup, err = sc.createGroupConsumer(ctx, topic, groupId, initialOffset); err != nil {
1082 logger.Errorw(ctx, "creating-group-consumer-failure", log.Fields{"error": err, "topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -07001083 return nil, err
1084 }
Abhay Kumar40252eb2025-10-13 13:25:53 +00001085
khenaidoo26721882021-08-11 17:42:52 -04001086 consumerListeningChannel := make(chan proto.Message)
Scott Baker2c1c4822019-10-16 11:02:41 -07001087 cc := &consumerChannels{
Abhay Kumar40252eb2025-10-13 13:25:53 +00001088 consumers: []interface{}{consumerGroup},
khenaidoo26721882021-08-11 17:42:52 -04001089 channels: []chan proto.Message{consumerListeningChannel},
Scott Baker2c1c4822019-10-16 11:02:41 -07001090 }
1091
1092 // Add the consumers channel to the map
1093 sc.addTopicToConsumerChannelMap(topic.Name, cc)
1094
1095 //Start a consumers to listen on that specific topic
David K. Bainbridge7c75cac2020-02-19 08:53:46 -08001096 go func() {
Neha Sharma94f16a92020-06-26 04:17:55 +00001097 if err := sc.startConsumers(ctx, topic); err != nil {
1098 logger.Errorw(ctx, "start-consumers-failed", log.Fields{
David K. Bainbridge7c75cac2020-02-19 08:53:46 -08001099 "topic": topic,
1100 "error": err})
1101 }
1102 }()
Scott Baker2c1c4822019-10-16 11:02:41 -07001103
1104 return consumerListeningChannel, nil
1105}
1106
Neha Sharma94f16a92020-06-26 04:17:55 +00001107func (sc *SaramaClient) createPartitionConsumers(ctx context.Context, topic *Topic, initialOffset int64) ([]sarama.PartitionConsumer, error) {
1108 logger.Debugw(ctx, "creating-partition-consumers", log.Fields{"topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -07001109 partitionList, err := sc.consumer.Partitions(topic.Name)
1110 if err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +00001111 logger.Warnw(ctx, "get-partition-failure", log.Fields{"error": err, "topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -07001112 return nil, err
1113 }
1114
1115 pConsumers := make([]sarama.PartitionConsumer, 0)
1116 for _, partition := range partitionList {
1117 var pConsumer sarama.PartitionConsumer
1118 if pConsumer, err = sc.consumer.ConsumePartition(topic.Name, partition, initialOffset); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +00001119 logger.Warnw(ctx, "consumers-partition-failure", log.Fields{"error": err, "topic": topic.Name})
Scott Baker2c1c4822019-10-16 11:02:41 -07001120 return nil, err
1121 }
1122 pConsumers = append(pConsumers, pConsumer)
1123 }
1124 return pConsumers, nil
1125}
1126
khenaidoo26721882021-08-11 17:42:52 -04001127func removeChannel(ctx context.Context, channels []chan proto.Message, ch <-chan proto.Message) []chan proto.Message {
Scott Baker2c1c4822019-10-16 11:02:41 -07001128 var i int
khenaidoo26721882021-08-11 17:42:52 -04001129 var channel chan proto.Message
Scott Baker2c1c4822019-10-16 11:02:41 -07001130 for i, channel = range channels {
1131 if channel == ch {
1132 channels[len(channels)-1], channels[i] = channels[i], channels[len(channels)-1]
1133 close(channel)
Neha Sharma94f16a92020-06-26 04:17:55 +00001134 logger.Debug(ctx, "channel-closed")
Scott Baker2c1c4822019-10-16 11:02:41 -07001135 return channels[:len(channels)-1]
1136 }
1137 }
1138 return channels
1139}
1140
Abhay Kumar40252eb2025-10-13 13:25:53 +00001141func (sc *SaramaClient) addToGroupConsumers(topic string, consumerGroup sarama.ConsumerGroup) {
Scott Baker2c1c4822019-10-16 11:02:41 -07001142 sc.lockOfGroupConsumers.Lock()
1143 defer sc.lockOfGroupConsumers.Unlock()
1144 if _, exist := sc.groupConsumers[topic]; !exist {
Abhay Kumar40252eb2025-10-13 13:25:53 +00001145 sc.groupConsumers[topic] = consumerGroup
Scott Baker2c1c4822019-10-16 11:02:41 -07001146 }
1147}
1148
Neha Sharma94f16a92020-06-26 04:17:55 +00001149func (sc *SaramaClient) deleteFromGroupConsumers(ctx context.Context, topic string) error {
Scott Baker2c1c4822019-10-16 11:02:41 -07001150 sc.lockOfGroupConsumers.Lock()
1151 defer sc.lockOfGroupConsumers.Unlock()
Abhay Kumar40252eb2025-10-13 13:25:53 +00001152 if consumerGroup, exist := sc.groupConsumers[topic]; exist {
Scott Baker2c1c4822019-10-16 11:02:41 -07001153 delete(sc.groupConsumers, topic)
Abhay Kumar40252eb2025-10-13 13:25:53 +00001154 if err := consumerGroup.Close(); err != nil {
1155 logger.Errorw(ctx, "failure-closing-consumer-group", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -07001156 return err
1157 }
1158 }
1159 return nil
1160}
kesavandd85e52b2022-03-15 16:38:08 +05301161
1162func (sc *SaramaClient) ListTopics(ctx context.Context) ([]string, error) {
1163
1164 config := sarama.NewConfig()
1165 client, err := sarama.NewClient([]string{sc.KafkaAddress}, config)
1166 if err != nil {
1167 logger.Debugw(ctx, "list-topics-failure", log.Fields{"error": err, "broker-address": sc.KafkaAddress})
1168 return nil, err
1169 }
1170
1171 topics, err := client.Topics()
1172 if err != nil {
1173 logger.Debugw(ctx, "list-topics-failure", log.Fields{"error": err, "broker-address": sc.KafkaAddress})
1174 return nil, err
1175 }
1176
1177 return topics, nil
1178}