@@ -23,7 +23,7 @@ from libcpp cimport bool as c_bool, nullptr
23
23
from libcpp.memory cimport shared_ptr, unique_ptr, make_shared
24
24
from libcpp.string cimport string as c_string
25
25
from libcpp.vector cimport vector as c_vector
26
- from libc.stdint cimport int64_t, uint8_t, uintptr_t
26
+ from libc.stdint cimport int64_t, uint8_t, uintptr_t, uint32_t, uint64_t
27
27
from cpython.pycapsule cimport *
28
28
29
29
import collections
@@ -113,6 +113,22 @@ cdef extern from "plasma/client.h" nogil:
113
113
CStatus Transfer(const char * addr, int port,
114
114
const CUniqueID& object_id)
115
115
116
+ # Interfaces that are related to plasma queue.
117
+ CStatus CreateQueue(const CUniqueID& object_id, int64_t data_size,
118
+ const shared_ptr[CBuffer]* data)
119
+
120
+ CStatus GetQueue(const CUniqueID& object_id, int64_t timeout_ms,
121
+ int * fd)
122
+
123
+ CStatus CreateQueueItem(const CUniqueID& object_id, uint32_t data_size,
124
+ shared_ptr[CBuffer]* data,
125
+ uint64_t& seq_id)
126
+
127
+ CStatus SealQueueItem(const CUniqueID& object_id, uint64_t seq_id,
128
+ shared_ptr[CBuffer] data)
129
+
130
+ CStatus GetQueueItem(const CUniqueID& object_id,
131
+ CObjectBuffer* object_buffer, uint64_t& seq_id)
116
132
117
133
cdef extern from " plasma/client.h" nogil:
118
134
@@ -396,6 +412,138 @@ cdef class PlasmaClient:
396
412
self .seal(target_id)
397
413
return target_id
398
414
415
+ def create_queue (self , ObjectID queue_id = None , total_bytes = 1024000 ):
416
+ """
417
+ Store a plasma queue into the object store.
418
+
419
+ Parameters
420
+ ----------
421
+ queue_id : ObjectID, default None
422
+ If this is provided, the specified object ID will be used to refer
423
+ to the object.
424
+ -------
425
+ """
426
+ cdef ObjectID target_id = (queue_id if queue_id
427
+ else ObjectID.from_random())
428
+ """
429
+ Create a new buffer in the PlasmaStore for a particular object ID.
430
+ The returned buffer is unmutable as seal is called.
431
+
432
+ Raises
433
+ ------
434
+ PlasmaObjectExists
435
+ This exception is raised if the object could not be created because
436
+ there already is an object with the same ID in the plasma store.
437
+
438
+ PlasmaStoreFull: This exception is raised if the object could
439
+ not be created because the plasma store is unable to evict
440
+ enough objects to create room for it.
441
+ """
442
+ # Apply a buffer from plasma store for the plasma queue
443
+ cdef shared_ptr[CBuffer] data
444
+ check_status(self .client.get().CreateQueue(target_id.data,
445
+ total_bytes, & data))
446
+
447
+ def push_queue (self , object value , ObjectID queue_id = None ,
448
+ int memcopy_threads = 6 , serialization_context = None ):
449
+ """
450
+ Push a Python value into the plasma queue.
451
+
452
+ Parameters
453
+ ----------
454
+ value : object
455
+ A Python object to store.
456
+ queue_id : ObjectID, default None
457
+ If this is provided, the specified object ID will be used to refer
458
+ to the plasma queue.
459
+ memcopy_threads : int, default 6
460
+ The number of threads to use to write the serialized object into
461
+ the object store for large objects.
462
+ serialization_context : pyarrow.SerializationContext, default None
463
+ Custom serialization and deserialization context.
464
+
465
+ """
466
+ cdef ObjectID target_id = (queue_id if queue_id
467
+ else ObjectID.from_random())
468
+ serialized = pyarrow.serialize(value, serialization_context)
469
+
470
+ # Return buffer from the plasma queue to write
471
+ cdef uint64_t seq_id = 0
472
+ cdef shared_ptr[CBuffer] data
473
+ check_status(self .client.get().CreateQueueItem(target_id.data,
474
+ serialized.total_bytes, & data, seq_id))
475
+ buffer = self ._make_mutable_plasma_buffer(target_id,
476
+ data.get().mutable_data(),
477
+ serialized.total_bytes)
478
+
479
+ stream = pyarrow.FixedSizeBufferWriter(buffer )
480
+ stream.set_memcopy_threads(memcopy_threads)
481
+ serialized.write_to(stream)
482
+ with nogil:
483
+ check_status(self .client.get().SealQueueItem(target_id.data,
484
+ seq_id, data))
485
+
486
+ def get_queue (self , queue_id , timeout_ms = - 1 ):
487
+ """
488
+ Subscribe the corresponding plasma queue.
489
+
490
+ Parameters
491
+ ----------
492
+ queue_id : ObjectID
493
+ Object ID associated to the plasma queue we get
494
+ from the store.
495
+ timeout_ms : int, default -1
496
+ The number of milliseconds that the get call should block before
497
+ timing out and returning. Pass -1 if the call should block and 0
498
+ if the call should return immediately.
499
+
500
+ Returns
501
+ -------
502
+ """
503
+
504
+ cdef ObjectID target_id = (queue_id if queue_id
505
+ else ObjectID.from_random())
506
+ cdef int nofify_fd
507
+ check_status(self .client.get().GetQueue(target_id.data,
508
+ timeout_ms, & nofify_fd))
509
+
510
+ def read_queue (self , queue_id , index , int timeout_ms = - 1 ,
511
+ serialization_context = None ):
512
+ """
513
+ Get one Python value from the plasma queue.
514
+
515
+ Parameters
516
+ ----------
517
+ queue_id : ObjectID
518
+ Object ID associated to the plasma queue we get
519
+ from the store.
520
+ timeout_ms : int, default -1
521
+ The number of milliseconds that the get call should block before
522
+ timing out and returning. Pass -1 if the call should block and 0
523
+ if the call should return immediately.
524
+ serialization_context : pyarrow.SerializationContext, default None
525
+ Custom serialization and deserialization context.
526
+
527
+ Returns
528
+ -------
529
+ object
530
+ Python value associated with the current index in the plasma queue,
531
+ and ObjectNotAvailable if the object was not available.
532
+ """
533
+
534
+ cdef ObjectID target_id = (queue_id if queue_id
535
+ else ObjectID.from_random())
536
+ cdef CObjectBuffer object_buffer
537
+ check_status(self .client.get().GetQueueItem(target_id.data,
538
+ & object_buffer, index))
539
+ buffer = pyarrow_wrap_buffer(object_buffer.data)
540
+ # buffer is None if this object was not available within the timeout
541
+ if buffer :
542
+ value = pyarrow.deserialize(buffer , serialization_context)
543
+ return value
544
+ else :
545
+ return ObjectNotAvailable
546
+
399
547
def get (self , object_ids , int timeout_ms = - 1 , serialization_context = None ):
400
548
"""
401
549
Get one or more Python values from the object store.
0 commit comments