// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15package concurrency
16
import (
	"context"
	"errors"
	"time"

	"go.uber.org/zap"

	v3 "go.etcd.io/etcd/client/v3"
)
25
// defaultSessionTTL is the session lease TTL, in seconds, that NewSession
// starts from before any SessionOption is applied.
const defaultSessionTTL = 60
27
28// Session represents a lease kept alive for the lifetime of a client.
29// Fault-tolerant applications may use sessions to reason about liveness.
30type Session struct {
31 client *v3.Client
32 opts *sessionOptions
33 id v3.LeaseID
34
35 ctx context.Context
36 cancel context.CancelFunc
37 donec <-chan struct{}
38}
39
40// NewSession gets the leased session for a client.
41func NewSession(client *v3.Client, opts ...SessionOption) (*Session, error) {
42 lg := client.GetLogger()
43 ops := &sessionOptions{ttl: defaultSessionTTL, ctx: client.Ctx()}
44 for _, opt := range opts {
45 opt(ops, lg)
46 }
47
48 id := ops.leaseID
49 if id == v3.NoLease {
50 resp, err := client.Grant(ops.ctx, int64(ops.ttl))
51 if err != nil {
52 return nil, err
53 }
54 id = resp.ID
55 }
56
57 ctx, cancel := context.WithCancel(ops.ctx)
58 keepAlive, err := client.KeepAlive(ctx, id)
59 if err != nil || keepAlive == nil {
60 cancel()
61 return nil, err
62 }
63
64 donec := make(chan struct{})
65 s := &Session{client: client, opts: ops, id: id, ctx: ctx, cancel: cancel, donec: donec}
66
67 // keep the lease alive until client error or cancelled context
68 go func() {
69 defer func() {
70 close(donec)
71 cancel()
72 }()
73 for range keepAlive {
74 // eat messages until keep alive channel closes
75 }
76 }()
77
78 return s, nil
79}
80
81// Client is the etcd client that is attached to the session.
82func (s *Session) Client() *v3.Client {
83 return s.client
84}
85
86// Lease is the lease ID for keys bound to the session.
87func (s *Session) Lease() v3.LeaseID { return s.id }
88
89// Ctx is the context attached to the session, it is canceled when the lease is orphaned, expires, or
90// is otherwise no longer being refreshed.
91func (s *Session) Ctx() context.Context {
92 return s.ctx
93}
94
95// Done returns a channel that closes when the lease is orphaned, expires, or
96// is otherwise no longer being refreshed.
97func (s *Session) Done() <-chan struct{} { return s.donec }
98
99// Orphan ends the refresh for the session lease. This is useful
100// in case the state of the client connection is indeterminate (revoke
101// would fail) or when transferring lease ownership.
102func (s *Session) Orphan() {
103 s.cancel()
104 <-s.donec
105}
106
107// Close orphans the session and revokes the session lease.
108func (s *Session) Close() error {
109 s.Orphan()
110 // if revoke takes longer than the ttl, lease is expired anyway
111 ctx, cancel := context.WithTimeout(s.opts.ctx, time.Duration(s.opts.ttl)*time.Second)
112 _, err := s.client.Revoke(ctx, s.id)
113 cancel()
114 return err
115}
116
117type sessionOptions struct {
118 ttl int
119 leaseID v3.LeaseID
120 ctx context.Context
121}
122
123// SessionOption configures Session.
124type SessionOption func(*sessionOptions, *zap.Logger)
125
126// WithTTL configures the session's TTL in seconds.
127// If TTL is <= 0, the default 60 seconds TTL will be used.
128func WithTTL(ttl int) SessionOption {
129 return func(so *sessionOptions, lg *zap.Logger) {
130 if ttl > 0 {
131 so.ttl = ttl
132 } else {
133 lg.Warn("WithTTL(): TTL should be > 0, preserving current TTL", zap.Int64("current-session-ttl", int64(so.ttl)))
134 }
135 }
136}
137
138// WithLease specifies the existing leaseID to be used for the session.
139// This is useful in process restart scenario, for example, to reclaim
140// leadership from an election prior to restart.
141func WithLease(leaseID v3.LeaseID) SessionOption {
142 return func(so *sessionOptions, _ *zap.Logger) {
143 so.leaseID = leaseID
144 }
145}
146
147// WithContext assigns a context to the session instead of defaulting to
148// using the client context. This is useful for canceling NewSession and
149// Close operations immediately without having to close the client. If the
150// context is canceled before Close() completes, the session's lease will be
151// abandoned and left to expire instead of being revoked.
152func WithContext(ctx context.Context) SessionOption {
153 return func(so *sessionOptions, _ *zap.Logger) {
154 so.ctx = ctx
155 }
156}