From f52b113c79afa04a771aa28bfcd7d6be9ec6ddb1 Mon Sep 17 00:00:00 2001 From: Mohd Tarmizi Date: Thu, 25 Jun 2015 22:31:40 +0800 Subject: [PATCH] First public release --- Cargo.toml | 8 + LICENSE => LICENSE.txt | 0 examples/hello.rs | 10 + src/lib.rs | 617 +++++++++++++++++++++++++++++++++++++++++ src/unix.rs | 132 +++++++++ src/windows.rs | 62 +++++ 6 files changed, 829 insertions(+) create mode 100644 Cargo.toml rename LICENSE => LICENSE.txt (100%) create mode 100644 examples/hello.rs create mode 100644 src/lib.rs create mode 100644 src/unix.rs create mode 100644 src/windows.rs diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..1c57c70 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,8 @@ +[package] +name = "fastcgi" +version = "1.0.0-alpha" +authors = ["Mohd Tarmizi Mohd Affandi"] +license = "MIT" + +[dependencies] +libc = "0.1.8" diff --git a/LICENSE b/LICENSE.txt similarity index 100% rename from LICENSE rename to LICENSE.txt diff --git a/examples/hello.rs b/examples/hello.rs new file mode 100644 index 0000000..6b7e394 --- /dev/null +++ b/examples/hello.rs @@ -0,0 +1,10 @@ +extern crate fastcgi; + +use std::io::Write; + +fn main() { + fastcgi::run(|mut req| { + write!(&mut req.stdout(), "Content-Type: text/plain\n\nHello, world!") + .unwrap(); + }); +} diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..3704ee3 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,617 @@ +//////////////////////////////////////////////////////////////////////////////// +// // +// Copyright (c) 2015 Mohd Tarmizi Mohd Affandi // +// // +// Permission is hereby granted, free of charge, to any person obtaining a // +// copy of this software and associated documentation files (the // +// "Software"), to deal in the Software without restriction, including // +// without limitation the rights to use, copy, modify, merge, publish, // +// distribute, sublicense, and/or sell copies of the Software, and to // +// permit persons to whom the Software is furnished to do so, subject to // +// the following conditions: // +// // +// The above copyright notice and this permission notice shall be included // +// in all copies or substantial portions of the Software. // +// // +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS // +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF // +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. // +// IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY // +// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, // +// TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE // +// SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. // +// // +//////////////////////////////////////////////////////////////////////////////// + +//! Pure Rust implementation of FastCGI 1.0. +//! +//! Example usage: +//! +//! extern crate fastcgi; +//! +//! use std::io::Write; +//! +//! fn main() { +//! fastcgi::run(|mut req| { +//! write!(&mut req.stdout(), "Content-Type: text/plain\n\nHello, world!") +//! .unwrap(); +//! }); +//! } + +extern crate libc; + +use std::collections::{HashMap, HashSet}; +use std::io::{self, Read, Write, Cursor, BufRead}; +use std::mem; +use std::net::TcpListener; +use std::rc::Rc; + +#[cfg(unix)] use unix::{Transport, Socket}; +#[cfg(unix)] mod unix; + +#[cfg(windows)] use windows::{Transport, Socket}; +#[cfg(windows)] mod windows; + +const HEADER_LEN: usize = 8; + +#[derive(Debug, Clone, Copy)] +pub enum Role { + Responder, + Authorizer, + Filter, +} + +#[derive(Debug)] +enum ProtocolStatus { + RequestComplete, + CantMpxConn, + #[allow(dead_code)] Overloaded, + UnknownRole, +} + +#[derive(Debug)] +enum Record { + BeginRequest { request_id: u16, role: Result, keep_conn: bool }, + AbortRequest { request_id: u16 }, + EndRequest { request_id: u16, app_status: i32, protocol_status: ProtocolStatus }, + Params { request_id: u16, content: Vec }, + Stdin { request_id: u16, content: Vec }, + Stdout { request_id: u16, content: Vec }, + Stderr { request_id: u16, content: Vec }, + Data { request_id: u16, content: Vec }, + GetValues(Vec), + GetValuesResult(Vec<(String, String)>), + UnknownType(u8), +} + +fn read_len(mut r: &mut R) -> io::Result { + let mut buf: Vec = Vec::with_capacity(4); + try!((&mut r).take(1).read_to_end(&mut buf)); + if buf.len() == 1 { + if buf[0] >> 7 == 1 { + assert!(try!((&mut r).take(3).read_to_end(&mut buf)) == 3); + Ok( + (((buf[0] & 0x7f) as u32) << 24) + + ((buf[1] as u32) << 16) + + ((buf[2] as u32) << 8) + + (buf[3] as u32) + ) + } else { + Ok(buf[0] as u32) + } + } else { + Err(io::Error::new(io::ErrorKind::Other, "EOF")) + } +} + +fn read_pair(mut r: &mut R) -> io::Result<(String, String)> { + let key_len = try!(read_len(r)); + let value_len = try!(read_len(r)); + let mut key = String::with_capacity(key_len as usize); + assert!(try!((&mut r).take(key_len as u64).read_to_string(&mut key)) == key_len as usize); + let mut value = String::with_capacity(value_len as usize); + assert!(try!((&mut r).take(value_len as u64).read_to_string(&mut value)) == value_len as usize); + Ok((key, value)) +} + +fn read_pairs(r: &mut R) -> io::Result> { + let mut params = Vec::new(); + match read_pair(r) { + Ok(param) => { + params.push(param); + params.extend(try!(read_pairs(r)).into_iter()); + Ok(params) + }, + Err(_) => Ok(params), + } +} + +fn write_len(mut w: &mut W, n: u32) -> io::Result<()> { + if n < 0x80 { + try!(w.write_all(&[n as u8])); + } else { + assert!(n < 0x80000000); + let buf = unsafe { + mem::transmute::((0x80000000 & n).to_be()) + }; + try!(w.write_all(&buf)); + } + Ok(()) +} + +fn write_pairs(w: &mut W, pairs: Vec<(String, String)>) -> io::Result<()> { + for (key, value) in pairs { + try!(write_len(w, key.len() as u32)); + try!(write_len(w, value.len() as u32)); + try!(write!(w, "{}{}", key, value)); + } + Ok(()) +} + +#[inline] +fn write_record(w: &mut W, record_type: u8, request_id: u16, content: &[u8]) -> io::Result<()> { + assert!(content.len() <= std::u32::MAX as usize); + let request_id = unsafe { + mem::transmute::<_, [u8; 2]>(request_id.to_be()) + }; + let content_length = unsafe { + mem::transmute::<_, [u8; 2]>((content.len() as u16).to_be()) + }; + try!(w.write_all(&[ + 1, record_type, request_id[0], request_id[1], + content_length[0], content_length[1], 0, 0, + ])); // TODO: Padding + try!(w.write_all(content)); + Ok(()) +} + +#[inline] +fn read_record(r: &mut R) -> io::Result<(u8, u16, Vec)> { + let mut header: Vec = Vec::with_capacity(HEADER_LEN); + assert!(try!(r.take(HEADER_LEN as u64).read_to_end(&mut header)) == HEADER_LEN); + assert!(header[0] == 1); + let record_type = header[1]; + let request_id = unsafe { u16::from_be(mem::transmute([header[2], header[3]])) }; + let content_length = unsafe { u16::from_be(mem::transmute([header[4], header[5]])) }; + let padding_length = header[6]; + let mut content: Vec = Vec::with_capacity(content_length as usize); + assert!(try!(r.take(content_length as u64).read_to_end(&mut content)) == content_length as usize); + assert!(try!(r.take(padding_length as u64).read_to_end(&mut Vec::with_capacity(padding_length as usize))) == padding_length as usize); + Ok((record_type, request_id, content)) +} + +impl Record { + fn send(self, w: &mut W) -> io::Result<()> { + match self { + Record::EndRequest { request_id, app_status, protocol_status } => { + let app_status = unsafe { + mem::transmute::<_, [u8; 4]>(app_status.to_be()) + }; + let protocol_status = match protocol_status { + ProtocolStatus::RequestComplete => 0, + ProtocolStatus::CantMpxConn => 1, + ProtocolStatus::Overloaded => 2, + ProtocolStatus::UnknownRole => 3, + }; + let content = [ + app_status[0], app_status[1], app_status[2], app_status[3], + protocol_status, 0, 0, 0, + ]; + try!(write_record(w, 3, request_id, &content)); + }, + Record::Stdout { request_id, content } => { + try!(write_record(w, 6, request_id, &content)); + }, + Record::Stderr { request_id, content } => { + try!(write_record(w, 7, request_id, &content)); + }, + Record::GetValuesResult(items) => { + let mut content = Cursor::new(Vec::new()); + try!(write_pairs(&mut content, items)); + try!(write_record(w, 10, 0, &content.into_inner())); + }, + Record::UnknownType(record_type) => { + let content = [record_type, 0, 0, 0, 0, 0, 0, 0]; + try!(write_record(w, 11, 0, &content)); + }, + _ => panic!("Record not sendable"), + } + Ok(()) + } + + fn receive(r: &mut R) -> io::Result { + let (record_type, request_id, content) = try!(read_record(r)); + let rec = match record_type { + 1 => { + let role = unsafe { + u16::from_be(mem::transmute([content[0], content[1]])) + }; + let role = match role { + 1 => Ok(Role::Responder), + 2 => Ok(Role::Authorizer), + 3 => Ok(Role::Filter), + _ => Err(role), + }; + let keep_conn = content[2] & 1 == 1; + Record::BeginRequest { + request_id: request_id, + role: role, + keep_conn: keep_conn + } + }, + 2 => Record::AbortRequest { request_id: request_id }, + 4 => Record::Params { request_id: request_id, content: content }, + 5 => Record::Stdin { request_id: request_id, content: content }, + 8 => Record::Data { request_id: request_id, content: content }, + 9 => { + let items = try!(read_pairs(&mut Cursor::new(content))); + Record::GetValues(items.into_iter().map(|(key, _)| key).collect()) + }, + _ if record_type >= 11 => Record::UnknownType(record_type), + _ => panic!("Record not receivable"), + }; + Ok(rec) + } +} + +pub struct Stdin<'a> { + req: &'a mut Request, +} + +impl<'a> Stdin<'a> { + /// Begin reading the second stream of the request, for FastCGI Filter + /// applications. + /// + /// May only be called after all contents of stdin has been read. Panics + /// if stdin has not reached EOF yet. + pub fn start_filter_data(&mut self) { + if !self.req.filter_data { + assert!(self.req.is_eof); + self.req.is_eof = false; + self.req.filter_data = true; + } + } +} + +impl<'a> BufRead for Stdin<'a> { + fn fill_buf(&mut self) -> io::Result<&[u8]> { + if self.req.aborted { + return Err(io::Error::new(io::ErrorKind::ConnectionAborted, "Request aborted")); + } + if self.req.pos == self.req.buf.len() && !self.req.is_eof { + let mut sock = &*self.req.sock; + loop { + match (try!(Record::receive(&mut sock)), self.req.filter_data) { + (Record::UnknownType(rec_type), _) => { + try!(Record::UnknownType(rec_type).send(&mut sock)); + }, + (Record::GetValues(keys), _) => { + try!( + Record::GetValuesResult(get_values(keys)) + .send(&mut sock) + ); + }, + (Record::BeginRequest { request_id, .. }, _) => { + try!(Record::EndRequest { + request_id: request_id, + app_status: 0, + protocol_status: ProtocolStatus::CantMpxConn, + } + .send(&mut sock) + ); + }, + (Record::AbortRequest { request_id }, _) => { + if request_id != self.req.id { + continue; + } + self.req.aborted = true; + return Err(io::Error::new(io::ErrorKind::ConnectionAborted, "Request aborted")); + } + (Record::Stdin { request_id, content }, false) + | (Record::Data { request_id, content }, true) => { + if request_id != self.req.id { + continue; + } + if content.is_empty() { + self.req.is_eof = true; + } + self.req.buf = content; + break; + }, + _ => (), + } + } + } + Ok(&self.req.buf[self.req.pos..]) + } + + fn consume(&mut self, amount: usize) { + self.req.pos = std::cmp::min(self.req.pos + amount, self.req.buf.len()); + } +} + +impl<'a> Read for Stdin<'a> { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + let n = { + let mut chunk = try!(self.fill_buf()); + try!(chunk.read(buf)) + }; + self.consume(n); + Ok(n) + } +} + +macro_rules! writer { + ($Writer:ident) => ( + pub struct $Writer<'a> { + req: &'a mut Request, + } + + impl<'a> Write for $Writer<'a> { + fn write(&mut self, buf: &[u8]) -> io::Result { + if self.req.aborted { + return Err(io::Error::new(io::ErrorKind::ConnectionAborted, "Request aborted")); + } + if buf.is_empty() { + Ok(0) + } else { + for chunk in buf.chunks(std::u16::MAX as usize) { + let rec = Record::$Writer { + request_id: self.req.id, + content: chunk.to_owned(), + }; + try!(rec.send(&mut &*self.req.sock)); + } + Ok(buf.len()) + } + } + + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } + } + ); +} + +writer!(Stdout); + +writer!(Stderr); + +/// Request objects are what a FastCGI application will primarily deal with +/// throughout its lifetime. +/// +/// The Request API is designed to be an abstraction of the traditional CGI +/// process model. Note that this API is low level. Dealing with things like +/// GET/POST parameters or cookies is outside the scope of this library. +pub struct Request { + sock: Rc, + id: u16, + role: Role, + params: HashMap, + aborted: bool, + status: i32, + buf: Vec, + pos: usize, + is_eof: bool, + filter_data: bool, +} + +pub type Params<'a> = Box + 'a>; + +fn get_values(keys: Vec) -> Vec<(String, String)> { + keys.into_iter().filter_map(|key| + match key.as_ref() { + "FCGI_MAX_CONNS" => Some((key, "1".to_owned())), + "FCGI_MAX_REQS" => Some((key, "1".to_owned())), + "FCGI_MPXS_CONNS" => Some((key, "0".to_owned())), + _ => None, + } + ).collect() +} + +impl Request { + fn begin(mut sock: &Socket) -> io::Result<(u16, Role, bool)> { + loop { + match try!(Record::receive(&mut sock)) { + Record::UnknownType(rec_type) => { + Record::UnknownType(rec_type).send(&mut sock).unwrap(); + }, + Record::GetValues(keys) => { + Record::GetValuesResult(get_values(keys)).send(&mut sock).unwrap(); + }, + Record::BeginRequest { request_id, role: Ok(role), keep_conn } => { + return Ok((request_id, role, keep_conn)); + }, + Record::BeginRequest { request_id, role: Err(_), .. } => { + try!(Record::EndRequest { + request_id: request_id, + app_status: 0, + protocol_status: ProtocolStatus::UnknownRole + }.send(&mut sock)); + }, + _ => (), + } + } + } + + fn new(sock: Rc, id: u16, role: Role) -> io::Result { + let mut buf = Vec::new(); + let mut params = HashMap::new(); + let mut aborted = false; + loop { + match try!(Record::receive(&mut &*sock)) { + Record::UnknownType(rec_type) => { + try!(Record::UnknownType(rec_type).send(&mut &*sock)); + }, + Record::GetValues(keys) => { + try!( + Record::GetValuesResult(get_values(keys)) + .send(&mut &*sock) + ); + }, + Record::BeginRequest { request_id, .. } => { + try!(Record::EndRequest { + request_id: request_id, + app_status: 0, + protocol_status: ProtocolStatus::CantMpxConn, + } + .send(&mut &*sock) + ); + } + Record::AbortRequest { request_id } => { + if id != request_id { + continue; + } + aborted = true; + break; + } + Record::Params { request_id, content } => { + if id != request_id { + continue; + } + if content.is_empty() { + params.extend(read_pairs(&mut Cursor::new(buf.as_ref())).unwrap()); + break; + } else { + buf.extend(content); + } + }, + _ => (), + } + } + Ok(Request { + sock: sock, + id: id, + role: role, + params: params, + aborted: aborted, + status: 0, + buf: Vec::new(), + pos: 0, + is_eof: false, + filter_data: false, + }) + } + + pub fn role(&self) -> Role { + self.role + } + + /// Retrieves the value of the given parameter name. + pub fn param(&self, key: &str) -> Option { + self.params.get(key).map(|s| s.clone()) + } + + /// Iterates over the FastCGI parameters. + pub fn params(&self) -> Params { + Box::new(self.params.iter().map(|(k, v)| (k.clone(), v.clone()))) + } + + /// Standard input stream of the request. + pub fn stdin(&mut self) -> Stdin { + Stdin { req: self } + } + + /// Standard output stream of the request. + pub fn stdout(&mut self) -> Stdout { + Stdout { req: self } + } + + /// Standard error stream of the request. + pub fn stderr(&mut self) -> Stderr { + Stderr { req: self } + } + + /// Checks if the client has closed the connection prematurely. + /// + /// The reliability of this method depends on whether the web server + /// notifies such event (by sending the `FCGI_REQUEST_ABORTED` record) to + /// the FastCGI application. This value is updated synchronously; the + /// update may only be triggered by reading from stdin. + pub fn is_aborted(&self) -> bool { + self.aborted + } + + /// Reports the specified exit code to the web server. + /// + /// This will consume the Request object. If you finish processing the + /// Request object without calling `exit`, it is assumed that the exit code + /// is 0. + pub fn exit(mut self, code: i32) { + self.status = code; + } +} + +impl Drop for Request { + fn drop(&mut self) { + Record::Stdout { + request_id: self.id, + content: Vec::new(), + }.send(&mut &*self.sock).unwrap(); + Record::Stderr { + request_id: self.id, + content: Vec::new() + }.send(&mut &*self.sock).unwrap(); + Record::EndRequest { + request_id: self.id, + app_status: self.status, + protocol_status: ProtocolStatus::RequestComplete, + }.send(&mut &*self.sock).unwrap(); + } +} + +fn run_transport(mut handler: F, transport: &mut Transport) where F: FnMut(Request) { + let addrs: Option> = match std::env::var("FCGI_WEB_SERVER_ADDRS") { + Ok(value) => Some(value.split(',').map(|s| s.to_owned()).collect()), + Err(std::env::VarError::NotPresent) => None, + Err(e) => Err(e).unwrap(), + }; + loop { + let sock = match transport.accept() { + Ok(sock) => sock, + Err(e) => panic!(e.to_string()), + }; + let allow = match addrs { + Some(ref addrs) => addrs.contains(&sock.peer().unwrap()), + None => true, + }; + if allow { + let sock = Rc::new(sock); + loop { + let (request_id, role, keep_conn) = Request::begin(&sock).unwrap(); + handler(Request::new(sock.clone(), request_id, role).unwrap()); + if !keep_conn { break; } + } + } + } +} + +#[cfg(unix)] +/// Runs as a FastCGI process with the given handler. +/// +/// Available under Unix only. If you are using Windows, use `run_tcp` instead. +pub fn run(handler: F) where F: FnMut(Request) { + run_transport(handler, &mut Transport::new()) +} + +#[cfg(unix)] +/// Accepts requests from a user-supplied raw file descriptor. IPv4, IPv6, and +/// Unix domain sockets are supported. +/// +/// Available under Unix only. +pub fn run_raw(handler: F, raw_fd: std::os::unix::io::RawFd) where F: FnMut(Request) { + run_transport(handler, &mut Transport::from_raw_fd(raw_fd)) +} + +#[cfg(unix)] +/// Accepts requests from a user-supplied TCP listener. +pub fn run_tcp(handler: F, listener: &TcpListener) where F: FnMut(Request) { + use std::os::unix::io::AsRawFd; + run_transport(handler, &mut Transport::from_raw_fd(listener.as_raw_fd())) +} + +#[cfg(windows)] +/// Accepts requests from a user-supplied TCP listener. +pub fn run_tcp(handler: F, listener: &TcpListener) where F: FnMut(Request) { + run_transport(handler, &mut Transport::from_tcp(&listener)) +} diff --git a/src/unix.rs b/src/unix.rs new file mode 100644 index 0000000..908105c --- /dev/null +++ b/src/unix.rs @@ -0,0 +1,132 @@ +use libc as c; +use std::io::{self, Read, Write}; +use std::mem; +use std::net::{Ipv4Addr, Ipv6Addr}; +use std::os::unix::io::RawFd; + +const LISTENSOCK_FILENO: c::c_int = 0; + +pub struct Transport { + inner: c::c_int, +} + +impl Transport { + pub fn new() -> Self { + Self::from_raw_fd(LISTENSOCK_FILENO) + } + + pub fn from_raw_fd(raw_fd: RawFd) -> Self { + Transport { inner: raw_fd } + } + + pub fn accept(&mut self) -> io::Result { + let res = unsafe { + c::accept(self.inner, 0 as *mut _, 0 as *mut _) + }; + if res == -1 { + Err(io::Error::last_os_error()) + } else { + Ok(Socket { inner: res }) + } + } +} + +pub struct Socket { + inner: c::c_int, +} + +impl Socket { + pub fn peer(&self) -> io::Result { + unsafe { + let mut ss = mem::zeroed::(); + let mut len = mem::size_of::() as c::socklen_t; + let res = c::getpeername( + self.inner, + &mut ss as *mut _ as *mut c::sockaddr, + &mut len + ); + if res == -1 { + return Err(io::Error::last_os_error()); + } + match ss.ss_family as c::c_int { + c::AF_INET => { + let sin = *(&ss as *const _ as *const c::sockaddr_in); + let ip = mem::transmute::(sin.sin_addr); + Ok(Ipv4Addr::new(ip[0], ip[1], ip[2], ip[3]).to_string()) + }, + c::AF_INET6 => { + let sin = *(&ss as *const _ as *const c::sockaddr_in6); + let ip = mem::transmute::(sin.sin6_addr); + Ok(Ipv6Addr::new( + ip[0], ip[1], ip[2], ip[3], + ip[4], ip[5], ip[6], ip[7] + ).to_string() + ) + }, + c::AF_UNIX => Ok(String::new()), + _ => Err(io::Error::new( + io::ErrorKind::InvalidInput, + "Unsupported FastCGI socket" + )), + } + } + } +} + +impl Drop for Socket { + fn drop(&mut self) { + unsafe { c::close(self.inner); } + } +} + +impl<'a> Read for &'a Socket { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + let res = unsafe { + c::read( + self.inner, + buf.as_mut_ptr() as *mut c::c_void, + buf.len() as c::size_t + ) + }; + if res == -1 { + Err(io::Error::last_os_error()) + } else { + Ok(res as usize) + } + } +} + +impl<'a> Write for &'a Socket { + fn write(&mut self, buf: &[u8]) -> io::Result { + let res = unsafe { + c::write( + self.inner, + buf.as_ptr() as *const c::c_void, + buf.len() as c::size_t + ) + }; + if res == -1 { + Err(io::Error::last_os_error()) + } else { + Ok(res as usize) + } + } + + fn flush(&mut self) -> io::Result<()> { Ok(()) } +} + +impl Read for Socket { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + (&*self).read(buf) + } +} + +impl Write for Socket { + fn write(&mut self, buf: &[u8]) -> io::Result { + (&*self).write(buf) + } + + fn flush(&mut self) -> io::Result<()> { + (&*self).flush() + } +} diff --git a/src/windows.rs b/src/windows.rs new file mode 100644 index 0000000..2cfe71c --- /dev/null +++ b/src/windows.rs @@ -0,0 +1,62 @@ +use std::io::{self, Read, Write}; +use std::net::{TcpListener, TcpStream, SocketAddr}; + +pub struct Transport<'a> { + inner: &'a TcpListener, +} + +impl<'a> Transport<'a> { + pub fn from_tcp(listener: &'a TcpListener) -> Self { + Transport { inner: listener } + } + + pub fn accept(&mut self) -> io::Result { + let (stream, _) = try!(self.inner.accept()); + Ok(Socket { inner: stream }) + } +} + +pub struct Socket { + inner: TcpStream, +} + +impl Socket { + pub fn peer(&self) -> io::Result { + match try!(self.inner.peer_addr()) { + SocketAddr::V4(addr) => Ok(addr.ip().to_string()), + SocketAddr::V6(addr) => Ok(addr.ip().to_string()), + } + } +} + +impl<'a> Read for &'a Socket { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + (&self.inner).read(buf) + } +} + +impl<'a> Write for &'a Socket { + fn write(&mut self, buf: &[u8]) -> io::Result { + (&self.inner).write(buf) + } + + fn flush(&mut self) -> io::Result<()> { + (&self.inner).flush() + } +} + +impl Read for Socket { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + (&*self).read(buf) + } +} + +impl Write for Socket { + fn write(&mut self, buf: &[u8]) -> io::Result { + (&*self).write(buf) + } + + fn flush(&mut self) -> io::Result<()> { + (&*self).flush() + } +}