Skip to content

Commit fa61584

Browse files
add IPv6 support for sockets
Signed-off-by: Yicheng-Lu-llll <luyc58576@gmail.com>
1 parent b738755 commit fa61584

File tree

38 files changed

+440
-150
lines changed

38 files changed

+440
-150
lines changed

cpp/src/ray/runtime/native_ray_runtime.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
#include "./object/object_store.h"
2121
#include "./task/native_task_submitter.h"
2222
#include "ray/common/ray_config.h"
23+
#include "ray/util/network_util.h"
2324

2425
namespace ray {
2526
namespace internal {
@@ -31,7 +32,7 @@ NativeRayRuntime::NativeRayRuntime() {
3132

3233
auto bootstrap_address = ConfigInternal::Instance().bootstrap_ip;
3334
if (bootstrap_address.empty()) {
34-
bootstrap_address = GetNodeIpAddress();
35+
bootstrap_address = ray::GetNodeIpAddressFromPerspective();
3536
}
3637
global_state_accessor_ = ProcessHelper::GetInstance().CreateGlobalStateAccessor(
3738
bootstrap_address, ConfigInternal::Instance().bootstrap_port);

cpp/src/ray/test/cluster/cluster_mode_test.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ TEST(RayClusterModeTest, FullTest) {
7171
auto port = absl::GetFlag<int32_t>(FLAGS_redis_port);
7272
std::string username = absl::GetFlag<std::string>(FLAGS_redis_username);
7373
std::string password = absl::GetFlag<std::string>(FLAGS_redis_password);
74-
std::string local_ip = ray::internal::GetNodeIpAddress();
74+
std::string local_ip = ray::GetNodeIpAddressFromPerspective();
7575
ray::internal::ProcessHelper::GetInstance().StartRayNode(
7676
local_ip, port, username, password);
7777
config.address = ray::BuildAddress(local_ip, port);

cpp/src/ray/util/process_helper.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ void ProcessHelper::RayStart(CoreWorkerOptions::TaskExecutionCallback callback)
8383

8484
if (ConfigInternal::Instance().worker_type == WorkerType::DRIVER &&
8585
bootstrap_ip.empty()) {
86-
bootstrap_ip = GetNodeIpAddress();
86+
bootstrap_ip = ray::GetNodeIpAddressFromPerspective();
8787
StartRayNode(bootstrap_ip,
8888
bootstrap_port,
8989
ConfigInternal::Instance().redis_username,
@@ -95,9 +95,9 @@ void ProcessHelper::RayStart(CoreWorkerOptions::TaskExecutionCallback callback)
9595
std::string node_ip = ConfigInternal::Instance().node_ip_address;
9696
if (node_ip.empty()) {
9797
if (!bootstrap_ip.empty()) {
98-
node_ip = GetNodeIpAddress(bootstrap_address);
98+
node_ip = ray::GetNodeIpAddressFromPerspective(bootstrap_address);
9999
} else {
100-
node_ip = GetNodeIpAddress();
100+
node_ip = ray::GetNodeIpAddressFromPerspective();
101101
}
102102
}
103103

cpp/src/ray/util/util.cc

Lines changed: 0 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -24,27 +24,6 @@
2424
namespace ray {
2525
namespace internal {
2626

27-
std::string GetNodeIpAddress(const std::string &address) {
28-
auto parts = ParseAddress(address);
29-
RAY_CHECK(parts.has_value());
30-
try {
31-
boost::asio::io_service netService;
32-
boost::asio::ip::udp::resolver resolver(netService);
33-
boost::asio::ip::udp::resolver::query query(
34-
boost::asio::ip::udp::v4(), (*parts)[0], (*parts)[1]);
35-
boost::asio::ip::udp::resolver::iterator endpoints = resolver.resolve(query);
36-
boost::asio::ip::udp::endpoint ep = *endpoints;
37-
boost::asio::ip::udp::socket socket(netService);
38-
socket.connect(ep);
39-
boost::asio::ip::address addr = socket.local_endpoint().address();
40-
return addr.to_string();
41-
} catch (std::exception &e) {
42-
RAY_LOG(FATAL) << "Could not get the node IP address with socket. Exception: "
43-
<< e.what();
44-
return "";
45-
}
46-
}
47-
4827
std::string getLibraryPathEnv() {
4928
auto path_env_p = std::getenv(kLibraryPathEnvName);
5029
if (path_env_p != nullptr && strlen(path_env_p) != 0) {

cpp/src/ray/util/util.h

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,20 +18,6 @@
1818
namespace ray {
1919
namespace internal {
2020

21-
/// IP address by which the local node can be reached *from* the `address`.
22-
///
23-
/// The behavior should be the same as `node_ip_address_from_perspective` from Ray Python
24-
/// code. See
25-
/// https://stackoverflow.com/questions/2674314/get-local-ip-address-using-boost-asio.
26-
///
27-
/// TODO(kfstorm): Make this function shared code and migrate Python & Java to use this
28-
/// function.
29-
///
30-
/// \param address The IP address and port of any known live service on the network
31-
/// you care about.
32-
/// \return The IP address by which the local node can be reached from the address.
33-
std::string GetNodeIpAddress(const std::string &address = "8.8.8.8:53");
34-
3521
std::string getLibraryPathEnv();
3622

3723
} // namespace internal

doc/source/ray-core/examples/lm/ray_train.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
from fairseq_cli.train import main
1212

1313
import ray
14-
from ray._common.network_utils import build_address
1514

1615
_original_save_checkpoint = fairseq.checkpoint_utils.save_checkpoint
1716

@@ -113,7 +112,7 @@ def run_fault_tolerant_loop():
113112
# fairseq distributed training.
114113
ip = ray.get(workers[0].get_node_ip.remote())
115114
port = ray.get(workers[0].find_free_port.remote())
116-
address = f"tcp://{build_address(ip, port)}"
115+
address = f"tcp://{ip}:{port}"
117116

118117
# Start the remote processes, and check whether any process
119118
# fails. If so, restart all the processes.

python/ray/_common/network_utils.py

Lines changed: 93 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,13 @@
11
from typing import Optional, Tuple, Union
2+
import socket
3+
from functools import lru_cache
24

3-
from ray._raylet import build_address as _build_address
4-
from ray._raylet import parse_address as _parse_address
5+
from ray._raylet import (
6+
build_address as _build_address,
7+
is_ipv6_ip as _is_ipv6_ip,
8+
node_ip_address_from_perspective as _node_ip_address_from_perspective,
9+
parse_address as _parse_address,
10+
)
511

612

713
def parse_address(address: str) -> Optional[Tuple[str, str]]:
@@ -29,6 +35,65 @@ def build_address(host: str, port: Union[int, str]) -> str:
2935
return _build_address(host, port)
3036

3137

38+
def node_ip_address_from_perspective(address: str = "") -> str:
    """Return the IP under which this node is reachable *from* ``address``.

    Thin Python wrapper that forwards the argument unchanged to the
    ``ray._raylet`` binding of the same name.

    Args:
        address: The IP address and port of any known live service on the
            network you care about. When left as the default empty string,
            the binding is documented to probe public DNS servers and to
            cache that result, while an explicit address is detected fresh
            on every call.
            NOTE(review): that behavior lives in ``ray._raylet`` and is not
            visible from this module -- confirm against the binding.

    Returns:
        The IP address by which the local node can be reached from ``address``.
    """
    detected_ip = _node_ip_address_from_perspective(address)
    return detected_ip
53+
54+
55+
def is_ipv6_ip(ip: str) -> bool:
    """Tell whether ``ip`` is written in IPv6 format.

    Delegates to the ``ray._raylet`` binding of the same name.

    Args:
        ip: A bare IP address string; it must not carry a port.

    Returns:
        True for an IPv6 address, False for an IPv4 address.
    """
    looks_like_ipv6 = _is_ipv6_ip(ip)
    return looks_like_ipv6
65+
66+
67+
@lru_cache(maxsize=1)
def _detect_localhost_address() -> str:
    """Resolve ``localhost`` via DNS, preferring IPv4 over IPv6.

    Only this node-independent detection is memoized; it cannot go stale when
    a Ray node is started later in the process.
    """
    # Try IPv4 first, then IPv6 localhost resolution.
    for family in (socket.AF_INET, socket.AF_INET6):
        try:
            dns_result = socket.getaddrinfo(
                "localhost", None, family, socket.SOCK_STREAM
            )
        except socket.gaierror:
            continue
        if dns_result:
            # Entries are (family, type, proto, canonname, sockaddr) tuples;
            # sockaddr[0] is the IP string.
            return dns_result[0][4][0]

    # Final fallback to IPv4 loopback.
    return "127.0.0.1"


def get_localhost_address() -> str:
    """Get localhost loopback address with IPv4/IPv6 support.

    The global node is consulted on *every* call. Caching the whole function
    would pin whatever was returned before the node existed (typically the
    IPv4 loopback) and keep returning it even after an IPv6 node is started.

    Returns:
        ``"::1"`` or ``"127.0.0.1"`` matching the IP family of the global
        node when one with a node IP exists; otherwise the (cached) result of
        resolving ``localhost``, falling back to ``"127.0.0.1"``.
    """
    import ray._private.worker

    global_node = ray._private.worker._global_node
    if global_node is not None and global_node.node_ip_address:
        return "::1" if is_ipv6_ip(global_node.node_ip_address) else "127.0.0.1"

    return _detect_localhost_address()


# Keep the cache-management hooks that the former ``lru_cache`` wrapper exposed.
get_localhost_address.cache_clear = _detect_localhost_address.cache_clear
get_localhost_address.cache_info = _detect_localhost_address.cache_info
95+
96+
3297
def is_localhost(host: str) -> bool:
3398
"""Check if the given host string represents a localhost address.
3499
@@ -39,3 +104,29 @@ def is_localhost(host: str) -> bool:
39104
True if the host is a localhost address, False otherwise.
40105
"""
41106
return host in ("localhost", "127.0.0.1", "::1")
107+
108+
109+
def create_socket(socket_type: int = socket.SOCK_STREAM) -> socket.socket:
    """Build a socket whose address family matches the node's IP family.

    The node IP is obtained from ``node_ip_address_from_perspective()`` with
    its default argument; an IPv6 result selects ``AF_INET6``, anything else
    selects ``AF_INET``.

    Args:
        socket_type: The socket type (socket.SOCK_STREAM, socket.SOCK_DGRAM, etc.).

    Returns:
        A new, unbound ``socket.socket`` of the selected family and the
        requested type.

    Example:
        # TCP socket for the current node
        sock = create_socket()

        # UDP socket for the current node
        sock = create_socket(socket.SOCK_DGRAM)
    """
    if is_ipv6_ip(node_ip_address_from_perspective()):
        address_family = socket.AF_INET6
    else:
        address_family = socket.AF_INET

    # Plain Python socket API; no Ray-specific wrapper is involved.
    return socket.socket(address_family, socket_type)

python/ray/_private/node.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,12 @@
2222
import ray
2323
import ray._private.ray_constants as ray_constants
2424
import ray._private.services
25-
from ray._common.network_utils import build_address, parse_address
25+
from ray._common.network_utils import (
26+
build_address,
27+
create_socket,
28+
get_localhost_address,
29+
parse_address,
30+
)
2631
from ray._common.ray_constants import LOGGING_ROTATE_BACKUP_COUNT, LOGGING_ROTATE_BYTES
2732
from ray._common.utils import try_to_create_directory
2833
from ray._private.resource_and_label_spec import ResourceAndLabelSpec
@@ -138,7 +143,7 @@ def __init__(
138143
)
139144

140145
self._resource_and_label_spec = None
141-
self._localhost = socket.gethostbyname("localhost")
146+
self._localhost = get_localhost_address()
142147
self._ray_params = ray_params
143148
self._config = ray_params._system_config or {}
144149

@@ -880,7 +885,7 @@ def _get_unused_port(self, allocated_ports=None):
880885
if allocated_ports is None:
881886
allocated_ports = set()
882887

883-
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
888+
s = create_socket(socket.SOCK_STREAM)
884889
s.bind(("", 0))
885890
port = s.getsockname()[1]
886891

@@ -893,7 +898,7 @@ def _get_unused_port(self, allocated_ports=None):
893898
# This port is allocated for other usage already,
894899
# so we shouldn't use it even if it's not in use right now.
895900
continue
896-
new_s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
901+
new_s = create_socket(socket.SOCK_STREAM)
897902
try:
898903
new_s.bind(("", new_port))
899904
except OSError:

python/ray/_private/services.py

Lines changed: 12 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,13 @@
2121
# Ray modules
2222
import ray
2323
import ray._private.ray_constants as ray_constants
24-
from ray._common.network_utils import build_address, parse_address
24+
from ray._common.network_utils import (
25+
build_address,
26+
create_socket,
27+
get_localhost_address,
28+
node_ip_address_from_perspective,
29+
parse_address,
30+
)
2531
from ray._private.ray_constants import RAY_NODE_IP_FILENAME
2632
from ray._private.resource_isolation_config import ResourceIsolationConfig
2733
from ray._raylet import GcsClient, GcsClientOptions
@@ -615,52 +621,21 @@ def resolve_ip_for_localhost(host: str):
615621
return host
616622

617623

618-
def node_ip_address_from_perspective(address: str):
619-
"""IP address by which the local node can be reached *from* the `address`.
620-
621-
Args:
622-
address: The IP address and port of any known live service on the
623-
network you care about.
624-
625-
Returns:
626-
The IP address by which the local node can be reached from the address.
627-
"""
628-
ip_address, port = parse_address(address)
629-
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
630-
try:
631-
# This command will raise an exception if there is no internet
632-
# connection.
633-
s.connect((ip_address, int(port)))
634-
node_ip_address = s.getsockname()[0]
635-
except OSError as e:
636-
node_ip_address = "127.0.0.1"
637-
# [Errno 101] Network is unreachable
638-
if e.errno == errno.ENETUNREACH:
639-
try:
640-
# try get node ip address from host name
641-
host_name = socket.getfqdn(socket.gethostname())
642-
node_ip_address = socket.gethostbyname(host_name)
643-
except Exception:
644-
pass
645-
finally:
646-
s.close()
647-
648-
return node_ip_address
649-
650-
651624
# NOTE: This API should not be used when you obtain the
652625
# IP address when ray.init is not called because
653626
# it cannot find the IP address if it is specified by
654627
# ray start --node-ip-address. You should instead use
655628
# get_cached_node_ip_address.
656-
def get_node_ip_address(address="8.8.8.8:53"):
629+
def get_node_ip_address(address=""):
657630
if ray._private.worker._global_node is not None:
658631
return ray._private.worker._global_node.node_ip_address
632+
659633
if not ray_constants.ENABLE_RAY_CLUSTER:
660634
# Use loopback IP as the local IP address to prevent bothersome
661635
# firewall popups on OSX and Windows.
662636
# https://github.com/ray-project/ray/issues/18730.
663-
return "127.0.0.1"
637+
return get_localhost_address()
638+
664639
return node_ip_address_from_perspective(address)
665640

666641

@@ -1222,7 +1197,7 @@ def start_api_server(
12221197
port = ray_constants.DEFAULT_DASHBOARD_PORT
12231198
else:
12241199
port_retries = 0
1225-
port_test_socket = socket.socket()
1200+
port_test_socket = create_socket()
12261201
port_test_socket.setsockopt(
12271202
socket.SOL_SOCKET,
12281203
socket.SO_REUSEADDR,

python/ray/_private/test_utils.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
import ray._private.services
3131
import ray._private.services as services
3232
import ray._private.utils
33-
from ray._common.network_utils import build_address, parse_address
33+
from ray._common.network_utils import build_address, create_socket, parse_address
3434
from ray._common.test_utils import wait_for_condition
3535
from ray._common.utils import get_or_create_event_loop
3636
from ray._private import (
@@ -776,7 +776,7 @@ def wait_until_server_available(address, timeout_ms=5000, retry_interval_ms=100)
776776
time_elapsed = 0
777777
start = time.time()
778778
while time_elapsed <= timeout_ms:
779-
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
779+
s = create_socket(socket.SOCK_STREAM)
780780
s.settimeout(1)
781781
try:
782782
s.connect((ip, port))
@@ -1753,7 +1753,7 @@ def job_hook(**kwargs):
17531753

17541754

17551755
def find_free_port() -> int:
1756-
sock = socket.socket()
1756+
sock = create_socket()
17571757
sock.bind(("", 0))
17581758
port = sock.getsockname()[1]
17591759
sock.close()
@@ -1884,7 +1884,7 @@ def get_current_unused_port():
18841884
A port number that is not currently in use. (Note that this port
18851885
might become used by the time you try to bind to it.)
18861886
"""
1887-
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1887+
sock = create_socket(socket.SOCK_STREAM)
18881888

18891889
# Bind the socket to a local address with a random port number
18901890
sock.bind(("localhost", 0))

0 commit comments

Comments
 (0)