38
38
POLLING_TIMEOUT_MS = 5000
39
39
POLLING_TIMEOUT_S = POLLING_TIMEOUT_MS // 1000
40
40
41
- EXECUTE_MODEL_TIMEOUT_S = 30
41
+ EXECUTE_MODEL_TIMEOUT_S = 40
42
42
43
43
44
44
class MultiprocExecutor (Executor ):
@@ -151,16 +151,16 @@ def execute_model(
151
151
152
152
def collective_rpc (self ,
153
153
method : Union [str , Callable ],
154
- timeout : Optional [float ] = 180.0 ,
154
+ timeout : Optional [float ] = None ,
155
155
args : tuple = (),
156
156
kwargs : Optional [dict ] = None ,
157
157
rank0_reply_only : bool = False ) -> list [Any ]:
158
- start_time = time .monotonic ()
159
- kwargs = kwargs or {}
160
-
161
158
if self .is_failed :
162
159
raise RuntimeError ("Executor failed." )
163
160
161
+ deadline = None if timeout is None else time .monotonic () + timeout
162
+ kwargs = kwargs or {}
163
+
164
164
# NOTE: If the args are heterogeneous, then we pack them into a list,
165
165
# and unpack them in the method of every worker, because every worker
166
166
# knows their own rank.
@@ -176,8 +176,8 @@ def collective_rpc(self,
176
176
workers = (self .workers [0 ], ) if rank0_reply_only else self .workers
177
177
responses = [None ] * len (workers )
178
178
for w in workers :
179
- dequeue_timeout = timeout - ( time . monotonic () - start_time
180
- ) if timeout is not None else None
179
+ dequeue_timeout = None if deadline is None else (
180
+ deadline - time . monotonic ())
181
181
status , result = w .worker_response_mq .dequeue (
182
182
timeout = dequeue_timeout , cancel = self .shutdown_event )
183
183
0 commit comments