Rename methods to `read`, `send`, `write` & `flush`
Refine docs Add `send` method Add deprecated versions of write_message, write_pending, read_message Handle pong WriteBufferFull error Add changelog
This commit is contained in:
parent
0203a1849b
commit
84a54b76e6
|
@ -1,2 +1,3 @@
|
|||
target
|
||||
Cargo.lock
|
||||
.vscode
|
||||
|
|
15
CHANGELOG.md
15
CHANGELOG.md
|
@ -1,3 +1,18 @@
|
|||
# Unreleased (0.20.0)
|
||||
- Remove many implicit flushing behaviours. In general reading and writing messages will no
|
||||
longer flush until calling `flush`. An exception is automatic responses (e.g. pongs)
|
||||
which will continue to be written and flushed when reading and writing.
|
||||
This allows writing a batch of messages and flushing once.
|
||||
- Add `WebSocket::read`, `write`, `send`, `flush`. Deprecate `read_message`, `write_message`, `write_pending`.
|
||||
- Add `FrameSocket::read`, `write`, `send`, `flush`. Remove `read_frame`, `write_frame`, `write_pending`.
|
||||
Note: Previous use of `write_frame` may be replaced with `send`.
|
||||
- Add `WebSocketContext::read`, `write`, `flush`. Remove `read_message`, `write_message`, `write_pending`.
|
||||
Note: Previous use of `write_message` may be replaced with `write` + `flush`.
|
||||
- Remove `send_queue`, replaced with using the frame write buffer to achieve similar results.
|
||||
* Add `WebSocketConfig::max_write_buffer_size`. Deprecate `max_send_queue`.
|
||||
* Add `Error::WriteBufferFull`. Remove `Error::SendQueueFull`.
|
||||
Note: `WriteBufferFull` returns the message that could not be written as a `Message::Frame`.
|
||||
|
||||
# 0.19.0
|
||||
|
||||
- Update TLS dependencies.
|
||||
|
|
|
@ -14,11 +14,11 @@ fn main () {
|
|||
spawn (move || {
|
||||
let mut websocket = accept(stream.unwrap()).unwrap();
|
||||
loop {
|
||||
let msg = websocket.read_message().unwrap();
|
||||
let msg = websocket.read().unwrap();
|
||||
|
||||
// We do not want to send back ping/pong messages.
|
||||
if msg.is_binary() || msg.is_text() {
|
||||
websocket.write_message(msg).unwrap();
|
||||
websocket.send(msg).unwrap();
|
||||
}
|
||||
}
|
||||
});
|
||||
|
|
|
@ -42,7 +42,7 @@ impl Write for MockSlowFlushWrite {
|
|||
}
|
||||
|
||||
fn benchmark(c: &mut Criterion) {
|
||||
// Writes 100k small json text messages then calls `write_pending`
|
||||
// Writes 100k small json text messages then flushes
|
||||
c.bench_function("write 100k small texts then flush", |b| {
|
||||
let mut ws = WebSocket::from_raw_socket(
|
||||
MockSlowFlushWrite(Vec::with_capacity(MOCK_WRITE_LEN)),
|
||||
|
@ -54,9 +54,9 @@ fn benchmark(c: &mut Criterion) {
|
|||
|| (0..100_000).map(|i| Message::Text(format!("{{\"id\":{i}}}"))),
|
||||
|batch| {
|
||||
for msg in batch {
|
||||
ws.write_message(msg).unwrap();
|
||||
ws.write(msg).unwrap();
|
||||
}
|
||||
ws.write_pending().unwrap();
|
||||
ws.flush().unwrap();
|
||||
},
|
||||
BatchSize::SmallInput,
|
||||
)
|
||||
|
|
|
@ -7,7 +7,7 @@ const AGENT: &str = "Tungstenite";
|
|||
|
||||
fn get_case_count() -> Result<u32> {
|
||||
let (mut socket, _) = connect(Url::parse("ws://localhost:9001/getCaseCount").unwrap())?;
|
||||
let msg = socket.read_message()?;
|
||||
let msg = socket.read()?;
|
||||
socket.close(None)?;
|
||||
Ok(msg.into_text()?.parse::<u32>().unwrap())
|
||||
}
|
||||
|
@ -26,9 +26,9 @@ fn run_test(case: u32) -> Result<()> {
|
|||
Url::parse(&format!("ws://localhost:9001/runCase?case={}&agent={}", case, AGENT)).unwrap();
|
||||
let (mut socket, _) = connect(case_url)?;
|
||||
loop {
|
||||
match socket.read_message()? {
|
||||
match socket.read()? {
|
||||
msg @ Message::Text(_) | msg @ Message::Binary(_) => {
|
||||
socket.write_message(msg)?;
|
||||
socket.send(msg)?;
|
||||
}
|
||||
Message::Ping(_) | Message::Pong(_) | Message::Close(_) | Message::Frame(_) => {}
|
||||
}
|
||||
|
|
|
@ -17,9 +17,9 @@ fn handle_client(stream: TcpStream) -> Result<()> {
|
|||
let mut socket = accept(stream).map_err(must_not_block)?;
|
||||
info!("Running test");
|
||||
loop {
|
||||
match socket.read_message()? {
|
||||
match socket.read()? {
|
||||
msg @ Message::Text(_) | msg @ Message::Binary(_) => {
|
||||
socket.write_message(msg)?;
|
||||
socket.send(msg)?;
|
||||
}
|
||||
Message::Ping(_) | Message::Pong(_) | Message::Close(_) | Message::Frame(_) => {}
|
||||
}
|
||||
|
|
|
@ -14,9 +14,9 @@ fn main() {
|
|||
println!("* {}", header);
|
||||
}
|
||||
|
||||
socket.write_message(Message::Text("Hello WebSocket".into())).unwrap();
|
||||
socket.send(Message::Text("Hello WebSocket".into())).unwrap();
|
||||
loop {
|
||||
let msg = socket.read_message().expect("Error reading message");
|
||||
let msg = socket.read().expect("Error reading message");
|
||||
println!("Received: {}", msg);
|
||||
}
|
||||
// socket.close(None);
|
||||
|
|
|
@ -28,9 +28,9 @@ fn main() {
|
|||
let mut websocket = accept_hdr(stream.unwrap(), callback).unwrap();
|
||||
|
||||
loop {
|
||||
let msg = websocket.read_message().unwrap();
|
||||
let msg = websocket.read().unwrap();
|
||||
if msg.is_binary() || msg.is_text() {
|
||||
websocket.write_message(msg).unwrap();
|
||||
websocket.send(msg).unwrap();
|
||||
}
|
||||
}
|
||||
});
|
||||
|
|
|
@ -38,7 +38,7 @@ fn main() {
|
|||
let mut websocket = accept_hdr_with_config(stream.unwrap(), callback, config).unwrap();
|
||||
|
||||
loop {
|
||||
let msg = websocket.read_message().unwrap();
|
||||
let msg = websocket.read().unwrap();
|
||||
if msg.is_binary() || msg.is_text() {
|
||||
println!("received message {}", msg);
|
||||
}
|
||||
|
|
|
@ -33,5 +33,5 @@ fuzz_target!(|data: &[u8]| {
|
|||
//let vector: Vec<u8> = data.into();
|
||||
let cursor = Cursor::new(data);
|
||||
let mut socket = WebSocket::from_raw_socket(WriteMoc(cursor), Role::Client, None);
|
||||
socket.read_message().ok();
|
||||
socket.read().ok();
|
||||
});
|
||||
|
|
|
@ -33,5 +33,5 @@ fuzz_target!(|data: &[u8]| {
|
|||
//let vector: Vec<u8> = data.into();
|
||||
let cursor = Cursor::new(data);
|
||||
let mut socket = WebSocket::from_raw_socket(WriteMoc(cursor), Role::Server, None);
|
||||
socket.read_message().ok();
|
||||
socket.read().ok();
|
||||
});
|
||||
|
|
|
@ -56,7 +56,7 @@ where
|
|||
Stream: Read,
|
||||
{
|
||||
/// Read a frame from stream.
|
||||
pub fn read_frame(&mut self, max_size: Option<usize>) -> Result<Option<Frame>> {
|
||||
pub fn read(&mut self, max_size: Option<usize>) -> Result<Option<Frame>> {
|
||||
self.codec.read_frame(&mut self.stream, max_size)
|
||||
}
|
||||
}
|
||||
|
@ -65,18 +65,27 @@ impl<Stream> FrameSocket<Stream>
|
|||
where
|
||||
Stream: Write,
|
||||
{
|
||||
/// Write a frame to stream.
|
||||
///
|
||||
/// This function guarantees that the frame is queued regardless of any errors.
|
||||
/// There is no need to resend the frame. In order to handle WouldBlock or Incomplete,
|
||||
/// call write_pending() afterwards.
|
||||
pub fn write_frame(&mut self, frame: Frame) -> Result<()> {
|
||||
self.codec.write_frame(&mut self.stream, frame)?;
|
||||
Ok(self.stream.flush()?)
|
||||
/// Writes and immediately flushes a frame.
|
||||
/// Equivalent to calling [`write`](Self::write) then [`flush`](Self::flush).
|
||||
pub fn send(&mut self, frame: Frame) -> Result<()> {
|
||||
self.write(frame)?;
|
||||
self.flush()
|
||||
}
|
||||
|
||||
/// Complete pending write, if any.
|
||||
pub fn write_pending(&mut self) -> Result<()> {
|
||||
/// Write a frame to stream.
|
||||
///
|
||||
/// A subsequent call should be made to [`flush`](Self::flush) to flush writes.
|
||||
///
|
||||
/// This function guarantees that the frame is queued unless [`Error::WriteBufferFull`]
|
||||
/// is returned.
|
||||
/// In order to handle WouldBlock or Incomplete, call [`flush`](Self::flush) afterwards.
|
||||
pub fn write(&mut self, frame: Frame) -> Result<()> {
|
||||
self.codec.write_frame(&mut self.stream, frame)
|
||||
}
|
||||
|
||||
/// Flush writes.
|
||||
pub fn flush(&mut self) -> Result<()> {
|
||||
self.codec.write_out_buffer(&mut self.stream)?;
|
||||
Ok(self.stream.flush()?)
|
||||
}
|
||||
}
|
||||
|
@ -245,11 +254,11 @@ mod tests {
|
|||
let mut sock = FrameSocket::new(raw);
|
||||
|
||||
assert_eq!(
|
||||
sock.read_frame(None).unwrap().unwrap().into_data(),
|
||||
sock.read(None).unwrap().unwrap().into_data(),
|
||||
vec![0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07]
|
||||
);
|
||||
assert_eq!(sock.read_frame(None).unwrap().unwrap().into_data(), vec![0x03, 0x02, 0x01]);
|
||||
assert!(sock.read_frame(None).unwrap().is_none());
|
||||
assert_eq!(sock.read(None).unwrap().unwrap().into_data(), vec![0x03, 0x02, 0x01]);
|
||||
assert!(sock.read(None).unwrap().is_none());
|
||||
|
||||
let (_, rest) = sock.into_inner();
|
||||
assert_eq!(rest, vec![0x99]);
|
||||
|
@ -260,7 +269,7 @@ mod tests {
|
|||
let raw = Cursor::new(vec![0x02, 0x03, 0x04, 0x05, 0x06, 0x07]);
|
||||
let mut sock = FrameSocket::from_partially_read(raw, vec![0x82, 0x07, 0x01]);
|
||||
assert_eq!(
|
||||
sock.read_frame(None).unwrap().unwrap().into_data(),
|
||||
sock.read(None).unwrap().unwrap().into_data(),
|
||||
vec![0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07]
|
||||
);
|
||||
}
|
||||
|
@ -270,10 +279,10 @@ mod tests {
|
|||
let mut sock = FrameSocket::new(Vec::new());
|
||||
|
||||
let frame = Frame::ping(vec![0x04, 0x05]);
|
||||
sock.write_frame(frame).unwrap();
|
||||
sock.send(frame).unwrap();
|
||||
|
||||
let frame = Frame::pong(vec![0x01]);
|
||||
sock.write_frame(frame).unwrap();
|
||||
sock.send(frame).unwrap();
|
||||
|
||||
let (buf, _) = sock.into_inner();
|
||||
assert_eq!(buf, vec![0x89, 0x02, 0x04, 0x05, 0x8a, 0x01, 0x01]);
|
||||
|
@ -285,7 +294,7 @@ mod tests {
|
|||
0x83, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0x00, 0x00, 0x00, 0x00,
|
||||
]);
|
||||
let mut sock = FrameSocket::new(raw);
|
||||
let _ = sock.read_frame(None); // should not crash
|
||||
let _ = sock.read(None); // should not crash
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -293,7 +302,7 @@ mod tests {
|
|||
let raw = Cursor::new(vec![0x82, 0x07, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07]);
|
||||
let mut sock = FrameSocket::new(raw);
|
||||
assert!(matches!(
|
||||
sock.read_frame(Some(5)),
|
||||
sock.read(Some(5)),
|
||||
Err(Error::Capacity(CapacityError::MessageTooLong { size: 7, max_size: 5 }))
|
||||
));
|
||||
}
|
||||
|
|
|
@ -185,7 +185,7 @@ impl Message {
|
|||
Message::Text(string.into())
|
||||
}
|
||||
|
||||
/// Create a new binary WebSocket message by converting to Vec<u8>.
|
||||
/// Create a new binary WebSocket message by converting to `Vec<u8>`.
|
||||
pub fn binary<B>(bin: B) -> Message
|
||||
where
|
||||
B: Into<Vec<u8>>,
|
||||
|
|
|
@ -75,6 +75,8 @@ impl Default for WebSocketConfig {
|
|||
///
|
||||
/// This is THE structure you want to create to be able to speak the WebSocket protocol.
|
||||
/// It may be created by calling `connect`, `accept` or `client` functions.
|
||||
///
|
||||
/// Use [`WebSocket::read`], [`WebSocket::send`] to received and send messages.
|
||||
#[derive(Debug)]
|
||||
pub struct WebSocket<Stream> {
|
||||
/// The underlying socket.
|
||||
|
@ -148,89 +150,116 @@ impl<Stream> WebSocket<Stream> {
|
|||
impl<Stream: Read + Write> WebSocket<Stream> {
|
||||
/// Read a message from stream, if possible.
|
||||
///
|
||||
/// This will queue responses to ping and close messages to be sent. It will call
|
||||
/// `write_pending` before trying to read in order to make sure that those responses
|
||||
/// make progress even if you never call `write_pending`. That does mean that they
|
||||
/// get sent out earliest on the next call to `read_message`, `write_message` or `write_pending`.
|
||||
/// This will also queue responses to ping and close messages. These responses
|
||||
/// will be written and flushed on the next call to [`read`](Self::read),
|
||||
/// [`write`](Self::write) or [`flush`](Self::flush).
|
||||
///
|
||||
/// ## Closing the connection
|
||||
/// # Closing the connection
|
||||
/// When the remote endpoint decides to close the connection this will return
|
||||
/// the close message with an optional close frame.
|
||||
///
|
||||
/// You should continue calling `read_message`, `write_message` or `write_pending` to drive
|
||||
/// the reply to the close frame until [Error::ConnectionClosed] is returned. Once that happens
|
||||
/// it is safe to drop the underlying connection.
|
||||
pub fn read_message(&mut self) -> Result<Message> {
|
||||
self.context.read_message(&mut self.socket)
|
||||
/// You should continue calling [`read`](Self::read), [`write`](Self::write) or
|
||||
/// [`flush`](Self::flush) to drive the reply to the close frame until [`Error::ConnectionClosed`]
|
||||
/// is returned. Once that happens it is safe to drop the underlying connection.
|
||||
pub fn read(&mut self) -> Result<Message> {
|
||||
self.context.read(&mut self.socket)
|
||||
}
|
||||
|
||||
/// Writes and immediately flushes a message.
|
||||
/// Equivalent to calling [`write`](Self::write) then [`flush`](Self::flush).
|
||||
pub fn send(&mut self, message: Message) -> Result<()> {
|
||||
self.write(message)?;
|
||||
self.flush()
|
||||
}
|
||||
|
||||
/// Write a message to the provided stream, if possible.
|
||||
///
|
||||
/// A subsequent call should be made to [`Self::write_pending`] to flush writes.
|
||||
/// A subsequent call should be made to [`flush`](Self::flush) to flush writes.
|
||||
///
|
||||
/// In the event of stream write failure the message frame will be stored
|
||||
/// in the write buffer and will try again on the next call to [`Self::write_message`] or [`Self::write_pending`].
|
||||
/// in the write buffer and will try again on the next call to [`write`](Self::write)
|
||||
/// or [`flush`](Self::flush).
|
||||
///
|
||||
/// If the write buffer would exceed the configured [`WebSocketConfig::max_write_buffer_size`]
|
||||
/// `Err(WriteBufferFull(msg_frame))` is returned.
|
||||
/// [`Err(WriteBufferFull(msg_frame))`](Error::WriteBufferFull) is returned.
|
||||
///
|
||||
/// This call will not flush, except to reply to Ping
|
||||
/// requests. A Pong reply will flush early because the
|
||||
/// [websocket RFC](https://tools.ietf.org/html/rfc6455#section-5.5.2) specifies it should be sent
|
||||
/// as soon as is practical.
|
||||
/// This call will generally not flush. However, if there are queued automatic messages
|
||||
/// they will be written and eagerly flushed.
|
||||
///
|
||||
/// Note that upon receiving a ping message, tungstenite cues a pong reply automatically.
|
||||
/// When you call either `read_message`, `write_message` or `write_pending` next it will try to
|
||||
/// write & flush the pong reply if possible. This means you should not respond to ping frames manually.
|
||||
/// For example, upon receiving ping messages tungstenite queues pong replies automatically.
|
||||
/// The next call to [`read`](Self::read), [`write`](Self::write) or [`flush`](Self::flush)
|
||||
/// will write & flush the pong reply. This means you should not respond to ping frames manually.
|
||||
///
|
||||
/// You can however send pong frames manually in order to indicate a unidirectional heartbeat
|
||||
/// as described in [RFC 6455](https://tools.ietf.org/html/rfc6455#section-5.5.3). Note that
|
||||
/// if `read_message` returns a ping, you should call `write_pending` until it doesn't return
|
||||
/// WouldBlock before passing a pong to `write_message`, otherwise the response to the
|
||||
/// ping will not be sent, but rather replaced by your custom pong message.
|
||||
/// if [`read`](Self::read) returns a ping, you should [`flush`](Self::flush) before passing
|
||||
/// a custom pong to [`write`](Self::write), otherwise the automatic queued response to the
|
||||
/// ping will not be sent as it will be replaced by your custom pong message.
|
||||
///
|
||||
/// ## Errors
|
||||
/// # Errors
|
||||
/// - If the WebSocket's write buffer is full, [`Error::WriteBufferFull`] will be returned
|
||||
/// along with the equivalent passed message frame. Otherwise, the message is queued and Ok(()) is returned.
|
||||
/// along with the equivalent passed message frame.
|
||||
/// - If the connection is closed and should be dropped, this will return [`Error::ConnectionClosed`].
|
||||
/// - If you try again after [`Error::ConnectionClosed`] was returned either from here or from `read_message`,
|
||||
/// [`Error::AlreadyClosed`] will be returned. This indicates a program error on your part.
|
||||
/// - If you try again after [`Error::ConnectionClosed`] was returned either from here or from
|
||||
/// [`read`](Self::read), [`Error::AlreadyClosed`] will be returned. This indicates a program
|
||||
/// error on your part.
|
||||
/// - [`Error::Io`] is returned if the underlying connection returns an error
|
||||
/// (consider these fatal except for WouldBlock).
|
||||
/// - [`Error::Capacity`] if your message size is bigger than the configured max message size.
|
||||
pub fn write_message(&mut self, message: Message) -> Result<()> {
|
||||
self.context.write_message(&mut self.socket, message)
|
||||
pub fn write(&mut self, message: Message) -> Result<()> {
|
||||
self.context.write(&mut self.socket, message)
|
||||
}
|
||||
|
||||
/// Flush pending writes.
|
||||
pub fn write_pending(&mut self) -> Result<()> {
|
||||
self.context.write_pending(&mut self.socket)
|
||||
/// Flush writes.
|
||||
///
|
||||
/// Ensures all messages previously passed to [`write`](Self::write) and automatic
|
||||
/// queued pong responses are written & flushed into the underlying stream.
|
||||
pub fn flush(&mut self) -> Result<()> {
|
||||
self.context.flush(&mut self.socket)
|
||||
}
|
||||
|
||||
/// Close the connection.
|
||||
///
|
||||
/// This function guarantees that the close frame will be queued.
|
||||
/// There is no need to call it again. Calling this function is
|
||||
/// the same as calling `write_message(Message::Close(..))`.
|
||||
/// the same as calling `write(Message::Close(..))`.
|
||||
///
|
||||
/// After queuing the close frame you should continue calling `read_message` or
|
||||
/// `write_pending` to drive the close handshake to completion.
|
||||
/// After queuing the close frame you should continue calling [`read`](Self::read) or
|
||||
/// [`flush`](Self::flush) to drive the close handshake to completion.
|
||||
///
|
||||
/// The websocket RFC defines that the underlying connection should be closed
|
||||
/// by the server. Tungstenite takes care of this asymmetry for you.
|
||||
///
|
||||
/// When the close handshake is finished (we have both sent and received
|
||||
/// a close message), `read_message` or `write_pending` will return
|
||||
/// a close message), [`read`](Self::read) or [`flush`](Self::flush) will return
|
||||
/// [Error::ConnectionClosed] if this endpoint is the server.
|
||||
///
|
||||
/// If this endpoint is a client, [Error::ConnectionClosed] will only be
|
||||
/// returned after the server has closed the underlying connection.
|
||||
///
|
||||
/// It is thus safe to drop the underlying connection as soon as [Error::ConnectionClosed]
|
||||
/// is returned from `read_message` or `write_pending`.
|
||||
/// is returned from [`read`](Self::read) or [`flush`](Self::flush).
|
||||
pub fn close(&mut self, code: Option<CloseFrame>) -> Result<()> {
|
||||
self.context.close(&mut self.socket, code)
|
||||
}
|
||||
|
||||
/// Old name for [`read`](Self::read).
|
||||
#[deprecated(note = "Use `read`")]
|
||||
pub fn read_message(&mut self) -> Result<Message> {
|
||||
self.read()
|
||||
}
|
||||
|
||||
/// Old name for [`send`](Self::send).
|
||||
#[deprecated(note = "Use `send`")]
|
||||
pub fn write_message(&mut self, message: Message) -> Result<()> {
|
||||
self.send(message)
|
||||
}
|
||||
|
||||
/// Old name for [`flush`](Self::flush).
|
||||
#[deprecated(note = "Use `flush`")]
|
||||
pub fn write_pending(&mut self) -> Result<()> {
|
||||
self.flush()
|
||||
}
|
||||
}
|
||||
|
||||
/// A context for managing WebSocket stream.
|
||||
|
@ -305,7 +334,7 @@ impl WebSocketContext {
|
|||
///
|
||||
/// This function sends pong and close responses automatically.
|
||||
/// However, it never blocks on write.
|
||||
pub fn read_message<Stream>(&mut self, stream: &mut Stream) -> Result<Message>
|
||||
pub fn read<Stream>(&mut self, stream: &mut Stream) -> Result<Message>
|
||||
where
|
||||
Stream: Read + Write,
|
||||
{
|
||||
|
@ -315,14 +344,13 @@ impl WebSocketContext {
|
|||
loop {
|
||||
if self.additional_send.is_some() {
|
||||
// Since we may get ping or close, we need to reply to the messages even during read.
|
||||
// Thus we call write_pending() but ignore its blocking.
|
||||
self.write_pending(stream).no_block()?;
|
||||
// Thus we flush but ignore its blocking.
|
||||
self.flush(stream).no_block()?;
|
||||
} else if self.role == Role::Server && !self.state.can_read() {
|
||||
self.state = WebSocketState::Terminated;
|
||||
return Err(Error::ConnectionClosed);
|
||||
}
|
||||
|
||||
// TODO don't flush writes when reading
|
||||
// If we get here, either write blocks or we have nothing to write.
|
||||
// Thus if read blocks, just let it return WouldBlock.
|
||||
if let Some(message) = self.read_message_frame(stream)? {
|
||||
|
@ -332,19 +360,17 @@ impl WebSocketContext {
|
|||
}
|
||||
}
|
||||
|
||||
/// Write a message to the provided stream, if possible.
|
||||
/// Write a message to the provided stream.
|
||||
///
|
||||
/// A subsequent call should be made to [`Self::write_pending`] to flush writes.
|
||||
/// A subsequent call should be made to [`flush`](Self::flush) to flush writes.
|
||||
///
|
||||
/// In the event of stream write failure the message frame will be stored
|
||||
/// in the write buffer and will try again on the next call to [`Self::write_message`] or [`Self::write_pending`].
|
||||
/// in the write buffer and will try again on the next call to [`write`](Self::write)
|
||||
/// or [`flush`](Self::flush).
|
||||
///
|
||||
/// If the write buffer would exceed the configured [`WebSocketConfig::max_write_buffer_size`]
|
||||
/// `Err(WriteBufferFull(msg_frame))` is returned.
|
||||
///
|
||||
/// Note that only the latest pong frame is stored to be sent, so only the
|
||||
/// most recent pong frame is sent if multiple pong frames are queued.
|
||||
pub fn write_message<Stream>(&mut self, stream: &mut Stream, message: Message) -> Result<()>
|
||||
/// [`Err(WriteBufferFull(msg_frame))`](Error::WriteBufferFull) is returned.
|
||||
pub fn write<Stream>(&mut self, stream: &mut Stream, message: Message) -> Result<()>
|
||||
where
|
||||
Stream: Read + Write,
|
||||
{
|
||||
|
@ -363,35 +389,38 @@ impl WebSocketContext {
|
|||
Message::Pong(data) => {
|
||||
self.set_additional(Frame::pong(data));
|
||||
// Note: user pongs can be user flushed so no need to flush here
|
||||
return self.write(stream, None).map(|_| ());
|
||||
return self._write(stream, None).map(|_| ());
|
||||
}
|
||||
Message::Close(code) => return self.close(stream, code),
|
||||
Message::Frame(f) => f,
|
||||
};
|
||||
|
||||
let should_flush = self.write(stream, Some(frame))?;
|
||||
let should_flush = self._write(stream, Some(frame))?;
|
||||
if should_flush {
|
||||
self.write_pending(stream)?;
|
||||
self.flush(stream)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Flush pending writes.
|
||||
/// Flush writes.
|
||||
///
|
||||
/// Ensures all messages previously passed to [`write`](Self::write) and automatically
|
||||
/// queued pong responses are written & flushed into the `stream`.
|
||||
#[inline]
|
||||
pub fn write_pending<Stream>(&mut self, stream: &mut Stream) -> Result<()>
|
||||
pub fn flush<Stream>(&mut self, stream: &mut Stream) -> Result<()>
|
||||
where
|
||||
Stream: Read + Write,
|
||||
{
|
||||
_ = self.write(stream, None)?;
|
||||
self._write(stream, None)?;
|
||||
Ok(stream.flush()?)
|
||||
}
|
||||
|
||||
/// Write send queue & pongs.
|
||||
/// Writes any data in the out_buffer, `additional_send` and given `data`.
|
||||
///
|
||||
/// Does **not** flush.
|
||||
///
|
||||
/// Returns if the write contents indicate we should flush immediately.
|
||||
fn write<Stream>(&mut self, stream: &mut Stream, data: Option<Frame>) -> Result<bool>
|
||||
/// Returns true if the write contents indicate we should flush immediately.
|
||||
fn _write<Stream>(&mut self, stream: &mut Stream, data: Option<Frame>) -> Result<bool>
|
||||
where
|
||||
Stream: Read + Write,
|
||||
{
|
||||
|
@ -405,7 +434,15 @@ impl WebSocketContext {
|
|||
// respond with Pong frame as soon as is practical. (RFC 6455)
|
||||
let should_flush = if let Some(msg) = self.additional_send.take() {
|
||||
trace!("Sending pong/close");
|
||||
self.write_one_frame(stream, msg)?;
|
||||
if let Err(err) = self.write_one_frame(stream, msg) {
|
||||
match err {
|
||||
// if an system message would exceed the buffer put it back in
|
||||
// `additional_send` for retry. Otherwise returning this error
|
||||
// may not make sense to the user, e.g. calling `flush`.
|
||||
Error::WriteBufferFull(Message::Frame(msg)) => self.set_additional(msg),
|
||||
err => return Err(err),
|
||||
}
|
||||
}
|
||||
true
|
||||
} else {
|
||||
false
|
||||
|
@ -438,7 +475,7 @@ impl WebSocketContext {
|
|||
if let WebSocketState::Active = self.state {
|
||||
self.state = WebSocketState::ClosedByUs;
|
||||
let frame = Frame::close(code);
|
||||
self.write(stream, Some(frame))?;
|
||||
self._write(stream, Some(frame))?;
|
||||
} else {
|
||||
// Already closed, nothing to do.
|
||||
}
|
||||
|
@ -731,10 +768,10 @@ mod tests {
|
|||
0x03,
|
||||
]);
|
||||
let mut socket = WebSocket::from_raw_socket(WriteMoc(incoming), Role::Client, None);
|
||||
assert_eq!(socket.read_message().unwrap(), Message::Ping(vec![1, 2]));
|
||||
assert_eq!(socket.read_message().unwrap(), Message::Pong(vec![3]));
|
||||
assert_eq!(socket.read_message().unwrap(), Message::Text("Hello, World!".into()));
|
||||
assert_eq!(socket.read_message().unwrap(), Message::Binary(vec![0x01, 0x02, 0x03]));
|
||||
assert_eq!(socket.read().unwrap(), Message::Ping(vec![1, 2]));
|
||||
assert_eq!(socket.read().unwrap(), Message::Pong(vec![3]));
|
||||
assert_eq!(socket.read().unwrap(), Message::Text("Hello, World!".into()));
|
||||
assert_eq!(socket.read().unwrap(), Message::Binary(vec![0x01, 0x02, 0x03]));
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -747,7 +784,7 @@ mod tests {
|
|||
let mut socket = WebSocket::from_raw_socket(WriteMoc(incoming), Role::Client, Some(limit));
|
||||
|
||||
assert!(matches!(
|
||||
socket.read_message(),
|
||||
socket.read(),
|
||||
Err(Error::Capacity(CapacityError::MessageTooLong { size: 13, max_size: 10 }))
|
||||
));
|
||||
}
|
||||
|
@ -759,7 +796,7 @@ mod tests {
|
|||
let mut socket = WebSocket::from_raw_socket(WriteMoc(incoming), Role::Client, Some(limit));
|
||||
|
||||
assert!(matches!(
|
||||
socket.read_message(),
|
||||
socket.read(),
|
||||
Err(Error::Capacity(CapacityError::MessageTooLong { size: 3, max_size: 2 }))
|
||||
));
|
||||
}
|
||||
|
|
|
@ -52,27 +52,27 @@ fn test_server_close() {
|
|||
do_test(
|
||||
3012,
|
||||
|mut cli_sock| {
|
||||
cli_sock.write_message(Message::Text("Hello WebSocket".into())).unwrap();
|
||||
cli_sock.send(Message::Text("Hello WebSocket".into())).unwrap();
|
||||
|
||||
let message = cli_sock.read_message().unwrap(); // receive close from server
|
||||
let message = cli_sock.read().unwrap(); // receive close from server
|
||||
assert!(message.is_close());
|
||||
|
||||
let err = cli_sock.read_message().unwrap_err(); // now we should get ConnectionClosed
|
||||
let err = cli_sock.read().unwrap_err(); // now we should get ConnectionClosed
|
||||
match err {
|
||||
Error::ConnectionClosed => {}
|
||||
_ => panic!("unexpected error: {:?}", err),
|
||||
}
|
||||
},
|
||||
|mut srv_sock| {
|
||||
let message = srv_sock.read_message().unwrap();
|
||||
let message = srv_sock.read().unwrap();
|
||||
assert_eq!(message.into_data(), b"Hello WebSocket");
|
||||
|
||||
srv_sock.close(None).unwrap(); // send close to client
|
||||
|
||||
let message = srv_sock.read_message().unwrap(); // receive acknowledgement
|
||||
let message = srv_sock.read().unwrap(); // receive acknowledgement
|
||||
assert!(message.is_close());
|
||||
|
||||
let err = srv_sock.read_message().unwrap_err(); // now we should get ConnectionClosed
|
||||
let err = srv_sock.read().unwrap_err(); // now we should get ConnectionClosed
|
||||
match err {
|
||||
Error::ConnectionClosed => {}
|
||||
_ => panic!("unexpected error: {:?}", err),
|
||||
|
@ -86,26 +86,26 @@ fn test_evil_server_close() {
|
|||
do_test(
|
||||
3013,
|
||||
|mut cli_sock| {
|
||||
cli_sock.write_message(Message::Text("Hello WebSocket".into())).unwrap();
|
||||
cli_sock.send(Message::Text("Hello WebSocket".into())).unwrap();
|
||||
|
||||
sleep(Duration::from_secs(1));
|
||||
|
||||
let message = cli_sock.read_message().unwrap(); // receive close from server
|
||||
let message = cli_sock.read().unwrap(); // receive close from server
|
||||
assert!(message.is_close());
|
||||
|
||||
let err = cli_sock.read_message().unwrap_err(); // now we should get ConnectionClosed
|
||||
let err = cli_sock.read().unwrap_err(); // now we should get ConnectionClosed
|
||||
match err {
|
||||
Error::ConnectionClosed => {}
|
||||
_ => panic!("unexpected error: {:?}", err),
|
||||
}
|
||||
},
|
||||
|mut srv_sock| {
|
||||
let message = srv_sock.read_message().unwrap();
|
||||
let message = srv_sock.read().unwrap();
|
||||
assert_eq!(message.into_data(), b"Hello WebSocket");
|
||||
|
||||
srv_sock.close(None).unwrap(); // send close to client
|
||||
|
||||
let message = srv_sock.read_message().unwrap(); // receive acknowledgement
|
||||
let message = srv_sock.read().unwrap(); // receive acknowledgement
|
||||
assert!(message.is_close());
|
||||
// and now just drop the connection without waiting for `ConnectionClosed`
|
||||
srv_sock.get_mut().set_linger(Some(Duration::from_secs(0))).unwrap();
|
||||
|
@ -119,32 +119,32 @@ fn test_client_close() {
|
|||
do_test(
|
||||
3014,
|
||||
|mut cli_sock| {
|
||||
cli_sock.write_message(Message::Text("Hello WebSocket".into())).unwrap();
|
||||
cli_sock.send(Message::Text("Hello WebSocket".into())).unwrap();
|
||||
|
||||
let message = cli_sock.read_message().unwrap(); // receive answer from server
|
||||
let message = cli_sock.read().unwrap(); // receive answer from server
|
||||
assert_eq!(message.into_data(), b"From Server");
|
||||
|
||||
cli_sock.close(None).unwrap(); // send close to server
|
||||
|
||||
let message = cli_sock.read_message().unwrap(); // receive acknowledgement from server
|
||||
let message = cli_sock.read().unwrap(); // receive acknowledgement from server
|
||||
assert!(message.is_close());
|
||||
|
||||
let err = cli_sock.read_message().unwrap_err(); // now we should get ConnectionClosed
|
||||
let err = cli_sock.read().unwrap_err(); // now we should get ConnectionClosed
|
||||
match err {
|
||||
Error::ConnectionClosed => {}
|
||||
_ => panic!("unexpected error: {:?}", err),
|
||||
}
|
||||
},
|
||||
|mut srv_sock| {
|
||||
let message = srv_sock.read_message().unwrap();
|
||||
let message = srv_sock.read().unwrap();
|
||||
assert_eq!(message.into_data(), b"Hello WebSocket");
|
||||
|
||||
srv_sock.write_message(Message::Text("From Server".into())).unwrap();
|
||||
srv_sock.send(Message::Text("From Server".into())).unwrap();
|
||||
|
||||
let message = srv_sock.read_message().unwrap(); // receive close from client
|
||||
let message = srv_sock.read().unwrap(); // receive close from client
|
||||
assert!(message.is_close());
|
||||
|
||||
let err = srv_sock.read_message().unwrap_err(); // now we should get ConnectionClosed
|
||||
let err = srv_sock.read().unwrap_err(); // now we should get ConnectionClosed
|
||||
match err {
|
||||
Error::ConnectionClosed => {}
|
||||
_ => panic!("unexpected error: {:?}", err),
|
||||
|
|
|
@ -29,10 +29,10 @@ fn test_no_send_after_close() {
|
|||
let client_thread = spawn(move || {
|
||||
let (mut client, _) = connect(Url::parse("ws://localhost:3013/socket").unwrap()).unwrap();
|
||||
|
||||
let message = client.read_message().unwrap(); // receive close from server
|
||||
let message = client.read().unwrap(); // receive close from server
|
||||
assert!(message.is_close());
|
||||
|
||||
let err = client.read_message().unwrap_err(); // now we should get ConnectionClosed
|
||||
let err = client.read().unwrap_err(); // now we should get ConnectionClosed
|
||||
match err {
|
||||
Error::ConnectionClosed => {}
|
||||
_ => panic!("unexpected error: {:?}", err),
|
||||
|
@ -44,7 +44,7 @@ fn test_no_send_after_close() {
|
|||
|
||||
client_handler.close(None).unwrap(); // send close to client
|
||||
|
||||
let err = client_handler.write_message(Message::Text("Hello WebSocket".into()));
|
||||
let err = client_handler.send(Message::Text("Hello WebSocket".into()));
|
||||
|
||||
assert!(err.is_err());
|
||||
|
||||
|
|
|
@ -29,12 +29,12 @@ fn test_receive_after_init_close() {
|
|||
let client_thread = spawn(move || {
|
||||
let (mut client, _) = connect(Url::parse("ws://localhost:3013/socket").unwrap()).unwrap();
|
||||
|
||||
client.write_message(Message::Text("Hello WebSocket".into())).unwrap();
|
||||
client.send(Message::Text("Hello WebSocket".into())).unwrap();
|
||||
|
||||
let message = client.read_message().unwrap(); // receive close from server
|
||||
let message = client.read().unwrap(); // receive close from server
|
||||
assert!(message.is_close());
|
||||
|
||||
let err = client.read_message().unwrap_err(); // now we should get ConnectionClosed
|
||||
let err = client.read().unwrap_err(); // now we should get ConnectionClosed
|
||||
match err {
|
||||
Error::ConnectionClosed => {}
|
||||
_ => panic!("unexpected error: {:?}", err),
|
||||
|
@ -47,12 +47,12 @@ fn test_receive_after_init_close() {
|
|||
client_handler.close(None).unwrap(); // send close to client
|
||||
|
||||
// This read should succeed even though we already initiated a close
|
||||
let message = client_handler.read_message().unwrap();
|
||||
let message = client_handler.read().unwrap();
|
||||
assert_eq!(message.into_data(), b"Hello WebSocket");
|
||||
|
||||
assert!(client_handler.read_message().unwrap().is_close()); // receive acknowledgement
|
||||
assert!(client_handler.read().unwrap().is_close()); // receive acknowledgement
|
||||
|
||||
let err = client_handler.read_message().unwrap_err(); // now we should get ConnectionClosed
|
||||
let err = client_handler.read().unwrap_err(); // now we should get ConnectionClosed
|
||||
match err {
|
||||
Error::ConnectionClosed => {}
|
||||
_ => panic!("unexpected error: {:?}", err),
|
||||
|
|
Loading…
Reference in New Issue