Skip to content

Commit 5cb3e54

Browse files
add IPv6 support for sockets
Signed-off-by: Yicheng-Lu-llll <luyc58576@gmail.com>
1 parent b738755 commit 5cb3e54

File tree

34 files changed, with 384 additions and 138 deletions.

cpp/src/ray/runtime/native_ray_runtime.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
#include "./object/object_store.h"
2121
#include "./task/native_task_submitter.h"
2222
#include "ray/common/ray_config.h"
23+
#include "ray/util/network_util.h"
2324

2425
namespace ray {
2526
namespace internal {
@@ -31,7 +32,7 @@ NativeRayRuntime::NativeRayRuntime() {
3132

3233
auto bootstrap_address = ConfigInternal::Instance().bootstrap_ip;
3334
if (bootstrap_address.empty()) {
34-
bootstrap_address = GetNodeIpAddress();
35+
bootstrap_address = ray::GetNodeIpAddressFromPerspective();
3536
}
3637
global_state_accessor_ = ProcessHelper::GetInstance().CreateGlobalStateAccessor(
3738
bootstrap_address, ConfigInternal::Instance().bootstrap_port);

cpp/src/ray/test/cluster/cluster_mode_test.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ TEST(RayClusterModeTest, FullTest) {
7171
auto port = absl::GetFlag<int32_t>(FLAGS_redis_port);
7272
std::string username = absl::GetFlag<std::string>(FLAGS_redis_username);
7373
std::string password = absl::GetFlag<std::string>(FLAGS_redis_password);
74-
std::string local_ip = ray::internal::GetNodeIpAddress();
74+
std::string local_ip = ray::GetNodeIpAddressFromPerspective();
7575
ray::internal::ProcessHelper::GetInstance().StartRayNode(
7676
local_ip, port, username, password);
7777
config.address = ray::BuildAddress(local_ip, port);

cpp/src/ray/util/process_helper.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ void ProcessHelper::RayStart(CoreWorkerOptions::TaskExecutionCallback callback)
8383

8484
if (ConfigInternal::Instance().worker_type == WorkerType::DRIVER &&
8585
bootstrap_ip.empty()) {
86-
bootstrap_ip = GetNodeIpAddress();
86+
bootstrap_ip = ray::GetNodeIpAddressFromPerspective();
8787
StartRayNode(bootstrap_ip,
8888
bootstrap_port,
8989
ConfigInternal::Instance().redis_username,
@@ -95,9 +95,9 @@ void ProcessHelper::RayStart(CoreWorkerOptions::TaskExecutionCallback callback)
9595
std::string node_ip = ConfigInternal::Instance().node_ip_address;
9696
if (node_ip.empty()) {
9797
if (!bootstrap_ip.empty()) {
98-
node_ip = GetNodeIpAddress(bootstrap_address);
98+
node_ip = ray::GetNodeIpAddressFromPerspective(bootstrap_address);
9999
} else {
100-
node_ip = GetNodeIpAddress();
100+
node_ip = ray::GetNodeIpAddressFromPerspective();
101101
}
102102
}
103103

cpp/src/ray/util/util.cc

Lines changed: 0 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -24,27 +24,6 @@
2424
namespace ray {
2525
namespace internal {
2626

27-
std::string GetNodeIpAddress(const std::string &address) {
28-
auto parts = ParseAddress(address);
29-
RAY_CHECK(parts.has_value());
30-
try {
31-
boost::asio::io_service netService;
32-
boost::asio::ip::udp::resolver resolver(netService);
33-
boost::asio::ip::udp::resolver::query query(
34-
boost::asio::ip::udp::v4(), (*parts)[0], (*parts)[1]);
35-
boost::asio::ip::udp::resolver::iterator endpoints = resolver.resolve(query);
36-
boost::asio::ip::udp::endpoint ep = *endpoints;
37-
boost::asio::ip::udp::socket socket(netService);
38-
socket.connect(ep);
39-
boost::asio::ip::address addr = socket.local_endpoint().address();
40-
return addr.to_string();
41-
} catch (std::exception &e) {
42-
RAY_LOG(FATAL) << "Could not get the node IP address with socket. Exception: "
43-
<< e.what();
44-
return "";
45-
}
46-
}
47-
4827
std::string getLibraryPathEnv() {
4928
auto path_env_p = std::getenv(kLibraryPathEnvName);
5029
if (path_env_p != nullptr && strlen(path_env_p) != 0) {

cpp/src/ray/util/util.h

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,20 +18,6 @@
1818
namespace ray {
1919
namespace internal {
2020

21-
/// IP address by which the local node can be reached *from* the `address`.
22-
///
23-
/// The behavior should be the same as `node_ip_address_from_perspective` from Ray Python
24-
/// code. See
25-
/// https://stackoverflow.com/questions/2674314/get-local-ip-address-using-boost-asio.
26-
///
27-
/// TODO(kfstorm): Make this function shared code and migrate Python & Java to use this
28-
/// function.
29-
///
30-
/// \param address The IP address and port of any known live service on the network
31-
/// you care about.
32-
/// \return The IP address by which the local node can be reached from the address.
33-
std::string GetNodeIpAddress(const std::string &address = "8.8.8.8:53");
34-
3521
std::string getLibraryPathEnv();
3622

3723
} // namespace internal

doc/source/ray-core/examples/lm/ray_train.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
from fairseq_cli.train import main
1212

1313
import ray
14-
from ray._common.network_utils import build_address
14+
from ray._common.network_utils import build_address, create_socket
1515

1616
_original_save_checkpoint = fairseq.checkpoint_utils.save_checkpoint
1717

@@ -79,7 +79,7 @@ def get_node_ip(self):
7979

8080
def find_free_port(self):
8181
"""Finds a free port on the current node."""
82-
with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
82+
with closing(create_socket(socket.SOCK_STREAM)) as s:
8383
s.bind(("", 0))
8484
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
8585
return s.getsockname()[1]

python/ray/_common/network_utils.py

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,13 @@
11
from typing import Optional, Tuple, Union
2+
import socket
23

34
from ray._raylet import build_address as _build_address
45
from ray._raylet import parse_address as _parse_address
6+
from ray._raylet import (
7+
node_ip_address_from_perspective as _node_ip_address_from_perspective,
8+
)
9+
from ray._raylet import node_ip_from_hostname as _node_ip_from_hostname
10+
from ray._raylet import is_ipv6_ip as _is_ipv6_ip
511

612

713
def parse_address(address: str) -> Optional[Tuple[str, str]]:
@@ -29,6 +35,47 @@ def build_address(host: str, port: Union[int, str]) -> str:
2935
return _build_address(host, port)
3036

3137

38+
def node_ip_address_from_perspective(address: str = "") -> str:
39+
"""IP address by which the local node can be reached *from* the `address`.
40+
41+
If no address is given, defaults to public DNS servers for detection. For
42+
performance, the result is cached when using the default address (empty string).
43+
When a specific address is provided, detection is performed fresh every time.
44+
45+
Args:
46+
address: The IP address and port of any known live service on the
47+
network you care about.
48+
49+
Returns:
50+
The IP address by which the local node can be reached from the address.
51+
"""
52+
return _node_ip_address_from_perspective(address)
53+
54+
55+
def node_ip_from_hostname() -> str:
56+
"""Get node IP address from hostname resolution without creating external connections.
57+
58+
This method uses hostname resolution to determine the local node's IP address,
59+
avoiding socket creation that could trigger firewall popups on macOS/Windows.
60+
61+
Returns:
62+
The IP address resolved from hostname, or empty string if resolution fails.
63+
"""
64+
return _node_ip_from_hostname()
65+
66+
67+
def is_ipv6_ip(ip: str) -> bool:
68+
"""Check if an IP string is IPv6 format.
69+
70+
Args:
71+
ip: The IP address string to check (must be pure IP, no port).
72+
73+
Returns:
74+
True if the IP is IPv6, False if IPv4.
75+
"""
76+
return _is_ipv6_ip(ip)
77+
78+
3279
def is_localhost(host: str) -> bool:
3380
"""Check if the given host string represents a localhost address.
3481
@@ -39,3 +86,29 @@ def is_localhost(host: str) -> bool:
3986
True if the host is a localhost address, False otherwise.
4087
"""
4188
return host in ("localhost", "127.0.0.1", "::1")
89+
90+
91+
def create_socket(socket_type: int = socket.SOCK_STREAM) -> socket.socket:
92+
"""Create a Python socket object with the appropriate family based on the node IP.
93+
94+
This function automatically gets the node IP address and creates a socket
95+
with the correct family (AF_INET for IPv4, AF_INET6 for IPv6).
96+
97+
Args:
98+
socket_type: The socket type (socket.SOCK_STREAM, socket.SOCK_DGRAM, etc.).
99+
100+
Returns:
101+
A Python socket.socket object configured for the node's IP family.
102+
103+
Example:
104+
# Create a TCP socket for the current node
105+
sock = create_socket()
106+
107+
# Create a UDP socket for the current node
108+
sock = create_socket(socket.SOCK_DGRAM)
109+
"""
110+
node_ip = node_ip_address_from_perspective()
111+
family = socket.AF_INET6 if is_ipv6_ip(node_ip) else socket.AF_INET
112+
113+
# Create socket directly with Python socket API
114+
return socket.socket(family, socket_type)

python/ray/_private/node.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import ray
2323
import ray._private.ray_constants as ray_constants
2424
import ray._private.services
25-
from ray._common.network_utils import build_address, parse_address
25+
from ray._common.network_utils import build_address, create_socket, parse_address
2626
from ray._common.ray_constants import LOGGING_ROTATE_BACKUP_COUNT, LOGGING_ROTATE_BYTES
2727
from ray._common.utils import try_to_create_directory
2828
from ray._private.resource_and_label_spec import ResourceAndLabelSpec
@@ -880,7 +880,7 @@ def _get_unused_port(self, allocated_ports=None):
880880
if allocated_ports is None:
881881
allocated_ports = set()
882882

883-
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
883+
s = create_socket(socket.SOCK_STREAM)
884884
s.bind(("", 0))
885885
port = s.getsockname()[1]
886886

@@ -893,7 +893,7 @@ def _get_unused_port(self, allocated_ports=None):
893893
# This port is allocated for other usage already,
894894
# so we shouldn't use it even if it's not in use right now.
895895
continue
896-
new_s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
896+
new_s = create_socket(socket.SOCK_STREAM)
897897
try:
898898
new_s.bind(("", new_port))
899899
except OSError:

python/ray/_private/services.py

Lines changed: 14 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,14 @@
2121
# Ray modules
2222
import ray
2323
import ray._private.ray_constants as ray_constants
24-
from ray._common.network_utils import build_address, parse_address
24+
from ray._common.network_utils import (
25+
build_address,
26+
create_socket,
27+
is_ipv6_ip,
28+
node_ip_address_from_perspective,
29+
node_ip_from_hostname,
30+
parse_address,
31+
)
2532
from ray._private.ray_constants import RAY_NODE_IP_FILENAME
2633
from ray._private.resource_isolation_config import ResourceIsolationConfig
2734
from ray._raylet import GcsClient, GcsClientOptions
@@ -615,52 +622,22 @@ def resolve_ip_for_localhost(host: str):
615622
return host
616623

617624

618-
def node_ip_address_from_perspective(address: str):
619-
"""IP address by which the local node can be reached *from* the `address`.
620-
621-
Args:
622-
address: The IP address and port of any known live service on the
623-
network you care about.
624-
625-
Returns:
626-
The IP address by which the local node can be reached from the address.
627-
"""
628-
ip_address, port = parse_address(address)
629-
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
630-
try:
631-
# This command will raise an exception if there is no internet
632-
# connection.
633-
s.connect((ip_address, int(port)))
634-
node_ip_address = s.getsockname()[0]
635-
except OSError as e:
636-
node_ip_address = "127.0.0.1"
637-
# [Errno 101] Network is unreachable
638-
if e.errno == errno.ENETUNREACH:
639-
try:
640-
# try get node ip address from host name
641-
host_name = socket.getfqdn(socket.gethostname())
642-
node_ip_address = socket.gethostbyname(host_name)
643-
except Exception:
644-
pass
645-
finally:
646-
s.close()
647-
648-
return node_ip_address
649-
650-
651625
# NOTE: Do not use this API to obtain the IP address when
652626
# ray.init has not been called, because it cannot find
653627
# the IP address if it was specified via
654628
# `ray start --node-ip-address`. Use
655629
# get_cached_node_ip_address instead.
656-
def get_node_ip_address(address="8.8.8.8:53"):
630+
def get_node_ip_address(address=""):
657631
if ray._private.worker._global_node is not None:
658632
return ray._private.worker._global_node.node_ip_address
633+
659634
if not ray_constants.ENABLE_RAY_CLUSTER:
660635
# Use loopback IP as the local IP address to prevent bothersome
661636
# firewall popups on OSX and Windows.
662637
# https://github.com/ray-project/ray/issues/18730.
663-
return "127.0.0.1"
638+
hostname_ip = node_ip_from_hostname()
639+
return "::1" if hostname_ip and is_ipv6_ip(hostname_ip) else "127.0.0.1"
640+
664641
return node_ip_address_from_perspective(address)
665642

666643

@@ -1222,7 +1199,7 @@ def start_api_server(
12221199
port = ray_constants.DEFAULT_DASHBOARD_PORT
12231200
else:
12241201
port_retries = 0
1225-
port_test_socket = socket.socket()
1202+
port_test_socket = create_socket()
12261203
port_test_socket.setsockopt(
12271204
socket.SOL_SOCKET,
12281205
socket.SO_REUSEADDR,

python/ray/_private/test_utils.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
import ray._private.services
3131
import ray._private.services as services
3232
import ray._private.utils
33-
from ray._common.network_utils import build_address, parse_address
33+
from ray._common.network_utils import build_address, create_socket, parse_address
3434
from ray._common.test_utils import wait_for_condition
3535
from ray._common.utils import get_or_create_event_loop
3636
from ray._private import (
@@ -776,7 +776,7 @@ def wait_until_server_available(address, timeout_ms=5000, retry_interval_ms=100)
776776
time_elapsed = 0
777777
start = time.time()
778778
while time_elapsed <= timeout_ms:
779-
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
779+
s = create_socket(socket.SOCK_STREAM)
780780
s.settimeout(1)
781781
try:
782782
s.connect((ip, port))
@@ -1753,7 +1753,7 @@ def job_hook(**kwargs):
17531753

17541754

17551755
def find_free_port() -> int:
1756-
sock = socket.socket()
1756+
sock = create_socket()
17571757
sock.bind(("", 0))
17581758
port = sock.getsockname()[1]
17591759
sock.close()
@@ -1884,7 +1884,7 @@ def get_current_unused_port():
18841884
A port number that is not currently in use. (Note that this port
18851885
might become used by the time you try to bind to it.)
18861886
"""
1887-
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1887+
sock = create_socket(socket.SOCK_STREAM)
18881888

18891889
# Bind the socket to a local address with a random port number
18901890
sock.bind(("localhost", 0))

0 commit comments

Comments (0)