Skip to content

Commit 289d6b7

Browse files
authored
vdk-dag: improve DAGs docs and example (#1984)
What: Improve DAGs user-facing documentation and example based on the feedback and discussions with users. The feedback we got from users is: - The Requirements section in the example links VDK DAGs to the VEP, it should rather link to the README. > Addressed. - Would it be possible to run the example with a DB of choice or am I required to use Trino? > Addressed. - The team name has to be added on several occasions and this could lead to some consistency issues. > Addressed. - The DAG-specific configuration variables are not very visible in the README. > Addressed. Signed-off-by: Yoan Salambashev <ysalambashev@vmware.com>
1 parent e6d68ca commit 289d6b7

File tree

15 files changed

+146
-158
lines changed

15 files changed

+146
-158
lines changed

examples/dag-with-args-example/README.md

Lines changed: 52 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@ In this example you will use the Versatile Data Kit to develop six Data Jobs - t
77
from separate JSON files, and will subsequently insert the data into Trino tables. The next three jobs will read the
88
data inserted by the previous two jobs, and will print the data to the terminal. The sixth Data Job will be a DAG job
99
which will manage the other five and ensure that the third, fourth and fifth jobs run only when the previous two finish
10-
successfully. All the Trino-related details (tables, schema, catalog) will be passed individually to each job as job
11-
arguments in JSON format.
10+
successfully. All the DB-related details (tables, schema) — Trino is used here, but any other database could be chosen — will be passed
11+
individually to each job as job arguments in JSON format.
1212

1313
The DAG Job uses a job input object separate from the one usually used for job
1414
operations in VDK Data Jobs and must be imported.
@@ -45,7 +45,8 @@ To run this example, you need:
4545
* Versatile Data Kit
4646
* Trino DB
4747
* `vdk-trino` - VDK plugin for a connection to a Trino database
48-
* [VDK DAGs](https://github.com/vmware/versatile-data-kit/tree/main/specs/vep-1243-vdk-meta-jobs)
48+
* [VDK DAGs README](https://github.com/vmware/versatile-data-kit/tree/main/projects/vdk-plugins/vdk-dag)
49+
* [VDK DAGs Specification](https://github.com/vmware/versatile-data-kit/tree/main/specs/vep-1243-vdk-dag)
4950

5051
## Configuration
5152

@@ -123,7 +124,6 @@ def run(job_input: IJobInput):
123124
data_job_dir = pathlib.Path(job_input.get_job_directory())
124125
data_file = data_job_dir / "data.json"
125126

126-
db_catalog = job_input.get_arguments().get("db_catalog")
127127
db_schema = job_input.get_arguments().get("db_schema")
128128
db_table = job_input.get_arguments().get("db_table")
129129

@@ -133,7 +133,7 @@ def run(job_input: IJobInput):
133133

134134
rows = [tuple(i.values()) for i in data]
135135
insert_query = f"""
136-
INSERT INTO {db_catalog}.{db_schema}.{db_table} VALUES
136+
INSERT INTO {db_schema}.{db_table} VALUES
137137
""" + ", ".join(
138138
str(i) for i in rows
139139
)
@@ -225,7 +225,6 @@ def run(job_input: IJobInput):
225225
data_job_dir = pathlib.Path(job_input.get_job_directory())
226226
data_file = data_job_dir / "data.json"
227227

228-
db_catalog = job_input.get_arguments().get("db_catalog")
229228
db_schema = job_input.get_arguments().get("db_schema")
230229
db_table = job_input.get_arguments().get("db_table")
231230

@@ -235,13 +234,13 @@ def run(job_input: IJobInput):
235234

236235
rows = [tuple(i.values()) for i in data]
237236
insert_query = f"""
238-
INSERT INTO {db_catalog}.{db_schema}.{db_table} VALUES
237+
INSERT INTO {db_schema}.{db_table} VALUES
239238
""" + ", ".join(
240239
str(i) for i in rows
241240
)
242241

243242
create_query = f"""
244-
CREATE TABLE IF NOT EXISTS {db_catalog}.{db_schema}.{db_table}
243+
CREATE TABLE IF NOT EXISTS {db_schema}.{db_table}
245244
(
246245
id varchar,
247246
first_name varchar,
@@ -298,9 +297,7 @@ vdk-trino
298297

299298
```
300299
read-job-usa/
301-
├── 10_transform.py
302-
├── 20_drop_table_one.sql
303-
├── 30_drop_table_two.sql
300+
├── 10_read.py
304301
├── config.ini
305302
├── requirements.txt
306303
```
@@ -313,16 +310,15 @@ from vdk.api.job_input import IJobInput
313310

314311

315312
def run(job_input: IJobInput):
316-
db_catalog = job_input.get_arguments().get("db_catalog")
317313
db_schema = job_input.get_arguments().get("db_schema")
318314
db_tables = job_input.get_arguments().get("db_tables")
319315

320316
job1_data = job_input.execute_query(
321-
f"SELECT * FROM {db_catalog}.{db_schema}.{db_tables[0]} "
317+
f"SELECT * FROM {db_schema}.{db_tables[0]} "
322318
f"WHERE Country = 'USA'"
323319
)
324320
job2_data = job_input.execute_query(
325-
f"SELECT * FROM {db_catalog}.{db_schema}.{db_tables[1]} "
321+
f"SELECT * FROM {db_schema}.{db_tables[1]} "
326322
f"WHERE Country = 'USA'"
327323
)
328324

@@ -361,7 +357,7 @@ vdk-trino
361357

362358
```
363359
read-job-canada/
364-
├── 10_transform.py
360+
├── 10_read.py
365361
├── config.ini
366362
├── requirements.txt
367363
```
@@ -374,16 +370,15 @@ from vdk.api.job_input import IJobInput
374370

375371

376372
def run(job_input: IJobInput):
377-
db_catalog = job_input.get_arguments().get("db_catalog")
378373
db_schema = job_input.get_arguments().get("db_schema")
379374
db_tables = job_input.get_arguments().get("db_tables")
380375

381376
job1_data = job_input.execute_query(
382-
f"SELECT * FROM {db_catalog}.{db_schema}.{db_tables[0]} "
377+
f"SELECT * FROM {db_schema}.{db_tables[0]} "
383378
f"WHERE Country = 'Canada'"
384379
)
385380
job2_data = job_input.execute_query(
386-
f"SELECT * FROM {db_catalog}.{db_schema}.{db_tables[1]} "
381+
f"SELECT * FROM {db_schema}.{db_tables[1]} "
387382
f"WHERE Country = 'Canada'"
388383
)
389384

@@ -422,7 +417,7 @@ vdk-trino
422417

423418
```
424419
read-job-rest-of-world/
425-
├── 10_transform.py
420+
├── 10_read.py
426421
├── 20_drop_table_one.sql
427422
├── 30_drop_table_two.sql
428423
├── config.ini
@@ -437,16 +432,15 @@ from vdk.api.job_input import IJobInput
437432

438433

439434
def run(job_input: IJobInput):
440-
db_catalog = job_input.get_arguments().get("db_catalog")
441435
db_schema = job_input.get_arguments().get("db_schema")
442436
db_tables = job_input.get_arguments().get("db_tables")
443437

444438
job1_data = job_input.execute_query(
445-
f"SELECT * FROM {db_catalog}.{db_schema}.{db_tables[0]} "
439+
f"SELECT * FROM {db_schema}.{db_tables[0]} "
446440
f"WHERE Country NOT IN ('USA', 'Canada')"
447441
)
448442
job2_data = job_input.execute_query(
449-
f"SELECT * FROM {db_catalog}.{db_schema}.{db_tables[1]} "
443+
f"SELECT * FROM {db_schema}.{db_tables[1]} "
450444
f"WHERE Country NOT IN ('USA', 'Canada')"
451445
)
452446

@@ -511,75 +505,70 @@ dag-job/
511505
<summary>dag_job.py</summary>
512506

513507
```python
514-
from vdk.plugin.meta_jobs.meta_job_runner import MetaJobInput
508+
from vdk.plugin.dag.dag_runner import DagInput
515509

516510

517511
JOBS_RUN_ORDER = [
518512
{
519513
"job_name": "ingest-job-table-one",
520514
"team_name": "my-team",
521-
"fail_meta_job_on_error": True,
515+
"fail_dag_on_error": True,
522516
"arguments": {
523517
"db_table": "test_dag_one",
524518
"db_schema": "default",
525-
"db_catalog": "memory",
526519
},
527520
"depends_on": [],
528521
},
529522
{
530523
"job_name": "ingest-job-table-two",
531524
"team_name": "my-team",
532-
"fail_meta_job_on_error": True,
525+
"fail_dag_on_error": True,
533526
"arguments": {
534527
"db_table": "test_dag_two",
535528
"db_schema": "default",
536-
"db_catalog": "memory",
537529
},
538530
"depends_on": [],
539531
},
540532
{
541533
"job_name": "read-job-usa",
542534
"team_name": "my-team",
543-
"fail_meta_job_on_error": True,
535+
"fail_dag_on_error": True,
544536
"arguments": {
545537
"db_tables": ["test_dag_one", "test_dag_two"],
546538
"db_schema": "default",
547-
"db_catalog": "memory",
548539
},
549540
"depends_on": ["ingest-job-table-one", "ingest-job-table-two"],
550541
},
551542
{
552543
"job_name": "read-job-canada",
553544
"team_name": "my-team",
554-
"fail_meta_job_on_error": True,
545+
"fail_dag_on_error": True,
555546
"arguments": {
556547
"db_tables": ["test_dag_one", "test_dag_two"],
557548
"db_schema": "default",
558-
"db_catalog": "memory",
559549
},
560550
"depends_on": ["ingest-job-table-one", "ingest-job-table-two"],
561551
},
562552
{
563553
"job_name": "read-job-rest-of-world",
564554
"team_name": "my-team",
565-
"fail_meta_job_on_error": True,
555+
"fail_dag_on_error": True,
566556
"arguments": {
567557
"db_tables": ["test_dag_one", "test_dag_two"],
568558
"db_schema": "default",
569-
"db_catalog": "memory",
570559
},
571560
"depends_on": ["ingest-job-table-one", "ingest-job-table-two"],
572561
},
573562
]
574563

575564

576-
def run(job_input):
577-
MetaJobInput().run_meta_job(JOBS_RUN_ORDER)
565+
def run(job_input) -> None:
566+
DagInput().run_dag(JOBS_RUN_ORDER)
578567

579568
```
580569
</details>
581570

582-
Note that the `run_meta_job` method belongs to the `MetaJobInput` object which must be imported
571+
Note that the `run_dag` method belongs to the `DagInput` object which must be imported
583572
and instantiated separately from the default `IJobInput` object which is passed to the `run` function by default.
584573

585574
<details>
@@ -598,44 +587,46 @@ and instantiated separately from the default `IJobInput` object which is passed
598587
team = my-team
599588

600589
[vdk]
601-
meta_jobs_max_concurrent_running_jobs = 2
590+
dags_max_concurrent_running_jobs = 2
591+
dags_delayed_jobs_min_delay_seconds = 1
592+
dags_delayed_jobs_randomized_added_delay_seconds = 1
593+
```
594+
</details>
602595

603-
meta_jobs_delayed_jobs_randomized_added_delay_seconds = 1
604-
meta_jobs_delayed_jobs_min_delay_seconds = 1
596+
<details>
597+
<summary>requirements.txt</summary>
598+
599+
```text
600+
vdk-dag
605601
```
606602
</details>
607603

604+
Note that the VDK DAG Job does not require the `vdk-trino` dependency.
605+
Component jobs are responsible for their own dependencies, and the DAG Job only handles their triggering.
606+
608607
### Configuration details
609608

610-
Setting [meta_jobs_max_concurrent_running_jobs](https://github.com/vmware/versatile-data-kit/blob/main/projects/vdk-plugins/vdk-meta-jobs/src/vdk/plugin/meta_jobs/meta_configuration.py#L87)
609+
Setting [dags_max_concurrent_running_jobs](https://github.com/vmware/versatile-data-kit/blob/main/projects/vdk-plugins/vdk-dag/src/vdk/plugin/dag/dag_plugin_configuration.py#L87)
611610
to 2 in the DAG Job config.ini file would mean that the jobs in the DAG will be executed in the following order:
612611
* ingest-job-table-one, ingest-job-table-two
613612
* read-job-usa, read-job-canada
614613
* read-job-rest-of-world
615614

616-
When the ingest jobs are both finished, all of the read jobs are ready to start but when the aforementioned limit is
615+
When the ingest jobs are both finished, all the read jobs are ready to start but when the aforementioned limit is
617616
hit (after read-job-usa and read-job-canada are started), the following message is logged:
618617

619618
![DAG concurrent running jobs limit hit](images/dag-concurrent-running-jobs-limit-hit.png)
620619
Then the delayed read-job-rest-of-world is started after any of the currently running Data Jobs finishes.
621620

622621
The other two configurations are set in order to have a short fixed delay for delayed jobs such as the last read job.
623-
Check the [configuration](https://github.com/vmware/versatile-data-kit/blob/main/projects/vdk-plugins/vdk-meta-jobs/src/vdk/plugin/meta_jobs/meta_configuration.py)
622+
Check the [configuration](https://github.com/vmware/versatile-data-kit/blob/main/projects/vdk-plugins/vdk-dag/src/vdk/plugin/dag/dag_plugin_configuration.py)
624623
for more details.
625624

626-
<details>
627-
<summary>requirements.txt</summary>
628-
629-
```text
630-
vdk-meta-jobs
631-
```
632-
</details>
633-
634-
Note that the VDK DAG Job does not require the `vdk-trino` dependency.
635-
Component jobs are responsible for their own dependencies, and the DAG Job only handles their triggering.
636-
637625
## Execution
638626

627+
[Here](https://github.com/vmware/versatile-data-kit/tree/main/specs/vep-1243-vdk-dag#high-level-design) you can read
628+
more about the DAG execution.
629+
639630
### Create and deploy Data Jobs
640631

641632
To do so, open a terminal, navigate to the parent directory of the data job
@@ -671,6 +662,14 @@ vdk create -n dag-job -t my-team --no-template && \
671662
vdk deploy -n dag-job -t my-team -p dag-job -r "dag-with-args-example"
672663
```
673664

665+
Note: The team name has to be consistent everywhere (in the config.ini, in each job of the DAG dict of jobs and
666+
while creating and deploying the jobs). Instead of passing the team name each time, you can set a default value:
667+
```console
668+
vdk set-default -t my-team
669+
```
670+
This would then be used in all commands that require a team. However, you would still have to provide the same value
671+
for team name in the config.ini file and the DAG dict of jobs.
672+
674673
### Run DAG Job
675674

676675
You can now run your DAG Job through the Execution API by using one of the following commands*:
Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,66 +1,61 @@
11
# Copyright 2021-2023 VMware, Inc.
22
# SPDX-License-Identifier: Apache-2.0
3-
from vdk.plugin.meta_jobs.meta_job_runner import MetaJobInput
3+
from vdk.plugin.dag.dag_runner import DagInput
44

55

66
JOBS_RUN_ORDER = [
77
{
88
"job_name": "ingest-job-table-one",
99
"team_name": "my-team",
10-
"fail_meta_job_on_error": True,
10+
"fail_dag_on_error": True,
1111
"arguments": {
1212
"db_table": "test_dag_one",
1313
"db_schema": "default",
14-
"db_catalog": "memory",
1514
},
1615
"depends_on": [],
1716
},
1817
{
1918
"job_name": "ingest-job-table-two",
2019
"team_name": "my-team",
21-
"fail_meta_job_on_error": True,
20+
"fail_dag_on_error": True,
2221
"arguments": {
2322
"db_table": "test_dag_two",
2423
"db_schema": "default",
25-
"db_catalog": "memory",
2624
},
2725
"depends_on": [],
2826
},
2927
{
3028
"job_name": "read-job-usa",
3129
"team_name": "my-team",
32-
"fail_meta_job_on_error": True,
30+
"fail_dag_on_error": True,
3331
"arguments": {
3432
"db_tables": ["test_dag_one", "test_dag_two"],
3533
"db_schema": "default",
36-
"db_catalog": "memory",
3734
},
3835
"depends_on": ["ingest-job-table-one", "ingest-job-table-two"],
3936
},
4037
{
4138
"job_name": "read-job-canada",
4239
"team_name": "my-team",
43-
"fail_meta_job_on_error": True,
40+
"fail_dag_on_error": True,
4441
"arguments": {
4542
"db_tables": ["test_dag_one", "test_dag_two"],
4643
"db_schema": "default",
47-
"db_catalog": "memory",
4844
},
4945
"depends_on": ["ingest-job-table-one", "ingest-job-table-two"],
5046
},
5147
{
5248
"job_name": "read-job-rest-of-world",
5349
"team_name": "my-team",
54-
"fail_meta_job_on_error": True,
50+
"fail_dag_on_error": True,
5551
"arguments": {
5652
"db_tables": ["test_dag_one", "test_dag_two"],
5753
"db_schema": "default",
58-
"db_catalog": "memory",
5954
},
6055
"depends_on": ["ingest-job-table-one", "ingest-job-table-two"],
6156
},
6257
]
6358

6459

65-
def run(job_input):
66-
MetaJobInput().run_meta_job(JOBS_RUN_ORDER)
60+
def run(job_input) -> None:
61+
DagInput().run_dag(JOBS_RUN_ORDER)
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
vdk-dag

0 commit comments

Comments
 (0)