LMDeploy Distserve #3304
Changes from 27 commits
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,5 @@ | ||
# Copyright (c) OpenMMLab. All rights reserved. | ||
|
||
from lmdeploy.disagg.messages import EngineRole, MigrationBackend, MigrationTransportProtocol | ||
from lmdeploy.utils import get_max_batch_size | ||
|
||
from .cli import CLI | ||
|
@@ -125,6 +125,23 @@ def add_parser_api_server(): | |
'engine’s tasks once the maximum number of concurrent requests is ' | ||
'reached, regardless of any additional requests sent by clients ' | ||
'concurrently during that time. Default to None.') | ||
parser.add_argument('--role', | ||
type=str, | ||
default='Hybrid', | ||
choices=['Hybrid', 'Prefill', 'Decode'], | ||
help='Hybrid for Non-Disaggregated Engine; ' | ||
'Prefill for Disaggregated Prefill Engine; ' | ||
'Decode for Disaggregated Decode Engine;') | ||
parser.add_argument('--migration-backend', | ||
type=str, | ||
default='DLSlime', | ||
choices=['DLSlime', 'Mooncake', 'InfiniStore'], | ||
help='kvcache migration backend used for PD disaggregation') | ||
parser.add_argument('--migration-protocol', | ||
type=str, | ||
default='RDMA', | ||
choices=['TCP', 'RDMA', 'NVLINK'], | ||
help='kvcache migration protocol') | ||
# common args | ||
ArgumentHelper.backend(parser) | ||
ArgumentHelper.log_level(parser) | ||
|
@@ -215,7 +232,12 @@ def add_parser_proxy(): | |
parser.set_defaults(run=SubCliServe.proxy) | ||
parser.add_argument('--server-name', type=str, default='0.0.0.0', help='Host ip for proxy serving') | ||
parser.add_argument('--server-port', type=int, default=8000, help='Server port of the proxy') | ||
parser.add_argument('--strategy', | ||
parser.add_argument('--serving-strategy', | ||
type=str, | ||
choices=['Disaggregated', 'NonDisaggregated'], | ||
default='NonDisaggregated', | ||
help='the strategy to dispatch requests to nodes') | ||
Review comment: May clarify the help info. It is the same as
Review comment: The api_server is assigned a specific "role" in this PR. I propose updating the communication protocol between the api_server and the proxy_server to include this role information. Benefits:
Would this approach be feasible? I’d appreciate any feedback or suggestions.
Review comment: Agree. |
||
parser.add_argument('--routing-strategy', | ||
type=str, | ||
choices=['random', 'min_expected_latency', 'min_observed_latency'], | ||
default='min_expected_latency', | ||
|
@@ -307,7 +329,10 @@ def api_server(args): | |
device_type=args.device, | ||
quant_policy=args.quant_policy, | ||
eager_mode=args.eager_mode, | ||
max_prefill_token_num=args.max_prefill_token_num) | ||
max_prefill_token_num=args.max_prefill_token_num, | ||
role=EngineRole.__members__[args.role], | ||
Review comment: Can we use |
||
migration_backend=MigrationBackend.__members__[args.migration_backend], | ||
migration_protocol=MigrationTransportProtocol.__members__[args.migration_protocol]) | ||
else: | ||
from lmdeploy.messages import TurbomindEngineConfig | ||
backend_config = TurbomindEngineConfig(dtype=args.dtype, | ||
|
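The mapping from the `--role` string to an enum member can be sketched in isolation. This is a minimal sketch, not the PR's code: `EngineRole` below is a hypothetical stand-in for `lmdeploy.disagg.messages.EngineRole` (member names mirror the CLI choices, values are assumed), and `build_parser` and `parse_role` are illustrative helper names.

```python
import argparse
from enum import Enum, auto


class EngineRole(Enum):
    # Hypothetical stand-in for lmdeploy.disagg.messages.EngineRole.
    # Member names mirror the --role choices; the values are assumed.
    Hybrid = auto()
    Prefill = auto()
    Decode = auto()


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(prog="api_server_sketch")
    parser.add_argument("--role",
                        type=str,
                        default="Hybrid",
                        # Derive the choices from the enum so the two cannot drift apart.
                        choices=list(EngineRole.__members__),
                        help="Engine role")
    return parser


def parse_role(argv):
    args = build_parser().parse_args(argv)
    # Name-based lookup: EngineRole[name] returns the same member as
    # EngineRole.__members__[name].
    return EngineRole[args.role]


if __name__ == "__main__":
    print(parse_role(["--role", "Prefill"]))
```

Deriving `choices` from the enum is a design choice of this sketch; the diff above lists the choices as string literals.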
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
# LMDeploy-DistServe | ||
|
||
## Key Components | ||
1. **Router Service**: Coordinates between prefill/decode engines | ||
2. **Migration Manager**: Facilitates high-performance memory sharing | ||
|
||
## Installation | ||
``` | ||
# Inference Engine | ||
pip install "lmdeploy[all]>=0.7.0" | ||
|
||
# Transfer Engine | ||
|
||
pip install dlslime==0.0.1.post1 | ||
``` | ||
|
||
## Quick Start | ||
### 1. Configure Endpoints | ||
First deploy your prefill and decode engines. | ||
|
||
``` shell | ||
# Prefill Engine | ||
CUDA_VISIBLE_DEVICES=0,1 lmdeploy serve api_server internlm/internlm2_5-7b-chat --server-port 23333 --role Prefill --tp 2 --cache-block-seq 32 | ||
# Decode Engine | ||
CUDA_VISIBLE_DEVICES=2,3 lmdeploy serve api_server internlm/internlm2_5-7b-chat --server-port 23334 --role Decode --tp 2 --cache-block-seq 32 | ||
``` | ||
|
||
### 2. Launch Router Service | ||
|
||
``` shell | ||
python -m lmdeploy.disagg.router \ | ||
--host 0.0.0.0 \ | ||
--port 5000 \ | ||
--prefill-endpoint http://prefill-host:port1 http://prefill-host:port2 \ | ||
--decode-endpoint http://decode-host:port3 http://decode-host:port4 | ||
``` | ||
|
||
## API Usage | ||
|
||
```shell | ||
# API Invoke | ||
curl -X POST "http://localhost:5000/v1/completions" \ | ||
-H "Content-Type: application/json" \ | ||
-d '{"model": "internlm/internlm2_5-7b-chat", "temperature":0, "prompt": "Shanghai is a city that ", "max_tokens": 16, "stream": false}' | ||
# Output | ||
{"id":"2","object":"text_completion","created":1743662400,"model":"/nvme1/majinming/hub/models--internlm--internlm2_5-7b-chat/snapshots/4434a5ffc2582f9d5ac45085043ed3e3264f0a9b","choices":[{"index":0,"text":" is very famous for its skyscrapers. It is also a city","logprobs":null,"finish_reason":"length"}],"usage":{"prompt_tokens":7,"total_tokens":23,"completion_tokens":16}} | ||
``` | ||
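The same request can be issued from Python. The following is a minimal sketch using only the standard library, assuming the router from the Quick Start is reachable at `http://localhost:5000`. The helper names `build_completion_request` and `complete` are illustrative and not part of LMDeploy.

```python
import json
import urllib.request


def build_completion_request(base_url: str, prompt: str,
                             max_tokens: int = 16) -> urllib.request.Request:
    """Build the POST request for /v1/completions without sending it."""
    payload = {
        "model": "internlm/internlm2_5-7b-chat",
        "temperature": 0,
        "prompt": prompt,
        "max_tokens": max_tokens,
        "stream": False,
    }
    return urllib.request.Request(
        url=f"{base_url}/v1/completions",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def complete(base_url: str, prompt: str) -> str:
    """Send the request and return the text of the first choice."""
    request = build_completion_request(base_url, prompt)
    with urllib.request.urlopen(request, timeout=30) as response:
        body = json.load(response)
    return body["choices"][0]["text"]


if __name__ == "__main__":
    # Requires a running router; the URL is an assumption taken from the curl example.
    print(complete("http://localhost:5000", "Shanghai is a city that "))
```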
|
||
## Troubleshooting | ||
|
||
### RDMA Connection Failed: | ||
|
||
``` bash | ||
ibstatus # Verify IB device status | ||
ibv_devinfo # Check device capabilities | ||
``` | ||
|
||
### Check NVSHMEM configuration: | ||
Make sure to verify NVSHMEM installation. | ||
Review comment: Could you kindly provide the checking method or related url links? |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
# Copyright (c) OpenMMLab. All rights reserved. |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
from typing import Dict | ||
from lmdeploy.logger import get_logger | ||
|
||
logger = get_logger("lmdeploy") | ||
|
||
|
||
try: | ||
logger.debug("Registering DLSlime Backend") | ||
Review comment: I think we can use the INFO log when trying to register the kv transfer engine backend. |
||
from .dlslime import DLSlimeBackend | ||
except ImportError as e: | ||
logger.debug("Disable DLSlime Backend") | ||
|
||
try: | ||
logger.debug("Registering Mooncake Backend") | ||
from .mooncake import MooncakeBackend | ||
except ImportError as e: | ||
logger.debug("Disable Mooncake Backend") | ||
|
||
|
||
try: | ||
logger.debug("Registering InfiniStore Backend") | ||
from .infinistore import InfiniStoreBackend | ||
except ImportError as e: | ||
logger.debug("Disable InfiniStore Backend") |
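The three `try`/`except ImportError` blocks above follow one pattern: attempt to import an optional dependency and carry on without it when it is missing. A minimal sketch of that pattern as a reusable helper follows. The name `try_import` and the logger name are hypothetical, and the module names are placeholders rather than the real backend packages.

```python
import importlib
import logging
from types import ModuleType
from typing import Dict, Optional

logger = logging.getLogger("backend_sketch")


def try_import(module_name: str) -> Optional[ModuleType]:
    """Import an optional dependency, returning None when it is missing."""
    try:
        module = importlib.import_module(module_name)
    except ImportError as exc:
        logger.debug("Disable %s backend: %s", module_name, exc)
        return None
    logger.debug("Registered %s backend", module_name)
    return module


# "json" stands in for an installed backend package and
# "not_a_real_backend_pkg" for one that is not installed.
available: Dict[str, ModuleType] = {}
for name in ("json", "not_a_real_backend_pkg"):
    module = try_import(name)
    if module is not None:
        available[name] = module
```

Logging the caught exception, as the sketch does, also gives the `as e` binding a purpose; in the diff above it is unused.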
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
from lmdeploy.disagg.messages import MigrationBackend | ||
|
||
|
||
MIGRATION_BACKENDS = {} | ||
Review comment: I think we can use the |
||
|
||
|
||
def register_migration_backend(backend_name: MigrationBackend): | ||
def register(cls): | ||
MIGRATION_BACKENDS[backend_name] = cls | ||
return cls | ||
|
||
return register |
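How the decorator-based registry is meant to be used can be sketched end to end. In this sketch `MigrationBackend` is a hypothetical stand-in for the enum in `lmdeploy.disagg.messages` (names mirror the `--migration-backend` choices), and `FakeDLSlimeBackend` and `create_backend` are illustrative names that do not appear in the PR.

```python
from enum import Enum, auto
from typing import Callable, Dict


class MigrationBackend(Enum):
    # Hypothetical stand-in; names mirror the --migration-backend choices.
    DLSlime = auto()
    Mooncake = auto()
    InfiniStore = auto()


MIGRATION_BACKENDS: Dict[MigrationBackend, type] = {}


def register_migration_backend(backend_name: MigrationBackend) -> Callable[[type], type]:
    """Class decorator that records the class under the given backend name."""
    def register(cls: type) -> type:
        MIGRATION_BACKENDS[backend_name] = cls
        return cls

    return register


@register_migration_backend(MigrationBackend.DLSlime)
class FakeDLSlimeBackend:
    """Hypothetical backend used only to exercise the registry."""


def create_backend(name: MigrationBackend):
    """Instantiate the registered backend, failing loudly when none exists."""
    try:
        return MIGRATION_BACKENDS[name]()
    except KeyError:
        raise ValueError(f"migration backend {name.name} is not registered") from None
```

Registration happens as a side effect of importing the module that defines the decorated class, so a backend whose import fails never appears in the dictionary.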
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
from abc import abstractmethod | ||
|
||
from lmdeploy.disagg.messages import ( | ||
MigrationInitRequest, | ||
MigrationConnectionRequest, | ||
MigrationAssignment, | ||
MigrationRegisterMemoryRequest, | ||
MigrationTransportProtocol | ||
) | ||
|
||
|
||
class MigrationBackendImpl: | ||
@abstractmethod | ||
def p2p_initialize(self, init_request: MigrationInitRequest): | ||
raise NotImplementedError | ||
|
||
@abstractmethod | ||
def register_memory_region(self, register_mr_request:MigrationRegisterMemoryRequest): | ||
raise NotImplementedError | ||
|
||
@abstractmethod | ||
def endpoint_info(self, remote_engine_id: int, protocol: MigrationTransportProtocol): | ||
raise NotImplementedError | ||
|
||
@abstractmethod | ||
def p2p_connect(self, connect_request: MigrationConnectionRequest): | ||
raise NotImplementedError | ||
|
||
@abstractmethod | ||
async def p2p_migrate(self, assignment: MigrationAssignment): | ||
raise NotImplementedError | ||
|
||
@abstractmethod | ||
async def store(self, assignment: MigrationAssignment): | ||
raise NotImplementedError | ||
|
||
@abstractmethod | ||
async def load(self, assignment: MigrationAssignment): | ||
raise NotImplementedError | ||
|
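One detail worth noting about the base class above: `@abstractmethod` is only enforced when the class uses `ABCMeta`, for example by inheriting from `abc.ABC`. The sketch below illustrates the difference with hypothetical classes. It shows standard-library behaviour and is not code from the PR.

```python
from abc import ABC, abstractmethod


class PlainBase:
    # @abstractmethod has no effect here because the class does not use ABCMeta.
    @abstractmethod
    def endpoint_info(self):
        raise NotImplementedError


class AbstractBase(ABC):
    @abstractmethod
    def endpoint_info(self):
        raise NotImplementedError


class Incomplete(AbstractBase):
    """Subclass that forgets to override endpoint_info."""


class Complete(AbstractBase):
    def endpoint_info(self):
        # Placeholder value, purely illustrative.
        return {"device": "hypothetical-nic"}


def can_instantiate(cls) -> bool:
    """Return True when cls() succeeds, False when it raises TypeError."""
    try:
        cls()
    except TypeError:
        return False
    return True
```

With `ABC` as the base, a subclass that misses a method fails at construction time instead of at the first call to the missing method.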
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,84 @@ | ||
from typing import Dict | ||
|
||
from lmdeploy.disagg.messages import ( | ||
MigrationBackend, | ||
MigrationInitRequest, | ||
MigrationTransportProtocol, | ||
DisaggEngineConfig, | ||
MigrationConnectionRequest, | ||
MigrationAssignment, | ||
MigrationRegisterMemoryRequest | ||
) | ||
|
||
from lmdeploy.disagg.backend.base import MigrationBackendImpl | ||
from lmdeploy.disagg.backend.backend import register_migration_backend | ||
|
||
from dlslime import RDMAEndpoint, available_nic | ||
|
||
|
||
class DLSlimeMigrationManagement: | ||
def __init__(self, init_request: MigrationInitRequest): | ||
self.rank = init_request.rank | ||
self.tp_rank = init_request.tp_rank | ||
self.remote_engine_config: DisaggEngineConfig = init_request.remote_engine_config | ||
self.endpoint: Dict[MigrationTransportProtocol, RDMAEndpoint] = { | ||
MigrationTransportProtocol.TCP: None, | ||
MigrationTransportProtocol.RDMA: None, | ||
MigrationTransportProtocol.NVLINK: None, | ||
} | ||
if init_request.rdma_init_request: | ||
if not init_request.rdma_init_request.device_name: | ||
nics = available_nic() | ||
init_request.rdma_init_request.device_name = nics[self.rank % len(nics)] | ||
self.endpoint[MigrationTransportProtocol.RDMA] = RDMAEndpoint( | ||
device_name=init_request.rdma_init_request.device_name, | ||
ib_port=init_request.rdma_init_request.ib_port, | ||
link_type=init_request.rdma_init_request.link_type | ||
) | ||
|
||
def register_memory_region(self, register_mr_request: MigrationRegisterMemoryRequest): | ||
self.endpoint[register_mr_request.protocol].register_memory_region( | ||
register_mr_request.mr_key, | ||
register_mr_request.addr, | ||
register_mr_request.length | ||
) | ||
|
||
def connect_to(self, connect_request: MigrationConnectionRequest): | ||
self.endpoint[connect_request.protocol].connect_to(connect_request.remote_endpoint_info) | ||
|
||
async def p2p_migrate(self, assignment: MigrationAssignment): | ||
max_batch = 4096 + 2048 | ||
Review comment: What do the two magic numbers represent? |
||
for i in range(0, len(assignment.target_offset), max_batch): | ||
await self.endpoint[assignment.protocol].read_batch_async( | ||
assignment.mr_key, | ||
assignment.target_offset[i: i+max_batch], | ||
assignment.source_offset[i: i+max_batch], | ||
assignment.length | ||
) | ||
|
||
|
||
@register_migration_backend(MigrationBackend.DLSlime) | ||
class DLSlimeBackend(MigrationBackendImpl): | ||
def __init__(self): | ||
self.links: Dict[int, DLSlimeMigrationManagement] = {} | ||
|
||
def p2p_initialize(self, init_request: MigrationInitRequest): | ||
self.links[init_request.remote_engine_id] = DLSlimeMigrationManagement(init_request) | ||
|
||
def register_memory_region(self, register_mr_request:MigrationRegisterMemoryRequest): | ||
self.links[register_mr_request.remote_engine_id].register_memory_region(register_mr_request) | ||
|
||
def endpoint_info(self, remote_engine_id: int, protocol: MigrationTransportProtocol): | ||
return self.links[remote_engine_id].endpoint[protocol].local_endpoint_info | ||
|
||
def p2p_connect(self, connect_request: MigrationConnectionRequest): | ||
self.links[connect_request.remote_engine_id].connect_to(connect_request) | ||
|
||
async def p2p_migrate(self, assignment: MigrationAssignment): | ||
await self.links[assignment.remote_engine_id].p2p_migrate(assignment) | ||
|
||
async def store(self, assignment: MigrationAssignment): | ||
raise NotImplementedError | ||
|
||
async def load(self, assignment: MigrationAssignment): | ||
raise NotImplementedError |
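The slicing loop in `DLSlimeMigrationManagement.p2p_migrate` splits the offset lists into consecutive chunks of at most `max_batch` entries, with the last chunk holding the remainder. A minimal sketch of that chunking without any RDMA calls follows. The helper name `iter_batches` is hypothetical, and the length check is an added assumption that the original loop does not make.

```python
from typing import Iterator, List, Sequence, Tuple


def iter_batches(target_offset: Sequence[int],
                 source_offset: Sequence[int],
                 max_batch: int) -> Iterator[Tuple[List[int], List[int]]]:
    """Yield aligned (target, source) slices of at most max_batch entries."""
    if max_batch <= 0:
        raise ValueError("max_batch must be positive")
    # Assumption of this sketch: both lists describe the same blocks.
    if len(target_offset) != len(source_offset):
        raise ValueError("target and source offsets must have equal length")
    for i in range(0, len(target_offset), max_batch):
        yield (list(target_offset[i:i + max_batch]),
               list(source_offset[i:i + max_batch]))


if __name__ == "__main__":
    offsets = list(range(13000))
    sizes = [len(t) for t, _ in iter_batches(offsets, offsets, 4096 + 2048)]
    print(sizes)  # [6144, 6144, 712]
```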
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
from lmdeploy.disagg.messages import ( | ||
MigrationBackend, | ||
MigrationInitRequest, | ||
MigrationConnectionRequest, | ||
MigrationAssignment, | ||
MigrationRegisterMemoryRequest, | ||
MigrationTransportProtocol | ||
) | ||
|
||
from lmdeploy.disagg.backend.backend import register_migration_backend | ||
from lmdeploy.disagg.backend.base import MigrationBackendImpl | ||
|
||
|
||
@register_migration_backend(MigrationBackend.InfiniStore) | ||
class InfiniStoreBackend(MigrationBackendImpl): | ||
def p2p_initialize(self, init_request: MigrationInitRequest): | ||
raise NotImplementedError | ||
|
||
def register_memory_region(self, register_mr_request:MigrationRegisterMemoryRequest): | ||
raise NotImplementedError | ||
|
||
def endpoint_info(self, remote_engine_id: int, protocol: MigrationTransportProtocol): | ||
raise NotImplementedError | ||
|
||
def p2p_connect(self, connect_request: MigrationConnectionRequest): | ||
raise NotImplementedError | ||
|
||
async def p2p_migrate(self, assignment: MigrationAssignment): | ||
raise NotImplementedError | ||
|
||
async def store(self, assignment: MigrationAssignment): | ||
raise NotImplementedError | ||
|
||
async def load(self, assignment: MigrationAssignment): | ||
raise NotImplementedError |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
from lmdeploy.disagg.messages import ( | ||
MigrationBackend, | ||
MigrationInitRequest, | ||
MigrationConnectionRequest, | ||
MigrationAssignment, | ||
MigrationRegisterMemoryRequest, | ||
MigrationTransportProtocol | ||
) | ||
|
||
from lmdeploy.disagg.backend.backend import register_migration_backend | ||
from lmdeploy.disagg.backend.base import MigrationBackendImpl | ||
|
||
|
||
@register_migration_backend(MigrationBackend.Mooncake) | ||
class MooncakeBackend(MigrationBackendImpl): | ||
def p2p_initialize(self, init_request: MigrationInitRequest): | ||
raise NotImplementedError | ||
|
||
def register_memory_region(self, register_mr_request:MigrationRegisterMemoryRequest): | ||
raise NotImplementedError | ||
|
||
def endpoint_info(self, remote_engine_id: int, protocol: MigrationTransportProtocol): | ||
raise NotImplementedError | ||
|
||
def p2p_connect(self, connect_request: MigrationConnectionRequest): | ||
raise NotImplementedError | ||
|
||
async def p2p_migrate(self, assignment: MigrationAssignment): | ||
raise NotImplementedError | ||
|
||
async def store(self, assignment: MigrationAssignment): | ||
raise NotImplementedError | ||
|
||
async def load(self, assignment: MigrationAssignment): | ||
raise NotImplementedError |
Review comment: We can put this after line 307 to avoid unnecessary importing time.