blob: d57c24f27dbb4273312153d8cadbc563294af891 [file] [log] [blame]
Abhay Kumara2ae5992025-11-10 14:02:24 +00001package sarama
2
3import "time"
4
5type OffsetResponseBlock struct {
6 Err KError
7 // Offsets contains the result offsets (for V0/V1 compatibility)
8 Offsets []int64 // Version 0
9 // Timestamp contains the timestamp associated with the returned offset.
10 Timestamp int64 // Version 1
11 // Offset contains the returned offset.
12 Offset int64 // Version 1
13 // LeaderEpoch contains the current leader epoch of the partition.
14 LeaderEpoch int32
15}
16
17func (b *OffsetResponseBlock) decode(pd packetDecoder, version int16) (err error) {
18 b.Err, err = pd.getKError()
19 if err != nil {
20 return err
21 }
22
23 if version == 0 {
24 b.Offsets, err = pd.getInt64Array()
25 return err
26 }
27
28 if version >= 1 {
29 b.Timestamp, err = pd.getInt64()
30 if err != nil {
31 return err
32 }
33
34 b.Offset, err = pd.getInt64()
35 if err != nil {
36 return err
37 }
38
39 // For backwards compatibility put the offset in the offsets array too
40 b.Offsets = []int64{b.Offset}
41 }
42
43 if version >= 4 {
44 if b.LeaderEpoch, err = pd.getInt32(); err != nil {
45 return err
46 }
47 }
48
49 return nil
50}
51
52func (b *OffsetResponseBlock) encode(pe packetEncoder, version int16) (err error) {
53 pe.putKError(b.Err)
54
55 if version == 0 {
56 return pe.putInt64Array(b.Offsets)
57 }
58
59 if version >= 1 {
60 pe.putInt64(b.Timestamp)
61 pe.putInt64(b.Offset)
62 }
63
64 if version >= 4 {
65 pe.putInt32(b.LeaderEpoch)
66 }
67
68 return nil
69}
70
71type OffsetResponse struct {
72 Version int16
73 ThrottleTimeMs int32
74 Blocks map[string]map[int32]*OffsetResponseBlock
75}
76
77func (r *OffsetResponse) setVersion(v int16) {
78 r.Version = v
79}
80
81func (r *OffsetResponse) decode(pd packetDecoder, version int16) (err error) {
82 if version >= 2 {
83 r.ThrottleTimeMs, err = pd.getInt32()
84 if err != nil {
85 return err
86 }
87 }
88
89 numTopics, err := pd.getArrayLength()
90 if err != nil {
91 return err
92 }
93
94 r.Blocks = make(map[string]map[int32]*OffsetResponseBlock, numTopics)
95 for i := 0; i < numTopics; i++ {
96 name, err := pd.getString()
97 if err != nil {
98 return err
99 }
100
101 numBlocks, err := pd.getArrayLength()
102 if err != nil {
103 return err
104 }
105
106 r.Blocks[name] = make(map[int32]*OffsetResponseBlock, numBlocks)
107
108 for j := 0; j < numBlocks; j++ {
109 id, err := pd.getInt32()
110 if err != nil {
111 return err
112 }
113
114 block := new(OffsetResponseBlock)
115 err = block.decode(pd, version)
116 if err != nil {
117 return err
118 }
119 r.Blocks[name][id] = block
120 }
121 }
122
123 return nil
124}
125
126func (r *OffsetResponse) GetBlock(topic string, partition int32) *OffsetResponseBlock {
127 if r.Blocks == nil {
128 return nil
129 }
130
131 if r.Blocks[topic] == nil {
132 return nil
133 }
134
135 return r.Blocks[topic][partition]
136}
137
138/*
139// [0 0 0 1 ntopics
1400 8 109 121 95 116 111 112 105 99 topic
1410 0 0 1 npartitions
1420 0 0 0 id
1430 0
144
1450 0 0 1 0 0 0 0
1460 1 1 1 0 0 0 1
1470 8 109 121 95 116 111 112
148105 99 0 0 0 1 0 0
1490 0 0 0 0 0 0 1
1500 0 0 0 0 1 1 1] <nil>
151*/
152func (r *OffsetResponse) encode(pe packetEncoder) (err error) {
153 if r.Version >= 2 {
154 pe.putInt32(r.ThrottleTimeMs)
155 }
156
157 if err = pe.putArrayLength(len(r.Blocks)); err != nil {
158 return err
159 }
160
161 for topic, partitions := range r.Blocks {
162 if err = pe.putString(topic); err != nil {
163 return err
164 }
165 if err = pe.putArrayLength(len(partitions)); err != nil {
166 return err
167 }
168 for partition, block := range partitions {
169 pe.putInt32(partition)
170 if err = block.encode(pe, r.version()); err != nil {
171 return err
172 }
173 }
174 }
175
176 return nil
177}
178
179func (r *OffsetResponse) key() int16 {
180 return apiKeyListOffsets
181}
182
183func (r *OffsetResponse) version() int16 {
184 return r.Version
185}
186
187func (r *OffsetResponse) headerVersion() int16 {
188 return 0
189}
190
191func (r *OffsetResponse) isValidVersion() bool {
192 return r.Version >= 0 && r.Version <= 5
193}
194
195func (r *OffsetResponse) requiredVersion() KafkaVersion {
196 switch r.Version {
197 case 5:
198 return V2_2_0_0
199 case 4:
200 return V2_1_0_0
201 case 3:
202 return V2_0_0_0
203 case 2:
204 return V0_11_0_0
205 case 1:
206 return V0_10_1_0
207 case 0:
208 return V0_8_2_0
209 default:
210 return V2_0_0_0
211 }
212}
213
214func (r *OffsetResponse) throttleTime() time.Duration {
215 return time.Duration(r.ThrottleTimeMs) * time.Millisecond
216}
217
218// testing API
219
220func (r *OffsetResponse) AddTopicPartition(topic string, partition int32, offset int64) {
221 if r.Blocks == nil {
222 r.Blocks = make(map[string]map[int32]*OffsetResponseBlock)
223 }
224 byTopic, ok := r.Blocks[topic]
225 if !ok {
226 byTopic = make(map[int32]*OffsetResponseBlock)
227 r.Blocks[topic] = byTopic
228 }
229 byTopic[partition] = &OffsetResponseBlock{Offsets: []int64{offset}, Offset: offset}
230}