path: root/lib/diskio/file_blockbuf.go
Diffstat (limited to 'lib/diskio/file_blockbuf.go')
-rw-r--r--  lib/diskio/file_blockbuf.go  98
1 file changed, 63 insertions(+), 35 deletions(-)
diff --git a/lib/diskio/file_blockbuf.go b/lib/diskio/file_blockbuf.go
index 0bb3156..580e55a 100644
--- a/lib/diskio/file_blockbuf.go
+++ b/lib/diskio/file_blockbuf.go
@@ -5,63 +5,74 @@
package diskio
import (
+ "context"
"sync"
- "git.lukeshu.com/go/typedsync"
+ "github.com/datawire/dlib/dlog"
- "git.lukeshu.com/btrfs-progs-ng/lib/containers"
+ "git.lukeshu.com/btrfs-progs-ng/lib/caching"
)
-type bufferedBlock struct {
+type bufferedBlock[A ~int64] struct {
+ Mu sync.RWMutex
+ Addr A
+ Dirty bool
+
Dat []byte
Err error
}
type bufferedFile[A ~int64] struct {
+ ctx context.Context
inner File[A]
- mu sync.RWMutex
blockSize A
- blockCache containers.ARCache[A, bufferedBlock]
- blockPool typedsync.Pool[[]byte]
+ blockCache caching.Cache[A, bufferedBlock[A]]
}
var _ File[assertAddr] = (*bufferedFile[assertAddr])(nil)
-func NewBufferedFile[A ~int64](file File[A], blockSize A, cacheSize int) *bufferedFile[A] {
+func NewBufferedFile[A ~int64](ctx context.Context, file File[A], blockSize A, cacheSize int) *bufferedFile[A] {
ret := &bufferedFile[A]{
inner: file,
blockSize: blockSize,
- blockCache: containers.ARCache[A, bufferedBlock]{
- MaxLen: cacheSize,
- },
}
- ret.blockPool.New = ret.malloc
- ret.blockCache.OnRemove = ret.free
- ret.blockCache.New = ret.readBlock
+ ret.blockCache = caching.NewLRUCache[A, bufferedBlock[A]](cacheSize, bufferedBlockSource[A]{ret})
return ret
}
-func (bf *bufferedFile[A]) malloc() []byte {
- return make([]byte, bf.blockSize)
+type bufferedBlockSource[A ~int64] struct {
+ bf *bufferedFile[A]
}
-func (bf *bufferedFile[A]) free(_ A, buf bufferedBlock) {
- bf.blockPool.Put(buf.Dat)
+func (src bufferedBlockSource[A]) Flush(ctx context.Context, block *bufferedBlock[A]) {
+ if !block.Dirty {
+ return
+ }
+ if _, err := src.bf.inner.WriteAt(block.Dat, block.Addr); err != nil {
+ dlog.Errorf(src.bf.ctx, "i/o error: write: %v", err)
+ }
+ block.Dirty = false
}
-func (bf *bufferedFile[A]) readBlock(blockOffset A) bufferedBlock {
- dat, _ := bf.blockPool.Get()
- n, err := bf.inner.ReadAt(dat, blockOffset)
- return bufferedBlock{
- Dat: dat[:n],
- Err: err,
+func (src bufferedBlockSource[A]) Load(ctx context.Context, blockAddr A, block *bufferedBlock[A]) {
+ src.Flush(ctx, block)
+ if block.Dat == nil {
+ block.Dat = make([]byte, src.bf.blockSize)
}
+ n, err := src.bf.inner.ReadAt(block.Dat[:src.bf.blockSize], blockAddr)
+ block.Addr = blockAddr
+ block.Dat = block.Dat[:n]
+ block.Err = err
}
func (bf *bufferedFile[A]) Name() string { return bf.inner.Name() }
func (bf *bufferedFile[A]) Size() A { return bf.inner.Size() }
func (bf *bufferedFile[A]) Close() error { return bf.inner.Close() }
+func (bf *bufferedFile[A]) Flush() {
+ bf.blockCache.Flush(bf.ctx)
+}
+
func (bf *bufferedFile[A]) ReadAt(dat []byte, off A) (n int, err error) {
done := 0
for done < len(dat) {
@@ -75,11 +86,14 @@ func (bf *bufferedFile[A]) ReadAt(dat []byte, off A) (n int, err error) {
}
func (bf *bufferedFile[A]) maybeShortReadAt(dat []byte, off A) (n int, err error) {
- bf.mu.RLock()
- defer bf.mu.RUnlock()
offsetWithinBlock := off % bf.blockSize
blockOffset := off - offsetWithinBlock
- cachedBlock, _ := bf.blockCache.Load(blockOffset)
+
+ cachedBlock := bf.blockCache.Acquire(bf.ctx, blockOffset)
+ defer bf.blockCache.Release(blockOffset)
+ cachedBlock.Mu.RLock()
+ defer cachedBlock.Mu.RUnlock()
+
n = copy(dat, cachedBlock.Dat[offsetWithinBlock:])
if n < len(dat) {
return n, cachedBlock.Err
@@ -88,16 +102,30 @@ func (bf *bufferedFile[A]) maybeShortReadAt(dat []byte, off A) (n int, err error
}
func (bf *bufferedFile[A]) WriteAt(dat []byte, off A) (n int, err error) {
- bf.mu.Lock()
- defer bf.mu.Unlock()
+ done := 0
+ for done < len(dat) {
+ n, err := bf.maybeShortWriteAt(dat[done:], off+A(done))
+ done += n
+ if err != nil {
+ return done, err
+ }
+ }
+ return done, nil
+}
- // Do the work
- n, err = bf.inner.WriteAt(dat, off)
+func (bf *bufferedFile[A]) maybeShortWriteAt(dat []byte, off A) (n int, err error) {
+ offsetWithinBlock := off % bf.blockSize
+ blockOffset := off - offsetWithinBlock
- // Cache invalidation
- for blockOffset := off - (off % bf.blockSize); blockOffset < off+A(n); blockOffset += bf.blockSize {
- bf.blockCache.Delete(blockOffset)
- }
+ cachedBlock := bf.blockCache.Acquire(bf.ctx, blockOffset)
+ defer bf.blockCache.Release(blockOffset)
+ cachedBlock.Mu.Lock()
+ defer cachedBlock.Mu.Unlock()
- return n, err
+ cachedBlock.Dirty = true
+ n = copy(cachedBlock.Dat[offsetWithinBlock:], dat)
+ if n < len(dat) {
+ return n, cachedBlock.Err
+ }
+ return n, nil
}
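
For context, the rewritten bufferedFile programs against a cache that owns per-entry loading and write-back. The lib/caching API itself is not part of this diff; the sketch below is reconstructed from the call sites above (NewLRUCache, Acquire, Release, Flush, and a source type with Load/Flush), so the names and signatures are assumptions rather than a copy of the real package.

// Package caching: a minimal sketch of the interface that bufferedFile
// appears to rely on, inferred from the call sites in this diff.
package caching

import "context"

// Source loads entries into the cache and writes dirty entries back out.
// Load fills *v for key k, reusing the slot of whatever entry previously
// occupied it; Flush persists *v if it is dirty.
type Source[K comparable, V any] interface {
	Load(ctx context.Context, k K, v *V)
	Flush(ctx context.Context, v *V)
}

// Cache hands out pointers to cached entries.  Acquire pins the entry for
// k, loading it through the Source on a miss; Release unpins it so it may
// be evicted; Flush writes back every dirty entry.
type Cache[K comparable, V any] interface {
	Acquire(ctx context.Context, k K) *V
	Release(k K)
	Flush(ctx context.Context)
}

// NewLRUCache is assumed to build a fixed-capacity, LRU-evicting Cache on
// top of src; the real constructor lives in lib/caching and is not
// reproduced here.
func NewLRUCache[K comparable, V any](capacity int, src Source[K, V]) Cache[K, V] {
	panic("sketch only; see lib/caching for the actual implementation")
}

Against that assumed interface, the shape of the change in the diff is: the single file-wide mu sync.RWMutex is replaced by a per-block Mu sync.RWMutex plus Acquire/Release pinning, and WriteAt moves from write-through with cache invalidation to write-back, where blocks are marked Dirty and persisted either by an explicit Flush or when their slot is reused (Load calls Flush before overwriting the entry).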