Skip to content

Commit bf482ac

Browse files
committed
POST-REVIEW: logic around existing ingest versions
1 parent 74c5a3d commit bf482ac

File tree

5 files changed

+49
-45
lines changed

5 files changed

+49
-45
lines changed

dcpy/connectors/ingest_datastore.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,10 @@ def push_versioned(self, key: str, version: str, **kwargs) -> dict:
7474
def pull_versioned(
7575
self, key: str, version: str, destination_path: Path, **kwargs
7676
) -> dict:
77-
raise NotImplementedError
77+
return self.storage.pull(
78+
f"{key}/{version}/{key}.parquet",
79+
destination_path / key / version, # TODO a little hacky
80+
)
7881

7982
def list_versions(self, key: str, *, sort_desc: bool = True, **kwargs) -> list[str]:
8083
"""This is maybe a problem in my plan"""
@@ -88,9 +91,17 @@ def _get_config_obj(self, key: str, version: str) -> dict:
8891
with open(Path(tmp_dir) / config_filename, "r", encoding="utf-8") as raw:
8992
return yaml.safe_load(raw.read())
9093

91-
def get_config(self, key: str, version: str):
94+
def get_config(self, key: str, version: str) -> ingest.Config:
9295
return ingest.Config(**self._get_config_obj(key, version))
9396

97+
def try_get_config(self, key: str, version: str) -> ingest.Config | None:
98+
"""for backwards compatibility"""
99+
obj = self._get_config_obj(key, version)
100+
if "dataset" not in obj: # very specific to library
101+
return ingest.Config(**obj)
102+
else:
103+
return None
104+
94105
def get_latest_version(self, key: str, **kwargs) -> str:
95106
return self._get_config_obj(key, "latest")["version"]
96107

