Funnel peerAdd and peerDelete in a channel

Remove the need for the wait group and avoid new
locks
Added utility to print the method name and the caller name

Signed-off-by: Flavio Crisciani <flavio.crisciani@docker.com>
This commit is contained in:
Flavio Crisciani 2017-07-27 11:43:13 -07:00
parent 8d6355b5c2
commit 0a9aaf80ce
No known key found for this signature in database
GPG Key ID: 28CAFCE754CF3A48
7 changed files with 175 additions and 40 deletions

29
common/caller.go Normal file
View File

@ -0,0 +1,29 @@
package common
import (
"runtime"
"strings"
)
func callerInfo(i int) string {
ptr, _, _, ok := runtime.Caller(i)
fName := "unknown"
if ok {
f := runtime.FuncForPC(ptr)
if f != nil {
// f.Name() is like: github.com/docker/libnetwork/common.MethodName
tmp := strings.Split(f.Name(), ".")
if len(tmp) > 0 {
fName = tmp[len(tmp)-1]
}
}
}
return fName
}
// CallerName returns the name of the function at the specified level
// level == 0 means current method name
func CallerName(level int) string {
return callerInfo(2 + level)
}

49
common/caller_test.go Normal file
View File

@ -0,0 +1,49 @@
package common
import "testing"
func fun1() string {
return CallerName(0)
}
func fun2() string {
return CallerName(1)
}
func fun3() string {
return fun4()
}
func fun4() string {
return CallerName(0)
}
func fun5() string {
return fun6()
}
func fun6() string {
return CallerName(1)
}
func TestCaller(t *testing.T) {
funName := fun1()
if funName != "fun1" {
t.Fatalf("error on fun1 caller %s", funName)
}
funName = fun2()
if funName != "TestCaller" {
t.Fatalf("error on fun2 caller %s", funName)
}
funName = fun3()
if funName != "fun4" {
t.Fatalf("error on fun2 caller %s", funName)
}
funName = fun5()
if funName != "fun5" {
t.Fatalf("error on fun5 caller %s", funName)
}
}

View File

@ -120,8 +120,7 @@ func (d *driver) Join(nid, eid string, sboxKey string, jinfo driverapi.JoinInfo,
}
}
d.peerDbAdd(nid, eid, ep.addr.IP, ep.addr.Mask, ep.mac,
net.ParseIP(d.advertiseAddress), true)
d.peerAdd(nid, eid, ep.addr.IP, ep.addr.Mask, ep.mac, net.ParseIP(d.advertiseAddress), true, false, false, true)
if err := d.checkEncryption(nid, nil, n.vxlanID(s), true, true); err != nil {
logrus.Warn(err)
@ -205,7 +204,7 @@ func (d *driver) EventNotify(etype driverapi.EventType, nid, tableName, key stri
return
}
d.peerAdd(nid, eid, addr.IP, addr.Mask, mac, vtep, true, false, false)
d.peerAdd(nid, eid, addr.IP, addr.Mask, mac, vtep, true, false, false, false)
}
// Leave method is invoked when a Sandbox detaches from an endpoint.

View File

