Skip to content

Commit 097ca01

Browse files
Added provenance_manager extension point
This commit adds the possibility to upload custom `ProvenanceManager` classes through StreamFlow plugins and to visualize them through the `streamflow ext list` and `streamflow ext show` commands
1 parent 2670704 commit 097ca01

File tree

6 files changed

+34
-7
lines changed

6 files changed

+34
-7
lines changed

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ zip-safe = true
7272
"streamflow.deployment.connector" = ["schemas/base/*.json", "schemas/*.json"]
7373
"streamflow.deployment.filter" = ["schemas/*.json"]
7474
"streamflow.persistence" = ["schemas/*.sql", "schemas/*.json"]
75+
"streamflow.provenance" = ["schemas/**/*.json"]
7576
"streamflow.recovery" = ["schemas/*.json"]
7677
"streamflow.scheduling" = ["schemas/*.json"]
7778
"streamflow.scheduling.policy" = ["schemas/*.json"]

streamflow/ext/plugin.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from streamflow.core.data import DataManager
88
from streamflow.core.deployment import BindingFilter, Connector, DeploymentManager
99
from streamflow.core.persistence import Database
10+
from streamflow.core.provenance import ProvenanceManager
1011
from streamflow.core.recovery import CheckpointManager, FailureManager
1112
from streamflow.core.scheduling import Policy, Scheduler
1213
from streamflow.cwl.requirement.docker import cwl_docker_translator_classes
@@ -17,6 +18,7 @@
1718
from streamflow.deployment.filter import binding_filter_classes
1819
from streamflow.log_handler import logger
1920
from streamflow.persistence import database_classes
21+
from streamflow.provenance import provenance_manager_classes
2022
from streamflow.recovery import checkpoint_manager_classes, failure_manager_classes
2123
from streamflow.scheduling import scheduler_classes
2224
from streamflow.scheduling.policy import policy_classes
@@ -31,6 +33,7 @@
3133
"deployment_manager": deployment_manager_classes,
3234
"failure_manager": failure_manager_classes,
3335
"policy": policy_classes,
36+
"provenance_manager": provenance_manager_classes,
3437
"scheduler": scheduler_classes,
3538
}
3639

@@ -85,6 +88,11 @@ def register_failure_manager(self, name: str, cls: type[FailureManager]):
8588
def register_policy(self, name: str, cls: type[Policy]):
8689
self._register(name, cls, "policy")
8790

91+
def register_provenance_manager(
92+
self, name: str, cls: type[ProvenanceManager]
93+
):
94+
self._register(name, cls, "provenance_manager")
95+
8896
def register_scheduler(self, name: str, cls: type[Scheduler]):
8997
self._register(name, cls, "scheduler")
9098

streamflow/main.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
from streamflow.parser import parser
3232
from streamflow.persistence import database_classes
3333
from streamflow.persistence.loading_context import DefaultDatabaseLoadingContext
34-
from streamflow.provenance import prov_classes
34+
from streamflow.provenance import provenance_manager_classes
3535
from streamflow.recovery import checkpoint_manager_classes, failure_manager_classes
3636
from streamflow.scheduling import scheduler_classes
3737

@@ -126,20 +126,20 @@ async def _async_prov(args: argparse.Namespace):
126126
f"Workflow {args.workflow} is associated to the following types: {','.join(wf_type)}"
127127
)
128128
wf_type = list(wf_type)[0]
129-
if args.type not in prov_classes:
129+
if args.type not in provenance_manager_classes:
130130
raise WorkflowProvenanceException(
131131
f"{args.type} provenance format is not supported."
132132
)
133-
elif wf_type not in prov_classes[args.type]:
133+
elif wf_type not in provenance_manager_classes[args.type]:
134134
raise WorkflowProvenanceException(
135135
"{} provenance format is not supported for workflows of type {}.".format(
136136
args.type, wf_type
137137
)
138138
)
139139
else:
140-
provenance_manager: ProvenanceManager = prov_classes[args.type][wf_type](
141-
context, db_context, workflows
142-
)
140+
provenance_manager: ProvenanceManager = provenance_manager_classes[
141+
args.type
142+
][wf_type](context, db_context, workflows)
143143
await provenance_manager.create_archive(
144144
outdir=args.outdir,
145145
filename=args.name,

streamflow/provenance/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
from streamflow.provenance.run_crate import CWLRunCrateProvenanceManager
22

3-
prov_classes = {"run_crate": {"cwl": CWLRunCrateProvenanceManager}}
3+
provenance_manager_classes = {"run_crate/cwl": CWLRunCrateProvenanceManager}

streamflow/provenance/run_crate.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import uuid
1313
from abc import ABC, abstractmethod
1414
from collections.abc import MutableMapping, MutableSequence
15+
from importlib.resources import files
1516
from typing import Any, cast, get_args
1617
from zipfile import ZipFile
1718

@@ -1424,6 +1425,16 @@ async def get_main_entity(self) -> MutableMapping[str, Any]:
14241425
)
14251426
return main_entity
14261427

1428+
@classmethod
1429+
def get_schema(cls) -> str:
1430+
return (
1431+
files(__package__)
1432+
.joinpath("schemas")
1433+
.joinpath("run_crate")
1434+
.joinpath("cwl.json")
1435+
.read_text("utf-8")
1436+
)
1437+
14271438
async def get_property_value(
14281439
self, name: str, token: Token
14291440
) -> MutableMapping[str, Any] | None:
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
{
2+
"$schema": "https://json-schema.org/draft/2019-09/schema",
3+
"$id": "https://streamflow.di.unito.it/schemas/provenance/run_crate/cwl.json",
4+
"type": "object",
5+
"properties": {},
6+
"additionalProperties": false
7+
}

0 commit comments

Comments
 (0)