Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix CUDF's Column.from_column_view by copying it and adjusting. #2004

Merged
merged 4 commits into from
Oct 28, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
222 changes: 220 additions & 2 deletions python/morpheus/morpheus/_lib/cudf_helpers.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,224 @@ from cudf._lib.column cimport Column
from cudf._lib.utils cimport data_from_unique_ptr
from cudf._lib.utils cimport table_view_from_table

# isort: off

# imports needed for get_element, which is required by from_column_view_with_fix
cimport pylibcudf.libcudf.copying as cpp_copying
from pylibcudf.libcudf.column.column_view cimport column_view
from libcpp.memory cimport make_unique, unique_ptr
from pylibcudf.libcudf.scalar.scalar cimport scalar
from cudf._lib.scalar cimport DeviceScalar

# imports needed for from_column_view_with_fix
import rmm
from libc.stdint cimport uintptr_t
from cudf.core.buffer import (
# Buffer,
ExposureTrackedBuffer,
SpillableBuffer,
# acquire_spill_lock,
as_buffer,
# cuda_array_interface_wrapper,
)
cimport pylibcudf.libcudf.types as libcudf_types
from cudf._lib.types cimport (
dtype_from_column_view,
# dtype_to_data_type,
# dtype_to_pylibcudf_type,
)
from cudf._lib.null_mask import bitmask_allocation_size_bytes

# isort: on

cdef get_element(column_view col_view, size_type index):
    """
    Return the element of ``col_view`` at position ``index`` as a
    ``DeviceScalar``.

    The scalar is produced by libcudf's ``get_element`` and wrapped with
    the dtype derived from ``col_view`` via ``dtype_from_column_view``.
    """
    cdef unique_ptr[scalar] element_ptr

    # The GIL is released for the duration of the libcudf call.
    with nogil:
        element_ptr = move(cpp_copying.get_element(col_view, index))

    element_dtype = dtype_from_column_view(col_view)
    return DeviceScalar.from_unique_ptr(
        move(element_ptr),
        dtype=element_dtype,
    )

cdef Column from_column_view_with_fix(column_view cv, object owner):
    """
    Build a ``cudf.Column`` wrapping the ``cudf::column_view`` ``cv``.

    ``owner`` is the Python object responsible for the lifetime of the
    memory that ``cv`` points at:

    * If ``owner`` is a ``cudf.Column``, every ``Buffer`` created here is
      owned by the corresponding ``Buffer`` of that column (``base_data``,
      ``base_mask`` and, recursively, ``base_children``). For a
      categorical ``owner`` the data/children owner is its first base
      child, while the mask owner stays the original column.
    * If ``owner`` is ``None``, new memory is allocated for the resulting
      ``cudf.Column``.

    For string columns the byte size of the data buffer is not derived
    from ``size * itemsize``; it is read from the last entry of the
    offsets child column (which requires a device to host copy).
    """
    owner_is_column = isinstance(owner, Column)
    mask_owner = owner
    if owner_is_column and isinstance(owner.dtype, cudf.CategoricalDtype):
        owner = owner.base_children[0]

    size = cv.size()
    offset = cv.offset()
    dtype = dtype_from_column_view(cv)
    dtype_itemsize = getattr(dtype, "itemsize", 1)

    data_ptr = <uintptr_t>(cv.head[void]())

    # Without a Column owner, the base extent is what the view itself spans.
    data_owner = owner
    base_size = size + offset
    if owner_is_column:
        data_owner = owner.base_data
        mask_owner = mask_owner.base_mask
        base_size = owner.base_size

    # Number of bytes backing the data buffer. Strings are special-cased:
    # their character count comes from the offsets child column.
    base_nbytes = base_size * dtype_itemsize
    is_string_column = (cv.type().id() == libcudf_types.type_id.STRING)
    if is_string_column:
        if cv.num_children() == 0:
            base_nbytes = 0
        else:
            # The offsets column is child 0; its last entry is the total
            # size of the character data (device to host copy).
            offsets_view = cv.child(0)
            if offsets_view.size() == 0:
                base_nbytes = 0
            else:
                base_nbytes = get_element(
                    offsets_view, offsets_view.size() - 1
                ).value

    data = None
    if not data_ptr:
        # Null data pointer: represent the data as an empty device buffer.
        data = as_buffer(
            rmm.DeviceBuffer(ptr=data_ptr, size=0)
        )
    elif data_owner is None:
        if is_string_column:
            buffer_size = base_nbytes
        else:
            buffer_size = (size + offset) * dtype_itemsize
        data = as_buffer(
            rmm.DeviceBuffer(ptr=data_ptr, size=buffer_size)
        )
    elif owner_is_column and isinstance(data_owner, ExposureTrackedBuffer):
        data = as_buffer(
            data=data_ptr,
            size=base_nbytes,
            owner=data_owner,
            exposed=False,
        )
    elif (
        # Fast path for the most common situation, in which the "view"
        # being created is identical to the owner: `data_owner` must be
        # spill locked (i.e. not spillable) and describe exactly the
        # memory that `data_ptr` refers to.
        owner_is_column
        and isinstance(data_owner, SpillableBuffer)
        and not data_owner.spillable
        and data_owner.memory_info() == (data_ptr, base_nbytes, "gpu")
    ):
        data = data_owner
    else:
        # The relationship between data_ptr and data_owner is unknown
        # here, so both end up marked as exposed.
        # TODO: try to discover their relationship and create a
        #       SpillableBufferSlice instead.
        data = as_buffer(
            data=data_ptr,
            size=base_nbytes,
            owner=data_owner,
            exposed=True,
        )
        if isinstance(data_owner, ExposureTrackedBuffer):
            # accessing the pointer marks it exposed permanently.
            data_owner.mark_exposed()
        elif isinstance(data_owner, SpillableBuffer):
            if data_owner.is_spilled:
                raise ValueError(
                    f"{data_owner} is spilled, which invalidates "
                    f"the exposed data_ptr ({hex(data_ptr)})"
                )
            # accessing the pointer marks it exposed permanently.
            data_owner.mark_exposed()

    mask = None
    mask_ptr = <uintptr_t>(cv.null_mask())
    if mask_ptr:
        if mask_owner is not None:
            mask = as_buffer(
                data=mask_ptr,
                size=bitmask_allocation_size_bytes(base_size),
                owner=mask_owner,
                exposed=True
            )
        elif not owner_is_column:
            mask = as_buffer(
                rmm.DeviceBuffer(
                    ptr=mask_ptr,
                    size=bitmask_allocation_size_bytes(base_size)
                )
            )
        # Otherwise `owner` is a `Column` without a null mask although `cv`
        # reports one, and the result deliberately gets no mask. This can
        # come about as follows:
        #
        # 1) `cv` is a view into a `cudf::column` that is nullable (it
        #    has a null mask) but holds no nulls.
        # 2) `owner`, a `Column`, is built from that same `cudf::column`
        #    and takes over the memory it owned. Since the null count
        #    is 0, it may choose to discard the null mask.
        # 3) `cv` is then left pointing at a discarded null mask.

    null_count = cv.null_count() if cv.has_nulls() else 0

    # Convert each child view recursively, pairing it with the matching
    # base child of a Column owner (or with `owner` itself otherwise).
    child_columns = []
    for child_index in range(cv.num_children()):
        if owner_is_column:
            child_owner = owner.base_children[child_index]
        else:
            child_owner = owner
        child_columns.append(
            from_column_view_with_fix(cv.child(child_index), child_owner)
        )

    return cudf.core.column.build_column(
        data=data,
        dtype=dtype,
        mask=mask,
        size=size,
        offset=offset,
        null_count=null_count,
        children=tuple(child_columns)
    )


