blob: 6371ace972ce13648d3c16eb89a6afba63d1f63c [file] [log] [blame]
package bbolt
2
3import (
4 "bytes"
5 "fmt"
6 "unsafe"
7
8 "go.etcd.io/bbolt/errors"
9 "go.etcd.io/bbolt/internal/common"
10)
11
const (
	// MaxKeySize is the maximum length of a key, in bytes.
	MaxKeySize = 32768

	// MaxValueSize is the maximum length of a value, in bytes.
	// One below the largest int32 so lengths always fit in a signed
	// 32-bit integer.
	MaxValueSize = (1 << 31) - 2
)

const (
	// minFillPercent and maxFillPercent bound Bucket.FillPercent.
	// NOTE(review): presumably FillPercent is clamped into this range
	// where nodes split — confirm in the node split code.
	minFillPercent = 0.1
	maxFillPercent = 1.0
)

// DefaultFillPercent is the percentage that split pages are filled.
// This value can be changed by setting Bucket.FillPercent.
const DefaultFillPercent = 0.5
28
// Bucket represents a collection of key/value pairs inside the database.
type Bucket struct {
	*common.InBucket
	tx       *Tx                   // the associated transaction
	buckets  map[string]*Bucket    // subbucket cache (nil for read-only transactions; see newBucket)
	page     *common.Page          // inline page reference
	rootNode *node                 // materialized node for the root page.
	nodes    map[common.Pgid]*node // node cache (nil for read-only transactions; see newBucket)

	// Sets the threshold for filling nodes when they split. By default,
	// the bucket will fill to 50% but it can be useful to increase this
	// amount if you know that your write workloads are mostly append-only.
	//
	// This is non-persisted across transactions so it must be set in every Tx.
	FillPercent float64
}
45
46// newBucket returns a new bucket associated with a transaction.
47func newBucket(tx *Tx) Bucket {
48 var b = Bucket{tx: tx, FillPercent: DefaultFillPercent}
49 if tx.writable {
50 b.buckets = make(map[string]*Bucket)
51 b.nodes = make(map[common.Pgid]*node)
52 }
53 return b
54}
55
// Tx returns the tx of the bucket.
func (b *Bucket) Tx() *Tx {
	return b.tx
}

// Root returns the root of the bucket.
// A root page id of 0 indicates an inline bucket.
func (b *Bucket) Root() common.Pgid {
	return b.RootPage()
}

