blob: aa52bfe95fd8506f192d44d681861ec00abc31be [file] [log] [blame]
khenaidoo5fc5cea2021-08-11 17:39:16 -04001/*
2 *
3 * Copyright 2017 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package grpc
20
21import (
22 "context"
Abhay Kumar03713392025-12-30 05:20:58 +000023 "fmt"
khenaidoo5fc5cea2021-08-11 17:39:16 -040024 "io"
Abhay Kumar03713392025-12-30 05:20:58 +000025 "sync/atomic"
khenaidoo5fc5cea2021-08-11 17:39:16 -040026
27 "google.golang.org/grpc/balancer"
28 "google.golang.org/grpc/codes"
29 "google.golang.org/grpc/internal/channelz"
Akash Kankanala761955c2024-02-21 19:32:20 +053030 istatus "google.golang.org/grpc/internal/status"
khenaidoo5fc5cea2021-08-11 17:39:16 -040031 "google.golang.org/grpc/internal/transport"
32 "google.golang.org/grpc/status"
33)
34
Abhay Kumar03713392025-12-30 05:20:58 +000035// pickerGeneration stores a picker and a channel used to signal that a picker
36// newer than this one is available.
37type pickerGeneration struct {
38 // picker is the picker produced by the LB policy. May be nil if a picker
39 // has never been produced.
40 picker balancer.Picker
41 // blockingCh is closed when the picker has been invalidated because there
42 // is a new one available.
43 blockingCh chan struct{}
44}
45
khenaidoo5fc5cea2021-08-11 17:39:16 -040046// pickerWrapper is a wrapper of balancer.Picker. It blocks on certain pick
47// actions and unblock when there's a picker update.
48type pickerWrapper struct {
Abhay Kumar03713392025-12-30 05:20:58 +000049 // If pickerGen holds a nil pointer, the pickerWrapper is closed.
50 pickerGen atomic.Pointer[pickerGeneration]
khenaidoo5fc5cea2021-08-11 17:39:16 -040051}
52
53func newPickerWrapper() *pickerWrapper {
Abhay Kumar03713392025-12-30 05:20:58 +000054 pw := &pickerWrapper{}
55 pw.pickerGen.Store(&pickerGeneration{
56 blockingCh: make(chan struct{}),
57 })
58 return pw
khenaidoo5fc5cea2021-08-11 17:39:16 -040059}
60
Abhay Kumar03713392025-12-30 05:20:58 +000061// updatePicker is called by UpdateState calls from the LB policy. It
62// unblocks all blocked pick.
khenaidoo5fc5cea2021-08-11 17:39:16 -040063func (pw *pickerWrapper) updatePicker(p balancer.Picker) {
Abhay Kumar03713392025-12-30 05:20:58 +000064 old := pw.pickerGen.Swap(&pickerGeneration{
65 picker: p,
66 blockingCh: make(chan struct{}),
67 })
68 close(old.blockingCh)
khenaidoo5fc5cea2021-08-11 17:39:16 -040069}
70
Akash Kankanala761955c2024-02-21 19:32:20 +053071// doneChannelzWrapper performs the following:
72// - increments the calls started channelz counter
73// - wraps the done function in the passed in result to increment the calls
74// failed or calls succeeded channelz counter before invoking the actual
75// done function.
76func doneChannelzWrapper(acbw *acBalancerWrapper, result *balancer.PickResult) {
77 ac := acbw.ac
khenaidoo5fc5cea2021-08-11 17:39:16 -040078 ac.incrCallsStarted()
Akash Kankanala761955c2024-02-21 19:32:20 +053079 done := result.Done
80 result.Done = func(b balancer.DoneInfo) {
khenaidoo5fc5cea2021-08-11 17:39:16 -040081 if b.Err != nil && b.Err != io.EOF {
82 ac.incrCallsFailed()
83 } else {
84 ac.incrCallsSucceeded()
85 }
86 if done != nil {
87 done(b)
88 }
89 }
90}
91
Abhay Kumar03713392025-12-30 05:20:58 +000092type pick struct {
93 transport transport.ClientTransport // the selected transport
94 result balancer.PickResult // the contents of the pick from the LB policy
95 blocked bool // set if a picker call queued for a new picker
96}
97
khenaidoo5fc5cea2021-08-11 17:39:16 -040098// pick returns the transport that will be used for the RPC.
99// It may block in the following cases:
100// - there's no picker
101// - the current picker returns ErrNoSubConnAvailable
102// - the current picker returns other errors and failfast is false.
103// - the subConn returned by the current picker is not READY
104// When one of these situations happens, pick blocks until the picker gets updated.
Abhay Kumar03713392025-12-30 05:20:58 +0000105func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.PickInfo) (pick, error) {
khenaidoo5fc5cea2021-08-11 17:39:16 -0400106 var ch chan struct{}
107
108 var lastPickErr error
Abhay Kumar03713392025-12-30 05:20:58 +0000109 pickBlocked := false
khenaidoo5fc5cea2021-08-11 17:39:16 -0400110
Abhay Kumar03713392025-12-30 05:20:58 +0000111 for {
112 pg := pw.pickerGen.Load()
113 if pg == nil {
114 return pick{}, ErrClientConnClosing
khenaidoo5fc5cea2021-08-11 17:39:16 -0400115 }
Abhay Kumar03713392025-12-30 05:20:58 +0000116 if pg.picker == nil {
117 ch = pg.blockingCh
118 }
119 if ch == pg.blockingCh {
khenaidoo5fc5cea2021-08-11 17:39:16 -0400120 // This could happen when either:
121 // - pw.picker is nil (the previous if condition), or
Abhay Kumar03713392025-12-30 05:20:58 +0000122 // - we have already called pick on the current picker.
khenaidoo5fc5cea2021-08-11 17:39:16 -0400123 select {
124 case <-ctx.Done():
125 var errStr string
126 if lastPickErr != nil {
127 errStr = "latest balancer error: " + lastPickErr.Error()
128 } else {
Abhay Kumar03713392025-12-30 05:20:58 +0000129 errStr = fmt.Sprintf("%v while waiting for connections to become ready", ctx.Err())
khenaidoo5fc5cea2021-08-11 17:39:16 -0400130 }
131 switch ctx.Err() {
132 case context.DeadlineExceeded:
Abhay Kumar03713392025-12-30 05:20:58 +0000133 return pick{}, status.Error(codes.DeadlineExceeded, errStr)
khenaidoo5fc5cea2021-08-11 17:39:16 -0400134 case context.Canceled:
Abhay Kumar03713392025-12-30 05:20:58 +0000135 return pick{}, status.Error(codes.Canceled, errStr)
khenaidoo5fc5cea2021-08-11 17:39:16 -0400136 }
137 case <-ch:
138 }
139 continue
140 }
141
Abhay Kumar03713392025-12-30 05:20:58 +0000142 // If the channel is set, it means that the pick call had to wait for a
143 // new picker at some point. Either it's the first iteration and this
144 // function received the first picker, or a picker errored with
145 // ErrNoSubConnAvailable or errored with failfast set to false, which
146 // will trigger a continue to the next iteration. In the first case this
147 // conditional will hit if this call had to block (the channel is set).
148 // In the second case, the only way it will get to this conditional is
149 // if there is a new picker.
150 if ch != nil {
151 pickBlocked = true
152 }
153
154 ch = pg.blockingCh
155 p := pg.picker
khenaidoo5fc5cea2021-08-11 17:39:16 -0400156
157 pickResult, err := p.Pick(info)
khenaidoo5fc5cea2021-08-11 17:39:16 -0400158 if err != nil {
159 if err == balancer.ErrNoSubConnAvailable {
160 continue
161 }
Akash Kankanala761955c2024-02-21 19:32:20 +0530162 if st, ok := status.FromError(err); ok {
khenaidoo5fc5cea2021-08-11 17:39:16 -0400163 // Status error: end the RPC unconditionally with this status.
Akash Kankanala761955c2024-02-21 19:32:20 +0530164 // First restrict the code to the list allowed by gRFC A54.
165 if istatus.IsRestrictedControlPlaneCode(st) {
166 err = status.Errorf(codes.Internal, "received picker error with illegal status: %v", err)
167 }
Abhay Kumar03713392025-12-30 05:20:58 +0000168 return pick{}, dropError{error: err}
khenaidoo5fc5cea2021-08-11 17:39:16 -0400169 }
170 // For all other errors, wait for ready RPCs should block and other
171 // RPCs should fail with unavailable.
172 if !failfast {
173 lastPickErr = err
174 continue
175 }
Abhay Kumar03713392025-12-30 05:20:58 +0000176 return pick{}, status.Error(codes.Unavailable, err.Error())
khenaidoo5fc5cea2021-08-11 17:39:16 -0400177 }
178
Akash Kankanala761955c2024-02-21 19:32:20 +0530179 acbw, ok := pickResult.SubConn.(*acBalancerWrapper)
khenaidoo5fc5cea2021-08-11 17:39:16 -0400180 if !ok {
khenaidoo5cb0d402021-12-08 14:09:16 -0500181 logger.Errorf("subconn returned from pick is type %T, not *acBalancerWrapper", pickResult.SubConn)
khenaidoo5fc5cea2021-08-11 17:39:16 -0400182 continue
183 }
Akash Kankanala761955c2024-02-21 19:32:20 +0530184 if t := acbw.ac.getReadyTransport(); t != nil {
khenaidoo5fc5cea2021-08-11 17:39:16 -0400185 if channelz.IsOn() {
Akash Kankanala761955c2024-02-21 19:32:20 +0530186 doneChannelzWrapper(acbw, &pickResult)
khenaidoo5fc5cea2021-08-11 17:39:16 -0400187 }
Abhay Kumar03713392025-12-30 05:20:58 +0000188 return pick{transport: t, result: pickResult, blocked: pickBlocked}, nil
khenaidoo5fc5cea2021-08-11 17:39:16 -0400189 }
190 if pickResult.Done != nil {
191 // Calling done with nil error, no bytes sent and no bytes received.
192 // DoneInfo with default value works.
193 pickResult.Done(balancer.DoneInfo{})
194 }
195 logger.Infof("blockingPicker: the picked transport is not ready, loop back to repick")
196 // If ok == false, ac.state is not READY.
197 // A valid picker always returns READY subConn. This means the state of ac
198 // just changed, and picker will be updated shortly.
199 // continue back to the beginning of the for loop to repick.
200 }
201}
202
203func (pw *pickerWrapper) close() {
Abhay Kumar03713392025-12-30 05:20:58 +0000204 old := pw.pickerGen.Swap(nil)
205 close(old.blockingCh)
khenaidoo5fc5cea2021-08-11 17:39:16 -0400206}
Akash Kankanala761955c2024-02-21 19:32:20 +0530207
Abhay Kumar03713392025-12-30 05:20:58 +0000208// reset clears the pickerWrapper and prepares it for being used again when idle
209// mode is exited.
210func (pw *pickerWrapper) reset() {
211 old := pw.pickerGen.Swap(&pickerGeneration{blockingCh: make(chan struct{})})
212 close(old.blockingCh)
Akash Kankanala761955c2024-02-21 19:32:20 +0530213}
214
// dropError wraps an error to signal that the LB policy wants the RPC to be
// dropped and not retried. The wrapped error is embedded, so a dropError
// satisfies the error interface through it.
type dropError struct {
	error
}