| Abhay Kumar | 40252eb | 2025-10-13 13:25:53 +0000 | [diff] [blame^] | 1 | package bbolt |
| 2 | |
| 3 | import ( |
| 4 | "errors" |
| 5 | "fmt" |
| 6 | "io" |
| 7 | "os" |
| 8 | "runtime" |
| 9 | "sort" |
| 10 | "strings" |
| 11 | "sync/atomic" |
| 12 | "time" |
| 13 | "unsafe" |
| 14 | |
| 15 | berrors "go.etcd.io/bbolt/errors" |
| 16 | "go.etcd.io/bbolt/internal/common" |
| 17 | ) |
| 18 | |
// Tx represents a read-only or read/write transaction on the database.
// Read-only transactions can be used for retrieving values for keys and creating cursors.
// Read/write transactions can create and remove buckets and create and remove keys.
//
// IMPORTANT: You must commit or rollback transactions when you are done with
// them. Pages can not be reclaimed by the writer until no more transactions
// are using them. A long running read transaction can cause the database to
// quickly grow.
type Tx struct {
	writable bool // true for read/write transactions, false for read-only ones
	managed  bool // set when the DB drives the tx lifecycle; manual Commit/Rollback is then asserted against
	db       *DB  // owning database; nil once the tx has been closed
	meta     *common.Meta // private copy of the meta page, isolating this tx from the writer
	root     Bucket       // root bucket snapshot; all other buckets are reached through it
	pages    map[common.Pgid]*common.Page // dirty-page cache; non-nil only for writable transactions
	stats    TxStats // per-transaction statistics, merged into DB stats on close
	commitHandlers []func() // callbacks run after a successful commit (see OnCommit)

	// WriteFlag specifies the flag for write-related methods like WriteTo().
	// Tx opens the database file with the specified flag to copy the data.
	//
	// By default, the flag is unset, which works well for mostly in-memory
	// workloads. For databases that are much larger than available RAM,
	// set the flag to syscall.O_DIRECT to avoid trashing the page cache.
	WriteFlag int
}
| 45 | |
// init initializes the transaction against db. It takes a private snapshot
// of the meta page and the root bucket so the transaction is isolated from
// concurrent writer changes; writable transactions additionally get a fresh
// dirty-page cache and a new transaction id.
func (tx *Tx) init(db *DB) {
	tx.db = db
	tx.pages = nil

	// Copy the meta page since it can be changed by the writer.
	tx.meta = &common.Meta{}
	db.meta().Copy(tx.meta)

	// Copy over the root bucket. The root must be copied after the meta
	// snapshot because it is read out of tx.meta.
	tx.root = newBucket(tx)
	tx.root.InBucket = &common.InBucket{}
	*tx.root.InBucket = *(tx.meta.RootBucket())

	// Increment the transaction id and add a page cache for writable transactions.
	if tx.writable {
		tx.pages = make(map[common.Pgid]*common.Page)
		tx.meta.IncTxid()
	}
}
| 66 | |
| 67 | // ID returns the transaction id. |
| 68 | func (tx *Tx) ID() int { |
| 69 | if tx == nil || tx.meta == nil { |
| 70 | return -1 |
| 71 | } |
| 72 | return int(tx.meta.Txid()) |
| 73 | } |
| 74 | |
// DB returns a reference to the database that created the transaction.
// It returns nil once the transaction has been committed or rolled back.
func (tx *Tx) DB() *DB {
	return tx.db
}
| 79 | |
// Size returns current database size in bytes as seen by this transaction:
// the meta snapshot's high-water-mark page id multiplied by the page size.
func (tx *Tx) Size() int64 {
	return int64(tx.meta.Pgid()) * int64(tx.db.pageSize)
}
| 84 | |
// Writable returns whether the transaction can perform write operations.
// It is false for read-only transactions.
func (tx *Tx) Writable() bool {
	return tx.writable
}
| 89 | |
// Cursor creates a cursor associated with the root bucket.
// All items in the cursor will return a nil value because all root bucket keys point to buckets.
// The cursor is only valid as long as the transaction is open.
// Do not use a cursor after the transaction is closed.
func (tx *Tx) Cursor() *Cursor {
	return tx.root.Cursor()
}
| 97 | |
// Stats retrieves a copy of the current transaction statistics.
// The copy can later be diffed against another snapshot via TxStats.Sub.
func (tx *Tx) Stats() TxStats {
	return tx.stats
}
| 102 | |
// Inspect returns the structure of the database, starting from the root bucket.
func (tx *Tx) Inspect() BucketStructure {
	return tx.root.Inspect()
}
| 107 | |
// Bucket retrieves a top-level bucket by name (a wrapper around the root bucket).
// Returns nil if the bucket does not exist.
// The bucket instance is only valid for the lifetime of the transaction.
func (tx *Tx) Bucket(name []byte) *Bucket {
	return tx.root.Bucket(name)
}
| 114 | |
// CreateBucket creates a new top-level bucket (a wrapper around the root bucket).
// Returns an error if the bucket already exists, if the bucket name is blank, or if the bucket name is too long.
// The bucket instance is only valid for the lifetime of the transaction.
func (tx *Tx) CreateBucket(name []byte) (*Bucket, error) {
	return tx.root.CreateBucket(name)
}
| 121 | |
// CreateBucketIfNotExists creates a new top-level bucket if it doesn't already exist.
// Returns an error if the bucket name is blank, or if the bucket name is too long.
// The bucket instance is only valid for the lifetime of the transaction.
func (tx *Tx) CreateBucketIfNotExists(name []byte) (*Bucket, error) {
	return tx.root.CreateBucketIfNotExists(name)
}
| 128 | |
// DeleteBucket deletes a top-level bucket (a wrapper around the root bucket).
// Returns an error if the bucket cannot be found or if the key represents a non-bucket value.
func (tx *Tx) DeleteBucket(name []byte) error {
	return tx.root.DeleteBucket(name)
}
| 134 | |
| 135 | // MoveBucket moves a sub-bucket from the source bucket to the destination bucket. |
| 136 | // Returns an error if |
| 137 | // 1. the sub-bucket cannot be found in the source bucket; |
| 138 | // 2. or the key already exists in the destination bucket; |
| 139 | // 3. the key represents a non-bucket value. |
| 140 | // |
| 141 | // If src is nil, it means moving a top level bucket into the target bucket. |
| 142 | // If dst is nil, it means converting the child bucket into a top level bucket. |
| 143 | func (tx *Tx) MoveBucket(child []byte, src *Bucket, dst *Bucket) error { |
| 144 | if src == nil { |
| 145 | src = &tx.root |
| 146 | } |
| 147 | if dst == nil { |
| 148 | dst = &tx.root |
| 149 | } |
| 150 | return src.MoveBucket(child, dst) |
| 151 | } |
| 152 | |
| 153 | // ForEach executes a function for each bucket in the root. |
| 154 | // If the provided function returns an error then the iteration is stopped and |
| 155 | // the error is returned to the caller. |
| 156 | func (tx *Tx) ForEach(fn func(name []byte, b *Bucket) error) error { |
| 157 | return tx.root.ForEach(func(k, v []byte) error { |
| 158 | return fn(k, tx.root.Bucket(k)) |
| 159 | }) |
| 160 | } |
| 161 | |
// OnCommit adds a handler function to be executed after the transaction successfully commits.
// Handlers run in registration order, after all locks have been released;
// they are never invoked when the transaction rolls back.
func (tx *Tx) OnCommit(fn func()) {
	tx.commitHandlers = append(tx.commitHandlers, fn)
}
| 166 | |
// Commit writes all changes to disk, updates the meta page and closes the transaction.
// Returns an error if a disk write error occurs, or if Commit is
// called on a read-only transaction.
//
// The commit sequence is: rebalance -> spill -> write freelist -> grow file
// -> write dirty pages -> (optional strict check) -> write meta -> close.
// Any failure after spill triggers rollback(), which reloads free pages.
func (tx *Tx) Commit() (err error) {
	txId := tx.ID()
	lg := tx.db.Logger()
	// Only pay for the deferred logging closure when logging is enabled.
	if lg != discardLogger {
		lg.Debugf("Committing transaction %d", txId)
		defer func() {
			if err != nil {
				lg.Errorf("Committing transaction failed: %v", err)
			} else {
				lg.Debugf("Committing transaction %d successfully", txId)
			}
		}()
	}

	common.Assert(!tx.managed, "managed tx commit not allowed")
	if tx.db == nil {
		return berrors.ErrTxClosed
	} else if !tx.writable {
		return berrors.ErrTxNotWritable
	}

	// TODO(benbjohnson): Use vectorized I/O to write out dirty pages.

	// Rebalance nodes which have had deletions.
	var startTime = time.Now()
	tx.root.rebalance()
	if tx.stats.GetRebalance() > 0 {
		tx.stats.IncRebalanceTime(time.Since(startTime))
	}

	// Remember the current high-water mark so we can detect file growth below.
	opgid := tx.meta.Pgid()

	// spill data onto dirty pages.
	startTime = time.Now()
	if err = tx.root.spill(); err != nil {
		lg.Errorf("spilling data onto dirty pages failed: %v", err)
		tx.rollback()
		return err
	}
	tx.stats.IncSpillTime(time.Since(startTime))

	// Free the old root bucket.
	tx.meta.RootBucket().SetRootPage(tx.root.RootPage())

	// Free the old freelist because commit writes out a fresh freelist.
	if tx.meta.Freelist() != common.PgidNoFreelist {
		tx.db.freelist.Free(tx.meta.Txid(), tx.db.page(tx.meta.Freelist()))
	}

	if !tx.db.NoFreelistSync {
		err = tx.commitFreelist()
		if err != nil {
			lg.Errorf("committing freelist failed: %v", err)
			return err
		}
	} else {
		// Mark the freelist as not persisted; it will be rebuilt on open.
		tx.meta.SetFreelist(common.PgidNoFreelist)
	}

	// If the high water mark has moved up then attempt to grow the database.
	if tx.meta.Pgid() > opgid {
		// The blank errors.New keeps the errors import referenced for the
		// gofail failpoint below when failpoints are not enabled.
		_ = errors.New("")
		// gofail: var lackOfDiskSpace string
		// tx.rollback()
		// return errors.New(lackOfDiskSpace)
		if err = tx.db.grow(int(tx.meta.Pgid()+1) * tx.db.pageSize); err != nil {
			lg.Errorf("growing db size failed, pgid: %d, pagesize: %d, error: %v", tx.meta.Pgid(), tx.db.pageSize, err)
			tx.rollback()
			return err
		}
	}

	// Write dirty pages to disk.
	startTime = time.Now()
	if err = tx.write(); err != nil {
		lg.Errorf("writing data failed: %v", err)
		tx.rollback()
		return err
	}

	// If strict mode is enabled then perform a consistency check.
	if tx.db.StrictMode {
		ch := tx.Check()
		var errs []string
		// Drain the check channel until it is closed, collecting all errors.
		for {
			chkErr, ok := <-ch
			if !ok {
				break
			}
			errs = append(errs, chkErr.Error())
		}
		if len(errs) > 0 {
			panic("check fail: " + strings.Join(errs, "\n"))
		}
	}

	// Write meta to disk. This is the point at which the commit becomes
	// durable: the new meta page makes the new tree roots visible.
	if err = tx.writeMeta(); err != nil {
		lg.Errorf("writeMeta failed: %v", err)
		tx.rollback()
		return err
	}
	tx.stats.IncWriteTime(time.Since(startTime))

	// Finalize the transaction.
	tx.close()

	// Execute commit handlers now that the locks have been removed.
	for _, fn := range tx.commitHandlers {
		fn()
	}

	return nil
}
| 284 | |
| 285 | func (tx *Tx) commitFreelist() error { |
| 286 | // Allocate new pages for the new free list. This will overestimate |
| 287 | // the size of the freelist but not underestimate the size (which would be bad). |
| 288 | p, err := tx.allocate((tx.db.freelist.EstimatedWritePageSize() / tx.db.pageSize) + 1) |
| 289 | if err != nil { |
| 290 | tx.rollback() |
| 291 | return err |
| 292 | } |
| 293 | |
| 294 | tx.db.freelist.Write(p) |
| 295 | tx.meta.SetFreelist(p.Id()) |
| 296 | |
| 297 | return nil |
| 298 | } |
| 299 | |
// Rollback closes the transaction and ignores all previous updates. Read-only
// transactions must be rolled back and not committed.
//
// Returns ErrTxClosed if the transaction was already committed or rolled back.
func (tx *Tx) Rollback() error {
	common.Assert(!tx.managed, "managed tx rollback not allowed")
	if tx.db == nil {
		return berrors.ErrTxClosed
	}
	tx.nonPhysicalRollback()
	return nil
}
| 310 | |
// nonPhysicalRollback is called when user calls Rollback directly, in this case we do not need to reload the free pages from disk.
func (tx *Tx) nonPhysicalRollback() {
	if tx.db == nil {
		return // already closed
	}
	if tx.writable {
		// Discard the pages this tx marked as pending-free.
		tx.db.freelist.Rollback(tx.meta.Txid())
	}
	tx.close()
}
| 321 | |
// rollback needs to reload the free pages from disk in case some system error happens like fsync error.
// It is used on internal commit failures, where the in-memory freelist may no
// longer match what is on disk.
func (tx *Tx) rollback() {
	if tx.db == nil {
		return // already closed
	}
	if tx.writable {
		tx.db.freelist.Rollback(tx.meta.Txid())
		// When mmap fails, the `data`, `dataref` and `datasz` may be reset to
		// zero values, and there is no way to reload free page IDs in this case.
		if tx.db.data != nil {
			if !tx.db.hasSyncedFreelist() {
				// Reconstruct free page list by scanning the DB to get the whole free page list.
				// Note: scanning the whole db is heavy if your db size is large in NoSyncFreeList mode.
				tx.db.freelist.NoSyncReload(tx.db.freepages())
			} else {
				// Read free page list from freelist page.
				tx.db.freelist.Reload(tx.db.page(tx.db.meta().Freelist()))
			}
		}
	}
	tx.close()
}
| 344 | |
// close releases all resources held by the transaction and detaches it from
// the DB. Writable transactions release the writer lock and merge their
// statistics into the DB-wide stats; read-only transactions are removed from
// the DB's active-transaction list.
func (tx *Tx) close() {
	if tx.db == nil {
		return // already closed
	}
	if tx.writable {
		// Grab freelist stats before releasing the writer lock so they
		// reflect this transaction's final state.
		var freelistFreeN = tx.db.freelist.FreeCount()
		var freelistPendingN = tx.db.freelist.PendingCount()
		var freelistAlloc = tx.db.freelist.EstimatedWritePageSize()

		// Remove transaction ref & writer lock.
		tx.db.rwtx = nil
		tx.db.rwlock.Unlock()

		// Merge statistics.
		tx.db.statlock.Lock()
		tx.db.stats.FreePageN = freelistFreeN
		tx.db.stats.PendingPageN = freelistPendingN
		tx.db.stats.FreeAlloc = (freelistFreeN + freelistPendingN) * tx.db.pageSize
		tx.db.stats.FreelistInuse = freelistAlloc
		tx.db.stats.TxStats.add(&tx.stats)
		tx.db.statlock.Unlock()
	} else {
		tx.db.removeTx(tx)
	}

	// Clear all references so further use of the tx fails fast.
	tx.db = nil
	tx.meta = nil
	tx.root = Bucket{tx: tx}
	tx.pages = nil
}
| 377 | |
// Copy writes the entire database to a writer.
// This function exists for backwards compatibility; it discards the byte
// count that WriteTo reports.
//
// Deprecated: Use WriteTo() instead.
func (tx *Tx) Copy(w io.Writer) error {
	_, err := tx.WriteTo(w)
	return err
}
| 386 | |
// WriteTo writes the entire database to a writer.
// If err == nil then exactly tx.Size() bytes will be written into the writer.
func (tx *Tx) WriteTo(w io.Writer) (n int64, err error) {
	var f *os.File
	// There is a risk that between the time a read-only transaction
	// is created and the time the file is actually opened, the
	// underlying db file at tx.db.path may have been replaced
	// (e.g. via rename). In that case, opening the file again would
	// unexpectedly point to a different file, rather than the one
	// the transaction was based on.
	//
	// To overcome this, we reuse the already opened file handle when
	// WriteFlag is not set. When the WriteFlag is set, we reopen the file
	// but verify that it still refers to the same underlying file
	// (by device and inode). If it does not, we fall back to
	// reusing the existing already opened file handle.
	if tx.WriteFlag != 0 {
		// Attempt to open reader with WriteFlag
		f, err = tx.db.openFile(tx.db.path, os.O_RDONLY|tx.WriteFlag, 0)
		if err != nil {
			return 0, err
		}

		if ok, err := sameFile(tx.db.file, f); !ok {
			lg := tx.db.Logger()
			if cerr := f.Close(); cerr != nil {
				lg.Errorf("failed to close the file (%s): %v", tx.db.path, cerr)
			}
			lg.Warningf("The underlying file has changed, so reuse the already opened file (%s): %v", tx.db.path, err)
			f = tx.db.file
		} else {
			// Only the reopened handle is closed here; the shared
			// tx.db.file handle must stay open.
			defer func() {
				if cerr := f.Close(); err == nil {
					err = cerr
				}
			}()
		}
	} else {
		f = tx.db.file
	}

	// Generate a meta page. We use the same page data for both meta pages.
	buf := make([]byte, tx.db.pageSize)
	page := (*common.Page)(unsafe.Pointer(&buf[0]))
	page.SetFlags(common.MetaPageFlag)
	*page.Meta() = *tx.meta

	// Write meta 0.
	page.SetId(0)
	page.Meta().SetChecksum(page.Meta().Sum64())
	nn, err := w.Write(buf)
	n += int64(nn)
	if err != nil {
		return n, fmt.Errorf("meta 0 copy: %s", err)
	}

	// Write meta 1 with a lower transaction id, presumably so that meta 0 is
	// selected as the newer meta when the copy is opened — TODO confirm
	// against the meta-selection logic on open.
	page.SetId(1)
	page.Meta().DecTxid()
	page.Meta().SetChecksum(page.Meta().Sum64())
	nn, err = w.Write(buf)
	n += int64(nn)
	if err != nil {
		return n, fmt.Errorf("meta 1 copy: %s", err)
	}

	// Copy data pages using a SectionReader to avoid affecting f's offset.
	dataOffset := int64(tx.db.pageSize * 2)
	dataSize := tx.Size() - dataOffset
	sr := io.NewSectionReader(f, dataOffset, dataSize)

	// Copy data pages.
	wn, err := io.CopyN(w, sr, dataSize)
	n += wn
	if err != nil {
		return n, err
	}

	return n, nil
}
| 467 | |
| 468 | func sameFile(f1, f2 *os.File) (bool, error) { |
| 469 | fi1, err := f1.Stat() |
| 470 | if err != nil { |
| 471 | return false, fmt.Errorf("failed to get fileInfo of the first file (%s): %w", f1.Name(), err) |
| 472 | } |
| 473 | fi2, err := f2.Stat() |
| 474 | if err != nil { |
| 475 | return false, fmt.Errorf("failed to get fileInfo of the second file (%s): %w", f2.Name(), err) |
| 476 | } |
| 477 | |
| 478 | return os.SameFile(fi1, fi2), nil |
| 479 | } |
| 480 | |
| 481 | // CopyFile copies the entire database to file at the given path. |
| 482 | // A reader transaction is maintained during the copy so it is safe to continue |
| 483 | // using the database while a copy is in progress. |
| 484 | func (tx *Tx) CopyFile(path string, mode os.FileMode) error { |
| 485 | f, err := tx.db.openFile(path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, mode) |
| 486 | if err != nil { |
| 487 | return err |
| 488 | } |
| 489 | |
| 490 | _, err = tx.WriteTo(f) |
| 491 | if err != nil { |
| 492 | _ = f.Close() |
| 493 | return err |
| 494 | } |
| 495 | return f.Close() |
| 496 | } |
| 497 | |
| 498 | // allocate returns a contiguous block of memory starting at a given page. |
| 499 | func (tx *Tx) allocate(count int) (*common.Page, error) { |
| 500 | lg := tx.db.Logger() |
| 501 | p, err := tx.db.allocate(tx.meta.Txid(), count) |
| 502 | if err != nil { |
| 503 | lg.Errorf("allocating failed, txid: %d, count: %d, error: %v", tx.meta.Txid(), count, err) |
| 504 | return nil, err |
| 505 | } |
| 506 | |
| 507 | // Save to our page cache. |
| 508 | tx.pages[p.Id()] = p |
| 509 | |
| 510 | // Update statistics. |
| 511 | tx.stats.IncPageCount(int64(count)) |
| 512 | tx.stats.IncPageAlloc(int64(count * tx.db.pageSize)) |
| 513 | |
| 514 | return p, nil |
| 515 | } |
| 516 | |
| 517 | // write writes any dirty pages to disk. |
| 518 | func (tx *Tx) write() error { |
| 519 | // Sort pages by id. |
| 520 | lg := tx.db.Logger() |
| 521 | pages := make(common.Pages, 0, len(tx.pages)) |
| 522 | for _, p := range tx.pages { |
| 523 | pages = append(pages, p) |
| 524 | } |
| 525 | // Clear out page cache early. |
| 526 | tx.pages = make(map[common.Pgid]*common.Page) |
| 527 | sort.Sort(pages) |
| 528 | |
| 529 | // Write pages to disk in order. |
| 530 | for _, p := range pages { |
| 531 | rem := (uint64(p.Overflow()) + 1) * uint64(tx.db.pageSize) |
| 532 | offset := int64(p.Id()) * int64(tx.db.pageSize) |
| 533 | var written uintptr |
| 534 | |
| 535 | // Write out page in "max allocation" sized chunks. |
| 536 | for { |
| 537 | sz := rem |
| 538 | if sz > common.MaxAllocSize-1 { |
| 539 | sz = common.MaxAllocSize - 1 |
| 540 | } |
| 541 | buf := common.UnsafeByteSlice(unsafe.Pointer(p), written, 0, int(sz)) |
| 542 | |
| 543 | if _, err := tx.db.ops.writeAt(buf, offset); err != nil { |
| 544 | lg.Errorf("writeAt failed, offset: %d: %w", offset, err) |
| 545 | return err |
| 546 | } |
| 547 | |
| 548 | // Update statistics. |
| 549 | tx.stats.IncWrite(1) |
| 550 | |
| 551 | // Exit inner for loop if we've written all the chunks. |
| 552 | rem -= sz |
| 553 | if rem == 0 { |
| 554 | break |
| 555 | } |
| 556 | |
| 557 | // Otherwise move offset forward and move pointer to next chunk. |
| 558 | offset += int64(sz) |
| 559 | written += uintptr(sz) |
| 560 | } |
| 561 | } |
| 562 | |
| 563 | // Ignore file sync if flag is set on DB. |
| 564 | if !tx.db.NoSync || common.IgnoreNoSync { |
| 565 | // gofail: var beforeSyncDataPages struct{} |
| 566 | if err := fdatasync(tx.db); err != nil { |
| 567 | lg.Errorf("[GOOS: %s, GOARCH: %s] fdatasync failed: %w", runtime.GOOS, runtime.GOARCH, err) |
| 568 | return err |
| 569 | } |
| 570 | } |
| 571 | |
| 572 | // Put small pages back to page pool. |
| 573 | for _, p := range pages { |
| 574 | // Ignore page sizes over 1 page. |
| 575 | // These are allocated using make() instead of the page pool. |
| 576 | if int(p.Overflow()) != 0 { |
| 577 | continue |
| 578 | } |
| 579 | |
| 580 | buf := common.UnsafeByteSlice(unsafe.Pointer(p), 0, 0, tx.db.pageSize) |
| 581 | |
| 582 | // See https://go.googlesource.com/go/+/f03c9202c43e0abb130669852082117ca50aa9b1 |
| 583 | for i := range buf { |
| 584 | buf[i] = 0 |
| 585 | } |
| 586 | tx.db.pagePool.Put(buf) //nolint:staticcheck |
| 587 | } |
| 588 | |
| 589 | return nil |
| 590 | } |
| 591 | |
| 592 | // writeMeta writes the meta to the disk. |
| 593 | func (tx *Tx) writeMeta() error { |
| 594 | // gofail: var beforeWriteMetaError string |
| 595 | // return errors.New(beforeWriteMetaError) |
| 596 | |
| 597 | // Create a temporary buffer for the meta page. |
| 598 | lg := tx.db.Logger() |
| 599 | buf := make([]byte, tx.db.pageSize) |
| 600 | p := tx.db.pageInBuffer(buf, 0) |
| 601 | tx.meta.Write(p) |
| 602 | |
| 603 | // Write the meta page to file. |
| 604 | tx.db.metalock.Lock() |
| 605 | if _, err := tx.db.ops.writeAt(buf, int64(p.Id())*int64(tx.db.pageSize)); err != nil { |
| 606 | tx.db.metalock.Unlock() |
| 607 | lg.Errorf("writeAt failed, pgid: %d, pageSize: %d, error: %v", p.Id(), tx.db.pageSize, err) |
| 608 | return err |
| 609 | } |
| 610 | tx.db.metalock.Unlock() |
| 611 | if !tx.db.NoSync || common.IgnoreNoSync { |
| 612 | // gofail: var beforeSyncMetaPage struct{} |
| 613 | if err := fdatasync(tx.db); err != nil { |
| 614 | lg.Errorf("[GOOS: %s, GOARCH: %s] fdatasync failed: %w", runtime.GOOS, runtime.GOARCH, err) |
| 615 | return err |
| 616 | } |
| 617 | } |
| 618 | |
| 619 | // Update statistics. |
| 620 | tx.stats.IncWrite(1) |
| 621 | |
| 622 | return nil |
| 623 | } |
| 624 | |
| 625 | // page returns a reference to the page with a given id. |
| 626 | // If page has been written to then a temporary buffered page is returned. |
| 627 | func (tx *Tx) page(id common.Pgid) *common.Page { |
| 628 | // Check the dirty pages first. |
| 629 | if tx.pages != nil { |
| 630 | if p, ok := tx.pages[id]; ok { |
| 631 | p.FastCheck(id) |
| 632 | return p |
| 633 | } |
| 634 | } |
| 635 | |
| 636 | // Otherwise return directly from the mmap. |
| 637 | p := tx.db.page(id) |
| 638 | p.FastCheck(id) |
| 639 | return p |
| 640 | } |
| 641 | |
| 642 | // forEachPage iterates over every page within a given page and executes a function. |
| 643 | func (tx *Tx) forEachPage(pgidnum common.Pgid, fn func(*common.Page, int, []common.Pgid)) { |
| 644 | stack := make([]common.Pgid, 10) |
| 645 | stack[0] = pgidnum |
| 646 | tx.forEachPageInternal(stack[:1], fn) |
| 647 | } |
| 648 | |
// forEachPageInternal visits the page on top of pgidstack, invokes fn with
// the page, its depth, and the full pgid path, then recurses into branch
// children depth-first.
func (tx *Tx) forEachPageInternal(pgidstack []common.Pgid, fn func(*common.Page, int, []common.Pgid)) {
	// Resolve via tx.page so dirty pages take precedence over the mmap.
	p := tx.page(pgidstack[len(pgidstack)-1])

	// Execute function.
	fn(p, len(pgidstack)-1, pgidstack)

	// Recursively loop over children.
	// NOTE(review): append may reuse pgidstack's backing array across sibling
	// subtrees, so fn should not retain the slice beyond the callback.
	if p.IsBranchPage() {
		for i := 0; i < int(p.Count()); i++ {
			elem := p.BranchPageElement(uint16(i))
			tx.forEachPageInternal(append(pgidstack, elem.Pgid()), fn)
		}
	}
}
| 663 | |
| 664 | // Page returns page information for a given page number. |
| 665 | // This is only safe for concurrent use when used by a writable transaction. |
| 666 | func (tx *Tx) Page(id int) (*common.PageInfo, error) { |
| 667 | if tx.db == nil { |
| 668 | return nil, berrors.ErrTxClosed |
| 669 | } else if common.Pgid(id) >= tx.meta.Pgid() { |
| 670 | return nil, nil |
| 671 | } |
| 672 | |
| 673 | if tx.db.freelist == nil { |
| 674 | return nil, berrors.ErrFreePagesNotLoaded |
| 675 | } |
| 676 | |
| 677 | // Build the page info. |
| 678 | p := tx.db.page(common.Pgid(id)) |
| 679 | info := &common.PageInfo{ |
| 680 | ID: id, |
| 681 | Count: int(p.Count()), |
| 682 | OverflowCount: int(p.Overflow()), |
| 683 | } |
| 684 | |
| 685 | // Determine the type (or if it's free). |
| 686 | if tx.db.freelist.Freed(common.Pgid(id)) { |
| 687 | info.Type = "free" |
| 688 | } else { |
| 689 | info.Type = p.Typ() |
| 690 | } |
| 691 | |
| 692 | return info, nil |
| 693 | } |
| 694 | |
// TxStats represents statistics about the actions performed by the transaction.
//
// All counter fields are manipulated atomically by the Get*/Inc* accessors;
// read them through those accessors while a transaction may still be running.
type TxStats struct {
	// Page statistics.
	//
	// Deprecated: Use GetPageCount() or IncPageCount() instead.
	PageCount int64 // number of page allocations
	// Deprecated: Use GetPageAlloc() or IncPageAlloc() instead.
	PageAlloc int64 // total bytes allocated

	// Cursor statistics.
	//
	// Deprecated: Use GetCursorCount() or IncCursorCount() instead.
	CursorCount int64 // number of cursors created

	// Node statistics
	//
	// Deprecated: Use GetNodeCount() or IncNodeCount() instead.
	NodeCount int64 // number of node allocations
	// Deprecated: Use GetNodeDeref() or IncNodeDeref() instead.
	NodeDeref int64 // number of node dereferences

	// Rebalance statistics.
	//
	// Deprecated: Use GetRebalance() or IncRebalance() instead.
	Rebalance int64 // number of node rebalances
	// Deprecated: Use GetRebalanceTime() or IncRebalanceTime() instead.
	RebalanceTime time.Duration // total time spent rebalancing

	// Split/Spill statistics.
	//
	// Deprecated: Use GetSplit() or IncSplit() instead.
	Split int64 // number of nodes split
	// Deprecated: Use GetSpill() or IncSpill() instead.
	Spill int64 // number of nodes spilled
	// Deprecated: Use GetSpillTime() or IncSpillTime() instead.
	SpillTime time.Duration // total time spent spilling

	// Write statistics.
	//
	// Deprecated: Use GetWrite() or IncWrite() instead.
	Write int64 // number of writes performed
	// Deprecated: Use GetWriteTime() or IncWriteTime() instead.
	WriteTime time.Duration // total time spent writing to disk
}
| 739 | |
// add atomically accumulates every counter from other into s.
func (s *TxStats) add(other *TxStats) {
	s.IncPageCount(other.GetPageCount())
	s.IncPageAlloc(other.GetPageAlloc())
	s.IncCursorCount(other.GetCursorCount())
	s.IncNodeCount(other.GetNodeCount())
	s.IncNodeDeref(other.GetNodeDeref())
	s.IncRebalance(other.GetRebalance())
	s.IncRebalanceTime(other.GetRebalanceTime())
	s.IncSplit(other.GetSplit())
	s.IncSpill(other.GetSpill())
	s.IncSpillTime(other.GetSpillTime())
	s.IncWrite(other.GetWrite())
	s.IncWriteTime(other.GetWriteTime())
}
| 754 | |
| 755 | // Sub calculates and returns the difference between two sets of transaction stats. |
| 756 | // This is useful when obtaining stats at two different points and time and |
| 757 | // you need the performance counters that occurred within that time span. |
| 758 | func (s *TxStats) Sub(other *TxStats) TxStats { |
| 759 | var diff TxStats |
| 760 | diff.PageCount = s.GetPageCount() - other.GetPageCount() |
| 761 | diff.PageAlloc = s.GetPageAlloc() - other.GetPageAlloc() |
| 762 | diff.CursorCount = s.GetCursorCount() - other.GetCursorCount() |
| 763 | diff.NodeCount = s.GetNodeCount() - other.GetNodeCount() |
| 764 | diff.NodeDeref = s.GetNodeDeref() - other.GetNodeDeref() |
| 765 | diff.Rebalance = s.GetRebalance() - other.GetRebalance() |
| 766 | diff.RebalanceTime = s.GetRebalanceTime() - other.GetRebalanceTime() |
| 767 | diff.Split = s.GetSplit() - other.GetSplit() |
| 768 | diff.Spill = s.GetSpill() - other.GetSpill() |
| 769 | diff.SpillTime = s.GetSpillTime() - other.GetSpillTime() |
| 770 | diff.Write = s.GetWrite() - other.GetWrite() |
| 771 | diff.WriteTime = s.GetWriteTime() - other.GetWriteTime() |
| 772 | return diff |
| 773 | } |
| 774 | |
// GetPageCount returns PageCount atomically.
func (s *TxStats) GetPageCount() int64 {
	return atomic.LoadInt64(&s.PageCount)
}

