package sarama

import (
	"sync"
	"time"
)

// Offset Manager

// OffsetManager uses Kafka to store and fetch consumed partition offsets.
type OffsetManager interface {
	// ManagePartition creates a PartitionOffsetManager on the given topic/partition.
	// It will return an error if this OffsetManager is already managing the given
	// topic/partition.
	ManagePartition(topic string, partition int32) (PartitionOffsetManager, error)

	// Close stops the OffsetManager from managing offsets. It is required to call
	// this function before an OffsetManager object passes out of scope, as it
	// will otherwise leak memory. You must call this after all the
	// PartitionOffsetManagers are closed.
	Close() error

	// Commit commits the offsets. This method can be used if AutoCommit.Enable is
	// set to false.
	Commit()
}
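
// A minimal usage sketch (not part of the library API surface): `client` is an
// existing Client, `lastConsumedOffset` and the group/topic strings are
// placeholders, and error handling is elided. Partition offset managers must be
// closed before the OffsetManager, which in turn is closed before the Client.
// With Consumer.Offsets.AutoCommit.Enable set to false, Commit must be called
// explicitly.
//
//	om, _ := NewOffsetManagerFromClient("example-group", client)
//	pom, _ := om.ManagePartition("example-topic", 0)
//
//	// ... consume and process messages, marking progress as you go ...
//	pom.MarkOffset(lastConsumedOffset+1, "")
//
//	om.Commit()     // only needed when auto-commit is disabled
//	_ = pom.Close() // close every PartitionOffsetManager first
//	_ = om.Close()  // then the OffsetManager itself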

type offsetManager struct {
	client          Client
	conf            *Config
	group           string
	ticker          *time.Ticker
	sessionCanceler func()

	memberID        string
	groupInstanceId *string
	generation      int32

	broker     *Broker
	brokerLock sync.RWMutex

	poms     map[string]map[int32]*partitionOffsetManager
	pomsLock sync.RWMutex

	closeOnce sync.Once
	closing   chan none
	closed    chan none
}

// NewOffsetManagerFromClient creates a new OffsetManager from the given client.
// It is still necessary to call Close() on the underlying client when finished with the offset manager.
func NewOffsetManagerFromClient(group string, client Client) (OffsetManager, error) {
	return newOffsetManagerFromClient(group, "", GroupGenerationUndefined, client, nil)
}

func newOffsetManagerFromClient(group, memberID string, generation int32, client Client, sessionCanceler func()) (*offsetManager, error) {
	// Check that we are not dealing with a closed Client before processing any other arguments
	if client.Closed() {
		return nil, ErrClosedClient
	}

	conf := client.Config()
	om := &offsetManager{
		client:          client,
		conf:            conf,
		group:           group,
		poms:            make(map[string]map[int32]*partitionOffsetManager),
		sessionCanceler: sessionCanceler,

		memberID:   memberID,
		generation: generation,

		closing: make(chan none),
		closed:  make(chan none),
	}
	if conf.Consumer.Group.InstanceId != "" {
		om.groupInstanceId = &conf.Consumer.Group.InstanceId
	}
	if conf.Consumer.Offsets.AutoCommit.Enable {
		om.ticker = time.NewTicker(conf.Consumer.Offsets.AutoCommit.Interval)
		go withRecover(om.mainLoop)
	}

	return om, nil
}
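
// A rough configuration sketch for the auto-commit path above; the broker
// address and interval are placeholders, not recommendations. With auto-commit
// enabled (the default) the constructor starts mainLoop, which calls Commit on
// every ticker tick.
//
//	conf := NewConfig()
//	conf.Consumer.Offsets.AutoCommit.Enable = true // default
//	conf.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second
//
//	client, err := NewClient([]string{"localhost:9092"}, conf)
//	if err != nil {
//		// handle the error
//	}
//	om, err := NewOffsetManagerFromClient("example-group", client)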

func (om *offsetManager) ManagePartition(topic string, partition int32) (PartitionOffsetManager, error) {
	pom, err := om.newPartitionOffsetManager(topic, partition)
	if err != nil {
		return nil, err
	}

	om.pomsLock.Lock()
	defer om.pomsLock.Unlock()

	topicManagers := om.poms[topic]
	if topicManagers == nil {
		topicManagers = make(map[int32]*partitionOffsetManager)
		om.poms[topic] = topicManagers
	}

	if topicManagers[partition] != nil {
		return nil, ConfigurationError("That topic/partition is already being managed")
	}

	topicManagers[partition] = pom
	return pom, nil
}

func (om *offsetManager) Close() error {
	om.closeOnce.Do(func() {
		// exit the mainLoop
		close(om.closing)
		if om.conf.Consumer.Offsets.AutoCommit.Enable {
			<-om.closed
		}

		// mark all POMs as closed
		om.asyncClosePOMs()

		// flush one last time
		if om.conf.Consumer.Offsets.AutoCommit.Enable {
			for attempt := 0; attempt <= om.conf.Consumer.Offsets.Retry.Max; attempt++ {
				om.flushToBroker()
				if om.releasePOMs(false) == 0 {
					break
				}
			}
		}

		om.releasePOMs(true)
		om.brokerLock.Lock()
		om.broker = nil
		om.brokerLock.Unlock()
	})
	return nil
}

func (om *offsetManager) computeBackoff(retries int) time.Duration {
	if om.conf.Metadata.Retry.BackoffFunc != nil {
		return om.conf.Metadata.Retry.BackoffFunc(retries, om.conf.Metadata.Retry.Max)
	} else {
		return om.conf.Metadata.Retry.Backoff
	}
}
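
// A sketch of supplying a custom backoff for the retry logic above; the
// exponential shape and the 250ms/2s constants are arbitrary examples. Note
// that fetchInitialOffset passes a value that counts down from
// Metadata.Retry.Max, so maxRetries-retries is the number of attempts so far.
//
//	conf.Metadata.Retry.BackoffFunc = func(retries, maxRetries int) time.Duration {
//		d := time.Duration(maxRetries-retries+1) * 250 * time.Millisecond
//		if d > 2*time.Second {
//			return 2 * time.Second
//		}
//		return d
//	}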

func (om *offsetManager) fetchInitialOffset(topic string, partition int32, retries int) (int64, int32, string, error) {
	broker, err := om.coordinator()
	if err != nil {
		if retries <= 0 {
			return 0, 0, "", err
		}
		return om.fetchInitialOffset(topic, partition, retries-1)
	}

	partitions := map[string][]int32{topic: {partition}}
	req := NewOffsetFetchRequest(om.conf.Version, om.group, partitions)
	resp, err := broker.FetchOffset(req)
	if err != nil {
		if retries <= 0 {
			return 0, 0, "", err
		}
		om.releaseCoordinator(broker)
		return om.fetchInitialOffset(topic, partition, retries-1)
	}

	block := resp.GetBlock(topic, partition)
	if block == nil {
		return 0, 0, "", ErrIncompleteResponse
	}

	switch block.Err {
	case ErrNoError:
		return block.Offset, block.LeaderEpoch, block.Metadata, nil
	case ErrNotCoordinatorForConsumer:
		if retries <= 0 {
			return 0, 0, "", block.Err
		}
		om.releaseCoordinator(broker)
		return om.fetchInitialOffset(topic, partition, retries-1)
	case ErrOffsetsLoadInProgress:
		if retries <= 0 {
			return 0, 0, "", block.Err
		}
		backoff := om.computeBackoff(retries)
		select {
		case <-om.closing:
			return 0, 0, "", block.Err
		case <-time.After(backoff):
		}
		return om.fetchInitialOffset(topic, partition, retries-1)
	default:
		return 0, 0, "", block.Err
	}
}

func (om *offsetManager) coordinator() (*Broker, error) {
	om.brokerLock.RLock()
	broker := om.broker
	om.brokerLock.RUnlock()

	if broker != nil {
		return broker, nil
	}

	om.brokerLock.Lock()
	defer om.brokerLock.Unlock()

	if broker := om.broker; broker != nil {
		return broker, nil
	}

	if err := om.client.RefreshCoordinator(om.group); err != nil {
		return nil, err
	}

	broker, err := om.client.Coordinator(om.group)
	if err != nil {
		return nil, err
	}

	om.broker = broker
	return broker, nil
}

func (om *offsetManager) releaseCoordinator(b *Broker) {
	om.brokerLock.Lock()
	if om.broker == b {
		om.broker = nil
	}
	om.brokerLock.Unlock()
}

func (om *offsetManager) mainLoop() {
	defer om.ticker.Stop()
	defer close(om.closed)

	for {
		select {
		case <-om.ticker.C:
			om.Commit()
		case <-om.closing:
			return
		}
	}
}

func (om *offsetManager) Commit() {
	om.flushToBroker()
	om.releasePOMs(false)
}

func (om *offsetManager) flushToBroker() {
	broker, err := om.coordinator()
	if err != nil {
		om.handleError(err)
		return
	}

	// Care needs to be taken to unlock this. Don't want to defer the unlock as this would
	// cause the lock to be held while waiting for the broker to reply.
	broker.lock.Lock()
	req := om.constructRequest()
	if req == nil {
		broker.lock.Unlock()
		return
	}
	resp, rp, err := sendOffsetCommit(broker, req)
	broker.lock.Unlock()

	if err != nil {
		om.handleError(err)
		om.releaseCoordinator(broker)
		_ = broker.Close()
		return
	}

	err = handleResponsePromise(req, resp, rp, nil)
	if err != nil {
		om.handleError(err)
		om.releaseCoordinator(broker)
		_ = broker.Close()
		return
	}

	broker.handleThrottledResponse(resp)
	om.handleResponse(broker, req, resp)
}

func sendOffsetCommit(coordinator *Broker, req *OffsetCommitRequest) (*OffsetCommitResponse, *responsePromise, error) {
	resp := new(OffsetCommitResponse)

	promise, err := coordinator.send(req, resp)
	if err != nil {
		return nil, nil, err
	}

	return resp, promise, nil
}

func (om *offsetManager) constructRequest() *OffsetCommitRequest {
	r := &OffsetCommitRequest{
		Version:                 1,
		ConsumerGroup:           om.group,
		ConsumerID:              om.memberID,
		ConsumerGroupGeneration: om.generation,
	}
	// Version 1 adds timestamp and group membership information, as well as the commit timestamp.
	//
	// Version 2 adds retention time. It removes the commit timestamp added in version 1.
	if om.conf.Version.IsAtLeast(V0_9_0_0) {
		r.Version = 2
	}
	// Version 3 and 4 are the same as version 2.
	if om.conf.Version.IsAtLeast(V0_11_0_0) {
		r.Version = 3
	}
	if om.conf.Version.IsAtLeast(V2_0_0_0) {
		r.Version = 4
	}
	// Version 5 removes the retention time, which is now controlled only by a broker configuration.
	//
	// Version 6 adds the leader epoch for fencing.
	if om.conf.Version.IsAtLeast(V2_1_0_0) {
		r.Version = 6
	}
	// Version 7 adds a new field called groupInstanceId to indicate member identity across restarts.
	if om.conf.Version.IsAtLeast(V2_3_0_0) {
		r.Version = 7
		r.GroupInstanceId = om.groupInstanceId
	}

	// commit timestamp was only briefly supported in V1 where we set it to
	// ReceiveTime (-1) to tell the broker to set it to the time when the commit
	// request was received
	var commitTimestamp int64
	if r.Version == 1 {
		commitTimestamp = ReceiveTime
	}

	// request controlled retention was only supported from V2-V4 (it became
	// broker-only after that) so if the user has set the config options then
	// flow those through as retention time on the commit request.
	if r.Version >= 2 && r.Version < 5 {
		// Map Sarama's default of 0 to Kafka's default of -1
		r.RetentionTime = -1
		if om.conf.Consumer.Offsets.Retention > 0 {
			r.RetentionTime = int64(om.conf.Consumer.Offsets.Retention / time.Millisecond)
		}
	}

	om.pomsLock.RLock()
	defer om.pomsLock.RUnlock()

	for _, topicManagers := range om.poms {
		for _, pom := range topicManagers {
			pom.lock.Lock()
			if pom.dirty {
				r.AddBlockWithLeaderEpoch(pom.topic, pom.partition, pom.offset, pom.leaderEpoch, commitTimestamp, pom.metadata)
			}
			pom.lock.Unlock()
		}
	}

	if len(r.blocks) > 0 {
		return r
	}

	return nil
}
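
// A short sketch of the retention mapping above, assuming conf.Version is at
// least V0_9_0_0 and below V2_1_0_0 (commit request versions 2-4): a non-zero
// Consumer.Offsets.Retention is sent as RetentionTime in milliseconds, while
// Sarama's default of 0 becomes Kafka's -1, i.e. "use the broker's
// offsets.retention.minutes setting".
//
//	conf := NewConfig()
//	conf.Version = V2_0_0_0
//	conf.Consumer.Offsets.Retention = 24 * time.Hour // sent as RetentionTime = 86400000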

func (om *offsetManager) handleResponse(broker *Broker, req *OffsetCommitRequest, resp *OffsetCommitResponse) {
	om.pomsLock.RLock()
	defer om.pomsLock.RUnlock()

	for _, topicManagers := range om.poms {
		for _, pom := range topicManagers {
			if req.blocks[pom.topic] == nil || req.blocks[pom.topic][pom.partition] == nil {
				continue
			}

			var err KError
			var ok bool

			if resp.Errors[pom.topic] == nil {
				pom.handleError(ErrIncompleteResponse)
				continue
			}
			if err, ok = resp.Errors[pom.topic][pom.partition]; !ok {
				pom.handleError(ErrIncompleteResponse)
				continue
			}

			switch err {
			case ErrNoError:
				block := req.blocks[pom.topic][pom.partition]
				pom.updateCommitted(block.offset, block.metadata)
			case ErrNotLeaderForPartition, ErrLeaderNotAvailable,
				ErrConsumerCoordinatorNotAvailable, ErrNotCoordinatorForConsumer:
				// not a critical error, we just need to redispatch
				om.releaseCoordinator(broker)
			case ErrOffsetMetadataTooLarge, ErrInvalidCommitOffsetSize:
				// nothing we can do about this, just tell the user and carry on
				pom.handleError(err)
			case ErrOffsetsLoadInProgress:
				// nothing wrong but we didn't commit, we'll get it next time round
			case ErrFencedInstancedId:
				pom.handleError(err)
				// TODO close the whole consumer for instance fenced....
				om.tryCancelSession()
			case ErrUnknownTopicOrPartition:
				// let the user know *and* try redispatching - if topic-auto-create is
				// enabled, redispatching should trigger a metadata req and create the
				// topic; if not then re-dispatching won't help, but we've let the user
				// know and it shouldn't hurt either (see https://github.com/IBM/sarama/issues/706)
				fallthrough
			default:
				// dunno, tell the user and try redispatching
				pom.handleError(err)
				om.releaseCoordinator(broker)
			}
		}
	}
}

func (om *offsetManager) handleError(err error) {
	om.pomsLock.RLock()
	defer om.pomsLock.RUnlock()

	for _, topicManagers := range om.poms {
		for _, pom := range topicManagers {
			pom.handleError(err)
		}
	}
}

func (om *offsetManager) asyncClosePOMs() {
	om.pomsLock.RLock()
	defer om.pomsLock.RUnlock()

	for _, topicManagers := range om.poms {
		for _, pom := range topicManagers {
			pom.AsyncClose()
		}
	}
}

// Releases/removes closed POMs once they are clean (or when forced)
func (om *offsetManager) releasePOMs(force bool) (remaining int) {
	om.pomsLock.Lock()
	defer om.pomsLock.Unlock()

	for topic, topicManagers := range om.poms {
		for partition, pom := range topicManagers {
			pom.lock.Lock()
			releaseDue := pom.done && (force || !pom.dirty)
			pom.lock.Unlock()

			if releaseDue {
				pom.release()

				delete(om.poms[topic], partition)
				if len(om.poms[topic]) == 0 {
					delete(om.poms, topic)
				}
			}
		}
		remaining += len(om.poms[topic])
	}
	return
}

func (om *offsetManager) findPOM(topic string, partition int32) *partitionOffsetManager {
	om.pomsLock.RLock()
	defer om.pomsLock.RUnlock()

	if partitions, ok := om.poms[topic]; ok {
		if pom, ok := partitions[partition]; ok {
			return pom
		}
	}
	return nil
}

func (om *offsetManager) tryCancelSession() {
	if om.sessionCanceler != nil {
		om.sessionCanceler()
	}
}

// Partition Offset Manager

// PartitionOffsetManager uses Kafka to store and fetch consumed partition offsets. You MUST call Close()
// on a partition offset manager to avoid leaks, it will not be garbage-collected automatically when it passes
// out of scope.
type PartitionOffsetManager interface {
	// NextOffset returns the next offset that should be consumed for the managed
	// partition, accompanied by metadata which can be used to reconstruct the state
	// of the partition consumer when it resumes. NextOffset() will return
	// `config.Consumer.Offsets.Initial` and an empty metadata string if no offset
	// was committed for this partition yet.
	NextOffset() (int64, string)

	// MarkOffset marks the provided offset, alongside a metadata string
	// that represents the state of the partition consumer at that point in time. The
	// metadata string can be used by another consumer to restore that state, so it
	// can resume consumption.
	//
	// To follow upstream conventions, you are expected to mark the offset of the
	// next message to read, not the last message read. Thus, when calling `MarkOffset`
	// you should typically add one to the offset of the last consumed message.
	//
	// Note: calling MarkOffset does not necessarily commit the offset to the backend
	// store immediately for efficiency reasons, and it may never be committed if
	// your application crashes. This means that you may end up processing the same
	// message twice, and your processing should ideally be idempotent.
	MarkOffset(offset int64, metadata string)

	// ResetOffset resets to the provided offset, alongside a metadata string that
	// represents the state of the partition consumer at that point in time. Reset
	// acts as a counterpart to MarkOffset, the difference being that it allows
	// resetting an offset to an earlier or smaller value, where MarkOffset only
	// allows incrementing the offset. See MarkOffset for more details.
	ResetOffset(offset int64, metadata string)

	// Errors returns a read channel of errors that occur during offset management, if
	// enabled. By default, errors are logged and not returned over this channel. If
	// you want to implement any custom error handling, set your config's
	// Consumer.Return.Errors setting to true, and read from this channel.
	Errors() <-chan *ConsumerError

	// AsyncClose initiates a shutdown of the PartitionOffsetManager. This method will
	// return immediately, after which you should wait until the 'errors' channel has
	// been drained and closed. It is required to call this function, or Close before
	// a consumer object passes out of scope, as it will otherwise leak memory. You
	// must call this before calling Close on the underlying client.
	AsyncClose()

	// Close stops the PartitionOffsetManager from managing offsets. It is required to
	// call this function (or AsyncClose) before a PartitionOffsetManager object
	// passes out of scope, as it will otherwise leak memory. You must call this
	// before calling Close on the underlying client.
	Close() error
}
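
// A minimal sketch of the MarkOffset convention described above: `pc` is
// assumed to be a PartitionConsumer for the same topic/partition, `process` is
// a placeholder for application logic, and Consumer.Return.Errors is assumed
// to be true so the Errors channel must be drained.
//
//	go func() {
//		for err := range pom.Errors() {
//			Logger.Println(err)
//		}
//	}()
//
//	for msg := range pc.Messages() {
//		process(msg)                     // placeholder handler
//		pom.MarkOffset(msg.Offset+1, "") // mark the *next* offset to read
//	}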

type partitionOffsetManager struct {
	parent      *offsetManager
	topic       string
	partition   int32
	leaderEpoch int32

	lock     sync.Mutex
	offset   int64
	metadata string
	dirty    bool
	done     bool

	releaseOnce sync.Once
	errors      chan *ConsumerError
}

func (om *offsetManager) newPartitionOffsetManager(topic string, partition int32) (*partitionOffsetManager, error) {
	offset, leaderEpoch, metadata, err := om.fetchInitialOffset(topic, partition, om.conf.Metadata.Retry.Max)
	if err != nil {
		return nil, err
	}

	return &partitionOffsetManager{
		parent:      om,
		topic:       topic,
		partition:   partition,
		leaderEpoch: leaderEpoch,
		errors:      make(chan *ConsumerError, om.conf.ChannelBufferSize),
		offset:      offset,
		metadata:    metadata,
	}, nil
}

func (pom *partitionOffsetManager) Errors() <-chan *ConsumerError {
	return pom.errors
}

func (pom *partitionOffsetManager) MarkOffset(offset int64, metadata string) {
	pom.lock.Lock()
	defer pom.lock.Unlock()

	if offset > pom.offset {
		pom.offset = offset
		pom.metadata = metadata
		pom.dirty = true
	}
}

func (pom *partitionOffsetManager) ResetOffset(offset int64, metadata string) {
	pom.lock.Lock()
	defer pom.lock.Unlock()

	if offset <= pom.offset {
		pom.offset = offset
		pom.metadata = metadata
		pom.dirty = true
	}
}

func (pom *partitionOffsetManager) updateCommitted(offset int64, metadata string) {
	pom.lock.Lock()
	defer pom.lock.Unlock()

	if pom.offset == offset && pom.metadata == metadata {
		pom.dirty = false
	}
}

func (pom *partitionOffsetManager) NextOffset() (int64, string) {
	pom.lock.Lock()
	defer pom.lock.Unlock()

	if pom.offset >= 0 {
		return pom.offset, pom.metadata
	}

	return pom.parent.conf.Consumer.Offsets.Initial, ""
}
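
// A small sketch of the fallback above: until an offset has been committed for
// the partition, NextOffset returns Consumer.Offsets.Initial, so pick where a
// brand-new group should start (OffsetNewest is the default; OffsetOldest
// replays the full retained log).
//
//	conf := NewConfig()
//	conf.Consumer.Offsets.Initial = OffsetOldest
//
//	offset, metadata := pom.NextOffset()
//	_ = metadata // empty string when nothing has been committed yet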

func (pom *partitionOffsetManager) AsyncClose() {
	pom.lock.Lock()
	pom.done = true
	pom.lock.Unlock()
}

func (pom *partitionOffsetManager) Close() error {
	pom.AsyncClose()

	var errors ConsumerErrors
	for err := range pom.errors {
		errors = append(errors, err)
	}

	if len(errors) > 0 {
		return errors
	}
	return nil
}

func (pom *partitionOffsetManager) handleError(err error) {
	cErr := &ConsumerError{
		Topic:     pom.topic,
		Partition: pom.partition,
		Err:       err,
	}

	if pom.parent.conf.Consumer.Return.Errors {
		pom.errors <- cErr
	} else {
		Logger.Println(cErr)
	}
}

func (pom *partitionOffsetManager) release() {
	pom.releaseOnce.Do(func() {
		close(pom.errors)
	})
}