Skip to content

[core][compiled graphs] Slow NCCL init on H200 server #53619

@dengwxn

Description

@dengwxn

What happened + What you expected to happen

We observe unexpected long init time (about 1 min) for NCCL communicators on our H200 server, with timing variations across different GPU combinations.

This is reproducible by launching two Ray actors, initing NCCL communicator, and sending data through P2P. The fastest pair is pair(1,2) that completes in 5 seconds. Other pairs complete in 30-80 seconds. We try both communicators from cupy and torch distributed, and they exhibit the same behaviors of slow init on some GPU pairs.

Our GPU spec is as follows.

nvidia-smi
Fri Jun  6 19:33:09 2025       
+-----------------------------------------------------------------------------------------+
| NVIDIA-SMI 570.86.15              Driver Version: 570.86.15      CUDA Version: 12.8     |
|-----------------------------------------+------------------------+----------------------+
| GPU  Name                 Persistence-M | Bus-Id          Disp.A | Volatile Uncorr. ECC |
| Fan  Temp   Perf          Pwr:Usage/Cap |           Memory-Usage | GPU-Util  Compute M. |
|                                         |                        |               MIG M. |
|=========================================+========================+======================|
|   0  NVIDIA H200                    Off |   00000000:03:00.0 Off |                    0 |
| N/A   31C    P0            115W /  700W |   87424MiB / 143771MiB |      0%      Default |
|                                         |                        |             Disabled |
+-----------------------------------------+------------------------+----------------------+

This is the result of using default cupy communicator with different GPU pairs.

