package sarama

import (
	"encoding/binary"
	"fmt"
	"io"
)

// protocolBody is the contract for the payload carried by a request (it is the
// type of request.body). It combines the encoder and versionedDecoder
// interfaces with accessors describing the API key, the API version and the
// header layout to use.
type protocolBody interface {
	encoder
	versionedDecoder
	// key returns the API key; request.encode writes it as the first int16 of
	// the header.
	key() int16
	// version returns the API version; request.encode writes it immediately
	// after the key.
	version() int16
	// setVersion is not called in this file; presumably it changes the value
	// reported by version — TODO confirm against the implementations.
	setVersion(int16)
	// headerVersion selects the header layout: request.encode writes the
	// client ID when it is >= 1 and an empty tagged-field count when it is >= 2.
	headerVersion() int16
	// isValidVersion is not called in this file; presumably it reports whether
	// the current version is one the implementation supports — TODO confirm.
	isValidVersion() bool
	// requiredVersion is not called in this file; presumably it is the minimum
	// KafkaVersion needed for the current API version — TODO confirm.
	requiredVersion() KafkaVersion
}

// request pairs the header values of a protocol request with its body.
// encode writes the header followed by the body; decode fills all three
// fields from the wire representation.
type request struct {
	// correlationID is written as an int32 right after the API key and version.
	correlationID int32
	// clientID is written as a string when the body's header version is >= 1.
	clientID string
	// body is the API-specific payload; on decode it is created by
	// allocateBody from the API key and version read from the header.
	body protocolBody
}

// encode writes the request to pe: a pushed lengthField, the API key, the API
// version and the correlation ID, then the header fields selected by the
// body's header version, and finally the body itself.
//
// It returns the first error reported while writing the client ID or the
// body, otherwise the result of popping the encoder.
func (r *request) encode(pe packetEncoder) error {
	body := r.body

	// Pushed before anything else is written; the matching pop is the last
	// call of this method.
	pe.push(&lengthField{})

	// Fixed part of the header.
	pe.putInt16(body.key())
	pe.putInt16(body.version())
	pe.putInt32(r.correlationID)

	// The client ID is only part of the header from header version 1 on.
	if body.headerVersion() >= 1 {
		if err := pe.putString(r.clientID); err != nil {
			return err
		}
	}

	// Header version 2 adds tagged fields; none are emitted here, so the
	// count is always zero.
	if body.headerVersion() >= 2 {
		pe.putUVarint(0)
	}

	// The body (and the final pop) go through whatever encoder
	// prepareFlexibleEncoder hands back for this body.
	bodyEncoder := prepareFlexibleEncoder(pe, body)
	if err := body.encode(bodyEncoder); err != nil {
		return err
	}

	return bodyEncoder.pop()
}

// decode reads a request from pd: API key, API version, correlation ID and
// client ID, then allocates the matching body via allocateBody and lets it
// decode the remainder.
//
// It returns a PacketDecodingError when no body type is known for the API
// key, and otherwise propagates the first error from pd or from the body.
//
// NOTE(review): the client ID is read unconditionally, whereas encode only
// writes it for header version >= 1 — confirm no supported body uses header
// version 0.
func (r *request) decode(pd packetDecoder) (err error) {
	var apiKey, apiVersion int16

	if apiKey, err = pd.getInt16(); err != nil {
		return err
	}
	if apiVersion, err = pd.getInt16(); err != nil {
		return err
	}
	if r.correlationID, err = pd.getInt32(); err != nil {
		return err
	}
	if r.clientID, err = pd.getString(); err != nil {
		return err
	}

	body := allocateBody(apiKey, apiVersion)
	r.body = body
	if body == nil {
		return PacketDecodingError{fmt.Sprintf("unknown request key (%d)", apiKey)}
	}

	// Header version 2 carries a tagged-field count; it is read and discarded.
	if body.headerVersion() >= 2 {
		if _, err = pd.getUVarint(); err != nil {
			return err
		}
	}

	// Only a *realDecoder is passed through prepareFlexibleDecoder; any other
	// decoder implementation is used as-is.
	if rd, isReal := pd.(*realDecoder); isReal {
		pd = prepareFlexibleDecoder(rd, body, apiVersion)
	}

	return body.decode(pd, apiVersion)
}

// decodeRequest reads one length-prefixed request from r and decodes it.
//
// The frame starts with a 4-byte big-endian length followed by that many
// bytes of encoded request. The returned int is the number of bytes consumed
// from r, including on failure. A length that is <= 4 or greater than
// MaxRequestSize yields a PacketDecodingError; read and decode errors are
// returned unchanged. The *request is nil whenever the error is non-nil.
func decodeRequest(r io.Reader) (*request, int, error) {
	prefix := make([]byte, 4)

	consumed, err := io.ReadFull(r, prefix)
	if err != nil {
		return nil, consumed, err
	}

	size := int32(binary.BigEndian.Uint32(prefix))
	if size <= 4 || size > MaxRequestSize {
		return nil, consumed, PacketDecodingError{fmt.Sprintf("message of length %d too large or too small", size)}
	}

	payload := make([]byte, size)
	n, err := io.ReadFull(r, payload)
	// Count whatever was read, so a short read is still reflected in the total.
	consumed += n
	if err != nil {
		return nil, consumed, err
	}

	req := new(request)
	if err = decode(payload, req, nil); err != nil {
		return nil, consumed, err
	}

	return req, consumed, nil
}