// IncPageCount increases PageCount atomically and returns the new value.
func (s *TxStats) IncPageCount(delta int64) int64 {
	return atomic.AddInt64(&s.PageCount, delta)
}

// GetPageAlloc returns PageAlloc atomically.
func (s *TxStats) GetPageAlloc() int64 {
	return atomic.LoadInt64(&s.PageAlloc)
}

// IncPageAlloc increases PageAlloc atomically and returns the new value.
func (s *TxStats) IncPageAlloc(delta int64) int64 {
	return atomic.AddInt64(&s.PageAlloc, delta)
}

// GetCursorCount returns CursorCount atomically.
func (s *TxStats) GetCursorCount() int64 {
	return atomic.LoadInt64(&s.CursorCount)
}

// IncCursorCount increases CursorCount atomically and returns the new value.
func (s *TxStats) IncCursorCount(delta int64) int64 {
	return atomic.AddInt64(&s.CursorCount, delta)
}

// GetNodeCount returns NodeCount atomically.
func (s *TxStats) GetNodeCount() int64 {
	return atomic.LoadInt64(&s.NodeCount)
}

// IncNodeCount increases NodeCount atomically and returns the new value.
func (s *TxStats) IncNodeCount(delta int64) int64 {
	return atomic.AddInt64(&s.NodeCount, delta)
}

