Skip to content

[AIRFLOW-6885] Delete worker on success#7507

Merged
dimberman merged 1 commit intoapache:masterfrom
dimberman:AIRFLOW-6885_delete-worker-on-success
Mar 19, 2020
Merged

[AIRFLOW-6885] Delete worker on success#7507
dimberman merged 1 commit intoapache:masterfrom
dimberman:AIRFLOW-6885_delete-worker-on-success

Conversation

@dimberman
Copy link
Copy Markdown
Contributor

@dimberman dimberman commented Feb 22, 2020

Users now have the option to only delete worker pods when they are successful


Issue link: AIRFLOW-6885

Make sure to mark the boxes below before creating PR: [x]

  • Description above provides context of the change
  • Commit message/PR title starts with [AIRFLOW-NNNN]. AIRFLOW-NNNN = JIRA ID*
  • Unit tests coverage for changes (not needed for documentation changes)
  • Commits follow "How to write a good git commit message"
  • Relevant documentation is updated including usage instructions.
  • I will engage committers as explained in Contribution Workflow Example.

* For document-only changes commit message can start with [AIRFLOW-XXXX].


In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in UPDATING.md.
Read the Pull Request Guidelines for more information.

@boring-cyborg boring-cyborg bot added area:Scheduler including HA (high availability) scheduler k8s labels Feb 22, 2020
@dimberman dimberman requested review from ashb and kaxil February 22, 2020 18:34
@dimberman dimberman force-pushed the AIRFLOW-6885_delete-worker-on-success branch from e375b82 to f38951b Compare February 22, 2020 18:36
@dimberman dimberman changed the title Users now have the option to only delete worker pods when they are su… [AIRFLOW-6885] Delete worker on success Feb 22, 2020
@dimberman dimberman requested a review from turbaszek February 22, 2020 18:37
@dimberman dimberman requested review from kaxil and turbaszek February 23, 2020 02:01
Copy link
Copy Markdown
Member

@ashb ashb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Like the feature idea, but what do you think of using a single (existing) config option with possible values True, False, and only_on_success.

This may be a bag idea because of the config API, but might be more user friendly?

@dimberman dimberman force-pushed the AIRFLOW-6885_delete-worker-on-success branch 2 times, most recently from 48899c3 to ea82908 Compare February 23, 2020 21:02
@kaxil
Copy link
Copy Markdown
Member

kaxil commented Feb 25, 2020

Travis failed:

tests/executors/test_kubernetes_executor.py ........F
_______________ TestKubernetesExecutor.test_change_state_failed ________________
self = <tests.executors.test_kubernetes_executor.TestKubernetesExecutor testMethod=test_change_state_failed>
mock_delete_pod = <MagicMock name='delete_pod' id='140527588788768'>
mock_get_kube_client = <MagicMock name='get_kube_client' id='140527589891936'>
mock_kubernetes_job_watcher = <MagicMock name='KubernetesJobWatcher' id='140527589720472'>
    @mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
    @mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
    @mock.patch('airflow.executors.kubernetes_executor.AirflowKubernetesScheduler.delete_pod')
    def test_change_state_failed(self, mock_delete_pod, mock_get_kube_client, mock_kubernetes_job_watcher):
        executor = KubernetesExecutor()
        executor.start()
        test_time = timezone.utcnow()
        key = ('dag_id', 'task_id', test_time, 'try_number3')
        executor._change_state(key, State.FAILED, 'pod_id', 'default')
        self.assertTrue(executor.event_buffer[key] == State.FAILED)
>       mock_delete_pod.assert_not_called()
tests/executors/test_kubernetes_executor.py:266: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_mock_self = <MagicMock name='delete_pod' id='140527588788768'>
    def assert_not_called(_mock_self):
        """assert that the mock was never called.
        """
        self = _mock_self
        if self.call_count != 0:
            msg = ("Expected '%s' to not have been called. Called %s times.%s"
                   % (self._mock_name or 'mock',
                      self.call_count,
                      self._calls_repr()))
>           raise AssertionError(msg)
E           AssertionError: Expected 'delete_pod' to not have been called. Called 1 times.
E           Calls: [call('pod_id', 'default')].
/usr/local/lib/python3.6/site-packages/mock/mock.py:871: AssertionError
----------------------------- Captured stdout call -----------------------------
[%(asctime)s] {{%(filename)s:%(lineno)d}} %(levelname)s - %(message)s
[%(asctime)s] {{%(filename)s:%(lineno)d}} %(levelname)s - %(message)s
[%(asctime)s] {{%(filename)s:%(lineno)d}} %(levelname)s - %(message)s
tests/executors/test_kubernetes_executor.py F
_________ TestKubernetesExecutor.test_change_state_failed_pod_deletion _________
self = <tests.executors.test_kubernetes_executor.TestKubernetesExecutor testMethod=test_change_state_failed_pod_deletion>
mock_delete_pod = <MagicMock name='delete_pod' id='140527588755552'>
mock_get_kube_client = <MagicMock name='get_kube_client' id='140527588795280'>
mock_kubernetes_job_watcher = <MagicMock name='KubernetesJobWatcher' id='140527588672288'>
    @mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
    @mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
    @mock.patch('airflow.executors.kubernetes_executor.AirflowKubernetesScheduler.delete_pod')
    def test_change_state_failed_pod_deletion(self, mock_delete_pod, mock_get_kube_client,
                                              mock_kubernetes_job_watcher):
        executor = KubernetesExecutor()
        executor.kube_config.delete_worker_pods_on_success = True
        executor.start()
        key = ('dag_id', 'task_id', 'ex_time', 'try_number2')
        executor._change_state(key, State.FAILED, 'pod_id', 'test-namespace')
        self.assertTrue(executor.event_buffer[key] == State.FAILED)
>       mock_delete_pod.assert_not_called()
tests/executors/test_kubernetes_executor.py:296: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_mock_self = <MagicMock name='delete_pod' id='140527588755552'>
    def assert_not_called(_mock_self):
        """assert that the mock was never called.
        """
        self = _mock_self
        if self.call_count != 0:
            msg = ("Expected '%s' to not have been called. Called %s times.%s"
                   % (self._mock_name or 'mock',
                      self.call_count,
                      self._calls_repr()))
>           raise AssertionError(msg)
E           AssertionError: Expected 'delete_pod' to not have been called. Called 1 times.
E           Calls: [call('pod_id', 'test-namespace')].
/usr/local/lib/python3.6/site-packages/mock/mock.py:871: AssertionError

@dimberman dimberman force-pushed the AIRFLOW-6885_delete-worker-on-success branch from ea82908 to a7c04ce Compare March 19, 2020 13:32
Users now have the option to only delete worker pods when they are successful
@dimberman dimberman force-pushed the AIRFLOW-6885_delete-worker-on-success branch from 6752286 to be002d4 Compare March 19, 2020 20:21
@dimberman dimberman merged commit d027b87 into apache:master Mar 19, 2020
@dimberman dimberman deleted the AIRFLOW-6885_delete-worker-on-success branch March 19, 2020 21:52
@kaxil kaxil added this to the Airflow 1.10.11 milestone Apr 3, 2020
kaxil pushed a commit that referenced this pull request Apr 16, 2020
Users now have the option to only delete worker pods when they are successful

Co-authored-by: Daniel Imberman <daniel@astronomer.io>
cfei18 pushed a commit to cfei18/incubator-airflow that referenced this pull request Mar 5, 2021
Users now have the option to only delete worker pods when they are successful

Co-authored-by: Daniel Imberman <daniel@astronomer.io>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:Scheduler including HA (high availability) scheduler

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants