Skip to content

Commit 5fb4465

Browse files
committed
pr comments
Signed-off-by: arunjose696 <arunjose696@gmail.com>
1 parent e7c3365 commit 5fb4465

File tree

5 files changed

+34
-41
lines changed

5 files changed

+34
-41
lines changed

modin/core/execution/ray/implementations/cudf_on_ray/partitioning/partition_manager.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ def from_pandas(cls, df, return_dims=False):
125125
num_splits = GpuCount.get()
126126
put_func = cls._partition_class.put
127127
# For now, we default to row partitioning
128-
pandas_dfs = split_result_of_axis_func_pandas(0, num_splits, df)
128+
pandas_dfs = list(split_result_of_axis_func_pandas(0, num_splits, df))
129129
keys = [
130130
put_func(cls._get_gpu_managers()[i], pandas_dfs[i])
131131
for i in range(num_splits)

modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -324,13 +324,12 @@ def _deploy_ray_func(
324324
if not extract_metadata:
325325
for item in result:
326326
yield item
327-
return
328-
329-
ip = get_node_ip_address()
330-
for r in result:
331-
if isinstance(r, pandas.DataFrame):
332-
for item in [r, len(r), len(r.columns), ip]:
333-
yield item
334-
else:
335-
for item in [r, None, None, ip]:
336-
yield item
327+
else:
328+
ip = get_node_ip_address()
329+
for r in result:
330+
if isinstance(r, pandas.DataFrame):
331+
for item in [r, len(r), len(r.columns), ip]:
332+
yield item
333+
else:
334+
for item in [r, None, None, ip]:
335+
yield item

modin/core/storage_formats/cudf/parser.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,7 @@ def _split_result_for_readers(axis, num_splits, df): # pragma: no cover
3939
Returns:
4040
A list of pandas DataFrames.
4141
"""
42-
splits = split_result_of_axis_func_pandas(axis, num_splits, df)
43-
if not isinstance(splits, list):
44-
splits = [splits]
42+
splits = list(split_result_of_axis_func_pandas(axis, num_splits, df))
4543
return splits
4644

4745

modin/core/storage_formats/pandas/parsers.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -113,9 +113,7 @@ def _split_result_for_readers(axis, num_splits, df): # pragma: no cover
113113
list
114114
A list of pandas DataFrames.
115115
"""
116-
splits = split_result_of_axis_func_pandas(axis, num_splits, df)
117-
if not isinstance(splits, list):
118-
splits = [splits]
116+
splits = list(split_result_of_axis_func_pandas(axis, num_splits, df))
119117
return splits
120118

121119

modin/core/storage_formats/pandas/utils.py

Lines changed: 22 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -86,30 +86,28 @@ def split_result_of_axis_func_pandas(
8686
"""
8787
if num_splits == 1:
8888
yield result
89-
return
90-
91-
if length_list is None:
92-
length_list = get_length_list(result.shape[axis], num_splits, min_block_size)
93-
# Inserting the first "zero" to properly compute cumsum indexing slices
94-
length_list = np.insert(length_list, obj=0, values=[0])
95-
96-
sums = np.cumsum(length_list)
97-
axis = 0 if isinstance(result, pandas.Series) else axis
98-
99-
for i in range(len(sums) - 1):
100-
# We do this to restore block partitioning
101-
if axis == 0:
102-
chunk = result.iloc[sums[i] : sums[i + 1]]
103-
else:
104-
chunk = result.iloc[:, sums[i] : sums[i + 1]]
105-
106-
# Sliced MultiIndex still stores all encoded values of the original index, explicitly
107-
# asking it to drop unused values in order to save memory.
108-
if isinstance(chunk.axes[axis], pandas.MultiIndex):
109-
chunk = chunk.set_axis(
110-
chunk.axes[axis].remove_unused_levels(), axis=axis, copy=False
111-
)
112-
yield chunk
89+
else:
90+
if length_list is None:
91+
length_list = get_length_list(result.shape[axis], num_splits,min_block_size)
92+
# Inserting the first "zero" to properly compute cumsum indexing slices
93+
length_list = np.insert(length_list, obj=0, values=[0])
94+
sums = np.cumsum(length_list)
95+
axis = 0 if isinstance(result, pandas.Series) else axis
96+
97+
for i in range(len(sums) - 1):
98+
# We do this to restore block partitioning
99+
if axis == 0:
100+
chunk = result.iloc[sums[i] : sums[i + 1]]
101+
else:
102+
chunk = result.iloc[:, sums[i] : sums[i + 1]]
103+
104+
# Sliced MultiIndex still stores all encoded values of the original index, explicitly
105+
# asking it to drop unused values in order to save memory.
106+
if isinstance(chunk.axes[axis], pandas.MultiIndex):
107+
chunk = chunk.set_axis(
108+
chunk.axes[axis].remove_unused_levels(), axis=axis, copy=False
109+
)
110+
yield chunk
113111

114112

115113
def get_length_list(axis_len: int, num_splits: int, min_block_size=None) -> list:

0 commit comments

Comments
 (0)