Skip to content

Commit 32835d1

Browse files
committed
control-service: async deployment deletion
Why: As part of VEP-2272, we need to introduce a process for synchronizing data jobs from the database to Kubernetes. Currently, the process lacks the ability to delete deployments asynchronously. What: We have introduced an asynchronous method for deployment deletion. It is based on the logic that if the desired deployment is null and the actual deployment is not null, the process performs deployment deletion. Testing done: Integration tests. Signed-off-by: Miroslav Ivanov miroslavi@vmware.com
1 parent fd8d2a2 commit 32835d1

File tree

9 files changed

+178
-16
lines changed

9 files changed

+178
-16
lines changed

projects/control-service/projects/pipelines_control_service/src/integration-test/java/com/vmware/taurus/service/deploy/DataJobDeploymentCrudITV2.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,13 @@ public void testSynchronizeDataJob() throws Exception {
159159
Assertions.assertNotEquals(lastDeployedDateInitial, lastDeployedDateShouldBeChanged);
160160
Assertions.assertNotEquals(
161161
deploymentVersionShaShouldNotBeChanged, deploymentVersionShaShouldBeChanged);
162+
163+
// Deletes deployment
164+
desiredJobDeploymentRepository.deleteById(testJobName);
165+
dataJobsSynchronizer.synchronizeDataJob(
166+
dataJob, null, actualDataJobDeployment, true);
167+
Assertions.assertFalse(deploymentService.readDeployment(testJobName).isPresent());
168+
Assertions.assertFalse(actualJobDeploymentRepository.findById(testJobName).isPresent());
162169
}
163170

164171
@Test

projects/control-service/projects/pipelines_control_service/src/main/java/com/vmware/taurus/datajobs/DataJobsDeploymentController.java

Lines changed: 45 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
import java.util.List;
3333
import java.util.Optional;
3434

