| package sarama |
| |
| import "time" |
| |
| type DescribeGroupsResponse struct { |
| // Version defines the protocol version to use for encode and decode |
| Version int16 |
| // ThrottleTimeMs contains the duration in milliseconds for which the |
| // request was throttled due to a quota violation, or zero if the request |
| // did not violate any quota. |
| ThrottleTimeMs int32 |
| // Groups contains each described group. |
| Groups []*GroupDescription |
| } |
| |
| func (r *DescribeGroupsResponse) setVersion(v int16) { |
| r.Version = v |
| } |
| |
| func (r *DescribeGroupsResponse) encode(pe packetEncoder) (err error) { |
| if r.Version >= 1 { |
| pe.putInt32(r.ThrottleTimeMs) |
| } |
| if err := pe.putArrayLength(len(r.Groups)); err != nil { |
| return err |
| } |
| |
| for _, block := range r.Groups { |
| if err := block.encode(pe, r.Version); err != nil { |
| return err |
| } |
| } |
| |
| pe.putEmptyTaggedFieldArray() |
| return nil |
| } |
| |
| func (r *DescribeGroupsResponse) decode(pd packetDecoder, version int16) (err error) { |
| r.Version = version |
| if r.Version >= 1 { |
| if r.ThrottleTimeMs, err = pd.getInt32(); err != nil { |
| return err |
| } |
| } |
| if numGroups, err := pd.getArrayLength(); err != nil { |
| return err |
| } else if numGroups > 0 { |
| r.Groups = make([]*GroupDescription, numGroups) |
| for i := 0; i < numGroups; i++ { |
| block := &GroupDescription{} |
| if err := block.decode(pd, r.Version); err != nil { |
| return err |
| } |
| r.Groups[i] = block |
| } |
| } |
| |
| _, err = pd.getEmptyTaggedFieldArray() |
| return err |
| } |
| |
| func (r *DescribeGroupsResponse) key() int16 { |
| return apiKeyDescribeGroups |
| } |
| |
| func (r *DescribeGroupsResponse) version() int16 { |
| return r.Version |
| } |
| |
| func (r *DescribeGroupsResponse) headerVersion() int16 { |
| if r.Version >= 5 { |
| return 1 |
| } |
| return 0 |
| } |
| |
| func (r *DescribeGroupsResponse) isValidVersion() bool { |
| return r.Version >= 0 && r.Version <= 5 |
| } |
| |
| func (r *DescribeGroupsResponse) isFlexible() bool { |
| return r.isFlexibleVersion(r.Version) |
| } |
| |
| func (r *DescribeGroupsResponse) isFlexibleVersion(version int16) bool { |
| return version >= 5 |
| } |
| |
| func (r *DescribeGroupsResponse) requiredVersion() KafkaVersion { |
| switch r.Version { |
| case 5: |
| return V2_4_0_0 |
| case 4: |
| return V2_4_0_0 |
| case 3: |
| return V2_3_0_0 |
| case 2: |
| return V2_0_0_0 |
| case 1: |
| return V0_11_0_0 |
| case 0: |
| return V0_9_0_0 |
| default: |
| return V2_4_0_0 |
| } |
| } |
| |
| func (r *DescribeGroupsResponse) throttleTime() time.Duration { |
| return time.Duration(r.ThrottleTimeMs) * time.Millisecond |
| } |
| |
| // GroupDescription contains each described group. |
| type GroupDescription struct { |
| // Version defines the protocol version to use for encode and decode |
| Version int16 |
| // Err contains the describe error as the KError type. |
| Err KError |
| // ErrorCode contains the describe error, or 0 if there was no error. |
| ErrorCode int16 |
| // GroupId contains the group ID string. |
| GroupId string |
| // State contains the group state string, or the empty string. |
| State string |
| // ProtocolType contains the group protocol type, or the empty string. |
| ProtocolType string |
| // Protocol contains the group protocol data, or the empty string. |
| Protocol string |
| // Members contains the group members. |
| Members map[string]*GroupMemberDescription |
| // AuthorizedOperations contains a 32-bit bitfield to represent authorized |
| // operations for this group. |
| AuthorizedOperations int32 |
| } |
| |
| func (gd *GroupDescription) encode(pe packetEncoder, version int16) (err error) { |
| gd.Version = version |
| pe.putInt16(gd.ErrorCode) |
| |
| if err := pe.putString(gd.GroupId); err != nil { |
| return err |
| } |
| if err := pe.putString(gd.State); err != nil { |
| return err |
| } |
| if err := pe.putString(gd.ProtocolType); err != nil { |
| return err |
| } |
| if err := pe.putString(gd.Protocol); err != nil { |
| return err |
| } |
| |
| if err := pe.putArrayLength(len(gd.Members)); err != nil { |
| return err |
| } |
| |
| for _, block := range gd.Members { |
| if err := block.encode(pe, gd.Version); err != nil { |
| return err |
| } |
| } |
| |
| if gd.Version >= 3 { |
| pe.putInt32(gd.AuthorizedOperations) |
| } |
| |
| pe.putEmptyTaggedFieldArray() |
| return nil |
| } |
| |
| func (gd *GroupDescription) decode(pd packetDecoder, version int16) (err error) { |
| gd.Version = version |
| if gd.ErrorCode, err = pd.getInt16(); err != nil { |
| return err |
| } |
| |
| gd.Err = KError(gd.ErrorCode) |
| |
| if gd.GroupId, err = pd.getString(); err != nil { |
| return err |
| } |
| if gd.State, err = pd.getString(); err != nil { |
| return err |
| } |
| if gd.ProtocolType, err = pd.getString(); err != nil { |
| return err |
| } |
| if gd.Protocol, err = pd.getString(); err != nil { |
| return err |
| } |
| |
| if numMembers, err := pd.getArrayLength(); err != nil { |
| return err |
| } else if numMembers > 0 { |
| gd.Members = make(map[string]*GroupMemberDescription, numMembers) |
| for i := 0; i < numMembers; i++ { |
| block := &GroupMemberDescription{} |
| if err := block.decode(pd, gd.Version); err != nil { |
| return err |
| } |
| gd.Members[block.MemberId] = block |
| } |
| } |
| |
| if gd.Version >= 3 { |
| if gd.AuthorizedOperations, err = pd.getInt32(); err != nil { |
| return err |
| } |
| } |
| |
| _, err = pd.getEmptyTaggedFieldArray() |
| return err |
| } |
| |
// GroupMemberDescription describes one member of a described group.
type GroupMemberDescription struct {
	// Version is the protocol version used when encoding and decoding.
	Version int16
	// MemberId is the member ID assigned by the group coordinator.
	MemberId string
	// GroupInstanceId is the unique identifier of the consumer instance as
	// provided by the end user; nil when absent. It is only on the wire when
	// Version >= 4.
	GroupInstanceId *string
	// ClientId is the client ID used in the member's latest join group
	// request.
	ClientId string
	// ClientHost is the client host.
	ClientHost string
	// MemberMetadata is the raw metadata corresponding to the group protocol
	// currently in use.
	MemberMetadata []byte
	// MemberAssignment is the raw current assignment provided by the group
	// leader.
	MemberAssignment []byte
}
| |
| func (gmd *GroupMemberDescription) encode(pe packetEncoder, version int16) (err error) { |
| gmd.Version = version |
| if err := pe.putString(gmd.MemberId); err != nil { |
| return err |
| } |
| if gmd.Version >= 4 { |
| if err := pe.putNullableString(gmd.GroupInstanceId); err != nil { |
| return err |
| } |
| } |
| if err := pe.putString(gmd.ClientId); err != nil { |
| return err |
| } |
| if err := pe.putString(gmd.ClientHost); err != nil { |
| return err |
| } |
| if err := pe.putBytes(gmd.MemberMetadata); err != nil { |
| return err |
| } |
| if err := pe.putBytes(gmd.MemberAssignment); err != nil { |
| return err |
| } |
| |
| pe.putEmptyTaggedFieldArray() |
| return nil |
| } |
| |
| func (gmd *GroupMemberDescription) decode(pd packetDecoder, version int16) (err error) { |
| gmd.Version = version |
| if gmd.MemberId, err = pd.getString(); err != nil { |
| return err |
| } |
| if gmd.Version >= 4 { |
| if gmd.GroupInstanceId, err = pd.getNullableString(); err != nil { |
| return err |
| } |
| } |
| if gmd.ClientId, err = pd.getString(); err != nil { |
| return err |
| } |
| if gmd.ClientHost, err = pd.getString(); err != nil { |
| return err |
| } |
| if gmd.MemberMetadata, err = pd.getBytes(); err != nil { |
| return err |
| } |
| if gmd.MemberAssignment, err = pd.getBytes(); err != nil { |
| return err |
| } |
| |
| _, err = pd.getEmptyTaggedFieldArray() |
| return err |
| } |
| |
| func (gmd *GroupMemberDescription) GetMemberAssignment() (*ConsumerGroupMemberAssignment, error) { |
| if len(gmd.MemberAssignment) == 0 { |
| return nil, nil |
| } |
| assignment := new(ConsumerGroupMemberAssignment) |
| err := decode(gmd.MemberAssignment, assignment, nil) |
| return assignment, err |
| } |
| |
| func (gmd *GroupMemberDescription) GetMemberMetadata() (*ConsumerGroupMemberMetadata, error) { |
| if len(gmd.MemberMetadata) == 0 { |
| return nil, nil |
| } |
| metadata := new(ConsumerGroupMemberMetadata) |
| err := decode(gmd.MemberMetadata, metadata, nil) |
| return metadata, err |
| } |