Skip to content

Commit 193031b

Browse files
authored
Merge pull request apache#2 from songqing/plasma_queue_client
Add plasma queue python interface. It calls functions in "cpp/src/plasma/client.h" and is called by Ray file "python/ray/worker.py"
2 parents a4fbd91 + 9577637 commit 193031b

File tree

2 files changed

+154
-1
lines changed

2 files changed

+154
-1
lines changed

cpp/cmake_modules/CompilerInfo.cmake

+5
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,11 @@ elseif("${COMPILER_VERSION_FULL}" MATCHES ".*clang-9")
7878
set(COMPILER_FAMILY "clang")
7979
set(COMPILER_VERSION "4.0.0svn")
8080

81+
# clang on Mac OS X, XCode 10.
82+
elseif("${COMPILER_VERSION_FULL}" MATCHES ".*clang-10")
83+
set(COMPILER_FAMILY "clang")
84+
set(COMPILER_VERSION "4.1.0svn")
85+
8186
# gcc
8287
elseif("${COMPILER_VERSION_FULL_LOWER}" MATCHES ".*gcc[ -]version.*")
8388
set(COMPILER_FAMILY "gcc")

python/pyarrow/_plasma.pyx

+149-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ from libcpp cimport bool as c_bool, nullptr
2323
from libcpp.memory cimport shared_ptr, unique_ptr, make_shared
2424
from libcpp.string cimport string as c_string
2525
from libcpp.vector cimport vector as c_vector
26-
from libc.stdint cimport int64_t, uint8_t, uintptr_t
26+
from libc.stdint cimport int64_t, uint8_t, uintptr_t, uint32_t, uint64_t
2727
from cpython.pycapsule cimport *
2828

2929
import collections
@@ -113,6 +113,22 @@ cdef extern from "plasma/client.h" nogil:
113113
CStatus Transfer(const char* addr, int port,
114114
const CUniqueID& object_id)
115115

116+
# Interfaces that are related to plasma queue.
117+
CStatus CreateQueue(const CUniqueID& object_id, int64_t data_size,
118+
const shared_ptr[CBuffer]* data)
119+
120+
CStatus GetQueue(const CUniqueID& object_id, int64_t timeout_ms,
121+
int* fd)
122+
123+
CStatus CreateQueueItem(const CUniqueID& object_id, uint32_t data_size,
124+
shared_ptr[CBuffer]* data,
125+
uint64_t& seq_id)
126+
127+
CStatus SealQueueItem(const CUniqueID& object_id, uint64_t seq_id,
128+
shared_ptr[CBuffer] data)
129+
130+
CStatus GetQueueItem(const CUniqueID& object_id,
131+
CObjectBuffer* object_buffer, uint64_t& seq_id)
116132

117133
cdef extern from "plasma/client.h" nogil:
118134

@@ -396,6 +412,138 @@ cdef class PlasmaClient:
396412
self.seal(target_id)
397413
return target_id
398414

415+
def create_queue(self, ObjectID queue_id=None, total_bytes=1024000):
    """
    Create a plasma queue in the object store.

    Parameters
    ----------
    queue_id : ObjectID, default None
        If this is provided, the specified object ID will be used to refer
        to the queue. Otherwise a random object ID is generated.
    total_bytes : int, default 1024000
        Size in bytes passed to the store as the data size of the queue.

    Returns
    -------
    ObjectID
        The object ID associated with the newly created queue.

    Raises
    ------
    PlasmaObjectExists
        This exception is raised if the object could not be created because
        there already is an object with the same ID in the plasma store.

    PlasmaStoreFull: This exception is raised if the object could
        not be created because the plasma store is unable to evict
        enough objects to create room for it.
    """
    cdef ObjectID target_id = (queue_id if queue_id is not None
                               else ObjectID.from_random())
    # Request a buffer from the plasma store to back the plasma queue.
    cdef shared_ptr[CBuffer] data
    check_status(self.client.get().CreateQueue(target_id.data,
                                               total_bytes, &data))
    # Hand the ID back so that callers relying on the random default can
    # actually address the queue afterwards.
    return target_id