35+
import static com.vmware.taurus.service.deploy.DataJobDeploymentPropertiesConfig.*;
36+
3537
/**
3638
* REST controller for operations on data job deployments
3739
*
@@ -65,7 +67,15 @@ public ResponseEntity<Void> deploymentDelete(
6567
if (jobsService.jobWithTeamExists(jobName, teamName)) {
6668
// TODO: deploymentId not implemented
6769
if (jobName != null) {
68-
deploymentService.deleteDeployment(jobName);
70+
if (dataJobDeploymentPropertiesConfig.getWriteTos().contains(WriteTo.K8S)) {
71+
deploymentService.deleteDeployment(jobName);
72+
73+
}
74+
75+
if (dataJobDeploymentPropertiesConfig.getWriteTos().contains(WriteTo.DB)) {
76+
deploymentServiceV2.deleteDesiredDeployment(jobName);
77+
}
78+
6979
return ResponseEntity.accepted().build();
7080
}
7181
return ResponseEntity.notFound().build();
@@ -84,11 +94,16 @@ public ResponseEntity<Void> deploymentPatch(
8494
if (job.isPresent()) {
8595
var jobDeployment =
8696
ToModelApiConverter.toJobDeployment(teamName, jobName, dataJobDeployment);
87-
deploymentService.patchDeployment(job.get(), jobDeployment);
97+
98+
if (dataJobDeploymentPropertiesConfig.getWriteTos().contains(WriteTo.K8S)) {
99+
deploymentService.patchDeployment(job.get(), jobDeployment);
100+
}
101+
88102
if (dataJobDeploymentPropertiesConfig.getWriteTos().contains(WriteTo.DB)) {
89103
deploymentServiceV2.patchDesiredDbDeployment(
90104
job.get(), jobDeployment, operationContext.getUser());
91105
}
106+
92107
return ResponseEntity.accepted().build();
93108
}
94109
}
@@ -101,8 +116,12 @@ public ResponseEntity<List<DataJobDeploymentStatus>> deploymentList(
101116
if (jobsService.jobWithTeamExists(jobName, teamName)) {
102117
// TODO: deploymentId and mode not implemented
103118
List<DataJobDeploymentStatus> deployments = Collections.emptyList();
104-
Optional<JobDeploymentStatus> jobDeploymentStatus =
105-
deploymentService.readDeployment(jobName.toLowerCase());
119+
Optional<JobDeploymentStatus> jobDeploymentStatus = Optional.empty();
120+
if (dataJobDeploymentPropertiesConfig
121+
.getReadDataSource()
122+
.equals(ReadFrom.K8S)) {
123+
jobDeploymentStatus = deploymentService.readDeployment(jobName.toLowerCase());
124+
}
106125
if (jobDeploymentStatus.isPresent()) {
107126
deployments =
108127
Arrays.asList(ToApiModelConverter.toDataJobDeploymentStatus(jobDeploymentStatus.get()));
@@ -117,8 +136,12 @@ public ResponseEntity<DataJobDeploymentStatus> deploymentRead(
117136
String teamName, String jobName, String deploymentId) {
118137
if (jobsService.jobWithTeamExists(jobName, teamName)) {
119138
// TODO: deploymentId are not implemented.
120-
Optional<JobDeploymentStatus> jobDeploymentStatus =
121-
deploymentService.readDeployment(jobName.toLowerCase());
139+
Optional<JobDeploymentStatus> jobDeploymentStatus = Optional.empty();
140+
if (dataJobDeploymentPropertiesConfig
141+
.getReadDataSource()
142+
.equals(ReadFrom.K8S)) {
143+
jobDeploymentStatus = deploymentService.readDeployment(jobName.toLowerCase());
144+
}
122145
if (jobDeploymentStatus.isPresent()) {
123146
return ResponseEntity.ok(
124147
ToApiModelConverter.toDataJobDeploymentStatus(jobDeploymentStatus.get()));
@@ -141,17 +164,24 @@ public ResponseEntity<Void> deploymentUpdate(
141164
if (job.isPresent()) {
142165
var jobDeployment =
143166
ToModelApiConverter.toJobDeployment(teamName, jobName.toLowerCase(), dataJobDeployment);
144-
// TODO: Consider using a Task-oriented API approach
145-
deploymentService.updateDeployment(
146-
job.get(),
147-
jobDeployment,
148-
sendNotification,
149-
operationContext.getUser(),
150-
operationContext.getOpId());
167+
168+
if (dataJobDeploymentPropertiesConfig
169+
.getWriteTos()
170+
.contains(WriteTo.K8S)) {
171+
// TODO: Consider using a Task-oriented API approach
172+
deploymentService.updateDeployment(
173+
job.get(),
174+
jobDeployment,
175+
sendNotification,
176+
operationContext.getUser(),
177+
operationContext.getOpId());
178+
}
179+
151180
if (dataJobDeploymentPropertiesConfig.getWriteTos().contains(WriteTo.DB)) {
152-
deploymentServiceV2.updateDesiredDbDeployment(
153-
job.get(), jobDeployment, operationContext.getUser());
181+
deploymentServiceV2.updateDesiredDbDeployment(
182+
job.get(), jobDeployment, operationContext.getUser());
154183
}
184+
155185
return ResponseEntity.accepted().build();
156186
}
157187
}

projects/control-service/projects/pipelines_control_service/src/main/java/com/vmware/taurus/service/JobsService.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ public class JobsService {
5959
private final DeploymentServiceV2 deploymentServiceV2;
6060
private final DataJobDeploymentPropertiesConfig dataJobDeploymentPropertiesConfig;
6161

62+
@Transactional
6263
public JobOperationResult deleteJob(String name) {
6364
if (!jobsRepository.existsById(name)) {
6465
return JobOperationResult.builder().completed(false).build();
@@ -69,7 +70,19 @@ public JobOperationResult deleteJob(String name) {
6970
Optional<WebHookResult> resultHolder = postDeleteWebHookProvider.invokeWebHook(requestBody);
7071
if (isInvocationSuccessful(resultHolder)) {
7172
credentialsService.deleteJobCredentials(name);
72-
deploymentService.deleteDeployment(name);
73+
74+
if (dataJobDeploymentPropertiesConfig
75+
.getWriteTos()
76+
.contains(DataJobDeploymentPropertiesConfig.WriteTo.K8S)) {
77+
deploymentService.deleteDeployment(name);
78+
}
79+
80+
if (dataJobDeploymentPropertiesConfig
81+
.getWriteTos()
82+
.contains(DataJobDeploymentPropertiesConfig.WriteTo.DB)) {
83+
deploymentServiceV2.deleteDesiredDeployment(name);
84+
}
85+
7386
jobsRepository.deleteById(name);
7487
dataJobMetrics.clearGauges(name);
7588

projects/control-service/projects/pipelines_control_service/src/main/java/com/vmware/taurus/service/deploy/DataJobsSynchronizer.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,8 @@ void synchronizeDataJob(
144144
desiredDataJobDeployment,
145145
actualDataJobDeployment,
146146
isDeploymentPresentInKubernetes);
147+
} else if (actualDataJobDeployment != null) {
148+
deploymentService.deleteActualDeployment(dataJob.getName());
147149
}
148150
}
149151

projects/control-service/projects/pipelines_control_service/src/main/java/com/vmware/taurus/service/deploy/DeploymentServiceV2.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,10 @@
1919
import com.vmware.taurus.service.notification.NotificationContent;
2020
import com.vmware.taurus.service.repository.ActualJobDeploymentRepository;
2121
import com.vmware.taurus.service.repository.DesiredJobDeploymentRepository;
22+
import com.vmware.taurus.service.repository.JobsRepository;
2223
import io.kubernetes.client.openapi.ApiException;
2324
import java.util.Map;
25+
import java.util.Optional;
2426
import java.util.Set;
2527
import java.util.function.Function;
2628
import java.util.stream.Collectors;
@@ -50,6 +52,7 @@ public class DeploymentServiceV2 {
5052
private final DesiredJobDeploymentRepository desiredJobDeploymentRepository;
5153
private final ActualJobDeploymentRepository actualJobDeploymentRepository;
5254
private final DataJobsKubernetesService dataJobsKubernetesService;
55+
private final JobsRepository jobsRepository;
5356

5457
/**
5558
* This method updates an existing job deployment in the database. Only fields present in the job
@@ -186,6 +189,25 @@ public void updateDeployment(
186189
}
187190
}
188191

192+
public void deleteDesiredDeployment(String dataJobName) {
193+
desiredJobDeploymentRepository.deleteById(dataJobName);
194+
}
195+
196+
public void deleteActualDeployment(String dataJobName) {
197+
if (this.deploymentExistsOrInProgress(dataJobName)) {
198+
jobImageBuilder.cancelBuildingJob(dataJobName);
199+
jobImageDeployer.unScheduleJob(dataJobName);
200+
jobsRepository.updateDataJobEnabledByName(dataJobName, false);
201+
}
202+
203+
actualJobDeploymentRepository.deleteById(dataJobName);
204+
deploymentProgress.deleted(dataJobName);
205+
}
206+
207+
public Optional<ActualDataJobDeployment> readDeployment(String dataJobName) {
208+
return actualJobDeploymentRepository.findById(dataJobName);
209+
}
210+
189211
public void updateDeploymentEnabledStatus(String dataJobName, Boolean enabledStatus) {
190212
desiredJobDeploymentRepository.updateDesiredDataJobDeploymentEnabledByDataJobName(
191213
dataJobName, enabledStatus);
@@ -230,4 +252,9 @@ private void handleException(
230252
NotificationContent.getPlatformErrorBody(),
231253
sendNotification);
232254
}
255+
256+
private boolean deploymentExistsOrInProgress(String dataJobName) {
257+
return jobImageBuilder.isBuildingJobInProgress(dataJobName)
258+
|| readDeployment(dataJobName).isPresent();
259+
}
233260
}

projects/control-service/projects/pipelines_control_service/src/main/java/com/vmware/taurus/service/deploy/JobImageDeployerV2.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import com.vmware.taurus.service.model.*;
1515
import com.vmware.taurus.service.notification.NotificationContent;
1616
import io.kubernetes.client.openapi.ApiException;
17+
import lombok.NonNull;
1718
import lombok.RequiredArgsConstructor;
1819
import lombok.extern.slf4j.Slf4j;
1920
import org.apache.commons.lang3.StringUtils;
@@ -103,6 +104,17 @@ public ActualDataJobDeployment scheduleJob(
103104
}
104105
}
105106

107+
public void unScheduleJob(@NonNull String dataJobName) {
108+
String cronJobName = getCronJobName(dataJobName);
109+
try {
110+
if (dataJobsKubernetesService.listCronJobs().contains(cronJobName)) {
111+
dataJobsKubernetesService.deleteCronJob(cronJobName);
112+
}
113+
} catch (ApiException e) {
114+
throw new KubernetesException("Failed to un-schedule job", e);
115+
}
116+
}
117+
106118
private ActualDataJobDeployment catchApiException(
107119
DataJob dataJob, Boolean sendNotification, ApiException apiException) {
108120
if (apiException.getCode() == HttpStatus.UNPROCESSABLE_ENTITY.value()) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
alter table if exists desired_data_job_deployment
2+
drop constraint if exists fk_data_job_name_ref_data_job;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
alter table if exists actual_data_job_deployment
2+
drop constraint if exists fk_data_job_name_ref_data_job;

projects/control-service/projects/pipelines_control_service/src/test/java/com/vmware/taurus/service/deploy/DataJobsSynchronizerTest.java

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@
66
package com.vmware.taurus.service.deploy;
77

88
import com.vmware.taurus.ServiceApp;
9+
import com.vmware.taurus.service.model.ActualDataJobDeployment;
10+
import com.vmware.taurus.service.model.DataJob;
11+
import com.vmware.taurus.service.model.DesiredDataJobDeployment;
912
import io.kubernetes.client.openapi.ApiException;
1013
import org.junit.jupiter.api.Test;
1114
import org.junit.jupiter.api.extension.ExtendWith;
@@ -107,6 +110,70 @@ void synchronizeDataJobs_synchronizationEnabledTrueAndWriteToDbFalse_shouldSkipS
107110
.findAllActualDeploymentNamesFromKubernetes();
108111
}
109112

113+
@Test
114+
void synchronizeDataJob_desiredDeploymentNullAndActualDeploymentNull_shouldSkipSynchronization() {
115+
DataJob dataJob = new DataJob();
116+
dataJob.setName("test-job-name");
117+
boolean isDeploymentPresentInKubernetes = false;
118+
DesiredDataJobDeployment desiredDataJobDeployment = null;
119+
ActualDataJobDeployment actualDataJobDeployment = null;
120+
121+
dataJobsSynchronizer.synchronizeDataJob(dataJob, desiredDataJobDeployment, actualDataJobDeployment, isDeploymentPresentInKubernetes);
122+
123+
Mockito.verify(deploymentService, Mockito.times(0))
124+
.updateDeployment(dataJob, desiredDataJobDeployment, actualDataJobDeployment, isDeploymentPresentInKubernetes);
125+
Mockito.verify(deploymentService, Mockito.times(0))
126+
.deleteActualDeployment(dataJob.getName());
127+
}
128+
129+
@Test
130+
void synchronizeDataJob_desiredDeploymentNullAndActualDeploymentNotNull_shouldDeleteJobDeployment() {
131+
DataJob dataJob = new DataJob();
132+
dataJob.setName("test-job-name");
133+
boolean isDeploymentPresentInKubernetes = true;
134+
DesiredDataJobDeployment desiredDataJobDeployment = null;
135+
ActualDataJobDeployment actualDataJobDeployment = new ActualDataJobDeployment();
136+
137+
dataJobsSynchronizer.synchronizeDataJob(dataJob, desiredDataJobDeployment, actualDataJobDeployment, isDeploymentPresentInKubernetes);
138+
139+
Mockito.verify(deploymentService, Mockito.times(0))
140+
.updateDeployment(dataJob, desiredDataJobDeployment, actualDataJobDeployment, isDeploymentPresentInKubernetes);
141+
Mockito.verify(deploymentService, Mockito.times(1))
142+
.deleteActualDeployment(dataJob.getName());
143+
}
144+
145+
@Test
146+
void synchronizeDataJob_desiredDeploymentNotNullAndActualDeploymentNotNull_shouldUpdateJobDeployment() {
147+
DataJob dataJob = new DataJob();
148+
dataJob.setName("test-job-name");
149+
boolean isDeploymentPresentInKubernetes = true;
150+
DesiredDataJobDeployment desiredDataJobDeployment = new DesiredDataJobDeployment();
151+
ActualDataJobDeployment actualDataJobDeployment = new ActualDataJobDeployment();
152+
153+
dataJobsSynchronizer.synchronizeDataJob(dataJob, desiredDataJobDeployment, actualDataJobDeployment, isDeploymentPresentInKubernetes);
154+
155+
Mockito.verify(deploymentService, Mockito.times(1))
156+
.updateDeployment(dataJob, desiredDataJobDeployment, actualDataJobDeployment, isDeploymentPresentInKubernetes);
157+
Mockito.verify(deploymentService, Mockito.times(0))
158+
.deleteActualDeployment(dataJob.getName());
159+
}
160+
161+
@Test
162+
void synchronizeDataJob_desiredDeploymentNotNullAndActualDeploymentNull_shouldUpdateJobDeployment() {
163+
DataJob dataJob = new DataJob();
164+
dataJob.setName("test-job-name");
165+
boolean isDeploymentPresentInKubernetes = true;
166+
DesiredDataJobDeployment desiredDataJobDeployment = new DesiredDataJobDeployment();
167+
ActualDataJobDeployment actualDataJobDeployment = null;
168+
169+
dataJobsSynchronizer.synchronizeDataJob(dataJob, desiredDataJobDeployment, actualDataJobDeployment, isDeploymentPresentInKubernetes);
170+
171+
Mockito.verify(deploymentService, Mockito.times(1))
172+
.updateDeployment(dataJob, desiredDataJobDeployment, actualDataJobDeployment, isDeploymentPresentInKubernetes);
173+
Mockito.verify(deploymentService, Mockito.times(0))
174+
.deleteActualDeployment(dataJob.getName());
175+
}
176+
110177
void enableSynchronizationProcess() {
111178
initSynchronizationProcessConfig(true, true);
112179
}

0 commit comments

Comments
 (0)