Skip to content

Commit 00c8211

Browse files
committed
Draft implementations of parquet-cpp allocator and read-only file interfaces
1 parent 77598fa commit 00c8211

12 files changed

+810
-444
lines changed

cpp/src/arrow/io/hdfs-io-test.cc

+1-1
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,7 @@ TEST_F(TestHdfsClient, ReadableMethods) {
266266
ASSERT_EQ(size, file_size);
267267

268268
uint8_t buffer[50];
269-
int32_t bytes_read = 0;
269+
int64_t bytes_read = 0;
270270

271271
ASSERT_OK(file->Read(50, &bytes_read, buffer));
272272
ASSERT_EQ(0, std::memcmp(buffer, data.data(), 50));

cpp/src/arrow/io/hdfs.cc

+8-8
Original file line numberDiff line numberDiff line change
@@ -100,15 +100,15 @@ class HdfsReadableFile::HdfsReadableFileImpl : public HdfsAnyFileImpl {
100100
return Status::OK();
101101
}
102102

103-
Status ReadAt(int64_t position, int32_t nbytes, int32_t* bytes_read, uint8_t* buffer) {
103+
// Positional read: asks libhdfs (hdfsPread) for up to nbytes bytes starting at
// the absolute offset `position` and stores them in `buffer`.
//
// On success *bytes_read receives the count returned by hdfsPread, which may
// be smaller than nbytes; callers must check it. A failure reported by
// CheckReadResult is returned to the caller as a non-OK Status.
Status ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) {
  // libhdfs expresses read lengths as tSize, a 32-bit signed integer. Passing
  // a 64-bit nbytes directly would narrow it implicitly, so a request larger
  // than 2^31 - 1 could turn into a tiny or negative length. Clamp instead:
  // the caller simply observes a short read and can issue another call.
  const int64_t kMaxReadSize = 0x7FFFFFFF;
  const tSize to_read =
      static_cast<tSize>(nbytes < kMaxReadSize ? nbytes : kMaxReadSize);

  tSize ret = hdfsPread(fs_, file_, static_cast<tOffset>(position),
      reinterpret_cast<void*>(buffer), to_read);
  RETURN_NOT_OK(CheckReadResult(ret));
  *bytes_read = ret;
  return Status::OK();
}
110110

111-
Status Read(int32_t nbytes, int32_t* bytes_read, uint8_t* buffer) {
111+
Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) {
112112
tSize ret = hdfsRead(fs_, file_, reinterpret_cast<void*>(buffer), nbytes);
113113
RETURN_NOT_OK(CheckReadResult(ret));
114114
*bytes_read = ret;
@@ -138,11 +138,11 @@ Status HdfsReadableFile::Close() {
138138
}
139139

140140
// Public positional-read entry point. All work is delegated to the private
// implementation object (impl_); its Status is returned unchanged and
// *bytes_read is filled in by the implementation.
Status HdfsReadableFile::ReadAt(
141-
int64_t position, int32_t nbytes, int32_t* bytes_read, uint8_t* buffer) {
141+
int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) {
142142
return impl_->ReadAt(position, nbytes, bytes_read, buffer);
143143
}
144144

145-
// Public sequential-read entry point. Forwards to the private implementation
// object (impl_) and returns its Status unchanged; *bytes_read is filled in by
// the implementation.
Status HdfsReadableFile::Read(int32_t nbytes, int32_t* bytes_read, uint8_t* buffer) {
145+
Status HdfsReadableFile::Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) {
146146
return impl_->Read(nbytes, bytes_read, buffer);
147147
}
148148

@@ -177,7 +177,7 @@ class HdfsWriteableFile::HdfsWriteableFileImpl : public HdfsAnyFileImpl {
177177
return Status::OK();
178178
}
179179

180-
Status Write(const uint8_t* buffer, int32_t nbytes, int32_t* bytes_written) {
180+
Status Write(const uint8_t* buffer, int64_t nbytes, int64_t* bytes_written) {
181181
tSize ret = hdfsWrite(fs_, file_, reinterpret_cast<const void*>(buffer), nbytes);
182182
CHECK_FAILURE(ret, "Write");
183183
*bytes_written = ret;
@@ -198,12 +198,12 @@ Status HdfsWriteableFile::Close() {
198198
}
199199

200200
// Writes nbytes bytes from `buffer` by delegating to the private
// implementation object (impl_), whose Status is returned unchanged.
//
// The out-parameter is named bytes_written (it was bytes_read) so that the
// definition agrees with the declaration in hdfs.h and with the
// implementation class; it receives whatever count the implementation reports.
Status HdfsWriteableFile::Write(
    const uint8_t* buffer, int64_t nbytes, int64_t* bytes_written) {
  return impl_->Write(buffer, nbytes, bytes_written);
}
204204

205-
// Convenience overload for callers that do not need the written-byte count:
// forwards to the three-argument Write with a throwaway local as the
// out-parameter and returns that call's Status.
Status HdfsWriteableFile::Write(const uint8_t* buffer, int32_t nbytes) {
206-
int32_t bytes_written_dummy = 0;
205+
Status HdfsWriteableFile::Write(const uint8_t* buffer, int64_t nbytes) {
206+
int64_t bytes_written_dummy = 0;
207207
return Write(buffer, nbytes, &bytes_written_dummy);
208208
}
209209

cpp/src/arrow/io/hdfs.h

+4-4
Original file line numberDiff line numberDiff line change
@@ -164,14 +164,14 @@ class ARROW_EXPORT HdfsReadableFile : public RandomAccessFile {
164164
Status GetSize(int64_t* size) override;
165165

166166
Status ReadAt(
167-
int64_t position, int32_t nbytes, int32_t* bytes_read, uint8_t* buffer) override;
167+
int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) override;
168168

169169
Status Seek(int64_t position) override;
170170
Status Tell(int64_t* position) override;
171171

172172
// NOTE: If you wish to read a particular range of a file in a multithreaded
173173
// context, you may prefer to use ReadAt to avoid locking issues
174-
Status Read(int32_t nbytes, int32_t* bytes_read, uint8_t* buffer) override;
174+
Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) override;
175175

176176
private:
177177
class ARROW_NO_EXPORT HdfsReadableFileImpl;
@@ -189,9 +189,9 @@ class ARROW_EXPORT HdfsWriteableFile : public WriteableFile {
189189

190190
Status Close() override;
191191

192-
Status Write(const uint8_t* buffer, int32_t nbytes) override;
192+
Status Write(const uint8_t* buffer, int64_t nbytes) override;
193193

194-
Status Write(const uint8_t* buffer, int32_t nbytes, int32_t* bytes_written);
194+
Status Write(const uint8_t* buffer, int64_t nbytes, int64_t* bytes_written);
195195

196196
Status Tell(int64_t* position) override;
197197

cpp/src/arrow/io/interfaces.h

+7-7
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
#ifndef ARROW_IO_INTERFACES
19-
#define ARROW_IO_INTERFACES
18+
#ifndef ARROW_IO_INTERFACES_H
19+
#define ARROW_IO_INTERFACES_H
2020

2121
#include <cstdint>
2222

@@ -40,17 +40,17 @@ class FileSystemClient {
4040
};
4141

4242
class FileBase {
43+
public:
4344
virtual Status Close() = 0;
44-
4545
virtual Status Tell(int64_t* position) = 0;
4646
};
4747

4848
// Interface for files that can be read. All operations are pure virtual and
// report failure through the returned Status.
//
//   ReadAt  - read request carrying an explicit `position`; data goes to
//             `buffer`, and *bytes_read is an out-parameter (the HDFS
//             implementation in this change stores the count returned by
//             libhdfs there).
//   Read    - read request without a position argument; same out-parameters.
//   GetSize - stores the size of the file in *size.
//
// NOTE(review): byte counts are 64-bit in the new signatures; whether a short
// read is permitted is up to each implementation - confirm against callers.
class ReadableFile : public FileBase {
4949
public:
5050
virtual Status ReadAt(
51-
int64_t position, int32_t nbytes, int32_t* bytes_read, uint8_t* buffer) = 0;
51+
int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) = 0;
5252

53-
virtual Status Read(int32_t nbytes, int32_t* bytes_read, uint8_t* buffer) = 0;
53+
virtual Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) = 0;
5454

5555
virtual Status GetSize(int64_t* size) = 0;
5656
};
@@ -62,10 +62,10 @@ class RandomAccessFile : public ReadableFile {
6262

6363
// Interface for files that can be written. Adds a single pure virtual Write
// to the operations inherited from FileBase: it takes a source `buffer` and a
// 64-bit byte count and reports failure through the returned Status.
class WriteableFile : public FileBase {
6464
public:
65-
virtual Status Write(const uint8_t* buffer, int32_t nbytes) = 0;
65+
virtual Status Write(const uint8_t* buffer, int64_t nbytes) = 0;
6666
};
6767

6868
} // namespace io
6969
} // namespace arrow
7070

71-
#endif // ARROW_IO_INTERFACES
71+
#endif // ARROW_IO_INTERFACES_H

cpp/src/arrow/parquet/CMakeLists.txt

+5
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
# arrow_parquet : Arrow <-> Parquet adapter
2020

2121
set(PARQUET_SRCS
22+
io.cc
2223
reader.cc
2324
schema.cc
2425
writer.cc
@@ -48,8 +49,12 @@ ARROW_TEST_LINK_LIBRARIES(parquet-schema-test arrow_parquet)
4849
ADD_ARROW_TEST(parquet-io-test)
4950
ARROW_TEST_LINK_LIBRARIES(parquet-io-test arrow_parquet)
5051

52+
ADD_ARROW_TEST(parquet-reader-writer-test)
53+
ARROW_TEST_LINK_LIBRARIES(parquet-reader-writer-test arrow_parquet)
54+
5155
# Headers: top level
5256
install(FILES
57+
io.h
5358
reader.h
5459
schema.h
5560
utils.h

cpp/src/arrow/parquet/io.cc

+94
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#include "arrow/parquet/io.h"
19+
20+
#include <cstdint>
21+
#include <memory>
22+
23+
#include "parquet/api/io.h"
24+
25+
#include "arrow/parquet/utils.h"
26+
#include "arrow/util/memory-pool.h"
27+
#include "arrow/util/status.h"
28+
29+
// To assist with readability
30+
using ArrowROFile = arrow::io::RandomAccessFile;
31+
32+
namespace arrow {
33+
namespace parquet {
34+
35+
// ----------------------------------------------------------------------
36+
// ParquetAllocator
37+
38+
// Default construction backs the allocator with Arrow's default memory pool.
ParquetAllocator::ParquetAllocator() : ParquetAllocator(default_memory_pool()) {}

// Backs the allocator with the caller-supplied pool. Only the pointer is
// stored; nothing in this file ever deletes it.
ParquetAllocator::ParquetAllocator(MemoryPool* pool) : pool_(pool) {}

// Nothing to release: the pool pointer is not owned.
ParquetAllocator::~ParquetAllocator() = default;
43+
44+
// Allocates `size` bytes from the underlying Arrow memory pool and returns a
// pointer to them. parquet-cpp's allocator interface has no Status channel,
// so a failed allocation is surfaced through PARQUET_THROW_NOT_OK instead.
uint8_t* ParquetAllocator::Malloc(int64_t size) {
  // Start from a known value so that an indeterminate pointer can never be
  // handed back to the caller.
  uint8_t* result = nullptr;
  PARQUET_THROW_NOT_OK(pool_->Allocate(size, &result));
  return result;
}
49+
50+
// Hands `buffer` (of `size` bytes) back to the underlying Arrow memory pool.
void ParquetAllocator::Free(uint8_t* buffer, int64_t size) {
  // MemoryPool::Free does not report a Status, so there is nothing to check.
  pool_->Free(buffer, size);
}
54+
55+
// ----------------------------------------------------------------------
56+
// ParquetReadSource
57+
58+
// Wraps an Arrow random-access file so parquet-cpp can read from it. The file
// is shared (the shared_ptr is copied); the allocator pointer is only stored.
ParquetReadSource::ParquetReadSource(
    const std::shared_ptr<ArrowROFile>& file, ParquetAllocator* allocator)
    : file_(file),
      allocator_(allocator) {}
61+
62+
// Closes the wrapped Arrow file. A non-OK Status from the file is handled by
// PARQUET_THROW_NOT_OK, because this interface returns void.
void ParquetReadSource::Close() {
  PARQUET_THROW_NOT_OK(file_->Close());
}
65+
66+
// Returns the current position of the wrapped Arrow file. A non-OK Status
// from the file is handled by PARQUET_THROW_NOT_OK.
int64_t ParquetReadSource::Tell() const {
  // Initialized so that an indeterminate value can never be returned.
  int64_t position = 0;
  PARQUET_THROW_NOT_OK(file_->Tell(&position));
  return position;
}
71+
72+
// Moves the wrapped Arrow file to `position`. A non-OK Status from the file is
// handled by PARQUET_THROW_NOT_OK, because this interface returns void.
void ParquetReadSource::Seek(int64_t position) {
  PARQUET_THROW_NOT_OK(file_->Seek(position));
}
75+
76+
// Reads up to nbytes bytes from the wrapped Arrow file into `out` and returns
// the number of bytes the file reported as read. A non-OK Status from the
// file is handled by PARQUET_THROW_NOT_OK.
int64_t ParquetReadSource::Read(int64_t nbytes, uint8_t* out) {
  // Initialized so that an indeterminate count can never be returned.
  int64_t bytes_read = 0;
  PARQUET_THROW_NOT_OK(file_->Read(nbytes, &bytes_read, out));
  return bytes_read;
}
81+
82+
std::shared_ptr<::parquet::Buffer> ParquetReadSource::Read(int64_t nbytes) {
83+
// TODO(wesm): This code is duplicated from parquet/util/input.cc; suggests
84+
// that there should be more code sharing amongst file-like sources
85+
auto result = std::make_shared<::parquet::OwnedMutableBuffer>(0, allocator_);
86+
result->Resize(nbytes);
87+
88+
int64_t bytes_read = Read(nbytes, result->mutable_data());
89+
if (bytes_read < nbytes) { result->Resize(bytes_read); }
90+
return result;
91+
}
92+
93+
} // namespace parquet
94+
} // namespace arrow

cpp/src/arrow/parquet/io.h

+80
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
// Bridges Arrow's IO interfaces and Parquet-cpp's IO interfaces
19+
20+
#ifndef ARROW_PARQUET_IO_H
21+
#define ARROW_PARQUET_IO_H
22+
23+
#include <cstdint>
24+
#include <memory>
25+
26+
#include "parquet/api/io.h"
27+
28+
#include "arrow/io/interfaces.h"
29+
#include "arrow/util/visibility.h"
30+
31+
namespace arrow {
32+
33+
class MemoryPool;
34+
35+
namespace parquet {
36+
37+
// An implementation of the Parquet MemoryAllocator API that plugs into an
38+
// existing Arrow memory pool. This way we can direct all allocations to a
39+
// single place rather than tracking allocations in different locations (for
40+
// example: without utilizing parquet-cpp's default allocator)
41+
class ARROW_EXPORT ParquetAllocator : public ::parquet::MemoryAllocator {
 public:
  // Backs the allocator with Arrow's default memory pool.
  ParquetAllocator();

  // Backs the allocator with the given pool. Only the pointer is kept.
  explicit ParquetAllocator(MemoryPool* pool);

  virtual ~ParquetAllocator();

  // ::parquet::MemoryAllocator interface: both calls are served by the
  // Arrow memory pool held in pool_.
  uint8_t* Malloc(int64_t size) override;
  void Free(uint8_t* buffer, int64_t size) override;

  // The Arrow memory pool that serves this allocator's requests.
  MemoryPool* pool() { return pool_; }

 private:
  // Stored as a plain pointer; this class declares nothing that deletes it.
  MemoryPool* pool_;
};
57+
58+
// Adapts an Arrow io::RandomAccessFile to parquet-cpp's RandomAccessSource
// interface, so Parquet readers can consume any Arrow random-access file.
class ARROW_EXPORT ParquetReadSource : public ::parquet::RandomAccessSource {
 public:
  // `file` is the Arrow file to read from; `allocator` supplies the memory for
  // buffers returned by Read(int64_t).
  ParquetReadSource(
      const std::shared_ptr<io::RandomAccessFile>& file, ParquetAllocator* allocator);

  // ::parquet::RandomAccessSource interface
  void Close() override;
  int64_t Tell() const override;
  void Seek(int64_t pos) override;

  // Reads up to nbytes bytes into caller-provided memory; returns the count
  // actually read.
  int64_t Read(int64_t nbytes, uint8_t* out) override;

  // Reads up to nbytes bytes into a newly created buffer and returns it.
  std::shared_ptr<::parquet::Buffer> Read(int64_t nbytes) override;

 private:
  // An Arrow readable file of some kind
  std::shared_ptr<io::RandomAccessFile> file_;

  // The allocator is required for creating managed buffers. Stored as a plain
  // pointer; this class declares nothing that deletes it.
  ParquetAllocator* allocator_;
};
76+
77+
} // namespace parquet
78+
} // namespace arrow
79+
80+
#endif // ARROW_PARQUET_IO_H

0 commit comments

Comments
 (0)