Skip to content

Commit 119aa6c

Browse files
authored
Revert "[core] Correct bytes in flight when objects <5mb (#54349)" (#56387)
Signed-off-by: dayshah <dhyey2019@gmail.com>
1 parent 7bb884c commit 119aa6c

File tree

8 files changed

+157
-204
lines changed

8 files changed

+157
-204
lines changed

src/ray/common/ray_config_def.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -334,7 +334,9 @@ RAY_CONFIG(uint64_t, object_manager_default_chunk_size, 5 * 1024 * 1024)
334334

335335
/// The maximum number of outbound bytes to allow to be outstanding. This avoids
336336
/// excessive memory usage during object broadcast to many receivers.
337-
RAY_CONFIG(int64_t, object_manager_max_bytes_in_flight, (int64_t)2 * 1024 * 1024 * 1024)
337+
RAY_CONFIG(uint64_t,
338+
object_manager_max_bytes_in_flight,
339+
((uint64_t)2) * 1024 * 1024 * 1024)
338340

339341
/// Maximum number of ids in one batch to send to GCS to delete keys.
340342
RAY_CONFIG(uint32_t, maximum_gcs_deletion_batch_size, 1000)

src/ray/object_manager/chunk_object_reader.h

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414

1515
#pragma once
1616

17-
#include <algorithm>
1817
#include <memory>
1918
#include <string>
2019

@@ -42,10 +41,6 @@ class ChunkObjectReader {
4241

4342
const IObjectReader &GetObject() const { return *object_; }
4443

45-
uint64_t ChunkSize() const {
46-
return std::min(chunk_size_, object_->GetDataSize() + object_->GetMetadataSize());
47-
}
48-
4944
private:
5045
const std::shared_ptr<IObjectReader> object_;
5146
const uint64_t chunk_size_;

src/ray/object_manager/object_manager.cc

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,10 @@ ObjectManager::ObjectManager(
116116
get_spilled_object_url_(std::move(get_spilled_object_url)),
117117
pull_retry_timer_(*main_service_,
118118
boost::posix_time::milliseconds(config.timer_freq_ms)),
119-
push_manager_(std::make_unique<PushManager>(config_.max_bytes_in_flight)) {
119+
push_manager_(std::make_unique<PushManager>(/* max_chunks_in_flight= */ std::max(
120+
static_cast<int64_t>(1L),
121+
static_cast<int64_t>(config_.max_bytes_in_flight /
122+
config_.object_chunk_size)))) {
120123
RAY_CHECK_GT(config_.rpc_service_threads_number, 0);
121124

122125
pull_retry_timer_.async_wait([this](const boost::system::error_code &e) { Tick(e); });
@@ -488,13 +491,8 @@ void ObjectManager::PushObjectInternal(const ObjectID &object_id,
488491
<< ", total data size: " << chunk_reader->GetObject().GetObjectSize();
489492

490493
auto push_id = UniqueID::FromRandom();
491-
uint64_t push_max_chunk_size = chunk_reader->ChunkSize();
492494
push_manager_->StartPush(
493-
node_id,
494-
object_id,
495-
chunk_reader->GetNumChunks(),
496-
push_max_chunk_size,
497-
[=](int64_t chunk_id) {
495+
node_id, object_id, chunk_reader->GetNumChunks(), [=](int64_t chunk_id) {
498496
rpc_service_.post(
499497
[=]() {
500498
// Post to the multithreaded RPC event loop so that data is copied
@@ -505,14 +503,11 @@ void ObjectManager::PushObjectInternal(const ObjectID &object_id,
505503
node_id,
506504
chunk_id,
507505
rpc_client,
508-
[this, push_max_chunk_size](const Status &status) {
506+
[=](const Status &status) {
509507
// Post back to the main event loop because the
510508
// PushManager is not thread-safe.
511-
this->main_service_->post(
512-
[this, push_max_chunk_size]() {
513-
this->push_manager_->OnChunkComplete(push_max_chunk_size);
514-
},
515-
"ObjectManager.Push");
509+
main_service_->post([this]() { push_manager_->OnChunkComplete(); },
510+
"ObjectManager.Push");
516511
},
517512
chunk_reader,
518513
from_disk);

src/ray/object_manager/object_manager.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ struct ObjectManagerConfig {
5353
/// Object chunk size, in bytes
5454
uint64_t object_chunk_size;
5555
/// Max object push bytes in flight.
56-
int64_t max_bytes_in_flight;
56+
uint64_t max_bytes_in_flight;
5757
/// The store socket name.
5858
std::string store_socket_name;
5959
/// The time in milliseconds to wait until a Push request

src/ray/object_manager/push_manager.cc

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ namespace ray {
2424
void PushManager::StartPush(const NodeID &dest_id,
2525
const ObjectID &obj_id,
2626
int64_t num_chunks,
27-
int64_t max_chunk_size,
2827
std::function<void(int64_t)> send_chunk_fn) {
2928
auto push_id = std::make_pair(dest_id, obj_id);
3029
RAY_CHECK(num_chunks > 0);
@@ -38,7 +37,6 @@ void PushManager::StartPush(const NodeID &dest_id,
3837
dest_id,
3938
obj_id,
4039
num_chunks,
41-
max_chunk_size,
4240
std::move(send_chunk_fn));
4341
} else {
4442
RAY_LOG(DEBUG) << "Duplicate push request " << push_id.first << ", " << push_id.second
@@ -49,8 +47,8 @@ void PushManager::StartPush(const NodeID &dest_id,
4947
ScheduleRemainingPushes();
5048
}
5149

52-
void PushManager::OnChunkComplete(int64_t push_max_chunk_size) {
53-
bytes_in_flight_ -= push_max_chunk_size;
50+
void PushManager::OnChunkComplete() {
51+
chunks_in_flight_ -= 1;
5452
chunks_remaining_ -= 1;
5553
ScheduleRemainingPushes();
5654
}
@@ -64,17 +62,17 @@ void PushManager::ScheduleRemainingPushes() {
6462

6563
// Loop over all active pushes for approximate round-robin prioritization.
6664
bool keep_looping = true;
67-
while (bytes_in_flight_ < max_bytes_in_flight_ && keep_looping) {
65+
while (chunks_in_flight_ < max_chunks_in_flight_ && keep_looping) {
6866
// Loop over each active push and try to send another chunk.
69-
// If we could push out a chunk and haven't reached the max_bytes_in_flight_ limit,
67+
// If we could push out a chunk and haven't reached the chunks_in_flight_ limit,
7068
// we'll loop again to try to send more chunks.
7169
keep_looping = false;
7270
auto iter = push_requests_with_chunks_to_send_.begin();
7371
while (iter != push_requests_with_chunks_to_send_.end() &&
74-
bytes_in_flight_ < max_bytes_in_flight_) {
72+
chunks_in_flight_ < max_chunks_in_flight_) {
7573
auto &push_state = *iter;
7674
push_state.SendOneChunk();
77-
bytes_in_flight_ += push_state.max_chunk_size_;
75+
chunks_in_flight_ += 1;
7876
if (push_state.num_chunks_to_send_ == 0) {
7977
auto push_state_map_iter = push_state_map_.find(push_state.node_id_);
8078
RAY_CHECK(push_state_map_iter != push_state_map_.end());
@@ -109,16 +107,18 @@ void PushManager::HandleNodeRemoved(const NodeID &node_id) {
109107

110108
void PushManager::RecordMetrics() const {
111109
ray::stats::STATS_push_manager_num_pushes_remaining.Record(
112-
push_requests_with_chunks_to_send_.size());
113-
ray::stats::STATS_push_manager_chunks.Record(chunks_remaining_, "Remaining");
110+
NumPushRequestsWithChunksToSend());
111+
ray::stats::STATS_push_manager_chunks.Record(NumChunksInFlight(), "InFlight");
112+
ray::stats::STATS_push_manager_chunks.Record(NumChunksRemaining(), "Remaining");
114113
}
115114

116115
std::string PushManager::DebugString() const {
117116
std::stringstream result;
118117
result << "PushManager:";
119-
result << "\n- num pushes remaining: " << push_requests_with_chunks_to_send_.size();
120-
result << "\n- num chunks remaining: " << chunks_remaining_;
121-
result << "\n- max bytes allowed: " << max_bytes_in_flight_;
118+
result << "\n- num pushes remaining: " << NumPushRequestsWithChunksToSend();
119+
result << "\n- num chunks in flight: " << NumChunksInFlight();
120+
result << "\n- num chunks remaining: " << NumChunksRemaining();
121+
result << "\n- max chunks allowed: " << max_chunks_in_flight_;
122122
return result.str();
123123
}
124124

src/ray/object_manager/push_manager.h

Lines changed: 21 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,12 @@ class PushManager {
2828
public:
2929
/// Create a push manager.
3030
///
31-
/// \param max_bytes_in_flight Max number of bytes allowed to be in flight
31+
/// \param max_chunks_in_flight Max number of chunks allowed to be in flight
3232
/// from this PushManager (this raylet).
33-
explicit PushManager(int64_t max_bytes_in_flight)
34-
: max_bytes_in_flight_(max_bytes_in_flight){};
33+
explicit PushManager(int64_t max_chunks_in_flight)
34+
: max_chunks_in_flight_(max_chunks_in_flight) {
35+
RAY_CHECK_GT(max_chunks_in_flight_, 0);
36+
};
3537

3638
/// Start pushing an object subject to max chunks in flight limit.
3739
///
@@ -40,39 +42,40 @@ class PushManager {
4042
/// \param dest_id The node to send to.
4143
/// \param obj_id The object to send.
4244
/// \param num_chunks The total number of chunks to send.
43-
/// \param max_chunk_size See comment for max_chunk_size_ in PushState.
4445
/// \param send_chunk_fn This function will be called with args 0...{num_chunks-1}.
4546
/// The caller promises to call PushManager::OnChunkComplete()
4647
/// once a call to send_chunk_fn finishes.
4748
void StartPush(const NodeID &dest_id,
4849
const ObjectID &obj_id,
4950
int64_t num_chunks,
50-
int64_t max_chunk_size,
5151
std::function<void(int64_t)> send_chunk_fn);
5252

5353
/// Called every time a chunk completes to trigger additional sends.
5454
/// TODO(ekl) maybe we should cancel the entire push on error.
55-
void OnChunkComplete(int64_t push_max_chunk_size);
55+
void OnChunkComplete();
5656

5757
/// Cancel all pushes that have not yet been sent to the removed node.
5858
void HandleNodeRemoved(const NodeID &node_id);
5959

60-
void RecordMetrics() const;
61-
62-
int64_t BytesInFlight() const { return bytes_in_flight_; }
63-
64-
int64_t ChunksRemaining() const { return chunks_remaining_; }
60+
/// Return the number of chunks currently in flight. For metrics and testing.
61+
int64_t NumChunksInFlight() const { return chunks_in_flight_; };
6562

66-
int64_t PushesInFlight() const { return push_state_map_.size(); }
63+
/// Return the number of chunks remaining. For metrics and testing.
64+
int64_t NumChunksRemaining() const { return chunks_remaining_; }
6765

68-
int64_t PushRequestsRemaining() const {
66+
/// Return the number of push requests with remaining chunks. For metrics and testing.
67+
int64_t NumPushRequestsWithChunksToSend() const {
6968
return push_requests_with_chunks_to_send_.size();
70-
}
69+
};
70+
71+
/// Record the internal metrics.
72+
void RecordMetrics() const;
7173

7274
std::string DebugString() const;
7375

7476
private:
7577
FRIEND_TEST(TestPushManager, TestPushState);
78+
FRIEND_TEST(TestPushManager, TestNodeRemoved);
7679

7780
/// Tracks the state of an active object push to another node.
7881
struct PushState {
@@ -81,13 +84,8 @@ class PushManager {
8184

8285
/// total number of chunks of this object.
8386
int64_t num_chunks_;
84-
/// the max size of a chunk for this object in bytes, used to count bytes_in_flight_
85-
/// and assure it stays under max_bytes_in_flight_. This means we can overcount for
86-
/// the last chunk but we're accepting that to keep the code simpler.
87-
int64_t max_chunk_size_;
8887
/// The function to send chunks with.
8988
std::function<void(int64_t)> chunk_send_fn_;
90-
9189
/// The index of the next chunk to send.
9290
int64_t next_chunk_id_ = 0;
9391
/// The number of chunks remaining to send.
@@ -96,12 +94,10 @@ class PushManager {
9694
PushState(NodeID node_id,
9795
ObjectID object_id,
9896
int64_t num_chunks,
99-
int64_t max_chunk_size,
10097
std::function<void(int64_t)> chunk_send_fn)
10198
: node_id_(node_id),
10299
object_id_(object_id),
103100
num_chunks_(num_chunks),
104-
max_chunk_size_(max_chunk_size),
105101
chunk_send_fn_(std::move(chunk_send_fn)),
106102
num_chunks_to_send_(num_chunks) {}
107103

@@ -126,11 +122,11 @@ class PushManager {
126122
/// Called on completion events to trigger additional pushes.
127123
void ScheduleRemainingPushes();
128124

129-
/// Max number of bytes in flight allowed.
130-
const int64_t max_bytes_in_flight_;
125+
/// Max number of chunks in flight allowed.
126+
const int64_t max_chunks_in_flight_;
131127

132-
/// Running count of bytes in flight
133-
int64_t bytes_in_flight_ = 0;
128+
/// Running count of chunks in flight, used to limit progress of in_flight_pushes_.
129+
int64_t chunks_in_flight_ = 0;
134130

135131
/// Remaining count of chunks to push to other nodes.
136132
int64_t chunks_remaining_ = 0;

0 commit comments

Comments
 (0)