
Commit 3a412b1

first parquet test passes
1 parent ff3e5b7 · commit 3a412b1

File tree

1 file changed: +37 -10 lines changed


rust/datafusion/src/datasource/parquet.rs

@@ -144,16 +144,19 @@ impl ParquetFile {
                 let mut row_count = 0;
                 for i in 0..self.column_readers.len() {
                     let array: Arc<Array> = match self.column_readers[i] {
-                        ColumnReader::BoolColumnReader(ref mut r) => {
+                        ColumnReader::BoolColumnReader(ref mut _r) => {
                             return Err(ExecutionError::NotImplemented(
                                 "unsupported column reader type (BOOL)".to_string(),
                             ));
                         }
                         ColumnReader::Int32ColumnReader(ref mut r) => {
-                            let mut builder = Int32Builder::new(self.batch_size);
                             let mut read_buffer: Vec<i32> =
                                 Vec::with_capacity(self.batch_size);
 
+                            for _ in 0..self.batch_size {
+                                read_buffer.push(0);
+                            }
+
                             match r.read_batch(
                                 self.batch_size,
                                 None,
@@ -163,7 +166,7 @@ impl ParquetFile {
                                 //TODO this isn't handling null values
                                 Ok((count, _)) => {
                                     println!("Read {} rows", count);
-
+                                    let mut builder = Int32Builder::new(self.batch_size);
                                     builder.append_slice(&read_buffer).unwrap();
                                     row_count = count;
                                     Arc::new(builder.finish())
@@ -176,17 +179,17 @@ impl ParquetFile {
                                 }
                             }
                         }
-                        ColumnReader::Int64ColumnReader(ref mut r) => {
+                        ColumnReader::Int64ColumnReader(ref mut _r) => {
                             return Err(ExecutionError::NotImplemented(
                                 "unsupported column reader type (INT64)".to_string(),
                             ));
                         }
-                        ColumnReader::Int96ColumnReader(ref mut r) => {
+                        ColumnReader::Int96ColumnReader(ref mut _r) => {
                             return Err(ExecutionError::NotImplemented(
                                 "unsupported column reader type (INT96)".to_string(),
                             ));
                         }
-                        ColumnReader::FloatColumnReader(ref mut r) => {
+                        ColumnReader::FloatColumnReader(ref mut _r) => {
                             return Err(ExecutionError::NotImplemented(
                                 "unsupported column reader type (FLOAT)".to_string(),
                             ));
@@ -215,7 +218,7 @@ impl ParquetFile {
                                 }
                             }
                         }
-                        ColumnReader::FixedLenByteArrayColumnReader(ref mut r) => {
+                        ColumnReader::FixedLenByteArrayColumnReader(ref mut _r) => {
                             return Err(ExecutionError::NotImplemented(
                                 "unsupported column reader type (FixedLenByteArray)"
                                     .to_string(),
@@ -261,7 +264,15 @@ impl ParquetFile {
                 if row_count == 0 {
                     Ok(None)
                 } else {
-                    Ok(Some(RecordBatch::try_new(self.schema.clone(), batch)?))
+                    match &self.projection {
+                        Some(proj) => Ok(Some(RecordBatch::try_new(
+                            self.schema.projection(proj)?,
+                            batch,
+                        )?)),
+                        None => {
+                            Ok(Some(RecordBatch::try_new(self.schema.clone(), batch)?))
+                        }
+                    }
                 }
             }
             _ => Ok(None),
@@ -337,10 +348,11 @@ impl RecordBatchIterator for ParquetFile {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use arrow::array::Int32Array;
     use std::env;
 
     #[test]
-    fn read_parquet_file() {
+    fn read_read_i32_column() {
         let testdata = env::var("PARQUET_TEST_DATA").unwrap();
         let filename = format!("{}/alltypes_plain.parquet", testdata);
 
@@ -354,6 +366,21 @@ mod tests {
         let batch = it.next().unwrap().unwrap();
 
         assert_eq!(1, batch.num_columns());
-        assert_eq!(1, batch.num_rows());
+        assert_eq!(64 * 1024, batch.num_rows());
+
+        let array = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .unwrap();
+        let mut values: Vec<i32> = vec![];
+        for i in 0..16 {
+            values.push(array.value(i));
+        }
+
+        assert_eq!(
+            "[4, 5, 6, 7, 2, 3, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0]",
+            format!("{:?}", values)
+        );
     }
 }
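
Note: the sketch below restates the INT32 read path added in this commit as a standalone function, to make the read_batch buffer contract explicit. It is a sketch, not part of the commit: the helper name read_i32_batch is made up, and it assumes the parquet and arrow crate versions DataFusion used at the time, where ColumnReader::Int32ColumnReader wraps a ColumnReaderImpl<Int32Type>, Int32Builder::new takes a capacity, and read_batch returns the number of values read plus a level count.

use arrow::array::{Int32Array, Int32Builder};
use parquet::column::reader::ColumnReaderImpl;
use parquet::data_type::Int32Type;
use parquet::errors::Result;

// Read up to `batch_size` INT32 values from one column chunk into an Arrow array,
// mirroring the Int32ColumnReader arm of the diff above.
fn read_i32_batch(
    reader: &mut ColumnReaderImpl<Int32Type>,
    batch_size: usize,
) -> Result<(usize, Int32Array)> {
    // read_batch writes into a caller-provided slice, so the buffer must already
    // hold batch_size elements; the commit zero-fills it with a loop, vec![0; n]
    // does the same thing.
    let mut read_buffer: Vec<i32> = vec![0; batch_size];
    let (count, _) = reader.read_batch(batch_size, None, None, &mut read_buffer)?;

    // Like the commit, append the whole buffer rather than only the `count` values
    // actually read (which is why the new test asserts 64 * 1024 rows), and ignore
    // definition levels, so nulls are not handled yet (the TODO in the diff).
    let mut builder = Int32Builder::new(batch_size);
    builder.append_slice(&read_buffer).unwrap();
    Ok((count, builder.finish()))
}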
