package sarama

import (
	"fmt"
	"strings"
	"sync"
)

// TestReporter has methods matching go's testing.T to avoid importing
// `testing` in the main part of the library. The mock response builders in
// this file accept a TestReporter at construction time.
type TestReporter interface {
	Error(...interface{})
	Errorf(string, ...interface{})
	Fatal(...interface{})
	Fatalf(string, ...interface{})
	Helper()
}

// MockResponse is a response builder interface. It defines a single method
// that generates a response based on a request body. MockResponses are used
// to program the behavior of MockBroker in tests.
type MockResponse interface {
	// For returns the response to send for the decoded request reqBody.
	For(reqBody versionedDecoder) (res encoderWithHeader)
}

// MockWrapper is a mock response builder that returns a particular concrete
// response regardless of the actual request passed to the `For` method. The
// res field holds that fixed response.
type MockWrapper struct {
	res encoderWithHeader
}

// For ignores reqBody and returns the response the wrapper was created with.
func (mw *MockWrapper) For(reqBody versionedDecoder) (res encoderWithHeader) {
	res = mw.res
	return res
}

// NewMockWrapper wraps res in a MockWrapper so that a concrete response can be
// used wherever a MockResponse is expected.
func NewMockWrapper(res encoderWithHeader) *MockWrapper {
	wrapper := new(MockWrapper)
	wrapper.res = res
	return wrapper
}

// MockSequence is a mock response builder that is created from a sequence of
// concrete responses. Every time a `MockBroker` calls its `For` method the
// next response from the sequence is returned. When the end of the sequence
// is reached the last element of the sequence keeps being returned.
type MockSequence struct {
	responses []MockResponse
}

// NewMockSequence builds a MockSequence from responses. Each argument must be
// either a MockResponse, which is used as is, or an encoderWithHeader, which
// is wrapped with NewMockWrapper. A value implementing both is treated as a
// MockResponse. Any other type causes a panic.
func NewMockSequence(responses ...interface{}) *MockSequence {
	builders := make([]MockResponse, 0, len(responses))
	for _, candidate := range responses {
		if builder, ok := candidate.(MockResponse); ok {
			builders = append(builders, builder)
			continue
		}
		if concrete, ok := candidate.(encoderWithHeader); ok {
			builders = append(builders, NewMockWrapper(concrete))
			continue
		}
		panic(fmt.Sprintf("Unexpected response type: %T", candidate))
	}
	return &MockSequence{responses: builders}
}

// For answers reqBody with the response at the head of the sequence and then
// advances the sequence. The final element is never dropped, so it is reused
// for every later call. Calling For on an empty sequence panics on the index
// expression.
func (mc *MockSequence) For(reqBody versionedDecoder) (res encoderWithHeader) {
	current := mc.responses[0]
	res = current.For(reqBody)
	if len(mc.responses) <= 1 {
		// Last element: keep it so it can be served again.
		return res
	}
	mc.responses = mc.responses[1:]
	return res
}

// MockListGroupsResponse is a `ListGroupsResponse` builder. groups maps a
// group ID to its protocol type (see AddGroup); t is the TestReporter passed
// to the constructor.
type MockListGroupsResponse struct {
	groups map[string]string
	t      TestReporter
}

// NewMockListGroupsResponse returns a builder with no groups registered.
func NewMockListGroupsResponse(t TestReporter) *MockListGroupsResponse {
	m := &MockListGroupsResponse{t: t}
	m.groups = make(map[string]string)
	return m
}

// For builds a ListGroupsResponse that echoes the request version and lists
// every registered group. The response shares the builder's groups map; it is
// not copied. It panics if reqBody is not a *ListGroupsRequest.
func (m *MockListGroupsResponse) For(reqBody versionedDecoder) encoderWithHeader {
	req := reqBody.(*ListGroupsRequest)
	res := new(ListGroupsResponse)
	res.Version = req.Version
	res.Groups = m.groups
	return res
}

// AddGroup registers groupID with the given protocolType, replacing any
// earlier entry for the same ID, and returns m so that calls can be chained.
func (m *MockListGroupsResponse) AddGroup(groupID, protocolType string) *MockListGroupsResponse {
	m.groups[groupID] = protocolType
	return m
}

// MockDescribeGroupsResponse is a `DescribeGroupsResponse` builder. groups
// maps a group ID to the description returned for it; t is the TestReporter
// passed to the constructor.
type MockDescribeGroupsResponse struct {
	groups map[string]*GroupDescription
	t      TestReporter
}

// NewMockDescribeGroupsResponse returns a builder with no group descriptions
// registered.
func NewMockDescribeGroupsResponse(t TestReporter) *MockDescribeGroupsResponse {
	m := new(MockDescribeGroupsResponse)
	m.groups = make(map[string]*GroupDescription)
	m.t = t
	return m
}

// AddGroupDescription registers description as the answer for groupID,
// replacing any earlier entry, and returns m so that calls can be chained.
func (m *MockDescribeGroupsResponse) AddGroupDescription(groupID string, description *GroupDescription) *MockDescribeGroupsResponse {
	m.groups[groupID] = description
	return m
}

// For builds a DescribeGroupsResponse containing one description per group in
// the request, in request order. Groups that were never registered are
// answered with a description in state "Dead". It panics if reqBody is not a
// *DescribeGroupsRequest.
func (m *MockDescribeGroupsResponse) For(reqBody versionedDecoder) encoderWithHeader {
	req := reqBody.(*DescribeGroupsRequest)
	res := &DescribeGroupsResponse{Version: req.version()}

	for _, groupID := range req.Groups {
		description, known := m.groups[groupID]
		if !known {
			// Mimic real kafka - if a group doesn't exist, return
			// an entry with state "Dead"
			description = &GroupDescription{
				GroupId: groupID,
				State:   "Dead",
			}
		}
		res.Groups = append(res.Groups, description)
	}

	return res
}

// MockMetadataResponse is a `MetadataResponse` builder.
//
// controllerID is copied into the response's ControllerID, errors maps a
// topic to the error reported for it, leaders maps topic -> partition ->
// leader broker ID, and brokers maps a broker address to its ID. t is the
// TestReporter passed to the constructor.
type MockMetadataResponse struct {
	controllerID int32
	errors       map[string]KError
	leaders      map[string]map[int32]int32
	brokers      map[string]int32
	t            TestReporter
}

// NewMockMetadataResponse returns a builder with empty error, leader and
// broker tables and a zero controller ID.
func NewMockMetadataResponse(t TestReporter) *MockMetadataResponse {
	mmr := &MockMetadataResponse{t: t}
	mmr.errors = make(map[string]KError)
	mmr.leaders = make(map[string]map[int32]int32)
	mmr.brokers = make(map[string]int32)
	return mmr
}

// SetError records kerror as the error for topic, replacing any earlier
// value, and returns mmr so that calls can be chained.
func (mmr *MockMetadataResponse) SetError(topic string, kerror KError) *MockMetadataResponse {
	mmr.errors[topic] = kerror
	return mmr
}

// SetLeader records brokerID as the leader of the given topic partition,
// creating the per-topic table on first use, and returns mmr so that calls
// can be chained.
func (mmr *MockMetadataResponse) SetLeader(topic string, partition, brokerID int32) *MockMetadataResponse {
	if mmr.leaders[topic] == nil {
		mmr.leaders[topic] = make(map[int32]int32)
	}
	mmr.leaders[topic][partition] = brokerID
	return mmr
}

// SetBroker registers a broker reachable at addr under brokerID, replacing
// any earlier ID stored for the same address, and returns mmr so that calls
// can be chained.
func (mmr *MockMetadataResponse) SetBroker(addr string, brokerID int32) *MockMetadataResponse {
	mmr.brokers[addr] = brokerID
	return mmr
}

// SetController sets the controller ID placed in generated responses and
// returns mmr so that calls can be chained.
func (mmr *MockMetadataResponse) SetController(brokerID int32) *MockMetadataResponse {
	mmr.controllerID = brokerID
	return mmr
}

// For builds a MetadataResponse for reqBody, which must be a
// *MetadataRequest (it panics otherwise).
//
// Every registered broker is added to the response. If the request names no
// topics, all configured leader partitions and all configured topic errors
// are returned. Otherwise only the requested topics are answered: a topic
// with configured leaders gets its partitions, a topic with only a configured
// error gets that error, and anything else gets ErrUnknownTopicOrPartition.
//
// The full list of broker IDs is passed twice to AddTopicPartition
// (presumably as replicas and in-sync replicas - confirm against
// AddTopicPartition); the offline replica list is always nil.
func (mmr *MockMetadataResponse) For(reqBody versionedDecoder) encoderWithHeader {
	req := reqBody.(*MetadataRequest)
	res := &MetadataResponse{
		Version:      req.version(),
		ControllerID: mmr.controllerID,
	}
	for addr, brokerID := range mmr.brokers {
		res.AddBroker(addr, brokerID)
	}

	// Generate set of replicas
	var (
		replicas        []int32
		offlineReplicas []int32
	)
	for _, brokerID := range mmr.brokers {
		replicas = append(replicas, brokerID)
	}

	if len(req.Topics) == 0 {
		// No topic filter: report everything that was configured.
		for topic, partitions := range mmr.leaders {
			for partition, leaderID := range partitions {
				res.AddTopicPartition(topic, partition, leaderID, replicas, replicas, offlineReplicas, ErrNoError)
			}
		}
		for topic, topicErr := range mmr.errors {
			res.AddTopic(topic, topicErr)
		}
		return res
	}

	for _, topic := range req.Topics {
		if partitions, ok := mmr.leaders[topic]; ok {
			for partition, leaderID := range partitions {
				res.AddTopicPartition(topic, partition, leaderID, replicas, replicas, offlineReplicas, ErrNoError)
			}
			continue
		}
		// No leaders configured: fall back to the configured error, if any.
		topicErr, ok := mmr.errors[topic]
		if !ok {
			topicErr = ErrUnknownTopicOrPartition
		}
		res.AddTopic(topic, topicErr)
	}
	return res
}

