blob: 83a1c466f964be74c6fbbe1c6ba8bce1796ac25e [file] [log] [blame]
Abhay Kumara2ae5992025-11-10 14:02:24 +00001package sarama
2
3import (
4 "encoding/binary"
5 "fmt"
6 "io"
7)
8
9type protocolBody interface {
10 encoder
11 versionedDecoder
12 key() int16
13 version() int16
14 setVersion(int16)
15 headerVersion() int16
16 isValidVersion() bool
17 requiredVersion() KafkaVersion
18}
19
20type request struct {
21 correlationID int32
22 clientID string
23 body protocolBody
24}
25
26func (r *request) encode(pe packetEncoder) error {
27 pe.push(&lengthField{})
28 pe.putInt16(r.body.key())
29 pe.putInt16(r.body.version())
30 pe.putInt32(r.correlationID)
31
32 if r.body.headerVersion() >= 1 {
33 err := pe.putString(r.clientID)
34 if err != nil {
35 return err
36 }
37 }
38
39 if r.body.headerVersion() >= 2 {
40 // we don't use tag headers at the moment so we just put an array length of 0
41 pe.putUVarint(0)
42 }
43 pe = prepareFlexibleEncoder(pe, r.body)
44
45 err := r.body.encode(pe)
46 if err != nil {
47 return err
48 }
49
50 return pe.pop()
51}
52
53func (r *request) decode(pd packetDecoder) (err error) {
54 key, err := pd.getInt16()
55 if err != nil {
56 return err
57 }
58
59 version, err := pd.getInt16()
60 if err != nil {
61 return err
62 }
63
64 r.correlationID, err = pd.getInt32()
65 if err != nil {
66 return err
67 }
68
69 r.clientID, err = pd.getString()
70 if err != nil {
71 return err
72 }
73
74 r.body = allocateBody(key, version)
75 if r.body == nil {
76 return PacketDecodingError{fmt.Sprintf("unknown request key (%d)", key)}
77 }
78
79 if r.body.headerVersion() >= 2 {
80 // tagged field
81 _, err = pd.getUVarint()
82 if err != nil {
83 return err
84 }
85 }
86
87 if decoder, ok := pd.(*realDecoder); ok {
88 pd = prepareFlexibleDecoder(decoder, r.body, version)
89 }
90 return r.body.decode(pd, version)
91}
92
93func decodeRequest(r io.Reader) (*request, int, error) {
94 var (
95 bytesRead int
96 lengthBytes = make([]byte, 4)
97 )
98
99 if n, err := io.ReadFull(r, lengthBytes); err != nil {
100 return nil, n, err
101 }
102
103 bytesRead += len(lengthBytes)
104 length := int32(binary.BigEndian.Uint32(lengthBytes))
105
106 if length <= 4 || length > MaxRequestSize {
107 return nil, bytesRead, PacketDecodingError{fmt.Sprintf("message of length %d too large or too small", length)}
108 }
109
110 encodedReq := make([]byte, length)
111 if n, err := io.ReadFull(r, encodedReq); err != nil {
112 return nil, bytesRead + n, err
113 }
114
115 bytesRead += len(encodedReq)
116
117 req := &request{}
118 if err := decode(encodedReq, req, nil); err != nil {
119 return nil, bytesRead, err
120 }
121
122 return req, bytesRead, nil
123}
124
125func allocateBody(key, version int16) protocolBody {
126 switch key {
127 case apiKeyProduce:
128 return &ProduceRequest{Version: version}
129 case apiKeyFetch:
130 return &FetchRequest{Version: version}
131 case apiKeyListOffsets:
132 return &OffsetRequest{Version: version}
133 case apiKeyMetadata:
134 return &MetadataRequest{Version: version}
135 // 4: LeaderAndIsrRequest
136 // 5: StopReplicaRequest
137 // 6: UpdateMetadataRequest
138 // 7: ControlledShutdownRequest
139 case apiKeyOffsetCommit:
140 return &OffsetCommitRequest{Version: version}
141 case apiKeyOffsetFetch:
142 return &OffsetFetchRequest{Version: version}
143 case apiKeyFindCoordinator:
144 return &FindCoordinatorRequest{Version: version}
145 case apiKeyJoinGroup:
146 return &JoinGroupRequest{Version: version}
147 case apiKeyHeartbeat:
148 return &HeartbeatRequest{Version: version}
149 case apiKeyLeaveGroup:
150 return &LeaveGroupRequest{Version: version}
151 case apiKeySyncGroup:
152 return &SyncGroupRequest{Version: version}
153 case apiKeyDescribeGroups:
154 return &DescribeGroupsRequest{Version: version}
155 case apiKeyListGroups:
156 return &ListGroupsRequest{Version: version}
157 case apiKeySaslHandshake:
158 return &SaslHandshakeRequest{Version: version}
159 case apiKeyApiVersions:
160 return &ApiVersionsRequest{Version: version}
161 case apiKeyCreateTopics:
162 return &CreateTopicsRequest{Version: version}
163 case apiKeyDeleteTopics:
164 return &DeleteTopicsRequest{Version: version}
165 case apiKeyDeleteRecords:
166 return &DeleteRecordsRequest{Version: version}
167 case apiKeyInitProducerId:
168 return &InitProducerIDRequest{Version: version}
169 // 23: OffsetForLeaderEpochRequest
170 case apiKeyAddPartitionsToTxn:
171 return &AddPartitionsToTxnRequest{Version: version}
172 case apiKeyAddOffsetsToTxn:
173 return &AddOffsetsToTxnRequest{Version: version}
174 case apiKeyEndTxn:
175 return &EndTxnRequest{Version: version}
176 // 27: WriteTxnMarkersRequest
177 case apiKeyTxnOffsetCommit:
178 return &TxnOffsetCommitRequest{Version: version}
179 case apiKeyDescribeAcls:
180 return &DescribeAclsRequest{Version: int(version)}
181 case apiKeyCreateAcls:
182 return &CreateAclsRequest{Version: version}
183 case apiKeyDeleteAcls:
184 return &DeleteAclsRequest{Version: int(version)}
185 case apiKeyDescribeConfigs:
186 return &DescribeConfigsRequest{Version: version}
187 case apiKeyAlterConfigs:
188 return &AlterConfigsRequest{Version: version}
189 // 34: AlterReplicaLogDirsRequest
190 case apiKeyDescribeLogDirs:
191 return &DescribeLogDirsRequest{Version: version}
192 case apiKeySASLAuth:
193 return &SaslAuthenticateRequest{Version: version}
194 case apiKeyCreatePartitions:
195 return &CreatePartitionsRequest{Version: version}
196 // 38: CreateDelegationTokenRequest
197 // 39: RenewDelegationTokenRequest
198 // 40: ExpireDelegationTokenRequest
199 // 41: DescribeDelegationTokenRequest
200 case apiKeyDeleteGroups:
201 return &DeleteGroupsRequest{Version: version}
202 case apiKeyElectLeaders:
203 return &ElectLeadersRequest{Version: version}
204 case apiKeyIncrementalAlterConfigs:
205 return &IncrementalAlterConfigsRequest{Version: version}
206 case apiKeyAlterPartitionReassignments:
207 return &AlterPartitionReassignmentsRequest{Version: version}
208 case apiKeyListPartitionReassignments:
209 return &ListPartitionReassignmentsRequest{Version: version}
210 case apiKeyOffsetDelete:
211 return &DeleteOffsetsRequest{Version: version}
212 case apiKeyDescribeClientQuotas:
213 return &DescribeClientQuotasRequest{Version: version}
214 case apiKeyAlterClientQuotas:
215 return &AlterClientQuotasRequest{Version: version}
216 case apiKeyDescribeUserScramCredentials:
217 return &DescribeUserScramCredentialsRequest{Version: version}
218 case apiKeyAlterUserScramCredentials:
219 return &AlterUserScramCredentialsRequest{Version: version}
220 // 52: VoteRequest
221 // 53: BeginQuorumEpochRequest
222 // 54: EndQuorumEpochRequest
223 // 55: DescribeQuorumRequest
224 // 56: AlterPartitionRequest
225 // 57: UpdateFeaturesRequest
226 // 58: EnvelopeRequest
227 // 59: FetchSnapshotRequest
228 // 60: DescribeClusterRequest
229 // 61: DescribeProducersRequest
230 // 62: BrokerRegistrationRequest
231 // 63: BrokerHeartbeatRequest
232 // 64: UnregisterBrokerRequest
233 // 65: DescribeTransactionsRequest
234 // 66: ListTransactionsRequest
235 // 67: AllocateProducerIdsRequest
236 // 68: ConsumerGroupHeartbeatRequest
237 }
238 return nil
239}