// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package clientv3

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"

	"go.uber.org/zap"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/status"

	pb "go.etcd.io/etcd/api/v3/etcdserverpb"
	"go.etcd.io/etcd/api/v3/mvccpb"
	v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
)

const (
	EventTypeDelete = mvccpb.DELETE
	EventTypePut    = mvccpb.PUT

	closeSendErrTimeout = 250 * time.Millisecond

	// AutoWatchID is the watcher ID passed in WatchStream.Watch when no
	// user-provided ID is available. If passed, an ID will automatically be assigned.
	AutoWatchID = 0

	// InvalidWatchID represents an invalid watch ID and prevents duplication with an existing watch.
	InvalidWatchID = -1
)

type Event mvccpb.Event

type WatchChan <-chan WatchResponse

type Watcher interface {
	// Watch watches on a key or prefix. The watched events will be returned
	// through the returned channel. If revisions waiting to be sent over the
	// watch are compacted, then the watch will be canceled by the server, the
	// client will post a compacted error watch response, and the channel will close.
	// If the requested revision is 0 or unspecified, the returned channel will
	// return watch events that happen after the server receives the watch request.
	// If the context "ctx" is canceled or times out, the returned "WatchChan" is closed,
	// and the "WatchResponse" from this closed channel has zero events and a nil "Err()".
	// The context "ctx" MUST be canceled as soon as the watcher is no longer used,
	// to release the associated resources.
	//
	// If the context is "context.Background/TODO", the returned "WatchChan" will
	// not be closed and will block until an event is triggered, except when the
	// server returns a non-recoverable error (e.g. ErrCompacted).
	// For example, when a context passed with "WithRequireLeader" reaches a
	// connected server that has no leader (e.g. due to network partition),
	// the error "etcdserver: no leader" (ErrNoLeader) will be returned,
	// and then "WatchChan" is closed with a non-nil "Err()".
	// To prevent a watch stream from getting stuck on a partitioned node,
	// make sure to wrap the context with "WithRequireLeader".
	//
	// Otherwise, as long as the context has not been canceled or timed out,
	// watch will retry on other recoverable errors forever until reconnected.
	//
	// TODO: explicitly set context error in the last "WatchResponse" message and close channel?
	// Currently, client contexts are overwritten with "valCtx" that never closes.
	// TODO(v3.4): configure watch retry policy, limit maximum retry number
	// (see https://github.com/etcd-io/etcd/issues/8980)
	Watch(ctx context.Context, key string, opts ...OpOption) WatchChan

	// RequestProgress requests a progress notify response be sent in all watch channels.
	RequestProgress(ctx context.Context) error

	// Close closes the watcher and cancels all watch requests.
	Close() error
}
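
// Illustrative usage (a minimal sketch, not part of the package API): a watch
// loop over a prefix on an assumed, already-constructed *Client named "cli",
// with the context wrapped in WithRequireLeader per the guidance above.
//
//	ctx, cancel := context.WithCancel(context.Background())
//	defer cancel() // releases watcher resources when done
//	for wresp := range cli.Watch(WithRequireLeader(ctx), "foo/", WithPrefix()) {
//		if err := wresp.Err(); err != nil {
//			break // the channel closes after a response with a non-nil Err()
//		}
//		for _, ev := range wresp.Events {
//			fmt.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
//		}
//	}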

type WatchResponse struct {
	Header pb.ResponseHeader
	Events []*Event

	// CompactRevision is the minimum revision the watcher may receive.
	CompactRevision int64

	// Canceled is used to indicate watch failure.
	// If the watch failed and the stream was about to close, then before the
	// channel is closed, it sends a final response with Canceled set to true
	// and a non-nil Err().
	Canceled bool

	// Created is used to indicate the creation of the watcher.
	Created bool

	closeErr error

	// cancelReason is the reason for canceling the watch
	cancelReason string
}

// IsCreate returns true if the event indicates that the key is newly created.
func (e *Event) IsCreate() bool {
	return e.Type == EventTypePut && e.Kv.CreateRevision == e.Kv.ModRevision
}

// IsModify returns true if the event indicates that a new value was put on an existing key.
func (e *Event) IsModify() bool {
	return e.Type == EventTypePut && e.Kv.CreateRevision != e.Kv.ModRevision
}
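
// For example (sketch only), classifying events from a watch channel "wch":
//
//	for wresp := range wch {
//		for _, ev := range wresp.Events {
//			switch {
//			case ev.IsCreate():
//				// the key appeared for the first time
//			case ev.IsModify():
//				// an existing key received a new value
//			case ev.Type == EventTypeDelete:
//				// the key was deleted
//			}
//		}
//	}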

// Err is the error value if this WatchResponse holds an error.
func (wr *WatchResponse) Err() error {
	switch {
	case wr.closeErr != nil:
		return v3rpc.Error(wr.closeErr)
	case wr.CompactRevision != 0:
		return v3rpc.ErrCompacted
	case wr.Canceled:
		if len(wr.cancelReason) != 0 {
			return v3rpc.Error(status.Error(codes.FailedPrecondition, wr.cancelReason))
		}
		return v3rpc.ErrFutureRev
	}
	return nil
}
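
// A common recovery pattern (a sketch; "cli", "ctx", "key", and "rev" are
// assumed to be defined by the caller): when Err() reports ErrCompacted, the
// requested revision is no longer available, so restart the watch from the
// compaction revision carried in the response.
//
//	for wresp := range cli.Watch(ctx, key, WithRev(rev)) {
//		if errors.Is(wresp.Err(), v3rpc.ErrCompacted) {
//			rev = wresp.CompactRevision // earliest revision still available
//			// re-issue cli.Watch(ctx, key, WithRev(rev)) from here
//		}
//	}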

// IsProgressNotify returns true if the WatchResponse is a progress notification.
func (wr *WatchResponse) IsProgressNotify() bool {
	return len(wr.Events) == 0 && !wr.Canceled && !wr.Created && wr.CompactRevision == 0 && wr.Header.Revision != 0
}
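
// Progress notifications are requested with the WithProgressNotify() watch
// option; for example (sketch, assuming "cli" and "ctx" are defined):
//
//	for wresp := range cli.Watch(ctx, "foo", WithProgressNotify()) {
//		if wresp.IsProgressNotify() {
//			// no new events; Header.Revision is the current store revision
//			continue
//		}
//	}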

// watcher implements the Watcher interface
type watcher struct {
	remote   pb.WatchClient
	callOpts []grpc.CallOption

	// mu protects the grpc streams map
	mu sync.Mutex

	// streams holds all the active grpc streams keyed by ctx value.
	streams map[string]*watchGRPCStream
	lg      *zap.Logger
}

// watchGRPCStream tracks all watch resources attached to a single grpc stream.
type watchGRPCStream struct {
	owner    *watcher
	remote   pb.WatchClient
	callOpts []grpc.CallOption

	// ctx controls internal remote.Watch requests
	ctx context.Context
	// ctxKey is the key used when looking up this stream's context
	ctxKey string
	cancel context.CancelFunc

	// substreams holds all active watchers on this grpc stream
	substreams map[int64]*watcherStream
	// resuming holds all resuming watchers on this grpc stream
	resuming []*watcherStream

	// reqc sends a watch request from Watch() to the main goroutine
	reqc chan watchStreamRequest
	// respc receives data from the watch client
	respc chan *pb.WatchResponse
	// donec closes to broadcast shutdown
	donec chan struct{}
	// errc transmits errors from grpc Recv to the watch stream reconnect logic
	errc chan error
	// closingc gets the watcherStream of closing watchers
	closingc chan *watcherStream
	// wg is Done when all substream goroutines have exited
	wg sync.WaitGroup

	// resumec closes to signal that all substreams should begin resuming
	resumec chan struct{}
	// closeErr is the error that closed the watch stream
	closeErr error

	lg *zap.Logger
}

// watchStreamRequest is a union of the supported watch request operation types
type watchStreamRequest interface {
	toPB() *pb.WatchRequest
}

// watchRequest is issued by the subscriber to start a new watcher
type watchRequest struct {
	ctx context.Context
	key string
	end string
	rev int64

	// createdNotify sends a created notification event if this field is true
	createdNotify bool
	// progressNotify is for progress updates
	progressNotify bool
	// fragment is disabled by default; if true, split watch events
	// when the total size exceeds the "--max-request-bytes" flag
	// value plus 512 bytes
	fragment bool

	// filters is the list of events to filter out
	filters []pb.WatchCreateRequest_FilterType
	// prevKV fetches the previous key-value pair before the event happens
	prevKV bool
	// retc receives a chan WatchResponse once the watcher is established
	retc chan chan WatchResponse
}
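
// Fragmentation is enabled per watch with the WithFragment() option; for
// example (sketch, assuming "cli" and "ctx" are defined):
//
//	wch := cli.Watch(ctx, "big/", WithPrefix(), WithFragment())
//	// oversized responses arrive reassembled from fragments by the client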

// progressRequest is issued by the subscriber to request watch progress
type progressRequest struct{}

// watcherStream represents a registered watcher
type watcherStream struct {
	// initReq is the request that initiated this watcher stream
	initReq watchRequest

	// outc publishes watch responses to the subscriber
	outc chan WatchResponse
	// recvc buffers watch responses before publishing
	recvc chan *WatchResponse
	// donec closes when the watcherStream goroutine stops.
	donec chan struct{}
	// closing is set to true when the stream should be scheduled to shut down.
	closing bool
	// id is the registered watch id on the grpc stream
	id int64

	// buf holds all events received from etcd but not yet consumed by the client
	buf []*WatchResponse
}

func NewWatcher(c *Client) Watcher {
	return NewWatchFromWatchClient(pb.NewWatchClient(c.conn), c)
}

func NewWatchFromWatchClient(wc pb.WatchClient, c *Client) Watcher {
	w := &watcher{
		remote:  wc,
		streams: make(map[string]*watchGRPCStream),
	}
	if c != nil {
		w.callOpts = c.callOpts
		w.lg = c.lg
	}
	return w
}
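
// Construction sketch (assumes a reachable endpoint; error handling elided):
//
//	cli, err := New(Config{Endpoints: []string{"localhost:2379"}})
//	if err != nil {
//		// handle err
//	}
//	defer cli.Close()
//	w := NewWatcher(cli)
//	defer w.Close()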

// never closes
var (
	valCtxCh = make(chan struct{})
	zeroTime = time.Unix(0, 0)
)

// ctx with only the values; never Done
type valCtx struct{ context.Context }

func (vc *valCtx) Deadline() (time.Time, bool) { return zeroTime, false }
func (vc *valCtx) Done() <-chan struct{}       { return valCtxCh }
func (vc *valCtx) Err() error                  { return nil }
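
// Illustrative behavior (not part of the API): valCtx lets the stream's
// internal context inherit request-scoped values (e.g. auth metadata set via
// metadata.NewOutgoingContext) from the caller's context while ignoring the
// caller's deadline and cancellation; stream shutdown is instead driven by
// the CancelFunc created in newWatcherGRPCStream below.
//
//	ctx, cancel := context.WithCancel(context.Background())
//	vc := &valCtx{ctx}
//	cancel()
//	_ = vc.Err() // still nil: the wrapper never reports cancellation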

func (w *watcher) newWatcherGRPCStream(inctx context.Context) *watchGRPCStream {
	ctx, cancel := context.WithCancel(&valCtx{inctx})
	wgs := &watchGRPCStream{
		owner:      w,
		remote:     w.remote,
		callOpts:   w.callOpts,
		ctx:        ctx,
		ctxKey:     streamKeyFromCtx(inctx),
		cancel:     cancel,
		substreams: make(map[int64]*watcherStream),
		respc:      make(chan *pb.WatchResponse),
		reqc:       make(chan watchStreamRequest),
		donec:      make(chan struct{}),
		errc:       make(chan error, 1),
		closingc:   make(chan *watcherStream),
		resumec:    make(chan struct{}),
		lg:         w.lg,
	}
	go wgs.run()
	return wgs
}

// Watch posts a watch request to run() and waits for a new watcher channel
func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) WatchChan {
	ow := opWatch(key, opts...)

	var filters []pb.WatchCreateRequest_FilterType
	if ow.filterPut {
		filters = append(filters, pb.WatchCreateRequest_NOPUT)
	}
	if ow.filterDelete {
		filters = append(filters, pb.WatchCreateRequest_NODELETE)
	}

	wr := &watchRequest{
		ctx:            ctx,
		createdNotify:  ow.createdNotify,
		key:            string(ow.key),
		end:            string(ow.end),
		rev:            ow.rev,
		progressNotify: ow.progressNotify,
		fragment:       ow.fragment,
		filters:        filters,
		prevKV:         ow.prevKV,
		retc:           make(chan chan WatchResponse, 1),
	}

	ok := false
	ctxKey := streamKeyFromCtx(ctx)

	var closeCh chan WatchResponse
	for {
		// find or allocate appropriate grpc watch stream
		w.mu.Lock()
		if w.streams == nil {
			// closed
			w.mu.Unlock()
			ch := make(chan WatchResponse)
			close(ch)
			return ch
		}
		wgs := w.streams[ctxKey]
		if wgs == nil {
			wgs = w.newWatcherGRPCStream(ctx)
			w.streams[ctxKey] = wgs
		}
		donec := wgs.donec
		reqc := wgs.reqc
		w.mu.Unlock()

		// lazily allocate the channel returned when the request fails
		if closeCh == nil {
			closeCh = make(chan WatchResponse, 1)
		}

		// submit request
		select {
		case reqc <- wr:
			ok = true
		case <-wr.ctx.Done():
			ok = false
		case <-donec:
			ok = false
			if wgs.closeErr != nil {
				closeCh <- WatchResponse{Canceled: true, closeErr: wgs.closeErr}
				break
			}
			// retry; the stream may have been dropped because it had no contexts
			continue
		}

		// receive channel
		if ok {
			select {
			case ret := <-wr.retc:
				return ret
			case <-ctx.Done():
			case <-donec:
				if wgs.closeErr != nil {
					closeCh <- WatchResponse{Canceled: true, closeErr: wgs.closeErr}
					break
				}
				// retry; the stream may have been dropped because it had no contexts
				continue
			}
		}
		break
	}

	close(closeCh)
	return closeCh
}

func (w *watcher) Close() (err error) {
	w.mu.Lock()
	streams := w.streams
	w.streams = nil
	w.mu.Unlock()
	for _, wgs := range streams {
		if werr := wgs.close(); werr != nil {
			err = werr
		}
	}
	// Consider context.Canceled as a successful close
	if errors.Is(err, context.Canceled) {
		err = nil
	}
	return err
}

// RequestProgress requests a progress notify response be sent in all watch channels.
func (w *watcher) RequestProgress(ctx context.Context) (err error) {
	ctxKey := streamKeyFromCtx(ctx)

	w.mu.Lock()
	if w.streams == nil {
		w.mu.Unlock()
		return errors.New("no stream found for context")
	}
	wgs := w.streams[ctxKey]
	if wgs == nil {
		wgs = w.newWatcherGRPCStream(ctx)
		w.streams[ctxKey] = wgs
	}
	donec := wgs.donec
	reqc := wgs.reqc
	w.mu.Unlock()

	pr := &progressRequest{}

	select {
	case reqc <- pr:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	case <-donec:
		if wgs.closeErr != nil {
			return wgs.closeErr
		}
		// retry; the stream may have been dropped because it had no contexts
		return w.RequestProgress(ctx)
	}
}
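
// Usage sketch: force a synchronization point across the watchers sharing
// ctx's stream; each open watch channel then receives a response for which
// IsProgressNotify() is true, carrying the current store revision.
//
//	if err := w.RequestProgress(ctx); err != nil {
//		// the stream is closed or ctx is done
//	}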

func (w *watchGRPCStream) close() (err error) {
	w.cancel()
	<-w.donec
	select {
	case err = <-w.errc:
	default:
	}
	return ContextError(w.ctx, err)
}

func (w *watcher) closeStream(wgs *watchGRPCStream) {
	w.mu.Lock()
	close(wgs.donec)
	wgs.cancel()
	if w.streams != nil {
		delete(w.streams, wgs.ctxKey)
	}
	w.mu.Unlock()
}

func (w *watchGRPCStream) addSubstream(resp *pb.WatchResponse, ws *watcherStream) {
	// check watch ID for backward compatibility (<= v3.3)
	if resp.WatchId == InvalidWatchID || (resp.Canceled && resp.CancelReason != "") {
		w.closeErr = v3rpc.Error(errors.New(resp.CancelReason))
		// failed; no channel
		close(ws.recvc)
		return
	}
	ws.id = resp.WatchId
	w.substreams[ws.id] = ws
}

func (w *watchGRPCStream) sendCloseSubstream(ws *watcherStream, resp *WatchResponse) {
	select {
	case ws.outc <- *resp:
	case <-ws.initReq.ctx.Done():
	case <-time.After(closeSendErrTimeout):
	}
	close(ws.outc)
}

func (w *watchGRPCStream) closeSubstream(ws *watcherStream) {
	// send channel response in case stream was never established
	select {
	case ws.initReq.retc <- ws.outc:
	default:
	}
	// close subscriber's channel
	if closeErr := w.closeErr; closeErr != nil && ws.initReq.ctx.Err() == nil {
		go w.sendCloseSubstream(ws, &WatchResponse{Canceled: true, closeErr: w.closeErr})
	} else if ws.outc != nil {
		close(ws.outc)
	}
	if ws.id != InvalidWatchID {
		delete(w.substreams, ws.id)
		return
	}
	for i := range w.resuming {
		if w.resuming[i] == ws {
			w.resuming[i] = nil
			return
		}
	}
}

// run is the root of the goroutines for managing a watcher client
func (w *watchGRPCStream) run() {
	var wc pb.Watch_WatchClient
	var closeErr error

	// substreams marked to close but goroutine still running; needed for
	// avoiding double-closing recvc on grpc stream teardown
	closing := make(map[*watcherStream]struct{})

	defer func() {
		w.closeErr = closeErr
		// shutdown substreams and resuming substreams
		for _, ws := range w.substreams {
			if _, ok := closing[ws]; !ok {
				close(ws.recvc)
				closing[ws] = struct{}{}
			}
		}
		for _, ws := range w.resuming {
			if _, ok := closing[ws]; ws != nil && !ok {
				close(ws.recvc)
				closing[ws] = struct{}{}
			}
		}
		w.joinSubstreams()
		for range closing {
			w.closeSubstream(<-w.closingc)
		}
		w.wg.Wait()
		w.owner.closeStream(w)
	}()

	// start a stream with the etcd grpc server
	if wc, closeErr = w.newWatchClient(); closeErr != nil {
		return
	}

	cancelSet := make(map[int64]struct{})

	var cur *pb.WatchResponse
	backoff := time.Millisecond
	for {
		select {
		// Watch() requested
		case req := <-w.reqc:
			switch wreq := req.(type) {
			case *watchRequest:
				outc := make(chan WatchResponse, 1)
				// TODO: pass custom watch ID?
				ws := &watcherStream{
					initReq: *wreq,
					id:      InvalidWatchID,
					outc:    outc,
					// unbuffered so resumes won't cause repeat events
					recvc: make(chan *WatchResponse),
				}

				ws.donec = make(chan struct{})
				w.wg.Add(1)
				go w.serveSubstream(ws, w.resumec)

				// queue up for watcher creation/resume
				w.resuming = append(w.resuming, ws)
				if len(w.resuming) == 1 {
					// head of resume queue, can register a new watcher
					if err := wc.Send(ws.initReq.toPB()); err != nil {
						w.lg.Debug("error when sending request", zap.Error(err))
					}
				}
			case *progressRequest:
				if err := wc.Send(wreq.toPB()); err != nil {
					w.lg.Debug("error when sending request", zap.Error(err))
				}
			}

		// new events from the watch client
		case pbresp := <-w.respc:
			if cur == nil || pbresp.Created || pbresp.Canceled {
				cur = pbresp
			} else if cur.WatchId == pbresp.WatchId {
				// merge new events
				cur.Events = append(cur.Events, pbresp.Events...)
				// update "Fragment" field; the last response has "Fragment" == false
				cur.Fragment = pbresp.Fragment
			}

			switch {
			case pbresp.Created:
				// response to head of queue creation
				if len(w.resuming) != 0 {
					if ws := w.resuming[0]; ws != nil {
						w.addSubstream(pbresp, ws)
						w.dispatchEvent(pbresp)
						w.resuming[0] = nil
					}
				}

				if ws := w.nextResume(); ws != nil {
					if err := wc.Send(ws.initReq.toPB()); err != nil {
						w.lg.Debug("error when sending request", zap.Error(err))
					}
				}

				// reset for next iteration
				cur = nil

			case pbresp.Canceled && pbresp.CompactRevision == 0:
				delete(cancelSet, pbresp.WatchId)
				if ws, ok := w.substreams[pbresp.WatchId]; ok {
					// signal to stream goroutine to update closingc
					close(ws.recvc)
					closing[ws] = struct{}{}
				}

				// reset for next iteration
				cur = nil

			case cur.Fragment:
				// watch response events are still fragmented;
				// continue to fetch the next fragment
				continue

			default:
				// dispatch to appropriate watch stream
				ok := w.dispatchEvent(cur)

				// reset for next iteration
				cur = nil

				if ok {
					break
				}

				// watch response on unexpected watch id; cancel id
				if _, ok := cancelSet[pbresp.WatchId]; ok {
					break
				}

				cancelSet[pbresp.WatchId] = struct{}{}
				cr := &pb.WatchRequest_CancelRequest{
					CancelRequest: &pb.WatchCancelRequest{
						WatchId: pbresp.WatchId,
					},
				}
				req := &pb.WatchRequest{RequestUnion: cr}
				w.lg.Debug("sending watch cancel request for failed dispatch", zap.Int64("watch-id", pbresp.WatchId))
				if err := wc.Send(req); err != nil {
					w.lg.Debug("failed to send watch cancel request", zap.Int64("watch-id", pbresp.WatchId), zap.Error(err))
				}
			}

		// watch client failed on Recv; spawn another if possible
		case err := <-w.errc:
			if isHaltErr(w.ctx, err) || errors.Is(ContextError(w.ctx, err), v3rpc.ErrNoLeader) {
				closeErr = err
				return
			}
			backoff = w.backoffIfUnavailable(backoff, err)
			if wc, closeErr = w.newWatchClient(); closeErr != nil {
				return
			}
			if ws := w.nextResume(); ws != nil {
				if err := wc.Send(ws.initReq.toPB()); err != nil {
					w.lg.Debug("error when sending request", zap.Error(err))
				}
			}
			cancelSet = make(map[int64]struct{})

		case <-w.ctx.Done():
			return

		case ws := <-w.closingc:
			w.closeSubstream(ws)
			delete(closing, ws)
			// no more watchers on this stream; shut down and skip cancellation
			if len(w.substreams)+len(w.resuming) == 0 {
				return
			}
			if ws.id != InvalidWatchID {
				// client is closing an established watch; close it on the server proactively instead of waiting
				// to close when the next message arrives
				cancelSet[ws.id] = struct{}{}
				cr := &pb.WatchRequest_CancelRequest{
					CancelRequest: &pb.WatchCancelRequest{
						WatchId: ws.id,
					},
				}
				req := &pb.WatchRequest{RequestUnion: cr}
				w.lg.Debug("sending watch cancel request for closed watcher", zap.Int64("watch-id", ws.id))
				if err := wc.Send(req); err != nil {
					w.lg.Debug("failed to send watch cancel request", zap.Int64("watch-id", ws.id), zap.Error(err))
				}
			}
		}
	}
}

// nextResume chooses the next resuming watcher to register with the grpc stream.
// Abandoned streams are marked as nil in the queue because the head of the queue
// must wait for its inflight registration to complete.
func (w *watchGRPCStream) nextResume() *watcherStream {
	for len(w.resuming) != 0 {
		if w.resuming[0] != nil {
			return w.resuming[0]
		}
		w.resuming = w.resuming[1:]
	}
	return nil
}

// dispatchEvent sends a WatchResponse to the appropriate watcher stream
func (w *watchGRPCStream) dispatchEvent(pbresp *pb.WatchResponse) bool {
	events := make([]*Event, len(pbresp.Events))
	for i, ev := range pbresp.Events {
		events[i] = (*Event)(ev)
	}
	// TODO: return watch ID?
	wr := &WatchResponse{
		Header:          *pbresp.Header,
		Events:          events,
		CompactRevision: pbresp.CompactRevision,
		Created:         pbresp.Created,
		Canceled:        pbresp.Canceled,
		cancelReason:    pbresp.CancelReason,
	}

	// watch IDs are zero indexed, so request notify watch responses are assigned a watch ID of InvalidWatchID to
	// indicate they should be broadcast.
	if wr.IsProgressNotify() && pbresp.WatchId == InvalidWatchID {
		return w.broadcastResponse(wr)
	}

	return w.unicastResponse(wr, pbresp.WatchId)
}

// broadcastResponse sends a watch response to all watch substreams.
func (w *watchGRPCStream) broadcastResponse(wr *WatchResponse) bool {
	for _, ws := range w.substreams {
		select {
		case ws.recvc <- wr:
		case <-ws.donec:
		}
	}
	return true
}

// unicastResponse sends a watch response to a specific watch substream.
func (w *watchGRPCStream) unicastResponse(wr *WatchResponse, watchID int64) bool {
	ws, ok := w.substreams[watchID]
	if !ok {
		return false
	}
	select {
	case ws.recvc <- wr:
	case <-ws.donec:
		return false
	}
	return true
}

// serveWatchClient forwards messages from the grpc stream to run()
func (w *watchGRPCStream) serveWatchClient(wc pb.Watch_WatchClient) {
	for {
		resp, err := wc.Recv()
		if err != nil {
			select {
			case w.errc <- err:
			case <-w.donec:
			}
			return
		}
		select {
		case w.respc <- resp:
		case <-w.donec:
			return
		}
	}
}

// serveSubstream forwards watch responses from run() to the subscriber
func (w *watchGRPCStream) serveSubstream(ws *watcherStream, resumec chan struct{}) {
	if ws.closing {
		panic("created substream goroutine but substream is closing")
	}

	// nextRev is the minimum expected next revision
	nextRev := ws.initReq.rev
	resuming := false
	defer func() {
		if !resuming {
			ws.closing = true
		}
		close(ws.donec)
		if !resuming {
			w.closingc <- ws
		}
		w.wg.Done()
	}()

	emptyWr := &WatchResponse{}
	for {
		curWr := emptyWr
		outc := ws.outc

		if len(ws.buf) > 0 {
			curWr = ws.buf[0]
		} else {
			outc = nil
		}
		select {
		case outc <- *curWr:
			if ws.buf[0].Err() != nil {
				return
			}
			ws.buf[0] = nil
			ws.buf = ws.buf[1:]
		case wr, ok := <-ws.recvc:
			if !ok {
				// shutdown from closeSubstream
				return
			}

			if wr.Created {
				if ws.initReq.retc != nil {
					ws.initReq.retc <- ws.outc
					// to prevent the next write from taking the slot in the
					// buffered channel and posting duplicate create events
					ws.initReq.retc = nil

					// send the first creation event only if requested
					if ws.initReq.createdNotify {
						ws.outc <- *wr
					}
					// once the watch channel is returned, a current-revision watch
					// must resume at the store revision. This is necessary for the
					// following case to work as expected:
					//	wch := m1.Watch("a")
					//	m2.Put("a", "b")
					//	<-wch
					// If the revision were only bound on the first observed event and
					// wch disconnected before the Put was issued, then reconnected
					// after it was committed, it would miss the Put.
					if ws.initReq.rev == 0 {
						nextRev = wr.Header.Revision
					}
				}
			} else {
				// current progress of watch; <= store revision
				nextRev = wr.Header.Revision + 1
			}

			if len(wr.Events) > 0 {
				nextRev = wr.Events[len(wr.Events)-1].Kv.ModRevision + 1
			}

			ws.initReq.rev = nextRev

			// the created event is already sent above;
			// the watcher should not post duplicate events
			if wr.Created {
				continue
			}

			// TODO pause channel if buffer gets too large
			ws.buf = append(ws.buf, wr)
		case <-w.ctx.Done():
			return
		case <-ws.initReq.ctx.Done():
			return
		case <-resumec:
			resuming = true
			return
		}
	}
	// lazily send cancel message if events on missing id
}

func (w *watchGRPCStream) newWatchClient() (pb.Watch_WatchClient, error) {
	// mark all substreams as resuming
	close(w.resumec)
	w.resumec = make(chan struct{})
	w.joinSubstreams()
	for _, ws := range w.substreams {
		ws.id = InvalidWatchID
		w.resuming = append(w.resuming, ws)
	}
	// strip out nils, if any
	var resuming []*watcherStream
	for _, ws := range w.resuming {
		if ws != nil {
			resuming = append(resuming, ws)
		}
	}
	w.resuming = resuming
	w.substreams = make(map[int64]*watcherStream)

	// connect to grpc stream while accepting watcher cancellation
	stopc := make(chan struct{})
	donec := w.waitCancelSubstreams(stopc)
	wc, err := w.openWatchClient()
	close(stopc)
	<-donec

	// serve all non-closing streams, even if there's a client error,
	// so that the teardown path can shut down the streams as expected.
	for _, ws := range w.resuming {
		if ws.closing {
			continue
		}
		ws.donec = make(chan struct{})
		w.wg.Add(1)
		go w.serveSubstream(ws, w.resumec)
	}

	if err != nil {
		return nil, v3rpc.Error(err)
	}

	// receive data from new grpc stream
	go w.serveWatchClient(wc)
	return wc, nil
}

func (w *watchGRPCStream) waitCancelSubstreams(stopc <-chan struct{}) <-chan struct{} {
	var wg sync.WaitGroup
	wg.Add(len(w.resuming))
	donec := make(chan struct{})
	for i := range w.resuming {
		go func(ws *watcherStream) {
			defer wg.Done()
			if ws.closing {
				if ws.initReq.ctx.Err() != nil && ws.outc != nil {
					close(ws.outc)
					ws.outc = nil
				}
				return
			}
			select {
			case <-ws.initReq.ctx.Done():
				// closed ws will be removed from resuming
				ws.closing = true
				close(ws.outc)
				ws.outc = nil
				w.wg.Add(1)
				go func() {
					defer w.wg.Done()
					w.closingc <- ws
				}()
			case <-stopc:
			}
		}(w.resuming[i])
	}
	go func() {
		defer close(donec)
		wg.Wait()
	}()
	return donec
}

// joinSubstreams waits for all substream goroutines to complete.
func (w *watchGRPCStream) joinSubstreams() {
	for _, ws := range w.substreams {
		<-ws.donec
	}
	for _, ws := range w.resuming {
		if ws != nil {
			<-ws.donec
		}
	}
}

var maxBackoff = 100 * time.Millisecond

func (w *watchGRPCStream) backoffIfUnavailable(backoff time.Duration, err error) time.Duration {
	if isUnavailableErr(w.ctx, err) {
		// retry, but backoff
		if backoff < maxBackoff {
			// 25% backoff factor
			backoff = backoff + backoff/4
			if backoff > maxBackoff {
				backoff = maxBackoff
			}
		}
		time.Sleep(backoff)
	}
	return backoff
}
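
// Worked progression (informational): starting from the 1ms seed used by
// run() and openWatchClient(), the 25% growth factor yields delays of
// roughly 1ms, 1.25ms, 1.56ms, 1.95ms, ... capped at maxBackoff (100ms),
// so steady-state retries against an unavailable endpoint settle at about
// ten per second.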

// openWatchClient retries opening a watch client until success or halt.
// manually retry in case "ws==nil && err==nil"
// TODO: remove FailFast=false
func (w *watchGRPCStream) openWatchClient() (ws pb.Watch_WatchClient, err error) {
	backoff := time.Millisecond
	for {
		select {
		case <-w.ctx.Done():
			if err == nil {
				return nil, w.ctx.Err()
			}
			return nil, err
		default:
		}
		if ws, err = w.remote.Watch(w.ctx, w.callOpts...); ws != nil && err == nil {
			break
		}
		if isHaltErr(w.ctx, err) {
			return nil, v3rpc.Error(err)
		}
		backoff = w.backoffIfUnavailable(backoff, err)
	}
	return ws, nil
}

// toPB converts an internal watch request structure to its protobuf WatchRequest structure.
func (wr *watchRequest) toPB() *pb.WatchRequest {
	req := &pb.WatchCreateRequest{
		StartRevision:  wr.rev,
		Key:            []byte(wr.key),
		RangeEnd:       []byte(wr.end),
		ProgressNotify: wr.progressNotify,
		Filters:        wr.filters,
		PrevKv:         wr.prevKV,
		Fragment:       wr.fragment,
	}
	cr := &pb.WatchRequest_CreateRequest{CreateRequest: req}
	return &pb.WatchRequest{RequestUnion: cr}
}

// toPB converts an internal progress request structure to its protobuf WatchRequest structure.
func (pr *progressRequest) toPB() *pb.WatchRequest {
	req := &pb.WatchProgressRequest{}
	cr := &pb.WatchRequest_ProgressRequest{ProgressRequest: req}
	return &pb.WatchRequest{RequestUnion: cr}
}

func streamKeyFromCtx(ctx context.Context) string {
	if md, ok := metadata.FromOutgoingContext(ctx); ok {
		return fmt.Sprintf("%+v", map[string][]string(md))
	}
	return ""
}
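
// Contexts carrying identical outgoing gRPC metadata therefore share a single
// watchGRPCStream, while differing metadata (e.g. different auth tokens)
// yields distinct streams. Since Go 1.12, fmt prints maps with sorted keys,
// so equal metadata produces equal stream keys. Sketch:
//
//	md := metadata.Pairs("token", "A")
//	ctx1 := metadata.NewOutgoingContext(context.Background(), md)
//	ctx2 := metadata.NewOutgoingContext(context.Background(), md)
//	// streamKeyFromCtx(ctx1) == streamKeyFromCtx(ctx2): same stream is shared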