22
22
from typing import Any
23
23
24
24
import msgspec
25
- import pandas as pd
26
25
27
26
from nautilus_trader .adapters .dydx .common .enums import DYDXCandlesResolution
28
27
from nautilus_trader .adapters .dydx .http .errors import should_retry
@@ -85,9 +84,6 @@ def __init__(
85
84
self ._is_running = False
86
85
self ._subscriptions : set [tuple [str , str ]] = set ()
87
86
self ._subscription_rate_limit_per_second = subscription_rate_limit_per_second
88
- self ._msg_timestamp = self ._clock .utc_now ()
89
- self ._msg_timeout_secs : int = 60
90
- self ._reconnect_task : asyncio .Task | None = None
91
87
self ._max_send_retries = max_send_retries
92
88
self ._retry_delay_secs = retry_delay_secs
93
89
@@ -160,7 +156,7 @@ async def connect(self) -> None:
160
156
self ._log .debug (f"Connecting to { self ._base_url } websocket stream" )
161
157
config = WebSocketConfig (
162
158
url = self ._base_url ,
163
- handler = self ._msg_handler ,
159
+ handler = self ._handler ,
164
160
heartbeat = 10 ,
165
161
headers = [],
166
162
ping_handler = self ._handle_ping ,
@@ -173,24 +169,6 @@ async def connect(self) -> None:
173
169
self ._client = client
174
170
self ._log .info (f"Connected to { self ._base_url } " , LogColor .BLUE )
175
171
176
- self ._msg_timestamp = self ._clock .utc_now ()
177
-
178
- if self ._reconnect_task is None :
179
- self ._reconnect_task = self ._loop .create_task (self ._reconnect_guard ())
180
-
181
- def _msg_handler (self , raw : bytes ) -> None :
182
- """
183
- Handle pushed websocket messages.
184
-
185
- Parameters
186
- ----------
187
- raw : bytes
188
- The received message in bytes.
189
-
190
- """
191
- self ._msg_timestamp = self ._clock .utc_now ()
192
- self ._handler (raw )
193
-
194
172
def _handle_ping (self , raw : bytes ) -> None :
195
173
"""
196
174
Handle ping messages by returning a pong message.
@@ -213,7 +191,7 @@ async def send_pong(self, raw: bytes) -> None:
213
191
The pong message in bytes.
214
192
215
193
"""
216
- if self ._client is None :
194
+ if self ._client is None or self . _client . is_active () is False :
217
195
return
218
196
219
197
async with self ._retry_manager_pool as retry_manager :
@@ -224,42 +202,6 @@ async def send_pong(self, raw: bytes) -> None:
224
202
data = raw ,
225
203
)
226
204
227
- async def _reconnect_guard (self ) -> None :
228
- """
229
- Reconnect the websocket client when a message has not been received for some
230
- time.
231
- """
232
- try :
233
- while True :
234
- await asyncio .sleep (1 )
235
- time_since_previous_msg = self ._clock .utc_now () - self ._msg_timestamp
236
-
237
- if self .is_disconnected () or time_since_previous_msg > pd .Timedelta (
238
- seconds = self ._msg_timeout_secs ,
239
- ):
240
- if self .is_disconnected ():
241
- self ._log .error ("Websocket disconnected. Reconnecting." )
242
-
243
- # Print error if no message has been received for twice the timeout time
244
- # to reduce log noise.
245
- if time_since_previous_msg > pd .Timedelta (seconds = 2 * self ._msg_timeout_secs ):
246
- self ._log .error (
247
- f"{ time_since_previous_msg } since previous received message. Reconnecting." ,
248
- )
249
-
250
- try :
251
- await self .disconnect ()
252
- await self .connect ()
253
- self .reconnect ()
254
-
255
- self ._log .info ("Websocket connected" )
256
- except Exception as e :
257
- self ._log .error (f"Failed to connect the websocket: { e } " )
258
- self ._client = None
259
-
260
- except asyncio .CancelledError :
261
- self ._log .debug ("Canceled task 'reconnect_guard'" )
262
-
263
205
def reconnect (self ) -> None :
264
206
"""
265
207
Reconnect the client to the server and resubscribe to all streams.
0 commit comments