| Abhay Kumar | 40252eb | 2025-10-13 13:25:53 +0000 | [diff] [blame^] | 1 | package sarama |
| 2 | |
| 3 | import ( |
| 4 | "encoding/binary" |
| 5 | "fmt" |
| 6 | "io" |
| 7 | ) |
| 8 | |
| 9 | type protocolBody interface { |
| 10 | encoder |
| 11 | versionedDecoder |
| 12 | key() int16 |
| 13 | version() int16 |
| 14 | setVersion(int16) |
| 15 | headerVersion() int16 |
| 16 | isValidVersion() bool |
| 17 | requiredVersion() KafkaVersion |
| 18 | } |
| 19 | |
| 20 | type request struct { |
| 21 | correlationID int32 |
| 22 | clientID string |
| 23 | body protocolBody |
| 24 | } |
| 25 | |
| 26 | func (r *request) encode(pe packetEncoder) error { |
| 27 | pe.push(&lengthField{}) |
| 28 | pe.putInt16(r.body.key()) |
| 29 | pe.putInt16(r.body.version()) |
| 30 | pe.putInt32(r.correlationID) |
| 31 | |
| 32 | if r.body.headerVersion() >= 1 { |
| 33 | err := pe.putString(r.clientID) |
| 34 | if err != nil { |
| 35 | return err |
| 36 | } |
| 37 | } |
| 38 | |
| 39 | if r.body.headerVersion() >= 2 { |
| 40 | // we don't use tag headers at the moment so we just put an array length of 0 |
| 41 | pe.putUVarint(0) |
| 42 | } |
| 43 | pe = prepareFlexibleEncoder(pe, r.body) |
| 44 | |
| 45 | err := r.body.encode(pe) |
| 46 | if err != nil { |
| 47 | return err |
| 48 | } |
| 49 | |
| 50 | return pe.pop() |
| 51 | } |
| 52 | |
| 53 | func (r *request) decode(pd packetDecoder) (err error) { |
| 54 | key, err := pd.getInt16() |
| 55 | if err != nil { |
| 56 | return err |
| 57 | } |
| 58 | |
| 59 | version, err := pd.getInt16() |
| 60 | if err != nil { |
| 61 | return err |
| 62 | } |
| 63 | |
| 64 | r.correlationID, err = pd.getInt32() |
| 65 | if err != nil { |
| 66 | return err |
| 67 | } |
| 68 | |
| 69 | r.clientID, err = pd.getString() |
| 70 | if err != nil { |
| 71 | return err |
| 72 | } |
| 73 | |
| 74 | r.body = allocateBody(key, version) |
| 75 | if r.body == nil { |
| 76 | return PacketDecodingError{fmt.Sprintf("unknown request key (%d)", key)} |
| 77 | } |
| 78 | |
| 79 | if r.body.headerVersion() >= 2 { |
| 80 | // tagged field |
| 81 | _, err = pd.getUVarint() |
| 82 | if err != nil { |
| 83 | return err |
| 84 | } |
| 85 | } |
| 86 | |
| 87 | if decoder, ok := pd.(*realDecoder); ok { |
| 88 | pd = prepareFlexibleDecoder(decoder, r.body, version) |
| 89 | } |
| 90 | return r.body.decode(pd, version) |
| 91 | } |
| 92 | |
| 93 | func decodeRequest(r io.Reader) (*request, int, error) { |
| 94 | var ( |
| 95 | bytesRead int |
| 96 | lengthBytes = make([]byte, 4) |
| 97 | ) |
| 98 | |
| 99 | if n, err := io.ReadFull(r, lengthBytes); err != nil { |
| 100 | return nil, n, err |
| 101 | } |
| 102 | |
| 103 | bytesRead += len(lengthBytes) |
| 104 | length := int32(binary.BigEndian.Uint32(lengthBytes)) |
| 105 | |
| 106 | if length <= 4 || length > MaxRequestSize { |
| 107 | return nil, bytesRead, PacketDecodingError{fmt.Sprintf("message of length %d too large or too small", length)} |
| 108 | } |
| 109 | |
| 110 | encodedReq := make([]byte, length) |
| 111 | if n, err := io.ReadFull(r, encodedReq); err != nil { |
| 112 | return nil, bytesRead + n, err |
| 113 | } |
| 114 | |
| 115 | bytesRead += len(encodedReq) |
| 116 | |
| 117 | req := &request{} |
| 118 | if err := decode(encodedReq, req, nil); err != nil { |
| 119 | return nil, bytesRead, err |
| 120 | } |
| 121 | |
| 122 | return req, bytesRead, nil |
| 123 | } |
| 124 | |
| 125 | func allocateBody(key, version int16) protocolBody { |
| 126 | switch key { |
| 127 | case apiKeyProduce: |
| 128 | return &ProduceRequest{Version: version} |
| 129 | case apiKeyFetch: |
| 130 | return &FetchRequest{Version: version} |
| 131 | case apiKeyListOffsets: |
| 132 | return &OffsetRequest{Version: version} |
| 133 | case apiKeyMetadata: |
| 134 | return &MetadataRequest{Version: version} |
| 135 | // 4: LeaderAndIsrRequest |
| 136 | // 5: StopReplicaRequest |
| 137 | // 6: UpdateMetadataRequest |
| 138 | // 7: ControlledShutdownRequest |
| 139 | case apiKeyOffsetCommit: |
| 140 | return &OffsetCommitRequest{Version: version} |
| 141 | case apiKeyOffsetFetch: |
| 142 | return &OffsetFetchRequest{Version: version} |
| 143 | case apiKeyFindCoordinator: |
| 144 | return &FindCoordinatorRequest{Version: version} |
| 145 | case apiKeyJoinGroup: |
| 146 | return &JoinGroupRequest{Version: version} |
| 147 | case apiKeyHeartbeat: |
| 148 | return &HeartbeatRequest{Version: version} |
| 149 | case apiKeyLeaveGroup: |
| 150 | return &LeaveGroupRequest{Version: version} |
| 151 | case apiKeySyncGroup: |
| 152 | return &SyncGroupRequest{Version: version} |
| 153 | case apiKeyDescribeGroups: |
| 154 | return &DescribeGroupsRequest{Version: version} |
| 155 | case apiKeyListGroups: |
| 156 | return &ListGroupsRequest{Version: version} |
| 157 | case apiKeySaslHandshake: |
| 158 | return &SaslHandshakeRequest{Version: version} |
| 159 | case apiKeyApiVersions: |
| 160 | return &ApiVersionsRequest{Version: version} |
| 161 | case apiKeyCreateTopics: |
| 162 | return &CreateTopicsRequest{Version: version} |
| 163 | case apiKeyDeleteTopics: |
| 164 | return &DeleteTopicsRequest{Version: version} |
| 165 | case apiKeyDeleteRecords: |
| 166 | return &DeleteRecordsRequest{Version: version} |
| 167 | case apiKeyInitProducerId: |
| 168 | return &InitProducerIDRequest{Version: version} |
| 169 | // 23: OffsetForLeaderEpochRequest |
| 170 | case apiKeyAddPartitionsToTxn: |
| 171 | return &AddPartitionsToTxnRequest{Version: version} |
| 172 | case apiKeyAddOffsetsToTxn: |
| 173 | return &AddOffsetsToTxnRequest{Version: version} |
| 174 | case apiKeyEndTxn: |
| 175 | return &EndTxnRequest{Version: version} |
| 176 | // 27: WriteTxnMarkersRequest |
| 177 | case apiKeyTxnOffsetCommit: |
| 178 | return &TxnOffsetCommitRequest{Version: version} |
| 179 | case apiKeyDescribeAcls: |
| 180 | return &DescribeAclsRequest{Version: int(version)} |
| 181 | case apiKeyCreateAcls: |
| 182 | return &CreateAclsRequest{Version: version} |
| 183 | case apiKeyDeleteAcls: |
| 184 | return &DeleteAclsRequest{Version: int(version)} |
| 185 | case apiKeyDescribeConfigs: |
| 186 | return &DescribeConfigsRequest{Version: version} |
| 187 | case apiKeyAlterConfigs: |
| 188 | return &AlterConfigsRequest{Version: version} |
| 189 | // 34: AlterReplicaLogDirsRequest |
| 190 | case apiKeyDescribeLogDirs: |
| 191 | return &DescribeLogDirsRequest{Version: version} |
| 192 | case apiKeySASLAuth: |
| 193 | return &SaslAuthenticateRequest{Version: version} |
| 194 | case apiKeyCreatePartitions: |
| 195 | return &CreatePartitionsRequest{Version: version} |
| 196 | // 38: CreateDelegationTokenRequest |
| 197 | // 39: RenewDelegationTokenRequest |
| 198 | // 40: ExpireDelegationTokenRequest |
| 199 | // 41: DescribeDelegationTokenRequest |
| 200 | case apiKeyDeleteGroups: |
| 201 | return &DeleteGroupsRequest{Version: version} |
| 202 | case apiKeyElectLeaders: |
| 203 | return &ElectLeadersRequest{Version: version} |
| 204 | case apiKeyIncrementalAlterConfigs: |
| 205 | return &IncrementalAlterConfigsRequest{Version: version} |
| 206 | case apiKeyAlterPartitionReassignments: |
| 207 | return &AlterPartitionReassignmentsRequest{Version: version} |
| 208 | case apiKeyListPartitionReassignments: |
| 209 | return &ListPartitionReassignmentsRequest{Version: version} |
| 210 | case apiKeyOffsetDelete: |
| 211 | return &DeleteOffsetsRequest{Version: version} |
| 212 | case apiKeyDescribeClientQuotas: |
| 213 | return &DescribeClientQuotasRequest{Version: version} |
| 214 | case apiKeyAlterClientQuotas: |
| 215 | return &AlterClientQuotasRequest{Version: version} |
| 216 | case apiKeyDescribeUserScramCredentials: |
| 217 | return &DescribeUserScramCredentialsRequest{Version: version} |
| 218 | case apiKeyAlterUserScramCredentials: |
| 219 | return &AlterUserScramCredentialsRequest{Version: version} |
| 220 | // 52: VoteRequest |
| 221 | // 53: BeginQuorumEpochRequest |
| 222 | // 54: EndQuorumEpochRequest |
| 223 | // 55: DescribeQuorumRequest |
| 224 | // 56: AlterPartitionRequest |
| 225 | // 57: UpdateFeaturesRequest |
| 226 | // 58: EnvelopeRequest |
| 227 | // 59: FetchSnapshotRequest |
| 228 | // 60: DescribeClusterRequest |
| 229 | // 61: DescribeProducersRequest |
| 230 | // 62: BrokerRegistrationRequest |
| 231 | // 63: BrokerHeartbeatRequest |
| 232 | // 64: UnregisterBrokerRequest |
| 233 | // 65: DescribeTransactionsRequest |
| 234 | // 66: ListTransactionsRequest |
| 235 | // 67: AllocateProducerIdsRequest |
| 236 | // 68: ConsumerGroupHeartbeatRequest |
| 237 | } |
| 238 | return nil |
| 239 | } |