// MockOffsetResponse is an `OffsetResponse` builder. offsets maps
// topic -> partition -> requested time -> offset to return; t is the
// TestReporter used to report lookups that find no configured offset.
type MockOffsetResponse struct {
	offsets map[string]map[int32]map[int64]int64
	t       TestReporter
}

// NewMockOffsetResponse returns a builder with no offsets configured.
func NewMockOffsetResponse(t TestReporter) *MockOffsetResponse {
	mor := &MockOffsetResponse{t: t}
	mor.offsets = make(map[string]map[int32]map[int64]int64)
	return mor
}

// SetOffset configures offset as the answer for the given topic, partition
// and requested time, creating intermediate tables as needed, and returns mor
// so that calls can be chained.
func (mor *MockOffsetResponse) SetOffset(topic string, partition int32, time, offset int64) *MockOffsetResponse {
	if mor.offsets[topic] == nil {
		mor.offsets[topic] = make(map[int32]map[int64]int64)
	}
	if mor.offsets[topic][partition] == nil {
		mor.offsets[topic][partition] = make(map[int64]int64)
	}
	mor.offsets[topic][partition][time] = offset
	return mor
}

// For builds an OffsetResponse with one entry per topic partition block in
// the request, using the offset configured for the block's timestamp. Missing
// configuration is reported through the TestReporter by getOffset. It panics
// if reqBody is not an *OffsetRequest.
func (mor *MockOffsetResponse) For(reqBody versionedDecoder) encoderWithHeader {
	req := reqBody.(*OffsetRequest)
	res := &OffsetResponse{Version: req.Version}
	for topic, blocks := range req.blocks {
		for partition, requested := range blocks {
			res.AddTopicPartition(topic, partition, mor.getOffset(topic, partition, requested.timestamp))
		}
	}
	return res
}

// getOffset returns the offset configured for topic, partition and time.
//
// If nothing is configured at some level, exactly one error naming the first
// missing level (topic, then partition, then time) is reported through the
// TestReporter and 0 is returned. Returning right after the first miss avoids
// follow-on "missing partition" / "missing time" reports that are only a
// consequence of the first one.
func (mor *MockOffsetResponse) getOffset(topic string, partition int32, time int64) int64 {
	partitions := mor.offsets[topic]
	if partitions == nil {
		mor.t.Errorf("missing topic: %s", topic)
		return 0
	}
	times := partitions[partition]
	if times == nil {
		mor.t.Errorf("missing partition: %d", partition)
		return 0
	}
	offset, ok := times[time]
	if !ok {
		mor.t.Errorf("missing time: %d", time)
	}
	return offset
}

// mockMessage is a key/value pair stored by MockFetchResponse and later
// turned into a message of a `FetchResponse`.
type mockMessage struct {
	key Encoder
	msg Encoder
}

// newMockMessage bundles key and msg into a mockMessage.
func newMockMessage(key, msg Encoder) *mockMessage {
	m := new(mockMessage)
	m.key, m.msg = key, msg
	return m
}

// MockFetchResponse is a `FetchResponse` builder.
//
// messages maps topic -> partition -> offset -> message and is accessed under
// messagesLock by SetMessageWithKey, getMessage and getMessageCount.
// highWaterMarks maps topic -> partition -> high water mark offset. batchSize
// caps the number of messages For adds per requested partition. t is the
// TestReporter passed to the constructor.
type MockFetchResponse struct {
	messages       map[string]map[int32]map[int64]*mockMessage
	messagesLock   *sync.RWMutex
	highWaterMarks map[string]map[int32]int64
	t              TestReporter
	batchSize      int
}

// NewMockFetchResponse returns a builder with no messages or high water marks
// configured. batchSize is the maximum number of messages returned per
// requested partition by For.
func NewMockFetchResponse(t TestReporter, batchSize int) *MockFetchResponse {
	mfr := &MockFetchResponse{t: t, batchSize: batchSize}
	mfr.messages = make(map[string]map[int32]map[int64]*mockMessage)
	mfr.messagesLock = new(sync.RWMutex)
	mfr.highWaterMarks = make(map[string]map[int32]int64)
	return mfr
}

// SetMessage stores msg with a nil key at the given topic, partition and
// offset. It delegates to SetMessageWithKey and returns mfr so that calls can
// be chained.
func (mfr *MockFetchResponse) SetMessage(topic string, partition int32, offset int64, msg Encoder) *MockFetchResponse {
	return mfr.SetMessageWithKey(topic, partition, offset, nil, msg)
}

// SetMessageWithKey stores the key/msg pair at the given topic, partition and
// offset, replacing any message already stored there. The update happens
// under the write side of messagesLock. It returns mfr so that calls can be
// chained.
func (mfr *MockFetchResponse) SetMessageWithKey(topic string, partition int32, offset int64, key, msg Encoder) *MockFetchResponse {
	mfr.messagesLock.Lock()
	defer mfr.messagesLock.Unlock()

	// Create the intermediate tables on first use.
	if mfr.messages[topic] == nil {
		mfr.messages[topic] = make(map[int32]map[int64]*mockMessage)
	}
	if mfr.messages[topic][partition] == nil {
		mfr.messages[topic][partition] = make(map[int64]*mockMessage)
	}
	mfr.messages[topic][partition][offset] = newMockMessage(key, msg)
	return mfr
}

// SetHighWaterMark sets the high water mark offset reported for the given
// topic partition and returns mfr so that calls can be chained. Unlike
// SetMessageWithKey it does not take messagesLock.
func (mfr *MockFetchResponse) SetHighWaterMark(topic string, partition int32, offset int64) *MockFetchResponse {
	if mfr.highWaterMarks[topic] == nil {
		mfr.highWaterMarks[topic] = make(map[int32]int64)
	}
	mfr.highWaterMarks[topic][partition] = offset
	return mfr
}

// For builds a FetchResponse for reqBody, which must be a *FetchRequest (it
// panics otherwise).
//
// For every requested topic partition it scans offsets upwards from the
// requested fetch offset and adds the stored messages it finds. The scan
// stops after batchSize messages or after as many offsets as there are
// messages stored for the partition, whichever comes first. A partition that
// ends up without any message still gets a block (with ErrNoError), and every
// block carries the configured high water mark.
func (mfr *MockFetchResponse) For(reqBody versionedDecoder) encoderWithHeader {
	req := reqBody.(*FetchRequest)
	res := &FetchResponse{Version: req.Version}

	for topic, partitions := range req.blocks {
		for partition, requested := range partitions {
			next := requested.fetchOffset
			// Offsets at or beyond this bound are never inspected.
			limit := next + int64(mfr.getMessageCount(topic, partition))
			for added := 0; added < mfr.batchSize && next < limit; next++ {
				stored := mfr.getMessage(topic, partition, next)
				if stored == nil {
					// Gap in the stored offsets: skip without counting.
					continue
				}
				res.AddMessage(topic, partition, stored.key, stored.msg, next)
				added++
			}

			fb := res.GetBlock(topic, partition)
			if fb == nil {
				// Nothing was added above; create the block explicitly.
				res.AddError(topic, partition, ErrNoError)
				fb = res.GetBlock(topic, partition)
			}
			fb.HighWaterMarkOffset = mfr.getHighWaterMark(topic, partition)
		}
	}
	return res
}

// getMessage returns the message stored at topic/partition/offset, or nil if
// there is none. The lookup runs under the read side of messagesLock.
func (mfr *MockFetchResponse) getMessage(topic string, partition int32, offset int64) *mockMessage {
	mfr.messagesLock.RLock()
	defer mfr.messagesLock.RUnlock()
	// Indexing a nil map yields the zero value, so absent topics and
	// partitions fall through to a nil message.
	return mfr.messages[topic][partition][offset]
}

// getMessageCount returns how many messages are stored for topic/partition,
// or 0 if there are none. The lookup runs under the read side of
// messagesLock.
func (mfr *MockFetchResponse) getMessageCount(topic string, partition int32) int {
	mfr.messagesLock.RLock()
	defer mfr.messagesLock.RUnlock()
	// len of a nil map is 0, which covers unknown topics and partitions.
	return len(mfr.messages[topic][partition])
}

// getHighWaterMark returns the high water mark configured for
// topic/partition, or 0 if none was set. It does not take messagesLock.
func (mfr *MockFetchResponse) getHighWaterMark(topic string, partition int32) int64 {
	// Indexing a nil map yields 0, which covers unknown topics.
	return mfr.highWaterMarks[topic][partition]
}

// MockConsumerMetadataResponse is a `ConsumerMetadataResponse` builder.
// coordinators maps a consumer group to either a *MockBroker (set by
// SetCoordinator) or a KError (set by SetError); t is the TestReporter passed
// to the constructor.
type MockConsumerMetadataResponse struct {
	coordinators map[string]interface{}
	t            TestReporter
}

// NewMockConsumerMetadataResponse returns a builder with no coordinators or
// errors configured.
func NewMockConsumerMetadataResponse(t TestReporter) *MockConsumerMetadataResponse {
	mr := &MockConsumerMetadataResponse{t: t}
	mr.coordinators = make(map[string]interface{})
	return mr
}

