Support for Stream::for_each_concurrent()? #477

@aretsan

Description

futures has a method for_each_concurrent() on StreamExt. It works basically the same as for_each(), except that the future run for each item doesn't block the stream: the futures for several items can be in flight at once. Is it possible to implement this method in async-std as well?
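To make the difference concrete, here is a minimal sketch that uses only the standard library. It is not how futures implements for_each_concurrent(); the `Job` type, `noop_waker`, and `completion_order` are hypothetical names made up for illustration, and the busy-polling loop stands in for a real executor that would sleep until woken. It only shows that driving one future to completion before starting the next yields stream order, while polling all of them in turn lets the quick ones finish first.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// Hypothetical stand-in for slow work: returns Pending `remaining` times,
// then records its id in the shared log and completes.
struct Job {
    id: u32,
    remaining: u32,
    log: Rc<RefCell<Vec<u32>>>,
}

impl Future for Job {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        if self.remaining == 0 {
            let id = self.id;
            self.log.borrow_mut().push(id);
            Poll::Ready(())
        } else {
            self.remaining -= 1;
            Poll::Pending
        }
    }
}

// A waker that does nothing. That is enough here because the loops below
// simply poll again instead of waiting for a wake-up.
fn noop_waker() -> Waker {
    unsafe fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    unsafe fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    // SAFETY: none of the vtable functions dereference the data pointer.
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

// Runs three jobs (id, number of Pending polls) and returns the order in
// which they completed.
fn completion_order(concurrent: bool) -> Vec<u32> {
    let log = Rc::new(RefCell::new(Vec::new()));
    let mut jobs: Vec<Job> = vec![(1u32, 3u32), (2, 1), (3, 2)]
        .into_iter()
        .map(|(id, remaining)| Job { id, remaining, log: Rc::clone(&log) })
        .collect();

    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);

    if concurrent {
        // Like for_each_concurrent: poll every unfinished job in turn and
        // drop the ones that have completed.
        while !jobs.is_empty() {
            jobs.retain_mut(|job| Pin::new(job).poll(&mut cx).is_pending());
        }
    } else {
        // Like for_each: finish one job before even starting the next.
        for mut job in jobs {
            while Pin::new(&mut job).poll(&mut cx).is_pending() {}
        }
    }

    let order = log.borrow().clone();
    order
}

fn main() {
    println!("sequential: {:?}", completion_order(false)); // [1, 2, 3]
    println!("concurrent: {:?}", completion_order(true)); // [2, 3, 1]
}
```

In the concurrent case, job 2 needs the fewest polls and so finishes first even though it is second in the input, which is the "doesn't block the stream" behaviour described above.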

Activity

yoshuawuyts (Contributor) commented on Nov 9, 2019

@aretsan Thanks for opening this! We probably want to expose something similar to that, but not exactly that -- similar to FuturesUnordered and Future::join_all. I suspect we can do somewhat better API-wise (:

schulace commented on Nov 12, 2019

The lack of this feature is a regression. Since 0.99 re-exported futures-preview, including its Stream trait, for_each_concurrent worked on 0.99 provided you also depended on futures-preview. Now this doesn't work at all, since you rewrote the traits and your Stream is no longer compatible with futures::stream::Stream and futures::stream::StreamExt.

yoshuawuyts (Contributor) commented on Nov 12, 2019

@schulace futures 0.3 has been released, replacing futures-preview. We've since upgraded to it and are indeed no longer compatible with futures-preview. This was part of our 0.99.12 release.

https://docs.rs/futures/0.3.1/futures/

schulace commented on Nov 14, 2019

Futures contains this same method on StreamExt. Is there a way to get your library to play nicely with futures, or are you taking the tokio approach and saying "we'll roll our own (nearly) identical traits that don't work with anything else"?

yoshuawuyts (Contributor) commented on Nov 14, 2019

@schulace that's some harsh phrasing; but the answer is no. Our traits are re-exports from futures-rs. The only difference from the futures-rs traits is that we define our own custom Ext traits, which we export from async_std::prelude::*.

It should be possible to use the async-std prelude and futures prelude side-by-side.
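As a rough illustration of why shared core traits plus separate Ext traits can coexist, here is a synchronous, standard-library-only sketch of the extension-trait pattern. None of these names come from async-std or futures: `CoreStream`, `ExtA`, `ExtB`, and `Countdown` are hypothetical stand-ins for a shared core trait, two libraries' extension traits, and a user type.

```rust
// Stand-in for the shared core trait that both libraries build on.
trait CoreStream {
    type Item;
    fn next_item(&mut self) -> Option<Self::Item>;
}

// Stand-in for one library's extension trait. The blanket impl gives its
// methods to every type that implements the core trait.
trait ExtA: CoreStream {
    fn count_items(&mut self) -> usize {
        let mut n = 0;
        while self.next_item().is_some() {
            n += 1;
        }
        n
    }
}
impl<S: CoreStream> ExtA for S {}

// Stand-in for a second library's extension trait over the same core trait.
trait ExtB: CoreStream {
    fn last_item(&mut self) -> Option<Self::Item> {
        let mut last = None;
        while let Some(item) = self.next_item() {
            last = Some(item);
        }
        last
    }
}
impl<S: CoreStream> ExtB for S {}

// A user type only has to implement the core trait once.
struct Countdown(u32);

impl CoreStream for Countdown {
    type Item = u32;

    fn next_item(&mut self) -> Option<u32> {
        if self.0 == 0 {
            None
        } else {
            self.0 -= 1;
            Some(self.0 + 1)
        }
    }
}

fn main() {
    // Methods from both extension traits are available on the same type.
    println!("{}", Countdown(3).count_items()); // 3
    println!("{:?}", Countdown(3).last_item()); // Some(1)
}
```

One caveat that follows from Rust's method resolution: if two extension traits in scope define a method with the same name, a plain method call is ambiguous and has to be written in fully qualified form, such as `ExtA::count_items(&mut s)`.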

lqf96 (Contributor) commented on Dec 11, 2019

@yoshuawuyts I'm also wondering about the reason to implement many Stream combinators again, such as map or filter, which are already implemented in futures_util::stream::StreamExt. Should we remove these duplicated implementations and only keep the ones unique to async-rs or should we keep all of them?

BTW, it seems that async-rs lacks stream combinators that handle Result<T, E>, which are covered by futures_util::stream::TryStreamExt. I think we should just reuse futures_util's implementation rather than reinventing the wheel here.

yoshuawuyts (Contributor) commented on Dec 13, 2019

@lqf96 the design so far has been to achieve parity between std::iter::Iterator's adapters and async_std::stream::Stream. The only real additions we've authored so far have been related to time and timeouts, a concept that in its general form is somewhat unique to asynchronous Rust.

Both futures and async-std build on top of futures-core and futures-io. We expose different adapters but are completely compatible, and generally would recommend choosing either async-std or futures to fill the need for adapters. Hope that makes sense!


Labels: api design (Open design questions)


          Support for Stream::for_each_concurrent()? · Issue #477 · async-rs/async-std