| package bbolt |
| |
| import ( |
| "errors" |
| "fmt" |
| "io" |
| "os" |
| "runtime" |
| "sort" |
| "strings" |
| "sync/atomic" |
| "time" |
| "unsafe" |
| |
| berrors "go.etcd.io/bbolt/errors" |
| "go.etcd.io/bbolt/internal/common" |
| ) |
| |
| // Tx represents a read-only or read/write transaction on the database. |
| // Read-only transactions can be used for retrieving values for keys and creating cursors. |
| // Read/write transactions can create and remove buckets and create and remove keys. |
| // |
| // IMPORTANT: You must commit or roll back transactions when you are done with |
| // them. Pages cannot be reclaimed by the writer until no more transactions |
| // are using them. A long-running read transaction can cause the database to |
| // grow quickly. |
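| // |
| // A minimal read-only sketch (illustrative, not from the original docs; |
| // assumes db is a *DB returned by Open and that a "widgets" bucket exists): |
| // |
| //	tx, err := db.Begin(false) |
| //	if err != nil { |
| //		return err |
| //	} |
| //	defer tx.Rollback() |
| //	b := tx.Bucket([]byte("widgets")) |
| //	_ = b // read from the bucket while the transaction is still open |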
| type Tx struct { |
| writable bool |
| managed bool |
| db *DB |
| meta *common.Meta |
| root Bucket |
| pages map[common.Pgid]*common.Page |
| stats TxStats |
| commitHandlers []func() |
| |
| // WriteFlag specifies the flag for write-related methods like WriteTo(). |
| // Tx opens the database file with the specified flag to copy the data. |
| // |
| // By default, the flag is unset, which works well for mostly in-memory |
| // workloads. For databases that are much larger than available RAM, |
| // set the flag to syscall.O_DIRECT to avoid trashing the page cache. |
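| // |
| // A hedged sketch of setting it before a backup (syscall.O_DIRECT is |
| // platform-specific; tx and dst are assumed to exist): |
| // |
| //	tx.WriteFlag = syscall.O_DIRECT |
| //	_, err := tx.WriteTo(dst) |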
| WriteFlag int |
| } |
| |
| // init initializes the transaction. |
| func (tx *Tx) init(db *DB) { |
| tx.db = db |
| tx.pages = nil |
| |
| // Copy the meta page since it can be changed by the writer. |
| tx.meta = &common.Meta{} |
| db.meta().Copy(tx.meta) |
| |
| // Copy over the root bucket. |
| tx.root = newBucket(tx) |
| tx.root.InBucket = &common.InBucket{} |
| *tx.root.InBucket = *(tx.meta.RootBucket()) |
| |
| // Increment the transaction id and add a page cache for writable transactions. |
| if tx.writable { |
| tx.pages = make(map[common.Pgid]*common.Page) |
| tx.meta.IncTxid() |
| } |
| } |
| |
| // ID returns the transaction id. |
| func (tx *Tx) ID() int { |
| if tx == nil || tx.meta == nil { |
| return -1 |
| } |
| return int(tx.meta.Txid()) |
| } |
| |
| // DB returns a reference to the database that created the transaction. |
| func (tx *Tx) DB() *DB { |
| return tx.db |
| } |
| |
| // Size returns current database size in bytes as seen by this transaction. |
| func (tx *Tx) Size() int64 { |
| return int64(tx.meta.Pgid()) * int64(tx.db.pageSize) |
| } |
| |
| // Writable returns whether the transaction can perform write operations. |
| func (tx *Tx) Writable() bool { |
| return tx.writable |
| } |
| |
| // Cursor creates a cursor associated with the root bucket. |
| // All items in the cursor will return a nil value because all root bucket keys point to buckets. |
| // The cursor is only valid as long as the transaction is open. |
| // Do not use a cursor after the transaction is closed. |
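| // |
| // For example, listing the top-level bucket names (a sketch; assumes the |
| // package is imported as bbolt and db is an open *bbolt.DB): |
| // |
| //	err := db.View(func(tx *bbolt.Tx) error { |
| //		c := tx.Cursor() |
| //		for k, _ := c.First(); k != nil; k, _ = c.Next() { |
| //			fmt.Printf("bucket: %s\n", k) |
| //		} |
| //		return nil |
| //	}) |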
| func (tx *Tx) Cursor() *Cursor { |
| return tx.root.Cursor() |
| } |
| |
| // Stats retrieves a copy of the current transaction statistics. |
| func (tx *Tx) Stats() TxStats { |
| return tx.stats |
| } |
| |
| // Inspect returns the structure of the database. |
| func (tx *Tx) Inspect() BucketStructure { |
| return tx.root.Inspect() |
| } |
| |
| // Bucket retrieves a bucket by name. |
| // Returns nil if the bucket does not exist. |
| // The bucket instance is only valid for the lifetime of the transaction. |
| func (tx *Tx) Bucket(name []byte) *Bucket { |
| return tx.root.Bucket(name) |
| } |
| |
| // CreateBucket creates a new bucket. |
| // Returns an error if the bucket already exists, if the bucket name is blank, or if the bucket name is too long. |
| // The bucket instance is only valid for the lifetime of the transaction. |
| func (tx *Tx) CreateBucket(name []byte) (*Bucket, error) { |
| return tx.root.CreateBucket(name) |
| } |
| |
| // CreateBucketIfNotExists creates a new bucket if it doesn't already exist. |
| // Returns an error if the bucket name is blank, or if the bucket name is too long. |
| // The bucket instance is only valid for the lifetime of the transaction. |
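| // |
| // Typical use inside a writable transaction (a sketch; bucket and key |
| // names are illustrative): |
| // |
| //	err := db.Update(func(tx *bbolt.Tx) error { |
| //		b, err := tx.CreateBucketIfNotExists([]byte("widgets")) |
| //		if err != nil { |
| //			return err |
| //		} |
| //		return b.Put([]byte("foo"), []byte("bar")) |
| //	}) |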
| func (tx *Tx) CreateBucketIfNotExists(name []byte) (*Bucket, error) { |
| return tx.root.CreateBucketIfNotExists(name) |
| } |
| |
| // DeleteBucket deletes a bucket. |
| // Returns an error if the bucket cannot be found or if the key represents a non-bucket value. |
| func (tx *Tx) DeleteBucket(name []byte) error { |
| return tx.root.DeleteBucket(name) |
| } |
| |
| // MoveBucket moves a sub-bucket from the source bucket to the destination bucket. |
| // Returns an error if |
| // 1. the sub-bucket cannot be found in the source bucket; |
| // 2. the key already exists in the destination bucket; or |
| // 3. the key represents a non-bucket value. |
| // |
| // If src is nil, it means moving a top-level bucket into the target bucket. |
| // If dst is nil, it means converting the child bucket into a top-level bucket. |
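| // |
| // For example, promoting a nested bucket to a top-level bucket (a sketch; |
| // bucket names are illustrative): |
| // |
| //	err := db.Update(func(tx *bbolt.Tx) error { |
| //		src := tx.Bucket([]byte("parent")) |
| //		return tx.MoveBucket([]byte("child"), src, nil) |
| //	}) |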
| func (tx *Tx) MoveBucket(child []byte, src *Bucket, dst *Bucket) error { |
| if src == nil { |
| src = &tx.root |
| } |
| if dst == nil { |
| dst = &tx.root |
| } |
| return src.MoveBucket(child, dst) |
| } |
| |
| // ForEach executes a function for each bucket in the root. |
| // If the provided function returns an error then the iteration is stopped and |
| // the error is returned to the caller. |
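| // |
| // For example, printing every top-level bucket name (a sketch): |
| // |
| //	err := db.View(func(tx *bbolt.Tx) error { |
| //		return tx.ForEach(func(name []byte, b *bbolt.Bucket) error { |
| //			fmt.Printf("bucket: %s\n", name) |
| //			return nil |
| //		}) |
| //	}) |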
| func (tx *Tx) ForEach(fn func(name []byte, b *Bucket) error) error { |
| return tx.root.ForEach(func(k, v []byte) error { |
| return fn(k, tx.root.Bucket(k)) |
| }) |
| } |
| |
| // OnCommit adds a handler function to be executed after the transaction successfully commits. |
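| // |
| // For example, invalidating an application-level cache only once the data |
| // is durable (a sketch; tx is a writable transaction and cache is assumed |
| // to exist): |
| // |
| //	tx.OnCommit(func() { |
| //		cache.Invalidate() |
| //	}) |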
| func (tx *Tx) OnCommit(fn func()) { |
| tx.commitHandlers = append(tx.commitHandlers, fn) |
| } |
| |
| // Commit writes all changes to disk, updates the meta page and closes the transaction. |
| // Returns an error if a disk write error occurs, or if Commit is |
| // called on a read-only transaction. |
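| // |
| // A manually managed read/write transaction must pair Begin with Commit or |
| // Rollback (a sketch following the usual pattern; the bucket name is |
| // illustrative): |
| // |
| //	tx, err := db.Begin(true) |
| //	if err != nil { |
| //		return err |
| //	} |
| //	defer tx.Rollback() |
| //	if _, err := tx.CreateBucket([]byte("widgets")); err != nil { |
| //		return err |
| //	} |
| //	return tx.Commit() |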
| func (tx *Tx) Commit() (err error) { |
| txId := tx.ID() |
| lg := tx.db.Logger() |
| if lg != discardLogger { |
| lg.Debugf("Committing transaction %d", txId) |
| defer func() { |
| if err != nil { |
| lg.Errorf("Committing transaction failed: %v", err) |
| } else { |
| lg.Debugf("Committing transaction %d successfully", txId) |
| } |
| }() |
| } |
| |
| common.Assert(!tx.managed, "managed tx commit not allowed") |
| if tx.db == nil { |
| return berrors.ErrTxClosed |
| } else if !tx.writable { |
| return berrors.ErrTxNotWritable |
| } |
| |
| // TODO(benbjohnson): Use vectorized I/O to write out dirty pages. |
| |
| // Rebalance nodes which have had deletions. |
| var startTime = time.Now() |
| tx.root.rebalance() |
| if tx.stats.GetRebalance() > 0 { |
| tx.stats.IncRebalanceTime(time.Since(startTime)) |
| } |
| |
| opgid := tx.meta.Pgid() |
| |
| // Spill data onto dirty pages. |
| startTime = time.Now() |
| if err = tx.root.spill(); err != nil { |
| lg.Errorf("spilling data onto dirty pages failed: %v", err) |
| tx.rollback() |
| return err |
| } |
| tx.stats.IncSpillTime(time.Since(startTime)) |
| |
| // Update the meta's root bucket to reference the (possibly relocated) root page. |
| tx.meta.RootBucket().SetRootPage(tx.root.RootPage()) |
| |
| // Free the old freelist because commit writes out a fresh freelist. |
| if tx.meta.Freelist() != common.PgidNoFreelist { |
| tx.db.freelist.Free(tx.meta.Txid(), tx.db.page(tx.meta.Freelist())) |
| } |
| |
| if !tx.db.NoFreelistSync { |
| err = tx.commitFreelist() |
| if err != nil { |
| lg.Errorf("committing freelist failed: %v", err) |
| return err |
| } |
| } else { |
| tx.meta.SetFreelist(common.PgidNoFreelist) |
| } |
| |
| // If the high water mark has moved up then attempt to grow the database. |
| if tx.meta.Pgid() > opgid { |
| _ = errors.New("") |
| // gofail: var lackOfDiskSpace string |
| // tx.rollback() |
| // return errors.New(lackOfDiskSpace) |
| if err = tx.db.grow(int(tx.meta.Pgid()+1) * tx.db.pageSize); err != nil { |
| lg.Errorf("growing db size failed, pgid: %d, pagesize: %d, error: %v", tx.meta.Pgid(), tx.db.pageSize, err) |
| tx.rollback() |
| return err |
| } |
| } |
| |
| // Write dirty pages to disk. |
| startTime = time.Now() |
| if err = tx.write(); err != nil { |
| lg.Errorf("writing data failed: %v", err) |
| tx.rollback() |
| return err |
| } |
| |
| // If strict mode is enabled then perform a consistency check. |
| if tx.db.StrictMode { |
| ch := tx.Check() |
| var errs []string |
| for chkErr := range ch { |
| errs = append(errs, chkErr.Error()) |
| } |
| if len(errs) > 0 { |
| panic("check fail: " + strings.Join(errs, "\n")) |
| } |
| } |
| |
| // Write meta to disk. |
| if err = tx.writeMeta(); err != nil { |
| lg.Errorf("writeMeta failed: %v", err) |
| tx.rollback() |
| return err |
| } |
| tx.stats.IncWriteTime(time.Since(startTime)) |
| |
| // Finalize the transaction. |
| tx.close() |
| |
| // Execute commit handlers now that the locks have been removed. |
| for _, fn := range tx.commitHandlers { |
| fn() |
| } |
| |
| return nil |
| } |
| |
| func (tx *Tx) commitFreelist() error { |
| // Allocate new pages for the new freelist. This may overestimate the |
| // number of pages needed, but it will never underestimate (which would be bad). |
| p, err := tx.allocate((tx.db.freelist.EstimatedWritePageSize() / tx.db.pageSize) + 1) |
| if err != nil { |
| tx.rollback() |
| return err |
| } |
| |
| tx.db.freelist.Write(p) |
| tx.meta.SetFreelist(p.Id()) |
| |
| return nil |
| } |
| |
| // Rollback closes the transaction and ignores all previous updates. Read-only |
| // transactions must be rolled back and not committed. |
| func (tx *Tx) Rollback() error { |
| common.Assert(!tx.managed, "managed tx rollback not allowed") |
| if tx.db == nil { |
| return berrors.ErrTxClosed |
| } |
| tx.nonPhysicalRollback() |
| return nil |
| } |
| |
| // nonPhysicalRollback is called when the user calls Rollback directly. In this case we do not need to reload the free pages from disk. |
| func (tx *Tx) nonPhysicalRollback() { |
| if tx.db == nil { |
| return |
| } |
| if tx.writable { |
| tx.db.freelist.Rollback(tx.meta.Txid()) |
| } |
| tx.close() |
| } |
| |
| // rollback reloads the free pages from disk, which is needed when a system error, such as an fsync failure, occurs during commit. |
| func (tx *Tx) rollback() { |
| if tx.db == nil { |
| return |
| } |
| if tx.writable { |
| tx.db.freelist.Rollback(tx.meta.Txid()) |
| // When mmap fails, the `data`, `dataref` and `datasz` may be reset to |
| // zero values, and there is no way to reload free page IDs in this case. |
| if tx.db.data != nil { |
| if !tx.db.hasSyncedFreelist() { |
| // Reconstruct the free page list by scanning the whole DB. |
| // Note: scanning the whole DB is expensive for large databases when NoFreelistSync is enabled. |
| tx.db.freelist.NoSyncReload(tx.db.freepages()) |
| } else { |
| // Read free page list from freelist page. |
| tx.db.freelist.Reload(tx.db.page(tx.db.meta().Freelist())) |
| } |
| } |
| } |
| tx.close() |
| } |
| |
| func (tx *Tx) close() { |
| if tx.db == nil { |
| return |
| } |
| if tx.writable { |
| // Grab freelist stats. |
| var freelistFreeN = tx.db.freelist.FreeCount() |
| var freelistPendingN = tx.db.freelist.PendingCount() |
| var freelistAlloc = tx.db.freelist.EstimatedWritePageSize() |
| |
| // Remove transaction ref & writer lock. |
| tx.db.rwtx = nil |
| tx.db.rwlock.Unlock() |
| |
| // Merge statistics. |
| tx.db.statlock.Lock() |
| tx.db.stats.FreePageN = freelistFreeN |
| tx.db.stats.PendingPageN = freelistPendingN |
| tx.db.stats.FreeAlloc = (freelistFreeN + freelistPendingN) * tx.db.pageSize |
| tx.db.stats.FreelistInuse = freelistAlloc |
| tx.db.stats.TxStats.add(&tx.stats) |
| tx.db.statlock.Unlock() |
| } else { |
| tx.db.removeTx(tx) |
| } |
| |
| // Clear all references. |
| tx.db = nil |
| tx.meta = nil |
| tx.root = Bucket{tx: tx} |
| tx.pages = nil |
| } |
| |
| // Copy writes the entire database to a writer. |
| // This function exists for backwards compatibility. |
| // |
| // Deprecated: Use WriteTo() instead. |
| func (tx *Tx) Copy(w io.Writer) error { |
| _, err := tx.WriteTo(w) |
| return err |
| } |
| |
| // WriteTo writes the entire database to a writer. |
| // If err == nil then exactly tx.Size() bytes will be written into the writer. |
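| // |
| // A common use is streaming a consistent backup from within a read |
| // transaction (a sketch; w is any io.Writer, e.g. a file or HTTP response): |
| // |
| //	err := db.View(func(tx *bbolt.Tx) error { |
| //		_, err := tx.WriteTo(w) |
| //		return err |
| //	}) |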
| func (tx *Tx) WriteTo(w io.Writer) (n int64, err error) { |
| var f *os.File |
| // There is a risk that between the time a read-only transaction |
| // is created and the time the file is actually opened, the |
| // underlying db file at tx.db.path may have been replaced |
| // (e.g. via rename). In that case, opening the file again would |
| // unexpectedly point to a different file, rather than the one |
| // the transaction was based on. |
| // |
| // To overcome this, we reuse the already opened file handle when the |
| // WriteFlag is not set. When the WriteFlag is set, we reopen the file |
| // but verify that it still refers to the same underlying file |
| // (by device and inode). If it does not, we fall back to |
| // reusing the existing, already opened file handle. |
| if tx.WriteFlag != 0 { |
| // Attempt to open reader with WriteFlag |
| f, err = tx.db.openFile(tx.db.path, os.O_RDONLY|tx.WriteFlag, 0) |
| if err != nil { |
| return 0, err |
| } |
| |
| if ok, err := sameFile(tx.db.file, f); !ok { |
| lg := tx.db.Logger() |
| if cerr := f.Close(); cerr != nil { |
| lg.Errorf("failed to close the file (%s): %v", tx.db.path, cerr) |
| } |
| lg.Warningf("The underlying file has changed, so reuse the already opened file (%s): %v", tx.db.path, err) |
| f = tx.db.file |
| } else { |
| defer func() { |
| if cerr := f.Close(); err == nil { |
| err = cerr |
| } |
| }() |
| } |
| } else { |
| f = tx.db.file |
| } |
| |
| // Generate a meta page. We use the same page data for both meta pages. |
| buf := make([]byte, tx.db.pageSize) |
| page := (*common.Page)(unsafe.Pointer(&buf[0])) |
| page.SetFlags(common.MetaPageFlag) |
| *page.Meta() = *tx.meta |
| |
| // Write meta 0. |
| page.SetId(0) |
| page.Meta().SetChecksum(page.Meta().Sum64()) |
| nn, err := w.Write(buf) |
| n += int64(nn) |
| if err != nil { |
| return n, fmt.Errorf("meta 0 copy: %s", err) |
| } |
| |
| // Write meta 1 with a lower transaction id. |
| page.SetId(1) |
| page.Meta().DecTxid() |
| page.Meta().SetChecksum(page.Meta().Sum64()) |
| nn, err = w.Write(buf) |
| n += int64(nn) |
| if err != nil { |
| return n, fmt.Errorf("meta 1 copy: %s", err) |
| } |
| |
| // Copy data pages using a SectionReader to avoid affecting f's offset. |
| dataOffset := int64(tx.db.pageSize * 2) |
| dataSize := tx.Size() - dataOffset |
| sr := io.NewSectionReader(f, dataOffset, dataSize) |
| |
| // Copy data pages. |
| wn, err := io.CopyN(w, sr, dataSize) |
| n += wn |
| if err != nil { |
| return n, err |
| } |
| |
| return n, nil |
| } |
| |
| func sameFile(f1, f2 *os.File) (bool, error) { |
| fi1, err := f1.Stat() |
| if err != nil { |
| return false, fmt.Errorf("failed to get fileInfo of the first file (%s): %w", f1.Name(), err) |
| } |
| fi2, err := f2.Stat() |
| if err != nil { |
| return false, fmt.Errorf("failed to get fileInfo of the second file (%s): %w", f2.Name(), err) |
| } |
| |
| return os.SameFile(fi1, fi2), nil |
| } |
| |
| // CopyFile copies the entire database to file at the given path. |
| // A reader transaction is maintained during the copy so it is safe to continue |
| // using the database while a copy is in progress. |
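| // |
| // For example (a sketch; the destination path and mode are illustrative): |
| // |
| //	err := db.View(func(tx *bbolt.Tx) error { |
| //		return tx.CopyFile("/path/to/backup.db", 0600) |
| //	}) |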
| func (tx *Tx) CopyFile(path string, mode os.FileMode) error { |
| f, err := tx.db.openFile(path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, mode) |
| if err != nil { |
| return err |
| } |
| |
| _, err = tx.WriteTo(f) |
| if err != nil { |
| _ = f.Close() |
| return err |
| } |
| return f.Close() |
| } |
| |
| // allocate returns a contiguous block of memory starting at a given page. |
| func (tx *Tx) allocate(count int) (*common.Page, error) { |
| lg := tx.db.Logger() |
| p, err := tx.db.allocate(tx.meta.Txid(), count) |
| if err != nil { |
| lg.Errorf("allocating failed, txid: %d, count: %d, error: %v", tx.meta.Txid(), count, err) |
| return nil, err |
| } |
| |
| // Save to our page cache. |
| tx.pages[p.Id()] = p |
| |
| // Update statistics. |
| tx.stats.IncPageCount(int64(count)) |
| tx.stats.IncPageAlloc(int64(count * tx.db.pageSize)) |
| |
| return p, nil |
| } |
| |
| // write writes any dirty pages to disk. |
| func (tx *Tx) write() error { |
| // Sort pages by id. |
| lg := tx.db.Logger() |
| pages := make(common.Pages, 0, len(tx.pages)) |
| for _, p := range tx.pages { |
| pages = append(pages, p) |
| } |
| // Clear out page cache early. |
| tx.pages = make(map[common.Pgid]*common.Page) |
| sort.Sort(pages) |
| |
| // Write pages to disk in order. |
| for _, p := range pages { |
| rem := (uint64(p.Overflow()) + 1) * uint64(tx.db.pageSize) |
| offset := int64(p.Id()) * int64(tx.db.pageSize) |
| var written uintptr |
| |
| // Write out page in "max allocation" sized chunks. |
| for { |
| sz := rem |
| if sz > common.MaxAllocSize-1 { |
| sz = common.MaxAllocSize - 1 |
| } |
| buf := common.UnsafeByteSlice(unsafe.Pointer(p), written, 0, int(sz)) |
| |
| if _, err := tx.db.ops.writeAt(buf, offset); err != nil { |
| lg.Errorf("writeAt failed, offset: %d: %w", offset, err) |
| return err |
| } |
| |
| // Update statistics. |
| tx.stats.IncWrite(1) |
| |
| // Exit inner for loop if we've written all the chunks. |
| rem -= sz |
| if rem == 0 { |
| break |
| } |
| |
| // Otherwise move offset forward and move pointer to next chunk. |
| offset += int64(sz) |
| written += uintptr(sz) |
| } |
| } |
| |
| // Skip the file sync if the NoSync flag is set on the DB, unless the platform forces syncing (IgnoreNoSync). |
| if !tx.db.NoSync || common.IgnoreNoSync { |
| // gofail: var beforeSyncDataPages struct{} |
| if err := fdatasync(tx.db); err != nil { |
| lg.Errorf("[GOOS: %s, GOARCH: %s] fdatasync failed: %w", runtime.GOOS, runtime.GOARCH, err) |
| return err |
| } |
| } |
| |
| // Put small pages back into the page pool. |
| for _, p := range pages { |
| // Skip pages that span more than one page (i.e. have overflow). |
| // These are allocated using make() instead of the page pool. |
| if int(p.Overflow()) != 0 { |
| continue |
| } |
| |
| buf := common.UnsafeByteSlice(unsafe.Pointer(p), 0, 0, tx.db.pageSize) |
| |
| // See https://go.googlesource.com/go/+/f03c9202c43e0abb130669852082117ca50aa9b1 |
| for i := range buf { |
| buf[i] = 0 |
| } |
| tx.db.pagePool.Put(buf) //nolint:staticcheck |
| } |
| |
| return nil |
| } |
| |
| // writeMeta writes the meta page to disk. |
| func (tx *Tx) writeMeta() error { |
| // gofail: var beforeWriteMetaError string |
| // return errors.New(beforeWriteMetaError) |
| |
| // Create a temporary buffer for the meta page. |
| lg := tx.db.Logger() |
| buf := make([]byte, tx.db.pageSize) |
| p := tx.db.pageInBuffer(buf, 0) |
| tx.meta.Write(p) |
| |
| // Write the meta page to file. |
| tx.db.metalock.Lock() |
| if _, err := tx.db.ops.writeAt(buf, int64(p.Id())*int64(tx.db.pageSize)); err != nil { |
| tx.db.metalock.Unlock() |
| lg.Errorf("writeAt failed, pgid: %d, pageSize: %d, error: %v", p.Id(), tx.db.pageSize, err) |
| return err |
| } |
| tx.db.metalock.Unlock() |
| if !tx.db.NoSync || common.IgnoreNoSync { |
| // gofail: var beforeSyncMetaPage struct{} |
| if err := fdatasync(tx.db); err != nil { |
| lg.Errorf("[GOOS: %s, GOARCH: %s] fdatasync failed: %w", runtime.GOOS, runtime.GOARCH, err) |
| return err |
| } |
| } |
| |
| // Update statistics. |
| tx.stats.IncWrite(1) |
| |
| return nil |
| } |
| |
| // page returns a reference to the page with a given id. |
| // If the page has been written to, then a temporary buffered page is returned. |
| func (tx *Tx) page(id common.Pgid) *common.Page { |
| // Check the dirty pages first. |
| if tx.pages != nil { |
| if p, ok := tx.pages[id]; ok { |
| p.FastCheck(id) |
| return p |
| } |
| } |
| |
| // Otherwise return directly from the mmap. |
| p := tx.db.page(id) |
| p.FastCheck(id) |
| return p |
| } |
| |
| // forEachPage iterates over the page with the given id and all of its descendant pages, executing a function for each. |
| func (tx *Tx) forEachPage(pgidnum common.Pgid, fn func(*common.Page, int, []common.Pgid)) { |
| stack := make([]common.Pgid, 10) |
| stack[0] = pgidnum |
| tx.forEachPageInternal(stack[:1], fn) |
| } |
| |
| func (tx *Tx) forEachPageInternal(pgidstack []common.Pgid, fn func(*common.Page, int, []common.Pgid)) { |
| p := tx.page(pgidstack[len(pgidstack)-1]) |
| |
| // Execute function. |
| fn(p, len(pgidstack)-1, pgidstack) |
| |
| // Recursively loop over children. |
| if p.IsBranchPage() { |
| for i := 0; i < int(p.Count()); i++ { |
| elem := p.BranchPageElement(uint16(i)) |
| tx.forEachPageInternal(append(pgidstack, elem.Pgid()), fn) |
| } |
| } |
| } |
| |
| // Page returns page information for a given page number. |
| // This is only safe for concurrent use when used by a writable transaction. |
| func (tx *Tx) Page(id int) (*common.PageInfo, error) { |
| if tx.db == nil { |
| return nil, berrors.ErrTxClosed |
| } else if common.Pgid(id) >= tx.meta.Pgid() { |
| return nil, nil |
| } |
| |
| if tx.db.freelist == nil { |
| return nil, berrors.ErrFreePagesNotLoaded |
| } |
| |
| // Build the page info. |
| p := tx.db.page(common.Pgid(id)) |
| info := &common.PageInfo{ |
| ID: id, |
| Count: int(p.Count()), |
| OverflowCount: int(p.Overflow()), |
| } |
| |
| // Determine the type (or if it's free). |
| if tx.db.freelist.Freed(common.Pgid(id)) { |
| info.Type = "free" |
| } else { |
| info.Type = p.Typ() |
| } |
| |
| return info, nil |
| } |
| |
| // TxStats represents statistics about the actions performed by the transaction. |
| type TxStats struct { |
| // Page statistics. |
| // |
| // Deprecated: Use GetPageCount() or IncPageCount() |
| PageCount int64 // number of page allocations |
| // Deprecated: Use GetPageAlloc() or IncPageAlloc() |
| PageAlloc int64 // total bytes allocated |
| |
| // Cursor statistics. |
| // |
| // Deprecated: Use GetCursorCount() or IncCursorCount() |
| CursorCount int64 // number of cursors created |
| |
| // Node statistics. |
| // |
| // Deprecated: Use GetNodeCount() or IncNodeCount() |
| NodeCount int64 // number of node allocations |
| // Deprecated: Use GetNodeDeref() or IncNodeDeref() |
| NodeDeref int64 // number of node dereferences |
| |
| // Rebalance statistics. |
| // |
| // Deprecated: Use GetRebalance() or IncRebalance() |
| Rebalance int64 // number of node rebalances |
| // Deprecated: Use GetRebalanceTime() or IncRebalanceTime() |
| RebalanceTime time.Duration // total time spent rebalancing |
| |
| // Split/Spill statistics. |
| // |
| // Deprecated: Use GetSplit() or IncSplit() |
| Split int64 // number of nodes split |
| // Deprecated: Use GetSpill() or IncSpill() |
| Spill int64 // number of nodes spilled |
| // Deprecated: Use GetSpillTime() or IncSpillTime() |
| SpillTime time.Duration // total time spent spilling |
| |
| // Write statistics. |
| // |
| // Deprecated: Use GetWrite() or IncWrite() |
| Write int64 // number of writes performed |
| // Deprecated: Use GetWriteTime() or IncWriteTime() |
| WriteTime time.Duration // total time spent writing to disk |
| } |
| |
| func (s *TxStats) add(other *TxStats) { |
| s.IncPageCount(other.GetPageCount()) |
| s.IncPageAlloc(other.GetPageAlloc()) |
| s.IncCursorCount(other.GetCursorCount()) |
| s.IncNodeCount(other.GetNodeCount()) |
| s.IncNodeDeref(other.GetNodeDeref()) |
| s.IncRebalance(other.GetRebalance()) |
| s.IncRebalanceTime(other.GetRebalanceTime()) |
| s.IncSplit(other.GetSplit()) |
| s.IncSpill(other.GetSpill()) |
| s.IncSpillTime(other.GetSpillTime()) |
| s.IncWrite(other.GetWrite()) |
| s.IncWriteTime(other.GetWriteTime()) |
| } |
| |
| // Sub calculates and returns the difference between two sets of transaction stats. |
| // This is useful when obtaining stats at two different points in time and |
| // you need the performance counters that occurred within that time span. |
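| // |
| // For example, measuring the writes performed during an interval (a sketch; |
| // db is an open *DB): |
| // |
| //	prev := db.Stats().TxStats |
| //	// ... run some transactions ... |
| //	curr := db.Stats().TxStats |
| //	diff := curr.Sub(&prev) |
| //	fmt.Println("writes during interval:", diff.GetWrite()) |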
| func (s *TxStats) Sub(other *TxStats) TxStats { |
| var diff TxStats |
| diff.PageCount = s.GetPageCount() - other.GetPageCount() |
| diff.PageAlloc = s.GetPageAlloc() - other.GetPageAlloc() |
| diff.CursorCount = s.GetCursorCount() - other.GetCursorCount() |
| diff.NodeCount = s.GetNodeCount() - other.GetNodeCount() |
| diff.NodeDeref = s.GetNodeDeref() - other.GetNodeDeref() |
| diff.Rebalance = s.GetRebalance() - other.GetRebalance() |
| diff.RebalanceTime = s.GetRebalanceTime() - other.GetRebalanceTime() |
| diff.Split = s.GetSplit() - other.GetSplit() |
| diff.Spill = s.GetSpill() - other.GetSpill() |
| diff.SpillTime = s.GetSpillTime() - other.GetSpillTime() |
| diff.Write = s.GetWrite() - other.GetWrite() |
| diff.WriteTime = s.GetWriteTime() - other.GetWriteTime() |
| return diff |
| } |
| |
| // GetPageCount returns PageCount atomically. |
| func (s *TxStats) GetPageCount() int64 { |
| return atomic.LoadInt64(&s.PageCount) |
| } |
| |
| // IncPageCount increases PageCount atomically and returns the new value. |
| func (s *TxStats) IncPageCount(delta int64) int64 { |
| return atomic.AddInt64(&s.PageCount, delta) |
| } |
| |
| // GetPageAlloc returns PageAlloc atomically. |
| func (s *TxStats) GetPageAlloc() int64 { |
| return atomic.LoadInt64(&s.PageAlloc) |
| } |
| |
| // IncPageAlloc increases PageAlloc atomically and returns the new value. |
| func (s *TxStats) IncPageAlloc(delta int64) int64 { |
| return atomic.AddInt64(&s.PageAlloc, delta) |
| } |
| |
| // GetCursorCount returns CursorCount atomically. |
| func (s *TxStats) GetCursorCount() int64 { |
| return atomic.LoadInt64(&s.CursorCount) |
| } |
| |
| // IncCursorCount increases CursorCount atomically and returns the new value. |
| func (s *TxStats) IncCursorCount(delta int64) int64 { |
| return atomic.AddInt64(&s.CursorCount, delta) |
| } |
| |
| // GetNodeCount returns NodeCount atomically. |
| func (s *TxStats) GetNodeCount() int64 { |
| return atomic.LoadInt64(&s.NodeCount) |
| } |
| |
| // IncNodeCount increases NodeCount atomically and returns the new value. |
| func (s *TxStats) IncNodeCount(delta int64) int64 { |
| return atomic.AddInt64(&s.NodeCount, delta) |
| } |
| |
| // GetNodeDeref returns NodeDeref atomically. |
| func (s *TxStats) GetNodeDeref() int64 { |
| return atomic.LoadInt64(&s.NodeDeref) |
| } |
| |
| // IncNodeDeref increases NodeDeref atomically and returns the new value. |
| func (s *TxStats) IncNodeDeref(delta int64) int64 { |
| return atomic.AddInt64(&s.NodeDeref, delta) |
| } |
| |
| // GetRebalance returns Rebalance atomically. |
| func (s *TxStats) GetRebalance() int64 { |
| return atomic.LoadInt64(&s.Rebalance) |
| } |
| |
| // IncRebalance increases Rebalance atomically and returns the new value. |
| func (s *TxStats) IncRebalance(delta int64) int64 { |
| return atomic.AddInt64(&s.Rebalance, delta) |
| } |
| |
| // GetRebalanceTime returns RebalanceTime atomically. |
| func (s *TxStats) GetRebalanceTime() time.Duration { |
| return atomicLoadDuration(&s.RebalanceTime) |
| } |
| |
| // IncRebalanceTime increases RebalanceTime atomically and returns the new value. |
| func (s *TxStats) IncRebalanceTime(delta time.Duration) time.Duration { |
| return atomicAddDuration(&s.RebalanceTime, delta) |
| } |
| |
| // GetSplit returns Split atomically. |
| func (s *TxStats) GetSplit() int64 { |
| return atomic.LoadInt64(&s.Split) |
| } |
| |
| // IncSplit increases Split atomically and returns the new value. |
| func (s *TxStats) IncSplit(delta int64) int64 { |
| return atomic.AddInt64(&s.Split, delta) |
| } |
| |
| // GetSpill returns Spill atomically. |
| func (s *TxStats) GetSpill() int64 { |
| return atomic.LoadInt64(&s.Spill) |
| } |
| |
| // IncSpill increases Spill atomically and returns the new value. |
| func (s *TxStats) IncSpill(delta int64) int64 { |
| return atomic.AddInt64(&s.Spill, delta) |
| } |
| |
| // GetSpillTime returns SpillTime atomically. |
| func (s *TxStats) GetSpillTime() time.Duration { |
| return atomicLoadDuration(&s.SpillTime) |
| } |
| |
| // IncSpillTime increases SpillTime atomically and returns the new value. |
| func (s *TxStats) IncSpillTime(delta time.Duration) time.Duration { |
| return atomicAddDuration(&s.SpillTime, delta) |
| } |
| |
| // GetWrite returns Write atomically. |
| func (s *TxStats) GetWrite() int64 { |
| return atomic.LoadInt64(&s.Write) |
| } |
| |
| // IncWrite increases Write atomically and returns the new value. |
| func (s *TxStats) IncWrite(delta int64) int64 { |
| return atomic.AddInt64(&s.Write, delta) |
| } |
| |
| // GetWriteTime returns WriteTime atomically. |
| func (s *TxStats) GetWriteTime() time.Duration { |
| return atomicLoadDuration(&s.WriteTime) |
| } |
| |
| // IncWriteTime increases WriteTime atomically and returns the new value. |
| func (s *TxStats) IncWriteTime(delta time.Duration) time.Duration { |
| return atomicAddDuration(&s.WriteTime, delta) |
| } |
| |
| func atomicAddDuration(ptr *time.Duration, du time.Duration) time.Duration { |
| return time.Duration(atomic.AddInt64((*int64)(unsafe.Pointer(ptr)), int64(du))) |
| } |
| |
| func atomicLoadDuration(ptr *time.Duration) time.Duration { |
| return time.Duration(atomic.LoadInt64((*int64)(unsafe.Pointer(ptr)))) |
| } |