// SetCoordinator makes broker the coordinator returned for group, replacing
// any coordinator or error set earlier for that group, and returns mr so that
// calls can be chained.
func (mr *MockConsumerMetadataResponse) SetCoordinator(group string, broker *MockBroker) *MockConsumerMetadataResponse {
	mr.coordinators[group] = broker
	return mr
}

// SetError makes kerror the error returned for group, replacing any
// coordinator or error set earlier for that group, and returns mr so that
// calls can be chained.
func (mr *MockConsumerMetadataResponse) SetError(group string, kerror KError) *MockConsumerMetadataResponse {
	mr.coordinators[group] = kerror
	return mr
}

// For builds a ConsumerMetadataResponse for the request's consumer group. A
// configured broker becomes the response's Coordinator, a configured error
// becomes its Err, and a group with nothing configured yields a response with
// neither set. It panics if reqBody is not a *ConsumerMetadataRequest.
func (mr *MockConsumerMetadataResponse) For(reqBody versionedDecoder) encoderWithHeader {
	req := reqBody.(*ConsumerMetadataRequest)
	res := &ConsumerMetadataResponse{Version: req.version()}
	switch configured := mr.coordinators[req.ConsumerGroup].(type) {
	case *MockBroker:
		res.Coordinator = &Broker{id: configured.BrokerID(), addr: configured.Addr()}
	case KError:
		res.Err = configured
	}
	return res
}

// MockFindCoordinatorResponse is a `FindCoordinatorResponse` builder.
// groupCoordinators and transCoordinators map a coordinator key to either a
// *MockBroker or a KError for the CoordinatorGroup and CoordinatorTransaction
// types respectively; t is the TestReporter passed to the constructor.
type MockFindCoordinatorResponse struct {
	groupCoordinators map[string]interface{}
	transCoordinators map[string]interface{}
	t                 TestReporter
}

// NewMockFindCoordinatorResponse returns a builder with no group or
// transaction coordinators configured.
func NewMockFindCoordinatorResponse(t TestReporter) *MockFindCoordinatorResponse {
	mr := &MockFindCoordinatorResponse{t: t}
	mr.groupCoordinators = make(map[string]interface{})
	mr.transCoordinators = make(map[string]interface{})
	return mr
}

// SetCoordinator makes broker the coordinator returned for the key group of
// the given coordinatorType. Coordinator types other than CoordinatorGroup
// and CoordinatorTransaction are ignored. It returns mr so that calls can be
// chained.
func (mr *MockFindCoordinatorResponse) SetCoordinator(coordinatorType CoordinatorType, group string, broker *MockBroker) *MockFindCoordinatorResponse {
	var table map[string]interface{}
	switch coordinatorType {
	case CoordinatorGroup:
		table = mr.groupCoordinators
	case CoordinatorTransaction:
		table = mr.transCoordinators
	default:
		return mr
	}
	table[group] = broker
	return mr
}

// SetError makes kerror the error returned for the key group of the given
// coordinatorType. Coordinator types other than CoordinatorGroup and
// CoordinatorTransaction are ignored. It returns mr so that calls can be
// chained.
func (mr *MockFindCoordinatorResponse) SetError(coordinatorType CoordinatorType, group string, kerror KError) *MockFindCoordinatorResponse {
	var table map[string]interface{}
	switch coordinatorType {
	case CoordinatorGroup:
		table = mr.groupCoordinators
	case CoordinatorTransaction:
		table = mr.transCoordinators
	default:
		return mr
	}
	table[group] = kerror
	return mr
}

// For builds a FindCoordinatorResponse by looking up the request's
// CoordinatorKey in the table matching its CoordinatorType. A configured
// broker becomes the response's Coordinator, a configured error becomes its
// Err, and anything else yields a response with neither set. It panics if
// reqBody is not a *FindCoordinatorRequest.
func (mr *MockFindCoordinatorResponse) For(reqBody versionedDecoder) encoderWithHeader {
	req := reqBody.(*FindCoordinatorRequest)
	res := &FindCoordinatorResponse{Version: req.version()}

	// Stays nil for unknown coordinator types; reading a nil map is safe and
	// simply finds nothing.
	var table map[string]interface{}
	switch req.CoordinatorType {
	case CoordinatorGroup:
		table = mr.groupCoordinators
	case CoordinatorTransaction:
		table = mr.transCoordinators
	}

	switch configured := table[req.CoordinatorKey].(type) {
	case *MockBroker:
		res.Coordinator = &Broker{id: configured.BrokerID(), addr: configured.Addr()}
	case KError:
		res.Err = configured
	}
	return res
}

// MockOffsetCommitResponse is a `OffsetCommitResponse` builder. errors maps
// group -> topic -> partition -> error to report and is created lazily by
// SetError; t is the TestReporter passed to the constructor.
type MockOffsetCommitResponse struct {
	errors map[string]map[string]map[int32]KError
	t      TestReporter
}

// NewMockOffsetCommitResponse returns a builder with no errors configured.
func NewMockOffsetCommitResponse(t TestReporter) *MockOffsetCommitResponse {
	mr := new(MockOffsetCommitResponse)
	mr.t = t
	return mr
}

// SetError configures kerror as the result for the given group, topic and
// partition, creating the error tables as needed, and returns mr so that
// calls can be chained.
func (mr *MockOffsetCommitResponse) SetError(group, topic string, partition int32, kerror KError) *MockOffsetCommitResponse {
	if mr.errors == nil {
		mr.errors = make(map[string]map[string]map[int32]KError)
	}
	if mr.errors[group] == nil {
		mr.errors[group] = make(map[string]map[int32]KError)
	}
	if mr.errors[group][topic] == nil {
		mr.errors[group][topic] = make(map[int32]KError)
	}
	mr.errors[group][topic][partition] = kerror
	return mr
}

// For builds an OffsetCommitResponse with one result per topic partition in
// the request: the error configured for the request's consumer group, or
// ErrNoError if none was configured. It panics if reqBody is not an
// *OffsetCommitRequest.
func (mr *MockOffsetCommitResponse) For(reqBody versionedDecoder) encoderWithHeader {
	req := reqBody.(*OffsetCommitRequest)
	res := &OffsetCommitResponse{Version: req.version()}
	for topic, blocks := range req.blocks {
		for partition := range blocks {
			kerror := mr.getError(req.ConsumerGroup, topic, partition)
			res.AddError(topic, partition, kerror)
		}
	}
	return res
}

// getError returns the error configured for group/topic/partition, or
// ErrNoError if nothing was configured at any level.
func (mr *MockOffsetCommitResponse) getError(group, topic string, partition int32) KError {
	// Indexing a nil map is safe, so one chained lookup covers a missing
	// group, topic or partition.
	if kerror, ok := mr.errors[group][topic][partition]; ok {
		return kerror
	}
	return ErrNoError
}

// MockProduceResponse is a `ProduceResponse` builder. version, when greater
// than zero, overrides the response version otherwise taken from the request;
// errors maps topic -> partition -> error to report and is created lazily by
// SetError; t is the TestReporter passed to the constructor.
type MockProduceResponse struct {
	version int16
	errors  map[string]map[int32]KError
	t       TestReporter
}

// NewMockProduceResponse returns a builder with no errors and no version
// override configured.
func NewMockProduceResponse(t TestReporter) *MockProduceResponse {
	mr := new(MockProduceResponse)
	mr.t = t
	return mr
}

// SetVersion sets the version used for generated responses and returns mr so
// that calls can be chained. For only applies the override when it is greater
// than zero.
func (mr *MockProduceResponse) SetVersion(version int16) *MockProduceResponse {
	mr.version = version
	return mr
}

// SetError configures kerror as the result for the given topic partition,
// creating the error tables as needed, and returns mr so that calls can be
// chained.
func (mr *MockProduceResponse) SetError(topic string, partition int32, kerror KError) *MockProduceResponse {
	if mr.errors == nil {
		mr.errors = make(map[string]map[int32]KError)
	}
	if mr.errors[topic] == nil {
		mr.errors[topic] = make(map[int32]KError)
	}
	mr.errors[topic][partition] = kerror
	return mr
}

// For builds a ProduceResponse with one result per topic partition in the
// request: the configured error, or ErrNoError if none was configured. The
// response version is the request's unless a positive version was set with
// SetVersion. It panics if reqBody is not a *ProduceRequest.
func (mr *MockProduceResponse) For(reqBody versionedDecoder) encoderWithHeader {
	req := reqBody.(*ProduceRequest)

	version := req.version()
	if mr.version > 0 {
		version = mr.version
	}
	res := &ProduceResponse{Version: version}

	for topic, partitions := range req.records {
		for partition := range partitions {
			kerror := mr.getError(topic, partition)
			res.AddTopicPartition(topic, partition, kerror)
		}
	}
	return res
}

// getError returns the error configured for topic/partition, or ErrNoError if
// nothing was configured.
func (mr *MockProduceResponse) getError(topic string, partition int32) KError {
	// Indexing a nil map is safe, so one chained lookup covers a missing
	// topic or partition.
	if kerror, ok := mr.errors[topic][partition]; ok {
		return kerror
	}
	return ErrNoError
}

// MockOffsetFetchResponse is a `OffsetFetchResponse` builder. offsets maps
// group -> topic -> partition -> response block and is created lazily by
// SetOffset; error is the response-level error applied by For for versions
// 2 and above; t is the TestReporter passed to the constructor.
type MockOffsetFetchResponse struct {
	offsets map[string]map[string]map[int32]*OffsetFetchResponseBlock
	error   KError
	t       TestReporter
}

