ingest - implement and make use of connector classes #1519

Merged
merged 17 commits on Apr 17, 2025

Changes from 14 commits

129 changes: 84 additions & 45 deletions dcpy/connectors/edm/publishing.py
@@ -7,7 +7,7 @@
import pytz
import re
import typer
from typing import Callable, TypeVar, Any
from typing import Callable, TypeVar
from urllib.parse import urlencode, urljoin
import yaml
from zipfile import ZipFile
@@ -612,7 +612,6 @@ def publish_add_created_date(
if version is None:
with s3.get_file(bucket, f"{source}version.txt") as f:
version = str(f.read())
print(version)
old_metadata = s3.get_metadata(bucket, f"{source}{file_for_creation_date}")
target = f"{product}/publish/{version}/"
s3.copy_folder(
@@ -646,16 +645,12 @@ def _assert_gis_dataset_exists(name: str, version: str):
bucket = _bucket()
version = version.upper()
if not s3.object_exists(bucket, _gis_dataset_path(name, version)):
print(_gis_dataset_path(name, version))
print(s3.list_objects(bucket, _gis_dataset_path(name, version)))
print(s3.object_exists(bucket, _gis_dataset_path(name, version)))
raise FileNotFoundError(f"GIS dataset {name} has no version {version}")


def get_latest_gis_dataset_version(dataset_name: str) -> str:
def get_gis_dataset_versions(dataset_name: str, sort_desc: bool = True) -> list[str]:
"""
Get latest version of GIS-published dataset in edm-publishing/datasets
assuming versions are sortable
Get all versions of GIS-published dataset in edm-publishing/datasets
"""
gis_version_formats = [r"^\d{2}[A-Z]$", r"^\d{8}$"]
subfolders = []
@@ -670,7 +665,18 @@ def get_latest_gis_dataset_version(dataset_name: str) -> str:
raise ValueError(
f"Multiple version formats found for gis dataset {dataset_name}. Cannot determine latest version"
)
version = max(subfolders)
return sorted(subfolders, reverse=sort_desc)


def get_latest_gis_dataset_version(dataset_name: str) -> str:
"""
Get latest version of GIS-published dataset in edm-publishing/datasets
assuming versions are sortable
"""
versions = get_gis_dataset_versions(dataset_name)
if not versions:
raise FileNotFoundError(f"No versions found for GIS dataset {dataset_name}")
version = versions[0]
_assert_gis_dataset_exists(dataset_name, version)
return version
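
A minimal usage sketch of the two helpers above (annotation, not part of the diff; the dataset name and versions are hypothetical):

    versions = get_gis_dataset_versions("dcp_boroboundaries")
    # e.g. ["24B", "24A"]: descending by default; pass sort_desc=False for ascending
    latest = get_latest_gis_dataset_version("dcp_boroboundaries")
    # "24B", after _assert_gis_dataset_exists confirms the object exists in s3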

@@ -731,92 +737,125 @@ def log_event_in_db(event_details: EventLog) -> None:
class PublishedConnector(VersionedConnector):
conn_type: str = "edm.publishing.published"

def push(self, key: str, version: str, push_conf: dict | None = {}) -> dict:
raise NotImplementedError("Sorry :)")

def pull(
def _pull(
self,
key: str,
version: str,
destination_path: Path,
pull_conf: dict | None = {},
*,
dataset: str | None = None,
filepath: str,
**kwargs,
) -> dict:
assert pull_conf and "filepath" in pull_conf
pub_key = PublishKey(key, version)

s3_path = pull_conf.get("dataset", "") + "/" if "dataset" in pull_conf else ""
s3_path = dataset + "/" if dataset else ""

pulled_path = download_file(
pub_key,
s3_path + pull_conf["filepath"],
s3_path + filepath,
output_dir=destination_path,
)
return {"path": pulled_path}

def list_versions(self, key: str, sort_desc: bool = True) -> list[str]:
def pull_versioned(
self, key: str, version: str, destination_path: Path, **kwargs
) -> dict:
return self._pull(key, version, destination_path, **kwargs)

def push_versioned(self, key: str, version: str, **kwargs) -> dict:
raise NotImplementedError()

def list_versions(self, key: str, *, sort_desc: bool = True, **kwargs) -> list[str]:
return sorted(get_published_versions(key), reverse=sort_desc)

def query_latest_version(self, key: str) -> str:
def get_latest_version(self, key: str, **kwargs) -> str:
return self.list_versions(key)[0]

def version_exists(self, key: str, version: str) -> bool:
def version_exists(self, key: str, version: str, **kwargs) -> bool:
return version in self.list_versions(key)

def data_local_sub_path(
self, key: str, version: str, pull_conf: Any | None = None
) -> Path:
def data_local_sub_path(self, key: str, *, version: str, **kwargs) -> Path: # type: ignore[override]
return Path("edm") / "publishing" / "datasets" / key / version
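
A usage sketch for this connector (annotation, not part of the diff; product, dataset, and file names are purely illustrative). The pull config fields are now keyword-only parameters:

    conn = PublishedConnector()
    result = conn.pull_versioned(
        "db-lion",
        version="24c",
        destination_path=Path("/tmp/published"),
        dataset="lion",            # optional subfolder within the product
        filepath="lion.gdb.zip",   # required keyword, formerly pull_conf["filepath"]
    )
    # result == {"path": <local path of the downloaded file>}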


class DraftsConnector(VersionedConnector):
conn_type: str = "edm.publishing.drafts"

def push(self, key: str, version: str, push_conf: dict | None = {}) -> dict:
raise NotImplementedError("Sorry :)")

def pull(
def _pull(
self,
key: str,
version: str,
destination_path: Path,
pull_conf: dict | None = {},
*,
dataset: str | None = None,
filepath: str,
revision: str,
**kwargs,
) -> dict:
assert pull_conf and "filepath" in pull_conf and "revision" in pull_conf
dataset = pull_conf.get("dataset")
draft_key = DraftKey(key, version=version, revision=pull_conf["revision"])
draft_key = DraftKey(key, version=version, revision=revision)

path_prefix = "" if not dataset else f"{dataset}/"
file_path = f"{path_prefix}{pull_conf['filepath']}"
path_prefix = dataset + "/" if dataset else ""
file_path = f"{path_prefix}{filepath}"
logger.info(f"Pulling Draft for {draft_key}, path={file_path}")
pulled_path = download_file(draft_key, file_path, output_dir=destination_path)
return {"path": pulled_path}

def list_versions(self, key: str, sort_desc: bool = True) -> list[str]:
def pull_versioned(
self, key: str, version: str, destination_path: Path, **kwargs
) -> dict:
return self._pull(
key, version=version, destination_path=destination_path, **kwargs
)

def push_versioned(self, key: str, version: str, **kwargs) -> dict:
raise NotImplementedError()

def list_versions(self, key: str, *, sort_desc: bool = True, **kwargs) -> list[str]:
logger.info(f"Listing versions for {key}")
versions = sorted(get_draft_versions(key), reverse=sort_desc)
assert versions, (
f"Product {key} should have versions, but none were found. This likely indicates a configuration problem."
)
return versions

def query_latest_version(self, key: str) -> str:
def get_latest_version(self, key: str, **kwargs) -> str:
return self.list_versions(key)[0]

def version_exists(self, key: str, version: str) -> bool:
def version_exists(self, key: str, version: str, **kwargs) -> bool:
return version in self.list_versions(key)

def data_local_sub_path(
self, key: str, version: str, pull_conf: Any | None = None
self, key: str, *, version: str, revision: str, **kwargs
) -> Path:
assert pull_conf and "revision" in pull_conf
return (
Path("edm")
/ "publishing"
/ "datasets"
/ key
/ version
/ pull_conf["revision"]
)
return Path("edm") / "publishing" / "datasets" / key / version / revision
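
A sketch of the drafts variant (annotation, not part of the diff; values are illustrative). It mirrors PublishedConnector but additionally requires the draft revision as a keyword:

    DraftsConnector().pull_versioned(
        "db-colp",
        version="24v2",
        destination_path=Path("/tmp/drafts"),
        revision="1-initial",      # required keyword, formerly pull_conf["revision"]
        filepath="colp.csv",
    )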


class GisDatasetsConnector(VersionedConnector):
conn_type: str = "edm.publishing.gis"

def pull_versioned(
self, key: str, version: str, destination_path: Path, **kwargs
) -> dict:
pulled_path = download_gis_dataset(
dataset_name=key, version=version, target_folder=destination_path
)
return {"path": pulled_path}

def push_versioned(self, key: str, version: str, **kwargs) -> dict:
raise PermissionError(
"Currently, only GIS team pushes to edm-publishing/datasets"
)

def list_versions(self, key: str, *, sort_desc: bool = True, **kwargs) -> list[str]:
logger.info(f"Listing versions for {key}")
return get_gis_dataset_versions(key, sort_desc=sort_desc)

def get_latest_version(self, key: str, **kwargs) -> str:
return get_latest_gis_dataset_version(key)

def version_exists(self, key: str, version: str, **kwargs) -> bool:
return version in self.list_versions(key)
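
A sketch of the new GIS connector (annotation, not part of the diff; dataset name and version are illustrative). Note that it is effectively read-only, since pushes to edm-publishing/datasets are owned by the GIS team:

    gis = GisDatasetsConnector()
    if gis.version_exists("dcp_lion", "24B"):
        gis.pull_versioned("dcp_lion", "24B", destination_path=Path("/tmp/gis"))
    gis.push_versioned("dcp_lion", "24B")  # raises PermissionError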


app = typer.Typer(add_completion=False)
50 changes: 13 additions & 37 deletions dcpy/connectors/edm/recipes.py
@@ -1,14 +1,12 @@
from datetime import datetime
from io import BytesIO
import json
import os
import pandas as pd
from pathlib import Path
from pyarrow import parquet
from pydantic import BaseModel
import shutil
from tempfile import TemporaryDirectory
from typing import Callable, Any
from typing import Callable
import yaml

from dcpy import configuration
@@ -115,27 +113,6 @@ def set_latest(key: DatasetKey, acl):
)


def update_freshness(ds: DatasetKey, timestamp: datetime) -> datetime:
bucket = _bucket()
path = f"{DATASET_FOLDER}/{ds.id}/{ds.version}/config.json"
config = get_config(ds.id, ds.version)
if isinstance(config, library.Config):
raise TypeError(
f"Cannot update freshness of dataset {ds} as it was archived by library, not ingest"
)
config.archival.check_timestamps.append(timestamp)
config_str = json.dumps(config.model_dump(mode="json"))
assert config.archival.acl, "Impossible - s3-archived dataset missing acl"
s3.upload_file_obj(
BytesIO(config_str.encode()),
bucket,
path,
config.archival.acl,
metadata=s3.get_custom_metadata(bucket, path),
)
return config.archival.archival_timestamp


def get_config(name: str, version="latest") -> library.Config | ingest.Config:
"""Retrieve a recipe config from s3."""
bucket = _bucket()
@@ -485,38 +462,37 @@ def get_logged_metadata(datasets: list[str]) -> pd.DataFrame:
return pg_client.execute_select_query(query, datasets=datasets)


class Connector(BaseModel, VersionedConnector):
class Connector(VersionedConnector):
conn_type: str = "edm.recipes"

def push(self, key: str, version, push_conf: dict | None = {}) -> dict:
raise NotImplementedError("Sorry :)")
def push_versioned(self, key: str, version: str, **kwargs) -> dict:
raise NotImplementedError("edm.recipes deprecated for archiving")

def pull(
def pull_versioned(
self,
key: str,
version: str,
destination_path: Path,
pull_conf: dict | None = {},
*,
file_type: DatasetType = DatasetType.parquet,
**kwargs,
) -> dict:
assert pull_conf and "file_type" in pull_conf
return {
"path": fetch_dataset(
Dataset(id=key, version=version, file_type=pull_conf["file_type"]),
Dataset(id=key, version=version, file_type=file_type),
target_dir=Path(),
_target_dataset_path_override=destination_path,
)
}

def list_versions(self, key: str, sort_desc: bool = True) -> list[str]:
def list_versions(self, key: str, *, sort_desc: bool = True, **kwargs) -> list[str]:
return sorted(get_all_versions(name=key), reverse=sort_desc)

def query_latest_version(self, key: str) -> str:
def get_latest_version(self, key: str, **kwargs) -> str:
return get_latest_version(name=key)

def version_exists(self, key: str, version: str) -> bool:
def version_exists(self, key: str, version: str, **kwargs) -> bool:
return exists(Dataset(id=key, version=version))

def data_local_sub_path(
self, key: str, version: str, pull_conf: Any | None = None
) -> Path:
def data_local_sub_path(self, key: str, *, version: str, **kwargs) -> Path:
return Path("edm") / "recipes" / DATASET_FOLDER / key / version
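
A sketch of the reworked recipes connector (annotation, not part of the diff; the dataset name is illustrative):

    recipes = Connector()
    version = recipes.get_latest_version("dcp_addresspoints")
    recipes.pull_versioned(
        "dcp_addresspoints",
        version=version,
        destination_path=Path("/tmp/recipes"),
        file_type=DatasetType.parquet,  # now a defaulted keyword, formerly pull_conf["file_type"]
    )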