// GetNodeDeref returns NodeDeref atomically.
func (s *TxStats) GetNodeDeref() int64 {
	return atomic.LoadInt64(&s.NodeDeref)
}

// IncNodeDeref increases NodeDeref atomically and returns the new value.
func (s *TxStats) IncNodeDeref(delta int64) int64 {
	return atomic.AddInt64(&s.NodeDeref, delta)
}

// GetRebalance returns Rebalance atomically.
func (s *TxStats) GetRebalance() int64 {
	return atomic.LoadInt64(&s.Rebalance)
}

// IncRebalance increases Rebalance atomically and returns the new value.
func (s *TxStats) IncRebalance(delta int64) int64 {
	return atomic.AddInt64(&s.Rebalance, delta)
}

// GetRebalanceTime returns RebalanceTime atomically.
func (s *TxStats) GetRebalanceTime() time.Duration {
	return atomicLoadDuration(&s.RebalanceTime)
}

// IncRebalanceTime increases RebalanceTime atomically and returns the new value.
func (s *TxStats) IncRebalanceTime(delta time.Duration) time.Duration {
	return atomicAddDuration(&s.RebalanceTime, delta)
}

// GetSplit returns Split atomically.
func (s *TxStats) GetSplit() int64 {
	return atomic.LoadInt64(&s.Split)
}