// NewMockOffsetFetchResponse returns a builder with no offsets or error
// configured.
func NewMockOffsetFetchResponse(t TestReporter) *MockOffsetFetchResponse {
	mr := new(MockOffsetFetchResponse)
	mr.t = t
	return mr
}

// SetOffset configures the block returned for the given group, topic and
// partition from offset, metadata and kerror, creating the tables as needed.
// The block's second positional field is always set to 0. It returns mr so
// that calls can be chained.
func (mr *MockOffsetFetchResponse) SetOffset(group, topic string, partition int32, offset int64, metadata string, kerror KError) *MockOffsetFetchResponse {
	if mr.offsets == nil {
		mr.offsets = make(map[string]map[string]map[int32]*OffsetFetchResponseBlock)
	}
	if mr.offsets[group] == nil {
		mr.offsets[group] = make(map[string]map[int32]*OffsetFetchResponseBlock)
	}
	if mr.offsets[group][topic] == nil {
		mr.offsets[group][topic] = make(map[int32]*OffsetFetchResponseBlock)
	}
	mr.offsets[group][topic][partition] = &OffsetFetchResponseBlock{offset, 0, metadata, kerror}
	return mr
}

// SetError sets the response-level error and returns mr so that calls can be
// chained. For only copies it into responses of version 2 or higher.
func (mr *MockOffsetFetchResponse) SetError(kerror KError) *MockOffsetFetchResponse {
	mr.error = kerror
	return mr
}

// For builds an OffsetFetchResponse containing every block configured for the
// request's consumer group, regardless of which partitions were requested.
// For versions 2 and above the configured response-level error is set as
// well. It panics if reqBody is not an *OffsetFetchRequest.
func (mr *MockOffsetFetchResponse) For(reqBody versionedDecoder) encoderWithHeader {
	req := reqBody.(*OffsetFetchRequest)
	res := &OffsetFetchResponse{Version: req.Version}

	configured := mr.offsets[req.ConsumerGroup]
	for topic, blocks := range configured {
		for partition, block := range blocks {
			res.AddBlock(topic, partition, block)
		}
	}

	if res.Version < 2 {
		return res
	}
	res.Err = mr.error
	return res
}

// MockCreateTopicsResponse is a `CreateTopicsResponse` builder. t is the
// TestReporter passed to the constructor.
type MockCreateTopicsResponse struct {
	t TestReporter
}

// NewMockCreateTopicsResponse returns a new MockCreateTopicsResponse.
func NewMockCreateTopicsResponse(t TestReporter) *MockCreateTopicsResponse {
	mr := new(MockCreateTopicsResponse)
	mr.t = t
	return mr
}

// For builds a CreateTopicsResponse with one entry per topic in the request.
// Topics succeed with ErrNoError, except that for versions 1 and above a
// topic whose name starts with "_" is rejected with
// ErrTopicAuthorizationFailed and an explanatory message. It panics if
// reqBody is not a *CreateTopicsRequest.
func (mr *MockCreateTopicsResponse) For(reqBody versionedDecoder) encoderWithHeader {
	req := reqBody.(*CreateTopicsRequest)
	res := &CreateTopicsResponse{
		Version:     req.Version,
		TopicErrors: make(map[string]*TopicError),
	}

	for topic := range req.TopicDetails {
		result := &TopicError{Err: ErrNoError}
		if res.Version >= 1 && strings.HasPrefix(topic, "_") {
			msg := "insufficient permissions to create topic with reserved prefix"
			result = &TopicError{
				Err:    ErrTopicAuthorizationFailed,
				ErrMsg: &msg,
			}
		}
		res.TopicErrors[topic] = result
	}
	return res
}

// MockDeleteTopicsResponse is a `DeleteTopicsResponse` builder. error is the
// error code reported for every topic in a request; t is the TestReporter
// passed to the constructor.
type MockDeleteTopicsResponse struct {
	t     TestReporter
	error KError
}

// NewMockDeleteTopicsResponse returns a builder whose per-topic error is the
// zero KError until SetError is called.
func NewMockDeleteTopicsResponse(t TestReporter) *MockDeleteTopicsResponse {
	mr := new(MockDeleteTopicsResponse)
	mr.t = t
	return mr
}

// For builds a DeleteTopicsResponse that reports the configured error for
// every topic in the request. The response version is finally set from the
// request's Version field. It panics if reqBody is not a
// *DeleteTopicsRequest.
func (mr *MockDeleteTopicsResponse) For(reqBody versionedDecoder) encoderWithHeader {
	req := reqBody.(*DeleteTopicsRequest)
	res := &DeleteTopicsResponse{
		Version:         req.version(),
		TopicErrorCodes: make(map[string]KError),
	}

	for _, name := range req.Topics {
		res.TopicErrorCodes[name] = mr.error
	}
	res.Version = req.Version
	return res
}

// SetError sets the error reported for every topic in later responses and
// returns mr so that calls can be chained.
func (mr *MockDeleteTopicsResponse) SetError(kerror KError) *MockDeleteTopicsResponse {
	mr.error = kerror
	return mr
}

// MockCreatePartitionsResponse is a `CreatePartitionsResponse` builder. t is
// the TestReporter passed to the constructor.
type MockCreatePartitionsResponse struct {
	t TestReporter
}

// NewMockCreatePartitionsResponse returns a new MockCreatePartitionsResponse.
func NewMockCreatePartitionsResponse(t TestReporter) *MockCreatePartitionsResponse {
	mr := new(MockCreatePartitionsResponse)
	mr.t = t
	return mr
}

// For builds a CreatePartitionsResponse with one entry per topic in the
// request. Topics succeed with ErrNoError, except that a topic whose name
// starts with "_" is rejected with ErrTopicAuthorizationFailed and an
// explanatory message. It panics if reqBody is not a
// *CreatePartitionsRequest.
func (mr *MockCreatePartitionsResponse) For(reqBody versionedDecoder) encoderWithHeader {
	req := reqBody.(*CreatePartitionsRequest)
	res := &CreatePartitionsResponse{
		Version:              req.version(),
		TopicPartitionErrors: make(map[string]*TopicPartitionError),
	}

	for topic := range req.TopicPartitions {
		if !strings.HasPrefix(topic, "_") {
			res.TopicPartitionErrors[topic] = &TopicPartitionError{Err: ErrNoError}
			continue
		}
		msg := "insufficient permissions to create partition on topic with reserved prefix"
		res.TopicPartitionErrors[topic] = &TopicPartitionError{
			Err:    ErrTopicAuthorizationFailed,
			ErrMsg: &msg,
		}
	}
	return res
}

// MockAlterPartitionReassignmentsResponse is an
// `AlterPartitionReassignmentsResponse` builder. t is the TestReporter passed
// to the constructor.
type MockAlterPartitionReassignmentsResponse struct {
	t TestReporter
}

// NewMockAlterPartitionReassignmentsResponse returns a new
// MockAlterPartitionReassignmentsResponse.
func NewMockAlterPartitionReassignmentsResponse(t TestReporter) *MockAlterPartitionReassignmentsResponse {
	mr := new(MockAlterPartitionReassignmentsResponse)
	mr.t = t
	return mr
}

// For builds an AlterPartitionReassignmentsResponse that only carries the
// request's version. It panics if reqBody is not an
// *AlterPartitionReassignmentsRequest.
func (mr *MockAlterPartitionReassignmentsResponse) For(reqBody versionedDecoder) encoderWithHeader {
	req := reqBody.(*AlterPartitionReassignmentsRequest)
	return &AlterPartitionReassignmentsResponse{Version: req.version()}
}

// MockListPartitionReassignmentsResponse is a
// `ListPartitionReassignmentsResponse` builder. t is the TestReporter passed
// to the constructor.
type MockListPartitionReassignmentsResponse struct {
	t TestReporter
}

// NewMockListPartitionReassignmentsResponse returns a new
// MockListPartitionReassignmentsResponse.
func NewMockListPartitionReassignmentsResponse(t TestReporter) *MockListPartitionReassignmentsResponse {
	mr := new(MockListPartitionReassignmentsResponse)
	mr.t = t
	return mr
}

// For builds a ListPartitionReassignmentsResponse with one block per topic
// partition in the request. Every block is added with the same fixed broker
// ID lists: {0}, {1} and {2}, in AddBlock's argument order. It panics if
// reqBody is not a *ListPartitionReassignmentsRequest.
func (mr *MockListPartitionReassignmentsResponse) For(reqBody versionedDecoder) encoderWithHeader {
	req := reqBody.(*ListPartitionReassignmentsRequest)
	res := &ListPartitionReassignmentsResponse{Version: req.version()}

	for topic, requested := range req.blocks {
		for _, partition := range requested {
			// Fresh slices per block so that blocks never share storage.
			res.AddBlock(topic, partition, []int32{0}, []int32{1}, []int32{2})
		}
	}

	return res
}

// MockElectLeadersResponse is an `ElectLeadersResponse` builder. t is the
// TestReporter passed to the constructor.
type MockElectLeadersResponse struct {
	t TestReporter
}

// NewMockElectLeadersResponse returns a new MockElectLeadersResponse.
func NewMockElectLeadersResponse(t TestReporter) *MockElectLeadersResponse {
	mr := new(MockElectLeadersResponse)
	mr.t = t
	return mr
}

