Skip to content

Commit 2670704

Browse files
authored
Check if bindings exist in the workflow (#708)
This commit adds a check that warns the user when the StreamFlow config file contains bindings (steps or ports) that have no corresponding entity in the workflow. It also refactors the `Command` classes in `streamflow.core` to remove circular dependencies. Finally, it improves the check for circular deployment wrappers introduced in #705 by keeping track of the deployments that have already been visited.
1 parent 2260b60 commit 2670704

File tree

13 files changed

+389
-281
lines changed

13 files changed

+389
-281
lines changed

streamflow/config/config.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212

1313
def set_targets(current_node, target):
1414
for node in current_node["children"].values():
15+
if "port" in node:
16+
continue
1517
if "step" not in node:
1618
node["step"] = target
1719
set_targets(node, node["step"])
@@ -69,6 +71,8 @@ def _check_stacked_deployments(self) -> None:
6971
f"The deployment `{deployment['name']}` leads to a circular reference: "
7072
f"Recursive deployment definitions are not allowed."
7173
)
74+
else:
75+
deployments.add(deployment["name"])
7276

7377
def _process_binding(self, binding: MutableMapping[str, Any]):
7478
targets = (
@@ -88,6 +92,11 @@ def _process_binding(self, binding: MutableMapping[str, Any]):
8892
else:
8993
raise WorkflowDefinitionException(f"Binding filter {f} is not defined")
9094
path = PurePosixPath(binding["step"] if "step" in binding else binding["port"])
95+
if not path.is_absolute():
96+
raise WorkflowDefinitionException(
97+
f"Binding {path.as_posix()} is not well-defined in the StreamFlow file. "
98+
f"It must be an absolute POSIX path"
99+
)
91100
self.put(path, target_type, config)
92101

93102
def get(

streamflow/core/recovery.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,9 @@
1010
from streamflow.workflow.token import JobToken
1111

1212
if TYPE_CHECKING:
13-
from streamflow.core.command import CommandOutput
1413
from streamflow.core.context import StreamFlowContext
1514
from streamflow.core.data import DataLocation
16-
from streamflow.core.workflow import Job, Step, Token, Workflow
15+
from streamflow.core.workflow import CommandOutput, Job, Step, Token, Workflow
1716

1817

1918
class CheckpointManager(SchemaEntity):

streamflow/core/workflow.py

Lines changed: 195 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,19 +10,207 @@
1010
from typing import TYPE_CHECKING, TypeVar, cast
1111

1212
from streamflow.core import utils
13+
from streamflow.core.deployment import Connector, ExecutionLocation, Target
1314
from streamflow.core.exception import WorkflowExecutionException
1415
from streamflow.core.persistence import (
1516
DatabaseLoadingContext,
1617
DependencyType,
1718
PersistableEntity,
1819
)
20+
from streamflow.core.utils import get_class_from_name, get_class_fullname
1921

2022
if TYPE_CHECKING:
2123
from typing import Any
2224

2325
from streamflow.core.context import StreamFlowContext
2426

2527

28+
class Command(ABC):
29+
def __init__(self, step: Step):
30+
super().__init__()
31+
self.step: Step = step
32+
33+
@abstractmethod
34+
async def execute(self, job: Job) -> CommandOutput: ...
35+
36+
@classmethod
37+
async def load(
38+
cls,
39+
context: StreamFlowContext,
40+
row: MutableMapping[str, Any],
41+
loading_context: DatabaseLoadingContext,
42+
step: Step,
43+
) -> Command:
44+
type_ = cast(type[Command], utils.get_class_from_name(row["type"]))
45+
return await type_._load(context, row["params"], loading_context, step)
46+
47+
async def save(self, context: StreamFlowContext):
48+
return {
49+
"type": utils.get_class_fullname(type(self)),
50+
"params": await self._save_additional_params(context),
51+
}
52+
53+
@classmethod
54+
async def _load(
55+
cls,
56+
context: StreamFlowContext,
57+
row: MutableMapping[str, Any],
58+
loading_context: DatabaseLoadingContext,
59+
step: Step,
60+
):
61+
return cls(step=step)
62+
63+
async def _save_additional_params(
64+
self, context: StreamFlowContext
65+
) -> MutableMapping[str, Any]:
66+
return {}
67+
68+
69+
class CommandOptions(ABC):
70+
pass
71+
72+
73+
class CommandOutput:
74+
__slots__ = ("value", "status")
75+
76+
def __init__(self, value: Any, status: Status):
77+
self.value: Any = value
78+
self.status: Status = status
79+
80+
def update(self, value: Any):
81+
return CommandOutput(value=value, status=self.status)
82+
83+
84+
class CommandOutputProcessor(ABC):
85+
def __init__(self, name: str, workflow: Workflow, target: Target | None = None):
86+
self.name: str = name
87+
self.workflow: Workflow = workflow
88+
self.target: Target | None = target
89+
90+
def _get_connector(self, connector: Connector | None, job: Job) -> Connector:
91+
return connector or self.workflow.context.scheduler.get_connector(job.name)
92+
93+
async def _get_locations(
94+
self, connector: Connector | None, job: Job
95+
) -> MutableSequence[ExecutionLocation]:
96+
if self.target:
97+
available_locations = await connector.get_available_locations(
98+
service=self.target.service
99+
)
100+
return [loc.location for loc in available_locations.values()]
101+
else:
102+
return self.workflow.context.scheduler.get_locations(job.name)
103+
104+
@classmethod
105+
async def _load(
106+
cls,
107+
context: StreamFlowContext,
108+
row: MutableMapping[str, Any],
109+
loading_context: DatabaseLoadingContext,
110+
) -> CommandOutputProcessor:
111+
return cls(
112+
name=row["name"],
113+
workflow=await loading_context.load_workflow(context, row["workflow"]),
114+
target=(
115+
(await loading_context.load_target(context, row["workflow"]))
116+
if row["target"]
117+
else None
118+
),
119+
)
120+
121+
async def _save_additional_params(
122+
self, context: StreamFlowContext
123+
) -> MutableMapping[str, Any]:
124+
if self.target:
125+
await self.target.save(context)
126+
return {
127+
"name": self.name,
128+
"workflow": self.workflow.persistent_id,
129+
"target": self.target.persistent_id if self.target else None,
130+
}
131+
132+
@classmethod
133+
async def load(
134+
cls,
135+
context: StreamFlowContext,
136+
row: MutableMapping[str, Any],
137+
loading_context: DatabaseLoadingContext,
138+
) -> CommandOutputProcessor:
139+
type_ = cast(
140+
type[CommandOutputProcessor], utils.get_class_from_name(row["type"])
141+
)
142+
return await type_._load(context, row["params"], loading_context)
143+
144+
@abstractmethod
145+
async def process(
146+
self,
147+
job: Job,
148+
command_output: CommandOutput,
149+
connector: Connector | None = None,
150+
) -> Token | None: ...
151+
152+
async def save(self, context: StreamFlowContext):
153+
return {
154+
"type": utils.get_class_fullname(type(self)),
155+
"params": await self._save_additional_params(context),
156+
}
157+
158+
159+
class CommandToken:
160+
__slots__ = ("name", "position", "value")
161+
162+
def __init__(self, name: str | None, position: int | None, value: Any):
163+
self.name: str | None = name
164+
self.position: int | None = position
165+
self.value: Any = value
166+
167+
168+
class CommandTokenProcessor(ABC):
169+
def __init__(self, name: str):
170+
self.name: str = name
171+
172+
@classmethod
173+
async def _load(
174+
cls,
175+
context: StreamFlowContext,
176+
row: MutableMapping[str, Any],
177+
loading_context: DatabaseLoadingContext,
178+
):
179+
return cls(name=row["name"])
180+
181+
async def _save_additional_params(
182+
self, context: StreamFlowContext
183+
) -> MutableMapping[str, Any]:
184+
return {"name": self.name}
185+
186+
@abstractmethod
187+
def bind(
188+
self,
189+
token: Token | None,
190+
position: int | None,
191+
options: CommandOptions,
192+
) -> CommandToken: ...
193+
194+
@abstractmethod
195+
def check_type(self, token: Token) -> bool: ...
196+
197+
@classmethod
198+
async def load(
199+
cls,
200+
context: StreamFlowContext,
201+
row: MutableMapping[str, Any],
202+
loading_context: DatabaseLoadingContext,
203+
) -> CommandTokenProcessor:
204+
type_ = cast(type[CommandTokenProcessor], get_class_from_name(row["type"]))
205+
return await type_._load(context, row["params"], loading_context)
206+
207+
async def save(self, context: StreamFlowContext):
208+
return {
209+
"type": get_class_fullname(type(self)),
210+
"params": await self._save_additional_params(context),
211+
}
212+
213+
26214
class Executor(ABC):
27215
def __init__(self, workflow: Workflow):
28216
self.workflow: Workflow = workflow
@@ -514,6 +702,7 @@ def __init__(
514702
self.context: StreamFlowContext = context
515703
self.config: MutableMapping[str, Any] = config
516704
self.name: str = name if name is not None else str(uuid.uuid4())
705+
self.input_ports: MutableMapping[str, str] = {}
517706
self.output_ports: MutableMapping[str, str] = {}
518707
self.ports: MutableMapping[str, Port] = {}
519708
self.steps: MutableMapping[str, Step] = {}
@@ -548,6 +737,12 @@ def create_step(self, cls: type[S], name: str = None, **kwargs) -> S:
548737
self.steps[name] = step
549738
return step
550739

740+
def get_input_port(self, name: str) -> Port:
741+
return self.ports[self.input_ports[name]]
742+
743+
def get_input_ports(self) -> MutableMapping[str, Port]:
744+
return {name: self.ports[p] for name, p in self.input_ports.items()}
745+
551746
def get_output_port(self, name: str) -> Port:
552747
return self.ports[self.output_ports[name]]
553748

streamflow/cwl/command.py

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,6 @@
1616
from ruamel.yaml import RoundTripRepresenter
1717
from ruamel.yaml.scalarfloat import ScalarFloat
1818

19-
from streamflow.core.command import (
20-
Command,
21-
CommandOptions,
22-
CommandToken,
23-
CommandTokenProcessor,
24-
ListCommandToken,
25-
MapCommandTokenProcessor,
26-
ObjectCommandToken,
27-
ObjectCommandTokenProcessor,
28-
TokenizedCommand,
29-
)
3019
from streamflow.core.context import StreamFlowContext
3120
from streamflow.core.data import DataLocation
3221
from streamflow.core.deployment import Connector
@@ -36,7 +25,17 @@
3625
)
3726
from streamflow.core.persistence import DatabaseLoadingContext
3827
from streamflow.core.utils import flatten_list, get_tag
39-
from streamflow.core.workflow import Job, Status, Step, Token, Workflow
28+
from streamflow.core.workflow import (
29+
Command,
30+
CommandOptions,
31+
CommandToken,
32+
CommandTokenProcessor,
33+
Job,
34+
Status,
35+
Step,
36+
Token,
37+
Workflow,
38+
)
4039
from streamflow.cwl import utils
4140
from streamflow.cwl.processor import (
4241
CWLCommandOutput,
@@ -50,6 +49,13 @@
5049
from streamflow.data.remotepath import StreamFlowPath
5150
from streamflow.deployment.utils import get_path_processor
5251
from streamflow.log_handler import logger
52+
from streamflow.workflow.command import (
53+
ListCommandToken,
54+
MapCommandTokenProcessor,
55+
ObjectCommandToken,
56+
ObjectCommandTokenProcessor,
57+
TokenizedCommand,
58+
)
5359
from streamflow.workflow.step import ExecuteStep
5460
from streamflow.workflow.utils import get_token_value
5561

streamflow/cwl/processor.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
import cwl_utils.file_formats
1010
from schema_salad.exceptions import ValidationException
1111

12-
from streamflow.core.command import CommandOutput, CommandOutputProcessor
1312
from streamflow.core.context import StreamFlowContext
1413
from streamflow.core.deployment import Connector, LocalTarget, Target
1514
from streamflow.core.exception import (
@@ -18,7 +17,14 @@
1817
)
1918
from streamflow.core.persistence import DatabaseLoadingContext
2019
from streamflow.core.utils import flatten_list, get_tag
21-
from streamflow.core.workflow import Job, Status, Token, TokenProcessor
20+
from streamflow.core.workflow import (
21+
CommandOutput,
22+
CommandOutputProcessor,
23+
Job,
24+
Status,
25+
Token,
26+
TokenProcessor,
27+
)
2228
from streamflow.cwl import utils
2329
from streamflow.cwl.token import CWLFileToken
2430
from streamflow.cwl.utils import LoadListing, SecondaryFile

streamflow/cwl/step.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
from collections.abc import MutableMapping, MutableSequence
99
from typing import Any, cast
1010

11-
from streamflow.core.command import Command, CommandOutput, CommandOutputProcessor
1211
from streamflow.core.context import StreamFlowContext
1312
from streamflow.core.data import DataLocation, DataType
1413
from streamflow.core.deployment import Connector, ExecutionLocation
@@ -18,7 +17,14 @@
1817
)
1918
from streamflow.core.persistence import DatabaseLoadingContext
2019
from streamflow.core.utils import get_entity_ids, get_tag, random_name
21-
from streamflow.core.workflow import Job, Port, Token
20+
from streamflow.core.workflow import (
21+
Command,
22+
CommandOutput,
23+
CommandOutputProcessor,
24+
Job,
25+
Port,
26+
Token,
27+
)
2228
from streamflow.cwl import utils
2329
from streamflow.cwl.token import CWLFileToken
2430
from streamflow.cwl.utils import (

0 commit comments

Comments
 (0)