| Abhay Kumar | 40252eb | 2025-10-13 13:25:53 +0000 | [diff] [blame^] | 1 | // Copyright 2015 The etcd Authors |
| 2 | // |
| 3 | // Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | // you may not use this file except in compliance with the License. |
| 5 | // You may obtain a copy of the License at |
| 6 | // |
| 7 | // http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | // |
| 9 | // Unless required by applicable law or agreed to in writing, software |
| 10 | // distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | // See the License for the specific language governing permissions and |
| 13 | // limitations under the License. |
| 14 | |
| 15 | package etcdserver |
| 16 | |
| 17 | import ( |
| 18 | "fmt" |
| 19 | "time" |
| 20 | |
| 21 | "go.etcd.io/etcd/client/pkg/v3/types" |
| 22 | "go.etcd.io/etcd/server/v3/etcdserver/api/membership" |
| 23 | "go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp" |
| 24 | ) |
| 25 | |
| 26 | // isConnectedToQuorumSince checks whether the local member is connected to the |
| 27 | // quorum of the cluster since the given time. |
| 28 | func isConnectedToQuorumSince(transport rafthttp.Transporter, since time.Time, self types.ID, members []*membership.Member) bool { |
| 29 | return numConnectedSince(transport, since, self, members) >= (len(members)/2)+1 |
| 30 | } |
| 31 | |
| 32 | // isConnectedSince checks whether the local member is connected to the |
| 33 | // remote member since the given time. |
| 34 | func isConnectedSince(transport rafthttp.Transporter, since time.Time, remote types.ID) bool { |
| 35 | t := transport.ActiveSince(remote) |
| 36 | return !t.IsZero() && t.Before(since) |
| 37 | } |
| 38 | |
| 39 | // isConnectedFullySince checks whether the local member is connected to all |
| 40 | // members in the cluster since the given time. |
| 41 | func isConnectedFullySince(transport rafthttp.Transporter, since time.Time, self types.ID, members []*membership.Member) bool { |
| 42 | return numConnectedSince(transport, since, self, members) == len(members) |
| 43 | } |
| 44 | |
| 45 | // numConnectedSince counts how many members are connected to the local member |
| 46 | // since the given time. |
| 47 | func numConnectedSince(transport rafthttp.Transporter, since time.Time, self types.ID, members []*membership.Member) int { |
| 48 | connectedNum := 0 |
| 49 | for _, m := range members { |
| 50 | if m.ID == self || isConnectedSince(transport, since, m.ID) { |
| 51 | connectedNum++ |
| 52 | } |
| 53 | } |
| 54 | return connectedNum |
| 55 | } |
| 56 | |
| 57 | // longestConnected chooses the member with longest active-since-time. |
| 58 | // It returns false, if nothing is active. |
| 59 | func longestConnected(tp rafthttp.Transporter, membs []types.ID) (types.ID, bool) { |
| 60 | var longest types.ID |
| 61 | var oldest time.Time |
| 62 | for _, id := range membs { |
| 63 | tm := tp.ActiveSince(id) |
| 64 | if tm.IsZero() { // inactive |
| 65 | continue |
| 66 | } |
| 67 | |
| 68 | if oldest.IsZero() { // first longest candidate |
| 69 | oldest = tm |
| 70 | longest = id |
| 71 | } |
| 72 | |
| 73 | if tm.Before(oldest) { |
| 74 | oldest = tm |
| 75 | longest = id |
| 76 | } |
| 77 | } |
| 78 | if uint64(longest) == 0 { |
| 79 | return longest, false |
| 80 | } |
| 81 | return longest, true |
| 82 | } |
| 83 | |
| 84 | type notifier struct { |
| 85 | c chan struct{} |
| 86 | err error |
| 87 | } |
| 88 | |
| 89 | func newNotifier() *notifier { |
| 90 | return ¬ifier{ |
| 91 | c: make(chan struct{}), |
| 92 | } |
| 93 | } |
| 94 | |
| 95 | func (nc *notifier) notify(err error) { |
| 96 | nc.err = err |
| 97 | close(nc.c) |
| 98 | } |
| 99 | |
| 100 | // panicAlternativeStringer wraps a fmt.Stringer, and if calling String() panics, calls the alternative instead. |
| 101 | // This is needed to ensure logging slow v2 requests does not panic, which occurs when running integration tests |
| 102 | // with the embedded server with github.com/golang/protobuf v1.4.0+. See https://github.com/etcd-io/etcd/issues/12197. |
| 103 | type panicAlternativeStringer struct { |
| 104 | stringer fmt.Stringer |
| 105 | alternative func() string |
| 106 | } |
| 107 | |
| 108 | func (n panicAlternativeStringer) String() (s string) { |
| 109 | defer func() { |
| 110 | if err := recover(); err != nil { |
| 111 | s = n.alternative() |
| 112 | } |
| 113 | }() |
| 114 | s = n.stringer.String() |
| 115 | return s |
| 116 | } |