| Abhay Kumar | 40252eb | 2025-10-13 13:25:53 +0000 | [diff] [blame^] | 1 | package sarama |
| 2 | |
| 3 | import ( |
| 4 | "strconv" |
| 5 | "strings" |
| 6 | "sync" |
| 7 | |
| 8 | "github.com/rcrowley/go-metrics" |
| 9 | ) |
| 10 | |
// Histograms are sampled with an exponentially decaying reservoir configured
// with the same defaults as the Java library.
// See https://github.com/dropwizard/metrics/blob/v3.1.0/metrics-core/src/main/java/com/codahale/metrics/ExponentiallyDecayingReservoir.java#L38

// metricsReservoirSize is the reservoir size: 1028 elements, which offers a
// 99.9% confidence level with a 5% margin of error assuming a normal
// distribution.
const metricsReservoirSize = 1028

// metricsAlphaFactor is the decay factor: 0.015 heavily biases the reservoir
// to the past 5 minutes of measurements.
const metricsAlphaFactor = 0.015
| 19 | |
| 20 | func getOrRegisterHistogram(name string, r metrics.Registry) metrics.Histogram { |
| 21 | return r.GetOrRegister(name, func() metrics.Histogram { |
| 22 | return metrics.NewHistogram(metrics.NewExpDecaySample(metricsReservoirSize, metricsAlphaFactor)) |
| 23 | }).(metrics.Histogram) |
| 24 | } |
| 25 | |
| 26 | func getMetricNameForBroker(name string, broker *Broker) string { |
| 27 | // Use broker id like the Java client as it does not contain '.' or ':' characters that |
| 28 | // can be interpreted as special character by monitoring tool (e.g. Graphite) |
| 29 | return name + "-for-broker-" + strconv.FormatInt(int64(broker.ID()), 10) |
| 30 | } |
| 31 | |
// getMetricNameForTopic derives the per-topic metric name
// "<name>-for-topic-<topic>", with every '.' in the topic replaced by '_'.
func getMetricNameForTopic(name string, topic string) string {
	// Reporters like Graphite typically use dots to express hierarchy, so
	// dots in the topic are rewritten (cf. KAFKA-1902 and KAFKA-2337).
	sanitizedTopic := strings.Replace(topic, ".", "_", -1)
	return name + "-for-topic-" + sanitizedTopic
}
| 37 | |
| 38 | func getOrRegisterTopicMeter(name string, topic string, r metrics.Registry) metrics.Meter { |
| 39 | return metrics.GetOrRegisterMeter(getMetricNameForTopic(name, topic), r) |
| 40 | } |
| 41 | |
| 42 | func getOrRegisterTopicHistogram(name string, topic string, r metrics.Registry) metrics.Histogram { |
| 43 | return getOrRegisterHistogram(getMetricNameForTopic(name, topic), r) |
| 44 | } |
| 45 | |
| 46 | // cleanupRegistry is an implementation of metrics.Registry that allows |
| 47 | // to unregister from the parent registry only those metrics |
| 48 | // that have been registered in cleanupRegistry |
| 49 | type cleanupRegistry struct { |
| 50 | parent metrics.Registry |
| 51 | metrics map[string]struct{} |
| 52 | mutex sync.RWMutex |
| 53 | } |
| 54 | |
| 55 | func newCleanupRegistry(parent metrics.Registry) metrics.Registry { |
| 56 | return &cleanupRegistry{ |
| 57 | parent: parent, |
| 58 | metrics: map[string]struct{}{}, |
| 59 | } |
| 60 | } |
| 61 | |
| 62 | func (r *cleanupRegistry) Each(fn func(string, interface{})) { |
| 63 | r.mutex.RLock() |
| 64 | defer r.mutex.RUnlock() |
| 65 | wrappedFn := func(name string, iface interface{}) { |
| 66 | if _, ok := r.metrics[name]; ok { |
| 67 | fn(name, iface) |
| 68 | } |
| 69 | } |
| 70 | r.parent.Each(wrappedFn) |
| 71 | } |
| 72 | |
| 73 | func (r *cleanupRegistry) Get(name string) interface{} { |
| 74 | r.mutex.RLock() |
| 75 | defer r.mutex.RUnlock() |
| 76 | if _, ok := r.metrics[name]; ok { |
| 77 | return r.parent.Get(name) |
| 78 | } |
| 79 | return nil |
| 80 | } |
| 81 | |
| 82 | func (r *cleanupRegistry) GetOrRegister(name string, metric interface{}) interface{} { |
| 83 | r.mutex.Lock() |
| 84 | defer r.mutex.Unlock() |
| 85 | r.metrics[name] = struct{}{} |
| 86 | return r.parent.GetOrRegister(name, metric) |
| 87 | } |
| 88 | |
| 89 | func (r *cleanupRegistry) Register(name string, metric interface{}) error { |
| 90 | r.mutex.Lock() |
| 91 | defer r.mutex.Unlock() |
| 92 | r.metrics[name] = struct{}{} |
| 93 | return r.parent.Register(name, metric) |
| 94 | } |
| 95 | |
| 96 | func (r *cleanupRegistry) RunHealthchecks() { |
| 97 | r.parent.RunHealthchecks() |
| 98 | } |
| 99 | |
| 100 | func (r *cleanupRegistry) GetAll() map[string]map[string]interface{} { |
| 101 | return r.parent.GetAll() |
| 102 | } |
| 103 | |
| 104 | func (r *cleanupRegistry) Unregister(name string) { |
| 105 | r.mutex.Lock() |
| 106 | defer r.mutex.Unlock() |
| 107 | if _, ok := r.metrics[name]; ok { |
| 108 | delete(r.metrics, name) |
| 109 | r.parent.Unregister(name) |
| 110 | } |
| 111 | } |
| 112 | |
| 113 | func (r *cleanupRegistry) UnregisterAll() { |
| 114 | r.mutex.Lock() |
| 115 | defer r.mutex.Unlock() |
| 116 | for name := range r.metrics { |
| 117 | delete(r.metrics, name) |
| 118 | r.parent.Unregister(name) |
| 119 | } |
| 120 | } |