# Using default cupy communicator
python -m issue.p2p.cupy.example --name p2p_bench
[INFO example.py:47 run_p2p] Running with CUDA devices 0,1...
2025-06-06 19:18:29,119 INFO worker.py:1879 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265 
2025-06-06 19:18:35,520 INFO torch_tensor_nccl_channel.py:772 -- Creating NCCL group 15b03da4-09d7-45ba-b934-35843cd6a3c5 on actors: [Actor(Actor, 85a29fabe01f09db55695d0a01000000), Actor(Actor, e07e45beebc9f6b9fad7322801000000)]
2025-06-06 19:19:01,036 INFO torch_tensor_nccl_channel.py:797 -- NCCL group initialized.
(Actor pid=287821) Actor 0 completed in 54707 ms
(Actor pid=287845) Actor 1 completed in 40414 ms
2025-06-06 19:19:31,556 INFO compiled_dag_node.py:2173 -- Tearing down compiled DAG
2025-06-06 19:19:31,558 INFO compiled_dag_node.py:2178 -- Cancelling compiled worker on actor: Actor(Actor, e07e45beebc9f6b9fad7322801000000)
2025-06-06 19:19:31,558 INFO compiled_dag_node.py:2178 -- Cancelling compiled worker on actor: Actor(Actor, 85a29fabe01f09db55695d0a01000000)
(Actor pid=287821) Destructing NCCL group on actor: Actor(Actor, e07e45beebc9f6b9fad7322801000000)
(Actor pid=287845) Destructing NCCL group on actor: Actor(Actor, 85a29fabe01f09db55695d0a01000000)
2025-06-06 19:19:32,395 INFO compiled_dag_node.py:2200 -- Waiting for worker tasks to exit
2025-06-06 19:19:32,395 INFO compiled_dag_node.py:2161 -- Killing actor: Actor(Actor, e07e45beebc9f6b9fad7322801000000)
2025-06-06 19:19:32,396 INFO compiled_dag_node.py:2161 -- Killing actor: Actor(Actor, 85a29fabe01f09db55695d0a01000000)
2025-06-06 19:19:32,396 INFO compiled_dag_node.py:2203 -- Teardown complete
[INFO example.py:47 run_p2p] Running with CUDA devices 1,2...
2025-06-06 19:19:39,563 INFO worker.py:1879 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265 
2025-06-06 19:19:46,023 INFO torch_tensor_nccl_channel.py:772 -- Creating NCCL group f1a338a0-4c8c-4345-8cc6-3fbcf00f35de on actors: [Actor(Actor, 74032e44d44ce4dc2a7679ab01000000), Actor(Actor, ca313a1dd9013a6449002e9201000000)]
2025-06-06 19:19:49,322 INFO torch_tensor_nccl_channel.py:797 -- NCCL group initialized.
(Actor pid=338089) Actor 0 completed in 4648 ms
(Actor pid=338090) Actor 1 completed in 4336 ms
2025-06-06 19:19:51,111 INFO compiled_dag_node.py:2173 -- Tearing down compiled DAG
2025-06-06 19:19:51,111 INFO compiled_dag_node.py:2178 -- Cancelling compiled worker on actor: Actor(Actor, 74032e44d44ce4dc2a7679ab01000000)
2025-06-06 19:19:51,111 INFO compiled_dag_node.py:2178 -- Cancelling compiled worker on actor: Actor(Actor, ca313a1dd9013a6449002e9201000000)
(Actor pid=338089) Destructing NCCL group on actor: Actor(Actor, 74032e44d44ce4dc2a7679ab01000000)
(Actor pid=338090) Destructing NCCL group on actor: Actor(Actor, ca313a1dd9013a6449002e9201000000)
2025-06-06 19:19:51,924 INFO compiled_dag_node.py:2200 -- Waiting for worker tasks to exit
2025-06-06 19:19:51,924 INFO compiled_dag_node.py:2161 -- Killing actor: Actor(Actor, 74032e44d44ce4dc2a7679ab01000000)
2025-06-06 19:19:51,925 INFO compiled_dag_node.py:2161 -- Killing actor: Actor(Actor, ca313a1dd9013a6449002e9201000000)
2025-06-06 19:19:51,925 INFO compiled_dag_node.py:2203 -- Teardown complete
[INFO example.py:47 run_p2p] Running with CUDA devices 2,3...
2025-06-06 19:20:00,373 INFO worker.py:1879 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265 
2025-06-06 19:20:06,642 INFO torch_tensor_nccl_channel.py:772 -- Creating NCCL group 8504a534-5431-4fd2-b876-c3aa8c6e4d27 on actors: [Actor(Actor, 4526187245950a790d3feaca01000000), Actor(Actor, e08582217004190b705b30ed01000000)]
2025-06-06 19:20:23,186 INFO torch_tensor_nccl_channel.py:797 -- NCCL group initialized.
(Actor pid=358683) Actor 0 completed in 35780 ms
(Actor pid=358680) Actor 1 completed in 26668 ms
2025-06-06 19:20:42,812 INFO compiled_dag_node.py:2173 -- Tearing down compiled DAG
2025-06-06 19:20:42,813 INFO compiled_dag_node.py:2178 -- Cancelling compiled worker on actor: Actor(Actor, 4526187245950a790d3feaca01000000)
2025-06-06 19:20:42,813 INFO compiled_dag_node.py:2178 -- Cancelling compiled worker on actor: Actor(Actor, e08582217004190b705b30ed01000000)
(Actor pid=358683) Destructing NCCL group on actor: Actor(Actor, 4526187245950a790d3feaca01000000)
(Actor pid=358680) Destructing NCCL group on actor: Actor(Actor, e08582217004190b705b30ed01000000)
2025-06-06 19:20:43,657 INFO compiled_dag_node.py:2200 -- Waiting for worker tasks to exit
2025-06-06 19:20:43,657 INFO compiled_dag_node.py:2161 -- Killing actor: Actor(Actor, 4526187245950a790d3feaca01000000)
2025-06-06 19:20:43,657 INFO compiled_dag_node.py:2161 -- Killing actor: Actor(Actor, e08582217004190b705b30ed01000000)
2025-06-06 19:20:43,657 INFO compiled_dag_node.py:2203 -- Teardown complete
[INFO example.py:47 run_p2p] Running with CUDA devices 3,4...
2025-06-06 19:20:51,255 INFO worker.py:1879 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265 
2025-06-06 19:20:57,800 INFO torch_tensor_nccl_channel.py:772 -- Creating NCCL group a6925f71-67d9-4043-8567-ae369554513f on actors: [Actor(Actor, 574b8685655782fc621afaa401000000), Actor(Actor, 522119d2b2b784c30daffe7001000000)]
2025-06-06 19:21:27,805 WARNING torch_tensor_nccl_channel.py:792 -- NCCL group creation not done after 30s. NCCL group creation may be hung.
2025-06-06 19:21:34,556 INFO torch_tensor_nccl_channel.py:797 -- NCCL group initialized.
(Actor pid=400570) Actor 1 completed in 60067 ms
(Actor pid=400584) Actor 0 completed in 83290 ms
2025-06-06 19:22:21,540 INFO compiled_dag_node.py:2173 -- Tearing down compiled DAG
2025-06-06 19:22:21,541 INFO compiled_dag_node.py:2178 -- Cancelling compiled worker on actor: Actor(Actor, 574b8685655782fc621afaa401000000)
2025-06-06 19:22:21,541 INFO compiled_dag_node.py:2178 -- Cancelling compiled worker on actor: Actor(Actor, 522119d2b2b784c30daffe7001000000)
(Actor pid=400570) Destructing NCCL group on actor: Actor(Actor, 522119d2b2b784c30daffe7001000000)
(Actor pid=400584) Destructing NCCL group on actor: Actor(Actor, 574b8685655782fc621afaa401000000)
2025-06-06 19:22:22,506 INFO compiled_dag_node.py:2200 -- Waiting for worker tasks to exit
2025-06-06 19:22:22,507 INFO compiled_dag_node.py:2161 -- Killing actor: Actor(Actor, 574b8685655782fc621afaa401000000)
2025-06-06 19:22:22,507 INFO compiled_dag_node.py:2161 -- Killing actor: Actor(Actor, 522119d2b2b784c30daffe7001000000)
2025-06-06 19:22:22,507 INFO compiled_dag_node.py:2203 -- Teardown complete

