Skip to content

Commit 3150359

Browse files
committed
WIP
1 parent 8c21301 commit 3150359

File tree

8 files changed

+138
-21
lines changed

8 files changed

+138
-21
lines changed

kernel/kernel-api/src/main/java/io/delta/kernel/ResolvedTable.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717
package io.delta.kernel;
1818

1919
import io.delta.kernel.annotation.Experimental;
20-
import io.delta.kernel.commit.Committer;
2120
import io.delta.kernel.expressions.Column;
21+
import io.delta.kernel.transaction.UpdateTableTransactionBuilder;
2222
import io.delta.kernel.types.StructType;
2323
import java.util.List;
2424
import java.util.Optional;
@@ -66,5 +66,5 @@ public interface ResolvedTable {
6666
/** @return a scan builder for constructing scans to read data from this table */
6767
ScanBuilder getScanBuilder();
6868

69-
Committer getCommitter();
69+
UpdateTableTransactionBuilder buildUpdateTableTransaction(String engineInfo, Operation operation);
7070
}

kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionBuilderImpl.java

Lines changed: 38 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -55,14 +55,15 @@
5555
import io.delta.kernel.internal.util.ColumnMapping.ColumnMappingMode;
5656
import io.delta.kernel.internal.util.SchemaUtils;
5757
import io.delta.kernel.internal.util.Tuple2;
58+
import io.delta.kernel.transaction.UpdateTableTransactionBuilder;
5859
import io.delta.kernel.types.StringType;
5960
import io.delta.kernel.types.StructType;
6061
import java.util.*;
6162
import java.util.stream.Collectors;
6263
import org.slf4j.Logger;
6364
import org.slf4j.LoggerFactory;
6465

