Add `Ping` and `Pong` variants to `Message`

This'll allow removal/deprecation of the `send_ping` custom method but also
allow applications to react to `Pong` messages as appropriate.

Closes #16
This commit is contained in:
Alex Crichton 2017-07-19 15:04:23 -07:00
parent 9d6c3058d5
commit 9f6f42766e
4 changed files with 84 additions and 55 deletions

View File

@ -5,7 +5,7 @@ extern crate url;
use url::Url;
use tungstenite::{connect, Error, Result};
use tungstenite::{connect, Error, Result, Message};
const AGENT: &'static str = "Tungstenite";
@ -33,8 +33,14 @@ fn run_test(case: u32) -> Result<()> {
).unwrap();
let mut socket = connect(case_url)?;
loop {
let msg = socket.read_message()?;
socket.write_message(msg)?;
match socket.read_message()? {
msg @ Message::Text(_) |
msg @ Message::Binary(_) => {
socket.write_message(msg)?;
}
Message::Ping(_) |
Message::Pong(_) => {}
}
}
}

View File

@ -5,7 +5,7 @@ extern crate tungstenite;
use std::net::{TcpListener, TcpStream};
use std::thread::spawn;
use tungstenite::{accept, HandshakeError, Error, Result};
use tungstenite::{accept, HandshakeError, Error, Result, Message};
fn must_not_block<Stream, Role>(err: HandshakeError<Stream, Role>) -> Error {
match err {
@ -17,8 +17,14 @@ fn must_not_block<Stream, Role>(err: HandshakeError<Stream, Role>) -> Error {
fn handle_client(stream: TcpStream) -> Result<()> {
let mut socket = accept(stream).map_err(must_not_block)?;
loop {
let msg = socket.read_message()?;
socket.write_message(msg)?;
match socket.read_message()? {
msg @ Message::Text(_) |
msg @ Message::Binary(_) => {
socket.write_message(msg)?;
}
Message::Ping(_) |
Message::Pong(_) => {}
}
}
}

View File

@ -141,6 +141,14 @@ pub enum Message {
Text(String),
/// A binary WebSocket message
Binary(Vec<u8>),
/// A ping message with the specified payload
///
/// The payload here must have a length less than 125 bytes
Ping(Vec<u8>),
/// A pong message with the specified payload
///
/// The payload here must have a length less than 125 bytes
Pong(Vec<u8>),
}
impl Message {
@ -163,15 +171,31 @@ impl Message {
pub fn is_text(&self) -> bool {
match *self {
Message::Text(_) => true,
Message::Binary(_) => false,
_ => false,
}
}
/// Indicates whether a message is a binary message.
pub fn is_binary(&self) -> bool {
match *self {
Message::Text(_) => false,
Message::Binary(_) => true,
_ => false,
}
}
/// Indicates whether a message is a ping message.
pub fn is_ping(&self) -> bool {
match *self {
Message::Ping(_) => true,
_ => false,
}
}
/// Indicates whether a message is a pong message.
pub fn is_pong(&self) -> bool {
match *self {
Message::Pong(_) => true,
_ => false,
}
}
@ -179,24 +203,25 @@ impl Message {
pub fn len(&self) -> usize {
match *self {
Message::Text(ref string) => string.len(),
Message::Binary(ref data) => data.len(),
Message::Binary(ref data) |
Message::Ping(ref data) |
Message::Pong(ref data) => data.len(),
}
}
/// Returns true if the WebSocket message has no content.
/// For example, if the other side of the connection sent an empty string.
pub fn is_empty(&self) -> bool {
match *self {
Message::Text(ref string) => string.is_empty(),
Message::Binary(ref data) => data.is_empty(),
}
self.len() == 0
}
/// Consume the WebSocket and return it as binary data.
pub fn into_data(self) -> Vec<u8> {
match self {
Message::Text(string) => string.into_bytes(),
Message::Binary(data) => data,
Message::Binary(data) |
Message::Ping(data) |
Message::Pong(data) => data,
}
}
@ -204,7 +229,9 @@ impl Message {
pub fn into_text(self) -> Result<String> {
match self {
Message::Text(string) => Ok(string),
Message::Binary(data) => Ok(try!(
Message::Binary(data) |
Message::Ping(data) |
Message::Pong(data) => Ok(try!(
String::from_utf8(data).map_err(|err| err.utf8_error()))),
}
}
@ -214,7 +241,9 @@ impl Message {
pub fn to_text(&self) -> Result<&str> {
match *self {
Message::Text(ref string) => Ok(string),
Message::Binary(ref data) => Ok(try!(str::from_utf8(data))),
Message::Binary(ref data) |
Message::Ping(ref data) |
Message::Pong(ref data) => Ok(try!(str::from_utf8(data))),
}
}

View File

@ -103,24 +103,27 @@ impl<Stream: Read + Write> WebSocket<Stream> {
/// This function guarantees that the frame is queued regardless of any errors.
/// There is no need to resend the frame. In order to handle WouldBlock or Incomplete,
/// call write_pending() afterwards.
///
/// Note that only the last pong frame is stored to be sent, and only the
/// most recent pong frame is sent if multiple pong frames are queued up.
pub fn write_message(&mut self, message: Message) -> Result<()> {
let frame = {
let opcode = match message {
Message::Text(_) => OpData::Text,
Message::Binary(_) => OpData::Binary,
};
Frame::message(message.into_data(), OpCode::Data(opcode), true)
let frame = match message {
Message::Text(data) => {
Frame::message(data.into(), OpCode::Data(OpData::Text), true)
}
Message::Binary(data) => {
Frame::message(data, OpCode::Data(OpData::Binary), true)
}
Message::Ping(data) => Frame::ping(data),
Message::Pong(data) => {
self.pong = Some(Frame::pong(data));
return self.write_pending()
}
};
self.send_queue.push_back(frame);
self.write_pending()
}
/// Send ping.
pub fn send_ping(&mut self, payload: Vec<u8>) -> Result<()> {
self.send_queue.push_back(Frame::ping(payload));
self.write_pending()
}
/// Close the connection.
///
/// This function guarantees that the close frame will be queued.
@ -212,7 +215,7 @@ impl<Stream: Read + Write> WebSocket<Stream> {
match frame.opcode() {
OpCode::Control(ctl) => {
(match ctl {
match ctl {
// All control frames MUST have a payload length of 125 bytes or less
// and MUST NOT be fragmented. (RFC 6455)
_ if !frame.is_final() => {
@ -222,22 +225,24 @@ impl<Stream: Read + Write> WebSocket<Stream> {
Err(Error::Protocol("Control frame too big".into()))
}
OpCtl::Close => {
self.do_close(frame.into_close()?)
self.do_close(frame.into_close()?).map(|_| None)
}
OpCtl::Reserved(i) => {
Err(Error::Protocol(format!("Unknown control frame type {}", i).into()))
}
OpCtl::Ping | OpCtl::Pong if !self.state.is_active() => {
// No ping processing while closing.
Ok(())
Ok(None)
}
OpCtl::Ping => {
self.do_ping(frame.into_data())
let data = frame.into_data();
self.pong = Some(Frame::pong(data.clone()));
Ok(Some(Message::Ping(data)))
}
OpCtl::Pong => {
self.do_pong(frame.into_data())
Ok(Some(Message::Pong(frame.into_data())))
}
}).map(|_| None)
}
}
OpCode::Data(_) if !self.state.is_active() => {
@ -353,27 +358,6 @@ impl<Stream: Read + Write> WebSocket<Stream> {
}
}
/// Received a ping frame.
fn do_ping(&mut self, ping: Vec<u8>) -> Result<()> {
// If an endpoint receives a Ping frame and has not yet sent Pong
// frame(s) in response to previous Ping frame(s), the endpoint MAY
// elect to send a Pong frame for only the most recently processed Ping
// frame. (RFC 6455)
// We do exactly that, keeping a "queue" from one and only Pong frame.
self.pong = Some(Frame::pong(ping));
Ok(())
}
/// Received a pong frame.
fn do_pong(&mut self, _: Vec<u8>) -> Result<()> {
// A Pong frame MAY be sent unsolicited. This serves as a
// unidirectional heartbeat. A response to an unsolicited Pong frame is
// not expected. (RFC 6455)
// Due to this, we just don't check pongs right now.
// TODO: check if there was a reply to our ping at all...
Ok(())
}
/// Send a single pending frame.
fn send_one_frame(&mut self, mut frame: Frame) -> Result<()> {
match self.role {
@ -463,6 +447,8 @@ mod tests {
#[test]
fn receive_messages() {
let incoming = Cursor::new(vec![
0x89, 0x02, 0x01, 0x02,
0x8a, 0x01, 0x03,
0x01, 0x07,
0x48, 0x65, 0x6c, 0x6c, 0x6f, 0x2c, 0x20,
0x80, 0x06,
@ -471,6 +457,8 @@ mod tests {
0x01, 0x02, 0x03,
]);
let mut socket = WebSocket::from_raw_socket(WriteMoc(incoming), Role::Client);
assert_eq!(socket.read_message().unwrap(), Message::Ping(vec![1, 2]));
assert_eq!(socket.read_message().unwrap(), Message::Pong(vec![3]));
assert_eq!(socket.read_message().unwrap(), Message::Text("Hello, World!".into()));
assert_eq!(socket.read_message().unwrap(), Message::Binary(vec![0x01, 0x02, 0x03]));
}