From 0a9aaf80ce2d3c5a65e710f4120b806791afc33b Mon Sep 17 00:00:00 2001 From: Flavio Crisciani Date: Thu, 27 Jul 2017 11:43:13 -0700 Subject: [PATCH] Funnel peerAdd and peerDelete in a channel Remove the need for the wait group and avoid new locks Added utility to print the method name and the caller name Signed-off-by: Flavio Crisciani --- common/caller.go | 29 +++++++++++ common/caller_test.go | 49 ++++++++++++++++++ drivers/overlay/joinleave.go | 5 +- drivers/overlay/ov_network.go | 5 +- drivers/overlay/ov_serf.go | 11 ++-- drivers/overlay/overlay.go | 20 ++++++-- drivers/overlay/peerdb.go | 96 +++++++++++++++++++++++++++-------- 7 files changed, 175 insertions(+), 40 deletions(-) create mode 100644 common/caller.go create mode 100644 common/caller_test.go diff --git a/common/caller.go b/common/caller.go new file mode 100644 index 00000000..0dec3bc0 --- /dev/null +++ b/common/caller.go @@ -0,0 +1,29 @@ +package common + +import ( + "runtime" + "strings" +) + +func callerInfo(i int) string { + ptr, _, _, ok := runtime.Caller(i) + fName := "unknown" + if ok { + f := runtime.FuncForPC(ptr) + if f != nil { + // f.Name() is like: github.com/docker/libnetwork/common.MethodName + tmp := strings.Split(f.Name(), ".") + if len(tmp) > 0 { + fName = tmp[len(tmp)-1] + } + } + } + + return fName +} + +// CallerName returns the name of the function at the specified level +// level == 0 means current method name +func CallerName(level int) string { + return callerInfo(2 + level) +} diff --git a/common/caller_test.go b/common/caller_test.go new file mode 100644 index 00000000..babfbb7b --- /dev/null +++ b/common/caller_test.go @@ -0,0 +1,49 @@ +package common + +import "testing" + +func fun1() string { + return CallerName(0) +} + +func fun2() string { + return CallerName(1) +} + +func fun3() string { + return fun4() +} + +func fun4() string { + return CallerName(0) +} + +func fun5() string { + return fun6() +} + +func fun6() string { + return CallerName(1) +} + +func TestCaller(t *testing.T) { + funName := fun1() + if funName != "fun1" { + t.Fatalf("error on fun1 caller %s", funName) + } + + funName = fun2() + if funName != "TestCaller" { + t.Fatalf("error on fun2 caller %s", funName) + } + + funName = fun3() + if funName != "fun4" { + t.Fatalf("error on fun2 caller %s", funName) + } + + funName = fun5() + if funName != "fun5" { + t.Fatalf("error on fun5 caller %s", funName) + } +} diff --git a/drivers/overlay/joinleave.go b/drivers/overlay/joinleave.go index cdbb4282..31c311f4 100644 --- a/drivers/overlay/joinleave.go +++ b/drivers/overlay/joinleave.go @@ -120,8 +120,7 @@ func (d *driver) Join(nid, eid string, sboxKey string, jinfo driverapi.JoinInfo, } } - d.peerDbAdd(nid, eid, ep.addr.IP, ep.addr.Mask, ep.mac, - net.ParseIP(d.advertiseAddress), true) + d.peerAdd(nid, eid, ep.addr.IP, ep.addr.Mask, ep.mac, net.ParseIP(d.advertiseAddress), true, false, false, true) if err := d.checkEncryption(nid, nil, n.vxlanID(s), true, true); err != nil { logrus.Warn(err) @@ -205,7 +204,7 @@ func (d *driver) EventNotify(etype driverapi.EventType, nid, tableName, key stri return } - d.peerAdd(nid, eid, addr.IP, addr.Mask, mac, vtep, true, false, false) + d.peerAdd(nid, eid, addr.IP, addr.Mask, mac, vtep, true, false, false, false) } // Leave method is invoked when a Sandbox detaches from an endpoint. diff --git a/drivers/overlay/ov_network.go b/drivers/overlay/ov_network.go index 01f6287b..01b53ac7 100644 --- a/drivers/overlay/ov_network.go +++ b/drivers/overlay/ov_network.go @@ -765,10 +765,7 @@ func (n *network) watchMiss(nlSock *nl.NetlinkSocket) { logrus.Errorf("could not resolve peer %q: %v", ip, err) continue } - - if err := n.driver.peerAdd(n.id, "dummy", ip, IPmask, mac, vtep, true, l2Miss, l3Miss); err != nil { - logrus.Errorf("could not add neighbor entry for missed peer %q: %v", ip, err) - } + n.driver.peerAdd(n.id, "dummy", ip, IPmask, mac, vtep, true, l2Miss, l3Miss, false) } else { // If the gc_thresh values are lower kernel might knock off the neighor entries. // When we get a L3 miss check if its a valid peer and reprogram the neighbor diff --git a/drivers/overlay/ov_serf.go b/drivers/overlay/ov_serf.go index 9002bce6..20954ef2 100644 --- a/drivers/overlay/ov_serf.go +++ b/drivers/overlay/ov_serf.go @@ -120,15 +120,10 @@ func (d *driver) processEvent(u serf.UserEvent) { switch action { case "join": - if err := d.peerAdd(nid, eid, net.ParseIP(ipStr), net.IPMask(net.ParseIP(maskStr).To4()), mac, - net.ParseIP(vtepStr), true, false, false); err != nil { - logrus.Errorf("Peer add failed in the driver: %v\n", err) - } + d.peerAdd(nid, eid, net.ParseIP(ipStr), net.IPMask(net.ParseIP(maskStr).To4()), mac, net.ParseIP(vtepStr), + true, false, false, false) case "leave": - if err := d.peerDelete(nid, eid, net.ParseIP(ipStr), net.IPMask(net.ParseIP(maskStr).To4()), mac, - net.ParseIP(vtepStr), true); err != nil { - logrus.Errorf("Peer delete failed in the driver: %v\n", err) - } + d.peerDelete(nid, eid, net.ParseIP(ipStr), net.IPMask(net.ParseIP(maskStr).To4()), mac, net.ParseIP(vtepStr), true) } } diff --git a/drivers/overlay/overlay.go b/drivers/overlay/overlay.go index 8d19b2e1..11eda678 100644 --- a/drivers/overlay/overlay.go +++ b/drivers/overlay/overlay.go @@ -3,6 +3,7 @@ package overlay //go:generate protoc -I.:../../Godeps/_workspace/src/github.com/gogo/protobuf --gogo_out=import_path=github.com/docker/libnetwork/drivers/overlay,Mgogoproto/gogo.proto=github.com/gogo/protobuf/gogoproto:. overlay.proto import ( + "context" "fmt" "net" "sync" @@ -50,6 +51,8 @@ type driver struct { joinOnce sync.Once localJoinOnce sync.Once keys []*key + peerOpCh chan *peerOperation + peerOpCancel context.CancelFunc sync.Mutex } @@ -64,10 +67,16 @@ func Init(dc driverapi.DriverCallback, config map[string]interface{}) error { peerDb: peerNetworkMap{ mp: map[string]*peerMap{}, }, - secMap: &encrMap{nodes: map[string][]*spi{}}, - config: config, + secMap: &encrMap{nodes: map[string][]*spi{}}, + config: config, + peerOpCh: make(chan *peerOperation), } + // Launch the go routine for processing peer operations + ctx, cancel := context.WithCancel(context.Background()) + d.peerOpCancel = cancel + go d.peerOpRoutine(ctx, d.peerOpCh) + if data, ok := config[netlabel.GlobalKVClient]; ok { var err error dsc, ok := data.(discoverapi.DatastoreConfigData) @@ -161,7 +170,7 @@ func (d *driver) restoreEndpoints() error { } n.incEndpointCount() - d.peerDbAdd(ep.nid, ep.id, ep.addr.IP, ep.addr.Mask, ep.mac, net.ParseIP(d.advertiseAddress), true) + d.peerAdd(ep.nid, ep.id, ep.addr.IP, ep.addr.Mask, ep.mac, net.ParseIP(d.advertiseAddress), true, false, false, true) } return nil } @@ -170,6 +179,11 @@ func (d *driver) restoreEndpoints() error { func Fini(drv driverapi.Driver) { d := drv.(*driver) + // Notify the peer go routine to return + if d.peerOpCancel != nil { + d.peerOpCancel() + } + if d.exitCh != nil { waitCh := make(chan struct{}) diff --git a/drivers/overlay/peerdb.go b/drivers/overlay/peerdb.go index 21cd1fbe..e9f249cf 100644 --- a/drivers/overlay/peerdb.go +++ b/drivers/overlay/peerdb.go @@ -1,12 +1,14 @@ package overlay import ( + "context" "fmt" "net" "sync" "syscall" "github.com/Sirupsen/logrus" + "github.com/docker/libnetwork/common" ) const ovPeerTable = "overlay_peer_table" @@ -59,8 +61,6 @@ func (pKey *peerKey) Scan(state fmt.ScanState, verb rune) error { return nil } -var peerDbWg sync.WaitGroup - func (d *driver) peerDbWalk(f func(string, *peerKey, *peerEntry) bool) error { d.peerDb.Lock() nids := []string{} @@ -141,8 +141,6 @@ func (d *driver) peerDbSearch(nid string, peerIP net.IP) (net.HardwareAddr, net. func (d *driver) peerDbAdd(nid, eid string, peerIP net.IP, peerIPMask net.IPMask, peerMac net.HardwareAddr, vtep net.IP, isLocal bool) { - peerDbWg.Wait() - d.peerDb.Lock() pMap, ok := d.peerDb.mp[nid] if !ok { @@ -173,7 +171,6 @@ func (d *driver) peerDbAdd(nid, eid string, peerIP net.IP, peerIPMask net.IPMask func (d *driver) peerDbDelete(nid, eid string, peerIP net.IP, peerIPMask net.IPMask, peerMac net.HardwareAddr, vtep net.IP) peerEntry { - peerDbWg.Wait() d.peerDb.Lock() pMap, ok := d.peerDb.mp[nid] @@ -215,9 +212,6 @@ func (d *driver) peerDbUpdateSandbox(nid string) { } d.peerDb.Unlock() - peerDbWg.Add(1) - - var peerOps []func() pMap.Lock() for pKeyStr, pEntry := range pMap.mp { var pKey peerKey @@ -233,28 +227,67 @@ func (d *driver) peerDbUpdateSandbox(nid string) { // pointing to the same memory location for every iteration. Make // a copy of pEntry before capturing it in the following closure. entry := pEntry - op := func() { - if err := d.peerAdd(nid, entry.eid, pKey.peerIP, entry.peerIPMask, - pKey.peerMac, entry.vtep, - false, false, false); err != nil { - logrus.Errorf("peerdbupdate in sandbox failed for ip %s and mac %s: %v", - pKey.peerIP, pKey.peerMac, err) - } - } - peerOps = append(peerOps, op) + d.peerAdd(nid, entry.eid, pKey.peerIP, entry.peerIPMask, pKey.peerMac, entry.vtep, false, false, false, false) } pMap.Unlock() +} - for _, op := range peerOps { - op() +type peerOperation struct { + isAdd bool + networkID string + endpointID string + peerIP net.IP + peerIPMask net.IPMask + peerMac net.HardwareAddr + vtepIP net.IP + updateDB bool + l2Miss bool + l3Miss bool + localPeer bool + callerName string +} + +func (d *driver) peerOpRoutine(ctx context.Context, ch chan *peerOperation) { + var err error + for { + select { + case <-ctx.Done(): + return + case op := <-ch: + if op.isAdd { + err = d.peerAddOp(op.networkID, op.endpointID, op.peerIP, op.peerIPMask, op.peerMac, op.vtepIP, op.updateDB, op.l2Miss, op.l3Miss, op.localPeer) + } else { + err = d.peerDeleteOp(op.networkID, op.endpointID, op.peerIP, op.peerIPMask, op.peerMac, op.vtepIP, op.localPeer) + } + if err != nil { + logrus.Warnf("Peer operation failed:%s op:%v", err, op) + } + } } - - peerDbWg.Done() } func (d *driver) peerAdd(nid, eid string, peerIP net.IP, peerIPMask net.IPMask, - peerMac net.HardwareAddr, vtep net.IP, updateDb, l2Miss, l3Miss bool) error { + peerMac net.HardwareAddr, vtep net.IP, updateDb, l2Miss, l3Miss, localPeer bool) { + callerName := common.CallerName(1) + d.peerOpCh <- &peerOperation{ + isAdd: true, + networkID: nid, + endpointID: eid, + peerIP: peerIP, + peerIPMask: peerIPMask, + peerMac: peerMac, + vtepIP: vtep, + updateDB: updateDb, + l2Miss: l2Miss, + l3Miss: l3Miss, + localPeer: localPeer, + callerName: callerName, + } +} + +func (d *driver) peerAddOp(nid, eid string, peerIP net.IP, peerIPMask net.IPMask, + peerMac net.HardwareAddr, vtep net.IP, updateDb, l2Miss, l3Miss, updateOnlyDB bool) error { if err := validateID(nid, eid); err != nil { return err @@ -262,6 +295,9 @@ func (d *driver) peerAdd(nid, eid string, peerIP net.IP, peerIPMask net.IPMask, if updateDb { d.peerDbAdd(nid, eid, peerIP, peerIPMask, peerMac, vtep, false) + if updateOnlyDB { + return nil + } } n := d.network(nid) @@ -311,6 +347,22 @@ func (d *driver) peerAdd(nid, eid string, peerIP net.IP, peerIPMask net.IPMask, } func (d *driver) peerDelete(nid, eid string, peerIP net.IP, peerIPMask net.IPMask, + peerMac net.HardwareAddr, vtep net.IP, updateDb bool) { + callerName := common.CallerName(1) + d.peerOpCh <- &peerOperation{ + isAdd: false, + networkID: nid, + endpointID: eid, + peerIP: peerIP, + peerIPMask: peerIPMask, + peerMac: peerMac, + vtepIP: vtep, + updateDB: updateDb, + callerName: callerName, + } +} + +func (d *driver) peerDeleteOp(nid, eid string, peerIP net.IP, peerIPMask net.IPMask, peerMac net.HardwareAddr, vtep net.IP, updateDb bool) error { if err := validateID(nid, eid); err != nil {