blob: 777cbcd7921d91464eca993c08b24c730148dcff [file] [log] [blame]
khenaidoo5fc5cea2021-08-11 17:39:16 -04001/*
2 *
3 * Copyright 2018 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19// Package channelz defines APIs for enabling channelz service, entry
20// registration/deletion, and accessing channelz data. It also defines channelz
21// metric struct formats.
22//
23// All APIs in this package are experimental.
24package channelz
25
26import (
Akash Kankanala761955c2024-02-21 19:32:20 +053027 "context"
28 "errors"
khenaidoo5fc5cea2021-08-11 17:39:16 -040029 "fmt"
30 "sort"
31 "sync"
32 "sync/atomic"
33 "time"
34
35 "google.golang.org/grpc/grpclog"
36)
37
const (
	// defaultMaxTraceEntry is the per-entity trace entry limit that
	// ResetMaxTraceEntryToDefault restores.
	defaultMaxTraceEntry int32 = 30
)

var (
	// db holds the reference to the current channelz data storage.
	db dbWrapper
	// idGen produces the ids handed out by the Register* functions.
	idGen idGenerator
	// EntryPerPage defines the number of channelz entries to be shown on a web page.
	// It is also the page size used when a query passes maxResults <= 0.
	EntryPerPage = int64(50)
	// curState is 1 while channelz data collection is on. Accessed atomically.
	curState int32
	// maxTraceEntry is the current per-entity trace entry limit. Accessed atomically.
	maxTraceEntry = defaultMaxTraceEntry
)
50
51// TurnOn turns on channelz data collection.
52func TurnOn() {
53 if !IsOn() {
Akash Kankanala761955c2024-02-21 19:32:20 +053054 db.set(newChannelMap())
55 idGen.reset()
khenaidoo5fc5cea2021-08-11 17:39:16 -040056 atomic.StoreInt32(&curState, 1)
57 }
58}
59
60// IsOn returns whether channelz data collection is on.
61func IsOn() bool {
62 return atomic.CompareAndSwapInt32(&curState, 1, 1)
63}
64
65// SetMaxTraceEntry sets maximum number of trace entry per entity (i.e. channel/subchannel).
66// Setting it to 0 will disable channel tracing.
67func SetMaxTraceEntry(i int32) {
68 atomic.StoreInt32(&maxTraceEntry, i)
69}
70
71// ResetMaxTraceEntryToDefault resets the maximum number of trace entry per entity to default.
72func ResetMaxTraceEntryToDefault() {
73 atomic.StoreInt32(&maxTraceEntry, defaultMaxTraceEntry)
74}
75
76func getMaxTraceEntry() int {
77 i := atomic.LoadInt32(&maxTraceEntry)
78 return int(i)
79}
80
81// dbWarpper wraps around a reference to internal channelz data storage, and
82// provide synchronized functionality to set and get the reference.
83type dbWrapper struct {
84 mu sync.RWMutex
85 DB *channelMap
86}
87
88func (d *dbWrapper) set(db *channelMap) {
89 d.mu.Lock()
90 d.DB = db
91 d.mu.Unlock()
92}
93
94func (d *dbWrapper) get() *channelMap {
95 d.mu.RLock()
96 defer d.mu.RUnlock()
97 return d.DB
98}
99
Akash Kankanala761955c2024-02-21 19:32:20 +0530100// NewChannelzStorageForTesting initializes channelz data storage and id
101// generator for testing purposes.
khenaidoo5fc5cea2021-08-11 17:39:16 -0400102//
Akash Kankanala761955c2024-02-21 19:32:20 +0530103// Returns a cleanup function to be invoked by the test, which waits for up to
104// 10s for all channelz state to be reset by the grpc goroutines when those
105// entities get closed. This cleanup function helps with ensuring that tests
106// don't mess up each other.
107func NewChannelzStorageForTesting() (cleanup func() error) {
108 db.set(newChannelMap())
khenaidoo5fc5cea2021-08-11 17:39:16 -0400109 idGen.reset()
Akash Kankanala761955c2024-02-21 19:32:20 +0530110
khenaidoo5fc5cea2021-08-11 17:39:16 -0400111 return func() error {
khenaidoo5fc5cea2021-08-11 17:39:16 -0400112 cm := db.get()
113 if cm == nil {
114 return nil
115 }
Akash Kankanala761955c2024-02-21 19:32:20 +0530116
117 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
118 defer cancel()
119 ticker := time.NewTicker(10 * time.Millisecond)
120 defer ticker.Stop()
121 for {
122 cm.mu.RLock()
123 topLevelChannels, servers, channels, subChannels, listenSockets, normalSockets := len(cm.topLevelChannels), len(cm.servers), len(cm.channels), len(cm.subChannels), len(cm.listenSockets), len(cm.normalSockets)
124 cm.mu.RUnlock()
125
126 if err := ctx.Err(); err != nil {
127 return fmt.Errorf("after 10s the channelz map has not been cleaned up yet, topchannels: %d, servers: %d, channels: %d, subchannels: %d, listen sockets: %d, normal sockets: %d", topLevelChannels, servers, channels, subChannels, listenSockets, normalSockets)
128 }
129 if topLevelChannels == 0 && servers == 0 && channels == 0 && subChannels == 0 && listenSockets == 0 && normalSockets == 0 {
khenaidoo5fc5cea2021-08-11 17:39:16 -0400130 return nil
131 }
Akash Kankanala761955c2024-02-21 19:32:20 +0530132 <-ticker.C
khenaidoo5fc5cea2021-08-11 17:39:16 -0400133 }
khenaidoo5fc5cea2021-08-11 17:39:16 -0400134 }
135}
136
137// GetTopChannels returns a slice of top channel's ChannelMetric, along with a
138// boolean indicating whether there's more top channels to be queried for.
139//
140// The arg id specifies that only top channel with id at or above it will be included
141// in the result. The returned slice is up to a length of the arg maxResults or
142// EntryPerPage if maxResults is zero, and is sorted in ascending id order.
143func GetTopChannels(id int64, maxResults int64) ([]*ChannelMetric, bool) {
144 return db.get().GetTopChannels(id, maxResults)
145}
146
147// GetServers returns a slice of server's ServerMetric, along with a
148// boolean indicating whether there's more servers to be queried for.
149//
150// The arg id specifies that only server with id at or above it will be included
151// in the result. The returned slice is up to a length of the arg maxResults or
152// EntryPerPage if maxResults is zero, and is sorted in ascending id order.
153func GetServers(id int64, maxResults int64) ([]*ServerMetric, bool) {
154 return db.get().GetServers(id, maxResults)
155}
156
157// GetServerSockets returns a slice of server's (identified by id) normal socket's
158// SocketMetric, along with a boolean indicating whether there's more sockets to
159// be queried for.
160//
161// The arg startID specifies that only sockets with id at or above it will be
162// included in the result. The returned slice is up to a length of the arg maxResults
163// or EntryPerPage if maxResults is zero, and is sorted in ascending id order.
164func GetServerSockets(id int64, startID int64, maxResults int64) ([]*SocketMetric, bool) {
165 return db.get().GetServerSockets(id, startID, maxResults)
166}
167
168// GetChannel returns the ChannelMetric for the channel (identified by id).
169func GetChannel(id int64) *ChannelMetric {
170 return db.get().GetChannel(id)
171}
172
173// GetSubChannel returns the SubChannelMetric for the subchannel (identified by id).
174func GetSubChannel(id int64) *SubChannelMetric {
175 return db.get().GetSubChannel(id)
176}
177
178// GetSocket returns the SocketInternalMetric for the socket (identified by id).
179func GetSocket(id int64) *SocketMetric {
180 return db.get().GetSocket(id)
181}
182
183// GetServer returns the ServerMetric for the server (identified by id).
184func GetServer(id int64) *ServerMetric {
185 return db.get().GetServer(id)
186}
187
Akash Kankanala761955c2024-02-21 19:32:20 +0530188// RegisterChannel registers the given channel c in the channelz database with
189// ref as its reference name, and adds it to the child list of its parent
190// (identified by pid). pid == nil means no parent.
191//
192// Returns a unique channelz identifier assigned to this channel.
193//
194// If channelz is not turned ON, the channelz database is not mutated.
195func RegisterChannel(c Channel, pid *Identifier, ref string) *Identifier {
khenaidoo5fc5cea2021-08-11 17:39:16 -0400196 id := idGen.genID()
Akash Kankanala761955c2024-02-21 19:32:20 +0530197 var parent int64
198 isTopChannel := true
199 if pid != nil {
200 isTopChannel = false
201 parent = pid.Int()
202 }
203
204 if !IsOn() {
205 return newIdentifer(RefChannel, id, pid)
206 }
207
khenaidoo5fc5cea2021-08-11 17:39:16 -0400208 cn := &channel{
209 refName: ref,
210 c: c,
211 subChans: make(map[int64]string),
212 nestedChans: make(map[int64]string),
213 id: id,
Akash Kankanala761955c2024-02-21 19:32:20 +0530214 pid: parent,
khenaidoo5fc5cea2021-08-11 17:39:16 -0400215 trace: &channelTrace{createdTime: time.Now(), events: make([]*TraceEvent, 0, getMaxTraceEntry())},
216 }
Akash Kankanala761955c2024-02-21 19:32:20 +0530217 db.get().addChannel(id, cn, isTopChannel, parent)
218 return newIdentifer(RefChannel, id, pid)
khenaidoo5fc5cea2021-08-11 17:39:16 -0400219}
220
Akash Kankanala761955c2024-02-21 19:32:20 +0530221// RegisterSubChannel registers the given subChannel c in the channelz database
222// with ref as its reference name, and adds it to the child list of its parent
223// (identified by pid).
224//
225// Returns a unique channelz identifier assigned to this subChannel.
226//
227// If channelz is not turned ON, the channelz database is not mutated.
228func RegisterSubChannel(c Channel, pid *Identifier, ref string) (*Identifier, error) {
229 if pid == nil {
230 return nil, errors.New("a SubChannel's parent id cannot be nil")
khenaidoo5fc5cea2021-08-11 17:39:16 -0400231 }
232 id := idGen.genID()
Akash Kankanala761955c2024-02-21 19:32:20 +0530233 if !IsOn() {
234 return newIdentifer(RefSubChannel, id, pid), nil
235 }
236
khenaidoo5fc5cea2021-08-11 17:39:16 -0400237 sc := &subChannel{
238 refName: ref,
239 c: c,
240 sockets: make(map[int64]string),
241 id: id,
Akash Kankanala761955c2024-02-21 19:32:20 +0530242 pid: pid.Int(),
khenaidoo5fc5cea2021-08-11 17:39:16 -0400243 trace: &channelTrace{createdTime: time.Now(), events: make([]*TraceEvent, 0, getMaxTraceEntry())},
244 }
Akash Kankanala761955c2024-02-21 19:32:20 +0530245 db.get().addSubChannel(id, sc, pid.Int())
246 return newIdentifer(RefSubChannel, id, pid), nil
khenaidoo5fc5cea2021-08-11 17:39:16 -0400247}
248
249// RegisterServer registers the given server s in channelz database. It returns
250// the unique channelz tracking id assigned to this server.
Akash Kankanala761955c2024-02-21 19:32:20 +0530251//
252// If channelz is not turned ON, the channelz database is not mutated.
253func RegisterServer(s Server, ref string) *Identifier {
khenaidoo5fc5cea2021-08-11 17:39:16 -0400254 id := idGen.genID()
Akash Kankanala761955c2024-02-21 19:32:20 +0530255 if !IsOn() {
256 return newIdentifer(RefServer, id, nil)
257 }
258
khenaidoo5fc5cea2021-08-11 17:39:16 -0400259 svr := &server{
260 refName: ref,
261 s: s,
262 sockets: make(map[int64]string),
263 listenSockets: make(map[int64]string),
264 id: id,
265 }
266 db.get().addServer(id, svr)
Akash Kankanala761955c2024-02-21 19:32:20 +0530267 return newIdentifer(RefServer, id, nil)
khenaidoo5fc5cea2021-08-11 17:39:16 -0400268}
269
270// RegisterListenSocket registers the given listen socket s in channelz database
271// with ref as its reference name, and add it to the child list of its parent
272// (identified by pid). It returns the unique channelz tracking id assigned to
273// this listen socket.
Akash Kankanala761955c2024-02-21 19:32:20 +0530274//
275// If channelz is not turned ON, the channelz database is not mutated.
276func RegisterListenSocket(s Socket, pid *Identifier, ref string) (*Identifier, error) {
277 if pid == nil {
278 return nil, errors.New("a ListenSocket's parent id cannot be 0")
khenaidoo5fc5cea2021-08-11 17:39:16 -0400279 }
280 id := idGen.genID()
Akash Kankanala761955c2024-02-21 19:32:20 +0530281 if !IsOn() {
282 return newIdentifer(RefListenSocket, id, pid), nil
283 }
284
285 ls := &listenSocket{refName: ref, s: s, id: id, pid: pid.Int()}
286 db.get().addListenSocket(id, ls, pid.Int())
287 return newIdentifer(RefListenSocket, id, pid), nil
khenaidoo5fc5cea2021-08-11 17:39:16 -0400288}
289
290// RegisterNormalSocket registers the given normal socket s in channelz database
Akash Kankanala761955c2024-02-21 19:32:20 +0530291// with ref as its reference name, and adds it to the child list of its parent
khenaidoo5fc5cea2021-08-11 17:39:16 -0400292// (identified by pid). It returns the unique channelz tracking id assigned to
293// this normal socket.
Akash Kankanala761955c2024-02-21 19:32:20 +0530294//
295// If channelz is not turned ON, the channelz database is not mutated.
296func RegisterNormalSocket(s Socket, pid *Identifier, ref string) (*Identifier, error) {
297 if pid == nil {
298 return nil, errors.New("a NormalSocket's parent id cannot be 0")
khenaidoo5fc5cea2021-08-11 17:39:16 -0400299 }
300 id := idGen.genID()
Akash Kankanala761955c2024-02-21 19:32:20 +0530301 if !IsOn() {
302 return newIdentifer(RefNormalSocket, id, pid), nil
303 }
304
305 ns := &normalSocket{refName: ref, s: s, id: id, pid: pid.Int()}
306 db.get().addNormalSocket(id, ns, pid.Int())
307 return newIdentifer(RefNormalSocket, id, pid), nil
khenaidoo5fc5cea2021-08-11 17:39:16 -0400308}
309
khenaidoo257f3192021-12-15 16:46:37 -0500310// RemoveEntry removes an entry with unique channelz tracking id to be id from
khenaidoo5fc5cea2021-08-11 17:39:16 -0400311// channelz database.
Akash Kankanala761955c2024-02-21 19:32:20 +0530312//
313// If channelz is not turned ON, this function is a no-op.
314func RemoveEntry(id *Identifier) {
315 if !IsOn() {
316 return
317 }
318 db.get().removeEntry(id.Int())
khenaidoo5fc5cea2021-08-11 17:39:16 -0400319}
320
// TraceEventDesc is what the caller of AddTraceEvent should provide to describe
// the event to be added to the channel trace.
//
// The Parent field is optional. It is used for an event that will be recorded
// in the entity's parent trace.
type TraceEventDesc struct {
	// Desc is the event text; AddTraceEvent logs it and stores it in the trace.
	Desc string
	// Severity selects the log level used by AddTraceEvent and is stored with
	// the trace event.
	Severity Severity
	// Parent, when non-nil, supplies the Desc and Severity of the companion
	// event appended to the parent entity's trace.
	Parent *TraceEventDesc
}
331
Akash Kankanala761955c2024-02-21 19:32:20 +0530332// AddTraceEvent adds trace related to the entity with specified id, using the
333// provided TraceEventDesc.
334//
335// If channelz is not turned ON, this will simply log the event descriptions.
336func AddTraceEvent(l grpclog.DepthLoggerV2, id *Identifier, depth int, desc *TraceEventDesc) {
337 // Log only the trace description associated with the bottom most entity.
338 switch desc.Severity {
339 case CtUnknown, CtInfo:
340 l.InfoDepth(depth+1, withParens(id)+desc.Desc)
341 case CtWarning:
342 l.WarningDepth(depth+1, withParens(id)+desc.Desc)
343 case CtError:
344 l.ErrorDepth(depth+1, withParens(id)+desc.Desc)
khenaidoo5fc5cea2021-08-11 17:39:16 -0400345 }
Akash Kankanala761955c2024-02-21 19:32:20 +0530346
khenaidoo5fc5cea2021-08-11 17:39:16 -0400347 if getMaxTraceEntry() == 0 {
348 return
349 }
Akash Kankanala761955c2024-02-21 19:32:20 +0530350 if IsOn() {
351 db.get().traceEvent(id.Int(), desc)
352 }
khenaidoo5fc5cea2021-08-11 17:39:16 -0400353}
354
// channelMap is the storage data structure for channelz.
// Methods of channelMap can be divided into two categories with respect to locking.
// 1. Methods that acquire the global lock.
// 2. Methods that can only be called when the global lock is held.
// A method of the second type must always be called from inside a method of the first type.
type channelMap struct {
	// mu is the global lock referred to above.
	mu sync.RWMutex
	// topLevelChannels is the set of ids of channels registered without a parent.
	topLevelChannels map[int64]struct{}
	// The remaining maps index each kind of registered entity by its id.
	servers       map[int64]*server
	channels      map[int64]*channel
	subChannels   map[int64]*subChannel
	listenSockets map[int64]*listenSocket
	normalSockets map[int64]*normalSocket
}
369
Akash Kankanala761955c2024-02-21 19:32:20 +0530370func newChannelMap() *channelMap {
371 return &channelMap{
372 topLevelChannels: make(map[int64]struct{}),
373 channels: make(map[int64]*channel),
374 listenSockets: make(map[int64]*listenSocket),
375 normalSockets: make(map[int64]*normalSocket),
376 servers: make(map[int64]*server),
377 subChannels: make(map[int64]*subChannel),
378 }
379}
380
khenaidoo5fc5cea2021-08-11 17:39:16 -0400381func (c *channelMap) addServer(id int64, s *server) {
382 c.mu.Lock()
383 s.cm = c
384 c.servers[id] = s
385 c.mu.Unlock()
386}
387
khenaidoo257f3192021-12-15 16:46:37 -0500388func (c *channelMap) addChannel(id int64, cn *channel, isTopChannel bool, pid int64) {
khenaidoo5fc5cea2021-08-11 17:39:16 -0400389 c.mu.Lock()
390 cn.cm = c
391 cn.trace.cm = c
392 c.channels[id] = cn
393 if isTopChannel {
394 c.topLevelChannels[id] = struct{}{}
395 } else {
396 c.findEntry(pid).addChild(id, cn)
397 }
398 c.mu.Unlock()
399}
400
khenaidoo257f3192021-12-15 16:46:37 -0500401func (c *channelMap) addSubChannel(id int64, sc *subChannel, pid int64) {
khenaidoo5fc5cea2021-08-11 17:39:16 -0400402 c.mu.Lock()
403 sc.cm = c
404 sc.trace.cm = c
405 c.subChannels[id] = sc
406 c.findEntry(pid).addChild(id, sc)
407 c.mu.Unlock()
408}
409
khenaidoo257f3192021-12-15 16:46:37 -0500410func (c *channelMap) addListenSocket(id int64, ls *listenSocket, pid int64) {
khenaidoo5fc5cea2021-08-11 17:39:16 -0400411 c.mu.Lock()
412 ls.cm = c
413 c.listenSockets[id] = ls
414 c.findEntry(pid).addChild(id, ls)
415 c.mu.Unlock()
416}
417
khenaidoo257f3192021-12-15 16:46:37 -0500418func (c *channelMap) addNormalSocket(id int64, ns *normalSocket, pid int64) {
khenaidoo5fc5cea2021-08-11 17:39:16 -0400419 c.mu.Lock()
420 ns.cm = c
421 c.normalSockets[id] = ns
422 c.findEntry(pid).addChild(id, ns)
423 c.mu.Unlock()
424}
425
426// removeEntry triggers the removal of an entry, which may not indeed delete the entry, if it has to
427// wait on the deletion of its children and until no other entity's channel trace references it.
428// It may lead to a chain of entry deletion. For example, deleting the last socket of a gracefully
429// shutting down server will lead to the server being also deleted.
430func (c *channelMap) removeEntry(id int64) {
431 c.mu.Lock()
432 c.findEntry(id).triggerDelete()
433 c.mu.Unlock()
434}
435
436// c.mu must be held by the caller
437func (c *channelMap) decrTraceRefCount(id int64) {
438 e := c.findEntry(id)
439 if v, ok := e.(tracedChannel); ok {
440 v.decrTraceRefCount()
441 e.deleteSelfIfReady()
442 }
443}
444
445// c.mu must be held by the caller.
446func (c *channelMap) findEntry(id int64) entry {
447 var v entry
448 var ok bool
449 if v, ok = c.channels[id]; ok {
450 return v
451 }
452 if v, ok = c.subChannels[id]; ok {
453 return v
454 }
455 if v, ok = c.servers[id]; ok {
456 return v
457 }
458 if v, ok = c.listenSockets[id]; ok {
459 return v
460 }
461 if v, ok = c.normalSockets[id]; ok {
462 return v
463 }
464 return &dummyEntry{idNotFound: id}
465}
466
467// c.mu must be held by the caller
468// deleteEntry simply deletes an entry from the channelMap. Before calling this
469// method, caller must check this entry is ready to be deleted, i.e removeEntry()
470// has been called on it, and no children still exist.
471// Conditionals are ordered by the expected frequency of deletion of each entity
472// type, in order to optimize performance.
473func (c *channelMap) deleteEntry(id int64) {
474 var ok bool
475 if _, ok = c.normalSockets[id]; ok {
476 delete(c.normalSockets, id)
477 return
478 }
479 if _, ok = c.subChannels[id]; ok {
480 delete(c.subChannels, id)
481 return
482 }
483 if _, ok = c.channels[id]; ok {
484 delete(c.channels, id)
485 delete(c.topLevelChannels, id)
486 return
487 }
488 if _, ok = c.listenSockets[id]; ok {
489 delete(c.listenSockets, id)
490 return
491 }
492 if _, ok = c.servers[id]; ok {
493 delete(c.servers, id)
494 return
495 }
496}
497
498func (c *channelMap) traceEvent(id int64, desc *TraceEventDesc) {
499 c.mu.Lock()
500 child := c.findEntry(id)
501 childTC, ok := child.(tracedChannel)
502 if !ok {
503 c.mu.Unlock()
504 return
505 }
506 childTC.getChannelTrace().append(&TraceEvent{Desc: desc.Desc, Severity: desc.Severity, Timestamp: time.Now()})
507 if desc.Parent != nil {
508 parent := c.findEntry(child.getParentID())
509 var chanType RefChannelType
510 switch child.(type) {
511 case *channel:
512 chanType = RefChannel
513 case *subChannel:
514 chanType = RefSubChannel
515 }
516 if parentTC, ok := parent.(tracedChannel); ok {
517 parentTC.getChannelTrace().append(&TraceEvent{
518 Desc: desc.Parent.Desc,
519 Severity: desc.Parent.Severity,
520 Timestamp: time.Now(),
521 RefID: id,
522 RefName: childTC.getRefName(),
523 RefType: chanType,
524 })
525 childTC.incrTraceRefCount()
526 }
527 }
528 c.mu.Unlock()
529}
530
// int64Slice orders a []int64 in ascending order; its Len, Swap and Less
// methods are what sort.Sort requires.
type int64Slice []int64

// Len returns the number of elements.
func (s int64Slice) Len() int {
	return len(s)
}

// Swap exchanges the elements at positions i and j.
func (s int64Slice) Swap(i, j int) {
	s[j], s[i] = s[i], s[j]
}

// Less reports whether the element at i is smaller than the element at j.
func (s int64Slice) Less(i, j int) bool {
	return s[j] > s[i]
}
536
// copyMap returns a new map holding the same key/value pairs as m. The result
// is never nil, even for a nil or empty m, and shares no storage with m.
func copyMap(m map[int64]string) map[int64]string {
	// Size the destination up front so it does not have to grow while the
	// entries are copied.
	n := make(map[int64]string, len(m))
	for k, v := range m {
		n[k] = v
	}
	return n
}
544
// min returns the smaller of a and b.
func min(a, b int64) int64 {
	if b <= a {
		return b
	}
	return a
}
551
552func (c *channelMap) GetTopChannels(id int64, maxResults int64) ([]*ChannelMetric, bool) {
553 if maxResults <= 0 {
554 maxResults = EntryPerPage
555 }
556 c.mu.RLock()
557 l := int64(len(c.topLevelChannels))
558 ids := make([]int64, 0, l)
559 cns := make([]*channel, 0, min(l, maxResults))
560
561 for k := range c.topLevelChannels {
562 ids = append(ids, k)
563 }
564 sort.Sort(int64Slice(ids))
565 idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id })
566 count := int64(0)
567 var end bool
568 var t []*ChannelMetric
569 for i, v := range ids[idx:] {
570 if count == maxResults {
571 break
572 }
573 if cn, ok := c.channels[v]; ok {
574 cns = append(cns, cn)
575 t = append(t, &ChannelMetric{
576 NestedChans: copyMap(cn.nestedChans),
577 SubChans: copyMap(cn.subChans),
578 })
579 count++
580 }
581 if i == len(ids[idx:])-1 {
582 end = true
583 break
584 }
585 }
586 c.mu.RUnlock()
587 if count == 0 {
588 end = true
589 }
590
591 for i, cn := range cns {
592 t[i].ChannelData = cn.c.ChannelzMetric()
593 t[i].ID = cn.id
594 t[i].RefName = cn.refName
595 t[i].Trace = cn.trace.dumpData()
596 }
597 return t, end
598}
599
600func (c *channelMap) GetServers(id, maxResults int64) ([]*ServerMetric, bool) {
601 if maxResults <= 0 {
602 maxResults = EntryPerPage
603 }
604 c.mu.RLock()
605 l := int64(len(c.servers))
606 ids := make([]int64, 0, l)
607 ss := make([]*server, 0, min(l, maxResults))
608 for k := range c.servers {
609 ids = append(ids, k)
610 }
611 sort.Sort(int64Slice(ids))
612 idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id })
613 count := int64(0)
614 var end bool
615 var s []*ServerMetric
616 for i, v := range ids[idx:] {
617 if count == maxResults {
618 break
619 }
620 if svr, ok := c.servers[v]; ok {
621 ss = append(ss, svr)
622 s = append(s, &ServerMetric{
623 ListenSockets: copyMap(svr.listenSockets),
624 })
625 count++
626 }
627 if i == len(ids[idx:])-1 {
628 end = true
629 break
630 }
631 }
632 c.mu.RUnlock()
633 if count == 0 {
634 end = true
635 }
636
637 for i, svr := range ss {
638 s[i].ServerData = svr.s.ChannelzMetric()
639 s[i].ID = svr.id
640 s[i].RefName = svr.refName
641 }
642 return s, end
643}
644
645func (c *channelMap) GetServerSockets(id int64, startID int64, maxResults int64) ([]*SocketMetric, bool) {
646 if maxResults <= 0 {
647 maxResults = EntryPerPage
648 }
649 var svr *server
650 var ok bool
651 c.mu.RLock()
652 if svr, ok = c.servers[id]; !ok {
653 // server with id doesn't exist.
654 c.mu.RUnlock()
655 return nil, true
656 }
657 svrskts := svr.sockets
658 l := int64(len(svrskts))
659 ids := make([]int64, 0, l)
660 sks := make([]*normalSocket, 0, min(l, maxResults))
661 for k := range svrskts {
662 ids = append(ids, k)
663 }
664 sort.Sort(int64Slice(ids))
665 idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= startID })
666 count := int64(0)
667 var end bool
668 for i, v := range ids[idx:] {
669 if count == maxResults {
670 break
671 }
672 if ns, ok := c.normalSockets[v]; ok {
673 sks = append(sks, ns)
674 count++
675 }
676 if i == len(ids[idx:])-1 {
677 end = true
678 break
679 }
680 }
681 c.mu.RUnlock()
682 if count == 0 {
683 end = true
684 }
685 s := make([]*SocketMetric, 0, len(sks))
686 for _, ns := range sks {
687 sm := &SocketMetric{}
688 sm.SocketData = ns.s.ChannelzMetric()
689 sm.ID = ns.id
690 sm.RefName = ns.refName
691 s = append(s, sm)
692 }
693 return s, end
694}
695
696func (c *channelMap) GetChannel(id int64) *ChannelMetric {
697 cm := &ChannelMetric{}
698 var cn *channel
699 var ok bool
700 c.mu.RLock()
701 if cn, ok = c.channels[id]; !ok {
702 // channel with id doesn't exist.
703 c.mu.RUnlock()
704 return nil
705 }
706 cm.NestedChans = copyMap(cn.nestedChans)
707 cm.SubChans = copyMap(cn.subChans)
708 // cn.c can be set to &dummyChannel{} when deleteSelfFromMap is called. Save a copy of cn.c when
709 // holding the lock to prevent potential data race.
710 chanCopy := cn.c
711 c.mu.RUnlock()
712 cm.ChannelData = chanCopy.ChannelzMetric()
713 cm.ID = cn.id
714 cm.RefName = cn.refName
715 cm.Trace = cn.trace.dumpData()
716 return cm
717}
718
719func (c *channelMap) GetSubChannel(id int64) *SubChannelMetric {
720 cm := &SubChannelMetric{}
721 var sc *subChannel
722 var ok bool
723 c.mu.RLock()
724 if sc, ok = c.subChannels[id]; !ok {
725 // subchannel with id doesn't exist.
726 c.mu.RUnlock()
727 return nil
728 }
729 cm.Sockets = copyMap(sc.sockets)
730 // sc.c can be set to &dummyChannel{} when deleteSelfFromMap is called. Save a copy of sc.c when
731 // holding the lock to prevent potential data race.
732 chanCopy := sc.c
733 c.mu.RUnlock()
734 cm.ChannelData = chanCopy.ChannelzMetric()
735 cm.ID = sc.id
736 cm.RefName = sc.refName
737 cm.Trace = sc.trace.dumpData()
738 return cm
739}
740
741func (c *channelMap) GetSocket(id int64) *SocketMetric {
742 sm := &SocketMetric{}
743 c.mu.RLock()
744 if ls, ok := c.listenSockets[id]; ok {
745 c.mu.RUnlock()
746 sm.SocketData = ls.s.ChannelzMetric()
747 sm.ID = ls.id
748 sm.RefName = ls.refName
749 return sm
750 }
751 if ns, ok := c.normalSockets[id]; ok {
752 c.mu.RUnlock()
753 sm.SocketData = ns.s.ChannelzMetric()
754 sm.ID = ns.id
755 sm.RefName = ns.refName
756 return sm
757 }
758 c.mu.RUnlock()
759 return nil
760}
761
762func (c *channelMap) GetServer(id int64) *ServerMetric {
763 sm := &ServerMetric{}
764 var svr *server
765 var ok bool
766 c.mu.RLock()
767 if svr, ok = c.servers[id]; !ok {
768 c.mu.RUnlock()
769 return nil
770 }
771 sm.ListenSockets = copyMap(svr.listenSockets)
772 c.mu.RUnlock()
773 sm.ID = svr.id
774 sm.RefName = svr.refName
775 sm.ServerData = svr.s.ChannelzMetric()
776 return sm
777}
778
// idGenerator hands out increasing int64 ids. The counter is only touched
// through atomic operations.
type idGenerator struct {
	id int64
}

// reset sets the counter back to 0, so the next generated id is 1.
func (g *idGenerator) reset() {
	atomic.StoreInt64(&g.id, 0)
}

// genID increments the counter and returns its new value.
func (g *idGenerator) genID() int64 {
	next := atomic.AddInt64(&g.id, 1)
	return next
}