// For builds an ElectLeadersResponse that reports ErrNoError for every topic
// partition in the request. It panics if reqBody is not an
// *ElectLeadersRequest.
//
// All partitions of a topic are collected into a single per-topic map, so a
// request naming several partitions of one topic gets a result for each of
// them. Topics listed without any partition get no entry in the response.
func (mr *MockElectLeadersResponse) For(reqBody versionedDecoder) encoderWithHeader {
	req := reqBody.(*ElectLeadersRequest)
	res := &ElectLeadersResponse{Version: req.version(), ReplicaElectionResults: map[string]map[int32]*PartitionResult{}}

	for topic, partitions := range req.TopicPartitions {
		if len(partitions) == 0 {
			continue
		}
		// Build the per-topic map once; assigning a fresh map for each
		// partition would discard the results of the earlier ones.
		results := make(map[int32]*PartitionResult, len(partitions))
		for _, partition := range partitions {
			results[partition] = &PartitionResult{ErrorCode: ErrNoError}
		}
		res.ReplicaElectionResults[topic] = results
	}
	return res
}

// MockDeleteRecordsResponse is a `DeleteRecordsResponse` builder. t is the
// TestReporter passed to the constructor.
type MockDeleteRecordsResponse struct {
	t TestReporter
}

// NewMockDeleteRecordsResponse returns a new MockDeleteRecordsResponse.
func NewMockDeleteRecordsResponse(t TestReporter) *MockDeleteRecordsResponse {
	mr := new(MockDeleteRecordsResponse)
	mr.t = t
	return mr
}

// For builds a DeleteRecordsResponse that reports ErrNoError for every topic
// partition named in the request. A topic without partition offsets still
// gets an entry, with an empty partition map. It panics if reqBody is not a
// *DeleteRecordsRequest.
func (mr *MockDeleteRecordsResponse) For(reqBody versionedDecoder) encoderWithHeader {
	req := reqBody.(*DeleteRecordsRequest)
	res := &DeleteRecordsResponse{
		Version: req.version(),
		Topics:  make(map[string]*DeleteRecordsResponseTopic),
	}

	for topic, requested := range req.Topics {
		result := &DeleteRecordsResponseTopic{
			Partitions: make(map[int32]*DeleteRecordsResponsePartition),
		}
		for partition := range requested.PartitionOffsets {
			result.Partitions[partition] = &DeleteRecordsResponsePartition{Err: ErrNoError}
		}
		res.Topics[topic] = result
	}
	return res
}

// MockDescribeConfigsResponse is a `DescribeConfigsResponse` builder that
// answers with a fixed set of config entries per resource type. t is the
// TestReporter passed to the constructor.
type MockDescribeConfigsResponse struct {
	t TestReporter
}

// NewMockDescribeConfigsResponse returns a new MockDescribeConfigsResponse.
func NewMockDescribeConfigsResponse(t TestReporter) *MockDescribeConfigsResponse {
	mr := new(MockDescribeConfigsResponse)
	mr.t = t
	return mr
}

// For builds a DescribeConfigsResponse with canned config entries for each
// requested resource, chosen by resource type:
//
//   - BrokerResource: "min.insync.replicas" = "2"
//   - BrokerLoggerResource: "kafka.controller.KafkaController" = "DEBUG"
//   - TopicResource: "max.message.bytes", "retention.ms" and a sensitive
//     "password" entry
//
// Resources of any other type are left out of the response. For request
// versions above 0 the topic entries also carry synonyms, and
// "max.message.bytes" gets its Source set to SourceDefault instead of having
// Default set to true. Only the resource name and its configs are filled in
// on each ResourceResponse. It panics if reqBody is not a
// *DescribeConfigsRequest.
func (mr *MockDescribeConfigsResponse) For(reqBody versionedDecoder) encoderWithHeader {
	req := reqBody.(*DescribeConfigsRequest)
	res := &DescribeConfigsResponse{Version: req.Version}

	// Versions above 0 additionally carry the config source and synonyms.
	extended := req.Version > 0

	for _, resource := range req.Resources {
		var entries []*ConfigEntry
		switch resource.Type {
		case BrokerResource:
			entries = []*ConfigEntry{
				{Name: "min.insync.replicas", Value: "2"},
			}
		case BrokerLoggerResource:
			entries = []*ConfigEntry{
				{Name: "kafka.controller.KafkaController", Value: "DEBUG"},
			}
		case TopicResource:
			maxMessageBytes := &ConfigEntry{
				Name:    "max.message.bytes",
				Value:   "1000000",
				Default: !extended,
			}
			retentionMs := &ConfigEntry{
				Name:  "retention.ms",
				Value: "5000",
			}
			if extended {
				maxMessageBytes.Source = SourceDefault
				maxMessageBytes.Synonyms = []*ConfigSynonym{
					{ConfigName: "max.message.bytes", ConfigValue: "500000"},
				}
				retentionMs.Synonyms = []*ConfigSynonym{
					{ConfigName: "log.retention.ms", ConfigValue: "2500"},
				}
			}
			password := &ConfigEntry{
				Name:      "password",
				Value:     "12345",
				Sensitive: true,
			}
			entries = []*ConfigEntry{maxMessageBytes, retentionMs, password}
		default:
			// Other resource types produce no entry in the response.
			continue
		}
		res.Resources = append(res.Resources, &ResourceResponse{
			Name:    resource.Name,
			Configs: entries,
		})
	}
	return res
}

// MockDescribeConfigsResponseWithErrorCode is a `DescribeConfigsResponse`
// builder that answers every requested resource with a fixed error code. t is
// the TestReporter passed to the constructor.
type MockDescribeConfigsResponseWithErrorCode struct {
	t TestReporter
}

// NewMockDescribeConfigsResponseWithErrorCode returns a new
// MockDescribeConfigsResponseWithErrorCode.
func NewMockDescribeConfigsResponseWithErrorCode(t TestReporter) *MockDescribeConfigsResponseWithErrorCode {
	mr := new(MockDescribeConfigsResponseWithErrorCode)
	mr.t = t
	return mr
}

// For builds a DescribeConfigsResponse with one entry per requested resource,
// echoing its name and type and carrying error code 83 with an empty error
// message. It panics if reqBody is not a *DescribeConfigsRequest.
func (mr *MockDescribeConfigsResponseWithErrorCode) For(reqBody versionedDecoder) encoderWithHeader {
	req := reqBody.(*DescribeConfigsRequest)
	res := &DescribeConfigsResponse{Version: req.Version}

	for _, resource := range req.Resources {
		failed := &ResourceResponse{
			Name:      resource.Name,
			Type:      resource.Type,
			ErrorCode: 83,
		}
		res.Resources = append(res.Resources, failed)
	}
	return res
}

// MockAlterConfigsResponse is an `AlterConfigsResponse` builder that reports
// no error for any resource. t is the TestReporter passed to the constructor.
type MockAlterConfigsResponse struct {
	t TestReporter
}

// NewMockAlterConfigsResponse returns a new MockAlterConfigsResponse.
func NewMockAlterConfigsResponse(t TestReporter) *MockAlterConfigsResponse {
	mr := new(MockAlterConfigsResponse)
	mr.t = t
	return mr
}

// For builds an AlterConfigsResponse with one entry per requested resource,
// echoing its name and type with no error code and an empty error message. It
// panics if reqBody is not an *AlterConfigsRequest.
func (mr *MockAlterConfigsResponse) For(reqBody versionedDecoder) encoderWithHeader {
	req := reqBody.(*AlterConfigsRequest)
	res := &AlterConfigsResponse{Version: req.version()}

	for _, resource := range req.Resources {
		altered := &AlterConfigsResourceResponse{
			Name: resource.Name,
			Type: resource.Type,
		}
		res.Resources = append(res.Resources, altered)
	}
	return res
}

// MockAlterConfigsResponseWithErrorCode is an `AlterConfigsResponse` builder
// that answers every requested resource with a fixed error code. t is the
// TestReporter passed to the constructor.
type MockAlterConfigsResponseWithErrorCode struct {
	t TestReporter
}

// NewMockAlterConfigsResponseWithErrorCode returns a new
// MockAlterConfigsResponseWithErrorCode.
func NewMockAlterConfigsResponseWithErrorCode(t TestReporter) *MockAlterConfigsResponseWithErrorCode {
	mr := new(MockAlterConfigsResponseWithErrorCode)
	mr.t = t
	return mr
}

// For builds an AlterConfigsResponse with one entry per requested resource,
// echoing its name and type and carrying error code 83 with an empty error
// message. It panics if reqBody is not an *AlterConfigsRequest.
func (mr *MockAlterConfigsResponseWithErrorCode) For(reqBody versionedDecoder) encoderWithHeader {
	req := reqBody.(*AlterConfigsRequest)
	res := &AlterConfigsResponse{Version: req.version()}

	for _, resource := range req.Resources {
		failed := &AlterConfigsResourceResponse{
			Name:      resource.Name,
			Type:      resource.Type,
			ErrorCode: 83,
		}
		res.Resources = append(res.Resources, failed)
	}
	return res
}

// MockIncrementalAlterConfigsResponse is an `IncrementalAlterConfigsResponse`
// builder that reports no error for any resource. t is the TestReporter
// passed to the constructor.
type MockIncrementalAlterConfigsResponse struct {
	t TestReporter
}

// NewMockIncrementalAlterConfigsResponse returns a new
// MockIncrementalAlterConfigsResponse.
func NewMockIncrementalAlterConfigsResponse(t TestReporter) *MockIncrementalAlterConfigsResponse {
	mr := new(MockIncrementalAlterConfigsResponse)
	mr.t = t
	return mr
}

