blob: 83a1c466f964be74c6fbbe1c6ba8bce1796ac25e [file] [log] [blame]
package sarama
import (
"encoding/binary"
"fmt"
"io"
)
// protocolBody is the contract a request body must satisfy so that a request
// can frame it with a header (see request.encode) and parse it back
// (see request.decode).
type protocolBody interface {
// Embedded encoder; request.encode calls r.body.encode(pe) once the header
// has been written (presumably the method comes from this interface).
encoder
// Embedded versionedDecoder; request.decode calls r.body.decode(pd, version)
// with the version read from the header (presumably provided here).
versionedDecoder
// key is written by request.encode as the first int16 of the header;
// allocateBody switches on the same value when decoding.
key() int16
// version is written by request.encode as the second int16 of the header.
version() int16
// setVersion is not called in this part of the file; presumably it changes
// the value reported by version — confirm against the implementations.
setVersion(int16)
// headerVersion selects the header layout: request.encode writes the client
// ID only when it is >= 1 and a tagged-field count only when it is >= 2;
// request.decode consumes the tagged-field count when it is >= 2.
headerVersion() int16
// isValidVersion is not called in this part of the file; presumably reports
// whether the body's current version is supported — confirm.
isValidVersion() bool
// requiredVersion is not called in this part of the file; presumably the
// KafkaVersion needed for the body's current version — confirm.
requiredVersion() KafkaVersion
}
// request pairs the per-request header values with the body they precede;
// its encode and decode methods translate it to and from the wire layout.
type request struct {
// correlationID is written as an int32 directly after the API key and version.
correlationID int32
// clientID is written as a string when the body's header version is >= 1;
// decode always reads it.
clientID string
// body supplies the API key, version and header version for the header and
// encodes/decodes everything that follows the header.
body protocolBody
}
// encode writes the request to pe: a length field, the API key, the API
// version and the correlation ID, then the optional header parts selected by
// the body's header version, and finally the body itself.
//
// It returns the first error reported by pe.putString, by the body's encode,
// or by the closing pe.pop.
func (r *request) encode(pe packetEncoder) error {
	// Opened here and closed by the pop at the end of the function.
	pe.push(&lengthField{})

	// Fixed part of the header.
	pe.putInt16(r.body.key())
	pe.putInt16(r.body.version())
	pe.putInt32(r.correlationID)

	// Optional parts of the header, gated on the header version.
	headerVersion := r.body.headerVersion()
	if headerVersion >= 1 {
		if err := pe.putString(r.clientID); err != nil {
			return err
		}
	}
	if headerVersion >= 2 {
		// No header tags are sent, so the tagged-field section is a zero count.
		pe.putUVarint(0)
	}

	// The body is written through whatever encoder prepareFlexibleEncoder
	// hands back for it.
	pe = prepareFlexibleEncoder(pe, r.body)
	if err := r.body.encode(pe); err != nil {
		return err
	}
	return pe.pop()
}
// decode reads a request from pd: API key, API version, correlation ID and
// client ID, then allocates the matching body via allocateBody and lets it
// decode the remainder.
//
// It returns a PacketDecodingError when allocateBody has no body for the key,
// and otherwise the first error reported by pd or by the body's decode.
func (r *request) decode(pd packetDecoder) (err error) {
	var apiKey, apiVersion int16

	if apiKey, err = pd.getInt16(); err != nil {
		return err
	}
	if apiVersion, err = pd.getInt16(); err != nil {
		return err
	}
	if r.correlationID, err = pd.getInt32(); err != nil {
		return err
	}
	if r.clientID, err = pd.getString(); err != nil {
		return err
	}

	if r.body = allocateBody(apiKey, apiVersion); r.body == nil {
		return PacketDecodingError{fmt.Sprintf("unknown request key (%d)", apiKey)}
	}

	if r.body.headerVersion() >= 2 {
		// Header tagged-field count: consumed so the body starts at the right
		// offset, but the value itself is not used.
		if _, err = pd.getUVarint(); err != nil {
			return err
		}
	}

	// Only a *realDecoder is passed through prepareFlexibleDecoder; any other
	// decoder implementation is used as-is.
	if rd, isReal := pd.(*realDecoder); isReal {
		pd = prepareFlexibleDecoder(rd, r.body, apiVersion)
	}
	return r.body.decode(pd, apiVersion)
}
// decodeRequest reads one length-prefixed request from r and decodes it.
//
// The frame is a 4-byte big-endian length followed by that many bytes of
// encoded request. The returned int is the number of bytes consumed from r,
// including on failure. A length that is <= 4 or greater than MaxRequestSize
// is rejected with a PacketDecodingError before any payload is read.
func decodeRequest(r io.Reader) (*request, int, error) {
	// Length prefix.
	prefix := make([]byte, 4)
	n, err := io.ReadFull(r, prefix)
	if err != nil {
		return nil, n, err
	}
	consumed := len(prefix)

	size := int32(binary.BigEndian.Uint32(prefix))
	if size <= 4 || size > MaxRequestSize {
		return nil, consumed, PacketDecodingError{fmt.Sprintf("message of length %d too large or too small", size)}
	}

	// Payload: exactly size bytes.
	payload := make([]byte, size)
	n, err = io.ReadFull(r, payload)
	if err != nil {
		// Count whatever part of the payload was read before the failure.
		return nil, consumed + n, err
	}
	consumed += len(payload)

	decoded := &request{}
	if err := decode(payload, decoded, nil); err != nil {
		return nil, consumed, err
	}
	return decoded, consumed, nil
}
// allocateBody returns a new, otherwise empty request body for the given API
// key with its Version field set to version. It returns nil when the key has
// no case in the switch below; the numbered comments mark API keys without a
// case.
func allocateBody(key, version int16) protocolBody {
	// Stays nil unless one of the cases below matches.
	var body protocolBody

	switch key {
	case apiKeyProduce:
		body = &ProduceRequest{Version: version}
	case apiKeyFetch:
		body = &FetchRequest{Version: version}
	case apiKeyListOffsets:
		body = &OffsetRequest{Version: version}
	case apiKeyMetadata:
		body = &MetadataRequest{Version: version}
	// 4: LeaderAndIsrRequest
	// 5: StopReplicaRequest
	// 6: UpdateMetadataRequest
	// 7: ControlledShutdownRequest
	case apiKeyOffsetCommit:
		body = &OffsetCommitRequest{Version: version}
	case apiKeyOffsetFetch:
		body = &OffsetFetchRequest{Version: version}
	case apiKeyFindCoordinator:
		body = &FindCoordinatorRequest{Version: version}
	case apiKeyJoinGroup:
		body = &JoinGroupRequest{Version: version}
	case apiKeyHeartbeat:
		body = &HeartbeatRequest{Version: version}
	case apiKeyLeaveGroup:
		body = &LeaveGroupRequest{Version: version}
	case apiKeySyncGroup:
		body = &SyncGroupRequest{Version: version}
	case apiKeyDescribeGroups:
		body = &DescribeGroupsRequest{Version: version}
	case apiKeyListGroups:
		body = &ListGroupsRequest{Version: version}
	case apiKeySaslHandshake:
		body = &SaslHandshakeRequest{Version: version}
	case apiKeyApiVersions:
		body = &ApiVersionsRequest{Version: version}
	case apiKeyCreateTopics:
		body = &CreateTopicsRequest{Version: version}
	case apiKeyDeleteTopics:
		body = &DeleteTopicsRequest{Version: version}
	case apiKeyDeleteRecords:
		body = &DeleteRecordsRequest{Version: version}
	case apiKeyInitProducerId:
		body = &InitProducerIDRequest{Version: version}
	// 23: OffsetForLeaderEpochRequest
	case apiKeyAddPartitionsToTxn:
		body = &AddPartitionsToTxnRequest{Version: version}
	case apiKeyAddOffsetsToTxn:
		body = &AddOffsetsToTxnRequest{Version: version}
	case apiKeyEndTxn:
		body = &EndTxnRequest{Version: version}
	// 27: WriteTxnMarkersRequest
	case apiKeyTxnOffsetCommit:
		body = &TxnOffsetCommitRequest{Version: version}
	case apiKeyDescribeAcls:
		// This request type takes its Version as an int.
		body = &DescribeAclsRequest{Version: int(version)}
	case apiKeyCreateAcls:
		body = &CreateAclsRequest{Version: version}
	case apiKeyDeleteAcls:
		// This request type takes its Version as an int.
		body = &DeleteAclsRequest{Version: int(version)}
	case apiKeyDescribeConfigs:
		body = &DescribeConfigsRequest{Version: version}
	case apiKeyAlterConfigs:
		body = &AlterConfigsRequest{Version: version}
	// 34: AlterReplicaLogDirsRequest
	case apiKeyDescribeLogDirs:
		body = &DescribeLogDirsRequest{Version: version}
	case apiKeySASLAuth:
		body = &SaslAuthenticateRequest{Version: version}
	case apiKeyCreatePartitions:
		body = &CreatePartitionsRequest{Version: version}
	// 38: CreateDelegationTokenRequest
	// 39: RenewDelegationTokenRequest
	// 40: ExpireDelegationTokenRequest
	// 41: DescribeDelegationTokenRequest
	case apiKeyDeleteGroups:
		body = &DeleteGroupsRequest{Version: version}
	case apiKeyElectLeaders:
		body = &ElectLeadersRequest{Version: version}
	case apiKeyIncrementalAlterConfigs:
		body = &IncrementalAlterConfigsRequest{Version: version}
	case apiKeyAlterPartitionReassignments:
		body = &AlterPartitionReassignmentsRequest{Version: version}
	case apiKeyListPartitionReassignments:
		body = &ListPartitionReassignmentsRequest{Version: version}
	case apiKeyOffsetDelete:
		body = &DeleteOffsetsRequest{Version: version}
	case apiKeyDescribeClientQuotas:
		body = &DescribeClientQuotasRequest{Version: version}
	case apiKeyAlterClientQuotas:
		body = &AlterClientQuotasRequest{Version: version}
	case apiKeyDescribeUserScramCredentials:
		body = &DescribeUserScramCredentialsRequest{Version: version}
	case apiKeyAlterUserScramCredentials:
		body = &AlterUserScramCredentialsRequest{Version: version}
		// 52: VoteRequest
		// 53: BeginQuorumEpochRequest
		// 54: EndQuorumEpochRequest
		// 55: DescribeQuorumRequest
		// 56: AlterPartitionRequest
		// 57: UpdateFeaturesRequest
		// 58: EnvelopeRequest
		// 59: FetchSnapshotRequest
		// 60: DescribeClusterRequest
		// 61: DescribeProducersRequest
		// 62: BrokerRegistrationRequest
		// 63: BrokerHeartbeatRequest
		// 64: UnregisterBrokerRequest
		// 65: DescribeTransactionsRequest
		// 66: ListTransactionsRequest
		// 67: AllocateProducerIdsRequest
		// 68: ConsumerGroupHeartbeatRequest
	}

	return body
}