cdef vector[string] get_column_names(object tbl, object index):
cdef vector[string] column_names
Expand Down Expand Up @@ -188,7 +406,7 @@ cdef public api:
if table_owner:
column_owner = owner._index._columns[column_idx]
index_columns.append(
Column.from_column_view(
from_column_view_with_fix(
tv.column(column_idx),
column_owner
)
Expand All @@ -205,7 +423,7 @@ cdef public api:
if table_owner:
column_owner = owner._columns[column_indices[source_column_idx]]
data_columns.append(
Column.from_column_view(tv.column(column_idx), column_owner)
from_column_view_with_fix(tv.column(column_idx), column_owner)
)
column_idx += 1
source_column_idx += 1
Expand Down
12 changes: 11 additions & 1 deletion python/morpheus/morpheus/_lib/cudf_helpers/__init__.pyi
Original file line number Diff line number Diff line change
@@ -1,14 +1,24 @@
from __future__ import annotations
import morpheus._lib.cudf_helpers
import typing
from cudf.core.buffer.exposure_tracked_buffer import ExposureTrackedBuffer
from cudf.core.buffer.spillable_buffer import SpillableBuffer
from cudf.core.dtypes import StructDtype
import _cython_3_0_11
import cudf
import rmm

__all__ = [
"ExposureTrackedBuffer",
"SpillableBuffer",
"StructDtype",
"cudf"
"as_buffer",
"bitmask_allocation_size_bytes",
"cudf",
"rmm"
]


__pyx_capi__: dict # value = {'make_table_from_table_with_metadata': <capsule object "PyObject *(cudf::io::table_with_metadata, int)">, 'make_table_from_table_info_data': <capsule object "PyObject *(morpheus::TableInfoData, PyObject *)">, 'make_table_info_data_from_table': <capsule object "morpheus::TableInfoData (PyObject *)">, 'data_from_table_view_indexed': <capsule object "PyObject *(cudf::table_view, PyObject *, PyObject *, PyObject *, PyObject *)">}
__test__ = {}
bitmask_allocation_size_bytes: _cython_3_0_11.cython_function_or_method # value = <cyfunction bitmask_allocation_size_bytes>
Loading