Service discovery logic rework

changed the ipMap to SetMatrix to allow transient states
Compacted the addSvc and deleteSvc into a one single method
Updated the datastructure for backends to allow storing all the information needed
to cleanup properly during the cleanupServiceBindings
Removed the enable/disable Service logic that was racing with sbLeave/sbJoin logic
Add some debug logs to track further race conditions

Signed-off-by: Flavio Crisciani <flavio.crisciani@docker.com>
This commit is contained in:
Flavio Crisciani 2017-06-06 16:04:50 -07:00
parent 8783788123
commit d64e71e4f7
No known key found for this signature in database
GPG Key ID: 28CAFCE754CF3A48
11 changed files with 397 additions and 165 deletions

View File

@ -121,7 +121,6 @@ run-tests:
check-local: check-format check-code run-tests
integration-tests: ./bin/dnet
@./test/integration/dnet/run-integration-tests.sh

135
agent.go
View File

@ -583,7 +583,7 @@ func (ep *endpoint) deleteDriverInfoFromCluster() error {
return nil
}
func (ep *endpoint) addServiceInfoToCluster() error {
func (ep *endpoint) addServiceInfoToCluster(sb *sandbox) error {
if ep.isAnonymous() && len(ep.myAliases) == 0 || ep.Iface().Address() == nil {
return nil
}
@ -593,26 +593,51 @@ func (ep *endpoint) addServiceInfoToCluster() error {
return nil
}
sb.Service.Lock()
defer sb.Service.Unlock()
logrus.Debugf("addServiceInfoToCluster START for %s %s", ep.svcName, ep.ID())
// Check that the endpoint is still present on the sandbox before adding it to the service discovery.
// This is to handle a race between the EnableService and the sbLeave
// It is possible that the EnableService starts, fetches the list of the endpoints and
// by the time the addServiceInfoToCluster is called the endpoint got removed from the sandbox
// The risk is that the deleteServiceInfoToCluster happens before the addServiceInfoToCluster.
// This check under the Service lock of the sandbox ensure the correct behavior.
// If the addServiceInfoToCluster arrives first may find or not the endpoint and will proceed or exit
// but in any case the deleteServiceInfoToCluster will follow doing the cleanup if needed.
// In case the deleteServiceInfoToCluster arrives first, this one is happening after the endpoint is
// removed from the list, in this situation the delete will bail out not finding any data to cleanup
// and the add will bail out not finding the endpoint on the sandbox.
if e := sb.getEndpoint(ep.ID()); e == nil {
logrus.Warnf("addServiceInfoToCluster suppressing service resolution ep is not anymore in the sandbox %s", ep.ID())
return nil
}
c := n.getController()
agent := c.getAgent()
var ingressPorts []*PortConfig
if ep.svcID != "" {
// Gossip ingress ports only in ingress network.
if n.ingress {
ingressPorts = ep.ingressPorts
}
if err := c.addServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), ep.virtualIP, ingressPorts, ep.svcAliases, ep.Iface().Address().IP); err != nil {
return err
}
}
name := ep.Name()
if ep.isAnonymous() {
name = ep.MyAliases()[0]
}
var ingressPorts []*PortConfig
if ep.svcID != "" {
// This is a task part of a service
// Gossip ingress ports only in ingress network.
if n.ingress {
ingressPorts = ep.ingressPorts
}
if err := c.addServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), name, ep.virtualIP, ingressPorts, ep.svcAliases, ep.myAliases, ep.Iface().Address().IP, "addServiceInfoToCluster"); err != nil {
return err
}
} else {
// This is a container simply attached to an attachable network
if err := c.addContainerNameResolution(n.ID(), ep.ID(), name, ep.myAliases, ep.Iface().Address().IP, "addServiceInfoToCluster"); err != nil {
return err
}
}
buf, err := proto.Marshal(&EndpointRecord{
Name: name,
ServiceName: ep.svcName,
@ -634,10 +659,12 @@ func (ep *endpoint) addServiceInfoToCluster() error {
}
}
logrus.Debugf("addServiceInfoToCluster END for %s %s", ep.svcName, ep.ID())
return nil
}
func (ep *endpoint) deleteServiceInfoFromCluster() error {
func (ep *endpoint) deleteServiceInfoFromCluster(sb *sandbox, method string) error {
if ep.isAnonymous() && len(ep.myAliases) == 0 {
return nil
}
@ -647,17 +674,33 @@ func (ep *endpoint) deleteServiceInfoFromCluster() error {
return nil
}
sb.Service.Lock()
defer sb.Service.Unlock()
logrus.Debugf("deleteServiceInfoFromCluster from %s START for %s %s", method, ep.svcName, ep.ID())
c := n.getController()
agent := c.getAgent()
if ep.svcID != "" && ep.Iface().Address() != nil {
var ingressPorts []*PortConfig
if n.ingress {
ingressPorts = ep.ingressPorts
}
name := ep.Name()
if ep.isAnonymous() {
name = ep.MyAliases()[0]
}
if err := c.rmServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), ep.virtualIP, ingressPorts, ep.svcAliases, ep.Iface().Address().IP); err != nil {
return err
if ep.Iface().Address() != nil {
if ep.svcID != "" {
// This is a task part of a service
var ingressPorts []*PortConfig
if n.ingress {
ingressPorts = ep.ingressPorts
}
if err := c.rmServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), name, ep.virtualIP, ingressPorts, ep.svcAliases, ep.myAliases, ep.Iface().Address().IP, "deleteServiceInfoFromCluster"); err != nil {
return err
}
} else {
// This is a container simply attached to an attachable network
if err := c.delContainerNameResolution(n.ID(), ep.ID(), name, ep.myAliases, ep.Iface().Address().IP, "deleteServiceInfoFromCluster"); err != nil {
return err
}
}
}
@ -667,6 +710,8 @@ func (ep *endpoint) deleteServiceInfoFromCluster() error {
}
}
logrus.Debugf("deleteServiceInfoFromCluster from %s END for %s %s", method, ep.svcName, ep.ID())
return nil
}
@ -814,58 +859,56 @@ func (c *controller) handleEpTableEvent(ev events.Event) {
value = event.Value
case networkdb.UpdateEvent:
logrus.Errorf("Unexpected update service table event = %#v", event)
}
nw, err := c.NetworkByID(nid)
if err != nil {
logrus.Errorf("Could not find network %s while handling service table event: %v", nid, err)
return
}
n := nw.(*network)
err = proto.Unmarshal(value, &epRec)
err := proto.Unmarshal(value, &epRec)
if err != nil {
logrus.Errorf("Failed to unmarshal service table value: %v", err)
return
}
name := epRec.Name
containerName := epRec.Name
svcName := epRec.ServiceName
svcID := epRec.ServiceID
vip := net.ParseIP(epRec.VirtualIP)
ip := net.ParseIP(epRec.EndpointIP)
ingressPorts := epRec.IngressPorts
aliases := epRec.Aliases
taskaliases := epRec.TaskAliases
serviceAliases := epRec.Aliases
taskAliases := epRec.TaskAliases
if name == "" || ip == nil {
if containerName == "" || ip == nil {
logrus.Errorf("Invalid endpoint name/ip received while handling service table event %s", value)
return
}
if isAdd {
logrus.Debugf("handleEpTableEvent ADD %s R:%v", isAdd, eid, epRec)
if svcID != "" {
if err := c.addServiceBinding(svcName, svcID, nid, eid, vip, ingressPorts, aliases, ip); err != nil {
logrus.Errorf("Failed adding service binding for value %s: %v", value, err)
// This is a remote task part of a service
if err := c.addServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent"); err != nil {
logrus.Errorf("failed adding service binding for %s epRec:%v err:%s", eid, epRec, err)
return
}
}
n.addSvcRecords(name, ip, nil, true)
for _, alias := range taskaliases {
n.addSvcRecords(alias, ip, nil, true)
} else {
// This is a remote container simply attached to an attachable network
if err := c.addContainerNameResolution(nid, eid, containerName, taskAliases, ip, "handleEpTableEvent"); err != nil {
logrus.Errorf("failed adding service binding for %s epRec:%v err:%s", eid, epRec, err)
}
}
} else {
logrus.Debugf("handleEpTableEvent DEL %s R:%v", isAdd, eid, epRec)
if svcID != "" {
if err := c.rmServiceBinding(svcName, svcID, nid, eid, vip, ingressPorts, aliases, ip); err != nil {
logrus.Errorf("Failed adding service binding for value %s: %v", value, err)
// This is a remote task part of a service
if err := c.rmServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent"); err != nil {
logrus.Errorf("failed removing service binding for %s epRec:%v err:%s", eid, epRec, err)
return
}
}
n.deleteSvcRecords(name, ip, nil, true)
for _, alias := range taskaliases {
n.deleteSvcRecords(alias, ip, nil, true)
} else {
// This is a remote container simply attached to an attachable network
if err := c.delContainerNameResolution(nid, eid, containerName, taskAliases, ip, "handleEpTableEvent"); err != nil {
logrus.Errorf("failed adding service binding for %s epRec:%v err:%s", eid, epRec, err)
}
}
}
}