// Writable returns whether the bucket is writable.
func (b *Bucket) Writable() bool {
	return b.tx.writable
}
70
71// Cursor creates a cursor associated with the bucket.
72// The cursor is only valid as long as the transaction is open.
73// Do not use a cursor after the transaction is closed.
74func (b *Bucket) Cursor() *Cursor {
75 // Update transaction statistics.
76 b.tx.stats.IncCursorCount(1)
77
78 // Allocate and return a cursor.
79 return &Cursor{
80 bucket: b,
81 stack: make([]elemRef, 0),
82 }
83}
84
85// Bucket retrieves a nested bucket by name.
86// Returns nil if the bucket does not exist.
87// The bucket instance is only valid for the lifetime of the transaction.
88func (b *Bucket) Bucket(name []byte) *Bucket {
89 if b.buckets != nil {
90 if child := b.buckets[string(name)]; child != nil {
91 return child
92 }
93 }
94
95 // Move cursor to key.
96 c := b.Cursor()
97 k, v, flags := c.seek(name)
98
99 // Return nil if the key doesn't exist or it is not a bucket.
100 if !bytes.Equal(name, k) || (flags&common.BucketLeafFlag) == 0 {
101 return nil
102 }
103
104 // Otherwise create a bucket and cache it.
105 var child = b.openBucket(v)
106 if b.buckets != nil {
107 b.buckets[string(name)] = child
108 }
109
110 return child
111}
112
// Helper method that re-interprets a sub-bucket value
// from a parent into a Bucket.
// The value layout is a common.InBucket header optionally followed by an
// inline page (when the root page id is 0).
func (b *Bucket) openBucket(value []byte) *Bucket {
	var child = newBucket(b.tx)

	// Unaligned access requires a copy to be made.
	// The mask is derived from the strictest alignment of the two types
	// that will be pointed into the value slice below.
	const unalignedMask = unsafe.Alignof(struct {
		common.InBucket
		common.Page
	}{}) - 1
	unaligned := uintptr(unsafe.Pointer(&value[0]))&unalignedMask != 0
	if unaligned {
		value = cloneBytes(value)
	}

	// If this is a writable transaction then we need to copy the bucket entry.
	// Read-only transactions can point directly at the mmap entry.
	// (An unaligned value was already copied above, so pointing into it is safe.)
	if b.tx.writable && !unaligned {
		child.InBucket = &common.InBucket{}
		*child.InBucket = *(*common.InBucket)(unsafe.Pointer(&value[0]))
	} else {
		child.InBucket = (*common.InBucket)(unsafe.Pointer(&value[0]))
	}

	// Save a reference to the inline page if the bucket is inline.
	// Root page id 0 marks an inline bucket whose page follows the header.
	if child.RootPage() == 0 {
		child.page = (*common.Page)(unsafe.Pointer(&value[common.BucketHeaderSize]))
	}

	return &child
}
144
// CreateBucket creates a new bucket at the given key and returns the new bucket.
// Returns an error if the key already exists, if the bucket name is blank, or if the bucket name is too long.
// The bucket instance is only valid for the lifetime of the transaction.
func (b *Bucket) CreateBucket(key []byte) (rb *Bucket, err error) {
	if lg := b.tx.db.Logger(); lg != discardLogger {
		lg.Debugf("Creating bucket %q", key)
		defer func() {
			if err != nil {
				lg.Errorf("Creating bucket %q failed: %v", key, err)
			} else {
				lg.Debugf("Creating bucket %q successfully", key)
			}
		}()
	}
	// Validate transaction state and the bucket name.
	// NOTE(review): no explicit length check here despite the doc mention of
	// "too long" — confirm enforcement happens elsewhere.
	if b.tx.db == nil {
		return nil, errors.ErrTxClosed
	} else if !b.tx.writable {
		return nil, errors.ErrTxNotWritable
	} else if len(key) == 0 {
		return nil, errors.ErrBucketNameRequired
	}

	// Insert into node.
	// Tip: Use a new variable `newKey` instead of reusing the existing `key` to prevent
	// it from being marked as leaking, and accordingly cannot be allocated on stack.
	newKey := cloneBytes(key)

	// Move cursor to correct position.
	c := b.Cursor()
	k, _, flags := c.seek(newKey)

	// Return an error if there is an existing key.
	if bytes.Equal(newKey, k) {
		if (flags & common.BucketLeafFlag) != 0 {
			return nil, errors.ErrBucketExists
		}
		return nil, errors.ErrIncompatibleValue
	}

	// Create empty, inline bucket.
	var bucket = Bucket{
		InBucket:    &common.InBucket{},
		rootNode:    &node{isLeaf: true},
		FillPercent: DefaultFillPercent,
	}
	var value = bucket.write()

	c.node().put(newKey, newKey, value, 0, common.BucketLeafFlag)

	// Since subbuckets are not allowed on inline buckets, we need to
	// dereference the inline page, if it exists. This will cause the bucket
	// to be treated as a regular, non-inline bucket for the rest of the tx.
	b.page = nil

	return b.Bucket(newKey), nil
}
201
// CreateBucketIfNotExists creates a new bucket if it doesn't already exist and returns a reference to it.
// Returns an error if the bucket name is blank, or if the bucket name is too long.
// The bucket instance is only valid for the lifetime of the transaction.
func (b *Bucket) CreateBucketIfNotExists(key []byte) (rb *Bucket, err error) {
	if lg := b.tx.db.Logger(); lg != discardLogger {
		lg.Debugf("Creating bucket if not exist %q", key)
		defer func() {
			if err != nil {
				lg.Errorf("Creating bucket if not exist %q failed: %v", key, err)
			} else {
				lg.Debugf("Creating bucket if not exist %q successfully", key)
			}
		}()
	}

	// Validate transaction state and the bucket name.
	if b.tx.db == nil {
		return nil, errors.ErrTxClosed
	} else if !b.tx.writable {
		return nil, errors.ErrTxNotWritable
	} else if len(key) == 0 {
		return nil, errors.ErrBucketNameRequired
	}

	// Insert into node.
	// Tip: Use a new variable `newKey` instead of reusing the existing `key` to prevent
	// it from being marked as leaking, and accordingly cannot be allocated on stack.
	newKey := cloneBytes(key)

	// Serve from the sub-bucket cache when the bucket was already opened in this tx.
	if b.buckets != nil {
		if child := b.buckets[string(newKey)]; child != nil {
			return child, nil
		}
	}

	// Move cursor to correct position.
	c := b.Cursor()
	k, v, flags := c.seek(newKey)

	// Return an error if there is an existing non-bucket key.
	if bytes.Equal(newKey, k) {
		if (flags & common.BucketLeafFlag) != 0 {
			// The bucket already exists: open, cache and return it.
			var child = b.openBucket(v)
			if b.buckets != nil {
				b.buckets[string(newKey)] = child
			}

			return child, nil
		}
		return nil, errors.ErrIncompatibleValue
	}

	// Create empty, inline bucket.
	var bucket = Bucket{
		InBucket:    &common.InBucket{},
		rootNode:    &node{isLeaf: true},
		FillPercent: DefaultFillPercent,
	}
	var value = bucket.write()

	c.node().put(newKey, newKey, value, 0, common.BucketLeafFlag)

	// Since subbuckets are not allowed on inline buckets, we need to
	// dereference the inline page, if it exists. This will cause the bucket
	// to be treated as a regular, non-inline bucket for the rest of the tx.
	b.page = nil

	return b.Bucket(newKey), nil
}
270
// DeleteBucket deletes a bucket at the given key.
// Returns an error if the bucket does not exist, or if the key represents a non-bucket value.
// All nested sub-buckets are deleted recursively and their pages returned
// to the freelist.
func (b *Bucket) DeleteBucket(key []byte) (err error) {
	if lg := b.tx.db.Logger(); lg != discardLogger {
		lg.Debugf("Deleting bucket %q", key)
		defer func() {
			if err != nil {
				lg.Errorf("Deleting bucket %q failed: %v", key, err)
			} else {
				lg.Debugf("Deleting bucket %q successfully", key)
			}
		}()
	}

	if b.tx.db == nil {
		return errors.ErrTxClosed
	} else if !b.Writable() {
		return errors.ErrTxNotWritable
	}

	// Copy the key so the caller's slice cannot be retained (see CreateBucket).
	newKey := cloneBytes(key)

	// Move cursor to correct position.
	c := b.Cursor()
	k, _, flags := c.seek(newKey)

	// Return an error if bucket doesn't exist or is not a bucket.
	if !bytes.Equal(newKey, k) {
		return errors.ErrBucketNotFound
	} else if (flags & common.BucketLeafFlag) == 0 {
		return errors.ErrIncompatibleValue
	}

	// Recursively delete all child buckets.
	child := b.Bucket(newKey)
	err = child.ForEachBucket(func(k []byte) error {
		if err := child.DeleteBucket(k); err != nil {
			return fmt.Errorf("delete bucket: %s", err)
		}
		return nil
	})
	if err != nil {
		return err
	}

	// Remove cached copy.
	delete(b.buckets, string(newKey))

	// Release all bucket pages to freelist.
	// Dropping the node caches first prevents free() from walking stale nodes.
	child.nodes = nil
	child.rootNode = nil
	child.free()

	// Delete the node if we have a matching key.
	c.node().del(newKey)

	return nil
}
329
// MoveBucket moves a sub-bucket from the source bucket to the destination bucket.
// Returns an error if
//  1. the sub-bucket cannot be found in the source bucket;
//  2. or the key already exists in the destination bucket;
//  3. or the key represents a non-bucket value;
//  4. the source and destination buckets are the same.
func (b *Bucket) MoveBucket(key []byte, dstBucket *Bucket) (err error) {
	lg := b.tx.db.Logger()
	if lg != discardLogger {
		lg.Debugf("Moving bucket %q", key)
		defer func() {
			if err != nil {
				lg.Errorf("Moving bucket %q failed: %v", key, err)
			} else {
				lg.Debugf("Moving bucket %q successfully", key)
			}
		}()
	}

	if b.tx.db == nil || dstBucket.tx.db == nil {
		return errors.ErrTxClosed
	} else if !b.Writable() || !dstBucket.Writable() {
		return errors.ErrTxNotWritable
	}

	// Both buckets must belong to the same writable transaction on the same db file.
	if b.tx.db.Path() != dstBucket.tx.db.Path() || b.tx != dstBucket.tx {
		lg.Errorf("The source and target buckets are not in the same db file, source bucket in %s and target bucket in %s", b.tx.db.Path(), dstBucket.tx.db.Path())
		return errors.ErrDifferentDB
	}

	// Copy the key so the caller's slice cannot be retained (see CreateBucket).
	newKey := cloneBytes(key)

	// Move cursor to correct position.
	c := b.Cursor()
	k, v, flags := c.seek(newKey)

	// Return an error if bucket doesn't exist or is not a bucket.
	if !bytes.Equal(newKey, k) {
		return errors.ErrBucketNotFound
	} else if (flags & common.BucketLeafFlag) == 0 {
		lg.Errorf("An incompatible key %s exists in the source bucket", newKey)
		return errors.ErrIncompatibleValue
	}

	// Reject the move if the source bucket and the destination bucket
	// are actually the same bucket.
	if b == dstBucket || (b.RootPage() == dstBucket.RootPage() && b.RootPage() != 0) {
		lg.Errorf("The source bucket (%s) and the target bucket (%s) are the same bucket", b, dstBucket)
		return errors.ErrSameBuckets
	}

	// check whether the key already exists in the destination bucket
	curDst := dstBucket.Cursor()
	k, _, flags = curDst.seek(newKey)

	// Return an error if there is an existing key in the destination bucket.
	if bytes.Equal(newKey, k) {
		if (flags & common.BucketLeafFlag) != 0 {
			return errors.ErrBucketExists
		}
		lg.Errorf("An incompatible key %s exists in the target bucket", newKey)
		return errors.ErrIncompatibleValue
	}

	// remove the sub-bucket from the source bucket
	delete(b.buckets, string(newKey))
	c.node().del(newKey)

	// add the sub-bucket to the destination bucket
	newValue := cloneBytes(v)
	curDst.node().put(newKey, newKey, newValue, 0, common.BucketLeafFlag)

	return nil
}
404
// Inspect returns the structure of the bucket.
// The top-level entry is always reported under the name "root".
func (b *Bucket) Inspect() BucketStructure {
	return b.recursivelyInspect([]byte("root"))
}
409
410func (b *Bucket) recursivelyInspect(name []byte) BucketStructure {
411 bs := BucketStructure{Name: string(name)}
412
413 keyN := 0
414 c := b.Cursor()
415 for k, _, flags := c.first(); k != nil; k, _, flags = c.next() {
416 if flags&common.BucketLeafFlag != 0 {
417 childBucket := b.Bucket(k)
418 childBS := childBucket.recursivelyInspect(k)
419 bs.Children = append(bs.Children, childBS)
420 } else {
421 keyN++
422 }
423 }
424 bs.KeyN = keyN
425
426 return bs
427}
428
429// Get retrieves the value for a key in the bucket.
430// Returns a nil value if the key does not exist or if the key is a nested bucket.
431// The returned value is only valid for the life of the transaction.
432// The returned memory is owned by bbolt and must never be modified; writing to this memory might corrupt the database.
433func (b *Bucket) Get(key []byte) []byte {
434 k, v, flags := b.Cursor().seek(key)
435
436 // Return nil if this is a bucket.
437 if (flags & common.BucketLeafFlag) != 0 {
438 return nil
439 }
440
441 // If our target node isn't the same key as what's passed in then return nil.
442 if !bytes.Equal(key, k) {
443 return nil
444 }
445 return v
446}
447
// Put sets the value for a key in the bucket.
// If the key exist then its previous value will be overwritten.
// Supplied value must remain valid for the life of the transaction.
// Returns an error if the bucket was created from a read-only transaction, if the key is blank, if the key is too large, or if the value is too large.
func (b *Bucket) Put(key []byte, value []byte) (err error) {
	if lg := b.tx.db.Logger(); lg != discardLogger {
		lg.Debugf("Putting key %q", key)
		defer func() {
			if err != nil {
				lg.Errorf("Putting key %q failed: %v", key, err)
			} else {
				lg.Debugf("Putting key %q successfully", key)
			}
		}()
	}
	// Validate transaction state and key/value size limits.
	if b.tx.db == nil {
		return errors.ErrTxClosed
	} else if !b.Writable() {
		return errors.ErrTxNotWritable
	} else if len(key) == 0 {
		return errors.ErrKeyRequired
	} else if len(key) > MaxKeySize {
		return errors.ErrKeyTooLarge
	} else if int64(len(value)) > MaxValueSize {
		return errors.ErrValueTooLarge
	}

	// Insert into node.
	// Tip: Use a new variable `newKey` instead of reusing the existing `key` to prevent
	// it from being marked as leaking, and accordingly cannot be allocated on stack.
	newKey := cloneBytes(key)

	// Move cursor to correct position.
	c := b.Cursor()
	k, _, flags := c.seek(newKey)

	// Return an error if there is an existing key with a bucket value.
	if bytes.Equal(newKey, k) && (flags&common.BucketLeafFlag) != 0 {
		return errors.ErrIncompatibleValue
	}

	// gofail: var beforeBucketPut struct{}

	c.node().put(newKey, newKey, value, 0, 0)

	return nil
}
495
496// Delete removes a key from the bucket.
497// If the key does not exist then nothing is done and a nil error is returned.
498// Returns an error if the bucket was created from a read-only transaction.
499func (b *Bucket) Delete(key []byte) (err error) {
500 if lg := b.tx.db.Logger(); lg != discardLogger {
501 lg.Debugf("Deleting key %q", key)
502 defer func() {
503 if err != nil {
504 lg.Errorf("Deleting key %q failed: %v", key, err)
505 } else {
506 lg.Debugf("Deleting key %q successfully", key)
507 }
508 }()
509 }
510
511 if b.tx.db == nil {
512 return errors.ErrTxClosed
513 } else if !b.Writable() {
514 return errors.ErrTxNotWritable
515 }
516
517 // Move cursor to correct position.
518 c := b.Cursor()
519 k, _, flags := c.seek(key)
520
521 // Return nil if the key doesn't exist.
522 if !bytes.Equal(key, k) {
523 return nil
524 }
525
526 // Return an error if there is already existing bucket value.
527 if (flags & common.BucketLeafFlag) != 0 {
528 return errors.ErrIncompatibleValue
529 }
530
531 // Delete the node if we have a matching key.
532 c.node().del(key)
533
534 return nil
535}
536
// Sequence returns the current integer for the bucket without incrementing it.
func (b *Bucket) Sequence() uint64 {
	return b.InSequence()
}
541
// SetSequence updates the sequence number for the bucket.
// Returns an error if the transaction is closed or read-only.
func (b *Bucket) SetSequence(v uint64) error {
	if b.tx.db == nil {
		return errors.ErrTxClosed
	} else if !b.Writable() {
		return errors.ErrTxNotWritable
	}

	// Materialize the root node if it hasn't been already so that the
	// bucket will be saved during commit.
	if b.rootNode == nil {
		_ = b.node(b.RootPage(), nil)
	}

	// Set the sequence.
	b.SetInSequence(v)
	return nil
}
560
// NextSequence returns an autoincrementing integer for the bucket.
// Returns an error if the transaction is closed or read-only.
func (b *Bucket) NextSequence() (uint64, error) {
	if b.tx.db == nil {
		return 0, errors.ErrTxClosed
	} else if !b.Writable() {
		return 0, errors.ErrTxNotWritable
	}

	// Materialize the root node if it hasn't been already so that the
	// bucket will be saved during commit.
	if b.rootNode == nil {
		_ = b.node(b.RootPage(), nil)
	}

	// Increment and return the sequence.
	b.IncSequence()
	return b.Sequence(), nil
}
579
580// ForEach executes a function for each key/value pair in a bucket.
581// Because ForEach uses a Cursor, the iteration over keys is in lexicographical order.
582// If the provided function returns an error then the iteration is stopped and
583// the error is returned to the caller. The provided function must not modify
584// the bucket; this will result in undefined behavior.
585func (b *Bucket) ForEach(fn func(k, v []byte) error) error {
586 if b.tx.db == nil {
587 return errors.ErrTxClosed
588 }
589 c := b.Cursor()
590 for k, v := c.First(); k != nil; k, v = c.Next() {
591 if err := fn(k, v); err != nil {
592 return err
593 }
594 }
595 return nil
596}
597
598func (b *Bucket) ForEachBucket(fn func(k []byte) error) error {
599 if b.tx.db == nil {
600 return errors.ErrTxClosed
601 }
602 c := b.Cursor()
603 for k, _, flags := c.first(); k != nil; k, _, flags = c.next() {
604 if flags&common.BucketLeafFlag != 0 {
605 if err := fn(k); err != nil {
606 return err
607 }
608 }
609 }
610 return nil
611}
612
// Stats returns stats on a bucket.
// Sub-bucket stats are collected recursively and folded into the result.
func (b *Bucket) Stats() BucketStats {
	var s, subStats BucketStats
	pageSize := b.tx.db.pageSize
	s.BucketN += 1
	if b.RootPage() == 0 {
		// Root page id 0 marks an inline bucket.
		s.InlineBucketN += 1
	}
	b.forEachPage(func(p *common.Page, depth int, pgstack []common.Pgid) {
		if p.IsLeafPage() {
			s.KeyN += int(p.Count())

			// used totals the used bytes for the page
			used := common.PageHeaderSize

			if p.Count() != 0 {
				// If page has any elements, add all element headers.
				used += common.LeafPageElementSize * uintptr(p.Count()-1)

				// Add all element key, value sizes.
				// The computation takes advantage of the fact that the position
				// of the last element's key/value equals to the total of the sizes
				// of all previous elements' keys and values.
				// It also includes the last element's header.
				lastElement := p.LeafPageElement(p.Count() - 1)
				used += uintptr(lastElement.Pos() + lastElement.Ksize() + lastElement.Vsize())
			}

			if b.RootPage() == 0 {
				// For inlined bucket just update the inline stats
				s.InlineBucketInuse += int(used)
			} else {
				// For non-inlined bucket update all the leaf stats
				s.LeafPageN++
				s.LeafInuse += int(used)
				s.LeafOverflowN += int(p.Overflow())

				// Collect stats from sub-buckets.
				// Do that by iterating over all element headers
				// looking for the ones with the bucketLeafFlag.
				for i := uint16(0); i < p.Count(); i++ {
					e := p.LeafPageElement(i)
					if (e.Flags() & common.BucketLeafFlag) != 0 {
						// For any bucket element, open the element value
						// and recursively call Stats on the contained bucket.
						subStats.Add(b.openBucket(e.Value()).Stats())
					}
				}
			}
		} else if p.IsBranchPage() {
			s.BranchPageN++
			lastElement := p.BranchPageElement(p.Count() - 1)

			// used totals the used bytes for the page
			// Add header and all element headers.
			used := common.PageHeaderSize + (common.BranchPageElementSize * uintptr(p.Count()-1))

			// Add size of all keys and values.
			// Again, use the fact that last element's position equals to
			// the total of key, value sizes of all previous elements.
			used += uintptr(lastElement.Pos() + lastElement.Ksize())
			s.BranchInuse += int(used)
			s.BranchOverflowN += int(p.Overflow())
		}

		// Keep track of maximum page depth.
		if depth+1 > s.Depth {
			s.Depth = depth + 1
		}
	})

	// Alloc stats can be computed from page counts and pageSize.
	s.BranchAlloc = (s.BranchPageN + s.BranchOverflowN) * pageSize
	s.LeafAlloc = (s.LeafPageN + s.LeafOverflowN) * pageSize

	// Add the max depth of sub-buckets to get total nested depth.
	s.Depth += subStats.Depth
	// Add the stats for all sub-buckets
	s.Add(subStats)
	return s
}
694
// forEachPage iterates over every page in a bucket, including inline pages.
// fn receives the page, its depth, and the stack of page ids leading to it.
func (b *Bucket) forEachPage(fn func(*common.Page, int, []common.Pgid)) {
	// If we have an inline page then just use that.
	if b.page != nil {
		fn(b.page, 0, []common.Pgid{b.RootPage()})
		return
	}

	// Otherwise traverse the page hierarchy.
	b.tx.forEachPage(b.RootPage(), fn)
}
706
// forEachPageNode iterates over every page (or node) in a bucket.
// This also includes inline pages.
// Exactly one of the page/node arguments passed to fn is non-nil.
func (b *Bucket) forEachPageNode(fn func(*common.Page, *node, int)) {
	// If we have an inline page or root node then just use that.
	if b.page != nil {
		fn(b.page, nil, 0)
		return
	}
	b._forEachPageNode(b.RootPage(), 0, fn)
}
717
// _forEachPageNode recursively visits the page or materialized node for
// pgId and all of its descendants, invoking fn at each level with the
// current depth.
func (b *Bucket) _forEachPageNode(pgId common.Pgid, depth int, fn func(*common.Page, *node, int)) {
	// pageNode returns either the on-disk page or the in-memory node,
	// never both.
	var p, n = b.pageNode(pgId)

	// Execute function.
	fn(p, n, depth)

	// Recursively loop over children.
	if p != nil {
		// Page-backed: only branch pages have child pages to descend into.
		if p.IsBranchPage() {
			for i := 0; i < int(p.Count()); i++ {
				elem := p.BranchPageElement(uint16(i))
				b._forEachPageNode(elem.Pgid(), depth+1, fn)
			}
		}
	} else {
		// Node-backed: descend through the inodes of non-leaf nodes.
		if !n.isLeaf {
			for _, inode := range n.inodes {
				b._forEachPageNode(inode.Pgid(), depth+1, fn)
			}
		}
	}
}
740
// spill writes all the nodes for this bucket to dirty pages.
// Child buckets are spilled first (inlining small ones), then this
// bucket's own node tree, after which the root page id is updated.
func (b *Bucket) spill() error {
	// Spill all child buckets first.
	for name, child := range b.buckets {
		// If the child bucket is small enough and it has no child buckets then
		// write it inline into the parent bucket's page. Otherwise spill it
		// like a normal bucket and make the parent value a pointer to the page.
		var value []byte
		if child.inlineable() {
			child.free()
			value = child.write()
		} else {
			if err := child.spill(); err != nil {
				return err
			}

			// Update the child bucket header in this bucket.
			value = make([]byte, unsafe.Sizeof(common.InBucket{}))
			var bucket = (*common.InBucket)(unsafe.Pointer(&value[0]))
			*bucket = *child.InBucket
		}

		// Skip writing the bucket if there are no materialized nodes.
		if child.rootNode == nil {
			continue
		}

		// Update parent node.
		var c = b.Cursor()
		k, _, flags := c.seek([]byte(name))
		if !bytes.Equal([]byte(name), k) {
			panic(fmt.Sprintf("misplaced bucket header: %x -> %x", []byte(name), k))
		}
		if flags&common.BucketLeafFlag == 0 {
			panic(fmt.Sprintf("unexpected bucket header flag: %x", flags))
		}
		c.node().put([]byte(name), []byte(name), value, 0, common.BucketLeafFlag)
	}

	// Ignore if there's not a materialized root node.
	if b.rootNode == nil {
		return nil
	}

	// Spill nodes.
	if err := b.rootNode.spill(); err != nil {
		return err
	}
	// Spilling may have split the root; re-resolve it.
	b.rootNode = b.rootNode.root()

	// Update the root node for this bucket.
	if b.rootNode.pgid >= b.tx.meta.Pgid() {
		panic(fmt.Sprintf("pgid (%d) above high water mark (%d)", b.rootNode.pgid, b.tx.meta.Pgid()))
	}
	b.SetRootPage(b.rootNode.pgid)

	return nil
}
799
800// inlineable returns true if a bucket is small enough to be written inline
801// and if it contains no subbuckets. Otherwise, returns false.
802func (b *Bucket) inlineable() bool {
803 var n = b.rootNode
804
805 // Bucket must only contain a single leaf node.
806 if n == nil || !n.isLeaf {
807 return false
808 }
809
810 // Bucket is not inlineable if it contains subbuckets or if it goes beyond
811 // our threshold for inline bucket size.
812 var size = common.PageHeaderSize
813 for _, inode := range n.inodes {
814 size += common.LeafPageElementSize + uintptr(len(inode.Key())) + uintptr(len(inode.Value()))
815
816 if inode.Flags()&common.BucketLeafFlag != 0 {
817 return false
818 } else if size > b.maxInlineBucketSize() {
819 return false
820 }
821 }
822
823 return true
824}
825
// Returns the maximum total size of a bucket to make it a candidate for inlining.
// The threshold is a quarter of the database's page size.
func (b *Bucket) maxInlineBucketSize() uintptr {
	return uintptr(b.tx.db.pageSize / 4)
}
830
// write allocates and writes a bucket to a byte slice.
// The result is the inline representation: a bucket header immediately
// followed by a fake page holding the root node's contents.
func (b *Bucket) write() []byte {
	// Allocate the appropriate size.
	var n = b.rootNode
	var value = make([]byte, common.BucketHeaderSize+n.size())

	// Write a bucket header.
	var bucket = (*common.InBucket)(unsafe.Pointer(&value[0]))
	*bucket = *b.InBucket

	// Convert byte slice to a fake page and write the root node.
	var p = (*common.Page)(unsafe.Pointer(&value[common.BucketHeaderSize]))
	n.write(p)

	return value
}
847
848// rebalance attempts to balance all nodes.
849func (b *Bucket) rebalance() {
850 for _, n := range b.nodes {
851 n.rebalance()
852 }
853 for _, child := range b.buckets {
854 child.rebalance()
855 }
856}
857
// node creates a node from a page and associates it with a given parent.
// Results are cached in b.nodes, so repeated calls for the same page id
// return the same node.
func (b *Bucket) node(pgId common.Pgid, parent *node) *node {
	common.Assert(b.nodes != nil, "nodes map expected")

	// Retrieve node if it's already been created.
	if n := b.nodes[pgId]; n != nil {
		return n
	}

	// Otherwise create a node and cache it.
	n := &node{bucket: b, parent: parent}
	if parent == nil {
		b.rootNode = n
	} else {
		parent.children = append(parent.children, n)
	}

	// Use the inline page if this is an inline bucket.
	var p = b.page
	if p == nil {
		p = b.tx.page(pgId)
	} else {
		// if p isn't nil, then it's an inline bucket.
		// The pgId must be 0 in this case.
		common.Verify(func() {
			common.Assert(pgId == 0, "The page ID (%d) isn't 0 for an inline bucket", pgId)
		})
	}

	// Read the page into the node and cache it.
	n.read(p)
	b.nodes[pgId] = n

	// Update statistics.
	b.tx.stats.IncNodeCount(1)

	return n
}
896
// free recursively frees all pages in the bucket.
// Inline buckets (root page id 0) own no pages and are a no-op.
// Afterwards the root page is reset to 0.
func (b *Bucket) free() {
	if b.RootPage() == 0 {
		return
	}

	var tx = b.tx
	b.forEachPageNode(func(p *common.Page, n *node, _ int) {
		if p != nil {
			// Page-backed entries go straight to the freelist.
			tx.db.freelist.Free(tx.meta.Txid(), p)
		} else {
			// Materialized nodes free themselves.
			n.free()
		}
	})
	b.SetRootPage(0)
}
913
914// dereference removes all references to the old mmap.
915func (b *Bucket) dereference() {
916 if b.rootNode != nil {
917 b.rootNode.root().dereference()
918 }
919
920 for _, child := range b.buckets {
921 child.dereference()
922 }
923}
924
// pageNode returns the in-memory node, if it exists.
// Otherwise, returns the underlying page.
// Exactly one of the two return values is non-nil.
func (b *Bucket) pageNode(id common.Pgid) (*common.Page, *node) {
	// Inline buckets have a fake page embedded in their value so treat them
	// differently. We'll return the rootNode (if available) or the fake page.
	if b.RootPage() == 0 {
		if id != 0 {
			panic(fmt.Sprintf("inline bucket non-zero page access(2): %d != 0", id))
		}
		if b.rootNode != nil {
			return nil, b.rootNode
		}
		return b.page, nil
	}

	// Check the node cache for non-inline buckets.
	if b.nodes != nil {
		if n := b.nodes[id]; n != nil {
			return nil, n
		}
	}

	// Finally lookup the page from the transaction if no node is materialized.
	return b.tx.page(id), nil
}
950
// BucketStats records statistics about resources used by a bucket.
// Values are cumulative over the bucket and all of its sub-buckets
// (see Stats and Add).
type BucketStats struct {
	// Page count statistics.
	BranchPageN     int // number of logical branch pages
	BranchOverflowN int // number of physical branch overflow pages
	LeafPageN       int // number of logical leaf pages
	LeafOverflowN   int // number of physical leaf overflow pages

	// Tree statistics.
	KeyN  int // number of keys/value pairs
	Depth int // number of levels in B+tree

	// Page size utilization.
	BranchAlloc int // bytes allocated for physical branch pages
	BranchInuse int // bytes actually used for branch data
	LeafAlloc   int // bytes allocated for physical leaf pages
	LeafInuse   int // bytes actually used for leaf data

	// Bucket statistics
	BucketN           int // total number of buckets including the top bucket
	InlineBucketN     int // total number on inlined buckets
	InlineBucketInuse int // bytes used for inlined buckets (also accounted for in LeafInuse)
}
974
975func (s *BucketStats) Add(other BucketStats) {
976 s.BranchPageN += other.BranchPageN
977 s.BranchOverflowN += other.BranchOverflowN
978 s.LeafPageN += other.LeafPageN
979 s.LeafOverflowN += other.LeafOverflowN
980 s.KeyN += other.KeyN
981 if s.Depth < other.Depth {
982 s.Depth = other.Depth
983 }
984 s.BranchAlloc += other.BranchAlloc
985 s.BranchInuse += other.BranchInuse
986 s.LeafAlloc += other.LeafAlloc
987 s.LeafInuse += other.LeafInuse
988
989 s.BucketN += other.BucketN
990 s.InlineBucketN += other.InlineBucketN
991 s.InlineBucketInuse += other.InlineBucketInuse
992}
993
// cloneBytes returns a freshly allocated copy of v. The result never
// aliases v's backing array, and a nil input yields an empty, non-nil
// slice.
func cloneBytes(v []byte) []byte {
	return append(make([]byte, 0, len(v)), v...)
}
1000
// BucketStructure describes the shape of a bucket as produced by
// Bucket.Inspect: its name, its direct key count, and its sub-buckets.
type BucketStructure struct {
	Name     string            `json:"name"`              // name of the bucket
	KeyN     int               `json:"keyN"`              // number of key/value pairs
	Children []BucketStructure `json:"buckets,omitempty"` // child buckets
}