blob: 8989f32d579387075b4a21ee27bd0720dd9a05a7 [file] [log] [blame]
Abhay Kumar40252eb2025-10-13 13:25:53 +00001// Copyright 2015 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// Package wait provides utility functions for polling, listening using Go
16// channel.
17package wait
18
19import (
20 "log"
21 "sync"
22)
23
24const (
25 // To avoid lock contention we use an array of list struct (rw mutex & map)
26 // for the id argument, we apply mod operation and uses its remainder to
27 // index into the array and find the corresponding element.
28 defaultListElementLength = 64
29)
30
31// Wait is an interface that provides the ability to wait and trigger events that
32// are associated with IDs.
33type Wait interface {
34 // Register waits returns a chan that waits on the given ID.
35 // The chan will be triggered when Trigger is called with
36 // the same ID.
37 Register(id uint64) <-chan any
38 // Trigger triggers the waiting chans with the given ID.
39 Trigger(id uint64, x any)
40 IsRegistered(id uint64) bool
41}
42
43type list struct {
44 e []listElement
45}
46
47type listElement struct {
48 l sync.RWMutex
49 m map[uint64]chan any
50}
51
52// New creates a Wait.
53func New() Wait {
54 res := list{
55 e: make([]listElement, defaultListElementLength),
56 }
57 for i := 0; i < len(res.e); i++ {
58 res.e[i].m = make(map[uint64]chan any)
59 }
60 return &res
61}
62
63func (w *list) Register(id uint64) <-chan any {
64 idx := id % defaultListElementLength
65 newCh := make(chan any, 1)
66 w.e[idx].l.Lock()
67 defer w.e[idx].l.Unlock()
68 if _, ok := w.e[idx].m[id]; !ok {
69 w.e[idx].m[id] = newCh
70 } else {
71 log.Panicf("dup id %x", id)
72 }
73 return newCh
74}
75
76func (w *list) Trigger(id uint64, x any) {
77 idx := id % defaultListElementLength
78 w.e[idx].l.Lock()
79 ch := w.e[idx].m[id]
80 delete(w.e[idx].m, id)
81 w.e[idx].l.Unlock()
82 if ch != nil {
83 ch <- x
84 close(ch)
85 }
86}
87
88func (w *list) IsRegistered(id uint64) bool {
89 idx := id % defaultListElementLength
90 w.e[idx].l.RLock()
91 defer w.e[idx].l.RUnlock()
92 _, ok := w.e[idx].m[id]
93 return ok
94}
95
96type waitWithResponse struct {
97 ch <-chan any
98}
99
100func NewWithResponse(ch <-chan any) Wait {
101 return &waitWithResponse{ch: ch}
102}
103
104func (w *waitWithResponse) Register(id uint64) <-chan any {
105 return w.ch
106}
107func (w *waitWithResponse) Trigger(id uint64, x any) {}
108func (w *waitWithResponse) IsRegistered(id uint64) bool {
109 panic("waitWithResponse.IsRegistered() shouldn't be called")
110}