Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARROW-237: Implement parquet-cpp's abstract IO interfaces for memory allocation and file reading #101

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cpp/src/arrow/io/hdfs-io-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ TEST_F(TestHdfsClient, ReadableMethods) {
ASSERT_EQ(size, file_size);

uint8_t buffer[50];
int32_t bytes_read = 0;
int64_t bytes_read = 0;

ASSERT_OK(file->Read(50, &bytes_read, buffer));
ASSERT_EQ(0, std::memcmp(buffer, data.data(), 50));
Expand Down
16 changes: 8 additions & 8 deletions cpp/src/arrow/io/hdfs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -100,15 +100,15 @@ class HdfsReadableFile::HdfsReadableFileImpl : public HdfsAnyFileImpl {
return Status::OK();
}

Status ReadAt(int64_t position, int32_t nbytes, int32_t* bytes_read, uint8_t* buffer) {
Status ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 for making all these int64_t

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I only originally did int32_t because libhdfs has some size limits, fixed now

tSize ret = hdfsPread(fs_, file_, static_cast<tOffset>(position),
reinterpret_cast<void*>(buffer), nbytes);
RETURN_NOT_OK(CheckReadResult(ret));
*bytes_read = ret;
return Status::OK();
}

Status Read(int32_t nbytes, int32_t* bytes_read, uint8_t* buffer) {
Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) {
tSize ret = hdfsRead(fs_, file_, reinterpret_cast<void*>(buffer), nbytes);
RETURN_NOT_OK(CheckReadResult(ret));
*bytes_read = ret;
Expand Down Expand Up @@ -138,11 +138,11 @@ Status HdfsReadableFile::Close() {
}

Status HdfsReadableFile::ReadAt(
int64_t position, int32_t nbytes, int32_t* bytes_read, uint8_t* buffer) {
int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) {
return impl_->ReadAt(position, nbytes, bytes_read, buffer);
}

Status HdfsReadableFile::Read(int32_t nbytes, int32_t* bytes_read, uint8_t* buffer) {
Status HdfsReadableFile::Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) {
return impl_->Read(nbytes, bytes_read, buffer);
}

Expand Down Expand Up @@ -177,7 +177,7 @@ class HdfsWriteableFile::HdfsWriteableFileImpl : public HdfsAnyFileImpl {
return Status::OK();
}

Status Write(const uint8_t* buffer, int32_t nbytes, int32_t* bytes_written) {
Status Write(const uint8_t* buffer, int64_t nbytes, int64_t* bytes_written) {
tSize ret = hdfsWrite(fs_, file_, reinterpret_cast<const void*>(buffer), nbytes);
CHECK_FAILURE(ret, "Write");
*bytes_written = ret;
Expand All @@ -198,12 +198,12 @@ Status HdfsWriteableFile::Close() {
}

Status HdfsWriteableFile::Write(
const uint8_t* buffer, int32_t nbytes, int32_t* bytes_read) {
const uint8_t* buffer, int64_t nbytes, int64_t* bytes_read) {
return impl_->Write(buffer, nbytes, bytes_read);
}

Status HdfsWriteableFile::Write(const uint8_t* buffer, int32_t nbytes) {
int32_t bytes_written_dummy = 0;
Status HdfsWriteableFile::Write(const uint8_t* buffer, int64_t nbytes) {
int64_t bytes_written_dummy = 0;
return Write(buffer, nbytes, &bytes_written_dummy);
}

Expand Down
8 changes: 4 additions & 4 deletions cpp/src/arrow/io/hdfs.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,14 +164,14 @@ class ARROW_EXPORT HdfsReadableFile : public RandomAccessFile {
Status GetSize(int64_t* size) override;

Status ReadAt(
int64_t position, int32_t nbytes, int32_t* bytes_read, uint8_t* buffer) override;
int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) override;

Status Seek(int64_t position) override;
Status Tell(int64_t* position) override;

// NOTE: If you wish to read a particular range of a file in a multithreaded
// context, you may prefer to use ReadAt to avoid locking issues
Status Read(int32_t nbytes, int32_t* bytes_read, uint8_t* buffer) override;
Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) override;

private:
class ARROW_NO_EXPORT HdfsReadableFileImpl;
Expand All @@ -189,9 +189,9 @@ class ARROW_EXPORT HdfsWriteableFile : public WriteableFile {

Status Close() override;

Status Write(const uint8_t* buffer, int32_t nbytes) override;
Status Write(const uint8_t* buffer, int64_t nbytes) override;

Status Write(const uint8_t* buffer, int32_t nbytes, int32_t* bytes_written);
Status Write(const uint8_t* buffer, int64_t nbytes, int64_t* bytes_written);

Status Tell(int64_t* position) override;

Expand Down
14 changes: 7 additions & 7 deletions cpp/src/arrow/io/interfaces.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
// specific language governing permissions and limitations
// under the License.

#ifndef ARROW_IO_INTERFACES
#define ARROW_IO_INTERFACES
#ifndef ARROW_IO_INTERFACES_H
#define ARROW_IO_INTERFACES_H

#include <cstdint>

Expand All @@ -40,17 +40,17 @@ class FileSystemClient {
};

// Abstract base for all file-like objects (readable and writeable).
//
// This type is used polymorphically (ReadableFile, WriteableFile, and their
// implementations all derive from it), so it must have a virtual destructor:
// without one, deleting a derived file through a FileBase* is undefined
// behavior and derived destructors (which release OS resources) never run.
class FileBase {
 public:
  virtual ~FileBase() = default;

  // Release any resources associated with the file. Implementations should be
  // safe to call more than once.
  virtual Status Close() = 0;

  // Set *position to the current file offset.
  virtual Status Tell(int64_t* position) = 0;
};

class ReadableFile : public FileBase {
public:
virtual Status ReadAt(
int64_t position, int32_t nbytes, int32_t* bytes_read, uint8_t* buffer) = 0;
int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) = 0;

virtual Status Read(int32_t nbytes, int32_t* bytes_read, uint8_t* buffer) = 0;
virtual Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) = 0;

virtual Status GetSize(int64_t* size) = 0;
};
Expand All @@ -62,10 +62,10 @@ class RandomAccessFile : public ReadableFile {

class WriteableFile : public FileBase {
public:
virtual Status Write(const uint8_t* buffer, int32_t nbytes) = 0;
virtual Status Write(const uint8_t* buffer, int64_t nbytes) = 0;
};

} // namespace io
} // namespace arrow

#endif // ARROW_IO_INTERFACES
#endif // ARROW_IO_INTERFACES_H
5 changes: 5 additions & 0 deletions cpp/src/arrow/parquet/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
# arrow_parquet : Arrow <-> Parquet adapter

set(PARQUET_SRCS
io.cc
reader.cc
schema.cc
writer.cc
Expand Down Expand Up @@ -48,8 +49,12 @@ ARROW_TEST_LINK_LIBRARIES(parquet-schema-test arrow_parquet)
ADD_ARROW_TEST(parquet-io-test)
ARROW_TEST_LINK_LIBRARIES(parquet-io-test arrow_parquet)

ADD_ARROW_TEST(parquet-reader-writer-test)
ARROW_TEST_LINK_LIBRARIES(parquet-reader-writer-test arrow_parquet)

# Headers: top level
install(FILES
io.h
reader.h
schema.h
utils.h
Expand Down
94 changes: 94 additions & 0 deletions cpp/src/arrow/parquet/io.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "arrow/parquet/io.h"

#include <cstdint>
#include <memory>

#include "parquet/api/io.h"

#include "arrow/parquet/utils.h"
#include "arrow/util/memory-pool.h"
#include "arrow/util/status.h"

// To assist with readability
using ArrowROFile = arrow::io::RandomAccessFile;

namespace arrow {
namespace parquet {

// ----------------------------------------------------------------------
// ParquetAllocator

// Default constructor: allocations are serviced by the process-wide default
// Arrow memory pool.
ParquetAllocator::ParquetAllocator() : pool_(default_memory_pool()) {}

// Construct against a caller-provided pool. The allocator does not take
// ownership; `pool` must outlive this object.
ParquetAllocator::ParquetAllocator(MemoryPool* pool) : pool_(pool) {}

// Nothing to release: the pool is borrowed, not owned.
ParquetAllocator::~ParquetAllocator() {}

// Allocate `size` bytes from the wrapped Arrow memory pool so all usage is
// tracked in one place. A failing Status from the pool is converted into a
// parquet exception by PARQUET_THROW_NOT_OK, so on the normal return path
// `result` has been set by Allocate. Initialized to nullptr regardless, so
// the local never holds an indeterminate value.
uint8_t* ParquetAllocator::Malloc(int64_t size) {
  uint8_t* result = nullptr;
  PARQUET_THROW_NOT_OK(pool_->Allocate(size, &result));
  return result;
}

// Return `buffer` (previously obtained from Malloc, of exactly `size` bytes)
// to the pool. The MemoryAllocator interface gives no way to surface errors
// here, so any failure from the pool is intentionally dropped.
void ParquetAllocator::Free(uint8_t* buffer, int64_t size) {
  // Does not report Status
  pool_->Free(buffer, size);
}

// ----------------------------------------------------------------------
// ParquetReadSource

// Wrap an Arrow random-access file as a parquet-cpp RandomAccessSource.
// Shares ownership of `file`; borrows `allocator`, which must outlive this
// object (it is needed for the buffer-returning Read overload below).
ParquetReadSource::ParquetReadSource(
    const std::shared_ptr<ArrowROFile>& file, ParquetAllocator* allocator)
    : file_(file), allocator_(allocator) {}

// Close the underlying Arrow file, converting a failing Status into a
// parquet exception (the parquet-cpp interface is exception-based).
void ParquetReadSource::Close() {
  PARQUET_THROW_NOT_OK(file_->Close());
}

// Report the current offset of the underlying Arrow file. A failing Status
// is converted into a parquet exception before `position` is returned, and
// the -1 sentinel initialization guards against ever reading an
// indeterminate local.
int64_t ParquetReadSource::Tell() const {
  int64_t position = -1;
  PARQUET_THROW_NOT_OK(file_->Tell(&position));
  return position;
}

// Move the underlying Arrow file's offset to `position`; a failing Status
// becomes a parquet exception.
void ParquetReadSource::Seek(int64_t position) {
  PARQUET_THROW_NOT_OK(file_->Seek(position));
}

// Read up to `nbytes` into the caller-supplied buffer `out` and return the
// number of bytes actually read (may be fewer than requested, e.g. near
// EOF). The out-parameter is zero-initialized so a misbehaving file
// implementation cannot surface an indeterminate value; errors are converted
// into parquet exceptions.
int64_t ParquetReadSource::Read(int64_t nbytes, uint8_t* out) {
  int64_t bytes_read = 0;
  PARQUET_THROW_NOT_OK(file_->Read(nbytes, &bytes_read, out));
  return bytes_read;
}

// Read up to `nbytes` into a freshly allocated buffer (managed by the
// configured ParquetAllocator) and return it, shrunk to the number of bytes
// actually read when the file yields fewer than requested.
//
// TODO(wesm): This code is duplicated from parquet/util/input.cc; suggests
// that there should be more code sharing amongst file-like sources
std::shared_ptr<::parquet::Buffer> ParquetReadSource::Read(int64_t nbytes) {
  auto buffer = std::make_shared<::parquet::OwnedMutableBuffer>(0, allocator_);
  buffer->Resize(nbytes);

  const int64_t actual_bytes = Read(nbytes, buffer->mutable_data());
  if (actual_bytes < nbytes) {
    buffer->Resize(actual_bytes);
  }
  return buffer;
}

} // namespace parquet
} // namespace arrow
80 changes: 80 additions & 0 deletions cpp/src/arrow/parquet/io.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// Bridges Arrow's IO interfaces and Parquet-cpp's IO interfaces

#ifndef ARROW_PARQUET_IO_H
#define ARROW_PARQUET_IO_H

#include <cstdint>
#include <memory>

#include "parquet/api/io.h"

#include "arrow/io/interfaces.h"
#include "arrow/util/visibility.h"

namespace arrow {

class MemoryPool;

namespace parquet {

// An implementation of the Parquet MemoryAllocator API that plugs into an
// existing Arrow memory pool. This way we can direct all allocations to a
// single place rather than tracking allocations in different locations (for
// example: without utilizing parquet-cpp's default allocator)
class ARROW_EXPORT ParquetAllocator : public ::parquet::MemoryAllocator {
 public:
  // Uses the default memory pool
  ParquetAllocator();

  // Uses a caller-provided pool; `pool` is borrowed and must outlive this
  // allocator.
  explicit ParquetAllocator(MemoryPool* pool);
  virtual ~ParquetAllocator();

  // Allocate / release through the wrapped pool. Malloc converts a failing
  // Status into a parquet exception; Free cannot report errors (see io.cc).
  uint8_t* Malloc(int64_t size) override;
  void Free(uint8_t* buffer, int64_t size) override;

  // Read-only accessor; const so it can be called on a const allocator.
  MemoryPool* pool() const { return pool_; }

 private:
  MemoryPool* pool_;
};

class ARROW_EXPORT ParquetReadSource : public ::parquet::RandomAccessSource {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm actually a bit bothered if it really is a good idea to have the same export macro for different shared libs. Sadly this thought did not come up with the previous review but it would probably be better to have ARROW_PARQUET_EXPORT and ARROW_IO_EXPORT.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting. From my point of view, what matters is not conflicting with any 3rd party macros that you may encounter in headers. Unless you expect to have some differing interpretation of visibility between leaf libraries (does not seem too likely). Since we can control name conflicts within Arrow at least, it's not really a problem

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, I cannot make a problem up (yet). I was just bothered if it could be one. But as long as things work fine, we don't need to have to these macros.

public:
ParquetReadSource(
const std::shared_ptr<io::RandomAccessFile>& file, ParquetAllocator* allocator);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using a shared_ptr here is potentially inflexible, but currently the HDFS classes return shared_ptrs so that's why I chose that here


void Close() override;
int64_t Tell() const override;
void Seek(int64_t pos) override;
int64_t Read(int64_t nbytes, uint8_t* out) override;
std::shared_ptr<::parquet::Buffer> Read(int64_t nbytes) override;

private:
// An Arrow readable file of some kind
std::shared_ptr<io::RandomAccessFile> file_;

// The allocator is required for creating managed buffers
ParquetAllocator* allocator_;
};

} // namespace parquet
} // namespace arrow

#endif // ARROW_PARQUET_IO_H
Loading