optimize the rebroadcast for failure case

Before when a node was failing, all the nodes would bump the lamport time of all their
entries. This means that if a node flap, there will be a storm of update of all the entries.
This commit on the base of the previous logic guarantees that only the node that joins back
will readvertise its own entries, the other nodes won't need to advertise again.

Signed-off-by: Flavio Crisciani <flavio.crisciani@docker.com>
This commit is contained in:
Flavio Crisciani 2017-07-17 08:36:43 -07:00
parent 4cf74e8735
commit 26db0b9fc9
No known key found for this signature in database
GPG Key ID: 28CAFCE754CF3A48
3 changed files with 8 additions and 49 deletions

View File

@ -290,13 +290,6 @@ func (nDB *NetworkDB) reconnectNode() {
return
}
// Update all the local table state to a new time to
// force update on the node we are trying to rejoin, just in
// case that node has these in deleting state still. This is
// facilitate fast convergence after recovering from a gossip
// failure.
nDB.updateLocalTableTime()
logrus.Debugf("Initiating bulk sync with node %s after reconnect", node.Name)
nDB.bulkSync([]string{node.Name}, true)
}

View File

@ -45,9 +45,12 @@ func (e *eventDelegate) NotifyLeave(mn *memberlist.Node) {
var failed bool
logrus.Infof("Node %s/%s, left gossip cluster", mn.Name, mn.Addr)
e.broadcastNodeEvent(mn.Addr, opDelete)
e.nDB.deleteNodeTableEntries(mn.Name)
e.nDB.deleteNetworkEntriesForNode(mn.Name)
// The node left or failed, delete all the entries created by it.
// If the node was temporary down, deleting the entries will guarantee that the CREATE events will be accepted
// If the node instead left because was going down, then it makes sense to just delete all its state
e.nDB.Lock()
e.nDB.deleteNetworkEntriesForNode(mn.Name)
e.nDB.deleteNodeTableEntries(mn.Name)
if n, ok := e.nDB.nodes[mn.Name]; ok {
delete(e.nDB.nodes, mn.Name)
@ -61,7 +64,6 @@ func (e *eventDelegate) NotifyLeave(mn *memberlist.Node) {
if failed {
logrus.Infof("Node %s/%s, added to failed nodes list", mn.Name, mn.Addr)
}
}
func (e *eventDelegate) NotifyUpdate(n *memberlist.Node) {

View File

@ -418,7 +418,6 @@ func (nDB *NetworkDB) DeleteEntry(tname, nid, key string) error {
}
func (nDB *NetworkDB) deleteNetworkEntriesForNode(deletedNode string) {
nDB.Lock()
for nid, nodes := range nDB.networkNodes {
updatedNodes := make([]string, 0, len(nodes))
for _, node := range nodes {
@ -433,7 +432,6 @@ func (nDB *NetworkDB) deleteNetworkEntriesForNode(deletedNode string) {
}
delete(nDB.networks, deletedNode)
nDB.Unlock()
}
// deleteNodeNetworkEntries is called in 2 conditions with 2 different outcomes:
@ -504,7 +502,6 @@ func (nDB *NetworkDB) deleteNodeNetworkEntries(nid, node string) {
}
func (nDB *NetworkDB) deleteNodeTableEntries(node string) {
nDB.Lock()
nDB.indexes[byTable].Walk(func(path string, v interface{}) bool {
oldEntry := v.(*entry)
if oldEntry.node != node {
@ -516,21 +513,12 @@ func (nDB *NetworkDB) deleteNodeTableEntries(node string) {
nid := params[1]
key := params[2]
entry := &entry{
ltime: oldEntry.ltime,
node: node,
value: oldEntry.value,
deleting: true,
reapTime: reapInterval,
}
nDB.indexes[byTable].Delete(fmt.Sprintf("/%s/%s/%s", tname, nid, key))
nDB.indexes[byNetwork].Delete(fmt.Sprintf("/%s/%s/%s", nid, tname, key))
nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
nDB.broadcaster.Write(makeEvent(opDelete, tname, nid, key, entry.value))
nDB.broadcaster.Write(makeEvent(opDelete, tname, nid, key, oldEntry.value))
return false
})
nDB.Unlock()
}
// WalkTable walks a single table in NetworkDB and invokes the passed
@ -691,27 +679,3 @@ func (nDB *NetworkDB) updateLocalNetworkTime() {
n.ltime = ltime
}
}
func (nDB *NetworkDB) updateLocalTableTime() {
nDB.Lock()
defer nDB.Unlock()
ltime := nDB.tableClock.Increment()
nDB.indexes[byTable].Walk(func(path string, v interface{}) bool {
entry := v.(*entry)
if entry.node != nDB.config.NodeName {
return false
}
params := strings.Split(path[1:], "/")
tname := params[0]
nid := params[1]
key := params[2]
entry.ltime = ltime
nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
return false
})
}