This is the result of using custom pytorch distributed communicator with different GPU pairs.

# Using custom pytorch distributed communicator
python -m issue.p2p.distributed.example --name p2p_bench
[INFO example.py:165 run_p2p] Running with CUDA devices 0,1...
2025-06-06 19:23:17,673 INFO worker.py:1879 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265 
(Actor pid=458461) Initializing process group for actor 1...
(Actor pid=458447) Initializing process group for actor 0...
(Actor pid=458461) Process group for actor 1 initialized in 1679 ms
(Actor pid=458447) Process group for actor 0 initialized in 1631 ms
2025-06-06 19:23:25,311 INFO torch_tensor_nccl_channel.py:770 -- Initializing custom NCCL group 205984d3-74a2-4c94-92cb-6e13678edc22 on actors: [Actor(Actor, 1cd49c662929441bcf07ea8701000000), Actor(Actor, 33bbc1f2b702144e243c7ae401000000)]
(Actor pid=458447) Initializing communicator for rank 0...
2025-06-06 19:23:26,342 INFO torch_tensor_nccl_channel.py:797 -- NCCL group initialized.
(Actor pid=458461) Initializing communicator for rank 1...
(Actor pid=458461) Actor 1 completed in 39547 ms
(Actor pid=458447) Actor 0 completed in 53486 ms
2025-06-06 19:24:18,449 INFO compiled_dag_node.py:2173 -- Tearing down compiled DAG
2025-06-06 19:24:18,450 INFO compiled_dag_node.py:2178 -- Cancelling compiled worker on actor: Actor(Actor, 1cd49c662929441bcf07ea8701000000)
2025-06-06 19:24:18,450 INFO compiled_dag_node.py:2178 -- Cancelling compiled worker on actor: Actor(Actor, 33bbc1f2b702144e243c7ae401000000)
2025-06-06 19:24:18,468 INFO compiled_dag_node.py:2200 -- Waiting for worker tasks to exit
2025-06-06 19:24:18,468 INFO compiled_dag_node.py:2161 -- Killing actor: Actor(Actor, 1cd49c662929441bcf07ea8701000000)
2025-06-06 19:24:18,469 INFO compiled_dag_node.py:2161 -- Killing actor: Actor(Actor, 33bbc1f2b702144e243c7ae401000000)
2025-06-06 19:24:18,469 INFO compiled_dag_node.py:2203 -- Teardown complete
[INFO example.py:165 run_p2p] Running with CUDA devices 1,2...
2025-06-06 19:24:26,012 INFO worker.py:1879 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265 
(Actor pid=498128) Initializing process group for actor 1...
(Actor pid=498127) Initializing process group for actor 0...
(Actor pid=498128) Process group for actor 1 initialized in 1213 ms
(Actor pid=498127) Process group for actor 0 initialized in 1310 ms
2025-06-06 19:24:32,900 INFO torch_tensor_nccl_channel.py:770 -- Initializing custom NCCL group c54ffa79-85db-4174-99c2-2ddfd8c019e9 on actors: [Actor(Actor, 10d1e79a95617dcdee188e5101000000), Actor(Actor, 8a3a520bc6193c3b4b7ce00701000000)]
(Actor pid=498127) Initializing communicator for rank 0...
2025-06-06 19:24:33,942 INFO torch_tensor_nccl_channel.py:797 -- NCCL group initialized.
(Actor pid=498128) Initializing communicator for rank 1...
(Actor pid=498128) Actor 1 completed in 4825 ms
(Actor pid=498127) Actor 0 completed in 5104 ms
2025-06-06 19:24:37,108 INFO compiled_dag_node.py:2173 -- Tearing down compiled DAG
2025-06-06 19:24:37,108 INFO compiled_dag_node.py:2178 -- Cancelling compiled worker on actor: Actor(Actor, 10d1e79a95617dcdee188e5101000000)
2025-06-06 19:24:37,109 INFO compiled_dag_node.py:2178 -- Cancelling compiled worker on actor: Actor(Actor, 8a3a520bc6193c3b4b7ce00701000000)
2025-06-06 19:24:37,126 INFO compiled_dag_node.py:2200 -- Waiting for worker tasks to exit
2025-06-06 19:24:37,126 INFO compiled_dag_node.py:2161 -- Killing actor: Actor(Actor, 10d1e79a95617dcdee188e5101000000)
2025-06-06 19:24:37,126 INFO compiled_dag_node.py:2161 -- Killing actor: Actor(Actor, 8a3a520bc6193c3b4b7ce00701000000)
2025-06-06 19:24:37,126 INFO compiled_dag_node.py:2203 -- Teardown complete
[INFO example.py:165 run_p2p] Running with CUDA devices 2,3...
2025-06-06 19:24:44,687 INFO worker.py:1879 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265 
(Actor pid=517743) Initializing process group for actor 0...
(Actor pid=517761) Initializing process group for actor 1...
(Actor pid=517743) Process group for actor 0 initialized in 1287 ms
(Actor pid=517761) Process group for actor 1 initialized in 1226 ms
2025-06-06 19:24:52,155 INFO torch_tensor_nccl_channel.py:770 -- Initializing custom NCCL group f1765a53-2599-4df9-8091-4d1df4f4869d on actors: [Actor(Actor, ab539af56c5fb861e97c6ec801000000), Actor(Actor, eac24c74e5209284f1a56b5501000000)]
(Actor pid=517743) Initializing communicator for rank 0...
2025-06-06 19:24:53,251 INFO torch_tensor_nccl_channel.py:797 -- NCCL group initialized.
(Actor pid=517761) Initializing communicator for rank 1...
(Actor pid=517743) Actor 0 completed in 36049 ms
(Actor pid=517761) Actor 1 completed in 26885 ms
2025-06-06 19:25:27,334 INFO compiled_dag_node.py:2173 -- Tearing down compiled DAG
2025-06-06 19:25:27,335 INFO compiled_dag_node.py:2178 -- Cancelling compiled worker on actor: Actor(Actor, ab539af56c5fb861e97c6ec801000000)
2025-06-06 19:25:27,335 INFO compiled_dag_node.py:2178 -- Cancelling compiled worker on actor: Actor(Actor, eac24c74e5209284f1a56b5501000000)
2025-06-06 19:25:27,354 INFO compiled_dag_node.py:2200 -- Waiting for worker tasks to exit
2025-06-06 19:25:27,354 INFO compiled_dag_node.py:2161 -- Killing actor: Actor(Actor, ab539af56c5fb861e97c6ec801000000)
2025-06-06 19:25:27,355 INFO compiled_dag_node.py:2161 -- Killing actor: Actor(Actor, eac24c74e5209284f1a56b5501000000)
2025-06-06 19:25:27,355 INFO compiled_dag_node.py:2203 -- Teardown complete
[INFO example.py:165 run_p2p] Running with CUDA devices 3,4...
2025-06-06 19:25:35,086 INFO worker.py:1879 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265 
(Actor pid=561114) Initializing process group for actor 0...
(Actor pid=561119) Initializing process group for actor 1...
(Actor pid=561119) Process group for actor 1 initialized in 1235 ms
(Actor pid=561114) Process group for actor 0 initialized in 1348 ms
2025-06-06 19:25:42,240 INFO torch_tensor_nccl_channel.py:770 -- Initializing custom NCCL group 51a2ec1b-91d4-4f11-aeb0-b7eb305a911f on actors: [Actor(Actor, f8fb433041160173f7ef3ef501000000), Actor(Actor, 861db754116193616a9caf0b01000000)]
(Actor pid=561114) Initializing communicator for rank 0...
2025-06-06 19:25:43,279 INFO torch_tensor_nccl_channel.py:797 -- NCCL group initialized.
(Actor pid=561119) Initializing communicator for rank 1...
(Actor pid=561119) Actor 1 completed in 61155 ms
(Actor pid=561114) Actor 0 completed in 84962 ms
2025-06-06 19:27:06,288 INFO compiled_dag_node.py:2173 -- Tearing down compiled DAG
2025-06-06 19:27:06,289 INFO compiled_dag_node.py:2178 -- Cancelling compiled worker on actor: Actor(Actor, f8fb433041160173f7ef3ef501000000)
2025-06-06 19:27:06,289 INFO compiled_dag_node.py:2178 -- Cancelling compiled worker on actor: Actor(Actor, 861db754116193616a9caf0b01000000)
2025-06-06 19:27:06,310 INFO compiled_dag_node.py:2200 -- Waiting for worker tasks to exit
2025-06-06 19:27:06,310 INFO compiled_dag_node.py:2161 -- Killing actor: Actor(Actor, f8fb433041160173f7ef3ef501000000)
2025-06-06 19:27:06,311 INFO compiled_dag_node.py:2161 -- Killing actor: Actor(Actor, 861db754116193616a9caf0b01000000)
2025-06-06 19:27:06,311 INFO compiled_dag_node.py:2203 -- Teardown complete

