Skip to content

Commit a8b853f

Browse files
committed
Use Pin<Box<S>> in BodyStream and SizedStream
Fixes actix#1321 A better fix would be to change `MessageBody` to take a `Pin<&mut Self>`, rather than a `Pin<&mut Self>`. This will avoid requiring the use of `Box` for all consumers by allowing the caller to determine how to pin the `MessageBody` implementation (e.g. via stack pinning). However, doing so is a breaking change that will affect every user of `MessageBody`. By pinning the inner stream ourselves, we can fix the undefined behavior without breaking the API. I've included @sebzim4500's reproduction case as a new test case. However, due to the nature of undefined behavior, this could pass (and not segfault) even if underlying issue were to regress. Unfortunately, until rust-lang/unsafe-code-guidelines#148 is resolved, it's not even possible to write a Miri test that will pass when the bug is fixed.
1 parent a2d4ff1 commit a8b853f

File tree

2 files changed

+33
-14
lines changed

2 files changed

+33
-14
lines changed

actix-http/src/body.rs

Lines changed: 7 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -360,10 +360,8 @@ impl MessageBody for String {
360360

361361
/// Type represent streaming body.
362362
/// Response does not contain `content-length` header and appropriate transfer encoding is used.
363-
#[pin_project]
364363
pub struct BodyStream<S, E> {
365-
#[pin]
366-
stream: S,
364+
stream: Pin<Box<S>>,
367365
_t: PhantomData<E>,
368366
}
369367

@@ -374,7 +372,7 @@ where
374372
{
375373
pub fn new(stream: S) -> Self {
376374
BodyStream {
377-
stream,
375+
stream: Box::pin(stream),
378376
_t: PhantomData,
379377
}
380378
}
@@ -390,29 +388,27 @@ where
390388
}
391389

392390
fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes, Error>>> {
393-
unsafe { Pin::new_unchecked(self) }
394-
.project()
391+
self
395392
.stream
393+
.as_mut()
396394
.poll_next(cx)
397395
.map(|res| res.map(|res| res.map_err(std::convert::Into::into)))
398396
}
399397
}
400398

401399
/// Type represent streaming body. This body implementation should be used
402400
/// if total size of stream is known. Data get sent as is without using transfer encoding.
403-
#[pin_project]
404401
pub struct SizedStream<S> {
405402
size: u64,
406-
#[pin]
407-
stream: S,
403+
stream: Pin<Box<S>>,
408404
}
409405

410406
impl<S> SizedStream<S>
411407
where
412408
S: Stream<Item = Result<Bytes, Error>>,
413409
{
414410
pub fn new(size: u64, stream: S) -> Self {
415-
SizedStream { size, stream }
411+
SizedStream { size, stream: Box::pin(stream) }
416412
}
417413
}
418414

@@ -425,10 +421,7 @@ where
425421
}
426422

427423
fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes, Error>>> {
428-
unsafe { Pin::new_unchecked(self) }
429-
.project()
430-
.stream
431-
.poll_next(cx)
424+
self.stream.as_mut().poll_next(cx)
432425
}
433426
}
434427

tests/test_weird_poll.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
// Regression test for #/1321
2+
3+
use futures::task::{noop_waker, Context};
4+
use futures::stream::once;
5+
use actix_http::body::{MessageBody, BodyStream};
6+
use bytes::Bytes;
7+
8+
#[test]
9+
fn weird_poll() {
10+
let (sender, receiver) = futures::channel::oneshot::channel();
11+
let mut body_stream = Ok(BodyStream::new(once(async {
12+
let x = Box::new(0);
13+
let y = &x;
14+
receiver.await.unwrap();
15+
let _z = **y;
16+
Ok::<_, ()>(Bytes::new())
17+
})));
18+
19+
let waker = noop_waker();
20+
let mut context = Context::from_waker(&waker);
21+
22+
let _ = body_stream.as_mut().unwrap().poll_next(&mut context);
23+
sender.send(()).unwrap();
24+
let _ = std::mem::replace(&mut body_stream, Err([0; 32])).unwrap().poll_next(&mut context);
25+
}
26+

0 commit comments

Comments
 (0)