blob: 22e8e1a63d9342ae7fa8fb11cec643e82b53588c [file] [log] [blame]
khenaidooac637102019-01-14 15:44:34 -05001package kafka
2
3/**
4 * Copyright 2016 Confluent Inc.
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19import (
20 "encoding/json"
21 "fmt"
22 "os"
23 "time"
24)
25
26/*
27#include <librdkafka/rdkafka.h>
28*/
29import "C"
30
31var testconf struct {
32 Brokers string
33 Topic string
34 GroupID string
35 PerfMsgCount int
36 PerfMsgSize int
37 Config []string
38 conf ConfigMap
39}
40
41// testconf_read reads the test suite config file testconf.json which must
42// contain at least Brokers and Topic string properties.
43// Returns true if the testconf was found and usable, false if no such file, or panics
44// if the file format is wrong.
45func testconfRead() bool {
46 cf, err := os.Open("testconf.json")
47 if err != nil {
48 fmt.Fprintf(os.Stderr, "%% testconf.json not found - ignoring test\n")
49 return false
50 }
51
52 // Default values
53 testconf.PerfMsgCount = 2000000
54 testconf.PerfMsgSize = 100
55 testconf.GroupID = "testgroup"
56
57 jp := json.NewDecoder(cf)
58 err = jp.Decode(&testconf)
59 if err != nil {
60 panic(fmt.Sprintf("Failed to parse testconf: %s", err))
61 }
62
63 cf.Close()
64
65 if testconf.Brokers[0] == '$' {
66 // Read broker list from environment variable
67 testconf.Brokers = os.Getenv(testconf.Brokers[1:])
68 }
69
70 if testconf.Brokers == "" || testconf.Topic == "" {
71 panic("Missing Brokers or Topic in testconf.json")
72 }
73
74 return true
75}
76
77// update existing ConfigMap with key=value pairs from testconf.Config
78func (cm *ConfigMap) updateFromTestconf() error {
79 if testconf.Config == nil {
80 return nil
81 }
82
83 // Translate "key=value" pairs in Config to ConfigMap
84 for _, s := range testconf.Config {
85 err := cm.Set(s)
86 if err != nil {
87 return err
88 }
89 }
90
91 return nil
92
93}
94
95// Return the number of messages available in all partitions of a topic.
96// WARNING: This uses watermark offsets so it will be incorrect for compacted topics.
97func getMessageCountInTopic(topic string) (int, error) {
98
99 // Create consumer
100 config := &ConfigMap{"bootstrap.servers": testconf.Brokers,
101 "group.id": testconf.GroupID}
102 config.updateFromTestconf()
103
104 c, err := NewConsumer(config)
105 if err != nil {
106 return 0, err
107 }
108
109 // get metadata for the topic to find out number of partitions
110
111 metadata, err := c.GetMetadata(&topic, false, 5*1000)
112 if err != nil {
113 return 0, err
114 }
115
116 t, ok := metadata.Topics[topic]
117 if !ok {
118 return 0, newError(C.RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC)
119 }
120
121 cnt := 0
122 for _, p := range t.Partitions {
123 low, high, err := c.QueryWatermarkOffsets(topic, p.ID, 5*1000)
124 if err != nil {
125 continue
126 }
127 cnt += int(high - low)
128 }
129
130 return cnt, nil
131}
132
133// getBrokerList returns a list of brokers (ids) in the cluster
134func getBrokerList(H Handle) (brokers []int32, err error) {
135 md, err := getMetadata(H, nil, true, 15*1000)
136 if err != nil {
137 return nil, err
138 }
139
140 brokers = make([]int32, len(md.Brokers))
141 for i, mdBroker := range md.Brokers {
142 brokers[i] = mdBroker.ID
143 }
144
145 return brokers, nil
146}
147
148// waitTopicInMetadata waits for the given topic to show up in metadata
149func waitTopicInMetadata(H Handle, topic string, timeoutMs int) error {
150 d, _ := time.ParseDuration(fmt.Sprintf("%dms", timeoutMs))
151 tEnd := time.Now().Add(d)
152
153 for {
154 remain := tEnd.Sub(time.Now()).Seconds()
155 if remain < 0.0 {
156 return newErrorFromString(ErrTimedOut,
157 fmt.Sprintf("Timed out waiting for topic %s to appear in metadata", topic))
158 }
159
160 md, err := getMetadata(H, nil, true, int(remain*1000))
161 if err != nil {
162 return err
163 }
164
165 for _, t := range md.Topics {
166 if t.Topic != topic {
167 continue
168 }
169 if t.Error.Code() != ErrNoError || len(t.Partitions) < 1 {
170 continue
171 }
172 // Proper topic found in metadata
173 return nil
174 }
175
176 time.Sleep(500 * 1000) // 500ms
177 }
178
179}