From d6d19b3a8f66f27f78fd2c6fa02b32fb2a88e9ad Mon Sep 17 00:00:00 2001 From: Luke Shumaker Date: Wed, 25 Jan 2023 17:41:28 -0700 Subject: Move btrfs-rec's logging RuneScanner to a new `streamio` package --- lib/streamio/runescanner.go | 90 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 90 insertions(+) create mode 100644 lib/streamio/runescanner.go (limited to 'lib/streamio') diff --git a/lib/streamio/runescanner.go b/lib/streamio/runescanner.go new file mode 100644 index 0000000..421859d --- /dev/null +++ b/lib/streamio/runescanner.go @@ -0,0 +1,90 @@ +// Copyright (C) 2022-2023 Luke Shumaker +// +// SPDX-License-Identifier: GPL-2.0-or-later + +package streamio + +import ( + "bufio" + "context" + "io" + "os" + "time" + + "github.com/datawire/dlib/dlog" + + "git.lukeshu.com/btrfs-progs-ng/lib/textui" +) + +type RuneScanner interface { + io.RuneScanner + io.Closer +} + +type runeScanner struct { + ctx context.Context //nolint:containedctx // For detecting shutdown from methods + progress textui.Portion[int64] + progressWriter *textui.Progress[textui.Portion[int64]] + unreadCnt uint64 + reader *bufio.Reader + closer io.Closer +} + +// NewRuneScanner returns an io.RuneScanner (and io.Closer) that +// bufferes a file, similar to bufio.NewReader. There are two +// advantages over bufio.NewReader: +// +// - It takes a Context, and causes reads to fail once the Context is +// canceled; allowing large parse operations to be gracefully cut +// short. +// +// - It logs the progress of reading the file via textui.Progress. +func NewRuneScanner(ctx context.Context, fh *os.File) (RuneScanner, error) { + fi, err := fh.Stat() + if err != nil { + return nil, err + } + ret := &runeScanner{ + ctx: ctx, + progress: textui.Portion[int64]{ + D: fi.Size(), + }, + progressWriter: textui.NewProgress[textui.Portion[int64]](ctx, dlog.LogLevelInfo, textui.Tunable(1*time.Second)), + reader: bufio.NewReader(fh), + closer: fh, + } + return ret, nil +} + +// ReadRune implements io.RuneReader. +func (rs *runeScanner) ReadRune() (r rune, size int, err error) { + if err := rs.ctx.Err(); err != nil { + return 0, 0, err + } + r, size, err = rs.reader.ReadRune() + if rs.unreadCnt > 0 { + rs.unreadCnt-- + } else { + rs.progress.N += int64(size) + rs.progressWriter.Set(rs.progress) + } + return +} + +// ReadRune implements io.RuneScanner. +func (rs *runeScanner) UnreadRune() error { + if err := rs.ctx.Err(); err != nil { + return err + } + if err := rs.reader.UnreadRune(); err != nil { + return err + } + rs.unreadCnt++ + return nil +} + +// ReadRune implements io.Closer. +func (rs *runeScanner) Close() error { + rs.progressWriter.Done() + return rs.closer.Close() +} -- cgit v1.2.3-2-g168b From 3af4c3163249c6c378c4edd91ee4ebd749a00f8b Mon Sep 17 00:00:00 2001 From: Luke Shumaker Date: Wed, 25 Jan 2023 17:44:19 -0700 Subject: streamio: Optimize based on the profiler --- lib/streamio/runescanner.go | 30 +++++++++++++++++++++++------- 1 file changed, 23 insertions(+), 7 deletions(-) (limited to 'lib/streamio') diff --git a/lib/streamio/runescanner.go b/lib/streamio/runescanner.go index 421859d..451f32f 100644 --- a/lib/streamio/runescanner.go +++ b/lib/streamio/runescanner.go @@ -23,6 +23,7 @@ type RuneScanner interface { type runeScanner struct { ctx context.Context //nolint:containedctx // For detecting shutdown from methods + done <-chan struct{} progress textui.Portion[int64] progressWriter *textui.Progress[textui.Portion[int64]] unreadCnt uint64 @@ -45,7 +46,8 @@ func NewRuneScanner(ctx context.Context, fh *os.File) (RuneScanner, error) { return nil, err } ret := &runeScanner{ - ctx: ctx, + ctx: ctx, + done: ctx.Done(), progress: textui.Portion[int64]{ D: fi.Size(), }, @@ -56,26 +58,40 @@ func NewRuneScanner(ctx context.Context, fh *os.File) (RuneScanner, error) { return ret, nil } +func isClosed(ch <-chan struct{}) bool { + select { + case <-ch: + return true + default: + return false + } +} + +//nolint:gomnd // False positive: gomnd.ignored-functions=[textui.Tunable] doesn't support type params. +var runeThrottle = textui.Tunable[int64](64) + // ReadRune implements io.RuneReader. func (rs *runeScanner) ReadRune() (r rune, size int, err error) { - if err := rs.ctx.Err(); err != nil { - return 0, 0, err + // According to the profiler, checking if the rs.ctx.Done() + // channel is closed is faster than checking if rs.ctx.Err() + // is non-nil. + if rs.unreadCnt == 0 && isClosed(rs.done) { + return 0, 0, rs.ctx.Err() } r, size, err = rs.reader.ReadRune() if rs.unreadCnt > 0 { rs.unreadCnt-- } else { rs.progress.N += int64(size) - rs.progressWriter.Set(rs.progress) + if rs.progress.D < runeThrottle || rs.progress.N%runeThrottle == 0 || rs.progress.N > rs.progress.D-runeThrottle { + rs.progressWriter.Set(rs.progress) + } } return } // ReadRune implements io.RuneScanner. func (rs *runeScanner) UnreadRune() error { - if err := rs.ctx.Err(); err != nil { - return err - } if err := rs.reader.UnreadRune(); err != nil { return err } -- cgit v1.2.3-2-g168b