Skip to content

Conversation

@MoritzArena
Copy link
Contributor

Problem Description

Ref: #15660

The MutinyClientCallsTest.testOneToManyReturnsMultiAndEmitsItems() test case encounters a NullPointerException in certain environments:

Exception in thread "Thread-2" java.lang.NullPointerException: 
Cannot invoke "java.util.concurrent.Flow$Subscriber.onNext(Object)" because "this.downstream" is null
	at org.apache.dubbo.mutiny.AbstractTripleMutinyPublisher.onNext(AbstractTripleMutinyPublisher.java:126)

This error occurs when the test attempts to call publisher.onNext() before the downstream subscriber is properly established, resulting in a null downstream reference.

Root Cause Analysis

Why Local Tests Pass

In local development environments, tests typically pass due to:

  • Sufficient CPU resources: Local machines usually have adequate processing power
  • Lower system load: Fewer competing processes and background tasks
  • Predictable timing: Consistent thread scheduling allows the subscription to complete before data emission
  • JVM optimizations: Local JVMs may have different optimization profiles

Why GitHub CI Fails Intermittently

GitHub CI environments are prone to this race condition because of:

  • Resource constraints: Shared CPU resources with limited allocation per container
  • Variable system load: Multiple builds and processes running concurrently
  • Virtualization overhead: Additional scheduling delays in containerized environments
  • Thread scheduling variance: Less predictable timing due to resource competition

The core issue is a race condition where the data-emitting thread starts before the Multi subscription is fully established.

Solution Comparison

Approach 1: Original Code (Problematic)

publisher.onSubscribe(fakeSubscription);

new Thread(() -> {
    publisher.onNext("item1");
    publisher.onNext("item2");
    publisher.onCompleted();
}).start();

Related issues:

  • Race condition between subscription and data emission
  • Environment-dependent timing
  • No synchronization mechanism

Approach 2: Thread.sleep() (Workaround)

publisher.onSubscribe(fakeSubscription);

new Thread(() -> {
    try {
        Thread.sleep(50); // Wait for subscription to complete
        publisher.onNext("item1");
        publisher.onNext("item2");
        publisher.onCompleted();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        publisher.onError(e);
    }
}).start();

Characteristics:

  • Reduces race condition probability but doesn't eliminate it
  • Still relies on timing assumptions
  • Minimal code changes
  • Not guaranteed to work in all environments

Approach 3: AssertSubscriber with CountDownLatch (Recommended)

CountDownLatch subscribed = new CountDownLatch(1);

