Merge pull request #1195 from mrjana/lb

Add loadbalancer support
This commit is contained in:
Madhu Venugopal 2016-05-26 14:46:10 -07:00
commit 71e5247013
10 changed files with 506 additions and 108 deletions

View File

@ -167,14 +167,17 @@ func (ep *endpoint) addToCluster() error {
c := n.getController()
if !ep.isAnonymous() && ep.Iface().Address() != nil {
if err := c.addServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), ep.Iface().Address().IP); err != nil {
return err
if ep.svcID != "" {
if err := c.addServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), ep.virtualIP, ep.Iface().Address().IP); err != nil {
return err
}
}
buf, err := proto.Marshal(&EndpointRecord{
Name: ep.Name(),
ServiceName: ep.svcName,
ServiceID: ep.svcID,
VirtualIP: ep.virtualIP.String(),
EndpointIP: ep.Iface().Address().IP.String(),
})
@ -204,8 +207,8 @@ func (ep *endpoint) deleteFromCluster() error {
c := n.getController()
if !ep.isAnonymous() {
if ep.Iface().Address() != nil {
if err := c.rmServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), ep.Iface().Address().IP); err != nil {
if ep.svcID != "" && ep.Iface().Address() != nil {
if err := c.rmServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), ep.virtualIP, ep.Iface().Address().IP); err != nil {
return err
}
}
@ -357,6 +360,7 @@ func (c *controller) handleEpTableEvent(ev events.Event) {
name := epRec.Name
svcName := epRec.ServiceName
svcID := epRec.ServiceID
vip := net.ParseIP(epRec.VirtualIP)
ip := net.ParseIP(epRec.EndpointIP)
if name == "" || ip == nil {
@ -365,16 +369,20 @@ func (c *controller) handleEpTableEvent(ev events.Event) {
}
if isAdd {
if err := c.addServiceBinding(svcName, svcID, nid, eid, ip); err != nil {
logrus.Errorf("Failed adding service binding for value %s: %v", value, err)
return
if svcID != "" {
if err := c.addServiceBinding(svcName, svcID, nid, eid, vip, ip); err != nil {
logrus.Errorf("Failed adding service binding for value %s: %v", value, err)
return
}
}
n.addSvcRecords(name, ip, nil, true)
} else {
if err := c.rmServiceBinding(svcName, svcID, nid, eid, ip); err != nil {
logrus.Errorf("Failed adding service binding for value %s: %v", value, err)
return
if svcID != "" {
if err := c.rmServiceBinding(svcName, svcID, nid, eid, vip, ip); err != nil {
logrus.Errorf("Failed adding service binding for value %s: %v", value, err)
return
}
}
n.deleteSvcRecords(name, ip, nil, true)

View File

@ -39,7 +39,8 @@ type EndpointRecord struct {
Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
ServiceName string `protobuf:"bytes,2,opt,name=service_name,json=serviceName,proto3" json:"service_name,omitempty"`
ServiceID string `protobuf:"bytes,3,opt,name=service_id,json=serviceId,proto3" json:"service_id,omitempty"`
EndpointIP string `protobuf:"bytes,4,opt,name=endpoint_ip,json=endpointIp,proto3" json:"endpoint_ip,omitempty"`
VirtualIP string `protobuf:"bytes,4,opt,name=virtual_ip,json=virtualIp,proto3" json:"virtual_ip,omitempty"`
EndpointIP string `protobuf:"bytes,5,opt,name=endpoint_ip,json=endpointIp,proto3" json:"endpoint_ip,omitempty"`
}
func (m *EndpointRecord) Reset() { *m = EndpointRecord{} }
@ -53,11 +54,12 @@ func (this *EndpointRecord) GoString() string {
if this == nil {
return "nil"
}
s := make([]string, 0, 8)
s := make([]string, 0, 9)
s = append(s, "&libnetwork.EndpointRecord{")
s = append(s, "Name: "+fmt.Sprintf("%#v", this.Name)+",\n")
s = append(s, "ServiceName: "+fmt.Sprintf("%#v", this.ServiceName)+",\n")
s = append(s, "ServiceID: "+fmt.Sprintf("%#v", this.ServiceID)+",\n")
s = append(s, "VirtualIP: "+fmt.Sprintf("%#v", this.VirtualIP)+",\n")
s = append(s, "EndpointIP: "+fmt.Sprintf("%#v", this.EndpointIP)+",\n")
s = append(s, "}")
return strings.Join(s, "")
@ -120,9 +122,15 @@ func (m *EndpointRecord) MarshalTo(data []byte) (int, error) {
i = encodeVarintAgent(data, i, uint64(len(m.ServiceID)))
i += copy(data[i:], m.ServiceID)
}
if len(m.EndpointIP) > 0 {
if len(m.VirtualIP) > 0 {
data[i] = 0x22
i++
i = encodeVarintAgent(data, i, uint64(len(m.VirtualIP)))
i += copy(data[i:], m.VirtualIP)
}
if len(m.EndpointIP) > 0 {
data[i] = 0x2a
i++
i = encodeVarintAgent(data, i, uint64(len(m.EndpointIP)))
i += copy(data[i:], m.EndpointIP)
}
@ -171,6 +179,10 @@ func (m *EndpointRecord) Size() (n int) {
if l > 0 {
n += 1 + l + sovAgent(uint64(l))
}
l = len(m.VirtualIP)
if l > 0 {
n += 1 + l + sovAgent(uint64(l))
}
l = len(m.EndpointIP)
if l > 0 {
n += 1 + l + sovAgent(uint64(l))
@ -199,6 +211,7 @@ func (this *EndpointRecord) String() string {
`Name:` + fmt.Sprintf("%v", this.Name) + `,`,
`ServiceName:` + fmt.Sprintf("%v", this.ServiceName) + `,`,
`ServiceID:` + fmt.Sprintf("%v", this.ServiceID) + `,`,
`VirtualIP:` + fmt.Sprintf("%v", this.VirtualIP) + `,`,
`EndpointIP:` + fmt.Sprintf("%v", this.EndpointIP) + `,`,
`}`,
}, "")
@ -329,6 +342,35 @@ func (m *EndpointRecord) Unmarshal(data []byte) error {
m.ServiceID = string(data[iNdEx:postIndex])
iNdEx = postIndex
case 4:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field VirtualIP", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowAgent
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
iNdEx++
stringLen |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthAgent
}
postIndex := iNdEx + intStringLen
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.VirtualIP = string(data[iNdEx:postIndex])
iNdEx = postIndex
case 5:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field EndpointIP", wireType)
}
@ -484,18 +526,20 @@ var (
)
var fileDescriptorAgent = []byte{
// 204 bytes of a gzipped FileDescriptorProto
// 228 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0xe2, 0x4e, 0x4c, 0x4f, 0xcd,
0x2b, 0xd1, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0xca, 0xc9, 0x4c, 0xca, 0x4b, 0x2d, 0x29,
0xcf, 0x2f, 0xca, 0x96, 0x12, 0x49, 0xcf, 0x4f, 0xcf, 0x07, 0x0b, 0xeb, 0x83, 0x58, 0x10, 0x15,
0x4a, 0xcb, 0x18, 0xb9, 0xf8, 0x5c, 0xf3, 0x52, 0x0a, 0xf2, 0x33, 0xf3, 0x4a, 0x82, 0x52, 0x93,
0x4a, 0x57, 0x18, 0xb9, 0xf8, 0x5c, 0xf3, 0x52, 0x0a, 0xf2, 0x33, 0xf3, 0x4a, 0x82, 0x52, 0x93,
0xf3, 0x8b, 0x52, 0x84, 0x84, 0xb8, 0x58, 0xf2, 0x12, 0x73, 0x53, 0x25, 0x18, 0x15, 0x18, 0x35,
0x38, 0x83, 0xc0, 0x6c, 0x21, 0x45, 0x2e, 0x9e, 0xe2, 0xd4, 0xa2, 0xb2, 0xcc, 0xe4, 0xd4, 0x78,
0xb0, 0x1c, 0x13, 0x58, 0x8e, 0x1b, 0x2a, 0xe6, 0x07, 0x52, 0xa2, 0xc3, 0xc5, 0x05, 0x53, 0x92,
0x99, 0x22, 0xc1, 0x0c, 0x52, 0xe0, 0xc4, 0xfb, 0xe8, 0x9e, 0x3c, 0x67, 0x30, 0x44, 0xd4, 0xd3,
0x25, 0x88, 0x13, 0xaa, 0xc0, 0x33, 0x45, 0x48, 0x9f, 0x8b, 0x3b, 0x15, 0x6a, 0x6d, 0x7c, 0x66,
0x81, 0x04, 0x0b, 0x58, 0x39, 0x1f, 0x50, 0x39, 0x17, 0xcc, 0x35, 0x9e, 0x01, 0x41, 0x5c, 0x30,
0x25, 0x9e, 0x05, 0x4e, 0x12, 0x37, 0x1e, 0xca, 0x31, 0x7c, 0x78, 0x28, 0xc7, 0xd8, 0xf0, 0x48,
0x8e, 0xf1, 0x04, 0x10, 0x5f, 0x00, 0xe2, 0x07, 0x40, 0x9c, 0xc4, 0x06, 0xf6, 0x89, 0x31, 0x20,
0x00, 0x00, 0xff, 0xff, 0x94, 0x78, 0x3e, 0xce, 0xfa, 0x00, 0x00, 0x00,
0x25, 0x88, 0x13, 0xaa, 0xc0, 0x33, 0x05, 0xa4, 0xba, 0x2c, 0xb3, 0xa8, 0xa4, 0x34, 0x31, 0x27,
0x3e, 0xb3, 0x40, 0x82, 0x05, 0xa1, 0x3a, 0x0c, 0x22, 0xea, 0x19, 0x10, 0xc4, 0x09, 0x55, 0xe0,
0x59, 0x20, 0xa4, 0xcf, 0xc5, 0x9d, 0x0a, 0x75, 0x24, 0x48, 0x39, 0x2b, 0x58, 0x39, 0x1f, 0x50,
0x39, 0x17, 0xcc, 0xed, 0x40, 0xf5, 0x5c, 0x30, 0x25, 0x9e, 0x05, 0x4e, 0x12, 0x37, 0x1e, 0xca,
0x31, 0x7c, 0x78, 0x28, 0xc7, 0xd8, 0xf0, 0x48, 0x8e, 0xf1, 0x04, 0x10, 0x5f, 0x00, 0xe2, 0x07,
0x40, 0x9c, 0xc4, 0x06, 0xf6, 0xb7, 0x31, 0x20, 0x00, 0x00, 0xff, 0xff, 0xae, 0x11, 0xc5, 0x8d,
0x28, 0x01, 0x00, 0x00,
}

View File

@ -15,5 +15,6 @@ message EndpointRecord {
string name = 1;
string service_name = 2;
string service_id = 3 [(gogoproto.customname) = "ServiceID"];
string endpoint_ip = 4 [(gogoproto.customname) = "EndpointIP"];
string virtual_ip = 4 [(gogoproto.customname) = "VirtualIP"];
string endpoint_ip = 5 [(gogoproto.customname) = "EndpointIP"];
}

View File

@ -69,6 +69,7 @@ type endpoint struct {
myAliases []string
svcID string
svcName string
virtualIP net.IP
dbIndex uint64
dbExists bool
sync.Mutex
@ -93,6 +94,7 @@ func (ep *endpoint) MarshalJSON() ([]byte, error) {
epMap["myAliases"] = ep.myAliases
epMap["svcName"] = ep.svcName
epMap["svcID"] = ep.svcID
epMap["virtualIP"] = ep.virtualIP.String()
return json.Marshal(epMap)
}
@ -186,6 +188,10 @@ func (ep *endpoint) UnmarshalJSON(b []byte) (err error) {
ep.svcID = si.(string)
}
if vip, ok := epMap["virtualIP"]; ok {
ep.virtualIP = net.ParseIP(vip.(string))
}
ma, _ := json.Marshal(epMap["myAliases"])
var myAliases []string
json.Unmarshal(ma, &myAliases)
@ -212,6 +218,7 @@ func (ep *endpoint) CopyTo(o datastore.KVObject) error {
dstEp.disableResolution = ep.disableResolution
dstEp.svcName = ep.svcName
dstEp.svcID = ep.svcID
dstEp.virtualIP = ep.virtualIP
if ep.iface != nil {
dstEp.iface = &endpointInterface{}
@ -892,10 +899,11 @@ func CreateOptionAlias(name string, alias string) EndpointOption {
}
// CreateOptionService function returns an option setter for setting service binding configuration
func CreateOptionService(name, id string) EndpointOption {
func CreateOptionService(name, id string, vip net.IP) EndpointOption {
return func(ep *endpoint) {
ep.svcName = name
ep.svcID = id
ep.virtualIP = vip
}
}

View File

@ -218,7 +218,7 @@ func TestDestination(t *testing.T) {
i, err := New("")
require.NoError(t, err)
for _, protocol := range []string{"TCP"} {
for _, protocol := range protocols {
var serviceAddress string
s := Service{

View File

@ -77,6 +77,19 @@ type svcInfo struct {
service map[string][]servicePorts
}
// backing container or host's info
type serviceTarget struct {
name string
ip net.IP
port uint16
}
type servicePorts struct {
portName string
proto string
target []serviceTarget
}
// IpamConf contains all the ipam related configurations for a network
type IpamConf struct {
// The master address pool for containers and network interfaces

View File

@ -745,6 +745,12 @@ func (sb *sandbox) populateNetworkResources(ep *endpoint) error {
}
}
// Populate load balancer only after updating all the other
// information including gateway and other routes so that
// loadbalancers are populated all the network state is in
// place in the sandbox.
sb.populateLoadbalancers(ep)
// Only update the store if we did not come here as part of
// sandbox delete. If we came here as part of delete then do
// not bother updating the store. The sandbox object will be

View File

@ -1,93 +1,32 @@
package libnetwork
import "net"
import (
"net"
"sync"
)
// backing container or host's info
type serviceTarget struct {
name string
ip net.IP
port uint16
}
type servicePorts struct {
portName string
proto string
target []serviceTarget
}
var (
// A global monotonic counter to assign firewall marks to
// services.
fwMarkCtr uint32 = 256
fwMarkCtrMu sync.Mutex
)
type service struct {
name string
id string
backEnds map[string]map[string]net.IP
name string // Service Name
id string // Service ID
// Map of loadbalancers for the service one-per attached
// network. It is keyed with network ID.
loadBalancers map[string]*loadBalancer
sync.Mutex
}
func newService(name string, id string) *service {
return &service{
name: name,
id: id,
backEnds: make(map[string]map[string]net.IP),
}
}
func (c *controller) addServiceBinding(name, sid, nid, eid string, ip net.IP) error {
var s *service
n, err := c.NetworkByID(nid)
if err != nil {
return err
}
c.Lock()
s, ok := c.serviceBindings[sid]
if !ok {
s = newService(name, sid)
}
netBackEnds, ok := s.backEnds[nid]
if !ok {
netBackEnds = make(map[string]net.IP)
s.backEnds[nid] = netBackEnds
}
netBackEnds[eid] = ip
c.serviceBindings[sid] = s
c.Unlock()
n.(*network).addSvcRecords(name, ip, nil, false)
return nil
}
func (c *controller) rmServiceBinding(name, sid, nid, eid string, ip net.IP) error {
n, err := c.NetworkByID(nid)
if err != nil {
return err
}
c.Lock()
s, ok := c.serviceBindings[sid]
if !ok {
c.Unlock()
return nil
}
netBackEnds, ok := s.backEnds[nid]
if !ok {
c.Unlock()
return nil
}
delete(netBackEnds, eid)
if len(netBackEnds) == 0 {
delete(s.backEnds, nid)
}
if len(s.backEnds) == 0 {
delete(c.serviceBindings, sid)
}
c.Unlock()
n.(*network).deleteSvcRecords(name, ip, nil, false)
return err
type loadBalancer struct {
vip net.IP
fwMark uint32
// Map of backend IPs backing this loadbalancer on this
// network. It is keyed with endpoint ID.
backEnds map[string]net.IP
}

360
service_linux.go Normal file
View File

@ -0,0 +1,360 @@
package libnetwork
import (
"fmt"
"net"
"os"
"os/exec"
"runtime"
"strconv"
"strings"
"github.com/Sirupsen/logrus"
"github.com/docker/docker/pkg/reexec"
"github.com/docker/libnetwork/iptables"
"github.com/docker/libnetwork/ipvs"
"github.com/vishvananda/netlink/nl"
"github.com/vishvananda/netns"
)
func init() {
reexec.Register("fwmarker", fwMarker)
}
func newService(name string, id string) *service {
return &service{
name: name,
id: id,
loadBalancers: make(map[string]*loadBalancer),
}
}
func (c *controller) addServiceBinding(name, sid, nid, eid string, vip net.IP, ip net.IP) error {
var (
s *service
addService bool
)
n, err := c.NetworkByID(nid)
if err != nil {
return err
}
c.Lock()
s, ok := c.serviceBindings[sid]
if !ok {
// Create a new service if we are seeing this service
// for the first time.
s = newService(name, sid)
c.serviceBindings[sid] = s
}
c.Unlock()
s.Lock()
lb, ok := s.loadBalancers[nid]
if !ok {
// Create a new load balancer if we are seeing this
// network attachment on the service for the first
// time.
lb = &loadBalancer{
vip: vip,
fwMark: fwMarkCtr,
backEnds: make(map[string]net.IP),
}
fwMarkCtrMu.Lock()
fwMarkCtr++
fwMarkCtrMu.Unlock()
s.loadBalancers[nid] = lb
// Since we just created this load balancer make sure
// we add a new service service in IPVS rules.
addService = true
// Add service name to vip in DNS, if vip is valid. Otherwise resort to DNS RR
svcIP := vip
if len(svcIP) == 0 {
svcIP = ip
}
n.(*network).addSvcRecords(name, svcIP, nil, false)
}
lb.backEnds[eid] = ip
s.Unlock()
// Add endpoint IP to special "tasks.svc_name" so that the
// applications have access to DNS RR.
n.(*network).addSvcRecords("tasks."+name, ip, nil, false)
// Add loadbalancer service and backend in all sandboxes in
// the network only if vip is valid.
if len(vip) != 0 {
n.(*network).addLBBackend(ip, vip, lb.fwMark, addService)
}
return nil
}
func (c *controller) rmServiceBinding(name, sid, nid, eid string, vip net.IP, ip net.IP) error {
var rmService bool
n, err := c.NetworkByID(nid)
if err != nil {
return err
}
c.Lock()
s, ok := c.serviceBindings[sid]
if !ok {
c.Unlock()
return nil
}
c.Unlock()
s.Lock()
lb, ok := s.loadBalancers[nid]
if !ok {
s.Unlock()
return nil
}
// Delete the special "tasks.svc_name" backend record.
n.(*network).deleteSvcRecords("tasks."+name, ip, nil, false)
delete(lb.backEnds, eid)
if len(lb.backEnds) == 0 {
// All the backends for this service have been
// removed. Time to remove the load balancer and also
// remove the service entry in IPVS.
rmService = true
// Make sure to remove the right IP since if vip is
// not valid we would have added a DNS RR record.
svcIP := vip
if len(svcIP) == 0 {
svcIP = ip
}
n.(*network).deleteSvcRecords(name, svcIP, nil, false)
delete(s.loadBalancers, nid)
}
if len(s.loadBalancers) == 0 {
// All loadbalancers for the service removed. Time to
// remove the service itself.
delete(c.serviceBindings, sid)
}
s.Unlock()
// Remove loadbalancer service(if needed) and backend in all
// sandboxes in the network only if the vip is valid.
if len(vip) != 0 {
n.(*network).rmLBBackend(ip, vip, lb.fwMark, rmService)
}
return nil
}
// Get all loadbalancers on this network that is currently discovered
// on this node..
func (n *network) connectedLoadbalancers() []*loadBalancer {
c := n.getController()
c.Lock()
defer c.Unlock()
var lbs []*loadBalancer
for _, s := range c.serviceBindings {
if lb, ok := s.loadBalancers[n.ID()]; ok {
lbs = append(lbs, lb)
}
}
return lbs
}
// Populate all loadbalancers on the network that the passed endpoint
// belongs to, into this sandbox.
func (sb *sandbox) populateLoadbalancers(ep *endpoint) {
n := ep.getNetwork()
for _, lb := range n.connectedLoadbalancers() {
// Skip if vip is not valid.
if len(lb.vip) == 0 {
continue
}
addService := true
for _, ip := range lb.backEnds {
sb.addLBBackend(ip, lb.vip, lb.fwMark, addService)
addService = false
}
}
}
// Add loadbalancer backend to all sandboxes which has a connection to
// this network. If needed add the service as well, as specified by
// the addService bool.
func (n *network) addLBBackend(ip, vip net.IP, fwMark uint32, addService bool) {
n.WalkEndpoints(func(e Endpoint) bool {
ep := e.(*endpoint)
if sb, ok := ep.getSandbox(); ok {
sb.addLBBackend(ip, vip, fwMark, addService)
}
return false
})
}
// Remove loadbalancer backend from all sandboxes which has a
// connection to this network. If needed remove the service entry as
// well, as specified by the rmService bool.
func (n *network) rmLBBackend(ip, vip net.IP, fwMark uint32, rmService bool) {
n.WalkEndpoints(func(e Endpoint) bool {
ep := e.(*endpoint)
if sb, ok := ep.getSandbox(); ok {
sb.rmLBBackend(ip, vip, fwMark, rmService)
}
return false
})
}
// Add loadbalancer backend into one connected sandbox.
func (sb *sandbox) addLBBackend(ip, vip net.IP, fwMark uint32, addService bool) {
i, err := ipvs.New(sb.Key())
if err != nil {
logrus.Errorf("Failed to create a ipvs handle for sbox %s: %v", sb.Key(), err)
return
}
defer i.Close()
s := &ipvs.Service{
AddressFamily: nl.FAMILY_V4,
FWMark: fwMark,
SchedName: ipvs.RoundRobin,
}
if addService {
logrus.Debugf("Creating service for vip %s fwMark %d", vip, fwMark)
if err := invokeFWMarker(sb.Key(), vip, fwMark, false); err != nil {
logrus.Errorf("Failed to add firewall mark rule in sbox %s: %v", sb.Key(), err)
return
}
if err := i.NewService(s); err != nil {
logrus.Errorf("Failed to create a new service for vip %s fwmark %d: %v", vip, fwMark, err)
return
}
}
d := &ipvs.Destination{
AddressFamily: nl.FAMILY_V4,
Address: ip,
Weight: 1,
}
// Remove the sched name before using the service to add
// destination.
s.SchedName = ""
if err := i.NewDestination(s, d); err != nil {
logrus.Errorf("Failed to create real server %s for vip %s fwmark %d: %v", ip, vip, fwMark, err)
}
}
// Remove loadbalancer backend from one connected sandbox.
func (sb *sandbox) rmLBBackend(ip, vip net.IP, fwMark uint32, rmService bool) {
i, err := ipvs.New(sb.Key())
if err != nil {
logrus.Errorf("Failed to create a ipvs handle for sbox %s: %v", sb.Key(), err)
return
}
defer i.Close()
s := &ipvs.Service{
AddressFamily: nl.FAMILY_V4,
FWMark: fwMark,
}
d := &ipvs.Destination{
AddressFamily: nl.FAMILY_V4,
Address: ip,
Weight: 1,
}
if err := i.DelDestination(s, d); err != nil {
logrus.Errorf("Failed to delete real server %s for vip %s fwmark %d: %v", ip, vip, fwMark, err)
return
}
if rmService {
s.SchedName = ipvs.RoundRobin
if err := i.DelService(s); err != nil {
logrus.Errorf("Failed to create a new service for vip %s fwmark %d: %v", vip, fwMark, err)
return
}
if err := invokeFWMarker(sb.Key(), vip, fwMark, true); err != nil {
logrus.Errorf("Failed to add firewall mark rule in sbox %s: %v", sb.Key(), err)
return
}
}
}
// Invoke fwmarker reexec routine to mark vip destined packets with
// the passed firewall mark.
func invokeFWMarker(path string, vip net.IP, fwMark uint32, isDelete bool) error {
addDelOpt := "-A"
if isDelete {
addDelOpt = "-D"
}
cmd := &exec.Cmd{
Path: reexec.Self(),
Args: append([]string{"fwmarker"}, path, vip.String(), fmt.Sprintf("%d", fwMark), addDelOpt),
Stdout: os.Stdout,
Stderr: os.Stderr,
}
if err := cmd.Run(); err != nil {
return fmt.Errorf("reexec failed: %v", err)
}
return nil
}
// Firewall marker reexec function.
func fwMarker() {
runtime.LockOSThread()
defer runtime.UnlockOSThread()
if len(os.Args) < 5 {
logrus.Error("invalid number of arguments..")
os.Exit(1)
}
vip := os.Args[2]
fwMark, err := strconv.ParseUint(os.Args[3], 10, 32)
if err != nil {
logrus.Errorf("bad fwmark value(%s) passed: %v", os.Args[3], err)
os.Exit(2)
}
addDelOpt := os.Args[4]
ns, err := netns.GetFromPath(os.Args[1])
if err != nil {
logrus.Errorf("failed get network namespace %q: %v", os.Args[1], err)
os.Exit(3)
}
defer ns.Close()
if err := netns.Set(ns); err != nil {
logrus.Errorf("setting into container net ns %v failed, %v", os.Args[1], err)
os.Exit(4)
}
rule := strings.Fields(fmt.Sprintf("-t mangle %s OUTPUT -d %s/32 -j MARK --set-mark %d", addDelOpt, vip, fwMark))
if err := iptables.RawCombinedOutputNative(rule...); err != nil {
logrus.Errorf("setting up rule failed, %v: %v", rule, err)
os.Exit(5)
}
}

19
service_unsupported.go Normal file
View File

@ -0,0 +1,19 @@
// +build !linux
package libnetwork
import (
"fmt"
"net"
)
func (c *controller) addServiceBinding(name, sid, nid, eid string, vip net.IP, ip net.IP) error {
return fmt.Errorf("not supported")
}
func (c *controller) rmServiceBinding(name, sid, nid, eid string, vip net.IP, ip net.IP) error {
return fmt.Errorf("not supported")
}
func (sb *sandbox) populateLoadbalancers(ep *endpoint) {
}