Update protobuf and grpc to match docker/swarmkit

protobuf: 8ee79997227bf9b34611aee7946ae64735e6fd93
grpc: v1.0.4

Signed-off-by: Kenfe-Mickael Laventure <mickael.laventure@gmail.com>
This commit is contained in:
Kenfe-Mickael Laventure 2017-01-23 15:27:12 -08:00
parent c29c95a687
commit aff808ece6
58 changed files with 2701 additions and 407 deletions

File diff suppressed because it is too large Load Diff

View File

@ -13,7 +13,7 @@ clone git github.com/docker/docker 2f6e3b0ba027b558adabd41344fee59db4441011
clone git github.com/docker/go-units 5d2041e26a699eaca682e2ea41c8f891e1060444
clone git github.com/godbus/dbus e2cf28118e66a6a63db46cf6088a35d2054d3bb0
clone git github.com/golang/glog 23def4e6c14b4da8ac2ed8007337bc5eb5007998
clone git github.com/golang/protobuf 1f49d83d9aa00e6ce4fc8258c71cc7786aec968a
clone git github.com/golang/protobuf 8ee79997227bf9b34611aee7946ae64735e6fd93
clone git github.com/opencontainers/runc 51371867a01c467f08af739783b8beafc154c4d7 https://github.com/docker/runc.git
clone git github.com/opencontainers/runtime-spec 1c7c27d043c2a5e513a44084d2b10d77d1402b8c
clone git github.com/rcrowley/go-metrics eeba7bd0dd01ace6e690fa833b3f22aaec29af43
@ -23,7 +23,7 @@ clone git github.com/vishvananda/netlink adb0f53af689dd38f1443eba79489feaacf0b22
clone git github.com/Azure/go-ansiterm 70b2c90b260171e829f1ebd7c17f600c11858dbe
clone git golang.org/x/net 991d3e32f76f19ee6d9caadb3a22eae8d23315f7 https://github.com/golang/net.git
clone git golang.org/x/sys d4feaf1a7e61e1d9e79e6c4e76c6349e9cab0a03 https://github.com/golang/sys.git
clone git google.golang.org/grpc v1.0.1-GA https://github.com/grpc/grpc-go.git
clone git google.golang.org/grpc v1.0.4 https://github.com/grpc/grpc-go.git
clone git github.com/seccomp/libseccomp-golang 1b506fc7c24eec5a3693cdcbed40d9c226cfc6a1
clone git github.com/tonistiigi/fifo b45391ebcd3d282404092c04a2b015b37df12383
clone git github.com/pkg/errors 839d9e913e063e28dfd0e6c7b7512793e0a48be9

View File

@ -0,0 +1,52 @@
package system
import (
"os"
"syscall"
"time"
"unsafe"
)
var (
maxTime time.Time
)
func init() {
if unsafe.Sizeof(syscall.Timespec{}.Nsec) == 8 {
// This is a 64 bit timespec
// os.Chtimes limits time to the following
maxTime = time.Unix(0, 1<<63-1)
} else {
// This is a 32 bit timespec
maxTime = time.Unix(1<<31-1, 0)
}
}
// Chtimes changes the access time and modified time of a file at the given path
func Chtimes(name string, atime time.Time, mtime time.Time) error {
unixMinTime := time.Unix(0, 0)
unixMaxTime := maxTime
// If the modified time is prior to the Unix Epoch, or after the
// end of Unix Time, os.Chtimes has undefined behavior
// default to Unix Epoch in this case, just in case
if atime.Before(unixMinTime) || atime.After(unixMaxTime) {
atime = unixMinTime
}
if mtime.Before(unixMinTime) || mtime.After(unixMaxTime) {
mtime = unixMinTime
}
if err := os.Chtimes(name, atime, mtime); err != nil {
return err
}
// Take platform specific action for setting create time.
if err := setCTime(name, mtime); err != nil {
return err
}
return nil
}

View File

@ -0,0 +1,14 @@
// +build !windows
package system
import (
"time"
)
//setCTime will set the create time on a file. On Unix, the create
//time is updated as a side effect of setting the modified time, so
//no action is required.
func setCTime(path string, ctime time.Time) error {
return nil
}

View File

@ -0,0 +1,27 @@
// +build windows
package system
import (
"syscall"
"time"
)
//setCTime will set the create time on a file. On Windows, this requires
//calling SetFileTime and explicitly including the create time.
func setCTime(path string, ctime time.Time) error {
ctimespec := syscall.NsecToTimespec(ctime.UnixNano())
pathp, e := syscall.UTF16PtrFromString(path)
if e != nil {
return e
}
h, e := syscall.CreateFile(pathp,
syscall.FILE_WRITE_ATTRIBUTES, syscall.FILE_SHARE_WRITE, nil,
syscall.OPEN_EXISTING, syscall.FILE_FLAG_BACKUP_SEMANTICS, 0)
if e != nil {
return e
}
defer syscall.Close(h)
c := syscall.NsecToFiletime(syscall.TimespecToNsec(ctimespec))
return syscall.SetFileTime(h, &c, nil, nil)
}

View File

@ -0,0 +1,10 @@
package system
import (
"errors"
)
var (
// ErrNotSupportedPlatform means the platform is not supported.
ErrNotSupportedPlatform = errors.New("platform and architecture is not supported")
)

View File

@ -0,0 +1,83 @@
package system
// This file implements syscalls for Win32 events which are not implemented
// in golang.
import (
"syscall"
"unsafe"
)
var (
procCreateEvent = modkernel32.NewProc("CreateEventW")
procOpenEvent = modkernel32.NewProc("OpenEventW")
procSetEvent = modkernel32.NewProc("SetEvent")
procResetEvent = modkernel32.NewProc("ResetEvent")
procPulseEvent = modkernel32.NewProc("PulseEvent")
)
// CreateEvent implements win32 CreateEventW func in golang. It will create an event object.
func CreateEvent(eventAttributes *syscall.SecurityAttributes, manualReset bool, initialState bool, name string) (handle syscall.Handle, err error) {
namep, _ := syscall.UTF16PtrFromString(name)
var _p1 uint32
if manualReset {
_p1 = 1
}
var _p2 uint32
if initialState {
_p2 = 1
}
r0, _, e1 := procCreateEvent.Call(uintptr(unsafe.Pointer(eventAttributes)), uintptr(_p1), uintptr(_p2), uintptr(unsafe.Pointer(namep)))
use(unsafe.Pointer(namep))
handle = syscall.Handle(r0)
if handle == syscall.InvalidHandle {
err = e1
}
return
}
// OpenEvent implements win32 OpenEventW func in golang. It opens an event object.
func OpenEvent(desiredAccess uint32, inheritHandle bool, name string) (handle syscall.Handle, err error) {
namep, _ := syscall.UTF16PtrFromString(name)
var _p1 uint32
if inheritHandle {
_p1 = 1
}
r0, _, e1 := procOpenEvent.Call(uintptr(desiredAccess), uintptr(_p1), uintptr(unsafe.Pointer(namep)))
use(unsafe.Pointer(namep))
handle = syscall.Handle(r0)
if handle == syscall.InvalidHandle {
err = e1
}
return
}
// SetEvent implements win32 SetEvent func in golang.
func SetEvent(handle syscall.Handle) (err error) {
return setResetPulse(handle, procSetEvent)
}
// ResetEvent implements win32 ResetEvent func in golang.
func ResetEvent(handle syscall.Handle) (err error) {
return setResetPulse(handle, procResetEvent)
}
// PulseEvent implements win32 PulseEvent func in golang.
func PulseEvent(handle syscall.Handle) (err error) {
return setResetPulse(handle, procPulseEvent)
}
func setResetPulse(handle syscall.Handle, proc *syscall.LazyProc) (err error) {
r0, _, _ := proc.Call(uintptr(handle))
if r0 != 0 {
err = syscall.Errno(r0)
}
return
}
var temp unsafe.Pointer
// use ensures a variable is kept alive without the GC freeing while still needed
func use(p unsafe.Pointer) {
temp = p
}

View File

@ -0,0 +1,19 @@
// +build !windows
package system
import (
"os"
"path/filepath"
)
// MkdirAll creates a directory named path along with any necessary parents,
// with permission specified by attribute perm for all dir created.
func MkdirAll(path string, perm os.FileMode) error {
return os.MkdirAll(path, perm)
}
// IsAbs is a platform-specific wrapper for filepath.IsAbs.
func IsAbs(path string) bool {
return filepath.IsAbs(path)
}

View File

@ -0,0 +1,82 @@
// +build windows
package system
import (
"os"
"path/filepath"
"regexp"
"strings"
"syscall"
)
// MkdirAll implementation that is volume path aware for Windows.
func MkdirAll(path string, perm os.FileMode) error {
if re := regexp.MustCompile(`^\\\\\?\\Volume{[a-z0-9-]+}$`); re.MatchString(path) {
return nil
}
// The rest of this method is copied from os.MkdirAll and should be kept
// as-is to ensure compatibility.
// Fast path: if we can tell whether path is a directory or file, stop with success or error.
dir, err := os.Stat(path)
if err == nil {
if dir.IsDir() {
return nil
}
return &os.PathError{
Op: "mkdir",
Path: path,
Err: syscall.ENOTDIR,
}
}
// Slow path: make sure parent exists and then call Mkdir for path.
i := len(path)
for i > 0 && os.IsPathSeparator(path[i-1]) { // Skip trailing path separator.
i--
}
j := i
for j > 0 && !os.IsPathSeparator(path[j-1]) { // Scan backward over element.
j--
}
if j > 1 {
// Create parent
err = MkdirAll(path[0:j-1], perm)
if err != nil {
return err
}
}
// Parent now exists; invoke Mkdir and use its result.
err = os.Mkdir(path, perm)
if err != nil {
// Handle arguments like "foo/." by
// double-checking that directory doesn't exist.
dir, err1 := os.Lstat(path)
if err1 == nil && dir.IsDir() {
return nil
}
return err
}
return nil
}
// IsAbs is a platform-specific wrapper for filepath.IsAbs. On Windows,
// golang filepath.IsAbs does not consider a path \windows\system32 as absolute
// as it doesn't start with a drive-letter/colon combination. However, in
// docker we need to verify things such as WORKDIR /windows/system32 in
// a Dockerfile (which gets translated to \windows\system32 when being processed
// by the daemon. This SHOULD be treated as absolute from a docker processing
// perspective.
func IsAbs(path string) bool {
if !filepath.IsAbs(path) {
if !strings.HasPrefix(path, string(os.PathSeparator)) {
return false
}
}
return true
}

View File

@ -0,0 +1,19 @@
// +build !windows
package system
import (
"syscall"
)
// Lstat takes a path to a file and returns
// a system.StatT type pertaining to that file.
//
// Throws an error if the file does not exist
func Lstat(path string) (*StatT, error) {
s := &syscall.Stat_t{}
if err := syscall.Lstat(path, s); err != nil {
return nil, err
}
return fromStatT(s)
}

View File

