khenaidoo | 5fc5cea | 2021-08-11 17:39:16 -0400 | [diff] [blame] | 1 | /* |
| 2 | * |
| 3 | * Copyright 2017 gRPC authors. |
| 4 | * |
| 5 | * Licensed under the Apache License, Version 2.0 (the "License"); |
| 6 | * you may not use this file except in compliance with the License. |
| 7 | * You may obtain a copy of the License at |
| 8 | * |
| 9 | * http://www.apache.org/licenses/LICENSE-2.0 |
| 10 | * |
| 11 | * Unless required by applicable law or agreed to in writing, software |
| 12 | * distributed under the License is distributed on an "AS IS" BASIS, |
| 13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 14 | * See the License for the specific language governing permissions and |
| 15 | * limitations under the License. |
| 16 | * |
| 17 | */ |
| 18 | |
| 19 | package grpc |
| 20 | |
| 21 | import ( |
Akash Kankanala | 761955c | 2024-02-21 19:32:20 +0530 | [diff] [blame^] | 22 | "context" |
khenaidoo | 5fc5cea | 2021-08-11 17:39:16 -0400 | [diff] [blame] | 23 | "fmt" |
Akash Kankanala | 761955c | 2024-02-21 19:32:20 +0530 | [diff] [blame^] | 24 | "strings" |
khenaidoo | 5fc5cea | 2021-08-11 17:39:16 -0400 | [diff] [blame] | 25 | "sync" |
| 26 | |
| 27 | "google.golang.org/grpc/balancer" |
| 28 | "google.golang.org/grpc/connectivity" |
Akash Kankanala | 761955c | 2024-02-21 19:32:20 +0530 | [diff] [blame^] | 29 | "google.golang.org/grpc/internal/balancer/gracefulswitch" |
khenaidoo | 5fc5cea | 2021-08-11 17:39:16 -0400 | [diff] [blame] | 30 | "google.golang.org/grpc/internal/channelz" |
| 31 | "google.golang.org/grpc/internal/grpcsync" |
| 32 | "google.golang.org/grpc/resolver" |
| 33 | ) |
| 34 | |
Akash Kankanala | 761955c | 2024-02-21 19:32:20 +0530 | [diff] [blame^] | 35 | type ccbMode int |
khenaidoo | 5fc5cea | 2021-08-11 17:39:16 -0400 | [diff] [blame] | 36 | |
Akash Kankanala | 761955c | 2024-02-21 19:32:20 +0530 | [diff] [blame^] | 37 | const ( |
| 38 | ccbModeActive = iota |
| 39 | ccbModeIdle |
| 40 | ccbModeClosed |
| 41 | ccbModeExitingIdle |
| 42 | ) |
khenaidoo | 5fc5cea | 2021-08-11 17:39:16 -0400 | [diff] [blame] | 43 | |
Akash Kankanala | 761955c | 2024-02-21 19:32:20 +0530 | [diff] [blame^] | 44 | // ccBalancerWrapper sits between the ClientConn and the Balancer. |
| 45 | // |
| 46 | // ccBalancerWrapper implements methods corresponding to the ones on the |
| 47 | // balancer.Balancer interface. The ClientConn is free to call these methods |
| 48 | // concurrently and the ccBalancerWrapper ensures that calls from the ClientConn |
| 49 | // to the Balancer happen synchronously and in order. |
| 50 | // |
| 51 | // ccBalancerWrapper also implements the balancer.ClientConn interface and is |
| 52 | // passed to the Balancer implementations. It invokes unexported methods on the |
| 53 | // ClientConn to handle these calls from the Balancer. |
| 54 | // |
| 55 | // It uses the gracefulswitch.Balancer internally to ensure that balancer |
| 56 | // switches happen in a graceful manner. |
khenaidoo | 5fc5cea | 2021-08-11 17:39:16 -0400 | [diff] [blame] | 57 | type ccBalancerWrapper struct { |
Akash Kankanala | 761955c | 2024-02-21 19:32:20 +0530 | [diff] [blame^] | 58 | // The following fields are initialized when the wrapper is created and are |
| 59 | // read-only afterwards, and therefore can be accessed without a mutex. |
| 60 | cc *ClientConn |
| 61 | opts balancer.BuildOptions |
khenaidoo | 5fc5cea | 2021-08-11 17:39:16 -0400 | [diff] [blame] | 62 | |
Akash Kankanala | 761955c | 2024-02-21 19:32:20 +0530 | [diff] [blame^] | 63 | // Outgoing (gRPC --> balancer) calls are guaranteed to execute in a |
| 64 | // mutually exclusive manner as they are scheduled in the serializer. Fields |
| 65 | // accessed *only* in these serializer callbacks, can therefore be accessed |
| 66 | // without a mutex. |
| 67 | balancer *gracefulswitch.Balancer |
| 68 | curBalancerName string |
| 69 | |
| 70 | // mu guards access to the below fields. Access to the serializer and its |
| 71 | // cancel function needs to be mutex protected because they are overwritten |
| 72 | // when the wrapper exits idle mode. |
| 73 | mu sync.Mutex |
| 74 | serializer *grpcsync.CallbackSerializer // To serialize all outoing calls. |
| 75 | serializerCancel context.CancelFunc // To close the seralizer at close/enterIdle time. |
| 76 | mode ccbMode // Tracks the current mode of the wrapper. |
khenaidoo | 5fc5cea | 2021-08-11 17:39:16 -0400 | [diff] [blame] | 77 | } |
| 78 | |
Akash Kankanala | 761955c | 2024-02-21 19:32:20 +0530 | [diff] [blame^] | 79 | // newCCBalancerWrapper creates a new balancer wrapper. The underlying balancer |
| 80 | // is not created until the switchTo() method is invoked. |
| 81 | func newCCBalancerWrapper(cc *ClientConn, bopts balancer.BuildOptions) *ccBalancerWrapper { |
| 82 | ctx, cancel := context.WithCancel(context.Background()) |
khenaidoo | 5fc5cea | 2021-08-11 17:39:16 -0400 | [diff] [blame] | 83 | ccb := &ccBalancerWrapper{ |
Akash Kankanala | 761955c | 2024-02-21 19:32:20 +0530 | [diff] [blame^] | 84 | cc: cc, |
| 85 | opts: bopts, |
| 86 | serializer: grpcsync.NewCallbackSerializer(ctx), |
| 87 | serializerCancel: cancel, |
khenaidoo | 5fc5cea | 2021-08-11 17:39:16 -0400 | [diff] [blame] | 88 | } |
Akash Kankanala | 761955c | 2024-02-21 19:32:20 +0530 | [diff] [blame^] | 89 | ccb.balancer = gracefulswitch.NewBalancer(ccb, bopts) |
khenaidoo | 5fc5cea | 2021-08-11 17:39:16 -0400 | [diff] [blame] | 90 | return ccb |
| 91 | } |
| 92 | |
Akash Kankanala | 761955c | 2024-02-21 19:32:20 +0530 | [diff] [blame^] | 93 | // updateClientConnState is invoked by grpc to push a ClientConnState update to |
| 94 | // the underlying balancer. |
khenaidoo | 5fc5cea | 2021-08-11 17:39:16 -0400 | [diff] [blame] | 95 | func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) error { |
Akash Kankanala | 761955c | 2024-02-21 19:32:20 +0530 | [diff] [blame^] | 96 | ccb.mu.Lock() |
| 97 | errCh := make(chan error, 1) |
| 98 | // Here and everywhere else where Schedule() is called, it is done with the |
| 99 | // lock held. But the lock guards only the scheduling part. The actual |
| 100 | // callback is called asynchronously without the lock being held. |
| 101 | ok := ccb.serializer.Schedule(func(_ context.Context) { |
| 102 | // If the addresses specified in the update contain addresses of type |
| 103 | // "grpclb" and the selected LB policy is not "grpclb", these addresses |
| 104 | // will be filtered out and ccs will be modified with the updated |
| 105 | // address list. |
| 106 | if ccb.curBalancerName != grpclbName { |
| 107 | var addrs []resolver.Address |
| 108 | for _, addr := range ccs.ResolverState.Addresses { |
| 109 | if addr.Type == resolver.GRPCLB { |
| 110 | continue |
| 111 | } |
| 112 | addrs = append(addrs, addr) |
| 113 | } |
| 114 | ccs.ResolverState.Addresses = addrs |
| 115 | } |
| 116 | errCh <- ccb.balancer.UpdateClientConnState(*ccs) |
| 117 | }) |
| 118 | if !ok { |
| 119 | // If we are unable to schedule a function with the serializer, it |
| 120 | // indicates that it has been closed. A serializer is only closed when |
| 121 | // the wrapper is closed or is in idle. |
| 122 | ccb.mu.Unlock() |
| 123 | return fmt.Errorf("grpc: cannot send state update to a closed or idle balancer") |
| 124 | } |
| 125 | ccb.mu.Unlock() |
| 126 | |
| 127 | // We get here only if the above call to Schedule succeeds, in which case it |
| 128 | // is guaranteed that the scheduled function will run. Therefore it is safe |
| 129 | // to block on this channel. |
| 130 | err := <-errCh |
| 131 | if logger.V(2) && err != nil { |
| 132 | logger.Infof("error from balancer.UpdateClientConnState: %v", err) |
| 133 | } |
| 134 | return err |
| 135 | } |
| 136 | |
| 137 | // updateSubConnState is invoked by grpc to push a subConn state update to the |
| 138 | // underlying balancer. |
| 139 | func (ccb *ccBalancerWrapper) updateSubConnState(sc balancer.SubConn, s connectivity.State, err error) { |
| 140 | ccb.mu.Lock() |
| 141 | ccb.serializer.Schedule(func(_ context.Context) { |
| 142 | ccb.balancer.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: s, ConnectionError: err}) |
| 143 | }) |
| 144 | ccb.mu.Unlock() |
khenaidoo | 5fc5cea | 2021-08-11 17:39:16 -0400 | [diff] [blame] | 145 | } |
| 146 | |
| 147 | func (ccb *ccBalancerWrapper) resolverError(err error) { |
Akash Kankanala | 761955c | 2024-02-21 19:32:20 +0530 | [diff] [blame^] | 148 | ccb.mu.Lock() |
| 149 | ccb.serializer.Schedule(func(_ context.Context) { |
| 150 | ccb.balancer.ResolverError(err) |
| 151 | }) |
| 152 | ccb.mu.Unlock() |
| 153 | } |
| 154 | |
| 155 | // switchTo is invoked by grpc to instruct the balancer wrapper to switch to the |
| 156 | // LB policy identified by name. |
| 157 | // |
| 158 | // ClientConn calls newCCBalancerWrapper() at creation time. Upon receipt of the |
| 159 | // first good update from the name resolver, it determines the LB policy to use |
| 160 | // and invokes the switchTo() method. Upon receipt of every subsequent update |
| 161 | // from the name resolver, it invokes this method. |
| 162 | // |
| 163 | // the ccBalancerWrapper keeps track of the current LB policy name, and skips |
| 164 | // the graceful balancer switching process if the name does not change. |
| 165 | func (ccb *ccBalancerWrapper) switchTo(name string) { |
| 166 | ccb.mu.Lock() |
| 167 | ccb.serializer.Schedule(func(_ context.Context) { |
| 168 | // TODO: Other languages use case-sensitive balancer registries. We should |
| 169 | // switch as well. See: https://github.com/grpc/grpc-go/issues/5288. |
| 170 | if strings.EqualFold(ccb.curBalancerName, name) { |
| 171 | return |
| 172 | } |
| 173 | ccb.buildLoadBalancingPolicy(name) |
| 174 | }) |
| 175 | ccb.mu.Unlock() |
| 176 | } |
| 177 | |
| 178 | // buildLoadBalancingPolicy performs the following: |
| 179 | // - retrieve a balancer builder for the given name. Use the default LB |
| 180 | // policy, pick_first, if no LB policy with name is found in the registry. |
| 181 | // - instruct the gracefulswitch balancer to switch to the above builder. This |
| 182 | // will actually build the new balancer. |
| 183 | // - update the `curBalancerName` field |
| 184 | // |
| 185 | // Must be called from a serializer callback. |
| 186 | func (ccb *ccBalancerWrapper) buildLoadBalancingPolicy(name string) { |
| 187 | builder := balancer.Get(name) |
| 188 | if builder == nil { |
| 189 | channelz.Warningf(logger, ccb.cc.channelzID, "Channel switches to new LB policy %q, since the specified LB policy %q was not registered", PickFirstBalancerName, name) |
| 190 | builder = newPickfirstBuilder() |
| 191 | } else { |
| 192 | channelz.Infof(logger, ccb.cc.channelzID, "Channel switches to new LB policy %q", name) |
| 193 | } |
| 194 | |
| 195 | if err := ccb.balancer.SwitchTo(builder); err != nil { |
| 196 | channelz.Errorf(logger, ccb.cc.channelzID, "Channel failed to build new LB policy %q: %v", name, err) |
| 197 | return |
| 198 | } |
| 199 | ccb.curBalancerName = builder.Name() |
| 200 | } |
| 201 | |
| 202 | func (ccb *ccBalancerWrapper) close() { |
| 203 | channelz.Info(logger, ccb.cc.channelzID, "ccBalancerWrapper: closing") |
| 204 | ccb.closeBalancer(ccbModeClosed) |
| 205 | } |
| 206 | |
| 207 | // enterIdleMode is invoked by grpc when the channel enters idle mode upon |
| 208 | // expiry of idle_timeout. This call blocks until the balancer is closed. |
| 209 | func (ccb *ccBalancerWrapper) enterIdleMode() { |
| 210 | channelz.Info(logger, ccb.cc.channelzID, "ccBalancerWrapper: entering idle mode") |
| 211 | ccb.closeBalancer(ccbModeIdle) |
| 212 | } |
| 213 | |
| 214 | // closeBalancer is invoked when the channel is being closed or when it enters |
| 215 | // idle mode upon expiry of idle_timeout. |
| 216 | func (ccb *ccBalancerWrapper) closeBalancer(m ccbMode) { |
| 217 | ccb.mu.Lock() |
| 218 | if ccb.mode == ccbModeClosed || ccb.mode == ccbModeIdle { |
| 219 | ccb.mu.Unlock() |
| 220 | return |
| 221 | } |
| 222 | |
| 223 | ccb.mode = m |
| 224 | done := ccb.serializer.Done |
| 225 | b := ccb.balancer |
| 226 | ok := ccb.serializer.Schedule(func(_ context.Context) { |
| 227 | // Close the serializer to ensure that no more calls from gRPC are sent |
| 228 | // to the balancer. |
| 229 | ccb.serializerCancel() |
| 230 | // Empty the current balancer name because we don't have a balancer |
| 231 | // anymore and also so that we act on the next call to switchTo by |
| 232 | // creating a new balancer specified by the new resolver. |
| 233 | ccb.curBalancerName = "" |
| 234 | }) |
| 235 | if !ok { |
| 236 | ccb.mu.Unlock() |
| 237 | return |
| 238 | } |
| 239 | ccb.mu.Unlock() |
| 240 | |
| 241 | // Give enqueued callbacks a chance to finish. |
| 242 | <-done |
| 243 | // Spawn a goroutine to close the balancer (since it may block trying to |
| 244 | // cleanup all allocated resources) and return early. |
| 245 | go b.Close() |
| 246 | } |
| 247 | |
| 248 | // exitIdleMode is invoked by grpc when the channel exits idle mode either |
| 249 | // because of an RPC or because of an invocation of the Connect() API. This |
| 250 | // recreates the balancer that was closed previously when entering idle mode. |
| 251 | // |
| 252 | // If the channel is not in idle mode, we know for a fact that we are here as a |
| 253 | // result of the user calling the Connect() method on the ClientConn. In this |
| 254 | // case, we can simply forward the call to the underlying balancer, instructing |
| 255 | // it to reconnect to the backends. |
| 256 | func (ccb *ccBalancerWrapper) exitIdleMode() { |
| 257 | ccb.mu.Lock() |
| 258 | if ccb.mode == ccbModeClosed { |
| 259 | // Request to exit idle is a no-op when wrapper is already closed. |
| 260 | ccb.mu.Unlock() |
| 261 | return |
| 262 | } |
| 263 | |
| 264 | if ccb.mode == ccbModeIdle { |
| 265 | // Recreate the serializer which was closed when we entered idle. |
| 266 | ctx, cancel := context.WithCancel(context.Background()) |
| 267 | ccb.serializer = grpcsync.NewCallbackSerializer(ctx) |
| 268 | ccb.serializerCancel = cancel |
| 269 | } |
| 270 | |
| 271 | // The ClientConn guarantees that mutual exclusion between close() and |
| 272 | // exitIdleMode(), and since we just created a new serializer, we can be |
| 273 | // sure that the below function will be scheduled. |
| 274 | done := make(chan struct{}) |
| 275 | ccb.serializer.Schedule(func(_ context.Context) { |
| 276 | defer close(done) |
| 277 | |
| 278 | ccb.mu.Lock() |
| 279 | defer ccb.mu.Unlock() |
| 280 | |
| 281 | if ccb.mode != ccbModeIdle { |
| 282 | ccb.balancer.ExitIdle() |
| 283 | return |
| 284 | } |
| 285 | |
| 286 | // Gracefulswitch balancer does not support a switchTo operation after |
| 287 | // being closed. Hence we need to create a new one here. |
| 288 | ccb.balancer = gracefulswitch.NewBalancer(ccb, ccb.opts) |
| 289 | ccb.mode = ccbModeActive |
| 290 | channelz.Info(logger, ccb.cc.channelzID, "ccBalancerWrapper: exiting idle mode") |
| 291 | |
| 292 | }) |
| 293 | ccb.mu.Unlock() |
| 294 | |
| 295 | <-done |
| 296 | } |
| 297 | |
| 298 | func (ccb *ccBalancerWrapper) isIdleOrClosed() bool { |
| 299 | ccb.mu.Lock() |
| 300 | defer ccb.mu.Unlock() |
| 301 | return ccb.mode == ccbModeIdle || ccb.mode == ccbModeClosed |
khenaidoo | 5fc5cea | 2021-08-11 17:39:16 -0400 | [diff] [blame] | 302 | } |
| 303 | |
| 304 | func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) { |
Akash Kankanala | 761955c | 2024-02-21 19:32:20 +0530 | [diff] [blame^] | 305 | if ccb.isIdleOrClosed() { |
| 306 | return nil, fmt.Errorf("grpc: cannot create SubConn when balancer is closed or idle") |
khenaidoo | 5fc5cea | 2021-08-11 17:39:16 -0400 | [diff] [blame] | 307 | } |
Akash Kankanala | 761955c | 2024-02-21 19:32:20 +0530 | [diff] [blame^] | 308 | |
| 309 | if len(addrs) == 0 { |
| 310 | return nil, fmt.Errorf("grpc: cannot create SubConn with empty address list") |
khenaidoo | 5fc5cea | 2021-08-11 17:39:16 -0400 | [diff] [blame] | 311 | } |
| 312 | ac, err := ccb.cc.newAddrConn(addrs, opts) |
| 313 | if err != nil { |
Akash Kankanala | 761955c | 2024-02-21 19:32:20 +0530 | [diff] [blame^] | 314 | channelz.Warningf(logger, ccb.cc.channelzID, "acBalancerWrapper: NewSubConn: failed to newAddrConn: %v", err) |
khenaidoo | 5fc5cea | 2021-08-11 17:39:16 -0400 | [diff] [blame] | 315 | return nil, err |
| 316 | } |
Akash Kankanala | 761955c | 2024-02-21 19:32:20 +0530 | [diff] [blame^] | 317 | acbw := &acBalancerWrapper{ac: ac, producers: make(map[balancer.ProducerBuilder]*refCountedProducer)} |
khenaidoo | 5fc5cea | 2021-08-11 17:39:16 -0400 | [diff] [blame] | 318 | ac.acbw = acbw |
khenaidoo | 5fc5cea | 2021-08-11 17:39:16 -0400 | [diff] [blame] | 319 | return acbw, nil |
| 320 | } |
| 321 | |
| 322 | func (ccb *ccBalancerWrapper) RemoveSubConn(sc balancer.SubConn) { |
Akash Kankanala | 761955c | 2024-02-21 19:32:20 +0530 | [diff] [blame^] | 323 | if ccb.isIdleOrClosed() { |
| 324 | // It it safe to ignore this call when the balancer is closed or in idle |
| 325 | // because the ClientConn takes care of closing the connections. |
| 326 | // |
| 327 | // Not returning early from here when the balancer is closed or in idle |
| 328 | // leads to a deadlock though, because of the following sequence of |
| 329 | // calls when holding cc.mu: |
| 330 | // cc.exitIdleMode --> ccb.enterIdleMode --> gsw.Close --> |
| 331 | // ccb.RemoveAddrConn --> cc.removeAddrConn |
| 332 | return |
| 333 | } |
| 334 | |
| 335 | acbw, ok := sc.(*acBalancerWrapper) |
| 336 | if !ok { |
| 337 | return |
| 338 | } |
| 339 | ccb.cc.removeAddrConn(acbw.ac, errConnDrain) |
khenaidoo | 5fc5cea | 2021-08-11 17:39:16 -0400 | [diff] [blame] | 340 | } |
| 341 | |
| 342 | func (ccb *ccBalancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) { |
Akash Kankanala | 761955c | 2024-02-21 19:32:20 +0530 | [diff] [blame^] | 343 | if ccb.isIdleOrClosed() { |
| 344 | return |
| 345 | } |
| 346 | |
khenaidoo | 5fc5cea | 2021-08-11 17:39:16 -0400 | [diff] [blame] | 347 | acbw, ok := sc.(*acBalancerWrapper) |
| 348 | if !ok { |
| 349 | return |
| 350 | } |
| 351 | acbw.UpdateAddresses(addrs) |
| 352 | } |
| 353 | |
| 354 | func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) { |
Akash Kankanala | 761955c | 2024-02-21 19:32:20 +0530 | [diff] [blame^] | 355 | if ccb.isIdleOrClosed() { |
khenaidoo | 5fc5cea | 2021-08-11 17:39:16 -0400 | [diff] [blame] | 356 | return |
| 357 | } |
Akash Kankanala | 761955c | 2024-02-21 19:32:20 +0530 | [diff] [blame^] | 358 | |
khenaidoo | 5fc5cea | 2021-08-11 17:39:16 -0400 | [diff] [blame] | 359 | // Update picker before updating state. Even though the ordering here does |
| 360 | // not matter, it can lead to multiple calls of Pick in the common start-up |
| 361 | // case where we wait for ready and then perform an RPC. If the picker is |
| 362 | // updated later, we could call the "connecting" picker when the state is |
| 363 | // updated, and then call the "ready" picker after the picker gets updated. |
| 364 | ccb.cc.blockingpicker.updatePicker(s.Picker) |
| 365 | ccb.cc.csMgr.updateState(s.ConnectivityState) |
| 366 | } |
| 367 | |
| 368 | func (ccb *ccBalancerWrapper) ResolveNow(o resolver.ResolveNowOptions) { |
Akash Kankanala | 761955c | 2024-02-21 19:32:20 +0530 | [diff] [blame^] | 369 | if ccb.isIdleOrClosed() { |
| 370 | return |
| 371 | } |
| 372 | |
khenaidoo | 5fc5cea | 2021-08-11 17:39:16 -0400 | [diff] [blame] | 373 | ccb.cc.resolveNow(o) |
| 374 | } |
| 375 | |
| 376 | func (ccb *ccBalancerWrapper) Target() string { |
| 377 | return ccb.cc.target |
| 378 | } |
| 379 | |
| 380 | // acBalancerWrapper is a wrapper on top of ac for balancers. |
| 381 | // It implements balancer.SubConn interface. |
| 382 | type acBalancerWrapper struct { |
Akash Kankanala | 761955c | 2024-02-21 19:32:20 +0530 | [diff] [blame^] | 383 | ac *addrConn // read-only |
| 384 | |
| 385 | mu sync.Mutex |
| 386 | producers map[balancer.ProducerBuilder]*refCountedProducer |
| 387 | } |
| 388 | |
| 389 | func (acbw *acBalancerWrapper) String() string { |
| 390 | return fmt.Sprintf("SubConn(id:%d)", acbw.ac.channelzID.Int()) |
khenaidoo | 5fc5cea | 2021-08-11 17:39:16 -0400 | [diff] [blame] | 391 | } |
| 392 | |
| 393 | func (acbw *acBalancerWrapper) UpdateAddresses(addrs []resolver.Address) { |
Akash Kankanala | 761955c | 2024-02-21 19:32:20 +0530 | [diff] [blame^] | 394 | acbw.ac.updateAddrs(addrs) |
khenaidoo | 5fc5cea | 2021-08-11 17:39:16 -0400 | [diff] [blame] | 395 | } |
| 396 | |
| 397 | func (acbw *acBalancerWrapper) Connect() { |
khenaidoo | 5fc5cea | 2021-08-11 17:39:16 -0400 | [diff] [blame] | 398 | go acbw.ac.connect() |
| 399 | } |
| 400 | |
Akash Kankanala | 761955c | 2024-02-21 19:32:20 +0530 | [diff] [blame^] | 401 | // NewStream begins a streaming RPC on the addrConn. If the addrConn is not |
| 402 | // ready, blocks until it is or ctx expires. Returns an error when the context |
| 403 | // expires or the addrConn is shut down. |
| 404 | func (acbw *acBalancerWrapper) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) { |
| 405 | transport, err := acbw.ac.getTransport(ctx) |
| 406 | if err != nil { |
| 407 | return nil, err |
| 408 | } |
| 409 | return newNonRetryClientStream(ctx, desc, method, transport, acbw.ac, opts...) |
| 410 | } |
| 411 | |
| 412 | // Invoke performs a unary RPC. If the addrConn is not ready, returns |
| 413 | // errSubConnNotReady. |
| 414 | func (acbw *acBalancerWrapper) Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...CallOption) error { |
| 415 | cs, err := acbw.NewStream(ctx, unaryStreamDesc, method, opts...) |
| 416 | if err != nil { |
| 417 | return err |
| 418 | } |
| 419 | if err := cs.SendMsg(args); err != nil { |
| 420 | return err |
| 421 | } |
| 422 | return cs.RecvMsg(reply) |
| 423 | } |
| 424 | |
| 425 | type refCountedProducer struct { |
| 426 | producer balancer.Producer |
| 427 | refs int // number of current refs to the producer |
| 428 | close func() // underlying producer's close function |
| 429 | } |
| 430 | |
| 431 | func (acbw *acBalancerWrapper) GetOrBuildProducer(pb balancer.ProducerBuilder) (balancer.Producer, func()) { |
khenaidoo | 5fc5cea | 2021-08-11 17:39:16 -0400 | [diff] [blame] | 432 | acbw.mu.Lock() |
| 433 | defer acbw.mu.Unlock() |
Akash Kankanala | 761955c | 2024-02-21 19:32:20 +0530 | [diff] [blame^] | 434 | |
| 435 | // Look up existing producer from this builder. |
| 436 | pData := acbw.producers[pb] |
| 437 | if pData == nil { |
| 438 | // Not found; create a new one and add it to the producers map. |
| 439 | p, close := pb.Build(acbw) |
| 440 | pData = &refCountedProducer{producer: p, close: close} |
| 441 | acbw.producers[pb] = pData |
| 442 | } |
| 443 | // Account for this new reference. |
| 444 | pData.refs++ |
| 445 | |
| 446 | // Return a cleanup function wrapped in a OnceFunc to remove this reference |
| 447 | // and delete the refCountedProducer from the map if the total reference |
| 448 | // count goes to zero. |
| 449 | unref := func() { |
| 450 | acbw.mu.Lock() |
| 451 | pData.refs-- |
| 452 | if pData.refs == 0 { |
| 453 | defer pData.close() // Run outside the acbw mutex |
| 454 | delete(acbw.producers, pb) |
| 455 | } |
| 456 | acbw.mu.Unlock() |
| 457 | } |
| 458 | return pData.producer, grpcsync.OnceFunc(unref) |
khenaidoo | 5fc5cea | 2021-08-11 17:39:16 -0400 | [diff] [blame] | 459 | } |