Skip to content

Happens-before relationship between Publisher#subscribe() and Subscriber.onSubscribe()? #486

@NiteshKant

Description

@NiteshKant

Considering a simple example:

public class UnsafeSubscriber implements Subscriber<String> {
    private boolean duplicateOnSubscribe = false;

    @Override
    public void onSubscribe(final Subscription s) {
        if (duplicateOnSubscribe) {
            throw new IllegalStateException("Duplicate onSubscribe() calls.");
        }
        duplicateOnSubscribe = true;
    }

    @Override
    public void onNext(final String s) {

    }

    @Override
    public void onError(final Throwable t) {

    }

    @Override
    public void onComplete() {

    }
}

If an UnsafeSubscriber instance is created in a different thread than the one that invokes onSubscribe() (true for an asynchronous Publisher), according to the java memory model, this statement inside onSubscribe():

if (duplicateOnSubscribe) {

is guaranteed to compute to false if and only if the instance is published safely between these threads. None of the rules in the specifications establish a happens-before relationship between Publisher#subscribe() and Subscriber#onSubscribe(). So, the usage above can be categorized as unsafe. In a more convoluted form, the assignment:

private boolean duplicateOnSubscribe = false;

can be interleaved with

duplicateOnSubscribe = true; such that duplicateOnSubscribe is set to false later.

Has this been considered before or am I missing something?

Activity

viktorklang

viktorklang commented on Apr 3, 2020

@viktorklang
Contributor
NiteshKant

NiteshKant commented on Apr 4, 2020

@NiteshKant
Author

I do not think so. Take the below code as an example:

public class UnsafePublisher implements Publisher<String> {
    @Override
    public void subscribe(final Subscriber<? super String> subscriber) {
        // Assume unsafePublish does an unsafe publish of the Subscriber instance
        unsafePublish(subscriber);
    }
    
    private void unsafePublish(final Subscriber<? super String> subscriber) {
        // Assume we are here in a different thread to which subscriber instance was published unsafely.
        subscriber.onSubscribe(new Subscription() {
            private boolean done;
            @Override
            public void request(final long n) {
                if (!done) {
                    done = true;
                    subscriber.onNext("foo");
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }
}

UnsafePublisher follows rule 1.3 i.e. it makes sure that all Subscriber methods are invoked serially. There is a happens-before relationship between onSubscribe() -> onNext() due to rule 2.11 which says receive of onSubscribe() happens before processing of onSubscribe().

However, due to the unsafe publishing of Subscriber instance there is no happens-before between subscribe() and onSubscribe() which means duplicateOnSubscribe in the original code still is racy.

viktorklang

viktorklang commented on Apr 4, 2020

@viktorklang
Contributor

@NiteshKant Wouldn't https://github.com/reactive-streams/reactive-streams-jvm#1.9 prevent the onSubscribe to be executed on some other thread, as it would not be executed under subscribe?

Scottmitch

Scottmitch commented on Apr 4, 2020

@Scottmitch
Contributor

https://github.com/reactive-streams/reactive-streams-jvm#1.9 is focused on the ordering of methods invoked on the Subscriber (e.g. onSubscribe must be first). The specification uses the term serial to require a "happens-before" relationship, and it isn't clear that 1.9 provides this (or if some other combination of rules is meant to imply this relationship). What I would expect is something like:

<1.12, or existing rule(s)?>:
A Publisher MUST establish a serial relationship between subscribe and its first interaction with the Subscriber (e.g. the onSubscribe method per 1.9).

rational:
This rule is intended to clarify that any local non-final state initialized for use in the Subscriber before the Publisher.subscribe call will be visible before the Publisher interacts with a Subscriber.

NiteshKant

NiteshKant commented on Apr 4, 2020

@NiteshKant
Author

@NiteshKant Wouldn't https://github.com/reactive-streams/reactive-streams-jvm#1.9 prevent the onSubscribe to be executed on some other thread, as it would not be executed under subscribe?

I don’t think 1.9 disallows onSubscribe() to be called from a different thread as long as the order of method calls to the Subscriber is as suggested by the spec. In my example I am just demonstrating how an unsafe publication of Subscriber can still lead to following the spec but unexpected behavior in the Subscriber.

The intent here is to see whether this aspect should be covered by the spec.

rkuhn

rkuhn commented on Apr 4, 2020

@rkuhn
Member

So far the creation of protocol entities as well as their internal structure and communication is not mentioned in the spec. It is reasonable to assume that a Subscriber does not need to be thread-safe given the provisions in §1.9, the same as objects delivered to onNext are reasonably expected to be normal (unsynchronized) POJOs. In this light I’d find it reckless to use unsafe publication in the implementation of a Publisher, and if I’m not mistaken this would be outside the JMM in any case. So for me the proposed new rule can already be derived from the existing ones.

@NiteshKant are you aware of any implementations that would be affected by such a new rule?


Side note: §1.9 is indeed ambiguous on whether onSubscribe must happen before subscribe returns — it is worded as though it should (though not crystal clear), but the intent clarification detracts from this provision.

NiteshKant

NiteshKant commented on Apr 4, 2020

@NiteshKant
Author

@rkuhn I would agree that it will be reckless to use unsafe publication of a subscriber instance inside a Publisher implementation but isn't one of the goals of a specification to ensure that implementors are not reckless? 🙂

The objects delivered to onNext doesn't have to worry about safe-publication (if processes asynchronously from within onNext) as Rule 2.11 prohibits unsafe publication.

are you aware of any implementations that would be affected by such a new rule?

I do not know of any implementation that would be negatively affected but I can point you to this change in ServiceTalk that will be positively impacted with regards to clarity in memory visibility.

Side note: §1.9 is indeed ambiguous on whether onSubscribe must happen before subscribe returns 

Actually I thought it is intentional for the spec to allow for delayed onSubscribe() signal after subscribe() returns. It certainly is useful to decouple sources that perform blocking work inside subscribe() from the caller of subscribe().

viktorklang

viktorklang commented on Apr 4, 2020

@viktorklang
Contributor

@rkuhn @NiteshKant You could also argue that since the spec doesn't mandate that creation of any of Publisher, Subscriber, Subscription, and Processor is safely published, the Publisher cannot assume that it can unsafely publish a Subscriber.

NiteshKant

NiteshKant commented on Apr 4, 2020

@NiteshKant
Author

@rkuhn @NiteshKant You could also argue that since the spec doesn't mandate that creation of any of Publisher, Subscriber, Subscription, and Processor is safely published, the Publisher cannot assume that it can unsafely publish a Subscriber.

Yes agreed! The question here is whether we should make this explicit in the spec that a Publisher should safely publish the Subscriber such that implementations of Subscribers (like the ServiceTalk one I referenced above) can assume memory visibility guarantees.

viktorklang

viktorklang commented on Apr 4, 2020

@viktorklang
Contributor

@NiteshKant I guess we could clarify the intent of 1.9?

NiteshKant

NiteshKant commented on Apr 4, 2020

@NiteshKant
Author

Yes that sounds like something that will be beneficial. Should I take a stab at the clarification?

viktorklang

viktorklang commented on Apr 4, 2020

@viktorklang
Contributor

@NiteshKant Yes, please do! :)

21 remaining items

Loading
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

      Participants

      @viktorklang@rkuhn@NiteshKant@devsr@OlegDokuka

      Issue actions

        Happens-before relationship between `Publisher#subscribe()` and `Subscriber.onSubscribe()`? · Issue #486 · reactive-streams/reactive-streams-jvm