blob: 4f512f2a91d6dff478271168f319be4cf408a3fe [file] [log] [blame]
Abhay Kumar40252eb2025-10-13 13:25:53 +00001package sarama
2
3import (
4 "strconv"
5 "strings"
6 "sync"
7
8 "github.com/rcrowley/go-metrics"
9)
10
11// Use exponentially decaying reservoir for sampling histograms with the same defaults as the Java library:
12// 1028 elements, which offers a 99.9% confidence level with a 5% margin of error assuming a normal distribution,
13// and an alpha factor of 0.015, which heavily biases the reservoir to the past 5 minutes of measurements.
14// See https://github.com/dropwizard/metrics/blob/v3.1.0/metrics-core/src/main/java/com/codahale/metrics/ExponentiallyDecayingReservoir.java#L38
15const (
16 metricsReservoirSize = 1028
17 metricsAlphaFactor = 0.015
18)
19
20func getOrRegisterHistogram(name string, r metrics.Registry) metrics.Histogram {
21 return r.GetOrRegister(name, func() metrics.Histogram {
22 return metrics.NewHistogram(metrics.NewExpDecaySample(metricsReservoirSize, metricsAlphaFactor))
23 }).(metrics.Histogram)
24}
25
26func getMetricNameForBroker(name string, broker *Broker) string {
27 // Use broker id like the Java client as it does not contain '.' or ':' characters that
28 // can be interpreted as special character by monitoring tool (e.g. Graphite)
29 return name + "-for-broker-" + strconv.FormatInt(int64(broker.ID()), 10)
30}
31
32func getMetricNameForTopic(name string, topic string) string {
33 // Convert dot to _ since reporters like Graphite typically use dot to represent hierarchy
34 // cf. KAFKA-1902 and KAFKA-2337
35 return name + "-for-topic-" + strings.ReplaceAll(topic, ".", "_")
36}
37
38func getOrRegisterTopicMeter(name string, topic string, r metrics.Registry) metrics.Meter {
39 return metrics.GetOrRegisterMeter(getMetricNameForTopic(name, topic), r)
40}
41
42func getOrRegisterTopicHistogram(name string, topic string, r metrics.Registry) metrics.Histogram {
43 return getOrRegisterHistogram(getMetricNameForTopic(name, topic), r)
44}
45
46// cleanupRegistry is an implementation of metrics.Registry that allows
47// to unregister from the parent registry only those metrics
48// that have been registered in cleanupRegistry
49type cleanupRegistry struct {
50 parent metrics.Registry
51 metrics map[string]struct{}
52 mutex sync.RWMutex
53}
54
55func newCleanupRegistry(parent metrics.Registry) metrics.Registry {
56 return &cleanupRegistry{
57 parent: parent,
58 metrics: map[string]struct{}{},
59 }
60}
61
62func (r *cleanupRegistry) Each(fn func(string, interface{})) {
63 r.mutex.RLock()
64 defer r.mutex.RUnlock()
65 wrappedFn := func(name string, iface interface{}) {
66 if _, ok := r.metrics[name]; ok {
67 fn(name, iface)
68 }
69 }
70 r.parent.Each(wrappedFn)
71}
72
73func (r *cleanupRegistry) Get(name string) interface{} {
74 r.mutex.RLock()
75 defer r.mutex.RUnlock()
76 if _, ok := r.metrics[name]; ok {
77 return r.parent.Get(name)
78 }
79 return nil
80}
81
82func (r *cleanupRegistry) GetOrRegister(name string, metric interface{}) interface{} {
83 r.mutex.Lock()
84 defer r.mutex.Unlock()
85 r.metrics[name] = struct{}{}
86 return r.parent.GetOrRegister(name, metric)
87}
88
89func (r *cleanupRegistry) Register(name string, metric interface{}) error {
90 r.mutex.Lock()
91 defer r.mutex.Unlock()
92 r.metrics[name] = struct{}{}
93 return r.parent.Register(name, metric)
94}
95
96func (r *cleanupRegistry) RunHealthchecks() {
97 r.parent.RunHealthchecks()
98}
99
100func (r *cleanupRegistry) GetAll() map[string]map[string]interface{} {
101 return r.parent.GetAll()
102}
103
104func (r *cleanupRegistry) Unregister(name string) {
105 r.mutex.Lock()
106 defer r.mutex.Unlock()
107 if _, ok := r.metrics[name]; ok {
108 delete(r.metrics, name)
109 r.parent.Unregister(name)
110 }
111}
112
113func (r *cleanupRegistry) UnregisterAll() {
114 r.mutex.Lock()
115 defer r.mutex.Unlock()
116 for name := range r.metrics {
117 delete(r.metrics, name)
118 r.parent.Unregister(name)
119 }
120}