// IncSplit increases Split atomically and returns the new value.
func (s *TxStats) IncSplit(delta int64) int64 {
	return atomic.AddInt64(&s.Split, delta)
}

// GetSpill returns Spill atomically.
func (s *TxStats) GetSpill() int64 {
	return atomic.LoadInt64(&s.Spill)
}

// IncSpill increases Spill atomically and returns the new value.
func (s *TxStats) IncSpill(delta int64) int64 {
	return atomic.AddInt64(&s.Spill, delta)
}

// GetSpillTime returns SpillTime atomically.
func (s *TxStats) GetSpillTime() time.Duration {
	return atomicLoadDuration(&s.SpillTime)
}

// IncSpillTime increases SpillTime atomically and returns the new value.
func (s *TxStats) IncSpillTime(delta time.Duration) time.Duration {
	return atomicAddDuration(&s.SpillTime, delta)
}

// GetWrite returns Write atomically.
func (s *TxStats) GetWrite() int64 {
	return atomic.LoadInt64(&s.Write)
}

// IncWrite increases Write atomically and returns the new value.
func (s *TxStats) IncWrite(delta int64) int64 {
	return atomic.AddInt64(&s.Write, delta)
}

// GetWriteTime returns WriteTime atomically.
func (s *TxStats) GetWriteTime() time.Duration {
	return atomicLoadDuration(&s.WriteTime)
}

// IncWriteTime increases WriteTime atomically and returns the new value.
func (s *TxStats) IncWriteTime(delta time.Duration) time.Duration {
	return atomicAddDuration(&s.WriteTime, delta)
}
| 894 | |
| 895 | func atomicAddDuration(ptr *time.Duration, du time.Duration) time.Duration { |
| 896 | return time.Duration(atomic.AddInt64((*int64)(unsafe.Pointer(ptr)), int64(du))) |
| 897 | } |
| 898 | |
| 899 | func atomicLoadDuration(ptr *time.Duration) time.Duration { |
| 900 | return time.Duration(atomic.LoadInt64((*int64)(unsafe.Pointer(ptr)))) |
| 901 | } |