Skip to content

Commit a7f4729

Browse files
authored
Refine usage of databento use_exchange_as_venue (#2487)
1 parent 1619d6d commit a7f4729

File tree

7 files changed

+45
-19
lines changed

7 files changed

+45
-19
lines changed

crates/adapters/databento/src/live.rs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use std::{
2020

2121
use ahash::{HashSet, HashSetExt};
2222
use databento::{
23-
dbn::{self, PitSymbolMap, Publisher, Record, SymbolIndex, VersionUpgradePolicy},
23+
dbn::{self, PitSymbolMap, Record, SymbolIndex, VersionUpgradePolicy},
2424
live::Subscription,
2525
};
2626
use indexmap::IndexMap;
@@ -75,6 +75,7 @@ pub struct DatabentoFeedHandler {
7575
publisher_venue_map: IndexMap<PublisherId, Venue>,
7676
symbol_venue_map: Arc<RwLock<HashMap<Symbol, Venue>>>,
7777
replay: bool,
78+
use_exchange_as_venue: bool,
7879
}
7980

8081
impl DatabentoFeedHandler {
@@ -87,6 +88,7 @@ impl DatabentoFeedHandler {
8788
tx: tokio::sync::mpsc::Sender<LiveMessage>,
8889
publisher_venue_map: IndexMap<PublisherId, Venue>,
8990
symbol_venue_map: Arc<RwLock<HashMap<Symbol, Venue>>>,
91+
use_exchange_as_venue: bool,
9092
) -> Self {
9193
Self {
9294
key,
@@ -96,6 +98,7 @@ impl DatabentoFeedHandler {
9698
publisher_venue_map,
9799
symbol_venue_map,
98100
replay: false,
101+
use_exchange_as_venue,
99102
}
100103
}
101104

@@ -214,9 +217,7 @@ impl DatabentoFeedHandler {
214217
instrument_id_map.remove(&msg.hd.instrument_id);
215218
handle_symbol_mapping_msg(msg, &mut symbol_map, &mut instrument_id_map);
216219
} else if let Some(msg) = record.get::<dbn::InstrumentDefMsg>() {
217-
// TODO: Make this configurable so that exchange can be used as venue
218-
let use_exchange_as_venue = false;
219-
if use_exchange_as_venue && msg.publisher()? == Publisher::GlbxMdp3Glbx {
220+
if self.use_exchange_as_venue {
220221
update_instrument_id_map_with_exchange(
221222
&symbol_map,
222223
&self.symbol_venue_map,
@@ -390,7 +391,11 @@ fn update_instrument_id_map_with_exchange(
390391
let symbol = Symbol::from(raw_symbol.as_str());
391392
let venue = Venue::from(exchange);
392393
let instrument_id = InstrumentId::new(symbol, venue);
393-
symbol_venue_map.write().unwrap().insert(symbol, venue);
394+
symbol_venue_map
395+
.write()
396+
.unwrap()
397+
.entry(symbol)
398+
.or_insert(venue);
394399
instrument_id_map.insert(raw_instrument_id, instrument_id);
395400
instrument_id
396401
}

crates/adapters/databento/src/python/historical.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,12 +71,17 @@ pub struct DatabentoHistoricalClient {
7171
inner: Arc<Mutex<databento::HistoricalClient>>,
7272
publisher_venue_map: Arc<IndexMap<PublisherId, Venue>>,
7373
symbol_venue_map: Arc<RwLock<HashMap<Symbol, Venue>>>,
74+
use_exchange_as_venue: bool,
7475
}
7576

7677
#[pymethods]
7778
impl DatabentoHistoricalClient {
7879
#[new]
79-
fn py_new(key: String, publishers_filepath: PathBuf) -> PyResult<Self> {
80+
fn py_new(
81+
key: String,
82+
publishers_filepath: PathBuf,
83+
use_exchange_as_venue: bool,
84+
) -> PyResult<Self> {
8085
let client = databento::HistoricalClient::builder()
8186
.key(key.clone())
8287
.map_err(to_pyvalue_err)?
@@ -98,6 +103,7 @@ impl DatabentoHistoricalClient {
98103
publisher_venue_map: Arc::new(publisher_venue_map),
99104
symbol_venue_map: Arc::new(RwLock::new(HashMap::new())),
100105
key,
106+
use_exchange_as_venue,
101107
})
102108
}
103109

@@ -127,7 +133,7 @@ impl DatabentoHistoricalClient {
127133
}
128134

129135
#[pyo3(name = "get_range_instruments")]
130-
#[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None, use_exchange_as_venue=false))]
136+
#[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None))]
131137
#[allow(clippy::too_many_arguments)]
132138
fn py_get_range_instruments<'py>(
133139
&self,
@@ -137,7 +143,6 @@ impl DatabentoHistoricalClient {
137143
start: u64,
138144
end: Option<u64>,
139145
limit: Option<u64>,
140-
use_exchange_as_venue: bool,
141146
) -> PyResult<Bound<'py, PyAny>> {
142147
let client = self.inner.clone();
143148
let mut symbol_venue_map = self.symbol_venue_map.write().unwrap();
@@ -165,6 +170,7 @@ impl DatabentoHistoricalClient {
165170
let publisher_venue_map = self.publisher_venue_map.clone();
166171
let symbol_venue_map = self.symbol_venue_map.clone();
167172
let ts_init = self.clock.get_time_ns();
173+
let use_exchange_as_venue = self.use_exchange_as_venue;
168174

169175
pyo3_async_runtimes::tokio::future_into_py(py, async move {
170176
let mut client = client.lock().await; // TODO: Use a client pool

crates/adapters/databento/src/python/live.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ pub struct DatabentoLiveClient {
5454
buffer_size: usize,
5555
publisher_venue_map: IndexMap<u16, Venue>,
5656
symbol_venue_map: Arc<RwLock<HashMap<Symbol, Venue>>>,
57+
use_exchange_as_venue: bool,
5758
}
5859

5960
impl DatabentoLiveClient {
@@ -127,7 +128,12 @@ fn call_python(py: Python, callback: &PyObject, py_obj: PyObject) {
127128
#[pymethods]
128129
impl DatabentoLiveClient {
129130
#[new]
130-
pub fn py_new(key: String, dataset: String, publishers_filepath: PathBuf) -> PyResult<Self> {
131+
pub fn py_new(
132+
key: String,
133+
dataset: String,
134+
publishers_filepath: PathBuf,
135+
use_exchange_as_venue: bool,
136+
) -> PyResult<Self> {
131137
let publishers_json = fs::read_to_string(publishers_filepath)?;
132138
let publishers_vec: Vec<DatabentoPublisher> =
133139
serde_json::from_str(&publishers_json).map_err(to_pyvalue_err)?;
@@ -151,6 +157,7 @@ impl DatabentoLiveClient {
151157
is_closed: false,
152158
publisher_venue_map,
153159
symbol_venue_map: Arc::new(RwLock::new(HashMap::new())),
160+
use_exchange_as_venue,
154161
})
155162
}
156163

@@ -231,6 +238,7 @@ impl DatabentoLiveClient {
231238
msg_tx,
232239
self.publisher_venue_map.clone(),
233240
self.symbol_venue_map.clone(),
241+
self.use_exchange_as_venue,
234242
);
235243

236244
self.send_command(LiveCommand::Start)?;

nautilus_trader/adapters/databento/data.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -284,6 +284,7 @@ def _get_live_client(self, dataset: Dataset) -> nautilus_pyo3.DatabentoLiveClien
284284
key=self._live_api_key,
285285
dataset=dataset,
286286
publishers_filepath=str(PUBLISHERS_FILEPATH),
287+
use_exchange_as_venue=self._use_exchange_as_venue,
287288
)
288289
self._live_clients[dataset] = live_client
289290

@@ -298,6 +299,7 @@ def _get_live_client_mbo(self, dataset: Dataset) -> nautilus_pyo3.DatabentoLiveC
298299
key=self._live_api_key,
299300
dataset=dataset,
300301
publishers_filepath=str(PUBLISHERS_FILEPATH),
302+
use_exchange_as_venue=self._use_exchange_as_venue,
301303
)
302304
self._live_clients_mbo[dataset] = live_client
303305

@@ -874,7 +876,6 @@ async def _request_instrument(self, request: RequestInstrument) -> None:
874876
instrument_ids=[instrument_id_to_pyo3(request.instrument_id)],
875877
start=start.value,
876878
end=end.value,
877-
use_exchange_as_venue=self._use_exchange_as_venue,
878879
)
879880

880881
instruments = instruments_from_pyo3(pyo3_instruments)
@@ -899,8 +900,6 @@ async def _request_instruments(self, request: RequestInstruments) -> None:
899900
LogColor.BLUE,
900901
)
901902

902-
use_exchange_as_venue = request.params.get("use_exchange_as_venue", False)
903-
904903
# parent_symbols can be equal to ["ES.OPT", "ES.FUT"] for example in order to not query all instruments of an exchange
905904
parent_symbols = request.params.get("parent_symbols") or [ALL_SYMBOLS]
906905
pyo3_instrument_ids = [
@@ -913,7 +912,6 @@ async def _request_instruments(self, request: RequestInstruments) -> None:
913912
instrument_ids=pyo3_instrument_ids,
914913
start=start.value,
915914
end=end.value,
916-
use_exchange_as_venue=use_exchange_as_venue,
917915
)
918916
instruments = instruments_from_pyo3(pyo3_instruments)
919917

nautilus_trader/adapters/databento/factories.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
def get_cached_databento_http_client(
3535
key: str | None = None,
3636
gateway: str | None = None,
37+
use_exchange_as_venue: bool = True,
3738
) -> nautilus_pyo3.DatabentoHistoricalClient:
3839
"""
3940
Cache and return a Databento historical HTTP client with the given key and gateway.
@@ -46,6 +47,8 @@ def get_cached_databento_http_client(
4647
The Databento API secret key for the client.
4748
gateway : str, optional
4849
The Databento historical HTTP client gateway override.
50+
use_exchange_as_venue : bool, default True
51+
If the `exchange` field will be used as the venue for instrument IDs.
4952
5053
Returns
5154
-------
@@ -55,6 +58,7 @@ def get_cached_databento_http_client(
5558
return nautilus_pyo3.DatabentoHistoricalClient(
5659
key=key or get_env_key("DATABENTO_API_KEY"),
5760
publishers_filepath=str(PUBLISHERS_FILEPATH),
61+
use_exchange_as_venue=use_exchange_as_venue,
5862
)
5963

6064

@@ -66,6 +70,7 @@ def get_cached_databento_instrument_provider(
6670
live_gateway: str | None = None,
6771
loader: DatabentoDataLoader | None = None,
6872
config: InstrumentProviderConfig | None = None,
73+
use_exchange_as_venue=True,
6974
) -> DatabentoInstrumentProvider:
7075
"""
7176
Cache and return a Databento instrument provider.
@@ -87,6 +92,8 @@ def get_cached_databento_instrument_provider(
8792
The loader for the provider.
8893
config : InstrumentProviderConfig
8994
The configuration for the instrument provider.
95+
use_exchange_as_venue : bool, default True
96+
If the `exchange` field will be used as the venue for instrument IDs.
9097
9198
Returns
9299
-------
@@ -100,6 +107,7 @@ def get_cached_databento_instrument_provider(
100107
live_gateway=live_gateway,
101108
loader=loader,
102109
config=config,
110+
use_exchange_as_venue=use_exchange_as_venue,
103111
)
104112

105113

@@ -144,6 +152,7 @@ def create( # type: ignore
144152
http_client = get_cached_databento_http_client(
145153
key=config.api_key,
146154
gateway=config.http_gateway,
155+
use_exchange_as_venue=config.use_exchange_as_venue,
147156
)
148157

149158
loader = DatabentoDataLoader(config.venue_dataset_map)
@@ -154,6 +163,7 @@ def create( # type: ignore
154163
live_gateway=config.live_gateway,
155164
loader=loader,
156165
config=config.instrument_provider,
166+
use_exchange_as_venue=config.use_exchange_as_venue,
157167
)
158168

159169
return DatabentoDataClient(

nautilus_trader/adapters/databento/providers.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ def __init__(
6666
live_gateway: str | None = None,
6767
loader: DatabentoDataLoader | None = None,
6868
config: InstrumentProviderConfig | None = None,
69+
use_exchange_as_venue: bool = True,
6970
) -> None:
7071
super().__init__(config=config)
7172

@@ -76,6 +77,7 @@ def __init__(
7677

7778
self._http_client = http_client
7879
self._loader = loader or DatabentoDataLoader()
80+
self._use_exchange_as_venue = use_exchange_as_venue
7981

8082
async def load_all_async(self, filters: dict | None = None) -> None:
8183
raise RuntimeError(
@@ -122,11 +124,10 @@ async def load_ids_async( # noqa: C901 (too complex)
122124
key=self._live_api_key,
123125
dataset=dataset,
124126
publishers_filepath=str(PUBLISHERS_FILEPATH),
127+
use_exchange_as_venue=self._use_exchange_as_venue,
125128
)
126129

127130
parent_symbols = list(filters.get("parent_symbols", [])) if filters is not None else None
128-
# use_exchange_as_venue = filters.get("use_exchange_as_venue", True) if filters else True
129-
130131
pyo3_instruments = []
131132

132133
success_msg = "All instruments received and decoded"
@@ -157,7 +158,6 @@ def receive_instruments(pyo3_instrument: Any) -> None:
157158
[instrument_id_to_pyo3(instrument_id) for instrument_id in instrument_ids],
158159
),
159160
start=0, # From start of current week (latest definitions)
160-
# use_exchange_as_venue=use_exchange_as_venue, # TODO
161161
)
162162

163163
if parent_symbols:
@@ -265,7 +265,6 @@ async def get_range(
265265
266266
"""
267267
dataset = self._check_all_datasets_equal(instrument_ids)
268-
use_exchange_as_venue = filters.get("use_exchange_as_venue", True) if filters else True
269268

270269
# Here the NULL venue is overridden and so is used as a
271270
# placeholder to conform to instrument ID conventions.
@@ -274,7 +273,6 @@ async def get_range(
274273
instrument_ids=[instrument_id_to_pyo3(InstrumentId.from_str(f"{ALL_SYMBOLS}.NULL"))],
275274
start=pd.Timestamp(start, tz=pytz.utc).value,
276275
end=pd.Timestamp(end, tz=pytz.utc).value if end is not None else None,
277-
use_exchange_as_venue=use_exchange_as_venue,
278276
)
279277
instruments = instruments_from_pyo3(pyo3_instruments)
280278
instruments = sorted(instruments, key=lambda x: x.ts_init)

nautilus_trader/core/nautilus_pyo3.pyi

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4911,6 +4911,7 @@ class DatabentoHistoricalClient:
49114911
self,
49124912
key: str,
49134913
publishers_filepath: str,
4914+
use_exchange_as_venue: bool,
49144915
) -> None: ...
49154916
@property
49164917
def key(self) -> str: ...
@@ -4922,7 +4923,6 @@ class DatabentoHistoricalClient:
49224923
start: int,
49234924
end: int | None = None,
49244925
limit: int | None = None,
4925-
use_exchange_as_venue: bool = False
49264926
) -> list[Instrument]: ...
49274927
async def get_range_quotes(
49284928
self,
@@ -4982,6 +4982,7 @@ class DatabentoLiveClient:
49824982
key: str,
49834983
dataset: str,
49844984
publishers_filepath: str,
4985+
use_exchange_as_venue: bool,
49854986
) -> None: ...
49864987
@property
49874988
def key(self) -> str: ...

0 commit comments

Comments
 (0)