Skip to content

Commit 66d4d2f

Browse files
Merge pull request #397 from icanbwell/safe-merge-enable
Enable safe merge
2 parents 20e0ae0 + 87f9459 commit 66d4d2f

File tree

11 files changed

+1460
-1409
lines changed

11 files changed

+1460
-1409
lines changed

Pipfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ bounded-pool-executor = ">=0.0.3"
3939
# fastjsonschema is needed for validating JSON
4040
fastjsonschema= ">=2.18.0"
4141
# helix.fhir.client.sdk is needed for interacting with FHIR servers
42-
"helix.fhir.client.sdk" = ">=4.1.11"
42+
"helix.fhir.client.sdk" = ">=4.1.28"
4343
# opensearch-py is needed for interacting with OpenSearch
4444
opensearch-py= { extras = ['async'], version = ">=2.6.0" }
4545
# pyathena is needed for interacting with Athena in AWS

Pipfile.lock

Lines changed: 1408 additions & 1407 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

spark_pipeline_framework/transformers/fhir_sender/v1/fhir_sender.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ def __init__(
9292
drop_fields_from_json: Optional[List[str]] = None,
9393
partition_by_column_name: Optional[str] = None,
9494
enable_repartitioning: bool = True,
95+
smart_merge: Optional[bool] = None,
9596
):
9697
"""
9798
Sends FHIR json stored in a folder to a FHIR server
@@ -274,6 +275,10 @@ def __init__(
274275
)
275276
self._setDefault(enable_repartitioning=enable_repartitioning)
276277

278+
self.smart_merge: Param[Optional[bool]] = Param(self, "smart_merge", "")
279+
280+
self._setDefault(smart_merge=None)
281+
277282
kwargs = self._input_kwargs
278283
self.setParams(**kwargs)
279284

@@ -317,6 +322,7 @@ def _transform(self, df: DataFrame) -> DataFrame:
317322
partition_by_column_name: Optional[str] = self.getOrDefault(
318323
self.partition_by_column_name
319324
)
325+
smart_merge: Optional[bool] = self.getOrDefault(self.smart_merge)
320326

321327
if not batch_size or batch_size == 0:
322328
batch_size = 30
@@ -487,6 +493,7 @@ def _transform(self, df: DataFrame) -> DataFrame:
487493
validation_server_url=validation_server_url,
488494
retry_count=retry_count,
489495
exclude_status_codes_from_retry=exclude_status_codes_from_retry,
496+
smart_merge=smart_merge,
490497
)
491498
)
492499
result_rows: List[Dict[str, Any]] = flatten(result_rows_list)
@@ -522,6 +529,7 @@ def _transform(self, df: DataFrame) -> DataFrame:
522529
validation_server_url=validation_server_url,
523530
retry_count=retry_count,
524531
exclude_status_codes_from_retry=exclude_status_codes_from_retry,
532+
smart_merge=smart_merge,
525533
)
526534
)
527535
rdd = (

spark_pipeline_framework/transformers/fhir_sender/v1/fhir_sender_helpers.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ def send_json_bundle_to_fhir(
3333
retry_count: Optional[int] = None,
3434
exclude_status_codes_from_retry: Optional[List[int]] = None,
3535
additional_request_headers: Optional[Dict[str, str]] = None,
36+
smart_merge: Optional[bool] = None,
3637
) -> Optional[FhirMergeResponse]:
3738
assert id_, f"{json_data_list!r}"
3839
fhir_client: FhirClient = get_fhir_client(
@@ -46,6 +47,8 @@ def send_json_bundle_to_fhir(
4647
auth_scopes=auth_scopes,
4748
log_level=log_level,
4849
)
50+
if smart_merge is False:
51+
fhir_client.smart_merge(False)
4952

5053
fhir_client = fhir_client.resource(resource)
5154
if validation_server_url:

spark_pipeline_framework/transformers/fhir_sender/v1/fhir_sender_processor.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ def send_partition_to_server(
4444
validation_server_url: Optional[str],
4545
retry_count: Optional[int],
4646
exclude_status_codes_from_retry: Optional[List[int]],
47+
smart_merge: Optional[bool] = None,
4748
) -> Generator[List[Dict[str, Any]], None, None]:
4849
"""
4950
This function processes a partition
@@ -191,6 +192,7 @@ def send_partition_to_server(
191192
log_level=log_level,
192193
retry_count=retry_count,
193194
exclude_status_codes_from_retry=exclude_status_codes_from_retry,
195+
smart_merge=smart_merge,
194196
)
195197
if result:
196198
auth_access_token1 = result.access_token

