blob: f6876fbee02c0cc40ad8168b009ca2cddb795481 [file] [log] [blame]
Abhay Kumar40252eb2025-10-13 13:25:53 +00001package sarama
2
3import "sync"
4
5var expectationsPool = sync.Pool{
6 New: func() interface{} {
7 return make(chan *ProducerError, 1)
8 },
9}
10
11// SyncProducer publishes Kafka messages, blocking until they have been acknowledged. It routes messages to the correct
12// broker, refreshing metadata as appropriate, and parses responses for errors. You must call Close() on a producer
13// to avoid leaks, it may not be garbage-collected automatically when it passes out of scope.
14//
15// The SyncProducer comes with two caveats: it will generally be less efficient than the AsyncProducer, and the actual
16// durability guarantee provided when a message is acknowledged depend on the configured value of `Producer.RequiredAcks`.
17// There are configurations where a message acknowledged by the SyncProducer can still sometimes be lost.
18//
19// For implementation reasons, the SyncProducer requires `Producer.Return.Errors` and `Producer.Return.Successes` to
20// be set to true in its configuration.
21type SyncProducer interface {
22
23 // SendMessage produces a given message, and returns only when it either has
24 // succeeded or failed to produce. It will return the partition and the offset
25 // of the produced message, or an error if the message failed to produce.
26 SendMessage(msg *ProducerMessage) (partition int32, offset int64, err error)
27
28 // SendMessages produces a given set of messages, and returns only when all
29 // messages in the set have either succeeded or failed. Note that messages
30 // can succeed and fail individually; if some succeed and some fail,
31 // SendMessages will return an error.
32 SendMessages(msgs []*ProducerMessage) error
33
34 // Close shuts down the producer; you must call this function before a producer
35 // object passes out of scope, as it may otherwise leak memory.
36 // You must call this before calling Close on the underlying client.
37 Close() error
38
39 // TxnStatus return current producer transaction status.
40 TxnStatus() ProducerTxnStatusFlag
41
42 // IsTransactional return true when current producer is transactional.
43 IsTransactional() bool
44
45 // BeginTxn mark current transaction as ready.
46 BeginTxn() error
47
48 // CommitTxn commit current transaction.
49 CommitTxn() error
50
51 // AbortTxn abort current transaction.
52 AbortTxn() error
53
54 // AddOffsetsToTxn add associated offsets to current transaction.
55 AddOffsetsToTxn(offsets map[string][]*PartitionOffsetMetadata, groupId string) error
56
57 // AddMessageToTxn add message offsets to current transaction.
58 AddMessageToTxn(msg *ConsumerMessage, groupId string, metadata *string) error
59}
60
61type syncProducer struct {
62 producer *asyncProducer
63 wg sync.WaitGroup
64}
65
66// NewSyncProducer creates a new SyncProducer using the given broker addresses and configuration.
67func NewSyncProducer(addrs []string, config *Config) (SyncProducer, error) {
68 if config == nil {
69 config = NewConfig()
70 config.Producer.Return.Successes = true
71 }
72
73 if err := verifyProducerConfig(config); err != nil {
74 return nil, err
75 }
76
77 p, err := NewAsyncProducer(addrs, config)
78 if err != nil {
79 return nil, err
80 }
81 return newSyncProducerFromAsyncProducer(p.(*asyncProducer)), nil
82}
83
84// NewSyncProducerFromClient creates a new SyncProducer using the given client. It is still
85// necessary to call Close() on the underlying client when shutting down this producer.
86func NewSyncProducerFromClient(client Client) (SyncProducer, error) {
87 if err := verifyProducerConfig(client.Config()); err != nil {
88 return nil, err
89 }
90
91 p, err := NewAsyncProducerFromClient(client)
92 if err != nil {
93 return nil, err
94 }
95 return newSyncProducerFromAsyncProducer(p.(*asyncProducer)), nil
96}
97
98func newSyncProducerFromAsyncProducer(p *asyncProducer) *syncProducer {
99 sp := &syncProducer{producer: p}
100
101 sp.wg.Add(2)
102 go withRecover(sp.handleSuccesses)
103 go withRecover(sp.handleErrors)
104
105 return sp
106}
107
108func verifyProducerConfig(config *Config) error {
109 if !config.Producer.Return.Errors {
110 return ConfigurationError("Producer.Return.Errors must be true to be used in a SyncProducer")
111 }
112 if !config.Producer.Return.Successes {
113 return ConfigurationError("Producer.Return.Successes must be true to be used in a SyncProducer")
114 }
115 return nil
116}
117
118func (sp *syncProducer) SendMessage(msg *ProducerMessage) (partition int32, offset int64, err error) {
119 expectation := expectationsPool.Get().(chan *ProducerError)
120 msg.expectation = expectation
121 sp.producer.Input() <- msg
122 pErr := <-expectation
123 msg.expectation = nil
124 expectationsPool.Put(expectation)
125 if pErr != nil {
126 return -1, -1, pErr.Err
127 }
128
129 return msg.Partition, msg.Offset, nil
130}
131
132func (sp *syncProducer) SendMessages(msgs []*ProducerMessage) error {
133 indices := make(chan int, len(msgs))
134 go func() {
135 for i, msg := range msgs {
136 expectation := expectationsPool.Get().(chan *ProducerError)
137 msg.expectation = expectation
138 sp.producer.Input() <- msg
139 indices <- i
140 }
141 close(indices)
142 }()
143
144 var errors ProducerErrors
145 for i := range indices {
146 expectation := msgs[i].expectation
147 pErr := <-expectation
148 msgs[i].expectation = nil
149 expectationsPool.Put(expectation)
150 if pErr != nil {
151 errors = append(errors, pErr)
152 }
153 }
154
155 if len(errors) > 0 {
156 return errors
157 }
158 return nil
159}
160
161func (sp *syncProducer) handleSuccesses() {
162 defer sp.wg.Done()
163 for msg := range sp.producer.Successes() {
164 expectation := msg.expectation
165 expectation <- nil
166 }
167}
168
169func (sp *syncProducer) handleErrors() {
170 defer sp.wg.Done()
171 for err := range sp.producer.Errors() {
172 expectation := err.Msg.expectation
173 expectation <- err
174 }
175}
176
177func (sp *syncProducer) Close() error {
178 sp.producer.AsyncClose()
179 sp.wg.Wait()
180 return nil
181}
182
183func (sp *syncProducer) IsTransactional() bool {
184 return sp.producer.IsTransactional()
185}
186
187func (sp *syncProducer) BeginTxn() error {
188 return sp.producer.BeginTxn()
189}
190
191func (sp *syncProducer) CommitTxn() error {
192 return sp.producer.CommitTxn()
193}
194
195func (sp *syncProducer) AbortTxn() error {
196 return sp.producer.AbortTxn()
197}
198
199func (sp *syncProducer) AddOffsetsToTxn(offsets map[string][]*PartitionOffsetMetadata, groupId string) error {
200 return sp.producer.AddOffsetsToTxn(offsets, groupId)
201}
202
203func (sp *syncProducer) AddMessageToTxn(msg *ConsumerMessage, groupId string, metadata *string) error {
204 return sp.producer.AddMessageToTxn(msg, groupId, metadata)
205}
206
207func (p *syncProducer) TxnStatus() ProducerTxnStatusFlag {
208 return p.producer.TxnStatus()
209}