@@ -44,14 +44,14 @@ pub struct ParquetTable {
44
44
}
45
45
46
46
impl ParquetTable {
47
- pub fn new ( filename : & str ) -> Self {
48
- let file = File :: open ( filename) . unwrap ( ) ;
49
- let parquet_file = ParquetFile :: open ( file, None ) . unwrap ( ) ;
47
+ pub fn try_new ( filename : & str ) -> Result < Self > {
48
+ let file = File :: open ( filename) ? ;
49
+ let parquet_file = ParquetFile :: open ( file, None ) ? ;
50
50
let schema = parquet_file. schema . clone ( ) ;
51
- Self {
51
+ Ok ( Self {
52
52
filename : filename. to_string ( ) ,
53
53
schema,
54
- }
54
+ } )
55
55
}
56
56
}
57
57
@@ -65,8 +65,8 @@ impl Table for ParquetTable {
65
65
projection : & Option < Vec < usize > > ,
66
66
_batch_size : usize ,
67
67
) -> Result < Vec < ScanResult > > {
68
- let file = File :: open ( self . filename . clone ( ) ) . unwrap ( ) ;
69
- let parquet_file = ParquetFile :: open ( file, projection. clone ( ) ) . unwrap ( ) ;
68
+ let file = File :: open ( self . filename . clone ( ) ) ? ;
69
+ let parquet_file = ParquetFile :: open ( file, projection. clone ( ) ) ? ;
70
70
Ok ( vec ! [ Arc :: new( Mutex :: new( parquet_file) ) ] )
71
71
}
72
72
}
@@ -84,17 +84,14 @@ pub struct ParquetFile {
84
84
85
85
impl ParquetFile {
86
86
pub fn open ( file : File , projection : Option < Vec < usize > > ) -> Result < Self > {
87
- println ! ( "open()" ) ;
88
-
89
- let reader = SerializedFileReader :: new ( file) . unwrap ( ) ;
87
+ let reader = SerializedFileReader :: new ( file) ?;
90
88
91
89
let metadata = reader. metadata ( ) ;
92
90
let file_type = to_arrow ( metadata. file_metadata ( ) . schema ( ) ) ?;
93
91
94
92
match file_type. data_type ( ) {
95
93
DataType :: Struct ( fields) => {
96
94
let schema = Schema :: new ( fields. clone ( ) ) ;
97
- //println!("Parquet schema: {:?}", schema);
98
95
99
96
let projection = match projection {
100
97
Some ( p) => p,
@@ -130,21 +127,24 @@ impl ParquetFile {
130
127
}
131
128
}
132
129
133
- fn load_next_row_group ( & mut self ) {
130
+ fn load_next_row_group ( & mut self ) -> Result < ( ) > {
134
131
if self . row_group_index < self . reader . num_row_groups ( ) {
135
- let reader = self . reader . get_row_group ( self . row_group_index ) . unwrap ( ) ;
132
+ let reader = self . reader . get_row_group ( self . row_group_index ) ? ;
136
133
137
- self . column_readers = vec ! [ ] ;
134
+ self . column_readers = Vec :: with_capacity ( self . projection . len ( ) ) ;
138
135
139
136
for i in & self . projection {
140
- self . column_readers
141
- . push ( reader. get_column_reader ( * i) . unwrap ( ) ) ;
137
+ self . column_readers . push ( reader. get_column_reader ( * i) ?) ;
142
138
}
143
139
144
140
self . current_row_group = Some ( reader) ;
145
141
self . row_group_index += 1 ;
142
+
143
+ Ok ( ( ) )
146
144
} else {
147
- panic ! ( )
145
+ Err ( ExecutionError :: General (
146
+ "Attempt to read past final row group" . to_string ( ) ,
147
+ ) )
148
148
}
149
149
}
150
150
@@ -171,7 +171,7 @@ impl ParquetFile {
171
171
) {
172
172
Ok ( ( count, _) ) => {
173
173
let mut builder = BooleanBuilder :: new ( count) ;
174
- builder. append_slice ( & read_buffer[ 0 ..count] ) . unwrap ( ) ;
174
+ builder. append_slice ( & read_buffer[ 0 ..count] ) ? ;
175
175
row_count = count;
176
176
Arc :: new ( builder. finish ( ) )
177
177
}
@@ -199,7 +199,7 @@ impl ParquetFile {
199
199
) {
200
200
Ok ( ( count, _) ) => {
201
201
let mut builder = Int32Builder :: new ( count) ;
202
- builder. append_slice ( & read_buffer[ 0 ..count] ) . unwrap ( ) ;
202
+ builder. append_slice ( & read_buffer[ 0 ..count] ) ? ;
203
203
row_count = count;
204
204
Arc :: new ( builder. finish ( ) )
205
205
}
@@ -227,7 +227,7 @@ impl ParquetFile {
227
227
) {
228
228
Ok ( ( count, _) ) => {
229
229
let mut builder = Int64Builder :: new ( count) ;
230
- builder. append_slice ( & read_buffer[ 0 ..count] ) . unwrap ( ) ;
230
+ builder. append_slice ( & read_buffer[ 0 ..count] ) ? ;
231
231
row_count = count;
232
232
Arc :: new ( builder. finish ( ) )
233
233
}
@@ -262,7 +262,7 @@ impl ParquetFile {
262
262
| ( v[ 1 ] as u128 ) << 32
263
263
| ( v[ 2 ] as u128 ) ;
264
264
let ms: i64 = ( value / 1000000 ) as i64 ;
265
- builder. append_value ( ms) . unwrap ( ) ;
265
+ builder. append_value ( ms) ? ;
266
266
}
267
267
row_count = count;
268
268
Arc :: new ( builder. finish ( ) )
@@ -289,7 +289,7 @@ impl ParquetFile {
289
289
& mut read_buffer,
290
290
) {
291
291
Ok ( ( count, _) ) => {
292
- builder. append_slice ( & read_buffer[ 0 ..count] ) . unwrap ( ) ;
292
+ builder. append_slice ( & read_buffer[ 0 ..count] ) ? ;
293
293
row_count = count;
294
294
Arc :: new ( builder. finish ( ) )
295
295
}
@@ -315,7 +315,7 @@ impl ParquetFile {
315
315
& mut read_buffer,
316
316
) {
317
317
Ok ( ( count, _) ) => {
318
- builder. append_slice ( & read_buffer[ 0 ..count] ) . unwrap ( ) ;
318
+ builder. append_slice ( & read_buffer[ 0 ..count] ) ? ;
319
319
row_count = count;
320
320
Arc :: new ( builder. finish ( ) )
321
321
}
@@ -339,12 +339,10 @@ impl ParquetFile {
339
339
let mut builder = BinaryBuilder :: new ( row_count) ;
340
340
for j in 0 ..row_count {
341
341
let slice = b[ j] . slice ( 0 , b[ j] . len ( ) ) ;
342
- builder
343
- . append_string (
344
- & String :: from_utf8 ( slice. data ( ) . to_vec ( ) )
345
- . unwrap ( ) ,
346
- )
347
- . unwrap ( ) ;
342
+ builder. append_string (
343
+ & String :: from_utf8 ( slice. data ( ) . to_vec ( ) )
344
+ . unwrap ( ) ,
345
+ ) ?;
348
346
}
349
347
Arc :: new ( builder. finish ( ) )
350
348
}
@@ -368,12 +366,10 @@ impl ParquetFile {
368
366
let mut builder = BinaryBuilder :: new ( row_count) ;
369
367
for j in 0 ..row_count {
370
368
let slice = b[ j] . slice ( 0 , b[ j] . len ( ) ) ;
371
- builder
372
- . append_string (
373
- & String :: from_utf8 ( slice. data ( ) . to_vec ( ) )
374
- . unwrap ( ) ,
375
- )
376
- . unwrap ( ) ;
369
+ builder. append_string (
370
+ & String :: from_utf8 ( slice. data ( ) . to_vec ( ) )
371
+ . unwrap ( ) ,
372
+ ) ?;
377
373
}
378
374
Arc :: new ( builder. finish ( ) )
379
375
}
@@ -387,12 +383,9 @@ impl ParquetFile {
387
383
}
388
384
} ;
389
385
390
- println ! ( "Adding array to batch" ) ;
391
386
batch. push ( array) ;
392
387
}
393
388
394
- println ! ( "Loaded batch of {} rows" , row_count) ;
395
-
396
389
if row_count == 0 {
397
390
Ok ( None )
398
391
} else {
@@ -445,14 +438,14 @@ impl RecordBatchIterator for ParquetFile {
445
438
fn next ( & mut self ) -> Result < Option < RecordBatch > > {
446
439
// advance the row group reader if necessary
447
440
if self . current_row_group . is_none ( ) {
448
- self . load_next_row_group ( ) ;
441
+ self . load_next_row_group ( ) ? ;
449
442
self . load_batch ( )
450
443
} else {
451
444
match self . load_batch ( ) {
452
445
Ok ( Some ( b) ) => Ok ( Some ( b) ) ,
453
446
Ok ( None ) => {
454
447
if self . row_group_index < self . reader . num_row_groups ( ) {
455
- self . load_next_row_group ( ) ;
448
+ self . load_next_row_group ( ) ? ;
456
449
self . load_batch ( )
457
450
} else {
458
451
Ok ( None )
@@ -678,7 +671,7 @@ mod tests {
678
671
fn load_table ( name : & str ) -> Box < Table > {
679
672
let testdata = env:: var ( "PARQUET_TEST_DATA" ) . unwrap ( ) ;
680
673
let filename = format ! ( "{}/{}" , testdata, name) ;
681
- let table = ParquetTable :: new ( & filename) ;
674
+ let table = ParquetTable :: try_new ( & filename) . unwrap ( ) ;
682
675
println ! ( "Loading file {} with schema:" , name) ;
683
676
for field in table. schema ( ) . fields ( ) {
684
677
println ! ( "\t {:?}" , field) ;
0 commit comments