spark_pipeline_framework/transformers/fhir_sender/v2/fhir_sender.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ def __init__(
104104
max_chunk_size: int = 100,
105105
process_chunks_in_parallel: Optional[bool] = True,
106106
maximum_concurrent_tasks: int = 100,
107+
smart_merge: Optional[bool] = None,
107108
):
108109
"""
109110
Sends FHIR json stored in a folder to a FHIR server
@@ -139,6 +140,7 @@ def __init__(
139140
:param max_chunk_size: (Optional) max chunk size
140141
:param process_chunks_in_parallel: (Optional) process chunks in parallel
141142
:param maximum_concurrent_tasks: (Optional) maximum concurrent tasks
143+
:param smart_merge: (Optional) whether to use smart merge or not
142144
"""
143145
super().__init__(
144146
name=name, parameters=parameters, progress_logger=progress_logger
@@ -307,6 +309,9 @@ def __init__(
307309
)
308310
self._setDefault(maximum_concurrent_tasks=maximum_concurrent_tasks)
309311

312+
self.smart_merge: Param[Optional[bool]] = Param(self, "smart_merge", "")
313+
self._setDefault(smart_merge=None)
314+
310315
kwargs = self._input_kwargs
311316
self.setParams(**kwargs)
312317

@@ -394,6 +399,8 @@ async def _transform_async(self, df: DataFrame) -> DataFrame:
394399
)
395400
maximum_concurrent_tasks: int = self.getOrDefault(self.maximum_concurrent_tasks)
396401