@ -0,0 +1,25 @@
// +build windows
package system
import (
"os"
)
// Lstat calls os.Lstat to get a fileinfo interface back.
// This is then copied into our own locally defined structure.
// Note the Linux version uses fromStatT to do the copy back,
// but that not strictly necessary when already in an OS specific module.
func Lstat(path string) (*StatT, error) {
fi, err := os.Lstat(path)
if err != nil {
return nil, err
}
return &StatT{
name: fi.Name(),
size: fi.Size(),
mode: fi.Mode(),
modTime: fi.ModTime(),
isDir: fi.IsDir()}, nil
}

View File

@ -0,0 +1,17 @@
package system
// MemInfo contains memory statistics of the host system.
type MemInfo struct {
// Total usable RAM (i.e. physical RAM minus a few reserved bits and the
// kernel binary code).
MemTotal int64
// Amount of free memory.
MemFree int64
// Total amount of swap space available.
SwapTotal int64
// Amount of swap space that is currently unused.
SwapFree int64
}

View File

@ -0,0 +1,65 @@
package system
import (
"bufio"
"io"
"os"
"strconv"
"strings"
"github.com/docker/go-units"
)
// ReadMemInfo retrieves memory statistics of the host system and returns a
// MemInfo type.
func ReadMemInfo() (*MemInfo, error) {
file, err := os.Open("/proc/meminfo")
if err != nil {
return nil, err
}
defer file.Close()
return parseMemInfo(file)
}
// parseMemInfo parses the /proc/meminfo file into
// a MemInfo object given an io.Reader to the file.
// Throws error if there are problems reading from the file
func parseMemInfo(reader io.Reader) (*MemInfo, error) {
meminfo := &MemInfo{}
scanner := bufio.NewScanner(reader)
for scanner.Scan() {
// Expected format: ["MemTotal:", "1234", "kB"]
parts := strings.Fields(scanner.Text())
// Sanity checks: Skip malformed entries.
if len(parts) < 3 || parts[2] != "kB" {
continue
}
// Convert to bytes.
size, err := strconv.Atoi(parts[1])
if err != nil {
continue
}
bytes := int64(size) * units.KiB
switch parts[0] {
case "MemTotal:":
meminfo.MemTotal = bytes
case "MemFree:":
meminfo.MemFree = bytes
case "SwapTotal:":
meminfo.SwapTotal = bytes
case "SwapFree:":
meminfo.SwapFree = bytes
}
}
// Handle errors that may have occurred during the reading of the file.
if err := scanner.Err(); err != nil {
return nil, err
}
return meminfo, nil
}

View File

@ -0,0 +1,8 @@
// +build !linux,!windows
package system
// ReadMemInfo is not supported on platforms other than linux and windows.
func ReadMemInfo() (*MemInfo, error) {
return nil, ErrNotSupportedPlatform
}

View File

@ -0,0 +1,44 @@
package system
import (
"syscall"
"unsafe"
)
var (
modkernel32 = syscall.NewLazyDLL("kernel32.dll")
procGlobalMemoryStatusEx = modkernel32.NewProc("GlobalMemoryStatusEx")
)
// https://msdn.microsoft.com/en-us/library/windows/desktop/aa366589(v=vs.85).aspx
// https://msdn.microsoft.com/en-us/library/windows/desktop/aa366770(v=vs.85).aspx
type memorystatusex struct {
dwLength uint32
dwMemoryLoad uint32
ullTotalPhys uint64
ullAvailPhys uint64
ullTotalPageFile uint64
ullAvailPageFile uint64
ullTotalVirtual uint64
ullAvailVirtual uint64
ullAvailExtendedVirtual uint64
}
// ReadMemInfo retrieves memory statistics of the host system and returns a
// MemInfo type.
func ReadMemInfo() (*MemInfo, error) {
msi := &memorystatusex{
dwLength: 64,
}
r1, _, _ := procGlobalMemoryStatusEx.Call(uintptr(unsafe.Pointer(msi)))
if r1 == 0 {
return &MemInfo{}, nil
}
return &MemInfo{
MemTotal: int64(msi.ullTotalPhys),
MemFree: int64(msi.ullAvailPhys),
SwapTotal: int64(msi.ullTotalPageFile),
SwapFree: int64(msi.ullAvailPageFile),
}, nil
}

View File

@ -0,0 +1,22 @@
// +build !windows
package system
import (
"syscall"
)
// Mknod creates a filesystem node (file, device special file or named pipe) named path
// with attributes specified by mode and dev.
func Mknod(path string, mode uint32, dev int) error {
return syscall.Mknod(path, mode, dev)
}
// Mkdev is used to build the value of linux devices (in /dev/) which specifies major
// and minor number of the newly created device special file.
// Linux device nodes are a bit weird due to backwards compat with 16 bit device nodes.
// They are, from low to high: the lower 8 bits of the minor, then 12 bits of the major,
// then the top 12 bits of the minor.
func Mkdev(major int64, minor int64) uint32 {
return uint32(((minor & 0xfff00) << 12) | ((major & 0xfff) << 8) | (minor & 0xff))
}

View File

@ -0,0 +1,13 @@
// +build windows
package system
// Mknod is not implemented on Windows.
func Mknod(path string, mode uint32, dev int) error {
return ErrNotSupportedPlatform
}
// Mkdev is not implemented on Windows.
func Mkdev(major int64, minor int64) uint32 {
panic("Mkdev not implemented on Windows.")
}

View File

@ -0,0 +1,8 @@
// +build !windows
package system
// DefaultPathEnv is unix style list of directories to search for
// executables. Each directory is separated from the next by a colon
// ':' character .
const DefaultPathEnv = "/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"

View File

@ -0,0 +1,7 @@
// +build windows
package system
// DefaultPathEnv is deliberately empty on Windows as the default path will be set by
// the container. Docker has no context of what the default path should be.
const DefaultPathEnv = ""

View File

@ -0,0 +1,53 @@
// +build !windows
package system
import (
"syscall"
)
// StatT type contains status of a file. It contains metadata
// like permission, owner, group, size, etc about a file.
type StatT struct {
mode uint32
uid uint32
gid uint32
rdev uint64
size int64
mtim syscall.Timespec
}
// Mode returns file's permission mode.
func (s StatT) Mode() uint32 {
return s.mode
}
// UID returns file's user id of owner.
func (s StatT) UID() uint32 {
return s.uid
}
// GID returns file's group id of owner.
func (s StatT) GID() uint32 {
return s.gid
}
// Rdev returns file's device ID (if it's special file).
func (s StatT) Rdev() uint64 {
return s.rdev
}
// Size returns file's size.
func (s StatT) Size() int64 {
return s.size
}
// Mtim returns file's last modification time.
func (s StatT) Mtim() syscall.Timespec {
return s.mtim
}
// GetLastModification returns file's last modification time.
func (s StatT) GetLastModification() syscall.Timespec {
return s.Mtim()
}

View File

@ -0,0 +1,27 @@
package system
import (
"syscall"
)
// fromStatT converts a syscall.Stat_t type to a system.Stat_t type
func fromStatT(s *syscall.Stat_t) (*StatT, error) {
return &StatT{size: s.Size,
mode: uint32(s.Mode),
uid: s.Uid,
gid: s.Gid,
rdev: uint64(s.Rdev),
mtim: s.Mtimespec}, nil
}
// Stat takes a path to a file and returns
// a system.Stat_t type pertaining to that file.
//
// Throws an error if the file does not exist
func Stat(path string) (*StatT, error) {
s := &syscall.Stat_t{}
if err := syscall.Stat(path, s); err != nil {
return nil, err
}
return fromStatT(s)
}

View File

@ -0,0 +1,33 @@
package system
import (
"syscall"
)
// fromStatT converts a syscall.Stat_t type to a system.Stat_t type
func fromStatT(s *syscall.Stat_t) (*StatT, error) {
return &StatT{size: s.Size,
mode: s.Mode,
uid: s.Uid,
gid: s.Gid,
rdev: s.Rdev,
mtim: s.Mtim}, nil
}
// FromStatT exists only on linux, and loads a system.StatT from a
// syscal.Stat_t.
func FromStatT(s *syscall.Stat_t) (*StatT, error) {
return fromStatT(s)
}
// Stat takes a path to a file and returns
// a system.StatT type pertaining to that file.
//
// Throws an error if the file does not exist
func Stat(path string) (*StatT, error) {
s := &syscall.Stat_t{}
if err := syscall.Stat(path, s); err != nil {
return nil, err
}
return fromStatT(s)
}

View File

@ -0,0 +1,15 @@
package system
import (
"syscall"
)
// fromStatT creates a system.StatT type from a syscall.Stat_t type
func fromStatT(s *syscall.Stat_t) (*StatT, error) {
return &StatT{size: s.Size,
mode: uint32(s.Mode),
uid: s.Uid,
gid: s.Gid,
rdev: uint64(s.Rdev),
mtim: s.Mtim}, nil
}

View File

@ -0,0 +1,17 @@
// +build solaris
package system
import (
"syscall"
)
// fromStatT creates a system.StatT type from a syscall.Stat_t type
func fromStatT(s *syscall.Stat_t) (*StatT, error) {
return &StatT{size: s.Size,
mode: uint32(s.Mode),
uid: s.Uid,
gid: s.Gid,
rdev: uint64(s.Rdev),
mtim: s.Mtim}, nil
}

View File

@ -0,0 +1,17 @@
// +build !linux,!windows,!freebsd,!solaris,!openbsd
package system
import (
"syscall"
)
// fromStatT creates a system.StatT type from a syscall.Stat_t type
func fromStatT(s *syscall.Stat_t) (*StatT, error) {
return &StatT{size: s.Size,
mode: uint32(s.Mode),
uid: s.Uid,
gid: s.Gid,
rdev: uint64(s.Rdev),
mtim: s.Mtimespec}, nil
}

View File

@ -0,0 +1,43 @@
// +build windows
package system
import (
"os"
"time"
)
// StatT type contains status of a file. It contains metadata
// like name, permission, size, etc about a file.
type StatT struct {
name string
size int64
mode os.FileMode
modTime time.Time
isDir bool
}
// Name returns file's name.
func (s StatT) Name() string {
return s.name
}
// Size returns file's size.
func (s StatT) Size() int64 {
return s.size
}
// Mode returns file's permission mode.
func (s StatT) Mode() os.FileMode {
return s.mode
}
// ModTime returns file's last modification time.
func (s StatT) ModTime() time.Time {
return s.modTime
}
// IsDir returns whether file is actually a directory.
func (s StatT) IsDir() bool {
return s.isDir
}

View File

@ -0,0 +1,17 @@
// +build linux freebsd
package system
import "syscall"
// Unmount is a platform-specific helper function to call
// the unmount syscall.
func Unmount(dest string) error {
return syscall.Unmount(dest, 0)
}
// CommandLineToArgv should not be used on Unix.
// It simply returns commandLine in the only element in the returned array.
func CommandLineToArgv(commandLine string) ([]string, error) {
return []string{commandLine}, nil
}

