package sarama

import (
	"crypto/tls"
	"fmt"
	"io"
	"net"
	"regexp"
	"time"

	"github.com/klauspost/compress/gzip"
	"github.com/rcrowley/go-metrics"
	"golang.org/x/net/proxy"
)

const defaultClientID = "sarama"

// validClientID specifies the permitted characters for a client.id when
// connecting to Kafka versions before 1.0.0 (KIP-190)
var validClientID = regexp.MustCompile(`\A[A-Za-z0-9._-]+\z`)
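
// For illustration (derived from the pattern above): on Kafka versions before
// 1.0.0, a ClientID such as "my-app.worker-1" is accepted, while one containing
// spaces or other punctuation is rejected by Validate.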

// Config is used to pass multiple configuration options to Sarama's constructors.
type Config struct {
	// Admin is the namespace for ClusterAdmin properties used by the administrative Kafka client.
	Admin struct {
		Retry struct {
			// The total number of times to retry sending (retriable) admin requests (default 5).
			// Similar to the `retries` setting of the JVM AdminClientConfig.
			Max int
			// Backoff time between retries of a failed request (default 100ms)
			Backoff time.Duration
		}
		// The maximum duration the administrative Kafka client will wait for ClusterAdmin operations,
		// including topics, brokers, configurations and ACLs (defaults to 3 seconds).
		Timeout time.Duration
	}

	// Net is the namespace for network-level properties used by the Broker, and
	// shared by the Client/Producer/Consumer.
	Net struct {
		// How many outstanding requests a connection is allowed to have before
		// sending on it blocks (default 5).
		// Throughput can improve but message ordering is not guaranteed if Producer.Idempotent is disabled, see:
		// https://kafka.apache.org/protocol#protocol_network
		// https://kafka.apache.org/28/documentation.html#producerconfigs_max.in.flight.requests.per.connection
		MaxOpenRequests int

		// All three of the below configurations are similar to the
		// `socket.timeout.ms` setting in JVM kafka. All of them default
		// to 30 seconds.
		DialTimeout  time.Duration // How long to wait for the initial connection.
		ReadTimeout  time.Duration // How long to wait for a response.
		WriteTimeout time.Duration // How long to wait for a transmit.

		// ResolveCanonicalBootstrapServers turns each bootstrap broker address
		// into a set of IPs, then does a reverse lookup on each one to get its
		// canonical hostname. This list of hostnames then replaces the
		// original address list. Similar to the `client.dns.lookup` option in
		// the JVM client, this is especially useful with GSSAPI, where it
		// allows providing an alias record instead of individual broker
		// hostnames. Defaults to false.
		ResolveCanonicalBootstrapServers bool

		TLS struct {
			// Whether or not to use TLS when connecting to the broker
			// (defaults to false).
			Enable bool
			// The TLS configuration to use for secure connections if
			// enabled (defaults to nil).
			Config *tls.Config
		}

		// SASL is the namespace for SASL-based authentication with the broker.
		// The supported mechanisms are PLAIN, SCRAM-SHA-256, SCRAM-SHA-512,
		// OAUTHBEARER and GSSAPI.
		SASL struct {
			// Whether or not to use SASL authentication when connecting to the broker
			// (defaults to false).
			Enable bool
			// SASLMechanism is the name of the enabled SASL mechanism.
			// Possible values: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512,
			// OAUTHBEARER, GSSAPI (defaults to PLAIN).
			Mechanism SASLMechanism
			// Version is the SASL Protocol Version to use.
			// Kafka > 1.x should use V1, except on Azure Event Hubs, which uses V0.
			Version int16
			// Whether or not to send the Kafka SASL handshake first if enabled
			// (defaults to true). You should only set this to false if you're using
			// a non-Kafka SASL proxy.
			Handshake bool
			// AuthIdentity is an (optional) authorization identity (authzid) to
			// use for SASL/PLAIN authentication (if different from User) when
			// an authenticated user is permitted to act as the presented
			// alternative user. See RFC4616 for details.
			AuthIdentity string
			// User is the authentication identity (authcid) to present for
			// SASL/PLAIN or SASL/SCRAM authentication
			User string
			// Password for SASL/PLAIN authentication
			Password string
			// SCRAMAuthzID is the authorization identity (authzid) used for
			// SASL/SCRAM authentication.
			SCRAMAuthzID string
			// SCRAMClientGeneratorFunc is a generator of a user provided implementation of a SCRAM
			// client used to perform the SCRAM exchange with the server.
			SCRAMClientGeneratorFunc func() SCRAMClient
			// TokenProvider is a user-defined callback for generating
			// access tokens for SASL/OAUTHBEARER auth. See the
			// AccessTokenProvider interface docs for proper implementation
			// guidelines.
			TokenProvider AccessTokenProvider

			GSSAPI GSSAPIConfig
		}

		// KeepAlive specifies the keep-alive period for an active network connection (defaults to 0).
		// If zero or positive, keep-alives are enabled.
		// If negative, keep-alives are disabled.
		KeepAlive time.Duration

		// LocalAddr is the local address to use when dialing an
		// address. The address must be of a compatible type for the
		// network being dialed.
		// If nil, a local address is automatically chosen.
		LocalAddr net.Addr

		Proxy struct {
			// Whether or not to use a proxy when connecting to the broker
			// (defaults to false).
			Enable bool
			// The proxy dialer to use when proxying is enabled (defaults to nil).
			Dialer proxy.Dialer
		}
	}

	// Metadata is the namespace for metadata management properties used by the
	// Client, and shared by the Producer/Consumer.
	Metadata struct {
		Retry struct {
			// The total number of times to retry a metadata request when the
			// cluster is in the middle of a leader election (default 3).
			Max int
			// How long to wait for leader election to occur before retrying
			// (default 250ms). Similar to the JVM's `retry.backoff.ms`.
			Backoff time.Duration
			// Called to compute backoff time dynamically. Useful for implementing
			// more sophisticated backoff strategies. This takes precedence over
			// `Backoff` if set.
			BackoffFunc func(retries, maxRetries int) time.Duration
		}
		// How frequently to refresh the cluster metadata in the background.
		// Defaults to 10 minutes. Set to 0 to disable. Similar to
		// `topic.metadata.refresh.interval.ms` in the JVM version.
		RefreshFrequency time.Duration

		// Whether to maintain a full set of metadata for all topics, or just
		// the minimal set that has been necessary so far. The full set is simpler
		// and usually more convenient, but can take up a substantial amount of
		// memory if you have many topics and partitions. Defaults to true.
		Full bool

		// How long to wait for a successful metadata response.
		// Disabled by default which means a metadata request against an unreachable
		// cluster (all brokers are unreachable or unresponsive) can take up to
		// `Net.[Dial|Read]Timeout * BrokerCount * (Metadata.Retry.Max + 1) + Metadata.Retry.Backoff * Metadata.Retry.Max`
		// to fail.
		Timeout time.Duration

		// Whether to allow auto-create topics in metadata refresh. If set to true,
		// the broker may auto-create topics that we requested which do not already exist,
		// if it is configured to do so (`auto.create.topics.enable` is true). Defaults to true.
		AllowAutoTopicCreation bool

		// SingleFlight controls whether to send a single metadata refresh request at a given time
		// or whether to allow anyone to refresh the metadata concurrently.
		// If this is set to true and the client needs to refresh the metadata from different goroutines,
		// the requests will be batched together so that a single refresh is sent at a time.
		// See https://github.com/IBM/sarama/issues/3224 for more details.
		// SingleFlight defaults to true.
		SingleFlight bool
	}

	// Producer is the namespace for configuration related to producing messages,
	// used by the Producer.
	Producer struct {
		// The maximum permitted size of a message (defaults to 1000000). Should be
		// set equal to or smaller than the broker's `message.max.bytes`.
		MaxMessageBytes int
		// The level of acknowledgement reliability needed from the broker (defaults
		// to WaitForLocal). Equivalent to the `request.required.acks` setting of the
		// JVM producer.
		RequiredAcks RequiredAcks
		// The maximum duration the broker will wait for the receipt of the number of
		// RequiredAcks (defaults to 10 seconds). This is only relevant when
		// RequiredAcks is set to WaitForAll or a number > 1. Only supports
		// millisecond resolution, nanoseconds will be truncated. Equivalent to
		// the JVM producer's `request.timeout.ms` setting.
		Timeout time.Duration
		// The type of compression to use on messages (defaults to no compression).
		// Similar to `compression.codec` setting of the JVM producer.
		Compression CompressionCodec
		// The level of compression to use on messages. The meaning depends
		// on the actual compression type used and defaults to the default
		// compression level for the codec.
		CompressionLevel int
		// Generates partitioners for choosing the partition to send messages to
		// (defaults to hashing the message key). Similar to the `partitioner.class`
		// setting for the JVM producer.
		Partitioner PartitionerConstructor
		// If enabled, the producer will ensure that exactly one copy of each message is
		// written.
		Idempotent bool
		// Transaction specifies the transactional producer settings.
		Transaction struct {
			// Used in transactions to identify an instance of a producer through restarts.
			ID string
			// Amount of time a transaction can remain unresolved (neither committed nor aborted);
			// default is 1 minute.
			Timeout time.Duration

			Retry struct {
				// The total number of times to retry sending a message (default 50).
				// Similar to the `message.send.max.retries` setting of the JVM producer.
				Max int
				// How long to wait for the cluster to settle between retries
				// (default 10ms). Similar to the `retry.backoff.ms` setting of the
				// JVM producer.
				Backoff time.Duration
				// Called to compute backoff time dynamically. Useful for implementing
				// more sophisticated backoff strategies. This takes precedence over
				// `Backoff` if set.
				BackoffFunc func(retries, maxRetries int) time.Duration
			}
		}

		// Return specifies which channels will be populated. If they are set to true,
		// you must read from the respective channels to prevent deadlock. If,
		// however, this config is used to create a `SyncProducer`, both must be set
		// to true and you shall not read from the channels since the producer does
		// this internally.
		Return struct {
			// If enabled, successfully delivered messages will be returned on the
			// Successes channel (default disabled).
			Successes bool

			// If enabled, messages that failed to deliver will be returned on the
			// Errors channel, including the error (default enabled).
			Errors bool
		}

		// The following config options control how often messages are batched up and
		// sent to the broker. By default, messages are sent as fast as possible, and
		// all messages received while the current batch is in-flight are placed
		// into the subsequent batch.
		Flush struct {
			// The best-effort number of bytes needed to trigger a flush. Use the
			// global sarama.MaxRequestSize to set a hard upper limit.
			Bytes int
			// The best-effort number of messages needed to trigger a flush. Use
			// `MaxMessages` to set a hard upper limit.
			Messages int
			// The best-effort frequency of flushes. Equivalent to
			// `queue.buffering.max.ms` setting of JVM producer.
			Frequency time.Duration
			// The maximum number of messages the producer will send in a single
			// broker request. Defaults to 0 for unlimited. Similar to
			// `queue.buffering.max.messages` in the JVM producer.
			MaxMessages int
		}

		Retry struct {
			// The total number of times to retry sending a message (default 3).
			// Similar to the `message.send.max.retries` setting of the JVM producer.
			Max int
			// How long to wait for the cluster to settle between retries
			// (default 100ms). Similar to the `retry.backoff.ms` setting of the
			// JVM producer.
			Backoff time.Duration
			// Called to compute backoff time dynamically. Useful for implementing
			// more sophisticated backoff strategies. This takes precedence over
			// `Backoff` if set.
			BackoffFunc func(retries, maxRetries int) time.Duration
			// The maximum length of the bridging buffer between `input` and `retries` channels
			// in AsyncProducer#retryHandler.
			// The limit is to prevent this buffer from overflowing or causing OOM.
			// Defaults to 0; a zero or negative value indicates unlimited.
			// Any positive value below 4096 is pushed up to 4096.
			MaxBufferLength int
			// The maximum total byte size of messages in the bridging buffer between `input`
			// and `retries` channels in AsyncProducer#retryHandler.
			// This limit prevents the buffer from consuming excessive memory.
			// Defaults to 0; a zero or negative value indicates unlimited.
			// Any positive value below 32 MB is pushed up to 32 MB.
			MaxBufferBytes int64
		}

		// Interceptors to be called when the producer dispatcher reads the
		// message for the first time. Interceptors allow intercepting and
		// possibly mutating the message before it is published to the Kafka
		// cluster. The *ProducerMessage modified by the first interceptor's
		// OnSend() is passed to the second interceptor's OnSend(), and so on in
		// the interceptor chain.
		Interceptors []ProducerInterceptor
	}

	// Consumer is the namespace for configuration related to consuming messages,
	// used by the Consumer.
	Consumer struct {
		// Group is the namespace for configuring consumer group.
		Group struct {
			Session struct {
				// The timeout used to detect consumer failures when using Kafka's group management facility.
				// The consumer sends periodic heartbeats to indicate its liveness to the broker.
				// If no heartbeats are received by the broker before the expiration of this session timeout,
				// then the broker will remove this consumer from the group and initiate a rebalance.
				// Note that the value must be in the allowable range as configured in the broker configuration
				// by `group.min.session.timeout.ms` and `group.max.session.timeout.ms` (default 10s)
				Timeout time.Duration
			}
			Heartbeat struct {
				// The expected time between heartbeats to the consumer coordinator when using Kafka's group
				// management facilities. Heartbeats are used to ensure that the consumer's session stays active and
				// to facilitate rebalancing when new consumers join or leave the group.
				// The value must be set lower than Consumer.Group.Session.Timeout, but typically should be set no
				// higher than 1/3 of that value.
				// It can be adjusted even lower to control the expected time for normal rebalances (default 3s)
				Interval time.Duration
			}
			Rebalance struct {
				// Strategy for allocating topic partitions to members.
				// Deprecated: Strategy exists for historical compatibility
				// and should not be used. Please use GroupStrategies.
				Strategy BalanceStrategy

				// GroupStrategies is the priority-ordered list of client-side consumer group
				// balancing strategies that will be offered to the coordinator. The first
				// strategy that all group members support will be chosen by the leader.
				// default: [ NewBalanceStrategyRange() ]
				GroupStrategies []BalanceStrategy

				// The maximum allowed time for each worker to join the group once a rebalance has begun.
				// This is basically a limit on the amount of time needed for all tasks to flush any pending
				// data and commit offsets. If the timeout is exceeded, then the worker will be removed from
				// the group, which will cause offset commit failures (default 60s).
				Timeout time.Duration

				Retry struct {
					// When a new consumer joins a consumer group the set of consumers attempt to "rebalance"
					// the load to assign partitions to each consumer. If the set of consumers changes while
					// this assignment is taking place the rebalance will fail and retry. This setting controls
					// the maximum number of attempts before giving up (default 4).
					Max int
					// Backoff time between retries during rebalance (default 2s)
					Backoff time.Duration
				}
			}
			Member struct {
				// Custom metadata to include when joining the group. The user data for all joined members
				// can be retrieved by sending a DescribeGroupRequest to the broker that is the
				// coordinator for the group.
				UserData []byte
			}

			// InstanceId provides a static group member ID in support of
			// static membership (KIP-345).
			InstanceId string

			// If true, consumer offsets will be automatically reset to configured Initial value
			// if the fetched consumer offset is out of range of available offsets. Out of range
			// can happen if the data has been deleted from the server, or during situations of
			// under-replication where a replica does not have all the data yet. It can be
			// dangerous to reset the offset automatically, particularly in the latter case. Defaults
			// to true to maintain existing behavior.
			ResetInvalidOffsets bool
		}

		Retry struct {
			// How long to wait after failing to read from a partition before
			// trying again (default 2s).
			Backoff time.Duration
			// Called to compute backoff time dynamically. Useful for implementing
			// more sophisticated backoff strategies. This takes precedence over
			// `Backoff` if set.
			BackoffFunc func(retries int) time.Duration
		}

		// Fetch is the namespace for controlling how many bytes are retrieved by any
		// given request.
		Fetch struct {
			// The minimum number of message bytes to fetch in a request - the broker
			// will wait until at least this many are available. The default is 1,
			// as 0 causes the consumer to spin when no messages are available.
			// Equivalent to the JVM's `fetch.min.bytes`.
			Min int32
			// The default number of message bytes to fetch from the broker in each
			// request (default 1MB). This should be larger than the majority of
			// your messages, or else the consumer will spend a lot of time
			// negotiating sizes and not actually consuming. Similar to the JVM's
			// `fetch.message.max.bytes`.
			Default int32
			// The maximum number of message bytes to fetch from the broker in a
			// single request. Messages larger than this will return
			// ErrMessageTooLarge and will not be consumable, so you must be sure
			// this is at least as large as your largest message. Defaults to 0
			// (no limit). Similar to the JVM's `fetch.message.max.bytes`. The
			// global `sarama.MaxResponseSize` still applies.
			Max int32
		}
		// The maximum amount of time the broker will wait for Consumer.Fetch.Min
		// bytes to become available before it returns fewer than that anyway. The
		// default is 250ms, since 0 causes the consumer to spin when no events are
		// available. 100-500ms is a reasonable range for most cases. Kafka only
		// supports precision up to milliseconds; nanoseconds will be truncated.
		// Equivalent to the JVM's `fetch.max.wait.ms`.
		MaxWaitTime time.Duration

		// The maximum amount of time the consumer expects a message to take to
		// process for the user. If writing to the Messages channel takes longer
		// than this, that partition will stop fetching more messages until it
		// can proceed again.
		// Note that, since the Messages channel is buffered, the actual grace time is
		// (MaxProcessingTime * ChannelBufferSize). Defaults to 100ms.
		// If a message is not written to the Messages channel between two ticks
		// of the expiryTicker then a timeout is detected.
		// Using a ticker instead of a timer to detect timeouts should typically
		// result in many fewer calls to Timer functions which may result in a
		// significant performance improvement if many messages are being sent
		// and timeouts are infrequent.
		// The disadvantage of using a ticker instead of a timer is that
		// timeouts will be less accurate. That is, the effective timeout could
		// be between `MaxProcessingTime` and `2 * MaxProcessingTime`. For
		// example, if `MaxProcessingTime` is 100ms then a delay of 180ms
		// between two messages being sent may not be recognized as a timeout.
		MaxProcessingTime time.Duration

		// Return specifies which channels will be populated. If they are set to true,
		// you must read from them to prevent deadlock.
		Return struct {
			// If enabled, any errors that occurred while consuming are returned on
			// the Errors channel (default disabled).
			Errors bool
		}

		// Offsets specifies configuration for how and when to commit consumed
		// offsets. This currently requires the manual use of an OffsetManager
		// but will eventually be automated.
		Offsets struct {
			// Deprecated: CommitInterval exists for historical compatibility
			// and should not be used. Please use Consumer.Offsets.AutoCommit
			CommitInterval time.Duration

			// AutoCommit specifies configuration for automatically committing updated offsets.
			AutoCommit struct {
				// Whether or not to auto-commit updated offsets back to the broker
				// (default enabled).
				Enable bool

				// How frequently to commit updated offsets. Ineffective unless
				// auto-commit is enabled (default 1s)
				Interval time.Duration
			}

			// The initial offset to use if no offset was previously committed.
			// Should be OffsetNewest or OffsetOldest. Defaults to OffsetNewest.
			Initial int64

			// The retention duration for committed offsets. If zero, disabled
			// (in which case the `offsets.retention.minutes` option on the
			// broker will be used). Kafka only supports precision up to
			// milliseconds; nanoseconds will be truncated. Requires Kafka
			// broker version 0.9.0 or later.
			// (default is 0: disabled).
			Retention time.Duration

			Retry struct {
				// The total number of times to retry failing commit
				// requests during OffsetManager shutdown (default 3).
				Max int
			}
		}

		// IsolationLevel supports 2 modes:
		//   - use `ReadUncommitted` (default) to consume and return all messages in message channel
		//   - use `ReadCommitted` to hide messages that are part of an aborted transaction
		IsolationLevel IsolationLevel

		// Interceptors to be called just before the record is sent to the
		// messages channel. Interceptors allow intercepting and possibly
		// mutating the message before it is returned to the client. The
		// *ConsumerMessage modified by the first interceptor's OnConsume() is
		// passed to the second interceptor's OnConsume(), and so on in the
		// interceptor chain.
		Interceptors []ConsumerInterceptor
	}

	// A user-provided string sent with every request to the brokers for logging,
	// debugging, and auditing purposes. Defaults to "sarama", but you should
	// probably set it to something specific to your application.
	ClientID string
	// A rack identifier for this client. This can be any string value which
	// indicates where this client is physically located.
	// It corresponds with the broker config 'broker.rack'
	RackID string
	// The number of events to buffer in internal and external channels. This
	// permits the producer and consumer to continue processing some messages
	// in the background while user code is working, greatly improving throughput.
	// Defaults to 256.
	ChannelBufferSize int
	// ApiVersionsRequest determines whether Sarama should send an
	// ApiVersionsRequest message to each broker as part of its initial
	// connection. This defaults to `true` to match the official Java client
	// and most 3rd-party ones.
	ApiVersionsRequest bool
	// The version of Kafka that Sarama will assume it is running against.
	// Defaults to the oldest supported stable version. Since Kafka provides
	// backwards-compatibility, setting it to a version older than you have
	// will not break anything, although it may prevent you from using the
	// latest features. Setting it to a version greater than you are actually
	// running may lead to random breakage.
	Version KafkaVersion
	// The registry to define metrics into.
	// Defaults to a local registry.
	// If you want to disable metrics gathering, set "metrics.UseNilMetrics" to "true"
	// prior to starting Sarama.
	// See Examples on how to use the metrics registry
	MetricRegistry metrics.Registry
}
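
// Illustrative sketch only (not part of this file's API surface): the nested
// namespaces above are addressed as ordinary struct fields, e.g. enabling TLS.
// The tls.Config shown is an assumption of the example.
//
//	cfg := NewConfig()
//	cfg.Net.TLS.Enable = true
//	cfg.Net.TLS.Config = &tls.Config{MinVersion: tls.VersionTLS12}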

// NewConfig returns a new configuration instance with sane defaults.
func NewConfig() *Config {
	c := &Config{}

	c.Admin.Retry.Max = 5
	c.Admin.Retry.Backoff = 100 * time.Millisecond
	c.Admin.Timeout = 3 * time.Second

	c.Net.MaxOpenRequests = 5
	c.Net.DialTimeout = 30 * time.Second
	c.Net.ReadTimeout = 30 * time.Second
	c.Net.WriteTimeout = 30 * time.Second
	c.Net.SASL.Handshake = true
	c.Net.SASL.Version = SASLHandshakeV1

	c.Metadata.Retry.Max = 3
	c.Metadata.Retry.Backoff = 250 * time.Millisecond
	c.Metadata.RefreshFrequency = 10 * time.Minute
	c.Metadata.Full = true
	c.Metadata.AllowAutoTopicCreation = true
	c.Metadata.SingleFlight = true

	c.Producer.MaxMessageBytes = 1024 * 1024
	c.Producer.RequiredAcks = WaitForLocal
	c.Producer.Timeout = 10 * time.Second
	c.Producer.Partitioner = NewHashPartitioner
	c.Producer.Retry.Max = 3
	c.Producer.Retry.Backoff = 100 * time.Millisecond
	c.Producer.Return.Errors = true
	c.Producer.CompressionLevel = CompressionLevelDefault

	c.Producer.Transaction.Timeout = 1 * time.Minute
	c.Producer.Transaction.Retry.Max = 50
	c.Producer.Transaction.Retry.Backoff = 100 * time.Millisecond

	c.Consumer.Fetch.Min = 1
	c.Consumer.Fetch.Default = 1024 * 1024
	c.Consumer.Retry.Backoff = 2 * time.Second
	c.Consumer.MaxWaitTime = 500 * time.Millisecond
	c.Consumer.MaxProcessingTime = 100 * time.Millisecond
	c.Consumer.Return.Errors = false
	c.Consumer.Offsets.AutoCommit.Enable = true
	c.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second
	c.Consumer.Offsets.Initial = OffsetNewest
	c.Consumer.Offsets.Retry.Max = 3

	c.Consumer.Group.Session.Timeout = 10 * time.Second
	c.Consumer.Group.Heartbeat.Interval = 3 * time.Second
	c.Consumer.Group.Rebalance.GroupStrategies = []BalanceStrategy{NewBalanceStrategyRange()}
	c.Consumer.Group.Rebalance.Timeout = 60 * time.Second
	c.Consumer.Group.Rebalance.Retry.Max = 4
	c.Consumer.Group.Rebalance.Retry.Backoff = 2 * time.Second
	c.Consumer.Group.ResetInvalidOffsets = true

	c.ClientID = defaultClientID
	c.ChannelBufferSize = 256
	c.ApiVersionsRequest = true
	c.Version = DefaultVersion
	c.MetricRegistry = metrics.NewRegistry()

	return c
}
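
// Example usage (a minimal sketch; the specific overrides shown are assumptions
// of the example, not required settings):
//
//	cfg := NewConfig()
//	cfg.ClientID = "my-service"           // hypothetical application name
//	cfg.Producer.Return.Successes = true  // required when building a SyncProducer
//	if err := cfg.Validate(); err != nil {
//		panic(err)
//	}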

// Validate checks a Config instance. It will return a
// ConfigurationError if the specified values don't make sense.
//
//nolint:gocyclo // This function's cyclomatic complexity has gone beyond 100
func (c *Config) Validate() error {
	// some configuration values should be warned on but not fail completely, do those first
	if !c.Net.TLS.Enable && c.Net.TLS.Config != nil {
		Logger.Println("Net.TLS is disabled but a non-nil configuration was provided.")
	}
	if !c.Net.SASL.Enable {
		if c.Net.SASL.User != "" {
			Logger.Println("Net.SASL is disabled but a non-empty username was provided.")
		}
		if c.Net.SASL.Password != "" {
			Logger.Println("Net.SASL is disabled but a non-empty password was provided.")
		}
	}
	if c.Producer.RequiredAcks > 1 {
		Logger.Println("Producer.RequiredAcks > 1 is deprecated and will raise an exception with kafka >= 0.8.2.0.")
	}
	if c.Producer.MaxMessageBytes >= int(MaxRequestSize) {
		Logger.Println("Producer.MaxMessageBytes must be smaller than MaxRequestSize; it will be ignored.")
	}
	if c.Producer.Flush.Bytes >= int(MaxRequestSize) {
		Logger.Println("Producer.Flush.Bytes must be smaller than MaxRequestSize; it will be ignored.")
	}
	if (c.Producer.Flush.Bytes > 0 || c.Producer.Flush.Messages > 0) && c.Producer.Flush.Frequency == 0 {
		Logger.Println("Producer.Flush: Bytes or Messages are set, but Frequency is not; messages may not get flushed.")
	}
	if c.Producer.Timeout%time.Millisecond != 0 {
		Logger.Println("Producer.Timeout only supports millisecond resolution; nanoseconds will be truncated.")
	}
	if c.Consumer.MaxWaitTime < 100*time.Millisecond {
		Logger.Println("Consumer.MaxWaitTime is very low, which can cause high CPU and network usage. See documentation for details.")
	}
	if c.Consumer.MaxWaitTime%time.Millisecond != 0 {
		Logger.Println("Consumer.MaxWaitTime only supports millisecond precision; nanoseconds will be truncated.")
	}
	if c.Consumer.Offsets.Retention%time.Millisecond != 0 {
		Logger.Println("Consumer.Offsets.Retention only supports millisecond precision; nanoseconds will be truncated.")
	}
	if c.Consumer.Group.Session.Timeout%time.Millisecond != 0 {
		Logger.Println("Consumer.Group.Session.Timeout only supports millisecond precision; nanoseconds will be truncated.")
	}
	if c.Consumer.Group.Heartbeat.Interval%time.Millisecond != 0 {
		Logger.Println("Consumer.Group.Heartbeat.Interval only supports millisecond precision; nanoseconds will be truncated.")
	}
	if c.Consumer.Group.Rebalance.Timeout%time.Millisecond != 0 {
		Logger.Println("Consumer.Group.Rebalance.Timeout only supports millisecond precision; nanoseconds will be truncated.")
	}
	if c.ClientID == defaultClientID {
		Logger.Println("ClientID is the default of 'sarama', you should consider setting it to something application-specific.")
	}

	// validate Net values
	switch {
	case c.Net.MaxOpenRequests <= 0:
		return ConfigurationError("Net.MaxOpenRequests must be > 0")
	case c.Net.DialTimeout <= 0:
		return ConfigurationError("Net.DialTimeout must be > 0")
	case c.Net.ReadTimeout <= 0:
		return ConfigurationError("Net.ReadTimeout must be > 0")
	case c.Net.WriteTimeout <= 0:
		return ConfigurationError("Net.WriteTimeout must be > 0")
	case c.Net.SASL.Enable:
		if c.Net.SASL.Mechanism == "" {
			c.Net.SASL.Mechanism = SASLTypePlaintext
		}
		if c.Net.SASL.Version == SASLHandshakeV0 && c.ApiVersionsRequest {
			return ConfigurationError("ApiVersionsRequest must be disabled when SASL v0 is enabled")
		}
		switch c.Net.SASL.Mechanism {
		case SASLTypePlaintext:
			if c.Net.SASL.User == "" {
				return ConfigurationError("Net.SASL.User must not be empty when SASL is enabled")
			}
			if c.Net.SASL.Password == "" {
				return ConfigurationError("Net.SASL.Password must not be empty when SASL is enabled")
			}
		case SASLTypeOAuth:
			if c.Net.SASL.TokenProvider == nil {
				return ConfigurationError("An AccessTokenProvider instance must be provided to Net.SASL.TokenProvider")
			}
		case SASLTypeSCRAMSHA256, SASLTypeSCRAMSHA512:
			if c.Net.SASL.User == "" {
				return ConfigurationError("Net.SASL.User must not be empty when SASL is enabled")
			}
			if c.Net.SASL.Password == "" {
				return ConfigurationError("Net.SASL.Password must not be empty when SASL is enabled")
			}
			if c.Net.SASL.SCRAMClientGeneratorFunc == nil {
				return ConfigurationError("A SCRAMClientGeneratorFunc function must be provided to Net.SASL.SCRAMClientGeneratorFunc")
			}
		case SASLTypeGSSAPI:
			if c.Net.SASL.GSSAPI.ServiceName == "" {
				return ConfigurationError("Net.SASL.GSSAPI.ServiceName must not be empty when GSS-API mechanism is used")
			}

			switch c.Net.SASL.GSSAPI.AuthType {
			case KRB5_USER_AUTH:
				if c.Net.SASL.GSSAPI.Password == "" {
					return ConfigurationError("Net.SASL.GSSAPI.Password must not be empty when GSS-API " +
						"mechanism is used and Net.SASL.GSSAPI.AuthType = KRB5_USER_AUTH")
				}
			case KRB5_KEYTAB_AUTH:
				if c.Net.SASL.GSSAPI.KeyTabPath == "" {
					return ConfigurationError("Net.SASL.GSSAPI.KeyTabPath must not be empty when GSS-API mechanism is used" +
						" and Net.SASL.GSSAPI.AuthType = KRB5_KEYTAB_AUTH")
				}
			case KRB5_CCACHE_AUTH:
				if c.Net.SASL.GSSAPI.CCachePath == "" {
					return ConfigurationError("Net.SASL.GSSAPI.CCachePath must not be empty when GSS-API mechanism is used" +
						" and Net.SASL.GSSAPI.AuthType = KRB5_CCACHE_AUTH")
				}
			default:
				return ConfigurationError("Net.SASL.GSSAPI.AuthType is invalid. Possible values are KRB5_USER_AUTH, KRB5_KEYTAB_AUTH, and KRB5_CCACHE_AUTH")
			}

			if c.Net.SASL.GSSAPI.KerberosConfigPath == "" {
				return ConfigurationError("Net.SASL.GSSAPI.KerberosConfigPath must not be empty when GSS-API mechanism is used")
			}
			if c.Net.SASL.GSSAPI.Username == "" {
				return ConfigurationError("Net.SASL.GSSAPI.Username must not be empty when GSS-API mechanism is used")
			}
			if c.Net.SASL.GSSAPI.Realm == "" {
				return ConfigurationError("Net.SASL.GSSAPI.Realm must not be empty when GSS-API mechanism is used")
			}
		default:
			msg := fmt.Sprintf("The SASL mechanism configuration is invalid. Possible values are `%s`, `%s`, `%s`, `%s` and `%s`",
				SASLTypeOAuth, SASLTypePlaintext, SASLTypeSCRAMSHA256, SASLTypeSCRAMSHA512, SASLTypeGSSAPI)
			return ConfigurationError(msg)
		}
	}

	// validate the Admin values
	switch {
	case c.Admin.Timeout <= 0:
		return ConfigurationError("Admin.Timeout must be > 0")
	}

	// validate the Metadata values
	switch {
	case c.Metadata.Retry.Max < 0:
		return ConfigurationError("Metadata.Retry.Max must be >= 0")
	case c.Metadata.Retry.Backoff < 0:
		return ConfigurationError("Metadata.Retry.Backoff must be >= 0")
	case c.Metadata.RefreshFrequency < 0:
		return ConfigurationError("Metadata.RefreshFrequency must be >= 0")
	}

	// validate the Producer values
	switch {
	case c.Producer.MaxMessageBytes <= 0:
		return ConfigurationError("Producer.MaxMessageBytes must be > 0")
	case c.Producer.RequiredAcks < -1:
		return ConfigurationError("Producer.RequiredAcks must be >= -1")
	case c.Producer.Timeout <= 0:
		return ConfigurationError("Producer.Timeout must be > 0")
	case c.Producer.Partitioner == nil:
		return ConfigurationError("Producer.Partitioner must not be nil")
	case c.Producer.Flush.Bytes < 0:
		return ConfigurationError("Producer.Flush.Bytes must be >= 0")
	case c.Producer.Flush.Messages < 0:
		return ConfigurationError("Producer.Flush.Messages must be >= 0")
	case c.Producer.Flush.Frequency < 0:
		return ConfigurationError("Producer.Flush.Frequency must be >= 0")
	case c.Producer.Flush.MaxMessages < 0:
		return ConfigurationError("Producer.Flush.MaxMessages must be >= 0")
	case c.Producer.Flush.MaxMessages > 0 && c.Producer.Flush.MaxMessages < c.Producer.Flush.Messages:
		return ConfigurationError("Producer.Flush.MaxMessages must be >= Producer.Flush.Messages when set")
	case c.Producer.Retry.Max < 0:
		return ConfigurationError("Producer.Retry.Max must be >= 0")
	case c.Producer.Retry.Backoff < 0:
		return ConfigurationError("Producer.Retry.Backoff must be >= 0")
	}

	if c.Producer.Compression == CompressionLZ4 && !c.Version.IsAtLeast(V0_10_0_0) {
		return ConfigurationError("lz4 compression requires Version >= V0_10_0_0")
	}

	if c.Producer.Compression == CompressionGZIP {
		if c.Producer.CompressionLevel != CompressionLevelDefault {
			if _, err := gzip.NewWriterLevel(io.Discard, c.Producer.CompressionLevel); err != nil {
				return ConfigurationError(fmt.Sprintf("gzip compression does not work with level %d: %v", c.Producer.CompressionLevel, err))
			}
		}
	}

	if c.Producer.Compression == CompressionZSTD && !c.Version.IsAtLeast(V2_1_0_0) {
		return ConfigurationError("zstd compression requires Version >= V2_1_0_0")
	}

	if c.Producer.Idempotent {
		if !c.Version.IsAtLeast(V0_11_0_0) {
			return ConfigurationError("Idempotent producer requires Version >= V0_11_0_0")
		}
		if c.Producer.Retry.Max == 0 {
			return ConfigurationError("Idempotent producer requires Producer.Retry.Max >= 1")
		}
		if c.Producer.RequiredAcks != WaitForAll {
			return ConfigurationError("Idempotent producer requires Producer.RequiredAcks to be WaitForAll")
		}
		if c.Net.MaxOpenRequests > 1 {
			return ConfigurationError("Idempotent producer requires Net.MaxOpenRequests to be 1")
		}
	}

	if c.Producer.Transaction.ID != "" && !c.Producer.Idempotent {
		return ConfigurationError("Transactional producer requires Idempotent to be true")
	}

	// validate the Consumer values
	switch {
	case c.Consumer.Fetch.Min <= 0:
		return ConfigurationError("Consumer.Fetch.Min must be > 0")
	case c.Consumer.Fetch.Default <= 0:
		return ConfigurationError("Consumer.Fetch.Default must be > 0")
	case c.Consumer.Fetch.Max < 0:
		return ConfigurationError("Consumer.Fetch.Max must be >= 0")
	case c.Consumer.MaxWaitTime < 1*time.Millisecond:
		return ConfigurationError("Consumer.MaxWaitTime must be >= 1ms")
	case c.Consumer.MaxProcessingTime <= 0:
		return ConfigurationError("Consumer.MaxProcessingTime must be > 0")
	case c.Consumer.Retry.Backoff < 0:
		return ConfigurationError("Consumer.Retry.Backoff must be >= 0")
	case c.Consumer.Offsets.AutoCommit.Interval <= 0:
		return ConfigurationError("Consumer.Offsets.AutoCommit.Interval must be > 0")
	case c.Consumer.Offsets.Initial != OffsetOldest && c.Consumer.Offsets.Initial != OffsetNewest:
		return ConfigurationError("Consumer.Offsets.Initial must be OffsetOldest or OffsetNewest")
	case c.Consumer.Offsets.Retry.Max < 0:
		return ConfigurationError("Consumer.Offsets.Retry.Max must be >= 0")
	case c.Consumer.IsolationLevel != ReadUncommitted && c.Consumer.IsolationLevel != ReadCommitted:
		return ConfigurationError("Consumer.IsolationLevel must be ReadUncommitted or ReadCommitted")
	}

	if c.Consumer.Offsets.CommitInterval != 0 {
		Logger.Println("Deprecation warning: Consumer.Offsets.CommitInterval exists for historical compatibility" +
			" and should not be used. Please use Consumer.Offsets.AutoCommit, the current value will be ignored")
	}
	if c.Consumer.Group.Rebalance.Strategy != nil {
		Logger.Println("Deprecation warning: Consumer.Group.Rebalance.Strategy exists for historical compatibility" +
			" and should not be used. Please use Consumer.Group.Rebalance.GroupStrategies")
	}

	// validate IsolationLevel
	if c.Consumer.IsolationLevel == ReadCommitted && !c.Version.IsAtLeast(V0_11_0_0) {
		return ConfigurationError("ReadCommitted requires Version >= V0_11_0_0")
	}

	// validate the Consumer Group values
	switch {
	case c.Consumer.Group.Session.Timeout <= 2*time.Millisecond:
		return ConfigurationError("Consumer.Group.Session.Timeout must be >= 2ms")
	case c.Consumer.Group.Heartbeat.Interval < 1*time.Millisecond:
		return ConfigurationError("Consumer.Group.Heartbeat.Interval must be >= 1ms")
	case c.Consumer.Group.Heartbeat.Interval >= c.Consumer.Group.Session.Timeout:
		return ConfigurationError("Consumer.Group.Heartbeat.Interval must be < Consumer.Group.Session.Timeout")
	case c.Consumer.Group.Rebalance.Strategy == nil && len(c.Consumer.Group.Rebalance.GroupStrategies) == 0:
		return ConfigurationError("Consumer.Group.Rebalance.GroupStrategies or Consumer.Group.Rebalance.Strategy must not be empty")
	case c.Consumer.Group.Rebalance.Timeout <= time.Millisecond:
		return ConfigurationError("Consumer.Group.Rebalance.Timeout must be >= 1ms")
	case c.Consumer.Group.Rebalance.Retry.Max < 0:
		return ConfigurationError("Consumer.Group.Rebalance.Retry.Max must be >= 0")
	case c.Consumer.Group.Rebalance.Retry.Backoff < 0:
		return ConfigurationError("Consumer.Group.Rebalance.Retry.Backoff must be >= 0")
	}

	for _, strategy := range c.Consumer.Group.Rebalance.GroupStrategies {
		if strategy == nil {
			return ConfigurationError("elements in Consumer.Group.Rebalance.GroupStrategies must not be nil")
		}
	}

	if c.Consumer.Group.InstanceId != "" {
		if !c.Version.IsAtLeast(V2_3_0_0) {
			return ConfigurationError("Consumer.Group.InstanceId requires Version >= V2_3_0_0")
		}
		if err := validateGroupInstanceId(c.Consumer.Group.InstanceId); err != nil {
			return err
		}
	}

	// validate misc shared values
	switch {
	case c.ChannelBufferSize < 0:
		return ConfigurationError("ChannelBufferSize must be >= 0")
	}

	// only validate clientID locally for Kafka versions before KIP-190 was implemented
	if !c.Version.IsAtLeast(V1_0_0_0) && !validClientID.MatchString(c.ClientID) {
		return ConfigurationError(fmt.Sprintf("ClientID value %q is not valid for Kafka versions before 1.0.0", c.ClientID))
	}

	return nil
}
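
// Illustrative sketch of the idempotent-producer checks above (assumes a
// Version of at least V0_11_0_0; not additional behaviour of this file):
//
//	cfg := NewConfig()
//	cfg.Version = V0_11_0_0
//	cfg.Producer.Idempotent = true
//	cfg.Producer.RequiredAcks = WaitForAll
//	cfg.Net.MaxOpenRequests = 1
//	err := cfg.Validate() // nil: all idempotence requirements are met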

func (c *Config) getDialer() proxy.Dialer {
	if c.Net.Proxy.Enable {
		Logger.Println("using proxy")
		return c.Net.Proxy.Dialer
	} else {
		return &net.Dialer{
			Timeout:   c.Net.DialTimeout,
			KeepAlive: c.Net.KeepAlive,
			LocalAddr: c.Net.LocalAddr,
		}
	}
}
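
// A minimal sketch of routing broker connections through a SOCKS5 proxy using
// golang.org/x/net/proxy (the proxy address is an assumption of the example):
//
//	dialer, err := proxy.SOCKS5("tcp", "127.0.0.1:1080", nil, proxy.Direct)
//	if err != nil {
//		panic(err)
//	}
//	cfg := NewConfig()
//	cfg.Net.Proxy.Enable = true
//	cfg.Net.Proxy.Dialer = dialer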

const MAX_GROUP_INSTANCE_ID_LENGTH = 249

var GROUP_INSTANCE_ID_REGEXP = regexp.MustCompile(`^[0-9a-zA-Z\._\-]+$`)

func validateGroupInstanceId(id string) error {
	if id == "" {
		return ConfigurationError("Group instance id must be a non-empty string")
	}
	if id == "." || id == ".." {
		return ConfigurationError(`Group instance id cannot be "." or ".."`)
	}
	if len(id) > MAX_GROUP_INSTANCE_ID_LENGTH {
		return ConfigurationError(fmt.Sprintf(`Group instance id cannot be longer than %v characters: %s`, MAX_GROUP_INSTANCE_ID_LENGTH, id))
	}
	if !GROUP_INSTANCE_ID_REGEXP.MatchString(id) {
		return ConfigurationError(fmt.Sprintf(`Group instance id %s is illegal, it contains characters other than alphanumerics, '.', '_' and '-'`, id))
	}
	return nil
}
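
// For example (derived from the rules above, shown for illustration only):
// validateGroupInstanceId("consumer-1.worker_A") returns nil, while
// validateGroupInstanceId("..") and validateGroupInstanceId("has spaces")
// both return a ConfigurationError.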