Versions / Dependencies

python: 3.9.22
ray: 2.46.0
torch: 2.6.0
cupy-cuda12x: 13.4.1

Reproduction script

Shell scripts for env.

conda create -c conda-forge python=3.9 -n py39
conda activate py39
pip install -r requirements.txt

# Using default cupy communicator
python -m issue.p2p.cupy.example --name p2p_bench

# Using custom pytorch distributed communicator
python -m issue.p2p.distributed.example --name p2p_bench

Python pip dependency.

# requirements.txt
cupy-cuda12x; sys_platform != 'darwin'
fire
ipykernel
jupyter
numpy
pandas
pyarrow
ray
torch

Ray code using default cupy communicator.

# issue/p2p/cupy/example.py
import logging
import os

os.environ["RAY_DEDUP_LOGS"] = "0"

import time

import fire
import ray
import torch
from ray.dag import InputNode

from common.time import get_time_perf_counter, secs_to_millis

logger = logging.getLogger(__name__)
logging.basicConfig(
    level=logging.INFO,
    format="[%(levelname)s %(filename)s:%(lineno)d %(funcName)s] %(message)s",
)


@ray.remote(num_gpus=1)
class Actor:
    def __init__(self, rank: int, world_size: int):
        self.rank = rank
        self.world_size = world_size
        self.start = get_time_perf_counter()

    def run_send(self, _):
        tensor = torch.zeros(1).to("cuda:0")
        tensor.fill_(1.0)
        self.end = get_time_perf_counter(sync=True)
        return tensor

    def run_recv(self, tensor: torch.Tensor):
        tensor += 1
        self.end = get_time_perf_counter(sync=True)
        return tensor

    def get_time(self):
        elapse_ms = secs_to_millis(self.end - self.start)
        logger.warning(f"Actor {self.rank} completed in {elapse_ms} ms")
        time.sleep(1)