View File

@ -0,0 +1,73 @@
package system
import (
"syscall"
"unsafe"
)
var (
ntuserApiset = syscall.NewLazyDLL("ext-ms-win-ntuser-window-l1-1-0")
)
// OSVersion is a wrapper for Windows version information
// https://msdn.microsoft.com/en-us/library/windows/desktop/ms724439(v=vs.85).aspx
type OSVersion struct {
Version uint32
MajorVersion uint8
MinorVersion uint8
Build uint16
}
// GetOSVersion gets the operating system version on Windows. Note that
// docker.exe must be manifested to get the correct version information.
func GetOSVersion() OSVersion {
var err error
osv := OSVersion{}
osv.Version, err = syscall.GetVersion()
if err != nil {
// GetVersion never fails.
panic(err)
}
osv.MajorVersion = uint8(osv.Version & 0xFF)
osv.MinorVersion = uint8(osv.Version >> 8 & 0xFF)
osv.Build = uint16(osv.Version >> 16)
return osv
}
// Unmount is a platform-specific helper function to call
// the unmount syscall. Not supported on Windows
func Unmount(dest string) error {
return nil
}
// CommandLineToArgv wraps the Windows syscall to turn a commandline into an argument array.
func CommandLineToArgv(commandLine string) ([]string, error) {
var argc int32
argsPtr, err := syscall.UTF16PtrFromString(commandLine)
if err != nil {
return nil, err
}
argv, err := syscall.CommandLineToArgv(argsPtr, &argc)
if err != nil {
return nil, err
}
defer syscall.LocalFree(syscall.Handle(uintptr(unsafe.Pointer(argv))))
newArgs := make([]string, argc)
for i, v := range (*argv)[:argc] {
newArgs[i] = string(syscall.UTF16ToString((*v)[:]))
}
return newArgs, nil
}
// HasWin32KSupport determines whether containers that depend on win32k can
// run on this machine. Win32k is the driver used to implement windowing.
func HasWin32KSupport() bool {
// For now, check for ntuser API support on the host. In the future, a host
// may support win32k in containers even if the host does not support ntuser
// APIs.
return ntuserApiset.Load() == nil
}

View File

@ -0,0 +1,13 @@
// +build !windows
package system
import (
"syscall"
)
// Umask sets current process's file mode creation mask to newmask
// and returns oldmask.
func Umask(newmask int) (oldmask int, err error) {
return syscall.Umask(newmask), nil
}

View File

@ -0,0 +1,9 @@
// +build windows
package system
// Umask is not supported on the windows platform.
func Umask(newmask int) (oldmask int, err error) {
// should not be called on cli code path
return 0, ErrNotSupportedPlatform
}

View File

@ -0,0 +1,8 @@
package system
import "syscall"
// LUtimesNano is not supported by darwin platform.
func LUtimesNano(path string, ts []syscall.Timespec) error {
return ErrNotSupportedPlatform
}

View File

@ -0,0 +1,22 @@
package system
import (
"syscall"
"unsafe"
)
// LUtimesNano is used to change access and modification time of the specified path.
// It's used for symbol link file because syscall.UtimesNano doesn't support a NOFOLLOW flag atm.
func LUtimesNano(path string, ts []syscall.Timespec) error {
var _path *byte
_path, err := syscall.BytePtrFromString(path)
if err != nil {
return err
}
if _, _, err := syscall.Syscall(syscall.SYS_LUTIMES, uintptr(unsafe.Pointer(_path)), uintptr(unsafe.Pointer(&ts[0])), 0); err != 0 && err != syscall.ENOSYS {
return err
}
return nil
}

View File

@ -0,0 +1,26 @@
package system
import (
"syscall"
"unsafe"
)
// LUtimesNano is used to change access and modification time of the specified path.
// It's used for symbol link file because syscall.UtimesNano doesn't support a NOFOLLOW flag atm.
func LUtimesNano(path string, ts []syscall.Timespec) error {
// These are not currently available in syscall
atFdCwd := -100
atSymLinkNoFollow := 0x100
var _path *byte
_path, err := syscall.BytePtrFromString(path)
if err != nil {
return err
}
if _, _, err := syscall.Syscall6(syscall.SYS_UTIMENSAT, uintptr(atFdCwd), uintptr(unsafe.Pointer(_path)), uintptr(unsafe.Pointer(&ts[0])), uintptr(atSymLinkNoFollow), 0, 0); err != 0 && err != syscall.ENOSYS {
return err
}
return nil
}

View File

@ -0,0 +1,10 @@
// +build !linux,!freebsd,!darwin
package system
import "syscall"
// LUtimesNano is not supported on platforms other than linux, freebsd and darwin.
func LUtimesNano(path string, ts []syscall.Timespec) error {
return ErrNotSupportedPlatform
}

View File

@ -0,0 +1,63 @@
package system
import (
"syscall"
"unsafe"
)
// Lgetxattr retrieves the value of the extended attribute identified by attr
// and associated with the given path in the file system.
// It will returns a nil slice and nil error if the xattr is not set.
func Lgetxattr(path string, attr string) ([]byte, error) {
pathBytes, err := syscall.BytePtrFromString(path)
if err != nil {
return nil, err
}
attrBytes, err := syscall.BytePtrFromString(attr)
if err != nil {
return nil, err
}
dest := make([]byte, 128)
destBytes := unsafe.Pointer(&dest[0])
sz, _, errno := syscall.Syscall6(syscall.SYS_LGETXATTR, uintptr(unsafe.Pointer(pathBytes)), uintptr(unsafe.Pointer(attrBytes)), uintptr(destBytes), uintptr(len(dest)), 0, 0)
if errno == syscall.ENODATA {
return nil, nil
}
if errno == syscall.ERANGE {
dest = make([]byte, sz)
destBytes := unsafe.Pointer(&dest[0])
sz, _, errno = syscall.Syscall6(syscall.SYS_LGETXATTR, uintptr(unsafe.Pointer(pathBytes)), uintptr(unsafe.Pointer(attrBytes)), uintptr(destBytes), uintptr(len(dest)), 0, 0)
}
if errno != 0 {
return nil, errno
}
return dest[:sz], nil
}
var _zero uintptr
// Lsetxattr sets the value of the extended attribute identified by attr
// and associated with the given path in the file system.
func Lsetxattr(path string, attr string, data []byte, flags int) error {
pathBytes, err := syscall.BytePtrFromString(path)
if err != nil {
return err
}
attrBytes, err := syscall.BytePtrFromString(attr)
if err != nil {
return err
}
var dataBytes unsafe.Pointer
if len(data) > 0 {
dataBytes = unsafe.Pointer(&data[0])
} else {
dataBytes = unsafe.Pointer(&_zero)
}
_, _, errno := syscall.Syscall6(syscall.SYS_LSETXATTR, uintptr(unsafe.Pointer(pathBytes)), uintptr(unsafe.Pointer(attrBytes)), uintptr(dataBytes), uintptr(len(data)), uintptr(flags), 0)
if errno != 0 {
return errno
}
return nil
}

View File

@ -0,0 +1,13 @@
// +build !linux
package system
// Lgetxattr is not supported on platforms other than linux.
func Lgetxattr(path string, attr string) ([]byte, error) {
return nil, ErrNotSupportedPlatform
}
// Lsetxattr is not supported on platforms other than linux.
func Lsetxattr(path string, attr string, data []byte, flags int) error {
return ErrNotSupportedPlatform
}

View File

