blob: 2c5c6937b01666fe62bec730541763735eb2e386 [file] [log] [blame]
Abhay Kumara2ae5992025-11-10 14:02:24 +00001package sarama
2
3import "errors"
4
const (
	// ReceiveTime is a sentinel for the timestamp field of an Offset Commit
	// Request: it asks the broker to stamp the commit with the time at which
	// the request was received. The timestamp is only used with message
	// version 1, which requires kafka 0.8.2.
	ReceiveTime int64 = -1

	// GroupGenerationUndefined is a sentinel for the group generation field of
	// an Offset Commit Request. Use it when the consumer group does not rely
	// on Kafka for partition management.
	GroupGenerationUndefined = -1
)
14
// offsetCommitRequestBlock is the per-partition payload of an
// OffsetCommitRequest. Which fields reach the wire depends on the request
// version; see the encode and decode methods.
type offsetCommitRequestBlock struct {
	// offset is the committed offset; it is encoded for every version.
	offset int64
	// timestamp is only encoded and decoded for request version 1.
	timestamp int64
	// committedLeaderEpoch is only encoded and decoded for version 6 and later.
	committedLeaderEpoch int32
	// metadata is a free-form string encoded last, for every version.
	metadata string
}
21
22func (b *offsetCommitRequestBlock) encode(pe packetEncoder, version int16) error {
23 pe.putInt64(b.offset)
24 if version == 1 {
25 pe.putInt64(b.timestamp)
26 } else if b.timestamp != 0 {
27 Logger.Println("Non-zero timestamp specified for OffsetCommitRequest not v1, it will be ignored")
28 }
29 if version >= 6 {
30 pe.putInt32(b.committedLeaderEpoch)
31 }
32
33 return pe.putString(b.metadata)
34}
35
36func (b *offsetCommitRequestBlock) decode(pd packetDecoder, version int16) (err error) {
37 if b.offset, err = pd.getInt64(); err != nil {
38 return err
39 }
40 if version == 1 {
41 if b.timestamp, err = pd.getInt64(); err != nil {
42 return err
43 }
44 }
45 if version >= 6 {
46 if b.committedLeaderEpoch, err = pd.getInt32(); err != nil {
47 return err
48 }
49 }
50
51 b.metadata, err = pd.getString()
52 return err
53}
54
55type OffsetCommitRequest struct {
56 ConsumerGroup string
57 ConsumerGroupGeneration int32 // v1 or later
58 ConsumerID string // v1 or later
59 GroupInstanceId *string // v7 or later
60 RetentionTime int64 // v2 or later
61
62 // Version can be:
63 // - 0 (kafka 0.8.1 and later)
64 // - 1 (kafka 0.8.2 and later)
65 // - 2 (kafka 0.9.0 and later)
66 // - 3 (kafka 0.11.0 and later)
67 // - 4 (kafka 2.0.0 and later)
68 // - 5&6 (kafka 2.1.0 and later)
69 // - 7 (kafka 2.3.0 and later)
70 Version int16
71 blocks map[string]map[int32]*offsetCommitRequestBlock
72}
73
74func (r *OffsetCommitRequest) setVersion(v int16) {
75 r.Version = v
76}
77
78func (r *OffsetCommitRequest) encode(pe packetEncoder) error {
79 if r.Version < 0 || r.Version > 7 {
80 return PacketEncodingError{"invalid or unsupported OffsetCommitRequest version field"}
81 }
82
83 if err := pe.putString(r.ConsumerGroup); err != nil {
84 return err
85 }
86
87 if r.Version >= 1 {
88 pe.putInt32(r.ConsumerGroupGeneration)
89 if err := pe.putString(r.ConsumerID); err != nil {
90 return err
91 }
92 } else {
93 if r.ConsumerGroupGeneration != 0 {
94 Logger.Println("Non-zero ConsumerGroupGeneration specified for OffsetCommitRequest v0, it will be ignored")
95 }
96 if r.ConsumerID != "" {
97 Logger.Println("Non-empty ConsumerID specified for OffsetCommitRequest v0, it will be ignored")
98 }
99 }
100
101 // Version 5 removes RetentionTime, which is now controlled only by a broker configuration.
102 if r.Version >= 2 && r.Version <= 4 {
103 pe.putInt64(r.RetentionTime)
104 } else if r.RetentionTime != 0 {
105 Logger.Println("Non-zero RetentionTime specified for OffsetCommitRequest version <2, it will be ignored")
106 }
107
108 if r.Version >= 7 {
109 if err := pe.putNullableString(r.GroupInstanceId); err != nil {
110 return err
111 }
112 }
113
114 if err := pe.putArrayLength(len(r.blocks)); err != nil {
115 return err
116 }
117 for topic, partitions := range r.blocks {
118 if err := pe.putString(topic); err != nil {
119 return err
120 }
121 if err := pe.putArrayLength(len(partitions)); err != nil {
122 return err
123 }
124 for partition, block := range partitions {
125 pe.putInt32(partition)
126 if err := block.encode(pe, r.Version); err != nil {
127 return err
128 }
129 }
130 }
131 return nil
132}
133
134func (r *OffsetCommitRequest) decode(pd packetDecoder, version int16) (err error) {
135 r.Version = version
136
137 if r.ConsumerGroup, err = pd.getString(); err != nil {
138 return err
139 }
140
141 if r.Version >= 1 {
142 if r.ConsumerGroupGeneration, err = pd.getInt32(); err != nil {
143 return err
144 }
145 if r.ConsumerID, err = pd.getString(); err != nil {
146 return err
147 }
148 }
149
150 // Version 5 removes RetentionTime, which is now controlled only by a broker configuration.
151 if r.Version >= 2 && r.Version <= 4 {
152 if r.RetentionTime, err = pd.getInt64(); err != nil {
153 return err
154 }
155 }
156
157 if r.Version >= 7 {
158 if r.GroupInstanceId, err = pd.getNullableString(); err != nil {
159 return err
160 }
161 }
162
163 topicCount, err := pd.getArrayLength()
164 if err != nil {
165 return err
166 }
167 if topicCount == 0 {
168 return nil
169 }
170 r.blocks = make(map[string]map[int32]*offsetCommitRequestBlock)
171 for i := 0; i < topicCount; i++ {
172 topic, err := pd.getString()
173 if err != nil {
174 return err
175 }
176 partitionCount, err := pd.getArrayLength()
177 if err != nil {
178 return err
179 }
180 r.blocks[topic] = make(map[int32]*offsetCommitRequestBlock)
181 for j := 0; j < partitionCount; j++ {
182 partition, err := pd.getInt32()
183 if err != nil {
184 return err
185 }
186 block := &offsetCommitRequestBlock{}
187 if err := block.decode(pd, r.Version); err != nil {
188 return err
189 }
190 r.blocks[topic][partition] = block
191 }
192 }
193 return nil
194}
195
196func (r *OffsetCommitRequest) key() int16 {
197 return apiKeyOffsetCommit
198}
199
200func (r *OffsetCommitRequest) version() int16 {
201 return r.Version
202}
203
204func (r *OffsetCommitRequest) headerVersion() int16 {
205 return 1
206}
207
208func (r *OffsetCommitRequest) isValidVersion() bool {
209 return r.Version >= 0 && r.Version <= 7
210}
211
212func (r *OffsetCommitRequest) requiredVersion() KafkaVersion {
213 switch r.Version {
214 case 7:
215 return V2_3_0_0
216 case 5, 6:
217 return V2_1_0_0
218 case 4:
219 return V2_0_0_0
220 case 3:
221 return V0_11_0_0
222 case 2:
223 return V0_9_0_0
224 case 0, 1:
225 return V0_8_2_0
226 default:
227 return V2_4_0_0
228 }
229}
230
231func (r *OffsetCommitRequest) AddBlock(topic string, partitionID int32, offset int64, timestamp int64, metadata string) {
232 r.AddBlockWithLeaderEpoch(topic, partitionID, offset, 0, timestamp, metadata)
233}
234
235func (r *OffsetCommitRequest) AddBlockWithLeaderEpoch(topic string, partitionID int32, offset int64, leaderEpoch int32, timestamp int64, metadata string) {
236 if r.blocks == nil {
237 r.blocks = make(map[string]map[int32]*offsetCommitRequestBlock)
238 }
239
240 if r.blocks[topic] == nil {
241 r.blocks[topic] = make(map[int32]*offsetCommitRequestBlock)
242 }
243
244 r.blocks[topic][partitionID] = &offsetCommitRequestBlock{offset, timestamp, leaderEpoch, metadata}
245}
246
247func (r *OffsetCommitRequest) Offset(topic string, partitionID int32) (int64, string, error) {
248 partitions := r.blocks[topic]
249 if partitions == nil {
250 return 0, "", errors.New("no such offset")
251 }
252 block := partitions[partitionID]
253 if block == nil {
254 return 0, "", errors.New("no such offset")
255 }
256 return block.offset, block.metadata, nil
257}