package sarama
2
// offsetRequestBlock describes a single partition entry in a ListOffsets
// (OffsetRequest) request body. Which fields are written to the wire
// depends on the request version — see encode.
type offsetRequestBlock struct {
	// currentLeaderEpoch contains the current leader epoch (used in version 4+).
	currentLeaderEpoch int32
	// timestamp contains the current timestamp.
	timestamp int64
	// maxNumOffsets contains the maximum number of offsets to report.
	maxNumOffsets int32 // Only used in version 0
}
11
12func (b *offsetRequestBlock) encode(pe packetEncoder, version int16) error {
13 if version >= 4 {
14 pe.putInt32(b.currentLeaderEpoch)
15 }
16
17 pe.putInt64(b.timestamp)
18
19 if version == 0 {
20 pe.putInt32(b.maxNumOffsets)
21 }
22
23 return nil
24}
25
26func (b *offsetRequestBlock) decode(pd packetDecoder, version int16) (err error) {
27 b.currentLeaderEpoch = -1
28 if version >= 4 {
29 if b.currentLeaderEpoch, err = pd.getInt32(); err != nil {
30 return err
31 }
32 }
33
34 if b.timestamp, err = pd.getInt64(); err != nil {
35 return err
36 }
37
38 if version == 0 {
39 if b.maxNumOffsets, err = pd.getInt32(); err != nil {
40 return err
41 }
42 }
43
44 return nil
45}
46
// OffsetRequest is a ListOffsets request: it asks the broker for the
// offsets matching a timestamp for a set of topic-partitions.
type OffsetRequest struct {
	// Version is the ListOffsets API version to use (0 through 5).
	Version int16
	// IsolationLevel selects committed vs. uncommitted reads (version 2+).
	IsolationLevel IsolationLevel
	// replicaID is only written when isReplicaIDSet is true; clients
	// otherwise send the default of -1 (see encode / ReplicaID).
	replicaID      int32
	isReplicaIDSet bool
	// blocks maps topic -> partition -> per-partition request block.
	blocks map[string]map[int32]*offsetRequestBlock
}
54
55func NewOffsetRequest(version KafkaVersion) *OffsetRequest {
56 request := &OffsetRequest{}
57 if version.IsAtLeast(V2_2_0_0) {
58 // Version 5 adds a new error code, OFFSET_NOT_AVAILABLE.
59 request.Version = 5
60 } else if version.IsAtLeast(V2_1_0_0) {
61 // Version 4 adds the current leader epoch, which is used for fencing.
62 request.Version = 4
63 } else if version.IsAtLeast(V2_0_0_0) {
64 // Version 3 is the same as version 2.
65 request.Version = 3
66 } else if version.IsAtLeast(V0_11_0_0) {
67 // Version 2 adds the isolation level, which is used for transactional reads.
68 request.Version = 2
69 } else if version.IsAtLeast(V0_10_1_0) {
70 // Version 1 removes MaxNumOffsets. From this version forward, only a single
71 // offset can be returned.
72 request.Version = 1
73 }
74 return request
75}
76
// setVersion overrides the request's ListOffsets API version.
func (r *OffsetRequest) setVersion(v int16) {
	r.Version = v
}
80
81func (r *OffsetRequest) encode(pe packetEncoder) error {
82 if r.isReplicaIDSet {
83 pe.putInt32(r.replicaID)
84 } else {
85 // default replica ID is always -1 for clients
86 pe.putInt32(-1)
87 }
88
89 if r.Version >= 2 {
90 pe.putBool(r.IsolationLevel == ReadCommitted)
91 }
92
93 err := pe.putArrayLength(len(r.blocks))
94 if err != nil {
95 return err
96 }
97 for topic, partitions := range r.blocks {
98 err = pe.putString(topic)
99 if err != nil {
100 return err
101 }
102 err = pe.putArrayLength(len(partitions))
103 if err != nil {
104 return err
105 }
106 for partition, block := range partitions {
107 pe.putInt32(partition)
108 if err = block.encode(pe, r.Version); err != nil {
109 return err
110 }
111 }
112 }
113 return nil
114}
115
116func (r *OffsetRequest) decode(pd packetDecoder, version int16) error {
117 r.Version = version
118
119 replicaID, err := pd.getInt32()
120 if err != nil {
121 return err
122 }
123 if replicaID >= 0 {
124 r.SetReplicaID(replicaID)
125 }
126
127 if r.Version >= 2 {
128 tmp, err := pd.getBool()
129 if err != nil {
130 return err
131 }
132
133 r.IsolationLevel = ReadUncommitted
134 if tmp {
135 r.IsolationLevel = ReadCommitted
136 }
137 }
138
139 blockCount, err := pd.getArrayLength()
140 if err != nil {
141 return err
142 }
143 if blockCount == 0 {
144 return nil
145 }
146 r.blocks = make(map[string]map[int32]*offsetRequestBlock)
147 for i := 0; i < blockCount; i++ {
148 topic, err := pd.getString()
149 if err != nil {
150 return err
151 }
152 partitionCount, err := pd.getArrayLength()
153 if err != nil {
154 return err
155 }
156 r.blocks[topic] = make(map[int32]*offsetRequestBlock)
157 for j := 0; j < partitionCount; j++ {
158 partition, err := pd.getInt32()
159 if err != nil {
160 return err
161 }
162 block := &offsetRequestBlock{}
163 if err := block.decode(pd, version); err != nil {
164 return err
165 }
166 r.blocks[topic][partition] = block
167 }
168 }
169 return nil
170}
171
// key returns the Kafka API key for ListOffsets.
func (r *OffsetRequest) key() int16 {
	return apiKeyListOffsets
}
175
// version returns the API version of this request.
func (r *OffsetRequest) version() int16 {
	return r.Version
}
179
// headerVersion returns the request-header version used for all
// ListOffsets versions this client supports.
func (r *OffsetRequest) headerVersion() int16 {
	return 1
}
183
184func (r *OffsetRequest) isValidVersion() bool {
185 return r.Version >= 0 && r.Version <= 5
186}
187
// requiredVersion returns the minimum broker version that understands
// this request's API version. Out-of-range versions fall back to
// V2_0_0_0 as a conservative default.
func (r *OffsetRequest) requiredVersion() KafkaVersion {
	switch r.Version {
	case 5:
		return V2_2_0_0
	case 4:
		return V2_1_0_0
	case 3:
		return V2_0_0_0
	case 2:
		return V0_11_0_0
	case 1:
		return V0_10_1_0
	case 0:
		return V0_8_2_0
	default:
		return V2_0_0_0
	}
}
206
// SetReplicaID sets the broker replica ID to send with the request and
// marks it as explicitly set (otherwise encode sends the client default -1).
func (r *OffsetRequest) SetReplicaID(id int32) {
	r.replicaID = id
	r.isReplicaIDSet = true
}
211
212func (r *OffsetRequest) ReplicaID() int32 {
213 if r.isReplicaIDSet {
214 return r.replicaID
215 }
216 return -1
217}
218
219func (r *OffsetRequest) AddBlock(topic string, partitionID int32, timestamp int64, maxOffsets int32) {
220 if r.blocks == nil {
221 r.blocks = make(map[string]map[int32]*offsetRequestBlock)
222 }
223
224 if r.blocks[topic] == nil {
225 r.blocks[topic] = make(map[int32]*offsetRequestBlock)
226 }
227
228 tmp := new(offsetRequestBlock)
229 tmp.currentLeaderEpoch = -1
230 tmp.timestamp = timestamp
231 if r.Version == 0 {
232 tmp.maxNumOffsets = maxOffsets
233 }
234
235 r.blocks[topic][partitionID] = tmp
236}