Skip to content

Commit bc6c3dc

Browse files
committed
control-service: async deployment deletion
Why: As part of VEP-2272, we need to introduce a process for synchronizing data jobs from the database to Kubernetes. Currently, the process lacks the ability to delete deployments asynchronously. What: We have introduced an asynchronous method for deployment deletion. It is based on the logic that if the desired deployment is null and the actual deployment is not null, the process performs deployment deletion. Testing done: Integration tests. Signed-off-by: Miroslav Ivanov miroslavi@vmware.com
1 parent c870acb commit bc6c3dc

File tree

6 files changed

+161
-15
lines changed

6 files changed

+161
-15
lines changed

projects/control-service/projects/pipelines_control_service/src/integration-test/java/com/vmware/taurus/service/deploy/DataJobDeploymentCrudITV2.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,13 @@ public void testSynchronizeDataJob() throws Exception {
158158
Assertions.assertNotEquals(lastDeployedDateInitial, lastDeployedDateShouldBeChanged);
159159
Assertions.assertNotEquals(
160160
deploymentVersionShaShouldNotBeChanged, deploymentVersionShaShouldBeChanged);
161+
162+
// Deletes deployment
163+
desiredJobDeploymentRepository.deleteById(testJobName);
164+
dataJobsSynchronizer.synchronizeDataJob(
165+
dataJob, null, actualDataJobDeployment, true);
166+
Assertions.assertFalse(deploymentService.readDeployment(testJobName).isPresent());
167+
Assertions.assertFalse(actualJobDeploymentRepository.findById(testJobName).isPresent());
161168
}
162169

163170
@Test

projects/control-service/projects/pipelines_control_service/src/main/java/com/vmware/taurus/datajobs/DataJobsDeploymentController.java

