blob: a605ced67c5609845a1eeaba7cdda59efc0712ba [file] [log] [blame]
Abhay Kumar40252eb2025-10-13 13:25:53 +00001package sarama
2
3import "time"
4
5type SyncGroupResponse struct {
6 // Version defines the protocol version to use for encode and decode
7 Version int16
8 // ThrottleTime contains the duration in milliseconds for which the
9 // request was throttled due to a quota violation, or zero if the request
10 // did not violate any quota.
11 ThrottleTime int32
12 // Err contains the error code, or 0 if there was no error.
13 Err KError
14 // MemberAssignment contains the member assignment.
15 MemberAssignment []byte
16}
17
18func (r *SyncGroupResponse) setVersion(v int16) {
19 r.Version = v
20}
21
22func (r *SyncGroupResponse) GetMemberAssignment() (*ConsumerGroupMemberAssignment, error) {
23 assignment := new(ConsumerGroupMemberAssignment)
24 err := decode(r.MemberAssignment, assignment, nil)
25 return assignment, err
26}
27
28func (r *SyncGroupResponse) encode(pe packetEncoder) error {
29 if r.Version >= 1 {
30 pe.putInt32(r.ThrottleTime)
31 }
32 pe.putKError(r.Err)
33 if err := pe.putBytes(r.MemberAssignment); err != nil {
34 return err
35 }
36
37 pe.putEmptyTaggedFieldArray()
38 return nil
39}
40
41func (r *SyncGroupResponse) decode(pd packetDecoder, version int16) (err error) {
42 r.Version = version
43 if r.Version >= 1 {
44 if r.ThrottleTime, err = pd.getInt32(); err != nil {
45 return err
46 }
47 }
48 r.Err, err = pd.getKError()
49 if err != nil {
50 return err
51 }
52
53 r.MemberAssignment, err = pd.getBytes()
54 if err != nil {
55 return err
56 }
57
58 _, err = pd.getEmptyTaggedFieldArray()
59 return err
60}
61
62func (r *SyncGroupResponse) key() int16 {
63 return apiKeySyncGroup
64}
65
66func (r *SyncGroupResponse) version() int16 {
67 return r.Version
68}
69
70func (r *SyncGroupResponse) headerVersion() int16 {
71 if r.Version >= 4 {
72 return 1
73 }
74 return 0
75}
76
77func (r *SyncGroupResponse) isValidVersion() bool {
78 return r.Version >= 0 && r.Version <= 4
79}
80
81func (r *SyncGroupResponse) isFlexible() bool {
82 return r.isFlexibleVersion(r.Version)
83}
84
85func (r *SyncGroupResponse) isFlexibleVersion(version int16) bool {
86 return version >= 4
87}
88
89func (r *SyncGroupResponse) requiredVersion() KafkaVersion {
90 switch r.Version {
91 case 4:
92 return V2_4_0_0
93 case 3:
94 return V2_3_0_0
95 case 2:
96 return V2_0_0_0
97 case 1:
98 return V0_11_0_0
99 case 0:
100 return V0_9_0_0
101 default:
102 return V2_3_0_0
103 }
104}
105
106func (r *SyncGroupResponse) throttleTime() time.Duration {
107 return time.Duration(r.ThrottleTime) * time.Millisecond
108}