Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FEAT-#7448: Allow QueryCompilerCaster to apply cost-optimization on automatic casting #7464

Draft
wants to merge 13 commits into
base: main
Choose a base branch
from
69 changes: 69 additions & 0 deletions modin/core/storage_formats/base/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import abc
import warnings
from enum import IntEnum
from functools import cached_property
from typing import TYPE_CHECKING, Hashable, List, Literal, Optional

Expand Down Expand Up @@ -107,6 +108,52 @@
return axis_setter


class QCCoercionCost(IntEnum): # noqa: PR01
"""
Coercion costs between Query Compilers.

Coercion costs between query compilers can be expressed
as integers in the range 0 to 1000, where 1000 is
considered impossible. Since coercsion costs can be a
function of many variables ( dataset size, partitioning,
network throughput, and query time ) we define a set range
of cost values to simplify comparisons between two query
compilers / engines in a unified way.

COST_ZERO means there is no cost associated, or that the query compilers
are the same.

COST_IMPOSSIBLE means the coercion is effectively impossible, which can
occur if the target system is unable to store the data as a result
of the coercion.
"""

COST_ZERO = 0
COST_LOW = 250
COST_MEDIUM = 500
COST_HIGH = 750
COST_IMPOSSIBLE = 1000

@classmethod
def validate_coersion_cost(cls, cost: QCCoercionCost):
"""
Validate that the coercion cost is within range.

Parameters
----------
cost : QCCoercionCost

Returns
-------
callable(pandas.DataFrame, *args, **kwargs) -> pandas.DataFrame
Function to be applied in the partitions.
"""
if int(cost) < int(QCCoercionCost.COST_ZERO) or int(cost) > int(

Check warning on line 151 in modin/core/storage_formats/base/query_compiler.py

View check run for this annotation

Codecov / codecov/patch

modin/core/storage_formats/base/query_compiler.py#L151

Added line #L151 was not covered by tests
QCCoercionCost.COST_IMPOSSIBLE
):
raise ValueError("Query compiler coercsion cost out of range")

Check warning on line 154 in modin/core/storage_formats/base/query_compiler.py

View check run for this annotation

Codecov / codecov/patch

modin/core/storage_formats/base/query_compiler.py#L154

Added line #L154 was not covered by tests


# FIXME: many of the BaseQueryCompiler methods are hiding actual arguments
# by using *args and **kwargs. They should be spread into actual parameters.
# Currently actual arguments are placed in the methods docstrings, but since they're
Expand Down Expand Up @@ -247,6 +294,28 @@
return [self.__wrap_in_qc(obj) for obj in result]
return self.__wrap_in_qc(result)

def qc_engine_switch_cost(self, other_qc) -> dict[type, int]:
"""
Coercion costs to and from other_qc.

Returns a map of type to QCCoercionCost, where type is the type we are casting to.
This provides a mechanism for the query compilers to provide information to
Modin on the cost of moving data to another query compiler ( or the other way ).

Parameters
----------
other_qc : QueryCompiler
The query compiler to which we should return the cost of switching.

Returns
-------
dict[type, int]
Dictionary of QueryCompilerClass type to QCCoercionCost.
"""
if isinstance(type(self), type(other_qc)):
return {type(self): QCCoercionCost.COST_ZERO}
return {}

Check warning on line 317 in modin/core/storage_formats/base/query_compiler.py

View check run for this annotation

Codecov / codecov/patch

modin/core/storage_formats/base/query_compiler.py#L315-L317

Added lines #L315 - L317 were not covered by tests

# Abstract Methods and Fields: Must implement in children classes
# In some cases, there you may be able to use the same implementation for
# some of these abstract methods, but for the sake of generality they are
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,7 @@
)
from modin.core.storage_formats.base.query_compiler import BaseQueryCompiler
from modin.core.storage_formats.pandas.query_compiler_caster import QueryCompilerCaster
from modin.utils import (
_inherit_docstrings,
)
from modin.utils import _inherit_docstrings

