blob: 00aaacd15fdc213c52a6abd4250b05916bb94e3f [file] [log] [blame]
Abhay Kumar40252eb2025-10-13 13:25:53 +00001// Copyright 2016 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package clientv3
16
17import (
18 "context"
19 "errors"
20 "fmt"
21 "io"
22
23 "go.uber.org/zap"
24 "google.golang.org/grpc"
25
26 pb "go.etcd.io/etcd/api/v3/etcdserverpb"
27)
28
29type (
30 DefragmentResponse pb.DefragmentResponse
31 AlarmResponse pb.AlarmResponse
32 AlarmMember pb.AlarmMember
33 StatusResponse pb.StatusResponse
34 HashKVResponse pb.HashKVResponse
35 MoveLeaderResponse pb.MoveLeaderResponse
36 DowngradeResponse pb.DowngradeResponse
37
38 DowngradeAction pb.DowngradeRequest_DowngradeAction
39)
40
41const (
42 DowngradeValidate = DowngradeAction(pb.DowngradeRequest_VALIDATE)
43 DowngradeEnable = DowngradeAction(pb.DowngradeRequest_ENABLE)
44 DowngradeCancel = DowngradeAction(pb.DowngradeRequest_CANCEL)
45)
46
47type Maintenance interface {
48 // AlarmList gets all active alarms.
49 AlarmList(ctx context.Context) (*AlarmResponse, error)
50
51 // AlarmDisarm disarms a given alarm.
52 AlarmDisarm(ctx context.Context, m *AlarmMember) (*AlarmResponse, error)
53
54 // Defragment releases wasted space from internal fragmentation on a given etcd member.
55 // Defragment is only needed when deleting a large number of keys and want to reclaim
56 // the resources.
57 // Defragment is an expensive operation. User should avoid defragmenting multiple members
58 // at the same time.
59 // To defragment multiple members in the cluster, user need to call defragment multiple
60 // times with different endpoints.
61 Defragment(ctx context.Context, endpoint string) (*DefragmentResponse, error)
62
63 // Status gets the status of the endpoint.
64 Status(ctx context.Context, endpoint string) (*StatusResponse, error)
65
66 // HashKV returns a hash of the KV state at the time of the RPC.
67 // If revision is zero, the hash is computed on all keys. If the revision
68 // is non-zero, the hash is computed on all keys at or below the given revision.
69 HashKV(ctx context.Context, endpoint string, rev int64) (*HashKVResponse, error)
70
71 // SnapshotWithVersion returns a reader for a point-in-time snapshot and version of etcd that created it.
72 // If the context "ctx" is canceled or timed out, reading from returned
73 // "io.ReadCloser" would error out (e.g. context.Canceled, context.DeadlineExceeded).
74 SnapshotWithVersion(ctx context.Context) (*SnapshotResponse, error)
75
76 // Snapshot provides a reader for a point-in-time snapshot of etcd.
77 // If the context "ctx" is canceled or timed out, reading from returned
78 // "io.ReadCloser" would error out (e.g. context.Canceled, context.DeadlineExceeded).
79 // Deprecated: use SnapshotWithVersion instead.
80 Snapshot(ctx context.Context) (io.ReadCloser, error)
81
82 // MoveLeader requests current leader to transfer its leadership to the transferee.
83 // Request must be made to the leader.
84 MoveLeader(ctx context.Context, transfereeID uint64) (*MoveLeaderResponse, error)
85
86 // Downgrade requests downgrades, verifies feasibility or cancels downgrade
87 // on the cluster version.
88 // Supported since etcd 3.5.
89 Downgrade(ctx context.Context, action DowngradeAction, version string) (*DowngradeResponse, error)
90}
91
92// SnapshotResponse is aggregated response from the snapshot stream.
93// Consumer is responsible for closing steam by calling .Snapshot.Close()
94type SnapshotResponse struct {
95 // Header is the first header in the snapshot stream, has the current key-value store information
96 // and indicates the point in time of the snapshot.
97 Header *pb.ResponseHeader
98 // Snapshot exposes ReaderCloser interface for data stored in the Blob field in the snapshot stream.
99 Snapshot io.ReadCloser
100 // Version is the local version of server that created the snapshot.
101 // In cluster with binaries with different version, each cluster can return different result.
102 // Informs which etcd server version should be used when restoring the snapshot.
103 // Supported on etcd >= v3.6.
104 Version string
105}
106
107type maintenance struct {
108 lg *zap.Logger
109 dial func(endpoint string) (pb.MaintenanceClient, func(), error)
110 remote pb.MaintenanceClient
111 callOpts []grpc.CallOption
112}
113
114func NewMaintenance(c *Client) Maintenance {
115 api := &maintenance{
116 lg: c.lg,
117 dial: func(endpoint string) (pb.MaintenanceClient, func(), error) {
118 conn, err := c.Dial(endpoint)
119 if err != nil {
120 return nil, nil, fmt.Errorf("failed to dial endpoint %s with maintenance client: %w", endpoint, err)
121 }
122
123 cancel := func() { conn.Close() }
124 return RetryMaintenanceClient(c, conn), cancel, nil
125 },
126 remote: RetryMaintenanceClient(c, c.conn),
127 }
128 if c != nil {
129 api.callOpts = c.callOpts
130 }
131 return api
132}
133
134func NewMaintenanceFromMaintenanceClient(remote pb.MaintenanceClient, c *Client) Maintenance {
135 api := &maintenance{
136 dial: func(string) (pb.MaintenanceClient, func(), error) {
137 return remote, func() {}, nil
138 },
139 remote: remote,
140 }
141 if c != nil {
142 api.callOpts = c.callOpts
143 api.lg = c.lg
144 }
145 return api
146}
147
148func (m *maintenance) AlarmList(ctx context.Context) (*AlarmResponse, error) {
149 req := &pb.AlarmRequest{
150 Action: pb.AlarmRequest_GET,
151 MemberID: 0, // all
152 Alarm: pb.AlarmType_NONE, // all
153 }
154 resp, err := m.remote.Alarm(ctx, req, m.callOpts...)
155 if err == nil {
156 return (*AlarmResponse)(resp), nil
157 }
158 return nil, ContextError(ctx, err)
159}
160
161func (m *maintenance) AlarmDisarm(ctx context.Context, am *AlarmMember) (*AlarmResponse, error) {
162 req := &pb.AlarmRequest{
163 Action: pb.AlarmRequest_DEACTIVATE,
164 MemberID: am.MemberID,
165 Alarm: am.Alarm,
166 }
167
168 if req.MemberID == 0 && req.Alarm == pb.AlarmType_NONE {
169 ar, err := m.AlarmList(ctx)
170 if err != nil {
171 return nil, ContextError(ctx, err)
172 }
173 ret := AlarmResponse{}
174 for _, am := range ar.Alarms {
175 dresp, derr := m.AlarmDisarm(ctx, (*AlarmMember)(am))
176 if derr != nil {
177 return nil, ContextError(ctx, derr)
178 }
179 ret.Alarms = append(ret.Alarms, dresp.Alarms...)
180 }
181 return &ret, nil
182 }
183
184 resp, err := m.remote.Alarm(ctx, req, m.callOpts...)
185 if err == nil {
186 return (*AlarmResponse)(resp), nil
187 }
188 return nil, ContextError(ctx, err)
189}
190
191func (m *maintenance) Defragment(ctx context.Context, endpoint string) (*DefragmentResponse, error) {
192 remote, cancel, err := m.dial(endpoint)
193 if err != nil {
194 return nil, ContextError(ctx, err)
195 }
196 defer cancel()
197 resp, err := remote.Defragment(ctx, &pb.DefragmentRequest{}, m.callOpts...)
198 if err != nil {
199 return nil, ContextError(ctx, err)
200 }
201 return (*DefragmentResponse)(resp), nil
202}
203
204func (m *maintenance) Status(ctx context.Context, endpoint string) (*StatusResponse, error) {
205 remote, cancel, err := m.dial(endpoint)
206 if err != nil {
207 return nil, ContextError(ctx, err)
208 }
209 defer cancel()
210 resp, err := remote.Status(ctx, &pb.StatusRequest{}, m.callOpts...)
211 if err != nil {
212 return nil, ContextError(ctx, err)
213 }
214 return (*StatusResponse)(resp), nil
215}
216
217func (m *maintenance) HashKV(ctx context.Context, endpoint string, rev int64) (*HashKVResponse, error) {
218 remote, cancel, err := m.dial(endpoint)
219 if err != nil {
220 return nil, ContextError(ctx, err)
221 }
222 defer cancel()
223 resp, err := remote.HashKV(ctx, &pb.HashKVRequest{Revision: rev}, m.callOpts...)
224 if err != nil {
225 return nil, ContextError(ctx, err)
226 }
227 return (*HashKVResponse)(resp), nil
228}
229
230func (m *maintenance) SnapshotWithVersion(ctx context.Context) (*SnapshotResponse, error) {
231 ss, err := m.remote.Snapshot(ctx, &pb.SnapshotRequest{}, append(m.callOpts, withMax(defaultStreamMaxRetries))...)
232 if err != nil {
233 return nil, ContextError(ctx, err)
234 }
235
236 m.lg.Info("opened snapshot stream; downloading")
237 pr, pw := io.Pipe()
238
239 resp, err := ss.Recv()
240 if err != nil {
241 m.logAndCloseWithError(err, pw)
242 return nil, err
243 }
244 go func() {
245 // Saving response is blocking
246 err := m.save(resp, pw)
247 if err != nil {
248 m.logAndCloseWithError(err, pw)
249 return
250 }
251 for {
252 sresp, err := ss.Recv()
253 if err != nil {
254 m.logAndCloseWithError(err, pw)
255 return
256 }
257
258 err = m.save(sresp, pw)
259 if err != nil {
260 m.logAndCloseWithError(err, pw)
261 return
262 }
263 }
264 }()
265
266 return &SnapshotResponse{
267 Header: resp.GetHeader(),
268 Snapshot: &snapshotReadCloser{ctx: ctx, ReadCloser: pr},
269 Version: resp.GetVersion(),
270 }, nil
271}
272
273func (m *maintenance) Snapshot(ctx context.Context) (io.ReadCloser, error) {
274 ss, err := m.remote.Snapshot(ctx, &pb.SnapshotRequest{}, append(m.callOpts, withMax(defaultStreamMaxRetries))...)
275 if err != nil {
276 return nil, ContextError(ctx, err)
277 }
278
279 m.lg.Info("opened snapshot stream; downloading")
280 pr, pw := io.Pipe()
281
282 go func() {
283 for {
284 resp, err := ss.Recv()
285 if err != nil {
286 m.logAndCloseWithError(err, pw)
287 return
288 }
289 err = m.save(resp, pw)
290 if err != nil {
291 m.logAndCloseWithError(err, pw)
292 return
293 }
294 }
295 }()
296 return &snapshotReadCloser{ctx: ctx, ReadCloser: pr}, nil
297}
298
299func (m *maintenance) logAndCloseWithError(err error, pw *io.PipeWriter) {
300 switch {
301 case errors.Is(err, io.EOF):
302 m.lg.Info("completed snapshot read; closing")
303 default:
304 m.lg.Warn("failed to receive from snapshot stream; closing", zap.Error(err))
305 }
306 pw.CloseWithError(err)
307}
308
309func (m *maintenance) save(resp *pb.SnapshotResponse, pw *io.PipeWriter) error {
310 // can "resp == nil && err == nil"
311 // before we receive snapshot SHA digest?
312 // No, server sends EOF with an empty response
313 // after it sends SHA digest at the end
314
315 if _, werr := pw.Write(resp.Blob); werr != nil {
316 return werr
317 }
318 return nil
319}
320
321type snapshotReadCloser struct {
322 ctx context.Context
323 io.ReadCloser
324}
325
326func (rc *snapshotReadCloser) Read(p []byte) (n int, err error) {
327 n, err = rc.ReadCloser.Read(p)
328 return n, ContextError(rc.ctx, err)
329}
330
331func (m *maintenance) MoveLeader(ctx context.Context, transfereeID uint64) (*MoveLeaderResponse, error) {
332 resp, err := m.remote.MoveLeader(ctx, &pb.MoveLeaderRequest{TargetID: transfereeID}, m.callOpts...)
333 return (*MoveLeaderResponse)(resp), ContextError(ctx, err)
334}
335
336func (m *maintenance) Downgrade(ctx context.Context, action DowngradeAction, version string) (*DowngradeResponse, error) {
337 var actionType pb.DowngradeRequest_DowngradeAction
338 switch action {
339 case DowngradeValidate:
340 actionType = pb.DowngradeRequest_VALIDATE
341 case DowngradeEnable:
342 actionType = pb.DowngradeRequest_ENABLE
343 case DowngradeCancel:
344 actionType = pb.DowngradeRequest_CANCEL
345 default:
346 return nil, errors.New("etcdclient: unknown downgrade action")
347 }
348 resp, err := m.remote.Downgrade(ctx, &pb.DowngradeRequest{Action: actionType, Version: version}, m.callOpts...)
349 return (*DowngradeResponse)(resp), ContextError(ctx, err)
350}