From 86201baa1bf72f9717fc4643cd0a00554a23bd72 Mon Sep 17 00:00:00 2001 From: Luke Shumaker Date: Sat, 19 May 2018 01:38:13 -0400 Subject: eh --- go/src/cow-dedupe/dedupe.go | 119 ++++++++++++----------------------- go/src/lib/pipeline/cmd.go | 16 +++++ go/src/lib/pipeline/pipeline.go | 80 +++++++++++++++++++++++ go/src/lib/statusline/statuslinue.go | 24 +++++++ 4 files changed, 159 insertions(+), 80 deletions(-) create mode 100644 go/src/lib/pipeline/cmd.go create mode 100644 go/src/lib/pipeline/pipeline.go create mode 100644 go/src/lib/statusline/statuslinue.go diff --git a/go/src/cow-dedupe/dedupe.go b/go/src/cow-dedupe/dedupe.go index f28d2f4..9fee970 100644 --- a/go/src/cow-dedupe/dedupe.go +++ b/go/src/cow-dedupe/dedupe.go @@ -10,6 +10,8 @@ import ( "runtime" "strconv" "strings" + + "lib/statusline" ) //#include @@ -23,100 +25,62 @@ func errhandle(err error) { } } -func findLikelyDups(paths []string) map[string][]string { - ret := map[string][]string{} +func getFiemaps(paths []string) map[string][]string { var err error for i := range paths { paths[i], err = filepath.Abs(paths[i]) errhandle(err) } - cmd := exec.Command("find", append(paths, "-type", "f", "-printf", "%s %p\\0")...) + + ret := map[string][]string{} + + sl := statusline.NewStatusLine(os.Stderr) + cnt := 0 + sl.Put("Mapping extents...") + + cmd := exec.Command("find", append(paths, "-type", "f", "-exec", "./cow-extent-map", "-m", "--", "{}", "+")...) stdout, err := cmd.StdoutPipe() errhandle(err) cmd.Stderr = os.Stderr + errhandle(cmd.Start()) rd := bufio.NewReader(stdout) for { - line, err := rd.ReadString('\x00') - if line == "" && err == io.EOF { + filename, err := rd.ReadString('\x00') + if filename == "" && err == io.EOF { break } - errhandle(err) - parts := strings.SplitN(strings.TrimSuffix(line, "\x00"), " ", 2) - if len(parts) != 2 { - panic("wut") - } - size := parts[0] - filename := parts[1] - basename := filepath.Base(filename) - key := size + " " + basename - ret[key] = append(ret[key], filename) - } - errhandle(cmd.Wait()) - for key := range ret { - if len(ret[key]) < 2 { - delete(ret, key) + filename = strings.TrimSuffix(filename, "\x00") + if !strings.HasPrefix(filename, "/") { + panic("ugly filename") } - } - return ret -} - -func getFiemaps(paths []string) map[string][]string { - ret := map[string][]string{} - fmt.Fprintf(os.Stderr, "Getting fiemaps for %d files...\n", len(paths)) - - cnt := 0 - for len(paths) > 0 { - _paths := paths - arg_len := 0 - for i := range _paths { - arg_len += len(_paths[i]) + 1 - if arg_len > arg_max/2 { - _paths = _paths[:i-1] - break - } + errhandle(err) + fiemap, err := rd.ReadString('\x00') + fiemap = strings.TrimSuffix(fiemap, "\x00") + if !(strings.HasPrefix(fiemap, "logical=") || fiemap == "") { + panic("ugly fiemap") } - paths = paths[len(_paths):] - - cmd := exec.Command("./cow-extent-map", append([]string{"-m", "--"}, _paths...)...) - stdout, err := cmd.StdoutPipe() errhandle(err) - cmd.Stderr = os.Stderr - errhandle(cmd.Start()) - rd := bufio.NewReader(stdout) - for { - filename, err := rd.ReadString('\x00') - if filename == "" && err == io.EOF { - break - } - filename = strings.TrimSuffix(filename, "\x00") - if !strings.HasPrefix(filename, "/") { - panic("ugly filename") - } - errhandle(err) - fiemap, err := rd.ReadString('\x00') - fiemap = strings.TrimSuffix(fiemap, "\x00") - if !(strings.HasPrefix(fiemap, "logical=") || fiemap == "") { - panic("ugly fiemap") - } - errhandle(err) - ret[fiemap] = append(ret[fiemap], filename) - cnt++ - fmt.Fprintf(os.Stderr, "\r%d ", cnt) - } - errhandle(cmd.Wait()) + ret[fiemap] = append(ret[fiemap], filename) + cnt++ + sl.Put(fmt.Sprintf("Mapping extents... %d", cnt)) } + errhandle(cmd.Wait()) - fmt.Fprintf(os.Stderr, "\r...done \n") + sl.Put(fmt.Sprintf("Mapping extents... done; mapped %d files", cnt)) + sl.End() + io.WriteString(os.Stderr, "\n") return ret } func getChecksums(paths []string) map[string][]string { ret := map[string][]string{} - fmt.Fprintf(os.Stderr, "Generating checksums for %d files...\n", len(paths)) + sl := statusline.NewStatusLine(os.Stderr) cnt := 0 + sl.Put(fmt.Sprintf("Generating checksums for files... %d/%d\n", cnt, len(paths))) + for len(paths) > 0 { _paths := paths arg_len := 0 @@ -150,28 +114,23 @@ func getChecksums(paths []string) map[string][]string { ret[checksum] = append(ret[checksum], filename) cnt++ - fmt.Fprintf(os.Stderr, "\r%d ", cnt) + sl.Put(fmt.Sprintf("Generating checksums for files... %d/%d\n", cnt, len(paths))) } errhandle(cmd.Wait()) } - fmt.Fprintf(os.Stderr, "\r...done \n") + sl.Put(fmt.Sprintf("Generating checksums for files... done; summed %d files\n", cnt)) + sl.End() + io.WriteString(os.Stderr, "\n") return ret } func main() { - // we have no parallelism, don't let syscalls fan-out weird on - // many-core systems + // we have low parallelism, don't let syscalls fan-out weird + // on many-core systems runtime.GOMAXPROCS(1) - likely := findLikelyDups(os.Args[1:]) - - var flatLikely []string - for _, filenames := range likely { - flatLikely = append(flatLikely, filenames...) - } - - fiemap2filenames := getFiemaps(flatLikely) + fiemap2filenames := getFiemaps(os.Args[1:]) filename2fiemap := map[string]string{} for fiemap, filenames := range fiemap2filenames { diff --git a/go/src/lib/pipeline/cmd.go b/go/src/lib/pipeline/cmd.go new file mode 100644 index 0000000..563becf --- /dev/null +++ b/go/src/lib/pipeline/cmd.go @@ -0,0 +1,16 @@ +package pipeline + +import ( + "io" +) + +type Cmd interface { + CombinedOutput() ([]byte, error) + Output() ([]byte, error) + Run() error + Start() error + StderrPipe() (io.ReadCloser, error) + StdinPipe() (io.WriteCloser, error) + StdoutPipe() (io.ReadCloser, error) + Wait() error +} diff --git a/go/src/lib/pipeline/pipeline.go b/go/src/lib/pipeline/pipeline.go new file mode 100644 index 0000000..a74e626 --- /dev/null +++ b/go/src/lib/pipeline/pipeline.go @@ -0,0 +1,80 @@ +package pipeline + +import ( + "os/exec" + "io" +) + +type Pipe struct { + Cmds []exec.Cmd +} + +func (pl *Pipe) preStart() error { + for _, cmd := range pl.Cmds[:len(pl.Cmds)-2] { + if err := cmd.Start(); err != nil { + return err + } + } + return nil +} + +func (pl *Pipe) Start() error { + for _, cmd := range pl.Cmds { + if err := cmd.Start(); err != nil { + return err + } + } + return nil +} + +func (pl *Pipe) Wait() error { + for _, cmd := range pl.Cmds { + if err := cmd.Wait(); err != nil { + return err + } + } + return nil +} + +func (pl *Pipe) Run() error { + if err := pl.Start(); err != nil { + return err + } + return pl.Wait() +} + +func (pl *Pipe) CombinedOutput() ([]byte, error) { + if err := pl.preStart(); err != nil { + return nil, err + } + return pl.Cmds[len(pl.Cmds)-1].CombinedOutput() +} + +func (pl *Pipe) Output() ([]byte, error) { + if err := pl.preStart(); err != nil { + return nil, err + } + return pl.Cmds[len(pl.Cmds)-1].Output() +} + +func (pl *Pipe) StdinPipe() (io.WriteCloser, error) { + return pl.Cmds[0].StdinPipe() +} + +func (pl *Pipe) StdoutPipe() (io.ReadCloser, error) { + return pl.Cmds[len(pl.Cmds)-1].StdoutPipe() +} + +var _ = &Pipe{} + +func Pipeline(cmds ...exec.Cmd) (*Pipe, error) { + pl := &Pipe{cmds} + for i := range pl.Cmds[:len(pl.Cmds)-2] { + out, err := pl.Cmds[i].StdoutPipe() + if err != nil { + return nil, err + } + pl.Cmds[i+1].Stdin = out + } + return pl, nil +} diff --git a/go/src/lib/statusline/statuslinue.go b/go/src/lib/statusline/statuslinue.go new file mode 100644 index 0000000..90ec1af --- /dev/null +++ b/go/src/lib/statusline/statuslinue.go @@ -0,0 +1,24 @@ +package statusline + +import ( + "fmt" + "io" +) + +type StatusLine struct { + out io.Writer + prevLen int +} + +func NewStatusLine(out io.Writer) *StatusLine { + return &StatusLine{out: out} +} + +func (sl *StatusLine) Put(line string) { + fmt.Fprintf(sl.out, "\r%-[1]*[2]s", sl.prevLen, line) + sl.prevLen = len(line) +} + +func (sl *StatusLine) End() { + fmt.Fprintf(sl.out, "\r%-[1]*[2]s\r", sl.prevLen, "") +} -- cgit v1.2.3-2-g168b