@ -61,7 +61,6 @@ var ErrInternalBadWireType = errors.New("proto: internal error: bad wiretype for
// int32, int64, uint32, uint64, bool, and enum
// protocol buffer types.
func DecodeVarint(buf []byte) (x uint64, n int) {
// x, n already 0
for shift := uint(0); shift < 64; shift += 7 {
if n >= len(buf) {
return 0, 0
@ -78,13 +77,7 @@ func DecodeVarint(buf []byte) (x uint64, n int) {
return 0, 0
}
// DecodeVarint reads a varint-encoded integer from the Buffer.
// This is the format for the
// int32, int64, uint32, uint64, bool, and enum
// protocol buffer types.
func (p *Buffer) DecodeVarint() (x uint64, err error) {
// x, err already 0
func (p *Buffer) decodeVarintSlow() (x uint64, err error) {
i := p.index
l := len(p.buf)
@ -107,6 +100,107 @@ func (p *Buffer) DecodeVarint() (x uint64, err error) {
return
}
// DecodeVarint reads a varint-encoded integer from the Buffer.
// This is the format for the
// int32, int64, uint32, uint64, bool, and enum
// protocol buffer types.
func (p *Buffer) DecodeVarint() (x uint64, err error) {
i := p.index
buf := p.buf
if i >= len(buf) {
return 0, io.ErrUnexpectedEOF
} else if buf[i] < 0x80 {
p.index++
return uint64(buf[i]), nil
} else if len(buf)-i < 10 {
return p.decodeVarintSlow()
}
var b uint64
// we already checked the first byte
x = uint64(buf[i]) - 0x80
i++
b = uint64(buf[i])
i++
x += b << 7
if b&0x80 == 0 {
goto done
}
x -= 0x80 << 7
b = uint64(buf[i])
i++
x += b << 14
if b&0x80 == 0 {
goto done
}
x -= 0x80 << 14
b = uint64(buf[i])
i++
x += b << 21
if b&0x80 == 0 {
goto done
}
x -= 0x80 << 21
b = uint64(buf[i])
i++
x += b << 28
if b&0x80 == 0 {
goto done
}
x -= 0x80 << 28
b = uint64(buf[i])
i++
x += b << 35
if b&0x80 == 0 {
goto done
}
x -= 0x80 << 35
b = uint64(buf[i])
i++
x += b << 42
if b&0x80 == 0 {
goto done
}
x -= 0x80 << 42
b = uint64(buf[i])
i++
x += b << 49
if b&0x80 == 0 {
goto done
}
x -= 0x80 << 49
b = uint64(buf[i])
i++
x += b << 56
if b&0x80 == 0 {
goto done
}
x -= 0x80 << 56
b = uint64(buf[i])
i++
x += b << 63
if b&0x80 == 0 {
goto done
}
// x -= 0x80 << 63 // Always zero.
return 0, errOverflow
done:
p.index = i
return x, nil
}
// DecodeFixed64 reads a 64-bit integer from the Buffer.
// This is the format for the
// fixed64, sfixed64, and double protocol buffer types.
@ -340,6 +434,8 @@ func (p *Buffer) DecodeGroup(pb Message) error {
// Buffer and places the decoded result in pb. If the struct
// underlying pb does not match the data in the buffer, the results can be
// unpredictable.
//
// Unlike proto.Unmarshal, this does not reset pb before starting to unmarshal.
func (p *Buffer) Unmarshal(pb Message) error {
// If the object can unmarshal itself, let it.
if u, ok := pb.(Unmarshaler); ok {

View File

@ -234,10 +234,6 @@ func Marshal(pb Message) ([]byte, error) {
}
p := NewBuffer(nil)
err := p.Marshal(pb)
var state errorState
if err != nil && !state.shouldContinue(err, nil) {
return nil, err
}
if p.buf == nil && err == nil {
// Return a non-nil slice on success.
return []byte{}, nil
@ -266,11 +262,8 @@ func (p *Buffer) Marshal(pb Message) error {
// Can the object marshal itself?
if m, ok := pb.(Marshaler); ok {
data, err := m.Marshal()
if err != nil {
return err
}
p.buf = append(p.buf, data...)
return nil
return err
}
t, base, err := getbase(pb)
@ -282,7 +275,7 @@ func (p *Buffer) Marshal(pb Message) error {
}
if collectStats {
stats.Encode++
(stats).Encode++ // Parens are to work around a goimports bug.
}
if len(p.buf) > maxMarshalSize {
@ -309,7 +302,7 @@ func Size(pb Message) (n int) {
}
if collectStats {
stats.Size++
(stats).Size++ // Parens are to work around a goimports bug.
}
return
@ -1014,7 +1007,6 @@ func size_slice_struct_message(p *Properties, base structPointer) (n int) {
if p.isMarshaler {
m := structPointer_Interface(structp, p.stype).(Marshaler)
data, _ := m.Marshal()
n += len(p.tagcode)
n += sizeRawBytes(data)
continue
}
@ -1083,10 +1075,17 @@ func (o *Buffer) enc_map(p *Properties, base structPointer) error {
func (o *Buffer) enc_exts(p *Properties, base structPointer) error {
exts := structPointer_Extensions(base, p.field)
if err := encodeExtensions(exts); err != nil {
v, mu := exts.extensionsRead()
if v == nil {
return nil
}
mu.Lock()
defer mu.Unlock()
if err := encodeExtensionsMap(v); err != nil {
return err
}
v, _ := exts.extensionsRead()
return o.enc_map_body(v)
}

View File

@ -154,6 +154,7 @@ type ExtensionDesc struct {
Field int32 // field number
Name string // fully-qualified name of extension, for text formatting
Tag string // protobuf tag style
Filename string // name of the file in which the extension is defined
}
func (ed *ExtensionDesc) repeated() bool {

View File

@ -592,7 +592,11 @@ func (p *textParser) readStruct(sv reflect.Value, terminator string) error {
props = oop.Prop
nv := reflect.New(oop.Type.Elem())
dst = nv.Elem().Field(0)
sv.Field(oop.Field).Set(nv)
field := sv.Field(oop.Field)
if !field.IsNil() {
return p.errorf("field '%s' would overwrite already parsed oneof '%s'", name, sv.Type().Field(oop.Field).Name)
}
field.Set(nv)
}
if !dst.IsValid() {
return p.errorf("unknown field name %q in %v", name, st)

View File

@ -3,19 +3,17 @@ language: go
go:
- 1.5.4
- 1.6.3
- 1.7
go_import_path: google.golang.org/grpc
before_install:
- go get golang.org/x/tools/cmd/goimports
- go get github.com/golang/lint/golint
- go get github.com/axw/gocov/gocov
- go get github.com/mattn/goveralls
- go get golang.org/x/tools/cmd/cover
- if [[ $TRAVIS_GO_VERSION != 1.5* ]]; then go get github.com/golang/lint/golint; fi
- go get -u golang.org/x/tools/cmd/goimports github.com/axw/gocov/gocov github.com/mattn/goveralls golang.org/x/tools/cmd/cover
script:
- '! gofmt -s -d -l . 2>&1 | read'
- '! goimports -l . | read'
- '! golint ./... | grep -vE "(_string|\.pb)\.go:"'
- '! go tool vet -all . 2>&1 | grep -vE "constant [0-9]+ not a string in call to Errorf"'
- 'if [[ $TRAVIS_GO_VERSION != 1.5* ]]; then ! golint ./... | grep -vE "(_string|\.pb)\.go:"; fi'
- '! go tool vet -all . 2>&1 | grep -vE "constant [0-9]+ not a string in call to Errorf" | grep -vF .pb.go:' # https://github.com/golang/protobuf/issues/214
- make test testrace

View File

@ -16,7 +16,7 @@ $ go get google.golang.org/grpc
Prerequisites
-------------
This requires Go 1.5 or later .
This requires Go 1.5 or later.
Constraints
-----------

View File

@ -58,7 +58,7 @@ func setDefaults(bc *BackoffConfig) {
}
}
func (bc BackoffConfig) backoff(retries int) (t time.Duration) {
func (bc BackoffConfig) backoff(retries int) time.Duration {
if retries == 0 {
return bc.baseDelay
}

View File

@ -38,6 +38,8 @@ import (
"sync"
"golang.org/x/net/context"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/naming"
)
@ -52,6 +54,14 @@ type Address struct {
Metadata interface{}
}
// BalancerConfig specifies the configurations for Balancer.
type BalancerConfig struct {
// DialCreds is the transport credential the Balancer implementation can
// use to dial to a remote load balancer server. The Balancer implementations
// can ignore this if it does not need to talk to another party securely.
DialCreds credentials.TransportCredentials
}
// BalancerGetOptions configures a Get call.
// This is the EXPERIMENTAL API and may be changed or extended in the future.
type BalancerGetOptions struct {
@ -66,11 +76,11 @@ type Balancer interface {
// Start does the initialization work to bootstrap a Balancer. For example,
// this function may start the name resolution and watch the updates. It will
// be called when dialing.
Start(target string) error
Start(target string, config BalancerConfig) error
// Up informs the Balancer that gRPC has a connection to the server at
// addr. It returns down which is called once the connection to addr gets
// lost or closed.
// TODO: It is not clear how to construct and take advantage the meaningful error
// TODO: It is not clear how to construct and take advantage of the meaningful error
// parameter for down. Need realistic demands to guide.
Up(addr Address) (down func(error))
// Get gets the address of a server for the RPC corresponding to ctx.
@ -205,7 +215,12 @@ func (rr *roundRobin) watchAddrUpdates() error {
return nil
}
func (rr *roundRobin) Start(target string) error {
func (rr *roundRobin) Start(target string, config BalancerConfig) error {
rr.mu.Lock()
defer rr.mu.Unlock()
if rr.done {
return ErrClientConnClosing
}
if rr.r == nil {
// If there is no name resolver installed, it is not needed to
// do name resolution. In this case, target is added into rr.addrs
@ -301,7 +316,7 @@ func (rr *roundRobin) Get(ctx context.Context, opts BalancerGetOptions) (addr Ad
if !opts.BlockingWait {
if len(rr.addrs) == 0 {
rr.mu.Unlock()
err = fmt.Errorf("there is no address available")
err = Errorf(codes.Unavailable, "there is no address available")
return
}
// Returns the next addr on rr.addrs for failfast RPCs.

View File

@ -49,9 +49,8 @@ import (
// On error, it returns the error and indicates whether the call should be retried.
//
// TODO(zhaoq): Check whether the received message sequence is valid.
func recvResponse(dopts dialOptions, t transport.ClientTransport, c *callInfo, stream *transport.Stream, reply interface{}) error {
func recvResponse(dopts dialOptions, t transport.ClientTransport, c *callInfo, stream *transport.Stream, reply interface{}) (err error) {
// Try to acquire header metadata from the server if there is any.
var err error
defer func() {
if err != nil {
if _, ok := err.(transport.ConnectionError); !ok {
@ -61,7 +60,7 @@ func recvResponse(dopts dialOptions, t transport.ClientTransport, c *callInfo, s
}()
c.headerMD, err = stream.Header()
if err != nil {
return err
return
}
p := &parser{r: stream}
for {
@ -69,7 +68,7 @@ func recvResponse(dopts dialOptions, t transport.ClientTransport, c *callInfo, s
if err == io.EOF {
break
}
return err
return
}
}
c.trailerMD = stream.Trailer()
@ -96,7 +95,7 @@ func sendRequest(ctx context.Context, codec Codec, compressor Compressor, callHd
}
outBuf, err := encode(codec, args, compressor, cbuf)
if err != nil {
return nil, transport.StreamErrorf(codes.Internal, "grpc: %v", err)
return nil, Errorf(codes.Internal, "grpc: %v", err)
}
err = t.Write(stream, outBuf, opts)
// t.NewStream(...) could lead to an early rejection of the RPC (e.g., the service/method
@ -112,7 +111,14 @@ func sendRequest(ctx context.Context, codec Codec, compressor Compressor, callHd
// Invoke sends the RPC request on the wire and returns after response is received.
// Invoke is called by generated code. Also users can call Invoke directly when it
// is really needed in their use cases.
func Invoke(ctx context.Context, method string, args, reply interface{}, cc *ClientConn, opts ...CallOption) (err error) {
func Invoke(ctx context.Context, method string, args, reply interface{}, cc *ClientConn, opts ...CallOption) error {
if cc.dopts.unaryInt != nil {
return cc.dopts.unaryInt(ctx, method, args, reply, cc, invoke, opts...)
}
return invoke(ctx, method, args, reply, cc, opts...)
}
func invoke(ctx context.Context, method string, args, reply interface{}, cc *ClientConn, opts ...CallOption) (err error) {
c := defaultCallInfo
for _, o := range opts {
if err := o.before(&c); err != nil {

View File

@ -83,15 +83,17 @@ var (
// dialOptions configure a Dial call. dialOptions are set by the DialOption
// values passed to Dial.
type dialOptions struct {
codec Codec
cp Compressor
dc Decompressor
bs backoffStrategy
balancer Balancer
block bool
insecure bool
timeout time.Duration
copts transport.ConnectOptions
unaryInt UnaryClientInterceptor
streamInt StreamClientInterceptor
codec Codec
cp Compressor
dc Decompressor
bs backoffStrategy
balancer Balancer
block bool
insecure bool
timeout time.Duration
copts transport.ConnectOptions
}
// DialOption configures how we set up the connection.
@ -215,19 +217,48 @@ func WithUserAgent(s string) DialOption {
}
}
// WithUnaryInterceptor returns a DialOption that specifies the interceptor for unary RPCs.
func WithUnaryInterceptor(f UnaryClientInterceptor) DialOption {
return func(o *dialOptions) {
o.unaryInt = f
}
}
// WithStreamInterceptor returns a DialOption that specifies the interceptor for streaming RPCs.
func WithStreamInterceptor(f StreamClientInterceptor) DialOption {
return func(o *dialOptions) {
o.streamInt = f
}
}
// Dial creates a client connection to the given target.
func Dial(target string, opts ...DialOption) (*ClientConn, error) {
return DialContext(context.Background(), target, opts...)
}
// DialContext creates a client connection to the given target
// using the supplied context.
func DialContext(ctx context.Context, target string, opts ...DialOption) (*ClientConn, error) {
// DialContext creates a client connection to the given target. ctx can be used to
// cancel or expire the pending connecting. Once this function returns, the
// cancellation and expiration of ctx will be noop. Users should call ClientConn.Close
// to terminate all the pending operations after this function returns.
// This is the EXPERIMENTAL API.
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
cc := &ClientConn{
target: target,
conns: make(map[Address]*addrConn),
}
cc.ctx, cc.cancel = context.WithCancel(ctx)
cc.ctx, cc.cancel = context.WithCancel(context.Background())
defer func() {
select {
case <-ctx.Done():
conn, err = nil, ctx.Err()
default:
}
if err != nil {
cc.Close()
}
}()
for _, opt := range opts {
opt(&cc.dopts)
}
@ -239,31 +270,47 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (*Clien
if cc.dopts.bs == nil {
cc.dopts.bs = DefaultBackoffConfig
}
var (
ok bool
addrs []Address
)
if cc.dopts.balancer == nil {
// Connect to target directly if balancer is nil.
addrs = append(addrs, Address{Addr: target})
creds := cc.dopts.copts.TransportCredentials
if creds != nil && creds.Info().ServerName != "" {
cc.authority = creds.Info().ServerName
} else {
if err := cc.dopts.balancer.Start(target); err != nil {
return nil, err
}
ch := cc.dopts.balancer.Notify()
if ch == nil {
// There is no name resolver installed.
addrs = append(addrs, Address{Addr: target})
} else {
addrs, ok = <-ch
if !ok || len(addrs) == 0 {
return nil, errNoAddr
}
colonPos := strings.LastIndex(target, ":")
if colonPos == -1 {
colonPos = len(target)
}
cc.authority = target[:colonPos]
}
var ok bool
waitC := make(chan error, 1)
go func() {
var addrs []Address
if cc.dopts.balancer == nil {
// Connect to target directly if balancer is nil.
addrs = append(addrs, Address{Addr: target})
} else {
var credsClone credentials.TransportCredentials
if creds != nil {
credsClone = creds.Clone()
}
config := BalancerConfig{
DialCreds: credsClone,
}
if err := cc.dopts.balancer.Start(target, config); err != nil {
waitC <- err
return
}
ch := cc.dopts.balancer.Notify()
if ch == nil {
// There is no name resolver installed.
addrs = append(addrs, Address{Addr: target})
} else {
addrs, ok = <-ch
if !ok || len(addrs) == 0 {
waitC <- errNoAddr
return
}
}
}
for _, a := range addrs {
if err := cc.resetAddrConn(a, false, nil); err != nil {
waitC <- err
@ -277,16 +324,13 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (*Clien
timeoutCh = time.After(cc.dopts.timeout)
}
select {
case <-ctx.Done():
return nil, ctx.Err()
case err := <-waitC:
if err != nil {
cc.Close()
return nil, err
}
case <-cc.ctx.Done():
cc.Close()
return nil, cc.ctx.Err()
case <-timeoutCh:
cc.Close()
return nil, ErrClientConnTimeout
}
// If balancer is nil or balancer.Notify() is nil, ok will be false here.
@ -294,11 +338,6 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (*Clien
if ok {
go cc.lbWatcher()
}
colonPos := strings.LastIndex(target, ":")
if colonPos == -1 {
colonPos = len(target)
}
cc.authority = target[:colonPos]
return cc, nil
}
@ -645,14 +684,18 @@ func (ac *addrConn) resetTransport(closeTransport bool) error {
}
ctx, cancel := context.WithTimeout(ac.ctx, timeout)
connectTime := time.Now()
newTransport, err := transport.NewClientTransport(ctx, ac.addr.Addr, ac.dopts.copts)
sinfo := transport.TargetInfo{
Addr: ac.addr.Addr,
Metadata: ac.addr.Metadata,
}
newTransport, err := transport.NewClientTransport(ctx, sinfo, ac.dopts.copts)
if err != nil {
cancel()
if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() {
return err
}
grpclog.Printf("grpc: addrConn.resetTransport failed to create client transport: %v; Reconnecting to %q", err, ac.addr)
grpclog.Printf("grpc: addrConn.resetTransport failed to create client transport: %v; Reconnecting to %v", err, ac.addr)
ac.mu.Lock()
if ac.state == Shutdown {
// ac.tearDown(...) has been invoked.
@ -764,7 +807,7 @@ func (ac *addrConn) transportMonitor() {
}
// wait blocks until i) the new transport is up or ii) ctx is done or iii) ac is closed or
// iv) transport is in TransientFailure and there's no balancer/failfast is true.
// iv) transport is in TransientFailure and there is a balancer/failfast is true.
func (ac *addrConn) wait(ctx context.Context, hasBalancer, failfast bool) (transport.ClientTransport, error) {
for {
ac.mu.Lock()

View File

@ -40,6 +40,7 @@ package credentials // import "google.golang.org/grpc/credentials"
import (
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"io/ioutil"
"net"
@ -71,7 +72,7 @@ type PerRPCCredentials interface {
}
// ProtocolInfo provides information regarding the gRPC wire protocol version,
// security protocol, security protocol version in use, etc.
// security protocol, security protocol version in use, server name, etc.
type ProtocolInfo struct {
// ProtocolVersion is the gRPC wire protocol version.
ProtocolVersion string
@ -79,6 +80,8 @@ type ProtocolInfo struct {
SecurityProtocol string
// SecurityVersion is the security protocol version.
SecurityVersion string
// ServerName is the user-configured server name.
ServerName string
}
// AuthInfo defines the common interface for the auth information the users are interested in.
@ -86,6 +89,12 @@ type AuthInfo interface {
AuthType() string
}
var (
// ErrConnDispatched indicates that rawConn has been dispatched out of gRPC
// and the caller should not close rawConn.
ErrConnDispatched = errors.New("credentials: rawConn is dispatched out of gRPC")
)
// TransportCredentials defines the common interface for all the live gRPC wire
// protocols and supported transport security protocols (e.g., TLS, SSL).
type TransportCredentials interface {
@ -100,6 +109,12 @@ type TransportCredentials interface {
ServerHandshake(net.Conn) (net.Conn, AuthInfo, error)
// Info provides the ProtocolInfo of this TransportCredentials.
Info() ProtocolInfo
// Clone makes a copy of this TransportCredentials.
Clone() TransportCredentials
// OverrideServerName overrides the server name used to verify the hostname on the returned certificates from the server.
// gRPC internals also use it to override the virtual hosting name if it is set.
// It must be called before dialing. Currently, this is only used by grpclb.
OverrideServerName(string) error
}
// TLSInfo contains the auth information for a TLS authenticated connection.
@ -123,19 +138,10 @@ func (c tlsCreds) Info() ProtocolInfo {
return ProtocolInfo{
SecurityProtocol: "tls",
SecurityVersion: "1.2",
ServerName: c.config.ServerName,
}
}
// GetRequestMetadata returns nil, nil since TLS credentials does not have
// metadata.
func (c *tlsCreds) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
return nil, nil
}
func (c *tlsCreds) RequireTransportSecurity() bool {
return true
}
func (c *tlsCreds) ClientHandshake(ctx context.Context, addr string, rawConn net.Conn) (_ net.Conn, _ AuthInfo, err error) {
// use local cfg to avoid clobbering ServerName if using multiple endpoints
cfg := cloneTLSConfig(c.config)
@ -172,6 +178,15 @@ func (c *tlsCreds) ServerHandshake(rawConn net.Conn) (net.Conn, AuthInfo, error)
return conn, TLSInfo{conn.ConnectionState()}, nil
}
func (c *tlsCreds) Clone() TransportCredentials {
return NewTLS(c.config)
}
func (c *tlsCreds) OverrideServerName(serverNameOverride string) error {
c.config.ServerName = serverNameOverride
return nil
}
// NewTLS uses c to construct a TransportCredentials based on TLS.
func NewTLS(c *tls.Config) TransportCredentials {
tc := &tlsCreds{cloneTLSConfig(c)}
@ -180,12 +195,16 @@ func NewTLS(c *tls.Config) TransportCredentials {
}
// NewClientTLSFromCert constructs a TLS from the input certificate for client.
func NewClientTLSFromCert(cp *x509.CertPool, serverName string) TransportCredentials {
return NewTLS(&tls.Config{ServerName: serverName, RootCAs: cp})
// serverNameOverride is for testing only. If set to a non empty string,
// it will override the virtual host name of authority (e.g. :authority header field) in requests.
func NewClientTLSFromCert(cp *x509.CertPool, serverNameOverride string) TransportCredentials {
return NewTLS(&tls.Config{ServerName: serverNameOverride, RootCAs: cp})
}
// NewClientTLSFromFile constructs a TLS from the input certificate file for client.
func NewClientTLSFromFile(certFile, serverName string) (TransportCredentials, error) {
// serverNameOverride is for testing only. If set to a non empty string,
// it will override the virtual host name of authority (e.g. :authority header field) in requests.
func NewClientTLSFromFile(certFile, serverNameOverride string) (TransportCredentials, error) {
b, err := ioutil.ReadFile(certFile)
if err != nil {
return nil, err
@ -194,7 +213,7 @@ func NewClientTLSFromFile(certFile, serverName string) (TransportCredentials, er
if !cp.AppendCertsFromPEM(b) {
return nil, fmt.Errorf("credentials: failed to append certificates")
}
return NewTLS(&tls.Config{ServerName: serverName, RootCAs: cp}), nil
return NewTLS(&tls.Config{ServerName: serverNameOverride, RootCAs: cp}), nil
}
// NewServerTLSFromCert constructs a TLS from the input certificate for server.

View File

@ -90,7 +90,7 @@ var _ grpc.ClientConn
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
const _ = grpc.SupportPackageIsVersion3
const _ = grpc.SupportPackageIsVersion4
// Client API for Health service
@ -153,7 +153,7 @@ var _Health_serviceDesc = grpc.ServiceDesc{
},
},
Streams: []grpc.StreamDesc{},
Metadata: fileDescriptor0,
Metadata: "health.proto",
}
func init() { proto.RegisterFile("health.proto", fileDescriptor0) }

View File

@ -37,6 +37,22 @@ import (
"golang.org/x/net/context"
)
// UnaryInvoker is called by UnaryClientInterceptor to complete RPCs.
type UnaryInvoker func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error
// UnaryClientInterceptor intercepts the execution of a unary RPC on the client. inovker is the handler to complete the RPC
// and it is the responsibility of the interceptor to call it.
// This is the EXPERIMENTAL API.
type UnaryClientInterceptor func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error
// Streamer is called by StreamClientInterceptor to create a ClientStream.
type Streamer func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error)
// StreamClientInterceptor intercepts the creation of ClientStream. It may return a custom ClientStream to intercept all I/O
// operations. streamer is the handlder to create a ClientStream and it is the responsibility of the interceptor to call it.
// This is the EXPERIMENTAL API.
type StreamClientInterceptor func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error)
// UnaryServerInfo consists of various information about a unary RPC on
// server side. All per-rpc information may be mutated by the interceptor.
type UnaryServerInfo struct {

View File

@ -117,10 +117,17 @@ func (md MD) Len() int {
// Copy returns a copy of md.
func (md MD) Copy() MD {
return Join(md)
}
// Join joins any number of MDs into a single MD.
// The order of values for each key is determined by the order in which
// the MDs containing those values are presented to Join.
func Join(mds ...MD) MD {
out := MD{}
for k, v := range md {
for _, i := range v {
out[k] = append(out[k], i)
for _, md := range mds {
for k, v := range md {
out[k] = append(out[k], v...)
}
}
return out

View File

@ -303,10 +303,10 @@ func checkRecvPayload(pf payloadFormat, recvCompress string, dc Decompressor) er
case compressionNone:
case compressionMade:
if dc == nil || recvCompress != dc.Type() {
return transport.StreamErrorf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", recvCompress)
return Errorf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", recvCompress)
}
default:
return transport.StreamErrorf(codes.Internal, "grpc: received unexpected payload format %d", pf)
return Errorf(codes.Internal, "grpc: received unexpected payload format %d", pf)
}
return nil
}
@ -448,10 +448,10 @@ func convertCode(err error) codes.Code {
return codes.Unknown
}
// SupportPackageIsVersion3 is referenced from generated protocol buffer files
// SupportPackageIsVersion4 is referenced from generated protocol buffer files
// to assert that that code is compatible with this version of the grpc package.
//
// This constant may be renamed in the future if a change in the generated code
// requires a synchronised update of grpc-go and protoc-gen-go. This constant
// should not be referenced from any other code.
const SupportPackageIsVersion3 = true
const SupportPackageIsVersion4 = true

View File

@ -89,10 +89,12 @@ type service struct {
type Server struct {
opts options
mu sync.Mutex // guards following
lis map[net.Listener]bool
conns map[io.Closer]bool
drain bool
mu sync.Mutex // guards following
lis map[net.Listener]bool
conns map[io.Closer]bool
drain bool
ctx context.Context
cancel context.CancelFunc
// A CondVar to let GracefulStop() blocks until all the pending RPCs are finished
// and all the transport goes away.
cv *sync.Cond
@ -203,6 +205,7 @@ func NewServer(opt ...ServerOption) *Server {
m: make(map[string]*service),
}
s.cv = sync.NewCond(&s.mu)
s.ctx, s.cancel = context.WithCancel(context.Background())
if EnableTracing {
_, file, line, _ := runtime.Caller(1)
s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
@ -324,7 +327,7 @@ func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credenti
// Serve accepts incoming connections on the listener lis, creating a new
// ServerTransport and service goroutine for each. The service goroutines
// read gRPC requests and then call the registered handlers to reply to them.
// Service returns when lis.Accept fails. lis will be closed when
// Serve returns when lis.Accept fails with fatal errors. lis will be closed when
// this method returns.
func (s *Server) Serve(lis net.Listener) error {
s.mu.Lock()
@ -344,14 +347,38 @@ func (s *Server) Serve(lis net.Listener) error {
}
s.mu.Unlock()
}()
var tempDelay time.Duration // how long to sleep on accept failure
for {
rawConn, err := lis.Accept()
if err != nil {
if ne, ok := err.(interface {
Temporary() bool
}); ok && ne.Temporary() {
if tempDelay == 0 {
tempDelay = 5 * time.Millisecond
} else {
tempDelay *= 2
}
if max := 1 * time.Second; tempDelay > max {
tempDelay = max
}
s.mu.Lock()
s.printf("Accept error: %v; retrying in %v", err, tempDelay)
s.mu.Unlock()
select {
case <-time.After(tempDelay):
case <-s.ctx.Done():
}
continue
}
s.mu.Lock()
s.printf("done serving; Accept = %v", err)
s.mu.Unlock()
return err
}
tempDelay = 0
// Start a new goroutine to deal with rawConn
// so we don't stall this Accept loop goroutine.
go s.handleRawConn(rawConn)
@ -367,7 +394,10 @@ func (s *Server) handleRawConn(rawConn net.Conn) {
s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
s.mu.Unlock()
grpclog.Printf("grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
rawConn.Close()
// If serverHandShake returns ErrConnDispatched, keep rawConn open.
if err != credentials.ErrConnDispatched {
rawConn.Close()
}
return
}
@ -497,7 +527,7 @@ func (s *Server) removeConn(c io.Closer) {
defer s.mu.Unlock()
if s.conns != nil {
delete(s.conns, c)
s.cv.Signal()
s.cv.Broadcast()
}
}
@ -544,7 +574,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
return err
}
if err == io.ErrUnexpectedEOF {
err = transport.StreamError{Code: codes.Internal, Desc: "io.ErrUnexpectedEOF"}
err = Errorf(codes.Internal, io.ErrUnexpectedEOF.Error())
}
if err != nil {
switch err := err.(type) {
@ -566,8 +596,8 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
if err := checkRecvPayload(pf, stream.RecvCompress(), s.opts.dc); err != nil {
switch err := err.(type) {
case transport.StreamError:
if err := t.WriteStatus(stream, err.Code, err.Desc); err != nil {
case *rpcError:
if err := t.WriteStatus(stream, err.code, err.desc); err != nil {
grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", err)
}
default:
@ -798,7 +828,7 @@ func (s *Server) Stop() {
st := s.conns
s.conns = nil
// interrupt GracefulStop if Stop and GracefulStop are called concurrently.
s.cv.Signal()
s.cv.Broadcast()
s.mu.Unlock()
for lis := range listeners {
@ -809,6 +839,7 @@ func (s *Server) Stop() {
}
s.mu.Lock()
s.cancel()
if s.events != nil {
s.events.Finish()
s.events = nil
@ -821,16 +852,19 @@ func (s *Server) Stop() {
func (s *Server) GracefulStop() {
s.mu.Lock()
defer s.mu.Unlock()
if s.drain == true || s.conns == nil {
if s.conns == nil {
return
}
s.drain = true
for lis := range s.lis {
lis.Close()
}
s.lis = nil
for c := range s.conns {
c.(transport.ServerTransport).Drain()
s.cancel()
if !s.drain {
for c := range s.conns {
c.(transport.ServerTransport).Drain()
}
s.drain = true
}
for len(s.conns) != 0 {
s.cv.Wait()
@ -862,33 +896,49 @@ func (s *Server) testingCloseConns() {
s.mu.Unlock()
}
// SendHeader sends header metadata. It may be called at most once from a unary
// RPC handler. The ctx is the RPC handler's Context or one derived from it.
func SendHeader(ctx context.Context, md metadata.MD) error {
// SetHeader sets the header metadata.
// When called multiple times, all the provided metadata will be merged.
// All the metadata will be sent out when one of the following happens:
// - grpc.SendHeader() is called;
// - The first response is sent out;
// - An RPC status is sent out (error or success).
func SetHeader(ctx context.Context, md metadata.MD) error {
if md.Len() == 0 {
return nil
}
stream, ok := transport.StreamFromContext(ctx)
if !ok {
return fmt.Errorf("grpc: failed to fetch the stream from the context %v", ctx)
return Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
}
return stream.SetHeader(md)
}
// SendHeader sends header metadata. It may be called at most once.
// The provided md and headers set by SetHeader() will be sent.
func SendHeader(ctx context.Context, md metadata.MD) error {
stream, ok := transport.StreamFromContext(ctx)
if !ok {
return Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
}
t := stream.ServerTransport()
if t == nil {
grpclog.Fatalf("grpc: SendHeader: %v has no ServerTransport to send header metadata.", stream)
}
return t.WriteHeader(stream, md)
if err := t.WriteHeader(stream, md); err != nil {
return toRPCErr(err)
}
return nil
}
// SetTrailer sets the trailer metadata that will be sent when an RPC returns.
// It may be called at most once from a unary RPC handler. The ctx is the RPC
// handler's Context or one derived from it.
// When called more than once, all the provided metadata will be merged.
func SetTrailer(ctx context.Context, md metadata.MD) error {
if md.Len() == 0 {
return nil
}
stream, ok := transport.StreamFromContext(ctx)
if !ok {
return fmt.Errorf("grpc: failed to fetch the stream from the context %v", ctx)
return Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
}
return stream.SetTrailer(md)
}

View File

@ -97,7 +97,14 @@ type ClientStream interface {
// NewClientStream creates a new Stream for the client side. This is called
// by generated code.
func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
if cc.dopts.streamInt != nil {
return cc.dopts.streamInt(ctx, desc, cc, method, newClientStream, opts...)
}
return newClientStream(ctx, desc, cc, method, opts...)
}
func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
var (
t transport.ClientTransport
s *transport.Stream
@ -296,7 +303,7 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) {
}
}()
if err != nil {
return transport.StreamErrorf(codes.Internal, "grpc: %v", err)
return Errorf(codes.Internal, "grpc: %v", err)
}
return cs.t.Write(cs.s, out, &transport.Options{Last: false})
}
@ -403,12 +410,19 @@ func (cs *clientStream) finish(err error) {
// ServerStream defines the interface a server stream has to satisfy.
type ServerStream interface {
// SendHeader sends the header metadata. It should not be called
// after SendProto. It fails if called multiple times or if
// called after SendProto.
// SetHeader sets the header metadata. It may be called multiple times.
// When call multiple times, all the provided metadata will be merged.
// All the metadata will be sent out when one of the following happens:
// - ServerStream.SendHeader() is called;
// - The first response is sent out;
// - An RPC status is sent out (error or success).
SetHeader(metadata.MD) error
// SendHeader sends the header metadata.
// The provided md and headers set by SetHeader() will be sent.
// It fails if called multiple times.
SendHeader(metadata.MD) error
// SetTrailer sets the trailer metadata which will be sent with the
// RPC status.
// SetTrailer sets the trailer metadata which will be sent with the RPC status.
// When called more than once, all the provided metadata will be merged.
SetTrailer(metadata.MD)
Stream
}
@ -434,6 +448,13 @@ func (ss *serverStream) Context() context.Context {
return ss.s.Context()
}
func (ss *serverStream) SetHeader(md metadata.MD) error {
if md.Len() == 0 {
return nil
}
return ss.s.SetHeader(md)
}
func (ss *serverStream) SendHeader(md metadata.MD) error {
return ss.t.WriteHeader(ss.s, md)
}
@ -468,10 +489,13 @@ func (ss *serverStream) SendMsg(m interface{}) (err error) {
}
}()
if err != nil {
err = transport.StreamErrorf(codes.Internal, "grpc: %v", err)
err = Errorf(codes.Internal, "grpc: %v", err)
return err
}
return ss.t.Write(ss.s, out, &transport.Options{Last: false})
if err := ss.t.Write(ss.s, out, &transport.Options{Last: false}); err != nil {
return toRPCErr(err)
}
return nil
}
func (ss *serverStream) RecvMsg(m interface{}) (err error) {
@ -489,5 +513,14 @@ func (ss *serverStream) RecvMsg(m interface{}) (err error) {
ss.mu.Unlock()
}
}()
return recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxMsgSize)
if err := recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxMsgSize); err != nil {
if err == io.EOF {
return err
}
if err == io.ErrUnexpectedEOF {
err = Errorf(codes.Internal, io.ErrUnexpectedEOF.Error())
}
return toRPCErr(err)
}
return nil
}

View File

@ -85,7 +85,7 @@ func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request) (ServerTr
if v := r.Header.Get("grpc-timeout"); v != "" {
to, err := decodeTimeout(v)
if err != nil {
return nil, StreamErrorf(codes.Internal, "malformed time-out: %v", err)
return nil, streamErrorf(codes.Internal, "malformed time-out: %v", err)
}
st.timeoutSet = true
st.timeout = to
@ -393,5 +393,5 @@ func mapRecvMsgError(err error) error {
}
}
}
return ConnectionError{Desc: err.Error()}
return connectionErrorf(true, err, err.Error())
}

View File

@ -57,6 +57,7 @@ import (
type http2Client struct {
target string // server name/addr
userAgent string
md interface{}
conn net.Conn // underlying communication channel
authInfo credentials.AuthInfo // auth info about the connection
nextID uint32 // the next stream ID to be used
@ -107,21 +108,49 @@ type http2Client struct {
prevGoAwayID uint32
}
func dial(fn func(context.Context, string) (net.Conn, error), ctx context.Context, addr string) (net.Conn, error) {
func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr string) (net.Conn, error) {
if fn != nil {
return fn(ctx, addr)
}
return dialContext(ctx, "tcp", addr)
}
func isTemporary(err error) bool {
switch err {
case io.EOF:
// Connection closures may be resolved upon retry, and are thus
// treated as temporary.
return true
case context.DeadlineExceeded:
// In Go 1.7, context.DeadlineExceeded implements Timeout(), and this
// special case is not needed. Until then, we need to keep this
// clause.
return true
}
switch err := err.(type) {
case interface {
Temporary() bool
}:
return err.Temporary()
case interface {
Timeout() bool
}:
// Timeouts may be resolved upon retry, and are thus treated as
// temporary.
return err.Timeout()
}
return false
}
// newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
// and starts to receive messages on it. Non-nil error returns if construction
// fails.
func newHTTP2Client(ctx context.Context, addr string, opts ConnectOptions) (_ ClientTransport, err error) {
func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (_ ClientTransport, err error) {
scheme := "http"
conn, connErr := dial(opts.Dialer, ctx, addr)
if connErr != nil {
return nil, ConnectionErrorf(true, connErr, "transport: %v", connErr)
conn, err := dial(ctx, opts.Dialer, addr.Addr)
if err != nil {
return nil, connectionErrorf(true, err, "transport: %v", err)
}
// Any further errors will close the underlying connection
defer func(conn net.Conn) {
@ -132,12 +161,13 @@ func newHTTP2Client(ctx context.Context, addr string, opts ConnectOptions) (_ Cl
var authInfo credentials.AuthInfo
if creds := opts.TransportCredentials; creds != nil {
scheme = "https"
conn, authInfo, connErr = creds.ClientHandshake(ctx, addr, conn)
}
if connErr != nil {
// Credentials handshake error is not a temporary error (unless the error
// was the connection closing).
return nil, ConnectionErrorf(connErr == io.EOF, connErr, "transport: %v", connErr)
conn, authInfo, err = creds.ClientHandshake(ctx, addr.Addr, conn)
if err != nil {
// Credentials handshake errors are typically considered permanent
// to avoid retrying on e.g. bad certificates.
temp := isTemporary(err)
return nil, connectionErrorf(temp, err, "transport: %v", err)
}
}
ua := primaryUA
if opts.UserAgent != "" {
@ -145,8 +175,9 @@ func newHTTP2Client(ctx context.Context, addr string, opts ConnectOptions) (_ Cl
}
var buf bytes.Buffer
t := &http2Client{
target: addr,
target: addr.Addr,
userAgent: ua,
md: addr.Metadata,
conn: conn,
authInfo: authInfo,
// The client initiated stream id is odd starting from 1.
@ -176,11 +207,11 @@ func newHTTP2Client(ctx context.Context, addr string, opts ConnectOptions) (_ Cl
n, err := t.conn.Write(clientPreface)
if err != nil {
t.Close()
return nil, ConnectionErrorf(true, err, "transport: %v", err)
return nil, connectionErrorf(true, err, "transport: %v", err)
}
if n != len(clientPreface) {
t.Close()
return nil, ConnectionErrorf(true, err, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))
return nil, connectionErrorf(true, err, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))
}
if initialWindowSize != defaultWindowSize {
err = t.framer.writeSettings(true, http2.Setting{
@ -192,13 +223,13 @@ func newHTTP2Client(ctx context.Context, addr string, opts ConnectOptions) (_ Cl
}
if err != nil {
t.Close()
return nil, ConnectionErrorf(true, err, "transport: %v", err)
return nil, connectionErrorf(true, err, "transport: %v", err)
}
// Adjust the connection flow control window if needed.
if delta := uint32(initialConnWindowSize - defaultWindowSize); delta > 0 {
if err := t.framer.writeWindowUpdate(true, 0, delta); err != nil {
t.Close()
return nil, ConnectionErrorf(true, err, "transport: %v", err)
return nil, connectionErrorf(true, err, "transport: %v", err)
}
}
go t.controller()
@ -223,8 +254,10 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
s.windowHandler = func(n int) {
t.updateWindow(s, uint32(n))
}
// Make a stream be able to cancel the pending operations by itself.
s.ctx, s.cancel = context.WithCancel(ctx)
// The client side stream context should have exactly the same life cycle with the user provided context.
// That means, s.ctx should be read-only. And s.ctx is done iff ctx is done.
// So we use the original context here instead of creating a copy.
s.ctx = ctx
s.dec = &recvBufferReader{
ctx: s.ctx,
goAway: s.goAway,
@ -236,16 +269,6 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
// NewStream creates a stream and register it into the transport as "active"
// streams.
func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error) {
// Record the timeout value on the context.
var timeout time.Duration
if dl, ok := ctx.Deadline(); ok {
timeout = dl.Sub(time.Now())
}
select {
case <-ctx.Done():
return nil, ContextErr(ctx.Err())
default:
}
pr := &peer.Peer{
Addr: t.conn.RemoteAddr(),
}
@ -266,12 +289,12 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
}
pos := strings.LastIndex(callHdr.Method, "/")
if pos == -1 {
return nil, StreamErrorf(codes.InvalidArgument, "transport: malformed method name: %q", callHdr.Method)
return nil, streamErrorf(codes.InvalidArgument, "transport: malformed method name: %q", callHdr.Method)
}
audience := "https://" + callHdr.Host + port + callHdr.Method[:pos]
data, err := c.GetRequestMetadata(ctx, audience)
if err != nil {
return nil, StreamErrorf(codes.InvalidArgument, "transport: %v", err)
return nil, streamErrorf(codes.InvalidArgument, "transport: %v", err)
}
for k, v := range data {
authData[k] = v
@ -352,9 +375,12 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
if callHdr.SendCompress != "" {
t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-encoding", Value: callHdr.SendCompress})
}
if timeout > 0 {
if dl, ok := ctx.Deadline(); ok {
// Send out timeout regardless its value. The server can detect timeout context by itself.
timeout := dl.Sub(time.Now())
t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-timeout", Value: encodeTimeout(timeout)})
}
for k, v := range authData {
// Capital header names are illegal in HTTP/2.
k = strings.ToLower(k)
@ -376,6 +402,16 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
}
}
}
if md, ok := t.md.(*metadata.MD); ok {
for k, v := range *md {
if isReservedHeader(k) {
continue
}
for _, entry := range v {
t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: entry})
}
}
}
first := true
// Sends the headers in a single batch even when they span multiple frames.
for !endHeaders {
@ -408,7 +444,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
}
if err != nil {
t.notifyError(err)
return nil, ConnectionErrorf(true, err, "transport: %v", err)
return nil, connectionErrorf(true, err, "transport: %v", err)
}
}
t.writableChan <- 0
@ -454,7 +490,7 @@ func (t *http2Client) CloseStream(s *Stream, err error) {
}
s.state = streamDone
s.mu.Unlock()
if _, ok := err.(StreamError); ok {
if se, ok := err.(StreamError); ok && se.Code != codes.DeadlineExceeded {
t.controlBuf.put(&resetStream{s.id, http2.ErrCodeCancel})
}
}
@ -622,7 +658,7 @@ func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error {
// invoked.
if err := t.framer.writeData(forceFlush, s.id, endStream, p); err != nil {
t.notifyError(err)
return ConnectionErrorf(true, err, "transport: %v", err)
return connectionErrorf(true, err, "transport: %v", err)
}
if t.framer.adjustNumWriters(-1) == 0 {
t.framer.flushWrite()
@ -670,7 +706,7 @@ func (t *http2Client) updateWindow(s *Stream, n uint32) {
func (t *http2Client) handleData(f *http2.DataFrame) {
size := len(f.Data())
if err := t.fc.onData(uint32(size)); err != nil {
t.notifyError(ConnectionErrorf(true, err, "%v", err))
t.notifyError(connectionErrorf(true, err, "%v", err))
return
}
// Select the right stream to dispatch.
@ -766,6 +802,9 @@ func (t *http2Client) handleSettings(f *http2.SettingsFrame) {
}
func (t *http2Client) handlePing(f *http2.PingFrame) {
if f.IsAck() { // Do nothing.
return
}
pingAck := &ping{ack: true}
copy(pingAck.data[:], f.Data[:])
t.controlBuf.put(pingAck)
@ -776,7 +815,7 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
if t.state == reachable || t.state == draining {
if f.LastStreamID > 0 && f.LastStreamID%2 != 1 {
t.mu.Unlock()
t.notifyError(ConnectionErrorf(true, nil, "received illegal http2 GOAWAY frame: stream ID %d is even", f.LastStreamID))
t.notifyError(connectionErrorf(true, nil, "received illegal http2 GOAWAY frame: stream ID %d is even", f.LastStreamID))
return
}
select {
@ -785,7 +824,7 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
// t.goAway has been closed (i.e.,multiple GoAways).
if id < f.LastStreamID {
t.mu.Unlock()
t.notifyError(ConnectionErrorf(true, nil, "received illegal http2 GOAWAY frame: previously recv GOAWAY frame with LastStramID %d, currently recv %d", id, f.LastStreamID))
t.notifyError(connectionErrorf(true, nil, "received illegal http2 GOAWAY frame: previously recv GOAWAY frame with LastStramID %d, currently recv %d", id, f.LastStreamID))
return
}
t.prevGoAwayID = id
@ -823,6 +862,12 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
state.processHeaderField(hf)
}
if state.err != nil {
s.mu.Lock()
if !s.headerDone {
close(s.headerChan)
s.headerDone = true
}
s.mu.Unlock()
s.write(recvMsg{err: state.err})
// Something wrong. Stops reading even when there is remaining.
return
@ -900,7 +945,7 @@ func (t *http2Client) reader() {
t.mu.Unlock()
if s != nil {
// use error detail to provide better err message
handleMalformedHTTP2(s, StreamErrorf(http2ErrConvTab[se.Code], "%v", t.framer.errorDetail()))
handleMalformedHTTP2(s, streamErrorf(http2ErrConvTab[se.Code], "%v", t.framer.errorDetail()))
}
continue
} else {

View File

@ -111,12 +111,12 @@ func newHTTP2Server(conn net.Conn, maxStreams uint32, authInfo credentials.AuthI
Val: uint32(initialWindowSize)})
}
if err := framer.writeSettings(true, settings...); err != nil {
return nil, ConnectionErrorf(true, err, "transport: %v", err)
return nil, connectionErrorf(true, err, "transport: %v", err)
}
// Adjust the connection flow control window if needed.
if delta := uint32(initialConnWindowSize - defaultWindowSize); delta > 0 {
if err := framer.writeWindowUpdate(true, 0, delta); err != nil {
return nil, ConnectionErrorf(true, err, "transport: %v", err)
return nil, connectionErrorf(true, err, "transport: %v", err)
}
}
var buf bytes.Buffer
@ -405,6 +405,9 @@ func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
}
func (t *http2Server) handlePing(f *http2.PingFrame) {
if f.IsAck() { // Do nothing.
return
}
pingAck := &ping{ack: true}
copy(pingAck.data[:], f.Data[:])
t.controlBuf.put(pingAck)
@ -448,7 +451,7 @@ func (t *http2Server) writeHeaders(s *Stream, b *bytes.Buffer, endStream bool) e
}
if err != nil {
t.Close()
return ConnectionErrorf(true, err, "transport: %v", err)
return connectionErrorf(true, err, "transport: %v", err)
}
}
return nil
@ -462,6 +465,14 @@ func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
return ErrIllegalHeaderWrite
}
s.headerOk = true
if md.Len() > 0 {
if s.header.Len() > 0 {
s.header = metadata.Join(s.header, md)
} else {
s.header = md
}
}
md = s.header
s.mu.Unlock()
if _, err := wait(s.ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil {
return err
@ -493,7 +504,7 @@ func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
// TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early
// OK is adopted.
func (t *http2Server) WriteStatus(s *Stream, statusCode codes.Code, statusDesc string) error {
var headersSent bool
var headersSent, hasHeader bool
s.mu.Lock()
if s.state == streamDone {
s.mu.Unlock()
@ -502,7 +513,16 @@ func (t *http2Server) WriteStatus(s *Stream, statusCode codes.Code, statusDesc s
if s.headerOk {
headersSent = true
}
if s.header.Len() > 0 {
hasHeader = true
}
s.mu.Unlock()
if !headersSent && hasHeader {
t.WriteHeader(s, nil)
headersSent = true
}
if _, err := wait(s.ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil {
return err
}
@ -544,33 +564,14 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) error {
s.mu.Lock()
if s.state == streamDone {
s.mu.Unlock()
return StreamErrorf(codes.Unknown, "the stream has been done")
return streamErrorf(codes.Unknown, "the stream has been done")
}
if !s.headerOk {
writeHeaderFrame = true
s.headerOk = true
}
s.mu.Unlock()
if writeHeaderFrame {
if _, err := wait(s.ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil {
return err
}
t.hBuf.Reset()
t.hEnc.WriteField(hpack.HeaderField{Name: ":status", Value: "200"})
t.hEnc.WriteField(hpack.HeaderField{Name: "content-type", Value: "application/grpc"})
if s.sendCompress != "" {
t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress})
}
p := http2.HeadersFrameParam{
StreamID: s.id,
BlockFragment: t.hBuf.Bytes(),
EndHeaders: true,
}
if err := t.framer.writeHeaders(false, p); err != nil {
t.Close()
return ConnectionErrorf(true, err, "transport: %v", err)
}
t.writableChan <- 0
t.WriteHeader(s, nil)
}
r := bytes.NewBuffer(data)
for {
@ -642,7 +643,7 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) error {
}
if err := t.framer.writeData(forceFlush, s.id, false, p); err != nil {
t.Close()
return ConnectionErrorf(true, err, "transport: %v", err)
return connectionErrorf(true, err, "transport: %v", err)
}
if t.framer.adjustNumWriters(-1) == 0 {
t.framer.flushWrite()

View File

@ -162,7 +162,7 @@ func (d *decodeState) processHeaderField(f hpack.HeaderField) {
switch f.Name {
case "content-type":
if !validContentType(f.Value) {
d.setErr(StreamErrorf(codes.FailedPrecondition, "transport: received the unexpected content-type %q", f.Value))
d.setErr(streamErrorf(codes.FailedPrecondition, "transport: received the unexpected content-type %q", f.Value))
return
}
case "grpc-encoding":
@ -170,7 +170,7 @@ func (d *decodeState) processHeaderField(f hpack.HeaderField) {
case "grpc-status":
code, err := strconv.Atoi(f.Value)
if err != nil {
d.setErr(StreamErrorf(codes.Internal, "transport: malformed grpc-status: %v", err))
d.setErr(streamErrorf(codes.Internal, "transport: malformed grpc-status: %v", err))
return
}
d.statusCode = codes.Code(code)
@ -181,7 +181,7 @@ func (d *decodeState) processHeaderField(f hpack.HeaderField) {
var err error
d.timeout, err = decodeTimeout(f.Value)
if err != nil {
d.setErr(StreamErrorf(codes.Internal, "transport: malformed time-out: %v", err))
d.setErr(streamErrorf(codes.Internal, "transport: malformed time-out: %v", err))
return
}
case ":path":
@ -253,6 +253,9 @@ func div(d, r time.Duration) int64 {
// TODO(zhaoq): It is the simplistic and not bandwidth efficient. Improve it.
func encodeTimeout(t time.Duration) string {
if t <= 0 {
return "0n"
}
if d := div(t, time.Nanosecond); d <= maxTimeoutValue {
return strconv.FormatInt(d, 10) + "n"
}
@ -349,7 +352,7 @@ func decodeGrpcMessageUnchecked(msg string) string {
for i := 0; i < lenMsg; i++ {
c := msg[i]
if c == percentByte && i+2 < lenMsg {
parsed, err := strconv.ParseInt(msg[i+1:i+3], 16, 8)
parsed, err := strconv.ParseUint(msg[i+1:i+3], 16, 8)
if err != nil {
buf.WriteByte(c)
} else {

View File

@ -39,7 +39,6 @@ package transport // import "google.golang.org/grpc/transport"
import (
"bytes"
"errors"
"fmt"
"io"
"net"
@ -169,7 +168,8 @@ type Stream struct {
// nil for client side Stream.
st ServerTransport
// ctx is the associated context of the stream.
ctx context.Context
ctx context.Context
// cancel is always nil for client side Stream.
cancel context.CancelFunc
// done is closed when the final status arrives.
done chan struct{}
@ -286,19 +286,30 @@ func (s *Stream) StatusDesc() string {
return s.statusDesc
}
// ErrIllegalTrailerSet indicates that the trailer has already been set or it
// is too late to do so.
var ErrIllegalTrailerSet = errors.New("transport: trailer has been set")
// SetTrailer sets the trailer metadata which will be sent with the RPC status
// by the server. This can only be called at most once. Server side only.
func (s *Stream) SetTrailer(md metadata.MD) error {
// SetHeader sets the header metadata. This can be called multiple times.
// Server side only.
func (s *Stream) SetHeader(md metadata.MD) error {
s.mu.Lock()
defer s.mu.Unlock()
if s.trailer != nil {
return ErrIllegalTrailerSet
if s.headerOk || s.state == streamDone {
return ErrIllegalHeaderWrite
}
s.trailer = md.Copy()
if md.Len() == 0 {
return nil
}
s.header = metadata.Join(s.header, md)
return nil
}
// SetTrailer sets the trailer metadata which will be sent with the RPC status
// by the server. This can be called multiple times. Server side only.
func (s *Stream) SetTrailer(md metadata.MD) error {
if md.Len() == 0 {
return nil
}
s.mu.Lock()
defer s.mu.Unlock()
s.trailer = metadata.Join(s.trailer, md)
return nil
}
@ -350,7 +361,7 @@ func NewServerTransport(protocol string, conn net.Conn, maxStreams uint32, authI
return newHTTP2Server(conn, maxStreams, authInfo)
}
// ConnectOptions covers all relevant options for dialing a server.
// ConnectOptions covers all relevant options for communicating with the server.
type ConnectOptions struct {
// UserAgent is the application user agent.
UserAgent string
@ -362,9 +373,15 @@ type ConnectOptions struct {
TransportCredentials credentials.TransportCredentials
}
// TargetInfo contains the information of the target such as network address and metadata.
type TargetInfo struct {
Addr string
Metadata interface{}
}
// NewClientTransport establishes the transport with the required ConnectOptions
// and returns it to the caller.
func NewClientTransport(ctx context.Context, target string, opts ConnectOptions) (ClientTransport, error) {
func NewClientTransport(ctx context.Context, target TargetInfo, opts ConnectOptions) (ClientTransport, error) {
return newHTTP2Client(ctx, target, opts)
}
@ -476,16 +493,16 @@ type ServerTransport interface {
Drain()
}
// StreamErrorf creates an StreamError with the specified error code and description.
func StreamErrorf(c codes.Code, format string, a ...interface{}) StreamError {
// streamErrorf creates an StreamError with the specified error code and description.
func streamErrorf(c codes.Code, format string, a ...interface{}) StreamError {
return StreamError{
Code: c,
Desc: fmt.Sprintf(format, a...),
}
}
// ConnectionErrorf creates an ConnectionError with the specified error description.
func ConnectionErrorf(temp bool, e error, format string, a ...interface{}) ConnectionError {
// connectionErrorf creates an ConnectionError with the specified error description.
func connectionErrorf(temp bool, e error, format string, a ...interface{}) ConnectionError {
return ConnectionError{
Desc: fmt.Sprintf(format, a...),
temp: temp,
@ -522,10 +539,10 @@ func (e ConnectionError) Origin() error {
var (
// ErrConnClosing indicates that the transport is closing.
ErrConnClosing = ConnectionError{Desc: "transport is closing", temp: true}
ErrConnClosing = connectionErrorf(true, nil, "transport is closing")
// ErrStreamDrain indicates that the stream is rejected by the server because
// the server stops accepting new RPCs.
ErrStreamDrain = StreamErrorf(codes.Unavailable, "the server stops accepting new RPCs")
ErrStreamDrain = streamErrorf(codes.Unavailable, "the server stops accepting new RPCs")
)
// StreamError is an error that only affects one stream within a connection.
@ -542,9 +559,9 @@ func (e StreamError) Error() string {
func ContextErr(err error) StreamError {
switch err {
case context.DeadlineExceeded:
return StreamErrorf(codes.DeadlineExceeded, "%v", err)
return streamErrorf(codes.DeadlineExceeded, "%v", err)
case context.Canceled:
return StreamErrorf(codes.Canceled, "%v", err)
return streamErrorf(codes.Canceled, "%v", err)
}
panic(fmt.Sprintf("Unexpected error from context packet: %v", err))
}