Python library for interacting with a TIMDEX parquet dataset, located locally or remotely in S3. This library is often abbreviated as "TDA".
- To preview a list of available Makefile commands:
make help
- To install with dev dependencies:
make install
- To update dependencies:
make update
- To run unit tests:
make test
- To lint the repo:
make lint
The library version number is set in timdex_dataset_api/__init__.py, e.g.:
__version__ = "2.1.0"
Updating the version number when making changes to the library prompts applications that install it to pick up the new version the next time their dependencies are updated.
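For example, a consuming application that has installed TDA via pipenv (see below) can pull in the new version with pipenv's standard update command:
pipenv update timdex_dataset_api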
This library is designed to be utilized by other projects, and can therefore be added as a dependency directly from the GitHub repository.
Add via pipenv:
pipenv add git+https://github.com/MITLibraries/timdex-dataset-api.git
Manually add to Pipfile:
[packages]
... other dependencies...
timdex_dataset_api = {git = "https://github.com/MITLibraries/timdex-dataset-api.git"}
... other dependencies...
Required environment variables: none at this time.
Optional environment variables:
TDA_LOG_LEVEL=# log level for timdex-dataset-api, accepts [DEBUG, INFO, WARNING, ERROR], default INFO
WARNING_ONLY_LOGGERS=# comma-separated list of logger names to set as WARNING only, e.g. 'botocore,charset_normalizer,smart_open'
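As a rough illustration, WARNING_ONLY_LOGGERS presumably has the same effect as raising those loggers' levels yourself; a minimal sketch of the equivalent in plain Python logging (an assumption about behavior, not the library's actual implementation):
import logging

# quiet noisy third-party loggers by raising them to WARNING
for logger_name in "botocore,charset_normalizer,smart_open".split(","):
    logging.getLogger(logger_name).setLevel(logging.WARNING)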
Currently, the most common use cases are:
- Transmogrifier: uses TDA to write to the parquet dataset
- TIMDEX-Index-Manager (TIM): uses TDA to read from the parquet dataset
Beyond those two ETL use cases, others are emerging where this library proves helpful:
- yielding only the current version of all records in the dataset, useful for quickly re-indexing to OpenSearch (see the sketch after this list)
- high throughput (time) + memory safe (space) access to the dataset for analysis
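A minimal sketch of that re-indexing use case, using only the loading and reading methods detailed in the usage examples below:
from timdex_dataset_api import TIMDEXDataset

timdex_dataset = TIMDEXDataset("s3://my-bucket/path/to/dataset")

# load the dataset, ensuring only the current version of each record is yielded
timdex_dataset.load(current_records=True)

# stream transformed records for re-indexing, without loading the dataset into memory
for transformed_record in timdex_dataset.read_transformed_records_iter():
    ...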
For both reading and writing, the following env vars are recommended:
TDA_LOG_LEVEL=INFO
WARNING_ONLY_LOGGERS=asyncio,botocore,urllib3,s3transfer,boto3
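These can be exported in the shell or set programmatically; a minimal sketch setting them from Python, assuming the library reads them when it configures logging (so they should be set before the library is imported):
import os

# set before importing timdex_dataset_api so its logging is configured accordingly
os.environ["TDA_LOG_LEVEL"] = "INFO"
os.environ["WARNING_ONLY_LOGGERS"] = "asyncio,botocore,urllib3,s3transfer,boto3"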
First, import the library:
from timdex_dataset_api import TIMDEXDataset
Load a dataset instance:
# dataset in S3
timdex_dataset = TIMDEXDataset("s3://my-bucket/path/to/dataset")
# or, local dataset (e.g. testing or development)
timdex_dataset = TIMDEXDataset("/path/to/dataset")
# load the dataset, which discovers all parquet files
timdex_dataset.load()
# or, load the dataset but ensure that only current records are ever yielded
timdex_dataset.load(current_records=True)
All read methods for TIMDEXDataset accept the same group of filters, which are defined in timdex_dataset_api.dataset.DatasetFilters. Examples are shown below.
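Filters are optional and may be combined; for example, reading all records for a single source (a sketch using only methods and filter names shown in these examples):
# read all records for a single source, no other filters applied
for record_dict in timdex_dataset.read_dicts_iter(source="alma"):
    ...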
# read a single row, no filtering
single_record_dict = next(timdex_dataset.read_dicts_iter())
# get batches of records, filtering to a particular run
for batch in timdex_dataset.read_batches_iter(
    source="alma",
    run_date="2025-06-01",
    run_id="abc123",
):
    # do something with each pyarrow batch...
    ...
# use convenience method to yield only transformed records
# NOTE: this is what TIM uses for indexing to OpenSearch for a given ETL run
for transformed_record in timdex_dataset.read_transformed_records_iter(
    source="aspace",
    run_date="2025-06-01",
    run_id="ghi789",
):
    # do something with each transformed record dictionary...
    ...
# load all records for a given run into a pandas dataframe
# NOTE: this can be potentially expensive memory-wise if the run is large
run_df = timdex_dataset.read_dataframe(
    source="dspace",
    run_date="2025-06-01",
    run_id="def456",
)
At this time, the only application that writes to the ETL parquet dataset is Transmogrifier.
To write records to the dataset, you must prepare an iterator of timdex_dataset_api.record.DatasetRecord instances. Here is some pseudocode for how a dataset write can work:
from collections.abc import Iterator

from timdex_dataset_api import DatasetRecord, TIMDEXDataset

# different ways to achieve this, just need some kind of iterator (e.g. list, generator, etc.)
# of DatasetRecords for writing
def records_to_write_iter() -> Iterator[DatasetRecord]:
    records = [...]
    for record in records:
        yield DatasetRecord(
            timdex_record_id=...,
            source_record=...,
            transformed_record=...,
            source=...,
            run_date=...,
            run_type=...,
            run_timestamp=...,
            action=...,
            run_record_offset=...,
        )
records_iter = records_to_write_iter()
# finally, perform the write, relying on the library to handle efficient batching
timdex_dataset = TIMDEXDataset("/path/to/dataset")
timdex_dataset.write(records_iter=records_iter)