Service discovery race on serviceBindings delete. Bug on IP reuse (#1808)

* Correct SetMatrix documentation

The SetMatrix is a generic data structure, so the description
should not be tied to any specific use

Signed-off-by: Flavio Crisciani <flavio.crisciani@docker.com>

* Service Discovery reuse name and serviceBindings deletion

- Added logic to handle name reuse from different services
- Moved the deletion from the serviceBindings map at the end
  of the rmServiceBindings body to avoid race with new services

Signed-off-by: Flavio Crisciani <flavio.crisciani@docker.com>

* Avoid race on network cleanup

Use the locker to avoid the race between the network
deletion and new endpoints being created

Signed-off-by: Flavio Crisciani <flavio.crisciani@docker.com>

* CleanupServiceBindings to clean the SD records

Allow cleanupServiceBindings to take care of the service discovery
cleanup. Also avoid triggering the cleanup for each endpoint from an SD
point of view.
LB and SD will be separated in the future

Signed-off-by: Flavio Crisciani <flavio.crisciani@docker.com>

* Addressed comments

Signed-off-by: Flavio Crisciani <flavio.crisciani@docker.com>

* NetworkDB deleteEntry has to happen

If there is a local error, guarantee that the delete entry
on network DB is still honored

Signed-off-by: Flavio Crisciani <flavio.crisciani@docker.com>
This commit is contained in:
Flavio Crisciani 2017-06-18 05:25:58 -07:00 committed by Madhu Venugopal
parent bfc5fe339c
commit 6426d1e66f
7 changed files with 314 additions and 143 deletions

View File

@ -648,13 +648,13 @@ func (ep *endpoint) addServiceInfoToCluster(sb *sandbox) error {
TaskAliases: ep.myAliases,
EndpointIP: ep.Iface().Address().IP.String(),
})
if err != nil {
return err
}
if agent != nil {
if err := agent.networkDB.CreateEntry(libnetworkEPTable, n.ID(), ep.ID(), buf); err != nil {
logrus.Warnf("addServiceInfoToCluster NetworkDB CreateEntry failed for %s %s err:%s", ep.id, n.id, err)
return err
}
}
@ -686,6 +686,13 @@ func (ep *endpoint) deleteServiceInfoFromCluster(sb *sandbox, method string) err
name = ep.MyAliases()[0]
}
if agent != nil {
// First delete from networkDB then locally
if err := agent.networkDB.DeleteEntry(libnetworkEPTable, n.ID(), ep.ID()); err != nil {
logrus.Warnf("deleteServiceInfoFromCluster NetworkDB DeleteEntry failed for %s %s err:%s", ep.id, n.id, err)
}
}
if ep.Iface().Address() != nil {
if ep.svcID != "" {
// This is a task part of a service
@ -693,7 +700,7 @@ func (ep *endpoint) deleteServiceInfoFromCluster(sb *sandbox, method string) err
if n.ingress {
ingressPorts = ep.ingressPorts
}
if err := c.rmServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), name, ep.virtualIP, ingressPorts, ep.svcAliases, ep.myAliases, ep.Iface().Address().IP, "deleteServiceInfoFromCluster"); err != nil {
if err := c.rmServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), name, ep.virtualIP, ingressPorts, ep.svcAliases, ep.myAliases, ep.Iface().Address().IP, "deleteServiceInfoFromCluster", true); err != nil {
return err
}
} else {
@ -704,12 +711,6 @@ func (ep *endpoint) deleteServiceInfoFromCluster(sb *sandbox, method string) err
}
}
if agent != nil {
if err := agent.networkDB.DeleteEntry(libnetworkEPTable, n.ID(), ep.ID()); err != nil {
return err
}
}
logrus.Debugf("deleteServiceInfoFromCluster from %s END for %s %s", method, ep.svcName, ep.ID())
return nil
@ -900,7 +901,7 @@ func (c *controller) handleEpTableEvent(ev events.Event) {
logrus.Debugf("handleEpTableEvent DEL %s R:%v", eid, epRec)
if svcID != "" {
// This is a remote task part of a service
if err := c.rmServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent"); err != nil {
if err := c.rmServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent", true); err != nil {
logrus.Errorf("failed removing service binding for %s epRec:%v err:%s", eid, epRec, err)
return
}

View File

@ -10,24 +10,26 @@ import (
type SetMatrix interface {
// Get returns the members of the set for a specific key as a slice.
Get(key string) ([]interface{}, bool)
// Contains is used to verify is an element is in a set for a specific key
// Contains is used to verify if an element is in a set for a specific key
// returns true if the element is in the set
// returns true if there is a set for the key
Contains(key string, value interface{}) (bool, bool)
// Insert inserts the mapping between the IP and the endpoint identifier
// returns true if the mapping was not present, false otherwise
// returns also the number of endpoints associated to the IP
// Insert inserts the value in the set of a key
// returns true if the value is inserted (was not already in the set), false otherwise
// returns also the length of the set for the key
Insert(key string, value interface{}) (bool, int)
// Remove removes the mapping between the IP and the endpoint identifier
// returns true if the mapping was deleted, false otherwise
// returns also the number of endpoints associated to the IP
// Remove removes the value in the set for a specific key
// returns true if the value is deleted, false otherwise
// returns also the length of the set for the key
Remove(key string, value interface{}) (bool, int)
// Cardinality returns the number of elements in the set of a specific key
// returns false if the key is not in the map
// Cardinality returns the number of elements in the set for a key
// returns false if the set is not present
Cardinality(key string) (int, bool)
// String returns the string version of the set, empty otherwise
// returns false if the key is not in the map
// returns false if the set is not present
String(key string) (string, bool)
// Returns all the keys in the map
Keys() []string
}
type setMatrix struct {
@ -121,3 +123,13 @@ func (s *setMatrix) String(key string) (string, bool) {
}
return set.String(), ok
}
// Keys returns all the keys currently present in the matrix.
// The keys are gathered by ranging over s.matrix while holding the lock,
// so the order of the returned slice is not defined.
func (s *setMatrix) Keys() []string {
	s.Lock()
	defer s.Unlock()

	result := make([]string, len(s.matrix))
	idx := 0
	for key := range s.matrix {
		result[idx] = key
		idx++
	}
	return result
}

View File

@ -13,6 +13,7 @@ import (
"github.com/docker/libnetwork/driverapi"
"github.com/docker/libnetwork/ipamapi"
"github.com/docker/libnetwork/netlabel"
"github.com/docker/libnetwork/netutils"
"github.com/docker/libnetwork/testutils"
"github.com/docker/libnetwork/types"
)
@ -382,8 +383,8 @@ func TestSRVServiceQuery(t *testing.T) {
}
sr := svcInfo{
svcMap: make(map[string][]net.IP),
svcIPv6Map: make(map[string][]net.IP),
svcMap: common.NewSetMatrix(),
svcIPv6Map: common.NewSetMatrix(),
ipMap: common.NewSetMatrix(),
service: make(map[string][]servicePorts),
}
@ -440,6 +441,119 @@ func TestSRVServiceQuery(t *testing.T) {
}
}
// TestServiceVIPReuse exercises the service discovery records when two
// services sharing the same name, but carrying different service IDs, are
// registered with the same VIP. The name <-> VIP mappings are expected to stay
// resolvable until the records of both services have been deleted.
func TestServiceVIPReuse(t *testing.T) {
	c, err := New()
	if err != nil {
		t.Fatal(err)
	}
	defer c.Stop()

	n, err := c.NewNetwork("bridge", "net1", "", nil)
	if err != nil {
		t.Fatal(err)
	}
	defer func() {
		if err := n.Delete(); err != nil {
			t.Fatal(err)
		}
	}()

	ep, err := n.CreateEndpoint("testep")
	if err != nil {
		t.Fatal(err)
	}

	sb, err := c.NewSandbox("c1")
	if err != nil {
		t.Fatal(err)
	}
	defer func() {
		if err := sb.Delete(); err != nil {
			t.Fatal(err)
		}
	}()

	if err = ep.Join(sb); err != nil {
		t.Fatal(err)
	}

	nw := n.(*network)

	// Add 2 services with same name but different service ID to share the same VIP
	nw.addSvcRecords("ep1", "service_test", "serviceID1", net.ParseIP("192.168.0.1"), net.IP{}, true, "test")
	nw.addSvcRecords("ep2", "service_test", "serviceID2", net.ParseIP("192.168.0.1"), net.IP{}, true, "test")

	ipToResolve := netutils.ReverseIP("192.168.0.1")

	// requireVIPResolvable fails the test unless the service name resolves to
	// exactly the shared VIP and the VIP resolves back to the service name.
	requireVIPResolvable := func() {
		ipList, _ := nw.ResolveName("service_test", types.IPv4)
		switch {
		case len(ipList) == 0:
			t.Fatal("There must be the VIP")
		case len(ipList) != 1:
			t.Fatal("It must return only 1 VIP")
		case ipList[0].String() != "192.168.0.1":
			t.Fatal("The service VIP is 192.168.0.1")
		}

		switch name := nw.ResolveIP(ipToResolve); name {
		case "":
			t.Fatal("It must return a name")
		case "service_test.net1":
			// expected reverse mapping
		default:
			t.Fatalf("It must return the service_test.net1 != %s", name)
		}
	}

	requireVIPResolvable()

	// Delete service record for one of the services, the IP should remain because one service is still associated with it
	nw.deleteSvcRecords("ep1", "service_test", "serviceID1", net.ParseIP("192.168.0.1"), net.IP{}, true, "test")
	requireVIPResolvable()

	// Delete again the service using the previous service ID, nothing should happen
	nw.deleteSvcRecords("ep2", "service_test", "serviceID1", net.ParseIP("192.168.0.1"), net.IP{}, true, "test")
	requireVIPResolvable()

	// Delete now using the second service ID, now all the entries should be gone
	nw.deleteSvcRecords("ep2", "service_test", "serviceID2", net.ParseIP("192.168.0.1"), net.IP{}, true, "test")
	ipList, _ := nw.ResolveName("service_test", types.IPv4)
	if len(ipList) != 0 {
		t.Fatal("All the VIPs should be gone now")
	}
	if name := nw.ResolveIP(ipToResolve); name != "" {
		t.Fatalf("It must return empty no more services associated, instead:%s", name)
	}
}
func TestIpamReleaseOnNetDriverFailures(t *testing.T) {
if !testutils.IsRunningInContainer() {
defer testutils.SetupTestOSContext(t)()

View File

@ -92,12 +92,20 @@ type EndpointWalker func(ep Endpoint) bool
// Its an indication to defer PTR queries also to that external server.
type ipInfo struct {
	// name is the name stored for the IP; serviceID is stored alongside it
	// so that records carrying the same name but a different service ID
	// remain distinct values.
	name, serviceID string
	// extResolver marks the entry as coming from an external resolver.
	// NOTE(review): presumably PTR queries are also deferred to that
	// external server when this is set - confirm against the resolver code.
	extResolver bool
}
// svcMapEntry is the element stored in the sets of the svcMap.
// The IP is kept in its string form because the SetMatrix does not accept
// values that are not hashable.
type svcMapEntry struct {
	ip, serviceID string
}
type svcInfo struct {
svcMap map[string][]net.IP
svcIPv6Map map[string][]net.IP
svcMap common.SetMatrix
svcIPv6Map common.SetMatrix
ipMap common.SetMatrix
service map[string][]servicePorts
}
@ -933,6 +941,9 @@ func (n *network) delete(force bool) error {
id := n.id
n.Unlock()
c.networkLocker.Lock(id)
defer c.networkLocker.Unlock(id)
n, err := c.getNetworkFromStore(id)
if err != nil {
return &UnknownNetworkError{name: name, id: id}
@ -991,12 +1002,6 @@ func (n *network) delete(force bool) error {
c.cleanupServiceBindings(n.ID())
// The network had been left, the service discovery can be cleaned up
c.Lock()
logrus.Debugf("network %s delete, clean svcRecords", n.id)
delete(c.svcRecords, n.id)
c.Unlock()
removeFromStore:
// deleteFromStore performs an atomic delete operation and the
// network.epCnt will help prevent any possible
@ -1070,6 +1075,9 @@ func (n *network) CreateEndpoint(name string, options ...EndpointOption) (Endpoi
ep := &endpoint{name: name, generic: make(map[string]interface{}), iface: &endpointInterface{}}
ep.id = stringid.GenerateRandomID()
n.ctrlr.networkLocker.Lock(n.id)
defer n.ctrlr.networkLocker.Unlock(n.id)
// Initialize ep.network with a possibly stale copy of n. We need this to get network from
// store. But once we get it from store we will have the most uptodate copy possibly.
ep.network = n
@ -1228,75 +1236,77 @@ func (n *network) updateSvcRecord(ep *endpoint, localEps []*endpoint, isAdd bool
ipv6 = iface.AddressIPv6().IP
}
serviceID := ep.svcID
if serviceID == "" {
serviceID = ep.ID()
}
if isAdd {
// If anonymous endpoint has an alias use the first alias
// for ip->name mapping. Not having the reverse mapping
// breaks some apps
if ep.isAnonymous() {
if len(myAliases) > 0 {
n.addSvcRecords(ep.ID(), myAliases[0], iface.Address().IP, ipv6, true, "updateSvcRecord")
n.addSvcRecords(ep.ID(), myAliases[0], serviceID, iface.Address().IP, ipv6, true, "updateSvcRecord")
}
} else {
n.addSvcRecords(ep.ID(), epName, iface.Address().IP, ipv6, true, "updateSvcRecord")
n.addSvcRecords(ep.ID(), epName, serviceID, iface.Address().IP, ipv6, true, "updateSvcRecord")
}
for _, alias := range myAliases {
n.addSvcRecords(ep.ID(), alias, iface.Address().IP, ipv6, false, "updateSvcRecord")
n.addSvcRecords(ep.ID(), alias, serviceID, iface.Address().IP, ipv6, false, "updateSvcRecord")
}
} else {
if ep.isAnonymous() {
if len(myAliases) > 0 {
n.deleteSvcRecords(ep.ID(), myAliases[0], iface.Address().IP, ipv6, true, "updateSvcRecord")
n.deleteSvcRecords(ep.ID(), myAliases[0], serviceID, iface.Address().IP, ipv6, true, "updateSvcRecord")
}
} else {
n.deleteSvcRecords(ep.ID(), epName, iface.Address().IP, ipv6, true, "updateSvcRecord")
n.deleteSvcRecords(ep.ID(), epName, serviceID, iface.Address().IP, ipv6, true, "updateSvcRecord")
}
for _, alias := range myAliases {
n.deleteSvcRecords(ep.ID(), alias, iface.Address().IP, ipv6, false, "updateSvcRecord")
n.deleteSvcRecords(ep.ID(), alias, serviceID, iface.Address().IP, ipv6, false, "updateSvcRecord")
}
}
}
}
func addIPToName(ipMap common.SetMatrix, name string, ip net.IP) {
func addIPToName(ipMap common.SetMatrix, name, serviceID string, ip net.IP) {
reverseIP := netutils.ReverseIP(ip.String())
ipMap.Insert(reverseIP, ipInfo{
name: name,
name: name,
serviceID: serviceID,
})
}
func addNameToIP(svcMap map[string][]net.IP, name string, epIP net.IP) {
ipList := svcMap[name]
for _, ip := range ipList {
if ip.Equal(epIP) {
return
}
}
svcMap[name] = append(svcMap[name], epIP)
// delIPToName removes the (name, serviceID) record from the set stored in
// ipMap under the reversed form of ip. The result of the removal is ignored.
func delIPToName(ipMap common.SetMatrix, name, serviceID string, ip net.IP) {
	key := netutils.ReverseIP(ip.String())
	record := ipInfo{name: name, serviceID: serviceID}
	ipMap.Remove(key, record)
}
func delNameToIP(svcMap map[string][]net.IP, name string, epIP net.IP) {
ipList := svcMap[name]
for i, ip := range ipList {
if ip.Equal(epIP) {
ipList = append(ipList[:i], ipList[i+1:]...)
break
}
}
svcMap[name] = ipList
if len(ipList) == 0 {
delete(svcMap, name)
}
// addNameToIP inserts the (epIP, serviceID) pair into the set that svcMap
// keeps for name. The IP is stored in its string form. The result of the
// insertion is ignored.
func addNameToIP(svcMap common.SetMatrix, name, serviceID string, epIP net.IP) {
	entry := svcMapEntry{ip: epIP.String(), serviceID: serviceID}
	svcMap.Insert(name, entry)
}
func (n *network) addSvcRecords(eID, name string, epIP net.IP, epIPv6 net.IP, ipMapUpdate bool, method string) {
// delNameToIP removes the (epIP, serviceID) pair from the set that svcMap
// keeps for name. The IP is matched in its string form. The result of the
// removal is ignored.
func delNameToIP(svcMap common.SetMatrix, name, serviceID string, epIP net.IP) {
	entry := svcMapEntry{ip: epIP.String(), serviceID: serviceID}
	svcMap.Remove(name, entry)
}
func (n *network) addSvcRecords(eID, name, serviceID string, epIP, epIPv6 net.IP, ipMapUpdate bool, method string) {
// Do not add service names for ingress network as this is a
// routing only network
if n.ingress {
return
}
logrus.Debugf("%s (%s).addSvcRecords(%s, %s, %s, %t) %s", eID, n.ID()[0:7], name, epIP, epIPv6, ipMapUpdate, method)
logrus.Debugf("%s (%s).addSvcRecords(%s, %s, %s, %t) %s sid:%s", eID, n.ID()[0:7], name, epIP, epIPv6, ipMapUpdate, method, serviceID)
c := n.getController()
c.Lock()
@ -1305,34 +1315,34 @@ func (n *network) addSvcRecords(eID, name string, epIP net.IP, epIPv6 net.IP, ip
sr, ok := c.svcRecords[n.ID()]
if !ok {
sr = svcInfo{
svcMap: make(map[string][]net.IP),
svcIPv6Map: make(map[string][]net.IP),
svcMap: common.NewSetMatrix(),
svcIPv6Map: common.NewSetMatrix(),
ipMap: common.NewSetMatrix(),
}
c.svcRecords[n.ID()] = sr
}
if ipMapUpdate {
addIPToName(sr.ipMap, name, epIP)
addIPToName(sr.ipMap, name, serviceID, epIP)
if epIPv6 != nil {
addIPToName(sr.ipMap, name, epIPv6)
addIPToName(sr.ipMap, name, serviceID, epIPv6)
}
}
addNameToIP(sr.svcMap, name, epIP)
addNameToIP(sr.svcMap, name, serviceID, epIP)
if epIPv6 != nil {
addNameToIP(sr.svcIPv6Map, name, epIPv6)
addNameToIP(sr.svcIPv6Map, name, serviceID, epIPv6)
}
}
func (n *network) deleteSvcRecords(eID, name string, epIP net.IP, epIPv6 net.IP, ipMapUpdate bool, method string) {
func (n *network) deleteSvcRecords(eID, name, serviceID string, epIP net.IP, epIPv6 net.IP, ipMapUpdate bool, method string) {
// Do not delete service names from ingress network as this is a
// routing only network
if n.ingress {
return
}
logrus.Debugf("%s (%s).deleteSvcRecords(%s, %s, %s, %t) %s", eID, n.ID()[0:7], name, epIP, epIPv6, ipMapUpdate, method)
logrus.Debugf("%s (%s).deleteSvcRecords(%s, %s, %s, %t) %s sid:%s ", eID, n.ID()[0:7], name, epIP, epIPv6, ipMapUpdate, method, serviceID)
c := n.getController()
c.Lock()
@ -1344,21 +1354,17 @@ func (n *network) deleteSvcRecords(eID, name string, epIP net.IP, epIPv6 net.IP,
}
if ipMapUpdate {
sr.ipMap.Remove(netutils.ReverseIP(epIP.String()), ipInfo{
name: name,
})
delIPToName(sr.ipMap, name, serviceID, epIP)
if epIPv6 != nil {
sr.ipMap.Remove(netutils.ReverseIP(epIPv6.String()), ipInfo{
name: name,
})
delIPToName(sr.ipMap, name, serviceID, epIPv6)
}
}
delNameToIP(sr.svcMap, name, epIP)
delNameToIP(sr.svcMap, name, serviceID, epIP)
if epIPv6 != nil {
delNameToIP(sr.svcIPv6Map, name, epIPv6)
delNameToIP(sr.svcIPv6Map, name, serviceID, epIPv6)
}
}
@ -1376,19 +1382,31 @@ func (n *network) getSvcRecords(ep *endpoint) []etchosts.Record {
n.ctrlr.Lock()
defer n.ctrlr.Unlock()
sr, _ := n.ctrlr.svcRecords[n.id]
sr, ok := n.ctrlr.svcRecords[n.id]
if !ok || sr.svcMap == nil {
return nil
}
for h, ip := range sr.svcMap {
if strings.Split(h, ".")[0] == epName {
svcMapKeys := sr.svcMap.Keys()
// Loop on service names on this network
for _, k := range svcMapKeys {
if strings.Split(k, ".")[0] == epName {
continue
}
if len(ip) == 0 {
logrus.Warnf("Found empty list of IP addresses for service %s on network %s (%s)", h, n.name, n.id)
// Get all the IPs associated to this service
mapEntryList, ok := sr.svcMap.Get(k)
if !ok {
// The key got deleted
continue
}
if len(mapEntryList) == 0 {
logrus.Warnf("Found empty list of IP addresses for service %s on network %s (%s)", k, n.name, n.id)
continue
}
recs = append(recs, etchosts.Record{
Hosts: h,
IP: ip[0].String(),
Hosts: k,
IP: mapEntryList[0].(svcMapEntry).ip,
})
}
@ -1845,8 +1863,7 @@ func (n *network) ResolveName(req string, ipType int) ([]net.IP, bool) {
}
req = strings.TrimSuffix(req, ".")
var ip []net.IP
ip, ok = sr.svcMap[req]
ipSet, ok := sr.svcMap.Get(req)
if ipType == types.IPv6 {
// If the name resolved to v4 address then its a valid name in
@ -1856,13 +1873,20 @@ func (n *network) ResolveName(req string, ipType int) ([]net.IP, bool) {
if ok && n.enableIPv6 == false {
ipv6Miss = true
}
ip = sr.svcIPv6Map[req]
ipSet, ok = sr.svcIPv6Map.Get(req)
}
if ip != nil {
ipLocal := make([]net.IP, len(ip))
copy(ipLocal, ip)
return ipLocal, false
if ok && len(ipSet) > 0 {
// this map is to avoid IP duplicates, this can happen during a transition period where 2 services are using the same IP
noDup := make(map[string]bool)
var ipLocal []net.IP
for _, ip := range ipSet {
if _, dup := noDup[ip.(svcMapEntry).ip]; !dup {
noDup[ip.(svcMapEntry).ip] = true
ipLocal = append(ipLocal, net.ParseIP(ip.(svcMapEntry).ip))
}
}
return ipLocal, ok
}
return nil, ipv6Miss

View File

@ -85,14 +85,8 @@ type loadBalancer struct {
// Map of backend IPs backing this loadbalancer on this
// network. It is keyed with endpoint ID.
backEnds map[string]loadBalancerBackend
backEnds map[string]net.IP
// Back pointer to service to which the loadbalancer belongs.
service *service
}
type loadBalancerBackend struct {
ip net.IP
containerName string
taskAliases []string
}

View File

@ -15,29 +15,35 @@ func (c *controller) addEndpointNameResolution(svcName, svcID, nID, eID, contain
return err
}
logrus.Debugf("addEndpointNameResolution %s %s add_service:%t", eID, svcName, addService)
logrus.Debugf("addEndpointNameResolution %s %s add_service:%t sAliases:%v tAliases:%v", eID, svcName, addService, serviceAliases, taskAliases)
// Add container resolution mappings
c.addContainerNameResolution(nID, eID, containerName, taskAliases, ip, method)
serviceID := svcID
if serviceID == "" {
// This is the case of a normal container not part of a service
serviceID = eID
}
// Add endpoint IP to special "tasks.svc_name" so that the applications have access to DNS RR.
n.(*network).addSvcRecords(eID, "tasks."+svcName, ip, nil, false, method)
n.(*network).addSvcRecords(eID, "tasks."+svcName, serviceID, ip, nil, false, method)
for _, alias := range serviceAliases {
n.(*network).addSvcRecords(eID, "tasks."+alias, ip, nil, false, method)
n.(*network).addSvcRecords(eID, "tasks."+alias, serviceID, ip, nil, false, method)
}
// Add service name to vip in DNS, if vip is valid. Otherwise resort to DNS RR
if len(vip) == 0 {
n.(*network).addSvcRecords(eID, svcName, ip, nil, false, method)
n.(*network).addSvcRecords(eID, svcName, serviceID, ip, nil, false, method)
for _, alias := range serviceAliases {
n.(*network).addSvcRecords(eID, alias, ip, nil, false, method)
n.(*network).addSvcRecords(eID, alias, serviceID, ip, nil, false, method)
}
}
if addService && len(vip) != 0 {
n.(*network).addSvcRecords(eID, svcName, vip, nil, false, method)
n.(*network).addSvcRecords(eID, svcName, serviceID, vip, nil, false, method)
for _, alias := range serviceAliases {
n.(*network).addSvcRecords(eID, alias, vip, nil, false, method)
n.(*network).addSvcRecords(eID, alias, serviceID, vip, nil, false, method)
}
}
@ -52,11 +58,11 @@ func (c *controller) addContainerNameResolution(nID, eID, containerName string,
logrus.Debugf("addContainerNameResolution %s %s", eID, containerName)
// Add resolution for container name
n.(*network).addSvcRecords(eID, containerName, ip, nil, true, method)
n.(*network).addSvcRecords(eID, containerName, eID, ip, nil, true, method)
// Add resolution for taskaliases
for _, alias := range taskAliases {
n.(*network).addSvcRecords(eID, alias, ip, nil, true, method)
n.(*network).addSvcRecords(eID, alias, eID, ip, nil, true, method)
}
return nil
@ -68,32 +74,38 @@ func (c *controller) deleteEndpointNameResolution(svcName, svcID, nID, eID, cont
return err
}
logrus.Debugf("deleteEndpointNameResolution %s %s rm_service:%t suppress:%t", eID, svcName, rmService, multipleEntries)
logrus.Debugf("deleteEndpointNameResolution %s %s rm_service:%t suppress:%t sAliases:%v tAliases:%v", eID, svcName, rmService, multipleEntries, serviceAliases, taskAliases)
// Delete container resolution mappings
c.delContainerNameResolution(nID, eID, containerName, taskAliases, ip, method)
serviceID := svcID
if serviceID == "" {
// This is the case of a normal container not part of a service
serviceID = eID
}
// Delete the special "tasks.svc_name" backend record.
if !multipleEntries {
n.(*network).deleteSvcRecords(eID, "tasks."+svcName, ip, nil, false, method)
n.(*network).deleteSvcRecords(eID, "tasks."+svcName, serviceID, ip, nil, false, method)
for _, alias := range serviceAliases {
n.(*network).deleteSvcRecords(eID, "tasks."+alias, ip, nil, false, method)
n.(*network).deleteSvcRecords(eID, "tasks."+alias, serviceID, ip, nil, false, method)
}
}
// If we are doing DNS RR delete the endpoint IP from DNS record right away.
if !multipleEntries && len(vip) == 0 {
n.(*network).deleteSvcRecords(eID, svcName, ip, nil, false, method)
n.(*network).deleteSvcRecords(eID, svcName, serviceID, ip, nil, false, method)
for _, alias := range serviceAliases {
n.(*network).deleteSvcRecords(eID, alias, ip, nil, false, method)
n.(*network).deleteSvcRecords(eID, alias, serviceID, ip, nil, false, method)
}
}
// Remove the DNS record for VIP only if we are removing the service
if rmService && len(vip) != 0 && !multipleEntries {
n.(*network).deleteSvcRecords(eID, svcName, vip, nil, false, method)
n.(*network).deleteSvcRecords(eID, svcName, serviceID, vip, nil, false, method)
for _, alias := range serviceAliases {
n.(*network).deleteSvcRecords(eID, alias, vip, nil, false, method)
n.(*network).deleteSvcRecords(eID, alias, serviceID, vip, nil, false, method)
}
}
@ -108,11 +120,11 @@ func (c *controller) delContainerNameResolution(nID, eID, containerName string,
logrus.Debugf("delContainerNameResolution %s %s", eID, containerName)
// Delete resolution for container name
n.(*network).deleteSvcRecords(eID, containerName, ip, nil, true, method)
n.(*network).deleteSvcRecords(eID, containerName, eID, ip, nil, true, method)
// Delete resolution for taskaliases
for _, alias := range taskAliases {
n.(*network).deleteSvcRecords(eID, alias, ip, nil, true, method)
n.(*network).deleteSvcRecords(eID, alias, eID, ip, nil, true, method)
}
return nil
@ -152,6 +164,7 @@ func (c *controller) getLBIndex(sid, nid string, ingressPorts []*PortConfig) int
func (c *controller) cleanupServiceBindings(cleanupNID string) {
var cleanupFuncs []func()
logrus.Debugf("cleanupServiceBindings for %s", cleanupNID)
c.Lock()
services := make([]*service, 0, len(c.serviceBindings))
for _, s := range c.serviceBindings {
@ -171,16 +184,27 @@ func (c *controller) cleanupServiceBindings(cleanupNID string) {
continue
}
for eid, be := range lb.backEnds {
// The network is being deleted, erase all the associated service discovery records
// TODO(fcrisciani) separate the Load Balancer from the Service discovery, this operation
// can be done safely here, but the rmServiceBinding is still keeping consistency in the
// data structures that are tracking the endpoint to IP mapping.
c.Lock()
logrus.Debugf("cleanupServiceBindings erasing the svcRecords for %s", nid)
delete(c.svcRecords, nid)
c.Unlock()
for eid, ip := range lb.backEnds {
epID := eid
epIP := ip
service := s
loadBalancer := lb
networkID := nid
epID := eid
epIP := be.ip
cleanupFuncs = append(cleanupFuncs, func() {
if err := c.rmServiceBinding(service.name, service.id, networkID, epID, be.containerName, loadBalancer.vip,
service.ingressPorts, service.aliases, be.taskAliases, epIP, "cleanupServiceBindings"); err != nil {
// ContainerName and taskAliases are not available here; this is still fine because the service discovery
// cleanup already happened before. The only thing that rmServiceBinding still does here, apart from the
// load balancer bookkeeping, is to keep the endpoint-to-IP mapping consistent.
if err := c.rmServiceBinding(service.name, service.id, networkID, epID, "", loadBalancer.vip,
service.ingressPorts, service.aliases, []string{}, epIP, "cleanupServiceBindings", false); err != nil {
logrus.Errorf("Failed to remove service bindings for service %s network %s endpoint %s while cleanup: %v",
service.id, networkID, epID, err)
}
@ -228,8 +252,7 @@ func (c *controller) addServiceBinding(svcName, svcID, nID, eID, containerName s
}
s.Unlock()
}
logrus.Debugf("addServiceBinding from %s START for %s %s", method, svcName, eID)
logrus.Debugf("addServiceBinding from %s START for %s %s p:%p nid:%s skey:%v", method, svcName, eID, s, nID, skey)
defer s.Unlock()
lb, ok := s.loadBalancers[nID]
@ -242,7 +265,7 @@ func (c *controller) addServiceBinding(svcName, svcID, nID, eID, containerName s
lb = &loadBalancer{
vip: vip,
fwMark: fwMarkCtr,
backEnds: make(map[string]loadBalancerBackend),
backEnds: make(map[string]net.IP),
service: s,
}
@ -253,9 +276,7 @@ func (c *controller) addServiceBinding(svcName, svcID, nID, eID, containerName s
addService = true
}
lb.backEnds[eID] = loadBalancerBackend{ip: ip,
containerName: containerName,
taskAliases: taskAliases}
lb.backEnds[eID] = ip
ok, entries := s.assignIPToEndpoint(ip.String(), eID)
if !ok || entries > 1 {
@ -277,7 +298,7 @@ func (c *controller) addServiceBinding(svcName, svcID, nID, eID, containerName s
return nil
}
func (c *controller) rmServiceBinding(svcName, svcID, nID, eID, containerName string, vip net.IP, ingressPorts []*PortConfig, serviceAliases []string, taskAliases []string, ip net.IP, method string) error {
func (c *controller) rmServiceBinding(svcName, svcID, nID, eID, containerName string, vip net.IP, ingressPorts []*PortConfig, serviceAliases []string, taskAliases []string, ip net.IP, method string, deleteSvcRecords bool) error {
var rmService bool
@ -294,7 +315,6 @@ func (c *controller) rmServiceBinding(svcName, svcID, nID, eID, containerName st
c.Lock()
s, ok := c.serviceBindings[skey]
c.Unlock()
logrus.Debugf("rmServiceBinding from %s START for %s %s", method, svcName, eID)
if !ok {
logrus.Warnf("rmServiceBinding %s %s %s aborted c.serviceBindings[skey] !ok", method, svcName, eID)
return nil
@ -302,6 +322,7 @@ func (c *controller) rmServiceBinding(svcName, svcID, nID, eID, containerName st
s.Lock()
defer s.Unlock()
logrus.Debugf("rmServiceBinding from %s START for %s %s p:%p nid:%s sKey:%v deleteSvc:%t", method, svcName, eID, s, nID, skey, deleteSvcRecords)
lb, ok := s.loadBalancers[nID]
if !ok {
logrus.Warnf("rmServiceBinding %s %s %s aborted s.loadBalancers[nid] !ok", method, svcName, eID)
@ -322,17 +343,7 @@ func (c *controller) rmServiceBinding(svcName, svcID, nID, eID, containerName st
rmService = true
delete(s.loadBalancers, nID)
}
if len(s.loadBalancers) == 0 {
// All loadbalancers for the service removed. Time to
// remove the service itself.
c.Lock()
// Mark the object as deleted so that the add won't use it wrongly
s.deleted = true
delete(c.serviceBindings, skey)
c.Unlock()
logrus.Debugf("rmServiceBinding %s delete %s, p:%p in loadbalancers len:%d", eID, nID, lb, len(s.loadBalancers))
}
ok, entries := s.removeIPToEndpoint(ip.String(), eID)
@ -348,7 +359,22 @@ func (c *controller) rmServiceBinding(svcName, svcID, nID, eID, containerName st
}
// Delete the name resolutions
c.deleteEndpointNameResolution(svcName, svcID, nID, eID, containerName, vip, serviceAliases, taskAliases, ip, rmService, entries > 0, "rmServiceBinding")
if deleteSvcRecords {
c.deleteEndpointNameResolution(svcName, svcID, nID, eID, containerName, vip, serviceAliases, taskAliases, ip, rmService, entries > 0, "rmServiceBinding")
}
if len(s.loadBalancers) == 0 {
// All loadbalancers for the service removed. Time to
// remove the service itself.
c.Lock()
// Mark the object as deleted so that the add won't use it wrongly
s.deleted = true
// NOTE: the delete from the serviceBindings map has to be the last operation, otherwise we allow a race between this
// service, which is being deleted, and a new service that will be created if the entry is no longer there
delete(c.serviceBindings, skey)
c.Unlock()
}
logrus.Debugf("rmServiceBinding from %s END for %s %s", method, svcName, eID)
return nil

View File

@ -102,8 +102,8 @@ func (sb *sandbox) populateLoadbalancers(ep *endpoint) {
}
lb.service.Lock()
for _, l := range lb.backEnds {
sb.addLBBackend(l.ip, lb.vip, lb.fwMark, lb.service.ingressPorts, eIP, gwIP, n.ingress)
for _, ip := range lb.backEnds {
sb.addLBBackend(ip, lb.vip, lb.fwMark, lb.service.ingressPorts, eIP, gwIP, n.ingress)
}
lb.service.Unlock()
}