From 0bf056d59f2b54cd2ffb2da8f5af1acffd6354fc Mon Sep 17 00:00:00 2001 From: Madhu Venugopal Date: Wed, 12 Oct 2016 16:55:20 -0700 Subject: [PATCH] Windows overlay driver support 1. Base work was done by msabansal and nwoodmsft from : https://github.com/msabansal/docker/tree/overlay 2. reorganized under drivers/windows/overlay and rebased to libnetwork master 3. Porting overlay common fixes to windows driver * 4658085 * bb736c4 * f6d8968 4. Windows Service Discovery changes for swarm-mode 5. renaming default windows ipam drivers as "windows" Signed-off-by: Madhu Venugopal Signed-off-by: msabansal Signed-off-by: nwoodmsft --- controller.go | 3 +- drivers/windows/overlay/joinleave_windows.go | 114 ++++ .../windows/overlay/ov_endpoint_windows.go | 346 ++++++++++++ .../overlay/ov_network_local_windows.go | 209 +++++++ drivers/windows/overlay/ov_network_windows.go | 512 ++++++++++++++++++ drivers/windows/overlay/ov_serf_windows.go | 179 ++++++ drivers/windows/overlay/overlay.pb.go | 468 ++++++++++++++++ drivers/windows/overlay/overlay.proto | 27 + drivers/windows/overlay/overlay_windows.go | 297 ++++++++++ drivers/windows/overlay/peerdb_windows.go | 154 ++++++ drivers/windows/windows.go | 5 +- drivers_windows.go | 2 + ipams/builtin/builtin_windows.go | 43 +- ipams/windowsipam/windowsipam.go | 3 + netutils/utils_windows.go | 4 +- network.go | 3 + network_unix.go | 6 + network_windows.go | 30 +- service_common.go | 225 ++++++++ service_linux.go | 216 -------- service_unsupported.go | 2 +- service_windows.go | 15 + 22 files changed, 2634 insertions(+), 229 deletions(-) create mode 100644 drivers/windows/overlay/joinleave_windows.go create mode 100644 drivers/windows/overlay/ov_endpoint_windows.go create mode 100644 drivers/windows/overlay/ov_network_local_windows.go create mode 100644 drivers/windows/overlay/ov_network_windows.go create mode 100644 drivers/windows/overlay/ov_serf_windows.go create mode 100644 drivers/windows/overlay/overlay.pb.go create mode 100644 drivers/windows/overlay/overlay.proto create mode 100644 drivers/windows/overlay/overlay_windows.go create mode 100644 drivers/windows/overlay/peerdb_windows.go create mode 100644 service_common.go create mode 100644 service_windows.go diff --git a/controller.go b/controller.go index aee0827f..e2289c05 100644 --- a/controller.go +++ b/controller.go @@ -634,12 +634,13 @@ func (c *controller) NewNetwork(networkType, name string, id string, options ... id = stringid.GenerateRandomID() } + defaultIpam := defaultIpamForNetworkType(networkType) // Construct the network object network := &network{ name: name, networkType: networkType, generic: map[string]interface{}{netlabel.GenericData: make(map[string]string)}, - ipamType: ipamapi.DefaultIPAM, + ipamType: defaultIpam, id: id, created: time.Now(), ctrlr: c, diff --git a/drivers/windows/overlay/joinleave_windows.go b/drivers/windows/overlay/joinleave_windows.go new file mode 100644 index 00000000..0bef47c7 --- /dev/null +++ b/drivers/windows/overlay/joinleave_windows.go @@ -0,0 +1,114 @@ +package overlay + +import ( + "fmt" + "net" + + "github.com/Sirupsen/logrus" + "github.com/docker/libnetwork/driverapi" + "github.com/docker/libnetwork/types" + "github.com/gogo/protobuf/proto" +) + +// Join method is invoked when a Sandbox is attached to an endpoint. +func (d *driver) Join(nid, eid string, sboxKey string, jinfo driverapi.JoinInfo, options map[string]interface{}) error { + if err := validateID(nid, eid); err != nil { + return err + } + + n := d.network(nid) + if n == nil { + return fmt.Errorf("could not find network with id %s", nid) + } + + ep := n.endpoint(eid) + if ep == nil { + return fmt.Errorf("could not find endpoint with id %s", eid) + } + + if err := d.writeEndpointToStore(ep); err != nil { + return fmt.Errorf("failed to update overlay endpoint %s to local data store: %v", ep.id[0:7], err) + } + + buf, err := proto.Marshal(&PeerRecord{ + EndpointIP: ep.addr.String(), + EndpointMAC: ep.mac.String(), + TunnelEndpointIP: n.providerAddress, + }) + + if err != nil { + return err + } + + if err := jinfo.AddTableEntry(ovPeerTable, eid, buf); err != nil { + logrus.Errorf("overlay: Failed adding table entry to joininfo: %v", err) + } + + jinfo.DisableGatewayService() + + d.pushLocalEndpointEvent("join", nid, eid) + + return nil +} + +func (d *driver) EventNotify(etype driverapi.EventType, nid, tableName, key string, value []byte) { + if tableName != ovPeerTable { + logrus.Errorf("Unexpected table notification for table %s received", tableName) + return + } + + eid := key + + var peer PeerRecord + if err := proto.Unmarshal(value, &peer); err != nil { + logrus.Errorf("Failed to unmarshal peer record: %v", err) + return + } + + n := d.network(nid) + if n == nil { + return + } + + // Ignore local peers. We already know about them and they + // should not be added to vxlan fdb. + if peer.TunnelEndpointIP == n.providerAddress { + return + } + + addr, err := types.ParseCIDR(peer.EndpointIP) + if err != nil { + logrus.Errorf("Invalid peer IP %s received in event notify", peer.EndpointIP) + return + } + + mac, err := net.ParseMAC(peer.EndpointMAC) + if err != nil { + logrus.Errorf("Invalid mac %s received in event notify", peer.EndpointMAC) + return + } + + vtep := net.ParseIP(peer.TunnelEndpointIP) + if vtep == nil { + logrus.Errorf("Invalid VTEP %s received in event notify", peer.TunnelEndpointIP) + return + } + + if etype == driverapi.Delete { + d.peerDelete(nid, eid, addr.IP, addr.Mask, mac, vtep, true) + return + } + + d.peerAdd(nid, eid, addr.IP, addr.Mask, mac, vtep, true) +} + +// Leave method is invoked when a Sandbox detaches from an endpoint. +func (d *driver) Leave(nid, eid string) error { + if err := validateID(nid, eid); err != nil { + return err + } + + d.pushLocalEndpointEvent("leave", nid, eid) + + return nil +} diff --git a/drivers/windows/overlay/ov_endpoint_windows.go b/drivers/windows/overlay/ov_endpoint_windows.go new file mode 100644 index 00000000..2e4e9333 --- /dev/null +++ b/drivers/windows/overlay/ov_endpoint_windows.go @@ -0,0 +1,346 @@ +package overlay + +import ( + "encoding/json" + "fmt" + "net" + + "github.com/Microsoft/hcsshim" + "github.com/Sirupsen/logrus" + "github.com/docker/libnetwork/datastore" + "github.com/docker/libnetwork/driverapi" + "github.com/docker/libnetwork/types" +) + +type endpointTable map[string]*endpoint + +const overlayEndpointPrefix = "overlay/endpoint" + +type endpoint struct { + id string + nid string + profileId string + remote bool + mac net.HardwareAddr + addr *net.IPNet + dbExists bool + dbIndex uint64 +} + +func validateID(nid, eid string) error { + if nid == "" { + return fmt.Errorf("invalid network id") + } + + if eid == "" { + return fmt.Errorf("invalid endpoint id") + } + + return nil +} + +func (n *network) endpoint(eid string) *endpoint { + n.Lock() + defer n.Unlock() + + return n.endpoints[eid] +} + +func (n *network) addEndpoint(ep *endpoint) { + n.Lock() + n.endpoints[ep.id] = ep + n.Unlock() +} + +func (n *network) deleteEndpoint(eid string) { + n.Lock() + delete(n.endpoints, eid) + n.Unlock() +} + +func (n *network) removeEndpointWithAddress(addr *net.IPNet) { + var networkEndpoint *endpoint + n.Lock() + for _, ep := range n.endpoints { + if ep.addr.IP.Equal(addr.IP) { + networkEndpoint = ep + break + } + } + if networkEndpoint != nil { + delete(n.endpoints, networkEndpoint.id) + } + n.Unlock() + + if networkEndpoint != nil { + logrus.Debugf("Removing stale endpoint from HNS") + _, err := hcsshim.HNSEndpointRequest("DELETE", networkEndpoint.profileId, "") + + if err != nil { + logrus.Debugf("Failed to delete stale overlay endpoint (%s) from hns", networkEndpoint.id[0:7]) + } + + if err := n.driver.deleteEndpointFromStore(networkEndpoint); err != nil { + logrus.Debugf("Failed to delete stale overlay endpoint (%s) from store", networkEndpoint.id[0:7]) + } + } +} + +func (d *driver) CreateEndpoint(nid, eid string, ifInfo driverapi.InterfaceInfo, + epOptions map[string]interface{}) error { + var err error + if err = validateID(nid, eid); err != nil { + return err + } + + // Since we perform lazy configuration make sure we try + // configuring the driver when we enter CreateEndpoint since + // CreateNetwork may not be called in every node. + if err := d.configure(); err != nil { + return err + } + + n := d.network(nid) + if n == nil { + return fmt.Errorf("network id %q not found", nid) + } + + ep := &endpoint{ + id: eid, + nid: n.id, + addr: ifInfo.Address(), + mac: ifInfo.MacAddress(), + } + + if ep.addr == nil { + return fmt.Errorf("create endpoint was not passed interface IP address") + } + + if s := n.getSubnetforIP(ep.addr); s == nil { + return fmt.Errorf("no matching subnet for IP %q in network %q\n", ep.addr, nid) + } + + // Todo: Add port bindings and qos policies here + + hnsEndpoint := &hcsshim.HNSEndpoint{ + VirtualNetwork: n.hnsId, + IPAddress: ep.addr.IP, + EnableInternalDNS: true, + } + + if ep.mac != nil { + hnsEndpoint.MacAddress = ep.mac.String() + } + + paPolicy, err := json.Marshal(hcsshim.PaPolicy{ + Type: "PA", + PA: n.providerAddress, + }) + + if err != nil { + return err + } + + hnsEndpoint.Policies = append(hnsEndpoint.Policies, paPolicy) + + configurationb, err := json.Marshal(hnsEndpoint) + if err != nil { + return err + } + + hnsresponse, err := hcsshim.HNSEndpointRequest("POST", "", string(configurationb)) + if err != nil { + return err + } + + ep.profileId = hnsresponse.Id + + if ep.mac == nil { + ep.mac, err = net.ParseMAC(hnsresponse.MacAddress) + if err != nil { + return err + } + + if err := ifInfo.SetMacAddress(ep.mac); err != nil { + return err + } + } + + n.addEndpoint(ep) + if err := d.writeEndpointToStore(ep); err != nil { + return fmt.Errorf("failed to update overlay endpoint %s to local store: %v", ep.id[0:7], err) + } + + return nil +} + +func (d *driver) DeleteEndpoint(nid, eid string) error { + if err := validateID(nid, eid); err != nil { + return err + } + + n := d.network(nid) + if n == nil { + return fmt.Errorf("network id %q not found", nid) + } + + ep := n.endpoint(eid) + if ep == nil { + return fmt.Errorf("endpoint id %q not found", eid) + } + + n.deleteEndpoint(eid) + + if err := d.deleteEndpointFromStore(ep); err != nil { + logrus.Warnf("Failed to delete overlay endpoint %s from local store: %v", ep.id[0:7], err) + } + + _, err := hcsshim.HNSEndpointRequest("DELETE", ep.profileId, "") + if err != nil { + return err + } + + return nil +} + +func (d *driver) EndpointOperInfo(nid, eid string) (map[string]interface{}, error) { + if err := validateID(nid, eid); err != nil { + return nil, err + } + + n := d.network(nid) + if n == nil { + return nil, fmt.Errorf("network id %q not found", nid) + } + + ep := n.endpoint(eid) + if ep == nil { + return nil, fmt.Errorf("endpoint id %q not found", eid) + } + + data := make(map[string]interface{}, 1) + data["hnsid"] = ep.profileId + data["AllowUnqualifiedDNSQuery"] = true + return data, nil +} + +func (d *driver) deleteEndpointFromStore(e *endpoint) error { + if d.localStore == nil { + return fmt.Errorf("overlay local store not initialized, ep not deleted") + } + + if err := d.localStore.DeleteObjectAtomic(e); err != nil { + return err + } + + return nil +} + +func (d *driver) writeEndpointToStore(e *endpoint) error { + if d.localStore == nil { + return fmt.Errorf("overlay local store not initialized, ep not added") + } + + if err := d.localStore.PutObjectAtomic(e); err != nil { + return err + } + return nil +} + +func (ep *endpoint) DataScope() string { + return datastore.LocalScope +} + +func (ep *endpoint) New() datastore.KVObject { + return &endpoint{} +} + +func (ep *endpoint) CopyTo(o datastore.KVObject) error { + dstep := o.(*endpoint) + *dstep = *ep + return nil +} + +func (ep *endpoint) Key() []string { + return []string{overlayEndpointPrefix, ep.id} +} + +func (ep *endpoint) KeyPrefix() []string { + return []string{overlayEndpointPrefix} +} + +func (ep *endpoint) Index() uint64 { + return ep.dbIndex +} + +func (ep *endpoint) SetIndex(index uint64) { + ep.dbIndex = index + ep.dbExists = true +} + +func (ep *endpoint) Exists() bool { + return ep.dbExists +} + +func (ep *endpoint) Skip() bool { + return false +} + +func (ep *endpoint) Value() []byte { + b, err := json.Marshal(ep) + if err != nil { + return nil + } + return b +} + +func (ep *endpoint) SetValue(value []byte) error { + return json.Unmarshal(value, ep) +} + +func (ep *endpoint) MarshalJSON() ([]byte, error) { + epMap := make(map[string]interface{}) + + epMap["id"] = ep.id + epMap["nid"] = ep.nid + epMap["remote"] = ep.remote + if ep.profileId != "" { + epMap["profileId"] = ep.profileId + } + + if ep.addr != nil { + epMap["addr"] = ep.addr.String() + } + if len(ep.mac) != 0 { + epMap["mac"] = ep.mac.String() + } + + return json.Marshal(epMap) +} + +func (ep *endpoint) UnmarshalJSON(value []byte) error { + var ( + err error + epMap map[string]interface{} + ) + + json.Unmarshal(value, &epMap) + + ep.id = epMap["id"].(string) + ep.nid = epMap["nid"].(string) + ep.remote = epMap["remote"].(bool) + if v, ok := epMap["profileId"]; ok { + ep.profileId = v.(string) + } + if v, ok := epMap["mac"]; ok { + if ep.mac, err = net.ParseMAC(v.(string)); err != nil { + return types.InternalErrorf("failed to decode endpoint interface mac address after json unmarshal: %s", v.(string)) + } + } + if v, ok := epMap["addr"]; ok { + if ep.addr, err = types.ParseCIDR(v.(string)); err != nil { + return types.InternalErrorf("failed to decode endpoint interface ipv4 address after json unmarshal: %v", err) + } + } + return nil +} diff --git a/drivers/windows/overlay/ov_network_local_windows.go b/drivers/windows/overlay/ov_network_local_windows.go new file mode 100644 index 00000000..1f27c549 --- /dev/null +++ b/drivers/windows/overlay/ov_network_local_windows.go @@ -0,0 +1,209 @@ +package overlay + +import ( + "encoding/json" + "fmt" + "sync" + + "github.com/Microsoft/hcsshim" + "github.com/Sirupsen/logrus" + "github.com/docker/libnetwork/datastore" +) + +const overlayNetworkPrefix = "overlay/network" + +type localNetwork struct { + id string + hnsID string + providerAddress string + dbIndex uint64 + dbExists bool + sync.Mutex +} + +func (d *driver) findHnsNetwork(n *network) error { + ln, err := d.getLocalNetworkFromStore(n.id) + + if err != nil { + return err + } + + if ln == nil { + subnets := []hcsshim.Subnet{} + + for _, s := range n.subnets { + subnet := hcsshim.Subnet{ + AddressPrefix: s.subnetIP.String(), + } + + if s.gwIP != nil { + subnet.GatewayAddress = s.gwIP.IP.String() + } + + vsidPolicy, err := json.Marshal(hcsshim.VsidPolicy{ + Type: "VSID", + VSID: uint(s.vni), + }) + + if err != nil { + return err + } + + subnet.Policies = append(subnet.Policies, vsidPolicy) + subnets = append(subnets, subnet) + } + + network := &hcsshim.HNSNetwork{ + Name: n.name, + Type: d.Type(), + Subnets: subnets, + NetworkAdapterName: n.interfaceName, + } + + configurationb, err := json.Marshal(network) + if err != nil { + return err + } + + configuration := string(configurationb) + logrus.Infof("HNSNetwork Request =%v", configuration) + + hnsresponse, err := hcsshim.HNSNetworkRequest("POST", "", configuration) + if err != nil { + return err + } + + n.hnsId = hnsresponse.Id + n.providerAddress = hnsresponse.ManagementIP + + // Save local host specific info + if err := d.writeLocalNetworkToStore(n); err != nil { + return fmt.Errorf("failed to update data store for network %v: %v", n.id, err) + } + } else { + n.hnsId = ln.hnsID + n.providerAddress = ln.providerAddress + } + + return nil +} + +func (d *driver) getLocalNetworkFromStore(nid string) (*localNetwork, error) { + + if d.localStore == nil { + return nil, fmt.Errorf("overlay local store not initialized, network not found") + } + + n := &localNetwork{id: nid} + if err := d.localStore.GetObject(datastore.Key(n.Key()...), n); err != nil { + return nil, nil + } + + return n, nil +} + +func (d *driver) deleteLocalNetworkFromStore(n *network) error { + if d.localStore == nil { + return fmt.Errorf("overlay local store not initialized, network not deleted") + } + + ln, err := d.getLocalNetworkFromStore(n.id) + + if err != nil { + return err + } + + if err = d.localStore.DeleteObjectAtomic(ln); err != nil { + return err + } + + return nil +} + +func (d *driver) writeLocalNetworkToStore(n *network) error { + if d.localStore == nil { + return fmt.Errorf("overlay local store not initialized, network not added") + } + + ln := &localNetwork{ + id: n.id, + hnsID: n.hnsId, + providerAddress: n.providerAddress, + } + + if err := d.localStore.PutObjectAtomic(ln); err != nil { + return err + } + return nil +} + +func (n *localNetwork) DataScope() string { + return datastore.LocalScope +} + +func (n *localNetwork) New() datastore.KVObject { + return &localNetwork{} +} + +func (n *localNetwork) CopyTo(o datastore.KVObject) error { + dstep := o.(*localNetwork) + *dstep = *n + return nil +} + +func (n *localNetwork) Key() []string { + return []string{overlayNetworkPrefix, n.id} +} + +func (n *localNetwork) KeyPrefix() []string { + return []string{overlayNetworkPrefix} +} + +func (n *localNetwork) Index() uint64 { + return n.dbIndex +} + +func (n *localNetwork) SetIndex(index uint64) { + n.dbIndex = index + n.dbExists = true +} + +func (n *localNetwork) Exists() bool { + return n.dbExists +} + +func (n *localNetwork) Skip() bool { + return false +} + +func (n *localNetwork) Value() []byte { + b, err := json.Marshal(n) + if err != nil { + return nil + } + return b +} + +func (n *localNetwork) SetValue(value []byte) error { + return json.Unmarshal(value, n) +} + +func (n *localNetwork) MarshalJSON() ([]byte, error) { + networkMap := make(map[string]interface{}) + + networkMap["id"] = n.id + networkMap["hnsID"] = n.hnsID + networkMap["providerAddress"] = n.providerAddress + return json.Marshal(networkMap) +} + +func (n *localNetwork) UnmarshalJSON(value []byte) error { + var networkMap map[string]interface{} + + json.Unmarshal(value, &networkMap) + + n.id = networkMap["id"].(string) + n.hnsID = networkMap["hnsID"].(string) + n.providerAddress = networkMap["providerAddress"].(string) + return nil +} diff --git a/drivers/windows/overlay/ov_network_windows.go b/drivers/windows/overlay/ov_network_windows.go new file mode 100644 index 00000000..f89469f7 --- /dev/null +++ b/drivers/windows/overlay/ov_network_windows.go @@ -0,0 +1,512 @@ +package overlay + +import ( + "encoding/json" + "fmt" + "net" + "strconv" + "strings" + "sync" + + "github.com/Microsoft/hcsshim" + "github.com/Sirupsen/logrus" + "github.com/docker/libnetwork/datastore" + "github.com/docker/libnetwork/driverapi" + "github.com/docker/libnetwork/netlabel" + "github.com/docker/libnetwork/types" +) + +var ( + hostMode bool + networkMu sync.Mutex +) + +type networkTable map[string]*network + +type subnet struct { + vni uint32 + initErr error + subnetIP *net.IPNet + gwIP *net.IPNet +} + +type subnetJSON struct { + SubnetIP string + GwIP string + Vni uint32 +} + +type network struct { + id string + name string + hnsId string + dbIndex uint64 + dbExists bool + providerAddress string + interfaceName string + endpoints endpointTable + driver *driver + initEpoch int + initErr error + subnets []*subnet + secure bool + sync.Mutex +} + +func (d *driver) NetworkAllocate(id string, option map[string]string, ipV4Data, ipV6Data []driverapi.IPAMData) (map[string]string, error) { + return nil, types.NotImplementedErrorf("not implemented") +} + +func (d *driver) NetworkFree(id string) error { + return types.NotImplementedErrorf("not implemented") +} + +func (d *driver) CreateNetwork(id string, option map[string]interface{}, nInfo driverapi.NetworkInfo, ipV4Data, ipV6Data []driverapi.IPAMData) error { + var ( + networkName string + interfaceName string + ) + + if id == "" { + return fmt.Errorf("invalid network id") + } + + if len(ipV4Data) == 0 || ipV4Data[0].Pool.String() == "0.0.0.0/0" { + return types.BadRequestErrorf("ipv4 pool is empty") + } + + vnis := make([]uint32, 0, len(ipV4Data)) + + // Since we perform lazy configuration make sure we try + // configuring the driver when we enter CreateNetwork + if err := d.configure(); err != nil { + return err + } + + n := &network{ + id: id, + driver: d, + endpoints: endpointTable{}, + subnets: []*subnet{}, + } + + genData, ok := option[netlabel.GenericData].(map[string]string) + + if !ok { + return fmt.Errorf("Unknown generic data option") + } + + for label, value := range genData { + switch label { + case "com.docker.network.windowsshim.networkname": + networkName = value + case "com.docker.network.windowsshim.interface": + interfaceName = value + case "com.docker.network.windowsshim.hnsid": + n.hnsId = value + case netlabel.OverlayVxlanIDList: + vniStrings := strings.Split(value, ",") + for _, vniStr := range vniStrings { + vni, err := strconv.Atoi(vniStr) + if err != nil { + return fmt.Errorf("invalid vxlan id value %q passed", vniStr) + } + + vnis = append(vnis, uint32(vni)) + } + } + } + + // If we are getting vnis from libnetwork, either we get for + // all subnets or none. + if len(vnis) != 0 && len(vnis) < len(ipV4Data) { + return fmt.Errorf("insufficient vnis(%d) passed to overlay", len(vnis)) + } + + for i, ipd := range ipV4Data { + s := &subnet{ + subnetIP: ipd.Pool, + gwIP: ipd.Gateway, + } + + if len(vnis) != 0 { + s.vni = vnis[i] + } + + n.subnets = append(n.subnets, s) + } + + n.name = networkName + if n.name == "" { + n.name = id + } + + n.interfaceName = interfaceName + + if err := n.writeToStore(); err != nil { + return fmt.Errorf("failed to update data store for network %v: %v", n.id, err) + } + + if nInfo != nil { + if err := nInfo.TableEventRegister(ovPeerTable); err != nil { + return err + } + } + + d.addNetwork(n) + + err := d.findHnsNetwork(n) + genData["com.docker.network.windowsshim.hnsid"] = n.hnsId + + return err +} + +func (d *driver) DeleteNetwork(nid string) error { + if nid == "" { + return fmt.Errorf("invalid network id") + } + + // Make sure driver resources are initialized before proceeding + if err := d.configure(); err != nil { + return err + } + + n := d.network(nid) + if n == nil { + return fmt.Errorf("could not find network with id %s", nid) + } + + _, err := hcsshim.HNSNetworkRequest("DELETE", n.hnsId, "") + if err != nil { + return err + } + + d.deleteNetwork(nid) + d.deleteLocalNetworkFromStore(n) + + return nil +} + +func (d *driver) ProgramExternalConnectivity(nid, eid string, options map[string]interface{}) error { + return nil +} + +func (d *driver) RevokeExternalConnectivity(nid, eid string) error { + return nil +} + +func (d *driver) addNetwork(n *network) { + d.Lock() + d.networks[n.id] = n + d.Unlock() +} + +func (d *driver) deleteNetwork(nid string) { + d.Lock() + delete(d.networks, nid) + d.Unlock() +} + +func (d *driver) network(nid string) *network { + d.Lock() + networks := d.networks + d.Unlock() + + n, ok := networks[nid] + if !ok { + n = d.getNetworkFromStore(nid) + if n != nil { + n.driver = d + n.endpoints = endpointTable{} + networks[nid] = n + } + } + + return n +} + +func (d *driver) getNetworkFromStore(nid string) *network { + if d.store == nil { + return nil + } + + n := &network{id: nid} + if err := d.store.GetObject(datastore.Key(n.Key()...), n); err != nil { + return nil + } + + // As the network is being discovered from the global store, HNS may not be aware of it yet + err := d.findHnsNetwork(n) + if err != nil { + logrus.Errorf("Failed to find hns network: %v", err) + return nil + } + + return n +} + +func (n *network) vxlanID(s *subnet) uint32 { + n.Lock() + defer n.Unlock() + + return s.vni +} + +func (n *network) setVxlanID(s *subnet, vni uint32) { + n.Lock() + s.vni = vni + n.Unlock() +} + +func (n *network) Key() []string { + return []string{"overlay", "network", n.id} +} + +func (n *network) KeyPrefix() []string { + return []string{"overlay", "network"} +} + +func (n *network) Value() []byte { + m := map[string]interface{}{} + + netJSON := []*subnetJSON{} + + for _, s := range n.subnets { + sj := &subnetJSON{ + SubnetIP: s.subnetIP.String(), + GwIP: s.gwIP.String(), + Vni: s.vni, + } + netJSON = append(netJSON, sj) + } + + b, err := json.Marshal(netJSON) + if err != nil { + return []byte{} + } + + m["secure"] = n.secure + m["subnets"] = netJSON + m["interfaceName"] = n.interfaceName + m["providerAddress"] = n.providerAddress + m["hnsId"] = n.hnsId + m["name"] = n.name + b, err = json.Marshal(m) + if err != nil { + return []byte{} + } + + return b +} + +func (n *network) Index() uint64 { + return n.dbIndex +} + +func (n *network) SetIndex(index uint64) { + n.dbIndex = index + n.dbExists = true +} + +func (n *network) Exists() bool { + return n.dbExists +} + +func (n *network) Skip() bool { + return false +} + +func (n *network) SetValue(value []byte) error { + var ( + m map[string]interface{} + newNet bool + isMap = true + netJSON = []*subnetJSON{} + ) + + if err := json.Unmarshal(value, &m); err != nil { + err := json.Unmarshal(value, &netJSON) + if err != nil { + return err + } + isMap = false + } + + if len(n.subnets) == 0 { + newNet = true + } + + if isMap { + if val, ok := m["secure"]; ok { + n.secure = val.(bool) + } + if val, ok := m["providerAddress"]; ok { + n.providerAddress = val.(string) + } + if val, ok := m["interfaceName"]; ok { + n.interfaceName = val.(string) + } + if val, ok := m["hnsId"]; ok { + n.hnsId = val.(string) + } + if val, ok := m["name"]; ok { + n.name = val.(string) + } + bytes, err := json.Marshal(m["subnets"]) + if err != nil { + return err + } + if err := json.Unmarshal(bytes, &netJSON); err != nil { + return err + } + } + + for _, sj := range netJSON { + subnetIPstr := sj.SubnetIP + gwIPstr := sj.GwIP + vni := sj.Vni + + subnetIP, _ := types.ParseCIDR(subnetIPstr) + gwIP, _ := types.ParseCIDR(gwIPstr) + + if newNet { + s := &subnet{ + subnetIP: subnetIP, + gwIP: gwIP, + vni: vni, + } + n.subnets = append(n.subnets, s) + } else { + sNet := n.getMatchingSubnet(subnetIP) + if sNet != nil { + sNet.vni = vni + } + } + } + return nil +} + +func (n *network) DataScope() string { + return datastore.GlobalScope +} + +func (n *network) writeToStore() error { + if n.driver.store == nil { + return nil + } + + return n.driver.store.PutObjectAtomic(n) +} + +func (n *network) releaseVxlanID() ([]uint32, error) { + if len(n.subnets) == 0 { + return nil, nil + } + + if n.driver.store != nil { + if err := n.driver.store.DeleteObjectAtomic(n); err != nil { + if err == datastore.ErrKeyModified || err == datastore.ErrKeyNotFound { + // In both the above cases we can safely assume that the key has been removed by some other + // instance and so simply get out of here + return nil, nil + } + + return nil, fmt.Errorf("failed to delete network to vxlan id map: %v", err) + } + } + var vnis []uint32 + for _, s := range n.subnets { + if n.driver.vxlanIdm != nil { + vni := n.vxlanID(s) + vnis = append(vnis, vni) + n.driver.vxlanIdm.Release(uint64(vni)) + } + + n.setVxlanID(s, 0) + } + + return vnis, nil +} + +func (n *network) obtainVxlanID(s *subnet) error { + //return if the subnet already has a vxlan id assigned + if s.vni != 0 { + return nil + } + + if n.driver.store == nil { + return fmt.Errorf("no valid vxlan id and no datastore configured, cannot obtain vxlan id") + } + + for { + if err := n.driver.store.GetObject(datastore.Key(n.Key()...), n); err != nil { + return fmt.Errorf("getting network %q from datastore failed %v", n.id, err) + } + + if s.vni == 0 { + vxlanID, err := n.driver.vxlanIdm.GetID() + if err != nil { + return fmt.Errorf("failed to allocate vxlan id: %v", err) + } + + n.setVxlanID(s, uint32(vxlanID)) + if err := n.writeToStore(); err != nil { + n.driver.vxlanIdm.Release(uint64(n.vxlanID(s))) + n.setVxlanID(s, 0) + if err == datastore.ErrKeyModified { + continue + } + return fmt.Errorf("network %q failed to update data store: %v", n.id, err) + } + return nil + } + return nil + } +} + +// contains return true if the passed ip belongs to one the network's +// subnets +func (n *network) contains(ip net.IP) bool { + for _, s := range n.subnets { + if s.subnetIP.Contains(ip) { + return true + } + } + + return false +} + +// getSubnetforIP returns the subnet to which the given IP belongs +func (n *network) getSubnetforIP(ip *net.IPNet) *subnet { + for _, s := range n.subnets { + // first check if the mask lengths are the same + i, _ := s.subnetIP.Mask.Size() + j, _ := ip.Mask.Size() + if i != j { + continue + } + if s.subnetIP.Contains(ip.IP) { + return s + } + } + return nil +} + +// getMatchingSubnet return the network's subnet that matches the input +func (n *network) getMatchingSubnet(ip *net.IPNet) *subnet { + if ip == nil { + return nil + } + for _, s := range n.subnets { + // first check if the mask lengths are the same + i, _ := s.subnetIP.Mask.Size() + j, _ := ip.Mask.Size() + if i != j { + continue + } + if s.subnetIP.IP.Equal(ip.IP) { + return s + } + } + return nil +} diff --git a/drivers/windows/overlay/ov_serf_windows.go b/drivers/windows/overlay/ov_serf_windows.go new file mode 100644 index 00000000..78a703c3 --- /dev/null +++ b/drivers/windows/overlay/ov_serf_windows.go @@ -0,0 +1,179 @@ +package overlay + +import ( + "fmt" + "net" + "strings" + "time" + + "github.com/Sirupsen/logrus" + "github.com/hashicorp/serf/serf" +) + +type ovNotify struct { + action string + ep *endpoint + nw *network +} + +type logWriter struct{} + +func (l *logWriter) Write(p []byte) (int, error) { + str := string(p) + + switch { + case strings.Contains(str, "[WARN]"): + logrus.Warn(str) + case strings.Contains(str, "[DEBUG]"): + logrus.Debug(str) + case strings.Contains(str, "[INFO]"): + logrus.Info(str) + case strings.Contains(str, "[ERR]"): + logrus.Error(str) + } + + return len(p), nil +} + +func (d *driver) serfInit() error { + var err error + + config := serf.DefaultConfig() + config.Init() + config.MemberlistConfig.BindAddr = d.bindAddress + + d.eventCh = make(chan serf.Event, 4) + config.EventCh = d.eventCh + config.UserCoalescePeriod = 1 * time.Second + config.UserQuiescentPeriod = 50 * time.Millisecond + + config.LogOutput = &logWriter{} + config.MemberlistConfig.LogOutput = config.LogOutput + + s, err := serf.Create(config) + if err != nil { + return fmt.Errorf("failed to create cluster node: %v", err) + } + defer func() { + if err != nil { + s.Shutdown() + } + }() + + d.serfInstance = s + + d.notifyCh = make(chan ovNotify) + d.exitCh = make(chan chan struct{}) + + go d.startSerfLoop(d.eventCh, d.notifyCh, d.exitCh) + return nil +} + +func (d *driver) serfJoin(neighIP string) error { + if neighIP == "" { + return fmt.Errorf("no neighbor to join") + } + if _, err := d.serfInstance.Join([]string{neighIP}, false); err != nil { + return fmt.Errorf("Failed to join the cluster at neigh IP %s: %v", + neighIP, err) + } + return nil +} + +func (d *driver) notifyEvent(event ovNotify) { + ep := event.ep + + ePayload := fmt.Sprintf("%s %s %s %s", event.action, ep.addr.IP.String(), + net.IP(ep.addr.Mask).String(), ep.mac.String()) + eName := fmt.Sprintf("jl %s %s %s", d.serfInstance.LocalMember().Addr.String(), + event.nw.id, ep.id) + + if err := d.serfInstance.UserEvent(eName, []byte(ePayload), true); err != nil { + logrus.Errorf("Sending user event failed: %v\n", err) + } +} + +func (d *driver) processEvent(u serf.UserEvent) { + logrus.Debugf("Received user event name:%s, payload:%s\n", u.Name, + string(u.Payload)) + + var dummy, action, vtepStr, nid, eid, ipStr, maskStr, macStr string + if _, err := fmt.Sscan(u.Name, &dummy, &vtepStr, &nid, &eid); err != nil { + fmt.Printf("Failed to scan name string: %v\n", err) + } + + if _, err := fmt.Sscan(string(u.Payload), &action, + &ipStr, &maskStr, &macStr); err != nil { + fmt.Printf("Failed to scan value string: %v\n", err) + } + + logrus.Debugf("Parsed data = %s/%s/%s/%s/%s/%s\n", nid, eid, vtepStr, ipStr, maskStr, macStr) + + mac, err := net.ParseMAC(macStr) + if err != nil { + logrus.Errorf("Failed to parse mac: %v\n", err) + } + + if d.serfInstance.LocalMember().Addr.String() == vtepStr { + return + } + + switch action { + case "join": + if err := d.peerAdd(nid, eid, net.ParseIP(ipStr), net.IPMask(net.ParseIP(maskStr).To4()), mac, + net.ParseIP(vtepStr), true); err != nil { + logrus.Errorf("Peer add failed in the driver: %v\n", err) + } + case "leave": + if err := d.peerDelete(nid, eid, net.ParseIP(ipStr), net.IPMask(net.ParseIP(maskStr).To4()), mac, + net.ParseIP(vtepStr), true); err != nil { + logrus.Errorf("Peer delete failed in the driver: %v\n", err) + } + } +} + +func (d *driver) startSerfLoop(eventCh chan serf.Event, notifyCh chan ovNotify, + exitCh chan chan struct{}) { + + for { + select { + case notify, ok := <-notifyCh: + if !ok { + break + } + + d.notifyEvent(notify) + case ch, ok := <-exitCh: + if !ok { + break + } + + if err := d.serfInstance.Leave(); err != nil { + logrus.Errorf("failed leaving the cluster: %v\n", err) + } + + d.serfInstance.Shutdown() + close(ch) + return + case e, ok := <-eventCh: + if !ok { + break + } + u, ok := e.(serf.UserEvent) + if !ok { + break + } + d.processEvent(u) + } + } +} + +func (d *driver) isSerfAlive() bool { + d.Lock() + serfInstance := d.serfInstance + d.Unlock() + if serfInstance == nil || serfInstance.State() != serf.SerfAlive { + return false + } + return true +} diff --git a/drivers/windows/overlay/overlay.pb.go b/drivers/windows/overlay/overlay.pb.go new file mode 100644 index 00000000..cfa0eeea --- /dev/null +++ b/drivers/windows/overlay/overlay.pb.go @@ -0,0 +1,468 @@ +// Code generated by protoc-gen-gogo. +// source: overlay.proto +// DO NOT EDIT! + +/* + Package overlay is a generated protocol buffer package. + + It is generated from these files: + overlay.proto + + It has these top-level messages: + PeerRecord +*/ +package overlay + +import proto "github.com/gogo/protobuf/proto" +import fmt "fmt" +import math "math" +import _ "github.com/gogo/protobuf/gogoproto" + +import strings "strings" +import github_com_gogo_protobuf_proto "github.com/gogo/protobuf/proto" +import sort "sort" +import strconv "strconv" +import reflect "reflect" + +import io "io" + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +const _ = proto.GoGoProtoPackageIsVersion1 + +// PeerRecord defines the information corresponding to a peer +// container in the overlay network. +type PeerRecord struct { + // Endpoint IP is the IP of the container attachment on the + // given overlay network. + EndpointIP string `protobuf:"bytes,1,opt,name=endpoint_ip,json=endpointIp,proto3" json:"endpoint_ip,omitempty"` + // Endpoint MAC is the mac address of the container attachment + // on the given overlay network. + EndpointMAC string `protobuf:"bytes,2,opt,name=endpoint_mac,json=endpointMac,proto3" json:"endpoint_mac,omitempty"` + // Tunnel Endpoint IP defines the host IP for the host in + // which this container is running and can be reached by + // building a tunnel to that host IP. + TunnelEndpointIP string `protobuf:"bytes,3,opt,name=tunnel_endpoint_ip,json=tunnelEndpointIp,proto3" json:"tunnel_endpoint_ip,omitempty"` +} + +func (m *PeerRecord) Reset() { *m = PeerRecord{} } +func (*PeerRecord) ProtoMessage() {} +func (*PeerRecord) Descriptor() ([]byte, []int) { return fileDescriptorOverlay, []int{0} } + +func init() { + proto.RegisterType((*PeerRecord)(nil), "overlay.PeerRecord") +} +func (this *PeerRecord) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 7) + s = append(s, "&overlay.PeerRecord{") + s = append(s, "EndpointIP: "+fmt.Sprintf("%#v", this.EndpointIP)+",\n") + s = append(s, "EndpointMAC: "+fmt.Sprintf("%#v", this.EndpointMAC)+",\n") + s = append(s, "TunnelEndpointIP: "+fmt.Sprintf("%#v", this.TunnelEndpointIP)+",\n") + s = append(s, "}") + return strings.Join(s, "") +} +func valueToGoStringOverlay(v interface{}, typ string) string { + rv := reflect.ValueOf(v) + if rv.IsNil() { + return "nil" + } + pv := reflect.Indirect(rv).Interface() + return fmt.Sprintf("func(v %v) *%v { return &v } ( %#v )", typ, typ, pv) +} +func extensionToGoStringOverlay(e map[int32]github_com_gogo_protobuf_proto.Extension) string { + if e == nil { + return "nil" + } + s := "map[int32]proto.Extension{" + keys := make([]int, 0, len(e)) + for k := range e { + keys = append(keys, int(k)) + } + sort.Ints(keys) + ss := []string{} + for _, k := range keys { + ss = append(ss, strconv.Itoa(k)+": "+e[int32(k)].GoString()) + } + s += strings.Join(ss, ",") + "}" + return s +} +func (m *PeerRecord) Marshal() (data []byte, err error) { + size := m.Size() + data = make([]byte, size) + n, err := m.MarshalTo(data) + if err != nil { + return nil, err + } + return data[:n], nil +} + +func (m *PeerRecord) MarshalTo(data []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if len(m.EndpointIP) > 0 { + data[i] = 0xa + i++ + i = encodeVarintOverlay(data, i, uint64(len(m.EndpointIP))) + i += copy(data[i:], m.EndpointIP) + } + if len(m.EndpointMAC) > 0 { + data[i] = 0x12 + i++ + i = encodeVarintOverlay(data, i, uint64(len(m.EndpointMAC))) + i += copy(data[i:], m.EndpointMAC) + } + if len(m.TunnelEndpointIP) > 0 { + data[i] = 0x1a + i++ + i = encodeVarintOverlay(data, i, uint64(len(m.TunnelEndpointIP))) + i += copy(data[i:], m.TunnelEndpointIP) + } + return i, nil +} + +func encodeFixed64Overlay(data []byte, offset int, v uint64) int { + data[offset] = uint8(v) + data[offset+1] = uint8(v >> 8) + data[offset+2] = uint8(v >> 16) + data[offset+3] = uint8(v >> 24) + data[offset+4] = uint8(v >> 32) + data[offset+5] = uint8(v >> 40) + data[offset+6] = uint8(v >> 48) + data[offset+7] = uint8(v >> 56) + return offset + 8 +} +func encodeFixed32Overlay(data []byte, offset int, v uint32) int { + data[offset] = uint8(v) + data[offset+1] = uint8(v >> 8) + data[offset+2] = uint8(v >> 16) + data[offset+3] = uint8(v >> 24) + return offset + 4 +} +func encodeVarintOverlay(data []byte, offset int, v uint64) int { + for v >= 1<<7 { + data[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + data[offset] = uint8(v) + return offset + 1 +} +func (m *PeerRecord) Size() (n int) { + var l int + _ = l + l = len(m.EndpointIP) + if l > 0 { + n += 1 + l + sovOverlay(uint64(l)) + } + l = len(m.EndpointMAC) + if l > 0 { + n += 1 + l + sovOverlay(uint64(l)) + } + l = len(m.TunnelEndpointIP) + if l > 0 { + n += 1 + l + sovOverlay(uint64(l)) + } + return n +} + +func sovOverlay(x uint64) (n int) { + for { + n++ + x >>= 7 + if x == 0 { + break + } + } + return n +} +func sozOverlay(x uint64) (n int) { + return sovOverlay(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (this *PeerRecord) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&PeerRecord{`, + `EndpointIP:` + fmt.Sprintf("%v", this.EndpointIP) + `,`, + `EndpointMAC:` + fmt.Sprintf("%v", this.EndpointMAC) + `,`, + `TunnelEndpointIP:` + fmt.Sprintf("%v", this.TunnelEndpointIP) + `,`, + `}`, + }, "") + return s +} +func valueToStringOverlay(v interface{}) string { + rv := reflect.ValueOf(v) + if rv.IsNil() { + return "nil" + } + pv := reflect.Indirect(rv).Interface() + return fmt.Sprintf("*%v", pv) +} +func (m *PeerRecord) Unmarshal(data []byte) error { + l := len(data) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowOverlay + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: PeerRecord: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: PeerRecord: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field EndpointIP", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowOverlay + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthOverlay + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.EndpointIP = string(data[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field EndpointMAC", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowOverlay + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthOverlay + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.EndpointMAC = string(data[iNdEx:postIndex]) + iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field TunnelEndpointIP", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowOverlay + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthOverlay + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.TunnelEndpointIP = string(data[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipOverlay(data[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthOverlay + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func skipOverlay(data []byte) (n int, err error) { + l := len(data) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowOverlay + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowOverlay + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if data[iNdEx-1] < 0x80 { + break + } + } + return iNdEx, nil + case 1: + iNdEx += 8 + return iNdEx, nil + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowOverlay + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + iNdEx += length + if length < 0 { + return 0, ErrInvalidLengthOverlay + } + return iNdEx, nil + case 3: + for { + var innerWire uint64 + var start int = iNdEx + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowOverlay + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + innerWire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + innerWireType := int(innerWire & 0x7) + if innerWireType == 4 { + break + } + next, err := skipOverlay(data[start:]) + if err != nil { + return 0, err + } + iNdEx = start + next + } + return iNdEx, nil + case 4: + return iNdEx, nil + case 5: + iNdEx += 4 + return iNdEx, nil + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + } + panic("unreachable") +} + +var ( + ErrInvalidLengthOverlay = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowOverlay = fmt.Errorf("proto: integer overflow") +) + +var fileDescriptorOverlay = []byte{ + // 195 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0xe2, 0xcd, 0x2f, 0x4b, 0x2d, + 0xca, 0x49, 0xac, 0xd4, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x87, 0x72, 0xa5, 0x44, 0xd2, + 0xf3, 0xd3, 0xf3, 0xc1, 0x62, 0xfa, 0x20, 0x16, 0x44, 0x5a, 0x69, 0x2b, 0x23, 0x17, 0x57, 0x40, + 0x6a, 0x6a, 0x51, 0x50, 0x6a, 0x72, 0x7e, 0x51, 0x8a, 0x90, 0x3e, 0x17, 0x77, 0x6a, 0x5e, 0x4a, + 0x41, 0x7e, 0x66, 0x5e, 0x49, 0x7c, 0x66, 0x81, 0x04, 0xa3, 0x02, 0xa3, 0x06, 0xa7, 0x13, 0xdf, + 0xa3, 0x7b, 0xf2, 0x5c, 0xae, 0x50, 0x61, 0xcf, 0x80, 0x20, 0x2e, 0x98, 0x12, 0xcf, 0x02, 0x21, + 0x23, 0x2e, 0x1e, 0xb8, 0x86, 0xdc, 0xc4, 0x64, 0x09, 0x26, 0xb0, 0x0e, 0x7e, 0xa0, 0x0e, 0x6e, + 0x98, 0x0e, 0x5f, 0x47, 0xe7, 0x20, 0xb8, 0xa9, 0xbe, 0x89, 0xc9, 0x42, 0x4e, 0x5c, 0x42, 0x25, + 0xa5, 0x79, 0x79, 0xa9, 0x39, 0xf1, 0xc8, 0x76, 0x31, 0x83, 0x75, 0x8a, 0x00, 0x75, 0x0a, 0x84, + 0x80, 0x65, 0x91, 0x6c, 0x14, 0x28, 0x41, 0x15, 0x29, 0x70, 0x92, 0xb8, 0xf1, 0x50, 0x8e, 0xe1, + 0xc3, 0x43, 0x39, 0xc6, 0x86, 0x47, 0x72, 0x8c, 0x27, 0x80, 0xf8, 0x02, 0x10, 0x3f, 0x00, 0xe2, + 0x24, 0x36, 0xb0, 0xc7, 0x8c, 0x01, 0x01, 0x00, 0x00, 0xff, 0xff, 0xbf, 0xd7, 0x7d, 0x7d, 0x08, + 0x01, 0x00, 0x00, +} diff --git a/drivers/windows/overlay/overlay.proto b/drivers/windows/overlay/overlay.proto new file mode 100644 index 00000000..45b8c9de --- /dev/null +++ b/drivers/windows/overlay/overlay.proto @@ -0,0 +1,27 @@ +syntax = "proto3"; + +import "gogoproto/gogo.proto"; + +package overlay; + +option (gogoproto.marshaler_all) = true; +option (gogoproto.unmarshaler_all) = true; +option (gogoproto.stringer_all) = true; +option (gogoproto.gostring_all) = true; +option (gogoproto.sizer_all) = true; +option (gogoproto.goproto_stringer_all) = false; + +// PeerRecord defines the information corresponding to a peer +// container in the overlay network. +message PeerRecord { + // Endpoint IP is the IP of the container attachment on the + // given overlay network. + string endpoint_ip = 1 [(gogoproto.customname) = "EndpointIP"]; + // Endpoint MAC is the mac address of the container attachment + // on the given overlay network. + string endpoint_mac = 2 [(gogoproto.customname) = "EndpointMAC"]; + // Tunnel Endpoint IP defines the host IP for the host in + // which this container is running and can be reached by + // building a tunnel to that host IP. + string tunnel_endpoint_ip = 3 [(gogoproto.customname) = "TunnelEndpointIP"]; +} \ No newline at end of file diff --git a/drivers/windows/overlay/overlay_windows.go b/drivers/windows/overlay/overlay_windows.go new file mode 100644 index 00000000..7ebc16ee --- /dev/null +++ b/drivers/windows/overlay/overlay_windows.go @@ -0,0 +1,297 @@ +package overlay + +//go:generate protoc -I.:../../Godeps/_workspace/src/github.com/gogo/protobuf --gogo_out=import_path=github.com/docker/libnetwork/drivers/overlay,Mgogoproto/gogo.proto=github.com/gogo/protobuf/gogoproto:. overlay.proto + +import ( + "fmt" + "net" + "sync" + + "github.com/Microsoft/hcsshim" + "github.com/Sirupsen/logrus" + "github.com/docker/libnetwork/datastore" + "github.com/docker/libnetwork/discoverapi" + "github.com/docker/libnetwork/driverapi" + "github.com/docker/libnetwork/idm" + "github.com/docker/libnetwork/netlabel" + "github.com/docker/libnetwork/types" + "github.com/hashicorp/serf/serf" +) + +const ( + networkType = "overlay" + vethPrefix = "veth" + vethLen = 7 + vxlanIDStart = 4096 + vxlanIDEnd = (1 << 24) - 1 + vxlanPort = 4789 + vxlanEncap = 50 + secureOption = "encrypted" +) + +var initVxlanIdm = make(chan (bool), 1) + +type driver struct { + eventCh chan serf.Event + notifyCh chan ovNotify + exitCh chan chan struct{} + bindAddress string + advertiseAddress string + neighIP string + config map[string]interface{} + serfInstance *serf.Serf + networks networkTable + store datastore.DataStore + localStore datastore.DataStore + vxlanIdm *idm.Idm + once sync.Once + joinOnce sync.Once + sync.Mutex +} + +// Init registers a new instance of overlay driver +func Init(dc driverapi.DriverCallback, config map[string]interface{}) error { + c := driverapi.Capability{ + DataScope: datastore.GlobalScope, + } + + d := &driver{ + networks: networkTable{}, + config: config, + } + + if data, ok := config[netlabel.GlobalKVClient]; ok { + var err error + dsc, ok := data.(discoverapi.DatastoreConfigData) + if !ok { + return types.InternalErrorf("incorrect data in datastore configuration: %v", data) + } + d.store, err = datastore.NewDataStoreFromConfig(dsc) + if err != nil { + return types.InternalErrorf("failed to initialize data store: %v", err) + } + } + + if data, ok := config[netlabel.LocalKVClient]; ok { + var err error + dsc, ok := data.(discoverapi.DatastoreConfigData) + if !ok { + return types.InternalErrorf("incorrect data in datastore configuration: %v", data) + } + d.localStore, err = datastore.NewDataStoreFromConfig(dsc) + if err != nil { + return types.InternalErrorf("failed to initialize local data store: %v", err) + } + } + + d.restoreEndpoints() + + return dc.RegisterDriver(networkType, d, c) +} + +// Endpoints are stored in the local store. Restore them and reconstruct the overlay sandbox +func (d *driver) restoreEndpoints() error { + if d.localStore == nil { + logrus.Warnf("Cannot restore overlay endpoints because local datastore is missing") + return nil + } + kvol, err := d.localStore.List(datastore.Key(overlayEndpointPrefix), &endpoint{}) + if err != nil && err != datastore.ErrKeyNotFound { + return fmt.Errorf("failed to read overlay endpoint from store: %v", err) + } + + if err == datastore.ErrKeyNotFound { + return nil + } + + for _, kvo := range kvol { + ep := kvo.(*endpoint) + + n := d.network(ep.nid) + if n == nil || ep.remote { + if !ep.remote { + logrus.Debugf("Network (%s) not found for restored endpoint (%s)", ep.nid[0:7], ep.id[0:7]) + logrus.Debugf("Deleting stale overlay endpoint (%s) from store", ep.id[0:7]) + } + + hcsshim.HNSEndpointRequest("DELETE", ep.profileId, "") + + if err := d.deleteEndpointFromStore(ep); err != nil { + logrus.Debugf("Failed to delete stale overlay endpoint (%s) from store", ep.id[0:7]) + } + + continue + } + + n.addEndpoint(ep) + } + + return nil +} + +// Fini cleans up the driver resources +func Fini(drv driverapi.Driver) { + d := drv.(*driver) + + if d.exitCh != nil { + waitCh := make(chan struct{}) + + d.exitCh <- waitCh + + <-waitCh + } +} + +func (d *driver) configure() error { + if d.store == nil { + return nil + } + + if d.vxlanIdm == nil { + return d.initializeVxlanIdm() + } + + return nil +} + +func (d *driver) initializeVxlanIdm() error { + var err error + + initVxlanIdm <- true + defer func() { <-initVxlanIdm }() + + if d.vxlanIdm != nil { + return nil + } + + d.vxlanIdm, err = idm.New(d.store, "vxlan-id", vxlanIDStart, vxlanIDEnd) + if err != nil { + return fmt.Errorf("failed to initialize vxlan id manager: %v", err) + } + + return nil +} + +func (d *driver) Type() string { + return networkType +} + +func validateSelf(node string) error { + advIP := net.ParseIP(node) + if advIP == nil { + return fmt.Errorf("invalid self address (%s)", node) + } + + addrs, err := net.InterfaceAddrs() + if err != nil { + return fmt.Errorf("Unable to get interface addresses %v", err) + } + for _, addr := range addrs { + ip, _, err := net.ParseCIDR(addr.String()) + if err == nil && ip.Equal(advIP) { + return nil + } + } + return fmt.Errorf("Multi-Host overlay networking requires cluster-advertise(%s) to be configured with a local ip-address that is reachable within the cluster", advIP.String()) +} + +func (d *driver) nodeJoin(advertiseAddress, bindAddress string, self bool) { + if self && !d.isSerfAlive() { + if err := validateSelf(advertiseAddress); err != nil { + logrus.Errorf("%s", err.Error()) + } + + d.Lock() + d.advertiseAddress = advertiseAddress + d.bindAddress = bindAddress + d.Unlock() + + // If there is no cluster store there is no need to start serf. + if d.store != nil { + err := d.serfInit() + if err != nil { + logrus.Errorf("initializing serf instance failed: %v", err) + return + } + } + } + + d.Lock() + if !self { + d.neighIP = advertiseAddress + } + neighIP := d.neighIP + d.Unlock() + + if d.serfInstance != nil && neighIP != "" { + var err error + d.joinOnce.Do(func() { + err = d.serfJoin(neighIP) + if err == nil { + d.pushLocalDb() + } + }) + if err != nil { + logrus.Errorf("joining serf neighbor %s failed: %v", advertiseAddress, err) + d.Lock() + d.joinOnce = sync.Once{} + d.Unlock() + return + } + } +} + +func (d *driver) pushLocalEndpointEvent(action, nid, eid string) { + n := d.network(nid) + if n == nil { + logrus.Debugf("Error pushing local endpoint event for network %s", nid) + return + } + ep := n.endpoint(eid) + if ep == nil { + logrus.Debugf("Error pushing local endpoint event for ep %s / %s", nid, eid) + return + } + + if !d.isSerfAlive() { + return + } + d.notifyCh <- ovNotify{ + action: action, + nw: n, + ep: ep, + } +} + +// DiscoverNew is a notification for a new discovery event, such as a new node joining a cluster +func (d *driver) DiscoverNew(dType discoverapi.DiscoveryType, data interface{}) error { + + var err error + switch dType { + case discoverapi.NodeDiscovery: + nodeData, ok := data.(discoverapi.NodeDiscoveryData) + if !ok || nodeData.Address == "" { + return fmt.Errorf("invalid discovery data") + } + d.nodeJoin(nodeData.Address, nodeData.BindAddress, nodeData.Self) + case discoverapi.DatastoreConfig: + if d.store != nil { + return types.ForbiddenErrorf("cannot accept datastore configuration: Overlay driver has a datastore configured already") + } + dsc, ok := data.(discoverapi.DatastoreConfigData) + if !ok { + return types.InternalErrorf("incorrect data in datastore configuration: %v", data) + } + d.store, err = datastore.NewDataStoreFromConfig(dsc) + if err != nil { + return types.InternalErrorf("failed to initialize data store: %v", err) + } + default: + } + return nil +} + +// DiscoverDelete is a notification for a discovery delete event, such as a node leaving a cluster +func (d *driver) DiscoverDelete(dType discoverapi.DiscoveryType, data interface{}) error { + return nil +} diff --git a/drivers/windows/overlay/peerdb_windows.go b/drivers/windows/overlay/peerdb_windows.go new file mode 100644 index 00000000..4937539d --- /dev/null +++ b/drivers/windows/overlay/peerdb_windows.go @@ -0,0 +1,154 @@ +package overlay + +import ( + "fmt" + "net" + + "encoding/json" + + log "github.com/Sirupsen/logrus" + + "github.com/Microsoft/hcsshim" + "github.com/docker/libnetwork/types" +) + +const ovPeerTable = "overlay_peer_table" + +func (d *driver) pushLocalDb() { + if !d.isSerfAlive() { + return + } + + d.Lock() + networks := d.networks + d.Unlock() + + for _, n := range networks { + n.Lock() + endpoints := n.endpoints + n.Unlock() + + for _, ep := range endpoints { + if !ep.remote { + d.notifyCh <- ovNotify{ + action: "join", + nw: n, + ep: ep, + } + + } + } + } +} + +func (d *driver) peerAdd(nid, eid string, peerIP net.IP, peerIPMask net.IPMask, + peerMac net.HardwareAddr, vtep net.IP, updateDb bool) error { + + log.Debugf("WINOVERLAY: Enter peerAdd for ca ip %s with ca mac %s", peerIP.String(), peerMac.String()) + + if err := validateID(nid, eid); err != nil { + return err + } + + n := d.network(nid) + if n == nil { + return nil + } + + if updateDb { + log.Info("WINOVERLAY: peerAdd: notifying HNS of the REMOTE endpoint") + + hnsEndpoint := &hcsshim.HNSEndpoint{ + VirtualNetwork: n.hnsId, + MacAddress: peerMac.String(), + IPAddress: peerIP, + IsRemoteEndpoint: true, + } + + paPolicy, err := json.Marshal(hcsshim.PaPolicy{ + Type: "PA", + PA: vtep.String(), + }) + + if err != nil { + return err + } + + hnsEndpoint.Policies = append(hnsEndpoint.Policies, paPolicy) + + configurationb, err := json.Marshal(hnsEndpoint) + if err != nil { + return err + } + + // Temp: We have to create a endpoint object to keep track of the HNS ID for + // this endpoint so that we can retrieve it later when the endpoint is deleted. + // This seems unnecessary when we already have dockers EID. See if we can pass + // the global EID to HNS to use as it's ID, rather than having each HNS assign + // it's own local ID for the endpoint + + addr, err := types.ParseCIDR(peerIP.String() + "/32") + if err != nil { + return err + } + + n.removeEndpointWithAddress(addr) + + hnsresponse, err := hcsshim.HNSEndpointRequest("POST", "", string(configurationb)) + if err != nil { + return err + } + + ep := &endpoint{ + id: eid, + nid: nid, + addr: addr, + mac: peerMac, + profileId: hnsresponse.Id, + remote: true, + } + + n.addEndpoint(ep) + + if err := d.writeEndpointToStore(ep); err != nil { + return fmt.Errorf("failed to update overlay endpoint %s to local store: %v", ep.id[0:7], err) + } + } + + return nil +} + +func (d *driver) peerDelete(nid, eid string, peerIP net.IP, peerIPMask net.IPMask, + peerMac net.HardwareAddr, vtep net.IP, updateDb bool) error { + + log.Infof("WINOVERLAY: Enter peerDelete for endpoint %s and peer ip %s", eid, peerIP.String()) + + if err := validateID(nid, eid); err != nil { + return err + } + + n := d.network(nid) + if n == nil { + return nil + } + + ep := n.endpoint(eid) + if ep == nil { + return fmt.Errorf("could not find endpoint with id %s", eid) + } + + if updateDb { + _, err := hcsshim.HNSEndpointRequest("DELETE", ep.profileId, "") + if err != nil { + return err + } + + n.deleteEndpoint(eid) + + if err := d.deleteEndpointFromStore(ep); err != nil { + log.Debugf("Failed to delete stale overlay endpoint (%s) from store", ep.id[0:7]) + } + } + + return nil +} diff --git a/drivers/windows/windows.go b/drivers/windows/windows.go index ed6f7c2f..01658141 100644 --- a/drivers/windows/windows.go +++ b/drivers/windows/windows.go @@ -75,7 +75,8 @@ type driver struct { sync.Mutex } -func isValidNetworkType(networkType string) bool { +// IsBuiltinWindowsDriver vaidates if network-type is a builtin local-scoped driver +func IsBuiltinLocalDriver(networkType string) bool { if "l2bridge" == networkType || "l2tunnel" == networkType || "nat" == networkType || "ics" == networkType || "transparent" == networkType { return true } @@ -91,7 +92,7 @@ func newDriver(networkType string) *driver { // GetInit returns an initializer for the given network type func GetInit(networkType string) func(dc driverapi.DriverCallback, config map[string]interface{}) error { return func(dc driverapi.DriverCallback, config map[string]interface{}) error { - if !isValidNetworkType(networkType) { + if !IsBuiltinLocalDriver(networkType) { return types.BadRequestErrorf("Network type not supported: %s", networkType) } diff --git a/drivers_windows.go b/drivers_windows.go index 1e44b626..b5058d6c 100644 --- a/drivers_windows.go +++ b/drivers_windows.go @@ -3,11 +3,13 @@ package libnetwork import ( "github.com/docker/libnetwork/drivers/null" "github.com/docker/libnetwork/drivers/windows" + "github.com/docker/libnetwork/drivers/windows/overlay" ) func getInitializers() []initializer { return []initializer{ {null.Init, "null"}, + {overlay.Init, "overlay"}, {windows.GetInit("transparent"), "transparent"}, {windows.GetInit("l2bridge"), "l2bridge"}, {windows.GetInit("l2tunnel"), "l2tunnel"}, diff --git a/ipams/builtin/builtin_windows.go b/ipams/builtin/builtin_windows.go index d24f5e63..155c0468 100644 --- a/ipams/builtin/builtin_windows.go +++ b/ipams/builtin/builtin_windows.go @@ -3,14 +3,55 @@ package builtin import ( + "fmt" + + "github.com/docker/libnetwork/datastore" + "github.com/docker/libnetwork/ipam" "github.com/docker/libnetwork/ipamapi" + "github.com/docker/libnetwork/ipamutils" windowsipam "github.com/docker/libnetwork/ipams/windowsipam" ) +// InitDockerDefault registers the built-in ipam service with libnetwork +func InitDockerDefault(ic ipamapi.Callback, l, g interface{}) error { + var ( + ok bool + localDs, globalDs datastore.DataStore + ) + + if l != nil { + if localDs, ok = l.(datastore.DataStore); !ok { + return fmt.Errorf("incorrect local datastore passed to built-in ipam init") + } + } + + if g != nil { + if globalDs, ok = g.(datastore.DataStore); !ok { + return fmt.Errorf("incorrect global datastore passed to built-in ipam init") + } + } + + ipamutils.InitNetworks() + + a, err := ipam.NewAllocator(localDs, globalDs) + if err != nil { + return err + } + + cps := &ipamapi.Capability{RequiresRequestReplay: true} + + return ic.RegisterIpamDriverWithCapabilities(ipamapi.DefaultIPAM, a, cps) +} + // Init registers the built-in ipam service with libnetwork func Init(ic ipamapi.Callback, l, g interface{}) error { - initFunc := windowsipam.GetInit(ipamapi.DefaultIPAM) + initFunc := windowsipam.GetInit(windowsipam.DefaultIPAM) + + err := InitDockerDefault(ic, l, g) + if err != nil { + return err + } return initFunc(ic, l, g) } diff --git a/ipams/windowsipam/windowsipam.go b/ipams/windowsipam/windowsipam.go index 661fd6a4..5fcd7255 100644 --- a/ipams/windowsipam/windowsipam.go +++ b/ipams/windowsipam/windowsipam.go @@ -15,6 +15,9 @@ const ( globalAddressSpace = "GlobalDefault" ) +// DefaultIPAM defines the default ipam-driver for local-scoped windows networks +const DefaultIPAM = "windows" + var ( defaultPool, _ = types.ParseCIDR("0.0.0.0/0") ) diff --git a/netutils/utils_windows.go b/netutils/utils_windows.go index b6e79c75..73af44ec 100644 --- a/netutils/utils_windows.go +++ b/netutils/utils_windows.go @@ -18,6 +18,8 @@ func ElectInterfaceAddresses(name string) ([]*net.IPNet, []*net.IPNet, error) { // FindAvailableNetwork returns a network from the passed list which does not // overlap with existing interfaces in the system + +// TODO : Use appropriate windows APIs to identify non-overlapping subnets func FindAvailableNetwork(list []*net.IPNet) (*net.IPNet, error) { - return nil, types.NotImplementedErrorf("not supported on windows") + return nil, nil } diff --git a/network.go b/network.go index 90d77998..a3511a5f 100644 --- a/network.go +++ b/network.go @@ -633,6 +633,9 @@ func NetworkOptionIpam(ipamDriver string, addrSpace string, ipV4 []*IpamConf, ip return func(n *network) { if ipamDriver != "" { n.ipamType = ipamDriver + if ipamDriver == ipamapi.DefaultIPAM { + n.ipamType = defaultIpamForNetworkType(n.Type()) + } } n.ipamOptions = opts n.addrSpace = addrSpace diff --git a/network_unix.go b/network_unix.go index 77b6e1ce..585261ec 100644 --- a/network_unix.go +++ b/network_unix.go @@ -2,7 +2,13 @@ package libnetwork +import "github.com/docker/libnetwork/ipamapi" + // Stub implementations for DNS related functions func (n *network) startResolver() { } + +func defaultIpamForNetworkType(networkType string) string { + return ipamapi.DefaultIPAM +} diff --git a/network_windows.go b/network_windows.go index 4bf95c75..9d6fa7e4 100644 --- a/network_windows.go +++ b/network_windows.go @@ -4,10 +4,13 @@ package libnetwork import ( "runtime" + "time" "github.com/Microsoft/hcsshim" log "github.com/Sirupsen/logrus" "github.com/docker/libnetwork/drivers/windows" + "github.com/docker/libnetwork/ipamapi" + "github.com/docker/libnetwork/ipams/windowsipam" ) func executeInCompartment(compartmentID uint32, x func()) { @@ -42,15 +45,28 @@ func (n *network) startResolver() { for _, subnet := range hnsresponse.Subnets { if subnet.GatewayAddress != "" { - resolver := NewResolver(subnet.GatewayAddress, false, "", n) - log.Debugf("Binding a resolver on network %s gateway %s", n.Name(), subnet.GatewayAddress) - executeInCompartment(hnsresponse.DNSServerCompartment, resolver.SetupFunc(53)) - if err = resolver.Start(); err != nil { - log.Errorf("Resolver Setup/Start failed for container %s, %q", n.Name(), err) - } else { - n.resolver = append(n.resolver, resolver) + for i := 0; i < 3; i++ { + resolver := NewResolver(subnet.GatewayAddress, false, "", n) + log.Debugf("Binding a resolver on network %s gateway %s", n.Name(), subnet.GatewayAddress) + executeInCompartment(hnsresponse.DNSServerCompartment, resolver.SetupFunc(53)) + + if err = resolver.Start(); err != nil { + log.Errorf("Resolver Setup/Start failed for container %s, %q", n.Name(), err) + time.Sleep(1 * time.Second) + } else { + log.Debugf("Resolver bound successfuly for network %s", n.Name()) + n.resolver = append(n.resolver, resolver) + break + } } } } }) } + +func defaultIpamForNetworkType(networkType string) string { + if windows.IsBuiltinLocalDriver(networkType) { + return windowsipam.DefaultIPAM + } + return ipamapi.DefaultIPAM +} diff --git a/service_common.go b/service_common.go new file mode 100644 index 00000000..a0172f59 --- /dev/null +++ b/service_common.go @@ -0,0 +1,225 @@ +// +build linux windows + +package libnetwork + +import ( + "net" + + "github.com/Sirupsen/logrus" +) + +func newService(name string, id string, ingressPorts []*PortConfig, aliases []string) *service { + return &service{ + name: name, + id: id, + ingressPorts: ingressPorts, + loadBalancers: make(map[string]*loadBalancer), + aliases: aliases, + } +} + +func (c *controller) cleanupServiceBindings(cleanupNID string) { + var cleanupFuncs []func() + + c.Lock() + services := make([]*service, 0, len(c.serviceBindings)) + for _, s := range c.serviceBindings { + services = append(services, s) + } + c.Unlock() + + for _, s := range services { + s.Lock() + for nid, lb := range s.loadBalancers { + if cleanupNID != "" && nid != cleanupNID { + continue + } + + for eid, ip := range lb.backEnds { + service := s + loadBalancer := lb + networkID := nid + epID := eid + epIP := ip + + cleanupFuncs = append(cleanupFuncs, func() { + if err := c.rmServiceBinding(service.name, service.id, networkID, epID, loadBalancer.vip, + service.ingressPorts, service.aliases, epIP); err != nil { + logrus.Errorf("Failed to remove service bindings for service %s network %s endpoint %s while cleanup: %v", + service.id, networkID, epID, err) + } + }) + } + } + s.Unlock() + } + + for _, f := range cleanupFuncs { + f() + } + +} + +func (c *controller) addServiceBinding(name, sid, nid, eid string, vip net.IP, ingressPorts []*PortConfig, aliases []string, ip net.IP) error { + var ( + s *service + addService bool + ) + + n, err := c.NetworkByID(nid) + if err != nil { + return err + } + + skey := serviceKey{ + id: sid, + ports: portConfigs(ingressPorts).String(), + } + + c.Lock() + s, ok := c.serviceBindings[skey] + if !ok { + // Create a new service if we are seeing this service + // for the first time. + s = newService(name, sid, ingressPorts, aliases) + c.serviceBindings[skey] = s + } + c.Unlock() + + // Add endpoint IP to special "tasks.svc_name" so that the + // applications have access to DNS RR. + n.(*network).addSvcRecords("tasks."+name, ip, nil, false) + for _, alias := range aliases { + n.(*network).addSvcRecords("tasks."+alias, ip, nil, false) + } + + // Add service name to vip in DNS, if vip is valid. Otherwise resort to DNS RR + svcIP := vip + if len(svcIP) == 0 { + svcIP = ip + } + n.(*network).addSvcRecords(name, svcIP, nil, false) + for _, alias := range aliases { + n.(*network).addSvcRecords(alias, svcIP, nil, false) + } + + s.Lock() + defer s.Unlock() + + lb, ok := s.loadBalancers[nid] + if !ok { + // Create a new load balancer if we are seeing this + // network attachment on the service for the first + // time. + lb = &loadBalancer{ + vip: vip, + fwMark: fwMarkCtr, + backEnds: make(map[string]net.IP), + service: s, + } + + fwMarkCtrMu.Lock() + fwMarkCtr++ + fwMarkCtrMu.Unlock() + + s.loadBalancers[nid] = lb + + // Since we just created this load balancer make sure + // we add a new service service in IPVS rules. + addService = true + + } + + lb.backEnds[eid] = ip + + // Add loadbalancer service and backend in all sandboxes in + // the network only if vip is valid. + if len(vip) != 0 { + n.(*network).addLBBackend(ip, vip, lb.fwMark, ingressPorts, addService) + } + + return nil +} + +func (c *controller) rmServiceBinding(name, sid, nid, eid string, vip net.IP, ingressPorts []*PortConfig, aliases []string, ip net.IP) error { + var rmService bool + + n, err := c.NetworkByID(nid) + if err != nil { + return err + } + + skey := serviceKey{ + id: sid, + ports: portConfigs(ingressPorts).String(), + } + + c.Lock() + s, ok := c.serviceBindings[skey] + if !ok { + c.Unlock() + return nil + } + c.Unlock() + + s.Lock() + lb, ok := s.loadBalancers[nid] + if !ok { + s.Unlock() + return nil + } + + _, ok = lb.backEnds[eid] + if !ok { + s.Unlock() + return nil + } + + delete(lb.backEnds, eid) + if len(lb.backEnds) == 0 { + // All the backends for this service have been + // removed. Time to remove the load balancer and also + // remove the service entry in IPVS. + rmService = true + + delete(s.loadBalancers, nid) + } + + if len(s.loadBalancers) == 0 { + // All loadbalancers for the service removed. Time to + // remove the service itself. + delete(c.serviceBindings, skey) + } + + // Remove loadbalancer service(if needed) and backend in all + // sandboxes in the network only if the vip is valid. + if len(vip) != 0 { + n.(*network).rmLBBackend(ip, vip, lb.fwMark, ingressPorts, rmService) + } + s.Unlock() + + // Delete the special "tasks.svc_name" backend record. + n.(*network).deleteSvcRecords("tasks."+name, ip, nil, false) + for _, alias := range aliases { + n.(*network).deleteSvcRecords("tasks."+alias, ip, nil, false) + } + + // If we are doing DNS RR add the endpoint IP to DNS record + // right away. + if len(vip) == 0 { + n.(*network).deleteSvcRecords(name, ip, nil, false) + for _, alias := range aliases { + n.(*network).deleteSvcRecords(alias, ip, nil, false) + } + } + + // Remove the DNS record for VIP only if we are removing the service + if rmService && len(vip) != 0 { + n.(*network).deleteSvcRecords(name, vip, nil, false) + for _, alias := range aliases { + n.(*network).deleteSvcRecords(alias, vip, nil, false) + } + } + + return nil +} diff --git a/service_linux.go b/service_linux.go index 669eda9f..8d382d55 100644 --- a/service_linux.go +++ b/service_linux.go @@ -29,222 +29,6 @@ func init() { reexec.Register("redirecter", redirecter) } -func newService(name string, id string, ingressPorts []*PortConfig, aliases []string) *service { - return &service{ - name: name, - id: id, - ingressPorts: ingressPorts, - loadBalancers: make(map[string]*loadBalancer), - aliases: aliases, - } -} - -func (c *controller) cleanupServiceBindings(cleanupNID string) { - var cleanupFuncs []func() - - c.Lock() - services := make([]*service, 0, len(c.serviceBindings)) - for _, s := range c.serviceBindings { - services = append(services, s) - } - c.Unlock() - - for _, s := range services { - s.Lock() - for nid, lb := range s.loadBalancers { - if cleanupNID != "" && nid != cleanupNID { - continue - } - - for eid, ip := range lb.backEnds { - service := s - loadBalancer := lb - networkID := nid - epID := eid - epIP := ip - - cleanupFuncs = append(cleanupFuncs, func() { - if err := c.rmServiceBinding(service.name, service.id, networkID, epID, loadBalancer.vip, - service.ingressPorts, service.aliases, epIP); err != nil { - logrus.Errorf("Failed to remove service bindings for service %s network %s endpoint %s while cleanup: %v", - service.id, networkID, epID, err) - } - }) - } - } - s.Unlock() - } - - for _, f := range cleanupFuncs { - f() - } - -} - -func (c *controller) addServiceBinding(name, sid, nid, eid string, vip net.IP, ingressPorts []*PortConfig, aliases []string, ip net.IP) error { - var ( - s *service - addService bool - ) - - n, err := c.NetworkByID(nid) - if err != nil { - return err - } - - skey := serviceKey{ - id: sid, - ports: portConfigs(ingressPorts).String(), - } - - c.Lock() - s, ok := c.serviceBindings[skey] - if !ok { - // Create a new service if we are seeing this service - // for the first time. - s = newService(name, sid, ingressPorts, aliases) - c.serviceBindings[skey] = s - } - c.Unlock() - - // Add endpoint IP to special "tasks.svc_name" so that the - // applications have access to DNS RR. - n.(*network).addSvcRecords("tasks."+name, ip, nil, false) - for _, alias := range aliases { - n.(*network).addSvcRecords("tasks."+alias, ip, nil, false) - } - - // Add service name to vip in DNS, if vip is valid. Otherwise resort to DNS RR - svcIP := vip - if len(svcIP) == 0 { - svcIP = ip - } - n.(*network).addSvcRecords(name, svcIP, nil, false) - for _, alias := range aliases { - n.(*network).addSvcRecords(alias, svcIP, nil, false) - } - - s.Lock() - defer s.Unlock() - - lb, ok := s.loadBalancers[nid] - if !ok { - // Create a new load balancer if we are seeing this - // network attachment on the service for the first - // time. - lb = &loadBalancer{ - vip: vip, - fwMark: fwMarkCtr, - backEnds: make(map[string]net.IP), - service: s, - } - - fwMarkCtrMu.Lock() - fwMarkCtr++ - fwMarkCtrMu.Unlock() - - s.loadBalancers[nid] = lb - - // Since we just created this load balancer make sure - // we add a new service service in IPVS rules. - addService = true - - } - - lb.backEnds[eid] = ip - - // Add loadbalancer service and backend in all sandboxes in - // the network only if vip is valid. - if len(vip) != 0 { - n.(*network).addLBBackend(ip, vip, lb.fwMark, ingressPorts, addService) - } - - return nil -} - -func (c *controller) rmServiceBinding(name, sid, nid, eid string, vip net.IP, ingressPorts []*PortConfig, aliases []string, ip net.IP) error { - var rmService bool - - n, err := c.NetworkByID(nid) - if err != nil { - return err - } - - skey := serviceKey{ - id: sid, - ports: portConfigs(ingressPorts).String(), - } - - c.Lock() - s, ok := c.serviceBindings[skey] - if !ok { - c.Unlock() - return nil - } - c.Unlock() - - s.Lock() - lb, ok := s.loadBalancers[nid] - if !ok { - s.Unlock() - return nil - } - - _, ok = lb.backEnds[eid] - if !ok { - s.Unlock() - return nil - } - - delete(lb.backEnds, eid) - if len(lb.backEnds) == 0 { - // All the backends for this service have been - // removed. Time to remove the load balancer and also - // remove the service entry in IPVS. - rmService = true - - delete(s.loadBalancers, nid) - } - - if len(s.loadBalancers) == 0 { - // All loadbalancers for the service removed. Time to - // remove the service itself. - delete(c.serviceBindings, skey) - } - - // Remove loadbalancer service(if needed) and backend in all - // sandboxes in the network only if the vip is valid. - if len(vip) != 0 { - n.(*network).rmLBBackend(ip, vip, lb.fwMark, ingressPorts, rmService) - } - s.Unlock() - - // Delete the special "tasks.svc_name" backend record. - n.(*network).deleteSvcRecords("tasks."+name, ip, nil, false) - for _, alias := range aliases { - n.(*network).deleteSvcRecords("tasks."+alias, ip, nil, false) - } - - // If we are doing DNS RR add the endpoint IP to DNS record - // right away. - if len(vip) == 0 { - n.(*network).deleteSvcRecords(name, ip, nil, false) - for _, alias := range aliases { - n.(*network).deleteSvcRecords(alias, ip, nil, false) - } - } - - // Remove the DNS record for VIP only if we are removing the service - if rmService && len(vip) != 0 { - n.(*network).deleteSvcRecords(name, vip, nil, false) - for _, alias := range aliases { - n.(*network).deleteSvcRecords(alias, vip, nil, false) - } - } - - return nil -} - // Get all loadbalancers on this network that is currently discovered // on this node. func (n *network) connectedLoadbalancers() []*loadBalancer { diff --git a/service_unsupported.go b/service_unsupported.go index 0ae384a9..37b98281 100644 --- a/service_unsupported.go +++ b/service_unsupported.go @@ -1,4 +1,4 @@ -// +build !linux +// +build !linux,!windows package libnetwork diff --git a/service_windows.go b/service_windows.go new file mode 100644 index 00000000..8d79b2d8 --- /dev/null +++ b/service_windows.go @@ -0,0 +1,15 @@ +package libnetwork + +import "net" + +func (n *network) addLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*PortConfig, addService bool) { +} + +func (n *network) rmLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*PortConfig, rmService bool) { +} + +func (sb *sandbox) populateLoadbalancers(ep *endpoint) { +} + +func arrangeIngressFilterRule() { +}