// For builds an IncrementalAlterConfigsResponse with one entry per requested
// resource, echoing its name and type with no error code and an empty error
// message. It panics if reqBody is not an *IncrementalAlterConfigsRequest.
func (mr *MockIncrementalAlterConfigsResponse) For(reqBody versionedDecoder) encoderWithHeader {
	req := reqBody.(*IncrementalAlterConfigsRequest)
	res := &IncrementalAlterConfigsResponse{Version: req.version()}

	for _, resource := range req.Resources {
		altered := &AlterConfigsResourceResponse{
			Name: resource.Name,
			Type: resource.Type,
		}
		res.Resources = append(res.Resources, altered)
	}
	return res
}

// MockIncrementalAlterConfigsResponseWithErrorCode is an
// `IncrementalAlterConfigsResponse` builder that answers every requested
// resource with a fixed error code. t is the TestReporter passed to the
// constructor.
type MockIncrementalAlterConfigsResponseWithErrorCode struct {
	t TestReporter
}

// NewMockIncrementalAlterConfigsResponseWithErrorCode returns a new
// MockIncrementalAlterConfigsResponseWithErrorCode.
func NewMockIncrementalAlterConfigsResponseWithErrorCode(t TestReporter) *MockIncrementalAlterConfigsResponseWithErrorCode {
	mr := new(MockIncrementalAlterConfigsResponseWithErrorCode)
	mr.t = t
	return mr
}

// For builds an IncrementalAlterConfigsResponse with one entry per requested
// resource, echoing its name and type and carrying error code 83 with an
// empty error message. It panics if reqBody is not an
// *IncrementalAlterConfigsRequest.
func (mr *MockIncrementalAlterConfigsResponseWithErrorCode) For(reqBody versionedDecoder) encoderWithHeader {
	req := reqBody.(*IncrementalAlterConfigsRequest)
	res := &IncrementalAlterConfigsResponse{Version: req.version()}

	for _, resource := range req.Resources {
		failed := &AlterConfigsResourceResponse{
			Name:      resource.Name,
			Type:      resource.Type,
			ErrorCode: 83,
		}
		res.Resources = append(res.Resources, failed)
	}
	return res
}

// MockCreateAclsResponse is a `CreateAclsResponse` builder that reports
// success for every ACL creation. t is the TestReporter passed to the
// constructor.
type MockCreateAclsResponse struct {
	t TestReporter
}

// NewMockCreateAclsResponse returns a new MockCreateAclsResponse.
func NewMockCreateAclsResponse(t TestReporter) *MockCreateAclsResponse {
	mr := new(MockCreateAclsResponse)
	mr.t = t
	return mr
}

// For builds a CreateAclsResponse with one ErrNoError result per ACL creation
// in the request. It panics if reqBody is not a *CreateAclsRequest.
func (mr *MockCreateAclsResponse) For(reqBody versionedDecoder) encoderWithHeader {
	req := reqBody.(*CreateAclsRequest)
	res := &CreateAclsResponse{Version: req.version()}

	for i := 0; i < len(req.AclCreations); i++ {
		created := &AclCreationResponse{Err: ErrNoError}
		res.AclCreationResponses = append(res.AclCreationResponses, created)
	}
	return res
}

// MockCreateAclsResponseError is a `CreateAclsResponse` builder that reports
// ErrInvalidRequest for every ACL creation. t is the TestReporter passed to
// the constructor.
type MockCreateAclsResponseError struct {
	t TestReporter
}

// NewMockCreateAclsResponseWithError returns a new
// MockCreateAclsResponseError.
func NewMockCreateAclsResponseWithError(t TestReporter) *MockCreateAclsResponseError {
	mr := new(MockCreateAclsResponseError)
	mr.t = t
	return mr
}

// For builds a CreateAclsResponse with one ErrInvalidRequest result per ACL
// creation in the request. It panics if reqBody is not a *CreateAclsRequest.
func (mr *MockCreateAclsResponseError) For(reqBody versionedDecoder) encoderWithHeader {
	req := reqBody.(*CreateAclsRequest)
	res := &CreateAclsResponse{Version: req.version()}

	for i := 0; i < len(req.AclCreations); i++ {
		rejected := &AclCreationResponse{Err: ErrInvalidRequest}
		res.AclCreationResponses = append(res.AclCreationResponses, rejected)
	}
	return res
}

// MockListAclsResponse is a `DescribeAclsResponse` builder that answers a
// DescribeAclsRequest with a single ACL derived from the request's filter. t
// is the TestReporter passed to the constructor.
type MockListAclsResponse struct {
	t TestReporter
}

// NewMockListAclsResponse returns a new MockListAclsResponse.
func NewMockListAclsResponse(t TestReporter) *MockListAclsResponse {
	mr := new(MockListAclsResponse)
	mr.t = t
	return mr
}

// For builds a DescribeAclsResponse holding exactly one resource with exactly
// one ACL, both derived from the request filter. It panics if reqBody is not
// a *DescribeAclsRequest.
//
// The resource copies the filter's resource type, pattern type and, when
// given, resource name. The ACL copies the filter's operation; the host
// defaults to "*" and the principal to "User:test" when the filter leaves
// them nil, and a permission type of AclPermissionAny is reported as
// AclPermissionAllow. The response version is finally set from the request's
// Version field.
func (mr *MockListAclsResponse) For(reqBody versionedDecoder) encoderWithHeader {
	req := reqBody.(*DescribeAclsRequest)
	res := &DescribeAclsResponse{Version: req.version()}
	res.Err = ErrNoError

	// Resolve the ACL fields, defaulting whatever the filter left open.
	host, principal := "*", "User:test"
	if req.Host != nil {
		host = *req.Host
	}
	if req.Principal != nil {
		principal = *req.Principal
	}
	permissionType := req.PermissionType
	if permissionType == AclPermissionAny {
		permissionType = AclPermissionAllow
	}

	matched := &ResourceAcls{}
	matched.Resource.ResourceType = req.ResourceType
	matched.Resource.ResourcePatternType = req.ResourcePatternTypeFilter
	if req.ResourceName != nil {
		matched.Resource.ResourceName = *req.ResourceName
	}
	matched.Acls = append(matched.Acls, &Acl{
		Principal:      principal,
		Host:           host,
		Operation:      req.Operation,
		PermissionType: permissionType,
	})

	res.ResourceAcls = append(res.ResourceAcls, matched)
	res.Version = int16(req.Version)
	return res
}

// MockSaslAuthenticateResponse is a `SaslAuthenticateResponse` builder.
// kerror, saslAuthBytes and sessionLifetimeMs are copied into the response's
// Err, SaslAuthBytes and SessionLifetimeMs fields; t is the TestReporter
// passed to the constructor.
type MockSaslAuthenticateResponse struct {
	t                 TestReporter
	kerror            KError
	saslAuthBytes     []byte
	sessionLifetimeMs int64
}

// NewMockSaslAuthenticateResponse returns a builder with no error, auth bytes
// or session lifetime configured.
func NewMockSaslAuthenticateResponse(t TestReporter) *MockSaslAuthenticateResponse {
	msar := new(MockSaslAuthenticateResponse)
	msar.t = t
	return msar
}

// For builds a SaslAuthenticateResponse from the configured error, auth bytes
// and session lifetime, using the request's version. The auth bytes slice is
// shared with the builder, not copied. It panics if reqBody is not a
// *SaslAuthenticateRequest.
func (msar *MockSaslAuthenticateResponse) For(reqBody versionedDecoder) encoderWithHeader {
	req := reqBody.(*SaslAuthenticateRequest)
	res := new(SaslAuthenticateResponse)
	res.Version = req.version()
	res.Err = msar.kerror
	res.SaslAuthBytes = msar.saslAuthBytes
	res.SessionLifetimeMs = msar.sessionLifetimeMs
	return res
}

// SetError sets the error placed in generated responses and returns msar so
// that calls can be chained.
func (msar *MockSaslAuthenticateResponse) SetError(kerror KError) *MockSaslAuthenticateResponse {
	msar.kerror = kerror
	return msar
}

// SetAuthBytes sets the SASL auth bytes placed in generated responses and
// returns msar so that calls can be chained. The slice is stored as is, not
// copied.
func (msar *MockSaslAuthenticateResponse) SetAuthBytes(saslAuthBytes []byte) *MockSaslAuthenticateResponse {
	msar.saslAuthBytes = saslAuthBytes
	return msar
}

// SetSessionLifetimeMs sets the session lifetime value that For places in the
// response. It returns the receiver so calls can be chained.
func (msar *MockSaslAuthenticateResponse) SetSessionLifetimeMs(sessionLifetimeMs int64) *MockSaslAuthenticateResponse {
	msar.sessionLifetimeMs = sessionLifetimeMs
	return msar
}

// MockDeleteAclsResponse is a `DeleteAclsResponse` builder. Its For method
// answers every filter in the request with a successful filter response.
type MockDeleteAclsResponse struct {
	t TestReporter
}

// MockSaslHandshakeResponse is a `SaslHandshakeResponse` builder. The error
// and the list of enabled mechanisms placed in the response are the values
// configured through SetError and SetEnabledMechanisms.
type MockSaslHandshakeResponse struct {
	enabledMechanisms []string
	kerror            KError
	t                 TestReporter
}

// NewMockSaslHandshakeResponse returns a MockSaslHandshakeResponse that holds
// the supplied test reporter; the error and mechanism list start at their zero
// values.
func NewMockSaslHandshakeResponse(t TestReporter) *MockSaslHandshakeResponse {
	mshr := new(MockSaslHandshakeResponse)
	mshr.t = t
	return mshr
}

// For builds a SaslHandshakeResponse whose version comes from the request and
// whose error and enabled mechanisms come from the mock's configured values.
// reqBody must be a *SaslHandshakeRequest; any other type makes the type
// assertion panic.
func (mshr *MockSaslHandshakeResponse) For(reqBody versionedDecoder) encoderWithHeader {
	req := reqBody.(*SaslHandshakeRequest)
	return &SaslHandshakeResponse{
		Version:           req.version(),
		Err:               mshr.kerror,
		EnabledMechanisms: mshr.enabledMechanisms,
	}
}