65-
public class TransactionBuilderImpl implements TransactionBuilder {
66+
public class TransactionBuilderImpl implements TransactionBuilder, UpdateTableTransactionBuilder {
6667
private static final Logger logger = LoggerFactory.getLogger(TransactionBuilderImpl.class);
6768

6869
private final long currentTimeMillis = System.currentTimeMillis();
@@ -104,6 +105,38 @@ public TransactionBuilderImpl(TableImpl table, String engineInfo, Operation oper
104105
this.operation = operation;
105106
}
106107

108+
///////////////////////////////////////////
109+
// UpdateTableTransactionBuilder methods //
110+
///////////////////////////////////////////
111+
112+
@Override
113+
public TransactionBuilderImpl withUpdatedSchema(StructType updatedSchema) {
114+
this.schema = Optional.of(updatedSchema); // will be verified as part of the build() call
115+
return this;
116+
}
117+
118+
@Override
119+
public TransactionBuilderImpl withTableProperties(Map<String, String> properties) {
120+
this.tableProperties =
121+
Optional.of(Collections.unmodifiableMap(TableConfig.validateDeltaProperties(properties)));
122+
return this;
123+
}
124+
125+
@Override
126+
public TransactionBuilderImpl withTransactionId(String applicationId, long transactionVersion) {
127+
SetTransaction txnId =
128+
new SetTransaction(
129+
requireNonNull(applicationId, "applicationId is null"),
130+
transactionVersion,
131+
Optional.of(currentTimeMillis));
132+
this.setTxnOpt = Optional.of(txnId);
133+
return this;
134+
}
135+
136+
////////////////////////////////
137+
// TransactionBuilder methods //
138+
////////////////////////////////
139+
107140
@Override
108141
public TransactionBuilder withSchema(Engine engine, StructType newSchema) {
109142
this.schema = Optional.of(newSchema); // will be verified as part of the build() call
@@ -152,24 +185,16 @@ public TransactionBuilder withClusteringColumns(Engine engine, List<Column> clus
152185
@Override
153186
public TransactionBuilder withTransactionId(
154187
Engine engine, String applicationId, long transactionVersion) {
155-
SetTransaction txnId =
156-
new SetTransaction(
157-
requireNonNull(applicationId, "applicationId is null"),
158-
transactionVersion,
159-
Optional.of(currentTimeMillis));
160-
this.setTxnOpt = Optional.of(txnId);
161-
return this;
188+
return withTransactionId(applicationId, transactionVersion);
162189
}
163190

164191
@Override
165192
public TransactionBuilder withTableProperties(Engine engine, Map<String, String> properties) {
166-
this.tableProperties =
167-
Optional.of(Collections.unmodifiableMap(TableConfig.validateDeltaProperties(properties)));
168-
return this;
193+
return withTableProperties(properties);
169194
}
170195

171196
@Override
172-
public TransactionBuilder withTablePropertiesRemoved(Set<String> propertyKeys) {
197+
public TransactionBuilderImpl withTablePropertiesRemoved(Set<String> propertyKeys) {
173198
checkArgument(
174199
propertyKeys.stream().noneMatch(key -> key.toLowerCase(Locale.ROOT).startsWith("delta.")),
175200
"Unsetting 'delta.' table properties is currently unsupported");
@@ -198,7 +223,7 @@ public TransactionBuilder withDomainMetadataSupported() {
198223
}
199224

200225
@Override
201-
public Transaction build(Engine engine) {
226+
public TransactionImpl build(Engine engine) {
202227
if (operation == Operation.REPLACE_TABLE) {
203228
throw new UnsupportedOperationException("REPLACE TABLE is not yet supported");
204229
}

kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import static io.delta.kernel.internal.util.Utils.toCloseableIterator;
2424

2525
import io.delta.kernel.*;
26+
import io.delta.kernel.commit.CommitPayload;
27+
import io.delta.kernel.commit.Committer;
2628
import io.delta.kernel.data.Row;
2729
import io.delta.kernel.engine.Engine;
2830
import io.delta.kernel.exceptions.ConcurrentWriteException;
@@ -66,7 +68,7 @@
6668
import org.slf4j.Logger;
6769
import org.slf4j.LoggerFactory;
6870

69-
public class TransactionImpl implements Transaction {
71+
public class TransactionImpl implements Transaction, io.delta.kernel.transaction.Transaction {
7072
private static final Logger logger = LoggerFactory.getLogger(TransactionImpl.class);
7173

7274
public static final int DEFAULT_READ_VERSION = 1;
@@ -132,19 +134,54 @@ public TransactionImpl(
132134
this.currentCrcInfo = readSnapshot.getCurrentCrcInfo();
133135
}
134136

137+
/////////////////////////////////////////////////////
138+
// io.delta.kernel.transaction.Transaction methods //
139+
/////////////////////////////////////////////////////
140+
135141
@Override
136-
public Row getTransactionState(Engine engine) {
142+
public StructType getSchema() {
143+
return metadata.getSchema();
144+
}
145+
146+
@Override
147+
public List<Column> getPartitionColumns() {
148+
return VectorUtils.<String>toJavaList(metadata.getPartitionColumns()).stream()
149+
.map(Column::new)
150+
.collect(Collectors.toList());
151+
}
152+
153+
@Override
154+
public Row getTransactionState() {
137155
return TransactionStateRow.of(metadata, dataPath.toString(), maxRetries);
138156
}
139157

158+
@Override
159+
public Committer getCommitter() {
160+
throw new UnsupportedOperationException("not implemented");
161+
}
162+
163+
@Override
164+
public CommitPayload getCommitPayload(CloseableIterator<Row> dataActions) {
165+
return null;
166+
}
167+
168+
/////////////////////////////////////////
169+
// io.delta.kernel.Transaction methods //
170+
/////////////////////////////////////////
171+
172+
@Override
173+
public Row getTransactionState(Engine engine) {
174+
return getTransactionState();
175+
}
176+
140177
@Override
141178
public List<String> getPartitionColumns(Engine engine) {
142179
return VectorUtils.toJavaList(metadata.getPartitionColumns());
143180
}
144181

145182
@Override
146183
public StructType getSchema(Engine engine) {
147-
return metadata.getSchema();
184+
return getSchema();
148185
}
149186

150187
@Override

kernel/kernel-api/src/main/java/io/delta/kernel/internal/table/ResolvedTableInternalImpl.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@
1818

1919
import static java.util.Objects.requireNonNull;
2020

21+
import io.delta.kernel.Operation;
2122
import io.delta.kernel.ScanBuilder;
22-
import io.delta.kernel.commit.Committer;
2323
import io.delta.kernel.expressions.Column;
2424
import io.delta.kernel.internal.ScanBuilderImpl;
2525
import io.delta.kernel.internal.actions.DomainMetadata;
@@ -35,6 +35,7 @@
3535
import io.delta.kernel.internal.util.Clock;
3636
import io.delta.kernel.internal.util.VectorUtils;
3737
import io.delta.kernel.metrics.SnapshotReport;
38+
import io.delta.kernel.transaction.UpdateTableTransactionBuilder;
3839
import io.delta.kernel.types.StructType;
3940
import java.util.List;
4041
import java.util.Map;
@@ -117,8 +118,9 @@ public ScanBuilder getScanBuilder() {
117118
}
118119

119120
@Override
120-
public Committer getCommitter() {
121-
throw new UnsupportedOperationException("not implemented");
121+
public UpdateTableTransactionBuilder buildUpdateTableTransaction(
122+
String engineInfo, Operation operation) {
123+
return null;
122124
}
123125

124126
///////////////////////////////////////
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package io.delta.kernel.transaction;
2+
3+
import io.delta.kernel.engine.Engine;
4+
import java.util.Map;
5+
6+
public interface BaseTransactionBuilder<T extends BaseTransactionBuilder<T>> {
7+
T withTableProperties(Map<String, String> properties);
8+
9+
T withTransactionId(String applicationId, long transactionVersion);
10+
11+
Transaction build(Engine engine);
12+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package io.delta.kernel.transaction;
2+
3+
import io.delta.kernel.commit.CommitPayload;
4+
import io.delta.kernel.commit.Committer;
5+
import io.delta.kernel.data.Row;
6+
import io.delta.kernel.expressions.Column;
7+
import io.delta.kernel.types.StructType;
8+
import io.delta.kernel.utils.CloseableIterator;
9+
import java.util.List;
10+
11+
public interface Transaction {
12+
StructType getSchema();
13+
14+
List<Column> getPartitionColumns();
15+
16+
long getReadTableVersion();
17+
18+
Row getTransactionState();
19+
20+
Committer getCommitter();
21+
22+
CommitPayload getCommitPayload(CloseableIterator<Row> dataActions);
23+
24+
// TODO: CommitPayload resolveConflictsAndRebase(
25+
// CloseableIterator<Row> dataActions,
26+
// List<ParsedDeltaData> winningCommits,
27+
// Optional<Long> latestTableVersion);
28+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package io.delta.kernel.transaction;
2+
3+
import io.delta.kernel.types.StructType;
4+
import java.util.Set;
5+
6+
public interface UpdateTableTransactionBuilder
7+
extends BaseTransactionBuilder<UpdateTableTransactionBuilder> {
8+
9+
UpdateTableTransactionBuilder withUpdatedSchema(StructType updatedSchema);
10+
11+
UpdateTableTransactionBuilder withTablePropertiesRemoved(Set<String> propertyKeys);
12+
}

0 commit comments

Comments
 (0)