def run_p2p(devices: str = "1,2"):
    logger.info(f"Running with CUDA devices {devices}...")
    os.environ["CUDA_VISIBLE_DEVICES"] = devices
    ray.init()

    actors = [Actor.remote(i, 2) for i in range(2)]

    with InputNode() as inp:
        dag = actors[0].run_send.bind(inp).with_tensor_transport(transport="nccl")
        dag = actors[1].run_recv.bind(dag)
        dag = actors[1].run_send.bind(dag).with_tensor_transport(transport="nccl")
        dag = actors[0].run_recv.bind(dag)

    compiled_dag = dag.experimental_compile()

    ray.get(compiled_dag.execute(None), timeout=120)
    ray.get([actor.get_time.remote() for actor in actors], timeout=120)

    ray.shutdown()


def run_p2p_bench():
    for devices in ["0,1", "1,2", "2,3", "3,4"]:
        run_p2p(devices)


def main(name: str):
    if name == "p2p":
        run_p2p()
    elif name == "p2p_bench":
        run_p2p_bench()
    else:
        logger.error(f"Unknown name: {name}")


if __name__ == "__main__":
    fire.Fire(main)

Ray code using custom pytorch distributed communicator.

# issue/p2p/distributed/example.py
import logging
import os

os.environ["RAY_DEDUP_LOGS"] = "0"