446+
447+
def push_queue(self, object value, ObjectID queue_id=None,
               int memcopy_threads=6, serialization_context=None):
    """
    Serialize a Python value and write it into a plasma queue as one item.

    Parameters
    ----------
    value : object
        A Python object to store.
    queue_id : ObjectID, default None
        Object ID of the plasma queue to push into. If omitted, a random
        object ID is used.
    memcopy_threads : int, default 6
        The number of threads to use to write the serialized object into
        the object store for large objects.
    serialization_context : pyarrow.SerializationContext, default None
        Custom serialization and deserialization context.
    """
    cdef:
        ObjectID target_id
        uint64_t seq_id = 0
        shared_ptr[CBuffer] data

    if queue_id:
        target_id = queue_id
    else:
        # NOTE(review): a random ID presumably does not name an existing
        # queue -- confirm whether a missing queue_id should be an error.
        target_id = ObjectID.from_random()

    serialized = pyarrow.serialize(value, serialization_context)
    nbytes = serialized.total_bytes

    # Ask the store for a writable item buffer; seq_id is filled in by the
    # call and is needed again when sealing the item.
    check_status(self.client.get().CreateQueueItem(target_id.data,
                                                   nbytes, &data, seq_id))
    item_buffer = self._make_mutable_plasma_buffer(
        target_id, data.get().mutable_data(), nbytes)

    # Write the serialized payload straight into the item buffer.
    writer = pyarrow.FixedSizeBufferWriter(item_buffer)
    writer.set_memcopy_threads(memcopy_threads)
    serialized.write_to(writer)

    with nogil:
        check_status(self.client.get().SealQueueItem(target_id.data,
                                                     seq_id, data))
485+
486+
def get_queue(self, queue_id, timeout_ms=-1):
    """
    Subscribe to the corresponding plasma queue.

    The notification file descriptor produced by the store call is
    currently discarded; this method does not return a value.

    Parameters
    ----------
    queue_id : ObjectID
        Object ID associated to the plasma queue we get
        from the store.
    timeout_ms : int, default -1
        The number of milliseconds that the get call should block before
        timing out and returning. Pass -1 if the call should block and 0
        if the call should return immediately.
    """
    cdef:
        ObjectID target_id
        int notify_fd

    if queue_id:
        target_id = queue_id
    else:
        target_id = ObjectID.from_random()

    check_status(self.client.get().GetQueue(target_id.data,
                                            timeout_ms, &notify_fd))
509+
510+
def read_queue(self, queue_id, index, int timeout_ms=-1,
               serialization_context=None):
    """
    Get one Python value from the plasma queue.

    Parameters
    ----------
    queue_id : ObjectID
        Object ID associated to the plasma queue we get
        from the store.
    index : int
        Value forwarded to the store as the sequence ID of the item to
        read.
    timeout_ms : int, default -1
        Accepted for symmetry with ``get``; it is currently not forwarded
        to the store call made by this method.
    serialization_context : pyarrow.SerializationContext, default None
        Custom serialization and deserialization context.

    Returns
    -------
    object
        Python value associated with the current index in the plasma queue,
        and ObjectNotAvailable if the object was not available.
    """
    cdef ObjectID target_id = (queue_id if queue_id
                               else ObjectID.from_random())
    cdef CObjectBuffer object_buffer
    # GetQueueItem takes the sequence ID by reference, so give it a real
    # C variable instead of a coerced Python object.
    cdef uint64_t seq_id = index
    check_status(self.client.get().GetQueueItem(target_id.data,
                                                &object_buffer, seq_id))
    # No data pointer means the item was not available; test the raw
    # pointer before wrapping so an empty shared_ptr is never inspected.
    if object_buffer.data.get() == nullptr:
        return ObjectNotAvailable
    buffer = pyarrow_wrap_buffer(object_buffer.data)
    if buffer:
        return pyarrow.deserialize(buffer, serialization_context)
    return ObjectNotAvailable
546+
399547
def get(self, object_ids, int timeout_ms=-1, serialization_context=None):
400548
"""
401549
Get one or more Python values from the object store.

0 commit comments

Comments
 (0)