| Abhay Kumar | a2ae599 | 2025-11-10 14:02:24 +0000 | [diff] [blame^] | 1 | package bbolt |
| 2 | |
| 3 | // Compact will create a copy of the source DB and in the destination DB. This may |
| 4 | // reclaim space that the source database no longer has use for. txMaxSize can be |
| 5 | // used to limit the transactions size of this process and may trigger intermittent |
| 6 | // commits. A value of zero will ignore transaction sizes. |
| 7 | // TODO: merge with: https://github.com/etcd-io/etcd/blob/b7f0f52a16dbf83f18ca1d803f7892d750366a94/mvcc/backend/backend.go#L349 |
| 8 | func Compact(dst, src *DB, txMaxSize int64) error { |
| 9 | // commit regularly, or we'll run out of memory for large datasets if using one transaction. |
| 10 | var size int64 |
| 11 | tx, err := dst.Begin(true) |
| 12 | if err != nil { |
| 13 | return err |
| 14 | } |
| 15 | defer func() { |
| 16 | if tempErr := tx.Rollback(); tempErr != nil { |
| 17 | err = tempErr |
| 18 | } |
| 19 | }() |
| 20 | |
| 21 | if err := walk(src, func(keys [][]byte, k, v []byte, seq uint64) error { |
| 22 | // On each key/value, check if we have exceeded tx size. |
| 23 | sz := int64(len(k) + len(v)) |
| 24 | if size+sz > txMaxSize && txMaxSize != 0 { |
| 25 | // Commit previous transaction. |
| 26 | if err := tx.Commit(); err != nil { |
| 27 | return err |
| 28 | } |
| 29 | |
| 30 | // Start new transaction. |
| 31 | tx, err = dst.Begin(true) |
| 32 | if err != nil { |
| 33 | return err |
| 34 | } |
| 35 | size = 0 |
| 36 | } |
| 37 | size += sz |
| 38 | |
| 39 | // Create bucket on the root transaction if this is the first level. |
| 40 | nk := len(keys) |
| 41 | if nk == 0 { |
| 42 | bkt, err := tx.CreateBucket(k) |
| 43 | if err != nil { |
| 44 | return err |
| 45 | } |
| 46 | if err := bkt.SetSequence(seq); err != nil { |
| 47 | return err |
| 48 | } |
| 49 | return nil |
| 50 | } |
| 51 | |
| 52 | // Create buckets on subsequent levels, if necessary. |
| 53 | b := tx.Bucket(keys[0]) |
| 54 | if nk > 1 { |
| 55 | for _, k := range keys[1:] { |
| 56 | b = b.Bucket(k) |
| 57 | } |
| 58 | } |
| 59 | |
| 60 | // Fill the entire page for best compaction. |
| 61 | b.FillPercent = 1.0 |
| 62 | |
| 63 | // If there is no value then this is a bucket call. |
| 64 | if v == nil { |
| 65 | bkt, err := b.CreateBucket(k) |
| 66 | if err != nil { |
| 67 | return err |
| 68 | } |
| 69 | if err := bkt.SetSequence(seq); err != nil { |
| 70 | return err |
| 71 | } |
| 72 | return nil |
| 73 | } |
| 74 | |
| 75 | // Otherwise treat it as a key/value pair. |
| 76 | return b.Put(k, v) |
| 77 | }); err != nil { |
| 78 | return err |
| 79 | } |
| 80 | err = tx.Commit() |
| 81 | |
| 82 | return err |
| 83 | } |
| 84 | |
// walkFunc is the callback signature used by walk and walkBucket. It is
// invoked once for every entry found, whether that entry is a bucket or a
// plain key/value pair.
//
// keys is the path of bucket names leading to the bucket that owns the entry,
// k is the entry's own key (or bucket name), v is its value (nil when the
// entry is a bucket), and seq is the sequence number passed along by the
// walker for that entry.
type walkFunc func(keys [][]byte, k []byte, v []byte, seq uint64) error
| 89 | |
| 90 | // walk walks recursively the bolt database db, calling walkFn for each key it finds. |
| 91 | func walk(db *DB, walkFn walkFunc) error { |
| 92 | return db.View(func(tx *Tx) error { |
| 93 | return tx.ForEach(func(name []byte, b *Bucket) error { |
| 94 | return walkBucket(b, nil, name, nil, b.Sequence(), walkFn) |
| 95 | }) |
| 96 | }) |
| 97 | } |
| 98 | |
| 99 | func walkBucket(b *Bucket, keypath [][]byte, k, v []byte, seq uint64, fn walkFunc) error { |
| 100 | // Execute callback. |
| 101 | if err := fn(keypath, k, v, seq); err != nil { |
| 102 | return err |
| 103 | } |
| 104 | |
| 105 | // If this is not a bucket then stop. |
| 106 | if v != nil { |
| 107 | return nil |
| 108 | } |
| 109 | |
| 110 | // Iterate over each child key/value. |
| 111 | keypath = append(keypath, k) |
| 112 | return b.ForEach(func(k, v []byte) error { |
| 113 | if v == nil { |
| 114 | bkt := b.Bucket(k) |
| 115 | return walkBucket(bkt, keypath, k, nil, bkt.Sequence(), fn) |
| 116 | } |
| 117 | return walkBucket(b, keypath, k, v, b.Sequence(), fn) |
| 118 | }) |
| 119 | } |