Skip to content

Commit b3e7484

Browse files
committed
Continue LiveNode and AsyncRunner in Rust
1 parent 62b6aa9 commit b3e7484

File tree

11 files changed

+200
-90
lines changed

11 files changed

+200
-90
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/adapters/databento/bin/node_test.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
3232
// #[cfg(feature = "python")]
3333
pyo3::prepare_freethreaded_python();
3434

35-
tracing_subscriber::fmt()
36-
.with_max_level(tracing::Level::INFO)
37-
.init();
38-
3935
dotenvy::dotenv().ok();
4036

4137
let environment = Environment::Live;
@@ -70,7 +66,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
7066
// Create and register a Databento subscriber actor
7167
let client_id = ClientId::new("DATABENTO");
7268
let instrument_ids = vec![
73-
InstrumentId::from("ES.c.0.GLBX"),
69+
InstrumentId::from("ESM5.XCME"),
7470
// Add more instruments as needed
7571
];
7672

crates/adapters/databento/src/data.rs

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,16 @@ use std::{
2626
use ahash::AHashMap;
2727
use databento::live::Subscription;
2828
use indexmap::IndexMap;
29-
use nautilus_common::messages::data::{
30-
RequestBars, RequestInstruments, RequestQuotes, RequestTrades, SubscribeBookDeltas,
31-
SubscribeInstrumentStatus, SubscribeQuotes, SubscribeTrades, UnsubscribeBookDeltas,
32-
UnsubscribeInstrumentStatus, UnsubscribeQuotes, UnsubscribeTrades,
29+
use nautilus_common::{
30+
messages::{
31+
DataEvent,
32+
data::{
33+
RequestBars, RequestInstruments, RequestQuotes, RequestTrades, SubscribeBookDeltas,
34+
SubscribeInstrumentStatus, SubscribeQuotes, SubscribeTrades, UnsubscribeBookDeltas,
35+
UnsubscribeInstrumentStatus, UnsubscribeQuotes, UnsubscribeTrades,
36+
},
37+
},
38+
runner::get_data_event_sender,
3339
};
3440
use nautilus_core::time::AtomicTime;
3541
use nautilus_data::client::DataClient;
@@ -38,7 +44,10 @@ use nautilus_model::{
3844
identifiers::{ClientId, Symbol, Venue},
3945
instruments::Instrument,
4046
};
41-
use tokio::{sync::mpsc, task::JoinHandle};
47+
use tokio::{
48+
sync::mpsc::{self, UnboundedSender},
49+
task::JoinHandle,
50+
};
4251
use tokio_util::sync::CancellationToken;
4352

4453
use crate::{
@@ -108,6 +117,8 @@ pub struct DatabentoDataClient {
108117
publisher_venue_map: Arc<IndexMap<PublisherId, Venue>>,
109118
/// Symbol to venue mapping (for caching).
110119
symbol_venue_map: Arc<RwLock<AHashMap<Symbol, Venue>>>,
120+
/// Data event sender for forwarding data to AsyncRunner.
121+
data_sender: UnboundedSender<DataEvent>,
111122
}
112123

113124
impl DatabentoDataClient {
@@ -141,6 +152,9 @@ impl DatabentoDataClient {
141152
.map(|p| (p.publisher_id, Venue::from(p.venue.as_str())))
142153
.collect::<IndexMap<u16, Venue>>();
143154

155+
// Get the data event sender for forwarding data to AsyncRunner
156+
let data_sender = get_data_event_sender();
157+
144158
Ok(Self {
145159
client_id,
146160
config,
@@ -152,6 +166,7 @@ impl DatabentoDataClient {
152166
cancellation_token: CancellationToken::new(),
153167
publisher_venue_map: Arc::new(publisher_venue_map),
154168
symbol_venue_map: Arc::new(RwLock::new(AHashMap::new())),
169+
data_sender,
155170
})
156171
}
157172

@@ -238,6 +253,7 @@ impl DatabentoDataClient {
238253
});
239254

240255
let cancellation_token = self.cancellation_token.clone();
256+
let data_sender = self.data_sender.clone();
241257

242258
// Spawn message processing task with cancellation support
243259
let msg_handle = tokio::spawn(async move {
@@ -248,7 +264,9 @@ impl DatabentoDataClient {
248264
match msg {
249265
Some(LiveMessage::Data(data)) => {
250266
tracing::debug!("Received data: {data:?}");
251-
// TODO: Forward to message bus or data engine
267+
if let Err(e) = data_sender.send(DataEvent::Data(data)) {
268+
tracing::error!("Failed to send data event: {e}");
269+
}
252270
}
253271
Some(LiveMessage::Instrument(instrument)) => {
254272
tracing::debug!("Received instrument: {}", instrument.id());

crates/common/src/msgbus/switchboard.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,11 @@ impl MessagingSwitchboard {
219219
"DataEngine.process".into()
220220
}
221221

222+
#[must_use]
223+
pub fn data_engine_response() -> MStr<Endpoint> {
224+
"DataEngine.response".into()
225+
}
226+
222227
#[must_use]
223228
pub fn exec_engine_execute() -> MStr<Endpoint> {
224229
"ExecEngine.execute".into()

crates/common/src/runner.rs

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ use std::{
1919
rc::Rc,
2020
};
2121

22+
use tokio::sync::mpsc::UnboundedSender;
23+
2224
use crate::{
2325
clock::Clock,
2426
messages::{DataEvent, data::DataCommand},
@@ -56,6 +58,7 @@ pub fn set_global_clock(c: Rc<RefCell<dyn Clock>>) {
5658
pub type DataCommandQueue = Rc<RefCell<VecDeque<DataCommand>>>;
5759

5860
/// Get globally shared message bus command queue
61+
///
5962
/// # Panics
6063
///
6164
/// Panics if thread-local storage cannot be accessed.
@@ -106,16 +109,64 @@ pub fn set_data_evt_queue(dq: Rc<RefCell<dyn DataQueue>>) {
106109
.expect("Should be able to access thread local storage");
107110
}
108111

112+
/// Sends a data event to the global data event queue.
113+
///
114+
/// This function provides a convenient way for data clients and feed handlers
115+
/// to send data events to the AsyncRunner for processing.
116+
///
117+
/// # Panics
118+
///
119+
/// Panics if thread-local storage cannot be accessed or the data event queue is uninitialized.
120+
pub fn send_data_event(event: DataEvent) {
121+
get_data_evt_queue().borrow_mut().push(event);
122+
}
123+
124+
/// Sets the global data event sender.
125+
///
126+
/// This should be called by the AsyncRunner when it creates the channel.
127+
///
128+
/// # Panics
129+
///
130+
/// Panics if thread-local storage cannot be accessed or a sender is already set.
131+
pub fn set_data_event_sender(sender: UnboundedSender<DataEvent>) {
132+
DATA_EVT_SENDER
133+
.try_with(|s| {
134+
assert!(s.set(sender).is_ok(), "Data event sender already set");
135+
})
136+
.expect("Should be able to access thread local storage");
137+
}
138+
139+
/// Gets a cloned data event sender.
140+
///
141+
/// This allows data clients to send events directly to the AsyncRunner
142+
/// without going through shared mutable state.
143+
///
144+
/// # Panics
145+
///
146+
/// Panics if thread-local storage cannot be accessed or the sender is uninitialized.
147+
#[must_use]
148+
pub fn get_data_event_sender() -> UnboundedSender<DataEvent> {
149+
DATA_EVT_SENDER
150+
.try_with(|s| {
151+
s.get()
152+
.expect("Data event sender should be initialized by AsyncRunner")
153+
.clone()
154+
})
155+
.expect("Should be able to access thread local storage")
156+
}
157+
109158
thread_local! {
110159
static CLOCK: OnceCell<GlobalClock> = OnceCell::new();
111160
static DATA_EVT_QUEUE: OnceCell<GlobalDataQueue> = OnceCell::new();
112161
static DATA_CMD_QUEUE: DataCommandQueue = Rc::new(RefCell::new(VecDeque::new()));
162+
// TODO: Potentially redundant but added to simplify the abstraction layers for now
163+
static DATA_EVT_SENDER: OnceCell<UnboundedSender<DataEvent>> = const { OnceCell::new() };
113164
}
114165

115166
// Represents different event types for the runner.
116167
#[allow(clippy::large_enum_variant)]
117168
#[derive(Debug)]
118169
pub enum RunnerEvent {
170+
Time(TimeEvent),
119171
Data(DataEvent),
120-
Timer(TimeEvent),
121172
}

crates/data/src/engine/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -571,7 +571,9 @@ impl DataEngine {
571571
/// Currently supports `InstrumentAny`; unrecognized types are logged as errors.
572572
pub fn process(&mut self, data: &dyn Any) {
573573
// TODO: Eventually these could be added to the `Data` enum? process here for now
574-
if let Some(instrument) = data.downcast_ref::<InstrumentAny>() {
574+
if let Some(data) = data.downcast_ref::<Data>() {
575+
self.process_data(data.clone()); // TODO: Optimize (not necessary if we change handler)
576+
} else if let Some(instrument) = data.downcast_ref::<InstrumentAny>() {
575577
self.handle_instrument(instrument.clone());
576578
} else {
577579
log::error!("Cannot process data {data:?}, type is unrecognized");

crates/live/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ pyo3 = { workspace = true, optional = true }
7171
serde = { workspace = true }
7272
strum = { workspace = true }
7373
tokio = { workspace = true }
74+
tracing = { workspace = true }
7475
ustr = { workspace = true }
7576

7677
[dev-dependencies]

crates/live/src/node.rs

Lines changed: 36 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,12 @@
1919

2020
use std::{cell::RefCell, collections::HashMap, rc::Rc};
2121

22+
use anyhow::Context;
2223
use nautilus_common::{
23-
actor::DataActor, clock::LiveClock, component::Component, enums::Environment,
24+
actor::{Actor, DataActor},
25+
clock::LiveClock,
26+
component::Component,
27+
enums::Environment,
2428
};
2529
use nautilus_core::UUID4;
2630
use nautilus_data::client::DataClientAdapter;
@@ -30,6 +34,7 @@ use nautilus_system::{
3034
factories::{ClientConfig, DataClientFactory, ExecutionClientFactory},
3135
kernel::NautilusKernel,
3236
};
37+
use tokio::sync::mpsc::UnboundedSender;
3338

3439
use crate::{config::LiveNodeConfig, runner::AsyncRunner};
3540

@@ -42,6 +47,7 @@ pub struct LiveNode {
4247
clock: Rc<RefCell<LiveClock>>,
4348
kernel: NautilusKernel,
4449
runner: AsyncRunner,
50+
signal_tx: Option<UnboundedSender<()>>,
4551
config: LiveNodeConfig,
4652
is_running: bool,
4753
}
@@ -83,14 +89,15 @@ impl LiveNode {
8389

8490
let clock = Rc::new(RefCell::new(LiveClock::new()));
8591
let kernel = NautilusKernel::new(name, config.clone())?;
86-
let runner = AsyncRunner::new(clock.clone());
92+
let (runner, signal_tx) = AsyncRunner::new(clock.clone());
8793

8894
log::info!("LiveNode built successfully with kernel config");
8995

9096
Ok(Self {
9197
clock,
9298
kernel,
9399
runner,
100+
signal_tx: Some(signal_tx),
94101
config,
95102
is_running: false,
96103
})
@@ -106,9 +113,9 @@ impl LiveNode {
106113
anyhow::bail!("LiveNode is already running");
107114
}
108115

109-
log::info!("Starting live node");
116+
log::info!("Starting LiveNode");
117+
110118
self.kernel.start_async().await;
111-
log::info!("Kernel start completed");
112119
self.is_running = true;
113120

114121
log::info!("LiveNode started successfully");
@@ -125,7 +132,8 @@ impl LiveNode {
125132
anyhow::bail!("LiveNode is not running");
126133
}
127134

128-
log::info!("Stopping live node");
135+
log::info!("Stopping LiveNode");
136+
129137
self.kernel.stop_async().await;
130138
self.is_running = false;
131139

@@ -142,40 +150,35 @@ impl LiveNode {
142150
///
143151
/// Returns an error if the node fails to start or encounters a runtime error.
144152
pub async fn run(&mut self) -> anyhow::Result<()> {
145-
log::info!("Starting LiveNode...");
146-
self.start().await?;
153+
let signal_tx = self.signal_tx.take().context("LiveNode already running")?;
147154

148-
log::info!("LiveNode started, setting up signal handler...");
149-
150-
let loop_duration = tokio::time::Duration::from_millis(100);
155+
self.start().await?;
151156

152-
// TODO: Temporary count logging for development
153-
let mut count = 0;
154-
loop {
155-
count += 1;
156-
if count % 10 == 0 {
157-
log::info!("Signal handler loop iteration {count}");
157+
tokio::select! {
158+
// Run on main thread
159+
_ = self.runner.run() => {
160+
log::info!("AsyncRunner finished");
158161
}
159-
160-
tokio::select! {
161-
result = tokio::signal::ctrl_c() => {
162-
match result {
163-
Ok(()) => {
164-
log::info!("Received SIGINT, shutting down...");
165-
break;
166-
}
167-
Err(e) => {
168-
log::error!("Failed to listen for SIGINT: {e}");
169-
anyhow::bail!("Signal handling failed: {e}");
162+
// Handle SIGINT signal
163+
result = tokio::signal::ctrl_c() => {
164+
match result {
165+
Ok(()) => {
166+
log::info!("Received SIGINT, shutting down");
167+
if let Err(e) = signal_tx.send(()) {
168+
log::error!("Failed to send shutdown signal: {e}");
170169
}
170+
// Give the AsyncRunner a moment to process the shutdown signal
171+
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
172+
}
173+
Err(e) => {
174+
log::error!("Failed to listen for SIGINT: {e}");
171175
}
172-
}
173-
_ = tokio::time::sleep(loop_duration) => {
174176
}
175177
}
176178
}
177179

178-
log::info!("Shutting down after signal...");
180+
log::debug!("AsyncRunner and signal handling finished"); // TODO: Temp logging
181+
179182
self.stop().await?;
180183
Ok(())
181184
}
@@ -224,7 +227,7 @@ impl LiveNode {
224227
/// - The node is currently running.
225228
pub fn add_actor<T>(&mut self, actor: T) -> anyhow::Result<()>
226229
where
227-
T: DataActor + Component + 'static,
230+
T: DataActor + Component + Actor + 'static,
228231
{
229232
if self.is_running {
230233
anyhow::bail!(
@@ -417,7 +420,7 @@ impl LiveNodeBuilder {
417420

418421
let clock = Rc::new(RefCell::new(LiveClock::new()));
419422
let kernel = NautilusKernel::new("LiveNode".to_string(), self.config.clone())?;
420-
let runner = AsyncRunner::new(clock.clone());
423+
let (runner, signal_tx) = AsyncRunner::new(clock.clone());
421424

422425
// Create and register data clients
423426
for (name, factory) in self.data_client_factories.into_iter() {
@@ -471,6 +474,7 @@ impl LiveNodeBuilder {
471474
clock,
472475
kernel,
473476
runner,
477+
signal_tx: Some(signal_tx),
474478
config: self.config,
475479
is_running: false,
476480
})

0 commit comments

Comments
 (0)