-
Notifications
You must be signed in to change notification settings - Fork 666
FEAT-#7004: use generators when returning from _deploy_ray_func remote function. #7005
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
0abe5db
e7c3365
5fb4465
004e6e6
585a403
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -21,7 +21,10 @@ | |||||
from modin.core.dataframe.base.partitioning.axis_partition import ( | ||||||
BaseDataframeAxisPartition, | ||||||
) | ||||||
from modin.core.storage_formats.pandas.utils import split_result_of_axis_func_pandas | ||||||
from modin.core.storage_formats.pandas.utils import ( | ||||||
generate_result_of_axis_func_pandas, | ||||||
split_result_of_axis_func_pandas, | ||||||
) | ||||||
|
||||||
from .partition import PandasDataframePartition | ||||||
|
||||||
|
@@ -388,6 +391,7 @@ def deploy_axis_func( | |||||
*partitions, | ||||||
lengths=None, | ||||||
manual_partition=False, | ||||||
return_generator=False, | ||||||
): | ||||||
""" | ||||||
Deploy a function along a full axis. | ||||||
|
@@ -413,11 +417,14 @@ def deploy_axis_func( | |||||
The list of lengths to shuffle the object. | ||||||
manual_partition : bool, default: False | ||||||
If True, partition the result with `lengths`. | ||||||
return_generator : bool, default: False | ||||||
Return a generator from the function, set to `True` for Ray backend | ||||||
as Ray remote functions can return Generators. | ||||||
|
||||||
Returns | ||||||
------- | ||||||
list | ||||||
A list of pandas DataFrames. | ||||||
list | Generator | ||||||
A list or generator of pandas DataFrames. | ||||||
""" | ||||||
dataframe = pandas.concat(list(partitions), axis=axis, copy=False) | ||||||
with warnings.catch_warnings(): | ||||||
|
@@ -451,7 +458,12 @@ def deploy_axis_func( | |||||
lengths = [len(part.columns) for part in partitions] | ||||||
if sum(lengths) != len(result.columns): | ||||||
lengths = None | ||||||
return split_result_of_axis_func_pandas(axis, num_splits, result, lengths) | ||||||
if return_generator: | ||||||
return generate_result_of_axis_func_pandas( | ||||||
axis, num_splits, result, lengths | ||||||
) | ||||||
else: | ||||||
return split_result_of_axis_func_pandas(axis, num_splits, result, lengths) | ||||||
|
||||||
@classmethod | ||||||
def deploy_func_between_two_axis_partitions( | ||||||
|
@@ -464,6 +476,7 @@ def deploy_func_between_two_axis_partitions( | |||||
len_of_left, | ||||||
other_shape, | ||||||
*partitions, | ||||||
return_generator=False, | ||||||
): | ||||||
""" | ||||||
Deploy a function along a full axis between two data sets. | ||||||
|
@@ -487,11 +500,14 @@ def deploy_func_between_two_axis_partitions( | |||||
(other_shape[i-1], other_shape[i]) will indicate slice to restore i-1 axis partition. | ||||||
*partitions : iterable | ||||||
All partitions that make up the full axis (row or column) for both data sets. | ||||||
return_generator : bool, default: False | ||||||
Return a generator from the function, set to `True` for Ray backend | ||||||
as Ray remote functions can return Generators. | ||||||
|
||||||
Returns | ||||||
------- | ||||||
list | ||||||
A list of pandas DataFrames. | ||||||
list | Generator | ||||||
A list or generator of pandas DataFrames. | ||||||
""" | ||||||
lt_frame = pandas.concat(partitions[:len_of_left], axis=axis, copy=False) | ||||||
|
||||||
|
@@ -510,7 +526,18 @@ def deploy_func_between_two_axis_partitions( | |||||
with warnings.catch_warnings(): | ||||||
warnings.filterwarnings("ignore", category=FutureWarning) | ||||||
result = func(lt_frame, rt_frame, *f_args, **f_kwargs) | ||||||
return split_result_of_axis_func_pandas(axis, num_splits, result) | ||||||
if return_generator: | ||||||
return generate_result_of_axis_func_pandas( | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This wouldnt work because using yeild in a function would turn it to a generator. We do not require generators but lists for some branches of if , For the backends such as dask as we try to return a list of partitions, but as there is yield statement in the function a generator would still be returned and thus partitions would be empty when materialized. https://stackoverflow.com/questions/26595895/return-and-yield-in-the-same-function There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I mean not just There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Checked with |
||||||
axis, | ||||||
num_splits, | ||||||
result, | ||||||
) | ||||||
else: | ||||||
return split_result_of_axis_func_pandas( | ||||||
axis, | ||||||
num_splits, | ||||||
result, | ||||||
) | ||||||
|
||||||
|
||||||
@classmethod | ||||||
def drain(cls, df: pandas.DataFrame, call_queue: list): | ||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -189,6 +189,7 @@ def deploy_axis_func( | |
f_kwargs=f_kwargs, | ||
manual_partition=manual_partition, | ||
lengths=lengths, | ||
return_generator=True, | ||
) | ||
|
||
@classmethod | ||
|
@@ -244,6 +245,7 @@ def deploy_func_between_two_axis_partitions( | |
f_to_deploy=func, | ||
f_len_args=len(f_args), | ||
f_kwargs=f_kwargs, | ||
return_generator=True, | ||
) | ||
|
||
def wait(self): | ||
|
@@ -320,12 +322,16 @@ def _deploy_ray_func( | |
f_args = positional_args[:f_len_args] | ||
deploy_args = positional_args[f_len_args:] | ||
result = deployer(axis, f_to_deploy, f_args, f_kwargs, *deploy_args, **kwargs) | ||
|
||
if not extract_metadata: | ||
return result | ||
ip = get_node_ip_address() | ||
if isinstance(result, pandas.DataFrame): | ||
return result, len(result), len(result.columns), ip | ||
elif all(isinstance(r, pandas.DataFrame) for r in result): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here we check if all parts of result are dataframes. What are the scenarios where the result would be heterogeneous( composed of dataframes and non dataframes)? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this would suffice. |
||
return [i for r in result for i in [r, len(r), len(r.columns), ip]] | ||
for item in result: | ||
yield item | ||
else: | ||
return [i for r in result for i in [r, None, None, ip]] | ||
ip = get_node_ip_address() | ||
for r in result: | ||
if isinstance(r, pandas.DataFrame): | ||
for item in [r, len(r), len(r.columns), ip]: | ||
yield item | ||
else: | ||
for item in [r, None, None, ip]: | ||
yield item |
Uh oh!
There was an error while loading. Please reload this page.