diff options
author | Luke Shumaker <lukeshu@lukeshu.com> | 2022-12-31 12:45:16 -0700 |
---|---|---|
committer | Luke Shumaker <lukeshu@lukeshu.com> | 2023-01-05 19:48:17 -0700 |
commit | e1302ac1f2685db057b7ecafe70f57ad17d533dc (patch) | |
tree | 2304b73df715a227928d501aa3f427e12c74c806 /lib/btrfsprogs/btrfsinspect/rebuildnodes/rebuild.go | |
parent | 0e73803c2f951fe9f9f0c09e8c2bf14ac0cd8ef2 (diff) |
rebuildnodes: Parallelize I/O and CPU
Diffstat (limited to 'lib/btrfsprogs/btrfsinspect/rebuildnodes/rebuild.go')
-rw-r--r-- | lib/btrfsprogs/btrfsinspect/rebuildnodes/rebuild.go | 63 |
1 files changed, 42 insertions, 21 deletions
diff --git a/lib/btrfsprogs/btrfsinspect/rebuildnodes/rebuild.go b/lib/btrfsprogs/btrfsinspect/rebuildnodes/rebuild.go index dcf77b8..cefa668 100644 --- a/lib/btrfsprogs/btrfsinspect/rebuildnodes/rebuild.go +++ b/lib/btrfsprogs/btrfsinspect/rebuildnodes/rebuild.go @@ -11,6 +11,7 @@ import ( "sort" "time" + "github.com/datawire/dlib/dgroup" "github.com/datawire/dlib/dlog" "git.lukeshu.com/btrfs-progs-ng/lib/btrfs" @@ -130,30 +131,50 @@ func (o *rebuilder) Rebuild(_ctx context.Context) error { progress.D = len(itemQueue) progressWriter := textui.NewProgress[textui.Portion[int]](stepCtx, dlog.LogLevelInfo, textui.Tunable(1*time.Second)) stepCtx = dlog.WithField(stepCtx, "btrfsinspect.rebuild-nodes.rebuild.substep.progress", &progress) - for i, key := range itemQueue { - itemCtx := dlog.WithField(stepCtx, "btrfsinspect.rebuild-nodes.rebuild.process.item", key) - progress.N = i - progressWriter.Set(progress) - if err := _ctx.Err(); err != nil { - progressWriter.Done() - return err - } - o.curKey = key - itemBody, ok := o.rebuilt.Tree(itemCtx, key.TreeID).ReadItem(itemCtx, key.Key) - if !ok { - o.ioErr(itemCtx, fmt.Errorf("could not read previously read item: %v", key)) + type keyAndBody struct { + keyAndTree + Body btrfsitem.Item + } + itemChan := make(chan keyAndBody, textui.Tunable(300)) // average items-per-node≈100; let's have a buffer of ~3 nodes + grp := dgroup.NewGroup(stepCtx, dgroup.GroupConfig{}) + grp.Go("io", func(stepCtx context.Context) error { + defer close(itemChan) + for _, key := range itemQueue { + if err := stepCtx.Err(); err != nil { + return err + } + itemCtx := dlog.WithField(stepCtx, "btrfsinspect.rebuild-nodes.rebuild.process.item", key) + itemBody, ok := o.rebuilt.Tree(itemCtx, key.TreeID).ReadItem(itemCtx, key.Key) + if !ok { + o.ioErr(itemCtx, fmt.Errorf("could not read previously read item: %v", key)) + } + itemChan <- keyAndBody{ + keyAndTree: key, + Body: itemBody, + } } - handleItem(o, itemCtx, key.TreeID, btrfstree.Item{ - Key: key.Key, - Body: itemBody, - }) - if key.ItemType == btrfsitem.ROOT_ITEM_KEY { - o.treeQueue.Insert(key.ObjectID) + return nil + }) + grp.Go("cpu", func(stepCtx context.Context) error { + defer progressWriter.Done() + for item := range itemChan { + itemCtx := dlog.WithField(stepCtx, "btrfsinspect.rebuild-nodes.rebuild.process.item", item.keyAndTree) + o.curKey = item.keyAndTree + handleItem(o, itemCtx, item.TreeID, btrfstree.Item{ + Key: item.Key, + Body: item.Body, + }) + if item.ItemType == btrfsitem.ROOT_ITEM_KEY { + o.treeQueue.Insert(item.ObjectID) + } + progress.N++ + progressWriter.Set(progress) } + return nil + }) + if err := grp.Wait(); err != nil { + return err } - progress.N = len(itemQueue) - progressWriter.Set(progress) - progressWriter.Done() // Apply augments (drain o.augmentQueue, fill o.itemQueue) stepCtx = dlog.WithField(passCtx, "btrfsinspect.rebuild-nodes.rebuild.substep", "apply-augments") |