@@ -86,15 +86,10 @@ def run_step(context: JobContext, step: Step) -> StepResult:
8686 )
8787 except Exception as e :
8888 status = ExecutionStatus .ERROR
89- details = errors .MSG_WHY_FROM_EXCEPTION (e )
9089 blamee = whom_to_blame (e , __file__ , context .job_directory )
9190 exception = e
9291 errors .report (blamee , exception )
93- errors .log_exception (
94- log ,
95- exception ,
96- f"Processing step { step .name } completed with error." ,
97- )
92+ log .warning (f"Processing step { step .name } completed with error." )
9893
9994 return StepResult (
10095 name = step .name ,
@@ -132,6 +127,7 @@ def run_job(context: JobContext) -> ExecutionResult:
132127 execution_status = ExecutionStatus .SUCCESS
133128 for current_step in steps :
134129 step_start_time = datetime .utcnow ()
130+ res = None
135131 try :
136132 res = context .core_context .plugin_registry .hook ().run_step (
137133 context = context , step = current_step
@@ -140,11 +136,7 @@ def run_job(context: JobContext) -> ExecutionResult:
140136 blamee = whom_to_blame (e , __file__ , context .job_directory )
141137 exception = e
142138 errors .report (blamee , exception )
143- errors .log_exception (
144- log ,
145- exception ,
146- f"Processing step { current_step .name } completed with error." ,
147- )
139+ log .warn (f"Processing step { current_step .name } completed with error." )
148140 res = StepResult (
149141 name = current_step .name ,
150142 type = current_step .type ,
@@ -160,11 +152,11 @@ def run_job(context: JobContext) -> ExecutionResult:
160152 # errors.clear_intermediate_errors() # step completed successfully, so we can forget errors
161153 if res .status == ExecutionStatus .ERROR :
162154 execution_status = ExecutionStatus .ERROR
155+ exception = res .exception
163156 break
164157 if res .status == ExecutionStatus .SKIP_REQUESTED :
165158 # We keep the status as Success, but we skip all remaining steps
166159 break
167-
168160 execution_result = ExecutionResult (
169161 context .name ,
170162 context .core_context .state .get (CommonStoreKeys .EXECUTION_ID ),
@@ -298,25 +290,31 @@ def run(
298290 self ._plugin_hook .initialize_job (context = job_context )
299291
300292 start_time = datetime .utcnow ()
293+ step_results = []
301294 try :
302- return self ._plugin_hook .run_job (context = job_context )
295+ execution_result = self ._plugin_hook .run_job (context = job_context )
296+ if (
297+ execution_result .exception
298+ and execution_result .status == ExecutionStatus .ERROR
299+ ):
300+ step_results = execution_result .steps_list
301+ raise execution_result .exception
302+ return execution_result
303303 except BaseException as ex :
304304 blamee = whom_to_blame (ex , __file__ , job_context .job_directory )
305305 errors .report (blamee , ex )
306- errors .log_exception (
307- log , ex , f"Data Job { self ._name } completed with error."
308- )
306+ log .warn (f"Data Job { self ._name } completed with error." )
307+ log .exception (ex )
309308 execution_result = ExecutionResult (
310309 self ._name ,
311310 self ._core_context .state .get (CommonStoreKeys .EXECUTION_ID ),
312311 start_time ,
313312 datetime .utcnow (),
314313 ExecutionStatus .ERROR ,
315- [] ,
314+ step_results ,
316315 ex ,
317316 blamee ,
318317 )
319318 return execution_result
320-
321319 finally : # TODO: we should pass execution result to finalize_job somehow ...
322320 self ._plugin_hook .finalize_job (context = job_context )
0 commit comments