// SetError sets the error code that For places in the response. It returns
// the receiver so calls can be chained.
func (mshr *MockSaslHandshakeResponse) SetError(kerror KError) *MockSaslHandshakeResponse {
	mshr.kerror = kerror
	return mshr
}

// SetEnabledMechanisms sets the mechanism list that For places in the
// response. The slice is stored as given, without copying. It returns the
// receiver so calls can be chained.
func (mshr *MockSaslHandshakeResponse) SetEnabledMechanisms(enabledMechanisms []string) *MockSaslHandshakeResponse {
	mshr.enabledMechanisms = enabledMechanisms
	return mshr
}

// NewMockDeleteAclsResponse returns a MockDeleteAclsResponse that holds on to
// the supplied test reporter.
func NewMockDeleteAclsResponse(t TestReporter) *MockDeleteAclsResponse {
	mr := new(MockDeleteAclsResponse)
	mr.t = t
	return mr
}

// For builds a DeleteAclsResponse containing one FilterResponse per filter in
// the request. Every FilterResponse reports ErrNoError and carries a single
// MatchingAcl that also reports ErrNoError. reqBody must be a
// *DeleteAclsRequest; any other type makes the type assertion panic.
func (mr *MockDeleteAclsResponse) For(reqBody versionedDecoder) encoderWithHeader {
	req := reqBody.(*DeleteAclsRequest)
	res := &DeleteAclsResponse{Version: req.version()}

	// Only the number of filters matters; their contents are not inspected.
	for i := 0; i < len(req.Filters); i++ {
		filterRes := &FilterResponse{Err: ErrNoError}
		filterRes.MatchingAcls = append(filterRes.MatchingAcls, &MatchingAcl{Err: ErrNoError})
		res.FilterResponses = append(res.FilterResponses, filterRes)
	}

	// The final response version is taken from the request's Version field.
	res.Version = int16(req.Version)
	return res
}

// MockDeleteGroupsResponse is a `DeleteGroupsResponse` builder. The groups
// configured through SetDeletedGroups are reported as deleted without error.
type MockDeleteGroupsResponse struct {
	deletedGroups []string
}

// NewMockDeleteGroupsRequest returns an empty MockDeleteGroupsResponse.
// Despite the "Request" in its name it constructs the response builder; the
// test reporter argument is accepted but not used.
func NewMockDeleteGroupsRequest(t TestReporter) *MockDeleteGroupsResponse {
	return new(MockDeleteGroupsResponse)
}

// SetDeletedGroups sets the group names that For reports in the response. The
// slice is stored as given, without copying. It returns the receiver so calls
// can be chained.
func (m *MockDeleteGroupsResponse) SetDeletedGroups(groups []string) *MockDeleteGroupsResponse {
	m.deletedGroups = groups
	return m
}

// For builds a DeleteGroupsResponse that maps every configured group name to
// ErrNoError. The groups named in the request itself are not consulted; only
// its version is used. reqBody must be a *DeleteGroupsRequest; any other type
// makes the type assertion panic.
func (m *MockDeleteGroupsResponse) For(reqBody versionedDecoder) encoderWithHeader {
	req := reqBody.(*DeleteGroupsRequest)
	version := req.version()

	codes := make(map[string]KError)
	for i := range m.deletedGroups {
		codes[m.deletedGroups[i]] = ErrNoError
	}

	return &DeleteGroupsResponse{Version: version, GroupErrorCodes: codes}
}

// MockDeleteOffsetResponse is a `DeleteOffsetsResponse` builder. It reports a
// top-level error code plus one per-partition error for a single
// topic/partition pair, all configured through SetDeletedOffset.
type MockDeleteOffsetResponse struct {
	errorCode      KError
	topic          string
	partition      int32
	errorPartition KError
}

// NewMockDeleteOffsetRequest returns an empty MockDeleteOffsetResponse.
// Despite the "Request" in its name it constructs the response builder; the
// test reporter argument is accepted but not used.
func NewMockDeleteOffsetRequest(t TestReporter) *MockDeleteOffsetResponse {
	return new(MockDeleteOffsetResponse)
}

// SetDeletedOffset configures the response built by For: errorCode is the
// top-level error, and errorPartition is the error reported for the given
// topic and partition. It returns the receiver so calls can be chained.
func (m *MockDeleteOffsetResponse) SetDeletedOffset(errorCode KError, topic string, partition int32, errorPartition KError) *MockDeleteOffsetResponse {
	m.topic, m.partition = topic, partition
	m.errorCode, m.errorPartition = errorCode, errorPartition
	return m
}

// For builds a DeleteOffsetsResponse carrying the configured top-level error
// code and a single topic -> partition -> error entry. Only the version is
// taken from the request. reqBody must be a *DeleteOffsetsRequest; any other
// type makes the type assertion panic.
func (m *MockDeleteOffsetResponse) For(reqBody versionedDecoder) encoderWithHeader {
	req := reqBody.(*DeleteOffsetsRequest)
	version := req.version()

	partitionErrors := map[int32]KError{m.partition: m.errorPartition}
	topicErrors := map[string]map[int32]KError{m.topic: partitionErrors}

	return &DeleteOffsetsResponse{
		Version:   version,
		ErrorCode: m.errorCode,
		Errors:    topicErrors,
	}
}

// MockJoinGroupResponse is a `JoinGroupResponse` builder. Its exported fields
// are copied one-to-one into the response built by For, and can be set either
// directly or through the matching Set* methods.
type MockJoinGroupResponse struct {
	t TestReporter

	ThrottleTime  int32
	Err           KError
	GenerationId  int32
	GroupProtocol string
	LeaderId      string
	MemberId      string
	Members       []GroupMember
}

// NewMockJoinGroupResponse returns a MockJoinGroupResponse that holds the
// supplied test reporter and starts with an empty, non-nil member list.
func NewMockJoinGroupResponse(t TestReporter) *MockJoinGroupResponse {
	m := new(MockJoinGroupResponse)
	m.t = t
	m.Members = []GroupMember{}
	return m
}

// For builds a JoinGroupResponse whose version is the request's Version field
// and whose remaining fields are copied from the mock. The Members slice is
// shared with the mock, not copied. reqBody must be a *JoinGroupRequest; any
// other type makes the type assertion panic.
func (m *MockJoinGroupResponse) For(reqBody versionedDecoder) encoderWithHeader {
	req := reqBody.(*JoinGroupRequest)

	resp := new(JoinGroupResponse)
	resp.Version = req.Version
	resp.ThrottleTime = m.ThrottleTime
	resp.Err = m.Err
	resp.GenerationId = m.GenerationId
	resp.GroupProtocol = m.GroupProtocol
	resp.LeaderId = m.LeaderId
	resp.MemberId = m.MemberId
	resp.Members = m.Members
	return resp
}

// SetThrottleTime sets the ThrottleTime value copied into the response. It
// returns the receiver so calls can be chained.
func (m *MockJoinGroupResponse) SetThrottleTime(t int32) *MockJoinGroupResponse {
	m.ThrottleTime = t
	return m
}

// SetError sets the error code copied into the response. It returns the
// receiver so calls can be chained.
func (m *MockJoinGroupResponse) SetError(kerr KError) *MockJoinGroupResponse {
	m.Err = kerr
	return m
}

// SetGenerationId sets the generation id copied into the response. It returns
// the receiver so calls can be chained.
func (m *MockJoinGroupResponse) SetGenerationId(id int32) *MockJoinGroupResponse {
	m.GenerationId = id
	return m
}

// SetGroupProtocol sets the group protocol name copied into the response. It
// returns the receiver so calls can be chained.
func (m *MockJoinGroupResponse) SetGroupProtocol(proto string) *MockJoinGroupResponse {
	m.GroupProtocol = proto
	return m
}

// SetLeaderId sets the leader id copied into the response. It returns the
// receiver so calls can be chained.
func (m *MockJoinGroupResponse) SetLeaderId(id string) *MockJoinGroupResponse {
	m.LeaderId = id
	return m
}

// SetMemberId sets the member id copied into the response. It returns the
// receiver so calls can be chained.
func (m *MockJoinGroupResponse) SetMemberId(id string) *MockJoinGroupResponse {
	m.MemberId = id
	return m
}

// SetMember appends a group member with the given id to the member list,
// storing meta in its encoded binary form. It panics if meta cannot be
// encoded. It returns the receiver so calls can be chained.
func (m *MockJoinGroupResponse) SetMember(id string, meta *ConsumerGroupMemberMetadata) *MockJoinGroupResponse {
	encoded, err := encode(meta, nil)
	if err != nil {
		panic(fmt.Sprintf("error encoding member metadata: %v", err))
	}

	member := GroupMember{MemberId: id, Metadata: encoded}
	m.Members = append(m.Members, member)
	return m
}

// MockLeaveGroupResponse is a `LeaveGroupResponse` builder. Err is the error
// code copied into the response built by For.
type MockLeaveGroupResponse struct {
	t TestReporter

	Err KError
}

// NewMockLeaveGroupResponse returns a MockLeaveGroupResponse that holds on to
// the supplied test reporter.
func NewMockLeaveGroupResponse(t TestReporter) *MockLeaveGroupResponse {
	m := new(MockLeaveGroupResponse)
	m.t = t
	return m
}

