diff --git a/.gitignore b/.gitignore index f9cd104f..d1b1d4cb 100644 --- a/.gitignore +++ b/.gitignore @@ -37,3 +37,4 @@ cmd/dnet/dnet .settings/ libnetworkbuild.created +test/networkDb/testMain diff --git a/diagnose/diagnose.go b/diagnose/diagnose.go new file mode 100644 index 00000000..0ce7a491 --- /dev/null +++ b/diagnose/diagnose.go @@ -0,0 +1,133 @@ +package diagnose + +import ( + "fmt" + "net" + "net/http" + "sync" + + "github.com/Sirupsen/logrus" +) + +// HTTPHandlerFunc is the signature of a diagnostic handler: it receives the context it was registered with, followed by the usual HTTP response writer and request +type HTTPHandlerFunc func(interface{}, http.ResponseWriter, *http.Request) + +type httpHandlerCustom struct { + ctx interface{} + F func(interface{}, http.ResponseWriter, *http.Request) +} + +// ServeHTTP calls the wrapped function passing it the stored context +func (h httpHandlerCustom) ServeHTTP(w http.ResponseWriter, r *http.Request) { + h.F(h.ctx, w, r) +} + +var diagPaths2Func = map[string]HTTPHandlerFunc{ + "/": notImplemented, + "/help": help, + "/ready": ready, +} + +// Server when the debug is enabled exposes an HTTP endpoint that serves the registered handlers +// This data structure is protected by the Agent mutex so it does not require an additional mutex here +type Server struct { + sk net.Listener + port int + mux *http.ServeMux + registeredHanders []string + sync.Mutex +} + +// Init creates the HTTP mux and registers the local handlers +func (n *Server) Init() { + n.mux = http.NewServeMux() + + // Register local handlers + n.RegisterHandler(n, diagPaths2Func) +} + +// RegisterHandler adds every path of hdlrs to the mux, binding ctx to its handler, and records the path for /help +func (n *Server) RegisterHandler(ctx interface{}, hdlrs map[string]HTTPHandlerFunc) { + n.Lock() + defer n.Unlock() + for path, fun := range hdlrs { + n.mux.Handle(path, httpHandlerCustom{ctx, fun}) + n.registeredHanders = append(n.registeredHanders, path) + } +} + +// EnableDebug sets the log level to debug and starts the HTTP server. NOTE(review): the listener is hard-coded to :8000 regardless of ip and port, and the call blocks while the mutex is held +func (n *Server) EnableDebug(ip string, port int) { + n.Lock() + defer n.Unlock() + + n.port = port + logrus.SetLevel(logrus.DebugLevel) + + if n.sk != nil { + logrus.Infof("The server is already up and running") + return + } + + logrus.Infof("Starting the server listening on %d for commands", port) + + // // 
Create the socket + // var err error + // n.sk, err = net.Listen("tcp", listeningAddr) + // if err != nil { + // log.Fatal(err) + // } + // + // go func() { + // http.Serve(n.sk, n.mux) + // }() + http.ListenAndServe(":8000", n.mux) +} + +// DisableDebug stops the debug server and closes the TCP socket +func (n *Server) DisableDebug() { + n.Lock() + defer n.Unlock() + n.sk.Close() + n.sk = nil +} + +// IsDebugEnable returns true when the debug is enabled +func (n *Server) IsDebugEnable() bool { + n.Lock() + defer n.Unlock() + return n.sk != nil +} + +func notImplemented(ctx interface{}, w http.ResponseWriter, r *http.Request) { + fmt.Fprintf(w, "URL path: %s no method implemented check /help\n", r.URL.Path) +} + +func help(ctx interface{}, w http.ResponseWriter, r *http.Request) { + n, ok := ctx.(*Server) + if ok { + for _, path := range n.registeredHanders { + fmt.Fprintf(w, "%s\n", path) + } + } +} + +func ready(ctx interface{}, w http.ResponseWriter, r *http.Request) { + fmt.Fprintf(w, "OK\n") +} + +// DebugHTTPForm parses the request form and logs every key/value pair at debug level +func DebugHTTPForm(r *http.Request) { + r.ParseForm() + for k, v := range r.Form { + logrus.Debugf("Form[%q] = %q\n", k, v) + } +} + +// HTTPReplyError writes message to the response, followed by a usage line when usage is not empty +func HTTPReplyError(w http.ResponseWriter, message, usage string) { + fmt.Fprintf(w, "%s\n", message) + if usage != "" { + fmt.Fprintf(w, "Usage: %s\n", usage) + } +} diff --git a/networkdb/delegate.go b/networkdb/delegate.go index b3ef000d..beef82a4 100644 --- a/networkdb/delegate.go +++ b/networkdb/delegate.go @@ -104,6 +104,9 @@ func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) bool { } n = nDB.checkAndGetNode(nEvent) + if n == nil { + return false + } nDB.purgeSameNode(n) n.ltime = nEvent.LTime diff --git a/networkdb/networkdb.go b/networkdb/networkdb.go index e4cc9df1..ec9a8db5 100644 --- a/networkdb/networkdb.go +++ b/networkdb/networkdb.go @@ -108,6 +108,11 @@ type PeerInfo struct { IP string } +// PeerClusterInfo represents the peer (gossip cluster) nodes +type PeerClusterInfo 
struct { + PeerInfo +} + type node struct { memberlist.Node ltime serf.LamportTime @@ -253,6 +258,20 @@ func (nDB *NetworkDB) Close() { } } +// ClusterPeers returns all the gossip cluster peers. +func (nDB *NetworkDB) ClusterPeers() []PeerInfo { + nDB.RLock() + defer nDB.RUnlock() + peers := make([]PeerInfo, 0, len(nDB.nodes)) + for _, node := range nDB.nodes { + peers = append(peers, PeerInfo{ + Name: node.Name, + IP: node.Node.Addr.String(), + }) + } + return peers +} + // Peers returns the gossip peers for a given network. func (nDB *NetworkDB) Peers(nid string) []PeerInfo { nDB.RLock() diff --git a/networkdb/networkdbdiagnose.go b/networkdb/networkdbdiagnose.go new file mode 100644 index 00000000..d70cec7b --- /dev/null +++ b/networkdb/networkdbdiagnose.go @@ -0,0 +1,242 @@ +package networkdb + +import ( + "fmt" + "net/http" + "strings" + + "github.com/docker/libnetwork/diagnose" +) + +const ( + missingParameter = "missing parameter" +) + +// NetDbPaths2Func TODO +var NetDbPaths2Func = map[string]diagnose.HTTPHandlerFunc{ + "/join": dbJoin, + "/networkpeers": dbPeers, + "/clusterpeers": dbClusterPeers, + "/joinnetwork": dbJoinNetwork, + "/leavenetwork": dbLeaveNetwork, + "/createentry": dbCreateEntry, + "/updateentry": dbUpdateEntry, + "/deleteentry": dbDeleteEntry, + "/getentry": dbGetEntry, + "/gettable": dbGetTable, +} + +func dbJoin(ctx interface{}, w http.ResponseWriter, r *http.Request) { + r.ParseForm() + diagnose.DebugHTTPForm(r) + if len(r.Form["members"]) < 1 { + diagnose.HTTPReplyError(w, missingParameter, fmt.Sprintf("%s?members=ip1,ip2,...", r.URL.Path)) + return + } + + nDB, ok := ctx.(*NetworkDB) + if ok { + err := nDB.Join(strings.Split(r.Form["members"][0], ",")) + if err != nil { + fmt.Fprintf(w, "%s error in the DB join %s\n", r.URL.Path, err) + return + } + + fmt.Fprintf(w, "OK\n") + } +} + +func dbPeers(ctx interface{}, w http.ResponseWriter, r *http.Request) { + r.ParseForm() + diagnose.DebugHTTPForm(r) + if len(r.Form["nid"]) < 1 { + 
diagnose.HTTPReplyError(w, missingParameter, fmt.Sprintf("%s?nid=test", r.URL.Path)) + return + } + + nDB, ok := ctx.(*NetworkDB) + if ok { + peers := nDB.Peers(r.Form["nid"][0]) + fmt.Fprintf(w, "Network:%s Total peers: %d\n", r.Form["nid"], len(peers)) + for i, peerInfo := range peers { + fmt.Fprintf(w, "%d) %s -> %s\n", i, peerInfo.Name, peerInfo.IP) + } + } +} + +func dbClusterPeers(ctx interface{}, w http.ResponseWriter, r *http.Request) { + nDB, ok := ctx.(*NetworkDB) + if ok { + peers := nDB.ClusterPeers() + fmt.Fprintf(w, "Total peers: %d\n", len(peers)) + for i, peerInfo := range peers { + fmt.Fprintf(w, "%d) %s -> %s\n", i, peerInfo.Name, peerInfo.IP) + } + } +} + +func dbCreateEntry(ctx interface{}, w http.ResponseWriter, r *http.Request) { + r.ParseForm() + diagnose.DebugHTTPForm(r) + if len(r.Form["tname"]) < 1 || + len(r.Form["nid"]) < 1 || + len(r.Form["key"]) < 1 || + len(r.Form["value"]) < 1 { + diagnose.HTTPReplyError(w, missingParameter, fmt.Sprintf("%s?tname=table_name&nid=network_id&key=k&value=v", r.URL.Path)) + return + } + + tname := r.Form["tname"][0] + nid := r.Form["nid"][0] + key := r.Form["key"][0] + value := r.Form["value"][0] + + nDB, ok := ctx.(*NetworkDB) + if ok { + if err := nDB.CreateEntry(tname, nid, key, []byte(value)); err != nil { + diagnose.HTTPReplyError(w, err.Error(), "") + return + } + fmt.Fprintf(w, "OK\n") + } +} + +func dbUpdateEntry(ctx interface{}, w http.ResponseWriter, r *http.Request) { + r.ParseForm() + diagnose.DebugHTTPForm(r) + if len(r.Form["tname"]) < 1 || + len(r.Form["nid"]) < 1 || + len(r.Form["key"]) < 1 || + len(r.Form["value"]) < 1 { + diagnose.HTTPReplyError(w, missingParameter, fmt.Sprintf("%s?tname=table_name&nid=network_id&key=k&value=v", r.URL.Path)) + return + } + + tname := r.Form["tname"][0] + nid := r.Form["nid"][0] + key := r.Form["key"][0] + value := r.Form["value"][0] + + nDB, ok := ctx.(*NetworkDB) + if ok { + if err := nDB.UpdateEntry(tname, nid, key, []byte(value)); err != nil { + 
diagnose.HTTPReplyError(w, err.Error(), "") + return + } + fmt.Fprintf(w, "OK\n") + } +} + +func dbDeleteEntry(ctx interface{}, w http.ResponseWriter, r *http.Request) { + r.ParseForm() + diagnose.DebugHTTPForm(r) + if len(r.Form["tname"]) < 1 || + len(r.Form["nid"]) < 1 || + len(r.Form["key"]) < 1 { + diagnose.HTTPReplyError(w, missingParameter, fmt.Sprintf("%s?tname=table_name&nid=network_id&key=k", r.URL.Path)) + return + } + + tname := r.Form["tname"][0] + nid := r.Form["nid"][0] + key := r.Form["key"][0] + + nDB, ok := ctx.(*NetworkDB) + if ok { + err := nDB.DeleteEntry(tname, nid, key) + if err != nil { + diagnose.HTTPReplyError(w, err.Error(), "") + return + } + fmt.Fprintf(w, "OK\n") + } +} + +func dbGetEntry(ctx interface{}, w http.ResponseWriter, r *http.Request) { + r.ParseForm() + diagnose.DebugHTTPForm(r) + if len(r.Form["tname"]) < 1 || + len(r.Form["nid"]) < 1 || + len(r.Form["key"]) < 1 { + diagnose.HTTPReplyError(w, missingParameter, fmt.Sprintf("%s?tname=table_name&nid=network_id&key=k", r.URL.Path)) + return + } + + tname := r.Form["tname"][0] + nid := r.Form["nid"][0] + key := r.Form["key"][0] + + nDB, ok := ctx.(*NetworkDB) + if ok { + value, err := nDB.GetEntry(tname, nid, key) + if err != nil { + diagnose.HTTPReplyError(w, err.Error(), "") + return + } + fmt.Fprintf(w, "key:`%s` value:`%s`\n", key, string(value)) + } +} + +func dbJoinNetwork(ctx interface{}, w http.ResponseWriter, r *http.Request) { + r.ParseForm() + diagnose.DebugHTTPForm(r) + if len(r.Form["nid"]) < 1 { + diagnose.HTTPReplyError(w, missingParameter, fmt.Sprintf("%s?nid=network_id", r.URL.Path)) + return + } + + nid := r.Form["nid"][0] + + nDB, ok := ctx.(*NetworkDB) + if ok { + if err := nDB.JoinNetwork(nid); err != nil { + diagnose.HTTPReplyError(w, err.Error(), "") + return + } + fmt.Fprintf(w, "OK\n") + } +} + +func dbLeaveNetwork(ctx interface{}, w http.ResponseWriter, r *http.Request) { + r.ParseForm() + diagnose.DebugHTTPForm(r) + if len(r.Form["nid"]) < 1 { + 
diagnose.HTTPReplyError(w, missingParameter, fmt.Sprintf("%s?nid=network_id", r.URL.Path)) + return + } + + nid := r.Form["nid"][0] + + nDB, ok := ctx.(*NetworkDB) + if ok { + if err := nDB.LeaveNetwork(nid); err != nil { + diagnose.HTTPReplyError(w, err.Error(), "") + return + } + fmt.Fprintf(w, "OK\n") + } +} + +func dbGetTable(ctx interface{}, w http.ResponseWriter, r *http.Request) { + r.ParseForm() + diagnose.DebugHTTPForm(r) + if len(r.Form["tname"]) < 1 || + len(r.Form["nid"]) < 1 { + diagnose.HTTPReplyError(w, missingParameter, fmt.Sprintf("%s?tname=table_name&nid=network_id", r.URL.Path)) + return + } + + tname := r.Form["tname"][0] + nid := r.Form["nid"][0] + + nDB, ok := ctx.(*NetworkDB) + if ok { + table := nDB.GetTableByNetwork(tname, nid) + fmt.Fprintf(w, "total elements: %d\n", len(table)) + i := 0 + for k, v := range table { + fmt.Fprintf(w, "%d) k:`%s` -> v:`%s`\n", i, k, string(v.([]byte))) + i++ + } + } +} diff --git a/test/networkDb/Dockerfile b/test/networkDb/Dockerfile new file mode 100644 index 00000000..5dd7b20e --- /dev/null +++ b/test/networkDb/Dockerfile @@ -0,0 +1,7 @@ +FROM alpine + +COPY testMain /app/ + +WORKDIR app + +ENTRYPOINT ["/app/testMain"] diff --git a/test/networkDb/README b/test/networkDb/README new file mode 100644 index 00000000..17da65aa --- /dev/null +++ b/test/networkDb/README @@ -0,0 +1,15 @@ +SERVER + +cd test/networkdb +env GOOS=linux go build -v server/ndbTester.go && docker build -t fcrisciani/networkdb-test -f server/Dockerfile . 
+(only for testkit case) docker push fcrisciani/networkdb-test + +Run server: docker service create --name testdb --network net1 --replicas 3 --env TASK_ID="{{.Task.ID}}" -p mode=host,target=8000 fcrisciani/networkdb-test server 8000 + +CLIENT + +cd test/networkDb +Join cluster: docker run -it --network net1 fcrisciani/networkdb-test client join testdb 8000 +Join network: docker run -it --network net1 fcrisciani/networkdb-test client join-network testdb 8000 test +Run test: docker run -it --network net1 fcrisciani/networkdb-test client write-delete-unique-keys testdb 8000 test tableBla 3 10 +Check table: curl "localhost:32768/gettable?nid=test&tname=table_name" diff --git a/test/networkDb/dbclient/ndbClient.go b/test/networkDb/dbclient/ndbClient.go new file mode 100644 index 00000000..4550abb8 --- /dev/null +++ b/test/networkDb/dbclient/ndbClient.go @@ -0,0 +1,693 @@ +package dbclient + +import ( + "context" + "io/ioutil" + "log" + "net" + "net/http" + "os" + "regexp" + "strconv" + "strings" + "time" + + "github.com/Sirupsen/logrus" +) + +var servicePort string + +const totalWrittenKeys string = "totalKeys" + +type resultTuple struct { + id string + result int +} + +func httpGetFatalError(ip, port, path string) { + // for { + body, err := httpGet(ip, port, path) + if err != nil || !strings.Contains(string(body), "OK") { + // if strings.Contains(err.Error(), "EOF") { + // logrus.Warnf("Got EOF path:%s err:%s", path, err) + // continue + // } + log.Fatalf("[%s] error %s %s", path, err, body) + } + // break + // } +} + +func httpGet(ip, port, path string) ([]byte, error) { + resp, err := http.Get("http://" + ip + ":" + port + path) + if err != nil { + logrus.Errorf("httpGet error:%s", err) + return nil, err + } + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + return body, err +} + +func joinCluster(ip, port string, members []string, doneCh chan resultTuple) { + httpGetFatalError(ip, port, "/join?members="+strings.Join(members, ",")) + + if doneCh 
!= nil { + doneCh <- resultTuple{id: ip, result: 0} + } +} + +func joinNetwork(ip, port, network string, doneCh chan resultTuple) { + httpGetFatalError(ip, port, "/joinnetwork?nid="+network) + + if doneCh != nil { + doneCh <- resultTuple{id: ip, result: 0} + } +} + +func leaveNetwork(ip, port, network string, doneCh chan resultTuple) { + httpGetFatalError(ip, port, "/leavenetwork?nid="+network) + + if doneCh != nil { + doneCh <- resultTuple{id: ip, result: 0} + } +} + +func writeTableKey(ip, port, networkName, tableName, key string) { + createPath := "/createentry?nid=" + networkName + "&tname=" + tableName + "&value=v&key=" + httpGetFatalError(ip, port, createPath+key) +} + +func deleteTableKey(ip, port, networkName, tableName, key string) { + deletePath := "/deleteentry?nid=" + networkName + "&tname=" + tableName + "&key=" + httpGetFatalError(ip, port, deletePath+key) +} + +func clusterPeersNumber(ip, port string, doneCh chan resultTuple) { + body, err := httpGet(ip, port, "/clusterpeers") + + if err != nil { + logrus.Errorf("clusterPeers %s there was an error: %s\n", ip, err) + doneCh <- resultTuple{id: ip, result: -1} + return + } + peersRegexp := regexp.MustCompile(`Total peers: ([0-9]+)`) + peersNum, _ := strconv.Atoi(peersRegexp.FindStringSubmatch(string(body))[1]) + + doneCh <- resultTuple{id: ip, result: peersNum} +} + +func networkPeersNumber(ip, port, networkName string, doneCh chan resultTuple) { + body, err := httpGet(ip, port, "/networkpeers?nid="+networkName) + + if err != nil { + logrus.Errorf("networkPeersNumber %s there was an error: %s\n", ip, err) + doneCh <- resultTuple{id: ip, result: -1} + return + } + peersRegexp := regexp.MustCompile(`Total peers: ([0-9]+)`) + peersNum, _ := strconv.Atoi(peersRegexp.FindStringSubmatch(string(body))[1]) + + doneCh <- resultTuple{id: ip, result: peersNum} +} + +func dbTableEntriesNumber(ip, port, networkName, tableName string, doneCh chan resultTuple) { + body, err := httpGet(ip, port, 
"/gettable?nid="+networkName+"&tname="+tableName) + + if err != nil { + logrus.Errorf("tableEntriesNumber %s there was an error: %s\n", ip, err) + doneCh <- resultTuple{id: ip, result: -1} + return + } + elementsRegexp := regexp.MustCompile(`total elements: ([0-9]+)`) + entriesNum, _ := strconv.Atoi(elementsRegexp.FindStringSubmatch(string(body))[1]) + doneCh <- resultTuple{id: ip, result: entriesNum} +} + +func clientWatchTable(ip, port, networkName, tableName string, doneCh chan resultTuple) { + httpGetFatalError(ip, port, "/watchtable?nid="+networkName+"&tname="+tableName) + if doneCh != nil { + doneCh <- resultTuple{id: ip, result: 0} + } +} + +func clientTableEntriesNumber(ip, port, networkName, tableName string, doneCh chan resultTuple) { + body, err := httpGet(ip, port, "/watchedtableentries?nid="+networkName+"&tname="+tableName) + + if err != nil { + logrus.Errorf("clientTableEntriesNumber %s there was an error: %s\n", ip, err) + doneCh <- resultTuple{id: ip, result: -1} + return + } + elementsRegexp := regexp.MustCompile(`total elements: ([0-9]+)`) + entriesNum, _ := strconv.Atoi(elementsRegexp.FindStringSubmatch(string(body))[1]) + doneCh <- resultTuple{id: ip, result: entriesNum} +} + +func writeUniqueKeys(ctx context.Context, ip, port, networkName, tableName, key string, doneCh chan resultTuple) { + for x := 0; ; x++ { + select { + case <-ctx.Done(): + doneCh <- resultTuple{id: ip, result: x} + return + default: + k := key + strconv.Itoa(x) + // write key + writeTableKey(ip, port, networkName, tableName, k) + // give time to send out key writes + time.Sleep(100 * time.Millisecond) + } + } +} + +func writeDeleteUniqueKeys(ctx context.Context, ip, port, networkName, tableName, key string, doneCh chan resultTuple) { + for x := 0; ; x++ { + select { + case <-ctx.Done(): + doneCh <- resultTuple{id: ip, result: x} + return + default: + k := key + strconv.Itoa(x) + // write key + writeTableKey(ip, port, networkName, tableName, k) + // give time to send out key 
writes + time.Sleep(100 * time.Millisecond) + // delete key + deleteTableKey(ip, port, networkName, tableName, k) + } + } +} + +func writeDeleteLeaveJoin(ctx context.Context, ip, port, networkName, tableName, key string, doneCh chan resultTuple) { + for x := 0; ; x++ { + select { + case <-ctx.Done(): + doneCh <- resultTuple{id: ip, result: x} + return + default: + k := key + strconv.Itoa(x) + // write key + writeTableKey(ip, port, networkName, tableName, k) + time.Sleep(100 * time.Millisecond) + // delete key + deleteTableKey(ip, port, networkName, tableName, k) + // give some time + time.Sleep(100 * time.Millisecond) + // leave network + leaveNetwork(ip, port, networkName, nil) + // join network + joinNetwork(ip, port, networkName, nil) + } + } +} + +func ready(ip, port string, doneCh chan resultTuple) { + for { + body, err := httpGet(ip, port, "/ready") + if err != nil || !strings.Contains(string(body), "OK") { + time.Sleep(500 * time.Millisecond) + continue + } + // success + break + } + // notify the completion + doneCh <- resultTuple{id: ip, result: 0} +} + +func checkTable(ctx context.Context, ips []string, port, networkName, tableName string, expectedEntries int, fn func(string, string, string, string, chan resultTuple)) { + startTime := time.Now().UnixNano() + var successTime int64 + + // Loop until the context expires to guarantee that the result is stable + for { + select { + case <-ctx.Done(): + // Validate test success, if the time is set it means that all the nodes reported the expected number of entries + if successTime != 0 { + logrus.Infof("Check table passed, the cluster converged in %d msec", time.Duration(successTime-startTime)/time.Millisecond) + return + } + log.Fatal("Test failed, there is still entries in the tables of the nodes") + default: + logrus.Infof("Checking table %s expected %d", tableName, expectedEntries) + doneCh := make(chan resultTuple, len(ips)) + for _, ip := range ips { + go fn(ip, servicePort, networkName, tableName, doneCh) + } + + nodesWithCorrectEntriesNum := 0 + for 
i := len(ips); i > 0; i-- { + tableEntries := <-doneCh + logrus.Infof("Node %s has %d entries", tableEntries.id, tableEntries.result) + if tableEntries.result == expectedEntries { + nodesWithCorrectEntriesNum++ + } + } + close(doneCh) + if nodesWithCorrectEntriesNum == len(ips) { + if successTime == 0 { + successTime = time.Now().UnixNano() + logrus.Infof("Success after %d msec", time.Duration(successTime-startTime)/time.Millisecond) + } + } else { + successTime = 0 + } + time.Sleep(10 * time.Second) + } + } +} + +func waitWriters(parallelWriters int, mustWrite bool, doneCh chan resultTuple) map[string]int { + var totalKeys int + resultTable := make(map[string]int) + for i := 0; i < parallelWriters; i++ { + logrus.Infof("Waiting for %d workers", parallelWriters-i) + workerReturn := <-doneCh + totalKeys += workerReturn.result + if mustWrite && workerReturn.result == 0 { + log.Fatalf("The worker %s did not write any key %d == 0", workerReturn.id, workerReturn.result) + } + if !mustWrite && workerReturn.result != 0 { + log.Fatalf("The worker %s was supposed to return 0 instead %d != 0", workerReturn.id, workerReturn.result) + } + if mustWrite { + resultTable[workerReturn.id] = workerReturn.result + logrus.Infof("The worker %s wrote %d keys", workerReturn.id, workerReturn.result) + } + } + resultTable[totalWrittenKeys] = totalKeys + return resultTable +} + +// ready +func doReady(ips []string) { + doneCh := make(chan resultTuple, len(ips)) + // check all the nodes + for _, ip := range ips { + go ready(ip, servicePort, doneCh) + } + // wait for the readiness of all nodes + for i := len(ips); i > 0; i-- { + <-doneCh + } + close(doneCh) +} + +// join +func doJoin(ips []string) { + doneCh := make(chan resultTuple, len(ips)) + // check all the nodes + for i, ip := range ips { + members := append([]string(nil), ips[:i]...) + members = append(members, ips[i+1:]...) 
+ go joinCluster(ip, servicePort, members, doneCh) + } + // wait for all the nodes to complete the join + for i := len(ips); i > 0; i-- { + <-doneCh + } + close(doneCh) +} + +// cluster-peers expectedNumberPeers +func doClusterPeers(ips []string, args []string) { + doneCh := make(chan resultTuple, len(ips)) + expectedPeers, _ := strconv.Atoi(args[0]) + // check all the nodes + for _, ip := range ips { + go clusterPeersNumber(ip, servicePort, doneCh) + } + // wait for the answer of every node and verify the number of peers + for i := len(ips); i > 0; i-- { + node := <-doneCh + if node.result != expectedPeers { + log.Fatalf("Expected peers from %s missmatch %d != %d", node.id, expectedPeers, node.result) + } + } + close(doneCh) +} + +// join-network networkName +func doJoinNetwork(ips []string, args []string) { + doneCh := make(chan resultTuple, len(ips)) + // check all the nodes + for _, ip := range ips { + go joinNetwork(ip, servicePort, args[0], doneCh) + } + // wait for all the nodes to join the network + for i := len(ips); i > 0; i-- { + <-doneCh + } + close(doneCh) +} + +// leave-network networkName +func doLeaveNetwork(ips []string, args []string) { + doneCh := make(chan resultTuple, len(ips)) + // check all the nodes + for _, ip := range ips { + go leaveNetwork(ip, servicePort, args[0], doneCh) + } + // wait for all the nodes to leave the network + for i := len(ips); i > 0; i-- { + <-doneCh + } + close(doneCh) +} + +// network-peers networkName expectedNumberPeers maxRetry +func doNetworkPeers(ips []string, args []string) { + doneCh := make(chan resultTuple, len(ips)) + networkName := args[0] + expectedPeers, _ := strconv.Atoi(args[1]) + maxRetry, _ := strconv.Atoi(args[2]) + for retry := 0; retry < maxRetry; retry++ { + // check all the nodes + for _, ip := range ips { + go networkPeersNumber(ip, servicePort, networkName, doneCh) + } + // wait for the answer of every node and verify the number of peers + for i := len(ips); i > 0; i-- { + node := <-doneCh + if node.result != expectedPeers { + if retry == maxRetry-1 { + 
log.Fatalf("Expected peers from %s missmatch %d != %d", node.id, expectedPeers, node.result) + } else { + logrus.Warnf("Expected peers from %s missmatch %d != %d", node.id, expectedPeers, node.result) + } + time.Sleep(1 * time.Second) + } + } + } + close(doneCh) +} + +// write-delete-unique-keys networkName tableName numParallelWriters writeTimeSec +func doWriteDeleteUniqueKeys(ips []string, args []string) { + networkName := args[0] + tableName := args[1] + parallelWriters, _ := strconv.Atoi(args[2]) + writeTimeSec, _ := strconv.Atoi(args[3]) + + doneCh := make(chan resultTuple, parallelWriters) + // Enable watch of tables from clients + for i := 0; i < parallelWriters; i++ { + go clientWatchTable(ips[i], servicePort, networkName, tableName, doneCh) + } + waitWriters(parallelWriters, false, doneCh) + + // Start parallel writers that will create and delete unique keys + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(writeTimeSec)*time.Second) + for i := 0; i < parallelWriters; i++ { + key := "key-" + strconv.Itoa(i) + "-" + logrus.Infof("Spawn worker: %d on IP:%s", i, ips[i]) + go writeDeleteUniqueKeys(ctx, ips[i], servicePort, networkName, tableName, key, doneCh) + } + + // Sync with all the writers + keyMap := waitWriters(parallelWriters, true, doneCh) + cancel() + logrus.Infof("Written a total of %d keys on the cluster", keyMap[totalWrittenKeys]) + + // check table entries for 2 minutes + ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute) + checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber) + cancel() + ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second) + checkTable(ctx, ips, servicePort, networkName, tableName, 0, clientTableEntriesNumber) + cancel() +} + +// write-unique-keys networkName tableName numParallelWriters writeTimeSec +func doWriteUniqueKeys(ips []string, args []string) { + networkName := args[0] + tableName := args[1] + parallelWriters, _ := 
strconv.Atoi(args[2]) + writeTimeSec, _ := strconv.Atoi(args[3]) + + doneCh := make(chan resultTuple, parallelWriters) + // Enable watch of tables from clients + for i := 0; i < parallelWriters; i++ { + go clientWatchTable(ips[i], servicePort, networkName, tableName, doneCh) + } + waitWriters(parallelWriters, false, doneCh) + + // Start parallel writers that will create and delete unique keys + defer close(doneCh) + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(writeTimeSec)*time.Second) + for i := 0; i < parallelWriters; i++ { + key := "key-" + strconv.Itoa(i) + "-" + logrus.Infof("Spawn worker: %d on IP:%s", i, ips[i]) + go writeUniqueKeys(ctx, ips[i], servicePort, networkName, tableName, key, doneCh) + } + + // Sync with all the writers + keyMap := waitWriters(parallelWriters, true, doneCh) + cancel() + logrus.Infof("Written a total of %d keys on the cluster", keyMap[totalWrittenKeys]) + + // check table entries for 2 minutes + ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute) + checkTable(ctx, ips, servicePort, networkName, tableName, keyMap[totalWrittenKeys], dbTableEntriesNumber) + cancel() +} + +// write-delete-leave-join networkName tableName numParallelWriters writeTimeSec +func doWriteDeleteLeaveJoin(ips []string, args []string) { + networkName := args[0] + tableName := args[1] + parallelWriters, _ := strconv.Atoi(args[2]) + writeTimeSec, _ := strconv.Atoi(args[3]) + + // Start parallel writers that will create and delete unique keys + doneCh := make(chan resultTuple, parallelWriters) + defer close(doneCh) + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(writeTimeSec)*time.Second) + for i := 0; i < parallelWriters; i++ { + key := "key-" + strconv.Itoa(i) + "-" + logrus.Infof("Spawn worker: %d on IP:%s", i, ips[i]) + go writeDeleteLeaveJoin(ctx, ips[i], servicePort, networkName, tableName, key, doneCh) + } + + // Sync with all the writers + keyMap := waitWriters(parallelWriters, true, 
doneCh) + cancel() + logrus.Infof("Written a total of %d keys on the cluster", keyMap["totalKeys"]) + + // check table entries for 2 minutes + ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute) + checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber) + cancel() +} + +// write-delete-wait-leave-join networkName tableName numParallelWriters writeTimeSec +func doWriteDeleteWaitLeaveJoin(ips []string, args []string) { + networkName := args[0] + tableName := args[1] + parallelWriters, _ := strconv.Atoi(args[2]) + writeTimeSec, _ := strconv.Atoi(args[3]) + + // Start parallel writers that will create and delete unique keys + doneCh := make(chan resultTuple, parallelWriters) + defer close(doneCh) + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(writeTimeSec)*time.Second) + for i := 0; i < parallelWriters; i++ { + key := "key-" + strconv.Itoa(i) + "-" + logrus.Infof("Spawn worker: %d on IP:%s", i, ips[i]) + go writeDeleteUniqueKeys(ctx, ips[i], servicePort, networkName, tableName, key, doneCh) + } + + // Sync with all the writers + keyMap := waitWriters(parallelWriters, true, doneCh) + cancel() + logrus.Infof("Written a total of %d keys on the cluster", keyMap[totalWrittenKeys]) + + // The writers will leave the network + for i := 0; i < parallelWriters; i++ { + logrus.Infof("worker leaveNetwork: %d on IP:%s", i, ips[i]) + go leaveNetwork(ips[i], servicePort, networkName, doneCh) + } + waitWriters(parallelWriters, false, doneCh) + + // Give some time + time.Sleep(100 * time.Millisecond) + + // The writers will join the network + for i := 0; i < parallelWriters; i++ { + logrus.Infof("worker joinNetwork: %d on IP:%s", i, ips[i]) + go joinNetwork(ips[i], servicePort, networkName, doneCh) + } + waitWriters(parallelWriters, false, doneCh) + + // check table entries for 2 minutes + ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute) + checkTable(ctx, ips, servicePort, networkName, 
tableName, 0, dbTableEntriesNumber) + cancel() +} + +// write-wait-leave networkName tableName numParallelWriters writeTimeSec +func doWriteWaitLeave(ips []string, args []string) { + networkName := args[0] + tableName := args[1] + parallelWriters, _ := strconv.Atoi(args[2]) + writeTimeSec, _ := strconv.Atoi(args[3]) + + // Start parallel writers that will create and delete unique keys + doneCh := make(chan resultTuple, parallelWriters) + defer close(doneCh) + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(writeTimeSec)*time.Second) + for i := 0; i < parallelWriters; i++ { + key := "key-" + strconv.Itoa(i) + "-" + logrus.Infof("Spawn worker: %d on IP:%s", i, ips[i]) + go writeUniqueKeys(ctx, ips[i], servicePort, networkName, tableName, key, doneCh) + } + + // Sync with all the writers + keyMap := waitWriters(parallelWriters, true, doneCh) + cancel() + logrus.Infof("Written a total of %d keys on the cluster", keyMap[totalWrittenKeys]) + + // The writers will leave the network + for i := 0; i < parallelWriters; i++ { + logrus.Infof("worker leaveNetwork: %d on IP:%s", i, ips[i]) + go leaveNetwork(ips[i], servicePort, networkName, doneCh) + } + waitWriters(parallelWriters, false, doneCh) + + // check table entries for 2 minutes + ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute) + checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber) + cancel() +} + +// write-wait-leave-join networkName tableName numParallelWriters writeTimeSec numParallelLeaver +func doWriteWaitLeaveJoin(ips []string, args []string) { + networkName := args[0] + tableName := args[1] + parallelWriters, _ := strconv.Atoi(args[2]) + writeTimeSec, _ := strconv.Atoi(args[3]) + parallerlLeaver, _ := strconv.Atoi(args[4]) + + // Start parallel writers that will create and delete unique keys + doneCh := make(chan resultTuple, parallelWriters) + defer close(doneCh) + ctx, cancel := context.WithTimeout(context.Background(), 
time.Duration(writeTimeSec)*time.Second) + for i := 0; i < parallelWriters; i++ { + key := "key-" + strconv.Itoa(i) + "-" + logrus.Infof("Spawn worker: %d on IP:%s", i, ips[i]) + go writeUniqueKeys(ctx, ips[i], servicePort, networkName, tableName, key, doneCh) + } + + // Sync with all the writers + keyMap := waitWriters(parallelWriters, true, doneCh) + cancel() + logrus.Infof("Written a total of %d keys on the cluster", keyMap[totalWrittenKeys]) + + keysExpected := keyMap[totalWrittenKeys] + // The Leavers will leave the network + for i := 0; i < parallerlLeaver; i++ { + logrus.Infof("worker leaveNetwork: %d on IP:%s", i, ips[i]) + go leaveNetwork(ips[i], servicePort, networkName, doneCh) + // Once a node leave all the keys written previously will be deleted, so the expected keys will consider that as removed + keysExpected -= keyMap[ips[i]] + } + waitWriters(parallerlLeaver, false, doneCh) + + // Give some time + time.Sleep(100 * time.Millisecond) + + // The writers will join the network + for i := 0; i < parallerlLeaver; i++ { + logrus.Infof("worker joinNetwork: %d on IP:%s", i, ips[i]) + go joinNetwork(ips[i], servicePort, networkName, doneCh) + } + waitWriters(parallerlLeaver, false, doneCh) + + // check table entries for 2 minutes + ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute) + checkTable(ctx, ips, servicePort, networkName, tableName, keysExpected, dbTableEntriesNumber) + cancel() +} + +var cmdArgChec = map[string]int{ + "debug": 0, + "fail": 0, + "ready": 2, + "join": 2, + "leave": 2, + "join-network": 3, + "leave-network": 3, + "cluster-peers": 3, + "write-delete-unique-keys": 4, +} + +// Client is a client +func Client(args []string) { + logrus.Infof("[CLIENT] Starting with arguments %v", args) + command := args[0] + + if len(args) < cmdArgChec[command] { + log.Fatalf("Command %s requires %d arguments, aborting...", command, cmdArgChec[command]) + } + + switch command { + case "debug": + time.Sleep(1 * time.Hour) + os.Exit(0) + 
case "fail": + log.Fatalf("Test error condition with message: error error error") + } + + serviceName := args[1] + ips, _ := net.LookupHost("tasks." + serviceName) + logrus.Infof("got the ips %v", ips) + if len(ips) == 0 { + log.Fatalf("Cannot resolve any IP for the service tasks.%s", serviceName) + } + servicePort = args[2] + commandArgs := args[3:] + logrus.Infof("Executing %s with args:%v", command, commandArgs) + switch command { + case "ready": + doReady(ips) + case "join": + doJoin(ips) + case "leave": + + case "cluster-peers": + // cluster-peers + doClusterPeers(ips, commandArgs) + + case "join-network": + // join-network networkName + doJoinNetwork(ips, commandArgs) + case "leave-network": + // leave-network networkName + doLeaveNetwork(ips, commandArgs) + case "network-peers": + // network-peers networkName maxRetry + doNetworkPeers(ips, commandArgs) + + case "write-unique-keys": + // write-delete-unique-keys networkName tableName numParallelWriters writeTimeSec + doWriteUniqueKeys(ips, commandArgs) + case "write-delete-unique-keys": + // write-delete-unique-keys networkName tableName numParallelWriters writeTimeSec + doWriteDeleteUniqueKeys(ips, commandArgs) + case "write-delete-leave-join": + // write-delete-leave-join networkName tableName numParallelWriters writeTimeSec + doWriteDeleteLeaveJoin(ips, commandArgs) + case "write-delete-wait-leave-join": + // write-delete-wait-leave-join networkName tableName numParallelWriters writeTimeSec + doWriteDeleteWaitLeaveJoin(ips, commandArgs) + case "write-wait-leave": + // write-wait-leave networkName tableName numParallelWriters writeTimeSec + doWriteWaitLeave(ips, commandArgs) + case "write-wait-leave-join": + // write-wait-leave networkName tableName numParallelWriters writeTimeSec + doWriteWaitLeaveJoin(ips, commandArgs) + default: + log.Fatalf("Command %s not recognized", command) + } +} diff --git a/test/networkDb/dbserver/ndbServer.go b/test/networkDb/dbserver/ndbServer.go new file mode 100644 index 
// getIPInterface returns the textual form of the first non-loopback IPv4
// address assigned to the network interface called name.
//
// An error is returned when the interfaces cannot be listed, when no interface
// has that name, when the interface is not up, when its addresses cannot be
// read, or when none of its addresses is a usable IPv4.
func getIPInterface(name string) (string, error) {
	all, err := net.Interfaces()
	if err != nil {
		return "", err
	}

	// Only the first interface carrying the requested name is considered
	var target *net.Interface
	for i := range all {
		if all[i].Name == name {
			target = &all[i]
			break
		}
	}
	if target == nil {
		return "", errors.New("Interface not found")
	}
	if target.Flags&net.FlagUp == 0 {
		return "", errors.New("Interfaces is down")
	}

	addrs, err := target.Addrs()
	if err != nil {
		return "", err
	}
	for _, a := range addrs {
		// Address types other than IPNet/IPAddr leave candidate nil and are skipped
		var candidate net.IP
		if n, ok := a.(*net.IPNet); ok {
			candidate = n.IP
		} else if ia, ok := a.(*net.IPAddr); ok {
			candidate = ia.IP
		}
		if candidate == nil || candidate.IsLoopback() {
			continue
		}
		// To4 yields nil for addresses that have no IPv4 representation
		if v4 := candidate.To4(); v4 != nil {
			return v4.String(), nil
		}
	}
	return "", errors.New("Interfaces does not have a valid IPv4")
}
clientWatchTable[tableName] = tableHandler{cancelWatch: cancel, entries: make(map[string]string)} + go handleTableEvents(tableName, ch) + + fmt.Fprintf(w, "OK\n") + } +} + +func watchTableEntries(ctx interface{}, w http.ResponseWriter, r *http.Request) { + r.ParseForm() + diagnose.DebugHTTPForm(r) + if len(r.Form["tname"]) < 1 { + diagnose.HTTPReplyError(w, missingParameter, fmt.Sprintf("%s?tname=table_name", r.URL.Path)) + return + } + + tableName := r.Form["tname"][0] + table, ok := clientWatchTable[tableName] + if !ok { + fmt.Fprintf(w, "Table %s not watched\n", tableName) + return + } + + fmt.Fprintf(w, "total elements: %d\n", len(table.entries)) + i := 0 + for k, v := range table.entries { + fmt.Fprintf(w, "%d) k:`%s` -> v:`%s`\n", i, k, v) + i++ + } +} + +func handleTableEvents(tableName string, ch *events.Channel) { + var ( + // nid string + eid string + value []byte + isAdd bool + ) + + logrus.Infof("Started watching table:%s", tableName) + for { + select { + case <-ch.Done(): + logrus.Infof("End watching %s", tableName) + return + + case evt := <-ch.C: + logrus.Infof("Recevied new event on:%s", tableName) + switch event := evt.(type) { + case networkdb.CreateEvent: + // nid = event.NetworkID + eid = event.Key + value = event.Value + isAdd = true + case networkdb.DeleteEvent: + // nid = event.NetworkID + eid = event.Key + value = event.Value + isAdd = false + default: + log.Fatalf("Unexpected table event = %#v", event) + } + if isAdd { + // logrus.Infof("Add %s %s", tableName, eid) + clientWatchTable[tableName].entries[eid] = string(value) + } else { + // logrus.Infof("Del %s %s", tableName, eid) + delete(clientWatchTable[tableName].entries, eid) + } + } + } +} diff --git a/test/networkDb/testMain.go b/test/networkDb/testMain.go new file mode 100644 index 00000000..4ec3064f --- /dev/null +++ b/test/networkDb/testMain.go @@ -0,0 +1,24 @@ +package main + +import ( + "log" + "os" + + "github.com/Sirupsen/logrus" + 
"github.com/docker/libnetwork/test/networkDb/dbclient" + "github.com/docker/libnetwork/test/networkDb/dbserver" +) + +func main() { + logrus.Infof("Starting the image with these args: %v", os.Args) + if len(os.Args) < 1 { + log.Fatal("You need at least 1 argument [client/server]") + } + + switch os.Args[1] { + case "server": + dbserver.Server(os.Args[2:]) + case "client": + dbclient.Client(os.Args[2:]) + } +}