Skip to content

Commit aac861a

Browse files
authored
[core] Move core worker gRPC service definition to core_worker/ (#56197)
- Moves the service definition to `core_worker/`. No need for this to be in a common directory because core worker is the only implementer. - Matches the format of the GCS service definitions. - Removed unnecessary macro indirection and split `.h` & `.cc` files. --------- Signed-off-by: Edward Oakes <ed.nmi.oakes@gmail.com>
1 parent af858cf commit aac861a

File tree

11 files changed

+318
-174
lines changed

11 files changed

+318
-174
lines changed

src/ray/core_worker/BUILD.bazel

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ ray_cc_library(
2626
":experimental_mutable_object_provider",
2727
":future_resolver",
2828
":generator_waiter",
29+
":grpc_service",
2930
":memory_store",
3031
":object_recovery_manager",
3132
":plasma_store_provider",
@@ -46,7 +47,6 @@ ray_cc_library(
4647
"//src/ray/pubsub:subscriber",
4748
"//src/ray/raylet_client:raylet_client_lib",
4849
"//src/ray/rpc:core_worker_client",
49-
"//src/ray/rpc:core_worker_server",
5050
"//src/ray/rpc:metrics_agent_client",
5151
"//src/ray/stats:stats_lib",
5252
"//src/ray/util:container_util",
@@ -65,6 +65,24 @@ ray_cc_library(
6565
],
6666
)
6767

68+
ray_cc_library(
69+
name = "grpc_service",
70+
srcs = [
71+
"grpc_service.cc",
72+
],
73+
hdrs = [
74+
"grpc_service.h",
75+
],
76+
visibility = [":__subpackages__"],
77+
deps = [
78+
"//src/ray/common:asio",
79+
"//src/ray/protobuf:core_worker_cc_grpc",
80+
"//src/ray/protobuf:core_worker_cc_proto",
81+
"//src/ray/rpc:grpc_server",
82+
"//src/ray/rpc:server_call",
83+
],
84+
)
85+
6886
ray_cc_library(
6987
name = "shutdown_coordinator",
7088
srcs = [

src/ray/core_worker/core_worker.h

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -53,21 +53,10 @@
5353
#include "ray/pubsub/publisher.h"
5454
#include "ray/pubsub/subscriber.h"
5555
#include "ray/raylet_client/raylet_client_interface.h"
56-
#include "ray/rpc/worker/core_worker_server.h"
5756
#include "ray/util/process.h"
5857
#include "ray/util/shared_lru.h"
5958
#include "src/ray/protobuf/pubsub.pb.h"
6059

61-
/// The set of gRPC handlers and their associated level of concurrency. If you want to
62-
/// add a new call to the worker gRPC server, do the following:
63-
/// 1) Add the rpc to the CoreWorkerService in core_worker.proto, e.g., "ExampleCall"
64-
/// 2) Add a new macro to RAY_CORE_WORKER_DECLARE_RPC_HANDLERS
65-
/// in core_worker_server.h,
66-
// e.g. "DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(ExampleCall)"
67-
/// 3) Add a new macro to RAY_CORE_WORKER_RPC_HANDLERS in core_worker_server.h, e.g.
68-
/// "RPC_SERVICE_HANDLER(CoreWorkerService, ExampleCall, 1)"
69-
/// 4) Add a method to the CoreWorker class below: "CoreWorker::HandleExampleCall"
70-
7160
namespace ray::core {
7261

7362
JobID GetProcessJobID(const CoreWorkerOptions &options);

src/ray/core_worker/core_worker_process.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,8 @@ std::shared_ptr<CoreWorker> CoreWorkerProcessImpl::CreateCoreWorker(
255255
// Start RPC server after all the task receivers are properly initialized and we have
256256
// our assigned port from the raylet.
257257
core_worker_server->RegisterService(
258-
std::make_unique<rpc::CoreWorkerGrpcService>(io_service_, *service_handler_),
258+
std::make_unique<rpc::CoreWorkerGrpcService>(
259+
io_service_, *service_handler_, /*max_active_rpcs_per_handler_=*/-1),
259260
false /* token_auth */);
260261
core_worker_server->Run();
261262

src/ray/core_worker/core_worker_process.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include <string>
2020

2121
#include "ray/core_worker/core_worker_options.h"
22+
#include "ray/core_worker/grpc_service.h"
2223
#include "ray/rpc/metrics_agent_client.h"
2324
#include "ray/util/mutex_protected.h"
2425

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
// Copyright 2025 The Ray Authors.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#include "ray/core_worker/grpc_service.h"
16+
17+
#include <memory>
18+
#include <vector>
19+
20+
namespace ray {
21+
namespace rpc {
22+
23+
void CoreWorkerGrpcService::InitServerCallFactories(
24+
const std::unique_ptr<grpc::ServerCompletionQueue> &cq,
25+
std::vector<std::unique_ptr<ServerCallFactory>> *server_call_factories,
26+
const ClusterID &cluster_id) {
27+
/// TODO(vitsai): Remove this when auth is implemented for node manager.
28+
/// Disable gRPC server metrics since it incurs too high cardinality.
29+
RPC_SERVICE_HANDLER_CUSTOM_AUTH_SERVER_METRICS_DISABLED(
30+
CoreWorkerService, PushTask, max_active_rpcs_per_handler_, AuthType::NO_AUTH);
31+
RPC_SERVICE_HANDLER_CUSTOM_AUTH_SERVER_METRICS_DISABLED(CoreWorkerService,
32+
ActorCallArgWaitComplete,
33+
max_active_rpcs_per_handler_,
34+
AuthType::NO_AUTH);
35+
RPC_SERVICE_HANDLER_CUSTOM_AUTH_SERVER_METRICS_DISABLED(CoreWorkerService,
36+
RayletNotifyGCSRestart,
37+
max_active_rpcs_per_handler_,
38+
AuthType::NO_AUTH);
39+
RPC_SERVICE_HANDLER_CUSTOM_AUTH_SERVER_METRICS_DISABLED(CoreWorkerService,
40+
GetObjectStatus,
41+
max_active_rpcs_per_handler_,
42+
AuthType::NO_AUTH);
43+
RPC_SERVICE_HANDLER_CUSTOM_AUTH_SERVER_METRICS_DISABLED(CoreWorkerService,
44+
WaitForActorRefDeleted,
45+
max_active_rpcs_per_handler_,
46+
AuthType::NO_AUTH);
47+
RPC_SERVICE_HANDLER_CUSTOM_AUTH_SERVER_METRICS_DISABLED(CoreWorkerService,
48+
PubsubLongPolling,
49+
max_active_rpcs_per_handler_,
50+
AuthType::NO_AUTH);
51+
RPC_SERVICE_HANDLER_CUSTOM_AUTH_SERVER_METRICS_DISABLED(CoreWorkerService,
52+
PubsubCommandBatch,
53+
max_active_rpcs_per_handler_,
54+
AuthType::NO_AUTH);
55+
RPC_SERVICE_HANDLER_CUSTOM_AUTH_SERVER_METRICS_DISABLED(CoreWorkerService,
56+
UpdateObjectLocationBatch,
57+
max_active_rpcs_per_handler_,
58+
AuthType::NO_AUTH);
59+
RPC_SERVICE_HANDLER_CUSTOM_AUTH_SERVER_METRICS_DISABLED(CoreWorkerService,
60+
GetObjectLocationsOwner,
61+
max_active_rpcs_per_handler_,
62+
AuthType::NO_AUTH);
63+
RPC_SERVICE_HANDLER_CUSTOM_AUTH_SERVER_METRICS_DISABLED(CoreWorkerService,
64+
ReportGeneratorItemReturns,
65+
max_active_rpcs_per_handler_,
66+
AuthType::NO_AUTH);
67+
RPC_SERVICE_HANDLER_CUSTOM_AUTH_SERVER_METRICS_DISABLED(
68+
CoreWorkerService, KillActor, max_active_rpcs_per_handler_, AuthType::NO_AUTH);
69+
RPC_SERVICE_HANDLER_CUSTOM_AUTH_SERVER_METRICS_DISABLED(
70+
CoreWorkerService, CancelTask, max_active_rpcs_per_handler_, AuthType::NO_AUTH);
71+
RPC_SERVICE_HANDLER_CUSTOM_AUTH_SERVER_METRICS_DISABLED(CoreWorkerService,
72+
RemoteCancelTask,
73+
max_active_rpcs_per_handler_,
74+
AuthType::NO_AUTH);
75+
RPC_SERVICE_HANDLER_CUSTOM_AUTH_SERVER_METRICS_DISABLED(CoreWorkerService,
76+
RegisterMutableObjectReader,
77+
max_active_rpcs_per_handler_,
78+
AuthType::NO_AUTH);
79+
RPC_SERVICE_HANDLER_CUSTOM_AUTH_SERVER_METRICS_DISABLED(CoreWorkerService,
80+
GetCoreWorkerStats,
81+
max_active_rpcs_per_handler_,
82+
AuthType::NO_AUTH);
83+
RPC_SERVICE_HANDLER_CUSTOM_AUTH_SERVER_METRICS_DISABLED(
84+
CoreWorkerService, LocalGC, max_active_rpcs_per_handler_, AuthType::NO_AUTH);
85+
RPC_SERVICE_HANDLER_CUSTOM_AUTH_SERVER_METRICS_DISABLED(
86+
CoreWorkerService, DeleteObjects, max_active_rpcs_per_handler_, AuthType::NO_AUTH);
87+
RPC_SERVICE_HANDLER_CUSTOM_AUTH_SERVER_METRICS_DISABLED(
88+
CoreWorkerService, SpillObjects, max_active_rpcs_per_handler_, AuthType::NO_AUTH);
89+
RPC_SERVICE_HANDLER_CUSTOM_AUTH_SERVER_METRICS_DISABLED(CoreWorkerService,
90+
RestoreSpilledObjects,
91+
max_active_rpcs_per_handler_,
92+
AuthType::NO_AUTH);
93+
RPC_SERVICE_HANDLER_CUSTOM_AUTH_SERVER_METRICS_DISABLED(CoreWorkerService,
94+
DeleteSpilledObjects,
95+
max_active_rpcs_per_handler_,
96+
AuthType::NO_AUTH);
97+
RPC_SERVICE_HANDLER_CUSTOM_AUTH_SERVER_METRICS_DISABLED(CoreWorkerService,
98+
PlasmaObjectReady,
99+
max_active_rpcs_per_handler_,
100+
AuthType::NO_AUTH);
101+
RPC_SERVICE_HANDLER_CUSTOM_AUTH_SERVER_METRICS_DISABLED(
102+
CoreWorkerService, Exit, max_active_rpcs_per_handler_, AuthType::NO_AUTH);
103+
RPC_SERVICE_HANDLER_CUSTOM_AUTH_SERVER_METRICS_DISABLED(CoreWorkerService,
104+
AssignObjectOwner,
105+
max_active_rpcs_per_handler_,
106+
AuthType::NO_AUTH);
107+
RPC_SERVICE_HANDLER_CUSTOM_AUTH_SERVER_METRICS_DISABLED(CoreWorkerService,
108+
NumPendingTasks,
109+
max_active_rpcs_per_handler_,
110+
AuthType::NO_AUTH);
111+
RPC_SERVICE_HANDLER_CUSTOM_AUTH_SERVER_METRICS_DISABLED(CoreWorkerService,
112+
FreeActorObject,
113+
max_active_rpcs_per_handler_,
114+
AuthType::NO_AUTH);
115+
}
116+
117+
} // namespace rpc
118+
} // namespace ray

src/ray/core_worker/grpc_service.h

Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
// Copyright 2025 The Ray Authors.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
/*
16+
* This file defines the gRPC service handlers for the core worker server.
17+
*
18+
* core_worker_process should be the only user of this target. If other classes need the
19+
* CoreWorkerInterface in the future, split it into its own target that does not include
20+
* the heavyweight gRPC headers..
21+
*
22+
* To add a new RPC handler:
23+
* - Update core_worker.proto.
24+
* - Add a virtual method to CoreWorkerService.
25+
* - Initialize the handler for the method in InitServerCallFactories.
26+
* - Implement the method in core_worker.
27+
*/
28+
29+
#pragma once
30+
31+
#include <memory>
32+
#include <vector>
33+
34+
#include "ray/common/asio/instrumented_io_context.h"
35+
#include "ray/rpc/grpc_server.h"
36+
#include "ray/rpc/server_call.h"
37+
#include "src/ray/protobuf/core_worker.grpc.pb.h"
38+
#include "src/ray/protobuf/core_worker.pb.h"
39+
40+
namespace ray {
41+
namespace rpc {
42+
43+
class CoreWorkerServiceHandler : public DelayedServiceHandler {
44+
public:
45+
/// Blocks until the service is ready to serve RPCs.
46+
virtual void WaitUntilInitialized() = 0;
47+
48+
virtual void HandlePushTask(PushTaskRequest request,
49+
PushTaskReply *reply,
50+
SendReplyCallback send_reply_callback) = 0;
51+
52+
virtual void HandleActorCallArgWaitComplete(ActorCallArgWaitCompleteRequest request,
53+
ActorCallArgWaitCompleteReply *reply,
54+
SendReplyCallback send_reply_callback) = 0;
55+
56+
virtual void HandleRayletNotifyGCSRestart(RayletNotifyGCSRestartRequest request,
57+
RayletNotifyGCSRestartReply *reply,
58+
SendReplyCallback send_reply_callback) = 0;
59+
60+
virtual void HandleGetObjectStatus(GetObjectStatusRequest request,
61+
GetObjectStatusReply *reply,
62+
SendReplyCallback send_reply_callback) = 0;
63+
64+
virtual void HandleWaitForActorRefDeleted(WaitForActorRefDeletedRequest request,
65+
WaitForActorRefDeletedReply *reply,
66+
SendReplyCallback send_reply_callback) = 0;
67+
68+
virtual void HandlePubsubLongPolling(PubsubLongPollingRequest request,
69+
PubsubLongPollingReply *reply,
70+
SendReplyCallback send_reply_callback) = 0;
71+
72+
virtual void HandlePubsubCommandBatch(PubsubCommandBatchRequest request,
73+
PubsubCommandBatchReply *reply,
74+
SendReplyCallback send_reply_callback) = 0;
75+
76+
virtual void HandleUpdateObjectLocationBatch(UpdateObjectLocationBatchRequest request,
77+
UpdateObjectLocationBatchReply *reply,
78+
SendReplyCallback send_reply_callback) = 0;
79+
virtual void HandleGetObjectLocationsOwner(GetObjectLocationsOwnerRequest request,
80+
GetObjectLocationsOwnerReply *reply,
81+
SendReplyCallback send_reply_callback) = 0;
82+
83+
virtual void HandleReportGeneratorItemReturns(
84+
ReportGeneratorItemReturnsRequest request,
85+
ReportGeneratorItemReturnsReply *reply,
86+
SendReplyCallback send_reply_callback) = 0;
87+
88+
virtual void HandleKillActor(KillActorRequest request,
89+
KillActorReply *reply,
90+
SendReplyCallback send_reply_callback) = 0;
91+
92+
virtual void HandleCancelTask(CancelTaskRequest request,
93+
CancelTaskReply *reply,
94+
SendReplyCallback send_reply_callback) = 0;
95+
96+
virtual void HandleRemoteCancelTask(RemoteCancelTaskRequest request,
97+
RemoteCancelTaskReply *reply,
98+
SendReplyCallback send_reply_callback) = 0;
99+
100+
virtual void HandleRegisterMutableObjectReader(
101+
RegisterMutableObjectReaderRequest request,
102+
RegisterMutableObjectReaderReply *reply,
103+
SendReplyCallback send_reply_callback) = 0;
104+
105+
virtual void HandleGetCoreWorkerStats(GetCoreWorkerStatsRequest request,
106+
GetCoreWorkerStatsReply *reply,
107+
SendReplyCallback send_reply_callback) = 0;
108+
109+
virtual void HandleLocalGC(LocalGCRequest request,
110+
LocalGCReply *reply,
111+
SendReplyCallback send_reply_callback) = 0;
112+
113+
virtual void HandleDeleteObjects(DeleteObjectsRequest request,
114+
DeleteObjectsReply *reply,
115+
SendReplyCallback send_reply_callback) = 0;
116+
117+
virtual void HandleSpillObjects(SpillObjectsRequest request,
118+
SpillObjectsReply *reply,
119+
SendReplyCallback send_reply_callback) = 0;
120+
121+
virtual void HandleRestoreSpilledObjects(RestoreSpilledObjectsRequest request,
122+
RestoreSpilledObjectsReply *reply,
123+
SendReplyCallback send_reply_callback) = 0;
124+
125+
virtual void HandleDeleteSpilledObjects(DeleteSpilledObjectsRequest request,
126+
DeleteSpilledObjectsReply *reply,
127+
SendReplyCallback send_reply_callback) = 0;
128+
129+
virtual void HandlePlasmaObjectReady(PlasmaObjectReadyRequest request,
130+
PlasmaObjectReadyReply *reply,
131+
SendReplyCallback send_reply_callback) = 0;
132+
133+
virtual void HandleExit(ExitRequest request,
134+
ExitReply *reply,
135+
SendReplyCallback send_reply_callback) = 0;
136+
137+
virtual void HandleAssignObjectOwner(AssignObjectOwnerRequest request,
138+
AssignObjectOwnerReply *reply,
139+
SendReplyCallback send_reply_callback) = 0;
140+
141+
virtual void HandleNumPendingTasks(NumPendingTasksRequest request,
142+
NumPendingTasksReply *reply,
143+
SendReplyCallback send_reply_callback) = 0;
144+
145+
virtual void HandleFreeActorObject(FreeActorObjectRequest request,
146+
FreeActorObjectReply *reply,
147+
SendReplyCallback send_reply_callback) = 0;
148+
};
149+
150+
class CoreWorkerGrpcService : public GrpcService {
151+
public:
152+
CoreWorkerGrpcService(instrumented_io_context &main_service,
153+
CoreWorkerServiceHandler &service_handler,
154+
int64_t max_active_rpcs_per_handler)
155+
: GrpcService(main_service),
156+
service_handler_(service_handler),
157+
max_active_rpcs_per_handler_(max_active_rpcs_per_handler) {}
158+
159+
protected:
160+
grpc::Service &GetGrpcService() override { return service_; }
161+
162+
void InitServerCallFactories(
163+
const std::unique_ptr<grpc::ServerCompletionQueue> &cq,
164+
std::vector<std::unique_ptr<ServerCallFactory>> *server_call_factories,
165+
const ClusterID &cluster_id) override;
166+
167+
private:
168+
CoreWorkerService::AsyncService service_;
169+
CoreWorkerServiceHandler &service_handler_;
170+
int64_t max_active_rpcs_per_handler_;
171+
};
172+
173+
} // namespace rpc
174+
} // namespace ray

src/ray/core_worker/tests/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,7 @@ ray_cc_test(
249249
"//src/fakes/ray/rpc/raylet:fake_raylet_client",
250250
"//src/ray/common:test_utils",
251251
"//src/ray/core_worker:core_worker_lib",
252+
"//src/ray/core_worker:grpc_service",
252253
"//src/ray/core_worker:memory_store",
253254
"//src/ray/core_worker:reference_count",
254255
"//src/ray/ipc:fake_raylet_ipc_client",

0 commit comments

Comments
 (0)