// allocateBody returns a new, empty request body for the given API key with
// its Version field set to version. It returns nil when the key has no
// request type listed here; the numbered comments mark API keys that are
// deliberately left without a case.
func allocateBody(key, version int16) protocolBody {
	// Stays a nil interface unless one of the cases below matches.
	var body protocolBody

	switch key {
	case apiKeyProduce:
		body = &ProduceRequest{Version: version}
	case apiKeyFetch:
		body = &FetchRequest{Version: version}
	case apiKeyListOffsets:
		body = &OffsetRequest{Version: version}
	case apiKeyMetadata:
		body = &MetadataRequest{Version: version}
	// 4: LeaderAndIsrRequest
	// 5: StopReplicaRequest
	// 6: UpdateMetadataRequest
	// 7: ControlledShutdownRequest
	case apiKeyOffsetCommit:
		body = &OffsetCommitRequest{Version: version}
	case apiKeyOffsetFetch:
		body = &OffsetFetchRequest{Version: version}
	case apiKeyFindCoordinator:
		body = &FindCoordinatorRequest{Version: version}
	case apiKeyJoinGroup:
		body = &JoinGroupRequest{Version: version}
	case apiKeyHeartbeat:
		body = &HeartbeatRequest{Version: version}
	case apiKeyLeaveGroup:
		body = &LeaveGroupRequest{Version: version}
	case apiKeySyncGroup:
		body = &SyncGroupRequest{Version: version}
	case apiKeyDescribeGroups:
		body = &DescribeGroupsRequest{Version: version}
	case apiKeyListGroups:
		body = &ListGroupsRequest{Version: version}
	case apiKeySaslHandshake:
		body = &SaslHandshakeRequest{Version: version}
	case apiKeyApiVersions:
		body = &ApiVersionsRequest{Version: version}
	case apiKeyCreateTopics:
		body = &CreateTopicsRequest{Version: version}
	case apiKeyDeleteTopics:
		body = &DeleteTopicsRequest{Version: version}
	case apiKeyDeleteRecords:
		body = &DeleteRecordsRequest{Version: version}
	case apiKeyInitProducerId:
		body = &InitProducerIDRequest{Version: version}
	// 23: OffsetForLeaderEpochRequest
	case apiKeyAddPartitionsToTxn:
		body = &AddPartitionsToTxnRequest{Version: version}
	case apiKeyAddOffsetsToTxn:
		body = &AddOffsetsToTxnRequest{Version: version}
	case apiKeyEndTxn:
		body = &EndTxnRequest{Version: version}
	// 27: WriteTxnMarkersRequest
	case apiKeyTxnOffsetCommit:
		body = &TxnOffsetCommitRequest{Version: version}
	case apiKeyDescribeAcls:
		// This type stores its version as an int rather than an int16.
		body = &DescribeAclsRequest{Version: int(version)}
	case apiKeyCreateAcls:
		body = &CreateAclsRequest{Version: version}
	case apiKeyDeleteAcls:
		// This type stores its version as an int rather than an int16.
		body = &DeleteAclsRequest{Version: int(version)}
	case apiKeyDescribeConfigs:
		body = &DescribeConfigsRequest{Version: version}
	case apiKeyAlterConfigs:
		body = &AlterConfigsRequest{Version: version}
	// 34: AlterReplicaLogDirsRequest
	case apiKeyDescribeLogDirs:
		body = &DescribeLogDirsRequest{Version: version}
	case apiKeySASLAuth:
		body = &SaslAuthenticateRequest{Version: version}
	case apiKeyCreatePartitions:
		body = &CreatePartitionsRequest{Version: version}
	// 38: CreateDelegationTokenRequest
	// 39: RenewDelegationTokenRequest
	// 40: ExpireDelegationTokenRequest
	// 41: DescribeDelegationTokenRequest
	case apiKeyDeleteGroups:
		body = &DeleteGroupsRequest{Version: version}
	case apiKeyElectLeaders:
		body = &ElectLeadersRequest{Version: version}
	case apiKeyIncrementalAlterConfigs:
		body = &IncrementalAlterConfigsRequest{Version: version}
	case apiKeyAlterPartitionReassignments:
		body = &AlterPartitionReassignmentsRequest{Version: version}
	case apiKeyListPartitionReassignments:
		body = &ListPartitionReassignmentsRequest{Version: version}
	case apiKeyOffsetDelete:
		body = &DeleteOffsetsRequest{Version: version}
	case apiKeyDescribeClientQuotas:
		body = &DescribeClientQuotasRequest{Version: version}
	case apiKeyAlterClientQuotas:
		body = &AlterClientQuotasRequest{Version: version}
	case apiKeyDescribeUserScramCredentials:
		body = &DescribeUserScramCredentialsRequest{Version: version}
	case apiKeyAlterUserScramCredentials:
		body = &AlterUserScramCredentialsRequest{Version: version}
		// 52: VoteRequest
		// 53: BeginQuorumEpochRequest
		// 54: EndQuorumEpochRequest
		// 55: DescribeQuorumRequest
		// 56: AlterPartitionRequest
		// 57: UpdateFeaturesRequest
		// 58: EnvelopeRequest
		// 59: FetchSnapshotRequest
		// 60: DescribeClusterRequest
		// 61: DescribeProducersRequest
		// 62: BrokerRegistrationRequest
		// 63: BrokerHeartbeatRequest
		// 64: UnregisterBrokerRequest
		// 65: DescribeTransactionsRequest
		// 66: ListTransactionsRequest
		// 67: AllocateProducerIdsRequest
		// 68: ConsumerGroupHeartbeatRequest
	}

	return body
}
