Skip to content

Commit a667fca

Browse files
robertnishihara and pcmoritz
authored and committed
ARROW-3920: [plasma] Fix reference counting in custom tensorflow plasma operator.
There is an issue here where `Release` was never being called in the plasma TF operator. Note that I also changed the release delay in the plasma operator to 0. Author: Robert Nishihara <robertnishihara@gmail.com> Author: Philipp Moritz <pcmoritz@gmail.com> Closes apache#3061 from robertnishihara/extrareleaseinplasmaop and squashes the following commits: c109566 <Philipp Moritz> add include guards f89d5df <Philipp Moritz> lint 4836342 <Philipp Moritz> unregister memory e3b3864 <Robert Nishihara> Linting b948ce0 <Robert Nishihara> Add test. 75f2bd9 <Robert Nishihara> Remove logging statement. f04a7d2 <Robert Nishihara> Fix 574c035 <Robert Nishihara> Fix ndarray/tensor confusion in plasma op. 06985cd <Robert Nishihara> Have plasma op deserialize as numpy array. a2a9c36 <Robert Nishihara> Add release call into wrapped_callback. 0db9154 <Robert Nishihara> Change release delay to 0. f434094 <Robert Nishihara> Add Release call in plasma op.
1 parent 2bc4d95 commit a667fca

File tree

6 files changed

+58
-38
lines changed

6 files changed

+58
-38
lines changed

cpp/src/arrow/python/deserialize.cc

+9-8
Original file line numberDiff line numberDiff line change
@@ -361,7 +361,7 @@ Status GetSerializedFromComponents(int num_tensors, int num_ndarrays, int num_bu
361361

362362
ipc::Message message(metadata, body);
363363

364-
RETURN_NOT_OK(ReadTensor(message, &tensor));
364+
RETURN_NOT_OK(ipc::ReadTensor(message, &tensor));
365365
out->tensors.emplace_back(std::move(tensor));
366366
}
367367

@@ -375,7 +375,7 @@ Status GetSerializedFromComponents(int num_tensors, int num_ndarrays, int num_bu
375375

376376
ipc::Message message(metadata, body);
377377

378-
RETURN_NOT_OK(ReadTensor(message, &tensor));
378+
RETURN_NOT_OK(ipc::ReadTensor(message, &tensor));
379379
out->ndarrays.emplace_back(std::move(tensor));
380380
}
381381

@@ -389,19 +389,20 @@ Status GetSerializedFromComponents(int num_tensors, int num_ndarrays, int num_bu
389389
return Status::OK();
390390
}
391391

392-
Status DeserializeTensor(const SerializedPyObject& object, std::shared_ptr<Tensor>* out) {
393-
if (object.tensors.size() != 1) {
394-
return Status::Invalid("Object is not a Tensor");
392+
Status DeserializeNdarray(const SerializedPyObject& object,
393+
std::shared_ptr<Tensor>* out) {
394+
if (object.ndarrays.size() != 1) {
395+
return Status::Invalid("Object is not an Ndarray");
395396
}
396-
*out = object.tensors[0];
397+
*out = object.ndarrays[0];
397398
return Status::OK();
398399
}
399400

400-
Status ReadTensor(std::shared_ptr<Buffer> src, std::shared_ptr<Tensor>* out) {
401+
Status NdarrayFromBuffer(std::shared_ptr<Buffer> src, std::shared_ptr<Tensor>* out) {
401402
io::BufferReader reader(src);
402403
SerializedPyObject object;
403404
RETURN_NOT_OK(ReadSerializedObject(&reader, &object));
404-
return DeserializeTensor(object, out);
405+
return DeserializeNdarray(object, out);
405406
}
406407

407408
} // namespace py

cpp/src/arrow/python/deserialize.h

+3-3
Original file line numberDiff line numberDiff line change
@@ -76,15 +76,15 @@ ARROW_EXPORT
7676
Status DeserializeObject(PyObject* context, const SerializedPyObject& object,
7777
PyObject* base, PyObject** out);
7878

79-
/// \brief Reconstruct Tensor from Arrow-serialized representation
79+
/// \brief Reconstruct Ndarray from Arrow-serialized representation
8080
/// \param[in] object Object to deserialize
8181
/// \param[out] out The deserialized tensor
8282
/// \return Status
8383
ARROW_EXPORT
84-
Status DeserializeTensor(const SerializedPyObject& object, std::shared_ptr<Tensor>* out);
84+
Status DeserializeNdarray(const SerializedPyObject& object, std::shared_ptr<Tensor>* out);
8585

8686
ARROW_EXPORT
87-
Status ReadTensor(std::shared_ptr<Buffer> src, std::shared_ptr<Tensor>* out);
87+
Status NdarrayFromBuffer(std::shared_ptr<Buffer> src, std::shared_ptr<Tensor>* out);
8888

8989
} // namespace py
9090
} // namespace arrow

cpp/src/arrow/python/serialize.cc

