#!/usr/bin/env python

"""
A simple process to read time-series samples from a Kafka topic and shove
the data into graphite/carbon as pickled input.

The code is based on a GitHub gist by phobos182
(https://gist.github.com/phobos182/3931936).

As all GitHub gists, it is covered by the MIT license.

"""

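# Example invocation (a sketch only: the script file name and the Kafka topic
# below are illustrative, not defined by this file; a '-K @<service>' value is
# resolved to a real endpoint via the Consul server given with '-c'):
#
#   python kafka_to_graphite.py -K localhost:9092 -t voltha.kpis \
#       -H localhost -p 2004
#   python kafka_to_graphite.py -K @kafka -c localhost:8500 -t voltha.kpis
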
from optparse import OptionParser

import simplejson
from kafka import KafkaConsumer
import pickle
import struct
import socket
import sys
import time

from kafka.consumer.fetcher import ConsumerRecord
from kafka.errors import KafkaError

from common.utils.consulhelpers import get_endpoint_from_consul


class Graphite:

    def __init__(self, host='localhost', port=2004, retry=5, delay=3,
                 backoff=2, timeout=10):
        self.host = host
        self.port = port
        self.retry = retry
        self.delay = delay
        self.backoff = backoff
        self.timeout = timeout

        # Create initial socket
        self.conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.conn.settimeout(self.timeout)
        # Initiate connection
        self.connect()


    def _backoff(self, retry, delay, backoff):
        """Exponential backoff."""
        retry -= 1
        if retry == 0:
            raise Exception('Timeout')
        time.sleep(delay)
        delay *= backoff
        return retry, delay, backoff

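    # Note: with the defaults above (retry=5, delay=3, backoff=2), the retry
    # loops below sleep for 3, 6, 12 and 24 seconds between attempts and then
    # raise Exception('Timeout') on the fifth consecutive failure.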

    def _retry(self, exception, func, *args):
        """
        Retry calling the func catching a tuple of exceptions with backoff.
        """
        retry = self.retry
        delay = self.delay
        backoff = self.backoff
        while retry > 0:
            try:
                return func(*args)
            except exception, e:
                retry, delay, backoff = self._backoff(retry, delay, backoff)


    def connect(self):
        """Connect to graphite."""
        retry = self.retry
        backoff = self.backoff
        delay = self.delay
        while retry > 0:
            try:
                # Attempt to connect to Graphite, break if success
                self.conn.connect((self.host, self.port))
                break
            except socket.error, e:
                # Ditch this socket. Create a new one
                self.conn.close()
                self.conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                self.conn.settimeout(self.timeout)
                retry, delay, backoff = self._backoff(retry, delay, backoff)


    def close(self):
        """Close connection to Graphite."""
        self.conn.close()


    def send(self, data):
        """Send data to graphite."""
        retry = self.retry
        backoff = self.backoff
        delay = self.delay
        # Attempt to send any data in the queue
        while retry > 0:
            # Check socket
            if not self.conn:
                # Attempt to re-establish connection
                self.close()
                self.connect()
                retry, delay, backoff = self._backoff(retry, delay, backoff)
                continue
            try:
                # Send data to socket
                self.conn.sendall(data)
                break
            except socket.error, e:
                self.close()
                self.connect()
                retry, delay, backoff = self._backoff(retry, delay, backoff)
                continue


def _pickle(batch):
    """Pickle metrics into graphite format."""
    payload = pickle.dumps(batch)
    header = struct.pack("!L", len(payload))
    message = header + payload
    return message

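# _pickle() above produces the framing Carbon's pickle receiver (the port 2004
# listener) expects: a pickled list of (path, (timestamp, value)) tuples,
# prefixed with a 4-byte big-endian length header. For example (metric path
# and timestamp are illustrative only), the batch
#   [('voltha.olt1.rx_packets', (1481234567, 42))]
# is pickled and sent as <length><payload> over the TCP connection.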

def _convert(msg):
    """Convert a parsed KPI message (dict) into a batch of graphite metrics."""

    def extract_slice(ts, data):
        for object_path, metrics in data.iteritems():
            for metric_name, value in metrics.iteritems():
                path = '.'.join((object_path, metric_name))
                yield (path, ts, value)

    assert isinstance(msg, dict)
    msg_type = msg.get('type')
    if msg_type == 'slice':
        extractor, kw = extract_slice, dict(ts=msg['ts'], data=msg['data'])
    else:
        raise Exception('Unknown format')

    batch = []
    for path, timestamp, value in extractor(**kw):
        batch.append((path, (timestamp, value)))
    return batch

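# Expected input shape for _convert(), inferred from the 'slice' handling above
# (object paths, metric names and values below are illustrative only):
#   {"type": "slice",
#    "ts": 1481234567,
#    "data": {"voltha.olt1": {"rx_packets": 42, "tx_packets": 17}}}
# which is flattened into [('voltha.olt1.rx_packets', (1481234567, 42)), ...].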

if __name__ == "__main__":

    parser = OptionParser()
    parser.add_option("-K", "--kafka", dest="kafka",
                      default="localhost:9092", help="Kafka bootstrap server")
    parser.add_option("-c", "--consul", dest="consul",
                      default="localhost:8500",
                      help="Consul server (needed if kafka server is specified"
                           " with '@kafka' value)")
    parser.add_option("-t", "--topic", dest="topic", help="Kafka topic")
    parser.add_option("-H", "--host", dest="graphite_host",
                      default="localhost", help="Graphite host")
    parser.add_option("-p", "--port", dest="graphite_port", type=int,
                      default=2004, help="Graphite port")

    (options, args) = parser.parse_args()

    # Assign OptParse variables
    kafka = options.kafka
    consul = options.consul
    topic = options.topic
    host = options.graphite_host
    port = options.graphite_port

    # Connect to Graphite
    try:
        graphite = Graphite(host, port)
    except socket.gaierror, e:
        # gaierror is a subclass of socket.error, so it must be caught first
        print "Invalid hostname for graphite host %s" % (host)
        sys.exit(1)
    except socket.error, e:
        print "Could not connect to graphite host %s:%s" % (host, port)
        sys.exit(1)

    # Resolve Kafka value if it is based on consul lookup
    if kafka.startswith('@'):
        kafka = get_endpoint_from_consul(consul, kafka[1:])

    # Connect to Kafka
    try:
        print 'Connecting to Kafka at {}'.format(kafka)
        consumer = KafkaConsumer(topic, bootstrap_servers=kafka)
    except KafkaError, e:
        print "Could not connect to kafka bootstrap server {}: {}".format(
            kafka, e)
        sys.exit(1)

    # Consume Kafka topic
    for record in consumer:
        assert isinstance(record, ConsumerRecord)
        msg = record.value

        try:
            batch = _convert(simplejson.loads(msg))
        except Exception, e:
            print "Unknown format, could not extract data: {}".format(msg)
            continue

        pickled = _pickle(batch)
        graphite.send(pickled)
        print "Sent %s metrics to Graphite" % (len(batch))