Skip to content

Commit

Permalink
Use immutable binary tree for index
Browse files Browse the repository at this point in the history
  • Loading branch information
serathius committed Jun 16, 2024
1 parent 4a3e2bd commit caccf0e
Show file tree
Hide file tree
Showing 6 changed files with 234 additions and 269 deletions.
1 change: 1 addition & 0 deletions server/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ require (
)

require (
github.com/VictorLowther/ibtree v0.2.2 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions server/go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/VictorLowther/ibtree v0.2.2 h1:OXmWILeZ8h1d+cBGT+bAjK9LV3Gwo6SxSSm/3Lc4L9I=
github.com/VictorLowther/ibtree v0.2.2/go.mod h1:tYw+Bf7fn2ILNstN0NFw+G+kO3trrkE5Mt66DK1eWvY=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
Expand Down
249 changes: 108 additions & 141 deletions server/storage/mvcc/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
package mvcc

import (
"sync"

"github.com/google/btree"
"bytes"
"fmt"
"github.com/VictorLowther/ibtree"

Check failure on line 20 in server/storage/mvcc/index.go

View workflow job for this annotation

GitHub Actions / test

could not import github.com/VictorLowther/ibtree (invalid package name: "")

Check failure on line 20 in server/storage/mvcc/index.go

View workflow job for this annotation

GitHub Actions / test

missing go.sum entry for module providing package github.com/VictorLowther/ibtree (imported by go.etcd.io/etcd/server/v3/storage/mvcc); to add:

Check failure on line 20 in server/storage/mvcc/index.go

View workflow job for this annotation

GitHub Actions / test

could not import github.com/VictorLowther/ibtree (invalid package name: "")

Check failure on line 20 in server/storage/mvcc/index.go

View workflow job for this annotation

GitHub Actions / test

could not import github.com/VictorLowther/ibtree (invalid package name: "")
"go.uber.org/zap"
)

Expand All @@ -30,102 +30,115 @@ type index interface {
Tombstone(key []byte, rev Revision) error
Compact(rev int64) map[Revision]struct{}
Keep(rev int64) map[Revision]struct{}

Insert(ki *keyIndex)
KeyIndex(ki *keyIndex) *keyIndex
}

type treeIndex struct {
sync.RWMutex
tree *btree.BTreeG[*keyIndex]
lg *zap.Logger
baseRev int64
revisionTree []*ibtree.Tree[keyRev]
lg *zap.Logger
}

func newTreeIndex(lg *zap.Logger) *treeIndex {
return &treeIndex{
tree: btree.NewG(32, func(aki *keyIndex, bki *keyIndex) bool {
return aki.Less(bki)
}),
lg: lg,
}
type keyRev struct {
key []byte
mod, created Revision
version int64
}

func (ti *treeIndex) Put(key []byte, rev Revision) {
keyi := &keyIndex{key: key}

ti.Lock()
defer ti.Unlock()
okeyi, ok := ti.tree.Get(keyi)
if !ok {
keyi.put(ti.lg, rev.Main, rev.Sub)
ti.tree.ReplaceOrInsert(keyi)
return
}
okeyi.put(ti.lg, rev.Main, rev.Sub)
var lessThen ibtree.LessThan[keyRev] = func(k keyRev, k2 keyRev) bool {
return compare(k, k2) == -1
}

func (ti *treeIndex) Get(key []byte, atRev int64) (modified, created Revision, ver int64, err error) {
ti.RLock()
defer ti.RUnlock()
return ti.unsafeGet(key, atRev)
func compare(k keyRev, k2 keyRev) int {
return bytes.Compare(k.key, k2.key)
}

func (ti *treeIndex) unsafeGet(key []byte, atRev int64) (modified, created Revision, ver int64, err error) {
keyi := &keyIndex{key: key}
if keyi = ti.keyIndex(keyi); keyi == nil {
return Revision{}, Revision{}, 0, ErrRevisionNotFound
func compareKey(k []byte) ibtree.CompareAgainst[keyRev] {
return func(k2 keyRev) int {
return bytes.Compare(k2.key, k)
}
return keyi.get(ti.lg, atRev)
}

func (ti *treeIndex) KeyIndex(keyi *keyIndex) *keyIndex {
ti.RLock()
defer ti.RUnlock()
return ti.keyIndex(keyi)
func lessThenKey(k []byte) ibtree.Test[keyRev] {
return func(k2 keyRev) bool {
return bytes.Compare(k2.key, k) < 0
}
}

func (ti *treeIndex) keyIndex(keyi *keyIndex) *keyIndex {
if ki, ok := ti.tree.Get(keyi); ok {
return ki
func lessThenEqualKey(k []byte) ibtree.Test[keyRev] {
return func(k2 keyRev) bool {
return bytes.Compare(k2.key, k) >= 0
}
return nil
}

func (ti *treeIndex) unsafeVisit(key, end []byte, f func(ki *keyIndex) bool) {
keyi, endi := &keyIndex{key: key}, &keyIndex{key: end}
func newTreeIndex(lg *zap.Logger) *treeIndex {
return &treeIndex{
baseRev: -1,
lg: lg,
}
}

ti.tree.AscendGreaterOrEqual(keyi, func(item *keyIndex) bool {
if len(endi.key) > 0 && !item.Less(endi) {
return false
}
if !f(item) {
return false
func (ti *treeIndex) Put(key []byte, rev Revision) {
if ti.baseRev == -1 {
ti.baseRev = rev.Main - 1
ti.revisionTree = []*ibtree.Tree[keyRev]{
ibtree.New[keyRev](lessThen),
}
return true
})
}
if rev.Main != ti.rev()+1 {
panic(fmt.Sprintf("append only, lastRev: %d, putRev: %d", ti.rev(), rev.Main))
}
prevTree := ti.revisionTree[len(ti.revisionTree)-1]
item, found := prevTree.Get(compareKey(key))
created := rev
var version int64 = 1
if found {
created = item.created
version = item.version + 1
}
ti.revisionTree = append(ti.revisionTree, prevTree.Insert(keyRev{
key: key,
mod: rev,
created: created,
version: version,
}))
}

func (ti *treeIndex) rev() int64 {
return ti.baseRev + int64(len(ti.revisionTree)) - 1
}

func (ti *treeIndex) Get(key []byte, atRev int64) (modified, created Revision, ver int64, err error) {
idx := atRev - ti.baseRev
if idx < 0 || idx >= int64(len(ti.revisionTree)) {
return Revision{}, Revision{}, 0, ErrRevisionNotFound
}
tree := ti.revisionTree[idx]

keyRev, found := tree.Get(compareKey(key))
if !found {
return Revision{}, Revision{}, 0, ErrRevisionNotFound
}
return keyRev.mod, keyRev.created, keyRev.version, nil
}

// Revisions returns limited number of revisions from key(included) to end(excluded)
// at the given rev. The returned slice is sorted in the order of key. There is no limit if limit <= 0.
// The second return parameter isn't capped by the limit and reflects the total number of revisions.
func (ti *treeIndex) Revisions(key, end []byte, atRev int64, limit int) (revs []Revision, total int) {
ti.RLock()
defer ti.RUnlock()

if end == nil {
rev, _, _, err := ti.unsafeGet(key, atRev)
rev, _, _, err := ti.Get(key, atRev)
if err != nil {
return nil, 0
}
return []Revision{rev}, 1
}
ti.unsafeVisit(key, end, func(ki *keyIndex) bool {
if rev, _, _, err := ki.get(ti.lg, atRev); err == nil {
if limit <= 0 || len(revs) < limit {
revs = append(revs, rev)
}
total++
idx := atRev - ti.baseRev
tree := ti.revisionTree[idx]
tree.Range(lessThenKey(key), lessThenEqualKey(end), func(kr keyRev) bool {
if limit <= 0 || len(revs) < limit {
revs = append(revs, kr.mod)
}
total++
return true
})
return revs, total
Expand All @@ -134,119 +147,73 @@ func (ti *treeIndex) Revisions(key, end []byte, atRev int64, limit int) (revs []
// CountRevisions returns the number of revisions
// from key(included) to end(excluded) at the given rev.
func (ti *treeIndex) CountRevisions(key, end []byte, atRev int64) int {
ti.RLock()
defer ti.RUnlock()

if end == nil {
_, _, _, err := ti.unsafeGet(key, atRev)
_, _, _, err := ti.Get(key, atRev)
if err != nil {
return 0
}
return 1
}
idx := atRev - ti.baseRev
tree := ti.revisionTree[idx]
total := 0
ti.unsafeVisit(key, end, func(ki *keyIndex) bool {
if _, _, _, err := ki.get(ti.lg, atRev); err == nil {
total++
}
tree.Range(lessThenKey(key), lessThenEqualKey(end), func(kr keyRev) bool {
total++
return true
})
return total
}

func (ti *treeIndex) Range(key, end []byte, atRev int64) (keys [][]byte, revs []Revision) {
ti.RLock()
defer ti.RUnlock()

if end == nil {
rev, _, _, err := ti.unsafeGet(key, atRev)
rev, _, _, err := ti.Get(key, atRev)
if err != nil {
return nil, nil
}
return [][]byte{key}, []Revision{rev}
}
ti.unsafeVisit(key, end, func(ki *keyIndex) bool {
if rev, _, _, err := ki.get(ti.lg, atRev); err == nil {
revs = append(revs, rev)
keys = append(keys, ki.key)
}
idx := atRev - ti.baseRev
tree := ti.revisionTree[idx]
tree.Range(lessThenKey(key), lessThenEqualKey(end), func(kr keyRev) bool {
revs = append(revs, kr.mod)
keys = append(keys, kr.key)
return true
})
return keys, revs
}

func (ti *treeIndex) Tombstone(key []byte, rev Revision) error {
keyi := &keyIndex{key: key}

ti.Lock()
defer ti.Unlock()
ki, ok := ti.tree.Get(keyi)
if !ok {
if rev.Main != ti.rev()+1 {
panic(fmt.Sprintf("append only, lastRev: %d, putRev: %d", ti.rev(), rev.Main))
}
prevTree := ti.revisionTree[len(ti.revisionTree)-1]
newTree, _, found := prevTree.Delete(keyRev{
key: key,
})
if !found {
return ErrRevisionNotFound
}

return ki.tombstone(ti.lg, rev.Main, rev.Sub)
ti.revisionTree = append(ti.revisionTree, newTree)
return nil
}

func (ti *treeIndex) Compact(rev int64) map[Revision]struct{} {
available := make(map[Revision]struct{})
ti.lg.Info("compact tree index", zap.Int64("revision", rev))
ti.Lock()
clone := ti.tree.Clone()
ti.Unlock()

clone.Ascend(func(keyi *keyIndex) bool {
// Lock is needed here to prevent modification to the keyIndex while
// compaction is going on or revision added to empty before deletion
ti.Lock()
keyi.compact(ti.lg, rev, available)
if keyi.isEmpty() {
_, ok := ti.tree.Delete(keyi)
if !ok {
ti.lg.Panic("failed to delete during compaction")
}
}
ti.Unlock()
return true
})
idx := rev - ti.baseRev
ti.revisionTree = ti.revisionTree[idx:]
ti.baseRev = rev
return available
}

// Keep finds all revisions to be kept for a Compaction at the given rev.
func (ti *treeIndex) Keep(rev int64) map[Revision]struct{} {
available := make(map[Revision]struct{})
ti.RLock()
defer ti.RUnlock()
ti.tree.Ascend(func(keyi *keyIndex) bool {
keyi.keep(rev, available)
return true
})
return available
}

func (ti *treeIndex) Equal(bi index) bool {
b := bi.(*treeIndex)

if ti.tree.Len() != b.tree.Len() {
return false
idx := rev - ti.baseRev
tree := ti.revisionTree[idx]
for it := tree.All(); it.Next(); {
keyRev := it.Item()
available[keyRev.mod] = struct{}{}
}

equal := true

ti.tree.Ascend(func(aki *keyIndex) bool {
bki, _ := b.tree.Get(aki)
if !aki.equal(bki) {
equal = false
return false
}
return true
})

return equal
}

func (ti *treeIndex) Insert(ki *keyIndex) {
ti.Lock()
defer ti.Unlock()
ti.tree.ReplaceOrInsert(ki)
return available
}
18 changes: 9 additions & 9 deletions server/storage/mvcc/index_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,21 @@ import (
"go.uber.org/zap"
)

func BenchmarkIndexCompactBase(b *testing.B) { benchmarkIndexCompact(b, 3, 100) }
func BenchmarkIndexCompactLongKey(b *testing.B) { benchmarkIndexCompact(b, 512, 100) }
func BenchmarkIndexCompactLargeKeySpace(b *testing.B) { benchmarkIndexCompact(b, 3, 100000) }
//func BenchmarkIndexCompactBase(b *testing.B) { benchmarkIndexCompact(b, 3, 100) }
//func BenchmarkIndexCompactLongKey(b *testing.B) { benchmarkIndexCompact(b, 512, 100) }
//func BenchmarkIndexCompactLargeKeySpace(b *testing.B) { benchmarkIndexCompact(b, 3, 100000) }

func BenchmarkIndexKeepBase(b *testing.B) { benchmarkIndexKeep(b, 3, 100) }
func BenchmarkIndexKeepLongKey(b *testing.B) { benchmarkIndexKeep(b, 512, 100) }
func BenchmarkIndexKeepLargeKeySpace(b *testing.B) { benchmarkIndexKeep(b, 3, 100000) }
//func BenchmarkIndexKeepBase(b *testing.B) { benchmarkIndexKeep(b, 3, 100) }
//func BenchmarkIndexKeepLongKey(b *testing.B) { benchmarkIndexKeep(b, 512, 100) }
//func BenchmarkIndexKeepLargeKeySpace(b *testing.B) { benchmarkIndexKeep(b, 3, 100000) }

func BenchmarkIndexPutBase(b *testing.B) { benchmarkIndexPut(b, 3, 100) }
func BenchmarkIndexPutLongKey(b *testing.B) { benchmarkIndexPut(b, 512, 100) }
func BenchmarkIndexPutLargeKeySpace(b *testing.B) { benchmarkIndexPut(b, 3, 100000) }

func BenchmarkIndexTombstoneBase(b *testing.B) { benchmarkIndexTombstone(b, 3, 100, 25) }
func BenchmarkIndexTombstoneLongKey(b *testing.B) { benchmarkIndexTombstone(b, 512, 100, 25) }
func BenchmarkIndexTombstoneLargeKeySpace(b *testing.B) { benchmarkIndexTombstone(b, 3, 100000, 25) }
//func BenchmarkIndexTombstoneBase(b *testing.B) { benchmarkIndexTombstone(b, 3, 100, 25) }
//func BenchmarkIndexTombstoneLongKey(b *testing.B) { benchmarkIndexTombstone(b, 512, 100, 25) }
//func BenchmarkIndexTombstoneLargeKeySpace(b *testing.B) { benchmarkIndexTombstone(b, 3, 100000, 25) }

func BenchmarkIndexGetBase(b *testing.B) { benchmarkIndexGet(b, 3, 100, 1, 25) }
func BenchmarkIndexGetRepeatedKeys(b *testing.B) { benchmarkIndexGet(b, 3, 100, 1000, 25) }
Expand Down
Loading

0 comments on commit caccf0e

Please sign in to comment.