blob: 02f975951242d4b48616573593a73c15d905e32d [file] [log] [blame]
/*
 *
 * Copyright 2017 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
18
19package grpc
20
21import (
22 "context"
23 "io"
24 "sync"
25
26 "google.golang.org/grpc/balancer"
27 "google.golang.org/grpc/codes"
28 "google.golang.org/grpc/internal/channelz"
Akash Kankanala761955c2024-02-21 19:32:20 +053029 istatus "google.golang.org/grpc/internal/status"
khenaidoo5fc5cea2021-08-11 17:39:16 -040030 "google.golang.org/grpc/internal/transport"
31 "google.golang.org/grpc/status"
32)
33
34// pickerWrapper is a wrapper of balancer.Picker. It blocks on certain pick
35// actions and unblock when there's a picker update.
36type pickerWrapper struct {
37 mu sync.Mutex
38 done bool
Akash Kankanala761955c2024-02-21 19:32:20 +053039 idle bool
khenaidoo5fc5cea2021-08-11 17:39:16 -040040 blockingCh chan struct{}
41 picker balancer.Picker
42}
43
44func newPickerWrapper() *pickerWrapper {
45 return &pickerWrapper{blockingCh: make(chan struct{})}
46}
47
48// updatePicker is called by UpdateBalancerState. It unblocks all blocked pick.
49func (pw *pickerWrapper) updatePicker(p balancer.Picker) {
50 pw.mu.Lock()
Akash Kankanala761955c2024-02-21 19:32:20 +053051 if pw.done || pw.idle {
52 // There is a small window where a picker update from the LB policy can
53 // race with the channel going to idle mode. If the picker is idle here,
54 // it is because the channel asked it to do so, and therefore it is sage
55 // to ignore the update from the LB policy.
khenaidoo5fc5cea2021-08-11 17:39:16 -040056 pw.mu.Unlock()
57 return
58 }
59 pw.picker = p
60 // pw.blockingCh should never be nil.
61 close(pw.blockingCh)
62 pw.blockingCh = make(chan struct{})
63 pw.mu.Unlock()
64}
65
Akash Kankanala761955c2024-02-21 19:32:20 +053066// doneChannelzWrapper performs the following:
67// - increments the calls started channelz counter
68// - wraps the done function in the passed in result to increment the calls
69// failed or calls succeeded channelz counter before invoking the actual
70// done function.
71func doneChannelzWrapper(acbw *acBalancerWrapper, result *balancer.PickResult) {
72 ac := acbw.ac
khenaidoo5fc5cea2021-08-11 17:39:16 -040073 ac.incrCallsStarted()
Akash Kankanala761955c2024-02-21 19:32:20 +053074 done := result.Done
75 result.Done = func(b balancer.DoneInfo) {
khenaidoo5fc5cea2021-08-11 17:39:16 -040076 if b.Err != nil && b.Err != io.EOF {
77 ac.incrCallsFailed()
78 } else {
79 ac.incrCallsSucceeded()
80 }
81 if done != nil {
82 done(b)
83 }
84 }
85}
86
87// pick returns the transport that will be used for the RPC.
88// It may block in the following cases:
89// - there's no picker
90// - the current picker returns ErrNoSubConnAvailable
91// - the current picker returns other errors and failfast is false.
92// - the subConn returned by the current picker is not READY
93// When one of these situations happens, pick blocks until the picker gets updated.
Akash Kankanala761955c2024-02-21 19:32:20 +053094func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.PickInfo) (transport.ClientTransport, balancer.PickResult, error) {
khenaidoo5fc5cea2021-08-11 17:39:16 -040095 var ch chan struct{}
96
97 var lastPickErr error
98 for {
99 pw.mu.Lock()
100 if pw.done {
101 pw.mu.Unlock()
Akash Kankanala761955c2024-02-21 19:32:20 +0530102 return nil, balancer.PickResult{}, ErrClientConnClosing
khenaidoo5fc5cea2021-08-11 17:39:16 -0400103 }
104
105 if pw.picker == nil {
106 ch = pw.blockingCh
107 }
108 if ch == pw.blockingCh {
109 // This could happen when either:
110 // - pw.picker is nil (the previous if condition), or
111 // - has called pick on the current picker.
112 pw.mu.Unlock()
113 select {
114 case <-ctx.Done():
115 var errStr string
116 if lastPickErr != nil {
117 errStr = "latest balancer error: " + lastPickErr.Error()
118 } else {
119 errStr = ctx.Err().Error()
120 }
121 switch ctx.Err() {
122 case context.DeadlineExceeded:
Akash Kankanala761955c2024-02-21 19:32:20 +0530123 return nil, balancer.PickResult{}, status.Error(codes.DeadlineExceeded, errStr)
khenaidoo5fc5cea2021-08-11 17:39:16 -0400124 case context.Canceled:
Akash Kankanala761955c2024-02-21 19:32:20 +0530125 return nil, balancer.PickResult{}, status.Error(codes.Canceled, errStr)
khenaidoo5fc5cea2021-08-11 17:39:16 -0400126 }
127 case <-ch:
128 }
129 continue
130 }
131
132 ch = pw.blockingCh
133 p := pw.picker
134 pw.mu.Unlock()
135
136 pickResult, err := p.Pick(info)
khenaidoo5fc5cea2021-08-11 17:39:16 -0400137 if err != nil {
138 if err == balancer.ErrNoSubConnAvailable {
139 continue
140 }
Akash Kankanala761955c2024-02-21 19:32:20 +0530141 if st, ok := status.FromError(err); ok {
khenaidoo5fc5cea2021-08-11 17:39:16 -0400142 // Status error: end the RPC unconditionally with this status.
Akash Kankanala761955c2024-02-21 19:32:20 +0530143 // First restrict the code to the list allowed by gRFC A54.
144 if istatus.IsRestrictedControlPlaneCode(st) {
145 err = status.Errorf(codes.Internal, "received picker error with illegal status: %v", err)
146 }
147 return nil, balancer.PickResult{}, dropError{error: err}
khenaidoo5fc5cea2021-08-11 17:39:16 -0400148 }
149 // For all other errors, wait for ready RPCs should block and other
150 // RPCs should fail with unavailable.
151 if !failfast {
152 lastPickErr = err
153 continue
154 }
Akash Kankanala761955c2024-02-21 19:32:20 +0530155 return nil, balancer.PickResult{}, status.Error(codes.Unavailable, err.Error())
khenaidoo5fc5cea2021-08-11 17:39:16 -0400156 }
157
Akash Kankanala761955c2024-02-21 19:32:20 +0530158 acbw, ok := pickResult.SubConn.(*acBalancerWrapper)
khenaidoo5fc5cea2021-08-11 17:39:16 -0400159 if !ok {
khenaidoo5cb0d402021-12-08 14:09:16 -0500160 logger.Errorf("subconn returned from pick is type %T, not *acBalancerWrapper", pickResult.SubConn)
khenaidoo5fc5cea2021-08-11 17:39:16 -0400161 continue
162 }
Akash Kankanala761955c2024-02-21 19:32:20 +0530163 if t := acbw.ac.getReadyTransport(); t != nil {
khenaidoo5fc5cea2021-08-11 17:39:16 -0400164 if channelz.IsOn() {
Akash Kankanala761955c2024-02-21 19:32:20 +0530165 doneChannelzWrapper(acbw, &pickResult)
166 return t, pickResult, nil
khenaidoo5fc5cea2021-08-11 17:39:16 -0400167 }
Akash Kankanala761955c2024-02-21 19:32:20 +0530168 return t, pickResult, nil
khenaidoo5fc5cea2021-08-11 17:39:16 -0400169 }
170 if pickResult.Done != nil {
171 // Calling done with nil error, no bytes sent and no bytes received.
172 // DoneInfo with default value works.
173 pickResult.Done(balancer.DoneInfo{})
174 }
175 logger.Infof("blockingPicker: the picked transport is not ready, loop back to repick")
176 // If ok == false, ac.state is not READY.
177 // A valid picker always returns READY subConn. This means the state of ac
178 // just changed, and picker will be updated shortly.
179 // continue back to the beginning of the for loop to repick.
180 }
181}
182
183func (pw *pickerWrapper) close() {
184 pw.mu.Lock()
185 defer pw.mu.Unlock()
186 if pw.done {
187 return
188 }
189 pw.done = true
190 close(pw.blockingCh)
191}
Akash Kankanala761955c2024-02-21 19:32:20 +0530192
193func (pw *pickerWrapper) enterIdleMode() {
194 pw.mu.Lock()
195 defer pw.mu.Unlock()
196 if pw.done {
197 return
198 }
199 pw.idle = true
200}
201
202func (pw *pickerWrapper) exitIdleMode() {
203 pw.mu.Lock()
204 defer pw.mu.Unlock()
205 if pw.done {
206 return
207 }
208 pw.blockingCh = make(chan struct{})
209 pw.idle = false
210}
211
// dropError wraps an error to signal that the LB policy wants the RPC to be
// dropped rather than retried. pick returns it for status errors produced by
// the picker.
type dropError struct{ error }
216}