Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix CUDF's Column.from_column_view by copying it and adjusting. #2004

Merged
merged 4 commits into from
Oct 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
226 changes: 224 additions & 2 deletions python/morpheus/morpheus/_lib/cudf_helpers.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,228 @@ from cudf._lib.column cimport Column
from cudf._lib.utils cimport data_from_unique_ptr
from cudf._lib.utils cimport table_view_from_table

##### THE FOLLOWING CODE IS COPIED FROM CUDF AND SHOULD BE REMOVED WHEN UPDATING TO cudf>=24.12 #####
# see https://github.com/rapidsai/cudf/pull/17193 for details

# isort: off

# imports needed for get_element, which is required by from_column_view_with_fix
cimport pylibcudf.libcudf.copying as cpp_copying
from pylibcudf.libcudf.column.column_view cimport column_view
from libcpp.memory cimport make_unique, unique_ptr
from pylibcudf.libcudf.scalar.scalar cimport scalar
from cudf._lib.scalar cimport DeviceScalar

# imports needed for from_column_view_with_fix
import rmm
from libc.stdint cimport uintptr_t
from cudf.core.buffer import (
# Buffer,
ExposureTrackedBuffer,
SpillableBuffer,
# acquire_spill_lock,
as_buffer,
# cuda_array_interface_wrapper,
)
cimport pylibcudf.libcudf.types as libcudf_types
from cudf._lib.types cimport (
dtype_from_column_view,
# dtype_to_data_type,
# dtype_to_pylibcudf_type,
)
from cudf._lib.null_mask import bitmask_allocation_size_bytes

# isort: on

cdef get_element(column_view col_view, size_type index):

cdef unique_ptr[scalar] c_output
with nogil:
c_output = move(
cpp_copying.get_element(col_view, index)
)

return DeviceScalar.from_unique_ptr(
move(c_output), dtype=dtype_from_column_view(col_view)
)

cdef Column from_column_view_with_fix(column_view cv, object owner):
cwharris marked this conversation as resolved.
Show resolved Hide resolved
"""
Given a ``cudf::column_view``, constructs a ``cudf.Column`` from it,
along with referencing an ``owner`` Python object that owns the memory
lifetime. If ``owner`` is a ``cudf.Column``, we reach inside of it and
make the owner of each newly created ``Buffer`` the respective
``Buffer`` from the ``owner`` ``cudf.Column``.
If ``owner`` is ``None``, we allocate new memory for the resulting
``cudf.Column``.
"""
column_owner = isinstance(owner, Column)
mask_owner = owner
if column_owner and isinstance(owner.dtype, cudf.CategoricalDtype):
owner = owner.base_children[0]

size = cv.size()
offset = cv.offset()
dtype = dtype_from_column_view(cv)
dtype_itemsize = getattr(dtype, "itemsize", 1)

data_ptr = <uintptr_t>(cv.head[void]())
data = None
base_size = size + offset
data_owner = owner

if column_owner:
data_owner = owner.base_data
mask_owner = mask_owner.base_mask
base_size = owner.base_size
base_nbytes = base_size * dtype_itemsize
# special case for string column
is_string_column = (cv.type().id() == libcudf_types.type_id.STRING)
if is_string_column:
if cv.num_children() == 0:
base_nbytes = 0
else:
# get the size from offset child column (device to host copy)
offsets_column_index = 0
offset_child_column = cv.child(offsets_column_index)
if offset_child_column.size() == 0:
base_nbytes = 0
else:
chars_size = get_element(
offset_child_column, offset_child_column.size()-1).value
base_nbytes = chars_size

if data_ptr:
if data_owner is None:
buffer_size = (
base_nbytes
if is_string_column
else ((size + offset) * dtype_itemsize)
)
data = as_buffer(
rmm.DeviceBuffer(ptr=data_ptr,
size=buffer_size)
)
elif (
column_owner and
isinstance(data_owner, ExposureTrackedBuffer)
):
data = as_buffer(
data=data_ptr,
size=base_nbytes,
owner=data_owner,
exposed=False,
)
elif (
# This is an optimization of the most common case where
# from_column_view creates a "view" that is identical to
# the owner.
column_owner and
isinstance(data_owner, SpillableBuffer) and
# We check that `data_owner` is spill locked (not spillable)
# and that it points to the same memory as `data_ptr`.
not data_owner.spillable and
data_owner.memory_info() == (data_ptr, base_nbytes, "gpu")
):
data = data_owner
else:
# At this point we don't know the relationship between data_ptr
# and data_owner thus we mark both of them exposed.
# TODO: try to discover their relationship and create a
# SpillableBufferSlice instead.
data = as_buffer(
data=data_ptr,
size=base_nbytes,
owner=data_owner,
exposed=True,
)
if isinstance(data_owner, ExposureTrackedBuffer):
# accessing the pointer marks it exposed permanently.
data_owner.mark_exposed()
elif isinstance(data_owner, SpillableBuffer):
if data_owner.is_spilled:
raise ValueError(
f"{data_owner} is spilled, which invalidates "
f"the exposed data_ptr ({hex(data_ptr)})"
)
# accessing the pointer marks it exposed permanently.
data_owner.mark_exposed()
else:
data = as_buffer(
rmm.DeviceBuffer(ptr=data_ptr, size=0)
)

