Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 22 additions & 7 deletions src/config_models/cli_args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,24 @@ pub struct Args {
#[clap(long)]
pub(crate) max_connections_per_ip: Option<usize>,

/// Whether to act as bootstrapper node.
///
/// Bootstrapper nodes ensure that the maximum number of peers is never
/// reached by disconnecting from existing peers when the maximum is about
/// to be reached. As a result, they will respond with high likelihood to
/// incoming connection requests -- in contrast to regular nodes, which
/// refuse incoming connections when the max is reached.
/// Whether to act as a bootstrap node.
///
/// Bootstrap nodes almost always accept new connections. This gives newcomers
/// to the network a chance to discover other nodes on the network through the
/// automatic peer discovery process.
///
/// The differences between bootstrap nodes and non-bootstrap nodes are:
///
/// - If the maximum number of peers is reached, non-bootstrap nodes refuse
/// additional connection attempts, while bootstrap nodes terminate the
/// longest-standing connection and accept the new connection.
/// - If a node that got disconnected recently tries to re-establish the same
/// connection immediately, a non-bootstrap node might accept (depending on
/// its current and maximum number of peers), while a bootstrap node will
/// certainly refuse until the reconnect cooldown has expired.
/// - If a node is well-connected, it will stop trying to connect to known
/// bootstrap nodes, even if this node's address is passed as an explicit peer
/// via the corresponding command line argument.
#[clap(long)]
pub(crate) bootstrap: bool,

Expand Down Expand Up @@ -198,6 +209,10 @@ pub struct Args {
pub(crate) sync_mode_threshold: usize,

/// IPs of nodes to connect to, e.g.: --peers 8.8.8.8:9798 --peers 8.8.4.4:1337.
///
/// Connection attempts to bootstrap nodes will only be made if the own node is
/// not well-connected yet. Once enough connections to non-bootstrap peers have
/// been established, bootstrap nodes will not be bothered anymore.
#[structopt(long)]
pub(crate) peers: Vec<SocketAddr>,

Expand Down
148 changes: 132 additions & 16 deletions src/connect_to_peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use tracing::warn;

use crate::models::channel::MainToPeerTask;
use crate::models::channel::PeerTaskToMain;
use crate::models::peer::bootstrap_info::BootstrapStatus;
use crate::models::peer::ConnectionRefusedReason;
use crate::models::peer::InternalConnectionStatus;
use crate::models::peer::NegativePeerSanction;
Expand Down Expand Up @@ -310,13 +311,24 @@ where
info!("Connection accepted from {peer_address}");

// If necessary, disconnect from another, existing peer.
if connection_status == InternalConnectionStatus::AcceptedMaxReached && state.cli().bootstrap {
let self_is_bootstrap = state.cli().bootstrap;
if connection_status == InternalConnectionStatus::AcceptedMaxReached && self_is_bootstrap {
info!("Maximum # peers reached, so disconnecting from an existing peer.");
peer_task_to_main_tx
.send(PeerTaskToMain::DisconnectFromLongestLivedPeer)
.await?;
}

// inform the new peer about our bootstrap status
let bootstrap_status = if self_is_bootstrap {
BootstrapStatus::Bootstrap
} else {
BootstrapStatus::Ordinary
};
peer.send(PeerMessage::BootstrapStatus(bootstrap_status))
.await?;
debug!("Informing {peer_address} of our bootstrap status: {bootstrap_status}");

let peer_distance = 1; // All incoming connections have distance 1
let mut peer_loop_handler = PeerLoopHandler::new(
peer_task_to_main_tx,
Expand Down Expand Up @@ -560,6 +572,9 @@ mod connect_tests {

use anyhow::bail;
use anyhow::Result;
use bytes::Bytes;
use proptest::prelude::*;
use test_strategy::proptest;
use tokio_test::io::Builder;
use tracing_test::traced_test;
use twenty_first::math::digest::Digest;
Expand All @@ -586,43 +601,41 @@ mod connect_tests {
#[traced_test]
#[tokio::test]
async fn test_outgoing_connection_succeed() -> Result<()> {
let network = Network::Alpha;
let other_handshake = get_dummy_handshake_data_for_genesis(network);
let own_handshake = get_dummy_handshake_data_for_genesis(network);
let network = Network::Main;
let (_tx, main_to_peer_rx, peer_to_main_tx, _rx, state, own_handshake) =
get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
let (peer_handshake, peer_socket_address) =
get_dummy_peer_connection_data_genesis(network, 0);

let mock = Builder::new()
.write(&to_bytes(&PeerMessage::Handshake(Box::new((
MAGIC_STRING_REQUEST.to_vec(),
own_handshake.clone(),
))))?)
.read(&to_bytes(&PeerMessage::Handshake(Box::new((
MAGIC_STRING_RESPONSE.to_vec(),
other_handshake,
peer_handshake,
))))?)
.read(&to_bytes(&PeerMessage::ConnectionStatus(
TransferConnectionStatus::Accepted,
))?)
.write(&to_bytes(&PeerMessage::PeerListRequest)?)
.read(&to_bytes(&PeerMessage::Bye)?)
.build();

let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, _to_main_rx1, state, _hsd) =
get_test_genesis_setup(Network::Alpha, 0, cli_args::Args::default()).await?;
call_peer_inner(
mock,
state.clone(),
get_dummy_socket_address(0),
from_main_rx_clone,
to_main_tx,
peer_socket_address,
main_to_peer_rx,
peer_to_main_tx,
&own_handshake,
1,
)
.await?;

// Verify that peer map is empty after connection has been closed
match state.lock(|s| s.net.peer_map.keys().len()).await {
0 => (),
_ => bail!("Incorrect number of maps in peer map"),
};
if !state.lock_guard().await.net.peer_map.is_empty() {
bail!("peer map must be empty after connection has been closed");
}

Ok(())
}
Expand Down Expand Up @@ -847,6 +860,9 @@ mod connect_tests {
.write(&to_bytes(&PeerMessage::ConnectionStatus(
TransferConnectionStatus::Accepted,
))?)
.write(&to_bytes(&PeerMessage::BootstrapStatus(
BootstrapStatus::Ordinary,
))?)
.read(&to_bytes(&PeerMessage::Bye)?)
.build();
answer_peer_inner(
Expand Down Expand Up @@ -888,6 +904,9 @@ mod connect_tests {
.write(&to_bytes(&PeerMessage::ConnectionStatus(
TransferConnectionStatus::Accepted,
))?)
.write(&to_bytes(&PeerMessage::BootstrapStatus(
BootstrapStatus::Ordinary,
))?)
.read(&to_bytes(&PeerMessage::Bye)?)
.build();
let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, _to_main_rx1, state_lock, _hsd) =
Expand Down Expand Up @@ -1242,4 +1261,101 @@ mod connect_tests {

Ok(())
}

#[proptest(cases = 20, async = "tokio")]
async fn bootstrap_status_message_propagates_to_state(
connection_is_incoming: bool,
own_bootstrap_status: BootstrapStatus,
peer_bootstrap_status: BootstrapStatus,
) {
// convenience wrapper for `to_bytes`
fn serialize(message: &PeerMessage) -> Bytes {
to_bytes(message).unwrap()
}

let network = Network::Main;
let args = cli_args::Args {
network,
bootstrap: own_bootstrap_status == BootstrapStatus::Bootstrap,
..Default::default()
};
let (_tx, main_to_peer_rx, peer_to_main_tx, _rx, state, own_handshake) =
get_test_genesis_setup(network, 0, args).await.unwrap();

// sanity check: no bootstrap status is known after startup
prop_assert!(state.lock_guard().await.net.bootstrap_status.is_empty());

// simulate connection
let (peer_handshake, peer_socket_address) =
get_dummy_peer_connection_data_genesis(network, 1);
let mut stream_builder = Builder::new();

if connection_is_incoming {
stream_builder
.read(&serialize(&PeerMessage::Handshake(Box::new((
MAGIC_STRING_REQUEST.to_vec(),
peer_handshake.clone(),
)))))
.write(&serialize(&PeerMessage::Handshake(Box::new((
MAGIC_STRING_RESPONSE.to_vec(),
own_handshake.clone(),
)))))
.write(&serialize(&PeerMessage::ConnectionStatus(
TransferConnectionStatus::Accepted,
)))
.write(&serialize(&PeerMessage::BootstrapStatus(
own_bootstrap_status,
)));
} else {
stream_builder
.write(&serialize(&PeerMessage::Handshake(Box::new((
MAGIC_STRING_REQUEST.to_vec(),
own_handshake.clone(),
)))))
.read(&serialize(&PeerMessage::Handshake(Box::new((
MAGIC_STRING_RESPONSE.to_vec(),
peer_handshake.clone(),
)))))
.read(&serialize(&PeerMessage::ConnectionStatus(
TransferConnectionStatus::Accepted,
)))
.write(&serialize(&PeerMessage::PeerListRequest));
}
let mock_stream = stream_builder
.read(&serialize(&PeerMessage::BootstrapStatus(
peer_bootstrap_status,
)))
.read(&serialize(&PeerMessage::Bye))
.build();

if connection_is_incoming {
answer_peer_inner(
mock_stream,
state.clone(),
peer_socket_address,
main_to_peer_rx,
peer_to_main_tx,
own_handshake,
)
.await
.unwrap();
} else {
call_peer_inner(
mock_stream,
state.clone(),
peer_socket_address,
main_to_peer_rx,
peer_to_main_tx,
&own_handshake,
1,
)
.await
.unwrap();
}

let bootstrap_status = &state.lock_guard().await.net.bootstrap_status;
prop_assert_eq!(1, bootstrap_status.len());
let peer_status = bootstrap_status.get(&peer_socket_address).unwrap();
prop_assert_eq!(peer_bootstrap_status, peer_status.status);
}
}
Loading