import time
from typing import List, Optional, Tuple

import fire
import ray
import torch
import torch.distributed as dist
from ray.air._internal import torch_utils
from ray.dag import InputNode
from ray.experimental.channel.communicator import Communicator, TorchTensorAllocator
from ray.experimental.util.types import ReduceOp

from common.time import get_time_perf_counter, secs_to_millis

logger = logging.getLogger(__name__)
logging.basicConfig(
    level=logging.INFO,
    format="[%(levelname)s %(filename)s:%(lineno)d %(funcName)s] %(message)s",
)


class TorchDistCommunicator(Communicator):
    import cupy as cp

    def __init__(self, world_size, actor_handles):
        self._world_size = world_size
        self._actor_handles = actor_handles
        self._rank = None

    def initialize(self, rank: int) -> None:
        logger.warning(f"Initializing communicator for rank {rank}...")
        expected_rank = self.get_rank(ray.get_runtime_context().current_actor)
        assert (
            rank == expected_rank
        ), f"NCCL actor's rank {rank} does not match expected rank {expected_rank}"
        self._rank = rank
        self._device = torch_utils.get_devices()[0]

    def get_rank(self, actor: ray.actor.ActorHandle) -> int:
        actor_ids = [a._ray_actor_id for a in self._actor_handles]
        try:
            rank = actor_ids.index(actor._ray_actor_id)
        except ValueError:
            raise ValueError("Actor is not in the NCCL group.")
        return rank

    def get_world_size(self) -> int:
        return self._world_size

    def get_self_rank(self) -> Optional[int]:
        return self._rank

    def get_actor_handles(self) -> List["ray.actor.ActorHandle"]:
        return self._actor_handles

    def send(self, value: "torch.Tensor", peer_rank: int) -> None:
        dist.send(value, peer_rank)

    def recv(
        self,
        shape: Tuple[int],
        dtype: "torch.dtype",
        peer_rank: int,
        allocator: Optional[TorchTensorAllocator] = None,
    ) -> "torch.Tensor":
        tensor = torch.empty(torch.Size(shape), dtype=dtype, device=self._device)
        dist.recv(tensor, peer_rank)
        return tensor

    def allgather(
        self,
        send_buf: "torch.Tensor",
        recv_buf: "torch.Tensor",
    ) -> None:
        raise NotImplementedError

    def allreduce(
        self,
        send_buf: "torch.Tensor",
        recv_buf: "torch.Tensor",
        op: ReduceOp = ReduceOp.SUM,
    ) -> None:
        raise NotImplementedError

    def reducescatter(
        self,
        send_buf: "torch.Tensor",
        recv_buf: "torch.Tensor",
        op: ReduceOp = ReduceOp.SUM,
    ) -> None:
        raise NotImplementedError

    @property
    def recv_stream(self) -> Optional["cp.cuda.ExternalStream"]:
        import cupy as cp

        return cp.cuda.get_current_stream()

    @property
    def send_stream(self) -> Optional["cp.cuda.ExternalStream"]:
        import cupy as cp

        return cp.cuda.get_current_stream()

    @property
    def coll_stream(self) -> Optional["cp.cuda.ExternalStream"]:
        import cupy as cp

        return cp.cuda.get_current_stream()

    def destroy(self) -> None:
        pass

    def get_transport_name(self) -> str:
        return "nccl"