dcpy/lifecycle/ingest/run.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -99,10 +99,12 @@ def ingest(
9999
shutil.copy(dataset_staging_dir / CONFIG_FILENAME, dataset_output_dir)
100100
shutil.copy(dataset_staging_dir / config.filename, dataset_output_dir)
101101

102-
is_new = validate.validate_against_existing_versions(
103-
config.dataset, dataset_staging_dir / config.filename
104-
)
105-
if push and is_new:
102+
if processed_datastore.version_exists(dataset_id, config.version):
103+
validate.validate_against_existing_version(
104+
dataset_id, config.version, dataset_staging_dir / config.filename
105+
)
106+
logger.info("Skipping archival")
107+
elif push:
106108
assert config.archival.acl
107109
processed_datastore.push(
108110
dataset_id,
@@ -112,6 +114,4 @@ def ingest(
112114
overwrite=False, ## TODO - allow this via flag?
113115
latest=latest,
114116
)
115-
else:
116-
logger.info("Skipping archival")
117117
return config

dcpy/lifecycle/ingest/validate.py

Lines changed: 21 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,43 +1,39 @@
11
import pandas as pd
22
from pathlib import Path
3+
from tempfile import TemporaryDirectory
34

4-
from dcpy.models.lifecycle.ingest import Config
5-
from dcpy.connectors.edm import recipes
65
from dcpy.utils.logging import logger
6+
from dcpy.lifecycle.ingest.connectors import processed_datastore
77

88

9-
def validate_against_existing_versions(ds: recipes.Dataset, filepath: Path) -> bool:
9+
def validate_against_existing_version(ds: str, version: str, filepath: Path) -> None:
1010
"""
11-
This function is called after a dataset has been preprocessed, just before archival
11+
This function is called after a dataset has been processed, just before archival
1212
It's called in the case that the version of the dataset in the config (either provided or calculated)
13-
already exists
13+
already exists
1414
1515
The last archived dataset with the same version is pulled in by pandas and compared to what was just processed
16-
If they are identical, the last archived dataset has its config updated to reflect that it was checked but not re-archived
17-
If they differ, the version is "patched" and a new patched version is archived
16+
If they differ, an error is raised
1817
"""
19-
existing_config = recipes.try_get_config(ds)
20-
if not existing_config:
21-
logger.info(f"Dataset '{ds.key}' does not exist in recipes bucket")
22-
return True
23-
else:
24-
if isinstance(existing_config, Config):
18+
existing_config = processed_datastore.try_get_config(ds, version)
19+
if existing_config:
20+
with TemporaryDirectory() as tmp:
21+
existing_file = processed_datastore.pull_versioned(ds, version, Path(tmp))[
22+
"path"
23+
]
2524
new = pd.read_parquet(filepath)
26-
comparison = recipes.read_df(ds)
27-
if new.equals(comparison):
25+
existing = pd.read_parquet(existing_file)
26+
if new.equals(existing):
2827
logger.info(
29-
f"Dataset '{ds.key}' already exists and matches newly processed data"
28+
f"Dataset id='{ds}' version='{version}' already exists and matches newly processed data"
3029
)
31-
return False
3230
else:
3331
raise FileExistsError(
34-
f"Archived dataset '{ds.key}' already exists and has different data."
32+
f"Archived dataset id='{ds}' version='{version}' already exists and has different data."
3533
)
3634

37-
# if previous was archived with library, we both expect some potential slight changes
38-
# and are not able to update "freshness"
39-
else:
40-
logger.warning(
41-
f"previous version of '{ds.key}' archived is from library. cannot update freshness"
42-
)
43-
return False
35+
# if previous was archived with library, we both expect some potential slight changes and will not compare
36+
else:
37+
logger.warning(
38+
f"Config of existing dataset id='{ds}' version='{version}' cannot be parsed."
39+
)

dcpy/test/lifecycle/ingest/test_run.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ def test_run_repeat_version_fails_if_data_diff(
114114
patch_read_df.return_value = gpd.GeoDataFrame({"a": [None]}).set_geometry("a")
115115
with pytest.raises(
116116
FileExistsError,
117-
match=f"Archived dataset 'id='{DATASET}' version='{FAKE_VERSION}'' already exists and has different data.",
117+
match=f"Archived dataset id='{DATASET}' version='{FAKE_VERSION}' already exists and has different data.",
118118
):
119119
run_ingest(
120120
dataset_id=DATASET,

dcpy/test/lifecycle/ingest/test_validate.py

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,10 @@
77
from dcpy.models.lifecycle.ingest import Template
88
from dcpy.utils import s3
99
from dcpy.connectors.edm import recipes
10+
from dcpy.lifecycle.ingest.connectors import processed_datastore
1011
from dcpy.lifecycle.ingest import transform, validate
1112

1213
from .shared import (
13-
TEST_DATASET,
1414
TEST_OUTPUT,
1515
BASIC_CONFIG,
1616
BASIC_LIBRARY_CONFIG,
@@ -36,9 +36,6 @@ def test_validate_all_templates(dataset):
3636

3737

3838
class TestValidateAgainstExistingVersions:
39-
def test_new(self, create_buckets):
40-
assert validate.validate_against_existing_versions(TEST_DATASET, TEST_OUTPUT)
41-
4239
def test_existing_library(self, create_buckets):
4340
ds = BASIC_LIBRARY_CONFIG.sparse_dataset
4441
config_str = json.dumps(BASIC_LIBRARY_CONFIG.model_dump(mode="json"))
@@ -48,20 +45,20 @@ def test_existing_library(self, create_buckets):
4845
f"{recipes.s3_folder_path(ds)}/config.json",
4946
BASIC_LIBRARY_CONFIG.dataset.acl,
5047
)
51-
assert recipes.exists(ds)
52-
assert not validate.validate_against_existing_versions(ds, TEST_OUTPUT)
48+
assert processed_datastore.exists(ds.id, ds.version)
49+
validate.validate_against_existing_version(ds.id, ds.version, TEST_OUTPUT)
5350

5451
def test_existing(self, create_buckets):
5552
ds = BASIC_CONFIG.dataset
5653
recipes.archive_dataset(BASIC_CONFIG, TEST_OUTPUT, acl="private")
57-
assert recipes.exists(ds)
58-
assert not validate.validate_against_existing_versions(ds, TEST_OUTPUT)
54+
assert processed_datastore.exists(ds.id, ds.version)
55+
validate.validate_against_existing_version(ds.id, ds.version, TEST_OUTPUT)
5956

6057
def test_existing_data_diffs(self, create_buckets):
6158
ds = BASIC_CONFIG.dataset
6259
recipes.archive_dataset(BASIC_CONFIG, TEST_OUTPUT, acl="private")
63-
assert recipes.exists(ds)
60+
assert processed_datastore.exists(ds.id, ds.version)
6461
with pytest.raises(FileExistsError):
65-
validate.validate_against_existing_versions(
66-
ds, TEST_OUTPUT.parent / "test.parquet"
62+
validate.validate_against_existing_version(
63+
ds.id, ds.version, TEST_OUTPUT.parent / "test.parquet"
6764
)

0 commit comments

Comments (0)