4 changes: 2 additions & 2 deletions .github/workflows/checkpoint_converter.yml
@@ -51,7 +51,7 @@ jobs:
NO_PROXY: "localhost,127.0.0.1"
HF_HUB_ENABLE_HF_TRANSFER: "0" # This is more stable
container:
image: whatcanyousee/verl:ngc-cu124-vllm0.8.5-sglang0.4.6-mcore0.12.0-te2.3
image: whatcanyousee/verl:ngc-cu124-vllm0.8.5-sglang0.4.6.post5-mcore0.12.0-te2.3
options: --gpus all --shm-size=10g
steps:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
@@ -81,7 +81,7 @@ jobs:
HF_HUB_ENABLE_HF_TRANSFER: "0" # This is more stable
HF_ENDPOINT: "https://hf-mirror.com"
container:
image: whatcanyousee/verl:ngc-cu124-vllm0.8.5-sglang0.4.6-mcore0.12.0-te2.3
image: whatcanyousee/verl:ngc-cu124-vllm0.8.5-sglang0.4.6.post5-mcore0.12.0-te2.3
options: --gpus all --shm-size=10g
steps:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
2 changes: 1 addition & 1 deletion .github/workflows/dataset.yml
@@ -34,7 +34,7 @@ jobs:
HF_ENDPOINT: "https://hf-mirror.com"
HF_HUB_ENABLE_HF_TRANSFER: "0" # This is more stable
container:
image: whatcanyousee/verl:ngc-cu124-vllm0.8.5-sglang0.4.6-mcore0.12.0-te2.3
image: whatcanyousee/verl:ngc-cu124-vllm0.8.5-sglang0.4.6.post5-mcore0.12.0-te2.3
options: --gpus all --shm-size=10g
steps:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
2 changes: 1 addition & 1 deletion .github/workflows/disabled/e2e_prime.yml
@@ -47,7 +47,7 @@ jobs:
HF_ENDPOINT: "https://hf-mirror.com"
HF_HUB_ENABLE_HF_TRANSFER: "0" # This is more stable
container:
image: whatcanyousee/verl:ngc-cu124-vllm0.8.5-sglang0.4.6-mcore0.12.0-te2.3
image: whatcanyousee/verl:ngc-cu124-vllm0.8.5-sglang0.4.6.post5-mcore0.12.0-te2.3
options: --gpus all --shm-size=10g
steps:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
2 changes: 1 addition & 1 deletion .github/workflows/e2e_dapo.yml
@@ -49,7 +49,7 @@ jobs:
HF_ENDPOINT: "https://hf-mirror.com"
HF_HUB_ENABLE_HF_TRANSFER: "0" # This is more stable
container:
image: whatcanyousee/verl:ngc-cu124-vllm0.8.5-sglang0.4.6-mcore0.12.0-te2.3
image: whatcanyousee/verl:ngc-cu124-vllm0.8.5-sglang0.4.6.post5-mcore0.12.0-te2.3
options: --gpus all --shm-size=10g
steps:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
2 changes: 1 addition & 1 deletion .github/workflows/e2e_eval_aime24.yml
@@ -48,7 +48,7 @@ jobs:
HF_ENDPOINT: "https://hf-mirror.com"
HF_HUB_ENABLE_HF_TRANSFER: "0" # This is more stable
container:
image: whatcanyousee/verl:ngc-cu124-vllm0.8.5-sglang0.4.6-mcore0.12.0-te2.3
image: whatcanyousee/verl:ngc-cu124-vllm0.8.5-sglang0.4.6.post5-mcore0.12.0-te2.3
options: --gpus all --shm-size=10g
steps:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
2 changes: 1 addition & 1 deletion .github/workflows/e2e_ppo_trainer.yml
@@ -69,7 +69,7 @@ jobs:
HF_ENDPOINT: "https://hf-mirror.com"
HF_HUB_ENABLE_HF_TRANSFER: "0" # This is more stable
container:
image: whatcanyousee/verl:ngc-cu124-vllm0.8.5-sglang0.4.6-mcore0.12.0-te2.3
image: whatcanyousee/verl:ngc-cu124-vllm0.8.5-sglang0.4.6.post5-mcore0.12.0-te2.3
options: --gpus all --shm-size=10g
steps:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
2 changes: 1 addition & 1 deletion .github/workflows/e2e_ppo_trainer_megatron.yml
@@ -240,7 +240,7 @@ jobs:
HF_ENDPOINT: "https://hf-mirror.com"
HF_HUB_ENABLE_HF_TRANSFER: "0" # This is more stable
container:
image: whatcanyousee/verl:ngc-cu124-vllm0.8.5-sglang0.4.6-mcore0.12.0-te2.3
image: whatcanyousee/verl:ngc-cu124-vllm0.8.5-sglang0.4.6.post5-mcore0.12.0-te2.3
options: --gpus all --shm-size=10g
steps:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
2 changes: 1 addition & 1 deletion .github/workflows/e2e_sft.yml
@@ -49,7 +49,7 @@ jobs:
HF_ENDPOINT: "https://hf-mirror.com"
HF_HUB_ENABLE_HF_TRANSFER: "0" # This is more stable
container:
image: whatcanyousee/verl:ngc-cu124-vllm0.8.5-sglang0.4.6-mcore0.12.0-te2.3
image: whatcanyousee/verl:ngc-cu124-vllm0.8.5-sglang0.4.6.post5-mcore0.12.0-te2.3
options: --gpus all --shm-size=10g
steps:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
2 changes: 1 addition & 1 deletion .github/workflows/kernels.yml
@@ -47,7 +47,7 @@ jobs:
NO_PROXY: "localhost,127.0.0.1"
HF_HUB_ENABLE_HF_TRANSFER: 1
container:
image: whatcanyousee/verl:ngc-cu124-vllm0.8.5-sglang0.4.6-mcore0.12.0-te2.3
image: whatcanyousee/verl:ngc-cu124-vllm0.8.5-sglang0.4.6.post5-mcore0.12.0-te2.3
options: --gpus all --shm-size=10g
steps:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
4 changes: 2 additions & 2 deletions .github/workflows/model.yml
@@ -34,7 +34,7 @@ jobs:
HF_ENDPOINT: "https://hf-mirror.com"
HF_HUB_ENABLE_HF_TRANSFER: "0" # This is more stable
container:
image: whatcanyousee/verl:ngc-cu124-vllm0.8.5-sglang0.4.6-mcore0.12.0-te2.3
image: whatcanyousee/verl:ngc-cu124-vllm0.8.5-sglang0.4.6.post5-mcore0.12.0-te2.3
options: --gpus all --shm-size=10g
steps:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
@@ -93,7 +93,7 @@ jobs:
HF_ENDPOINT: "https://hf-mirror.com"
HF_HUB_ENABLE_HF_TRANSFER: "0" # This is more stable
container:
image: whatcanyousee/verl:ngc-cu124-vllm0.8.5-sglang0.4.6-mcore0.12.0-te2.3
image: whatcanyousee/verl:ngc-cu124-vllm0.8.5-sglang0.4.6.post5-mcore0.12.0-te2.3
options: --gpus all --shm-size=10g
steps:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
2 changes: 1 addition & 1 deletion .github/workflows/ray_gpu_test.yml
@@ -36,7 +36,7 @@ jobs:
HF_ENDPOINT: "https://hf-mirror.com"
HF_HUB_ENABLE_HF_TRANSFER: "0" # This is more stable
container:
image: whatcanyousee/verl:ngc-cu124-vllm0.8.5-sglang0.4.6-mcore0.12.0-te2.3
image: whatcanyousee/verl:ngc-cu124-vllm0.8.5-sglang0.4.6.post5-mcore0.12.0-te2.3
options: --gpus all --shm-size=10g
steps:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
2 changes: 1 addition & 1 deletion .github/workflows/sandbox.yml
@@ -35,7 +35,7 @@ jobs:
HF_ENDPOINT: "https://hf-mirror.com"
HF_HUB_ENABLE_HF_TRANSFER: "0" # This is more stable
container:
image: whatcanyousee/verl:ngc-cu124-vllm0.8.5-sglang0.4.6-mcore0.12.0-te2.3
image: whatcanyousee/verl:ngc-cu124-vllm0.8.5-sglang0.4.6.post5-mcore0.12.0-te2.3
options: --gpus all --shm-size=10g
steps:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
2 changes: 1 addition & 1 deletion .github/workflows/utils_gpu_test.yml
@@ -30,7 +30,7 @@ jobs:
runs-on: [L20x8]
timeout-minutes: 20 # Increase this timeout value as needed
container:
image: whatcanyousee/verl:ngc-cu124-vllm0.8.5-sglang0.4.6-mcore0.12.0-te2.3
image: whatcanyousee/verl:ngc-cu124-vllm0.8.5-sglang0.4.6.post5-mcore0.12.0-te2.3
options: --gpus all --shm-size=10g
steps:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
6 changes: 4 additions & 2 deletions examples/grpo_trainer/run_qwen2-7b_seq_balance.sh
@@ -4,8 +4,10 @@ set -x
# export VLLM_ATTENTION_BACKEND=XFORMERS

# For async rollout mode, dataset should return raw chat.
rollout_mode="sync"
rollout_mode="async"
rollout_name="sglang" # sglang or vllm
if [ "$rollout_mode" = "async" ]; then
export VLLM_USE_V1=1
return_raw_chat="True"
chat_scheduler=examples.ppo_trainer.naive_chat_scheduler.NaiveChatCompletionScheduler
fi
@@ -34,7 +36,7 @@ python3 -m verl.trainer.main_ppo \
actor_rollout_ref.actor.fsdp_config.param_offload=False \
actor_rollout_ref.actor.fsdp_config.optimizer_offload=False \
actor_rollout_ref.rollout.tensor_model_parallel_size=2 \
actor_rollout_ref.rollout.name=vllm \
actor_rollout_ref.rollout.name=$rollout_name \
actor_rollout_ref.rollout.mode=$rollout_mode \
actor_rollout_ref.rollout.chat_scheduler=$chat_scheduler \
actor_rollout_ref.rollout.gpu_memory_utilization=0.6 \
2 changes: 2 additions & 0 deletions tests/e2e/ppo_trainer/run_function_reward.sh
@@ -13,6 +13,7 @@ MAX_PROMPT_LEN=${MAX_PROMPT_LEN:-512}
MAX_RESPONSE_LEN=${MAX_RESPONSE_LEN:-512}

ENGINE=${ENGINE:-vllm}
ROLLOUT_MODE=${ROLLOUT_MODE:-sync}
GPU_MEMORY_UTILIZATION=${GPU_MEMORY_UTILIZATION:-0.8}
ACTOR_FSDP_PARAM_OFFLOAD=${ACTOR_FSDP_PARAM_OFFLOAD:-False}
ACTOR_FSDP_OPTIMIZER_OFFLOAD=${ACTOR_FSDP_OPTIMIZER_OFFLOAD:-False}
@@ -101,6 +102,7 @@ python3 -m verl.trainer.main_ppo \
actor_rollout_ref.rollout.log_prob_micro_batch_size_per_gpu=${train_traj_micro_bsz_per_gpu} \
actor_rollout_ref.rollout.tensor_model_parallel_size=2 \
actor_rollout_ref.rollout.name="${ENGINE}" \
actor_rollout_ref.rollout.mode="${ROLLOUT_MODE}" \
actor_rollout_ref.rollout.load_format=${LOAD_FORMAT} \
actor_rollout_ref.rollout.layered_summon=${LAYERED_SUMMON} \
actor_rollout_ref.rollout.gpu_memory_utilization="${GPU_MEMORY_UTILIZATION}" \
61 changes: 49 additions & 12 deletions tests/workers/rollout/test_async_sglang_server.py
@@ -25,14 +25,6 @@
},
)
class TestAsyncSglangServer:
@pytest.fixture
def mock_ray_actor(self):
mock_actor = MagicMock()
mock_actor.execute_method.remote = AsyncMock(return_value={"content": "mocked response"})
mock_actor.resume.remote = AsyncMock()
mock_actor.offload.remote = AsyncMock()
return mock_actor

@pytest.fixture
def server_config(self):
return DictConfig({"rollout": {"tensor_model_parallel_size": 2}})
@@ -41,22 +33,67 @@ def server_config(self):
@patch("verl.workers.rollout.sglang_rollout.async_sglang_server.ray.util.list_named_actors")
@patch("verl.workers.rollout.async_server.AsyncServerBase._start_fastapi_server", new_callable=AsyncMock)
@pytest.mark.filterwarnings("ignore:Ray state API is no longer experimental:DeprecationWarning")
async def test_init_engine(self, mock_start_fastapi_server, mock_list_actors, server_config, mock_ray_actor):
async def test_init_engine(self, mock_start_fastapi_server, mock_list_actors, server_config):
mock_list_actors.return_value = [
{"name": "test_prefixWorkerDict_1:0", "namespace": "test"},
{"name": "test_prefixWorkerDict_1:1", "namespace": "test"},
{"name": "test_prefixWorkerDict_0:0", "namespace": "test"},
{"name": "test_prefixWorkerDict_0:1", "namespace": "test"},
{"name": "test_prefixWorkerDict_1:2", "namespace": "test"},
{"name": "test_prefixWorkerDict_1:3", "namespace": "test"},
{"name": "test_prefixWorkerDict_0:2", "namespace": "test"},
{"name": "test_prefixWorkerDict_0:3", "namespace": "test"},
]
from verl.workers.rollout.sglang_rollout.async_sglang_server import AsyncSglangServer

ActualClassToInstantiate = AsyncSglangServer
if hasattr(AsyncSglangServer, "__ray_metadata__") and hasattr(AsyncSglangServer.__ray_metadata__, "modified_class"):
ActualClassToInstantiate = AsyncSglangServer.__ray_metadata__.modified_class

with patch("verl.workers.rollout.sglang_rollout.async_sglang_server.ray.get_actor", return_value=mock_ray_actor):
instance = ActualClassToInstantiate(server_config, 2, 0, "test_prefix")
def mock_get_actor_side_effect(name, namespace=None):
# Create a new mock actor for each call
actor_mock = MagicMock()

# Support .name attribute access
actor_mock.name = name # Use 'name' here

# Support ['name'] item access by mocking __getitem__
def getitem_mock(key):
if key == "name":
return name # Use 'name' here
# For other keys, return a new MagicMock to mimic default behavior or raise KeyError
# Returning a MagicMock is consistent with the original error's cause for unmocked keys
return MagicMock(name=f"mock.__getitem__('{key}')")

actor_mock.__getitem__.side_effect = getitem_mock

return actor_mock

# Verify instance.workers is correctly populated
with patch("verl.workers.rollout.sglang_rollout.async_sglang_server.ray.get_actor", side_effect=mock_get_actor_side_effect):
# Instance 1
instance = ActualClassToInstantiate(server_config, 4, 0, "test_prefix")
await instance.init_engine()

assert len(instance.workers) == 2
assert instance.master_worker["name"] == "test_prefixWorkerDict_0:0"
assert instance.workers[0].name == "test_prefixWorkerDict_0:0"
assert instance.workers[1].name == "test_prefixWorkerDict_0:1"

# Instance 2
instance = ActualClassToInstantiate(server_config, 4, 1, "test_prefix")
await instance.init_engine()

assert len(instance.workers) == 2
assert instance.master_worker["name"] == "test_prefixWorkerDict_0:2"
assert instance.workers[0].name == "test_prefixWorkerDict_0:2"
assert instance.workers[1].name == "test_prefixWorkerDict_0:3"

# Instance 3
instance = ActualClassToInstantiate(server_config, 4, 3, "test_prefix")
await instance.init_engine()

# Verify instance.workers is correctly populated
assert len(instance.workers) == 2
assert instance.master_worker["name"] == "test_prefixWorkerDict_1:2"
assert instance.workers[0].name == "test_prefixWorkerDict_1:2"
assert instance.workers[1].name == "test_prefixWorkerDict_1:3"
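The assertions above all follow one partitioning rule: a server instance with DP rank vllm_dp_rank owns the tensor_model_parallel_size consecutive global worker ranks starting at vllm_dp_rank * tensor_model_parallel_size. A minimal sketch of that mapping, assuming (as in the mocked actor list) that each WorkerDict node hosts four workers; the helper name is illustrative and not part of the PR:

def expected_worker_names(prefix, dp_rank, tp_size, workers_per_node=4):
    """Names of the Ray actors a server instance with the given DP rank should own."""
    start = dp_rank * tp_size
    names = []
    for global_rank in range(start, start + tp_size):
        node_id, local_rank = divmod(global_rank, workers_per_node)
        names.append(f"{prefix}WorkerDict_{node_id}:{local_rank}")
    return names

# dp_size=4, tp_size=2 as in the test above:
assert expected_worker_names("test_prefix", 0, 2) == ["test_prefixWorkerDict_0:0", "test_prefixWorkerDict_0:1"]
assert expected_worker_names("test_prefix", 1, 2) == ["test_prefixWorkerDict_0:2", "test_prefixWorkerDict_0:3"]
assert expected_worker_names("test_prefix", 3, 2) == ["test_prefixWorkerDict_1:2", "test_prefixWorkerDict_1:3"]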
4 changes: 2 additions & 2 deletions verl/trainer/main_ppo.py
@@ -82,9 +82,9 @@ def run(self, config):
elif config.actor_rollout_ref.actor.strategy == "megatron":
assert config.actor_rollout_ref.actor.strategy == config.critic.strategy
from verl.single_controller.ray.megatron import NVMegatronRayWorkerGroup
from verl.workers.megatron_workers import ActorRolloutRefWorker, CriticWorker
from verl.workers.megatron_workers import ActorRolloutRefWorker, AsyncActorRolloutRefWorker, CriticWorker

actor_rollout_cls = ActorRolloutRefWorker
actor_rollout_cls = AsyncActorRolloutRefWorker if config.actor_rollout_ref.rollout.mode == "async" else ActorRolloutRefWorker
ray_worker_group_cls = NVMegatronRayWorkerGroup

else:
17 changes: 13 additions & 4 deletions verl/workers/fsdp_workers.py
@@ -1430,10 +1430,19 @@ def execute_method(self, method: Union[str, bytes], *args, **kwargs):
print(f"[DP={self.vllm_dp_rank},TP={self.vllm_tp_rank}] execute_method: {method if isinstance(method, str) else 'Callable'}")
return self.rollout.execute_method(method, *args, **kwargs)

@register(dispatch_mode=Dispatch.DIRECT_ROLLOUT_METHOD, blocking=False)

Collaborator:
Is it possible to use the rollout API directly instead of adding a glue layer at the FSDP worker level?

Collaborator (Author):
> Is it possible to use the rollout API directly instead of adding a glue layer at the FSDP worker level?

I have tried this approach but have not succeeded yet, because sglang needs to start a process for each TP worker; otherwise it gets stuck at "params = self.module.state_dict()".

Do you know how to get it working?

Collaborator:
After investigation, it seems the base Worker class routes calls through a registration system, which may prevent the rollout itself from exposing the dispatch method via registration. Therefore, the current glue layer is necessary.

async def chat_completion(self, json_request):
ret = await self.rollout.chat_completion(json_request)
return ret

@register(dispatch_mode=Dispatch.DIRECT_ROLLOUT_METHOD)
def resume(self):
return self.rollout.resume()
async def wake_up(self):
await self.rollout.wake_up()
# return something to block the caller
return True

@register(dispatch_mode=Dispatch.DIRECT_ROLLOUT_METHOD)
def offload(self):
return self.rollout.offload()
async def sleep(self):
await self.rollout.sleep()
# return something to block the caller
return True
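The rename from resume/offload to wake_up/sleep keeps the endpoints an async server drives around each generation phase; the same methods are added to the Megatron worker below. A minimal sketch of a caller, assuming a direct Ray actor handle to the worker (serve_request is a hypothetical helper, not part of the PR; in verl these calls go through the registered dispatch machinery):

async def serve_request(worker_handle, json_request):
    """Run one chat request inside a wake_up/sleep cycle (illustrative only)."""
    # Bring the rollout engine back onto the GPU (formerly `resume`).
    await worker_handle.wake_up.remote()
    try:
        # Forward the OpenAI-style chat request to the rollout engine.
        return await worker_handle.chat_completion.remote(json_request)
    finally:
        # Offload the engine again (formerly `offload`) so training can reclaim GPU memory.
        await worker_handle.sleep.remote()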
46 changes: 45 additions & 1 deletion verl/workers/megatron_workers.py
@@ -19,6 +19,7 @@
import os
import time
import warnings
from typing import Union

import torch
import torch.distributed
@@ -308,7 +309,7 @@ def _build_rollout(self, trust_remote_code=False):
log_gpu_memory_usage("After building sharding manager", logger=logger)
else:
raise NotImplementedError("Only vllmRollout is supported with Megatron now")

print(f"rollout and sharding manager init done sharding_manager: {sharding_manager}")
return rollout, sharding_manager

@register(dispatch_mode=Dispatch.ONE_TO_ALL)
Expand Down Expand Up @@ -362,6 +363,8 @@ def init_model(self):

if self._is_rollout:
self.rollout, self.sharding_manager = self._build_rollout(trust_remote_code=self.config.model.get("trust_remote_code", False))
# used for sleep/wake_up
self.rollout.sharding_manager = self.sharding_manager
log_gpu_memory_usage("After rollout init", logger=logger)

if self._is_ref:
@@ -548,6 +551,47 @@ def save_checkpoint(self, checkpoint_path, hdfs_path=None, global_step=0, max_ck
offload_megatron_model_to_cpu(self.actor_module)


class AsyncActorRolloutRefWorker(ActorRolloutRefWorker):
def _build_rollout(self, trust_remote_code=False):
rollout, rollout_sharding_manager = super()._build_rollout(trust_remote_code)

# NOTE: rollout is not actually initialized here, it's deferred
# to be initialized by AsyncvLLMServer.

self.vllm_tp_size = self.config.rollout.tensor_model_parallel_size
self.vllm_dp_rank = int(os.environ["RANK"]) // self.vllm_tp_size
self.vllm_tp_rank = int(os.environ["RANK"]) % self.vllm_tp_size

# used for sleep/wake_up
rollout.sharding_manager = rollout_sharding_manager

return rollout, rollout_sharding_manager

@register(dispatch_mode=Dispatch.DIRECT_ROLLOUT_METHOD)
def execute_method(self, method: Union[str, bytes], *args, **kwargs):
"""Called by ExternalRayDistributedExecutor collective_rpc."""
if self.vllm_tp_rank == 0 and method != "execute_model":
print(f"[DP={self.vllm_dp_rank},TP={self.vllm_tp_rank}] execute_method: {method if isinstance(method, str) else 'Callable'}")
return self.rollout.execute_method(method, *args, **kwargs)

@register(dispatch_mode=Dispatch.DIRECT_ROLLOUT_METHOD, blocking=False)
async def chat_completion(self, json_request):
ret = await self.rollout.chat_completion(json_request)
return ret

@register(dispatch_mode=Dispatch.DIRECT_ROLLOUT_METHOD)
async def wake_up(self):
await self.rollout.wake_up()
# return something to block the caller
return True

@register(dispatch_mode=Dispatch.DIRECT_ROLLOUT_METHOD)
async def sleep(self):
await self.rollout.sleep()
# return something to block the caller
return True


class CriticWorker(MegatronWorker):
def __init__(self, config):
super().__init__()
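For reference, the DP/TP split computed in AsyncActorRolloutRefWorker._build_rollout above assigns each global rank to one server instance. A small illustration with assumed values (8 ranks, tensor_model_parallel_size=2), not part of the PR:

tp_size = 2  # assumed tensor_model_parallel_size
for rank in range(8):  # assumed world size
    vllm_dp_rank, vllm_tp_rank = rank // tp_size, rank % tp_size
    print(f"RANK={rank} -> vllm_dp_rank={vllm_dp_rank}, vllm_tp_rank={vllm_tp_rank}")
# RANK=0,1 -> DP 0; RANK=2,3 -> DP 1; RANK=4,5 -> DP 2; RANK=6,7 -> DP 3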