package main import ( "fmt" "io/ioutil" "net" "os" "path/filepath" "sort" "strings" "sync" "time" ) func fmtAddress(node, port string) string { if isIPv6(node) { return fmt.Sprintf("[%s]:%s", node, port) } else { return fmt.Sprintf("%s:%s", node, port) } } func isIPv6(node string) bool { ip := net.ParseIP(node) return ip != nil && ip.To4() == nil } var jwg JobWaitGroup func Emit(pt Point) { if pt == nil { return } fmt.Println(pt) } func DoHostfile(fname string) { hostname := filepath.Base(fname) cfg, err := readConfigFile(fname) if err != nil { Emit(NewPoint("public", map[string]string{"host": hostname}, map[string]interface{}{"error": err.Error()})) return } addresses := make(map[string]struct{}) for _, address := range getAddresses(cfg) { addresses[fmtAddress(address.Node, address.Port)] = struct{}{} } for address := range addresses { jwg.Do(hostname+"/"+address+"/tcp4", func() { Emit(DoAddress(hostname, "tcp4", address)) }) jwg.Do(hostname+"/"+address+"/tcp6", func() { Emit(DoAddress(hostname, "tcp6", address)) }) } } func DoAddress(host, network, address string) Point { tags := map[string]string{ "host": host, "network": network, "address": address, } addr, err := net.ResolveTCPAddr(network, address) if err != nil { if ae, ok := err.(*net.AddrError); ok && ae.Err == "no suitable address found" { return nil } return NewPoint("public", tags, map[string]interface{}{"error": err.Error()}) } var _wg sync.WaitGroup _wg.Add(2) var result_name string var result_version string var result_error error go func() { defer _wg.Done() result_name, result_version, result_error = Hello(addr) }() var result_ping float64 go func() { defer _wg.Done() result_ping = Ping(addr.IP) }() _wg.Wait() result := map[string]interface{}{} if result_error == nil { result["name"] = result_name result["version"] = result_version } else { result["error"] = result_error } if result_ping >= 0 { result["ping"] = result_ping } return NewPoint("public", tags, result) } var dialer = net.Dialer{ Timeout: 10 * time.Second, } func Hello(addr *net.TCPAddr) (name, version string, err error) { conn, err := dialer.Dial(addr.Network(), addr.String()) if err != nil { return "", "", err } defer conn.Close() conn.SetDeadline(time.Now().Add(10 * time.Second)) conn.(*net.TCPConn).CloseWrite() all, _ := ioutil.ReadAll(conn) line := strings.TrimRight(string(all), "\n") parts := strings.Split(line, " ") if len(parts) != 3 { return "", "", fmt.Errorf("malformed ID line: %q", line) } return parts[1], parts[2], nil } func main() { for _, fname := range os.Args[1:] { DoHostfile(fname) } watch(time.Second) } func watch(d time.Duration) { ticker := time.NewTicker(d) done := make(chan map[string]time.Duration) go func() { jobs := jwg.Wait() ticker.Stop() done <- jobs }() for { select { case <-ticker.C: n, jobs := jwg.Status() if len(jobs) == 0 { panic("no active jobs, but wg still waiting") } line := fmt.Sprintf("%d/%d : %v", len(jobs), n, strings.Join(jobs, ", ")) if len(line) > 70 { line = line[:67] + "..." } fmt.Fprintf(os.Stderr, "\r%-70s", line) case jobs := <-done: fmt.Fprintf(os.Stderr, "\r%-70s\n", "done") s := newSortHelper(jobs) sort.Sort(s) for _, job := range s.StringSlice { fmt.Fprintln(os.Stderr, s.times[job], job) } return } } } type sortHelper struct { times map[string]time.Duration sort.StringSlice } func (s sortHelper) Less(i, j int) bool { return s.times[s.StringSlice[i]] < s.times[s.StringSlice[j]] } func newSortHelper(jobs map[string]time.Duration) sortHelper { slice := make([]string, 0, len(jobs)) for job := range jobs { slice = append(slice, job) } return sortHelper{times: jobs, StringSlice: slice} }