_NO_REPARTITION_ON_NATIVE_EXECUTION_EXCEPTION_MESSAGE = (
"Modin dataframes and series using native execution do not have partitions."
Expand Down
184 changes: 162 additions & 22 deletions modin/core/storage_formats/pandas/query_compiler_caster.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,119 @@

import functools
import inspect
from itertools import combinations
from types import FunctionType, MethodType
from typing import Any, Dict, Tuple, TypeVar

from pandas.core.indexes.frozen import FrozenList

from modin.core.storage_formats.base.query_compiler import BaseQueryCompiler
from modin.core.storage_formats.base.query_compiler import (
BaseQueryCompiler,

Check failure

Code scanning / CodeQL

Module-level cyclic import Error

'BaseQueryCompiler' may not be defined if module
modin.core.storage_formats.base.query_compiler
is imported before module
modin.core.storage_formats.pandas.query_compiler_caster
, as the
definition
of BaseQueryCompiler occurs after the cyclic
import
of modin.core.storage_formats.pandas.query_compiler_caster.
QCCoercionCost,

Check failure

Code scanning / CodeQL

Module-level cyclic import Error

'QCCoercionCost' may not be defined if module
modin.core.storage_formats.base.query_compiler
is imported before module
modin.core.storage_formats.pandas.query_compiler_caster
, as the
definition
of QCCoercionCost occurs after the cyclic
import
of modin.core.storage_formats.pandas.query_compiler_caster.
)

Fn = TypeVar("Fn", bound=Any)


class QueryCompilerCasterCalculator:
"""
Calculate which QueryCompiler should be used for an operation.

Given a set of QueryCompilers; containing various data, determine
which query compiler everything should be cast to which minimizes
the cost of casting, or coercion. Use the aggregate sum of coercion
to determine overall cost.
"""

def __init__(self):
self._caster_costing_map = {}
self._data_cls_map = {}
self._qc_list = []
self._qc_cls_list = []
self._result_type = None

Check warning on line 53 in modin/core/storage_formats/pandas/query_compiler_caster.py

View check run for this annotation

Codecov / codecov/patch

modin/core/storage_formats/pandas/query_compiler_caster.py#L49-L53

Added lines #L49 - L53 were not covered by tests

def add_query_compiler(self, query_compiler):
"""
Add a query compiler to be considered for casting.

Parameters
----------
query_compiler : QueryCompiler
"""
if isinstance(query_compiler, type):

Check warning on line 63 in modin/core/storage_formats/pandas/query_compiler_caster.py

View check run for this annotation

Codecov / codecov/patch

modin/core/storage_formats/pandas/query_compiler_caster.py#L63

Added line #L63 was not covered by tests
# class
qc_type = query_compiler

Check warning on line 65 in modin/core/storage_formats/pandas/query_compiler_caster.py

View check run for this annotation

Codecov / codecov/patch

modin/core/storage_formats/pandas/query_compiler_caster.py#L65

Added line #L65 was not covered by tests
else:
# instance
qc_type = type(query_compiler)
self._qc_list.append(query_compiler)
self._data_cls_map[qc_type] = query_compiler._modin_frame
self._qc_cls_list.append(qc_type)

Check warning on line 71 in modin/core/storage_formats/pandas/query_compiler_caster.py

View check run for this annotation

Codecov / codecov/patch

modin/core/storage_formats/pandas/query_compiler_caster.py#L68-L71

Added lines #L68 - L71 were not covered by tests

def calculate(self):
"""
Calculate which query compiler we should cast to.

Returns
-------
type
QueryCompiler class which should be used for the operation.
"""
if self._result_type is not None:
return self._result_type
if len(self._qc_cls_list) == 1:
return self._qc_cls_list[0]
if len(self._qc_cls_list) == 0:
raise ValueError("No query compilers registered")

Check warning on line 87 in modin/core/storage_formats/pandas/query_compiler_caster.py

View check run for this annotation

Codecov / codecov/patch

modin/core/storage_formats/pandas/query_compiler_caster.py#L82-L87

Added lines #L82 - L87 were not covered by tests

for qc_1, qc_2 in combinations(self._qc_list, 2):
costs_1 = qc_1.qc_engine_switch_cost(qc_2)
costs_2 = qc_2.qc_engine_switch_cost(qc_1)
self._add_cost_data(costs_1)
self._add_cost_data(costs_2)
if len(self._caster_costing_map) <= 0 and len(self._qc_cls_list) > 0:
self._result_type = self._qc_cls_list[0]
return self._result_type
min_value = min(self._caster_costing_map.values())
for key, value in self._caster_costing_map.items():
if min_value == value:
self._result_type = key
break
return self._result_type

Check warning on line 102 in modin/core/storage_formats/pandas/query_compiler_caster.py

View check run for this annotation

Codecov / codecov/patch

modin/core/storage_formats/pandas/query_compiler_caster.py#L89-L102

Added lines #L89 - L102 were not covered by tests

def _add_cost_data(self, costs: dict):
"""
Add the cost data to the calculator.

Parameters
----------
costs : dict
Dictionary of query compiler classes to costs.
"""
for k, v in costs.items():

Check warning on line 113 in modin/core/storage_formats/pandas/query_compiler_caster.py

View check run for this annotation

Codecov / codecov/patch

modin/core/storage_formats/pandas/query_compiler_caster.py#L113

Added line #L113 was not covered by tests
# filter out any extranious query compilers not in this operation
if k in self._qc_cls_list:
QCCoercionCost.validate_coersion_cost(v)

Check warning on line 116 in modin/core/storage_formats/pandas/query_compiler_caster.py

View check run for this annotation

Codecov / codecov/patch

modin/core/storage_formats/pandas/query_compiler_caster.py#L115-L116

Added lines #L115 - L116 were not covered by tests
# Adds the costs associated with all coercions to a type, k
self._caster_costing_map[k] = (

Check warning on line 118 in modin/core/storage_formats/pandas/query_compiler_caster.py

View check run for this annotation

Codecov / codecov/patch

modin/core/storage_formats/pandas/query_compiler_caster.py#L118

Added line #L118 was not covered by tests
v + self._caster_costing_map[k]
if k in self._caster_costing_map
else v
)

def result_data_frame(self):
"""
Return the data frame associated with the calculated query compiler.

Returns
-------
DataFrame object
DataFrame object associated with the preferred query compiler.
"""
qc_type = self.calculate()
return self._data_cls_map[qc_type]

Check warning on line 134 in modin/core/storage_formats/pandas/query_compiler_caster.py

View check run for this annotation

Codecov / codecov/patch

modin/core/storage_formats/pandas/query_compiler_caster.py#L133-L134

Added lines #L133 - L134 were not covered by tests


class QueryCompilerCaster:
"""Cast all query compiler arguments of the member function to current query compiler."""

Expand All @@ -55,48 +158,39 @@
apply_argument_cast(cls)


def cast_nested_args_to_current_qc_type(arguments, current_qc):
def visit_nested_args(arguments, fn: callable):
"""
Cast all arguments in nested fashion to current query compiler.
Visit each argument recursively, calling fn on each one.

Parameters
----------
arguments : tuple or dict
current_qc : BaseQueryCompiler
fn : Callable to apply to matching arguments

Returns
-------
tuple or dict
Returns args and kwargs with all query compilers casted to current_qc.
"""

def cast_arg_to_current_qc(arg):
current_qc_type = type(current_qc)
if isinstance(arg, BaseQueryCompiler) and not isinstance(arg, current_qc_type):
data_cls = current_qc._modin_frame
return current_qc_type.from_pandas(arg.to_pandas(), data_cls)
else:
return arg

imutable_types = (FrozenList, tuple)
if isinstance(arguments, imutable_types):
args_type = type(arguments)
arguments = list(arguments)
arguments = cast_nested_args_to_current_qc_type(arguments, current_qc)
arguments = visit_nested_args(arguments, fn)

Check warning on line 179 in modin/core/storage_formats/pandas/query_compiler_caster.py

View check run for this annotation

Codecov / codecov/patch

modin/core/storage_formats/pandas/query_compiler_caster.py#L179

Added line #L179 was not covered by tests

return args_type(arguments)
if isinstance(arguments, list):
for i in range(len(arguments)):
if isinstance(arguments[i], (list, dict)):
cast_nested_args_to_current_qc_type(arguments[i], current_qc)
visit_nested_args(arguments[i], fn)

Check warning on line 185 in modin/core/storage_formats/pandas/query_compiler_caster.py

View check run for this annotation

Codecov / codecov/patch

modin/core/storage_formats/pandas/query_compiler_caster.py#L185

Added line #L185 was not covered by tests
else:
arguments[i] = cast_arg_to_current_qc(arguments[i])
arguments[i] = fn(arguments[i])

Check warning on line 187 in modin/core/storage_formats/pandas/query_compiler_caster.py

View check run for this annotation

Codecov / codecov/patch

modin/core/storage_formats/pandas/query_compiler_caster.py#L187

Added line #L187 was not covered by tests
elif isinstance(arguments, dict):
for key in arguments:
if isinstance(arguments[key], (list, dict)):
cast_nested_args_to_current_qc_type(arguments[key], current_qc)
visit_nested_args(arguments[key], fn)

Check warning on line 191 in modin/core/storage_formats/pandas/query_compiler_caster.py

View check run for this annotation

Codecov / codecov/patch

modin/core/storage_formats/pandas/query_compiler_caster.py#L191

Added line #L191 was not covered by tests
else:
arguments[key] = cast_arg_to_current_qc(arguments[key])
arguments[key] = fn(arguments[key])

Check warning on line 193 in modin/core/storage_formats/pandas/query_compiler_caster.py

View check run for this annotation

Codecov / codecov/patch

modin/core/storage_formats/pandas/query_compiler_caster.py#L193

Added line #L193 was not covered by tests
return arguments


Expand All @@ -115,13 +209,15 @@
"""
if isinstance(obj, type):
all_attrs = dict(inspect.getmembers(obj))
all_attrs.pop("__abstractmethods__")

# This is required because inspect converts class methods to member functions
current_class_attrs = vars(obj)
for key in current_class_attrs:
all_attrs[key] = current_class_attrs[key]

all_attrs.pop("__abstractmethods__")
all_attrs.pop("__init__")
all_attrs.pop("qc_engine_switch_cost")
all_attrs.pop("from_pandas")
for attr_name, attr_value in all_attrs.items():
if isinstance(
attr_value, (FunctionType, MethodType, classmethod, staticmethod)
Expand Down Expand Up @@ -150,10 +246,54 @@
-------
Any
"""
if len(args) == 0 and len(kwargs) == 0:
return

Check warning on line 250 in modin/core/storage_formats/pandas/query_compiler_caster.py

View check run for this annotation

Codecov / codecov/patch

modin/core/storage_formats/pandas/query_compiler_caster.py#L249-L250

Added lines #L249 - L250 were not covered by tests
current_qc = args[0]
calculator = QueryCompilerCasterCalculator()
calculator.add_query_compiler(current_qc)

Check warning on line 253 in modin/core/storage_formats/pandas/query_compiler_caster.py

View check run for this annotation

Codecov / codecov/patch

modin/core/storage_formats/pandas/query_compiler_caster.py#L252-L253

Added lines #L252 - L253 were not covered by tests

def arg_needs_casting(arg):
current_qc_type = type(current_qc)
if not isinstance(arg, BaseQueryCompiler):
return False
if isinstance(arg, current_qc_type):
return False
return True

Check warning on line 261 in modin/core/storage_formats/pandas/query_compiler_caster.py

View check run for this annotation

Codecov / codecov/patch

modin/core/storage_formats/pandas/query_compiler_caster.py#L255-L261

Added lines #L255 - L261 were not covered by tests

def register_query_compilers(arg):
if not arg_needs_casting(arg):
return arg
calculator.add_query_compiler(arg)
return arg

Check warning on line 267 in modin/core/storage_formats/pandas/query_compiler_caster.py

View check run for this annotation

Codecov / codecov/patch

modin/core/storage_formats/pandas/query_compiler_caster.py#L263-L267

Added lines #L263 - L267 were not covered by tests

def cast_to_qc(arg):
if not arg_needs_casting(arg):
return arg
qc_type = calculator.calculate()
if qc_type is None or qc_type is type(arg):
return arg
frame_data = calculator.result_data_frame()
result = qc_type.from_pandas(arg.to_pandas(), frame_data)
return result

Check warning on line 277 in modin/core/storage_formats/pandas/query_compiler_caster.py

View check run for this annotation

Codecov / codecov/patch

modin/core/storage_formats/pandas/query_compiler_caster.py#L269-L277

Added lines #L269 - L277 were not covered by tests

if isinstance(current_qc, BaseQueryCompiler):
kwargs = cast_nested_args_to_current_qc_type(kwargs, current_qc)
args = cast_nested_args_to_current_qc_type(args, current_qc)
visit_nested_args(kwargs, register_query_compilers)
visit_nested_args(args, register_query_compilers)

Check warning on line 281 in modin/core/storage_formats/pandas/query_compiler_caster.py

View check run for this annotation

Codecov / codecov/patch

modin/core/storage_formats/pandas/query_compiler_caster.py#L280-L281

Added lines #L280 - L281 were not covered by tests

args = visit_nested_args(args, cast_to_qc)
kwargs = visit_nested_args(kwargs, cast_to_qc)

Check warning on line 284 in modin/core/storage_formats/pandas/query_compiler_caster.py

View check run for this annotation

Codecov / codecov/patch

modin/core/storage_formats/pandas/query_compiler_caster.py#L283-L284

Added lines #L283 - L284 were not covered by tests

qc = calculator.calculate()

Check warning on line 286 in modin/core/storage_formats/pandas/query_compiler_caster.py

View check run for this annotation

Codecov / codecov/patch

modin/core/storage_formats/pandas/query_compiler_caster.py#L286

Added line #L286 was not covered by tests

if qc is None or qc is type(current_qc):
return obj(*args, **kwargs)

Check warning on line 289 in modin/core/storage_formats/pandas/query_compiler_caster.py

View check run for this annotation

Codecov / codecov/patch

modin/core/storage_formats/pandas/query_compiler_caster.py#L288-L289

Added lines #L288 - L289 were not covered by tests

# breakpoint()
# we need to cast current_qc to a new query compiler
if qc != current_qc:
data_cls = current_qc._modin_frame
return qc.from_pandas(current_qc.to_pandas(), data_cls)

Check warning on line 295 in modin/core/storage_formats/pandas/query_compiler_caster.py

View check run for this annotation

Codecov / codecov/patch

modin/core/storage_formats/pandas/query_compiler_caster.py#L293-L295

Added lines #L293 - L295 were not covered by tests
# need to find the new function for obj
return obj(*args, **kwargs)

return cast_args
Loading
Loading