+7-7
Original file line numberDiff line numberDiff line change
@@ -752,23 +752,23 @@ Status SerializeObject(PyObject* context, PyObject* sequence, SerializedPyObject
752752
return Status::OK();
753753
}
754754

755-
Status SerializeTensor(std::shared_ptr<Tensor> tensor, SerializedPyObject* out) {
755+
Status SerializeNdarray(std::shared_ptr<Tensor> tensor, SerializedPyObject* out) {
756756
std::shared_ptr<Array> array;
757757
SequenceBuilder builder;
758-
RETURN_NOT_OK(builder.AppendTensor(static_cast<int32_t>(out->tensors.size())));
759-
out->tensors.push_back(tensor);
758+
RETURN_NOT_OK(builder.AppendNdarray(static_cast<int32_t>(out->ndarrays.size())));
759+
out->ndarrays.push_back(tensor);
760760
RETURN_NOT_OK(builder.Finish(nullptr, nullptr, nullptr, nullptr, &array));
761761
out->batch = MakeBatch(array);
762762
return Status::OK();
763763
}
764764

765-
Status WriteTensorHeader(std::shared_ptr<DataType> dtype,
766-
const std::vector<int64_t>& shape, int64_t tensor_num_bytes,
767-
io::OutputStream* dst) {
765+
Status WriteNdarrayHeader(std::shared_ptr<DataType> dtype,
766+
const std::vector<int64_t>& shape, int64_t tensor_num_bytes,
767+
io::OutputStream* dst) {
768768
auto empty_tensor = std::make_shared<Tensor>(
769769
dtype, std::make_shared<Buffer>(nullptr, tensor_num_bytes), shape);
770770
SerializedPyObject serialized_tensor;
771-
RETURN_NOT_OK(SerializeTensor(empty_tensor, &serialized_tensor));
771+
RETURN_NOT_OK(SerializeNdarray(empty_tensor, &serialized_tensor));
772772
return serialized_tensor.WriteTo(dst);
773773
}
774774

cpp/src/arrow/python/serialize.h

+3-3
Original file line numberDiff line numberDiff line change
@@ -103,9 +103,9 @@ Status SerializeTensor(std::shared_ptr<Tensor> tensor, py::SerializedPyObject* o
103103
/// \param[in] dst The OutputStream to write the Tensor header to
104104
/// \return Status
105105
ARROW_EXPORT
106-
Status WriteTensorHeader(std::shared_ptr<DataType> dtype,
107-
const std::vector<int64_t>& shape, int64_t tensor_num_bytes,
108-
io::OutputStream* dst);
106+
Status WriteNdarrayHeader(std::shared_ptr<DataType> dtype,
107+
const std::vector<int64_t>& shape, int64_t tensor_num_bytes,
108+
io::OutputStream* dst);
109109

110110
} // namespace py
111111

python/pyarrow/tensorflow/plasma_op.cc

+32-16
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,7 @@ class TensorToPlasmaOp : public tf::AsyncOpKernel {
7777
if (!connected_) {
7878
VLOG(1) << "Connecting to Plasma...";
7979
ARROW_CHECK_OK(client_.Connect(plasma_store_socket_name_,
80-
plasma_manager_socket_name_,
81-
plasma::kPlasmaDefaultReleaseDelay));
80+
plasma_manager_socket_name_, 0));
8281
VLOG(1) << "Connected!";
8382
connected_ = true;
8483
}
@@ -141,7 +140,7 @@ class TensorToPlasmaOp : public tf::AsyncOpKernel {
141140
std::vector<int64_t> shape = {total_bytes / byte_width};
142141

143142
arrow::io::MockOutputStream mock;
144-
ARROW_CHECK_OK(arrow::py::WriteTensorHeader(arrow_dtype, shape, 0, &mock));
143+
ARROW_CHECK_OK(arrow::py::WriteNdarrayHeader(arrow_dtype, shape, 0, &mock));
145144
int64_t header_size = mock.GetExtentBytesWritten();
146145

147146
std::shared_ptr<Buffer> data_buffer;
@@ -153,15 +152,21 @@ class TensorToPlasmaOp : public tf::AsyncOpKernel {
153152

154153
int64_t offset;
155154
arrow::io::FixedSizeBufferWriter buf(data_buffer);
156-
ARROW_CHECK_OK(arrow::py::WriteTensorHeader(arrow_dtype, shape, total_bytes, &buf));
155+
ARROW_CHECK_OK(arrow::py::WriteNdarrayHeader(arrow_dtype, shape, total_bytes, &buf));
157156
ARROW_CHECK_OK(buf.Tell(&offset));
158157

159158
uint8_t* data = reinterpret_cast<uint8_t*>(data_buffer->mutable_data() + offset);
160159

161-
auto wrapped_callback = [this, context, done, data_buffer, object_id]() {
160+
auto wrapped_callback = [this, context, done, data_buffer, data, object_id]() {
162161
{
163162
tf::mutex_lock lock(mu_);
164163
ARROW_CHECK_OK(client_.Seal(object_id));
164+
ARROW_CHECK_OK(client_.Release(object_id));
165+
#ifdef GOOGLE_CUDA
166+
auto orig_stream = context->op_device_context()->stream();
167+
auto stream_executor = orig_stream->parent();
168+
CHECK(stream_executor->HostMemoryUnregister(static_cast<void*>(data)));
169+
#endif
165170
}
166171
context->SetStatus(tensorflow::Status::OK());
167172
done();
@@ -244,8 +249,7 @@ class PlasmaToTensorOp : public tf::AsyncOpKernel {
244249
if (!connected_) {
245250
VLOG(1) << "Connecting to Plasma...";
246251
ARROW_CHECK_OK(client_.Connect(plasma_store_socket_name_,
247-
plasma_manager_socket_name_,
248-
plasma::kPlasmaDefaultReleaseDelay));
252+
plasma_manager_socket_name_, 0));
249253
VLOG(1) << "Connected!";
250254
connected_ = true;
251255
}
@@ -284,25 +288,39 @@ class PlasmaToTensorOp : public tf::AsyncOpKernel {
284288
/*timeout_ms=*/-1, &object_buffer));
285289
}
286290

287-
std::shared_ptr<arrow::Tensor> tensor;
288-
ARROW_CHECK_OK(arrow::py::ReadTensor(object_buffer.data, &tensor));
291+
std::shared_ptr<arrow::Tensor> ndarray;
292+
ARROW_CHECK_OK(arrow::py::NdarrayFromBuffer(object_buffer.data, &ndarray));
289293

290-
int64_t byte_width = get_byte_width(*tensor->type());
291-
const int64_t size_in_bytes = tensor->data()->size();
294+
int64_t byte_width = get_byte_width(*ndarray->type());
295+
const int64_t size_in_bytes = ndarray->data()->size();
292296

293297
tf::TensorShape shape({static_cast<int64_t>(size_in_bytes / byte_width)});
294298

295-
const float* plasma_data = reinterpret_cast<const float*>(tensor->raw_data());
299+
const float* plasma_data = reinterpret_cast<const float*>(ndarray->raw_data());
296300

297301
tf::Tensor* output_tensor = nullptr;
298302
OP_REQUIRES_OK_ASYNC(context, context->allocate_output(0, shape, &output_tensor),
299303
done);
300304

305+
auto wrapped_callback = [this, context, done, plasma_data, object_id]() {
306+
{
307+
tf::mutex_lock lock(mu_);
308+
ARROW_CHECK_OK(client_.Release(object_id));
309+
#ifdef GOOGLE_CUDA
310+
auto orig_stream = context->op_device_context()->stream();
311+
auto stream_executor = orig_stream->parent();
312+
CHECK(stream_executor->HostMemoryUnregister(
313+
const_cast<void*>(static_cast<const void*>(plasma_data))));
314+
#endif
315+
}
316+
done();
317+
};
318+
301319
if (std::is_same<Device, CPUDevice>::value) {
302320
std::memcpy(
303321
reinterpret_cast<void*>(const_cast<char*>(output_tensor->tensor_data().data())),
304322
plasma_data, size_in_bytes);
305-
done();
323+
wrapped_callback();
306324
} else {
307325
#ifdef GOOGLE_CUDA
308326
auto orig_stream = context->op_device_context()->stream();
@@ -319,8 +337,6 @@ class PlasmaToTensorOp : public tf::AsyncOpKernel {
319337
}
320338

321339
// Important. See note in T2P op.
322-
// We don't check the return status since the host memory might've been
323-
// already registered (e.g., the TensorToPlasmaOp might've been run).
324340
CHECK(stream_executor->HostMemoryRegister(
325341
const_cast<void*>(static_cast<const void*>(plasma_data)),
326342
static_cast<tf::uint64>(size_in_bytes)));
@@ -341,7 +357,7 @@ class PlasmaToTensorOp : public tf::AsyncOpKernel {
341357
CHECK(orig_stream->ThenWaitFor(h2d_stream).ok());
342358

343359
context->device()->tensorflow_gpu_device_info()->event_mgr->ThenExecute(
344-
h2d_stream, std::move(done));
360+
h2d_stream, std::move(wrapped_callback));
345361
#endif
346362
}
347363
}

python/pyarrow/tests/test_plasma_tf_op.py

+4-1
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,6 @@ def FromPlasma():
7070
# Try getting the data from Python
7171
plasma_object_id = plasma.ObjectID(object_id)
7272
obj = client.get(plasma_object_id)
73-
obj = obj.to_numpy()
7473

7574
# Deserialized Tensor should be 64-byte aligned.
7675
assert obj.ctypes.data % 64 == 0
@@ -100,3 +99,7 @@ def test_plasma_tf_op(use_gpu=False):
10099
np.int8, np.int16, np.int32, np.int64]:
101100
run_tensorflow_test_with_dtype(tf, plasma, plasma_store_name,
102101
client, use_gpu, dtype)
102+
103+
# Make sure the objects have been released.
104+
for _, info in client.list().items():
105+
assert info['ref_count'] == 0

0 commit comments

Comments (0)