@ray.remote(num_gpus=1)
class Actor:
    def __init__(self, rank: int, world_size: int):
        self.rank = rank
        self.world_size = world_size
        self.start = get_time_perf_counter()

    def init_process_group(self):
        os.environ["MASTER_ADDR"] = "localhost"
        os.environ["MASTER_PORT"] = "12345"

        logger.warning(f"Initializing process group for actor {self.rank}...")
        start_init = get_time_perf_counter()
        self.process_group = dist.init_process_group(
            backend="nccl",
            rank=self.rank,
            world_size=self.world_size,
        )
        end_init = get_time_perf_counter(sync=True)
        elapse_init_ms = secs_to_millis(end_init - start_init)
        logger.warning(
            f"Process group for actor {self.rank} initialized in {elapse_init_ms} ms"
        )

    def run_send(self, _):
        tensor = torch.zeros(1).to("cuda:0")
        tensor.fill_(1.0)
        self.end = get_time_perf_counter(sync=True)
        return tensor

    def run_recv(self, tensor: torch.Tensor):
        tensor += 1
        self.end = get_time_perf_counter(sync=True)
        return tensor

    def get_time(self):
        elapse_ms = secs_to_millis(self.end - self.start)
        logger.warning(f"Actor {self.rank} completed in {elapse_ms} ms")
        time.sleep(1)


def run_p2p(devices: str = "1,2"):
    logger.info(f"Running with CUDA devices {devices}...")
    os.environ["CUDA_VISIBLE_DEVICES"] = devices
    ray.init()

    actors = [Actor.remote(i, 2) for i in range(2)]
    ray.get([actor.init_process_group.remote() for actor in actors], timeout=120)
    communicator = TorchDistCommunicator(2, actors)

    with InputNode() as inp:
        dag = actors[0].run_send.bind(inp).with_tensor_transport(transport=communicator)
        dag = actors[1].run_recv.bind(dag)
        dag = actors[1].run_send.bind(dag).with_tensor_transport(transport=communicator)
        dag = actors[0].run_recv.bind(dag)

    compiled_dag = dag.experimental_compile()

    ray.get(compiled_dag.execute(None), timeout=120)
    ray.get([actor.get_time.remote() for actor in actors], timeout=120)

    ray.shutdown()


def run_p2p_bench():
    for devices in ["0,1", "1,2", "2,3", "3,4"]:
        run_p2p(devices)


def main(name: str):
    if name == "p2p":
        run_p2p()
    elif name == "p2p_bench":
        run_p2p_bench()
    else:
        logger.error(f"Unknown name: {name}")


if __name__ == "__main__":
    fire.Fire(main)

Issue Severity

Medium: It is a significant difficulty but I can work around it.

Metadata

Metadata

Assignees

No one assigned

    Labels

    P1Issue that should be fixed within a few weeksbugSomething that is supposed to be working; but isn'tcompiled-graphscoreIssues that should be addressed in Ray CoregpuGPU related issuesperformance

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions