| package sarama |
| |
| type SyncGroupRequestAssignment struct { |
| // MemberId contains the ID of the member to assign. |
| MemberId string |
| // Assignment contains the member assignment. |
| Assignment []byte |
| } |
| |
| func (a *SyncGroupRequestAssignment) encode(pe packetEncoder, version int16) (err error) { |
| if err := pe.putString(a.MemberId); err != nil { |
| return err |
| } |
| |
| if err := pe.putBytes(a.Assignment); err != nil { |
| return err |
| } |
| |
| pe.putEmptyTaggedFieldArray() |
| return nil |
| } |
| |
| func (a *SyncGroupRequestAssignment) decode(pd packetDecoder, version int16) (err error) { |
| if a.MemberId, err = pd.getString(); err != nil { |
| return err |
| } |
| |
| if a.Assignment, err = pd.getBytes(); err != nil { |
| return err |
| } |
| |
| _, err = pd.getEmptyTaggedFieldArray() |
| return err |
| } |
| |
| type SyncGroupRequest struct { |
| // Version defines the protocol version to use for encode and decode |
| Version int16 |
| // GroupId contains the unique group identifier. |
| GroupId string |
| // GenerationId contains the generation of the group. |
| GenerationId int32 |
| // MemberId contains the member ID assigned by the group. |
| MemberId string |
| // GroupInstanceId contains the unique identifier of the consumer instance provided by end user. |
| GroupInstanceId *string |
| // GroupAssignments contains each assignment. |
| GroupAssignments []SyncGroupRequestAssignment |
| } |
| |
| func (s *SyncGroupRequest) setVersion(v int16) { |
| s.Version = v |
| } |
| |
| func (s *SyncGroupRequest) encode(pe packetEncoder) (err error) { |
| if err := pe.putString(s.GroupId); err != nil { |
| return err |
| } |
| |
| pe.putInt32(s.GenerationId) |
| |
| if err := pe.putString(s.MemberId); err != nil { |
| return err |
| } |
| |
| if s.Version >= 3 { |
| if err := pe.putNullableString(s.GroupInstanceId); err != nil { |
| return err |
| } |
| } |
| |
| if err := pe.putArrayLength(len(s.GroupAssignments)); err != nil { |
| return err |
| } |
| for _, block := range s.GroupAssignments { |
| if err := block.encode(pe, s.Version); err != nil { |
| return err |
| } |
| } |
| |
| pe.putEmptyTaggedFieldArray() |
| return nil |
| } |
| |
| func (s *SyncGroupRequest) decode(pd packetDecoder, version int16) (err error) { |
| s.Version = version |
| if s.GroupId, err = pd.getString(); err != nil { |
| return err |
| } |
| |
| if s.GenerationId, err = pd.getInt32(); err != nil { |
| return err |
| } |
| |
| if s.MemberId, err = pd.getString(); err != nil { |
| return err |
| } |
| |
| if s.Version >= 3 { |
| if s.GroupInstanceId, err = pd.getNullableString(); err != nil { |
| return err |
| } |
| } |
| |
| if numAssignments, err := pd.getArrayLength(); err != nil { |
| return err |
| } else if numAssignments > 0 { |
| s.GroupAssignments = make([]SyncGroupRequestAssignment, numAssignments) |
| for i := 0; i < numAssignments; i++ { |
| var block SyncGroupRequestAssignment |
| if err := block.decode(pd, s.Version); err != nil { |
| return err |
| } |
| s.GroupAssignments[i] = block |
| } |
| } |
| |
| _, err = pd.getEmptyTaggedFieldArray() |
| return err |
| } |
| |
| func (r *SyncGroupRequest) key() int16 { |
| return apiKeySyncGroup |
| } |
| |
| func (r *SyncGroupRequest) version() int16 { |
| return r.Version |
| } |
| |
| func (r *SyncGroupRequest) headerVersion() int16 { |
| if r.Version >= 4 { |
| return 2 |
| } |
| return 1 |
| } |
| |
| func (r *SyncGroupRequest) isValidVersion() bool { |
| return r.Version >= 0 && r.Version <= 4 |
| } |
| |
| func (r *SyncGroupRequest) isFlexible() bool { |
| return r.isFlexibleVersion(r.Version) |
| } |
| |
| func (r *SyncGroupRequest) isFlexibleVersion(version int16) bool { |
| return version >= 4 |
| } |
| |
| func (r *SyncGroupRequest) requiredVersion() KafkaVersion { |
| switch r.Version { |
| case 4: |
| return V2_4_0_0 |
| case 3: |
| return V2_3_0_0 |
| case 2: |
| return V2_0_0_0 |
| case 1: |
| return V0_11_0_0 |
| case 0: |
| return V0_9_0_0 |
| default: |
| return V2_3_0_0 |
| } |
| } |
| |
| func (r *SyncGroupRequest) AddGroupAssignment(memberId string, memberAssignment []byte) { |
| r.GroupAssignments = append(r.GroupAssignments, SyncGroupRequestAssignment{ |
| MemberId: memberId, |
| Assignment: memberAssignment, |
| }) |
| } |
| |
| func (r *SyncGroupRequest) AddGroupAssignmentMember( |
| memberId string, |
| memberAssignment *ConsumerGroupMemberAssignment, |
| ) error { |
| bin, err := encode(memberAssignment, nil) |
| if err != nil { |
| return err |
| } |
| |
| r.AddGroupAssignment(memberId, bin) |
| return nil |
| } |