add support for batching websocket messages
This commit is contained in:
parent
3bc195a42f
commit
68daa29b19
|
@ -66,13 +66,9 @@ impl<Stream> FrameSocket<Stream>
|
|||
where
|
||||
Stream: Write,
|
||||
{
|
||||
/// Write a frame to stream.
|
||||
///
|
||||
/// This function guarantees that the frame is queued regardless of any errors.
|
||||
/// There is no need to resend the frame. In order to handle WouldBlock or Incomplete,
|
||||
/// call write_pending() afterwards.
|
||||
pub fn write_frame(&mut self, frame: Frame) -> Result<()> {
|
||||
self.codec.write_frame(&mut self.stream, frame)
|
||||
/// Add a frame to the end of the output buffer.
|
||||
pub fn queue_frame(&mut self, frame: Frame) {
|
||||
self.codec.queue_frame(frame);
|
||||
}
|
||||
|
||||
/// Complete pending write, if any.
|
||||
|
@ -98,6 +94,10 @@ impl FrameCodec {
|
|||
Self { in_buffer: ReadBuffer::new(), out_buffer: Vec::new(), header: None }
|
||||
}
|
||||
|
||||
pub(super) fn has_unsent(&self) -> bool {
|
||||
!self.out_buffer.is_empty()
|
||||
}
|
||||
|
||||
/// Create a new frame codec from partially read data.
|
||||
pub(super) fn from_partially_read(part: Vec<u8>) -> Self {
|
||||
Self {
|
||||
|
@ -165,15 +165,12 @@ impl FrameCodec {
|
|||
Ok(Some(frame))
|
||||
}
|
||||
|
||||
/// Write a frame to the provided stream.
|
||||
pub(super) fn write_frame<Stream>(&mut self, stream: &mut Stream, frame: Frame) -> Result<()>
|
||||
where
|
||||
Stream: Write,
|
||||
/// Add a frame to the end of the output buffer.
|
||||
pub(super) fn queue_frame(&mut self, frame: Frame)
|
||||
{
|
||||
trace!("writing frame {}", frame);
|
||||
self.out_buffer.reserve(frame.len());
|
||||
frame.format(&mut self.out_buffer).expect("Bug: can't write to vector");
|
||||
self.write_pending(stream)
|
||||
}
|
||||
|
||||
/// Complete pending write, if any.
|
||||
|
@ -241,10 +238,28 @@ mod tests {
|
|||
let mut sock = FrameSocket::new(Vec::new());
|
||||
|
||||
let frame = Frame::ping(vec![0x04, 0x05]);
|
||||
sock.write_frame(frame).unwrap();
|
||||
sock.queue_frame(frame);
|
||||
sock.write_pending().unwrap();
|
||||
|
||||
let frame = Frame::pong(vec![0x01]);
|
||||
sock.write_frame(frame).unwrap();
|
||||
sock.queue_frame(frame);
|
||||
sock.write_pending().unwrap();
|
||||
|
||||
let (buf, _) = sock.into_inner();
|
||||
assert_eq!(buf, vec![0x89, 0x02, 0x04, 0x05, 0x8a, 0x01, 0x01]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn queue_frames() {
|
||||
let mut sock = FrameSocket::new(Vec::new());
|
||||
|
||||
let frame = Frame::ping(vec![0x04, 0x05]);
|
||||
sock.queue_frame(frame);
|
||||
|
||||
let frame = Frame::pong(vec![0x01]);
|
||||
sock.queue_frame(frame);
|
||||
|
||||
sock.write_pending().unwrap();
|
||||
|
||||
let (buf, _) = sock.into_inner();
|
||||
assert_eq!(buf, vec![0x89, 0x02, 0x04, 0x05, 0x8a, 0x01, 0x01]);
|
||||
|
|
|
@ -190,7 +190,41 @@ impl<Stream: Read + Write> WebSocket<Stream> {
|
|||
/// (consider these fatal except for WouldBlock).
|
||||
/// - [Error::Capacity] if your message size is bigger than the configured max message size.
|
||||
pub fn write_message(&mut self, message: Message) -> Result<()> {
|
||||
self.context.write_message(&mut self.socket, message)
|
||||
self.context.queue_message(&mut self.socket, message)?;
|
||||
self.context.write_pending(&mut self.socket)
|
||||
}
|
||||
|
||||
|
||||
/// Add a message to the send queue. Differs from write_message in that this method does not
|
||||
/// flush queued messages to the stream.
|
||||
///
|
||||
/// WebSocket will buffer a configurable number of messages at a time, except to reply to Ping
|
||||
/// requests. A Pong reply will jump the queue because the
|
||||
/// [websocket RFC](https://tools.ietf.org/html/rfc6455#section-5.5.2) specifies it should be sent
|
||||
/// as soon as is practical.
|
||||
///
|
||||
/// Note that upon receiving a ping message, tungstenite cues a pong reply automatically.
|
||||
/// When you call either `read_message`, `write_message` or `write_pending` next it will try to send
|
||||
/// that pong out if the underlying connection can take more data. This means you should not
|
||||
/// respond to ping frames manually.
|
||||
///
|
||||
/// You can however send pong frames manually in order to indicate a unidirectional heartbeat
|
||||
/// as described in [RFC 6455](https://tools.ietf.org/html/rfc6455#section-5.5.3). Note that
|
||||
/// if `read_message` returns a ping, you should call `write_pending` until it doesn't return
|
||||
/// WouldBlock before passing a pong to `write_message`, otherwise the response to the
|
||||
/// ping will not be sent, but rather replaced by your custom pong message.
|
||||
///
|
||||
/// ## Errors
|
||||
/// - If the WebSocket's send queue is full, `SendQueueFull` will be returned
|
||||
/// along with the passed message. Otherwise, the message is queued and Ok(()) is returned.
|
||||
/// - If the connection is closed and should be dropped, this will return [Error::ConnectionClosed].
|
||||
/// - If you try again after [Error::ConnectionClosed] was returned either from here or from `read_message`,
|
||||
/// [Error::AlreadyClosed] will be returned. This indicates a program error on your part.
|
||||
/// - [Error::Io] is returned if the underlying connection returns an error
|
||||
/// (consider these fatal except for WouldBlock).
|
||||
/// - [Error::Capacity] if your message size is bigger than the configured max message size.
|
||||
pub fn queue_message(&mut self, message: Message) -> Result<()> {
|
||||
self.context.queue_message(&mut self.socket, message)
|
||||
}
|
||||
|
||||
/// Flush the pending send queue.
|
||||
|
@ -314,7 +348,7 @@ impl WebSocketContext {
|
|||
}
|
||||
}
|
||||
|
||||
/// Send a message to the provided stream, if possible.
|
||||
/// Queue up a message without flushing to the stream.
|
||||
///
|
||||
/// WebSocket will buffer a configurable number of messages at a time, except to reply to Ping
|
||||
/// and Close requests. If the WebSocket's send queue is full, `SendQueueFull` will be returned
|
||||
|
@ -322,7 +356,7 @@ impl WebSocketContext {
|
|||
///
|
||||
/// Note that only the last pong frame is stored to be sent, and only the
|
||||
/// most recent pong frame is sent if multiple pong frames are queued.
|
||||
pub fn write_message<Stream>(&mut self, stream: &mut Stream, message: Message) -> Result<()>
|
||||
pub fn queue_message<Stream>(&mut self, stream: &mut Stream, message: Message) -> Result<()>
|
||||
where
|
||||
Stream: Read + Write,
|
||||
{
|
||||
|
@ -360,7 +394,7 @@ impl WebSocketContext {
|
|||
};
|
||||
|
||||
self.send_queue.push_back(frame);
|
||||
self.write_pending(stream)
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Flush the pending send queue.
|
||||
|
@ -368,20 +402,22 @@ impl WebSocketContext {
|
|||
where
|
||||
Stream: Read + Write,
|
||||
{
|
||||
// First, make sure we have no pending frame sending.
|
||||
self.frame.write_pending(stream)?;
|
||||
|
||||
// Upon receipt of a Ping frame, an endpoint MUST send a Pong frame in
|
||||
// response, unless it already received a Close frame. It SHOULD
|
||||
// respond with Pong frame as soon as is practical. (RFC 6455)
|
||||
if let Some(pong) = self.pong.take() {
|
||||
trace!("Sending pong reply");
|
||||
self.send_one_frame(stream, pong)?;
|
||||
self.queue_one_frame(pong);
|
||||
}
|
||||
// If we have any unsent frames, send them.
|
||||
// If we have any frames queued, add them to the outgoing buffer
|
||||
trace!("Frames still in queue: {}", self.send_queue.len());
|
||||
while let Some(data) = self.send_queue.pop_front() {
|
||||
self.send_one_frame(stream, data)?;
|
||||
self.queue_one_frame(data);
|
||||
}
|
||||
|
||||
// if the outgoing buffer isn't empty then try write some data
|
||||
if self.frame.has_unsent() {
|
||||
self.frame.write_pending(stream)?;
|
||||
}
|
||||
|
||||
// If we get to this point, the send queue is empty and the underlying socket is still
|
||||
|
@ -588,10 +624,8 @@ impl WebSocketContext {
|
|||
}
|
||||
}
|
||||
|
||||
/// Send a single pending frame.
|
||||
fn send_one_frame<Stream>(&mut self, stream: &mut Stream, mut frame: Frame) -> Result<()>
|
||||
where
|
||||
Stream: Read + Write,
|
||||
/// Add a single frame to the end of the output buffer.
|
||||
fn queue_one_frame(&mut self, mut frame: Frame)
|
||||
{
|
||||
match self.role {
|
||||
Role::Server => {}
|
||||
|
@ -603,7 +637,7 @@ impl WebSocketContext {
|
|||
}
|
||||
|
||||
trace!("Sending frame: {:?}", frame);
|
||||
self.frame.write_frame(stream, frame).check_connection_reset(self.state)
|
||||
self.frame.queue_frame(frame);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue