blob: aa52bfe95fd8506f192d44d681861ec00abc31be [file] [log] [blame]
/*
 *
 * Copyright 2017 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
18
19package grpc
20
21import (
22 "context"
Abhay Kumara2ae5992025-11-10 14:02:24 +000023 "fmt"
khenaidooac637102019-01-14 15:44:34 -050024 "io"
Abhay Kumara2ae5992025-11-10 14:02:24 +000025 "sync/atomic"
khenaidooac637102019-01-14 15:44:34 -050026
27 "google.golang.org/grpc/balancer"
28 "google.golang.org/grpc/codes"
khenaidooac637102019-01-14 15:44:34 -050029 "google.golang.org/grpc/internal/channelz"
Abhay Kumara2ae5992025-11-10 14:02:24 +000030 istatus "google.golang.org/grpc/internal/status"
khenaidooac637102019-01-14 15:44:34 -050031 "google.golang.org/grpc/internal/transport"
32 "google.golang.org/grpc/status"
33)
34
Abhay Kumara2ae5992025-11-10 14:02:24 +000035// pickerGeneration stores a picker and a channel used to signal that a picker
36// newer than this one is available.
37type pickerGeneration struct {
38 // picker is the picker produced by the LB policy. May be nil if a picker
39 // has never been produced.
40 picker balancer.Picker
41 // blockingCh is closed when the picker has been invalidated because there
42 // is a new one available.
43 blockingCh chan struct{}
44}
45
khenaidooac637102019-01-14 15:44:34 -050046// pickerWrapper is a wrapper of balancer.Picker. It blocks on certain pick
47// actions and unblock when there's a picker update.
48type pickerWrapper struct {
Abhay Kumara2ae5992025-11-10 14:02:24 +000049 // If pickerGen holds a nil pointer, the pickerWrapper is closed.
50 pickerGen atomic.Pointer[pickerGeneration]
khenaidooac637102019-01-14 15:44:34 -050051}
52
53func newPickerWrapper() *pickerWrapper {
Abhay Kumara2ae5992025-11-10 14:02:24 +000054 pw := &pickerWrapper{}
55 pw.pickerGen.Store(&pickerGeneration{
56 blockingCh: make(chan struct{}),
57 })
58 return pw
khenaidooac637102019-01-14 15:44:34 -050059}
60
Abhay Kumara2ae5992025-11-10 14:02:24 +000061// updatePicker is called by UpdateState calls from the LB policy. It
62// unblocks all blocked pick.
63func (pw *pickerWrapper) updatePicker(p balancer.Picker) {
64 old := pw.pickerGen.Swap(&pickerGeneration{
65 picker: p,
66 blockingCh: make(chan struct{}),
67 })
68 close(old.blockingCh)
khenaidooac637102019-01-14 15:44:34 -050069}
70
Abhay Kumara2ae5992025-11-10 14:02:24 +000071// doneChannelzWrapper performs the following:
72// - increments the calls started channelz counter
73// - wraps the done function in the passed in result to increment the calls
74// failed or calls succeeded channelz counter before invoking the actual
75// done function.
76func doneChannelzWrapper(acbw *acBalancerWrapper, result *balancer.PickResult) {
77 ac := acbw.ac
khenaidooac637102019-01-14 15:44:34 -050078 ac.incrCallsStarted()
Abhay Kumara2ae5992025-11-10 14:02:24 +000079 done := result.Done
80 result.Done = func(b balancer.DoneInfo) {
khenaidooac637102019-01-14 15:44:34 -050081 if b.Err != nil && b.Err != io.EOF {
82 ac.incrCallsFailed()
83 } else {
84 ac.incrCallsSucceeded()
85 }
86 if done != nil {
87 done(b)
88 }
89 }
90}
91
Abhay Kumara2ae5992025-11-10 14:02:24 +000092type pick struct {
93 transport transport.ClientTransport // the selected transport
94 result balancer.PickResult // the contents of the pick from the LB policy
95 blocked bool // set if a picker call queued for a new picker
96}
97
// pick returns the transport that will be used for the RPC, together with the
// LB policy's pick result and whether the call had to wait for a new picker.
//
// It may block in the following cases:
//   - there's no picker
//   - the current picker returns ErrNoSubConnAvailable
//   - the current picker returns other (non-status) errors and failfast is false.
//   - the subConn returned by the current picker is not READY
//
// When one of these situations happens, pick blocks until the picker gets
// updated, the pickerWrapper is closed, or ctx is done.
//
// Errors:
//   - ErrClientConnClosing when the pickerWrapper is closed (pickerGen is nil).
//   - a DeadlineExceeded or Canceled status when ctx finishes while waiting.
//   - a dropError wrapping the picker's error when that error is a status
//     error (rewritten to Internal if its code is restricted).
//   - an Unavailable status for any other picker error when failfast is true.
func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.PickInfo) (pick, error) {
	// ch is the blocking channel of the generation this call last picked on
	// (or must wait on). It is nil until the first generation is examined.
	var ch chan struct{}

	// lastPickErr remembers the most recent picker error seen by a
	// wait-for-ready RPC so it can be reported if ctx expires.
	var lastPickErr error
	pickBlocked := false

	for {
		pg := pw.pickerGen.Load()
		if pg == nil {
			return pick{}, ErrClientConnClosing
		}
		if pg.picker == nil {
			ch = pg.blockingCh
		}
		if ch == pg.blockingCh {
			// This could happen when either:
			// - pg.picker is nil (the previous if condition), or
			// - we have already called pick on the current picker.
			select {
			case <-ctx.Done():
				var errStr string
				if lastPickErr != nil {
					errStr = "latest balancer error: " + lastPickErr.Error()
				} else {
					errStr = fmt.Sprintf("%v while waiting for connections to become ready", ctx.Err())
				}
				// NOTE(review): any ctx.Err() value other than the two below
				// falls through to the continue and re-enters this select.
				switch ctx.Err() {
				case context.DeadlineExceeded:
					return pick{}, status.Error(codes.DeadlineExceeded, errStr)
				case context.Canceled:
					return pick{}, status.Error(codes.Canceled, errStr)
				}
			case <-ch:
				// This generation was invalidated; reload on the next
				// iteration.
			}
			continue
		}

		// If the channel is set, it means that the pick call had to wait for a
		// new picker at some point. Either it's the first iteration and this
		// function received the first picker, or a picker errored with
		// ErrNoSubConnAvailable or errored with failfast set to false, which
		// will trigger a continue to the next iteration. In the first case this
		// conditional will hit if this call had to block (the channel is set).
		// In the second case, the only way it will get to this conditional is
		// if there is a new picker.
		if ch != nil {
			pickBlocked = true
		}

		// Record the generation being tried so that a later iteration seeing
		// the same generation waits instead of calling its picker again.
		ch = pg.blockingCh
		p := pg.picker

		pickResult, err := p.Pick(info)
		if err != nil {
			if err == balancer.ErrNoSubConnAvailable {
				continue
			}
			if st, ok := status.FromError(err); ok {
				// Status error: end the RPC unconditionally with this status.
				// First restrict the code to the list allowed by gRFC A54.
				if istatus.IsRestrictedControlPlaneCode(st) {
					err = status.Errorf(codes.Internal, "received picker error with illegal status: %v", err)
				}
				return pick{}, dropError{error: err}
			}
			// For all other errors, wait for ready RPCs should block and other
			// RPCs should fail with unavailable.
			if !failfast {
				lastPickErr = err
				continue
			}
			return pick{}, status.Error(codes.Unavailable, err.Error())
		}

		acbw, ok := pickResult.SubConn.(*acBalancerWrapper)
		if !ok {
			logger.Errorf("subconn returned from pick is type %T, not *acBalancerWrapper", pickResult.SubConn)
			continue
		}
		if t := acbw.ac.getReadyTransport(); t != nil {
			if channelz.IsOn() {
				doneChannelzWrapper(acbw, &pickResult)
			}
			return pick{transport: t, result: pickResult, blocked: pickBlocked}, nil
		}
		if pickResult.Done != nil {
			// Calling done with nil error, no bytes sent and no bytes received.
			// DoneInfo with default value works.
			pickResult.Done(balancer.DoneInfo{})
		}
		logger.Infof("blockingPicker: the picked transport is not ready, loop back to repick")
		// Reaching here means getReadyTransport returned nil.
		// A valid picker always returns READY subConn. This means the state of ac
		// just changed, and picker will be updated shortly.
		// continue back to the beginning of the for loop to repick.
	}
}
202
Abhay Kumara2ae5992025-11-10 14:02:24 +0000203func (pw *pickerWrapper) close() {
204 old := pw.pickerGen.Swap(nil)
205 close(old.blockingCh)
206}
207
208// reset clears the pickerWrapper and prepares it for being used again when idle
209// mode is exited.
210func (pw *pickerWrapper) reset() {
211 old := pw.pickerGen.Swap(&pickerGeneration{blockingCh: make(chan struct{})})
212 close(old.blockingCh)
213}
214
215// dropError is a wrapper error that indicates the LB policy wishes to drop the
216// RPC and not retry it.
217type dropError struct {
218 error
khenaidooac637102019-01-14 15:44:34 -0500219}