Skip to content

Commit 6cda25f

Browse files
fzyzcjytarinkk
authored andcommitted
[PD] Add control to slow down a server (sgl-project#5572)
1 parent 8824266 commit 6cda25f

File tree

4 files changed

+57
-0
lines changed

4 files changed

+57
-0
lines changed

python/sglang/srt/entrypoints/http_server.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
ResumeMemoryOccupationReqInput,
6363
SeparateReasoningReqInput,
6464
SetInternalStateReq,
65+
SlowDownReqInput,
6566
UpdateWeightFromDiskReqInput,
6667
UpdateWeightsFromDistributedReqInput,
6768
UpdateWeightsFromTensorReqInput,
@@ -494,6 +495,19 @@ async def resume_memory_occupation(
494495
return _create_error_response(e)
495496

496497

498+
@app.api_route("/slow_down", methods=["GET", "POST"])
499+
async def slow_down(obj: SlowDownReqInput, request: Request):
500+
"""Slow down the system deliberately. Only for testing. Example scenario:
501+
when we want to test performance of D in large-scale PD disaggregation and have no enough nodes for P,
502+
we can use this to slow down D to let it have enough running sequences, and then disable slowdown
503+
to let it run in full batch size.
504+
"""
505+
try:
506+
await _global_state.tokenizer_manager.slow_down(obj, request)
507+
except Exception as e:
508+
return _create_error_response(e)
509+
510+
497511
@app.api_route("/open_session", methods=["GET", "POST"])
498512
async def open_session(obj: OpenSessionReqInput, request: Request):
499513
"""Open a session, and return its unique session id."""

python/sglang/srt/managers/io_struct.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -790,6 +790,16 @@ class ResumeMemoryOccupationReqOutput:
790790
pass
791791

792792

793+
@dataclass
794+
class SlowDownReqInput:
795+
forward_sleep_time: Optional[float]
796+
797+
798+
@dataclass
799+
class SlowDownReqOutput:
800+
pass
801+
802+
793803
@dataclass
794804
class AbortReq:
795805
# The request id

python/sglang/srt/managers/scheduler.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,8 @@
8787
RpcReqOutput,
8888
SetInternalStateReq,
8989
SetInternalStateReqOutput,
90+
SlowDownReqInput,
91+
SlowDownReqOutput,
9092
TokenizedEmbeddingReqInput,
9193
TokenizedGenerateReqInput,
9294
UpdateWeightFromDiskReqInput,
@@ -417,6 +419,8 @@ def __init__(
417419
self.profiler_id: Optional[str] = None
418420
self.profiler_target_forward_ct: Optional[int] = None
419421

422+
self.forward_sleep_time = None
423+
420424
# Init metrics stats
421425
self.init_metrics()
422426

@@ -439,6 +443,7 @@ def __init__(
439443
(GetWeightsByNameReqInput, self.get_weights_by_name),
440444
(ReleaseMemoryOccupationReqInput, self.release_memory_occupation),
441445
(ResumeMemoryOccupationReqInput, self.resume_memory_occupation),
446+
(SlowDownReqInput, self.slow_down),
442447
(ProfileReq, self.profile),
443448
(GetInternalStateReq, self.get_internal_state),
444449
(SetInternalStateReq, self.set_internal_state),
@@ -1550,6 +1555,10 @@ def run_batch(
15501555
):
15511556
self.stop_profile()
15521557

1558+
if self.forward_sleep_time is not None:
1559+
logger.info(f"Scheduler.run_batch sleep {self.forward_sleep_time}s")
1560+
time.sleep(self.forward_sleep_time)
1561+
15531562
# Run forward
15541563
if self.is_generation:
15551564
if self.spec_algorithm.is_none():
@@ -2025,6 +2034,13 @@ def resume_memory_occupation(self, recv_req: ResumeMemoryOccupationReqInput):
20252034
del self.stashed_model_static_state
20262035
return ResumeMemoryOccupationReqOutput()
20272036

2037+
def slow_down(self, recv_req: SlowDownReqInput):
2038+
t = recv_req.forward_sleep_time
2039+
if t is not None and t <= 0:
2040+
t = None
2041+
self.forward_sleep_time = t
2042+
return SlowDownReqOutput()
2043+
20282044
def profile(self, recv_req: ProfileReq):
20292045
if recv_req.type == ProfileReqType.START_PROFILE:
20302046
return self.start_profile(

python/sglang/srt/managers/tokenizer_manager.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,8 @@
9090
ResumeMemoryOccupationReqInput,
9191
ResumeMemoryOccupationReqOutput,
9292
SessionParams,
93+
SlowDownReqInput,
94+
SlowDownReqOutput,
9395
TokenizedEmbeddingReqInput,
9496
TokenizedGenerateReqInput,
9597
UpdateWeightFromDiskReqInput,
@@ -259,6 +261,9 @@ def __init__(
259261
self.resume_memory_occupation_communicator = _Communicator(
260262
self.send_to_scheduler, server_args.dp_size
261263
)
264+
self.slow_down_communicator = _Communicator(
265+
self.send_to_scheduler, server_args.dp_size
266+
)
262267
self.flush_cache_communicator = _Communicator(
263268
self.send_to_scheduler, server_args.dp_size
264269
)
@@ -312,6 +317,10 @@ def __init__(
312317
ResumeMemoryOccupationReqOutput,
313318
self.resume_memory_occupation_communicator.handle_recv,
314319
),
320+
(
321+
SlowDownReqOutput,
322+
self.slow_down_communicator.handle_recv,
323+
),
315324
(
316325
FlushCacheReqOutput,
317326
self.flush_cache_communicator.handle_recv,
@@ -870,6 +879,14 @@ async def resume_memory_occupation(
870879
self.auto_create_handle_loop()
871880
await self.resume_memory_occupation_communicator(obj)
872881

882+
async def slow_down(
883+
self,
884+
obj: SlowDownReqInput,
885+
request: Optional[fastapi.Request] = None,
886+
):
887+
self.auto_create_handle_loop()
888+
await self.slow_down_communicator(obj)
889+
873890
async def open_session(
874891
self, obj: OpenSessionReqInput, request: Optional[fastapi.Request] = None
875892
):

0 commit comments

Comments
 (0)