mask = None
mask_ptr = <uintptr_t>(cv.null_mask())
if mask_ptr:
if mask_owner is None:
if column_owner:
# if we reached here, it means `owner` is a `Column`
# that does not have a null mask, but `cv` thinks it
# should have a null mask. This can happen in the
# following sequence of events:
#
# 1) `cv` is constructed as a view into a
# `cudf::column` that is nullable (i.e., it has
# a null mask), but contains no nulls.
# 2) `owner`, a `Column`, is constructed from the
# same `cudf::column`. Because `cudf::column`
# is memory owning, `owner` takes ownership of
# the memory owned by the
# `cudf::column`. Because the column has a null
# count of 0, it may choose to discard the null
# mask.
# 3) Now, `cv` points to a discarded null mask.
#
# TL;DR: we should not include a null mask in the
# result:
mask = None
else:
mask = as_buffer(
rmm.DeviceBuffer(
ptr=mask_ptr,
size=bitmask_allocation_size_bytes(base_size)
)
)
else:
mask = as_buffer(
data=mask_ptr,
size=bitmask_allocation_size_bytes(base_size),
owner=mask_owner,
exposed=True
)

if cv.has_nulls():
null_count = cv.null_count()
else:
null_count = 0

children = []
for child_index in range(cv.num_children()):
child_owner = owner
if column_owner:
child_owner = owner.base_children[child_index]
children.append(
from_column_view_with_fix(
cv.child(child_index),
child_owner
)
)
children = tuple(children)

result = cudf.core.column.build_column(
data=data,
dtype=dtype,
mask=mask,
size=size,
offset=offset,
null_count=null_count,
children=tuple(children)
)

return result

##### THE PREVIOUS CODE IS COPIED FROM CUDF AND SHOULD BE REMOVED WHEN UPDATING TO cudf>=24.12 #####

cdef vector[string] get_column_names(object tbl, object index):
cdef vector[string] column_names
Expand Down Expand Up @@ -188,7 +410,7 @@ cdef public api:
if table_owner:
column_owner = owner._index._columns[column_idx]
index_columns.append(
Column.from_column_view(
from_column_view_with_fix(
tv.column(column_idx),
column_owner
)
Expand All @@ -205,7 +427,7 @@ cdef public api:
if table_owner:
column_owner = owner._columns[column_indices[source_column_idx]]
data_columns.append(
Column.from_column_view(tv.column(column_idx), column_owner)
from_column_view_with_fix(tv.column(column_idx), column_owner)
)
column_idx += 1
source_column_idx += 1
Expand Down
12 changes: 11 additions & 1 deletion python/morpheus/morpheus/_lib/cudf_helpers/__init__.pyi
Original file line number Diff line number Diff line change
@@ -1,14 +1,24 @@
from __future__ import annotations
import morpheus._lib.cudf_helpers
import typing
from cudf.core.buffer.exposure_tracked_buffer import ExposureTrackedBuffer
from cudf.core.buffer.spillable_buffer import SpillableBuffer
from cudf.core.dtypes import StructDtype
import _cython_3_0_11
import cudf
import rmm

__all__ = [
"ExposureTrackedBuffer",
"SpillableBuffer",
"StructDtype",
"cudf"
"as_buffer",
"bitmask_allocation_size_bytes",
"cudf",
"rmm"
]


__pyx_capi__: dict # value = {'make_table_from_table_with_metadata': <capsule object "PyObject *(cudf::io::table_with_metadata, int)">, 'make_table_from_table_info_data': <capsule object "PyObject *(morpheus::TableInfoData, PyObject *)">, 'make_table_info_data_from_table': <capsule object "morpheus::TableInfoData (PyObject *)">, 'data_from_table_view_indexed': <capsule object "PyObject *(cudf::table_view, PyObject *, PyObject *, PyObject *, PyObject *)">}
__test__ = {}
bitmask_allocation_size_bytes: _cython_3_0_11.cython_function_or_method # value = <cyfunction bitmask_allocation_size_bytes>
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,20 @@

#include <gtest/gtest.h> // for TestInfo, TEST_F
#include <pybind11/gil.h> // for gil_scoped_release
#include <pybind11/pybind11.h>
#include <pybind11/pytypes.h>
#include <rmm/cuda_stream_view.hpp> // for cuda_stream_per_thread
#include <rmm/device_buffer.hpp> // for device_buffer

#include <cstdint> // for int64_t
#include <filesystem> // for operator/, path
#include <memory> // for allocator, __shared_ptr_access, shared_ptr, make_shared
#include <utility> // for move
#include <vector> // for vector

using namespace morpheus;
using namespace morpheus::test;
using namespace pybind11::literals;

using TestMessageMeta = morpheus::test::TestMessages; // NOLINT(readability-identifier-naming)

Expand Down Expand Up @@ -82,3 +86,25 @@ TEST_F(TestMessageMeta, CopyRangeAndSlicing)
assert_eq_device_to_host(sliced_meta->get_info().get_column(0), sliced_expected_int);
assert_eq_device_to_host(sliced_meta->get_info().get_column(1), sliced_expected_double);
}

TEST_F(TestMessageMeta, Issue1934)
{
// Reproduce issue 1934 (https://github.com/nv-morpheus/Morpheus/issues/1934)
// The bug causes a segfault when calling `get_data_frame` on a message meta object
namespace py = pybind11;
py::gil_scoped_acquire gil;

auto cudf_mod = py::module_::import("cudf");
auto a_col = py::list();
auto v1 = py::list();
v1.append(py::str("a"));
a_col.attr("append")(std::move(v1));
a_col.attr("append")(py::none());

auto df = cudf_mod.attr("DataFrame")(py::dict("a"_a = std::move(a_col)));
df.attr("drop")(0, "inplace"_a = true);

auto msg = MessageMetaInterfaceProxy::init_python(std::move(df));

auto df_copy = MessageMetaInterfaceProxy::get_data_frame(*msg);
}
Loading