// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package clientv3

import (
	"context"
	"errors"
	"sync"
	"time"

	"go.uber.org/zap"
	"google.golang.org/grpc"
	"google.golang.org/grpc/metadata"

	pb "go.etcd.io/etcd/api/v3/etcdserverpb"
	"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
)

type (
	LeaseRevokeResponse pb.LeaseRevokeResponse
	LeaseID             int64
)

// LeaseGrantResponse wraps the protobuf message LeaseGrantResponse.
type LeaseGrantResponse struct {
	*pb.ResponseHeader
	ID    LeaseID
	TTL   int64
	Error string
}

// LeaseKeepAliveResponse wraps the protobuf message LeaseKeepAliveResponse.
type LeaseKeepAliveResponse struct {
	*pb.ResponseHeader
	ID  LeaseID
	TTL int64
}

// LeaseTimeToLiveResponse wraps the protobuf message LeaseTimeToLiveResponse.
type LeaseTimeToLiveResponse struct {
	*pb.ResponseHeader
	ID LeaseID `json:"id"`

	// TTL is the remaining TTL in seconds for the lease; the lease will expire in under TTL+1 seconds. An expired lease will return -1.
	TTL int64 `json:"ttl"`

	// GrantedTTL is the initial granted time in seconds upon lease creation/renewal.
	GrantedTTL int64 `json:"granted-ttl"`

	// Keys is the list of keys attached to this lease.
	Keys [][]byte `json:"keys"`
}

// LeaseStatus represents a lease status.
type LeaseStatus struct {
	ID LeaseID `json:"id"`
	// TODO: TTL int64
}

// LeaseLeasesResponse wraps the protobuf message LeaseLeasesResponse.
type LeaseLeasesResponse struct {
	*pb.ResponseHeader
	Leases []LeaseStatus `json:"leases"`
}

const (
	// defaultTTL is the assumed lease TTL used for the first keepalive
	// deadline before the actual TTL is known to the client.
	defaultTTL = 5 * time.Second
	// NoLease is a lease ID for the absence of a lease.
	NoLease LeaseID = 0

	// retryConnWait is how long to wait before retrying a request after an error.
	retryConnWait = 500 * time.Millisecond
)

// LeaseResponseChSize is the size of the buffer used to store unsent lease responses.
// WARNING: DO NOT UPDATE.
// Only for testing purposes.
var LeaseResponseChSize = 16

// ErrKeepAliveHalted is returned if the client keep alive loop halts with an unexpected error.
//
// This usually means that automatic lease renewal via KeepAlive is broken, but KeepAliveOnce will still work as expected.
type ErrKeepAliveHalted struct {
	Reason error
}

func (e ErrKeepAliveHalted) Error() string {
	s := "etcdclient: leases keep alive halted"
	if e.Reason != nil {
		s += ": " + e.Reason.Error()
	}
	return s
}
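
// A caller can distinguish a halted keepalive loop from other errors with a
// type check. A minimal sketch, assuming "cli" is an established *Client and
// "id" a previously granted LeaseID:
//
//	ch, err := cli.KeepAlive(ctx, id)
//	if err != nil {
//		var halted ErrKeepAliveHalted
//		if errors.As(err, &halted) {
//			// the renewal loop has stopped; halted.Reason carries the cause
//		}
//	}
//	_ = ch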

type Lease interface {
	// Grant creates a new lease.
	Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error)

	// Revoke revokes the given lease.
	Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error)

	// TimeToLive retrieves the lease information of the given lease ID.
	TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error)

	// Leases retrieves all leases.
	Leases(ctx context.Context) (*LeaseLeasesResponse, error)

	// KeepAlive attempts to keep the given lease alive forever. If the keepalive responses posted
	// to the channel are not consumed promptly, the channel may become full. When full, the lease
	// client will continue sending keep alive requests to the etcd server, but will drop responses
	// until there is capacity on the channel to send more responses.
	//
	// If the client keep alive loop halts with an unexpected error (e.g. "etcdserver: no leader") or
	// is canceled by the caller (e.g. context.Canceled), KeepAlive returns an ErrKeepAliveHalted error
	// containing the error reason.
	//
	// The returned "LeaseKeepAliveResponse" channel closes if the underlying keep
	// alive stream is interrupted in some way the client cannot handle itself, or
	// if the given context "ctx" is canceled or times out.
	//
	// TODO(v4.0): post errors to last keep alive message before closing
	// (see https://github.com/etcd-io/etcd/pull/7866)
	KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error)

	// KeepAliveOnce renews the lease once. The response corresponds to the
	// first message from calling KeepAlive. If the response has a recoverable
	// error, KeepAliveOnce will retry the RPC with a new keep alive message.
	//
	// In most cases, KeepAlive should be used instead of KeepAliveOnce.
	KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error)

	// Close releases all resources Lease keeps for efficient communication
	// with the etcd server.
	Close() error
}
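
// A typical lifecycle, sketched for illustration only (assuming "cli" is an
// established *Client, which embeds this interface alongside KV):
//
//	grantResp, err := cli.Grant(ctx, 10) // lease with a 10-second TTL
//	if err != nil { /* handle */ }
//	// attach a key to the lease; the key is deleted when the lease expires
//	_, err = cli.Put(ctx, "sample_key", "sample_value", WithLease(grantResp.ID))
//	// renew until ctx is canceled, consuming responses promptly so the
//	// buffered channel (see LeaseResponseChSize) does not fill up
//	ch, err := cli.KeepAlive(ctx, grantResp.ID)
//	for ka := range ch {
//		_ = ka.TTL // remaining TTL after each successful renewal
//	}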

type lessor struct {
	mu sync.Mutex // guards all fields

	// donec is closed and loopErr is set when recvKeepAliveLoop stops
	donec   chan struct{}
	loopErr error

	remote pb.LeaseClient

	stream       pb.Lease_LeaseKeepAliveClient
	streamCancel context.CancelFunc

	stopCtx    context.Context
	stopCancel context.CancelFunc

	keepAlives map[LeaseID]*keepAlive

	// firstKeepAliveTimeout is the timeout for the first keepalive request
	// before the actual TTL is known to the lease client
	firstKeepAliveTimeout time.Duration

	// firstKeepAliveOnce ensures stream starts after first KeepAlive call.
	firstKeepAliveOnce sync.Once

	callOpts []grpc.CallOption

	lg *zap.Logger
}

// keepAlive multiplexes a keepalive for a lease over multiple channels
type keepAlive struct {
	chs  []chan<- *LeaseKeepAliveResponse
	ctxs []context.Context
	// deadline is the time the keep alive channels close if no response
	deadline time.Time
	// nextKeepAlive is when to send the next keep alive message
	nextKeepAlive time.Time
	// donec is closed on lease revoke, expiration, or cancel.
	donec chan struct{}
}

func NewLease(c *Client) Lease {
	return NewLeaseFromLeaseClient(RetryLeaseClient(c), c, c.cfg.DialTimeout+time.Second)
}

func NewLeaseFromLeaseClient(remote pb.LeaseClient, c *Client, keepAliveTimeout time.Duration) Lease {
	l := &lessor{
		donec:                 make(chan struct{}),
		keepAlives:            make(map[LeaseID]*keepAlive),
		remote:                remote,
		firstKeepAliveTimeout: keepAliveTimeout,
	}
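	// a timeout of exactly one second matches NewLease's
	// c.cfg.DialTimeout+time.Second when DialTimeout is zero (the default),
	// i.e. no dial timeout was configured; fall back to defaultTTL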
	if l.firstKeepAliveTimeout == time.Second {
		l.firstKeepAliveTimeout = defaultTTL
	}
	if c != nil {
		l.lg = c.lg
		l.callOpts = c.callOpts
	}
	reqLeaderCtx := WithRequireLeader(context.Background())
	l.stopCtx, l.stopCancel = context.WithCancel(reqLeaderCtx)
	return l
}

func (l *lessor) Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error) {
	r := &pb.LeaseGrantRequest{TTL: ttl}
	resp, err := l.remote.LeaseGrant(ctx, r, l.callOpts...)
	if err == nil {
		gresp := &LeaseGrantResponse{
			ResponseHeader: resp.GetHeader(),
			ID:             LeaseID(resp.ID),
			TTL:            resp.TTL,
			Error:          resp.Error,
		}
		return gresp, nil
	}
	return nil, ContextError(ctx, err)
}

func (l *lessor) Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error) {
	r := &pb.LeaseRevokeRequest{ID: int64(id)}
	resp, err := l.remote.LeaseRevoke(ctx, r, l.callOpts...)
	if err == nil {
		return (*LeaseRevokeResponse)(resp), nil
	}
	return nil, ContextError(ctx, err)
}

func (l *lessor) TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error) {
	r := toLeaseTimeToLiveRequest(id, opts...)
	resp, err := l.remote.LeaseTimeToLive(ctx, r, l.callOpts...)
	if err != nil {
		return nil, ContextError(ctx, err)
	}
	gresp := &LeaseTimeToLiveResponse{
		ResponseHeader: resp.GetHeader(),
		ID:             LeaseID(resp.ID),
		TTL:            resp.TTL,
		GrantedTTL:     resp.GrantedTTL,
		Keys:           resp.Keys,
	}
	return gresp, nil
}

func (l *lessor) Leases(ctx context.Context) (*LeaseLeasesResponse, error) {
	resp, err := l.remote.LeaseLeases(ctx, &pb.LeaseLeasesRequest{}, l.callOpts...)
	if err == nil {
		leases := make([]LeaseStatus, len(resp.Leases))
		for i := range resp.Leases {
			leases[i] = LeaseStatus{ID: LeaseID(resp.Leases[i].ID)}
		}
		return &LeaseLeasesResponse{ResponseHeader: resp.GetHeader(), Leases: leases}, nil
	}
	return nil, ContextError(ctx, err)
}

// To identify the context passed to `KeepAlive`, a key/value pair is
// attached to the context. The key is a `keepAliveCtxKey` object, and
// the value is the pointer to the context object itself, ensuring
// uniqueness as each context has a unique memory address.
type keepAliveCtxKey struct{}

func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error) {
	ch := make(chan *LeaseKeepAliveResponse, LeaseResponseChSize)

	l.mu.Lock()
	// ensure that recvKeepAliveLoop is still running
	select {
	case <-l.donec:
		err := l.loopErr
		l.mu.Unlock()
		close(ch)
		return ch, ErrKeepAliveHalted{Reason: err}
	default:
	}
	ka, ok := l.keepAlives[id]

	if ctx.Done() != nil {
		ctx = context.WithValue(ctx, keepAliveCtxKey{}, &ctx)
	}
	if !ok {
		// create fresh keep alive
		ka = &keepAlive{
			chs:           []chan<- *LeaseKeepAliveResponse{ch},
			ctxs:          []context.Context{ctx},
			deadline:      time.Now().Add(l.firstKeepAliveTimeout),
			nextKeepAlive: time.Now(),
			donec:         make(chan struct{}),
		}
		l.keepAlives[id] = ka
	} else {
		// add channel and context to existing keep alive
		ka.ctxs = append(ka.ctxs, ctx)
		ka.chs = append(ka.chs, ch)
	}
	l.mu.Unlock()

	if ctx.Done() != nil {
		go l.keepAliveCtxCloser(ctx, id, ka.donec)
	}
	l.firstKeepAliveOnce.Do(func() {
		go l.recvKeepAliveLoop()
		go l.deadlineLoop()
	})

	return ch, nil
}

func (l *lessor) KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error) {
	for {
		resp, err := l.keepAliveOnce(ctx, id)
		if err == nil {
			if resp.TTL <= 0 {
				err = rpctypes.ErrLeaseNotFound
			}
			return resp, err
		}
		if isHaltErr(ctx, err) {
			return nil, ContextError(ctx, err)
		}
	}
}
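
// A manual-renewal sketch using KeepAliveOnce (illustrative only; "cli" is
// an assumed *Client and "id" an assumed LeaseID):
//
//	if _, err := cli.KeepAliveOnce(ctx, id); errors.Is(err, rpctypes.ErrLeaseNotFound) {
//		// the lease has already expired or been revoked; grant a new one
//	}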

func (l *lessor) Close() error {
	l.stopCancel()
	// close for synchronous teardown if stream goroutines never launched
	l.firstKeepAliveOnce.Do(func() { close(l.donec) })
	<-l.donec
	return nil
}

func (l *lessor) keepAliveCtxCloser(ctx context.Context, id LeaseID, donec <-chan struct{}) {
	select {
	case <-donec:
		return
	case <-l.donec:
		return
	case <-ctx.Done():
	}

	l.mu.Lock()
	defer l.mu.Unlock()

	ka, ok := l.keepAlives[id]
	if !ok {
		return
	}

	// close channel and remove context if still associated with keep alive
	for i, c := range ka.ctxs {
		if c.Value(keepAliveCtxKey{}) == ctx.Value(keepAliveCtxKey{}) {
			close(ka.chs[i])
			ka.ctxs = append(ka.ctxs[:i], ka.ctxs[i+1:]...)
			ka.chs = append(ka.chs[:i], ka.chs[i+1:]...)
			break
		}
	}
	// remove the keepalive if there are no more listeners
	if len(ka.chs) == 0 {
		delete(l.keepAlives, id)
	}
}

// closeRequireLeader scans keepAlives for contexts that require a leader
// and closes the associated channels.
func (l *lessor) closeRequireLeader() {
	l.mu.Lock()
	defer l.mu.Unlock()
	for _, ka := range l.keepAlives {
		reqIdxs := 0
		// find all required leader channels, close, mark as nil
		for i, ctx := range ka.ctxs {
			md, ok := metadata.FromOutgoingContext(ctx)
			if !ok {
				continue
			}
			ks := md[rpctypes.MetadataRequireLeaderKey]
			if len(ks) < 1 || ks[0] != rpctypes.MetadataHasLeader {
				continue
			}
			close(ka.chs[i])
			ka.chs[i] = nil
			reqIdxs++
		}
		if reqIdxs == 0 {
			continue
		}
		// remove all channels that required a leader from keepalive
		newChs := make([]chan<- *LeaseKeepAliveResponse, len(ka.chs)-reqIdxs)
		newCtxs := make([]context.Context, len(newChs))
		newIdx := 0
		for i := range ka.chs {
			if ka.chs[i] == nil {
				continue
			}
			// pair each surviving channel with its own context (index i,
			// not newIdx, so the slices stay aligned after skipped entries)
			newChs[newIdx], newCtxs[newIdx] = ka.chs[i], ka.ctxs[i]
			newIdx++
		}
		ka.chs, ka.ctxs = newChs, newCtxs
	}
}

func (l *lessor) keepAliveOnce(ctx context.Context, id LeaseID) (karesp *LeaseKeepAliveResponse, ferr error) {
	cctx, cancel := context.WithCancel(ctx)
	defer cancel()

	stream, err := l.remote.LeaseKeepAlive(cctx, l.callOpts...)
	if err != nil {
		return nil, ContextError(ctx, err)
	}

	defer func() {
		if cerr := stream.CloseSend(); cerr != nil {
			if ferr == nil {
				ferr = ContextError(ctx, cerr)
			}
			return
		}
	}()

	err = stream.Send(&pb.LeaseKeepAliveRequest{ID: int64(id)})
	if err != nil {
		return nil, ContextError(ctx, err)
	}

	resp, rerr := stream.Recv()
	if rerr != nil {
		return nil, ContextError(ctx, rerr)
	}

	karesp = &LeaseKeepAliveResponse{
		ResponseHeader: resp.GetHeader(),
		ID:             LeaseID(resp.ID),
		TTL:            resp.TTL,
	}
	return karesp, nil
}

func (l *lessor) recvKeepAliveLoop() (gerr error) {
	defer func() {
		l.mu.Lock()
		close(l.donec)
		l.loopErr = gerr
		for _, ka := range l.keepAlives {
			ka.close()
		}
		l.keepAlives = make(map[LeaseID]*keepAlive)
		l.mu.Unlock()
	}()

	for {
		stream, err := l.resetRecv()
		if err != nil {
			// lg may be nil when constructed via NewLeaseFromLeaseClient
			// with a nil client; guard as recvKeepAlive does
			if l.lg != nil {
				l.lg.Warn("error occurred during lease keep alive loop",
					zap.Error(err),
				)
			}
			if canceledByCaller(l.stopCtx, err) {
				return err
			}
		} else {
			for {
				resp, err := stream.Recv()
				if err != nil {
					if canceledByCaller(l.stopCtx, err) {
						return err
					}

					if errors.Is(ContextError(l.stopCtx, err), rpctypes.ErrNoLeader) {
						l.closeRequireLeader()
					}
					break
				}

				l.recvKeepAlive(resp)
			}
		}

		select {
		case <-time.After(retryConnWait):
		case <-l.stopCtx.Done():
			return l.stopCtx.Err()
		}
	}
}

// resetRecv opens a new lease stream and starts sending keep alive requests.
func (l *lessor) resetRecv() (pb.Lease_LeaseKeepAliveClient, error) {
	sctx, cancel := context.WithCancel(l.stopCtx)
	stream, err := l.remote.LeaseKeepAlive(sctx, append(l.callOpts, withMax(0))...)
	if err != nil {
		cancel()
		return nil, err
	}

	l.mu.Lock()
	defer l.mu.Unlock()
	if l.stream != nil && l.streamCancel != nil {
		l.streamCancel()
	}

	l.streamCancel = cancel
	l.stream = stream

	go l.sendKeepAliveLoop(stream)
	return stream, nil
}

// recvKeepAlive updates a lease based on its LeaseKeepAliveResponse
func (l *lessor) recvKeepAlive(resp *pb.LeaseKeepAliveResponse) {
	karesp := &LeaseKeepAliveResponse{
		ResponseHeader: resp.GetHeader(),
		ID:             LeaseID(resp.ID),
		TTL:            resp.TTL,
	}

	l.mu.Lock()
	defer l.mu.Unlock()

	ka, ok := l.keepAlives[karesp.ID]
	if !ok {
		return
	}

	if karesp.TTL <= 0 {
		// lease expired; close all keep alive channels
		delete(l.keepAlives, karesp.ID)
		ka.close()
		return
	}

	// send update to all channels
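	// schedule the next keepalive at one third of the TTL so several
	// renewal attempts fit within a single lease lifetime (e.g. a 60s TTL
	// yields a send roughly every 20s against a 60s deadline)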
	nextKeepAlive := time.Now().Add((time.Duration(karesp.TTL) * time.Second) / 3.0)
	ka.deadline = time.Now().Add(time.Duration(karesp.TTL) * time.Second)
	for _, ch := range ka.chs {
		select {
		case ch <- karesp:
		default:
			if l.lg != nil {
				l.lg.Warn("lease keepalive response queue is full; dropping response send",
					zap.Int("queue-size", len(ch)),
					zap.Int("queue-capacity", cap(ch)),
				)
			}
		}
		// still advance in order to rate-limit keep-alive sends
		ka.nextKeepAlive = nextKeepAlive
	}
}

// deadlineLoop reaps any keep alive channels that have not received a response
// within the lease TTL
func (l *lessor) deadlineLoop() {
	timer := time.NewTimer(time.Second)
	defer timer.Stop()
	for {
		timer.Reset(time.Second)
		select {
		case <-timer.C:
		case <-l.donec:
			return
		}
		now := time.Now()
		l.mu.Lock()
		for id, ka := range l.keepAlives {
			if ka.deadline.Before(now) {
				// waited too long for response; lease may be expired
				ka.close()
				delete(l.keepAlives, id)
			}
		}
		l.mu.Unlock()
	}
}

// sendKeepAliveLoop sends keep alive requests for the lifetime of the given stream.
func (l *lessor) sendKeepAliveLoop(stream pb.Lease_LeaseKeepAliveClient) {
	for {
		var tosend []LeaseID

		now := time.Now()
		l.mu.Lock()
		for id, ka := range l.keepAlives {
			if ka.nextKeepAlive.Before(now) {
				tosend = append(tosend, id)
			}
		}
		l.mu.Unlock()

		for _, id := range tosend {
			r := &pb.LeaseKeepAliveRequest{ID: int64(id)}
			if err := stream.Send(r); err != nil {
				// guard against a nil logger, as in recvKeepAlive
				if l.lg != nil {
					l.lg.Warn("error occurred during lease keep alive request sending",
						zap.Error(err),
					)
				}
				return
			}
		}

		select {
		case <-time.After(retryConnWait):
		case <-stream.Context().Done():
			return
		case <-l.donec:
			return
		case <-l.stopCtx.Done():
			return
		}
	}
}

func (ka *keepAlive) close() {
	close(ka.donec)
	for _, ch := range ka.chs {
		close(ch)
	}
}