diff options
-rw-r--r-- | cmd/btrfs-rec/util.go | 61 | ||||
-rw-r--r-- | lib/streamio/runescanner.go | 90 |
2 files changed, 92 insertions, 59 deletions
diff --git a/cmd/btrfs-rec/util.go b/cmd/btrfs-rec/util.go index 9a0d60c..b2497b2 100644 --- a/cmd/btrfs-rec/util.go +++ b/cmd/btrfs-rec/util.go @@ -9,77 +9,20 @@ import ( "context" "io" "os" - "time" "git.lukeshu.com/go/lowmemjson" "github.com/datawire/dlib/dlog" - "git.lukeshu.com/btrfs-progs-ng/lib/textui" + "git.lukeshu.com/btrfs-progs-ng/lib/streamio" ) -type runeScanner struct { - ctx context.Context //nolint:containedctx // For detecting shutdown from methods - progress textui.Portion[int64] - progressWriter *textui.Progress[textui.Portion[int64]] - unreadCnt uint64 - reader *bufio.Reader - closer io.Closer -} - -func newRuneScanner(ctx context.Context, fh *os.File) (*runeScanner, error) { - fi, err := fh.Stat() - if err != nil { - return nil, err - } - ret := &runeScanner{ - ctx: ctx, - progress: textui.Portion[int64]{ - D: fi.Size(), - }, - progressWriter: textui.NewProgress[textui.Portion[int64]](ctx, dlog.LogLevelInfo, textui.Tunable(1*time.Second)), - reader: bufio.NewReader(fh), - closer: fh, - } - return ret, nil -} - -func (rs *runeScanner) ReadRune() (r rune, size int, err error) { - if err := rs.ctx.Err(); err != nil { - return 0, 0, err - } - r, size, err = rs.reader.ReadRune() - if rs.unreadCnt > 0 { - rs.unreadCnt-- - } else { - rs.progress.N += int64(size) - rs.progressWriter.Set(rs.progress) - } - return -} - -func (rs *runeScanner) UnreadRune() error { - if err := rs.ctx.Err(); err != nil { - return err - } - if err := rs.reader.UnreadRune(); err != nil { - return err - } - rs.unreadCnt++ - return nil -} - -func (rs *runeScanner) Close() error { - rs.progressWriter.Done() - return rs.closer.Close() -} - func readJSONFile[T any](ctx context.Context, filename string) (T, error) { fh, err := os.Open(filename) if err != nil { var zero T return zero, err } - buf, err := newRuneScanner(dlog.WithField(ctx, "btrfs.read-json-file", filename), fh) + buf, err := streamio.NewRuneScanner(dlog.WithField(ctx, "btrfs.read-json-file", filename), fh) defer func() { _ = buf.Close() }() diff --git a/lib/streamio/runescanner.go b/lib/streamio/runescanner.go new file mode 100644 index 0000000..421859d --- /dev/null +++ b/lib/streamio/runescanner.go @@ -0,0 +1,90 @@ +// Copyright (C) 2022-2023 Luke Shumaker <lukeshu@lukeshu.com> +// +// SPDX-License-Identifier: GPL-2.0-or-later + +package streamio + +import ( + "bufio" + "context" + "io" + "os" + "time" + + "github.com/datawire/dlib/dlog" + + "git.lukeshu.com/btrfs-progs-ng/lib/textui" +) + +type RuneScanner interface { + io.RuneScanner + io.Closer +} + +type runeScanner struct { + ctx context.Context //nolint:containedctx // For detecting shutdown from methods + progress textui.Portion[int64] + progressWriter *textui.Progress[textui.Portion[int64]] + unreadCnt uint64 + reader *bufio.Reader + closer io.Closer +} + +// NewRuneScanner returns an io.RuneScanner (and io.Closer) that +// bufferes a file, similar to bufio.NewReader. There are two +// advantages over bufio.NewReader: +// +// - It takes a Context, and causes reads to fail once the Context is +// canceled; allowing large parse operations to be gracefully cut +// short. +// +// - It logs the progress of reading the file via textui.Progress. +func NewRuneScanner(ctx context.Context, fh *os.File) (RuneScanner, error) { + fi, err := fh.Stat() + if err != nil { + return nil, err + } + ret := &runeScanner{ + ctx: ctx, + progress: textui.Portion[int64]{ + D: fi.Size(), + }, + progressWriter: textui.NewProgress[textui.Portion[int64]](ctx, dlog.LogLevelInfo, textui.Tunable(1*time.Second)), + reader: bufio.NewReader(fh), + closer: fh, + } + return ret, nil +} + +// ReadRune implements io.RuneReader. +func (rs *runeScanner) ReadRune() (r rune, size int, err error) { + if err := rs.ctx.Err(); err != nil { + return 0, 0, err + } + r, size, err = rs.reader.ReadRune() + if rs.unreadCnt > 0 { + rs.unreadCnt-- + } else { + rs.progress.N += int64(size) + rs.progressWriter.Set(rs.progress) + } + return +} + +// ReadRune implements io.RuneScanner. +func (rs *runeScanner) UnreadRune() error { + if err := rs.ctx.Err(); err != nil { + return err + } + if err := rs.reader.UnreadRune(); err != nil { + return err + } + rs.unreadCnt++ + return nil +} + +// ReadRune implements io.Closer. +func (rs *runeScanner) Close() error { + rs.progressWriter.Done() + return rs.closer.Close() +} |