
Commit 6b7db36

[3.3][Kernel][Defaults] Get rid of using package private classes from parquet-mr (#4494)
(Cherry-pick of #4429) Currently, we override `InternalParquetRecordReader`, which is package private in parquet-mr. This causes IllegalAccess errors when the parquet and kernel libraries are loaded in different classloaders (more details on the illegal access [here](https://stackoverflow.com/questions/14282726/urlclassloader-and-accessibility-of-package-private-methods/14283808#14283808)). Instead, use the `ParquetReader` builder, which avoids the package-private access issue entirely. Manually tested in an environment that loads parquet and kernel in separate classloaders. Existing tests verifying the functionality remain unchanged.
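For reference, a minimal sketch of the builder-based pattern this change adopts, using only public parquet-mr API (`ParquetReader.Builder`, `ReadSupport`, `FilterCompat`). The `openReader` helper and class name below are illustrative, not code from this commit:

```java
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.api.ReadSupport;

public final class ParquetReaderSketch {
  // Builds a ParquetReader without touching any package-private parquet-mr class:
  // an anonymous subclass of the public Builder supplies the ReadSupport directly.
  static <T> ParquetReader<T> openReader(Path filePath, ReadSupport<T> readSupport)
      throws IOException {
    return new ParquetReader.Builder<T>(filePath) {
      @Override
      protected ReadSupport<T> getReadSupport() {
        return readSupport;
      }
    }
        // Row-group level pruning only; record-level filtering stays off because
        // parquet-mr evaluates it only after materializing the full record.
        .withFilter(FilterCompat.NOOP) // or FilterCompat.get(filterPredicate)
        .useRecordFilter(false)
        .build();
  }
}
```

Because the anonymous class is a subclass of `ParquetReader.Builder`, it can call the protected `Builder(Path)` constructor and override `getReadSupport()` even from another package or classloader, whereas subclassing the package-private `InternalParquetRecordReader` breaks once the two libraries no longer share a runtime package.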
1 parent b497ab7 commit 6b7db36

File tree

4 files changed: +36 −64 lines changed

kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/ParquetColumnReaders.java

Lines changed: 3 additions & 0 deletions
@@ -38,12 +38,15 @@ class ParquetColumnReaders {
   public static Converter createConverter(
       int initialBatchSize, DataType typeFromClient, Type typeFromFile) {
     if (typeFromClient instanceof StructType) {
+      checkArgument(typeFromFile instanceof GroupType, "cannot be cast to GroupType");
       return new RowColumnReader(
           initialBatchSize, (StructType) typeFromClient, (GroupType) typeFromFile);
     } else if (typeFromClient instanceof ArrayType) {
+      checkArgument(typeFromFile instanceof GroupType, "cannot be cast to GroupType");
       return new ArrayColumnReader(
           initialBatchSize, (ArrayType) typeFromClient, (GroupType) typeFromFile);
     } else if (typeFromClient instanceof MapType) {
+      checkArgument(typeFromFile instanceof GroupType, "cannot be cast to GroupType");
       return new MapColumnReader(
           initialBatchSize, (MapType) typeFromClient, (GroupType) typeFromFile);
     } else if (typeFromClient instanceof StringType || typeFromClient instanceof BinaryType) {

kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/ParquetFileReader.java

Lines changed: 29 additions & 24 deletions
@@ -18,7 +18,6 @@
 import static io.delta.kernel.defaults.internal.parquet.ParquetFilterUtils.toParquetFilter;
 import static io.delta.kernel.internal.util.Preconditions.checkArgument;
 import static java.util.Objects.requireNonNull;
-import static org.apache.parquet.hadoop.ParquetInputFormat.*;
 
 import io.delta.kernel.data.ColumnarBatch;
 import io.delta.kernel.exceptions.KernelEngineException;
@@ -31,8 +30,9 @@
 import java.util.*;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.parquet.filter2.compat.FilterCompat;
 import org.apache.parquet.filter2.predicate.FilterPredicate;
-import org.apache.parquet.hadoop.ParquetRecordReaderWrapper;
+import org.apache.parquet.hadoop.ParquetReader;
 import org.apache.parquet.hadoop.api.InitContext;
 import org.apache.parquet.hadoop.api.ReadSupport;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
@@ -60,7 +60,7 @@ public CloseableIterator<ColumnarBatch> read(
 
     return new CloseableIterator<ColumnarBatch>() {
       private final BatchReadSupport readSupport = new BatchReadSupport(maxBatchSize, schema);
-      private ParquetRecordReaderWrapper<Object> reader;
+      private ParquetReader<Object> reader;
       private boolean hasNotConsumedNextElement;
 
       @Override
@@ -76,9 +76,10 @@ public boolean hasNext() {
           return true;
         }
 
-        hasNotConsumedNextElement = reader.nextKeyValue() && reader.getCurrentValue() != null;
+        Object next = reader.read();
+        hasNotConsumedNextElement = next != null;
         return hasNotConsumedNextElement;
-      } catch (IOException | InterruptedException ex) {
+      } catch (IOException ex) {
         throw new KernelEngineException("Error reading Parquet file: " + path, ex);
       }
     }
@@ -115,28 +116,32 @@ private void initParquetReaderIfRequired() {
             org.apache.parquet.hadoop.ParquetFileReader.readFooter(confCopy, filePath);
 
         MessageType parquetSchema = footer.getFileMetaData().getSchema();
+
         Optional<FilterPredicate> parquetPredicate =
             predicate.flatMap(predicate -> toParquetFilter(parquetSchema, predicate));
 
-        if (parquetPredicate.isPresent()) {
-          // clone the configuration to avoid modifying the original one
-          confCopy = new Configuration(confCopy);
-
-          setFilterPredicate(confCopy, parquetPredicate.get());
-          // Disable the record level filtering as the `parquet-mr` evaluates
-          // the filter once the entire record has been materialized. Instead,
-          // we use the predicate to prune the row groups which is more efficient.
-          // In the future, we can consider using the record level filtering if a
-          // native Parquet reader is implemented in Kernel default module.
-          confCopy.set(RECORD_FILTERING_ENABLED, "false");
-          confCopy.set(DICTIONARY_FILTERING_ENABLED, "false");
-          confCopy.set(COLUMN_INDEX_FILTERING_ENABLED, "false");
-        }
-
-        // Pass the already read footer to the reader to avoid reading it again.
-        fileReader = new ParquetFileReaderWithFooter(filePath, confCopy, footer);
-        reader = new ParquetRecordReaderWrapper<>(readSupport);
-        reader.initialize(fileReader, confCopy);
+        // TODO: We can avoid reading the footer again if we can pass the footer, but there is
+        // no API to do that in the current version of parquet-mr which takes InputFile
+        // as input.
+        reader =
+            new ParquetReader.Builder<Object>(filePath) {
+              @Override
+              protected ReadSupport<Object> getReadSupport() {
+                return readSupport;
+              }
+            }.withFilter(parquetPredicate.map(FilterCompat::get).orElse(FilterCompat.NOOP))
+                // Disable the record level filtering as the `parquet-mr` evaluates
+                // the filter once the entire record has been materialized. Instead,
+                // we use the predicate to prune the row groups which is more efficient.
+                // In the future, we can consider using the record level filtering if a
+                // native Parquet reader is implemented in Kernel default module.
+                .useRecordFilter(false)
+                .useStatsFilter(true) // only enable the row group level filtering
+                .useBloomFilter(false)
+                .useDictionaryFilter(false)
+                .useColumnIndexFilter(false)
+                .build();
 
       } catch (IOException e) {
         Utils.closeCloseablesSilently(fileReader, reader);
         throw new KernelEngineException("Error reading Parquet file: " + path, e);
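As a usage note on the iteration change above: `ParquetReader.read()` returns the next materialized record, or `null` once the file is exhausted, so the old `nextKeyValue()`/`getCurrentValue()` pair collapses into a single call. A small self-contained sketch of draining such a reader (the `countRecords` helper is illustrative, not part of this commit):

```java
import java.io.IOException;
import org.apache.parquet.hadoop.ParquetReader;

final class ReadLoopSketch {
  // Drains a reader built as in the change above. read() returns the next record or
  // null at end of input, replacing the nextKeyValue()/getCurrentValue() pair from
  // the package-private RecordReader-based path.
  static long countRecords(ParquetReader<Object> reader) throws IOException {
    long count = 0;
    try {
      Object record;
      while ((record = reader.read()) != null) {
        count++; // a real caller would hand `record` to the batch read support here
      }
    } finally {
      reader.close(); // ParquetReader is Closeable; always release the file handles
    }
    return count;
  }
}
```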

kernel/kernel-defaults/src/main/java/org/apache/parquet/hadoop/ParquetRecordReaderWrapper.java

Lines changed: 0 additions & 37 deletions
This file was deleted.

kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/parquet/ParquetFileReaderSuite.scala

Lines changed: 4 additions & 3 deletions
@@ -22,12 +22,12 @@ import io.delta.golden.GoldenTableUtils.{goldenTableFile, goldenTablePath}
 import io.delta.kernel.defaults.utils.{ExpressionTestUtils, TestRow}
 import io.delta.kernel.test.VectorTestUtils
 import io.delta.kernel.types._
+
 import org.apache.spark.sql.internal.SQLConf
 import org.scalatest.funsuite.AnyFunSuite
-import org.apache.parquet.io.ParquetDecodingException
 
 class ParquetFileReaderSuite extends AnyFunSuite
-  with ParquetSuiteBase with VectorTestUtils with ExpressionTestUtils {
+    with ParquetSuiteBase with VectorTestUtils with ExpressionTestUtils {
 
   test("decimals encoded using dictionary encoding ") {
     // Below golden tables contains three decimal columns
@@ -191,9 +191,10 @@ class ParquetFileReaderSuite extends AnyFunSuite
     val ex = intercept[Throwable] {
       readParquetFilesUsingKernel(inputLocation, readSchema)
     }
+
     // We don't properly reject conversions and the error we get vary a lot, this checks various
     // error message we may get as result.
-    // TODO: Uniformize rejecting unsupported conversions.
+    // TODO(delta-io/delta#4493): Uniformize rejecting unsupported conversions.
     assert(
       ex.getMessage.contains("Can not read value") ||
       ex.getMessage.contains("column with Parquet type") ||
