blob: 0a57332ac7893d608b6a084dd95c75d62e1e77e8 [file] [log] [blame]
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15package clientv3
16
17import (
18 "context"
19 "sync"
20
21 "google.golang.org/grpc"
22
23 pb "go.etcd.io/etcd/api/v3/etcdserverpb"
24)
25
26// Txn is the interface that wraps mini-transactions.
27//
28// Txn(context.TODO()).If(
29// Compare(Value(k1), ">", v1),
30// Compare(Version(k1), "=", 2)
31// ).Then(
32// OpPut(k2,v2), OpPut(k3,v3)
33// ).Else(
34// OpPut(k4,v4), OpPut(k5,v5)
35// ).Commit()
36type Txn interface {
37 // If takes a list of comparison. If all comparisons passed in succeed,
38 // the operations passed into Then() will be executed. Or the operations
39 // passed into Else() will be executed.
40 If(cs ...Cmp) Txn
41
42 // Then takes a list of operations. The Ops list will be executed, if the
43 // comparisons passed in If() succeed.
44 Then(ops ...Op) Txn
45
46 // Else takes a list of operations. The Ops list will be executed, if the
47 // comparisons passed in If() fail.
48 Else(ops ...Op) Txn
49
50 // Commit tries to commit the transaction.
51 Commit() (*TxnResponse, error)
52}
53
54type txn struct {
55 kv *kv
56 ctx context.Context
57
58 mu sync.Mutex
59 cif bool
60 cthen bool
61 celse bool
62
63 isWrite bool
64
65 cmps []*pb.Compare
66
67 sus []*pb.RequestOp
68 fas []*pb.RequestOp
69
70 callOpts []grpc.CallOption
71}
72
73func (txn *txn) If(cs ...Cmp) Txn {
74 txn.mu.Lock()
75 defer txn.mu.Unlock()
76
77 if txn.cif {
78 panic("cannot call If twice!")
79 }
80
81 if txn.cthen {
82 panic("cannot call If after Then!")
83 }
84
85 if txn.celse {
86 panic("cannot call If after Else!")
87 }
88
89 txn.cif = true
90
91 for i := range cs {
92 txn.cmps = append(txn.cmps, (*pb.Compare)(&cs[i]))
93 }
94
95 return txn
96}
97
98func (txn *txn) Then(ops ...Op) Txn {
99 txn.mu.Lock()
100 defer txn.mu.Unlock()
101
102 if txn.cthen {
103 panic("cannot call Then twice!")
104 }
105 if txn.celse {
106 panic("cannot call Then after Else!")
107 }
108
109 txn.cthen = true
110
111 for _, op := range ops {
112 txn.isWrite = txn.isWrite || op.isWrite()
113 txn.sus = append(txn.sus, op.toRequestOp())
114 }
115
116 return txn
117}
118
119func (txn *txn) Else(ops ...Op) Txn {
120 txn.mu.Lock()
121 defer txn.mu.Unlock()
122
123 if txn.celse {
124 panic("cannot call Else twice!")
125 }
126
127 txn.celse = true
128
129 for _, op := range ops {
130 txn.isWrite = txn.isWrite || op.isWrite()
131 txn.fas = append(txn.fas, op.toRequestOp())
132 }
133
134 return txn
135}
136
137func (txn *txn) Commit() (*TxnResponse, error) {
138 txn.mu.Lock()
139 defer txn.mu.Unlock()
140
141 r := &pb.TxnRequest{Compare: txn.cmps, Success: txn.sus, Failure: txn.fas}
142
143 var resp *pb.TxnResponse
144 var err error
145 resp, err = txn.kv.remote.Txn(txn.ctx, r, txn.callOpts...)
146 if err != nil {
147 return nil, ContextError(txn.ctx, err)
148 }
149 return (*TxnResponse)(resp), nil
150}