diff --git a/.github/workflows/checkpoints.yml b/.github/workflows/checkpoints.yml index efb6bd5539..87571dc2f7 100644 --- a/.github/workflows/checkpoints.yml +++ b/.github/workflows/checkpoints.yml @@ -22,7 +22,7 @@ permissions: contents: read jobs: - e2e_gsm8k_megatron: + checkpoints: runs-on: [self-hosted, l20-0] timeout-minutes: 40 # Increase this timeout value as needed env: @@ -31,7 +31,7 @@ jobs: NO_PROXY: "localhost,127.0.0.1" HF_HUB_ENABLE_HF_TRANSFER: 1 container: - image: whatcanyousee/verl:vemlp-th2.4.0-cu124-vllm0.6.3-ray2.10-te2.0-megatron0.11.0-v0.0.6 + image: whatcanyousee/verl:ngc-th2.6.0-cu124-vllm0.8.2-mcore0.11.0 options: --gpus all --shm-size=10g steps: - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2 @@ -47,7 +47,6 @@ jobs: - name: Running Checkpoint Integration Test (Qwen Megatron) run: | ray stop --force - export PYTHONPATH=$PYTHONPATH:/opt/nvidia/Megatron-LM bash tests/checkpoint/run_qwen_megatron_ckpt.sh - name: Running Checkpoint Integration Test (Deepseek Megatron) run: | diff --git a/.github/workflows/dataset.yml b/.github/workflows/dataset.yml index ba20878a3e..ffc889bc1c 100644 --- a/.github/workflows/dataset.yml +++ b/.github/workflows/dataset.yml @@ -32,7 +32,7 @@ jobs: NO_PROXY: "localhost,127.0.0.1" HF_HUB_ENABLE_HF_TRANSFER: 1 container: - image: verlai/verl:vemlp-th2.4.0-cu124-vllm0.6.3-ray2.10-te1.7-v0.0.3 + image: whatcanyousee/verl:ngc-th2.6.0-cu124-vllm0.8.2-mcore0.11.0 options: --gpus all --shm-size=10g steps: - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2 diff --git a/.github/workflows/e2e_eval_aime24.yml b/.github/workflows/e2e_eval_aime24.yml index 2b2c8502da..22a0dbc212 100644 --- a/.github/workflows/e2e_eval_aime24.yml +++ b/.github/workflows/e2e_eval_aime24.yml @@ -28,7 +28,7 @@ jobs: NO_PROXY: "localhost,127.0.0.1" HF_HUB_ENABLE_HF_TRANSFER: 1 container: - image: hiyouga/verl:ngc-th2.6.0-cu120-vllm0.8.2 + image: whatcanyousee/verl:ngc-th2.6.0-cu126-vllm0.8.2-mcore0.11.0 options: --gpus all --shm-size=10g steps: - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2 diff --git a/.github/workflows/e2e_grpo.yml b/.github/workflows/e2e_grpo.yml index ccf2fc02a2..8d076bce95 100644 --- a/.github/workflows/e2e_grpo.yml +++ b/.github/workflows/e2e_grpo.yml @@ -24,7 +24,7 @@ permissions: contents: read jobs: - e2e_gsm8k_megatron-l20-0: + e2e_grpo-l20-0: runs-on: [self-hosted, l20-0] timeout-minutes: 40 # Increase this timeout value as needed env: @@ -33,7 +33,7 @@ jobs: NO_PROXY: "localhost,127.0.0.1" HF_HUB_ENABLE_HF_TRANSFER: 1 container: - image: whatcanyousee/verl:vemlp-th2.4.0-cu124-vllm0.6.3-ray2.10-te2.0-megatron0.11.0-v0.0.6 + image: whatcanyousee/verl:ngc-th2.6.0-cu124-vllm0.8.2-mcore0.11.0 options: --gpus all --shm-size=10g steps: - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2 @@ -54,7 +54,7 @@ jobs: run: | ray stop --force bash tests/e2e/run_qwen_grpo_megatron.sh - e2e_gsm8k_megatron-l20-1: + e2e_grpo-l20-1: runs-on: [self-hosted, l20-1] timeout-minutes: 40 # Increase this timeout value as needed env: @@ -63,7 +63,7 @@ jobs: NO_PROXY: "localhost,127.0.0.1" HF_HUB_ENABLE_HF_TRANSFER: 1 container: - image: whatcanyousee/verl:vemlp-th2.4.0-cu124-vllm0.6.3-ray2.10-te2.0-megatron0.11.0-v0.0.6 + image: whatcanyousee/verl:ngc-th2.6.0-cu124-vllm0.8.2-mcore0.11.0 options: --gpus all --shm-size=10g steps: - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2 diff --git a/.github/workflows/e2e_gsm8k.yml 
b/.github/workflows/e2e_gsm8k.yml index aedad2d25d..f5916f018e 100644 --- a/.github/workflows/e2e_gsm8k.yml +++ b/.github/workflows/e2e_gsm8k.yml @@ -33,7 +33,7 @@ jobs: NO_PROXY: "localhost,127.0.0.1" HF_HUB_ENABLE_HF_TRANSFER: 1 container: - image: hiyouga/verl:ngc-th2.6.0-cu120-vllm0.8.2 + image: whatcanyousee/verl:ngc-th2.6.0-cu124-vllm0.8.2-mcore0.11.0 options: --gpus all --shm-size=10g steps: - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2 diff --git a/.github/workflows/e2e_gsm8k_megatron.yml b/.github/workflows/e2e_gsm8k_megatron.yml index c42e0cd34a..ea2ee8f8f1 100644 --- a/.github/workflows/e2e_gsm8k_megatron.yml +++ b/.github/workflows/e2e_gsm8k_megatron.yml @@ -1,5 +1,5 @@ name: e2e_gsm8k_megatron -# latest version: Megatron-LM core_r0.11.0 https://github.com/NVIDIA/Megatron-LM/tree/core_r0.11.0 +# latest version: Megatron-LM v0.11.0 https://github.com/NVIDIA/Megatron-LM/tree/v0.11.0 on: # Trigger the workflow on push or pull request, @@ -26,7 +26,7 @@ permissions: contents: read jobs: - e2e_gsm8k_megatron: + e2e_gsm8k_megatron-l20-0: runs-on: [self-hosted, l20-0] timeout-minutes: 40 # Increase this timeout value as needed env: @@ -35,7 +35,7 @@ jobs: NO_PROXY: "localhost,127.0.0.1" HF_HUB_ENABLE_HF_TRANSFER: 1 container: - image: whatcanyousee/verl:vemlp-th2.4.0-cu124-vllm0.6.3-ray2.10-te2.0-megatron0.11.0-v0.0.6 + image: whatcanyousee/verl:ngc-th2.6.0-cu124-vllm0.8.2-mcore0.11.0 options: --gpus all --shm-size=10g steps: - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2 @@ -52,6 +52,28 @@ jobs: run: | ray stop --force bash tests/e2e/run_deepseek_megatron_parallelism.sh + e2e_gsm8k_megatron-l20-1: + runs-on: [self-hosted, l20-1] + timeout-minutes: 40 # Increase this timeout value as needed + env: + HTTP_PROXY: ${{ secrets.PROXY_HTTP }} + HTTPS_PROXY: ${{ secrets.PROXY_HTTPS }} + NO_PROXY: "localhost,127.0.0.1" + HF_HUB_ENABLE_HF_TRANSFER: 1 + container: + image: whatcanyousee/verl:ngc-th2.6.0-cu124-vllm0.8.2-mcore0.11.0 + options: --gpus all --shm-size=10g + steps: + - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2 + with: + fetch-depth: 0 + - name: Install the current repository + run: | + pip3 install hf_transfer + pip3 install -e .[test] + - name: Prepare gsm8k dataset + run: | + python3 examples/data_preprocess/gsm8k.py - name: Running gsm8k e2e training tests with 3D parallelism on 8 L20 GPUs with Megatron (Qwen) run: | ray stop --force diff --git a/.github/workflows/e2e_gsm8k_prime.yml b/.github/workflows/e2e_gsm8k_prime.yml index dfb8a02ee2..4dc111e06a 100644 --- a/.github/workflows/e2e_gsm8k_prime.yml +++ b/.github/workflows/e2e_gsm8k_prime.yml @@ -30,7 +30,7 @@ jobs: NO_PROXY: "localhost,127.0.0.1" HF_HUB_ENABLE_HF_TRANSFER: 1 container: - image: hiyouga/verl:ngc-th2.6.0-cu120-vllm0.8.2 + image: whatcanyousee/verl:ngc-th2.6.0-cu124-vllm0.8.2-mcore0.11.0 options: --gpus all --shm-size=10g steps: - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2 diff --git a/.github/workflows/e2e_lora.yml b/.github/workflows/e2e_lora.yml index e5db154833..36bf2d1a3d 100644 --- a/.github/workflows/e2e_lora.yml +++ b/.github/workflows/e2e_lora.yml @@ -33,7 +33,7 @@ jobs: NO_PROXY: "localhost,127.0.0.1" HF_HUB_ENABLE_HF_TRANSFER: 1 container: - image: verlai/verl:vemlp-th2.4.0-cu124-vllm0.6.3-ray2.10-te1.7-v0.0.3 + image: whatcanyousee/verl:ngc-th2.6.0-cu124-vllm0.8.2-mcore0.11.0 options: --gpus all --shm-size=10g steps: - uses: 
actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2 diff --git a/.github/workflows/e2e_sft.yml b/.github/workflows/e2e_sft.yml index fee1e241b3..471aa55aa3 100644 --- a/.github/workflows/e2e_sft.yml +++ b/.github/workflows/e2e_sft.yml @@ -33,7 +33,7 @@ jobs: NO_PROXY: "localhost,127.0.0.1" HF_HUB_ENABLE_HF_TRANSFER: 1 container: - image: verlai/verl:vemlp-th2.4.0-cu124-vllm0.6.3-ray2.10-te1.7-v0.0.3 + image: whatcanyousee/verl:ngc-th2.6.0-cu124-vllm0.8.2-mcore0.11.0 options: --gpus all --shm-size=10g steps: - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2 diff --git a/.github/workflows/e2e_vlm_geo3k.yml b/.github/workflows/e2e_vlm_geo3k.yml index e869542136..6b7c5194c4 100644 --- a/.github/workflows/e2e_vlm_geo3k.yml +++ b/.github/workflows/e2e_vlm_geo3k.yml @@ -27,7 +27,7 @@ jobs: NO_PROXY: "localhost,127.0.0.1" HF_HUB_ENABLE_HF_TRANSFER: 1 container: - image: hiyouga/verl:ngc-th2.6.0-cu120-vllm0.8.2 + image: whatcanyousee/verl:ngc-th2.6.0-cu124-vllm0.8.2-mcore0.11.0 options: --gpus all --shm-size=40g steps: - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2 diff --git a/.github/workflows/model.yml b/.github/workflows/model.yml index 798b502c46..fd0ab4cafa 100644 --- a/.github/workflows/model.yml +++ b/.github/workflows/model.yml @@ -27,7 +27,7 @@ jobs: NO_PROXY: "localhost,127.0.0.1" HF_HUB_ENABLE_HF_TRANSFER: 1 container: - image: whatcanyousee/verl:vemlp-th2.4.0-cu124-vllm0.6.3-ray2.10-te2.0-megatron0.11.0-v0.0.6 + image: whatcanyousee/verl:ngc-th2.6.0-cu124-vllm0.8.2-mcore0.11.0 options: --gpus all --shm-size=10g steps: - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2 diff --git a/.github/workflows/ray_test.yml b/.github/workflows/ray_test.yml index 11b9810688..c584589f33 100644 --- a/.github/workflows/ray_test.yml +++ b/.github/workflows/ray_test.yml @@ -31,14 +31,14 @@ permissions: jobs: ray: runs-on: [self-hosted, l20-0] - timeout-minutes: 5 # Increase this timeout value as needed + timeout-minutes: 10 # Increase this timeout value as needed env: HTTP_PROXY: ${{ secrets.PROXY_HTTP }} HTTPS_PROXY: ${{ secrets.PROXY_HTTPS }} NO_PROXY: "localhost,127.0.0.1" HF_HUB_ENABLE_HF_TRANSFER: 1 container: - image: verlai/verl:vemlp-th2.4.0-cu124-vllm0.6.3-ray2.10-te1.7-v0.0.3 + image: whatcanyousee/verl:ngc-th2.6.0-cu124-vllm0.8.2-mcore0.11.0 options: --gpus all --shm-size=10g steps: - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2 diff --git a/.github/workflows/sandbox.yml b/.github/workflows/sandbox.yml index 043bd7fe52..16aa043999 100644 --- a/.github/workflows/sandbox.yml +++ b/.github/workflows/sandbox.yml @@ -23,14 +23,14 @@ permissions: jobs: sandbox: runs-on: [self-hosted, l20-0] - timeout-minutes: 3 # Increase this timeout value as needed + timeout-minutes: 10 # Increase this timeout value as needed env: HTTP_PROXY: ${{ secrets.PROXY_HTTP }} HTTPS_PROXY: ${{ secrets.PROXY_HTTPS }} NO_PROXY: "localhost,127.0.0.1" HF_HUB_ENABLE_HF_TRANSFER: 1 container: - image: verlai/verl:vemlp-th2.4.0-cu124-vllm0.6.3-ray2.10-te1.7-v0.0.3 + image: whatcanyousee/verl:ngc-th2.6.0-cu124-vllm0.8.2-mcore0.11.0 options: --gpus all --shm-size=10g steps: - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2 diff --git a/.github/workflows/vllm.yml b/.github/workflows/vllm.yml index 3f055f027a..e08bab0d05 100644 --- a/.github/workflows/vllm.yml +++ b/.github/workflows/vllm.yml @@ -43,6 +43,7 @@ jobs: pip3 install hf_transfer pip3 install -e .[test] pip3 install vllm==0.5.4 + 
pip3 install flash_attn - name: Running vllm tests on 8 L20 GPUs run: | cd tests/rollout @@ -50,6 +51,7 @@ jobs: - name: Test the latest vLLM run: | pip3 install --upgrade vllm==0.7.3 + pip3 install flash_attn cd tests/rollout torchrun --standalone --nnodes=1 --nproc_per_node=4 $(which pytest) -s test_vllm_spmd.py - name: Run Qwen 0.5B generation test diff --git a/docker/Dockerfile.megatron b/docker/Dockerfile.megatron index dd355211dc..fc8c993843 100644 --- a/docker/Dockerfile.megatron +++ b/docker/Dockerfile.megatron @@ -1,9 +1,36 @@ -FROM verlai/verl:vemlp-th2.4.0-cu124-vllm0.6.3-ray2.10-te1.7-v0.0.3 +FROM hiyouga/verl:ngc-th2.6.0-cu120-vllm0.8.2 -RUN pip install git+https://github.com/NVIDIA/TransformerEngine.git@stable +# Define environments +ENV MAX_JOBS=64 -RUN cd /opt/nvidia && git clone --single-branch --branch core_r0.11.0 https://github.com/NVIDIA/Megatron-LM.git Megatron-LM +RUN apt-get update && \ + apt-get install -y aria2 -# only config pip index with https://pypi.tuna.tsinghua.edu.cn/simple if needed -# unset for now -RUN cd /opt/nvidia/Megatron-LM && pip3 install --no-deps -e . \ No newline at end of file +# 1. Reinstall CUDA 12.4 +RUN aria2c https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64/cuda-ubuntu2204.pin && \ + mv cuda-ubuntu2204.pin /etc/apt/preferences.d/cuda-repository-pin-600 + +RUN aria2c --always-resume=true --max-tries=99999 https://developer.download.nvidia.com/compute/cuda/12.4.1/local_installers/cuda-repo-ubuntu2204-12-4-local_12.4.1-550.54.15-1_amd64.deb + +RUN dpkg -i cuda-repo-ubuntu2204-12-4-local_12.4.1-550.54.15-1_amd64.deb + +RUN cp /var/cuda-repo-ubuntu2204-12-4-local/cuda-*-keyring.gpg /usr/share/keyrings/ + +RUN apt-get update + +RUN apt-get -y install cuda-toolkit-12-4 + +RUN rm cuda-repo-ubuntu2204-12-4-local_12.4.1-550.54.15-1_amd64.deb + +RUN update-alternatives --set cuda /usr/local/cuda-12.4 + +# 2. Install Apex +RUN git clone https://github.com/NVIDIA/apex.git && \ + cd apex && \ + pip install -v --disable-pip-version-check --no-cache-dir --no-build-isolation --config-settings "--build-option=--cpp_ext" --config-settings "--build-option=--cuda_ext" ./ + +# 3. Install TransformerEngine +RUN export NVTE_FRAMEWORK=pytorch && pip3 install --no-deps git+https://github.com/NVIDIA/TransformerEngine.git@v2.1 + +# 4. Install Megatron-LM +RUN pip3 install git+https://github.com/NVIDIA/Megatron-LM.git@v0.11.0 \ No newline at end of file diff --git a/docs/advance/checkpoint.rst b/docs/advance/checkpoint.rst index f397e67d12..f97357d252 100644 --- a/docs/advance/checkpoint.rst +++ b/docs/advance/checkpoint.rst @@ -84,10 +84,11 @@ So example use of Megatron model merger is: .. 
code:: bash - python3 scripts/model_merger.py --backend megatron \ - --is-value-model \ - --hf_model_path Qwen/Qwen2-7B \ - --local_dir checkpoints/verl_megatron_gsm8k_examples/deepseek_megatron_checkpoint_saveload/global_step_1/actor/model + python scripts/model_merger.py \ + --backend megatron \ + --tie-word-embedding \ + --hf_model_path Qwen/Qwen2.5-0.5B \ + --local_dir checkpoints/verl_megatron_gsm8k_examples/qwen2_5_0b5_megatron_saveload/global_step_1/actor Megatron Merger details ----------------------- diff --git a/docs/start/install.rst b/docs/start/install.rst index 460dba668d..971428080b 100644 --- a/docs/start/install.rst +++ b/docs/start/install.rst @@ -19,7 +19,7 @@ Choices of Backend Engines We recommend using **FSDP** backend to investigate, research and prototype different models, datasets and RL algorithms. The guide for using FSDP backend can be found in :doc:`FSDP Workers<../workers/fsdp_workers>`. -For users who pursue better scalability, we recommend using **Megatron-LM** backend. Currently, we support Megatron-LM v0.11 [1]_. The guide for using Megatron-LM backend can be found in :doc:`Megatron-LM Workers<../workers/megatron_workers>`. +For users who pursue better scalability, we recommend using **Megatron-LM** backend. Currently, we support `Megatron-LM v0.11`_. The guide for using Megatron-LM backend can be found in :doc:`Megatron-LM Workers<../workers/megatron_workers>`. .. note:: @@ -40,17 +40,15 @@ Install from docker image We provide pre-built Docker images for quick setup. For SGLang usage, please follow the later sections in this doc. -Image and tag: ``whatcanyousee/verl:vemlp-th2.4.0-cu124-vllm0.6.3-ray2.10-te2.0-megatron0.11.0-v0.0.6`` if you need both FSDP and Megatron support. +Image and tag: ``whatcanyousee/verl:ngc-th2.6.0-cu124-vllm0.8.2-mcore0.11.0``. Check files under ``docker/`` for NGC-based image or if you want to build your own. -We highly recommend ``hiyouga/verl:ngc-th2.6.0-cu120-vllm0.8.2-verl0.3.0.post1`` with vllm v0.8.2 for fastest rollout performance with FSDP. - -See files under ``docker/`` for NGC-based image or if you want to build your own. - -1. Launch the desired Docker image: +1. Launch the desired Docker image and attach into it: .. code:: bash - docker run --runtime=nvidia -it --rm --shm-size="10g" --cap-add=SYS_ADMIN -v + docker create --runtime=nvidia --gpus all --net=host --shm-size="10g" --cap-add=SYS_ADMIN -v .:/workspace/verl --name verl + docker start verl + docker exec -it verl bash 2. Inside the container, install latest verl: @@ -65,14 +63,14 @@ See files under ``docker/`` for NGC-based image or if you want to build your own The Docker image ``whatcanyousee/verl:vemlp-th2.4.0-cu124-vllm0.6.3-ray2.10-te2.0-megatron0.11.0-v0.0.6`` is built with the following configurations: - - **PyTorch**: 2.4.0+cu124 - - **CUDA**: 12.4 - - **Megatron-LM**: core_r0.11.0 - - **vLLM**: 0.6.3 - - **Ray**: 2.10.0 - - **TransformerEngine**: 2.0.0+754d2a0 + - **PyTorch**: 2.6.0+cu124 + - **CUDA**: 12.6 + - **Megatron-LM**: v0.11.0 + - **vLLM**: 0.8.2 + - **Ray**: 2.44.0 + - **TransformerEngine**: 2.1.0+8eb1712 - Now verl has been **compatible to Megatron-LM core_r0.11.0**, and there is **no need to apply patches** to Megatron-LM. Also, the image has integrated **Megatron-LM core_r0.11.0**, located at ``/opt/nvidia/Meagtron-LM``. One more thing, because verl only use ``megatron.core`` module for now, there is **no need to modify** ``PATH`` if you have installed Megatron-LM with this docker image. 
+ Now verl has been **compatible with Megatron-LM v0.11.0**, and there is **no need to apply patches** to Megatron-LM. Also, the image has integrated **Megatron-LM v0.11.0**, located at ``/opt/nvidia/Megatron-LM``. Finally, because verl only uses the ``megatron.core`` module for now, there is **no need to modify** ``PATH`` if you have installed Megatron-LM with this docker image. Install SGLang as rollout backend ================================= @@ -127,7 +125,7 @@ own post-training jobs. .. code:: bash # install verl together with some lightweight dependencies in setup.py - pip3 install torch==2.4.0 --index-url https://download.pytorch.org/whl/cu124 + pip3 install torch==2.6.0 --index-url https://download.pytorch.org/whl/cu124 pip3 install flash-attn --no-build-isolation git clone https://github.com/volcengine/verl.git cd verl diff --git a/examples/grpo_trainer/run_deepseek7b_llm_math_megatron.sh b/examples/grpo_trainer/run_deepseek7b_llm_math_megatron.sh index 89a13d9b88..0ba0cc0379 100644 --- a/examples/grpo_trainer/run_deepseek7b_llm_math_megatron.sh +++ b/examples/grpo_trainer/run_deepseek7b_llm_math_megatron.sh @@ -40,7 +40,7 @@ python3 -m verl.trainer.main_ppo --config-path=config \ algorithm.use_kl_in_reward=False \ trainer.critic_warmup=0 \ trainer.logger=['console','wandb'] \ - trainer.project_name='verl_grpo_example_gsm8k' \ + trainer.project_name='try_fix_megatron_loss_calc' \ trainer.experiment_name='deepseek_llm_7b_function_rm_math_megatron' \ trainer.n_gpus_per_node=16 \ trainer.nnodes=1 \ diff --git a/examples/grpo_trainer/run_qwen2-7b_math_megatron.sh b/examples/grpo_trainer/run_qwen2-7b_math_megatron.sh index 5f3f989661..66e2b096fe 100644 --- a/examples/grpo_trainer/run_qwen2-7b_math_megatron.sh +++ b/examples/grpo_trainer/run_qwen2-7b_math_megatron.sh @@ -41,7 +41,7 @@ python3 -m verl.trainer.main_ppo --config-path=config \ algorithm.use_kl_in_reward=False \ trainer.critic_warmup=0 \ trainer.logger=['console','wandb'] \ - trainer.project_name='verl_grpo_example_gsm8k' \ + trainer.project_name='try_fix_megatron_loss_calc' \ trainer.experiment_name='qwen2_7b_function_rm_megatron' \ trainer.n_gpus_per_node=16 \ trainer.nnodes=1 \ diff --git a/examples/grpo_trainer/run_qwen2_5-7b_math.sh b/examples/grpo_trainer/run_qwen2_5-7b_math.sh new file mode 100644 index 0000000000..bb430507ef --- /dev/null +++ b/examples/grpo_trainer/run_qwen2_5-7b_math.sh @@ -0,0 +1,50 @@ +set -x + +export VLLM_ATTENTION_BACKEND=XFORMERS + +gsm8k_train_path=$HOME/data/gsm8k/train.parquet +gsm8k_test_path=$HOME/data/gsm8k/test.parquet +math_train_path=$HOME/data/math/train.parquet +math_test_path=$HOME/data/math/test.parquet + +train_files="['$gsm8k_train_path', '$math_train_path']" +test_files="['$gsm8k_test_path', '$math_test_path']" + +python3 -m verl.trainer.main_ppo \ + algorithm.adv_estimator=grpo \ + data.train_files="$train_files" \ + data.val_files="$test_files" \ + data.train_batch_size=1024 \ + data.max_prompt_length=1024 \ + data.max_response_length=1024 \ + data.filter_overlong_prompts=True \ + data.truncation='error' \ + actor_rollout_ref.model.path=Qwen/Qwen2.5-7B-Instruct \ + actor_rollout_ref.actor.optim.lr=1e-6 \ + actor_rollout_ref.model.use_remove_padding=True \ + actor_rollout_ref.actor.ppo_mini_batch_size=256 \ + actor_rollout_ref.actor.ppo_micro_batch_size_per_gpu=16 \ + actor_rollout_ref.actor.use_kl_loss=True \ + actor_rollout_ref.actor.kl_loss_coef=0.001 \ + actor_rollout_ref.actor.kl_loss_type=low_var_kl \ + actor_rollout_ref.actor.entropy_coeff=0 \
actor_rollout_ref.model.enable_gradient_checkpointing=True \ + actor_rollout_ref.actor.fsdp_config.param_offload=False \ + actor_rollout_ref.actor.fsdp_config.optimizer_offload=False \ + actor_rollout_ref.rollout.log_prob_micro_batch_size_per_gpu=16 \ + actor_rollout_ref.rollout.tensor_model_parallel_size=2 \ + actor_rollout_ref.rollout.name=vllm \ + actor_rollout_ref.rollout.gpu_memory_utilization=0.6 \ + actor_rollout_ref.rollout.n=5 \ + actor_rollout_ref.ref.log_prob_micro_batch_size_per_gpu=16 \ + actor_rollout_ref.ref.fsdp_config.param_offload=True \ + algorithm.use_kl_in_reward=False \ + trainer.critic_warmup=0 \ + trainer.logger=['console','wandb'] \ + trainer.project_name='verl_grpo_example_gsm8k' \ + trainer.experiment_name='qwen2_7b_function_rm' \ + trainer.n_gpus_per_node=16 \ + trainer.nnodes=1 \ + trainer.save_freq=-1 \ + trainer.test_freq=5 \ + trainer.total_epochs=15 $@ \ No newline at end of file diff --git a/examples/grpo_trainer/run_qwen2_5-7b_math_megatron.sh b/examples/grpo_trainer/run_qwen2_5-7b_math_megatron.sh new file mode 100644 index 0000000000..4863832444 --- /dev/null +++ b/examples/grpo_trainer/run_qwen2_5-7b_math_megatron.sh @@ -0,0 +1,50 @@ +set -x + +export VLLM_ATTENTION_BACKEND=XFORMERS + +gsm8k_train_path=$HOME/data/gsm8k/train.parquet +gsm8k_test_path=$HOME/data/gsm8k/test.parquet +math_train_path=$HOME/data/math/train.parquet +math_test_path=$HOME/data/math/test.parquet + +train_files="['$gsm8k_train_path', '$math_train_path']" +test_files="['$gsm8k_test_path', '$math_test_path']" + +python3 -m verl.trainer.main_ppo --config-path=config \ + --config-name='ppo_megatron_trainer.yaml'\ + algorithm.adv_estimator=grpo \ + data.train_files="$train_files" \ + data.val_files="$test_files" \ + data.train_batch_size=1024 \ + data.max_prompt_length=1024 \ + data.max_response_length=1024 \ + data.filter_overlong_prompts=True \ + data.truncation='error' \ + actor_rollout_ref.model.path=Qwen/Qwen2.5-7B-Instruct \ + actor_rollout_ref.actor.optim.lr=1e-6 \ + actor_rollout_ref.actor.ppo_mini_batch_size=256 \ + actor_rollout_ref.actor.ppo_micro_batch_size_per_gpu=4 \ + actor_rollout_ref.actor.megatron.pipeline_model_parallel_size=2 \ + actor_rollout_ref.actor.megatron.virtual_pipeline_model_parallel_size=2 \ + actor_rollout_ref.actor.megatron.tensor_model_parallel_size=4 \ + actor_rollout_ref.actor.use_kl_loss=True \ + actor_rollout_ref.actor.kl_loss_coef=0.001 \ + actor_rollout_ref.actor.kl_loss_type=low_var_kl \ + actor_rollout_ref.actor.entropy_coeff=0 \ + actor_rollout_ref.model.enable_gradient_checkpointing=True \ + actor_rollout_ref.rollout.log_prob_micro_batch_size_per_gpu=4 \ + actor_rollout_ref.rollout.tensor_model_parallel_size=2 \ + actor_rollout_ref.rollout.name=vllm \ + actor_rollout_ref.rollout.gpu_memory_utilization=0.6 \ + actor_rollout_ref.rollout.n=5 \ + actor_rollout_ref.ref.log_prob_micro_batch_size_per_gpu=4 \ + algorithm.use_kl_in_reward=False \ + trainer.critic_warmup=0 \ + trainer.logger=['console','wandb'] \ + trainer.project_name='try_fix_megatron_loss_calc' \ + trainer.experiment_name='qwen2_5_7b_function_rm_megatron' \ + trainer.n_gpus_per_node=16 \ + trainer.nnodes=1 \ + trainer.save_freq=-1 \ + trainer.test_freq=5 \ + trainer.total_epochs=15 $@ \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index f45c177e74..ee0fc01cc1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -50,6 +50,7 @@ verl = [ [tool.pylint.message_control] disable = [ + "abstract-class-instantiated", "abstract-method", 
"anomalous-backslash-in-string", "arguments-differ", @@ -100,6 +101,7 @@ disable = [ "no-else-raise", "no-else-return", "no-member", + "no-name-in-module", "no-self-argument", "no-value-for-parameter", "not-an-iterable", @@ -129,6 +131,7 @@ disable = [ "too-many-instance-attributes", "too-many-lines", "too-many-locals", + "too-many-function-args", "too-many-positional-arguments", "too-many-return-statements", "too-many-statements", @@ -138,6 +141,7 @@ disable = [ "unbalanced-tuple-unpacking", "undefined-loop-variable", "undefined-variable", + "unexpected-keyword-arg", "ungrouped-imports", "unidiomatic-typecheck", "unnecessary-comprehension", diff --git a/requirements.txt b/requirements.txt index 596ec4cde5..c988d74239 100644 --- a/requirements.txt +++ b/requirements.txt @@ -10,7 +10,7 @@ numpy pandas datasets peft -pyarrow>=15.0.0 +pyarrow>=19.0.0 pybind11 pylatexenc pylint==3.3.6 diff --git a/tests/checkpoint/run_deepseek_megatron_ckpt.sh b/tests/checkpoint/run_deepseek_megatron_ckpt.sh index 0f36b9f48c..c99a01c33f 100644 --- a/tests/checkpoint/run_deepseek_megatron_ckpt.sh +++ b/tests/checkpoint/run_deepseek_megatron_ckpt.sh @@ -6,7 +6,7 @@ mkdir -p $HOME/models huggingface-cli download deepseek-ai/deepseek-coder-1.3b-instruct --local-dir $HOME/models/deepseek-ai/deepseek-coder-1.3b-instruct -export VLLM_ATTENTION_BACKEND=XFORMERS +export CUDA_DEVICE_MAX_CONNECTIONS=1 python3 -m verl.trainer.main_ppo --config-path=config \ --config-name='ppo_megatron_trainer.yaml'\ @@ -22,7 +22,7 @@ python3 -m verl.trainer.main_ppo --config-path=config \ actor_rollout_ref.actor.ppo_micro_batch_size_per_gpu=4 \ actor_rollout_ref.actor.megatron.pipeline_model_parallel_size=2 \ actor_rollout_ref.actor.megatron.virtual_pipeline_model_parallel_size=2 \ - actor_rollout_ref.actor.megatron.tensor_model_parallel_size=4 \ + actor_rollout_ref.actor.megatron.tensor_model_parallel_size=2 \ actor_rollout_ref.actor.checkpoint.contents=['model','hf_model','optimizer','extra'] \ actor_rollout_ref.actor.use_kl_loss=False \ actor_rollout_ref.rollout.log_prob_micro_batch_size_per_gpu=8 \ @@ -71,7 +71,7 @@ python3 -m verl.trainer.main_ppo --config-path=config \ actor_rollout_ref.actor.ppo_micro_batch_size_per_gpu=4 \ actor_rollout_ref.actor.megatron.pipeline_model_parallel_size=2 \ actor_rollout_ref.actor.megatron.virtual_pipeline_model_parallel_size=2 \ - actor_rollout_ref.actor.megatron.tensor_model_parallel_size=4 \ + actor_rollout_ref.actor.megatron.tensor_model_parallel_size=2 \ actor_rollout_ref.actor.use_kl_loss=False \ actor_rollout_ref.rollout.log_prob_micro_batch_size_per_gpu=8 \ actor_rollout_ref.rollout.tensor_model_parallel_size=2 \ @@ -101,4 +101,4 @@ python3 -m verl.trainer.main_ppo --config-path=config \ trainer.save_freq=-1 \ trainer.test_freq=1 \ trainer.total_epochs=15 \ - trainer.total_training_steps=1 $@ \ No newline at end of file + trainer.total_training_steps=2 $@ \ No newline at end of file diff --git a/tests/checkpoint/run_qwen_megatron_ckpt.sh b/tests/checkpoint/run_qwen_megatron_ckpt.sh index 125f6952bb..0c05e869f0 100644 --- a/tests/checkpoint/run_qwen_megatron_ckpt.sh +++ b/tests/checkpoint/run_qwen_megatron_ckpt.sh @@ -6,7 +6,7 @@ mkdir -p $HOME/models huggingface-cli download Qwen/Qwen2.5-0.5B --local-dir $HOME/models/Qwen/Qwen2.5-0.5B -export VLLM_ATTENTION_BACKEND=XFORMERS +export CUDA_DEVICE_MAX_CONNECTIONS=1 python3 -m verl.trainer.main_ppo --config-path=config \ --config-name='ppo_megatron_trainer.yaml'\ @@ -101,4 +101,4 @@ python3 -m verl.trainer.main_ppo --config-path=config \ 
trainer.save_freq=-1 \ trainer.test_freq=1 \ trainer.total_epochs=15 \ - trainer.total_training_steps=1 $@ \ No newline at end of file + trainer.total_training_steps=2 $@ \ No newline at end of file diff --git a/tests/e2e/run_deepseek_grpo.sh b/tests/e2e/run_deepseek_grpo.sh index 097d8207cd..751b1d0ef0 100644 --- a/tests/e2e/run_deepseek_grpo.sh +++ b/tests/e2e/run_deepseek_grpo.sh @@ -1,7 +1,5 @@ set -x -export VLLM_ATTENTION_BACKEND=XFORMERS - huggingface-cli download deepseek-ai/deepseek-coder-1.3b-instruct --local-dir $HOME/models/deepseek-ai/deepseek-coder-1.3b-instruct python3 -m verl.trainer.main_ppo \ diff --git a/tests/e2e/run_deepseek_grpo_megatron.sh b/tests/e2e/run_deepseek_grpo_megatron.sh index 40b01748da..ee5f6266a6 100644 --- a/tests/e2e/run_deepseek_grpo_megatron.sh +++ b/tests/e2e/run_deepseek_grpo_megatron.sh @@ -1,6 +1,6 @@ set -x -export VLLM_ATTENTION_BACKEND=XFORMERS +export CUDA_DEVICE_MAX_CONNECTIONS=1 huggingface-cli download deepseek-ai/deepseek-coder-1.3b-instruct --local-dir $HOME/models/deepseek-ai/deepseek-coder-1.3b-instruct diff --git a/tests/e2e/run_deepseek_megatron.sh b/tests/e2e/run_deepseek_megatron.sh index 5169c93ad2..2c92015b55 100644 --- a/tests/e2e/run_deepseek_megatron.sh +++ b/tests/e2e/run_deepseek_megatron.sh @@ -1,6 +1,6 @@ set -x -export VLLM_ATTENTION_BACKEND=XFORMERS +export CUDA_DEVICE_MAX_CONNECTIONS=1 # the config file used: verl/trainer/main_ppo/config/ppo_megatron_trainer.yaml diff --git a/tests/e2e/run_deepseek_megatron_parallelism.sh b/tests/e2e/run_deepseek_megatron_parallelism.sh index 8b8fcb182e..ff3a4d3e05 100644 --- a/tests/e2e/run_deepseek_megatron_parallelism.sh +++ b/tests/e2e/run_deepseek_megatron_parallelism.sh @@ -1,6 +1,6 @@ set -x -export VLLM_ATTENTION_BACKEND=XFORMERS +export CUDA_DEVICE_MAX_CONNECTIONS=1 # the config file used: verl/trainer/main_ppo/config/ppo_megatron_trainer.yaml @@ -21,7 +21,6 @@ python3 -m verl.trainer.main_ppo --config-path=config \ actor_rollout_ref.actor.ppo_micro_batch_size_per_gpu=4 \ actor_rollout_ref.actor.megatron.pipeline_model_parallel_size=2 \ actor_rollout_ref.actor.megatron.virtual_pipeline_model_parallel_size=2 \ - actor_rollout_ref.actor.megatron.context_parallel_size=2 \ actor_rollout_ref.actor.megatron.tensor_model_parallel_size=2 \ actor_rollout_ref.actor.use_kl_loss=False \ actor_rollout_ref.rollout.log_prob_micro_batch_size_per_gpu=8 \ @@ -31,7 +30,6 @@ python3 -m verl.trainer.main_ppo --config-path=config \ actor_rollout_ref.ref.log_prob_micro_batch_size_per_gpu=16 \ actor_rollout_ref.ref.megatron.pipeline_model_parallel_size=2 \ actor_rollout_ref.ref.megatron.virtual_pipeline_model_parallel_size=2 \ - actor_rollout_ref.ref.megatron.context_parallel_size=2 \ actor_rollout_ref.ref.megatron.tensor_model_parallel_size=2 \ critic.optim.lr=2e-5 \ critic.model.path=$HOME/models/deepseek-ai/deepseek-coder-1.3b-instruct \ @@ -39,7 +37,6 @@ python3 -m verl.trainer.main_ppo --config-path=config \ critic.ppo_micro_batch_size_per_gpu=4 \ critic.megatron.pipeline_model_parallel_size=2 \ critic.megatron.virtual_pipeline_model_parallel_size=2 \ - critic.megatron.context_parallel_size=2 \ critic.megatron.tensor_model_parallel_size=2 \ algorithm.use_kl_in_reward=True \ algorithm.kl_penalty=kl \ diff --git a/tests/e2e/run_qwen_grpo.sh b/tests/e2e/run_qwen_grpo.sh index 7a1b6215cb..20672f1692 100644 --- a/tests/e2e/run_qwen_grpo.sh +++ b/tests/e2e/run_qwen_grpo.sh @@ -1,7 +1,5 @@ set -x -export VLLM_ATTENTION_BACKEND=XFORMERS - huggingface-cli download Qwen/Qwen2.5-0.5B --local-dir 
$HOME/models/Qwen/Qwen2.5-0.5B python3 -m verl.trainer.main_ppo \ diff --git a/tests/e2e/run_qwen_grpo_megatron.sh b/tests/e2e/run_qwen_grpo_megatron.sh index 899f5c8056..649ea8e696 100644 --- a/tests/e2e/run_qwen_grpo_megatron.sh +++ b/tests/e2e/run_qwen_grpo_megatron.sh @@ -1,6 +1,6 @@ set -x -export VLLM_ATTENTION_BACKEND=XFORMERS +export CUDA_DEVICE_MAX_CONNECTIONS=1 huggingface-cli download Qwen/Qwen2.5-0.5B --local-dir $HOME/models/Qwen/Qwen2.5-0.5B diff --git a/tests/e2e/run_qwen_gsm8k_function_rm_both_kl.sh b/tests/e2e/run_qwen_gsm8k_function_rm_both_kl.sh index 3267d9906a..9f49126638 100644 --- a/tests/e2e/run_qwen_gsm8k_function_rm_both_kl.sh +++ b/tests/e2e/run_qwen_gsm8k_function_rm_both_kl.sh @@ -1,7 +1,5 @@ set -x -export VLLM_ATTENTION_BACKEND=XFORMERS - python3 -m verl.trainer.main_ppo \ algorithm.adv_estimator=gae \ data.train_files=$HOME/data/gsm8k/train.parquet \ diff --git a/tests/e2e/run_qwen_megatron.sh b/tests/e2e/run_qwen_megatron.sh index 9c3fe6d336..78b630d31b 100644 --- a/tests/e2e/run_qwen_megatron.sh +++ b/tests/e2e/run_qwen_megatron.sh @@ -1,14 +1,12 @@ set -x -export VLLM_ATTENTION_BACKEND=XFORMERS +export CUDA_DEVICE_MAX_CONNECTIONS=1 # the config file used: verl/trainer/main_ppo/config/ppo_megatron_trainer.yaml mkdir -p $HOME/models huggingface-cli download Qwen/Qwen2.5-0.5B --local-dir $HOME/models/Qwen/Qwen2.5-0.5B -export VLLM_ATTENTION_BACKEND=XFORMERS - python3 -m verl.trainer.main_ppo --config-path=config \ --config-name='ppo_megatron_trainer.yaml'\ algorithm.adv_estimator=gae \ diff --git a/tests/e2e/run_qwen_megatron_parallelism.sh b/tests/e2e/run_qwen_megatron_parallelism.sh index 2eb19cc471..7dfa168e61 100644 --- a/tests/e2e/run_qwen_megatron_parallelism.sh +++ b/tests/e2e/run_qwen_megatron_parallelism.sh @@ -1,6 +1,6 @@ set -x -export VLLM_ATTENTION_BACKEND=XFORMERS +export CUDA_DEVICE_MAX_CONNECTIONS=1 # the config file used: verl/trainer/main_ppo/config/ppo_megatron_trainer.yaml @@ -8,7 +8,7 @@ mkdir -p $HOME/models huggingface-cli download Qwen/Qwen2.5-0.5B --local-dir $HOME/models/Qwen/Qwen2.5-0.5B -export VLLM_ATTENTION_BACKEND=XFORMERS +export CUDA_DEVICE_MAX_CONNECTIONS=1 python3 -m verl.trainer.main_ppo --config-path=config \ --config-name='ppo_megatron_trainer.yaml'\ @@ -24,7 +24,6 @@ python3 -m verl.trainer.main_ppo --config-path=config \ actor_rollout_ref.actor.ppo_micro_batch_size_per_gpu=4 \ actor_rollout_ref.actor.megatron.pipeline_model_parallel_size=2 \ actor_rollout_ref.actor.megatron.virtual_pipeline_model_parallel_size=2 \ - actor_rollout_ref.actor.megatron.context_parallel_size=2 \ actor_rollout_ref.actor.megatron.tensor_model_parallel_size=2 \ actor_rollout_ref.actor.use_kl_loss=False \ actor_rollout_ref.rollout.log_prob_micro_batch_size_per_gpu=8 \ @@ -34,7 +33,6 @@ python3 -m verl.trainer.main_ppo --config-path=config \ actor_rollout_ref.ref.log_prob_micro_batch_size_per_gpu=16 \ actor_rollout_ref.ref.megatron.pipeline_model_parallel_size=2 \ actor_rollout_ref.ref.megatron.virtual_pipeline_model_parallel_size=2 \ - actor_rollout_ref.ref.megatron.context_parallel_size=2 \ actor_rollout_ref.ref.megatron.tensor_model_parallel_size=2 \ critic.optim.lr=2e-5 \ critic.model.path=$HOME/models/Qwen/Qwen2.5-0.5B \ @@ -42,7 +40,6 @@ python3 -m verl.trainer.main_ppo --config-path=config \ critic.ppo_micro_batch_size_per_gpu=4 \ critic.megatron.pipeline_model_parallel_size=2 \ critic.megatron.virtual_pipeline_model_parallel_size=2 \ - critic.megatron.context_parallel_size=2 \ critic.megatron.tensor_model_parallel_size=2 \ 
algorithm.use_kl_in_reward=True \ algorithm.kl_penalty=kl \ diff --git a/verl/models/llama/megatron/checkpoint_utils/llama_loader.py b/verl/models/llama/megatron/checkpoint_utils/llama_loader.py index 4f602b5f46..c9fe885917 100644 --- a/verl/models/llama/megatron/checkpoint_utils/llama_loader.py +++ b/verl/models/llama/megatron/checkpoint_utils/llama_loader.py @@ -135,7 +135,7 @@ def _fetch_tp_shard_tensor(tensor, name, chunk_dim=0, mutate_func=None) -> torch full_weight = mutate_func(full_weight) tensor_chunk = torch.chunk(full_weight, tp_size, dim=chunk_dim) if tensor is not None: - tensor.data.copy_(tensor_chunk[tp_rank]) + tensor.data.copy_(tensor_chunk[tp_rank].full_tensor()) else: print(f"tp_shard tensor:[{name}] not in state_dict, skip loading") @@ -154,8 +154,8 @@ def _fetch_tp_shard_tensor_gate_up(tensor, gate_name, up_name) -> torch.Tensor: device=torch.cuda.current_device()) for i in range(tp_size): intermediate_size_tp = config.intermediate_size // tp_size - gate_weight_tp = gate_weight[i * intermediate_size_tp:(i + 1) * intermediate_size_tp] - up_weight_tp = up_weight[i * intermediate_size_tp:(i + 1) * intermediate_size_tp] + gate_weight_tp = gate_weight[i * intermediate_size_tp:(i + 1) * intermediate_size_tp].full_tensor() + up_weight_tp = up_weight[i * intermediate_size_tp:(i + 1) * intermediate_size_tp].full_tensor() new_gate_up_weight[intermediate_size_tp * 2 * i:intermediate_size_tp * 2 * (i + 1)].copy_( torch.cat([gate_weight_tp, up_weight_tp], dim=0)) @@ -185,11 +185,11 @@ def _fetch_tp_shard_tensor_qkv(tensor, q_name, k_name, v_name) -> torch.Tensor: new_weight_qkv = torch.empty(total_size * tp_size, config.hidden_size, dtype=params_dtype, - device=torch.cuda.current_device()) + device=torch.cuda.current_device()).full_tensor() for i in range(tp_size): - q_part = full_weight_q[i * q_size_tp:(i + 1) * q_size_tp] - k_part = full_weight_k[i * kv_size_tp:(i + 1) * kv_size_tp] - v_part = full_weight_v[i * kv_size_tp:(i + 1) * kv_size_tp] + q_part = full_weight_q[i * q_size_tp:(i + 1) * q_size_tp].full_tensor() + k_part = full_weight_k[i * kv_size_tp:(i + 1) * kv_size_tp].full_tensor() + v_part = full_weight_v[i * kv_size_tp:(i + 1) * kv_size_tp].full_tensor() new_weight_qkv[i * total_size:(i + 1) * total_size].copy_(torch.cat([q_part, k_part, v_part], dim=0)) else: @@ -199,13 +199,13 @@ def _fetch_tp_shard_tensor_qkv(tensor, q_name, k_name, v_name) -> torch.Tensor: new_weight_qkv = torch.empty(total_size * tp_size, config.hidden_size, dtype=params_dtype, - device=torch.cuda.current_device()) + device=torch.cuda.current_device()).full_tensor() for i in range(tp_size): - q_part = full_weight_q[i * q_size_tp:(i + 1) * q_size_tp] + q_part = full_weight_q[i * q_size_tp:(i + 1) * q_size_tp].full_tensor() start_idx = i * config.num_key_value_heads // tp_size * hidden_size_per_head end_idx = (i * config.num_key_value_heads // tp_size + 1) * hidden_size_per_head - k_part = full_weight_k[start_idx:end_idx] - v_part = full_weight_v[start_idx:end_idx] + k_part = full_weight_k[start_idx:end_idx].full_tensor() + v_part = full_weight_v[start_idx:end_idx].full_tensor() new_weight_qkv[i * total_size:(i + 1) * total_size].copy_(torch.cat([q_part, k_part, v_part], dim=0)) tensor_chunk = torch.chunk(new_weight_qkv, tp_size, dim=0) diff --git a/verl/models/llama/megatron/layers/parallel_decoder.py b/verl/models/llama/megatron/layers/parallel_decoder.py index e51632a33c..b810c10ab9 100644 --- a/verl/models/llama/megatron/layers/parallel_decoder.py +++ 
b/verl/models/llama/megatron/layers/parallel_decoder.py @@ -29,7 +29,8 @@ from .parallel_mlp import ParallelLlamaMLP from .parallel_rmsnorm import ParallelLlamaRMSNorm -from verl.utils.megatron_utils import TransformerConfig, convert_config +from verl.utils.megatron_utils import convert_config +from megatron.core.transformer import TransformerConfig class ParallelLlamaDecoderLayer(nn.Module): diff --git a/verl/models/llama/megatron/modeling_llama_megatron.py b/verl/models/llama/megatron/modeling_llama_megatron.py index 83d0f0f2bb..39162351fb 100644 --- a/verl/models/llama/megatron/modeling_llama_megatron.py +++ b/verl/models/llama/megatron/modeling_llama_megatron.py @@ -34,8 +34,10 @@ from verl.utils.megatron import sequence_parallel as sp_utils from verl.utils.megatron import tensor_parallel as tp_utils -from verl.utils.megatron_utils import TransformerConfig, convert_config +from verl.utils.megatron_utils import convert_config from .layers import ParallelLlamaDecoderLayer, ParallelLlamaRMSNorm, ParallelLlamaDecoderLayerRmPad + +from megatron.core.transformer import TransformerConfig """ TODO: 1. Add weight initialization. Here we need to be careful on TP weight init. diff --git a/verl/models/qwen2/megatron/checkpoint_utils/qwen2_loader.py b/verl/models/qwen2/megatron/checkpoint_utils/qwen2_loader.py index 87e1561a40..69b1f00ede 100644 --- a/verl/models/qwen2/megatron/checkpoint_utils/qwen2_loader.py +++ b/verl/models/qwen2/megatron/checkpoint_utils/qwen2_loader.py @@ -13,6 +13,7 @@ # limitations under the License. import torch +from torch.distributed.tensor import DTensor import time from typing import Dict, Any, Callable, Optional import torch.distributed as dist @@ -131,7 +132,7 @@ def _fetch_tp_shard_tensor(tensor, name, chunk_dim=0, mutate_func=None) -> torch full_weight = mutate_func(full_weight) tensor_chunk = torch.chunk(full_weight, tp_size, dim=chunk_dim) if tensor is not None: - tensor = tensor.data.copy_(tensor_chunk[tp_rank], non_blocking=True) + tensor = tensor.data.copy_(tensor_chunk[tp_rank].full_tensor(), non_blocking=True) else: print(f"tp_shard tensor:[{name}] not in state_dict, skip loading") @@ -150,8 +151,8 @@ def _fetch_tp_shard_tensor_gate_up(tensor, gate_name, up_name) -> torch.Tensor: device=torch.cuda.current_device()) for i in range(tp_size): intermediate_size_tp = config.intermediate_size // tp_size - gate_weight_tp = gate_weight[i * intermediate_size_tp:(i + 1) * intermediate_size_tp] - up_weight_tp = up_weight[i * intermediate_size_tp:(i + 1) * intermediate_size_tp] + gate_weight_tp = gate_weight[i * intermediate_size_tp:(i + 1) * intermediate_size_tp].full_tensor() + up_weight_tp = up_weight[i * intermediate_size_tp:(i + 1) * intermediate_size_tp].full_tensor() new_gate_up_weight[intermediate_size_tp * 2 * i:intermediate_size_tp * 2 * (i + 1)].copy_( torch.cat([gate_weight_tp, up_weight_tp], dim=0)) @@ -188,9 +189,9 @@ def _fetch_tp_shard_tensor_qkv(tensor, q_name, k_name, v_name, bias=False) -> to dtype=params_dtype, device=torch.cuda.current_device()) for i in range(tp_size): - q_part = full_weight_q[i * q_size_tp:(i + 1) * q_size_tp] - k_part = full_weight_k[i * kv_size_tp:(i + 1) * kv_size_tp] - v_part = full_weight_v[i * kv_size_tp:(i + 1) * kv_size_tp] + q_part = full_weight_q[i * q_size_tp:(i + 1) * q_size_tp].full_tensor() + k_part = full_weight_k[i * kv_size_tp:(i + 1) * kv_size_tp].full_tensor() + v_part = full_weight_v[i * kv_size_tp:(i + 1) * kv_size_tp].full_tensor() new_weight_qkv[i * total_size:(i + 1) * total_size].copy_(torch.cat([q_part, 
k_part, v_part], dim=0)) else: @@ -207,11 +208,11 @@ def _fetch_tp_shard_tensor_qkv(tensor, q_name, k_name, v_name, bias=False) -> to dtype=params_dtype, device=torch.cuda.current_device()) for i in range(tp_size): - q_part = full_weight_q[i * q_size_tp:(i + 1) * q_size_tp] + q_part = full_weight_q[i * q_size_tp:(i + 1) * q_size_tp].full_tensor() start_idx = i * config.num_key_value_heads // tp_size * hidden_size_per_head end_idx = (i * config.num_key_value_heads // tp_size + 1) * hidden_size_per_head - k_part = full_weight_k[start_idx:end_idx] - v_part = full_weight_v[start_idx:end_idx] + k_part = full_weight_k[start_idx:end_idx].full_tensor() + v_part = full_weight_v[start_idx:end_idx].full_tensor() new_weight_qkv[i * total_size:(i + 1) * total_size].copy_(torch.cat([q_part, k_part, v_part], dim=0)) tensor_chunk = torch.chunk(new_weight_qkv, tp_size, dim=0) diff --git a/verl/models/qwen2/megatron/layers/parallel_decoder.py b/verl/models/qwen2/megatron/layers/parallel_decoder.py index 84562b3bca..d0de51d295 100644 --- a/verl/models/qwen2/megatron/layers/parallel_decoder.py +++ b/verl/models/qwen2/megatron/layers/parallel_decoder.py @@ -29,7 +29,8 @@ from .parallel_mlp import ParallelQwen2MLP from .parallel_rmsnorm import ParallelQwen2RMSNorm -from verl.utils.megatron_utils import TransformerConfig, convert_config +from verl.utils.megatron_utils import convert_config +from megatron.core.transformer import TransformerConfig class ParallelQwen2DecoderLayer(nn.Module): diff --git a/verl/models/qwen2/megatron/modeling_qwen2_megatron.py b/verl/models/qwen2/megatron/modeling_qwen2_megatron.py index c15111bd6b..147cda86bb 100644 --- a/verl/models/qwen2/megatron/modeling_qwen2_megatron.py +++ b/verl/models/qwen2/megatron/modeling_qwen2_megatron.py @@ -34,8 +34,10 @@ from verl.utils.megatron import sequence_parallel as sp_utils from verl.utils.megatron import tensor_parallel as tp_utils -from verl.utils.megatron_utils import TransformerConfig, convert_config +from verl.utils.megatron_utils import convert_config from .layers import ParallelQwen2DecoderLayer, ParallelQwen2RMSNorm, ParallelQwen2DecoderLayerRmPad + +from megatron.core.transformer import TransformerConfig """ TODO: 1. Add weight initialization. Here we need to be careful on TP weight init. 
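The loader hunks above (llama_loader.py and qwen2_loader.py) slice Hugging Face weights that may arrive as ``torch.distributed.tensor.DTensor`` shards, so each slice is materialized with ``.full_tensor()`` before being copied into a local tensor-parallel parameter. A minimal sketch of that pattern follows; the helper name ``copy_tp_shard`` and the standalone usage are illustrative assumptions, not code from this PR.

    import torch
    from torch.distributed.tensor import DTensor

    def copy_tp_shard(dst_param: torch.Tensor, full_weight: torch.Tensor,
                      tp_rank: int, tp_size: int, chunk_dim: int = 0) -> None:
        """Copy this rank's tensor-parallel chunk of full_weight into dst_param."""
        if isinstance(full_weight, DTensor):
            # Materialize the sharded weight first, mirroring the .full_tensor()
            # calls added above, since the destination is a regular local tensor.
            full_weight = full_weight.full_tensor()
        chunk = torch.chunk(full_weight, tp_size, dim=chunk_dim)[tp_rank]
        dst_param.data.copy_(chunk)

    # Single-process usage with a plain tensor (no device mesh required):
    param = torch.empty(2, 4)
    weight = torch.arange(32, dtype=torch.float32).reshape(8, 4)
    copy_tp_shard(param, weight, tp_rank=1, tp_size=4)  # copies rows 2..3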
diff --git a/verl/single_controller/base/decorator.py b/verl/single_controller/base/decorator.py index 79d3aceae6..0f57f8e0a0 100644 --- a/verl/single_controller/base/decorator.py +++ b/verl/single_controller/base/decorator.py @@ -219,7 +219,7 @@ def collect_megatron_pp_as_dp(worker_group, output): output_in_dp = [] for global_rank in range(worker_group.world_size): local_rank_info = worker_group.get_megatron_rank_info(rank=global_rank) - if local_rank_info.tp_rank == 0 and local_rank_info.pp_rank == 0: + if local_rank_info.tp_rank == 0: output_in_dp.append(output[global_rank]) return output_in_dp diff --git a/verl/third_party/vllm/__init__.py b/verl/third_party/vllm/__init__.py index 34d61e7149..7e6c2f88a2 100644 --- a/verl/third_party/vllm/__init__.py +++ b/verl/third_party/vllm/__init__.py @@ -59,6 +59,7 @@ def get_version(pkg): from vllm import LLM from vllm.distributed import parallel_state + vllm_version = package_version else: if not is_sglang_available(): raise ValueError( diff --git a/verl/third_party/vllm/vllm_v_0_5_4/model_runner.py b/verl/third_party/vllm/vllm_v_0_5_4/model_runner.py index d6ab232558..89a1881c27 100644 --- a/verl/third_party/vllm/vllm_v_0_5_4/model_runner.py +++ b/verl/third_party/vllm/vllm_v_0_5_4/model_runner.py @@ -77,10 +77,10 @@ def __init__( load_config, lora_config, kv_cache_dtype, - is_driver_worker=True, # a hack - prompt_adapter_config=prompt_adapter_config, - multimodal_config=multimodal_config, - return_hidden_states=return_hidden_states) + True, # a hack + prompt_adapter_config, + multimodal_config, + return_hidden_states) # NOTE(sgm): add for verl self.model = model # this will be replaced by get_model() diff --git a/verl/third_party/vllm/vllm_v_0_6_3/model_runner.py b/verl/third_party/vllm/vllm_v_0_6_3/model_runner.py index b0cceffb52..aab103aa0d 100644 --- a/verl/third_party/vllm/vllm_v_0_6_3/model_runner.py +++ b/verl/third_party/vllm/vllm_v_0_6_3/model_runner.py @@ -87,12 +87,12 @@ def __init__( load_config, lora_config, kv_cache_dtype, - is_driver_worker=True, # a hack - prompt_adapter_config=prompt_adapter_config, - return_hidden_states=return_hidden_states, - observability_config=observability_config, - input_registry=input_registry, - mm_registry=mm_registry, + True, # a hack + prompt_adapter_config, + return_hidden_states, + observability_config, + input_registry, + mm_registry, ) # NOTE(sgm): add for verl diff --git a/verl/trainer/config/ppo_megatron_trainer.yaml b/verl/trainer/config/ppo_megatron_trainer.yaml index 32e864c2ef..8d70d03006 100644 --- a/verl/trainer/config/ppo_megatron_trainer.yaml +++ b/verl/trainer/config/ppo_megatron_trainer.yaml @@ -64,7 +64,7 @@ actor_rollout_ref: total_training_steps: -1 # must be override by program weight_decay: 0.01 megatron: - tensor_model_parallel_size: 4 + tensor_model_parallel_size: 1 pipeline_model_parallel_size: 1 virtual_pipeline_model_parallel_size: null # change VPP interface for parallelism tests context_parallel_size: 1 @@ -76,7 +76,7 @@ actor_rollout_ref: contents: ['model', 'optimizer', 'extra'] # with 'hf_model' you can save whole model as hf format, now only use sharded model checkpoint to save space ref: megatron: - tensor_model_parallel_size: 4 + tensor_model_parallel_size: 1 pipeline_model_parallel_size: 1 virtual_pipeline_model_parallel_size: null # change VPP interface for parallelism tests context_parallel_size: 1 @@ -101,7 +101,7 @@ actor_rollout_ref: enforce_eager: True free_cache_engine: True load_format: dummy_megatron - tensor_model_parallel_size: 2 + 
tensor_model_parallel_size: 1 max_num_batched_tokens: 8192 max_model_len: null max_num_seqs: 1024 @@ -147,7 +147,7 @@ critic: activations_checkpoint_granularity: null activations_checkpoint_num_layers: null megatron: - tensor_model_parallel_size: 4 + tensor_model_parallel_size: 1 pipeline_model_parallel_size: 1 virtual_pipeline_model_parallel_size: null # change VPP interface for parallelism tests context_parallel_size: 1 @@ -172,7 +172,7 @@ reward_model: enable: False strategy: megatron megatron: - tensor_model_parallel_size: 4 + tensor_model_parallel_size: 1 pipeline_model_parallel_size: 1 virtual_pipeline_model_parallel_size: null # change VPP interface for parallelism tests context_parallel_size: 1 diff --git a/verl/utils/checkpoint/megatron_checkpoint_manager.py b/verl/utils/checkpoint/megatron_checkpoint_manager.py index 0baf64140c..c398bf39a5 100644 --- a/verl/utils/checkpoint/megatron_checkpoint_manager.py +++ b/verl/utils/checkpoint/megatron_checkpoint_manager.py @@ -27,7 +27,7 @@ from verl.models.weight_loader_registry import get_weight_saver from verl.models.weight_loader_registry import get_weight_loader from verl.utils.model import load_megatron_model_weights -from verl.utils.megatron_utils import TransformerConfig, get_model_checkpoint_path, get_hf_model_checkpoint_path, get_optimizer_checkpoint_path, get_rng_states_checkpoint_path, unwrap_model +from verl.utils.megatron_utils import get_model_checkpoint_path, get_hf_model_checkpoint_path, get_optimizer_checkpoint_path, get_rng_states_checkpoint_path, unwrap_model from .checkpoint_manager import BaseCheckpointManager from transformers import AutoModelForCausalLM @@ -36,6 +36,7 @@ from megatron.core.dist_checkpointing.mapping import ShardedObject from megatron.core.transformer.module import Float16Module from megatron.core.distributed import DistributedDataParallel as LocalDDP +from megatron.core.transformer import TransformerConfig class MegatronCheckpointManager(BaseCheckpointManager): @@ -175,7 +176,7 @@ def load_optimizer(self, ckpt_path): def load_rng_states(self, ckpt_path, data_parallel_random_init=False, use_dist_ckpt=False): rng_state_path = get_rng_states_checkpoint_path(ckpt_path) print(f"Loading rng states from {rng_state_path}") - rng_state = torch.load(rng_state_path) + rng_state = torch.load(rng_state_path, weights_only=False) # access rng_state for data parallel rank if not use_dist_ckpt: if data_parallel_random_init: @@ -198,7 +199,7 @@ def load_checkpoint(self, local_path: str, hdfs_path: str = None, del_local_afte if 'model' in self.checkpoint_contents: model_path = get_model_checkpoint_path(local_path) ckpt_name = self.get_checkpoint_name(model_path, return_base_dir=False) - state_dicts = torch.load(os.path.join(ckpt_name)) + state_dicts = torch.load(os.path.join(ckpt_name), weights_only=False) assert len(state_dicts) == len( self.model), f'state_dicts length: {len(state_dicts)} mismatch with model length: {len(self.model)}' for vpp_rank, (state_dict, model) in enumerate(zip(state_dicts, self.model)): @@ -301,9 +302,9 @@ def save_checkpoint(self, local_path: str, hdfs_path: str = None, global_step: i torch.distributed.barrier() rng_state_path = get_rng_states_checkpoint_path(local_path) - rng_state = self.get_rng_state() - torch.save(rng_state, rng_state_path) - if self.rank == 0: - print(f"saving rng states to {rng_state_path}") + if self.rank % torch.cuda.device_count() == 0: + rng_state = self.get_rng_state() + torch.save(rng_state, rng_state_path) + print(f"Rank {self.rank} saving rng states to 
{rng_state_path}") self.previous_saved_paths.append(local_path) diff --git a/verl/utils/debug/performance.py b/verl/utils/debug/performance.py index 615475a66a..e64145d134 100644 --- a/verl/utils/debug/performance.py +++ b/verl/utils/debug/performance.py @@ -17,7 +17,7 @@ import logging -def log_gpu_memory_usage(head: str, logger: logging.Logger = None, level=logging.DEBUG, rank: int = 0): +def log_gpu_memory_usage(head: str, logger: logging.Logger = None, level=logging.INFO, rank: int = 0): if (not dist.is_initialized()) or (rank is None) or (dist.get_rank() == rank): memory_allocated = torch.cuda.memory_allocated() / 1024**3 memory_reserved = torch.cuda.memory_reserved() / 1024**3 diff --git a/verl/utils/fsdp_utils.py b/verl/utils/fsdp_utils.py index b3f5b73534..4929c67646 100644 --- a/verl/utils/fsdp_utils.py +++ b/verl/utils/fsdp_utils.py @@ -30,7 +30,7 @@ def init_fn(x: torch.nn.Module): - if not torch.distributed.get_rank() == 0: + if torch.distributed.get_rank() != 0: x = x.to_empty(device=torch.cuda.current_device(), recurse=False) torch.cuda.empty_cache() return x diff --git a/verl/utils/import_utils.py b/verl/utils/import_utils.py index 989eb79a4a..000947c304 100644 --- a/verl/utils/import_utils.py +++ b/verl/utils/import_utils.py @@ -18,33 +18,34 @@ from functools import cache from typing import List, Optional +import importlib @cache def is_megatron_core_available(): try: - from megatron.core import parallel_state as mpu - return True - except ImportError: - return False + mcore_spec = importlib.util.find_spec('megatron.core') + except ModuleNotFoundError: + mcore_spec = None + return mcore_spec is not None @cache def is_vllm_available(): try: - import vllm - return True - except ImportError: - return False + vllm_spec = importlib.util.find_spec('vllm') + except ModuleNotFoundError: + vllm_spec = None + return vllm_spec is not None @cache def is_sglang_available(): try: - import sglang - return True - except ImportError: - return False + sglang_spec = importlib.util.find_spec('sglang') + except ModuleNotFoundError: + sglang_spec = None + return sglang_spec is not None def import_external_libs(external_libs=None): @@ -77,4 +78,4 @@ def load_extern_type(file_path: Optional[str], type_name: Optional[str]): if not hasattr(module, type_name): raise AttributeError(f"Custom type '{type_name}' not found in '{file_path}'.") - return getattr(module, type_name) \ No newline at end of file + return getattr(module, type_name) diff --git a/verl/utils/megatron/pipeline_parallel.py b/verl/utils/megatron/pipeline_parallel.py index 3a3790bb1a..2aaa853f2c 100644 --- a/verl/utils/megatron/pipeline_parallel.py +++ b/verl/utils/megatron/pipeline_parallel.py @@ -14,7 +14,6 @@ # limitations under the License. 
import torch -from megatron.core import parallel_state as mpu from .sequence_parallel import pad_to_sequence_parallel @@ -23,6 +22,7 @@ def compute_transformers_input_shapes(batches, meta_info): from flash_attn.bert_padding import unpad_input # flash 2 is a must for Megatron # pre-compute input shapes for each micro-batch at each pp stage input_shapes = [] + from megatron.core import mpu for model_inputs in batches: input_ids = model_inputs['input_ids'] attention_mask = model_inputs['attention_mask'] diff --git a/verl/utils/megatron/sequence_parallel.py b/verl/utils/megatron/sequence_parallel.py index 4b76cb295e..e2a1bc3aef 100644 --- a/verl/utils/megatron/sequence_parallel.py +++ b/verl/utils/megatron/sequence_parallel.py @@ -15,7 +15,6 @@ import torch import torch.nn.functional as F -from megatron.core import parallel_state as mpu def mark_parameter_as_sequence_parallel(parameter): @@ -36,6 +35,7 @@ def pad_to_sequence_parallel(unpad_tokens: torch.Tensor): """ total_nnz = unpad_tokens.shape[0] + from megatron.core import mpu sp_world_size = mpu.get_tensor_model_parallel_world_size() if total_nnz % sp_world_size == 0: diff --git a/verl/utils/megatron/tensor_parallel.py b/verl/utils/megatron/tensor_parallel.py index 63a22fb3b4..4fc8603460 100644 --- a/verl/utils/megatron/tensor_parallel.py +++ b/verl/utils/megatron/tensor_parallel.py @@ -19,12 +19,11 @@ import torch from torch.nn import init import torch.distributed as dist -from megatron.core import ModelParallelConfig -from megatron.core import parallel_state as mpu, tensor_parallel import verl.utils.torch_functional as verl_F -def update_kwargs_with_config(dictionary: Dict, config: ModelParallelConfig): +def update_kwargs_with_config(dictionary: Dict, config): + # config: ModelParallelConfig dictionary['config'] = config return dictionary @@ -41,6 +40,7 @@ def get_default_kwargs_for_model_parallel_config(): def get_default_model_parallel_config(): + from megatron.core import ModelParallelConfig return ModelParallelConfig(**get_default_kwargs_for_model_parallel_config()) @@ -61,6 +61,7 @@ def get_default_kwargs_for_column_parallel_linear(): 'async_tensor_model_parallel_allreduce': False, } model_parallel_config_kwargs.update(column_parallel_config_kwargs) + from megatron.core import ModelParallelConfig column_default_kwargs = { 'config': ModelParallelConfig(**model_parallel_config_kwargs), } @@ -76,6 +77,7 @@ def get_default_kwargs_for_row_parallel_linear(): def get_default_kwargs_for_parallel_embedding(): model_parallel_config_kwargs = get_default_kwargs_for_model_parallel_config() + from megatron.core import ModelParallelConfig embedding_default_kwargs = { 'init_method': init.xavier_normal_, 'config': ModelParallelConfig(**model_parallel_config_kwargs), @@ -106,6 +108,7 @@ def forward(ctx, vocab_parallel_logits: torch.Tensor) -> torch.Tensor: def mul_reduce(a, b): return (a * b).sum(dim=-1, keepdim=True) + from megatron.core import parallel_state as mpu logits_max = vocab_parallel_logits.max(dim=-1, keepdim=True).values dist.all_reduce(logits_max, op=dist.ReduceOp.MAX, group=mpu.get_tensor_model_parallel_group()) normalized_vocab_parallel_logits = vocab_parallel_logits - logits_max @@ -146,6 +149,7 @@ def vocab_parallel_entropy(vocab_parallel_logits: torch.Tensor) -> torch.Tensor: def vocab_parallel_log_probs_from_logits(logits, labels): """TODO(zhangchi.usc1992): We may change the implementation later""" + from megatron.core import tensor_parallel return -tensor_parallel.vocab_parallel_cross_entropy(vocab_parallel_logits=logits, 
target=labels) diff --git a/verl/utils/megatron_utils.py b/verl/utils/megatron_utils.py index d9aa76368d..cf7599cfbd 100644 --- a/verl/utils/megatron_utils.py +++ b/verl/utils/megatron_utils.py @@ -19,16 +19,6 @@ import torch import torch.nn as nn import torch.nn.functional as F -from megatron.core import ModelParallelConfig -from megatron.core import mpu, tensor_parallel -from megatron.core.distributed import DistributedDataParallel as DDP -from megatron.core.distributed import DistributedDataParallelConfig -from megatron.core.enums import ModelType -from megatron.core.optimizer import OptimizerConfig -from megatron.core.transformer import TransformerConfig -from megatron.core.transformer.enums import AttnBackend -from megatron.core.transformer.module import Float16Module -from megatron.core.utils import get_attr_wrapped_model from omegaconf import DictConfig from verl.utils.memory_buffer import build_memory_reference_from_module @@ -36,15 +26,21 @@ def get_model_config(model): + from megatron.core.utils import get_attr_wrapped_model return get_attr_wrapped_model(model, 'config', allow_none=False) -def get_model(model_provider_func, - model_type=ModelType.encoder_or_decoder, - wrap_with_ddp=True, - use_distributed_optimizer=True): +def get_model(model_provider_func, model_type=None, wrap_with_ddp=True, use_distributed_optimizer=True): """Build the model.""" # Build model. + from megatron.core import mpu, tensor_parallel + from megatron.core.distributed import DistributedDataParallel as DDP + from megatron.core.distributed import DistributedDataParallelConfig + from megatron.core.enums import ModelType + from megatron.core.transformer import TransformerConfig + from megatron.core.transformer.module import Float16Module + if model_type is None: + model_type = ModelType.encoder_or_decoder if mpu.get_pipeline_model_parallel_world_size() > 1 and \ mpu.get_virtual_pipeline_model_parallel_world_size() is not None: assert model_type != ModelType.encoder_and_decoder, \ @@ -133,10 +129,11 @@ def get_model(model_provider_func, return model -ALL_MODULE_WRAPPER_CLASSNAMES = (DDP, Float16Module) - - -def unwrap_model(model, module_instances=ALL_MODULE_WRAPPER_CLASSNAMES): +def unwrap_model(model, module_instances=None): + if module_instances is None: + from megatron.core.distributed import DistributedDataParallel as DDP + from megatron.core.transformer.module import Float16Module + module_instances = (DDP, Float16Module) return_list = True if not isinstance(model, list): model = [model] @@ -154,7 +151,7 @@ def unwrap_model(model, module_instances=ALL_MODULE_WRAPPER_CLASSNAMES): from transformers import PretrainedConfig -def convert_config(hf_config: PretrainedConfig, megatron_config) -> TransformerConfig: +def convert_config(hf_config: PretrainedConfig, megatron_config): print(f'megatron config {megatron_config}') dt = PrecisionType.to_dtype(megatron_config.params_dtype) print(f'pipeline_dtype=megatron_config {dt}') @@ -162,9 +159,12 @@ def convert_config(hf_config: PretrainedConfig, megatron_config) -> TransformerC qkv_bias = True else: qkv_bias = getattr(hf_config, 'attention_bias', False) + from megatron.core import mpu overlap_p2p_comm = mpu.get_virtual_pipeline_model_parallel_world_size( ) is not None and mpu.get_virtual_pipeline_model_parallel_world_size() > 1 batch_p2p_comm = False + from megatron.core.transformer import TransformerConfig + from megatron.core.transformer.enums import AttnBackend transformer_config = TransformerConfig( num_layers=hf_config.num_hidden_layers, 
hidden_size=hf_config.hidden_size, @@ -200,7 +200,8 @@ def convert_config(hf_config: PretrainedConfig, megatron_config) -> TransformerC return transformer_config -def init_megatron_optim_config(optim_config: Dict) -> OptimizerConfig: +def init_megatron_optim_config(optim_config: Dict): + from megatron.core.optimizer import OptimizerConfig config = OptimizerConfig( optimizer='adam', lr=optim_config.get('lr'), @@ -216,7 +217,9 @@ def init_megatron_optim_config(optim_config: Dict) -> OptimizerConfig: def mcore_model_parallel_config( sequence_parallel: bool, params_dtype: torch.dtype, -) -> ModelParallelConfig: +): + from megatron.core import mpu + from megatron.core import ModelParallelConfig return ModelParallelConfig( tensor_model_parallel_size=mpu.get_tensor_model_parallel_world_size(), pipeline_model_parallel_size=mpu.get_pipeline_model_parallel_world_size(), @@ -231,6 +234,7 @@ def mcore_model_parallel_config( def offload_megatron_param_and_grad(module_list: nn.ModuleList, offload_grad=False, hybrid_engine=None): + from megatron.core import mpu if hybrid_engine is not None: pp_rank = mpu.get_pipeline_model_parallel_rank() for buffer in hybrid_engine.memory_buffers[pp_rank].values(): @@ -246,6 +250,7 @@ def offload_megatron_param_and_grad(module_list: nn.ModuleList, offload_grad=Fal def load_megatron_param_and_grad(module_list: nn.ModuleList, device_id, load_grad=False, hybrid_engine=None): + from megatron.core import mpu if hybrid_engine is not None: pp_rank = mpu.get_pipeline_model_parallel_rank() for buffer in hybrid_engine.memory_buffers[pp_rank].values(): @@ -283,6 +288,7 @@ def get_optimizer_checkpoint_path(checkpoint_path, use_distributed_optimizer=Tru os.makedirs(os.path.join(checkpoint_path, "optim"), exist_ok=True) if not use_distributed_optimizer: return os.path.join(checkpoint_path, "optim", "optim.pt") + from megatron.core import mpu pp_rank = mpu.get_pipeline_model_parallel_rank() tp_rank = mpu.get_tensor_model_parallel_rank() cp_rank = mpu.get_context_parallel_rank() @@ -291,9 +297,144 @@ def get_optimizer_checkpoint_path(checkpoint_path, use_distributed_optimizer=Tru return os.path.join(checkpoint_path, f"optim", f"distrib_optim_pp{pp_rank}_tp{tp_rank}_cp{cp_rank}_dp{dp_rank}.pt") -def get_rng_states_checkpoint_path(checkpoint_path, data_parallel_random_init=False): +def get_rng_states_checkpoint_path(checkpoint_path, only_rank0_save=True): + # save rng states cause interrupts os.makedirs(os.path.join(checkpoint_path, "rng_states"), exist_ok=True) - if not data_parallel_random_init: + if only_rank0_save: return os.path.join(checkpoint_path, f'rng_states', "rng_states.pt") + from megatron.core import mpu + pp_rank = mpu.get_pipeline_model_parallel_rank() + tp_rank = mpu.get_tensor_model_parallel_rank() + cp_rank = mpu.get_context_parallel_rank() dp_rank = mpu.get_data_parallel_rank() - return os.path.join(checkpoint_path, f'rng_states', f"rng_states_{dp_rank}.pt") + return os.path.join(checkpoint_path, f'rng_states', + f"rng_states_pp{pp_rank}_tp{tp_rank}_cp{cp_rank}_dp{dp_rank}.pt") + + +def convert_megatron_model_to_transformers_model(name, + param, + config: PretrainedConfig, + tp_size: int, + num_query_groups: int, + convert_qkv_gate_up_by_trunk_concat=False): + """Convert megatron model to transformers model.""" + new_params = {} + + def convert_qkv_shard(full_tensor, q_name, k_name, v_name): + nonlocal config + nonlocal tp_size + nonlocal num_query_groups + + q_shard_list = [] + k_shard_list = [] + v_shard_list = [] + hidden_size_per_head = config.hidden_size // 
config.num_attention_heads + + if config.num_key_value_heads >= tp_size: + q_size_tp = config.hidden_size // tp_size + kv_size_tp = hidden_size_per_head * config.num_key_value_heads // tp_size + total_size = q_size_tp + 2 * kv_size_tp + for i in range(tp_size): + num_query_groups_per_partition = num_query_groups // tp_size + qkv_part = full_tensor[i * total_size:(i + 1) * total_size] + q_size_chunk = q_size_tp // num_query_groups_per_partition + kv_size_chunk = kv_size_tp // num_query_groups_per_partition + for qkv_part_chunk in qkv_part.chunk(num_query_groups_per_partition): + q_part = qkv_part_chunk[:q_size_chunk] + k_part = qkv_part_chunk[q_size_chunk:q_size_chunk + kv_size_chunk] + v_part = qkv_part_chunk[q_size_chunk + kv_size_chunk:] + q_shard_list.append(q_part) + k_shard_list.append(k_part) + v_shard_list.append(v_part) + else: + q_size_tp = config.hidden_size // tp_size + kv_size_tp = hidden_size_per_head + total_size = q_size_tp + 2 * kv_size_tp + for i in range(tp_size): + num_query_groups_per_partition = num_query_groups // tp_size + qkv_part = full_tensor[i * total_size:(i + 1) * total_size] + q_size_chunk = q_size_tp // num_query_groups_per_partition + kv_size_chunk = kv_size_tp // num_query_groups_per_partition + for qkv_part_chunk in qkv_part.chunk(num_query_groups_per_partition): + q_part = qkv_part_chunk[:q_size_chunk] + k_part = qkv_part_chunk[q_size_chunk:q_size_chunk + kv_size_chunk] + v_part = qkv_part_chunk[q_size_chunk + kv_size_chunk:] + q_shard_list.append(q_part) + if i * config.num_key_value_heads % tp_size == 0: + k_shard_list.append(k_part) + v_shard_list.append(v_part) + + new_params[q_name] = torch.cat(q_shard_list, dim=0) + new_params[k_name] = torch.cat(k_shard_list, dim=0) + new_params[v_name] = torch.cat(v_shard_list, dim=0) + + def convert_gate_up_shard(full_tensor, gate_name, up_name): + nonlocal config + nonlocal tp_size + + intermediate_size_tp = config.intermediate_size // tp_size + gate_weight_list = [] + up_weight_list = [] + for i in range(tp_size): + gate_up_weight_tp = full_tensor[intermediate_size_tp * 2 * i:intermediate_size_tp * 2 * (i + 1)] + gate_weight_tp = gate_up_weight_tp[:intermediate_size_tp] + up_weight_tp = gate_up_weight_tp[intermediate_size_tp:] + gate_weight_list.append(gate_weight_tp) + up_weight_list.append(up_weight_tp) + + new_params[gate_name] = torch.cat(gate_weight_list, dim=0) + new_params[up_name] = torch.cat(up_weight_list, dim=0) + + if name == 'embedding.word_embeddings.weight': + new_params['model.embed_tokens.weight'] = param + elif 'self_attention' in name: + splitted_name = name.split('.') + layer_number = splitted_name[2] + component = splitted_name[4] + param_type = splitted_name[5] + if component == 'linear_proj': + new_params[f'model.layers.{layer_number}.self_attn.o_proj.weight'] = param + elif component == 'linear_qkv' and not isinstance(param, list): + if param_type == 'layer_norm_weight': + new_params[f'model.layers.{layer_number}.input_layernorm.weight'] = param + else: + if convert_qkv_gate_up_by_trunk_concat: + convert_qkv_shard(param, f'model.layers.{layer_number}.self_attn.q_proj.{param_type}', + f'model.layers.{layer_number}.self_attn.k_proj.{param_type}', + f'model.layers.{layer_number}.self_attn.v_proj.{param_type}') + else: + new_params[f'model.layers.{layer_number}.self_attn.qkv_proj.{param_type}'] = param + else: + assert isinstance(param, list) and len(param) == 3 + assert param_type == 'weight' or param_type == 'bias' + new_params[f'model.layers.{layer_number}.self_attn.q_proj.{param_type}'] 
= param[0] + new_params[f'model.layers.{layer_number}.self_attn.k_proj.{param_type}'] = param[1] + new_params[f'model.layers.{layer_number}.self_attn.v_proj.{param_type}'] = param[2] + elif 'mlp' in name: + splitted_name = name.split('.') + layer_number = splitted_name[2] + component = splitted_name[4] + param_type = splitted_name[5] + if component == 'linear_fc1' and not isinstance(param, list): + if param_type == 'layer_norm_weight': + new_params[f'model.layers.{layer_number}.post_attention_layernorm.weight'] = param + elif param_type == 'weight': + if convert_qkv_gate_up_by_trunk_concat: + convert_gate_up_shard(param, f'model.layers.{layer_number}.mlp.gate_proj.weight', + f'model.layers.{layer_number}.mlp.up_proj.weight') + else: + new_params[f'model.layers.{layer_number}.mlp.gate_up_proj.weight'] = param + elif component == 'linear_fc1' and isinstance(param, list): + assert len(param) == 2 + assert param_type == 'weight' or param_type == 'bias' + new_params[f'model.layers.{layer_number}.mlp.gate_proj.weight'] = param[0] + new_params[f'model.layers.{layer_number}.mlp.up_proj.weight'] = param[1] + elif component == 'linear_fc2': + new_params[f'model.layers.{layer_number}.mlp.down_proj.weight'] = param + elif name == "decoder.final_layernorm.weight": + new_params['model.norm.weight'] = param + elif name == "output_layer.weight": + new_params["lm_head.weight"] = param + else: + raise ValueError(f"Unknown param name: {name}") + return new_params.keys(), new_params.values() diff --git a/verl/utils/model.py b/verl/utils/model.py index 6af6542ba8..e71567687e 100644 --- a/verl/utils/model.py +++ b/verl/utils/model.py @@ -200,7 +200,7 @@ def normalize_pp_vpp_params(params, num_hidden_layers, layer_name='layers'): Normalize the pp vpp params into a complete named parameters. This is useful when gather parameters from pp ranks and passed to a model without pp - params: List[List[Dict[str, param]]] + params: Iterable[List[Dict[str, param]]] params contains a list of pp, with a list of vpp named_parameters in each vpp chunk. 
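The `convert_gate_up_shard` helper above undoes Megatron's fused gate_up layout, where each tensor-parallel rank stores its gate slice immediately followed by its up slice. A standalone sketch of that de-interleaving, with toy sizes that are not taken from any real config:

```python
import torch


def split_gate_up(full_tensor: torch.Tensor, intermediate_size: int, tp_size: int):
    """Split a fused [gate; up] weight, stored per TP rank, into gate_proj / up_proj."""
    chunk = intermediate_size // tp_size  # rows of gate (and of up) held by one TP rank
    gate_parts, up_parts = [], []
    for i in range(tp_size):
        rank_slice = full_tensor[2 * chunk * i: 2 * chunk * (i + 1)]
        gate_parts.append(rank_slice[:chunk])
        up_parts.append(rank_slice[chunk:])
    return torch.cat(gate_parts, dim=0), torch.cat(up_parts, dim=0)


# toy check: 4 TP ranks, intermediate_size 8, hidden_size 2
w = torch.arange(32, dtype=torch.float32).reshape(16, 2)
gate, up = split_gate_up(w, intermediate_size=8, tp_size=4)
assert gate.shape == up.shape == (8, 2)
```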
output: Dict[str, param] @@ -237,15 +237,12 @@ def normalize_model_name(name, pp_rank, vpp_rank, pp_size, vpp_size, num_layers) return name pp_size = len(params) - normalized_name_to_param = {} for pp_rank in range(len(params)): vpp_size = len(params[pp_rank]) for vpp_rank in range(vpp_size): for name, param in params[pp_rank][vpp_rank].items(): normalized_name = normalize_model_name(name, pp_rank, vpp_rank, pp_size, vpp_size, num_hidden_layers) - normalized_name_to_param[normalized_name] = param - - return normalized_name_to_param + yield normalized_name, param def get_parallel_model_from_config(config, diff --git a/verl/workers/actor/megatron_actor.py b/verl/workers/actor/megatron_actor.py index 2d0adc807f..21a460cfd7 100644 --- a/verl/workers/actor/megatron_actor.py +++ b/verl/workers/actor/megatron_actor.py @@ -27,16 +27,7 @@ import torch from torch import nn import torch.distributed -from megatron.core.optimizer import OptimizerConfig -from megatron.core import parallel_state as mpu -from megatron.core import ModelParallelConfig from verl.utils.megatron_utils import get_model_config -from megatron.core.pipeline_parallel import get_forward_backward_func - -from megatron.core.distributed import finalize_model_grads -# from megatron.core.optimizer import DistributedOptimizer - -from megatron.core.optimizer import DistributedOptimizer from omegaconf import OmegaConf from verl.utils.megatron.tensor_parallel import vocab_parallel_entropy, vocab_parallel_log_probs_from_logits @@ -52,8 +43,8 @@ class MegatronPPOActor(BasePPOActor): - def __init__(self, config, model_config, megatron_config: ModelParallelConfig, actor_module: nn.ModuleList, - actor_optimizer: DistributedOptimizer, actor_optimizer_config: OptimizerConfig): + def __init__(self, config, model_config, megatron_config, actor_module: nn.ModuleList, actor_optimizer, + actor_optimizer_config): """MeagtronPPOActor class. This class implements the simple PPO logics when the model is built with Megatron. Args: @@ -77,6 +68,10 @@ def __init__(self, config, model_config, megatron_config: ModelParallelConfig, a ``sequence_parallel_enabled``: whether the sequence parallel is enabled. ``param_dtype``: the dtype of the parameters. + + ``actor_optimizer``: megatron.core.optimizer.DistributedOptimizer + + ``actor_optimizer_config``: megatron.core.optimizer.OptimizerConfig ``virtual_pipeline_model_parallel_size``: virtual pipeline model parallel size. a.k.a number of chunks in each pp stage. actor_module (nn.ModuleList): actor module is a ModuleList that contains a list of nn.Module in this pp stage. 
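The model.py change above turns `normalize_pp_vpp_params` into a generator that yields (name, param) pairs instead of materializing a full dict, so consumers can stream weights without holding a second copy of every tensor. A rough sketch of that generator shape, with a placeholder naming rule standing in for the real `normalize_model_name` layer renumbering:

```python
from typing import Dict, Iterable, List, Tuple

import torch


def stream_pp_vpp_params(
    params: List[List[Dict[str, torch.Tensor]]],
) -> Iterable[Tuple[str, torch.Tensor]]:
    """Yield flat (name, tensor) pairs from a [pp][vpp] nested structure."""
    pp_size = len(params)
    for pp_rank in range(pp_size):
        vpp_size = len(params[pp_rank])
        for vpp_rank in range(vpp_size):
            for name, param in params[pp_rank][vpp_rank].items():
                # placeholder: the real code rewrites layer indices based on
                # pp_rank / vpp_rank via normalize_model_name(...)
                yield f"pp{pp_rank}.vpp{vpp_rank}.{name}", param


# usage: the generator can feed a weight loader one tensor at a time
fake = [[{"layers.0.weight": torch.zeros(2, 2)}], [{"layers.0.weight": torch.ones(2, 2)}]]
for name, p in stream_pp_vpp_params(fake):
    print(name, tuple(p.shape))
```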
@@ -116,6 +111,7 @@ def __init__(self, config, model_config, megatron_config: ModelParallelConfig, a self.model_config = model_config self.megatron_config = megatron_config self.actor_module = actor_module + from megatron.core.optimizer import DistributedOptimizer self.actor_optimizer: DistributedOptimizer = actor_optimizer self.actor_optimizer_config = actor_optimizer_config @@ -133,6 +129,7 @@ def __init__(self, config, model_config, megatron_config: ModelParallelConfig, a config = get_model_config(self.actor_module[0]) print(config) + from megatron.core.distributed import finalize_model_grads config.finalize_model_grads_func = finalize_model_grads def _validate_config(self, config) -> None: @@ -180,6 +177,7 @@ def compute_logprobs_fn(output, data): response_length = response.size(1) with torch.no_grad(): output = self.forward_backward_batch(data, forward_only=True, post_process_fn=compute_logprobs_fn) + from megatron.core import mpu if mpu.is_pipeline_last_stage(ignore_virtual=True): # only on last rank. It should be on every tp rank log_probs = torch.cat([o['log_probs'] for o in output], dim=0) # (bs, seq_size) @@ -238,6 +236,7 @@ def forward_backward_batch(self, data: DataProto, forward_only=False, post_proce """ # broadcast from last pp rank to all other pp ranks # TODO: actually, we just need to control the sampling order. + from megatron.core import mpu broadcast_dict_tensor(data.batch, src=mpu.get_pipeline_model_parallel_last_rank(), group=mpu.get_pipeline_model_parallel_group()) @@ -259,6 +258,7 @@ def forward_backward_batch(self, data: DataProto, forward_only=False, post_proce n_micro_batch = len(batches) seq_len = batches[0]['input_ids'].shape[1] + from megatron.core.pipeline_parallel import get_forward_backward_func forward_backward_func = get_forward_backward_func() def loss_func(output, data, meta_info): @@ -287,7 +287,9 @@ def loss_func(output, data, meta_info): logits = logits[:, -response_length - 1:-1].contiguous() logits_back = logits.clone() log_prob = vocab_parallel_log_probs_from_logits(logits, responses) + del logits logits = logits_back + logits_back = logits.clone() pg_loss, pg_clipfrac, ppo_kl, pg_clipfrac_lower = compute_policy_loss(old_log_prob=old_log_prob, log_prob=log_prob, advantages=advantages, @@ -297,6 +299,8 @@ def loss_func(output, data, meta_info): cliprange_high=clip_ratio_high, clip_ratio_c=clip_ratio_c, loss_agg_mode=loss_agg_mode) + del logits + logits = logits_back entropy = vocab_parallel_entropy(logits) entropy_loss = agg_loss(loss_mat=entropy, loss_mask=response_mask, loss_agg_mode=loss_agg_mode) policy_loss = pg_loss - entropy_loss * entropy_coeff diff --git a/verl/workers/critic/megatron_critic.py b/verl/workers/critic/megatron_critic.py index 55076075c5..a179d3efa7 100644 --- a/verl/workers/critic/megatron_critic.py +++ b/verl/workers/critic/megatron_critic.py @@ -33,17 +33,17 @@ from verl.utils.torch_dtypes import PrecisionType from verl.utils.torch_functional import masked_mean, broadcast_dict_tensor, split_dict_tensor_into_batches from verl.utils.megatron import sequence_parallel as sp_utils -from megatron.core.optimizer import OptimizerConfig - -from megatron.core import parallel_state as mpu -from megatron.core.pipeline_parallel import get_forward_backward_func -from megatron.core.optimizer import DistributedOptimizer class MegatronPPOCritic(BasePPOCritic): - def __init__(self, config, model_config, megatron_config, critic_module: nn.ModuleList, - critic_optimizer: DistributedOptimizer, critic_optimizer_config: OptimizerConfig): + def 
__init__(self, config, model_config, megatron_config, critic_module: nn.ModuleList, critic_optimizer, + critic_optimizer_config): + """ + Args: + ``critic_optimizer``: megatron.core.optimizers.DistributedOptimizer + ``critic_optimizer_config``: megatron.core.optimizer.OptimizerConfig + """ super().__init__(config=config) self._validate_config(config) self.model_config = model_config @@ -77,6 +77,7 @@ def compute_values(self, data: DataProto) -> DataProto: response_length = responses.size(1) with torch.no_grad(): output = self.forward_backward_batch(data=data, forward_only=True) + from megatron.core import mpu if mpu.is_pipeline_last_stage(ignore_virtual=True): # only on last rank. It should be on every tp rank values = torch.cat([o['vpreds'] for o in output], dim=0) # (bs, seq_size, vocal_size) @@ -109,6 +110,7 @@ def make_minibatch_iterator(self, data: DataProto) -> Iterable[DataProto]: def forward_backward_batch(self, data: DataProto, forward_only=False): # broadcast from last pp rank to all other pp ranks data.batch = data.batch.contiguous() + from megatron.core import mpu broadcast_dict_tensor(data.batch, src=mpu.get_pipeline_model_parallel_last_rank(), group=mpu.get_pipeline_model_parallel_group()) @@ -126,6 +128,7 @@ def forward_backward_batch(self, data: DataProto, forward_only=False): 'hidden_size': self.model_config.hidden_size }) + from megatron.core.pipeline_parallel import get_forward_backward_func forward_backward_func = get_forward_backward_func() def loss_func(output, data, meta_info): diff --git a/verl/workers/megatron_workers.py b/verl/workers/megatron_workers.py index d094a9d7d5..bbee8a8f19 100644 --- a/verl/workers/megatron_workers.py +++ b/verl/workers/megatron_workers.py @@ -35,16 +35,12 @@ from verl.utils.debug import log_gpu_memory_usage from verl.utils.model import load_megatron_model_weights, load_megatron_gptmodel_weights from verl.utils.flops_counter import FlopsCounter -from verl.utils.checkpoint.megatron_checkpoint_manager import MegatronCheckpointManager from verl.utils.megatron_utils import mcore_model_parallel_config from verl.utils.megatron_utils import offload_megatron_param_and_grad, load_megatron_param_and_grad from verl.utils import hf_tokenizer from codetiming import Timer -from megatron.core import parallel_state as mpu -from megatron.core import ModelParallelConfig - logger = logging.getLogger(__file__) logger.setLevel(os.getenv('VERL_PPO_LOGGING_LEVEL', 'WARN')) @@ -75,6 +71,8 @@ def __init__(self, config: DictConfig, role: str): super().__init__() self.config = config + from megatron.core import mpu + # NOTE(sgm): We utilize colocate WorkerGroup by default. # As a result, Workers for different model share the same process. # Therefore, we only require one distribute initialization. 
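A recurring pattern in the actor, critic, and worker hunks above is moving `from megatron.core import ...` out of module scope and into the functions that need it, so importing verl no longer requires Megatron to be installed. A minimal sketch of the idea, using a hypothetical helper function rather than any function from the diff:

```python
def get_pipeline_last_rank() -> int:
    """Hypothetical helper: resolve a Megatron rank only when actually called."""
    # Deferred import: the module containing this function can be imported
    # on machines without Megatron; the dependency is only hit at call time.
    from megatron.core import parallel_state as mpu

    return mpu.get_pipeline_model_parallel_last_rank()
```

Combined with the `find_spec` probes earlier in this diff, call sites can guard such entry points with `is_megatron_core_available()` before ever touching the deferred import.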
@@ -135,7 +133,7 @@ def __init__(self, config: DictConfig, role: str): def _build_model_optimizer(self, model_path, - megatron_config: ModelParallelConfig, + megatron_config, optim_config, override_model_config, enable_gradient_checkpointing=False): @@ -212,7 +210,7 @@ def megatron_actor_model_provider(pre_process, post_process): if self.rank == 0: print_model_size(actor_module[0]) - log_gpu_memory_usage('After AllGatherPPModel init', logger=logger) + log_gpu_memory_usage('After AllGatherPPModel init', logger=None) elif self._is_ref: print(f'self.config.ref.load_weight: {self.config.ref.load_weight}') ref_module = get_model(model_provider_func=megatron_actor_model_provider, @@ -229,7 +227,7 @@ def megatron_actor_model_provider(pre_process, post_process): ref_module, params_dtype=megatron_config.params_dtype, is_value_model=False) - log_gpu_memory_usage('After ref module init', logger=logger) + log_gpu_memory_usage('After ref module init', logger=None) return ref_module, actor_model_config # TODO: add more optimizer args into config @@ -240,15 +238,16 @@ def megatron_actor_model_provider(pre_process, post_process): optim_config = None actor_optimizer = None - log_gpu_memory_usage('After actor optimizer init', logger=logger) + log_gpu_memory_usage('After actor optimizer init', logger=None) return actor_module, hybrid_engine, actor_optimizer, actor_model_config, optim_config - def _build_rollout(self): + def _build_rollout(self, trust_remote_code=False): if self.config.rollout.name == 'vllm': from verl.workers.rollout.vllm_rollout import vLLMRollout, vllm_mode from verl.workers.sharding_manager import MegatronVLLMShardingManager from verl.utils.model import normalize_pp_vpp_params + from torch.distributed.device_mesh import init_device_mesh # NOTE(sgm): If the QKV and gate_up projection layer are concate together in actor, # we will reorganize their weight format when resharding from actor to rollout. @@ -257,31 +256,35 @@ def _build_rollout(self): "gate_proj_layer_name": "linear_fc1.weight", } - # reshard the weight partition from actor to rollout to initialize the rollout class - # create a new cuda space for parameters not in this pp rank - self.hybrid_engine.load_params_to_cuda() - # broadcast the parameters from pp rank to other ranks - self.hybrid_engine.allgather_params() - # obtain name to parameters in pp/vpp - params = self.hybrid_engine.get_all_params() - # update the param name for the - params = normalize_pp_vpp_params(params=params, - num_hidden_layers=self.actor_model_config.num_hidden_layers, - layer_name='layers') - assert vllm_mode == 'customized', "Support for vllm>=0.7 for Megatron-LM backend has not been implemented yet." 
- rollout = vLLMRollout(actor_module=params, - config=self.config.rollout, - tokenizer=self.tokenizer, - model_hf_config=self.actor_model_config, - train_tp=mpu.get_tensor_model_parallel_world_size()) - log_gpu_memory_usage('After building vllm rollout', logger=logger) + infer_tp = self.config.rollout.tensor_model_parallel_size + dp = self.world_size // infer_tp + assert self.world_size % infer_tp == 0, f'rollout world_size: {self.world_size} is not divisible by infer_tp: {infer_tp}' + rollout_device_mesh = init_device_mesh('cuda', mesh_shape=(dp, infer_tp), mesh_dim_names=['dp', 'infer_tp']) + log_gpu_memory_usage(f'Before building vllm rollout', logger=None) + + from megatron.core import mpu + from verl.workers.rollout.vllm_rollout import vLLMRollout, vllm_mode + local_path = copy_to_local(self.config.model.path) + if vllm_mode == 'customized': + rollout = vLLMRollout(actor_module=self.actor_module, + config=self.config.rollout, + tokenizer=self.tokenizer, + model_hf_config=self.actor_model_config) + elif vllm_mode == 'spmd': + rollout = vLLMRollout(model_path=local_path, + config=self.config.rollout, + tokenizer=self.tokenizer, + model_hf_config=self.actor_model_config, + device_mesh=rollout_device_mesh, + trust_remote_code=trust_remote_code) + log_gpu_memory_usage('After building vllm rollout', logger=None) # perform weight resharding between actor and rollout sharding_manager = MegatronVLLMShardingManager(module=self.hybrid_engine, inference_engine=rollout.inference_engine, model_config=self.actor_model_config, layer_name_mapping=layer_name_mapping) - log_gpu_memory_usage('After building sharding manager', logger=logger) + log_gpu_memory_usage('After building sharding manager', logger=None) else: raise NotImplementedError('Only vllmRollout is supported with Megatron now') @@ -299,6 +302,8 @@ def init_model(self): override_model_config = OmegaConf.to_container(self.config.model.get('override_config', OmegaConf.create())) self.param_dtype = torch.bfloat16 + from megatron.core import mpu + megatron_config = mcore_model_parallel_config(sequence_parallel=self.config.actor.megatron.get( 'sequence_parallel', True), params_dtype=PrecisionType.to_dtype(self.param_dtype)) @@ -327,7 +332,8 @@ def init_model(self): actor_optimizer_config=self.actor_optim_config) if self._is_rollout: - self.rollout, self.sharding_manager = self._build_rollout() + self.rollout, self.sharding_manager = self._build_rollout( + trust_remote_code=self.config.model.get('trust_remote_code', False)) if self._is_ref: self.ref_module, self.ref_model_config = self._build_model_optimizer( @@ -345,6 +351,7 @@ def init_model(self): if self._is_actor: self.flops_counter = FlopsCounter(self.actor_model_config) + from verl.utils.checkpoint.megatron_checkpoint_manager import MegatronCheckpointManager self.checkpoint_mananager = MegatronCheckpointManager( config=self.config, model_config=self.actor_model_config, @@ -367,7 +374,7 @@ def update_actor(self, data: DataProto): data.batch = data.batch.cuda() - log_gpu_memory_usage('Before update policy', logger=logger) + log_gpu_memory_usage('Before update policy', logger=None) dataloader = self.actor.make_minibatch_iterator(data=data) with Timer(name='update_policy', logger=None) as timer: @@ -377,7 +384,7 @@ def update_actor(self, data: DataProto): estimated_flops, promised_flops = self.flops_counter.estimate_flops(global_num_tokens, delta_time) metrics['perf/mfu/actor'] = estimated_flops * self.config.actor.ppo_epochs / promised_flops / self.world_size - log_gpu_memory_usage('After 
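The `_build_rollout` hunk above derives a (dp, infer_tp) device mesh from the rollout tensor-parallel size before constructing the SPMD vLLM rollout. A sketch of just the mesh arithmetic; the world size and TP degree here are made-up values, whereas in the diff they come from `self.world_size` and `config.rollout.tensor_model_parallel_size`:

```python
from torch.distributed.device_mesh import init_device_mesh


def build_rollout_mesh(world_size: int, infer_tp: int):
    """Build a 2-D (dp, infer_tp) mesh; assumes torch.distributed is already initialized."""
    assert world_size % infer_tp == 0, (
        f"rollout world_size {world_size} is not divisible by infer_tp {infer_tp}"
    )
    dp = world_size // infer_tp
    return init_device_mesh("cuda", mesh_shape=(dp, infer_tp), mesh_dim_names=("dp", "infer_tp"))


# e.g. 8 GPUs with tensor_model_parallel_size=2 -> a 4 x 2 mesh
# mesh = build_rollout_mesh(world_size=8, infer_tp=2)
```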
update policy', logger=logger) + log_gpu_memory_usage('After update policy', logger=None) # TODO: here, we should return all metrics output = DataProto(meta_info={'metrics': metrics}) @@ -400,19 +407,19 @@ def generate_sequences(self, prompts: DataProto): } prompts.meta_info.update(meta_info) with self.sharding_manager: - log_gpu_memory_usage('After entering sharding manager', logger=logger) + log_gpu_memory_usage('After entering sharding manager', logger=None) prompts = self.sharding_manager.preprocess_data(prompts) output = self.rollout.generate_sequences(prompts=prompts) - log_gpu_memory_usage('After rollout generation', logger=logger) + log_gpu_memory_usage('After rollout generation', logger=None) output = self.sharding_manager.postprocess_data(output) output = output.to('cpu') # clear kv cache torch.cuda.empty_cache() - log_gpu_memory_usage('After generate_sequences', logger=logger) + log_gpu_memory_usage('After generate_sequences', logger=None) return output @register(dispatch_mode=Dispatch.MEGATRON_COMPUTE_PROTO) @@ -447,7 +454,7 @@ def compute_log_prob(self, data: DataProto): output = output.to('cpu') # clear kv cache torch.cuda.empty_cache() - log_gpu_memory_usage('After generate_sequences', logger=logger) + log_gpu_memory_usage('After generate_sequences', logger=None) return output @register(dispatch_mode=Dispatch.ONE_TO_ALL) @@ -474,6 +481,8 @@ def __init__(self, config): super().__init__() self.config = config + from megatron.core import mpu + # NOTE(sgm): We utilize colocate WorkerGroup by default. # As a result, Workers for different model share the same process. # Therefore, we only require one distribute initialization. @@ -511,7 +520,7 @@ def __init__(self, config): def _build_critic_model_optimizer(self, model_path, - megatron_config: ModelParallelConfig, + megatron_config, optim_config, override_model_config, enable_gradient_checkpointing=False): @@ -612,6 +621,7 @@ def init_model(self): critic_optimizer=self.critic_optimizer, critic_optimizer_config=critic_optimizer_config) self.flops_counter = FlopsCounter(self.critic_model_config) + from verl.utils.checkpoint.megatron_checkpoint_manager import MegatronCheckpointManager self.checkpoint_mananager = MegatronCheckpointManager( config=self.config, model_config=self.critic_model_config, @@ -671,6 +681,8 @@ def __init__(self, config): super().__init__() self.config = config + from megatron.core import mpu + # NOTE(sgm): We utilize colocate WorkerGroup by default. # As a result, Workers for different model share the same process. # Therefore, we only require one distribute initialization. @@ -702,7 +714,7 @@ def __init__(self, config): self.config.micro_batch_size //= mpu.get_data_parallel_world_size() self.config.micro_batch_size_per_gpu = self.config.micro_batch_size - def _build_rm_model(self, model_path, megatron_config: ModelParallelConfig, override_model_config): + def _build_rm_model(self, model_path, megatron_config, override_model_config): from megatron.core.models.gpt.gpt_model import ModelType from verl.utils.model import update_model_config from verl.utils.megatron_utils import get_model @@ -729,7 +741,7 @@ def _build_rm_model(self, model_path, megatron_config: ModelParallelConfig, over def megatron_rm_model_provider(pre_process, post_process): from verl.utils.model import get_parallel_model_from_config # vpp is not supported yet because it will hang for some reason. 
Need debugging - vpp_rank = mpu.get_virtual_pipeline_model_parallel_rank() # this will be set inside get_model + # vpp_rank = mpu.get_virtual_pipeline_model_parallel_rank() # this will be set inside get_model # this_megatron_config = copy.deepcopy(megatron_config) # this_megatron_config.virtual_pipeline_model_parallel_rank = vpp_rank parallel_model = get_parallel_model_from_config(config=rm_model_config, diff --git a/verl/workers/reward_model/megatron/reward_model.py b/verl/workers/reward_model/megatron/reward_model.py index a890f28843..d2db8df2d7 100644 --- a/verl/workers/reward_model/megatron/reward_model.py +++ b/verl/workers/reward_model/megatron/reward_model.py @@ -25,8 +25,6 @@ from verl import DataProto from verl.utils.torch_functional import broadcast_dict_tensor, split_dict_tensor_into_batches from verl.workers.reward_model.base import BasePPORewardModel -from megatron.core import parallel_state as mpu -from megatron.core.pipeline_parallel import get_forward_backward_func class MegatronRewardModel(BasePPORewardModel): @@ -116,6 +114,7 @@ def re_encode_by_rm_tokenizer(self, data: DataProto) -> DataProto: @torch.no_grad() def compute_reward(self, data: DataProto) -> DataProto: + from megatron.core import parallel_state as mpu if self.config.param_offload: self.load_params_to_cuda() @@ -185,6 +184,7 @@ def forward_batch(self, data: DataProto): """ # broadcast from last pp rank to all other pp ranks # TODO: actually, we just need to control the sampling order. + from megatron.core import parallel_state as mpu data.batch = data.batch.contiguous() broadcast_dict_tensor(data.batch, src=mpu.get_pipeline_model_parallel_last_rank(), @@ -209,6 +209,7 @@ def forward_batch(self, data: DataProto): 'hidden_size': self.model_config.hidden_size }) # compute input shapes for pp stages + from megatron.core.pipeline_parallel import get_forward_backward_func forward_backward_func = get_forward_backward_func() def loss_func(output): diff --git a/verl/workers/rollout/vllm_rollout/vllm_rollout_spmd.py b/verl/workers/rollout/vllm_rollout/vllm_rollout_spmd.py index 6df1cc3a2d..d517cadbb9 100644 --- a/verl/workers/rollout/vllm_rollout/vllm_rollout_spmd.py +++ b/verl/workers/rollout/vllm_rollout/vllm_rollout_spmd.py @@ -89,10 +89,13 @@ def __init__(self, model_path: str, config: DictConfig, tokenizer, model_hf_conf # deployed with megatron os.environ['CUDA_TIMER_STREAM_KAFKA_ENABLE'] = '0' os.environ['MEGATRON_IMPORT_TIMERS'] = '0' - train_tp = kwargs.get('train_tp', None) - num_tp_per_train_tp = train_tp // tensor_parallel_size - vllm_ps.initialize_parallel_state(tensor_model_parallel_size=tensor_parallel_size, - num_tp_per_train_tp=num_tp_per_train_tp) + if vllm_version in ('0.3.1', '0.4.2', '0.5.4', '0.6.3'): + train_tp = kwargs.get('train_tp', None) + num_tp_per_train_tp = train_tp // tensor_parallel_size + vllm_ps.initialize_parallel_state(tensor_model_parallel_size=tensor_parallel_size, + num_tp_per_train_tp=num_tp_per_train_tp) + else: + vllm_ps.initialize_model_parallel(tensor_model_parallel_size=tensor_parallel_size) assert model_hf_config.max_position_embeddings >= config.prompt_length + config.response_length, \ "model context length should be greater than total sequence length" @@ -190,7 +193,9 @@ def generate_sequences(self, prompts: DataProto, **kwargs) -> DataProto: [_pre_process_inputs(self.pad_token_id, idx[i]) for i in range(batch_size)], dtype=object) if batch_size != len(non_tensor_batch['raw_prompt_ids']): - raise RuntimeError('vllm sharding manager is not work properly.') + raise 
RuntimeError( + f'vllm sharding manager is not work properly. {batch_size} vs {len(non_tensor_batch["raw_prompt_ids"])}' + ) if 'multi_modal_data' in non_tensor_batch: vllm_inputs = [] diff --git a/verl/workers/sharding_manager/megatron_vllm.py b/verl/workers/sharding_manager/megatron_vllm.py index d7c684d5e1..527d4d4881 100644 --- a/verl/workers/sharding_manager/megatron_vllm.py +++ b/verl/workers/sharding_manager/megatron_vllm.py @@ -16,28 +16,35 @@ """ import importlib +import logging +import os from packaging.version import Version import torch import torch.distributed as dist from torch import nn -from megatron.core import parallel_state as mpu -from megatron.core import DistributedDataParallel as LocalDDP -from megatron.core.transformer.module import Float16Module -from torch.nn.parallel.distributed import DistributedDataParallel as torchDDP from verl.utils.megatron_utils import get_model, unwrap_model from verl.utils.memory_buffer import ( build_memory_buffer, build_memory_reference_from_module, get_weight_buffer_meta_from_module, ) +from verl.utils.debug import log_gpu_memory_usage +from verl.third_party.vllm import vllm_version +from verl.utils.megatron_utils import convert_megatron_model_to_transformers_model + +logger = logging.getLogger(__file__) +logger.setLevel(os.getenv('VERL_PPO_LOGGING_LEVEL', 'WARN')) class AllGatherPPModel: def __init__(self, model_provider, use_distributed_optimizer=True) -> None: + from megatron.core import parallel_state as mpu + from megatron.core import DistributedDataParallel as LocalDDP + from torch.nn.parallel.distributed import DistributedDataParallel as torchDDP self._pp_group = mpu.get_pipeline_model_parallel_group() self._pp_rank = mpu.get_pipeline_model_parallel_rank() self._pp_size = mpu.get_pipeline_model_parallel_world_size() @@ -143,6 +150,7 @@ def allgather_params(self): dist.broadcast(tensor=param.data, src=global_src, group=self.pp_group, async_op=False) def forward(self, *inputs, **kwargs): + from megatron.core import parallel_state as mpu try: prev_output = None for cur_chunk_rank in range(self._model_chunk_size): @@ -186,6 +194,9 @@ def get_all_params(self): tensors of each model chunk """ + from megatron.core import DistributedDataParallel as LocalDDP + from megatron.core.transformer.module import Float16Module + from torch.nn.parallel.distributed import DistributedDataParallel as torchDDP params = [] for pp_rank in range(self.pp_size): params.append([]) @@ -203,6 +214,8 @@ def get_all_params(self): def update_this_rank_models(self, new_models): self._this_rank_models = new_models + from megatron.core import DistributedDataParallel as LocalDDP + from torch.nn.parallel.distributed import DistributedDataParallel as torchDDP self._pp_models[self.pp_rank] = unwrap_model(new_models, (torchDDP, LocalDDP)) @property @@ -241,8 +254,12 @@ def pp_models(self): from torch import nn import torch.distributed from torch.distributed import new_group +from torch.distributed._tensor import DTensor +from typing import Dict, Iterable, Union, Tuple +import numpy as np from verl import DataProto +from verl.protocol import all_gather_data_proto from verl.utils.torch_functional import (broadcast_dict_tensor, allgather_dict_tensors) import verl.utils.megatron.tensor_parallel as tp_utils from verl.third_party.vllm import parallel_state as vllm_ps @@ -250,12 +267,15 @@ def pp_models(self): from verl.utils.model import normalize_pp_vpp_params # Micro Data parallel group. 
Micro data parallel group is additional dp group that origins from splitting training tp # into infer_tp and micro_tp. By default, we use order micro_dp - tp +# NOTICE: in new version of vLLM, We need to all-gather all tp rank's model weights +# For code reuse, we directly assign Megatron's TENSOR_MODEL_PARALLEL_GROUP to this _MICRO_DATA_PARALLEL_GROUP = None class MegatronVLLMShardingManager(BaseShardingManager): def __init__(self, module: AllGatherPPModel, inference_engine: LLM, model_config, layer_name_mapping): + from megatron.core import parallel_state as mpu self.module = module self.inference_engine = inference_engine self.model_config = model_config @@ -265,15 +285,20 @@ def __init__(self, module: AllGatherPPModel, inference_engine: LLM, model_config global _MICRO_DATA_PARALLEL_GROUP world_size = torch.distributed.get_world_size() rank = torch.distributed.get_rank() - train_tensor_parallel_size = mpu.get_tensor_model_parallel_world_size() - infer_tensor_parallel_size = vllm_ps.get_tensor_model_parallel_world_size() + self.infer_tp_size = vllm_ps.get_tensor_model_parallel_world_size() + self.infer_tp_rank = vllm_ps.get_tensor_model_parallel_rank() + self.infer_tp_group = vllm_ps.get_tensor_model_parallel_group() + self.train_tp_size = mpu.get_tensor_model_parallel_world_size() + self.train_tp_rank = mpu.get_tensor_model_parallel_rank() + self.train_tp_group = mpu.get_tensor_model_parallel_group() + self.need_tp_reshard = self.infer_tp_size == self.train_tp_size # TODO(sgm): this may not be true for FSDP -> vLLM - assert infer_tensor_parallel_size <= train_tensor_parallel_size, \ + assert self.infer_tp_size <= self.train_tp_size, \ 'Not implemented for infer_tp > train_tp' - assert train_tensor_parallel_size % infer_tensor_parallel_size == 0 + assert self.train_tp_size % self.infer_tp_size == 0 - micro_dp_size = train_tensor_parallel_size // infer_tensor_parallel_size + micro_dp_size = self.train_tp_size // self.infer_tp_size num_micro_dp_groups = world_size // micro_dp_size assert _MICRO_DATA_PARALLEL_GROUP is None, ("micro data parallel group is already initialized") for i in range(num_micro_dp_groups): @@ -282,16 +307,23 @@ def __init__(self, module: AllGatherPPModel, inference_engine: LLM, model_config if rank in ranks: _MICRO_DATA_PARALLEL_GROUP = group - def default_tp_concat_fn(self, name, param, infer_params, model_config): + def _make_iterator(self, params: Dict[str, Union[torch.Tensor, + list]]) -> Iterable[Tuple[str, Union[torch.Tensor, list]]]: + for name, tensor in params.items(): + yield name, tensor + del tensor + + def default_tp_concat_fn(self, name, param, infer_params, model_config, convert_qkv_gate_up_by_simple_split=False): """ name: name of the parameter param: training parameters - infer_params (List[torch.Tensor]): a list of parameters all-gathered from micro_dp_group + infer_params (Iterable[torch.Tensor]): a iterator towards list of parameters all-gathered from train tp group (vllm 0.8.2) or micro-dp group (vllm <= 0.6.3) model_config: huggingface model_config TODO(zhangchi.usc1992): currently, the implementation is adhoc. We can move this function to the model definition so that it is model-agnostic. If the model doesn't implement this function, we can throw an error to force user disable TP HybridEngine. 
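The sharding-manager `__init__` above keeps the micro data-parallel bookkeeping: when the training TP degree exceeds the inference TP degree, every `train_tp // infer_tp` consecutive ranks form one micro-dp group. A sketch of just the rank grouping; the actual group creation in the diff goes through `torch.distributed.new_group`, which is omitted here:

```python
from typing import List


def micro_dp_groups(world_size: int, train_tp: int, infer_tp: int) -> List[List[int]]:
    """Enumerate micro data-parallel rank groups; assumes train_tp % infer_tp == 0."""
    assert train_tp % infer_tp == 0
    micro_dp_size = train_tp // infer_tp
    return [
        list(range(g * micro_dp_size, (g + 1) * micro_dp_size))
        for g in range(world_size // micro_dp_size)
    ]


# e.g. 8 ranks, train_tp=4, infer_tp=2 -> [[0, 1], [2, 3], [4, 5], [6, 7]]
print(micro_dp_groups(world_size=8, train_tp=4, infer_tp=2))
```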
""" + from megatron.core import mpu if self.layer_name_mapping.get("qkv_layer_name") in name and "layer_norm" not in name: # if the tensor is qkv, for each param on tp, split into q, k, v @@ -305,8 +337,7 @@ def default_tp_concat_fn(self, name, param, infer_params, model_config): kv_size_per_tp = infer_params[0].shape[0] // (num_q_per_kv + 2) split_size = [kv_size_per_tp * num_q_per_kv, kv_size_per_tp, kv_size_per_tp] for infer_param in infer_params: - num_query_groups_per_partition = model_config.num_key_value_heads // mpu.get_tensor_model_parallel_world_size( - ) + num_query_groups_per_partition = model_config.num_key_value_heads // self.train_tp_size for chunk in infer_param.chunk(num_query_groups_per_partition): split_size = [ kv_size_per_tp * num_q_per_kv // num_query_groups_per_partition, @@ -320,8 +351,10 @@ def default_tp_concat_fn(self, name, param, infer_params, model_config): q = torch.cat(q_lst, dim=0) k = torch.cat(k_lst, dim=0) v = torch.cat(v_lst, dim=0) - - infer_params = torch.cat((q, k, v), dim=0) + if not convert_qkv_gate_up_by_simple_split: + infer_params = torch.cat((q, k, v), dim=0) + else: + infer_params = [q, k, v] elif self.layer_name_mapping.get("gate_proj_layer_name") in name: # if the tensor is gate and proj @@ -333,7 +366,10 @@ def default_tp_concat_fn(self, name, param, infer_params, model_config): up_lst.append(up) gate = torch.cat(gate_lst, dim=0) up = torch.cat(up_lst, dim=0) - infer_params = torch.cat((gate, up), dim=0) + if not convert_qkv_gate_up_by_simple_split: + infer_params = torch.cat((gate, up), dim=0) + else: + infer_params = [gate, up] else: # concat tensor @@ -341,60 +377,82 @@ def default_tp_concat_fn(self, name, param, infer_params, model_config): return infer_params - def _post_process_params(self, params): + def _post_process_params(self, params, convert_qkv_gate_up_by_simple_split=False): + from megatron.core import mpu """ - For each param, if it is a tp-splited param, we all-gather from micro_dp group. + For each param, if it is a tp-splited param, we all-gather from train tp group (vllm 0.8.2) or micro-dp group (vllm <= 0.6.3) """ # here the params are in train tp format. we iterate params and all-gather # TODO(zhangchi.usc1992) We can consider copy non-tp weight to another infer buffer. # In this way, all the params in the original memory_buffers and can be offload. 
- micro_dp_size = get_micro_data_parallel_world_size() - micro_dp_group = get_micro_data_parallel_group() + if vllm_version == '0.8.2': + all_gather_group = self.train_tp_group + else: + all_gather_group = get_micro_data_parallel_group() + all_gather_group_size = torch.distributed.get_world_size(group=all_gather_group) - origin_params = {} - for name in params.keys(): - param = params[name] + for name, param in params: if tp_utils.is_tensor_parallel_param(param): # allocate a new tensor with proper size - if micro_dp_size <= 1: + if all_gather_group_size <= 1: infer_params = [param] else: - infer_params = [torch.empty_like(param) for _ in range(micro_dp_size)] - torch.distributed.all_gather(infer_params, param, group=micro_dp_group) - infer_params = self.default_tp_concat_fn(name, param, infer_params, self.model_config) - # replace with original param - params[name] = infer_params - origin_params[name] = param - - return origin_params + infer_params = [torch.empty_like(param) for _ in range(all_gather_group_size)] + torch.distributed.all_gather(infer_params, param, group=all_gather_group) + infer_params = self.default_tp_concat_fn(name, param, infer_params, self.model_config, + convert_qkv_gate_up_by_simple_split) + else: + infer_params = param + converted_names, converted_params = convert_megatron_model_to_transformers_model( + name, + infer_params, + self.model_config, + self.train_tp_size, + self.module.pp_models[0][0].config.num_query_groups, + convert_qkv_gate_up_by_trunk_concat=False) + for converted_name, infer_param in zip(converted_names, converted_params): + yield converted_name, infer_param def __enter__(self): + from megatron.core import mpu + + log_gpu_memory_usage('Just enter MegatronVLLMShardingManager sharding manager memory', logger=None) # create a new cuda space for parameters not in this pp rank self.module.load_params_to_cuda() + log_gpu_memory_usage('After load_params_to_cuda sharding manager memory', logger=None) # broadcast the parameters from pp rank to other ranks self.module.allgather_params() # obtain name to parameters in pp/vpp params = self.module.get_all_params() # bind the params to inference engine - self.params = normalize_pp_vpp_params(params=params, - num_hidden_layers=self.model_config.num_hidden_layers, - layer_name='layers') - self.origin_params = self._post_process_params(self.params) - self.inference_engine.sync_model_weights(self.params, load_format='megatron') + cur_tp_rank_param = normalize_pp_vpp_params(params=params, + num_hidden_layers=self.model_config.num_hidden_layers, + layer_name='layers') + log_gpu_memory_usage('After normalize_pp_vpp_params sharding manager memory', logger=None) + if vllm_version in ('0.4.2', '0.5.4', '0.6.3'): + per_tensor_param = self._post_process_params(cur_tp_rank_param, convert_qkv_gate_up_by_simple_split=False) + self.inference_engine.sync_model_weights(per_tensor_param, load_format='megatron') + else: + per_tensor_param = self._post_process_params(cur_tp_rank_param, convert_qkv_gate_up_by_simple_split=True) + self.inference_engine.wake_up() + model = self.inference_engine.llm_engine.model_executor.driver_worker.worker.model_runner.model + loaded_params = model.load_weights(per_tensor_param) + logger.info(f"vLLM load weights, loaded_params: {len(loaded_params)}") + log_gpu_memory_usage('After load_weights sharding manager memory', logger=None) + log_gpu_memory_usage('After delete params sharding manager memory', logger=None) def __exit__(self, exc_type, exc_value, traceback): + log_gpu_memory_usage('Before vllm 
offload in sharding manager', logger=None) # offload parameters doesn't belong to this pp rank self.module.offload_params_to_cpu() - # FIXME(sgm): the best practice is to delete the cuda tensor - # rebind the model weights, can be any cpu tensor - if self.origin_params is not None: - for name, param in self.origin_params.items(): - self.params[name] = param - # self.inference_engine.sync_model_weights(params) - self.inference_engine.offload_model_weights() + if vllm_version in ('0.4.2', '0.5.4', '0.6.3'): + self.inference_engine.offload_model_weights() + else: + self.inference_engine.sleep(level=1) + log_gpu_memory_usage('After vllm offload in sharding manager', logger=None) self.module.train() @@ -404,21 +462,14 @@ def __exit__(self, exc_type, exc_value, traceback): def preprocess_data(self, data: DataProto) -> DataProto: # prompts are identical for each training tp. We select for each inference tp micro_dp_size = get_micro_data_parallel_world_size() - micro_dp_rank = get_micro_data_parallel_rank() - - # broadcast from tp=0 to other tp ranks - broadcast_dict_tensor(data.batch, - src=mpu.get_tensor_model_parallel_src_rank(), - group=mpu.get_tensor_model_parallel_group()) - if micro_dp_size > 1: local_prompts = data.chunk(chunks=micro_dp_size) - data = local_prompts[micro_dp_rank] + data = local_prompts[get_micro_data_parallel_rank()] return data def postprocess_data(self, data: DataProto) -> DataProto: - meta_info = data.meta_info + # MEGATRON_PP_AS_DP_PROTO will collect PP+CP+DP group # all gather batch among micro-dp groups micro_dp_size = get_micro_data_parallel_world_size() if micro_dp_size > 1: @@ -426,13 +477,6 @@ def postprocess_data(self, data: DataProto) -> DataProto: size=get_micro_data_parallel_world_size(), group=get_micro_data_parallel_group(), dim=0) - - # all gather batch among pp group - if meta_info.get('allgather_pp_output', True): - data.batch = allgather_dict_tensors(data.batch.contiguous(), - size=mpu.get_pipeline_model_parallel_world_size(), - group=mpu.get_pipeline_model_parallel_group(), - dim=0) return data
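Putting the `__enter__` and `__exit__` hunks together: on newer vLLM the manager wakes the engine, streams converted (name, tensor) pairs into the model's `load_weights`, and puts the engine back to sleep on exit instead of calling `offload_model_weights`. A condensed, hedged sketch of that control flow; the attribute path to the underlying model is copied from the diff above and may differ across vLLM releases:

```python
from contextlib import contextmanager


@contextmanager
def vllm_weight_sync(inference_engine, per_tensor_param):
    """Load trainer weights into a sleeping vLLM engine, then release them on exit."""
    inference_engine.wake_up()
    # as in the diff: reach the underlying nn.Module and stream weights into it
    model = (
        inference_engine.llm_engine.model_executor.driver_worker.worker.model_runner.model
    )
    loaded = model.load_weights(per_tensor_param)
    try:
        yield loaded
    finally:
        # free KV cache and weights until the next rollout
        inference_engine.sleep(level=1)
```

In the diff these steps live inside `MegatronVLLMShardingManager` rather than a standalone context manager; the sketch only isolates the wake_up / load_weights / sleep ordering.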