blob: 7fb080b5f9d5e4dbec7862450c9060baaf436f2f [file] [log] [blame]
Abhay Kumara2ae5992025-11-10 14:02:24 +00001package sarama
2
3import (
4 "net"
5 "strconv"
6)
7
8// ConsumerMetadataResponse holds the response for a consumer group meta data requests
9type ConsumerMetadataResponse struct {
10 Version int16
11 Err KError
12 Coordinator *Broker
13 CoordinatorID int32 // deprecated: use Coordinator.ID()
14 CoordinatorHost string // deprecated: use Coordinator.Addr()
15 CoordinatorPort int32 // deprecated: use Coordinator.Addr()
16}
17
18func (r *ConsumerMetadataResponse) setVersion(v int16) {
19 r.Version = v
20}
21
22func (r *ConsumerMetadataResponse) decode(pd packetDecoder, version int16) (err error) {
23 tmp := new(FindCoordinatorResponse)
24
25 if err := tmp.decode(pd, version); err != nil {
26 return err
27 }
28
29 r.Err = tmp.Err
30
31 r.Coordinator = tmp.Coordinator
32 if tmp.Coordinator == nil {
33 return nil
34 }
35
36 // this can all go away in 2.0, but we have to fill in deprecated fields to maintain
37 // backwards compatibility
38 host, portstr, err := net.SplitHostPort(r.Coordinator.Addr())
39 if err != nil {
40 return err
41 }
42 port, err := strconv.ParseInt(portstr, 10, 32)
43 if err != nil {
44 return err
45 }
46 r.CoordinatorID = r.Coordinator.ID()
47 r.CoordinatorHost = host
48 r.CoordinatorPort = int32(port)
49
50 return nil
51}
52
53func (r *ConsumerMetadataResponse) encode(pe packetEncoder) error {
54 if r.Coordinator == nil {
55 r.Coordinator = new(Broker)
56 r.Coordinator.id = r.CoordinatorID
57 r.Coordinator.addr = net.JoinHostPort(r.CoordinatorHost, strconv.Itoa(int(r.CoordinatorPort)))
58 }
59
60 tmp := &FindCoordinatorResponse{
61 Version: r.Version,
62 Err: r.Err,
63 Coordinator: r.Coordinator,
64 }
65
66 if err := tmp.encode(pe); err != nil {
67 return err
68 }
69
70 return nil
71}
72
73func (r *ConsumerMetadataResponse) key() int16 {
74 return apiKeyFindCoordinator
75}
76
77func (r *ConsumerMetadataResponse) version() int16 {
78 return r.Version
79}
80
81func (r *ConsumerMetadataResponse) headerVersion() int16 {
82 return 0
83}
84
85func (r *ConsumerMetadataResponse) isValidVersion() bool {
86 return r.Version >= 0 && r.Version <= 2
87}
88
89func (r *ConsumerMetadataResponse) requiredVersion() KafkaVersion {
90 switch r.Version {
91 case 2:
92 return V2_0_0_0
93 case 1:
94 return V0_11_0_0
95 default:
96 return V0_8_2_0
97 }
98}