402+
smart_merge: Optional[bool] = self.getOrDefault(self.smart_merge)
403+
397404
if parameters and parameters.get("flow_name"):
398405
user_agent_value = (
399406
f"{parameters['team_name']}:helix.pipelines:{parameters['flow_name']}".replace(
@@ -531,6 +538,7 @@ async def _transform_async(self, df: DataFrame) -> DataFrame:
531538
process_chunks_in_parallel=process_chunks_in_parallel,
532539
maximum_concurrent_tasks=maximum_concurrent_tasks,
533540
),
541+
smart_merge=smart_merge,
534542
)
535543
if run_synchronously:
536544
rows_to_send: List[Dict[str, Any]] = [

spark_pipeline_framework/transformers/fhir_sender/v2/fhir_sender_helpers_async.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ async def send_json_bundle_to_fhir_async(
3636
exclude_status_codes_from_retry: Optional[List[int]] = None,
3737
additional_request_headers: Optional[Dict[str, str]] = None,
3838
batch_size: Optional[int] = None,
39+
smart_merge: Optional[bool] = None,
3940
) -> AsyncGenerator[FhirMergeResponse, None]:
4041
"""
4142
Send a JSON bundle to FHIR server
@@ -54,6 +55,8 @@ async def send_json_bundle_to_fhir_async(
5455
log_level=log_level,
5556
auth_well_known_url=auth_well_known_url,
5657
)
58+
if smart_merge is False:
59+
fhir_client.smart_merge(False)
5760

5861
fhir_client = fhir_client.resource(resource)
5962
if validation_server_url:

spark_pipeline_framework/transformers/fhir_sender/v2/fhir_sender_parameters.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,3 +30,4 @@ class FhirSenderParameters:
3030
retry_count: Optional[int]
3131
exclude_status_codes_from_retry: Optional[List[int]]
3232
pandas_udf_parameters: AsyncPandasUdfParameters
33+
smart_merge: Optional[bool]

spark_pipeline_framework/transformers/fhir_sender/v2/fhir_sender_processor.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -359,6 +359,7 @@ async def send_partition_to_server_async(
359359
log_level=parameters.log_level,
360360
retry_count=parameters.retry_count,
361361
exclude_status_codes_from_retry=parameters.exclude_status_codes_from_retry,
362+
smart_merge=parameters.smart_merge,
362363
):
363364
if result:
364365
auth_access_token1 = result.access_token
@@ -401,6 +402,7 @@ async def send_partition_to_server_async(
401402
log_level=parameters.log_level,
402403
retry_count=parameters.retry_count,
403404
exclude_status_codes_from_retry=parameters.exclude_status_codes_from_retry,
405+
smart_merge=parameters.smart_merge,
404406
):
405407
if result and result.request_id:
406408
request_id_list.append(result.request_id)

spark_pipeline_framework/transformers/send_automapper_to_fhir/v1/automapper_to_fhir_transformer.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ def __init__(
7373
operation: Union[
7474
FhirSenderOperation, str
7575
] = FhirSenderOperation.FHIR_OPERATION_MERGE.value,
76+
smart_merge: Optional[bool] = None,
7677
):
7778
"""
7879
Runs the auto-mappers, saves the result to Athena db and then sends the results to fhir server
@@ -97,6 +98,7 @@ def __init__(
9798
}
9899
:param enable_repartitioning: Enable repartitioning or not, default True
99100
:param operation: The API operation to perform, such as merge, put, delete etc
101+
:param smart_merge: (Optional) Enable smart merge functionality
100102
"""
101103
super().__init__(
102104
name=name, parameters=parameters, progress_logger=progress_logger
@@ -190,6 +192,9 @@ def __init__(
190192
)
191193
self._setDefault(operation=operation)
192194

195+
self.smart_merge: Param[Optional[bool]] = Param(self, "smart_merge", "")
196+
self._setDefault(smart_merge=smart_merge)
197+
193198
kwargs = self._input_kwargs
194199
self.setParams(**kwargs)
195200

@@ -214,6 +219,7 @@ def _transform(self, df: DataFrame) -> DataFrame:
214219
)
215220
enable_repartitioning: bool = self.getOrDefault(self.enable_repartitioning)
216221
operation: Union[FhirSenderOperation, str] = self.getOrDefault(self.operation)
222+
smart_merge: Optional[bool] = self.getSmartMerge()
217223

218224
assert parameters
219225
progress_logger = self.getProgressLogger()
@@ -347,6 +353,7 @@ def _transform(self, df: DataFrame) -> DataFrame:
347353
),
348354
enable_repartitioning=enable_repartitioning,
349355
operation=operation,
356+
smart_merge=smart_merge,
350357
).transform(df)
351358
return df
352359

@@ -409,3 +416,7 @@ def getSendToFhir(self) -> Optional[bool]:
409416
# noinspection PyPep8Naming,PyMissingOrEmptyDocstring
410417
def getAdditionalRequestHeaders(self) -> Optional[Dict[str, str]]:
411418
return self.getOrDefault(self.additional_request_headers)
419+
420+
# noinspection PyPep8Naming,PyMissingOrEmptyDocstring
421+
def getSmartMerge(self) -> Optional[bool]:
422+
return self.getOrDefault(self.smart_merge)

spark_pipeline_framework/transformers/send_automapper_to_fhir/v2/automapper_to_fhir_transformer.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ def __init__(
7575
FhirSenderOperation, str
7676
] = FhirSenderOperation.FHIR_OPERATION_MERGE.value,
7777
log_level: Optional[str] = None,
78+
smart_merge: Optional[bool] = None,
7879
):
7980
"""
8081
Runs the auto-mappers, saves the result to Athena db and then sends the results to fhir server
@@ -99,6 +100,8 @@ def __init__(
99100
}
100101
:param enable_repartitioning: Enable repartitioning or not, default True
101102
:param operation: The API operation to perform, such as merge, put, delete etc
103+
:param log_level: (Optional) Log level to use
104+
:param smart_merge: (Optional) Enable smart merge functionality
102105
"""
103106
super().__init__(
104107
name=name, parameters=parameters, progress_logger=progress_logger
@@ -195,6 +198,9 @@ def __init__(
195198
self.log_level: Param[str] = Param(self, "log_level", "")
196199
self._setDefault(log_level=log_level)
197200

201+
self.smart_merge: Param[Optional[bool]] = Param(self, "smart_merge", "")
202+
self._setDefault(smart_merge=smart_merge)
203+
198204
kwargs = self._input_kwargs
199205
self.setParams(**kwargs)
200206

@@ -219,6 +225,7 @@ async def _transform_async(self, df: DataFrame) -> DataFrame:
219225
)
220226
enable_repartitioning: bool = self.getOrDefault(self.enable_repartitioning)
221227
operation: Union[FhirSenderOperation, str] = self.getOrDefault(self.operation)
228+
smart_merge: Optional[bool] = self.getSmartMerge()
222229

223230
assert parameters is not None
224231
progress_logger = self.getProgressLogger()
@@ -350,6 +357,7 @@ async def _transform_async(self, df: DataFrame) -> DataFrame:
350357
),
351358
operation=operation,
352359
log_level=log_level,
360+
smart_merge=smart_merge,
353361
).transform_async(df)
354362
return df
355363

@@ -409,6 +417,10 @@ def getAuthScopes(self) -> Optional[List[str]]:
409417
def getSendToFhir(self) -> Optional[bool]:
410418
return self.getOrDefault(self.send_to_fhir)
411419

412-
# noinspection PyPep8Naming,PyMissingOrEmptyDocstring
420+
# noinspection PyPep8Naming,PyMissingOrEmptyDocstring
413421
def getAdditionalRequestHeaders(self) -> Optional[Dict[str, str]]:
414422
return self.getOrDefault(self.additional_request_headers)
423+
424+
# noinspection PyPep8Naming,PyMissingOrEmptyDocstring
425+
def getSmartMerge(self) -> Optional[bool]:
426+
return self.getOrDefault(self.smart_merge)

0 commit comments

Comments
 (0)