summaryrefslogtreecommitdiff
path: root/lib/btrfsprogs/btrfsinspect/rebuildnodes/rebuild.go
diff options
context:
space:
mode:
Diffstat (limited to 'lib/btrfsprogs/btrfsinspect/rebuildnodes/rebuild.go')
-rw-r--r--lib/btrfsprogs/btrfsinspect/rebuildnodes/rebuild.go63
1 files changed, 42 insertions, 21 deletions
diff --git a/lib/btrfsprogs/btrfsinspect/rebuildnodes/rebuild.go b/lib/btrfsprogs/btrfsinspect/rebuildnodes/rebuild.go
index dcf77b8..cefa668 100644
--- a/lib/btrfsprogs/btrfsinspect/rebuildnodes/rebuild.go
+++ b/lib/btrfsprogs/btrfsinspect/rebuildnodes/rebuild.go
@@ -11,6 +11,7 @@ import (
"sort"
"time"
+ "github.com/datawire/dlib/dgroup"
"github.com/datawire/dlib/dlog"
"git.lukeshu.com/btrfs-progs-ng/lib/btrfs"
@@ -130,30 +131,50 @@ func (o *rebuilder) Rebuild(_ctx context.Context) error {
progress.D = len(itemQueue)
progressWriter := textui.NewProgress[textui.Portion[int]](stepCtx, dlog.LogLevelInfo, textui.Tunable(1*time.Second))
stepCtx = dlog.WithField(stepCtx, "btrfsinspect.rebuild-nodes.rebuild.substep.progress", &progress)
- for i, key := range itemQueue {
- itemCtx := dlog.WithField(stepCtx, "btrfsinspect.rebuild-nodes.rebuild.process.item", key)
- progress.N = i
- progressWriter.Set(progress)
- if err := _ctx.Err(); err != nil {
- progressWriter.Done()
- return err
- }
- o.curKey = key
- itemBody, ok := o.rebuilt.Tree(itemCtx, key.TreeID).ReadItem(itemCtx, key.Key)
- if !ok {
- o.ioErr(itemCtx, fmt.Errorf("could not read previously read item: %v", key))
+ type keyAndBody struct {
+ keyAndTree
+ Body btrfsitem.Item
+ }
+ itemChan := make(chan keyAndBody, textui.Tunable(300)) // average items-per-node≈100; let's have a buffer of ~3 nodes
+ grp := dgroup.NewGroup(stepCtx, dgroup.GroupConfig{})
+ grp.Go("io", func(stepCtx context.Context) error {
+ defer close(itemChan)
+ for _, key := range itemQueue {
+ if err := stepCtx.Err(); err != nil {
+ return err
+ }
+ itemCtx := dlog.WithField(stepCtx, "btrfsinspect.rebuild-nodes.rebuild.process.item", key)
+ itemBody, ok := o.rebuilt.Tree(itemCtx, key.TreeID).ReadItem(itemCtx, key.Key)
+ if !ok {
+ o.ioErr(itemCtx, fmt.Errorf("could not read previously read item: %v", key))
+ }
+ itemChan <- keyAndBody{
+ keyAndTree: key,
+ Body: itemBody,
+ }
}
- handleItem(o, itemCtx, key.TreeID, btrfstree.Item{
- Key: key.Key,
- Body: itemBody,
- })
- if key.ItemType == btrfsitem.ROOT_ITEM_KEY {
- o.treeQueue.Insert(key.ObjectID)
+ return nil
+ })
+ grp.Go("cpu", func(stepCtx context.Context) error {
+ defer progressWriter.Done()
+ for item := range itemChan {
+ itemCtx := dlog.WithField(stepCtx, "btrfsinspect.rebuild-nodes.rebuild.process.item", item.keyAndTree)
+ o.curKey = item.keyAndTree
+ handleItem(o, itemCtx, item.TreeID, btrfstree.Item{
+ Key: item.Key,
+ Body: item.Body,
+ })
+ if item.ItemType == btrfsitem.ROOT_ITEM_KEY {
+ o.treeQueue.Insert(item.ObjectID)
+ }
+ progress.N++
+ progressWriter.Set(progress)
}
+ return nil
+ })
+ if err := grp.Wait(); err != nil {
+ return err
}
- progress.N = len(itemQueue)
- progressWriter.Set(progress)
- progressWriter.Done()
// Apply augments (drain o.augmentQueue, fill o.itemQueue)
stepCtx = dlog.WithField(passCtx, "btrfsinspect.rebuild-nodes.rebuild.substep", "apply-augments")