Lines changed: 46 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,9 @@
1111
import com.vmware.taurus.controlplane.model.data.DataJobMode;
1212
import com.vmware.taurus.exception.ExternalSystemError;
1313
import com.vmware.taurus.service.JobsService;
14+
import com.vmware.taurus.service.deploy.DataJobDeploymentPropertiesConfig;
1415
import com.vmware.taurus.service.deploy.DeploymentService;
16+
import com.vmware.taurus.service.deploy.DeploymentServiceV2;
1517
import com.vmware.taurus.service.diag.OperationContext;
1618
import com.vmware.taurus.service.model.JobDeploymentStatus;
1719
import io.swagger.v3.oas.annotations.tags.Tag;
@@ -51,14 +53,27 @@ public class DataJobsDeploymentController implements DataJobsDeploymentApi {
5153

5254
@Autowired private OperationContext operationContext;
5355

56+
@Autowired private DeploymentServiceV2 deploymentServiceV2;
57+
58+
@Autowired private DataJobDeploymentPropertiesConfig dataJobDeploymentPropertiesConfig;
59+
5460
@Override
5561
public ResponseEntity<Void> deploymentDelete(
5662
String teamName, String jobName, String deploymentId) {
5763
if (jobsService.jobWithTeamExists(jobName, teamName)) {
5864
// TODO: deploymentId not implemented
5965
if (jobName != null) {
60-
deploymentService.deleteDeployment(jobName);
61-
return ResponseEntity.accepted().build();
66+
if (dataJobDeploymentPropertiesConfig
67+
.getWriteTos()
68+
.contains(DataJobDeploymentPropertiesConfig.WriteTo.K8S)) {
69+
deploymentService.deleteDeployment(jobName);
70+
return ResponseEntity.accepted().build();
71+
} else if (dataJobDeploymentPropertiesConfig
72+
.getWriteTos()
73+
.contains(DataJobDeploymentPropertiesConfig.WriteTo.DB)) {
74+
deploymentServiceV2.deleteDesiredDeployment(jobName);
75+
return ResponseEntity.accepted().build();
76+
}
6277
}
6378
return ResponseEntity.notFound().build();
6479
}
@@ -76,8 +91,12 @@ public ResponseEntity<Void> deploymentPatch(
7691
if (job.isPresent()) {
7792
var jobDeployment =
7893
ToModelApiConverter.toJobDeployment(teamName, jobName, dataJobDeployment);
79-
deploymentService.patchDeployment(job.get(), jobDeployment);
80-
return ResponseEntity.accepted().build();
94+
if (dataJobDeploymentPropertiesConfig
95+
.getWriteTos()
96+
.contains(DataJobDeploymentPropertiesConfig.WriteTo.K8S)) {
97+
deploymentService.patchDeployment(job.get(), jobDeployment);
98+
return ResponseEntity.accepted().build();
99+
}
81100
}
82101
}
83102
return ResponseEntity.notFound().build();
@@ -89,8 +108,12 @@ public ResponseEntity<List<DataJobDeploymentStatus>> deploymentList(
89108
if (jobsService.jobWithTeamExists(jobName, teamName)) {
90109
// TODO: deploymentId and mode not implemented
91110
List<DataJobDeploymentStatus> deployments = Collections.emptyList();
92-
Optional<JobDeploymentStatus> jobDeploymentStatus =
93-
deploymentService.readDeployment(jobName.toLowerCase());
111+
Optional<JobDeploymentStatus> jobDeploymentStatus = Optional.empty();
112+
if (dataJobDeploymentPropertiesConfig
113+
.getReadDataSource()
114+
.equals(DataJobDeploymentPropertiesConfig.ReadFrom.K8S)) {
115+
jobDeploymentStatus = deploymentService.readDeployment(jobName.toLowerCase());
116+
}
94117
if (jobDeploymentStatus.isPresent()) {
95118
deployments =
96119
Arrays.asList(ToApiModelConverter.toDataJobDeploymentStatus(jobDeploymentStatus.get()));
@@ -105,8 +128,12 @@ public ResponseEntity<DataJobDeploymentStatus> deploymentRead(
105128
String teamName, String jobName, String deploymentId) {
106129
if (jobsService.jobWithTeamExists(jobName, teamName)) {
107130
// TODO: deploymentId are not implemented.
108-
Optional<JobDeploymentStatus> jobDeploymentStatus =
109-
deploymentService.readDeployment(jobName.toLowerCase());
131+
Optional<JobDeploymentStatus> jobDeploymentStatus = Optional.empty();
132+
if (dataJobDeploymentPropertiesConfig
133+
.getReadDataSource()
134+
.equals(DataJobDeploymentPropertiesConfig.ReadFrom.K8S)) {
135+
jobDeploymentStatus = deploymentService.readDeployment(jobName.toLowerCase());
136+
}
110137
if (jobDeploymentStatus.isPresent()) {
111138
return ResponseEntity.ok(
112139
ToApiModelConverter.toDataJobDeploymentStatus(jobDeploymentStatus.get()));
@@ -128,13 +155,17 @@ public ResponseEntity<Void> deploymentUpdate(
128155
if (job.isPresent()) {
129156
var jobDeployment =
130157
ToModelApiConverter.toJobDeployment(teamName, jobName.toLowerCase(), dataJobDeployment);
131-
// TODO: Consider using a Task-oriented API approach
132-
deploymentService.updateDeployment(
133-
job.get(),
134-
jobDeployment,
135-
sendNotification,
136-
operationContext.getUser(),
137-
operationContext.getOpId());
158+
if (dataJobDeploymentPropertiesConfig
159+
.getWriteTos()
160+
.contains(DataJobDeploymentPropertiesConfig.WriteTo.K8S)) {
161+
// TODO: Consider using a Task-oriented API approach
162+
deploymentService.updateDeployment(
163+
job.get(),
164+
jobDeployment,
165+
sendNotification,
166+
operationContext.getUser(),
167+
operationContext.getOpId());
168+
}
138169

139170
return ResponseEntity.accepted().build();
140171
}

projects/control-service/projects/pipelines_control_service/src/main/java/com/vmware/taurus/service/deploy/DataJobsSynchronizer.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,8 @@ void synchronizeDataJob(
144144
desiredDataJobDeployment,
145145
actualDataJobDeployment,
146146
isDeploymentPresentInKubernetes);
147+
} else if (actualDataJobDeployment != null) {
148+
deploymentService.deleteActualDeployment(dataJob.getName());
147149
}
148150
}
149151

projects/control-service/projects/pipelines_control_service/src/main/java/com/vmware/taurus/service/deploy/DeploymentServiceV2.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,15 @@
1212
import com.vmware.taurus.service.notification.NotificationContent;
1313
import com.vmware.taurus.service.repository.ActualJobDeploymentRepository;
1414
import com.vmware.taurus.service.repository.DesiredJobDeploymentRepository;
15+
import com.vmware.taurus.service.repository.JobsRepository;
1516
import io.kubernetes.client.openapi.ApiException;
1617
import lombok.RequiredArgsConstructor;
1718
import org.slf4j.Logger;
1819
import org.slf4j.LoggerFactory;
1920
import org.springframework.stereotype.Service;
2021

2122
import java.util.Map;
23+
import java.util.Optional;
2224
import java.util.Set;
2325
import java.util.function.Function;
2426
import java.util.stream.Collectors;
@@ -43,6 +45,7 @@ public class DeploymentServiceV2 {
4345
private final DesiredJobDeploymentRepository desiredJobDeploymentRepository;
4446
private final ActualJobDeploymentRepository actualJobDeploymentRepository;
4547
private final DataJobsKubernetesService dataJobsKubernetesService;
48+
private final JobsRepository jobsRepository;
4649

4750
/**
4851
* Updates or creates a Kubernetes CronJob based on the provided configuration.
@@ -128,6 +131,25 @@ public void updateDeployment(
128131
}
129132
}
130133

134+
public void deleteDesiredDeployment(String dataJobName) {
135+
desiredJobDeploymentRepository.deleteById(dataJobName);
136+
}
137+
138+
public void deleteActualDeployment(String dataJobName) {
139+
if (this.deploymentExistsOrInProgress(dataJobName)) {
140+
jobImageBuilder.cancelBuildingJob(dataJobName);
141+
jobImageDeployer.unScheduleJob(dataJobName);
142+
jobsRepository.updateDataJobEnabledByName(dataJobName, false);
143+
}
144+
145+
actualJobDeploymentRepository.deleteById(dataJobName);
146+
deploymentProgress.deleted(dataJobName);
147+
}
148+
149+
public Optional<ActualDataJobDeployment> readDeployment(String dataJobName) {
150+
return actualJobDeploymentRepository.findById(dataJobName);
151+
}
152+
131153
public void updateDeploymentEnabledStatus(String dataJobName, Boolean enabledStatus) {
132154
desiredJobDeploymentRepository.updateDesiredDataJobDeploymentEnabledByDataJobName(
133155
dataJobName, enabledStatus);
@@ -172,4 +194,9 @@ private void handleException(
172194
NotificationContent.getPlatformErrorBody(),
173195
sendNotification);
174196
}
197+
198+
private boolean deploymentExistsOrInProgress(String dataJobName) {
199+
return jobImageBuilder.isBuildingJobInProgress(dataJobName)
200+
|| readDeployment(dataJobName).isPresent();
201+
}
175202
}

projects/control-service/projects/pipelines_control_service/src/main/java/com/vmware/taurus/service/deploy/JobImageDeployerV2.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import com.vmware.taurus.service.model.*;
1515
import com.vmware.taurus.service.notification.NotificationContent;
1616
import io.kubernetes.client.openapi.ApiException;
17+
import lombok.NonNull;
1718
import lombok.RequiredArgsConstructor;
1819
import lombok.extern.slf4j.Slf4j;
1920
import org.apache.commons.lang3.StringUtils;
@@ -103,6 +104,17 @@ public ActualDataJobDeployment scheduleJob(
103104
}
104105
}
105106

107+
public void unScheduleJob(@NonNull String dataJobName) {
108+
String cronJobName = getCronJobName(dataJobName);
109+
try {
110+
if (dataJobsKubernetesService.listCronJobs().contains(cronJobName)) {
111+
dataJobsKubernetesService.deleteCronJob(cronJobName);
112+
}
113+
} catch (ApiException e) {
114+
throw new KubernetesException("Failed to un-schedule job", e);
115+
}
116+
}
117+
106118
private ActualDataJobDeployment catchApiException(
107119
DataJob dataJob, Boolean sendNotification, ApiException apiException) {
108120
if (apiException.getCode() == HttpStatus.UNPROCESSABLE_ENTITY.value()) {

projects/control-service/projects/pipelines_control_service/src/test/java/com/vmware/taurus/service/deploy/DataJobsSynchronizerTest.java

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@
66
package com.vmware.taurus.service.deploy;
77

88
import com.vmware.taurus.ServiceApp;
9+
import com.vmware.taurus.service.model.ActualDataJobDeployment;
10+
import com.vmware.taurus.service.model.DataJob;
11+
import com.vmware.taurus.service.model.DesiredDataJobDeployment;
912
import io.kubernetes.client.openapi.ApiException;
1013
import org.junit.jupiter.api.Test;
1114
import org.junit.jupiter.api.extension.ExtendWith;
@@ -107,6 +110,70 @@ void synchronizeDataJobs_synchronizationEnabledTrueAndWriteToDbFalse_shouldSkipS
107110
.findAllActualDeploymentNamesFromKubernetes();
108111
}
109112

113+
@Test
114+
void synchronizeDataJob_desiredDeploymentNullAndActualDeploymentNull_shouldSkipSynchronization() {
115+
DataJob dataJob = new DataJob();
116+
dataJob.setName("test-job-name");
117+
boolean isDeploymentPresentInKubernetes = false;
118+
DesiredDataJobDeployment desiredDataJobDeployment = null;
119+
ActualDataJobDeployment actualDataJobDeployment = null;
120+
121+
dataJobsSynchronizer.synchronizeDataJob(dataJob, desiredDataJobDeployment, actualDataJobDeployment, isDeploymentPresentInKubernetes);
122+
123+
Mockito.verify(deploymentService, Mockito.times(0))
124+
.updateDeployment(dataJob, desiredDataJobDeployment, actualDataJobDeployment, isDeploymentPresentInKubernetes);
125+
Mockito.verify(deploymentService, Mockito.times(0))
126+
.deleteActualDeployment(dataJob.getName());
127+
}
128+
129+
@Test
130+
void synchronizeDataJob_desiredDeploymentNullAndActualDeploymentNotNull_shouldDeleteJobDeployment() {
131+
DataJob dataJob = new DataJob();
132+
dataJob.setName("test-job-name");
133+
boolean isDeploymentPresentInKubernetes = true;
134+
DesiredDataJobDeployment desiredDataJobDeployment = null;
135+
ActualDataJobDeployment actualDataJobDeployment = new ActualDataJobDeployment();
136+
137+
dataJobsSynchronizer.synchronizeDataJob(dataJob, desiredDataJobDeployment, actualDataJobDeployment, isDeploymentPresentInKubernetes);
138+
139+
Mockito.verify(deploymentService, Mockito.times(0))
140+
.updateDeployment(dataJob, desiredDataJobDeployment, actualDataJobDeployment, isDeploymentPresentInKubernetes);
141+
Mockito.verify(deploymentService, Mockito.times(1))
142+
.deleteActualDeployment(dataJob.getName());
143+
}
144+
145+
@Test
146+
void synchronizeDataJob_desiredDeploymentNotNullAndActualDeploymentNotNull_shouldUpdateJobDeployment() {
147+
DataJob dataJob = new DataJob();
148+
dataJob.setName("test-job-name");
149+
boolean isDeploymentPresentInKubernetes = true;
150+
DesiredDataJobDeployment desiredDataJobDeployment = new DesiredDataJobDeployment();
151+
ActualDataJobDeployment actualDataJobDeployment = new ActualDataJobDeployment();
152+
153+
dataJobsSynchronizer.synchronizeDataJob(dataJob, desiredDataJobDeployment, actualDataJobDeployment, isDeploymentPresentInKubernetes);
154+
155+
Mockito.verify(deploymentService, Mockito.times(1))
156+
.updateDeployment(dataJob, desiredDataJobDeployment, actualDataJobDeployment, isDeploymentPresentInKubernetes);
157+
Mockito.verify(deploymentService, Mockito.times(0))
158+
.deleteActualDeployment(dataJob.getName());
159+
}
160+
161+
@Test
162+
void synchronizeDataJob_desiredDeploymentNotNullAndActualDeploymentNull_shouldUpdateJobDeployment() {
163+
DataJob dataJob = new DataJob();
164+
dataJob.setName("test-job-name");
165+
boolean isDeploymentPresentInKubernetes = true;
166+
DesiredDataJobDeployment desiredDataJobDeployment = new DesiredDataJobDeployment();
167+
ActualDataJobDeployment actualDataJobDeployment = null;
168+
169+
dataJobsSynchronizer.synchronizeDataJob(dataJob, desiredDataJobDeployment, actualDataJobDeployment, isDeploymentPresentInKubernetes);
170+
171+
Mockito.verify(deploymentService, Mockito.times(1))
172+
.updateDeployment(dataJob, desiredDataJobDeployment, actualDataJobDeployment, isDeploymentPresentInKubernetes);
173+
Mockito.verify(deploymentService, Mockito.times(0))
174+
.deleteActualDeployment(dataJob.getName());
175+
}
176+
110177
void enableSynchronizationProcess() {
111178
initSynchronizationProcessConfig(true, true);
112179
}

0 commit comments

Comments
 (0)