
Commit 2c1a62d

Fix memory leaks for huge tables (#89)
We process each row group sequentially during "COPY FROM parquet". Normally, we expect memory consumption not to exceed the row group size by much. But we also make some allocations in the current Postgres memory context during the copy, which can become extreme for huge tables (e.g. with 100 columns and the default row group size of ~123000 rows). To fix the issue, we introduce a memory context that is switched to and reset for each row during the copy.
1 parent fd51bed commit 2c1a62d
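
The fix follows the usual Postgres pattern for bounding allocation growth: do the per-row work inside a short-lived memory context, then reset that context once the row has been serialized, so per-row palloc garbage (such as the varlena buffers returned by SendFunctionCall) is freed in bulk instead of accumulating for the whole copy. Below is a minimal sketch of that pattern with pgrx, using only the PgMemoryContexts calls that appear in this commit; the CopyCtx struct and process_row helper are illustrative, not code from the repository.

use pgrx::PgMemoryContexts;

struct CopyCtx {
    // short-lived context, reset after every row
    per_row_memory_ctx: PgMemoryContexts,
}

impl CopyCtx {
    fn new() -> Self {
        CopyCtx {
            // created once and reused for every row of the copy
            per_row_memory_ctx: PgMemoryContexts::new("per row memory context"),
        }
    }

    fn process_row(&mut self) {
        unsafe {
            // run the row serialization with this context as CurrentMemoryContext,
            // so anything palloc'd along the way lands in it
            self.per_row_memory_ctx.switch_to(|_context| {
                // illustrative allocation, standing in for the send-function
                // calls the real code makes while encoding a row
                let _scratch = pgrx::pg_sys::palloc(64);
            });

            // free everything this row allocated in one shot
            self.per_row_memory_ctx.reset();
        }
    }
}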

File tree: 1 file changed (+34, -24 lines)


src/arrow_parquet/parquet_reader.rs

Lines changed: 34 additions & 24 deletions
@@ -10,7 +10,8 @@ use pgrx::{
         fmgr_info, getTypeBinaryOutputInfo, varlena, Datum, FmgrInfo, FormData_pg_attribute,
         InvalidOid, SendFunctionCall,
     },
-    vardata_any, varsize_any_exhdr, void_mut_ptr, AllocatedByPostgres, PgBox, PgTupleDesc,
+    vardata_any, varsize_any_exhdr, void_mut_ptr, AllocatedByPostgres, PgBox, PgMemoryContexts,
+    PgTupleDesc,
 };
 use url::Url;

@@ -44,6 +45,7 @@ pub(crate) struct ParquetReaderContext {
     attribute_contexts: Vec<ArrowToPgAttributeContext>,
     binary_out_funcs: Vec<PgBox<FmgrInfo>>,
     match_by: MatchBy,
+    per_row_memory_ctx: PgMemoryContexts,
 }

 impl ParquetReaderContext {
@@ -88,6 +90,8 @@ impl ParquetReaderContext {

         let binary_out_funcs = Self::collect_binary_out_funcs(&attributes);

+        let per_row_memory_ctx = PgMemoryContexts::new("COPY FROM parquet per row memory context");
+
         ParquetReaderContext {
             buffer: Vec::new(),
             offset: 0,
@@ -97,6 +101,7 @@ impl ParquetReaderContext {
             match_by,
             started: false,
             finished: false,
+            per_row_memory_ctx,
         }
     }

@@ -172,12 +177,10 @@ impl ParquetReaderContext {
         }

         if !self.started {
-            // starts PG copy protocol
+            // starts PG copy
             self.copy_start();
         }

-        let natts = self.attribute_contexts.len() as i16;
-
         // read a record batch from the parquet file. Record batch will contain
         // DEFAULT_BATCH_SIZE rows as we configured in the parquet reader.
         let record_batch = PG_BACKEND_TOKIO_RUNTIME.block_on(self.parquet_reader.next());
@@ -193,8 +196,21 @@ impl ParquetReaderContext {

                 // slice the record batch to get the next row
                 let record_batch = record_batch.slice(i, 1);
+                self.copy_row(record_batch);
+            }
+        } else {
+            // finish PG copy
+            self.copy_finish();
+        }
+
+        true
+    }

+    fn copy_row(&mut self, record_batch: RecordBatch) {
+        unsafe {
+            self.per_row_memory_ctx.switch_to(|_context| {
                 /* 2 bytes: per-tuple header */
+                let natts = self.attribute_contexts.len() as i16;
                 let attnum_len_bytes = natts.to_be_bytes();
                 self.buffer.extend_from_slice(&attnum_len_bytes);

@@ -209,34 +225,28 @@ impl ParquetReaderContext {
                 for (datum, out_func) in tuple_datums.into_iter().zip(self.binary_out_funcs.iter())
                 {
                     if let Some(datum) = datum {
-                        unsafe {
-                            let datum_bytes: *mut varlena =
-                                SendFunctionCall(out_func.as_ptr(), datum);
-
-                            /* 4 bytes: attribute's data size */
-                            let data_size = varsize_any_exhdr(datum_bytes);
-                            let data_size_bytes = (data_size as i32).to_be_bytes();
-                            self.buffer.extend_from_slice(&data_size_bytes);
-
-                            /* variable bytes: attribute's data */
-                            let data = vardata_any(datum_bytes) as _;
-                            let data_bytes = std::slice::from_raw_parts(data, data_size);
-                            self.buffer.extend_from_slice(data_bytes);
-                        };
+                        let datum_bytes: *mut varlena = SendFunctionCall(out_func.as_ptr(), datum);
+
+                        /* 4 bytes: attribute's data size */
+                        let data_size = varsize_any_exhdr(datum_bytes);
+                        let data_size_bytes = (data_size as i32).to_be_bytes();
+                        self.buffer.extend_from_slice(&data_size_bytes);
+
+                        /* variable bytes: attribute's data */
+                        let data = vardata_any(datum_bytes) as _;
+                        let data_bytes = std::slice::from_raw_parts(data, data_size);
+                        self.buffer.extend_from_slice(data_bytes);
                     } else {
                         /* 4 bytes: null */
                         let null_value = -1_i32;
                         let null_value_bytes = null_value.to_be_bytes();
                         self.buffer.extend_from_slice(&null_value_bytes);
                     }
                 }
-            }
-        } else {
-            // finish PG copy protocol
-            self.copy_finish();
-        }
+            });

-        true
+            self.per_row_memory_ctx.reset();
+        };
     }

     fn copy_start(&mut self) {
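
For reference, the bytes that copy_row appends to the buffer follow PostgreSQL's binary COPY per-tuple layout, exactly as the comments in the diff describe: a 2-byte big-endian attribute count, then for each attribute either a 4-byte big-endian data size followed by the send-function output, or a size of -1 for NULL. A self-contained sketch of that framing (plain Rust with illustrative inputs, no Postgres calls):

// Per-tuple framing of binary COPY: 2-byte attribute count, then for each
// attribute a 4-byte size plus payload, or a size of -1 marking NULL.
fn encode_tuple(buffer: &mut Vec<u8>, attrs: &[Option<Vec<u8>>]) {
    /* 2 bytes: per-tuple header */
    let natts = attrs.len() as i16;
    buffer.extend_from_slice(&natts.to_be_bytes());

    for attr in attrs {
        match attr {
            Some(data) => {
                /* 4 bytes: attribute's data size */
                buffer.extend_from_slice(&(data.len() as i32).to_be_bytes());
                /* variable bytes: attribute's data */
                buffer.extend_from_slice(data);
            }
            None => {
                /* 4 bytes: null */
                buffer.extend_from_slice(&(-1_i32).to_be_bytes());
            }
        }
    }
}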
