blob: f3c5d5dbeb4ec1317370730e4920f41242b34054 [file] [log] [blame]
Abhay Kumar40252eb2025-10-13 13:25:53 +00001package sarama
2
3import (
4 "fmt"
5 "time"
6)
7
// Protocol, http://kafka.apache.org/protocol.html
// v1
// v2 = v3 = v4
// v5 = v6 = v7
// Produce Response (Version: 7) => [responses] throttle_time_ms
// responses => topic [partition_responses]
// topic => STRING
// partition_responses => partition error_code base_offset log_append_time log_start_offset
// partition => INT32
// error_code => INT16
// base_offset => INT64
// log_append_time => INT64
// log_start_offset => INT64
// throttle_time_ms => INT32
22
23// partition_responses in protocol
24type ProduceResponseBlock struct {
25 Err KError // v0, error_code
26 Offset int64 // v0, base_offset
27 Timestamp time.Time // v2, log_append_time, and the broker is configured with `LogAppendTime`
28 StartOffset int64 // v5, log_start_offset
29}
30
31func (b *ProduceResponseBlock) decode(pd packetDecoder, version int16) (err error) {
32 b.Err, err = pd.getKError()
33 if err != nil {
34 return err
35 }
36
37 b.Offset, err = pd.getInt64()
38 if err != nil {
39 return err
40 }
41
42 if version >= 2 {
43 if millis, err := pd.getInt64(); err != nil {
44 return err
45 } else if millis != -1 {
46 b.Timestamp = time.Unix(millis/1000, (millis%1000)*int64(time.Millisecond))
47 }
48 }
49
50 if version >= 5 {
51 b.StartOffset, err = pd.getInt64()
52 if err != nil {
53 return err
54 }
55 }
56
57 return nil
58}
59
60func (b *ProduceResponseBlock) encode(pe packetEncoder, version int16) (err error) {
61 pe.putKError(b.Err)
62 pe.putInt64(b.Offset)
63
64 if version >= 2 {
65 timestamp := int64(-1)
66 if !b.Timestamp.Before(time.Unix(0, 0)) {
67 timestamp = b.Timestamp.UnixNano() / int64(time.Millisecond)
68 } else if !b.Timestamp.IsZero() {
69 return PacketEncodingError{fmt.Sprintf("invalid timestamp (%v)", b.Timestamp)}
70 }
71 pe.putInt64(timestamp)
72 }
73
74 if version >= 5 {
75 pe.putInt64(b.StartOffset)
76 }
77
78 return nil
79}
80
81type ProduceResponse struct {
82 Blocks map[string]map[int32]*ProduceResponseBlock // v0, responses
83 Version int16
84 ThrottleTime time.Duration // v1, throttle_time_ms
85}
86
87func (r *ProduceResponse) setVersion(v int16) {
88 r.Version = v
89}
90
91func (r *ProduceResponse) decode(pd packetDecoder, version int16) (err error) {
92 r.Version = version
93
94 numTopics, err := pd.getArrayLength()
95 if err != nil {
96 return err
97 }
98
99 r.Blocks = make(map[string]map[int32]*ProduceResponseBlock, numTopics)
100 for i := 0; i < numTopics; i++ {
101 name, err := pd.getString()
102 if err != nil {
103 return err
104 }
105
106 numBlocks, err := pd.getArrayLength()
107 if err != nil {
108 return err
109 }
110
111 r.Blocks[name] = make(map[int32]*ProduceResponseBlock, numBlocks)
112
113 for j := 0; j < numBlocks; j++ {
114 id, err := pd.getInt32()
115 if err != nil {
116 return err
117 }
118
119 block := new(ProduceResponseBlock)
120 err = block.decode(pd, version)
121 if err != nil {
122 return err
123 }
124 r.Blocks[name][id] = block
125 }
126 }
127
128 if r.Version >= 1 {
129 if r.ThrottleTime, err = pd.getDurationMs(); err != nil {
130 return err
131 }
132 }
133
134 return nil
135}
136
137func (r *ProduceResponse) encode(pe packetEncoder) error {
138 err := pe.putArrayLength(len(r.Blocks))
139 if err != nil {
140 return err
141 }
142 for topic, partitions := range r.Blocks {
143 err = pe.putString(topic)
144 if err != nil {
145 return err
146 }
147 err = pe.putArrayLength(len(partitions))
148 if err != nil {
149 return err
150 }
151 for id, prb := range partitions {
152 pe.putInt32(id)
153 err = prb.encode(pe, r.Version)
154 if err != nil {
155 return err
156 }
157 }
158 }
159
160 if r.Version >= 1 {
161 pe.putDurationMs(r.ThrottleTime)
162 }
163 return nil
164}
165
166func (r *ProduceResponse) key() int16 {
167 return apiKeyProduce
168}
169
170func (r *ProduceResponse) version() int16 {
171 return r.Version
172}
173
174func (r *ProduceResponse) headerVersion() int16 {
175 return 0
176}
177
178func (r *ProduceResponse) isValidVersion() bool {
179 return r.Version >= 0 && r.Version <= 7
180}
181
182func (r *ProduceResponse) requiredVersion() KafkaVersion {
183 switch r.Version {
184 case 7:
185 return V2_1_0_0
186 case 6:
187 return V2_0_0_0
188 case 4, 5:
189 return V1_0_0_0
190 case 3:
191 return V0_11_0_0
192 case 2:
193 return V0_10_0_0
194 case 1:
195 return V0_9_0_0
196 case 0:
197 return V0_8_2_0
198 default:
199 return V2_1_0_0
200 }
201}
202
203func (r *ProduceResponse) throttleTime() time.Duration {
204 return r.ThrottleTime
205}
206
207func (r *ProduceResponse) GetBlock(topic string, partition int32) *ProduceResponseBlock {
208 if r.Blocks == nil {
209 return nil
210 }
211
212 if r.Blocks[topic] == nil {
213 return nil
214 }
215
216 return r.Blocks[topic][partition]
217}
218
// Testing API
220
221func (r *ProduceResponse) AddTopicPartition(topic string, partition int32, err KError) {
222 if r.Blocks == nil {
223 r.Blocks = make(map[string]map[int32]*ProduceResponseBlock)
224 }
225 byTopic, ok := r.Blocks[topic]
226 if !ok {
227 byTopic = make(map[int32]*ProduceResponseBlock)
228 r.Blocks[topic] = byTopic
229 }
230 block := &ProduceResponseBlock{
231 Err: err,
232 }
233 if r.Version >= 2 {
234 block.Timestamp = time.Now()
235 }
236 byTopic[partition] = block
237}