@ -765,10 +765,7 @@ func (n *network) watchMiss(nlSock *nl.NetlinkSocket) {
logrus.Errorf("could not resolve peer %q: %v", ip, err)
continue
}
if err := n.driver.peerAdd(n.id, "dummy", ip, IPmask, mac, vtep, true, l2Miss, l3Miss); err != nil {
logrus.Errorf("could not add neighbor entry for missed peer %q: %v", ip, err)
}
n.driver.peerAdd(n.id, "dummy", ip, IPmask, mac, vtep, true, l2Miss, l3Miss, false)
} else {
// If the gc_thresh values are lower kernel might knock off the neighor entries.
// When we get a L3 miss check if its a valid peer and reprogram the neighbor

View File

@ -120,15 +120,10 @@ func (d *driver) processEvent(u serf.UserEvent) {
switch action {
case "join":
if err := d.peerAdd(nid, eid, net.ParseIP(ipStr), net.IPMask(net.ParseIP(maskStr).To4()), mac,
net.ParseIP(vtepStr), true, false, false); err != nil {
logrus.Errorf("Peer add failed in the driver: %v\n", err)
}
d.peerAdd(nid, eid, net.ParseIP(ipStr), net.IPMask(net.ParseIP(maskStr).To4()), mac, net.ParseIP(vtepStr),
true, false, false, false)
case "leave":
if err := d.peerDelete(nid, eid, net.ParseIP(ipStr), net.IPMask(net.ParseIP(maskStr).To4()), mac,
net.ParseIP(vtepStr), true); err != nil {
logrus.Errorf("Peer delete failed in the driver: %v\n", err)
}
d.peerDelete(nid, eid, net.ParseIP(ipStr), net.IPMask(net.ParseIP(maskStr).To4()), mac, net.ParseIP(vtepStr), true)
}
}

View File

@ -3,6 +3,7 @@ package overlay
//go:generate protoc -I.:../../Godeps/_workspace/src/github.com/gogo/protobuf --gogo_out=import_path=github.com/docker/libnetwork/drivers/overlay,Mgogoproto/gogo.proto=github.com/gogo/protobuf/gogoproto:. overlay.proto
import (
"context"
"fmt"
"net"
"sync"
@ -50,6 +51,8 @@ type driver struct {
joinOnce sync.Once
localJoinOnce sync.Once
keys []*key
peerOpCh chan *peerOperation
peerOpCancel context.CancelFunc
sync.Mutex
}
@ -64,10 +67,16 @@ func Init(dc driverapi.DriverCallback, config map[string]interface{}) error {
peerDb: peerNetworkMap{
mp: map[string]*peerMap{},
},
secMap: &encrMap{nodes: map[string][]*spi{}},
config: config,
secMap: &encrMap{nodes: map[string][]*spi{}},
config: config,
peerOpCh: make(chan *peerOperation),
}
// Launch the go routine for processing peer operations
ctx, cancel := context.WithCancel(context.Background())
d.peerOpCancel = cancel
go d.peerOpRoutine(ctx, d.peerOpCh)
if data, ok := config[netlabel.GlobalKVClient]; ok {
var err error
dsc, ok := data.(discoverapi.DatastoreConfigData)
@ -161,7 +170,7 @@ func (d *driver) restoreEndpoints() error {
}
n.incEndpointCount()
d.peerDbAdd(ep.nid, ep.id, ep.addr.IP, ep.addr.Mask, ep.mac, net.ParseIP(d.advertiseAddress), true)
d.peerAdd(ep.nid, ep.id, ep.addr.IP, ep.addr.Mask, ep.mac, net.ParseIP(d.advertiseAddress), true, false, false, true)
}
return nil
}
@ -170,6 +179,11 @@ func (d *driver) restoreEndpoints() error {
func Fini(drv driverapi.Driver) {
d := drv.(*driver)
// Notify the peer go routine to return
if d.peerOpCancel != nil {
d.peerOpCancel()
}
if d.exitCh != nil {
waitCh := make(chan struct{})

View File

@ -1,12 +1,14 @@
package overlay
import (
"context"
"fmt"
"net"
"sync"
"syscall"
"github.com/Sirupsen/logrus"
"github.com/docker/libnetwork/common"
)
const ovPeerTable = "overlay_peer_table"
@ -59,8 +61,6 @@ func (pKey *peerKey) Scan(state fmt.ScanState, verb rune) error {
return nil
}
var peerDbWg sync.WaitGroup
func (d *driver) peerDbWalk(f func(string, *peerKey, *peerEntry) bool) error {
d.peerDb.Lock()
nids := []string{}
@ -141,8 +141,6 @@ func (d *driver) peerDbSearch(nid string, peerIP net.IP) (net.HardwareAddr, net.
func (d *driver) peerDbAdd(nid, eid string, peerIP net.IP, peerIPMask net.IPMask,
peerMac net.HardwareAddr, vtep net.IP, isLocal bool) {
peerDbWg.Wait()
d.peerDb.Lock()
pMap, ok := d.peerDb.mp[nid]
if !ok {
@ -173,7 +171,6 @@ func (d *driver) peerDbAdd(nid, eid string, peerIP net.IP, peerIPMask net.IPMask
func (d *driver) peerDbDelete(nid, eid string, peerIP net.IP, peerIPMask net.IPMask,
peerMac net.HardwareAddr, vtep net.IP) peerEntry {
peerDbWg.Wait()
d.peerDb.Lock()
pMap, ok := d.peerDb.mp[nid]
@ -215,9 +212,6 @@ func (d *driver) peerDbUpdateSandbox(nid string) {
}
d.peerDb.Unlock()
peerDbWg.Add(1)
var peerOps []func()
pMap.Lock()
for pKeyStr, pEntry := range pMap.mp {
var pKey peerKey
@ -233,28 +227,67 @@ func (d *driver) peerDbUpdateSandbox(nid string) {
// pointing to the same memory location for every iteration. Make
// a copy of pEntry before capturing it in the following closure.
entry := pEntry
op := func() {
if err := d.peerAdd(nid, entry.eid, pKey.peerIP, entry.peerIPMask,
pKey.peerMac, entry.vtep,
false, false, false); err != nil {
logrus.Errorf("peerdbupdate in sandbox failed for ip %s and mac %s: %v",
pKey.peerIP, pKey.peerMac, err)
}
}
peerOps = append(peerOps, op)
d.peerAdd(nid, entry.eid, pKey.peerIP, entry.peerIPMask, pKey.peerMac, entry.vtep, false, false, false, false)
}
pMap.Unlock()
}
for _, op := range peerOps {
op()
type peerOperation struct {
isAdd bool
networkID string
endpointID string
peerIP net.IP
peerIPMask net.IPMask
peerMac net.HardwareAddr
vtepIP net.IP
updateDB bool
l2Miss bool
l3Miss bool
localPeer bool
callerName string
}
func (d *driver) peerOpRoutine(ctx context.Context, ch chan *peerOperation) {
var err error
for {
select {
case <-ctx.Done():
return
case op := <-ch:
if op.isAdd {
err = d.peerAddOp(op.networkID, op.endpointID, op.peerIP, op.peerIPMask, op.peerMac, op.vtepIP, op.updateDB, op.l2Miss, op.l3Miss, op.localPeer)
} else {
err = d.peerDeleteOp(op.networkID, op.endpointID, op.peerIP, op.peerIPMask, op.peerMac, op.vtepIP, op.localPeer)
}
if err != nil {
logrus.Warnf("Peer operation failed:%s op:%v", err, op)
}
}
}
peerDbWg.Done()
}
func (d *driver) peerAdd(nid, eid string, peerIP net.IP, peerIPMask net.IPMask,
peerMac net.HardwareAddr, vtep net.IP, updateDb, l2Miss, l3Miss bool) error {
peerMac net.HardwareAddr, vtep net.IP, updateDb, l2Miss, l3Miss, localPeer bool) {
callerName := common.CallerName(1)
d.peerOpCh <- &peerOperation{
isAdd: true,
networkID: nid,
endpointID: eid,
peerIP: peerIP,
peerIPMask: peerIPMask,
peerMac: peerMac,
vtepIP: vtep,
updateDB: updateDb,
l2Miss: l2Miss,
l3Miss: l3Miss,
localPeer: localPeer,
callerName: callerName,
}
}
func (d *driver) peerAddOp(nid, eid string, peerIP net.IP, peerIPMask net.IPMask,
peerMac net.HardwareAddr, vtep net.IP, updateDb, l2Miss, l3Miss, updateOnlyDB bool) error {
if err := validateID(nid, eid); err != nil {
return err
@ -262,6 +295,9 @@ func (d *driver) peerAdd(nid, eid string, peerIP net.IP, peerIPMask net.IPMask,
if updateDb {
d.peerDbAdd(nid, eid, peerIP, peerIPMask, peerMac, vtep, false)
if updateOnlyDB {
return nil
}
}
n := d.network(nid)
@ -311,6 +347,22 @@ func (d *driver) peerAdd(nid, eid string, peerIP net.IP, peerIPMask net.IPMask,
}
func (d *driver) peerDelete(nid, eid string, peerIP net.IP, peerIPMask net.IPMask,
peerMac net.HardwareAddr, vtep net.IP, updateDb bool) {
callerName := common.CallerName(1)
d.peerOpCh <- &peerOperation{
isAdd: false,
networkID: nid,
endpointID: eid,
peerIP: peerIP,
peerIPMask: peerIPMask,
peerMac: peerMac,
vtepIP: vtep,
updateDB: updateDb,
callerName: callerName,
}
}
func (d *driver) peerDeleteOp(nid, eid string, peerIP net.IP, peerIPMask net.IPMask,
peerMac net.HardwareAddr, vtep net.IP, updateDb bool) error {
if err := validateID(nid, eid); err != nil {