Skip to content

Commit dc53353

Browse files
authored
Add bar build delay to data engine config (#2676)
1 parent 0dc8896 commit dc53353

File tree

9 files changed

+92
-75
lines changed

9 files changed

+92
-75
lines changed

crates/data/src/aggregation.rs

Lines changed: 13 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -860,11 +860,10 @@ where
860860
timer_name: String,
861861
interval_ns: UnixNanos,
862862
next_close_ns: UnixNanos,
863-
composite_bar_build_delay: i64,
864-
add_delay: bool,
863+
bar_build_delay: u64,
865864
batch_open_ns: UnixNanos,
866865
batch_next_close_ns: UnixNanos,
867-
time_bars_origin: Option<TimeDelta>,
866+
time_bars_origin_offset: Option<TimeDelta>,
868867
skip_first_non_full_bar: bool,
869868
}
870869

@@ -877,7 +876,7 @@ impl<H: FnMut(Bar)> Debug for TimeBarAggregator<H> {
877876
.field("is_left_open", &self.is_left_open)
878877
.field("timer_name", &self.timer_name)
879878
.field("interval_ns", &self.interval_ns)
880-
.field("composite_bar_build_delay", &self.composite_bar_build_delay)
879+
.field("bar_build_delay", &self.bar_build_delay)
881880
.field("skip_first_non_full_bar", &self.skip_first_non_full_bar)
882881
.finish()
883882
}
@@ -926,18 +925,15 @@ where
926925
build_with_no_updates: bool,
927926
timestamp_on_close: bool,
928927
interval_type: BarIntervalType,
929-
time_bars_origin: Option<TimeDelta>,
930-
composite_bar_build_delay: i64,
928+
time_bars_origin_offset: Option<TimeDelta>,
929+
bar_build_delay: u64,
931930
skip_first_non_full_bar: bool,
932931
) -> Self {
933932
let is_left_open = match interval_type {
934933
BarIntervalType::LeftOpen => true,
935934
BarIntervalType::RightOpen => false,
936935
};
937936

938-
let add_delay = bar_type.is_composite()
939-
&& bar_type.composite().aggregation_source() == AggregationSource::Internal;
940-
941937
let core = BarAggregatorCore::new(
942938
bar_type.standard(),
943939
price_precision,
@@ -958,11 +954,10 @@ where
958954
timer_name: bar_type.to_string(),
959955
interval_ns: get_bar_interval_ns(&bar_type),
960956
next_close_ns: UnixNanos::default(),
961-
composite_bar_build_delay,
962-
add_delay,
957+
bar_build_delay,
963958
batch_open_ns: UnixNanos::default(),
964959
batch_next_close_ns: UnixNanos::default(),
965-
time_bars_origin,
960+
time_bars_origin_offset,
966961
skip_first_non_full_bar,
967962
}
968963
}
@@ -978,15 +973,14 @@ where
978973
/// Panics if the underlying clock timer registration fails.
979974
pub fn start(&mut self, callback: NewBarCallback<H>) -> anyhow::Result<()> {
980975
let now = self.clock.borrow().utc_now();
981-
let mut start_time = get_time_bar_start(now, &self.bar_type(), self.time_bars_origin);
976+
let mut start_time =
977+
get_time_bar_start(now, &self.bar_type(), self.time_bars_origin_offset);
982978

983979
if start_time == now {
984980
self.skip_first_non_full_bar = false;
985981
}
986982

987-
if self.add_delay {
988-
start_time += TimeDelta::microseconds(self.composite_bar_build_delay);
989-
}
983+
start_time += TimeDelta::microseconds(self.bar_build_delay as i64);
990984

991985
let spec = &self.bar_type().spec();
992986
let start_time_ns = UnixNanos::from(start_time);
@@ -1032,7 +1026,7 @@ where
10321026
self.core.batch_mode = true;
10331027

10341028
let time = time_ns.to_datetime_utc();
1035-
let start_time = get_time_bar_start(time, &self.bar_type(), self.time_bars_origin);
1029+
let start_time = get_time_bar_start(time, &self.bar_type(), self.time_bars_origin_offset);
10361030
self.batch_open_ns = UnixNanos::from(start_time);
10371031

10381032
if spec.aggregation == BarAggregation::Month {
@@ -1946,8 +1940,8 @@ mod tests {
19461940
true, // build_with_no_updates
19471941
false, // timestamp_on_close
19481942
BarIntervalType::LeftOpen,
1949-
None, // time_bars_origin
1950-
15, // composite_bar_build_delay
1943+
None, // time_bars_origin_offset
1944+
15, // bar_build_delay
19511945
false, // skip_first_non_full_bar
19521946
);
19531947

examples/backtest/databento_test_request_bars.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
# extension: .py
77
# format_name: percent
88
# format_version: '1.3'
9-
# jupytext_version: 1.17.0
9+
# jupytext_version: 1.16.6
1010
# kernelspec:
1111
# display_name: Python 3 (ipykernel)
1212
# language: python
@@ -268,9 +268,12 @@ def on_stop(self):
268268
]
269269

270270
data_engine = DataEngineConfig(
271-
time_bars_origins={
271+
time_bars_origin_offset={
272272
BarAggregation.MINUTE: pd.Timedelta(seconds=0),
273273
},
274+
bar_build_delay=20,
275+
# when left at 0, a delay of 15 microseconds is applied to composite bars aggregated from internal bars
276+
# also useful in live context to account for network delay
274277
)
275278

276279
engine_config = BacktestEngineConfig(
@@ -332,3 +335,5 @@ def on_stop(self):
332335

333336
# %%
334337
results = node.run()
338+
339+
# %%

nautilus_trader/data/aggregation.pxd

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -99,17 +99,16 @@ cdef class TimeBarAggregator(BarAggregator):
9999
cdef bint _build_on_next_tick
100100
cdef uint64_t _stored_open_ns
101101
cdef uint64_t _stored_close_ns
102-
cdef tuple _cached_update
103102
cdef str _timer_name
104103
cdef bint _is_left_open
105104
cdef bint _timestamp_on_close
106105
cdef bint _skip_first_non_full_bar
107106
cdef bint _build_with_no_updates
108-
cdef int _composite_bar_build_delay
107+
cdef int _bar_build_delay
109108
cdef bint _add_delay
110109
cdef uint64_t _batch_open_ns
111110
cdef uint64_t _batch_next_close_ns
112-
cdef object _time_bars_origin
111+
cdef object _time_bars_origin_offset
113112

114113
cdef readonly timedelta interval
115114
"""The aggregators time interval.\n\n:returns: `timedelta`"""

nautilus_trader/data/aggregation.pyx

Lines changed: 42 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -693,10 +693,12 @@ cdef class TimeBarAggregator(BarAggregator):
693693
If the aggregator will skip emitting a bar if the aggregation starts mid-interval.
694694
build_with_no_updates : bool, default True
695695
If the aggregator will build and emit bars with no new market updates.
696-
time_bars_origin : pd.Timedelta or pd.DateOffset, optional
696+
time_bars_origin_offset : pd.Timedelta or pd.DateOffset, optional
697697
The origin time offset.
698-
composite_bar_build_delay : int, default 15
698+
bar_build_delay : int, default 0
699699
The time delay (microseconds) before building and emitting a bar.
700+
A delay of 15 microseconds can be useful in a backtest context, when internal bars are aggregated
701+
from other internal bars several times, so that all messages are processed before a timer triggers.
700702
701703
Raises
702704
------
@@ -714,34 +716,15 @@ cdef class TimeBarAggregator(BarAggregator):
714716
bint timestamp_on_close = True,
715717
bint skip_first_non_full_bar = False,
716718
bint build_with_no_updates = True,
717-
object time_bars_origin: pd.Timedelta | pd.DateOffset = None,
718-
int composite_bar_build_delay = 15, # in microsecond
719+
object time_bars_origin_offset: pd.Timedelta | pd.DateOffset = None,
720+
int bar_build_delay = 0,
719721
) -> None:
720722
super().__init__(
721723
instrument=instrument,
722724
bar_type=bar_type.standard(),
723725
handler=handler,
724726
)
725-
726727
self._clock = clock
727-
self.interval = self._get_interval()
728-
self.interval_ns = self._get_interval_ns()
729-
self._timer_name = None
730-
self._set_build_timer()
731-
self.next_close_ns = self._clock.next_time_ns(self._timer_name)
732-
self._build_on_next_tick = False
733-
cdef datetime now = self._clock.utc_now()
734-
self._stored_open_ns = dt_to_unix_nanos(self.get_start_time(now))
735-
self._stored_close_ns = 0
736-
self._cached_update = None
737-
self._build_with_no_updates = build_with_no_updates
738-
self._timestamp_on_close = timestamp_on_close
739-
self._composite_bar_build_delay = composite_bar_build_delay
740-
self._add_delay = bar_type.is_composite() and bar_type.composite().is_internally_aggregated()
741-
self._batch_open_ns = 0
742-
self._batch_next_close_ns = 0
743-
self._time_bars_origin = time_bars_origin
744-
self._skip_first_non_full_bar = skip_first_non_full_bar
745728

746729
if interval_type == "left-open":
747730
self._is_left_open = True
@@ -752,6 +735,26 @@ cdef class TimeBarAggregator(BarAggregator):
752735
f"Invalid interval_type: {interval_type}. Must be 'left-open' or 'right-open'.",
753736
)
754737

738+
self._timestamp_on_close = timestamp_on_close
739+
self._skip_first_non_full_bar = skip_first_non_full_bar
740+
self._build_with_no_updates = build_with_no_updates
741+
self._time_bars_origin_offset = time_bars_origin_offset
742+
self._bar_build_delay = bar_build_delay
743+
744+
self._timer_name = None
745+
self._build_on_next_tick = False
746+
self._batch_open_ns = 0
747+
self._batch_next_close_ns = 0
748+
749+
self.interval = self._get_interval()
750+
self.interval_ns = self._get_interval_ns()
751+
self._set_build_timer()
752+
self.next_close_ns = self._clock.next_time_ns(self._timer_name)
753+
754+
cdef datetime now = self._clock.utc_now()
755+
self._stored_open_ns = dt_to_unix_nanos(self.get_start_time(now))
756+
self._stored_close_ns = 0
757+
755758
def __str__(self):
756759
return f"{type(self).__name__}(interval_ns={self.interval_ns}, next_close_ns={self.next_close_ns})"
757760

@@ -771,8 +774,8 @@ cdef class TimeBarAggregator(BarAggregator):
771774
if aggregation == BarAggregation.MILLISECOND:
772775
start_time = now.floor(freq="s")
773776

774-
if self._time_bars_origin is not None:
775-
start_time += self._time_bars_origin
777+
if self._time_bars_origin_offset is not None:
778+
start_time += self._time_bars_origin_offset
776779

777780
if now < start_time:
778781
start_time -= pd.Timedelta(seconds=1)
@@ -784,8 +787,8 @@ cdef class TimeBarAggregator(BarAggregator):
784787
elif aggregation == BarAggregation.SECOND:
785788
start_time = now.floor(freq="min")
786789

787-
if self._time_bars_origin is not None:
788-
start_time += self._time_bars_origin
790+
if self._time_bars_origin_offset is not None:
791+
start_time += self._time_bars_origin_offset
789792

790793
if now < start_time:
791794
start_time -= pd.Timedelta(minutes=1)
@@ -797,8 +800,8 @@ cdef class TimeBarAggregator(BarAggregator):
797800
elif aggregation == BarAggregation.MINUTE:
798801
start_time = now.floor(freq="h")
799802

800-
if self._time_bars_origin is not None:
801-
start_time += self._time_bars_origin
803+
if self._time_bars_origin_offset is not None:
804+
start_time += self._time_bars_origin_offset
802805

803806
if now < start_time:
804807
start_time -= pd.Timedelta(hours=1)
@@ -810,8 +813,8 @@ cdef class TimeBarAggregator(BarAggregator):
810813
elif aggregation == BarAggregation.HOUR:
811814
start_time = now.floor(freq="d")
812815

813-
if self._time_bars_origin is not None:
814-
start_time += self._time_bars_origin
816+
if self._time_bars_origin_offset is not None:
817+
start_time += self._time_bars_origin_offset
815818

816819
if now < start_time:
817820
start_time -= pd.Timedelta(days=1)
@@ -823,24 +826,24 @@ cdef class TimeBarAggregator(BarAggregator):
823826
elif aggregation == BarAggregation.DAY:
824827
start_time = now.floor(freq="d")
825828

826-
if self._time_bars_origin is not None:
827-
start_time += self._time_bars_origin
829+
if self._time_bars_origin_offset is not None:
830+
start_time += self._time_bars_origin_offset
828831

829832
if now < start_time:
830833
start_time -= pd.Timedelta(days=1)
831834
elif aggregation == BarAggregation.WEEK:
832835
start_time = (now - pd.Timedelta(days=now.dayofweek)).floor(freq="d")
833836

834-
if self._time_bars_origin is not None:
835-
start_time += self._time_bars_origin
837+
if self._time_bars_origin_offset is not None:
838+
start_time += self._time_bars_origin_offset
836839

837840
if now < start_time:
838841
start_time -= pd.Timedelta(weeks=1)
839842
elif aggregation == BarAggregation.MONTH:
840843
start_time = (now - pd.DateOffset(months=now.month - 1, days=now.day - 1)).floor(freq="d")
841844

842-
if self._time_bars_origin is not None:
843-
start_time += self._time_bars_origin
845+
if self._time_bars_origin_offset is not None:
846+
start_time += self._time_bars_origin_offset
844847

845848
if now < start_time:
846849
start_time -= pd.DateOffset(years=1)
@@ -916,8 +919,8 @@ cdef class TimeBarAggregator(BarAggregator):
916919
if start_time == now:
917920
self._skip_first_non_full_bar = False
918921

919-
if self._add_delay:
920-
start_time += timedelta(microseconds=self._composite_bar_build_delay)
922+
start_time += timedelta(microseconds=self._bar_build_delay)
923+
self._log.debug(f"Timer {start_time=}")
921924

922925
if self.bar_type.spec.aggregation != BarAggregation.MONTH:
923926
self._clock.set_timer(

nautilus_trader/data/config.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,12 @@ class DataEngineConfig(NautilusConfig, frozen=True):
3636
If time bar aggregators will skip emitting a bar if the aggregation starts mid-interval.
3737
time_bars_build_with_no_updates : bool, default True
3838
If time bar aggregators will build and emit bars with no new market updates.
39-
time_bars_origins : dict[BarAggregation, pd.Timedelta | pd.DateOffset], optional
39+
time_bars_origin_offset : dict[BarAggregation, pd.Timedelta | pd.DateOffset], optional
4040
A dictionary mapping time bar aggregations to their origin time offsets.
41+
bar_build_delay : int, default 0
42+
The time delay (microseconds) before building and emitting a time bar.
43+
A delay of 15 microseconds can be useful in a backtest context, when internal bars are aggregated
44+
from other internal bars several times, so that all messages are processed before a timer triggers. If left at 0, composite bars aggregated from internal bars use a delay of 15 microseconds.
4145
validate_data_sequence : bool, default False
4246
If data objects timestamp sequencing will be validated and handled.
4347
buffer_deltas : bool, default False
@@ -54,7 +58,8 @@ class DataEngineConfig(NautilusConfig, frozen=True):
5458
time_bars_timestamp_on_close: bool = True
5559
time_bars_skip_first_non_full_bar: bool = False
5660
time_bars_build_with_no_updates: bool = True
57-
time_bars_origins: dict | None = None
61+
time_bars_origin_offset: dict | None = None
62+
bar_build_delay: int = 0
5863
validate_data_sequence: bool = False
5964
buffer_deltas: bool = False
6065
external_clients: list[ClientId] | None = None

nautilus_trader/data/engine.pxd

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,8 @@ cdef class DataEngine(Component):
102102
cdef readonly bint _time_bars_timestamp_on_close
103103
cdef readonly bint _time_bars_skip_first_non_full_bar
104104
cdef readonly bint _time_bars_build_with_no_updates
105-
cdef readonly dict[BarAggregation, object] _time_bars_origins # pd.Timedelta or pd.DateOffset
105+
cdef readonly dict[BarAggregation, object] _time_bars_origin_offset # pd.Timedelta or pd.DateOffset
106+
cdef readonly int _bar_build_delay
106107
cdef readonly bint _validate_data_sequence
107108
cdef readonly bint _buffer_deltas
108109

nautilus_trader/data/engine.pyx

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,8 @@ cdef class DataEngine(Component):
184184
self._time_bars_timestamp_on_close = config.time_bars_timestamp_on_close
185185
self._time_bars_skip_first_non_full_bar = config.time_bars_skip_first_non_full_bar
186186
self._time_bars_build_with_no_updates = config.time_bars_build_with_no_updates
187-
self._time_bars_origins = config.time_bars_origins or {}
187+
self._time_bars_origin_offset = config.time_bars_origin_offset or {}
188+
self._bar_build_delay = config.bar_build_delay
188189
self._validate_data_sequence = config.validate_data_sequence
189190
self._buffer_deltas = config.buffer_deltas
190191

@@ -2251,6 +2252,12 @@ cdef class DataEngine(Component):
22512252

22522253
cpdef object _create_bar_aggregator(self, Instrument instrument, BarType bar_type):
22532254
if bar_type.spec.is_time_aggregated():
2255+
# Use configured bar_build_delay, with special handling for composite bars
2256+
bar_build_delay = self._bar_build_delay
2257+
2258+
if bar_type.is_composite() and bar_type.composite().is_internally_aggregated() and bar_build_delay == 0:
2259+
bar_build_delay = 15 # Default for composite bars when config is 0
2260+
22542261
aggregator = TimeBarAggregator(
22552262
instrument=instrument,
22562263
bar_type=bar_type,
@@ -2260,7 +2267,8 @@ cdef class DataEngine(Component):
22602267
timestamp_on_close=self._time_bars_timestamp_on_close,
22612268
skip_first_non_full_bar=self._time_bars_skip_first_non_full_bar,
22622269
build_with_no_updates=self._time_bars_build_with_no_updates,
2263-
time_bars_origin=self._time_bars_origins.get(bar_type.spec.aggregation),
2270+
time_bars_origin_offset=self._time_bars_origin_offset.get(bar_type.spec.aggregation),
2271+
bar_build_delay=bar_build_delay,
22642272
)
22652273
elif bar_type.spec.aggregation == BarAggregation.TICK:
22662274
aggregator = TickBarAggregator(
@@ -2291,6 +2299,7 @@ cdef class DataEngine(Component):
22912299

22922300
cpdef void _start_bar_aggregator(self, MarketDataClient client, SubscribeBars command):
22932301
cdef Instrument instrument = self._cache.instrument(command.bar_type.instrument_id)
2302+
22942303
if instrument is None:
22952304
self._log.error(
22962305
f"Cannot start bar aggregation: "

tests/unit_tests/backtest/test_config.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,7 @@ def test_backtest_run_config_id(self) -> None:
285285
TestConfigStubs.backtest_engine_config,
286286
("catalog",),
287287
{"persist": True},
288-
("f39654b91400406374e3e4e3e7ff433e890adcae5f623d1ef6af7f823ce88449",),
288+
("f7d56cc577f56aa583466d1d61dbcc8378f44109cf51ce6fa7235f490150560b",),
289289
),
290290
(
291291
TestConfigStubs.risk_engine_config,

0 commit comments

Comments
 (0)