diff options
author | Luke Shumaker <lukeshu@lukeshu.com> | 2022-12-31 12:45:16 -0700 |
---|---|---|
committer | Luke Shumaker <lukeshu@lukeshu.com> | 2023-01-05 19:48:17 -0700 |
commit | e1302ac1f2685db057b7ecafe70f57ad17d533dc (patch) | |
tree | 2304b73df715a227928d501aa3f427e12c74c806 | |
parent | 0e73803c2f951fe9f9f0c09e8c2bf14ac0cd8ef2 (diff) |
rebuildnodes: Parallelize I/O and CPU
3 files changed, 60 insertions, 32 deletions
diff --git a/lib/btrfsprogs/btrfsinspect/rebuildnodes/btrees/forrest.go b/lib/btrfsprogs/btrfsinspect/rebuildnodes/btrees/forrest.go index ef6da6f..60f922d 100644 --- a/lib/btrfsprogs/btrfsinspect/rebuildnodes/btrees/forrest.go +++ b/lib/btrfsprogs/btrfsinspect/rebuildnodes/btrees/forrest.go @@ -1,4 +1,4 @@ -// Copyright (C) 2022 Luke Shumaker <lukeshu@lukeshu.com> +// Copyright (C) 2022-2023 Luke Shumaker <lukeshu@lukeshu.com> // // SPDX-License-Identifier: GPL-2.0-or-later @@ -62,7 +62,7 @@ type RebuiltForrest struct { cbLookupUUID func(ctx context.Context, uuid btrfsprim.UUID) (id btrfsprim.ObjID, ok bool) // mutable - trees map[btrfsprim.ObjID]*RebuiltTree + trees containers.SyncMap[btrfsprim.ObjID, *RebuiltTree] allItems *containers.LRUCache[btrfsprim.ObjID, *itemIndex] incItems *containers.LRUCache[btrfsprim.ObjID, *itemIndex] } @@ -84,7 +84,6 @@ func NewRebuiltForrest( cbLookupRoot: cbLookupRoot, cbLookupUUID: cbLookupUUID, - trees: make(map[btrfsprim.ObjID]*RebuiltTree), allItems: containers.NewLRUCache[btrfsprim.ObjID, *itemIndex](textui.Tunable(8)), incItems: containers.NewLRUCache[btrfsprim.ObjID, *itemIndex](textui.Tunable(8)), } @@ -99,11 +98,12 @@ func (ts *RebuiltForrest) Tree(ctx context.Context, treeID btrfsprim.ObjID) *Reb if !ts.addTree(ctx, treeID, nil) { return nil } - return ts.trees[treeID] + tree, _ := ts.trees.Load(treeID) + return tree } func (ts *RebuiltForrest) addTree(ctx context.Context, treeID btrfsprim.ObjID, stack []btrfsprim.ObjID) (ok bool) { - if _, ok := ts.trees[treeID]; ok { + if _, ok := ts.trees.Load(treeID); ok { return true } if slices.Contains(treeID, stack) { @@ -151,12 +151,12 @@ func (ts *RebuiltForrest) addTree(ctx context.Context, treeID btrfsprim.ObjID, s if !ts.addTree(ctx, parentID, append(stack, treeID)) { return false } - tree.Parent = ts.trees[parentID] + tree.Parent, _ = ts.trees.Load(parentID) } } tree.indexLeafs(ctx) - ts.trees[treeID] = tree + ts.trees.Store(treeID, tree) if root != 0 { tree.AddRoot(ctx, root) } @@ -170,9 +170,10 @@ func (ts *RebuiltForrest) addTree(ctx context.Context, treeID btrfsprim.ObjID, s // Do not mutate the set of roots for a tree; it is a pointer to the // RebuiltForrest's internal set! func (ts *RebuiltForrest) ListRoots() map[btrfsprim.ObjID]containers.Set[btrfsvol.LogicalAddr] { - ret := make(map[btrfsprim.ObjID]containers.Set[btrfsvol.LogicalAddr], len(ts.trees)) - for treeID := range ts.trees { - ret[treeID] = ts.trees[treeID].Roots - } + ret := make(map[btrfsprim.ObjID]containers.Set[btrfsvol.LogicalAddr]) + ts.trees.Range(func(treeID btrfsprim.ObjID, tree *RebuiltTree) bool { + ret[treeID] = tree.Roots + return true + }) return ret } diff --git a/lib/btrfsprogs/btrfsinspect/rebuildnodes/btrees/tree.go b/lib/btrfsprogs/btrfsinspect/rebuildnodes/btrees/tree.go index ffbaa0f..1daeefc 100644 --- a/lib/btrfsprogs/btrfsinspect/rebuildnodes/btrees/tree.go +++ b/lib/btrfsprogs/btrfsinspect/rebuildnodes/btrees/tree.go @@ -7,6 +7,7 @@ package btrees import ( "context" "fmt" + "sync" "time" "github.com/datawire/dlib/dlog" @@ -34,6 +35,7 @@ type RebuiltTree struct { leafToRoots map[btrfsvol.LogicalAddr]containers.Set[btrfsvol.LogicalAddr] // mutable + mu sync.Mutex Roots containers.Set[btrfsvol.LogicalAddr] Leafs containers.Set[btrfsvol.LogicalAddr] } @@ -137,6 +139,8 @@ func (s rootStats) String() string { // call .AddRoot() to re-attach part of the tree that has been broken // off. func (tree *RebuiltTree) AddRoot(ctx context.Context, rootNode btrfsvol.LogicalAddr) { + tree.mu.Lock() + defer tree.mu.Unlock() ctx = dlog.WithField(ctx, "btrfsinspect.rebuild-nodes.rebuild.add-root", fmt.Sprintf("tree=%v rootNode=%v", tree.ID, rootNode)) tree.Roots.Insert(rootNode) @@ -205,6 +209,8 @@ func (s itemStats) String() string { } func (tree *RebuiltTree) items(ctx context.Context, cache *containers.LRUCache[btrfsprim.ObjID, *itemIndex], leafs []btrfsvol.LogicalAddr) *containers.SortedMap[btrfsprim.Key, keyio.ItemPtr] { + tree.mu.Lock() + defer tree.mu.Unlock() index := cache.GetOrElse(tree.ID, func() *itemIndex { return &itemIndex{ Leafs: make(containers.Set[btrfsvol.LogicalAddr]), diff --git a/lib/btrfsprogs/btrfsinspect/rebuildnodes/rebuild.go b/lib/btrfsprogs/btrfsinspect/rebuildnodes/rebuild.go index dcf77b8..cefa668 100644 --- a/lib/btrfsprogs/btrfsinspect/rebuildnodes/rebuild.go +++ b/lib/btrfsprogs/btrfsinspect/rebuildnodes/rebuild.go @@ -11,6 +11,7 @@ import ( "sort" "time" + "github.com/datawire/dlib/dgroup" "github.com/datawire/dlib/dlog" "git.lukeshu.com/btrfs-progs-ng/lib/btrfs" @@ -130,30 +131,50 @@ func (o *rebuilder) Rebuild(_ctx context.Context) error { progress.D = len(itemQueue) progressWriter := textui.NewProgress[textui.Portion[int]](stepCtx, dlog.LogLevelInfo, textui.Tunable(1*time.Second)) stepCtx = dlog.WithField(stepCtx, "btrfsinspect.rebuild-nodes.rebuild.substep.progress", &progress) - for i, key := range itemQueue { - itemCtx := dlog.WithField(stepCtx, "btrfsinspect.rebuild-nodes.rebuild.process.item", key) - progress.N = i - progressWriter.Set(progress) - if err := _ctx.Err(); err != nil { - progressWriter.Done() - return err - } - o.curKey = key - itemBody, ok := o.rebuilt.Tree(itemCtx, key.TreeID).ReadItem(itemCtx, key.Key) - if !ok { - o.ioErr(itemCtx, fmt.Errorf("could not read previously read item: %v", key)) + type keyAndBody struct { + keyAndTree + Body btrfsitem.Item + } + itemChan := make(chan keyAndBody, textui.Tunable(300)) // average items-per-node≈100; let's have a buffer of ~3 nodes + grp := dgroup.NewGroup(stepCtx, dgroup.GroupConfig{}) + grp.Go("io", func(stepCtx context.Context) error { + defer close(itemChan) + for _, key := range itemQueue { + if err := stepCtx.Err(); err != nil { + return err + } + itemCtx := dlog.WithField(stepCtx, "btrfsinspect.rebuild-nodes.rebuild.process.item", key) + itemBody, ok := o.rebuilt.Tree(itemCtx, key.TreeID).ReadItem(itemCtx, key.Key) + if !ok { + o.ioErr(itemCtx, fmt.Errorf("could not read previously read item: %v", key)) + } + itemChan <- keyAndBody{ + keyAndTree: key, + Body: itemBody, + } } - handleItem(o, itemCtx, key.TreeID, btrfstree.Item{ - Key: key.Key, - Body: itemBody, - }) - if key.ItemType == btrfsitem.ROOT_ITEM_KEY { - o.treeQueue.Insert(key.ObjectID) + return nil + }) + grp.Go("cpu", func(stepCtx context.Context) error { + defer progressWriter.Done() + for item := range itemChan { + itemCtx := dlog.WithField(stepCtx, "btrfsinspect.rebuild-nodes.rebuild.process.item", item.keyAndTree) + o.curKey = item.keyAndTree + handleItem(o, itemCtx, item.TreeID, btrfstree.Item{ + Key: item.Key, + Body: item.Body, + }) + if item.ItemType == btrfsitem.ROOT_ITEM_KEY { + o.treeQueue.Insert(item.ObjectID) + } + progress.N++ + progressWriter.Set(progress) } + return nil + }) + if err := grp.Wait(); err != nil { + return err } - progress.N = len(itemQueue) - progressWriter.Set(progress) - progressWriter.Done() // Apply augments (drain o.augmentQueue, fill o.itemQueue) stepCtx = dlog.WithField(passCtx, "btrfsinspect.rebuild-nodes.rebuild.substep", "apply-augments") |