Skip to content

Added provenance_manager extension point #714

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ zip-safe = true
"streamflow.deployment.connector" = ["schemas/base/*.json", "schemas/*.json"]
"streamflow.deployment.filter" = ["schemas/*.json"]
"streamflow.persistence" = ["schemas/*.sql", "schemas/*.json"]
"streamflow.provenance" = ["schemas/**/*.json"]
"streamflow.recovery" = ["schemas/*.json"]
"streamflow.scheduling" = ["schemas/*.json"]
"streamflow.scheduling.policy" = ["schemas/*.json"]
Expand Down
6 changes: 6 additions & 0 deletions streamflow/ext/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from streamflow.core.data import DataManager
from streamflow.core.deployment import BindingFilter, Connector, DeploymentManager
from streamflow.core.persistence import Database
from streamflow.core.provenance import ProvenanceManager
from streamflow.core.recovery import CheckpointManager, FailureManager
from streamflow.core.scheduling import Policy, Scheduler
from streamflow.cwl.requirement.docker import cwl_docker_translator_classes
Expand All @@ -17,6 +18,7 @@
from streamflow.deployment.filter import binding_filter_classes
from streamflow.log_handler import logger
from streamflow.persistence import database_classes
from streamflow.provenance import provenance_manager_classes
from streamflow.recovery import checkpoint_manager_classes, failure_manager_classes
from streamflow.scheduling import scheduler_classes
from streamflow.scheduling.policy import policy_classes
Expand All @@ -31,6 +33,7 @@
"deployment_manager": deployment_manager_classes,
"failure_manager": failure_manager_classes,
"policy": policy_classes,
"provenance_manager": provenance_manager_classes,
"scheduler": scheduler_classes,
}

Expand Down Expand Up @@ -85,6 +88,9 @@ def register_failure_manager(self, name: str, cls: type[FailureManager]):
def register_policy(self, name: str, cls: type[Policy]):
self._register(name, cls, "policy")

def register_provenance_manager(self, name: str, cls: type[ProvenanceManager]):
self._register(name, cls, "provenance_manager")

def register_scheduler(self, name: str, cls: type[Scheduler]):
self._register(name, cls, "scheduler")

Expand Down
12 changes: 6 additions & 6 deletions streamflow/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from streamflow.parser import parser
from streamflow.persistence import database_classes
from streamflow.persistence.loading_context import DefaultDatabaseLoadingContext
from streamflow.provenance import prov_classes
from streamflow.provenance import provenance_manager_classes
from streamflow.recovery import checkpoint_manager_classes, failure_manager_classes
from streamflow.scheduling import scheduler_classes

Expand Down Expand Up @@ -126,20 +126,20 @@ async def _async_prov(args: argparse.Namespace):
f"Workflow {args.workflow} is associated to the following types: {','.join(wf_type)}"
)
wf_type = list(wf_type)[0]
if args.type not in prov_classes:
if args.type not in provenance_manager_classes:
raise WorkflowProvenanceException(
f"{args.type} provenance format is not supported."
)
elif wf_type not in prov_classes[args.type]:
elif wf_type not in provenance_manager_classes[args.type]:
raise WorkflowProvenanceException(
"{} provenance format is not supported for workflows of type {}.".format(
args.type, wf_type
)
)
else:
provenance_manager: ProvenanceManager = prov_classes[args.type][wf_type](
context, db_context, workflows
)
provenance_manager: ProvenanceManager = provenance_manager_classes[
args.type
][wf_type](context, db_context, workflows)
await provenance_manager.create_archive(
outdir=args.outdir,
filename=args.name,
Expand Down
2 changes: 1 addition & 1 deletion streamflow/provenance/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
from streamflow.provenance.run_crate import CWLRunCrateProvenanceManager

prov_classes = {"run_crate": {"cwl": CWLRunCrateProvenanceManager}}
provenance_manager_classes = {"run_crate/cwl": CWLRunCrateProvenanceManager}
11 changes: 11 additions & 0 deletions streamflow/provenance/run_crate.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import uuid
from abc import ABC, abstractmethod
from collections.abc import MutableMapping, MutableSequence
from importlib.resources import files
from typing import Any, cast, get_args
from zipfile import ZipFile

Expand Down Expand Up @@ -1424,6 +1425,16 @@ async def get_main_entity(self) -> MutableMapping[str, Any]:
)
return main_entity

@classmethod
def get_schema(cls) -> str:
return (
files(__package__)
.joinpath("schemas")
.joinpath("run_crate")
.joinpath("cwl.json")
.read_text("utf-8")
)

async def get_property_value(
self, name: str, token: Token
) -> MutableMapping[str, Any] | None:
Expand Down
7 changes: 7 additions & 0 deletions streamflow/provenance/schemas/run_crate/cwl.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"$schema": "https://json-schema.org/draft/2019-09/schema",
"$id": "https://streamflow.di.unito.it/schemas/provenance/run_crate/cwl.json",
"type": "object",
"properties": {},
"additionalProperties": false
}
Loading