protocol: correct non-blocking handling

This commit is contained in:
Alexey Galakhov 2017-02-03 22:14:15 +01:00
parent 838c8e7ea8
commit 46bfd81539
1 changed files with 58 additions and 5 deletions

View File

@ -5,12 +5,13 @@ mod message;
pub use self::message::Message;
use self::message::{IncompleteMessage, IncompleteMessageType};
use std::collections::VecDeque;
use std::io::{Read, Write};
use std::io::{Read, Write, Error as IoError, ErrorKind as IoErrorKind};
use std::mem::replace;
use std::result::Result as StdResult;
use error::{Error, Result};
use self::message::{IncompleteMessage, IncompleteMessageType};
use self::frame::{Frame, FrameSocket};
use self::frame::coding::{OpCode, Data as OpData, Control as OpCtl, CloseCode};
@ -56,8 +57,14 @@ impl<Stream> WebSocket<Stream>
/// Read a message from stream, if possible.
pub fn read_message(&mut self) -> Result<Message> {
loop {
self.send_pending()?; // FIXME
if let Some(message) = self.read_message_frame()? {
let write_blocks = self.send_pending().no_block()?.is_none();
let read = self.read_message_frame();
let frame = if write_blocks {
Some(read?)
} else {
read.no_block()?
};
if let Some(Some(message)) = frame {
debug!("Received message {}", message);
return Ok(message)
}
@ -74,7 +81,8 @@ impl<Stream> WebSocket<Stream>
Frame::message(message.into_data(), OpCode::Data(opcode), true)
};
self.send_queue.push_back(frame);
self.send_pending()
self.send_pending().no_block()?;
Ok(())
}
/// Close the connection.
@ -89,6 +97,7 @@ impl<Stream> WebSocket<Stream>
// already closed, nothing to do
}
}
self.send_pending().no_block()?;
Ok(())
}
@ -348,6 +357,50 @@ impl WebSocketState {
}
}
/// Non-blocking IO handling.
trait NonBlockingError: Sized {
fn into_non_blocking(self) -> Option<Self>;
}
impl NonBlockingError for IoError {
fn into_non_blocking(self) -> Option<Self> {
match self.kind() {
IoErrorKind::Interrupted => None,
_ => Some(self),
}
}
}
impl NonBlockingError for Error {
fn into_non_blocking(self) -> Option<Self> {
match self {
Error::Io(e) => e.into_non_blocking().map(|e| e.into()),
x => Some(x),
}
}
}
/// Non-blocking IO wrapper.
trait NonBlockingResult {
type Result;
fn no_block(self) -> Self::Result;
}
impl<T, E> NonBlockingResult for StdResult<T, E>
where E : NonBlockingError
{
type Result = StdResult<Option<T>, E>;
fn no_block(self) -> Self::Result {
match self {
Ok(x) => Ok(Some(x)),
Err(e) => match e.into_non_blocking() {
Some(e) => Err(e),
None => Ok(None),
}
}
}
}
#[cfg(test)]
mod tests {
use super::{WebSocket, Role, Message};