// For builds a LeaveGroupResponse whose version comes from the request and
// whose error is the mock's Err. reqBody must be a *LeaveGroupRequest; any
// other type makes the type assertion panic.
func (m *MockLeaveGroupResponse) For(reqBody versionedDecoder) encoderWithHeader {
	req := reqBody.(*LeaveGroupRequest)
	return &LeaveGroupResponse{Version: req.version(), Err: m.Err}
}

// SetError sets the error code copied into the response. It returns the
// receiver so calls can be chained.
func (m *MockLeaveGroupResponse) SetError(kerr KError) *MockLeaveGroupResponse {
	m.Err = kerr
	return m
}

// MockSyncGroupResponse is a `SyncGroupResponse` builder. Err and
// MemberAssignment are copied into the response built by For;
// MemberAssignment holds the assignment in its encoded binary form.
type MockSyncGroupResponse struct {
	t TestReporter

	Err              KError
	MemberAssignment []byte
}

// NewMockSyncGroupResponse returns a MockSyncGroupResponse that holds on to
// the supplied test reporter.
func NewMockSyncGroupResponse(t TestReporter) *MockSyncGroupResponse {
	m := new(MockSyncGroupResponse)
	m.t = t
	return m
}

// For builds a SyncGroupResponse whose version comes from the request and
// whose error and member assignment are copied from the mock. reqBody must be
// a *SyncGroupRequest; any other type makes the type assertion panic.
func (m *MockSyncGroupResponse) For(reqBody versionedDecoder) encoderWithHeader {
	req := reqBody.(*SyncGroupRequest)

	resp := new(SyncGroupResponse)
	resp.Version = req.version()
	resp.Err = m.Err
	resp.MemberAssignment = m.MemberAssignment
	return resp
}

// SetError sets the error code copied into the response. It returns the
// receiver so calls can be chained.
func (m *MockSyncGroupResponse) SetError(kerr KError) *MockSyncGroupResponse {
	m.Err = kerr
	return m
}

// SetMemberAssignment encodes assignment and stores the resulting bytes as the
// member assignment copied into the response. It panics if assignment cannot
// be encoded. It returns the receiver so calls can be chained.
func (m *MockSyncGroupResponse) SetMemberAssignment(assignment *ConsumerGroupMemberAssignment) *MockSyncGroupResponse {
	encoded, err := encode(assignment, nil)
	if err != nil {
		panic(fmt.Sprintf("error encoding member assignment: %v", err))
	}

	m.MemberAssignment = encoded
	return m
}

// MockHeartbeatResponse is a `HeartbeatResponse` builder. Err is the error
// code configured through SetError.
type MockHeartbeatResponse struct {
	t TestReporter

	Err KError
}

// NewMockHeartbeatResponse returns a MockHeartbeatResponse that holds on to
// the supplied test reporter.
func NewMockHeartbeatResponse(t TestReporter) *MockHeartbeatResponse {
	m := new(MockHeartbeatResponse)
	m.t = t
	return m
}

// For builds a HeartbeatResponse whose version comes from the request and
// whose error is the mock's Err. reqBody must be a *HeartbeatRequest; any
// other type makes the type assertion panic.
func (m *MockHeartbeatResponse) For(reqBody versionedDecoder) encoderWithHeader {
	req := reqBody.(*HeartbeatRequest)
	resp := &HeartbeatResponse{
		Version: req.version(),
		// Propagate the configured error; without this SetError has no
		// effect and every heartbeat is reported as successful.
		Err: m.Err,
	}
	return resp
}

// SetError stores kerr in the mock's Err field. It returns the receiver so
// calls can be chained.
func (m *MockHeartbeatResponse) SetError(kerr KError) *MockHeartbeatResponse {
	m.Err = kerr
	return m
}

// MockDescribeLogDirsResponse is a `DescribeLogDirsResponse` builder. The log
// directory metadata it returns is configured through SetLogDirs.
type MockDescribeLogDirsResponse struct {
	t       TestReporter
	logDirs []DescribeLogDirsResponseDirMetadata
}

// NewMockDescribeLogDirsResponse returns a MockDescribeLogDirsResponse that
// holds the supplied test reporter and has no log directories configured.
func NewMockDescribeLogDirsResponse(t TestReporter) *MockDescribeLogDirsResponse {
	m := new(MockDescribeLogDirsResponse)
	m.t = t
	return m
}

// SetLogDirs replaces the configured log directories with a single directory
// at logDirPath. topicPartitions maps a topic name to its partition count; for
// each topic, partitions 0..count-1 are generated with a fixed size of 1234,
// zero offset lag and IsTemporary false. Topics appear in map iteration
// order, which Go does not define. It returns the receiver so calls can be
// chained.
func (m *MockDescribeLogDirsResponse) SetLogDirs(logDirPath string, topicPartitions map[string]int) *MockDescribeLogDirsResponse {
	var topics []DescribeLogDirsResponseTopic
	for name, count := range topicPartitions {
		var parts []DescribeLogDirsResponsePartition
		for id := 0; id < count; id++ {
			part := DescribeLogDirsResponsePartition{
				PartitionID: int32(id),
				Size:        int64(1234),
				OffsetLag:   int64(0),
				IsTemporary: false,
			}
			parts = append(parts, part)
		}
		topics = append(topics, DescribeLogDirsResponseTopic{Topic: name, Partitions: parts})
	}

	m.logDirs = []DescribeLogDirsResponseDirMetadata{
		{ErrorCode: ErrNoError, Path: logDirPath, Topics: topics},
	}
	return m
}

// For builds a DescribeLogDirsResponse whose version comes from the request
// and whose log directories are the ones configured on the mock. reqBody must
// be a *DescribeLogDirsRequest; any other type makes the type assertion panic.
func (m *MockDescribeLogDirsResponse) For(reqBody versionedDecoder) encoderWithHeader {
	req := reqBody.(*DescribeLogDirsRequest)
	return &DescribeLogDirsResponse{Version: req.version(), LogDirs: m.logDirs}
}

// MockApiVersionsResponse is an `ApiVersionsResponse` builder. The API key
// ranges it returns start as the constructor's defaults and can be replaced
// through SetApiKeys.
type MockApiVersionsResponse struct {
	t       TestReporter
	apiKeys []ApiVersionsResponseKey
}

// NewMockApiVersionsResponse returns a MockApiVersionsResponse that holds the
// supplied test reporter and is preloaded with two API key entries: key 0 with
// versions 5-8 and key 1 with versions 7-11.
func NewMockApiVersionsResponse(t TestReporter) *MockApiVersionsResponse {
	defaults := []ApiVersionsResponseKey{
		{ApiKey: 0, MinVersion: 5, MaxVersion: 8},
		{ApiKey: 1, MinVersion: 7, MaxVersion: 11},
	}
	return &MockApiVersionsResponse{t: t, apiKeys: defaults}
}

// SetApiKeys replaces the API key entries that For places in the response.
// The slice is stored as given, without copying. It returns the receiver so
// calls can be chained.
func (m *MockApiVersionsResponse) SetApiKeys(apiKeys []ApiVersionsResponseKey) *MockApiVersionsResponse {
	m.apiKeys = apiKeys
	return m
}

// For builds an ApiVersionsResponse whose version is the request's Version
// field and whose API keys are the ones configured on the mock. reqBody must
// be an *ApiVersionsRequest; any other type makes the type assertion panic.
func (m *MockApiVersionsResponse) For(reqBody versionedDecoder) encoderWithHeader {
	req := reqBody.(*ApiVersionsRequest)

	res := new(ApiVersionsResponse)
	res.Version = req.Version
	res.ApiKeys = m.apiKeys
	return res
}

// MockInitProducerIDResponse is an `InitProducerIDResponse` builder. The
// producer id, producer epoch and error placed in the response are the values
// configured through SetProducerID, SetProducerEpoch and SetError.
type MockInitProducerIDResponse struct {
	producerID    int64
	producerEpoch int16
	err           KError
	t             TestReporter
}

// NewMockInitProducerIDResponse returns a MockInitProducerIDResponse that
// holds the supplied test reporter; the producer id, epoch and error start at
// their zero values.
func NewMockInitProducerIDResponse(t TestReporter) *MockInitProducerIDResponse {
	m := new(MockInitProducerIDResponse)
	m.t = t
	return m
}

// SetProducerID sets the producer id that For places in the response; id is
// converted to int64. It returns the receiver so calls can be chained.
func (m *MockInitProducerIDResponse) SetProducerID(id int) *MockInitProducerIDResponse {
	m.producerID = int64(id)
	return m
}

// SetProducerEpoch sets the producer epoch that For places in the response.
// epoch is converted to int16, so values outside the int16 range are
// truncated. It returns the receiver so calls can be chained.
func (m *MockInitProducerIDResponse) SetProducerEpoch(epoch int) *MockInitProducerIDResponse {
	m.producerEpoch = int16(epoch)
	return m
}

// SetError sets the error code that For places in the response. It returns
// the receiver so calls can be chained.
func (m *MockInitProducerIDResponse) SetError(err KError) *MockInitProducerIDResponse {
	m.err = err
	return m
}

// For builds an InitProducerIDResponse whose version is the request's Version
// field and whose error, producer id and producer epoch are the values
// configured on the mock. reqBody must be an *InitProducerIDRequest; any
// other type makes the type assertion panic.
func (m *MockInitProducerIDResponse) For(reqBody versionedDecoder) encoderWithHeader {
	req := reqBody.(*InitProducerIDRequest)

	res := new(InitProducerIDResponse)
	res.Version = req.Version
	res.Err = m.err
	res.ProducerID = m.producerID
	res.ProducerEpoch = m.producerEpoch
	return res
}
