bugfix: Enter tokio context while dropping the inner future

This commit is contained in:
Jonas Platte 2023-10-10 21:49:56 +02:00 committed by GitHub
parent e7a7ea52a0
commit 581a2c9ada
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 45 additions and 21 deletions

View File

@ -198,9 +198,20 @@ pin_project! {
/// Compatibility adapter for futures and I/O types.
pub struct Compat<T> {
#[pin]
inner: T,
inner: Option<T>,
seek_pos: Option<io::SeekFrom>,
}
impl<T> PinnedDrop for Compat<T> {
fn drop(this: Pin<&mut Self>) {
if this.inner.is_some() {
// If the inner future wasn't moved out using into_inner,
// enter the tokio context while the inner value is dropped.
let _guard = TOKIO1.enter();
this.project().inner.set(None);
}
}
}
}
impl<T> Compat<T> {
@ -236,7 +247,7 @@ impl<T> Compat<T> {
/// ```
pub fn new(t: T) -> Compat<T> {
Compat {
inner: t,
inner: Some(t),
seek_pos: None,
}
}
@ -258,7 +269,9 @@ impl<T> Compat<T> {
/// # }
/// ```
pub fn get_ref(&self) -> &T {
&self.inner
self.inner
.as_ref()
.expect("inner is only None when Compat is about to drop")
}
/// Gets a mutable reference to the inner value.
@ -279,7 +292,16 @@ impl<T> Compat<T> {
/// # }
/// ```
pub fn get_mut(&mut self) -> &mut T {
&mut self.inner
self.inner
.as_mut()
.expect("inner is only None when Compat is about to drop")
}
fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut T> {
self.project()
.inner
.as_pin_mut()
.expect("inner is only None when Compat is about to drop")
}
/// Unwraps the compatibility adapter.
@ -292,8 +314,10 @@ impl<T> Compat<T> {
/// let stdout = Compat::new(tokio::io::stdout());
/// let original = stdout.into_inner();
/// ```
pub fn into_inner(self) -> T {
pub fn into_inner(mut self) -> T {
self.inner
.take()
.expect("inner is only None when Compat is about to drop")
}
}
@ -302,7 +326,7 @@ impl<T: Future> Future for Compat<T> {
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let _guard = TOKIO1.enter();
self.project().inner.poll(cx)
self.get_pin_mut().poll(cx)
}
}
@ -313,7 +337,7 @@ impl<T: tokio::io::AsyncRead> futures_io::AsyncRead for Compat<T> {
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
let mut buf = tokio::io::ReadBuf::new(buf);
ready!(self.project().inner.poll_read(cx, &mut buf))?;
ready!(self.get_pin_mut().poll_read(cx, &mut buf))?;
Poll::Ready(Ok(buf.filled().len()))
}
}
@ -325,7 +349,7 @@ impl<T: futures_io::AsyncRead> tokio::io::AsyncRead for Compat<T> {
buf: &mut tokio::io::ReadBuf<'_>,
) -> Poll<io::Result<()>> {
let unfilled = buf.initialize_unfilled();
let poll = self.project().inner.poll_read(cx, unfilled);
let poll = self.get_pin_mut().poll_read(cx, unfilled);
if let Poll::Ready(Ok(num)) = &poll {
buf.advance(*num);
}
@ -335,21 +359,21 @@ impl<T: futures_io::AsyncRead> tokio::io::AsyncRead for Compat<T> {
impl<T: tokio::io::AsyncBufRead> futures_io::AsyncBufRead for Compat<T> {
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
self.project().inner.poll_fill_buf(cx)
self.get_pin_mut().poll_fill_buf(cx)
}
fn consume(self: Pin<&mut Self>, amt: usize) {
self.project().inner.consume(amt)
self.get_pin_mut().consume(amt)
}
}
impl<T: futures_io::AsyncBufRead> tokio::io::AsyncBufRead for Compat<T> {
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
self.project().inner.poll_fill_buf(cx)
self.get_pin_mut().poll_fill_buf(cx)
}
fn consume(self: Pin<&mut Self>, amt: usize) {
self.project().inner.consume(amt)
self.get_pin_mut().consume(amt)
}
}
@ -359,15 +383,15 @@ impl<T: tokio::io::AsyncWrite> futures_io::AsyncWrite for Compat<T> {
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
self.project().inner.poll_write(cx, buf)
self.get_pin_mut().poll_write(cx, buf)
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.project().inner.poll_flush(cx)
self.get_pin_mut().poll_flush(cx)
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.project().inner.poll_shutdown(cx)
self.get_pin_mut().poll_shutdown(cx)
}
}
@ -377,15 +401,15 @@ impl<T: futures_io::AsyncWrite> tokio::io::AsyncWrite for Compat<T> {
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
self.project().inner.poll_write(cx, buf)
self.get_pin_mut().poll_write(cx, buf)
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.project().inner.poll_flush(cx)
self.get_pin_mut().poll_flush(cx)
}
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.project().inner.poll_close(cx)
self.get_pin_mut().poll_close(cx)
}
}
@ -396,10 +420,10 @@ impl<T: tokio::io::AsyncSeek> futures_io::AsyncSeek for Compat<T> {
pos: io::SeekFrom,
) -> Poll<io::Result<u64>> {
if self.seek_pos != Some(pos) {
self.as_mut().project().inner.start_seek(pos)?;
self.as_mut().get_pin_mut().start_seek(pos)?;
*self.as_mut().project().seek_pos = Some(pos);
}
let res = ready!(self.as_mut().project().inner.poll_complete(cx));
let res = ready!(self.as_mut().get_pin_mut().poll_complete(cx));
*self.as_mut().project().seek_pos = None;
Poll::Ready(res)
}
@ -422,7 +446,7 @@ impl<T: futures_io::AsyncSeek> tokio::io::AsyncSeek for Compat<T> {
}
Some(pos) => pos,
};
let res = ready!(self.as_mut().project().inner.poll_seek(cx, pos));
let res = ready!(self.as_mut().get_pin_mut().poll_seek(cx, pos));
*self.as_mut().project().seek_pos = None;
Poll::Ready(res)
}