blob: 2af478c5795df24af60fff4d6ae85a76449c48b4 [file] [log] [blame]
Abhay Kumara2ae5992025-11-10 14:02:24 +00001package sarama
2
3import "time"
4
5type OffsetCommitResponse struct {
6 Version int16
7 ThrottleTimeMs int32
8 Errors map[string]map[int32]KError
9}
10
11func (r *OffsetCommitResponse) setVersion(v int16) {
12 r.Version = v
13}
14
15func (r *OffsetCommitResponse) AddError(topic string, partition int32, kerror KError) {
16 if r.Errors == nil {
17 r.Errors = make(map[string]map[int32]KError)
18 }
19 partitions := r.Errors[topic]
20 if partitions == nil {
21 partitions = make(map[int32]KError)
22 r.Errors[topic] = partitions
23 }
24 partitions[partition] = kerror
25}
26
27func (r *OffsetCommitResponse) encode(pe packetEncoder) error {
28 if r.Version >= 3 {
29 pe.putInt32(r.ThrottleTimeMs)
30 }
31 if err := pe.putArrayLength(len(r.Errors)); err != nil {
32 return err
33 }
34 for topic, partitions := range r.Errors {
35 if err := pe.putString(topic); err != nil {
36 return err
37 }
38 if err := pe.putArrayLength(len(partitions)); err != nil {
39 return err
40 }
41 for partition, kerror := range partitions {
42 pe.putInt32(partition)
43 pe.putKError(kerror)
44 }
45 }
46 return nil
47}
48
49func (r *OffsetCommitResponse) decode(pd packetDecoder, version int16) (err error) {
50 r.Version = version
51
52 if version >= 3 {
53 r.ThrottleTimeMs, err = pd.getInt32()
54 if err != nil {
55 return err
56 }
57 }
58
59 numTopics, err := pd.getArrayLength()
60 if err != nil || numTopics == 0 {
61 return err
62 }
63
64 r.Errors = make(map[string]map[int32]KError, numTopics)
65 for i := 0; i < numTopics; i++ {
66 name, err := pd.getString()
67 if err != nil {
68 return err
69 }
70
71 numErrors, err := pd.getArrayLength()
72 if err != nil {
73 return err
74 }
75
76 r.Errors[name] = make(map[int32]KError, numErrors)
77
78 for j := 0; j < numErrors; j++ {
79 id, err := pd.getInt32()
80 if err != nil {
81 return err
82 }
83
84 r.Errors[name][id], err = pd.getKError()
85 if err != nil {
86 return err
87 }
88 }
89 }
90
91 return nil
92}
93
94func (r *OffsetCommitResponse) key() int16 {
95 return apiKeyOffsetCommit
96}
97
98func (r *OffsetCommitResponse) version() int16 {
99 return r.Version
100}
101
102func (r *OffsetCommitResponse) headerVersion() int16 {
103 return 0
104}
105
106func (r *OffsetCommitResponse) isValidVersion() bool {
107 return r.Version >= 0 && r.Version <= 7
108}
109
110func (r *OffsetCommitResponse) requiredVersion() KafkaVersion {
111 switch r.Version {
112 case 7:
113 return V2_3_0_0
114 case 5, 6:
115 return V2_1_0_0
116 case 4:
117 return V2_0_0_0
118 case 3:
119 return V0_11_0_0
120 case 2:
121 return V0_9_0_0
122 case 0, 1:
123 return V0_8_2_0
124 default:
125 return V2_4_0_0
126 }
127}
128
129func (r *OffsetCommitResponse) throttleTime() time.Duration {
130 return time.Duration(r.ThrottleTimeMs) * time.Millisecond
131}