Skip to content

Commit 7b4ee63

Browse files
authored
[Spark] Managed Commits: add a DynamoDB-based commit owner (#3107)
## Description Taking inspiration from #339, this PR adds a Commit Owner Client which uses DynamoDB as the backend. Each Delta table managed by a DynamoDB instance will have one corresponding entry in a DynamoDB table. The table schema is as follows: * tableId: String --- The unique identifier for the entry. This is a UUID. * path: String --- The fully qualified path of the table in the file system. e.g. s3://bucket/path. * acceptingCommits: Boolean --- Whether the commit owner is accepting new commits. This will only be set to false when the table is converted from managed commits to file system commits. * tableVersion: Number --- The version of the latest commit. * tableTimestamp: Number --- The inCommitTimestamp of the latest commit. * schemaVersion: Number --- The version of the schema used to store the data. * commits: --- The list of unbackfilled commits. - version: Number --- The version of the commit. - inCommitTimestamp: Number --- The inCommitTimestamp of the commit. - fsName: String --- The name of the unbackfilled file. - fsLength: Number --- The length of the unbackfilled file. - fsTimestamp: Number --- The modification time of the unbackfilled file. For a table to be managed by DynamoDB, `registerTable` must be called for that Delta table. This will create a new entry in the db for this Delta table. Every `commit` invocation appends the UUID delta file status to the `commits` list in the table entry. `commit` is performed through a conditional write in DynamoDB. ## How was this patch tested? Added a new suite called `DynamoDBCommitOwnerClient5BackfillSuite` which uses a mock DynamoDB client, plus manual testing against a DynamoDB instance.
1 parent 57df2c0 commit 7b4ee63

File tree

9 files changed

+1177
-44
lines changed

9 files changed

+1177
-44
lines changed

build.sbt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,8 @@ lazy val spark = (project in file("spark"))
202202
"org.apache.spark" %% "spark-sql" % sparkVersion.value % "provided",
203203
"org.apache.spark" %% "spark-core" % sparkVersion.value % "provided",
204204
"org.apache.spark" %% "spark-catalyst" % sparkVersion.value % "provided",
205+
// For DynamoDBCommitStore
206+
"com.amazonaws" % "aws-java-sdk" % "1.12.262" % "provided",
205207

206208
// Test deps
207209
"org.scalatest" %% "scalatest" % scalaTestVersion % "test",

spark/src/main/java/io/delta/dynamodbcommitstore/DynamoDBCommitOwnerClient.java

Lines changed: 681 additions & 0 deletions
Large diffs are not rendered by default.
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
* Copyright (2021) The Delta Lake Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.delta.dynamodbcommitstore;
18+
19+
import com.amazonaws.auth.AWSCredentialsProvider;
20+
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
21+
import org.apache.spark.sql.delta.managedcommit.CommitOwnerBuilder;
22+
import org.apache.spark.sql.delta.managedcommit.CommitOwnerClient;
23+
import org.apache.spark.sql.SparkSession;
24+
import scala.collection.immutable.Map;
25+
26+
import java.lang.reflect.InvocationTargetException;
27+
28+
public class DynamoDBCommitOwnerClientBuilder implements CommitOwnerBuilder {
29+
30+
private final long BACKFILL_BATCH_SIZE = 1L;
31+
32+
@Override
33+
public String getName() {
34+
return "dynamodb";
35+
}
36+
37+
/**
38+
* Key for the name of the DynamoDB table which stores all the unbackfilled
39+
* commits for this owner. The value of this key is stored in the `conf`
40+
* which is passed to the `build` method.
41+
*/
42+
private static final String MANAGED_COMMITS_TABLE_NAME_KEY = "managedCommitsTableName";
43+
/**
44+
* Key for the endpoint of the DynamoDB service. The value of this key is stored in the
45+
* `conf` which is passed to the `build` method.
46+
*/
47+
private static final String DYNAMO_DB_ENDPOINT_KEY = "dynamoDBEndpoint";
48+
49+
/**
50+
* The AWS credentials provider chain to use when creating the DynamoDB client.
51+
* This has temporarily been hardcoded until we have a way to read from sparkSession.
52+
*/
53+
private static final String AWS_CREDENTIALS_PROVIDER =
54+
"com.amazonaws.auth.DefaultAWSCredentialsProviderChain";
55+
56+
// TODO: update this interface so that it can take a sparkSession.
57+
@Override
58+
public CommitOwnerClient build(SparkSession spark, Map<String, String> conf) {
59+
String managedCommitsTableName = conf.get(MANAGED_COMMITS_TABLE_NAME_KEY).getOrElse(() -> {
60+
throw new RuntimeException(MANAGED_COMMITS_TABLE_NAME_KEY + " not found");
61+
});
62+
String dynamoDBEndpoint = conf.get(DYNAMO_DB_ENDPOINT_KEY).getOrElse(() -> {
63+
throw new RuntimeException(DYNAMO_DB_ENDPOINT_KEY + " not found");
64+
});
65+
try {
66+
AmazonDynamoDBClient client =
67+
createAmazonDDBClient(dynamoDBEndpoint, AWS_CREDENTIALS_PROVIDER);
68+
return new DynamoDBCommitOwnerClient(
69+
managedCommitsTableName, dynamoDBEndpoint, client, BACKFILL_BATCH_SIZE);
70+
} catch (Exception e) {
71+
throw new RuntimeException("Failed to create DynamoDB client", e);
72+
}
73+
}
74+
75+
private AmazonDynamoDBClient createAmazonDDBClient(
76+
String endpoint,
77+
String credentialProviderName
78+
) throws NoSuchMethodException,
79+
ClassNotFoundException,
80+
InvocationTargetException,
81+
InstantiationException,
82+
IllegalAccessException {
83+
Class<?> awsCredentialsProviderClass = Class.forName(credentialProviderName);
84+
AWSCredentialsProvider awsCredentialsProvider =
85+
(AWSCredentialsProvider) awsCredentialsProviderClass.getConstructor().newInstance();
86+
AmazonDynamoDBClient client = new AmazonDynamoDBClient(awsCredentialsProvider);
87+
client.setEndpoint(endpoint);
88+
return client;
89+
}
90+
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Copyright (2021) The Delta Lake Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.delta.dynamodbcommitstore;
18+
19+
/**
20+
* Defines the field names used in the DynamoDB table entry.
21+
*/
22+
final class DynamoDBTableEntryConstants {
    // Not instantiable: this class only holds the DynamoDB attribute names
    // used for a managed-commit table entry.
    private DynamoDBTableEntryConstants() {}

    /** The primary key of the DynamoDB table (a UUID per Delta table). */
    public static final String TABLE_ID = "tableId";
    /** The version of the latest commit in the corresponding Delta table. */
    public static final String TABLE_LATEST_VERSION = "tableVersion";
    /** The inCommitTimestamp of the latest commit in the corresponding Delta table. */
    public static final String TABLE_LATEST_TIMESTAMP = "tableTimestamp";
    /** Whether this commit owner is accepting more commits for the corresponding Delta table. */
    public static final String ACCEPTING_COMMITS = "acceptingCommits";
    /** The path of the corresponding Delta table. */
    public static final String TABLE_PATH = "path";
    /** The schema version of this DynamoDB table entry. */
    public static final String SCHEMA_VERSION = "schemaVersion";
    /** The attribute holding the list of unbackfilled commits. */
    public static final String COMMITS = "commits";
    /** The unbackfilled commit version. */
    public static final String COMMIT_VERSION = "version";
    /** The inCommitTimestamp of the unbackfilled commit. */
    public static final String COMMIT_TIMESTAMP = "timestamp";
    /** The name of the unbackfilled file, e.g. 00001.uuid.json. */
    public static final String COMMIT_FILE_NAME = "fsName";
    /** The length of the unbackfilled file as per the file status. */
    public static final String COMMIT_FILE_LENGTH = "fsLength";
    /** The modification timestamp of the unbackfilled file as per the file status. */
    public static final String COMMIT_FILE_MODIFICATION_TIMESTAMP = "fsTimestamp";
}
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
* Copyright (2021) The Delta Lake Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.delta.dynamodbcommitstore;
18+
19+
import org.apache.spark.sql.delta.managedcommit.AbstractMetadata;
20+
import org.apache.spark.sql.delta.managedcommit.UpdatedActions;
21+
import org.apache.hadoop.fs.Path;
22+
23+
import java.util.UUID;
24+
25+
public class ManagedCommitUtils {
26+
27+
private ManagedCommitUtils() {}
28+
29+
/** The subdirectory in which to store the unbackfilled commit files. */
30+
final static String COMMIT_SUBDIR = "_commits";
31+
32+
/** The configuration key for the managed commit owner. */
33+
private static final String MANAGED_COMMIT_OWNER_CONF_KEY =
34+
"delta.managedCommits.commitOwner-dev";
35+
36+
/**
37+
* Creates a new unbackfilled delta file path for the given commit version.
38+
* The path is of the form `tablePath/_delta_log/_commits/00000000000000000001.uuid.json`.
39+
*/
40+
public static Path generateUnbackfilledDeltaFilePath(
41+
Path logPath,
42+
long version) {
43+
String uuid = UUID.randomUUID().toString();
44+
Path basePath = new Path(logPath, COMMIT_SUBDIR);
45+
return new Path(basePath, String.format("%020d.%s.json", version, uuid));
46+
}
47+
48+
/**
49+
* Returns the path to the backfilled delta file for the given commit version.
50+
* The path is of the form `tablePath/_delta_log/00000000000000000001.json`.
51+
*/
52+
public static Path getBackfilledDeltaFilePath(
53+
Path logPath,
54+
Long version) {
55+
return new Path(logPath, String.format("%020d.json", version));
56+
}
57+
58+
private static String getManagedCommitOwner(AbstractMetadata metadata) {
59+
return metadata
60+
.getConfiguration()
61+
.get(MANAGED_COMMIT_OWNER_CONF_KEY)
62+
.getOrElse(() -> "");
63+
}
64+
65+
/**
66+
* Returns true if the commit is a managed commit to filesystem conversion.
67+
*/
68+
public static boolean isManagedCommitToFSConversion(
69+
Long commitVersion,
70+
UpdatedActions updatedActions) {
71+
boolean oldMetadataHasManagedCommits =
72+
!getManagedCommitOwner(updatedActions.getOldMetadata()).isEmpty();
73+
boolean newMetadataHasManagedCommits =
74+
!getManagedCommitOwner(updatedActions.getNewMetadata()).isEmpty();
75+
return oldMetadataHasManagedCommits && !newMetadataHasManagedCommits && commitVersion > 0;
76+
}
77+
}

spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/CommitOwnerClient.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package org.apache.spark.sql.delta.managedcommit
1919
import scala.collection.mutable
2020

2121
import org.apache.spark.sql.delta.storage.LogStore
22+
import io.delta.dynamodbcommitstore.DynamoDBCommitOwnerClientBuilder
2223
import org.apache.hadoop.conf.Configuration
2324
import org.apache.hadoop.fs.{FileStatus, Path}
2425

@@ -45,7 +46,7 @@ case class Commit(
4546
case class CommitFailedException(
4647
private val retryable: Boolean,
4748
private val conflict: Boolean,
48-
private val message: String) extends Exception(message) {
49+
private val message: String) extends RuntimeException(message) {
4950
def getRetryable: Boolean = retryable
5051
def getConflict: Boolean = conflict
5152
}
@@ -237,7 +238,7 @@ object CommitOwnerProvider {
237238
}
238239

239240
private val initialCommitOwnerBuilders = Seq[CommitOwnerBuilder](
240-
// Any new commit-owner builder will be registered here.
241+
new DynamoDBCommitOwnerClientBuilder()
241242
)
242243
initialCommitOwnerBuilders.foreach(registerBuilder)
243244
}

0 commit comments

Comments
 (0)