Skip to content

Commit b869ddb

Browse files
vdk-impala: Introduce checks for insert template (#2198)
Why: Add functionality to the insert processing templates in order to allow quality checks to be made before the data is inserted into the target table. Currently, the checks done in the insert template processing step do not cover whether the semantics of the data are correct. Therefore, bad data could go into the target table, which could be unwanted behavior. More details explained in #1361 What: -Adding functionality to handle the insert template behavior if the user provides checks Tests: provided positive and negative regression tests Signed-off-by: Stefan Buldeev sbuldeev@vmware.com --------- Signed-off-by: Stefan Buldeev sbuldeev@vmware.com Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
1 parent 3b3ecc7 commit b869ddb

File tree

12 files changed

+208
-11
lines changed

12 files changed

+208
-11
lines changed
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
# Copyright 2021-2023 VMware, Inc.
2+
# SPDX-License-Identifier: Apache-2.0
3+
4+
5+
class DataQualityException(Exception):
    """
    Exception raised for errors with the quality of the data.

    Attributes:
        checked_object -- Object that the quality checks are run against
        target_table -- DWH table where target data is loaded
        source_view -- View from which the raw data is loaded
        message -- Human-readable description that is also passed to Exception
    """

    def __init__(self, checked_object, target_table, source_view):
        self.checked_object = checked_object
        self.target_table = target_table
        self.source_view = source_view

        # Assemble the message from separate pieces joined by explicit
        # separators instead of a triple-quoted literal: that way the
        # indentation of this source file can never leak into the text shown
        # to the user. The sections stay separated by a blank line.
        self.message = "\n\n".join(
            (
                "What happened: Error occurred while performing quality checks.",
                f"Why it happened: Object: {checked_object} "
                "is not passing the quality checks.",
                "Consequences: The source view data will not be processed "
                f"to the target table - {target_table}.",
                f"Countermeasures: Check the source view: {source_view} "
                "what data is trying to be processed.",
            )
        )
        super().__init__(self.message)

    def __reduce__(self):
        # Exception's default reduction re-creates the instance from
        # ``self.args``, which holds only the rendered message and therefore
        # does not match the three-argument constructor. Hand back the
        # original constructor arguments so pickling and copying work.
        return (
            type(self),
            (self.checked_object, self.target_table, self.source_view),
            self.__dict__,
        )

projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/dimension/scd1/02-handle-quality-checks.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import os
44

55
from vdk.api.job_input import IJobInput
6+
from vdk.plugin.impala.templates.data_quality_exception import DataQualityException
67
from vdk.plugin.impala.templates.utility import align_stg_table_with_target
78
from vdk.plugin.impala.templates.utility import get_file_content
89
from vdk.plugin.impala.templates.utility import get_staging_table_name
@@ -34,10 +35,9 @@ def run(job_input: IJobInput):
3435
staging_table_name = get_staging_table_name(target_schema, target_table)
3536

3637
staging_table = f"{staging_schema}.{staging_table_name}"
38+
target_table_full_name = f"{target_schema}.{target_table}"
3739

38-
align_stg_table_with_target(
39-
f"{target_schema}.{target_table}", staging_table, job_input
40-
)
40+
align_stg_table_with_target(target_table_full_name, staging_table, job_input)
4141

4242
insert_into_staging = insert_query.format(
4343
target_schema=staging_schema,
@@ -58,7 +58,11 @@ def run(job_input: IJobInput):
5858
)
5959
job_input.execute_query(insert_into_target)
6060
else:
61-
raise Exception("The data is not passing the quality checks!")
61+
raise DataQualityException(
62+
checked_object=staging_table,
63+
source_view=f"{source_schema}.{source_view}",
64+
target_table=target_table_full_name,
65+
)
6266

6367
else:
6468
job_input.execute_query(insert_query)

projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/dimension/scd1/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ In summary, it overwrites the target table with the source data.
1616
- target_table - SC Data Warehouse table of DW type 'Slowly Changing Dimension Type 1', where target data is loaded
1717
- source_schema - SC Data Lake schema, where source raw data is loaded from
1818
- source_view - SC Data Lake view, where source raw data is loaded from
19-
- check - (Optional) Callback function responsible for checking the quality of the data
19+
- check - (Optional) Callback function responsible for checking the quality of the data. Takes in a table name as a parameter which will be used for data validation
2020
- staging_schema - (Optional) Schema where the checks will be executed. If not provided target_schema will be used as default
2121

2222
### Prerequisites:

projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/fact/insert/00-fact-snapshot-definition.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
# Copyright 2021-2023 VMware, Inc.
22
# SPDX-License-Identifier: Apache-2.0
3+
from typing import Callable
4+
from typing import Optional
5+
36
from pydantic import BaseModel
47
from vdk.api.job_input import IJobInput
58
from vdk.plugin.impala.templates.template_arguments_validator import (
@@ -12,6 +15,8 @@ class FactDailySnapshotParams(BaseModel):
1215
target_table: str
1316
source_schema: str
1417
source_view: str
18+
check: Optional[Callable[[str], bool]]
19+
staging_schema: Optional[str]
1520

1621

1722
class FactDailySnapshot(TemplateArgumentsValidator):
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
# Copyright 2021-2023 VMware, Inc.
2+
# SPDX-License-Identifier: Apache-2.0
3+
import os
4+
5+
from vdk.api.job_input import IJobInput
6+
from vdk.plugin.impala.templates.data_quality_exception import DataQualityException
7+
from vdk.plugin.impala.templates.utility import align_stg_table_with_target
8+
from vdk.plugin.impala.templates.utility import get_file_content
9+
from vdk.plugin.impala.templates.utility import get_staging_table_name
10+
11+
SQL_FILES_FOLDER = (
12+
os.path.dirname(os.path.abspath(__file__)) + "/02-requisite-sql-scripts"
13+
)
14+
15+
"""
16+
This step is intended to handle quality checks, if such are provided,
17+
and stop the data from being populated into the target table if the check has negative outcome.
18+
Otherwise the data will be directly processed according to the used template type
19+
"""
20+
21+
22+
def run(job_input: IJobInput):
    """
    Load the source view into the target table, optionally gated by a check.

    Without a ``check`` argument the insert query is executed as-is.
    With a ``check`` callback:

    1. a staging table is aligned with the target table and the source data
       is inserted into it;
    2. a view that unions the target table with the staging table is created
       (``CREATE VIEW IF NOT EXISTS``);
    3. ``check`` is called with the fully qualified name of that view;
    4. on a truthy result the staged data is inserted into the target table,
       otherwise nothing is written to the target.

    :param job_input: the job input; its arguments supply ``source_schema``,
        ``source_view``, ``target_schema``, ``target_table``,
        ``_vdk_template_insert_partition_clause`` and, optionally, ``check``
        and ``staging_schema``.
    :raises DataQualityException: if ``check`` returns a falsy value.
    :raises KeyError: if ``_vdk_template_insert_partition_clause`` is missing
        from the job arguments.
    """
    job_arguments = job_input.get_arguments()

    check = job_arguments.get("check")
    partition_clause = job_arguments["_vdk_template_insert_partition_clause"]
    source_schema = job_arguments.get("source_schema")
    source_view = job_arguments.get("source_view")
    target_schema = job_arguments.get("target_schema")
    target_table = job_arguments.get("target_table")
    insert_query = get_file_content(SQL_FILES_FOLDER, "02-insert-into-target.sql")

    if not check:
        # No quality check requested: run the insert directly.
        # NOTE(review): the query text is passed unformatted; presumably
        # execute_query resolves the {placeholders} from the job arguments -
        # confirm.
        job_input.execute_query(insert_query)
        return

    # Fall back to the target schema both when the argument is absent and
    # when it is present but empty/None (the argument is declared optional),
    # so the staging table is never qualified with a literal "None" schema.
    staging_schema = job_arguments.get("staging_schema") or target_schema
    staging_table_name = get_staging_table_name(target_schema, target_table)

    staging_table = f"{staging_schema}.{staging_table_name}"
    target_table_full_name = f"{target_schema}.{target_table}"

    align_stg_table_with_target(target_table_full_name, staging_table, job_input)

    # First pass of the insert query: source view -> staging table.
    insert_into_staging = insert_query.format(
        target_schema=staging_schema,
        target_table=staging_table_name,
        _vdk_template_insert_partition_clause=partition_clause,
        source_schema=source_schema,
        source_view=source_view,
    )
    job_input.execute_query(insert_into_staging)

    # The check runs against a view combining the target table with the
    # staged rows, so it is given both tables' data through a single name.
    view_schema = staging_schema
    view_name = f"vw_{staging_table_name}"
    create_view_query = get_file_content(
        SQL_FILES_FOLDER, "02-create-consolidated-view.sql"
    )
    create_view = create_view_query.format(
        view_schema=view_schema,
        view_name=view_name,
        target_schema=target_schema,
        target_table=target_table,
        staging_schema=staging_schema,
        staging_table_name=staging_table_name,
    )
    job_input.execute_query(create_view)

    view_full_name = f"{view_schema}.{view_name}"
    if not check(view_full_name):
        raise DataQualityException(
            checked_object=view_full_name,
            source_view=f"{source_schema}.{source_view}",
            target_table=target_table_full_name,
        )

    # Second pass of the same insert query: staging table -> target table.
    insert_into_target = insert_query.format(
        source_schema=staging_schema,
        source_view=staging_table_name,
        _vdk_template_insert_partition_clause=partition_clause,
        target_schema=target_schema,
        target_table=target_table,
    )
    job_input.execute_query(insert_into_target)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
CREATE VIEW IF NOT EXISTS {view_schema}.{view_name}
2+
AS (
3+
SELECT *
4+
FROM {target_schema}.{target_table}
5+
UNION ALL
6+
SELECT *
7+
FROM {staging_schema}.{staging_table_name}
8+
)

projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/fact/insert/02-insert-into-target.sql renamed to projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/fact/insert/02-requisite-sql-scripts/02-insert-into-target.sql

File renamed without changes.

projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/fact/insert/README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ This template can be used to load raw data from Data Lake to target Table in Dat
1212
- target_table - Data Warehouse table of DW type table, where target data is loaded
1313
- source_schema - Data Lake schema, where source raw data is loaded from
1414
- source_view - Data Lake view, where source raw data is loaded from
15+
- check - (Optional) Callback function responsible for checking the quality of the data. Takes in the name of the object to validate as a parameter; for this template it is a view that combines the current target table with the staged source data
16+
- staging_schema - (Optional) Schema where the checks will be executed. If not provided target_schema will be used as default
1517

1618
### Prerequisites:
1719

projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/fact/snapshot/02-handle-quality-checks.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import os
44

55
from vdk.api.job_input import IJobInput
6+
from vdk.plugin.impala.templates.data_quality_exception import DataQualityException
67
from vdk.plugin.impala.templates.utility import align_stg_table_with_target
78
from vdk.plugin.impala.templates.utility import get_file_content
89
from vdk.plugin.impala.templates.utility import get_staging_table_name
@@ -38,10 +39,9 @@ def run(job_input: IJobInput):
3839
staging_table_name = get_staging_table_name(target_schema, target_table)
3940

4041
staging_table = f"{staging_schema}.{staging_table_name}"
42+
target_table_full_name = f"{target_schema}.{target_table}"
4143

42-
align_stg_table_with_target(
43-
f"{target_schema}.{target_table}", staging_table, job_input
44-
)
44+
align_stg_table_with_target(target_table_full_name, staging_table, job_input)
4545

4646
insert_into_staging = insert_query.format(
4747
current_target_schema=staging_schema,
@@ -65,7 +65,11 @@ def run(job_input: IJobInput):
6565
)
6666
job_input.execute_query(insert_into_target)
6767
else:
68-
raise Exception("The data is not passing the quality checks!")
68+
raise DataQualityException(
69+
checked_object=staging_table,
70+
source_view=f"{source_schema}.{source_view}",
71+
target_table=target_table_full_name,
72+
)
6973

7074
else:
7175
insert_query = insert_query.replace(

projects/vdk-plugins/vdk-impala/src/vdk/plugin/impala/templates/load/fact/snapshot/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ truncating all present target table records observed after t1.
1919
- source_schema - SC Data Lake schema, where source raw data is loaded from
2020
- source_view - SC Data Lake view, where source raw data is loaded from
2121
- last_arrival_ts - Timestamp column, on which increments to target_table are done
22-
- check - (Optional) Callback function responsible for checking the quality of the data
22+
- check - (Optional) Callback function responsible for checking the quality of the data. Takes in a table name as a parameter which will be used for data validation
2323
- staging_schema - (Optional) Schema where the checks will be executed. If not provided target_schema will be used as default
2424

2525
### Prerequisites:

0 commit comments

Comments
 (0)