package bbolt

import (
	"errors"
	"fmt"
	"io"
	"os"
	"runtime"
	"sort"
	"strings"
	"sync/atomic"
	"time"
	"unsafe"

	berrors "go.etcd.io/bbolt/errors"
	"go.etcd.io/bbolt/internal/common"
)

// Tx represents a read-only or read/write transaction on the database.
// Read-only transactions can be used for retrieving values for keys and creating cursors.
// Read/write transactions can create and remove buckets and create and remove keys.
//
// IMPORTANT: You must commit or rollback transactions when you are done with
// them. Pages cannot be reclaimed by the writer until no more transactions
// are using them. A long running read transaction can cause the database to
// quickly grow.
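//
// A minimal usage sketch (illustrative only; assumes a *DB value named db that
// was opened elsewhere):
//
//	tx, err := db.Begin(true)
//	if err != nil {
//		return err
//	}
//	defer func() { _ = tx.Rollback() }()
//	if _, err := tx.CreateBucketIfNotExists([]byte("widgets")); err != nil {
//		return err
//	}
//	return tx.Commit()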
type Tx struct {
	writable       bool
	managed        bool
	db             *DB
	meta           *common.Meta
	root           Bucket
	pages          map[common.Pgid]*common.Page
	stats          TxStats
	commitHandlers []func()

	// WriteFlag specifies the flag for write-related methods like WriteTo().
	// Tx opens the database file with the specified flag to copy the data.
	//
	// By default, the flag is unset, which works well for mostly in-memory
	// workloads. For databases that are much larger than available RAM,
	// set the flag to syscall.O_DIRECT to avoid trashing the page cache.
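	//
	// For example, a backup routine might set (illustrative sketch only; this
	// assumes a platform where the syscall package defines O_DIRECT):
	//
	//	tx.WriteFlag = syscall.O_DIRECT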
	WriteFlag int
}

// init initializes the transaction.
func (tx *Tx) init(db *DB) {
	tx.db = db
	tx.pages = nil

	// Copy the meta page since it can be changed by the writer.
	tx.meta = &common.Meta{}
	db.meta().Copy(tx.meta)

	// Copy over the root bucket.
	tx.root = newBucket(tx)
	tx.root.InBucket = &common.InBucket{}
	*tx.root.InBucket = *(tx.meta.RootBucket())

	// Increment the transaction id and add a page cache for writable transactions.
	if tx.writable {
		tx.pages = make(map[common.Pgid]*common.Page)
		tx.meta.IncTxid()
	}
}

// ID returns the transaction id.
func (tx *Tx) ID() int {
	if tx == nil || tx.meta == nil {
		return -1
	}
	return int(tx.meta.Txid())
}

// DB returns a reference to the database that created the transaction.
func (tx *Tx) DB() *DB {
	return tx.db
}

// Size returns current database size in bytes as seen by this transaction.
func (tx *Tx) Size() int64 {
	return int64(tx.meta.Pgid()) * int64(tx.db.pageSize)
}

// Writable returns whether the transaction can perform write operations.
func (tx *Tx) Writable() bool {
	return tx.writable
}

// Cursor creates a cursor associated with the root bucket.
// All items in the cursor will return a nil value because all root bucket keys point to buckets.
// The cursor is only valid as long as the transaction is open.
// Do not use a cursor after the transaction is closed.
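//
// A typical iteration sketch (illustrative only):
//
//	c := tx.Cursor()
//	for k, _ := c.First(); k != nil; k, _ = c.Next() {
//		fmt.Printf("bucket: %s\n", k)
//	}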
func (tx *Tx) Cursor() *Cursor {
	return tx.root.Cursor()
}

// Stats retrieves a copy of the current transaction statistics.
func (tx *Tx) Stats() TxStats {
	return tx.stats
}

// Inspect returns the structure of the database.
func (tx *Tx) Inspect() BucketStructure {
	return tx.root.Inspect()
}

// Bucket retrieves a bucket by name.
// Returns nil if the bucket does not exist.
// The bucket instance is only valid for the lifetime of the transaction.
func (tx *Tx) Bucket(name []byte) *Bucket {
	return tx.root.Bucket(name)
}

// CreateBucket creates a new bucket.
// Returns an error if the bucket already exists, if the bucket name is blank, or if the bucket name is too long.
// The bucket instance is only valid for the lifetime of the transaction.
func (tx *Tx) CreateBucket(name []byte) (*Bucket, error) {
	return tx.root.CreateBucket(name)
}

// CreateBucketIfNotExists creates a new bucket if it doesn't already exist.
// Returns an error if the bucket name is blank, or if the bucket name is too long.
// The bucket instance is only valid for the lifetime of the transaction.
func (tx *Tx) CreateBucketIfNotExists(name []byte) (*Bucket, error) {
	return tx.root.CreateBucketIfNotExists(name)
}

// DeleteBucket deletes a bucket.
// Returns an error if the bucket cannot be found or if the key represents a non-bucket value.
func (tx *Tx) DeleteBucket(name []byte) error {
	return tx.root.DeleteBucket(name)
}

// MoveBucket moves a sub-bucket from the source bucket to the destination bucket.
// Returns an error if
//  1. the sub-bucket cannot be found in the source bucket;
//  2. the key already exists in the destination bucket; or
//  3. the key represents a non-bucket value.
//
// If src is nil, it means moving a top level bucket into the target bucket.
// If dst is nil, it means converting the child bucket into a top level bucket.
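//
// An illustrative call (sketch only; the bucket names are hypothetical):
//
//	err := tx.MoveBucket([]byte("child"), tx.Bucket([]byte("src")), tx.Bucket([]byte("dst")))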
func (tx *Tx) MoveBucket(child []byte, src *Bucket, dst *Bucket) error {
	if src == nil {
		src = &tx.root
	}
	if dst == nil {
		dst = &tx.root
	}
	return src.MoveBucket(child, dst)
}

// ForEach executes a function for each bucket in the root.
// If the provided function returns an error then the iteration is stopped and
// the error is returned to the caller.
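//
// For example (illustrative only):
//
//	err := tx.ForEach(func(name []byte, b *Bucket) error {
//		fmt.Printf("bucket: %s\n", name)
//		return nil
//	})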
func (tx *Tx) ForEach(fn func(name []byte, b *Bucket) error) error {
	return tx.root.ForEach(func(k, v []byte) error {
		return fn(k, tx.root.Bucket(k))
	})
}

// OnCommit adds a handler function to be executed after the transaction successfully commits.
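//
// For example (illustrative only):
//
//	tx.OnCommit(func() {
//		fmt.Println("transaction committed")
//	})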
func (tx *Tx) OnCommit(fn func()) {
	tx.commitHandlers = append(tx.commitHandlers, fn)
}

// Commit writes all changes to disk, updates the meta page and closes the transaction.
// Returns an error if a disk write error occurs, or if Commit is
// called on a read-only transaction.
func (tx *Tx) Commit() (err error) {
	txId := tx.ID()
	lg := tx.db.Logger()
	if lg != discardLogger {
		lg.Debugf("Committing transaction %d", txId)
		defer func() {
			if err != nil {
				lg.Errorf("Committing transaction failed: %v", err)
			} else {
				lg.Debugf("Committing transaction %d successfully", txId)
			}
		}()
	}

	common.Assert(!tx.managed, "managed tx commit not allowed")
	if tx.db == nil {
		return berrors.ErrTxClosed
	} else if !tx.writable {
		return berrors.ErrTxNotWritable
	}

	// TODO(benbjohnson): Use vectorized I/O to write out dirty pages.

	// Rebalance nodes which have had deletions.
	var startTime = time.Now()
	tx.root.rebalance()
	if tx.stats.GetRebalance() > 0 {
		tx.stats.IncRebalanceTime(time.Since(startTime))
	}

	opgid := tx.meta.Pgid()

	// spill data onto dirty pages.
	startTime = time.Now()
	if err = tx.root.spill(); err != nil {
		lg.Errorf("spilling data onto dirty pages failed: %v", err)
		tx.rollback()
		return err
	}
	tx.stats.IncSpillTime(time.Since(startTime))

	// Free the old root bucket.
	tx.meta.RootBucket().SetRootPage(tx.root.RootPage())

	// Free the old freelist because commit writes out a fresh freelist.
	if tx.meta.Freelist() != common.PgidNoFreelist {
		tx.db.freelist.Free(tx.meta.Txid(), tx.db.page(tx.meta.Freelist()))
	}

	if !tx.db.NoFreelistSync {
		err = tx.commitFreelist()
		if err != nil {
			lg.Errorf("committing freelist failed: %v", err)
			return err
		}
	} else {
		tx.meta.SetFreelist(common.PgidNoFreelist)
	}

	// If the high water mark has moved up then attempt to grow the database.
	if tx.meta.Pgid() > opgid {
		_ = errors.New("")
		// gofail: var lackOfDiskSpace string
		// tx.rollback()
		// return errors.New(lackOfDiskSpace)
		if err = tx.db.grow(int(tx.meta.Pgid()+1) * tx.db.pageSize); err != nil {
			lg.Errorf("growing db size failed, pgid: %d, pagesize: %d, error: %v", tx.meta.Pgid(), tx.db.pageSize, err)
			tx.rollback()
			return err
		}
	}

	// Write dirty pages to disk.
	startTime = time.Now()
	if err = tx.write(); err != nil {
		lg.Errorf("writing data failed: %v", err)
		tx.rollback()
		return err
	}

	// If strict mode is enabled then perform a consistency check.
	if tx.db.StrictMode {
		ch := tx.Check()
		var errs []string
		for {
			chkErr, ok := <-ch
			if !ok {
				break
			}
			errs = append(errs, chkErr.Error())
		}
		if len(errs) > 0 {
			panic("check fail: " + strings.Join(errs, "\n"))
		}
	}

	// Write meta to disk.
	if err = tx.writeMeta(); err != nil {
		lg.Errorf("writeMeta failed: %v", err)
		tx.rollback()
		return err
	}
	tx.stats.IncWriteTime(time.Since(startTime))

	// Finalize the transaction.
	tx.close()

	// Execute commit handlers now that the locks have been removed.
	for _, fn := range tx.commitHandlers {
		fn()
	}

	return nil
}

func (tx *Tx) commitFreelist() error {
	// Allocate new pages for the new free list. This will overestimate
	// the size of the freelist but not underestimate the size (which would be bad).
	p, err := tx.allocate((tx.db.freelist.EstimatedWritePageSize() / tx.db.pageSize) + 1)
	if err != nil {
		tx.rollback()
		return err
	}

	tx.db.freelist.Write(p)
	tx.meta.SetFreelist(p.Id())

	return nil
}

// Rollback closes the transaction and ignores all previous updates. Read-only
// transactions must be rolled back and not committed.
func (tx *Tx) Rollback() error {
	common.Assert(!tx.managed, "managed tx rollback not allowed")
	if tx.db == nil {
		return berrors.ErrTxClosed
	}
	tx.nonPhysicalRollback()
	return nil
}

// nonPhysicalRollback is called when the user calls Rollback directly; in this case we do not need to reload the free pages from disk.
func (tx *Tx) nonPhysicalRollback() {
	if tx.db == nil {
		return
	}
	if tx.writable {
		tx.db.freelist.Rollback(tx.meta.Txid())
	}
	tx.close()
}

// rollback needs to reload the free pages from disk in case a system error, such as an fsync error, has occurred.
func (tx *Tx) rollback() {
	if tx.db == nil {
		return
	}
	if tx.writable {
		tx.db.freelist.Rollback(tx.meta.Txid())
		// When mmap fails, the `data`, `dataref` and `datasz` may be reset to
		// zero values, and there is no way to reload free page IDs in this case.
		if tx.db.data != nil {
			if !tx.db.hasSyncedFreelist() {
				// Reconstruct free page list by scanning the DB to get the whole free page list.
				// Note: scanning the whole db is heavy if your db size is large in NoSyncFreeList mode.
				tx.db.freelist.NoSyncReload(tx.db.freepages())
			} else {
				// Read free page list from freelist page.
				tx.db.freelist.Reload(tx.db.page(tx.db.meta().Freelist()))
			}
		}
	}
	tx.close()
}

func (tx *Tx) close() {
	if tx.db == nil {
		return
	}
	if tx.writable {
		// Grab freelist stats.
		var freelistFreeN = tx.db.freelist.FreeCount()
		var freelistPendingN = tx.db.freelist.PendingCount()
		var freelistAlloc = tx.db.freelist.EstimatedWritePageSize()

		// Remove transaction ref & writer lock.
		tx.db.rwtx = nil
		tx.db.rwlock.Unlock()

		// Merge statistics.
		tx.db.statlock.Lock()
		tx.db.stats.FreePageN = freelistFreeN
		tx.db.stats.PendingPageN = freelistPendingN
		tx.db.stats.FreeAlloc = (freelistFreeN + freelistPendingN) * tx.db.pageSize
		tx.db.stats.FreelistInuse = freelistAlloc
		tx.db.stats.TxStats.add(&tx.stats)
		tx.db.statlock.Unlock()
	} else {
		tx.db.removeTx(tx)
	}

	// Clear all references.
	tx.db = nil
	tx.meta = nil
	tx.root = Bucket{tx: tx}
	tx.pages = nil
}

// Copy writes the entire database to a writer.
// This function exists for backwards compatibility.
//
// Deprecated: Use WriteTo() instead.
func (tx *Tx) Copy(w io.Writer) error {
	_, err := tx.WriteTo(w)
	return err
}

// WriteTo writes the entire database to a writer.
// If err == nil then exactly tx.Size() bytes will be written into the writer.
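//
// A common backup pattern is to stream the database from within a read
// transaction (illustrative sketch only; f is a hypothetical destination such
// as an *os.File or an http.ResponseWriter):
//
//	err := db.View(func(tx *Tx) error {
//		_, err := tx.WriteTo(f)
//		return err
//	})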
func (tx *Tx) WriteTo(w io.Writer) (n int64, err error) {
	var f *os.File
	// There is a risk that between the time a read-only transaction
	// is created and the time the file is actually opened, the
	// underlying db file at tx.db.path may have been replaced
	// (e.g. via rename). In that case, opening the file again would
	// unexpectedly point to a different file, rather than the one
	// the transaction was based on.
	//
	// To overcome this, we reuse the already opened file handle when
	// WriteFlag is not set. When the WriteFlag is set, we reopen the file
	// but verify that it still refers to the same underlying file
	// (by device and inode). If it does not, we fall back to
	// reusing the existing already opened file handle.
	if tx.WriteFlag != 0 {
		// Attempt to open reader with WriteFlag
		f, err = tx.db.openFile(tx.db.path, os.O_RDONLY|tx.WriteFlag, 0)
		if err != nil {
			return 0, err
		}

		if ok, err := sameFile(tx.db.file, f); !ok {
			lg := tx.db.Logger()
			if cerr := f.Close(); cerr != nil {
				lg.Errorf("failed to close the file (%s): %v", tx.db.path, cerr)
			}
			lg.Warningf("The underlying file has changed, so reuse the already opened file (%s): %v", tx.db.path, err)
			f = tx.db.file
		} else {
			defer func() {
				if cerr := f.Close(); err == nil {
					err = cerr
				}
			}()
		}
	} else {
		f = tx.db.file
	}

	// Generate a meta page. We use the same page data for both meta pages.
	buf := make([]byte, tx.db.pageSize)
	page := (*common.Page)(unsafe.Pointer(&buf[0]))
	page.SetFlags(common.MetaPageFlag)
	*page.Meta() = *tx.meta

	// Write meta 0.
	page.SetId(0)
	page.Meta().SetChecksum(page.Meta().Sum64())
	nn, err := w.Write(buf)
	n += int64(nn)
	if err != nil {
		return n, fmt.Errorf("meta 0 copy: %s", err)
	}

	// Write meta 1 with a lower transaction id.
	page.SetId(1)
	page.Meta().DecTxid()
	page.Meta().SetChecksum(page.Meta().Sum64())
	nn, err = w.Write(buf)
	n += int64(nn)
	if err != nil {
		return n, fmt.Errorf("meta 1 copy: %s", err)
	}

	// Copy data pages using a SectionReader to avoid affecting f's offset.
	dataOffset := int64(tx.db.pageSize * 2)
	dataSize := tx.Size() - dataOffset
	sr := io.NewSectionReader(f, dataOffset, dataSize)

	// Copy data pages.
	wn, err := io.CopyN(w, sr, dataSize)
	n += wn
	if err != nil {
		return n, err
	}

	return n, nil
}

func sameFile(f1, f2 *os.File) (bool, error) {
	fi1, err := f1.Stat()
	if err != nil {
		return false, fmt.Errorf("failed to get fileInfo of the first file (%s): %w", f1.Name(), err)
	}
	fi2, err := f2.Stat()
	if err != nil {
		return false, fmt.Errorf("failed to get fileInfo of the second file (%s): %w", f2.Name(), err)
	}

	return os.SameFile(fi1, fi2), nil
}

// CopyFile copies the entire database to file at the given path.
// A reader transaction is maintained during the copy so it is safe to continue
// using the database while a copy is in progress.
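//
// For example (illustrative only; the destination path is hypothetical):
//
//	err := db.View(func(tx *Tx) error {
//		return tx.CopyFile("my.db.bak", 0600)
//	})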
func (tx *Tx) CopyFile(path string, mode os.FileMode) error {
	f, err := tx.db.openFile(path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, mode)
	if err != nil {
		return err
	}

	_, err = tx.WriteTo(f)
	if err != nil {
		_ = f.Close()
		return err
	}
	return f.Close()
}

// allocate returns a contiguous block of memory starting at a given page.
func (tx *Tx) allocate(count int) (*common.Page, error) {
	lg := tx.db.Logger()
	p, err := tx.db.allocate(tx.meta.Txid(), count)
	if err != nil {
		lg.Errorf("allocating failed, txid: %d, count: %d, error: %v", tx.meta.Txid(), count, err)
		return nil, err
	}

	// Save to our page cache.
	tx.pages[p.Id()] = p

	// Update statistics.
	tx.stats.IncPageCount(int64(count))
	tx.stats.IncPageAlloc(int64(count * tx.db.pageSize))

	return p, nil
}

// write writes any dirty pages to disk.
func (tx *Tx) write() error {
	// Sort pages by id.
	lg := tx.db.Logger()
	pages := make(common.Pages, 0, len(tx.pages))
	for _, p := range tx.pages {
		pages = append(pages, p)
	}
	// Clear out page cache early.
	tx.pages = make(map[common.Pgid]*common.Page)
	sort.Sort(pages)

	// Write pages to disk in order.
	for _, p := range pages {
		rem := (uint64(p.Overflow()) + 1) * uint64(tx.db.pageSize)
		offset := int64(p.Id()) * int64(tx.db.pageSize)
		var written uintptr

		// Write out page in "max allocation" sized chunks.
		for {
			sz := rem
			if sz > common.MaxAllocSize-1 {
				sz = common.MaxAllocSize - 1
			}
			buf := common.UnsafeByteSlice(unsafe.Pointer(p), written, 0, int(sz))

			if _, err := tx.db.ops.writeAt(buf, offset); err != nil {
				lg.Errorf("writeAt failed, offset: %d: %w", offset, err)
				return err
			}

			// Update statistics.
			tx.stats.IncWrite(1)

			// Exit inner for loop if we've written all the chunks.
			rem -= sz
			if rem == 0 {
				break
			}

			// Otherwise move offset forward and move pointer to next chunk.
			offset += int64(sz)
			written += uintptr(sz)
		}
	}

	// Ignore file sync if flag is set on DB.
	if !tx.db.NoSync || common.IgnoreNoSync {
		// gofail: var beforeSyncDataPages struct{}
		if err := fdatasync(tx.db); err != nil {
			lg.Errorf("[GOOS: %s, GOARCH: %s] fdatasync failed: %w", runtime.GOOS, runtime.GOARCH, err)
			return err
		}
	}

	// Put small pages back to page pool.
	for _, p := range pages {
		// Ignore page sizes over 1 page.
		// These are allocated using make() instead of the page pool.
		if int(p.Overflow()) != 0 {
			continue
		}

		buf := common.UnsafeByteSlice(unsafe.Pointer(p), 0, 0, tx.db.pageSize)

		// See https://go.googlesource.com/go/+/f03c9202c43e0abb130669852082117ca50aa9b1
		for i := range buf {
			buf[i] = 0
		}
		tx.db.pagePool.Put(buf) //nolint:staticcheck
	}

	return nil
}

// writeMeta writes the meta to the disk.
func (tx *Tx) writeMeta() error {
	// gofail: var beforeWriteMetaError string
	// return errors.New(beforeWriteMetaError)

	// Create a temporary buffer for the meta page.
	lg := tx.db.Logger()
	buf := make([]byte, tx.db.pageSize)
	p := tx.db.pageInBuffer(buf, 0)
	tx.meta.Write(p)

	// Write the meta page to file.
	tx.db.metalock.Lock()
	if _, err := tx.db.ops.writeAt(buf, int64(p.Id())*int64(tx.db.pageSize)); err != nil {
		tx.db.metalock.Unlock()
		lg.Errorf("writeAt failed, pgid: %d, pageSize: %d, error: %v", p.Id(), tx.db.pageSize, err)
		return err
	}
	tx.db.metalock.Unlock()
	if !tx.db.NoSync || common.IgnoreNoSync {
		// gofail: var beforeSyncMetaPage struct{}
		if err := fdatasync(tx.db); err != nil {
			lg.Errorf("[GOOS: %s, GOARCH: %s] fdatasync failed: %w", runtime.GOOS, runtime.GOARCH, err)
			return err
		}
	}

	// Update statistics.
	tx.stats.IncWrite(1)

	return nil
}

// page returns a reference to the page with a given id.
// If the page has been written to, then a temporary buffered page is returned.
func (tx *Tx) page(id common.Pgid) *common.Page {
	// Check the dirty pages first.
	if tx.pages != nil {
		if p, ok := tx.pages[id]; ok {
			p.FastCheck(id)
			return p
		}
	}

	// Otherwise return directly from the mmap.
	p := tx.db.page(id)
	p.FastCheck(id)
	return p
}

// forEachPage iterates over every page reachable from a given page and executes a function.
func (tx *Tx) forEachPage(pgidnum common.Pgid, fn func(*common.Page, int, []common.Pgid)) {
	stack := make([]common.Pgid, 10)
	stack[0] = pgidnum
	tx.forEachPageInternal(stack[:1], fn)
}

func (tx *Tx) forEachPageInternal(pgidstack []common.Pgid, fn func(*common.Page, int, []common.Pgid)) {
	p := tx.page(pgidstack[len(pgidstack)-1])

	// Execute function.
	fn(p, len(pgidstack)-1, pgidstack)

	// Recursively loop over children.
	if p.IsBranchPage() {
		for i := 0; i < int(p.Count()); i++ {
			elem := p.BranchPageElement(uint16(i))
			tx.forEachPageInternal(append(pgidstack, elem.Pgid()), fn)
		}
	}
}

// Page returns page information for a given page number.
// This is only safe for concurrent use when used by a writable transaction.
func (tx *Tx) Page(id int) (*common.PageInfo, error) {
	if tx.db == nil {
		return nil, berrors.ErrTxClosed
	} else if common.Pgid(id) >= tx.meta.Pgid() {
		return nil, nil
	}

	if tx.db.freelist == nil {
		return nil, berrors.ErrFreePagesNotLoaded
	}

	// Build the page info.
	p := tx.db.page(common.Pgid(id))
	info := &common.PageInfo{
		ID:            id,
		Count:         int(p.Count()),
		OverflowCount: int(p.Overflow()),
	}

	// Determine the type (or if it's free).
	if tx.db.freelist.Freed(common.Pgid(id)) {
		info.Type = "free"
	} else {
		info.Type = p.Typ()
	}

	return info, nil
}

// TxStats represents statistics about the actions performed by the transaction.
type TxStats struct {
	// Page statistics.
	//
	// DEPRECATED: Use GetPageCount() or IncPageCount()
	PageCount int64 // number of page allocations
	// DEPRECATED: Use GetPageAlloc() or IncPageAlloc()
	PageAlloc int64 // total bytes allocated

	// Cursor statistics.
	//
	// DEPRECATED: Use GetCursorCount() or IncCursorCount()
	CursorCount int64 // number of cursors created

	// Node statistics
	//
	// DEPRECATED: Use GetNodeCount() or IncNodeCount()
	NodeCount int64 // number of node allocations
	// DEPRECATED: Use GetNodeDeref() or IncNodeDeref()
	NodeDeref int64 // number of node dereferences

	// Rebalance statistics.
	//
	// DEPRECATED: Use GetRebalance() or IncRebalance()
	Rebalance int64 // number of node rebalances
	// DEPRECATED: Use GetRebalanceTime() or IncRebalanceTime()
	RebalanceTime time.Duration // total time spent rebalancing

	// Split/Spill statistics.
	//
	// DEPRECATED: Use GetSplit() or IncSplit()
	Split int64 // number of nodes split
	// DEPRECATED: Use GetSpill() or IncSpill()
	Spill int64 // number of nodes spilled
	// DEPRECATED: Use GetSpillTime() or IncSpillTime()
	SpillTime time.Duration // total time spent spilling

	// Write statistics.
	//
	// DEPRECATED: Use GetWrite() or IncWrite()
	Write int64 // number of writes performed
	// DEPRECATED: Use GetWriteTime() or IncWriteTime()
	WriteTime time.Duration // total time spent writing to disk
}

func (s *TxStats) add(other *TxStats) {
	s.IncPageCount(other.GetPageCount())
	s.IncPageAlloc(other.GetPageAlloc())
	s.IncCursorCount(other.GetCursorCount())
	s.IncNodeCount(other.GetNodeCount())
	s.IncNodeDeref(other.GetNodeDeref())
	s.IncRebalance(other.GetRebalance())
	s.IncRebalanceTime(other.GetRebalanceTime())
	s.IncSplit(other.GetSplit())
	s.IncSpill(other.GetSpill())
	s.IncSpillTime(other.GetSpillTime())
	s.IncWrite(other.GetWrite())
	s.IncWriteTime(other.GetWriteTime())
}

// Sub calculates and returns the difference between two sets of transaction stats.
// This is useful when obtaining stats at two different points in time and
// you need the performance counters that occurred within that time span.
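//
// For example (illustrative sketch; assumes a *DB value named db):
//
//	prev := db.Stats().TxStats
//	// ... perform some transactions ...
//	cur := db.Stats().TxStats
//	diff := cur.Sub(&prev)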
func (s *TxStats) Sub(other *TxStats) TxStats {
	var diff TxStats
	diff.PageCount = s.GetPageCount() - other.GetPageCount()
	diff.PageAlloc = s.GetPageAlloc() - other.GetPageAlloc()
	diff.CursorCount = s.GetCursorCount() - other.GetCursorCount()
	diff.NodeCount = s.GetNodeCount() - other.GetNodeCount()
	diff.NodeDeref = s.GetNodeDeref() - other.GetNodeDeref()
	diff.Rebalance = s.GetRebalance() - other.GetRebalance()
	diff.RebalanceTime = s.GetRebalanceTime() - other.GetRebalanceTime()
	diff.Split = s.GetSplit() - other.GetSplit()
	diff.Spill = s.GetSpill() - other.GetSpill()
	diff.SpillTime = s.GetSpillTime() - other.GetSpillTime()
	diff.Write = s.GetWrite() - other.GetWrite()
	diff.WriteTime = s.GetWriteTime() - other.GetWriteTime()
	return diff
}

// GetPageCount returns PageCount atomically.
func (s *TxStats) GetPageCount() int64 {
	return atomic.LoadInt64(&s.PageCount)
}

// IncPageCount increases PageCount atomically and returns the new value.
func (s *TxStats) IncPageCount(delta int64) int64 {
	return atomic.AddInt64(&s.PageCount, delta)
}

// GetPageAlloc returns PageAlloc atomically.
func (s *TxStats) GetPageAlloc() int64 {
	return atomic.LoadInt64(&s.PageAlloc)
}

// IncPageAlloc increases PageAlloc atomically and returns the new value.
func (s *TxStats) IncPageAlloc(delta int64) int64 {
	return atomic.AddInt64(&s.PageAlloc, delta)
}

// GetCursorCount returns CursorCount atomically.
func (s *TxStats) GetCursorCount() int64 {
	return atomic.LoadInt64(&s.CursorCount)
}

// IncCursorCount increases CursorCount atomically and returns the new value.
func (s *TxStats) IncCursorCount(delta int64) int64 {
	return atomic.AddInt64(&s.CursorCount, delta)
}

// GetNodeCount returns NodeCount atomically.
func (s *TxStats) GetNodeCount() int64 {
	return atomic.LoadInt64(&s.NodeCount)
}

// IncNodeCount increases NodeCount atomically and returns the new value.
func (s *TxStats) IncNodeCount(delta int64) int64 {
	return atomic.AddInt64(&s.NodeCount, delta)
}

// GetNodeDeref returns NodeDeref atomically.
func (s *TxStats) GetNodeDeref() int64 {
	return atomic.LoadInt64(&s.NodeDeref)
}

// IncNodeDeref increases NodeDeref atomically and returns the new value.
func (s *TxStats) IncNodeDeref(delta int64) int64 {
	return atomic.AddInt64(&s.NodeDeref, delta)
}

// GetRebalance returns Rebalance atomically.
func (s *TxStats) GetRebalance() int64 {
	return atomic.LoadInt64(&s.Rebalance)
}

// IncRebalance increases Rebalance atomically and returns the new value.
func (s *TxStats) IncRebalance(delta int64) int64 {
	return atomic.AddInt64(&s.Rebalance, delta)
}

// GetRebalanceTime returns RebalanceTime atomically.
func (s *TxStats) GetRebalanceTime() time.Duration {
	return atomicLoadDuration(&s.RebalanceTime)
}

// IncRebalanceTime increases RebalanceTime atomically and returns the new value.
func (s *TxStats) IncRebalanceTime(delta time.Duration) time.Duration {
	return atomicAddDuration(&s.RebalanceTime, delta)
}

// GetSplit returns Split atomically.
func (s *TxStats) GetSplit() int64 {
	return atomic.LoadInt64(&s.Split)
}

// IncSplit increases Split atomically and returns the new value.
func (s *TxStats) IncSplit(delta int64) int64 {
	return atomic.AddInt64(&s.Split, delta)
}

// GetSpill returns Spill atomically.
func (s *TxStats) GetSpill() int64 {
	return atomic.LoadInt64(&s.Spill)
}

// IncSpill increases Spill atomically and returns the new value.
func (s *TxStats) IncSpill(delta int64) int64 {
	return atomic.AddInt64(&s.Spill, delta)
}

// GetSpillTime returns SpillTime atomically.
func (s *TxStats) GetSpillTime() time.Duration {
	return atomicLoadDuration(&s.SpillTime)
}

// IncSpillTime increases SpillTime atomically and returns the new value.
func (s *TxStats) IncSpillTime(delta time.Duration) time.Duration {
	return atomicAddDuration(&s.SpillTime, delta)
}

// GetWrite returns Write atomically.
func (s *TxStats) GetWrite() int64 {
	return atomic.LoadInt64(&s.Write)
}

// IncWrite increases Write atomically and returns the new value.
func (s *TxStats) IncWrite(delta int64) int64 {
	return atomic.AddInt64(&s.Write, delta)
}

// GetWriteTime returns WriteTime atomically.
func (s *TxStats) GetWriteTime() time.Duration {
	return atomicLoadDuration(&s.WriteTime)
}

// IncWriteTime increases WriteTime atomically and returns the new value.
func (s *TxStats) IncWriteTime(delta time.Duration) time.Duration {
	return atomicAddDuration(&s.WriteTime, delta)
}

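// atomicAddDuration atomically adds du to the duration stored at ptr and
// returns the new value. It relies on time.Duration being represented as an
// int64.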
func atomicAddDuration(ptr *time.Duration, du time.Duration) time.Duration {
	return time.Duration(atomic.AddInt64((*int64)(unsafe.Pointer(ptr)), int64(du)))
}

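// atomicLoadDuration atomically loads the duration stored at ptr. As with
// atomicAddDuration, it relies on time.Duration being represented as an int64.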
func atomicLoadDuration(ptr *time.Duration) time.Duration {
	return time.Duration(atomic.LoadInt64((*int64)(unsafe.Pointer(ptr))))
}