| /* |
| * |
| * Copyright 2023 gRPC authors. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| * |
| */ |
| |
| package grpcsync |
| |
| import ( |
| "context" |
| "sync" |
| ) |
| |
// Subscriber is the receiving side of a PubSub. Any value that wants to be
// notified about published messages implements this interface and registers
// itself with the PubSub, which then calls back into it for each message.
type Subscriber interface {
	// OnMessage is called by the PubSub for every message delivered to this
	// subscriber. Implementations are required to return promptly; they must
	// not block inside this method.
	OnMessage(msg any)
}
| |
| // PubSub is a simple one-to-many publish-subscribe system that supports |
| // messages of arbitrary type. It guarantees that messages are delivered in |
| // the same order in which they were published. |
| // |
| // Publisher invokes the Publish() method to publish new messages, while |
| // subscribers interested in receiving these messages register a callback |
| // via the Subscribe() method. |
| // |
| // Once a PubSub is stopped, no more messages can be published, but any pending |
| // published messages will be delivered to the subscribers. Done may be used |
| // to determine when all published messages have been delivered. |
| type PubSub struct { |
| cs *CallbackSerializer |
| |
| // Access to the below fields are guarded by this mutex. |
| mu sync.Mutex |
| msg any |
| subscribers map[Subscriber]bool |
| } |
| |
| // NewPubSub returns a new PubSub instance. Users should cancel the |
| // provided context to shutdown the PubSub. |
| func NewPubSub(ctx context.Context) *PubSub { |
| return &PubSub{ |
| cs: NewCallbackSerializer(ctx), |
| subscribers: map[Subscriber]bool{}, |
| } |
| } |
| |
| // Subscribe registers the provided Subscriber to the PubSub. |
| // |
| // If the PubSub contains a previously published message, the Subscriber's |
| // OnMessage() callback will be invoked asynchronously with the existing |
| // message to begin with, and subsequently for every newly published message. |
| // |
| // The caller is responsible for invoking the returned cancel function to |
| // unsubscribe itself from the PubSub. |
| func (ps *PubSub) Subscribe(sub Subscriber) (cancel func()) { |
| ps.mu.Lock() |
| defer ps.mu.Unlock() |
| |
| ps.subscribers[sub] = true |
| |
| if ps.msg != nil { |
| msg := ps.msg |
| ps.cs.TrySchedule(func(context.Context) { |
| ps.mu.Lock() |
| defer ps.mu.Unlock() |
| if !ps.subscribers[sub] { |
| return |
| } |
| sub.OnMessage(msg) |
| }) |
| } |
| |
| return func() { |
| ps.mu.Lock() |
| defer ps.mu.Unlock() |
| delete(ps.subscribers, sub) |
| } |
| } |
| |
| // Publish publishes the provided message to the PubSub, and invokes |
| // callbacks registered by subscribers asynchronously. |
| func (ps *PubSub) Publish(msg any) { |
| ps.mu.Lock() |
| defer ps.mu.Unlock() |
| |
| ps.msg = msg |
| for sub := range ps.subscribers { |
| s := sub |
| ps.cs.TrySchedule(func(context.Context) { |
| ps.mu.Lock() |
| defer ps.mu.Unlock() |
| if !ps.subscribers[s] { |
| return |
| } |
| s.OnMessage(msg) |
| }) |
| } |
| } |
| |
| // Done returns a channel that is closed after the context passed to NewPubSub |
| // is canceled and all updates have been sent to subscribers. |
| func (ps *PubSub) Done() <-chan struct{} { |
| return ps.cs.Done() |
| } |