View File

@ -14,7 +14,7 @@ option (gogoproto.goproto_stringer_all) = false;
// EndpointRecord specifies all the endpoint specific information that
// needs to gossiped to nodes participating in the network.
message EndpointRecord {
// Name of the endpoint
// Name of the container
string name = 1;
// Service name of the service to which this endpoint belongs.

View File

@ -597,8 +597,14 @@ func (ep *endpoint) rename(name string) error {
c := n.getController()
sb, ok := ep.getSandbox()
if !ok {
logrus.Warnf("rename for %s aborted, sandbox %s is not anymore present", ep.ID(), ep.sandboxID)
return nil
}
if c.isAgent() {
if err = ep.deleteServiceInfoFromCluster(); err != nil {
if err = ep.deleteServiceInfoFromCluster(sb, "rename"); err != nil {
return types.InternalErrorf("Could not delete service state for endpoint %s from cluster on rename: %v", ep.Name(), err)
}
} else {
@ -617,15 +623,15 @@ func (ep *endpoint) rename(name string) error {
ep.anonymous = false
if c.isAgent() {
if err = ep.addServiceInfoToCluster(); err != nil {
if err = ep.addServiceInfoToCluster(sb); err != nil {
return types.InternalErrorf("Could not add service state for endpoint %s to cluster on rename: %v", ep.Name(), err)
}
defer func() {
if err != nil {
ep.deleteServiceInfoFromCluster()
ep.deleteServiceInfoFromCluster(sb, "rename")
ep.name = oldName
ep.anonymous = oldAnonymous
ep.addServiceInfoToCluster()
ep.addServiceInfoToCluster(sb)
}
}()
} else {
@ -746,7 +752,7 @@ func (ep *endpoint) sbLeave(sb *sandbox, force bool, options ...EndpointOption)
return err
}
if e := ep.deleteServiceInfoFromCluster(); e != nil {
if e := ep.deleteServiceInfoFromCluster(sb, "sbLeave"); e != nil {
logrus.Errorf("Could not delete service state for endpoint %s from cluster: %v", ep.Name(), e)
}

View File

@ -7,6 +7,7 @@ import (
"testing"
"time"
"github.com/docker/libnetwork/common"
"github.com/docker/libnetwork/datastore"
"github.com/docker/libnetwork/discoverapi"
"github.com/docker/libnetwork/driverapi"
@ -383,7 +384,7 @@ func TestSRVServiceQuery(t *testing.T) {
sr := svcInfo{
svcMap: make(map[string][]net.IP),
svcIPv6Map: make(map[string][]net.IP),
ipMap: make(map[string]*ipInfo),
ipMap: common.NewSetMatrix(),
service: make(map[string][]servicePorts),
}
// backing container for the service

View File

@ -10,6 +10,7 @@ import (
"github.com/Sirupsen/logrus"
"github.com/docker/docker/pkg/stringid"
"github.com/docker/libnetwork/common"
"github.com/docker/libnetwork/config"
"github.com/docker/libnetwork/datastore"
"github.com/docker/libnetwork/driverapi"
@ -97,7 +98,7 @@ type ipInfo struct {
type svcInfo struct {
svcMap map[string][]net.IP
svcIPv6Map map[string][]net.IP
ipMap map[string]*ipInfo
ipMap common.SetMatrix
service map[string][]servicePorts
}
@ -990,6 +991,12 @@ func (n *network) delete(force bool) error {
c.cleanupServiceBindings(n.ID())
// The network had been left, the service discovery can be cleaned up
c.Lock()
logrus.Debugf("network %s delete, clean svcRecords", n.id)
delete(c.svcRecords, n.id)
c.Unlock()
removeFromStore:
// deleteFromStore performs an atomic delete operation and the
// network.epCnt will help prevent any possible
@ -1227,36 +1234,34 @@ func (n *network) updateSvcRecord(ep *endpoint, localEps []*endpoint, isAdd bool
// breaks some apps
if ep.isAnonymous() {
if len(myAliases) > 0 {
n.addSvcRecords(myAliases[0], iface.Address().IP, ipv6, true)
n.addSvcRecords(ep.ID(), myAliases[0], iface.Address().IP, ipv6, true, "updateSvcRecord")
}
} else {
n.addSvcRecords(epName, iface.Address().IP, ipv6, true)
n.addSvcRecords(ep.ID(), epName, iface.Address().IP, ipv6, true, "updateSvcRecord")
}
for _, alias := range myAliases {
n.addSvcRecords(alias, iface.Address().IP, ipv6, false)
n.addSvcRecords(ep.ID(), alias, iface.Address().IP, ipv6, false, "updateSvcRecord")
}
} else {
if ep.isAnonymous() {
if len(myAliases) > 0 {
n.deleteSvcRecords(myAliases[0], iface.Address().IP, ipv6, true)
n.deleteSvcRecords(ep.ID(), myAliases[0], iface.Address().IP, ipv6, true, "updateSvcRecord")
}
} else {
n.deleteSvcRecords(epName, iface.Address().IP, ipv6, true)
n.deleteSvcRecords(ep.ID(), epName, iface.Address().IP, ipv6, true, "updateSvcRecord")
}
for _, alias := range myAliases {
n.deleteSvcRecords(alias, iface.Address().IP, ipv6, false)
n.deleteSvcRecords(ep.ID(), alias, iface.Address().IP, ipv6, false, "updateSvcRecord")
}
}
}
}
func addIPToName(ipMap map[string]*ipInfo, name string, ip net.IP) {
func addIPToName(ipMap common.SetMatrix, name string, ip net.IP) {
reverseIP := netutils.ReverseIP(ip.String())
if _, ok := ipMap[reverseIP]; !ok {
ipMap[reverseIP] = &ipInfo{
name: name,
}
}
ipMap.Insert(reverseIP, ipInfo{
name: name,
})
}
func addNameToIP(svcMap map[string][]net.IP, name string, epIP net.IP) {
@ -1284,24 +1289,25 @@ func delNameToIP(svcMap map[string][]net.IP, name string, epIP net.IP) {
}
}
func (n *network) addSvcRecords(name string, epIP net.IP, epIPv6 net.IP, ipMapUpdate bool) {
func (n *network) addSvcRecords(eID, name string, epIP net.IP, epIPv6 net.IP, ipMapUpdate bool, method string) {
// Do not add service names for ingress network as this is a
// routing only network
if n.ingress {
return
}
logrus.Debugf("(%s).addSvcRecords(%s, %s, %s, %t)", n.ID()[0:7], name, epIP, epIPv6, ipMapUpdate)
logrus.Debugf("%s (%s).addSvcRecords(%s, %s, %s, %t) %s", eID, n.ID()[0:7], name, epIP, epIPv6, ipMapUpdate, method)
c := n.getController()
c.Lock()
defer c.Unlock()
sr, ok := c.svcRecords[n.ID()]
if !ok {
sr = svcInfo{
svcMap: make(map[string][]net.IP),
svcIPv6Map: make(map[string][]net.IP),
ipMap: make(map[string]*ipInfo),
ipMap: common.NewSetMatrix(),
}
c.svcRecords[n.ID()] = sr
}
@ -1319,28 +1325,33 @@ func (n *network) addSvcRecords(name string, epIP net.IP, epIPv6 net.IP, ipMapUp
}
}
func (n *network) deleteSvcRecords(name string, epIP net.IP, epIPv6 net.IP, ipMapUpdate bool) {
func (n *network) deleteSvcRecords(eID, name string, epIP net.IP, epIPv6 net.IP, ipMapUpdate bool, method string) {
// Do not delete service names from ingress network as this is a
// routing only network
if n.ingress {
return
}
logrus.Debugf("(%s).deleteSvcRecords(%s, %s, %s, %t)", n.ID()[0:7], name, epIP, epIPv6, ipMapUpdate)
logrus.Debugf("%s (%s).deleteSvcRecords(%s, %s, %s, %t) %s", eID, n.ID()[0:7], name, epIP, epIPv6, ipMapUpdate, method)
c := n.getController()
c.Lock()
defer c.Unlock()
sr, ok := c.svcRecords[n.ID()]
if !ok {
return
}
if ipMapUpdate {
delete(sr.ipMap, netutils.ReverseIP(epIP.String()))
sr.ipMap.Remove(netutils.ReverseIP(epIP.String()), ipInfo{
name: name,
})
if epIPv6 != nil {
delete(sr.ipMap, netutils.ReverseIP(epIPv6.String()))
sr.ipMap.Remove(netutils.ReverseIP(epIPv6.String()), ipInfo{
name: name,
})
}
}
@ -1868,9 +1879,11 @@ func (n *network) HandleQueryResp(name string, ip net.IP) {
}
ipStr := netutils.ReverseIP(ip.String())
if ipInfo, ok := sr.ipMap[ipStr]; ok {
ipInfo.extResolver = true
// If an object with extResolver == true is already in the set this call will fail
// but anyway it means that has already been inserted before
if ok, _ := sr.ipMap.Contains(ipStr, ipInfo{name: name}); ok {
sr.ipMap.Remove(ipStr, ipInfo{name: name})
sr.ipMap.Insert(ipStr, ipInfo{name: name, extResolver: true})
}
}
@ -1886,13 +1899,27 @@ func (n *network) ResolveIP(ip string) string {
nwName := n.Name()
ipInfo, ok := sr.ipMap[ip]
if !ok || ipInfo.extResolver {
elemSet, ok := sr.ipMap.Get(ip)
if !ok || len(elemSet) == 0 {
return ""
}
// NOTE it is possible to have more than one element in the Set, this will happen
// because of interleave of diffent events from differnt sources (local container create vs
// network db notifications)
// In such cases the resolution will be based on the first element of the set, and can vary
// during the system stabilitation
elem, ok := elemSet[0].(ipInfo)
if !ok {
setStr, b := sr.ipMap.String(ip)
logrus.Errorf("expected set of ipInfo type for key %s set:%t %s", ip, b, setStr)
return ""
}
return ipInfo.name + "." + nwName
if elem.extResolver {
return ""
}
return elem.name + "." + nwName
}
func (n *network) ResolveService(name string) ([]*net.SRV, []net.IP) {

View File

@ -285,7 +285,6 @@ func (nDB *NetworkDB) CreateEntry(tname, nid, key string, value []byte) error {
nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
nDB.Unlock()
nDB.broadcaster.Write(makeEvent(opCreate, tname, nid, key, value))
return nil
}
@ -313,7 +312,6 @@ func (nDB *NetworkDB) UpdateEntry(tname, nid, key string, value []byte) error {
nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
nDB.Unlock()
nDB.broadcaster.Write(makeEvent(opUpdate, tname, nid, key, value))
return nil
}
@ -359,7 +357,6 @@ func (nDB *NetworkDB) DeleteEntry(tname, nid, key string) error {
nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
nDB.Unlock()
nDB.broadcaster.Write(makeEvent(opDelete, tname, nid, key, value))
return nil
}

View File

@ -86,6 +86,9 @@ type sandbox struct {
ingress bool
ndotsSet bool
sync.Mutex
// This mutex is used to serialize service related operation for an endpoint
// The lock is here because the endpoint is saved into the store so is not unique
Service sync.Mutex
}
// These are the container configs used to customize container /etc/hosts file.
@ -668,26 +671,25 @@ func (sb *sandbox) SetKey(basePath string) error {
}
func (sb *sandbox) EnableService() error {
logrus.Debugf("EnableService %s START", sb.containerID)
for _, ep := range sb.getConnectedEndpoints() {
if ep.enableService(true) {
if err := ep.addServiceInfoToCluster(); err != nil {
if err := ep.addServiceInfoToCluster(sb); err != nil {
ep.enableService(false)
return fmt.Errorf("could not update state for endpoint %s into cluster: %v", ep.Name(), err)
}
}
}
logrus.Debugf("EnableService %s DONE", sb.containerID)
return nil
}
func (sb *sandbox) DisableService() error {
logrus.Debugf("DisableService %s START", sb.containerID)
for _, ep := range sb.getConnectedEndpoints() {
if ep.enableService(false) {
if err := ep.deleteServiceInfoFromCluster(); err != nil {
ep.enableService(true)
return fmt.Errorf("could not delete state for endpoint %s from cluster: %v", ep.Name(), err)
}
}
ep.enableService(false)
}
logrus.Debugf("DisableService %s DONE", sb.containerID)
return nil
}

View File

@ -4,6 +4,8 @@ import (
"fmt"
"net"
"sync"
"github.com/docker/libnetwork/common"
)
var (
@ -48,17 +50,49 @@ type service struct {
// Service aliases
aliases []string
// This maps tracks for each IP address the list of endpoints ID
// associated with it. At stable state the endpoint ID expected is 1
// but during transition and service change it is possible to have
// temporary more than 1
ipToEndpoint common.SetMatrix
deleted bool
sync.Mutex
}
// assignIPToEndpoint inserts the mapping between the IP and the endpoint identifier
// returns true if the mapping was not present, false otherwise
// returns also the number of endpoints associated to the IP
func (s *service) assignIPToEndpoint(ip, eID string) (bool, int) {
return s.ipToEndpoint.Insert(ip, eID)
}
// removeIPToEndpoint removes the mapping between the IP and the endpoint identifier
// returns true if the mapping was deleted, false otherwise
// returns also the number of endpoints associated to the IP
func (s *service) removeIPToEndpoint(ip, eID string) (bool, int) {
return s.ipToEndpoint.Remove(ip, eID)
}
func (s *service) printIPToEndpoint(ip string) (string, bool) {
return s.ipToEndpoint.String(ip)
}
type loadBalancer struct {
vip net.IP
fwMark uint32
// Map of backend IPs backing this loadbalancer on this
// network. It is keyed with endpoint ID.
backEnds map[string]net.IP
backEnds map[string]loadBalancerBackend
// Back pointer to service to which the loadbalancer belongs.
service *service
}
type loadBalancerBackend struct {
ip net.IP
containerName string
taskAliases []string
}

View File

@ -6,15 +6,126 @@ import (
"net"
"github.com/Sirupsen/logrus"
"github.com/docker/libnetwork/common"
)
func newService(name string, id string, ingressPorts []*PortConfig, aliases []string) *service {
func (c *controller) addEndpointNameResolution(svcName, svcID, nID, eID, containerName string, vip net.IP, serviceAliases, taskAliases []string, ip net.IP, addService bool, method string) error {
n, err := c.NetworkByID(nID)
if err != nil {
return err
}
logrus.Debugf("addEndpointNameResolution %s %s add_service:%t", eID, svcName, addService)
// Add container resolution mappings
c.addContainerNameResolution(nID, eID, containerName, taskAliases, ip, method)
// Add endpoint IP to special "tasks.svc_name" so that the applications have access to DNS RR.
n.(*network).addSvcRecords(eID, "tasks."+svcName, ip, nil, false, method)
for _, alias := range serviceAliases {
n.(*network).addSvcRecords(eID, "tasks."+alias, ip, nil, false, method)
}
// Add service name to vip in DNS, if vip is valid. Otherwise resort to DNS RR
if len(vip) == 0 {
n.(*network).addSvcRecords(eID, svcName, ip, nil, false, method)
for _, alias := range serviceAliases {
n.(*network).addSvcRecords(eID, alias, ip, nil, false, method)
}
}
if addService && len(vip) != 0 {
n.(*network).addSvcRecords(eID, svcName, vip, nil, false, method)
for _, alias := range serviceAliases {
n.(*network).addSvcRecords(eID, alias, vip, nil, false, method)
}
}
return nil
}
func (c *controller) addContainerNameResolution(nID, eID, containerName string, taskAliases []string, ip net.IP, method string) error {
n, err := c.NetworkByID(nID)
if err != nil {
return err
}
logrus.Debugf("addContainerNameResolution %s %s", eID, containerName)
// Add resolution for container name
n.(*network).addSvcRecords(eID, containerName, ip, nil, true, method)
// Add resolution for taskaliases
for _, alias := range taskAliases {
n.(*network).addSvcRecords(eID, alias, ip, nil, true, method)
}
return nil
}
func (c *controller) deleteEndpointNameResolution(svcName, svcID, nID, eID, containerName string, vip net.IP, serviceAliases, taskAliases []string, ip net.IP, rmService, multipleEntries bool, method string) error {
n, err := c.NetworkByID(nID)
if err != nil {
return err
}
logrus.Debugf("deleteEndpointNameResolution %s %s rm_service:%t suppress:%t", eID, svcName, rmService, multipleEntries)
// Delete container resolution mappings
c.delContainerNameResolution(nID, eID, containerName, taskAliases, ip, method)
// Delete the special "tasks.svc_name" backend record.
if !multipleEntries {
n.(*network).deleteSvcRecords(eID, "tasks."+svcName, ip, nil, false, method)
for _, alias := range serviceAliases {
n.(*network).deleteSvcRecords(eID, "tasks."+alias, ip, nil, false, method)
}
}
// If we are doing DNS RR delete the endpoint IP from DNS record right away.
if !multipleEntries && len(vip) == 0 {
n.(*network).deleteSvcRecords(eID, svcName, ip, nil, false, method)
for _, alias := range serviceAliases {
n.(*network).deleteSvcRecords(eID, alias, ip, nil, false, method)
}
}
// Remove the DNS record for VIP only if we are removing the service
if rmService && len(vip) != 0 && !multipleEntries {
n.(*network).deleteSvcRecords(eID, svcName, vip, nil, false, method)
for _, alias := range serviceAliases {
n.(*network).deleteSvcRecords(eID, alias, vip, nil, false, method)
}
}
return nil
}
func (c *controller) delContainerNameResolution(nID, eID, containerName string, taskAliases []string, ip net.IP, method string) error {
n, err := c.NetworkByID(nID)
if err != nil {
return err
}
logrus.Debugf("delContainerNameResolution %s %s", eID, containerName)
// Delete resolution for container name
n.(*network).deleteSvcRecords(eID, containerName, ip, nil, true, method)
// Delete resolution for taskaliases
for _, alias := range taskAliases {
n.(*network).deleteSvcRecords(eID, alias, ip, nil, true, method)
}
return nil
}
func newService(name string, id string, ingressPorts []*PortConfig, serviceAliases []string) *service {
return &service{
name: name,
id: id,
ingressPorts: ingressPorts,
loadBalancers: make(map[string]*loadBalancer),
aliases: aliases,
aliases: serviceAliases,
ipToEndpoint: common.NewSetMatrix(),
}
}
@ -50,21 +161,26 @@ func (c *controller) cleanupServiceBindings(cleanupNID string) {
for _, s := range services {
s.Lock()
// Skip the serviceBindings that got deleted
if s.deleted {
s.Unlock()
continue
}
for nid, lb := range s.loadBalancers {
if cleanupNID != "" && nid != cleanupNID {
continue
}
for eid, ip := range lb.backEnds {
for eid, be := range lb.backEnds {
service := s
loadBalancer := lb
networkID := nid
epID := eid
epIP := ip
epIP := be.ip
cleanupFuncs = append(cleanupFuncs, func() {
if err := c.rmServiceBinding(service.name, service.id, networkID, epID, loadBalancer.vip,
service.ingressPorts, service.aliases, epIP); err != nil {
if err := c.rmServiceBinding(service.name, service.id, networkID, epID, be.containerName, loadBalancer.vip,
service.ingressPorts, service.aliases, be.taskAliases, epIP, "cleanupServiceBindings"); err != nil {
logrus.Errorf("Failed to remove service bindings for service %s network %s endpoint %s while cleanup: %v",
service.id, networkID, epID, err)
}
@ -80,48 +196,43 @@ func (c *controller) cleanupServiceBindings(cleanupNID string) {
}
func (c *controller) addServiceBinding(name, sid, nid, eid string, vip net.IP, ingressPorts []*PortConfig, aliases []string, ip net.IP) error {
n, err := c.NetworkByID(nid)
func (c *controller) addServiceBinding(svcName, svcID, nID, eID, containerName string, vip net.IP, ingressPorts []*PortConfig, serviceAliases, taskAliases []string, ip net.IP, method string) error {
var addService bool
n, err := c.NetworkByID(nID)
if err != nil {
return err
}
skey := serviceKey{
id: sid,
id: svcID,
ports: portConfigs(ingressPorts).String(),
}
c.Lock()
s, ok := c.serviceBindings[skey]
if !ok {
// Create a new service if we are seeing this service
// for the first time.
s = newService(name, sid, ingressPorts, aliases)
c.serviceBindings[skey] = s
var s *service
for {
c.Lock()
var ok bool
s, ok = c.serviceBindings[skey]
if !ok {
// Create a new service if we are seeing this service
// for the first time.
s = newService(svcName, svcID, ingressPorts, serviceAliases)
c.serviceBindings[skey] = s
}
c.Unlock()
s.Lock()
if !s.deleted {
// ok the object is good to be used
break
}
s.Unlock()
}
c.Unlock()
logrus.Debugf("addServiceBinding from %s START for %s %s", method, svcName, eID)
// Add endpoint IP to special "tasks.svc_name" so that the
// applications have access to DNS RR.
n.(*network).addSvcRecords("tasks."+name, ip, nil, false)
for _, alias := range aliases {
n.(*network).addSvcRecords("tasks."+alias, ip, nil, false)
}
// Add service name to vip in DNS, if vip is valid. Otherwise resort to DNS RR
svcIP := vip
if len(svcIP) == 0 {
svcIP = ip
}
n.(*network).addSvcRecords(name, svcIP, nil, false)
for _, alias := range aliases {
n.(*network).addSvcRecords(alias, svcIP, nil, false)
}
s.Lock()
defer s.Unlock()
lb, ok := s.loadBalancers[nid]
lb, ok := s.loadBalancers[nID]
if !ok {
// Create a new load balancer if we are seeing this
// network attachment on the service for the first
@ -129,7 +240,7 @@ func (c *controller) addServiceBinding(name, sid, nid, eid string, vip net.IP, i
lb = &loadBalancer{
vip: vip,
fwMark: fwMarkCtr,
backEnds: make(map[string]net.IP),
backEnds: make(map[string]loadBalancerBackend),
service: s,
}
@ -137,10 +248,19 @@ func (c *controller) addServiceBinding(name, sid, nid, eid string, vip net.IP, i
fwMarkCtr++
fwMarkCtrMu.Unlock()
s.loadBalancers[nid] = lb
s.loadBalancers[nID] = lb
addService = true
}
lb.backEnds[eid] = ip
lb.backEnds[eID] = loadBalancerBackend{ip: ip,
containerName: containerName,
taskAliases: taskAliases}
ok, entries := s.assignIPToEndpoint(ip.String(), eID)
if !ok || entries > 1 {
setStr, b := s.printIPToEndpoint(ip.String())
logrus.Warnf("addServiceBinding %s possible trainsient state ok:%t entries:%d set:%t %s", eID, ok, entries, b, setStr)
}
// Add loadbalancer service and backend in all sandboxes in
// the network only if vip is valid.
@ -148,89 +268,87 @@ func (c *controller) addServiceBinding(name, sid, nid, eid string, vip net.IP, i
n.(*network).addLBBackend(ip, vip, lb.fwMark, ingressPorts)
}
// Add the appropriate name resolutions
c.addEndpointNameResolution(svcName, svcID, nID, eID, containerName, vip, serviceAliases, taskAliases, ip, addService, "addServiceBinding")
logrus.Debugf("addServiceBinding from %s END for %s %s", method, svcName, eID)
return nil
}
func (c *controller) rmServiceBinding(name, sid, nid, eid string, vip net.IP, ingressPorts []*PortConfig, aliases []string, ip net.IP) error {
func (c *controller) rmServiceBinding(svcName, svcID, nID, eID, containerName string, vip net.IP, ingressPorts []*PortConfig, serviceAliases []string, taskAliases []string, ip net.IP, method string) error {
var rmService bool
n, err := c.NetworkByID(nid)
n, err := c.NetworkByID(nID)
if err != nil {
return err
}
skey := serviceKey{
id: sid,
id: svcID,
ports: portConfigs(ingressPorts).String(),
}
c.Lock()
s, ok := c.serviceBindings[skey]
c.Unlock()
logrus.Debugf("rmServiceBinding from %s START for %s %s", method, svcName, eID)
if !ok {
logrus.Warnf("rmServiceBinding %s %s %s aborted c.serviceBindings[skey] !ok", method, svcName, eID)
return nil
}
s.Lock()
lb, ok := s.loadBalancers[nid]
defer s.Unlock()
lb, ok := s.loadBalancers[nID]
if !ok {
s.Unlock()
logrus.Warnf("rmServiceBinding %s %s %s aborted s.loadBalancers[nid] !ok", method, svcName, eID)
return nil
}
_, ok = lb.backEnds[eid]
_, ok = lb.backEnds[eID]
if !ok {
s.Unlock()
logrus.Warnf("rmServiceBinding %s %s %s aborted lb.backEnds[eid] !ok", method, svcName, eID)
return nil
}
delete(lb.backEnds, eid)
delete(lb.backEnds, eID)
if len(lb.backEnds) == 0 {
// All the backends for this service have been
// removed. Time to remove the load balancer and also
// remove the service entry in IPVS.
rmService = true
delete(s.loadBalancers, nid)
delete(s.loadBalancers, nID)
}
if len(s.loadBalancers) == 0 {
// All loadbalancers for the service removed. Time to
// remove the service itself.
c.Lock()
// Mark the object as deleted so that the add won't use it wrongly
s.deleted = true
delete(c.serviceBindings, skey)
c.Unlock()
}
ok, entries := s.removeIPToEndpoint(ip.String(), eID)
if !ok || entries > 0 {
setStr, b := s.printIPToEndpoint(ip.String())
logrus.Warnf("rmServiceBinding %s possible trainsient state ok:%t entries:%d set:%t %s", eID, ok, entries, b, setStr)
}
// Remove loadbalancer service(if needed) and backend in all
// sandboxes in the network only if the vip is valid.
if len(vip) != 0 {
n.(*network).rmLBBackend(ip, vip, lb.fwMark, ingressPorts, rmService)
}
s.Unlock()
// Delete the special "tasks.svc_name" backend record.
n.(*network).deleteSvcRecords("tasks."+name, ip, nil, false)
for _, alias := range aliases {
n.(*network).deleteSvcRecords("tasks."+alias, ip, nil, false)
}
// If we are doing DNS RR add the endpoint IP to DNS record
// right away.
if len(vip) == 0 {
n.(*network).deleteSvcRecords(name, ip, nil, false)
for _, alias := range aliases {
n.(*network).deleteSvcRecords(alias, ip, nil, false)
}
}
// Remove the DNS record for VIP only if we are removing the service
if rmService && len(vip) != 0 {
n.(*network).deleteSvcRecords(name, vip, nil, false)
for _, alias := range aliases {
n.(*network).deleteSvcRecords(alias, vip, nil, false)
}
}
// Delete the name resolutions
c.deleteEndpointNameResolution(svcName, svcID, nID, eID, containerName, vip, serviceAliases, taskAliases, ip, rmService, entries > 0, "rmServiceBinding")
logrus.Debugf("rmServiceBinding from %s END for %s %s", method, svcName, eID)
return nil
}

View File

@ -44,6 +44,11 @@ func (n *network) connectedLoadbalancers() []*loadBalancer {
var lbs []*loadBalancer
for _, s := range serviceBindings {
s.Lock()
// Skip the serviceBindings that got deleted
if s.deleted {
s.Unlock()
continue
}
if lb, ok := s.loadBalancers[n.ID()]; ok {
lbs = append(lbs, lb)
}
@ -97,8 +102,8 @@ func (sb *sandbox) populateLoadbalancers(ep *endpoint) {
}
lb.service.Lock()
for _, ip := range lb.backEnds {
sb.addLBBackend(ip, lb.vip, lb.fwMark, lb.service.ingressPorts, eIP, gwIP, n.ingress)
for _, l := range lb.backEnds {
sb.addLBBackend(l.ip, lb.vip, lb.fwMark, lb.service.ingressPorts, eIP, gwIP, n.ingress)
}
lb.service.Unlock()
}