From 7153fc9379dd5910c688925ccd2e0a03b9551a6d Mon Sep 17 00:00:00 2001 From: Luke Shumaker Date: Wed, 13 Jul 2022 22:38:43 -0600 Subject: Buffer FS IO --- lib/btrfsprogs/btrfsutil/open.go | 7 +- lib/containers/lru.go | 11 +++ lib/diskio/file_blockbuf.go | 84 ++++++++++++++++++++++ lib/diskio/file_state.go | 36 ++++++++++ lib/diskio/file_state_test.go | 59 +++++++++++++++ ...775199a43e0f9fd5c94bba343ce7bb6724d4ebafe311ed4 | 2 + 6 files changed, 198 insertions(+), 1 deletion(-) create mode 100644 lib/diskio/file_blockbuf.go create mode 100644 lib/diskio/file_state.go create mode 100644 lib/diskio/file_state_test.go create mode 100644 lib/diskio/testdata/fuzz/FuzzStatefulBufferedReader/582528ddfad69eb57775199a43e0f9fd5c94bba343ce7bb6724d4ebafe311ed4 (limited to 'lib') diff --git a/lib/btrfsprogs/btrfsutil/open.go b/lib/btrfsprogs/btrfsutil/open.go index 0f3b32a..9491a20 100644 --- a/lib/btrfsprogs/btrfsutil/open.go +++ b/lib/btrfsprogs/btrfsutil/open.go @@ -28,8 +28,13 @@ func Open(ctx context.Context, flag int, filenames ...string) (*btrfs.FS, error) typedFile := &diskio.OSFile[btrfsvol.PhysicalAddr]{ File: osFile, } + bufFile := diskio.NewBufferedFile[btrfsvol.PhysicalAddr]( + typedFile, + 16384, // block size: 16KiB + 1024, // number of blocks to buffer; total of 16MiB + ) devFile := &btrfs.Device{ - File: typedFile, + File: bufFile, } if err := fs.AddDevice(ctx, devFile); err != nil { return nil, fmt.Errorf("device file %q: %w", filename, err) diff --git a/lib/containers/lru.go b/lib/containers/lru.go index 8b8eb0e..a235a12 100644 --- a/lib/containers/lru.go +++ b/lib/containers/lru.go @@ -10,11 +10,22 @@ import ( lru "github.com/hashicorp/golang-lru" ) +// LRUCache is a least-recently-used(ish) cache. A zero LRUCache is +// usable and has a cache size of 128 items; use NewLRUCache to set a +// different size. type LRUCache[K comparable, V any] struct { initOnce sync.Once inner *lru.ARCCache } +func NewLRUCache[K comparable, V any](size int) *LRUCache[K, V] { + c := new(LRUCache[K, V]) + c.initOnce.Do(func() { + c.inner, _ = lru.NewARC(size) + }) + return c +} + func (c *LRUCache[K, V]) init() { c.initOnce.Do(func() { c.inner, _ = lru.NewARC(128) diff --git a/lib/diskio/file_blockbuf.go b/lib/diskio/file_blockbuf.go new file mode 100644 index 0000000..77b823c --- /dev/null +++ b/lib/diskio/file_blockbuf.go @@ -0,0 +1,84 @@ +// Copyright (C) 2022 Luke Shumaker +// +// SPDX-License-Identifier: GPL-2.0-or-later + +package diskio + +import ( + "sync" + + "git.lukeshu.com/btrfs-progs-ng/lib/containers" +) + +type bufferedBlock struct { + Dat []byte + Err error +} + +type bufferedFile[A ~int64] struct { + inner File[A] + mu sync.RWMutex + blockSize A + blockCache *containers.LRUCache[A, bufferedBlock] +} + +var _ File[assertAddr] = (*bufferedFile[assertAddr])(nil) + +func NewBufferedFile[A ~int64](file File[A], blockSize A, cacheSize int) *bufferedFile[A] { + return &bufferedFile[A]{ + inner: file, + blockSize: blockSize, + blockCache: containers.NewLRUCache[A, bufferedBlock](cacheSize), + } +} + +func (bf *bufferedFile[A]) Name() string { return bf.inner.Name() } +func (bf *bufferedFile[A]) Size() A { return bf.inner.Size() } +func (bf *bufferedFile[A]) Close() error { return bf.inner.Close() } + +func (bf *bufferedFile[A]) ReadAt(dat []byte, off A) (n int, err error) { + done := 0 + for done < len(dat) { + n, err := bf.maybeShortReadAt(dat[done:], off+A(done)) + done += n + if err != nil { + return done, err + } + } + return done, nil +} + +func (bf *bufferedFile[A]) maybeShortReadAt(dat []byte, off A) (n int, err error) { + bf.mu.RLock() + defer bf.mu.RUnlock() + offsetWithinBlock := off % bf.blockSize + blockOffset := off - offsetWithinBlock + cachedBlock, ok := bf.blockCache.Get(blockOffset) + if !ok { + cachedBlock.Dat = make([]byte, bf.blockSize) + n, err := bf.inner.ReadAt(cachedBlock.Dat, blockOffset) + cachedBlock.Dat = cachedBlock.Dat[:n] + cachedBlock.Err = err + bf.blockCache.Add(blockOffset, cachedBlock) + } + n = copy(dat, cachedBlock.Dat[offsetWithinBlock:]) + if n < len(dat) { + return n, cachedBlock.Err + } + return n, nil +} + +func (bf *bufferedFile[A]) WriteAt(dat []byte, off A) (n int, err error) { + bf.mu.Lock() + defer bf.mu.Unlock() + + // Do the work + n, err = bf.inner.WriteAt(dat, off) + + // Cache invalidation + for blockOffset := off - (off % bf.blockSize); blockOffset < off+A(n); blockOffset += bf.blockSize { + bf.blockCache.Remove(blockOffset) + } + + return +} diff --git a/lib/diskio/file_state.go b/lib/diskio/file_state.go new file mode 100644 index 0000000..63ec53b --- /dev/null +++ b/lib/diskio/file_state.go @@ -0,0 +1,36 @@ +// Copyright (C) 2022 Luke Shumaker +// +// SPDX-License-Identifier: GPL-2.0-or-later + +package diskio + +type statefulFile[A ~int64] struct { + inner File[A] + pos A +} + +var _ File[assertAddr] = (*statefulFile[assertAddr])(nil) + +func NewStatefulFile[A ~int64](file File[A]) *statefulFile[A] { + return &statefulFile[A]{ + inner: file, + } +} + +func (sf *statefulFile[A]) Name() string { return sf.inner.Name() } +func (sf *statefulFile[A]) Size() A { return sf.inner.Size() } +func (sf *statefulFile[A]) Close() error { return sf.inner.Close() } +func (sf *statefulFile[A]) ReadAt(dat []byte, off A) (int, error) { return sf.inner.ReadAt(dat, off) } +func (sf *statefulFile[A]) WriteAt(dat []byte, off A) (int, error) { return sf.inner.WriteAt(dat, off) } + +func (sf *statefulFile[A]) Read(dat []byte) (n int, err error) { + n, err = sf.ReadAt(dat, sf.pos) + sf.pos += A(n) + return n, err +} + +func (sf *statefulFile[A]) ReadByte() (byte, error) { + var dat [1]byte + _, err := sf.Read(dat[:]) + return dat[0], err +} diff --git a/lib/diskio/file_state_test.go b/lib/diskio/file_state_test.go new file mode 100644 index 0000000..3f0c119 --- /dev/null +++ b/lib/diskio/file_state_test.go @@ -0,0 +1,59 @@ +// Copyright (C) 2022 Luke Shumaker +// +// SPDX-License-Identifier: GPL-2.0-or-later + +package diskio_test + +import ( + "bytes" + "testing" + "testing/iotest" + + "git.lukeshu.com/btrfs-progs-ng/lib/diskio" +) + +type byteReaderWithName struct { + *bytes.Reader + name string +} + +func (r byteReaderWithName) Name() string { + return r.name +} + +func (r byteReaderWithName) Close() error { + return nil +} + +func (r byteReaderWithName) WriteAt([]byte, int64) (int, error) { + panic("not implemented") +} + +func FuzzStatefulReader(f *testing.F) { + f.Fuzz(func(t *testing.T, content []byte) { + t.Logf("content=%q", content) + var file diskio.File[int64] = byteReaderWithName{ + Reader: bytes.NewReader(content), + name: t.Name(), + } + reader := diskio.NewStatefulFile[int64](file) + if err := iotest.TestReader(reader, content); err != nil { + t.Error(err) + } + }) +} + +func FuzzStatefulBufferedReader(f *testing.F) { + f.Fuzz(func(t *testing.T, content []byte) { + t.Logf("content=%q", content) + var file diskio.File[int64] = byteReaderWithName{ + Reader: bytes.NewReader(content), + name: t.Name(), + } + file = diskio.NewBufferedFile[int64](file, 4, 2) + reader := diskio.NewStatefulFile[int64](file) + if err := iotest.TestReader(reader, content); err != nil { + t.Error(err) + } + }) +} diff --git a/lib/diskio/testdata/fuzz/FuzzStatefulBufferedReader/582528ddfad69eb57775199a43e0f9fd5c94bba343ce7bb6724d4ebafe311ed4 b/lib/diskio/testdata/fuzz/FuzzStatefulBufferedReader/582528ddfad69eb57775199a43e0f9fd5c94bba343ce7bb6724d4ebafe311ed4 new file mode 100644 index 0000000..a96f559 --- /dev/null +++ b/lib/diskio/testdata/fuzz/FuzzStatefulBufferedReader/582528ddfad69eb57775199a43e0f9fd5c94bba343ce7bb6724d4ebafe311ed4 @@ -0,0 +1,2 @@ +go test fuzz v1 +[]byte("0") -- cgit v1.2.3-2-g168b