// In the mock setup
new Thread(() -> {
    try {
        if (subscribed.await(2, TimeUnit.SECONDS)) {
            publisher.onNext("item1");
            publisher.onNext("item2");
            publisher.onCompleted();
        } else {
            publisher.onError(new IllegalStateException("Downstream not subscribed"));
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        publisher.onError(e);
    }
}).start();

// In the test execution
AssertSubscriber<String> sub = AssertSubscriber.create(Long.MAX_VALUE);
multiResponse.subscribe().withSubscriber(sub);
sub.awaitSubscription();    // Ensure subscription is complete
subscribed.countDown();     // Signal that data can be emitted

Advantages:

  • Completely eliminates race condition
  • Environment-independent
  • Uses Mutiny's native testing tools
  • Clear synchronization semantics
  • Timeout protection

Final Solution Implementation

Here's the complete fixed test method:

@Test
void testOneToManyReturnsMultiAndEmitsItems() {
    Invoker<Object> invoker = Mockito.mock(Invoker.class);
    StubMethodDescriptor method = Mockito.mock(StubMethodDescriptor.class);

    try (MockedStatic<StubInvocationUtil> mocked = Mockito.mockStatic(StubInvocationUtil.class)) {
        AtomicBoolean stubCalled = new AtomicBoolean(false);
        CountDownLatch subscribed = new CountDownLatch(1);

        mocked.when(() -> StubInvocationUtil.serverStreamCall(
                        Mockito.eq(invoker), Mockito.eq(method), Mockito.eq("testRequest"), Mockito.any()))
                .thenAnswer(invocation -> {
                    stubCalled.set(true);
                    ClientTripleMutinyPublisher<String> publisher = invocation.getArgument(3);

                    CallStreamObserver<String> fakeSubscription = new CallStreamObserver<>() {
                        @Override
                        public void request(int n) {
                            /* no-op */
                        }

                        @Override
                        public void setCompression(String compression) {}

                        @Override
                        public void disableAutoFlowControl() {}

                        @Override
                        public void onNext(String v) {
                            publisher.onNext(v);
                        }

                        @Override
                        public void onError(Throwable t) {
                            publisher.onError(t);
                        }

                        @Override
                        public void onCompleted() {
                            publisher.onCompleted();
                        }
                    };
                    publisher.onSubscribe(fakeSubscription);

                    // Wait for downstream subscription to complete before emitting data
                    new Thread(() -> {
                                try {
                                    if (subscribed.await(5, TimeUnit.SECONDS)) {
                                        publisher.onNext("item1");
                                        publisher.onNext("item2");
                                        publisher.onCompleted();
                                    } else {
                                        publisher.onError(
                                                new IllegalStateException("Downstream subscription timeout"));
                                    }
                                } catch (InterruptedException e) {
                                    Thread.currentThread().interrupt();
                                    publisher.onError(e);
                                }
                            })
                            .start();

                    return null;
                });

        Uni<String> uniRequest = Uni.createFrom().item("testRequest");
        Multi<String> multiResponse = MutinyClientCalls.oneToMany(invoker, uniRequest, method);

        // Use AssertSubscriber to ensure proper subscription timing
        AssertSubscriber<String> subscriber = AssertSubscriber.create(Long.MAX_VALUE);
        multiResponse.subscribe().withSubscriber(subscriber);

        // Wait for subscription to be established
        subscriber.awaitSubscription();
        subscribed.countDown(); // Signal that data emission can begin

        // Wait for completion
        subscriber.awaitCompletion(Duration.ofSeconds(5));

        // Verify results
        Assertions.assertTrue(stubCalled.get(), "StubInvocationUtil.serverStreamCall should be called");
        Assertions.assertEquals(List.of("item1", "item2"), subscriber.getItems());
        subscriber.assertCompleted();
    }
}

Key Benefits of This Solution

  1. Deterministic Execution: Guarantees that data emission only occurs after subscription is complete
  2. Framework Integration: Uses Mutiny's AssertSubscriber for proper reactive stream testing
  3. Environment Agnostic: Works consistently across different environments and system loads
  4. Timeout Protection: Includes safeguards against indefinite blocking
  5. Clear Intent: Makes the synchronization requirements explicit and maintainable

This solution addresses the fundamental race condition while maintaining the test's original intent and providing robust execution guarantees across all environments.

Checklist

  • Make sure there is a GitHub_issue field for the change.
  • Write a pull request description that is detailed enough to understand what the pull request does, how, and why.
  • Write necessary unit-test to verify your logic correction. If the new feature or significant change is committed, please remember to add sample in dubbo samples project.
  • Make sure gitHub actions can pass. Why the workflow is failing and how to fix it?

@codecov-commenter
Copy link

codecov-commenter commented Sep 1, 2025

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 61.01%. Comparing base (5b9adb0) to head (4615517).

Additional details and impacted files
@@             Coverage Diff              @@
##                3.3   #15662      +/-   ##
============================================
- Coverage     61.06%   61.01%   -0.05%     
- Complexity    11704    11724      +20     
============================================
  Files          1923     1923              
  Lines         87069    87069              
  Branches      13112    13112              
============================================
- Hits          53165    53128      -37     
- Misses        28465    28499      +34     
- Partials       5439     5442       +3     
Flag Coverage Δ
integration-tests-java21 32.91% <ø> (+<0.01%) ⬆️
integration-tests-java8 33.03% <ø> (+0.07%) ⬆️
samples-tests-java21 32.60% <ø> (-0.04%) ⬇️
samples-tests-java8 30.31% <ø> (-0.05%) ⬇️
unit-tests-java11 59.00% <ø> (-0.01%) ⬇️
unit-tests-java17 58.74% <ø> (-0.02%) ⬇️
unit-tests-java21 58.75% <ø> (-0.02%) ⬇️
unit-tests-java8 58.98% <ø> (-0.01%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR fixes a race condition in the Mutiny test case that was causing intermittent NullPointerException failures in CI environments. The issue occurred when data emission started before the downstream subscriber was properly established.

Key changes:

  • Implemented proper synchronization using CountDownLatch to coordinate subscription and data emission timing
  • Replaced direct collection with AssertSubscriber for better reactive stream testing
  • Added timeout protection and proper error handling for interrupted threads

Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.

Comment on lines +106 to +107
public void request(int n) {
/* no-op */
Copy link

Copilot AI Sep 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] The comment '/* no-op */' should be more descriptive to explain why this method is intentionally empty in the test context.

Suggested change
public void request(int n) {
/* no-op */
// Intentionally left empty: flow control is not required for this test's mock observer.

Copilot uses AI. Check for mistakes.
Copy link
Contributor

@zrlw zrlw left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@oxsean oxsean merged commit 07e1c2a into apache:3.3 Sep 3, 2025
32 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants