Skip to content

Commit 9fa812e

Browse files
committed
[590] Add RunCatalogSync utility for synchronizing tables across catalogs
1 parent c9deec1 commit 9fa812e

File tree

22 files changed

+1532
-56
lines changed

22 files changed

+1532
-56
lines changed

xtable-api/src/main/java/org/apache/xtable/conversion/ConversionConfig.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818

1919
package org.apache.xtable.conversion;
2020

21+
import java.util.Collections;
2122
import java.util.List;
23+
import java.util.Map;
2224

2325
import lombok.Builder;
2426
import lombok.NonNull;
@@ -29,22 +31,30 @@
2931
import org.apache.xtable.model.sync.SyncMode;
3032

3133
@Value
34+
@Builder
3235
public class ConversionConfig {
3336
// The source of the sync
3437
@NonNull SourceTable sourceTable;
3538
// One or more targets to sync the table metadata to
3639
List<TargetTable> targetTables;
40+
// Each target table can be synced to multiple target catalogs, this is map from
41+
// targetTable to target catalogs.
42+
Map<TargetTable, List<TargetCatalogConfig>> targetCatalogs;
3743
// The mode, incremental or snapshot
3844
SyncMode syncMode;
3945

4046
@Builder
4147
ConversionConfig(
42-
@NonNull SourceTable sourceTable, List<TargetTable> targetTables, SyncMode syncMode) {
48+
@NonNull SourceTable sourceTable,
49+
List<TargetTable> targetTables,
50+
Map<TargetTable, List<TargetCatalogConfig>> targetCatalogs,
51+
SyncMode syncMode) {
4352
this.sourceTable = sourceTable;
4453
this.targetTables = targetTables;
4554
Preconditions.checkArgument(
4655
targetTables != null && !targetTables.isEmpty(),
4756
"Please provide at-least one format to sync");
57+
this.targetCatalogs = targetCatalogs == null ? Collections.emptyMap() : targetCatalogs;
4858
this.syncMode = syncMode == null ? SyncMode.INCREMENTAL : syncMode;
4959
}
5060
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.xtable.conversion;
20+
21+
import java.util.Collections;
22+
import java.util.Map;
23+
24+
import lombok.Builder;
25+
import lombok.NonNull;
26+
import lombok.Value;
27+
28+
/**
29+
* Defines the configuration for an external catalog, user needs to populate at-least one of {@link
30+
* ExternalCatalogConfig#catalogType} or {@link ExternalCatalogConfig#catalogSyncClientImpl}
31+
*/
32+
@Value
33+
@Builder
34+
public class ExternalCatalogConfig {
35+
/**
36+
* A user-defined unique identifier for the catalog, allows user to sync table to multiple
37+
* catalogs of the same name/type eg: HMS catalog with url1, HMS catalog with url2
38+
*/
39+
@NonNull String catalogId;
40+
41+
/**
42+
* The type of the catalog. If the catalogType implementation exists in XTable, the implementation
43+
* class will be inferred.
44+
*/
45+
String catalogType;
46+
47+
/**
48+
* (Optional) A fully qualified class name that implements the interface for {@link
49+
* org.apache.xtable.spi.sync.CatalogSyncClient}, it can be used if the implementation for
50+
* catalogType doesn't exist in XTable.
51+
*/
52+
String catalogSyncClientImpl;
53+
54+
/**
55+
* (Optional) A fully qualified class name that implements the interface for {@link
56+
* org.apache.xtable.spi.extractor.CatalogConversionSource} it can be used if the implementation
57+
* for catalogType doesn't exist in XTable.
58+
*/
59+
String catalogConversionSourceImpl;
60+
61+
/**
62+
* The properties for this catalog, used for providing any custom behaviour during catalog sync
63+
*/
64+
@NonNull @Builder.Default Map<String, String> catalogProperties = Collections.emptyMap();
65+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.xtable.conversion;
20+
21+
import lombok.Builder;
22+
import lombok.NonNull;
23+
import lombok.Value;
24+
25+
import org.apache.xtable.model.catalog.CatalogTableIdentifier;
26+
27+
/**
28+
* TargetCatalogConfig contains the parameters that are required when syncing {@link TargetTable} to
29+
* a catalog.
30+
*/
31+
@Value
32+
@Builder
33+
public class TargetCatalogConfig {
34+
/** The tableIdentifier that will be used when syncing {@link TargetTable} to the catalog. */
35+
@NonNull CatalogTableIdentifier catalogTableIdentifier;
36+
37+
/** Configuration for the catalog. */
38+
@NonNull ExternalCatalogConfig catalogConfig;
39+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.xtable.model.storage;
20+
21+
/**
22+
* Default constants for supported catalog types.
23+
*
24+
* @since 0.1
25+
*/
26+
public class CatalogType {
27+
public static final String STORAGE = "STORAGE";
28+
}

xtable-api/src/main/java/org/apache/xtable/spi/extractor/CatalogConversionSource.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,4 +29,7 @@
2929
public interface CatalogConversionSource {
3030
/** Returns the source table object present in the catalog. */
3131
SourceTable getSourceTable(CatalogTableIdentifier tableIdentifier);
32+
33+
/** Returns the {@link org.apache.xtable.model.storage.CatalogType} for the catalog conversion */
34+
String getCatalogType();
3235
}

xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogSyncClient.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@ public interface CatalogSyncClient<TABLE> extends AutoCloseable {
3434
*/
3535
String getCatalogId();
3636

37+
/** Returns the {@link org.apache.xtable.model.storage.CatalogType} the client syncs to */
38+
String getCatalogType();
39+
3740
/** Returns the storage location of the table synced to the catalog. */
3841
String getStorageLocation(TABLE table);
3942

xtable-core/pom.xml

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,4 +174,24 @@
174174
<scope>test</scope>
175175
</dependency>
176176
</dependencies>
177+
178+
<build>
179+
<plugins>
180+
<plugin>
181+
<groupId>org.apache.maven.plugins</groupId>
182+
<artifactId>maven-jar-plugin</artifactId>
183+
<executions>
184+
<execution>
185+
<goals>
186+
<goal>test-jar</goal>
187+
</goals>
188+
<phase>test-compile</phase>
189+
</execution>
190+
</executions>
191+
<configuration>
192+
<skip>false</skip>
193+
</configuration>
194+
</plugin>
195+
</plugins>
196+
</build>
177197
</project>
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.xtable.catalog;
20+
21+
import java.util.ServiceLoader;
22+
import java.util.function.Function;
23+
24+
import lombok.AccessLevel;
25+
import lombok.NoArgsConstructor;
26+
27+
import org.apache.commons.lang3.StringUtils;
28+
import org.apache.hadoop.conf.Configuration;
29+
30+
import org.apache.xtable.conversion.ExternalCatalogConfig;
31+
import org.apache.xtable.exception.NotSupportedException;
32+
import org.apache.xtable.reflection.ReflectionUtils;
33+
import org.apache.xtable.spi.extractor.CatalogConversionSource;
34+
import org.apache.xtable.spi.sync.CatalogSyncClient;
35+
36+
@NoArgsConstructor(access = AccessLevel.PRIVATE)
37+
public class CatalogConversionFactory {
38+
private static final CatalogConversionFactory INSTANCE = new CatalogConversionFactory();
39+
40+
public static CatalogConversionFactory getInstance() {
41+
return INSTANCE;
42+
}
43+
44+
/**
45+
* Returns an implementation class for {@link CatalogConversionSource} that's used for converting
46+
* table definitions in the catalog to {@link org.apache.xtable.conversion.SourceTable} object.
47+
*
48+
* @param sourceCatalogConfig configuration for the source catalog
49+
* @param configuration hadoop configuration
50+
*/
51+
public static CatalogConversionSource createCatalogConversionSource(
52+
ExternalCatalogConfig sourceCatalogConfig, Configuration configuration) {
53+
if (!StringUtils.isEmpty(sourceCatalogConfig.getCatalogType())) {
54+
return findInstance(
55+
CatalogConversionSource.class,
56+
sourceCatalogConfig.getCatalogType(),
57+
CatalogConversionSource::getCatalogType);
58+
}
59+
return ReflectionUtils.createInstanceOfClass(
60+
sourceCatalogConfig.getCatalogConversionSourceImpl(), sourceCatalogConfig, configuration);
61+
}
62+
63+
/**
64+
* Returns an implementation class for {@link CatalogSyncClient} that's used for syncing {@link
65+
* org.apache.xtable.conversion.TargetTable} to a catalog.
66+
*
67+
* @param targetCatalogConfig configuration for the target catalog
68+
* @param configuration hadoop configuration
69+
*/
70+
public <TABLE> CatalogSyncClient<TABLE> createCatalogSyncClient(
71+
ExternalCatalogConfig targetCatalogConfig, String tableFormat, Configuration configuration) {
72+
if (!StringUtils.isEmpty(targetCatalogConfig.getCatalogType())) {
73+
return findInstance(
74+
CatalogSyncClient.class,
75+
targetCatalogConfig.getCatalogType(),
76+
CatalogSyncClient::getCatalogType);
77+
}
78+
return ReflectionUtils.createInstanceOfClass(
79+
targetCatalogConfig.getCatalogSyncClientImpl(),
80+
targetCatalogConfig,
81+
tableFormat,
82+
configuration);
83+
}
84+
85+
private static <T> T findInstance(
86+
Class<T> serviceClass, String catalogType, Function<T, String> catalogTypeExtractor) {
87+
ServiceLoader<T> loader = ServiceLoader.load(serviceClass);
88+
for (T instance : loader) {
89+
String instanceCatalogType = catalogTypeExtractor.apply(instance);
90+
if (catalogType.equals(instanceCatalogType)) {
91+
return instance;
92+
}
93+
}
94+
throw new NotSupportedException("catalogType is not yet supported: " + catalogType);
95+
}
96+
}

0 commit comments

Comments
 (0)