From 990ec1700000ed6edf4a9cd1c561dc5602755d60 Mon Sep 17 00:00:00 2001 From: Dmitry Chigarev Date: Tue, 27 Jun 2023 11:56:55 +0000 Subject: [PATCH 1/9] Dtaft implementation of UDFs in operators Signed-off-by: Dmitry Chigarev --- modin/core/dataframe/algebra/operator.py | 39 ++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/modin/core/dataframe/algebra/operator.py b/modin/core/dataframe/algebra/operator.py index cc093e6720b..7678245b7aa 100644 --- a/modin/core/dataframe/algebra/operator.py +++ b/modin/core/dataframe/algebra/operator.py @@ -60,3 +60,42 @@ def validate_axis(cls, axis: Optional[int]) -> int: Integer representation of given axis. """ return 0 if axis is None else axis + + +def apply_operator( + df, + operator_cls, + func, + return_type=None, + func_args=None, + func_kwargs=None, + *args, + **kwargs +): + """ + Apply a function to a modin DataFrame using the passed operator. + + Parameters + ---------- + df : modin.pandas.DataFrame or modin.pandas.Series + operator_cls : Operator + func : callable(pandas.DataFrame, *args, **kwargs) -> pandas.DataFrame + return_type : type, optional + func_args : tuple, optional + func_kwargs : dict, optional + + Returns + ------- + return_type + """ + operator = operator_cls.register(func, *args, **kwargs) + + func_args = tuple() if func_args is None else func_args + func_kwargs = dict() if func_kwargs is None else func_kwargs + + qc_result = operator(df._query_compiler, *func_args, **func_kwargs) + + if return_type is None: + return_type = type(df) + + return return_type.__constructor__(query_compiler=qc_result) From ac9732e2aae8c32f346bf64ba4c73ae86ec07cc9 Mon Sep 17 00:00:00 2001 From: Dmitry Chigarev Date: Tue, 27 Jun 2023 16:10:23 +0000 Subject: [PATCH 2/9] Add tests Signed-off-by: Dmitry Chigarev --- modin/core/dataframe/algebra/binary.py | 22 ++++- modin/core/dataframe/algebra/fold.py | 8 ++ modin/core/dataframe/algebra/groupby.py | 28 ++++++ modin/core/dataframe/algebra/operator.py | 21 ++++ modin/core/dataframe/algebra/reduce.py | 8 ++ modin/core/dataframe/algebra/tree_reduce.py | 16 +++ .../dataframe/pandas/dataframe/dataframe.py | 1 - .../storage_formats/pandas/test_internals.py | 99 +++++++++++++++++++ setup.cfg | 2 +- 9 files changed, 202 insertions(+), 3 deletions(-) diff --git a/modin/core/dataframe/algebra/binary.py b/modin/core/dataframe/algebra/binary.py index 1a7dc604e12..57b2761e473 100644 --- a/modin/core/dataframe/algebra/binary.py +++ b/modin/core/dataframe/algebra/binary.py @@ -297,7 +297,7 @@ def register( """ def caller( - query_compiler, other, broadcast=False, *args, dtypes=None, **kwargs + query_compiler, other, *args, broadcast=False, dtypes=None, **kwargs ): """ Apply binary `func` to passed operands. @@ -413,3 +413,23 @@ def caller( ) return caller + + @classmethod + def apply( + cls, left, right, func, axis=0, func_args=None, func_kwargs=None, **kwargs + ): + from modin.pandas import Series + + operator = cls.register(func, **kwargs) + + func_args = tuple() if func_args is None else func_args + func_kwargs = dict() if func_kwargs is None else func_kwargs + qc_result = operator( + left._query_compiler, + right._query_compiler, + broadcast=isinstance(right, Series), + *func_args, + axis=axis, + **func_kwargs, + ) + return type(left)(query_compiler=qc_result) diff --git a/modin/core/dataframe/algebra/fold.py b/modin/core/dataframe/algebra/fold.py index 419a0b56903..2ed71c9e19f 100644 --- a/modin/core/dataframe/algebra/fold.py +++ b/modin/core/dataframe/algebra/fold.py @@ -65,3 +65,11 @@ def caller(query_compiler, fold_axis=None, *args, **kwargs): ) return caller + + @classmethod + def apply(cls, df, func, fold_axis=0, func_args=None, func_kwargs=None): + func_args = tuple() if func_args is None else func_args + func_kwargs = dict() if func_kwargs is None else func_kwargs + func_args = (fold_axis,) + func_args + + return super().apply(df, func, func_args, func_kwargs) diff --git a/modin/core/dataframe/algebra/groupby.py b/modin/core/dataframe/algebra/groupby.py index 78cb1ab8365..809445eacc9 100644 --- a/modin/core/dataframe/algebra/groupby.py +++ b/modin/core/dataframe/algebra/groupby.py @@ -736,3 +736,31 @@ def wrapper(df): return result return _map, _reduce + + @classmethod + def apply( + cls, + df, + map_func, + reduce_func, + by, + groupby_kwargs=None, + agg_args=None, + agg_kwargs=None, + ): + agg_args = tuple() if agg_args is None else agg_args + agg_kwargs = dict() if agg_kwargs is None else agg_kwargs + groupby_kwargs = dict() if groupby_kwargs is None else groupby_kwargs + + operator = cls.register(map_func, reduce_func) + qc_result = operator( + df._query_compiler, + df[by]._query_compiler, + axis=0, + groupby_kwargs=groupby_kwargs, + agg_args=agg_args, + agg_kwargs=agg_kwargs, + drop=True, + ) + + return type(df)(query_compiler=qc_result) diff --git a/modin/core/dataframe/algebra/operator.py b/modin/core/dataframe/algebra/operator.py index 7678245b7aa..d53e86d74fe 100644 --- a/modin/core/dataframe/algebra/operator.py +++ b/modin/core/dataframe/algebra/operator.py @@ -61,6 +61,22 @@ def validate_axis(cls, axis: Optional[int]) -> int: """ return 0 if axis is None else axis + @classmethod + def apply( + cls, df, func, func_args=None, func_kwargs=None, _return_type=None, **kwargs + ): + operator = cls.register(func, **kwargs) + + func_args = tuple() if func_args is None else func_args + func_kwargs = dict() if func_kwargs is None else func_kwargs + + qc_result = operator(df._query_compiler, *func_args, **func_kwargs) + + if _return_type is None: + _return_type = type(df) + + return _return_type(query_compiler=qc_result) + def apply_operator( df, @@ -78,9 +94,14 @@ def apply_operator( Parameters ---------- df : modin.pandas.DataFrame or modin.pandas.Series + DataFrame object to apply the operator against. operator_cls : Operator + Operator describing how to apply the function. func : callable(pandas.DataFrame, *args, **kwargs) -> pandas.DataFrame + A function to apply. return_type : type, optional + A class that takes the ``query_compiler`` keyword argument. If not specified + will be identical to the type of the passed `df`. func_args : tuple, optional func_kwargs : dict, optional diff --git a/modin/core/dataframe/algebra/reduce.py b/modin/core/dataframe/algebra/reduce.py index 0f4fbe3667f..13abc58f167 100644 --- a/modin/core/dataframe/algebra/reduce.py +++ b/modin/core/dataframe/algebra/reduce.py @@ -50,3 +50,11 @@ def caller(query_compiler, *args, **kwargs): ) return caller + + @classmethod + def apply(cls, df, func, axis=0, func_args=None, func_kwargs=None): + from modin.pandas import Series + + return super().apply( + df, func, func_args, func_kwargs, axis=axis, _return_type=Series + ) diff --git a/modin/core/dataframe/algebra/tree_reduce.py b/modin/core/dataframe/algebra/tree_reduce.py index 671faa1ea0a..2ad46bf7066 100644 --- a/modin/core/dataframe/algebra/tree_reduce.py +++ b/modin/core/dataframe/algebra/tree_reduce.py @@ -54,3 +54,19 @@ def caller(query_compiler, *args, **kwargs): ) return caller + + @classmethod + def apply( + cls, df, map_function, reduce_function, axis=0, func_args=None, func_kwargs=None + ): + from modin.pandas import Series + + return super().apply( + df, + map_function, + func_args, + func_kwargs, + reduce_function=reduce_function, + axis=axis, + _return_type=Series, + ) diff --git a/modin/core/dataframe/pandas/dataframe/dataframe.py b/modin/core/dataframe/pandas/dataframe/dataframe.py index 44736aa6753..38ad5fe0704 100644 --- a/modin/core/dataframe/pandas/dataframe/dataframe.py +++ b/modin/core/dataframe/pandas/dataframe/dataframe.py @@ -1826,7 +1826,6 @@ def _compute_tree_reduce_metadata(self, axis, new_parts, dtypes=None): Modin series (1xN frame) containing the reduced data. """ new_axes, new_axes_lengths = [0, 0], [0, 0] - new_axes[axis] = [MODIN_UNNAMED_SERIES_LABEL] new_axes[axis ^ 1] = self.axes[axis ^ 1] diff --git a/modin/test/storage_formats/pandas/test_internals.py b/modin/test/storage_formats/pandas/test_internals.py index 25cfb14ef19..eff0f4f74ff 100644 --- a/modin/test/storage_formats/pandas/test_internals.py +++ b/modin/test/storage_formats/pandas/test_internals.py @@ -1042,3 +1042,102 @@ def test_setitem_bool_preserve_dtypes(): # scalar as a col_loc df.loc[indexer, "a"] = 2.0 assert df._query_compiler._modin_frame.has_materialized_dtypes + + +class TestOperatorsApply: + def test_binary(self): + from modin.core.dataframe.algebra import Binary + + md_df1, pd_df1 = create_test_dfs(test_data_values[0]) + md_df2, pd_df2 = md_df1 // 2, pd_df1 // 2 + + def func(df, other, value, axis=None): + return df * other + value + + # DataFrame case + md_res = Binary.apply(md_df1, md_df2, func, func_args=(10,)) + pd_res = func(pd_df1, pd_df2, 10) + df_equals(md_res, pd_res) + + # Series case + md_res = Binary.apply(md_df1, md_df2.iloc[0, :], func, axis=1, func_args=(10,)) + pd_res = func(pd_df1, pd_df2.iloc[0, :], 10) + df_equals(md_res, pd_res) + + def test_fold(self): + from modin.core.dataframe.algebra import Fold + + md_df, pd_df = create_test_dfs(test_data_values[0]) + + def func(df, value, axis): + return (df + value).cumsum(axis) + + md_res = Fold.apply(md_df, func, fold_axis=0, func_args=(10, 0)) + pd_res = func(pd_df, 10, 0) + df_equals(md_res, pd_res) + + md_res = Fold.apply(md_df, func, fold_axis=1, func_args=(10, 1)) + pd_res = func(pd_df, 10, 1) + df_equals(md_res, pd_res) + + def test_groupby(self): + from modin.core.dataframe.algebra import GroupByReduce + + md_df, pd_df = create_test_dfs(test_data_values[0]) + by_col = md_df.columns[0] + + def map_func(grp): + return grp.count() + + def reduce_func(grp): + return grp.sum() + + md_res = GroupByReduce.apply( + md_df, map_func, reduce_func, by=by_col, groupby_kwargs={"as_index": False} + ) + pd_res = pd_df.groupby(by_col, as_index=False).count() + # breakpoint() + df_equals(md_res, pd_res) + + def test_map(self): + from modin.core.dataframe.algebra import Map + + md_df, pd_df = create_test_dfs(test_data_values[0]) + + def func(df, value): + return df**value + + md_res = Map.apply(md_df, func, func_args=(3,)) + pd_res = func(pd_df, 3) + df_equals(md_res, pd_res) + + def test_reduce(self): + from modin.core.dataframe.algebra import Reduce + + md_df, pd_df = create_test_dfs(test_data_values[0]) + + def func(df, value, axis): + return (df**value).sum(axis) + + md_res = Reduce.apply(md_df, func, func_args=(3, 0)) + pd_res = func(pd_df, 3, 0) + df_equals(md_res, pd_res) + + md_res = Reduce.apply(md_df, func, axis=1, func_args=(3, 1)) + pd_res = func(pd_df, 3, 1) + df_equals(md_res, pd_res) + + def test_tree_reduce(self): + from modin.core.dataframe.algebra import TreeReduce + + md_df, pd_df = create_test_dfs(test_data_values[0]) + + def map_func(df): + return df.count() + + def reduce_func(df): + return df.sum() + + md_res = TreeReduce.apply(md_df, map_func, reduce_func) + pd_res = pd_df.count() + df_equals(md_res, pd_res) diff --git a/setup.cfg b/setup.cfg index 0d6e97bc180..709a57c9932 100644 --- a/setup.cfg +++ b/setup.cfg @@ -12,7 +12,7 @@ tag_prefix = parentdir_prefix = modin- [tool:pytest] -addopts = --disable-pytest-warnings --cov-config=setup.cfg --cov=modin --cov-append --cov-report= +addopts = --disable-pytest-warnings xfail_strict=true markers = xfail_executions From 58990840ee31549c7a44bccd24763d43398f96c1 Mon Sep 17 00:00:00 2001 From: Dmitry Chigarev Date: Tue, 27 Jun 2023 17:18:27 +0000 Subject: [PATCH 3/9] Add doc-strings Signed-off-by: Dmitry Chigarev --- modin/core/dataframe/algebra/binary.py | 24 +++++++ modin/core/dataframe/algebra/fold.py | 21 ++++++ modin/core/dataframe/algebra/groupby.py | 24 +++++++ modin/core/dataframe/algebra/operator.py | 67 +++++++------------ modin/core/dataframe/algebra/reduce.py | 20 ++++++ modin/core/dataframe/algebra/tree_reduce.py | 22 ++++++ .../dataframe/pandas/dataframe/dataframe.py | 1 + setup.cfg | 2 +- 8 files changed, 136 insertions(+), 45 deletions(-) diff --git a/modin/core/dataframe/algebra/binary.py b/modin/core/dataframe/algebra/binary.py index 57b2761e473..83c587f9d50 100644 --- a/modin/core/dataframe/algebra/binary.py +++ b/modin/core/dataframe/algebra/binary.py @@ -418,6 +418,30 @@ def caller( def apply( cls, left, right, func, axis=0, func_args=None, func_kwargs=None, **kwargs ): + """ + Apply groupby aggregation function using map-reduce pattern. + + Parameters + ---------- + left : modin.pandas.DataFrame or modin.pandas.Series + Left operand. + right : modin.pandas.DataFrame or modin.pandas.Series + Right operand. + func : callable(pandas.DataFrame, pandas.DataFrame, *args, axis, *kwargs) -> pandas.DataFrame + A binary function to apply `left` and `right`. + axis : int, default: 0 + Whether to apply the function across rows (``axis=0``) or across columns (``axis=1``). + func_args : tuple, optional + Positional arguments to pass to the funcs. + func_kwargs : dict, optional + Keyword arguments to pass to the funcs. + **kwargs : dict + Additional arguments to pass to the ``cls.register()``. + + Returns + ------- + The same type as `df`. + """ from modin.pandas import Series operator = cls.register(func, **kwargs) diff --git a/modin/core/dataframe/algebra/fold.py b/modin/core/dataframe/algebra/fold.py index 2ed71c9e19f..ba2ffb6b169 100644 --- a/modin/core/dataframe/algebra/fold.py +++ b/modin/core/dataframe/algebra/fold.py @@ -68,6 +68,27 @@ def caller(query_compiler, fold_axis=None, *args, **kwargs): @classmethod def apply(cls, df, func, fold_axis=0, func_args=None, func_kwargs=None): + """ + Apply a Fold (full-axis) function to the dataframe. + + Parameters + ---------- + df : modin.pandas.DataFrame or modin.pandas.Series + DataFrame object to apply the operator against. + func : callable(pandas.DataFrame[NxM], *args, **kwargs) -> pandas.DataFrame[NxM] + A function to apply to every partition. Note that the function shouldn't change + the shape of the dataframe. + fold_axis : int, default: 0 + Whether to apply the function across rows (``axis=0``) or across columns (``axis=1``). + func_args : tuple, optional + Positional arguments to pass to the funcs. + func_kwargs : dict, optional + Keyword arguments to pass to the funcs. + + Returns + ------- + the same type as `df` + """ func_args = tuple() if func_args is None else func_args func_kwargs = dict() if func_kwargs is None else func_kwargs func_args = (fold_axis,) + func_args diff --git a/modin/core/dataframe/algebra/groupby.py b/modin/core/dataframe/algebra/groupby.py index 809445eacc9..a45d7bdd410 100644 --- a/modin/core/dataframe/algebra/groupby.py +++ b/modin/core/dataframe/algebra/groupby.py @@ -748,6 +748,30 @@ def apply( agg_args=None, agg_kwargs=None, ): + """ + Apply groupby aggregation function using map-reduce pattern. + + Parameters + ---------- + df : modin.pandas.DataFrame or modin.pandas.Series + A source DataFrame to group. + map_func : callable(pandas.core.groupby.DataFrameGroupBy) -> pandas.DataFrame + A map function to apply to a groupby object in every partition. + reduce_func : callable(pandas.core.groupby.DataFrameGroupBy) -> pandas.DataFrame + A reduction function to apply to the results of the map functions. + by : label or list of labels + Columns of the `df` to group on. + groupby_kwargs : dict, optional + Keyword arguments matching the signature of ``pandas.DataFrame.groupby``. + agg_args : tuple, optional + Positional arguments to pass to the funcs. + agg_kwargs : dict, optional + Keyword arguments to pass to the funcs. + + Returns + ------- + The same type as `df`. + """ agg_args = tuple() if agg_args is None else agg_args agg_kwargs = dict() if agg_kwargs is None else agg_kwargs groupby_kwargs = dict() if groupby_kwargs is None else groupby_kwargs diff --git a/modin/core/dataframe/algebra/operator.py b/modin/core/dataframe/algebra/operator.py index d53e86d74fe..9b8095a04ff 100644 --- a/modin/core/dataframe/algebra/operator.py +++ b/modin/core/dataframe/algebra/operator.py @@ -65,6 +65,29 @@ def validate_axis(cls, axis: Optional[int]) -> int: def apply( cls, df, func, func_args=None, func_kwargs=None, _return_type=None, **kwargs ): + """ + Apply a function to a Modin DataFrame using the operator's scheme. + + Parameters + ---------- + df : modin.pandas.DataFrame or modin.pandas.Series + DataFrame object to apply the operator against. + func : callable(pandas.DataFrame, *args, **kwargs) -> pandas.DataFrame + A function to apply. + func_args : tuple, optional + Positional arguments to pass to the `func`. + func_kwargs : dict, optional + Keyword arguments to pass to the `func`. + _return_type : type, optional + A class that takes the ``query_compiler`` keyword argument. If not specified + will be identical to the type of the passed `df`. + **kwargs : dict + Aditional arguments to pass to the ``cls.register()``. + + Returns + ------- + return_type + """ operator = cls.register(func, **kwargs) func_args = tuple() if func_args is None else func_args @@ -76,47 +99,3 @@ def apply( _return_type = type(df) return _return_type(query_compiler=qc_result) - - -def apply_operator( - df, - operator_cls, - func, - return_type=None, - func_args=None, - func_kwargs=None, - *args, - **kwargs -): - """ - Apply a function to a modin DataFrame using the passed operator. - - Parameters - ---------- - df : modin.pandas.DataFrame or modin.pandas.Series - DataFrame object to apply the operator against. - operator_cls : Operator - Operator describing how to apply the function. - func : callable(pandas.DataFrame, *args, **kwargs) -> pandas.DataFrame - A function to apply. - return_type : type, optional - A class that takes the ``query_compiler`` keyword argument. If not specified - will be identical to the type of the passed `df`. - func_args : tuple, optional - func_kwargs : dict, optional - - Returns - ------- - return_type - """ - operator = operator_cls.register(func, *args, **kwargs) - - func_args = tuple() if func_args is None else func_args - func_kwargs = dict() if func_kwargs is None else func_kwargs - - qc_result = operator(df._query_compiler, *func_args, **func_kwargs) - - if return_type is None: - return_type = type(df) - - return return_type.__constructor__(query_compiler=qc_result) diff --git a/modin/core/dataframe/algebra/reduce.py b/modin/core/dataframe/algebra/reduce.py index 13abc58f167..04987659e8d 100644 --- a/modin/core/dataframe/algebra/reduce.py +++ b/modin/core/dataframe/algebra/reduce.py @@ -53,6 +53,26 @@ def caller(query_compiler, *args, **kwargs): @classmethod def apply(cls, df, func, axis=0, func_args=None, func_kwargs=None): + """ + Apply a reduction function to each row/column partition of the dataframe. + + Parameters + ---------- + df : modin.pandas.DataFrame or modin.pandas.Series + DataFrame object to apply the operator against. + func : callable(pandas.DataFrame, *args, **kwargs) -> Union[pandas.Series, pandas.DataFrame[1xN]] + A function to apply. + axis : int, default: 0 + Whether to apply the function across rows (``axis=0``) or across columns (``axis=1``). + func_args : tuple, optional + Positional arguments to pass to the `func`. + func_kwargs : dict, optional + Keyword arguments to pass to the `func`. + + Returns + ------- + modin.pandas.Series + """ from modin.pandas import Series return super().apply( diff --git a/modin/core/dataframe/algebra/tree_reduce.py b/modin/core/dataframe/algebra/tree_reduce.py index 2ad46bf7066..979995500e9 100644 --- a/modin/core/dataframe/algebra/tree_reduce.py +++ b/modin/core/dataframe/algebra/tree_reduce.py @@ -59,6 +59,28 @@ def caller(query_compiler, *args, **kwargs): def apply( cls, df, map_function, reduce_function, axis=0, func_args=None, func_kwargs=None ): + """ + Apply a map-reduce function to the dataframe. + + Parameters + ---------- + df : modin.pandas.DataFrame or modin.pandas.Series + DataFrame object to apply the operator against. + map_function : callable(pandas.DataFrame, *args, **kwargs) -> pandas.DataFrame + A map function to apply to every partition. + reduce_function : callable(pandas.DataFrame, *args, **kwargs) -> Union[pandas.Series, pandas.DataFrame[1xN]] + A reduction function to apply to the results of the map functions. + axis : int, default: 0 + Whether to apply the reduce function across rows (``axis=0``) or across columns (``axis=1``). + func_args : tuple, optional + Positional arguments to pass to the funcs. + func_kwargs : dict, optional + Keyword arguments to pass to the funcs. + + Returns + ------- + modin.pandas.Series + """ from modin.pandas import Series return super().apply( diff --git a/modin/core/dataframe/pandas/dataframe/dataframe.py b/modin/core/dataframe/pandas/dataframe/dataframe.py index 38ad5fe0704..44736aa6753 100644 --- a/modin/core/dataframe/pandas/dataframe/dataframe.py +++ b/modin/core/dataframe/pandas/dataframe/dataframe.py @@ -1826,6 +1826,7 @@ def _compute_tree_reduce_metadata(self, axis, new_parts, dtypes=None): Modin series (1xN frame) containing the reduced data. """ new_axes, new_axes_lengths = [0, 0], [0, 0] + new_axes[axis] = [MODIN_UNNAMED_SERIES_LABEL] new_axes[axis ^ 1] = self.axes[axis ^ 1] diff --git a/setup.cfg b/setup.cfg index 709a57c9932..0d6e97bc180 100644 --- a/setup.cfg +++ b/setup.cfg @@ -12,7 +12,7 @@ tag_prefix = parentdir_prefix = modin- [tool:pytest] -addopts = --disable-pytest-warnings +addopts = --disable-pytest-warnings --cov-config=setup.cfg --cov=modin --cov-append --cov-report= xfail_strict=true markers = xfail_executions From e9f5dcd1cc797e012a7db7b6f520ee1b778ecc57 Mon Sep 17 00:00:00 2001 From: Dmitry Chigarev Date: Tue, 27 Jun 2023 20:25:40 +0200 Subject: [PATCH 4/9] Update docs Signed-off-by: Dmitry Chigarev --- docs/flow/modin/core/dataframe/algebra.rst | 57 +++++++++++++++------ modin/core/dataframe/algebra/binary.py | 2 +- modin/core/dataframe/algebra/fold.py | 2 +- modin/core/dataframe/algebra/operator.py | 6 +-- modin/core/dataframe/algebra/reduce.py | 4 +- modin/core/dataframe/algebra/tree_reduce.py | 4 +- 6 files changed, 49 insertions(+), 26 deletions(-) diff --git a/docs/flow/modin/core/dataframe/algebra.rst b/docs/flow/modin/core/dataframe/algebra.rst index ded04dc386b..d07c3b43d7a 100644 --- a/docs/flow/modin/core/dataframe/algebra.rst +++ b/docs/flow/modin/core/dataframe/algebra.rst @@ -33,6 +33,9 @@ Uniformly apply a function argument to each partition in parallel. .. figure:: /img/map_evaluation.svg :align: center +.. autoclass:: modin.core.dataframe.algebra.map.Map + :members: register, apply + Reduce operator --------------- Applies an argument function that reduces each column or row on the specified axis into a scalar, but requires knowledge about the whole axis. @@ -43,6 +46,9 @@ that the reduce function returns a one dimensional frame. .. figure:: /img/reduce_evaluation.svg :align: center +.. autoclass:: modin.core.dataframe.algebra.reduce.Reduce + :members: register, apply + TreeReduce operator ------------------- Applies an argument function that reduces specified axis into a scalar. First applies map function to each partition @@ -50,6 +56,9 @@ in parallel, then concatenates resulted partitions along the specified axis and function. In contrast with `Map function` template, here you're allowed to change partition shape in the map phase. Note that the execution engine expects that the reduce function returns a one dimensional frame. +.. autoclass:: modin.core.dataframe.algebra.tree_reduce.TreeReduce + :members: register, apply + Binary operator --------------- Applies an argument function, that takes exactly two operands (first is always `QueryCompiler`). @@ -65,23 +74,32 @@ the right operand to the left. it automatically but note that this requires repartitioning, which is a much more expensive operation than the binary function itself. +.. autoclass:: modin.core.dataframe.algebra.binary.Binary + :members: register, apply + Fold operator ------------- Applies an argument function that requires knowledge of the whole axis. Be aware that providing this knowledge may be expensive because the execution engine has to concatenate partitions along the specified axis. +.. autoclass:: modin.core.dataframe.algebra.fold.Fold + :members: register, apply + GroupBy operator ---------------- Evaluates GroupBy aggregation for that type of functions that can be executed via TreeReduce approach. To be able to form groups engine broadcasts ``by`` partitions to each partition of the source frame. +.. autoclass:: modin.core.dataframe.algebra.groupby.GroupByReduce + :members: register, apply + Default-to-pandas operator -------------------------- Do :doc:`fallback to pandas ` for passed function. -How to register your own function -''''''''''''''''''''''''''''''''' +How to use UDFs with these operators +'''''''''''''''''''''''''''''''''''' Let's examine an example of how to use the algebra module to create your own new functions. @@ -95,34 +113,39 @@ Let's implement a function that counts non-NA values for each column or row TreeReduce approach would be great: in a map phase, we'll count non-NA cells in each partition in parallel and then just sum its results in the reduce phase. -To define the TreeReduce function that does `count` + `sum` we just need to register the -appropriate functions and then assign the result to the picked `QueryCompiler` -(`PandasQueryCompiler` in our case): +To execute a TreeReduce function that does `count` + `sum` you can simply use the operator's ``.apply(...)`` +method that takes and outputs a :py:class:`~modin.pandas.dataframe.DataFrame`: .. code-block:: python - from modin.core.storage_formats import PandasQueryCompiler from modin.core.dataframe.algebra import TreeReduce - PandasQueryCompiler.custom_count = TreeReduce.register(pandas.DataFrame.count, pandas.DataFrame.sum) + res_df = TreeReduce.apply( + df, + map_func=lambda df: df.count(), + reduce_function=lambda df: df.sum() + ) -Then, we want to handle it from the :py:class:`~modin.pandas.dataframe.DataFrame`, so we need to create a way to do that: +If you're going to use your custom-defined function quite often you may want +to wrap it into a separate function or assign it as a DataFrame's method: .. code-block:: python import modin.pandas as pd - def count_func(self, **kwargs): - # The constructor allows you to pass in a query compiler as a keyword argument - return self.__constructor__(query_compiler=self._query_compiler.custom_count(**kwargs)) + def count_func(self): + return TreeReduce.apply( + self, + map_func=lambda df: df.count(), + reduce_function=lambda df: df.sum() + ) - pd.DataFrame.count_custom = count_func - -And then you can use it like you usually would: + # you can then use the function as is + res = count_func(df) -.. code-block:: python - - df.count_custom(axis=1) + # or assign it to the DataFrame's class and use it as a method + pd.DataFrame.count_custom = count_func + res = df.count_custom() Many of the `pandas` API functions can be easily implemented this way, so if you find out that one of your favorite function is still defaulted to pandas and decide to diff --git a/modin/core/dataframe/algebra/binary.py b/modin/core/dataframe/algebra/binary.py index 83c587f9d50..85236724e95 100644 --- a/modin/core/dataframe/algebra/binary.py +++ b/modin/core/dataframe/algebra/binary.py @@ -427,7 +427,7 @@ def apply( Left operand. right : modin.pandas.DataFrame or modin.pandas.Series Right operand. - func : callable(pandas.DataFrame, pandas.DataFrame, *args, axis, *kwargs) -> pandas.DataFrame + func : callable(pandas.DataFrame, pandas.DataFrame, \*args, axis, \*\*kwargs) -> pandas.DataFrame A binary function to apply `left` and `right`. axis : int, default: 0 Whether to apply the function across rows (``axis=0``) or across columns (``axis=1``). diff --git a/modin/core/dataframe/algebra/fold.py b/modin/core/dataframe/algebra/fold.py index ba2ffb6b169..0ceb78a5831 100644 --- a/modin/core/dataframe/algebra/fold.py +++ b/modin/core/dataframe/algebra/fold.py @@ -75,7 +75,7 @@ def apply(cls, df, func, fold_axis=0, func_args=None, func_kwargs=None): ---------- df : modin.pandas.DataFrame or modin.pandas.Series DataFrame object to apply the operator against. - func : callable(pandas.DataFrame[NxM], *args, **kwargs) -> pandas.DataFrame[NxM] + func : callable(pandas.DataFrame[NxM], \*args, \*\*kwargs) -> pandas.DataFrame[NxM] A function to apply to every partition. Note that the function shouldn't change the shape of the dataframe. fold_axis : int, default: 0 diff --git a/modin/core/dataframe/algebra/operator.py b/modin/core/dataframe/algebra/operator.py index 9b8095a04ff..393b83f99a8 100644 --- a/modin/core/dataframe/algebra/operator.py +++ b/modin/core/dataframe/algebra/operator.py @@ -65,14 +65,14 @@ def validate_axis(cls, axis: Optional[int]) -> int: def apply( cls, df, func, func_args=None, func_kwargs=None, _return_type=None, **kwargs ): - """ - Apply a function to a Modin DataFrame using the operator's scheme. + r""" + Apply a function to a Modin DataFrame using the operators scheme. Parameters ---------- df : modin.pandas.DataFrame or modin.pandas.Series DataFrame object to apply the operator against. - func : callable(pandas.DataFrame, *args, **kwargs) -> pandas.DataFrame + func : callable(pandas.DataFrame, \*args, \*\*kwargs) -> pandas.DataFrame A function to apply. func_args : tuple, optional Positional arguments to pass to the `func`. diff --git a/modin/core/dataframe/algebra/reduce.py b/modin/core/dataframe/algebra/reduce.py index 04987659e8d..25f8755f6a0 100644 --- a/modin/core/dataframe/algebra/reduce.py +++ b/modin/core/dataframe/algebra/reduce.py @@ -53,14 +53,14 @@ def caller(query_compiler, *args, **kwargs): @classmethod def apply(cls, df, func, axis=0, func_args=None, func_kwargs=None): - """ + r""" Apply a reduction function to each row/column partition of the dataframe. Parameters ---------- df : modin.pandas.DataFrame or modin.pandas.Series DataFrame object to apply the operator against. - func : callable(pandas.DataFrame, *args, **kwargs) -> Union[pandas.Series, pandas.DataFrame[1xN]] + func : callable(pandas.DataFrame, \*args, \*\*kwargs) -> Union[pandas.Series, pandas.DataFrame[1xN]] A function to apply. axis : int, default: 0 Whether to apply the function across rows (``axis=0``) or across columns (``axis=1``). diff --git a/modin/core/dataframe/algebra/tree_reduce.py b/modin/core/dataframe/algebra/tree_reduce.py index 979995500e9..f9d905f4a6b 100644 --- a/modin/core/dataframe/algebra/tree_reduce.py +++ b/modin/core/dataframe/algebra/tree_reduce.py @@ -66,9 +66,9 @@ def apply( ---------- df : modin.pandas.DataFrame or modin.pandas.Series DataFrame object to apply the operator against. - map_function : callable(pandas.DataFrame, *args, **kwargs) -> pandas.DataFrame + map_function : callable(pandas.DataFrame, \*args, \*\*kwargs) -> pandas.DataFrame A map function to apply to every partition. - reduce_function : callable(pandas.DataFrame, *args, **kwargs) -> Union[pandas.Series, pandas.DataFrame[1xN]] + reduce_function : callable(pandas.DataFrame, \*args, \*\*kwargs) -> Union[pandas.Series, pandas.DataFrame[1xN]] A reduction function to apply to the results of the map functions. axis : int, default: 0 Whether to apply the reduce function across rows (``axis=0``) or across columns (``axis=1``). From ee354fd26d912c4aa42617daad4f18ce1b9c98da Mon Sep 17 00:00:00 2001 From: Dmitry Chigarev Date: Tue, 27 Jun 2023 18:28:35 +0000 Subject: [PATCH 5/9] fix doc-strings Signed-off-by: Dmitry Chigarev --- modin/core/dataframe/algebra/binary.py | 2 +- modin/core/dataframe/algebra/fold.py | 2 +- modin/core/dataframe/algebra/tree_reduce.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/modin/core/dataframe/algebra/binary.py b/modin/core/dataframe/algebra/binary.py index 85236724e95..e6c7d1933b6 100644 --- a/modin/core/dataframe/algebra/binary.py +++ b/modin/core/dataframe/algebra/binary.py @@ -418,7 +418,7 @@ def caller( def apply( cls, left, right, func, axis=0, func_args=None, func_kwargs=None, **kwargs ): - """ + r""" Apply groupby aggregation function using map-reduce pattern. Parameters diff --git a/modin/core/dataframe/algebra/fold.py b/modin/core/dataframe/algebra/fold.py index 0ceb78a5831..82d868f9af2 100644 --- a/modin/core/dataframe/algebra/fold.py +++ b/modin/core/dataframe/algebra/fold.py @@ -68,7 +68,7 @@ def caller(query_compiler, fold_axis=None, *args, **kwargs): @classmethod def apply(cls, df, func, fold_axis=0, func_args=None, func_kwargs=None): - """ + r""" Apply a Fold (full-axis) function to the dataframe. Parameters diff --git a/modin/core/dataframe/algebra/tree_reduce.py b/modin/core/dataframe/algebra/tree_reduce.py index f9d905f4a6b..479bf217178 100644 --- a/modin/core/dataframe/algebra/tree_reduce.py +++ b/modin/core/dataframe/algebra/tree_reduce.py @@ -59,7 +59,7 @@ def caller(query_compiler, *args, **kwargs): def apply( cls, df, map_function, reduce_function, axis=0, func_args=None, func_kwargs=None ): - """ + r""" Apply a map-reduce function to the dataframe. Parameters From 884224ed7e66c1546906ae50b69eeed4aa0229af Mon Sep 17 00:00:00 2001 From: Dmitry Chigarev Date: Wed, 28 Jun 2023 19:02:06 +0200 Subject: [PATCH 6/9] Update modin/core/dataframe/algebra/binary.py --- modin/core/dataframe/algebra/binary.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modin/core/dataframe/algebra/binary.py b/modin/core/dataframe/algebra/binary.py index e6c7d1933b6..c3600d385ce 100644 --- a/modin/core/dataframe/algebra/binary.py +++ b/modin/core/dataframe/algebra/binary.py @@ -419,7 +419,7 @@ def apply( cls, left, right, func, axis=0, func_args=None, func_kwargs=None, **kwargs ): r""" - Apply groupby aggregation function using map-reduce pattern. + Apply a binary function row-wise or column-wise. Parameters ---------- From 96089daed873e2df0d0a2221a0cb8ba16523a12d Mon Sep 17 00:00:00 2001 From: Dmitry Chigarev Date: Wed, 28 Jun 2023 19:03:56 +0200 Subject: [PATCH 7/9] Update modin/test/storage_formats/pandas/test_internals.py --- modin/test/storage_formats/pandas/test_internals.py | 1 - 1 file changed, 1 deletion(-) diff --git a/modin/test/storage_formats/pandas/test_internals.py b/modin/test/storage_formats/pandas/test_internals.py index eff0f4f74ff..6f484248ffd 100644 --- a/modin/test/storage_formats/pandas/test_internals.py +++ b/modin/test/storage_formats/pandas/test_internals.py @@ -1096,7 +1096,6 @@ def reduce_func(grp): md_df, map_func, reduce_func, by=by_col, groupby_kwargs={"as_index": False} ) pd_res = pd_df.groupby(by_col, as_index=False).count() - # breakpoint() df_equals(md_res, pd_res) def test_map(self): From d65387e6dbb7b894f6c3eb2c4936cacf190ac9a2 Mon Sep 17 00:00:00 2001 From: Dmitry Chigarev Date: Wed, 5 Jul 2023 10:48:32 +0000 Subject: [PATCH 8/9] Remove high-level imports in algebra operators Signed-off-by: Dmitry Chigarev --- modin/core/dataframe/algebra/binary.py | 4 +--- modin/core/dataframe/algebra/operator.py | 13 ++----------- modin/core/dataframe/algebra/reduce.py | 7 ++----- modin/core/dataframe/algebra/tree_reduce.py | 6 ++---- 4 files changed, 7 insertions(+), 23 deletions(-) diff --git a/modin/core/dataframe/algebra/binary.py b/modin/core/dataframe/algebra/binary.py index c3600d385ce..8f86a6b18fa 100644 --- a/modin/core/dataframe/algebra/binary.py +++ b/modin/core/dataframe/algebra/binary.py @@ -442,8 +442,6 @@ def apply( ------- The same type as `df`. """ - from modin.pandas import Series - operator = cls.register(func, **kwargs) func_args = tuple() if func_args is None else func_args @@ -451,7 +449,7 @@ def apply( qc_result = operator( left._query_compiler, right._query_compiler, - broadcast=isinstance(right, Series), + broadcast=right.ndim == 1, *func_args, axis=axis, **func_kwargs, diff --git a/modin/core/dataframe/algebra/operator.py b/modin/core/dataframe/algebra/operator.py index 393b83f99a8..344d160ed97 100644 --- a/modin/core/dataframe/algebra/operator.py +++ b/modin/core/dataframe/algebra/operator.py @@ -62,9 +62,7 @@ def validate_axis(cls, axis: Optional[int]) -> int: return 0 if axis is None else axis @classmethod - def apply( - cls, df, func, func_args=None, func_kwargs=None, _return_type=None, **kwargs - ): + def apply(cls, df, func, func_args=None, func_kwargs=None, **kwargs): r""" Apply a function to a Modin DataFrame using the operators scheme. @@ -78,9 +76,6 @@ def apply( Positional arguments to pass to the `func`. func_kwargs : dict, optional Keyword arguments to pass to the `func`. - _return_type : type, optional - A class that takes the ``query_compiler`` keyword argument. If not specified - will be identical to the type of the passed `df`. **kwargs : dict Aditional arguments to pass to the ``cls.register()``. @@ -94,8 +89,4 @@ def apply( func_kwargs = dict() if func_kwargs is None else func_kwargs qc_result = operator(df._query_compiler, *func_args, **func_kwargs) - - if _return_type is None: - _return_type = type(df) - - return _return_type(query_compiler=qc_result) + return type(df)(query_compiler=qc_result) diff --git a/modin/core/dataframe/algebra/reduce.py b/modin/core/dataframe/algebra/reduce.py index 25f8755f6a0..583533ce8dd 100644 --- a/modin/core/dataframe/algebra/reduce.py +++ b/modin/core/dataframe/algebra/reduce.py @@ -73,8 +73,5 @@ def apply(cls, df, func, axis=0, func_args=None, func_kwargs=None): ------- modin.pandas.Series """ - from modin.pandas import Series - - return super().apply( - df, func, func_args, func_kwargs, axis=axis, _return_type=Series - ) + result = super().apply(df, func, func_args, func_kwargs, axis=axis) + return result if result.ndim == 1 else result.squeeze(axis) diff --git a/modin/core/dataframe/algebra/tree_reduce.py b/modin/core/dataframe/algebra/tree_reduce.py index 479bf217178..45ae29c4f6d 100644 --- a/modin/core/dataframe/algebra/tree_reduce.py +++ b/modin/core/dataframe/algebra/tree_reduce.py @@ -81,14 +81,12 @@ def apply( ------- modin.pandas.Series """ - from modin.pandas import Series - - return super().apply( + result = super().apply( df, map_function, func_args, func_kwargs, reduce_function=reduce_function, axis=axis, - _return_type=Series, ) + return result if result.ndim == 1 else result.squeeze(axis) From 3f3cc5b134e737035e9e8f884cf1f5c99a79bab7 Mon Sep 17 00:00:00 2001 From: Dmitry Chigarev Date: Fri, 14 Jul 2023 12:50:08 +0000 Subject: [PATCH 9/9] Apply @vnlitvinov's suggestions Signed-off-by: Dmitry Chigarev --- modin/core/dataframe/algebra/binary.py | 8 +++----- modin/core/dataframe/algebra/fold.py | 7 ++----- modin/core/dataframe/algebra/groupby.py | 12 ++++-------- modin/core/dataframe/algebra/operator.py | 10 ++++------ 4 files changed, 13 insertions(+), 24 deletions(-) diff --git a/modin/core/dataframe/algebra/binary.py b/modin/core/dataframe/algebra/binary.py index 8f86a6b18fa..b3d011dfff4 100644 --- a/modin/core/dataframe/algebra/binary.py +++ b/modin/core/dataframe/algebra/binary.py @@ -444,14 +444,12 @@ def apply( """ operator = cls.register(func, **kwargs) - func_args = tuple() if func_args is None else func_args - func_kwargs = dict() if func_kwargs is None else func_kwargs qc_result = operator( left._query_compiler, right._query_compiler, broadcast=right.ndim == 1, - *func_args, + *(func_args or ()), axis=axis, - **func_kwargs, + **(func_kwargs or {}), ) - return type(left)(query_compiler=qc_result) + return left.__constructor__(query_compiler=qc_result) diff --git a/modin/core/dataframe/algebra/fold.py b/modin/core/dataframe/algebra/fold.py index 82d868f9af2..3aeb1b60586 100644 --- a/modin/core/dataframe/algebra/fold.py +++ b/modin/core/dataframe/algebra/fold.py @@ -89,8 +89,5 @@ def apply(cls, df, func, fold_axis=0, func_args=None, func_kwargs=None): ------- the same type as `df` """ - func_args = tuple() if func_args is None else func_args - func_kwargs = dict() if func_kwargs is None else func_kwargs - func_args = (fold_axis,) + func_args - - return super().apply(df, func, func_args, func_kwargs) + func_args = (fold_axis,) + (func_args or ()) + return super().apply(df, func, func_args, func_kwargs or {}) diff --git a/modin/core/dataframe/algebra/groupby.py b/modin/core/dataframe/algebra/groupby.py index a45d7bdd410..f04b79e2af0 100644 --- a/modin/core/dataframe/algebra/groupby.py +++ b/modin/core/dataframe/algebra/groupby.py @@ -772,19 +772,15 @@ def apply( ------- The same type as `df`. """ - agg_args = tuple() if agg_args is None else agg_args - agg_kwargs = dict() if agg_kwargs is None else agg_kwargs - groupby_kwargs = dict() if groupby_kwargs is None else groupby_kwargs - operator = cls.register(map_func, reduce_func) qc_result = operator( df._query_compiler, df[by]._query_compiler, axis=0, - groupby_kwargs=groupby_kwargs, - agg_args=agg_args, - agg_kwargs=agg_kwargs, + groupby_kwargs=groupby_kwargs or {}, + agg_args=agg_args or (), + agg_kwargs=agg_kwargs or {}, drop=True, ) - return type(df)(query_compiler=qc_result) + return df.__constructor__(query_compiler=qc_result) diff --git a/modin/core/dataframe/algebra/operator.py b/modin/core/dataframe/algebra/operator.py index 344d160ed97..80458b0a3ce 100644 --- a/modin/core/dataframe/algebra/operator.py +++ b/modin/core/dataframe/algebra/operator.py @@ -84,9 +84,7 @@ def apply(cls, df, func, func_args=None, func_kwargs=None, **kwargs): return_type """ operator = cls.register(func, **kwargs) - - func_args = tuple() if func_args is None else func_args - func_kwargs = dict() if func_kwargs is None else func_kwargs - - qc_result = operator(df._query_compiler, *func_args, **func_kwargs) - return type(df)(query_compiler=qc_result) + qc_result = operator( + df._query_compiler, *(func_args or ()), **(func_kwargs or {}) + ) + return df.__constructor__(query_compiler=qc_result)