Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions lib/fluent/plugin/in_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ class ForwardInput < Input
config_param :port, :integer, default: LISTEN_PORT
desc 'The bind address to listen to.'
config_param :bind, :string, default: '0.0.0.0'
desc 'Whether it accept only IPv6 connection with IPv6 bind address.'
config_param :bind_ipv6_only, :bool, default: true

config_param :backlog, :integer, default: nil
# SO_LINGER 0 to send RST rather than FIN to avoid lots of connections sitting in TIME_WAIT at src
Expand Down Expand Up @@ -176,6 +178,7 @@ def start
resolve_name: @resolve_hostname,
linger_timeout: @linger_timeout,
send_keepalive_packet: @send_keepalive_packet,
bind_ipv6_only: @bind_ipv6_only,
backlog: @backlog,
&method(:handle_connection)
)
Expand Down
31 changes: 17 additions & 14 deletions lib/fluent/plugin_helper/server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def server_wait_until_stop
# conn.close
# end
# end
def server_create_connection(title, port, proto: nil, bind: '0.0.0.0', shared: true, backlog: nil, tls_options: nil, **socket_options, &block)
def server_create_connection(title, port, proto: nil, bind: '0.0.0.0', shared: true, bind_ipv6_only: true, backlog: nil, tls_options: nil, **socket_options, &block)
proto ||= (@transport_config && @transport_config.protocol == :tls) ? :tls : :tcp

raise ArgumentError, "BUG: title must be a symbol" unless title && title.is_a?(Symbol)
Expand All @@ -91,7 +91,7 @@ def server_create_connection(title, port, proto: nil, bind: '0.0.0.0', shared: t

case proto
when :tcp
server = server_create_for_tcp_connection(shared, bind, port, backlog, socket_option_setter, &block)
server = server_create_for_tcp_connection(shared, bind, port, backlog, socket_option_setter, bind_ipv6_only: bind_ipv6_only, &block)
when :tls
transport_config = if tls_options
server_create_transport_section_object(tls_options)
Expand All @@ -100,7 +100,7 @@ def server_create_connection(title, port, proto: nil, bind: '0.0.0.0', shared: t
else
raise ArgumentError, "BUG: TLS transport specified, but certification options are not specified"
end
server = server_create_for_tls_connection(shared, bind, port, transport_config, backlog, socket_option_setter, &block)
server = server_create_for_tls_connection(shared, bind, port, transport_config, backlog, socket_option_setter, bind_ipv6_only: bind_ipv6_only, &block)
when :unix
raise "not implemented yet"
else
Expand All @@ -121,7 +121,7 @@ def server_create_connection(title, port, proto: nil, bind: '0.0.0.0', shared: t
# sock.remote_port
# # ...
# end
def server_create(title, port, proto: nil, bind: '0.0.0.0', shared: true, socket: nil, backlog: nil, tls_options: nil, max_bytes: nil, flags: 0, **socket_options, &callback)
def server_create(title, port, proto: nil, bind: '0.0.0.0', shared: true, socket: nil, bind_ipv6_only: true, backlog: nil, tls_options: nil, max_bytes: nil, flags: 0, **socket_options, &callback)
proto ||= (@transport_config && @transport_config.protocol == :tls) ? :tls : :tcp

raise ArgumentError, "BUG: title must be a symbol" unless title && title.is_a?(Symbol)
Expand Down Expand Up @@ -155,7 +155,7 @@ def server_create(title, port, proto: nil, bind: '0.0.0.0', shared: true, socket

case proto
when :tcp
server = server_create_for_tcp_connection(shared, bind, port, backlog, socket_option_setter) do |conn|
server = server_create_for_tcp_connection(shared, bind, port, backlog, socket_option_setter, bind_ipv6_only: bind_ipv6_only) do |conn|
conn.data(&callback)
end
when :tls
Expand All @@ -166,7 +166,7 @@ def server_create(title, port, proto: nil, bind: '0.0.0.0', shared: true, socket
else
raise ArgumentError, "BUG: TLS transport specified, but certification options are not specified"
end
server = server_create_for_tls_connection(shared, bind, port, transport_config, backlog, socket_option_setter) do |conn|
server = server_create_for_tls_connection(shared, bind, port, transport_config, backlog, socket_option_setter, bind_ipv6_only: bind_ipv6_only) do |conn|
conn.data(&callback)
end
when :udp
Expand Down Expand Up @@ -212,8 +212,8 @@ def server_attach(title, proto, port, bind, shared, server)
event_loop_attach(server)
end

def server_create_for_tcp_connection(shared, bind, port, backlog, socket_option_setter, &block)
sock = server_create_tcp_socket(shared, bind, port)
def server_create_for_tcp_connection(shared, bind, port, backlog, socket_option_setter, bind_ipv6_only: true, &block)
sock = server_create_tcp_socket(shared, bind, port, bind_ipv6_only: bind_ipv6_only)
socket_option_setter.call(sock)
close_callback = ->(conn){ @_server_mutex.synchronize{ @_server_connections.delete(conn) } }
server = Coolio::TCPServer.new(sock, nil, EventHandler::TCPServer, socket_option_setter, close_callback, @log, @under_plugin_development, block) do |conn|
Expand All @@ -227,9 +227,9 @@ def server_create_for_tcp_connection(shared, bind, port, backlog, socket_option_
server
end

def server_create_for_tls_connection(shared, bind, port, conf, backlog, socket_option_setter, &block)
def server_create_for_tls_connection(shared, bind, port, conf, backlog, socket_option_setter, bind_ipv6_only: true, &block)
context = cert_option_create_context(conf.version, conf.insecure, conf.ciphers, conf)
sock = server_create_tcp_socket(shared, bind, port)
sock = server_create_tcp_socket(shared, bind, port, bind_ipv6_only: bind_ipv6_only)
socket_option_setter.call(sock)
close_callback = ->(conn){ @_server_mutex.synchronize{ @_server_connections.delete(conn) } }
server = Coolio::TCPServer.new(sock, nil, EventHandler::TLSServer, context, socket_option_setter, close_callback, @log, @under_plugin_development, block) do |conn|
Expand Down Expand Up @@ -379,13 +379,16 @@ def server_socket_manager_client
ServerEngine::SocketManager::Client.new(socket_manager_path)
end

def server_create_tcp_socket(shared, bind, port)
def server_create_tcp_socket(shared, bind, port, bind_ipv6_only: true)
sock = if shared
server_socket_manager_client.listen_tcp(bind, port)
else
# TCPServer.new doesn't set IPV6_V6ONLY flag, so use Addrinfo class instead.
# backlog will be set by the caller, we don't need to set backlog here
tsock = Addrinfo.tcp(bind, port).listen
addrinfo = Addrinfo.tcp(bind, port)
tsock = ::Socket.new(addrinfo.pfamily, addrinfo.socktype, addrinfo.protocol)
tsock.ipv6only! if addrinfo.ipv6? && bind_ipv6_only
tsock.setsockopt(::Socket::SOL_SOCKET, ::Socket::SO_REUSEADDR, 1)
tsock.bind(addrinfo)
tsock.listen(::Socket::SOMAXCONN)
tsock.autoclose = false
TCPServer.for_fd(tsock.fileno)
end
Expand Down
Loading