Skip to content

[Misc] Update usage with mooncake lib for kv transfer #16523

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def __init__(
):

try:
from mooncake_vllm_adaptor import MooncakeDistributedStore
from mooncake.store import MooncakeDistributedStore
except ImportError as e:
raise ImportError(
"Please install mooncake by following the instructions at "
Expand Down
20 changes: 10 additions & 10 deletions vllm/distributed/kv_transfer/kv_pipe/mooncake_pipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,14 @@ class MooncakeTransferEngine:

def __init__(self, kv_rank: int, local_rank: int):
try:
import mooncake_vllm_adaptor as mva
from mooncake.engine import TransferEngine
except ImportError as e:
raise ImportError(
"Please install mooncake by following the instructions at "
"https://github.com/kvcache-ai/Mooncake/blob/main/doc/en/build.md " # noqa: E501
"to run vLLM with MooncakeConnector.") from e

self.engine = mva.mooncake_vllm_adaptor()
self.engine = TransferEngine()
self.local_rank = local_rank

try:
Expand Down Expand Up @@ -140,26 +140,26 @@ def initialize(self, local_hostname: str, metadata_server: str,
"Mooncake Configuration error. `metadata_backend`"
f" should be one of {supported_backend}.")

self.engine.initializeExt(local_hostname, metadata_server,
protocol, device_name, metadata_backend)
self.engine.initialize_ext(local_hostname, metadata_server,
protocol, device_name, metadata_backend)

def allocate_managed_buffer(self, length: int) -> int:
"""Allocate a managed buffer of the specified length."""
ret = self.engine.allocateManagedBuffer(length)
ret = self.engine.allocate_managed_buffer(length)
if ret <= 0:
logger.error("Allocation Return Error")
raise Exception("Allocation Return Error")
return ret

def free_managed_buffer(self, buffer: int, length: int) -> int:
"""Free a previously allocated managed buffer."""
return self.engine.freeManagedBuffer(buffer, length)
return self.engine.free_managed_buffer(buffer, length)

def transfer_sync(self, buffer: int, peer_buffer_address: int,
length: int) -> int:
"""Synchronously transfer data to the specified address."""
ret = self.engine.transferSync(self.remote_url, buffer,
peer_buffer_address, length)
ret = self.engine.transfer_sync_read(self.remote_url, buffer,
peer_buffer_address, length)
if ret < 0:
logger.error("Transfer Return Error")
raise Exception("Transfer Return Error")
Expand All @@ -168,11 +168,11 @@ def transfer_sync(self, buffer: int, peer_buffer_address: int,
def write_bytes_to_buffer(self, buffer: int, user_data: bytes,
length: int) -> int:
"""Write bytes to the allocated buffer."""
return self.engine.writeBytesToBuffer(buffer, user_data, length)
return self.engine.write_bytes_to_buffer(buffer, user_data, length)

def read_bytes_from_buffer(self, buffer: int, length: int) -> bytes:
"""Read bytes from the allocated buffer."""
return self.engine.readBytesFromBuffer(buffer, length)
return self.engine.read_bytes_from_buffer(buffer, length)

def wait_for_ack(self, src_ptr: int, length: int) -> None:
"""Asynchronously wait for ACK from the receiver."""
Expand Down