blob: 2c352797f73796d5121e10b436e96cdde0cc522e [file] [log] [blame]
Abhay Kumar40252eb2025-10-13 13:25:53 +00001package sarama
2
3import (
4 "fmt"
5 "strings"
6 "sync"
7)
8
// TestReporter lists the methods of go's testing.T that the mock helpers
// use. Declaring the interface here avoids importing `testing` in the main
// part of the library.
type TestReporter interface {
	Helper()
	Error(...interface{})
	Errorf(string, ...interface{})
	Fatal(...interface{})
	Fatalf(string, ...interface{})
}
18
19// MockResponse is a response builder interface it defines one method that
20// allows generating a response based on a request body. MockResponses are used
21// to program behavior of MockBroker in tests.
22type MockResponse interface {
23 For(reqBody versionedDecoder) (res encoderWithHeader)
24}
25
26// MockWrapper is a mock response builder that returns a particular concrete
27// response regardless of the actual request passed to the `For` method.
28type MockWrapper struct {
29 res encoderWithHeader
30}
31
32func (mw *MockWrapper) For(reqBody versionedDecoder) (res encoderWithHeader) {
33 return mw.res
34}
35
36func NewMockWrapper(res encoderWithHeader) *MockWrapper {
37 return &MockWrapper{res: res}
38}
39
40// MockSequence is a mock response builder that is created from a sequence of
41// concrete responses. Every time when a `MockBroker` calls its `For` method
42// the next response from the sequence is returned. When the end of the
43// sequence is reached the last element from the sequence is returned.
44type MockSequence struct {
45 responses []MockResponse
46}
47
48func NewMockSequence(responses ...interface{}) *MockSequence {
49 ms := &MockSequence{}
50 ms.responses = make([]MockResponse, len(responses))
51 for i, res := range responses {
52 switch res := res.(type) {
53 case MockResponse:
54 ms.responses[i] = res
55 case encoderWithHeader:
56 ms.responses[i] = NewMockWrapper(res)
57 default:
58 panic(fmt.Sprintf("Unexpected response type: %T", res))
59 }
60 }
61 return ms
62}
63
64func (mc *MockSequence) For(reqBody versionedDecoder) (res encoderWithHeader) {
65 res = mc.responses[0].For(reqBody)
66 if len(mc.responses) > 1 {
67 mc.responses = mc.responses[1:]
68 }
69 return res
70}
71
72type MockListGroupsResponse struct {
73 groups map[string]string
74 t TestReporter
75}
76
77func NewMockListGroupsResponse(t TestReporter) *MockListGroupsResponse {
78 return &MockListGroupsResponse{
79 groups: make(map[string]string),
80 t: t,
81 }
82}
83
84func (m *MockListGroupsResponse) For(reqBody versionedDecoder) encoderWithHeader {
85 request := reqBody.(*ListGroupsRequest)
86 response := &ListGroupsResponse{
87 Version: request.Version,
88 Groups: m.groups,
89 }
90 return response
91}
92
93func (m *MockListGroupsResponse) AddGroup(groupID, protocolType string) *MockListGroupsResponse {
94 m.groups[groupID] = protocolType
95 return m
96}
97
98type MockDescribeGroupsResponse struct {
99 groups map[string]*GroupDescription
100 t TestReporter
101}
102
103func NewMockDescribeGroupsResponse(t TestReporter) *MockDescribeGroupsResponse {
104 return &MockDescribeGroupsResponse{
105 t: t,
106 groups: make(map[string]*GroupDescription),
107 }
108}
109
110func (m *MockDescribeGroupsResponse) AddGroupDescription(groupID string, description *GroupDescription) *MockDescribeGroupsResponse {
111 m.groups[groupID] = description
112 return m
113}
114
115func (m *MockDescribeGroupsResponse) For(reqBody versionedDecoder) encoderWithHeader {
116 request := reqBody.(*DescribeGroupsRequest)
117
118 response := &DescribeGroupsResponse{Version: request.version()}
119 for _, requestedGroup := range request.Groups {
120 if group, ok := m.groups[requestedGroup]; ok {
121 response.Groups = append(response.Groups, group)
122 } else {
123 // Mimic real kafka - if a group doesn't exist, return
124 // an entry with state "Dead"
125 response.Groups = append(response.Groups, &GroupDescription{
126 GroupId: requestedGroup,
127 State: "Dead",
128 })
129 }
130 }
131
132 return response
133}
134
135// MockMetadataResponse is a `MetadataResponse` builder.
136type MockMetadataResponse struct {
137 controllerID int32
138 errors map[string]KError
139 leaders map[string]map[int32]int32
140 brokers map[string]int32
141 t TestReporter
142}
143
144func NewMockMetadataResponse(t TestReporter) *MockMetadataResponse {
145 return &MockMetadataResponse{
146 errors: make(map[string]KError),
147 leaders: make(map[string]map[int32]int32),
148 brokers: make(map[string]int32),
149 t: t,
150 }
151}
152
153func (mmr *MockMetadataResponse) SetError(topic string, kerror KError) *MockMetadataResponse {
154 mmr.errors[topic] = kerror
155 return mmr
156}
157
158func (mmr *MockMetadataResponse) SetLeader(topic string, partition, brokerID int32) *MockMetadataResponse {
159 partitions := mmr.leaders[topic]
160 if partitions == nil {
161 partitions = make(map[int32]int32)
162 mmr.leaders[topic] = partitions
163 }
164 partitions[partition] = brokerID
165 return mmr
166}
167
168func (mmr *MockMetadataResponse) SetBroker(addr string, brokerID int32) *MockMetadataResponse {
169 mmr.brokers[addr] = brokerID
170 return mmr
171}
172
173func (mmr *MockMetadataResponse) SetController(brokerID int32) *MockMetadataResponse {
174 mmr.controllerID = brokerID
175 return mmr
176}
177
178func (mmr *MockMetadataResponse) For(reqBody versionedDecoder) encoderWithHeader {
179 metadataRequest := reqBody.(*MetadataRequest)
180 metadataResponse := &MetadataResponse{
181 Version: metadataRequest.version(),
182 ControllerID: mmr.controllerID,
183 }
184 for addr, brokerID := range mmr.brokers {
185 metadataResponse.AddBroker(addr, brokerID)
186 }
187
188 // Generate set of replicas
189 var replicas []int32
190 var offlineReplicas []int32
191 for _, brokerID := range mmr.brokers {
192 replicas = append(replicas, brokerID)
193 }
194
195 if len(metadataRequest.Topics) == 0 {
196 for topic, partitions := range mmr.leaders {
197 for partition, brokerID := range partitions {
198 metadataResponse.AddTopicPartition(topic, partition, brokerID, replicas, replicas, offlineReplicas, ErrNoError)
199 }
200 }
201 for topic, err := range mmr.errors {
202 metadataResponse.AddTopic(topic, err)
203 }
204 return metadataResponse
205 }
206 for _, topic := range metadataRequest.Topics {
207 leaders, ok := mmr.leaders[topic]
208 if !ok {
209 if err, ok := mmr.errors[topic]; ok {
210 metadataResponse.AddTopic(topic, err)
211 } else {
212 metadataResponse.AddTopic(topic, ErrUnknownTopicOrPartition)
213 }
214 continue
215 }
216 for partition, brokerID := range leaders {
217 metadataResponse.AddTopicPartition(topic, partition, brokerID, replicas, replicas, offlineReplicas, ErrNoError)
218 }
219 }
220 return metadataResponse
221}
222
223// MockOffsetResponse is an `OffsetResponse` builder.
224type MockOffsetResponse struct {
225 offsets map[string]map[int32]map[int64]int64
226 t TestReporter
227}
228
229func NewMockOffsetResponse(t TestReporter) *MockOffsetResponse {
230 return &MockOffsetResponse{
231 offsets: make(map[string]map[int32]map[int64]int64),
232 t: t,
233 }
234}
235
236func (mor *MockOffsetResponse) SetOffset(topic string, partition int32, time, offset int64) *MockOffsetResponse {
237 partitions := mor.offsets[topic]
238 if partitions == nil {
239 partitions = make(map[int32]map[int64]int64)
240 mor.offsets[topic] = partitions
241 }
242 times := partitions[partition]
243 if times == nil {
244 times = make(map[int64]int64)
245 partitions[partition] = times
246 }
247 times[time] = offset
248 return mor
249}
250
251func (mor *MockOffsetResponse) For(reqBody versionedDecoder) encoderWithHeader {
252 offsetRequest := reqBody.(*OffsetRequest)
253 offsetResponse := &OffsetResponse{Version: offsetRequest.Version}
254 for topic, partitions := range offsetRequest.blocks {
255 for partition, block := range partitions {
256 offset := mor.getOffset(topic, partition, block.timestamp)
257 offsetResponse.AddTopicPartition(topic, partition, offset)
258 }
259 }
260 return offsetResponse
261}
262
263func (mor *MockOffsetResponse) getOffset(topic string, partition int32, time int64) int64 {
264 partitions := mor.offsets[topic]
265 if partitions == nil {
266 mor.t.Errorf("missing topic: %s", topic)
267 }
268 times := partitions[partition]
269 if times == nil {
270 mor.t.Errorf("missing partition: %d", partition)
271 }
272 offset, ok := times[time]
273 if !ok {
274 mor.t.Errorf("missing time: %d", time)
275 }
276 return offset
277}
278
279// mockMessage is a message that used to be mocked for `FetchResponse`
280type mockMessage struct {
281 key Encoder
282 msg Encoder
283}
284
285func newMockMessage(key, msg Encoder) *mockMessage {
286 return &mockMessage{
287 key: key,
288 msg: msg,
289 }
290}
291
292// MockFetchResponse is a `FetchResponse` builder.
293type MockFetchResponse struct {
294 messages map[string]map[int32]map[int64]*mockMessage
295 messagesLock *sync.RWMutex
296 highWaterMarks map[string]map[int32]int64
297 t TestReporter
298 batchSize int
299}
300
301func NewMockFetchResponse(t TestReporter, batchSize int) *MockFetchResponse {
302 return &MockFetchResponse{
303 messages: make(map[string]map[int32]map[int64]*mockMessage),
304 messagesLock: &sync.RWMutex{},
305 highWaterMarks: make(map[string]map[int32]int64),
306 t: t,
307 batchSize: batchSize,
308 }
309}
310
311func (mfr *MockFetchResponse) SetMessage(topic string, partition int32, offset int64, msg Encoder) *MockFetchResponse {
312 return mfr.SetMessageWithKey(topic, partition, offset, nil, msg)
313}
314
315func (mfr *MockFetchResponse) SetMessageWithKey(topic string, partition int32, offset int64, key, msg Encoder) *MockFetchResponse {
316 mfr.messagesLock.Lock()
317 defer mfr.messagesLock.Unlock()
318 partitions := mfr.messages[topic]
319 if partitions == nil {
320 partitions = make(map[int32]map[int64]*mockMessage)
321 mfr.messages[topic] = partitions
322 }
323 messages := partitions[partition]
324 if messages == nil {
325 messages = make(map[int64]*mockMessage)
326 partitions[partition] = messages
327 }
328 messages[offset] = newMockMessage(key, msg)
329 return mfr
330}
331
332func (mfr *MockFetchResponse) SetHighWaterMark(topic string, partition int32, offset int64) *MockFetchResponse {
333 partitions := mfr.highWaterMarks[topic]
334 if partitions == nil {
335 partitions = make(map[int32]int64)
336 mfr.highWaterMarks[topic] = partitions
337 }
338 partitions[partition] = offset
339 return mfr
340}
341
342func (mfr *MockFetchResponse) For(reqBody versionedDecoder) encoderWithHeader {
343 fetchRequest := reqBody.(*FetchRequest)
344 res := &FetchResponse{
345 Version: fetchRequest.Version,
346 }
347 for topic, partitions := range fetchRequest.blocks {
348 for partition, block := range partitions {
349 initialOffset := block.fetchOffset
350 offset := initialOffset
351 maxOffset := initialOffset + int64(mfr.getMessageCount(topic, partition))
352 for i := 0; i < mfr.batchSize && offset < maxOffset; {
353 msg := mfr.getMessage(topic, partition, offset)
354 if msg != nil {
355 res.AddMessage(topic, partition, msg.key, msg.msg, offset)
356 i++
357 }
358 offset++
359 }
360 fb := res.GetBlock(topic, partition)
361 if fb == nil {
362 res.AddError(topic, partition, ErrNoError)
363 fb = res.GetBlock(topic, partition)
364 }
365 fb.HighWaterMarkOffset = mfr.getHighWaterMark(topic, partition)
366 }
367 }
368 return res
369}
370
371func (mfr *MockFetchResponse) getMessage(topic string, partition int32, offset int64) *mockMessage {
372 mfr.messagesLock.RLock()
373 defer mfr.messagesLock.RUnlock()
374 partitions := mfr.messages[topic]
375 if partitions == nil {
376 return nil
377 }
378 messages := partitions[partition]
379 if messages == nil {
380 return nil
381 }
382 return messages[offset]
383}
384
385func (mfr *MockFetchResponse) getMessageCount(topic string, partition int32) int {
386 mfr.messagesLock.RLock()
387 defer mfr.messagesLock.RUnlock()
388 partitions := mfr.messages[topic]
389 if partitions == nil {
390 return 0
391 }
392 messages := partitions[partition]
393 if messages == nil {
394 return 0
395 }
396 return len(messages)
397}
398
399func (mfr *MockFetchResponse) getHighWaterMark(topic string, partition int32) int64 {
400 partitions := mfr.highWaterMarks[topic]
401 if partitions == nil {
402 return 0
403 }
404 return partitions[partition]
405}
406
407// MockConsumerMetadataResponse is a `ConsumerMetadataResponse` builder.
408type MockConsumerMetadataResponse struct {
409 coordinators map[string]interface{}
410 t TestReporter
411}
412
413func NewMockConsumerMetadataResponse(t TestReporter) *MockConsumerMetadataResponse {
414 return &MockConsumerMetadataResponse{
415 coordinators: make(map[string]interface{}),
416 t: t,
417 }
418}
419
420func (mr *MockConsumerMetadataResponse) SetCoordinator(group string, broker *MockBroker) *MockConsumerMetadataResponse {
421 mr.coordinators[group] = broker
422 return mr
423}
424
425func (mr *MockConsumerMetadataResponse) SetError(group string, kerror KError) *MockConsumerMetadataResponse {
426 mr.coordinators[group] = kerror
427 return mr
428}
429
430func (mr *MockConsumerMetadataResponse) For(reqBody versionedDecoder) encoderWithHeader {
431 req := reqBody.(*ConsumerMetadataRequest)
432 group := req.ConsumerGroup
433 res := &ConsumerMetadataResponse{Version: req.version()}
434 v := mr.coordinators[group]
435 switch v := v.(type) {
436 case *MockBroker:
437 res.Coordinator = &Broker{id: v.BrokerID(), addr: v.Addr()}
438 case KError:
439 res.Err = v
440 }
441 return res
442}
443
444// MockFindCoordinatorResponse is a `FindCoordinatorResponse` builder.
445type MockFindCoordinatorResponse struct {
446 groupCoordinators map[string]interface{}
447 transCoordinators map[string]interface{}
448 t TestReporter
449}
450
451func NewMockFindCoordinatorResponse(t TestReporter) *MockFindCoordinatorResponse {
452 return &MockFindCoordinatorResponse{
453 groupCoordinators: make(map[string]interface{}),
454 transCoordinators: make(map[string]interface{}),
455 t: t,
456 }
457}
458
459func (mr *MockFindCoordinatorResponse) SetCoordinator(coordinatorType CoordinatorType, group string, broker *MockBroker) *MockFindCoordinatorResponse {
460 switch coordinatorType {
461 case CoordinatorGroup:
462 mr.groupCoordinators[group] = broker
463 case CoordinatorTransaction:
464 mr.transCoordinators[group] = broker
465 }
466 return mr
467}
468
469func (mr *MockFindCoordinatorResponse) SetError(coordinatorType CoordinatorType, group string, kerror KError) *MockFindCoordinatorResponse {
470 switch coordinatorType {
471 case CoordinatorGroup:
472 mr.groupCoordinators[group] = kerror
473 case CoordinatorTransaction:
474 mr.transCoordinators[group] = kerror
475 }
476 return mr
477}
478
479func (mr *MockFindCoordinatorResponse) For(reqBody versionedDecoder) encoderWithHeader {
480 req := reqBody.(*FindCoordinatorRequest)
481 res := &FindCoordinatorResponse{Version: req.version()}
482 var v interface{}
483 switch req.CoordinatorType {
484 case CoordinatorGroup:
485 v = mr.groupCoordinators[req.CoordinatorKey]
486 case CoordinatorTransaction:
487 v = mr.transCoordinators[req.CoordinatorKey]
488 }
489 switch v := v.(type) {
490 case *MockBroker:
491 res.Coordinator = &Broker{id: v.BrokerID(), addr: v.Addr()}
492 case KError:
493 res.Err = v
494 }
495 return res
496}
497
498// MockOffsetCommitResponse is a `OffsetCommitResponse` builder.
499type MockOffsetCommitResponse struct {
500 errors map[string]map[string]map[int32]KError
501 t TestReporter
502}
503
504func NewMockOffsetCommitResponse(t TestReporter) *MockOffsetCommitResponse {
505 return &MockOffsetCommitResponse{t: t}
506}
507
508func (mr *MockOffsetCommitResponse) SetError(group, topic string, partition int32, kerror KError) *MockOffsetCommitResponse {
509 if mr.errors == nil {
510 mr.errors = make(map[string]map[string]map[int32]KError)
511 }
512 topics := mr.errors[group]
513 if topics == nil {
514 topics = make(map[string]map[int32]KError)
515 mr.errors[group] = topics
516 }
517 partitions := topics[topic]
518 if partitions == nil {
519 partitions = make(map[int32]KError)
520 topics[topic] = partitions
521 }
522 partitions[partition] = kerror
523 return mr
524}
525
526func (mr *MockOffsetCommitResponse) For(reqBody versionedDecoder) encoderWithHeader {
527 req := reqBody.(*OffsetCommitRequest)
528 group := req.ConsumerGroup
529 res := &OffsetCommitResponse{Version: req.version()}
530 for topic, partitions := range req.blocks {
531 for partition := range partitions {
532 res.AddError(topic, partition, mr.getError(group, topic, partition))
533 }
534 }
535 return res
536}
537
538func (mr *MockOffsetCommitResponse) getError(group, topic string, partition int32) KError {
539 topics := mr.errors[group]
540 if topics == nil {
541 return ErrNoError
542 }
543 partitions := topics[topic]
544 if partitions == nil {
545 return ErrNoError
546 }
547 kerror, ok := partitions[partition]
548 if !ok {
549 return ErrNoError
550 }
551 return kerror
552}
553
554// MockProduceResponse is a `ProduceResponse` builder.
555type MockProduceResponse struct {
556 version int16
557 errors map[string]map[int32]KError
558 t TestReporter
559}
560
561func NewMockProduceResponse(t TestReporter) *MockProduceResponse {
562 return &MockProduceResponse{t: t}
563}
564
565func (mr *MockProduceResponse) SetVersion(version int16) *MockProduceResponse {
566 mr.version = version
567 return mr
568}
569
570func (mr *MockProduceResponse) SetError(topic string, partition int32, kerror KError) *MockProduceResponse {
571 if mr.errors == nil {
572 mr.errors = make(map[string]map[int32]KError)
573 }
574 partitions := mr.errors[topic]
575 if partitions == nil {
576 partitions = make(map[int32]KError)
577 mr.errors[topic] = partitions
578 }
579 partitions[partition] = kerror
580 return mr
581}
582
583func (mr *MockProduceResponse) For(reqBody versionedDecoder) encoderWithHeader {
584 req := reqBody.(*ProduceRequest)
585 res := &ProduceResponse{
586 Version: req.version(),
587 }
588 if mr.version > 0 {
589 res.Version = mr.version
590 }
591 for topic, partitions := range req.records {
592 for partition := range partitions {
593 res.AddTopicPartition(topic, partition, mr.getError(topic, partition))
594 }
595 }
596 return res
597}
598
599func (mr *MockProduceResponse) getError(topic string, partition int32) KError {
600 partitions := mr.errors[topic]
601 if partitions == nil {
602 return ErrNoError
603 }
604 kerror, ok := partitions[partition]
605 if !ok {
606 return ErrNoError
607 }
608 return kerror
609}
610
611// MockOffsetFetchResponse is a `OffsetFetchResponse` builder.
612type MockOffsetFetchResponse struct {
613 offsets map[string]map[string]map[int32]*OffsetFetchResponseBlock
614 error KError
615 t TestReporter
616}
617
618func NewMockOffsetFetchResponse(t TestReporter) *MockOffsetFetchResponse {
619 return &MockOffsetFetchResponse{t: t}
620}
621
622func (mr *MockOffsetFetchResponse) SetOffset(group, topic string, partition int32, offset int64, metadata string, kerror KError) *MockOffsetFetchResponse {
623 if mr.offsets == nil {
624 mr.offsets = make(map[string]map[string]map[int32]*OffsetFetchResponseBlock)
625 }
626 topics := mr.offsets[group]
627 if topics == nil {
628 topics = make(map[string]map[int32]*OffsetFetchResponseBlock)
629 mr.offsets[group] = topics
630 }
631 partitions := topics[topic]
632 if partitions == nil {
633 partitions = make(map[int32]*OffsetFetchResponseBlock)
634 topics[topic] = partitions
635 }
636 partitions[partition] = &OffsetFetchResponseBlock{offset, 0, metadata, kerror}
637 return mr
638}
639
640func (mr *MockOffsetFetchResponse) SetError(kerror KError) *MockOffsetFetchResponse {
641 mr.error = kerror
642 return mr
643}
644
645func (mr *MockOffsetFetchResponse) For(reqBody versionedDecoder) encoderWithHeader {
646 req := reqBody.(*OffsetFetchRequest)
647 group := req.ConsumerGroup
648 res := &OffsetFetchResponse{Version: req.Version}
649
650 for topic, partitions := range mr.offsets[group] {
651 for partition, block := range partitions {
652 res.AddBlock(topic, partition, block)
653 }
654 }
655
656 if res.Version >= 2 {
657 res.Err = mr.error
658 }
659 return res
660}
661
662type MockCreateTopicsResponse struct {
663 t TestReporter
664}
665
666func NewMockCreateTopicsResponse(t TestReporter) *MockCreateTopicsResponse {
667 return &MockCreateTopicsResponse{t: t}
668}
669
670func (mr *MockCreateTopicsResponse) For(reqBody versionedDecoder) encoderWithHeader {
671 req := reqBody.(*CreateTopicsRequest)
672 res := &CreateTopicsResponse{
673 Version: req.Version,
674 }
675 res.TopicErrors = make(map[string]*TopicError)
676
677 for topic := range req.TopicDetails {
678 if res.Version >= 1 && strings.HasPrefix(topic, "_") {
679 msg := "insufficient permissions to create topic with reserved prefix"
680 res.TopicErrors[topic] = &TopicError{
681 Err: ErrTopicAuthorizationFailed,
682 ErrMsg: &msg,
683 }
684 continue
685 }
686 res.TopicErrors[topic] = &TopicError{Err: ErrNoError}
687 }
688 return res
689}
690
691type MockDeleteTopicsResponse struct {
692 t TestReporter
693 error KError
694}
695
696func NewMockDeleteTopicsResponse(t TestReporter) *MockDeleteTopicsResponse {
697 return &MockDeleteTopicsResponse{t: t}
698}
699
700func (mr *MockDeleteTopicsResponse) For(reqBody versionedDecoder) encoderWithHeader {
701 req := reqBody.(*DeleteTopicsRequest)
702 res := &DeleteTopicsResponse{Version: req.version()}
703 res.TopicErrorCodes = make(map[string]KError)
704
705 for _, topic := range req.Topics {
706 res.TopicErrorCodes[topic] = mr.error
707 }
708 res.Version = req.Version
709 return res
710}
711
712func (mr *MockDeleteTopicsResponse) SetError(kerror KError) *MockDeleteTopicsResponse {
713 mr.error = kerror
714 return mr
715}
716
717type MockCreatePartitionsResponse struct {
718 t TestReporter
719}
720
721func NewMockCreatePartitionsResponse(t TestReporter) *MockCreatePartitionsResponse {
722 return &MockCreatePartitionsResponse{t: t}
723}
724
725func (mr *MockCreatePartitionsResponse) For(reqBody versionedDecoder) encoderWithHeader {
726 req := reqBody.(*CreatePartitionsRequest)
727 res := &CreatePartitionsResponse{Version: req.version()}
728 res.TopicPartitionErrors = make(map[string]*TopicPartitionError)
729
730 for topic := range req.TopicPartitions {
731 if strings.HasPrefix(topic, "_") {
732 msg := "insufficient permissions to create partition on topic with reserved prefix"
733 res.TopicPartitionErrors[topic] = &TopicPartitionError{
734 Err: ErrTopicAuthorizationFailed,
735 ErrMsg: &msg,
736 }
737 continue
738 }
739 res.TopicPartitionErrors[topic] = &TopicPartitionError{Err: ErrNoError}
740 }
741 return res
742}
743
744type MockAlterPartitionReassignmentsResponse struct {
745 t TestReporter
746}
747
748func NewMockAlterPartitionReassignmentsResponse(t TestReporter) *MockAlterPartitionReassignmentsResponse {
749 return &MockAlterPartitionReassignmentsResponse{t: t}
750}
751
752func (mr *MockAlterPartitionReassignmentsResponse) For(reqBody versionedDecoder) encoderWithHeader {
753 req := reqBody.(*AlterPartitionReassignmentsRequest)
754 _ = req
755 res := &AlterPartitionReassignmentsResponse{Version: req.version()}
756 return res
757}
758
759type MockListPartitionReassignmentsResponse struct {
760 t TestReporter
761}
762
763func NewMockListPartitionReassignmentsResponse(t TestReporter) *MockListPartitionReassignmentsResponse {
764 return &MockListPartitionReassignmentsResponse{t: t}
765}
766
767func (mr *MockListPartitionReassignmentsResponse) For(reqBody versionedDecoder) encoderWithHeader {
768 req := reqBody.(*ListPartitionReassignmentsRequest)
769 _ = req
770 res := &ListPartitionReassignmentsResponse{Version: req.version()}
771
772 for topic, partitions := range req.blocks {
773 for _, partition := range partitions {
774 res.AddBlock(topic, partition, []int32{0}, []int32{1}, []int32{2})
775 }
776 }
777
778 return res
779}
780
781type MockElectLeadersResponse struct {
782 t TestReporter
783}
784
785func NewMockElectLeadersResponse(t TestReporter) *MockElectLeadersResponse {
786 return &MockElectLeadersResponse{t: t}
787}
788
789func (mr *MockElectLeadersResponse) For(reqBody versionedDecoder) encoderWithHeader {
790 req := reqBody.(*ElectLeadersRequest)
791 res := &ElectLeadersResponse{Version: req.version(), ReplicaElectionResults: map[string]map[int32]*PartitionResult{}}
792
793 for topic, partitions := range req.TopicPartitions {
794 for _, partition := range partitions {
795 res.ReplicaElectionResults[topic] = map[int32]*PartitionResult{
796 partition: {ErrorCode: ErrNoError},
797 }
798 }
799 }
800 return res
801}
802
803type MockDeleteRecordsResponse struct {
804 t TestReporter
805}
806
807func NewMockDeleteRecordsResponse(t TestReporter) *MockDeleteRecordsResponse {
808 return &MockDeleteRecordsResponse{t: t}
809}
810
811func (mr *MockDeleteRecordsResponse) For(reqBody versionedDecoder) encoderWithHeader {
812 req := reqBody.(*DeleteRecordsRequest)
813 res := &DeleteRecordsResponse{Version: req.version()}
814 res.Topics = make(map[string]*DeleteRecordsResponseTopic)
815
816 for topic, deleteRecordRequestTopic := range req.Topics {
817 partitions := make(map[int32]*DeleteRecordsResponsePartition)
818 for partition := range deleteRecordRequestTopic.PartitionOffsets {
819 partitions[partition] = &DeleteRecordsResponsePartition{Err: ErrNoError}
820 }
821 res.Topics[topic] = &DeleteRecordsResponseTopic{Partitions: partitions}
822 }
823 return res
824}
825
826type MockDescribeConfigsResponse struct {
827 t TestReporter
828}
829
830func NewMockDescribeConfigsResponse(t TestReporter) *MockDescribeConfigsResponse {
831 return &MockDescribeConfigsResponse{t: t}
832}
833
834func (mr *MockDescribeConfigsResponse) For(reqBody versionedDecoder) encoderWithHeader {
835 req := reqBody.(*DescribeConfigsRequest)
836 res := &DescribeConfigsResponse{
837 Version: req.Version,
838 }
839
840 includeSynonyms := req.Version > 0
841 includeSource := req.Version > 0
842
843 for _, r := range req.Resources {
844 var configEntries []*ConfigEntry
845 switch r.Type {
846 case BrokerResource:
847 configEntries = append(configEntries,
848 &ConfigEntry{
849 Name: "min.insync.replicas",
850 Value: "2",
851 ReadOnly: false,
852 Default: false,
853 },
854 )
855 res.Resources = append(res.Resources, &ResourceResponse{
856 Name: r.Name,
857 Configs: configEntries,
858 })
859 case BrokerLoggerResource:
860 configEntries = append(configEntries,
861 &ConfigEntry{
862 Name: "kafka.controller.KafkaController",
863 Value: "DEBUG",
864 ReadOnly: false,
865 Default: false,
866 },
867 )
868 res.Resources = append(res.Resources, &ResourceResponse{
869 Name: r.Name,
870 Configs: configEntries,
871 })
872 case TopicResource:
873 maxMessageBytes := &ConfigEntry{
874 Name: "max.message.bytes",
875 Value: "1000000",
876 ReadOnly: false,
877 Default: !includeSource,
878 Sensitive: false,
879 }
880 if includeSource {
881 maxMessageBytes.Source = SourceDefault
882 }
883 if includeSynonyms {
884 maxMessageBytes.Synonyms = []*ConfigSynonym{
885 {
886 ConfigName: "max.message.bytes",
887 ConfigValue: "500000",
888 },
889 }
890 }
891 retentionMs := &ConfigEntry{
892 Name: "retention.ms",
893 Value: "5000",
894 ReadOnly: false,
895 Default: false,
896 Sensitive: false,
897 }
898 if includeSynonyms {
899 retentionMs.Synonyms = []*ConfigSynonym{
900 {
901 ConfigName: "log.retention.ms",
902 ConfigValue: "2500",
903 },
904 }
905 }
906 password := &ConfigEntry{
907 Name: "password",
908 Value: "12345",
909 ReadOnly: false,
910 Default: false,
911 Sensitive: true,
912 }
913 configEntries = append(
914 configEntries, maxMessageBytes, retentionMs, password)
915 res.Resources = append(res.Resources, &ResourceResponse{
916 Name: r.Name,
917 Configs: configEntries,
918 })
919 }
920 }
921 return res
922}
923
924type MockDescribeConfigsResponseWithErrorCode struct {
925 t TestReporter
926}
927
928func NewMockDescribeConfigsResponseWithErrorCode(t TestReporter) *MockDescribeConfigsResponseWithErrorCode {
929 return &MockDescribeConfigsResponseWithErrorCode{t: t}
930}
931
932func (mr *MockDescribeConfigsResponseWithErrorCode) For(reqBody versionedDecoder) encoderWithHeader {
933 req := reqBody.(*DescribeConfigsRequest)
934 res := &DescribeConfigsResponse{
935 Version: req.Version,
936 }
937
938 for _, r := range req.Resources {
939 res.Resources = append(res.Resources, &ResourceResponse{
940 Name: r.Name,
941 Type: r.Type,
942 ErrorCode: 83,
943 ErrorMsg: "",
944 })
945 }
946 return res
947}
948
949type MockAlterConfigsResponse struct {
950 t TestReporter
951}
952
953func NewMockAlterConfigsResponse(t TestReporter) *MockAlterConfigsResponse {
954 return &MockAlterConfigsResponse{t: t}
955}
956
957func (mr *MockAlterConfigsResponse) For(reqBody versionedDecoder) encoderWithHeader {
958 req := reqBody.(*AlterConfigsRequest)
959 res := &AlterConfigsResponse{Version: req.version()}
960
961 for _, r := range req.Resources {
962 res.Resources = append(res.Resources, &AlterConfigsResourceResponse{
963 Name: r.Name,
964 Type: r.Type,
965 ErrorMsg: "",
966 })
967 }
968 return res
969}
970
971type MockAlterConfigsResponseWithErrorCode struct {
972 t TestReporter
973}
974
975func NewMockAlterConfigsResponseWithErrorCode(t TestReporter) *MockAlterConfigsResponseWithErrorCode {
976 return &MockAlterConfigsResponseWithErrorCode{t: t}
977}
978
979func (mr *MockAlterConfigsResponseWithErrorCode) For(reqBody versionedDecoder) encoderWithHeader {
980 req := reqBody.(*AlterConfigsRequest)
981 res := &AlterConfigsResponse{Version: req.version()}
982
983 for _, r := range req.Resources {
984 res.Resources = append(res.Resources, &AlterConfigsResourceResponse{
985 Name: r.Name,
986 Type: r.Type,
987 ErrorCode: 83,
988 ErrorMsg: "",
989 })
990 }
991 return res
992}
993
994type MockIncrementalAlterConfigsResponse struct {
995 t TestReporter
996}
997
998func NewMockIncrementalAlterConfigsResponse(t TestReporter) *MockIncrementalAlterConfigsResponse {
999 return &MockIncrementalAlterConfigsResponse{t: t}
1000}
1001
1002func (mr *MockIncrementalAlterConfigsResponse) For(reqBody versionedDecoder) encoderWithHeader {
1003 req := reqBody.(*IncrementalAlterConfigsRequest)
1004 res := &IncrementalAlterConfigsResponse{Version: req.version()}
1005
1006 for _, r := range req.Resources {
1007 res.Resources = append(res.Resources, &AlterConfigsResourceResponse{
1008 Name: r.Name,
1009 Type: r.Type,
1010 ErrorMsg: "",
1011 })
1012 }
1013 return res
1014}
1015
1016type MockIncrementalAlterConfigsResponseWithErrorCode struct {
1017 t TestReporter
1018}
1019
1020func NewMockIncrementalAlterConfigsResponseWithErrorCode(t TestReporter) *MockIncrementalAlterConfigsResponseWithErrorCode {
1021 return &MockIncrementalAlterConfigsResponseWithErrorCode{t: t}
1022}
1023
1024func (mr *MockIncrementalAlterConfigsResponseWithErrorCode) For(reqBody versionedDecoder) encoderWithHeader {
1025 req := reqBody.(*IncrementalAlterConfigsRequest)
1026 res := &IncrementalAlterConfigsResponse{Version: req.version()}
1027
1028 for _, r := range req.Resources {
1029 res.Resources = append(res.Resources, &AlterConfigsResourceResponse{
1030 Name: r.Name,
1031 Type: r.Type,
1032 ErrorCode: 83,
1033 ErrorMsg: "",
1034 })
1035 }
1036 return res
1037}
1038
1039type MockCreateAclsResponse struct {
1040 t TestReporter
1041}
1042
1043func NewMockCreateAclsResponse(t TestReporter) *MockCreateAclsResponse {
1044 return &MockCreateAclsResponse{t: t}
1045}
1046
1047func (mr *MockCreateAclsResponse) For(reqBody versionedDecoder) encoderWithHeader {
1048 req := reqBody.(*CreateAclsRequest)
1049 res := &CreateAclsResponse{Version: req.version()}
1050
1051 for range req.AclCreations {
1052 res.AclCreationResponses = append(res.AclCreationResponses, &AclCreationResponse{Err: ErrNoError})
1053 }
1054 return res
1055}
1056
1057type MockCreateAclsResponseError struct {
1058 t TestReporter
1059}
1060
1061func NewMockCreateAclsResponseWithError(t TestReporter) *MockCreateAclsResponseError {
1062 return &MockCreateAclsResponseError{t: t}
1063}
1064
1065func (mr *MockCreateAclsResponseError) For(reqBody versionedDecoder) encoderWithHeader {
1066 req := reqBody.(*CreateAclsRequest)
1067 res := &CreateAclsResponse{Version: req.version()}
1068
1069 for range req.AclCreations {
1070 res.AclCreationResponses = append(res.AclCreationResponses, &AclCreationResponse{Err: ErrInvalidRequest})
1071 }
1072 return res
1073}
1074
// MockListAclsResponse is a mock response builder whose For method answers a
// DescribeAclsRequest with a single ACL derived from the request's filter
// fields.
type MockListAclsResponse struct {
	// t is the reporter handed to the constructor; the methods visible in
	// this file section never read it.
	t TestReporter
}
1078
1079func NewMockListAclsResponse(t TestReporter) *MockListAclsResponse {
1080 return &MockListAclsResponse{t: t}
1081}
1082
1083func (mr *MockListAclsResponse) For(reqBody versionedDecoder) encoderWithHeader {
1084 req := reqBody.(*DescribeAclsRequest)
1085 res := &DescribeAclsResponse{Version: req.version()}
1086 res.Err = ErrNoError
1087 acl := &ResourceAcls{}
1088 if req.ResourceName != nil {
1089 acl.Resource.ResourceName = *req.ResourceName
1090 }
1091 acl.Resource.ResourcePatternType = req.ResourcePatternTypeFilter
1092 acl.Resource.ResourceType = req.ResourceType
1093
1094 host := "*"
1095 if req.Host != nil {
1096 host = *req.Host
1097 }
1098
1099 principal := "User:test"
1100 if req.Principal != nil {
1101 principal = *req.Principal
1102 }
1103
1104 permissionType := req.PermissionType
1105 if permissionType == AclPermissionAny {
1106 permissionType = AclPermissionAllow
1107 }
1108
1109 acl.Acls = append(acl.Acls, &Acl{Operation: req.Operation, PermissionType: permissionType, Host: host, Principal: principal})
1110 res.ResourceAcls = append(res.ResourceAcls, acl)
1111 res.Version = int16(req.Version)
1112 return res
1113}
1114
// MockSaslAuthenticateResponse is a configurable builder of
// SaslAuthenticateResponse values. Its For method copies the fields below into
// every response it produces.
type MockSaslAuthenticateResponse struct {
	// t is the reporter handed to the constructor; the methods visible in
	// this file section never read it.
	t TestReporter
	// kerror is returned as the response's Err (set via SetError).
	kerror KError
	// saslAuthBytes is returned as the response's SaslAuthBytes (set via SetAuthBytes).
	saslAuthBytes []byte
	// sessionLifetimeMs is returned as the response's SessionLifetimeMs
	// (set via SetSessionLifetimeMs).
	sessionLifetimeMs int64
}
1121
1122func NewMockSaslAuthenticateResponse(t TestReporter) *MockSaslAuthenticateResponse {
1123 return &MockSaslAuthenticateResponse{t: t}
1124}
1125
1126func (msar *MockSaslAuthenticateResponse) For(reqBody versionedDecoder) encoderWithHeader {
1127 req := reqBody.(*SaslAuthenticateRequest)
1128 res := &SaslAuthenticateResponse{
1129 Version: req.version(),
1130 Err: msar.kerror,
1131 SaslAuthBytes: msar.saslAuthBytes,
1132 SessionLifetimeMs: msar.sessionLifetimeMs,
1133 }
1134 return res
1135}
1136
// SetError stores kerror as the error to report in generated responses and
// returns the builder so calls can be chained.
func (msar *MockSaslAuthenticateResponse) SetError(kerror KError) *MockSaslAuthenticateResponse {
	msar.kerror = kerror
	return msar
}
1141
// SetAuthBytes stores saslAuthBytes as the auth payload for generated
// responses and returns the builder so calls can be chained. The slice is kept
// by reference, not copied.
func (msar *MockSaslAuthenticateResponse) SetAuthBytes(saslAuthBytes []byte) *MockSaslAuthenticateResponse {
	msar.saslAuthBytes = saslAuthBytes
	return msar
}
1146
// SetSessionLifetimeMs stores sessionLifetimeMs as the session lifetime for
// generated responses and returns the builder so calls can be chained.
func (msar *MockSaslAuthenticateResponse) SetSessionLifetimeMs(sessionLifetimeMs int64) *MockSaslAuthenticateResponse {
	msar.sessionLifetimeMs = sessionLifetimeMs
	return msar
}
1151
// MockDeleteAclsResponse is a mock response builder for DeleteAclsRequest; its
// constructor and For method are declared further down in this file.
type MockDeleteAclsResponse struct {
	// t is the reporter handed to the constructor; the methods visible in
	// this file section never read it.
	t TestReporter
}
1155
// MockSaslHandshakeResponse is a configurable builder of SaslHandshakeResponse
// values. Its For method copies the configured mechanisms and error into every
// response it produces.
type MockSaslHandshakeResponse struct {
	// enabledMechanisms is returned as the response's EnabledMechanisms
	// (set via SetEnabledMechanisms).
	enabledMechanisms []string
	// kerror is returned as the response's Err (set via SetError).
	kerror KError
	// t is the reporter handed to the constructor; the methods visible in
	// this file section never read it.
	t TestReporter
}
1161
1162func NewMockSaslHandshakeResponse(t TestReporter) *MockSaslHandshakeResponse {
1163 return &MockSaslHandshakeResponse{t: t}
1164}
1165
1166func (mshr *MockSaslHandshakeResponse) For(reqBody versionedDecoder) encoderWithHeader {
1167 req := reqBody.(*SaslHandshakeRequest)
1168 res := &SaslHandshakeResponse{Version: req.version()}
1169 res.Err = mshr.kerror
1170 res.EnabledMechanisms = mshr.enabledMechanisms
1171 return res
1172}
1173
// SetError stores kerror as the error to report in generated responses and
// returns the builder so calls can be chained.
func (mshr *MockSaslHandshakeResponse) SetError(kerror KError) *MockSaslHandshakeResponse {
	mshr.kerror = kerror
	return mshr
}
1178
// SetEnabledMechanisms stores enabledMechanisms as the mechanism list for
// generated responses and returns the builder so calls can be chained. The
// slice is kept by reference, not copied.
func (mshr *MockSaslHandshakeResponse) SetEnabledMechanisms(enabledMechanisms []string) *MockSaslHandshakeResponse {
	mshr.enabledMechanisms = enabledMechanisms
	return mshr
}
1183
1184func NewMockDeleteAclsResponse(t TestReporter) *MockDeleteAclsResponse {
1185 return &MockDeleteAclsResponse{t: t}
1186}
1187
1188func (mr *MockDeleteAclsResponse) For(reqBody versionedDecoder) encoderWithHeader {
1189 req := reqBody.(*DeleteAclsRequest)
1190 res := &DeleteAclsResponse{Version: req.version()}
1191
1192 for range req.Filters {
1193 response := &FilterResponse{Err: ErrNoError}
1194 response.MatchingAcls = append(response.MatchingAcls, &MatchingAcl{Err: ErrNoError})
1195 res.FilterResponses = append(res.FilterResponses, response)
1196 }
1197 res.Version = int16(req.Version)
1198 return res
1199}
1200
// MockDeleteGroupsResponse is a configurable builder of DeleteGroupsResponse
// values.
type MockDeleteGroupsResponse struct {
	// deletedGroups lists the group names that For reports with ErrNoError
	// (set via SetDeletedGroups).
	deletedGroups []string
}
1204
1205func NewMockDeleteGroupsRequest(t TestReporter) *MockDeleteGroupsResponse {
1206 return &MockDeleteGroupsResponse{}
1207}
1208
// SetDeletedGroups stores groups as the set of group names reported by
// generated responses and returns the builder so calls can be chained. The
// slice is kept by reference, not copied.
func (m *MockDeleteGroupsResponse) SetDeletedGroups(groups []string) *MockDeleteGroupsResponse {
	m.deletedGroups = groups
	return m
}
1213
1214func (m *MockDeleteGroupsResponse) For(reqBody versionedDecoder) encoderWithHeader {
1215 req := reqBody.(*DeleteGroupsRequest)
1216 resp := &DeleteGroupsResponse{
1217 Version: req.version(),
1218 GroupErrorCodes: map[string]KError{},
1219 }
1220 for _, group := range m.deletedGroups {
1221 resp.GroupErrorCodes[group] = ErrNoError
1222 }
1223 return resp
1224}
1225
// MockDeleteOffsetResponse is a configurable builder of DeleteOffsetsResponse
// values describing a single topic/partition result.
type MockDeleteOffsetResponse struct {
	// errorCode is returned as the response's top-level ErrorCode.
	errorCode KError
	// topic and partition identify the single entry placed in the
	// response's Errors map.
	topic     string
	partition int32
	// errorPartition is the error stored for that topic/partition entry.
	errorPartition KError
}
1232
1233func NewMockDeleteOffsetRequest(t TestReporter) *MockDeleteOffsetResponse {
1234 return &MockDeleteOffsetResponse{}
1235}
1236
1237func (m *MockDeleteOffsetResponse) SetDeletedOffset(errorCode KError, topic string, partition int32, errorPartition KError) *MockDeleteOffsetResponse {
1238 m.errorCode = errorCode
1239 m.topic = topic
1240 m.partition = partition
1241 m.errorPartition = errorPartition
1242 return m
1243}
1244
1245func (m *MockDeleteOffsetResponse) For(reqBody versionedDecoder) encoderWithHeader {
1246 req := reqBody.(*DeleteOffsetsRequest)
1247 resp := &DeleteOffsetsResponse{
1248 Version: req.version(),
1249 ErrorCode: m.errorCode,
1250 Errors: map[string]map[int32]KError{
1251 m.topic: {m.partition: m.errorPartition},
1252 },
1253 }
1254 return resp
1255}
1256
// MockJoinGroupResponse is a configurable builder of JoinGroupResponse values.
// Its For method copies each exported field below into the field of the same
// name on the response.
type MockJoinGroupResponse struct {
	// t is the reporter handed to the constructor; the methods visible in
	// this file section never read it.
	t TestReporter

	ThrottleTime  int32
	Err           KError
	GenerationId  int32
	GroupProtocol string
	LeaderId      string
	MemberId      string
	// Members accumulates the entries added through SetMember.
	Members []GroupMember
}
1268
1269func NewMockJoinGroupResponse(t TestReporter) *MockJoinGroupResponse {
1270 return &MockJoinGroupResponse{
1271 t: t,
1272 Members: make([]GroupMember, 0),
1273 }
1274}
1275
1276func (m *MockJoinGroupResponse) For(reqBody versionedDecoder) encoderWithHeader {
1277 req := reqBody.(*JoinGroupRequest)
1278 resp := &JoinGroupResponse{
1279 Version: req.Version,
1280 ThrottleTime: m.ThrottleTime,
1281 Err: m.Err,
1282 GenerationId: m.GenerationId,
1283 GroupProtocol: m.GroupProtocol,
1284 LeaderId: m.LeaderId,
1285 MemberId: m.MemberId,
1286 Members: m.Members,
1287 }
1288 return resp
1289}
1290
// SetThrottleTime stores t in ThrottleTime and returns the builder so calls
// can be chained.
func (m *MockJoinGroupResponse) SetThrottleTime(t int32) *MockJoinGroupResponse {
	m.ThrottleTime = t
	return m
}
1295
// SetError stores kerr in Err and returns the builder so calls can be chained.
func (m *MockJoinGroupResponse) SetError(kerr KError) *MockJoinGroupResponse {
	m.Err = kerr
	return m
}
1300
// SetGenerationId stores id in GenerationId and returns the builder so calls
// can be chained.
func (m *MockJoinGroupResponse) SetGenerationId(id int32) *MockJoinGroupResponse {
	m.GenerationId = id
	return m
}
1305
// SetGroupProtocol stores proto in GroupProtocol and returns the builder so
// calls can be chained.
func (m *MockJoinGroupResponse) SetGroupProtocol(proto string) *MockJoinGroupResponse {
	m.GroupProtocol = proto
	return m
}
1310
// SetLeaderId stores id in LeaderId and returns the builder so calls can be
// chained.
func (m *MockJoinGroupResponse) SetLeaderId(id string) *MockJoinGroupResponse {
	m.LeaderId = id
	return m
}
1315
// SetMemberId stores id in MemberId and returns the builder so calls can be
// chained.
func (m *MockJoinGroupResponse) SetMemberId(id string) *MockJoinGroupResponse {
	m.MemberId = id
	return m
}
1320
1321func (m *MockJoinGroupResponse) SetMember(id string, meta *ConsumerGroupMemberMetadata) *MockJoinGroupResponse {
1322 bin, err := encode(meta, nil)
1323 if err != nil {
1324 panic(fmt.Sprintf("error encoding member metadata: %v", err))
1325 }
1326 m.Members = append(m.Members, GroupMember{MemberId: id, Metadata: bin})
1327 return m
1328}
1329
// MockLeaveGroupResponse is a configurable builder of LeaveGroupResponse
// values.
type MockLeaveGroupResponse struct {
	// t is the reporter handed to the constructor; the methods visible in
	// this file section never read it.
	t TestReporter

	// Err is copied into the Err field of every response built by For.
	Err KError
}
1335
1336func NewMockLeaveGroupResponse(t TestReporter) *MockLeaveGroupResponse {
1337 return &MockLeaveGroupResponse{t: t}
1338}
1339
1340func (m *MockLeaveGroupResponse) For(reqBody versionedDecoder) encoderWithHeader {
1341 req := reqBody.(*LeaveGroupRequest)
1342 resp := &LeaveGroupResponse{
1343 Version: req.version(),
1344 Err: m.Err,
1345 }
1346 return resp
1347}
1348
// SetError stores kerr in Err and returns the builder so calls can be chained.
func (m *MockLeaveGroupResponse) SetError(kerr KError) *MockLeaveGroupResponse {
	m.Err = kerr
	return m
}
1353
// MockSyncGroupResponse is a configurable builder of SyncGroupResponse values.
type MockSyncGroupResponse struct {
	// t is the reporter handed to the constructor; the methods visible in
	// this file section never read it.
	t TestReporter

	// Err is copied into the Err field of every response built by For.
	Err KError
	// MemberAssignment holds the encoded assignment copied into every
	// response built by For (see SetMemberAssignment).
	MemberAssignment []byte
}
1360
1361func NewMockSyncGroupResponse(t TestReporter) *MockSyncGroupResponse {
1362 return &MockSyncGroupResponse{t: t}
1363}
1364
1365func (m *MockSyncGroupResponse) For(reqBody versionedDecoder) encoderWithHeader {
1366 req := reqBody.(*SyncGroupRequest)
1367 resp := &SyncGroupResponse{
1368 Version: req.version(),
1369 Err: m.Err,
1370 MemberAssignment: m.MemberAssignment,
1371 }
1372 return resp
1373}
1374
// SetError stores kerr in Err and returns the builder so calls can be chained.
func (m *MockSyncGroupResponse) SetError(kerr KError) *MockSyncGroupResponse {
	m.Err = kerr
	return m
}
1379
1380func (m *MockSyncGroupResponse) SetMemberAssignment(assignment *ConsumerGroupMemberAssignment) *MockSyncGroupResponse {
1381 bin, err := encode(assignment, nil)
1382 if err != nil {
1383 panic(fmt.Sprintf("error encoding member assignment: %v", err))
1384 }
1385 m.MemberAssignment = bin
1386 return m
1387}
1388
// MockHeartbeatResponse is a configurable builder of HeartbeatResponse values.
type MockHeartbeatResponse struct {
	// t is the reporter handed to the constructor; the methods visible in
	// this file section never read it.
	t TestReporter

	// Err is the error stored by SetError.
	Err KError
}
1394
1395func NewMockHeartbeatResponse(t TestReporter) *MockHeartbeatResponse {
1396 return &MockHeartbeatResponse{t: t}
1397}
1398
1399func (m *MockHeartbeatResponse) For(reqBody versionedDecoder) encoderWithHeader {
1400 req := reqBody.(*HeartbeatRequest)
1401 resp := &HeartbeatResponse{
1402 Version: req.version(),
1403 }
1404 return resp
1405}
1406
// SetError stores kerr in Err and returns the builder so calls can be chained.
func (m *MockHeartbeatResponse) SetError(kerr KError) *MockHeartbeatResponse {
	m.Err = kerr
	return m
}
1411
// MockDescribeLogDirsResponse is a configurable builder of
// DescribeLogDirsResponse values.
type MockDescribeLogDirsResponse struct {
	// t is the reporter handed to the constructor; the methods visible in
	// this file section never read it.
	t TestReporter
	// logDirs is the log directory metadata returned by For (set via
	// SetLogDirs).
	logDirs []DescribeLogDirsResponseDirMetadata
}
1416
1417func NewMockDescribeLogDirsResponse(t TestReporter) *MockDescribeLogDirsResponse {
1418 return &MockDescribeLogDirsResponse{t: t}
1419}
1420
1421func (m *MockDescribeLogDirsResponse) SetLogDirs(logDirPath string, topicPartitions map[string]int) *MockDescribeLogDirsResponse {
1422 var topics []DescribeLogDirsResponseTopic
1423 for topic := range topicPartitions {
1424 var partitions []DescribeLogDirsResponsePartition
1425 for i := 0; i < topicPartitions[topic]; i++ {
1426 partitions = append(partitions, DescribeLogDirsResponsePartition{
1427 PartitionID: int32(i),
1428 IsTemporary: false,
1429 OffsetLag: int64(0),
1430 Size: int64(1234),
1431 })
1432 }
1433 topics = append(topics, DescribeLogDirsResponseTopic{
1434 Topic: topic,
1435 Partitions: partitions,
1436 })
1437 }
1438 logDir := DescribeLogDirsResponseDirMetadata{
1439 ErrorCode: ErrNoError,
1440 Path: logDirPath,
1441 Topics: topics,
1442 }
1443 m.logDirs = []DescribeLogDirsResponseDirMetadata{logDir}
1444 return m
1445}
1446
1447func (m *MockDescribeLogDirsResponse) For(reqBody versionedDecoder) encoderWithHeader {
1448 req := reqBody.(*DescribeLogDirsRequest)
1449 resp := &DescribeLogDirsResponse{
1450 Version: req.version(),
1451 LogDirs: m.logDirs,
1452 }
1453 return resp
1454}
1455
// MockApiVersionsResponse is a configurable builder of ApiVersionsResponse
// values.
type MockApiVersionsResponse struct {
	// t is the reporter handed to the constructor; the methods visible in
	// this file section never read it.
	t TestReporter
	// apiKeys is the list returned as the response's ApiKeys (defaulted by
	// the constructor, replaced via SetApiKeys).
	apiKeys []ApiVersionsResponseKey
}
1460
1461func NewMockApiVersionsResponse(t TestReporter) *MockApiVersionsResponse {
1462 return &MockApiVersionsResponse{
1463 t: t,
1464 apiKeys: []ApiVersionsResponseKey{
1465 {
1466 ApiKey: 0,
1467 MinVersion: 5,
1468 MaxVersion: 8,
1469 },
1470 {
1471 ApiKey: 1,
1472 MinVersion: 7,
1473 MaxVersion: 11,
1474 },
1475 },
1476 }
1477}
1478
// SetApiKeys replaces the configured API key list with apiKeys and returns the
// builder so calls can be chained. The slice is kept by reference, not copied.
func (m *MockApiVersionsResponse) SetApiKeys(apiKeys []ApiVersionsResponseKey) *MockApiVersionsResponse {
	m.apiKeys = apiKeys
	return m
}
1483
1484func (m *MockApiVersionsResponse) For(reqBody versionedDecoder) encoderWithHeader {
1485 req := reqBody.(*ApiVersionsRequest)
1486 res := &ApiVersionsResponse{
1487 Version: req.Version,
1488 ApiKeys: m.apiKeys,
1489 }
1490 return res
1491}
1492
// MockInitProducerIDResponse is an `InitProducerIDResponse` builder.
type MockInitProducerIDResponse struct {
	// producerID is returned as the response's ProducerID (set via SetProducerID).
	producerID int64
	// producerEpoch is returned as the response's ProducerEpoch (set via
	// SetProducerEpoch).
	producerEpoch int16
	// err is returned as the response's Err (set via SetError).
	err KError
	// t is the reporter handed to the constructor; the methods visible in
	// this file section never read it.
	t TestReporter
}
1500
1501func NewMockInitProducerIDResponse(t TestReporter) *MockInitProducerIDResponse {
1502 return &MockInitProducerIDResponse{
1503 t: t,
1504 }
1505}
1506
// SetProducerID stores id, converted to int64, as the producer ID for
// generated responses and returns the builder so calls can be chained.
func (m *MockInitProducerIDResponse) SetProducerID(id int) *MockInitProducerIDResponse {
	m.producerID = int64(id)
	return m
}
1511
// SetProducerEpoch stores epoch, converted to int16, as the producer epoch for
// generated responses and returns the builder so calls can be chained.
//
// The conversion is unchecked: values outside the int16 range are silently
// truncated.
func (m *MockInitProducerIDResponse) SetProducerEpoch(epoch int) *MockInitProducerIDResponse {
	m.producerEpoch = int16(epoch)
	return m
}
1516
// SetError stores err as the error to report in generated responses and
// returns the builder so calls can be chained.
func (m *MockInitProducerIDResponse) SetError(err KError) *MockInitProducerIDResponse {
	m.err = err
	return m
}
1521
1522func (m *MockInitProducerIDResponse) For(reqBody versionedDecoder) encoderWithHeader {
1523 req := reqBody.(*InitProducerIDRequest)
1524 res := &InitProducerIDResponse{
1525 Version: req.Version,
1526 Err: m.err,
1527 ProducerID: m.producerID,
1528 ProducerEpoch: m.producerEpoch,
1529 }
1530 return res
1531}