diff --git a/src/config_models/cli_args.rs b/src/config_models/cli_args.rs index 6244956cf..0cc96d15d 100644 --- a/src/config_models/cli_args.rs +++ b/src/config_models/cli_args.rs @@ -82,13 +82,24 @@ pub struct Args { #[clap(long)] pub(crate) max_connections_per_ip: Option, - /// Whether to act as bootstrapper node. - /// - /// Bootstrapper nodes ensure that the maximum number of peers is never - /// reached by disconnecting from existing peers when the maximum is about - /// to be reached. As a result, they will respond with high likelihood to - /// incoming connection requests -- in contrast to regular nodes, which - /// refuse incoming connections when the max is reached. + /// Whether to act as a bootstrap node. + /// + /// Bootstrap nodes almost always accept new connections. This gives newcomers + /// to the network a chance to discover other nodes on the network through the + /// automatic peer discovery process. + /// + /// The differences between bootstrap nodes and non-bootstrap nodes are: + /// + /// - If the maximum number of peers is reached, non-bootstrap nodes refuse + /// additional connection attempts, while bootstrap nodes terminate the + /// longest-standing connection and accept the new connection. + /// - If a node that got disconnected recently tries to re-establish the same + /// connection immediately, a non-bootstrap node might accept (depending on + /// its current and maximum number of peers), while a bootstrap node will + /// certainly refuse until the reconnect cooldown has expired. + /// - If a node is well-connected, it will stop trying to connect to known + /// bootstrap nodes, even if this node's address is passed as an explicit peer + /// via the corresponding command line argument. #[clap(long)] pub(crate) bootstrap: bool, @@ -198,6 +209,10 @@ pub struct Args { pub(crate) sync_mode_threshold: usize, /// IPs of nodes to connect to, e.g.: --peers 8.8.8.8:9798 --peers 8.8.4.4:1337. 
+ /// + /// Connection attempts to bootstrap nodes will only be made if the own node is + /// not well-connected yet. Once enough connections to non-bootstrap peers have + /// been established, bootstrap nodes will not be bothered anymore. #[structopt(long)] pub(crate) peers: Vec, diff --git a/src/connect_to_peers.rs b/src/connect_to_peers.rs index 96e1dabed..ca61ff0e2 100644 --- a/src/connect_to_peers.rs +++ b/src/connect_to_peers.rs @@ -24,6 +24,7 @@ use tracing::warn; use crate::models::channel::MainToPeerTask; use crate::models::channel::PeerTaskToMain; +use crate::models::peer::bootstrap_info::BootstrapStatus; use crate::models::peer::ConnectionRefusedReason; use crate::models::peer::InternalConnectionStatus; use crate::models::peer::NegativePeerSanction; @@ -310,13 +311,24 @@ where info!("Connection accepted from {peer_address}"); // If necessary, disconnect from another, existing peer. - if connection_status == InternalConnectionStatus::AcceptedMaxReached && state.cli().bootstrap { + let self_is_bootstrap = state.cli().bootstrap; + if connection_status == InternalConnectionStatus::AcceptedMaxReached && self_is_bootstrap { info!("Maximum # peers reached, so disconnecting from an existing peer."); peer_task_to_main_tx .send(PeerTaskToMain::DisconnectFromLongestLivedPeer) .await?; } + // inform the new peer about our bootstrap status + let bootstrap_status = if self_is_bootstrap { + BootstrapStatus::Bootstrap + } else { + BootstrapStatus::Ordinary + }; + peer.send(PeerMessage::BootstrapStatus(bootstrap_status)) + .await?; + debug!("Informing {peer_address} of our bootstrap status: {bootstrap_status}"); + let peer_distance = 1; // All incoming connections have distance 1 let mut peer_loop_handler = PeerLoopHandler::new( peer_task_to_main_tx, @@ -560,6 +572,9 @@ mod connect_tests { use anyhow::bail; use anyhow::Result; + use bytes::Bytes; + use proptest::prelude::*; + use test_strategy::proptest; use tokio_test::io::Builder; use tracing_test::traced_test; use 
twenty_first::math::digest::Digest; @@ -586,9 +601,12 @@ mod connect_tests { #[traced_test] #[tokio::test] async fn test_outgoing_connection_succeed() -> Result<()> { - let network = Network::Alpha; - let other_handshake = get_dummy_handshake_data_for_genesis(network); - let own_handshake = get_dummy_handshake_data_for_genesis(network); + let network = Network::Main; + let (_tx, main_to_peer_rx, peer_to_main_tx, _rx, state, own_handshake) = + get_test_genesis_setup(network, 0, cli_args::Args::default()).await?; + let (peer_handshake, peer_socket_address) = + get_dummy_peer_connection_data_genesis(network, 0); + let mock = Builder::new() .write(&to_bytes(&PeerMessage::Handshake(Box::new(( MAGIC_STRING_REQUEST.to_vec(), @@ -596,7 +614,7 @@ mod connect_tests { ))))?) .read(&to_bytes(&PeerMessage::Handshake(Box::new(( MAGIC_STRING_RESPONSE.to_vec(), - other_handshake, + peer_handshake, ))))?) .read(&to_bytes(&PeerMessage::ConnectionStatus( TransferConnectionStatus::Accepted, @@ -604,25 +622,20 @@ mod connect_tests { .write(&to_bytes(&PeerMessage::PeerListRequest)?) .read(&to_bytes(&PeerMessage::Bye)?) .build(); - - let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, _to_main_rx1, state, _hsd) = - get_test_genesis_setup(Network::Alpha, 0, cli_args::Args::default()).await?; call_peer_inner( mock, state.clone(), - get_dummy_socket_address(0), - from_main_rx_clone, - to_main_tx, + peer_socket_address, + main_to_peer_rx, + peer_to_main_tx, &own_handshake, 1, ) .await?; - // Verify that peer map is empty after connection has been closed - match state.lock(|s| s.net.peer_map.keys().len()).await { - 0 => (), - _ => bail!("Incorrect number of maps in peer map"), - }; + if !state.lock_guard().await.net.peer_map.is_empty() { + bail!("peer map must be empty after connection has been closed"); + } Ok(()) } @@ -847,6 +860,9 @@ mod connect_tests { .write(&to_bytes(&PeerMessage::ConnectionStatus( TransferConnectionStatus::Accepted, ))?) 
+ .write(&to_bytes(&PeerMessage::BootstrapStatus( + BootstrapStatus::Ordinary, + ))?) .read(&to_bytes(&PeerMessage::Bye)?) .build(); answer_peer_inner( @@ -888,6 +904,9 @@ mod connect_tests { .write(&to_bytes(&PeerMessage::ConnectionStatus( TransferConnectionStatus::Accepted, ))?) + .write(&to_bytes(&PeerMessage::BootstrapStatus( + BootstrapStatus::Ordinary, + ))?) .read(&to_bytes(&PeerMessage::Bye)?) .build(); let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, _to_main_rx1, state_lock, _hsd) = @@ -1242,4 +1261,101 @@ mod connect_tests { Ok(()) } + + #[proptest(cases = 20, async = "tokio")] + async fn bootstrap_status_message_propagates_to_state( + connection_is_incoming: bool, + own_bootstrap_status: BootstrapStatus, + peer_bootstrap_status: BootstrapStatus, + ) { + // convenience wrapper for `to_bytes` + fn serialize(message: &PeerMessage) -> Bytes { + to_bytes(message).unwrap() + } + + let network = Network::Main; + let args = cli_args::Args { + network, + bootstrap: own_bootstrap_status == BootstrapStatus::Bootstrap, + ..Default::default() + }; + let (_tx, main_to_peer_rx, peer_to_main_tx, _rx, state, own_handshake) = + get_test_genesis_setup(network, 0, args).await.unwrap(); + + // sanity check: no bootstrap status is known after startup + prop_assert!(state.lock_guard().await.net.bootstrap_status.is_empty()); + + // simulate connection + let (peer_handshake, peer_socket_address) = + get_dummy_peer_connection_data_genesis(network, 1); + let mut stream_builder = Builder::new(); + + if connection_is_incoming { + stream_builder + .read(&serialize(&PeerMessage::Handshake(Box::new(( + MAGIC_STRING_REQUEST.to_vec(), + peer_handshake.clone(), + ))))) + .write(&serialize(&PeerMessage::Handshake(Box::new(( + MAGIC_STRING_RESPONSE.to_vec(), + own_handshake.clone(), + ))))) + .write(&serialize(&PeerMessage::ConnectionStatus( + TransferConnectionStatus::Accepted, + ))) + .write(&serialize(&PeerMessage::BootstrapStatus( + own_bootstrap_status, + ))); + } else { + 
stream_builder + .write(&serialize(&PeerMessage::Handshake(Box::new(( + MAGIC_STRING_REQUEST.to_vec(), + own_handshake.clone(), + ))))) + .read(&serialize(&PeerMessage::Handshake(Box::new(( + MAGIC_STRING_RESPONSE.to_vec(), + peer_handshake.clone(), + ))))) + .read(&serialize(&PeerMessage::ConnectionStatus( + TransferConnectionStatus::Accepted, + ))) + .write(&serialize(&PeerMessage::PeerListRequest)); + } + let mock_stream = stream_builder + .read(&serialize(&PeerMessage::BootstrapStatus( + peer_bootstrap_status, + ))) + .read(&serialize(&PeerMessage::Bye)) + .build(); + + if connection_is_incoming { + answer_peer_inner( + mock_stream, + state.clone(), + peer_socket_address, + main_to_peer_rx, + peer_to_main_tx, + own_handshake, + ) + .await + .unwrap(); + } else { + call_peer_inner( + mock_stream, + state.clone(), + peer_socket_address, + main_to_peer_rx, + peer_to_main_tx, + &own_handshake, + 1, + ) + .await + .unwrap(); + } + + let bootstrap_status = &state.lock_guard().await.net.bootstrap_status; + prop_assert_eq!(1, bootstrap_status.len()); + let peer_status = bootstrap_status.get(&peer_socket_address).unwrap(); + prop_assert_eq!(peer_bootstrap_status, peer_status.status); + } } diff --git a/src/main_loop.rs b/src/main_loop.rs index bc8a0040f..48019ecb8 100644 --- a/src/main_loop.rs +++ b/src/main_loop.rs @@ -1,5 +1,6 @@ pub mod proof_upgrader; +use std::cmp::Ordering; use std::collections::HashMap; use std::net::SocketAddr; use std::time::Duration; @@ -25,6 +26,7 @@ use tracing::info; use tracing::trace; use tracing::warn; +use crate::config_models::cli_args; use crate::connect_to_peers::answer_peer; use crate::connect_to_peers::call_peer; use crate::job_queue::triton_vm::TritonVmJobPriority; @@ -43,13 +45,16 @@ use crate::models::channel::MainToPeerTaskBatchBlockRequest; use crate::models::channel::MinerToMain; use crate::models::channel::PeerTaskToMain; use crate::models::channel::RPCServerToMain; +use crate::models::peer::bootstrap_info::BootstrapStatus; 
use crate::models::peer::handshake_data::HandshakeData; use crate::models::peer::peer_info::PeerInfo; use crate::models::peer::transaction_notification::TransactionNotification; +use crate::models::peer::InstanceId; use crate::models::peer::PeerSynchronizationState; use crate::models::proof_abstractions::tasm::program::TritonVmProofJobOptions; use crate::models::state::block_proposal::BlockProposal; use crate::models::state::mempool::TransactionOrigin; +use crate::models::state::networking_state::NetworkingState; use crate::models::state::networking_state::SyncAnchor; use crate::models::state::tx_proving_capability::TxProvingCapability; use crate::models::state::GlobalState; @@ -149,12 +154,11 @@ struct MutableMainLoopState { } impl MutableMainLoopState { - fn new(task_handles: Vec>) -> Self { - let (_dummy_sender, dummy_receiver) = - mpsc::channel::>(TX_UPDATER_CHANNEL_CAPACITY); + fn new(cli_args: &cli_args::Args, task_handles: Vec>) -> Self { + let (_dummy_sender, dummy_receiver) = mpsc::channel(TX_UPDATER_CHANNEL_CAPACITY); Self { sync_state: SyncState::default(), - potential_peers: PotentialPeersState::default(), + potential_peers: PotentialPeersState::new(cli_args), task_handles, proof_upgrader_task: None, update_mempool_txs_handle: None, @@ -228,137 +232,177 @@ impl SyncState { } } -/// holds information about a potential peer in the process of peer discovery -struct PotentialPeerInfo { - _reported: SystemTime, - _reported_by: SocketAddr, - instance_id: u128, +/// A potential peer in the process of peer discovery. 
+#[derive(Debug, Copy, Clone, Eq, PartialEq)] +struct PeerCandidate { + address: SocketAddr, + id: InstanceId, distance: u8, } -impl PotentialPeerInfo { - fn new(reported_by: SocketAddr, instance_id: u128, distance: u8, now: SystemTime) -> Self { +impl PeerCandidate { + fn new(address: SocketAddr, id: InstanceId, distance: u8) -> Self { Self { - _reported: now, - _reported_by: reported_by, - instance_id, + address, + id, distance, } } } -/// holds information about a set of potential peers in the process of peer discovery +/// The potential peers in the process of peer discovery. +#[derive(Debug, Clone)] struct PotentialPeersState { - potential_peers: HashMap, + /// A copy of the corresponding field from the + /// [command line arguments](cli_args::Args). + max_num_peers: usize, + + candidates: HashMap, } impl PotentialPeersState { - fn default() -> Self { + // Takes (a reference to) the entire command line arguments to facilitate + // future refactors: in case additional information is needed, it can just + // be grabbed here. + // Storing the entire argument list is either a potentially big cloning + // operation or a big lifetime challenge. + fn new(cli_args: &cli_args::Args) -> Self { Self { - potential_peers: HashMap::new(), + max_num_peers: cli_args.max_num_peers, + candidates: HashMap::new(), } } - fn add( - &mut self, - reported_by: SocketAddr, - potential_peer: (SocketAddr, u128), - max_peers: usize, - distance: u8, - now: SystemTime, - ) { - let potential_peer_socket_address = potential_peer.0; - let potential_peer_instance_id = potential_peer.1; - - // This check *should* make it likely that a potential peer is always - // registered with the lowest observed distance. 
- if self - .potential_peers - .contains_key(&potential_peer_socket_address) - { + fn add(&mut self, candidate: PeerCandidate) { + // always use the lowest observed distance for a given candidate + if let Some(existing_candidate) = self.candidates.get_mut(&candidate.address) { + if candidate.distance < existing_candidate.distance { + *existing_candidate = candidate; + } return; } - // If this data structure is full, remove a random entry. Then add this. - if self.potential_peers.len() - > max_peers * POTENTIAL_PEER_MAX_COUNT_AS_A_FACTOR_OF_MAX_PEERS - { - let mut rng = rand::rng(); - let random_potential_peer = self - .potential_peers - .keys() - .choose(&mut rng) - .unwrap() - .to_owned(); - self.potential_peers.remove(&random_potential_peer); + // if the candidate list is full, remove random entries + let max_num_candidates = + self.max_num_peers * POTENTIAL_PEER_MAX_COUNT_AS_A_FACTOR_OF_MAX_PEERS; + while self.candidates.len() >= max_num_candidates { + let Some(&random_candidate) = self.candidates.keys().choose(&mut rand::rng()) else { + warn!("Failed to shrink full potential peer list: couldn't find element to remove"); + return; + }; + if self.candidates.remove(&random_candidate).is_none() { + warn!("Failed to shrink full potential peer list: couldn't remove chosen element"); + return; + }; } - let insert_value = - PotentialPeerInfo::new(reported_by, potential_peer_instance_id, distance, now); - self.potential_peers - .insert(potential_peer_socket_address, insert_value); + self.candidates.insert(candidate.address, candidate); } - /// Return a peer from the potential peer list that we aren't connected to - /// and that isn't our own address. + /// Select a node that + /// - is not us, + /// - is no peer, and + /// - if we are well-connected, is not a bootstrap node. /// - /// Favors peers with a high distance and with IPs that we are not already - /// connected to. 
+ /// Favors peers with a large distance and those with IP addresses we are + /// not already connected to. /// - /// Returns (socket address, peer distance) - fn get_candidate( - &self, - connected_clients: &[PeerInfo], - own_instance_id: u128, - ) -> Option<(SocketAddr, u8)> { - let peers_instance_ids: Vec = - connected_clients.iter().map(|x| x.instance_id()).collect(); - - // Only pick those peers that report a listening port - let peers_listen_addresses: Vec = connected_clients - .iter() - .filter_map(|x| x.listen_address()) - .collect(); - - // Find the appropriate candidates - let candidates = self - .potential_peers - .iter() - // Prevent connecting to self. Note that we *only* use instance ID to prevent this, - // meaning this will allow multiple nodes e.g. running on the same computer to form - // a complete graph. - .filter(|pp| pp.1.instance_id != own_instance_id) - // Prevent connecting to peer we already are connected to - .filter(|potential_peer| !peers_instance_ids.contains(&potential_peer.1.instance_id)) - .filter(|potential_peer| !peers_listen_addresses.contains(potential_peer.0)) - .collect::>(); - - // Prefer candidates with IPs that we are not already connected to but - // connect to repeated IPs in case we don't have other options, as - // repeated IPs may just be multiple machines on the same NAT'ed IPv4 - // address. - let mut connected_ips = peers_listen_addresses.into_iter().map(|x| x.ip()); - let candidates = if candidates - .iter() - .any(|candidate| !connected_ips.contains(&candidate.0.ip())) - { - candidates - .into_iter() - .filter(|candidate| !connected_ips.contains(&candidate.0.ip())) - .collect() - } else { - candidates + /// The decision whether a potential peer is identical to the current node + /// is made _only_ based on the [`InstanceId`]. This means that different + /// nodes that run on the same computer _can_ connect to each other. 
+ /// + /// This method requires (immutable) access to the [`NetworkingState`], + /// which in turn requires the caller of this function to acquire a read + /// lock of the `global_state_lock`. This design is a tradeoff; the + /// alternative is to clone the various fields and then release the lock. + // + // A note on the design: The method uses various filters as well as a way to + // compare two peer candidates in a simple iterator-combinator chain to + // select the candidate. + // The design's intention is to simplify future changes: declaring a new + // filter as well as modifying or dropping existing filters has little to no + // impact on other filters. Similarly, modifying the comparison function + // does not require changes to the filters. + fn peer_candidate(&self, networking_state: &NetworkingState) -> Option { + let is_self = |candidate: &PeerCandidate| candidate.id == networking_state.instance_id; + let same_id_is_connected = |candidate: &PeerCandidate| { + networking_state + .peer_map + .values() + .any(|peer| peer.instance_id() == candidate.id) + }; + let same_socket_is_connected = |candidate: &PeerCandidate| { + networking_state + .peer_map + .values() + .filter_map(|peer| peer.listen_address()) + .any(|address| address == candidate.address) + }; + let is_bootstrap_node = |candidate: &PeerCandidate| { + let candidate_bootstrap_status = networking_state + .bootstrap_status + .get(&candidate.address) + .map(|bootstrap_info| bootstrap_info.status) + .unwrap_or_default(); + candidate_bootstrap_status == BootstrapStatus::Bootstrap }; - // Get the candidate list with the highest distance - let max_distance_candidates = candidates.iter().max_by_key(|pp| pp.1.distance); + let curr_num_peers = networking_state.peer_map.len(); + let self_is_well_connected = self.is_well_connected(curr_num_peers); - // Pick a random candidate from the appropriate candidates - let mut rng = rand::rng(); - max_distance_candidates - .iter() - .choose(&mut rng) - .map(|x| 
(x.0.to_owned(), x.1.distance)) + self.candidates + .values() + .filter(|candidate| !is_self(candidate)) + .filter(|candidate| !same_id_is_connected(candidate)) + .filter(|candidate| !same_socket_is_connected(candidate)) + .filter(|candidate| !(self_is_well_connected && is_bootstrap_node(candidate))) + .max_by(|l, r| Self::candidate_ordering(networking_state.peer_map.values(), l, r)) + .copied() + } + + fn is_well_connected(&self, curr_num_peers: usize) -> bool { + /// A node is considered minimally well-connected if it has at least + /// this many peers. + const NUM_PEERS_MINIMALLY_WELL_CONNECTED: usize = 3; + + /// A node is considered sufficiently well-connected if it has more + /// peers than this ratio of its maximum number of peers. + const SUFFICIENTLY_WELL_CONNECTED_RATIO: f32 = 0.5; + + let is_minimally_well_connected = curr_num_peers >= NUM_PEERS_MINIMALLY_WELL_CONNECTED; + + let is_sufficiently_well_connected = + curr_num_peers as f32 > SUFFICIENTLY_WELL_CONNECTED_RATIO * self.max_num_peers as f32; + + is_minimally_well_connected && is_sufficiently_well_connected + } + + /// Order candidates by connection status of their IP address and distance. + /// + /// The resulting [Ordering] is used like in [`Ord::cmp`]. For example, + /// `candidate_ordering(&[], left, right) == `[`Ordering::Less`] means that + /// `left` is the less-suitable candidate. + // + // This is not `impl Ord for PeerCandidate` because it depends on the + // current peers. + fn candidate_ordering<'pi>( + current_peers: impl Iterator + Clone, + left: &PeerCandidate, + right: &PeerCandidate, + ) -> Ordering { + // Does a connection to the candidate's IP exist? 
+ let ip_is_connected = |candidate: &PeerCandidate| { + current_peers + .clone() + .filter_map(|peer| peer.listen_address()) + .any(|address| address.ip() == candidate.address.ip()) + }; + + match (ip_is_connected(left), ip_is_connected(right)) { + (true, false) => Ordering::Less, // prefer `right` + (false, true) => Ordering::Greater, // prefer `left` + _ => left.distance.cmp(&right.distance), + } } } @@ -774,18 +818,11 @@ impl MainLoopHandler { } } } - PeerTaskToMain::PeerDiscoveryAnswer((pot_peers, reported_by, distance)) => { + PeerTaskToMain::PeerDiscoveryAnswer(potential_peers, distance) => { log_slow_scope!(fn_name!() + "::PeerTaskToMain::PeerDiscoveryAnswer"); - - let max_peers = self.global_state_lock.cli().max_num_peers; - for pot_peer in pot_peers { - main_loop_state.potential_peers.add( - reported_by, - pot_peer, - max_peers, - distance, - self.now(), - ); + for (address, id) in potential_peers { + let candidate = PeerCandidate::new(address, id, distance); + main_loop_state.potential_peers.add(candidate); } } PeerTaskToMain::Transaction(pt2m_transaction) => { @@ -1025,12 +1062,10 @@ impl MainLoopHandler { // fetch all relevant info from global state, then release the lock let cli_args = self.global_state_lock.cli(); let global_state = self.global_state_lock.lock_guard().await; - let connected_peers = global_state.net.peer_map.values().cloned().collect_vec(); - let own_instance_id = global_state.net.instance_id; + let num_peers = global_state.net.peer_map.len(); let own_handshake_data = global_state.get_own_handshakedata(); drop(global_state); - let num_peers = connected_peers.len(); let max_num_peers = cli_args.max_num_peers; // Don't make an outgoing connection if @@ -1054,29 +1089,33 @@ impl MainLoopHandler { // the peer lists requested in the previous step will not have come in // yet. Therefore, the new candidate is selected based on somewhat // (but not overly) old information. 
- let Some((peer_candidate, candidate_distance)) = main_loop_state + let Some(peer_candidate) = main_loop_state .potential_peers - .get_candidate(&connected_peers, own_instance_id) + .peer_candidate(&self.global_state_lock.lock_guard().await.net) else { info!("Found no peer candidate to connect to. Not making new connection."); return Ok(()); }; // Try to connect to the selected candidate. - info!("Connecting to peer {peer_candidate} with distance {candidate_distance}"); + info!( + "Connecting to peer {address} with distance {distance}", + address = peer_candidate.address, + distance = peer_candidate.distance + ); let global_state_lock = self.global_state_lock.clone(); let main_to_peer_broadcast_rx = self.main_to_peer_broadcast_tx.subscribe(); let peer_task_to_main_tx = self.peer_task_to_main_tx.to_owned(); let outgoing_connection_task = tokio::task::Builder::new() - .name("call_peer_wrapper_2") + .name("call_peer_for_new_connection_from_peer_discovery") .spawn(async move { call_peer( - peer_candidate, + peer_candidate.address, global_state_lock, main_to_peer_broadcast_rx, peer_task_to_main_tx, own_handshake_data, - candidate_distance, + peer_candidate.distance, ) .await; })?; @@ -1086,10 +1125,10 @@ impl MainLoopHandler { // Immediately request the new peer's peer list. This allows // incorporating the new peer's peers into the list of potential peers, // to be used in the next round of peer discovery. 
+ let peer_discovery_request = + MainToPeerTask::MakeSpecificPeerDiscoveryRequest(peer_candidate.address); self.main_to_peer_broadcast_tx - .send(MainToPeerTask::MakeSpecificPeerDiscoveryRequest( - peer_candidate, - ))?; + .send(peer_discovery_request)?; Ok(()) } @@ -1393,7 +1432,8 @@ impl MainLoopHandler { task_handles: Vec>, ) -> Result { // Handle incoming connections, messages from peer tasks, and messages from the mining task - let mut main_loop_state = MutableMainLoopState::new(task_handles); + let mut main_loop_state = + MutableMainLoopState::new(self.global_state_lock.cli(), task_handles); // Set peer discovery to run every N seconds. All timers must be reset // every time they have run. @@ -1895,9 +1935,10 @@ mod test { mut main_to_peer_rx, .. } = test_setup; - let network = main_loop_handler.global_state_lock.cli().network; - let mut mutable_main_loop_state = MutableMainLoopState::new(task_join_handles); + let cli_args = main_loop_handler.global_state_lock.cli(); + let mut mutable_main_loop_state = MutableMainLoopState::new(cli_args, task_join_handles); + let network = main_loop_handler.global_state_lock.cli().network; let block1 = invalid_empty_block(&Block::genesis(network)); assert!( @@ -1992,7 +2033,9 @@ mod test { .. 
} = test_setup; - let mut mutable_main_loop_state = MutableMainLoopState::new(task_join_handles); + let cli_args = main_loop_handler.global_state_lock.cli(); + let mut mutable_main_loop_state = + MutableMainLoopState::new(cli_args, task_join_handles); main_loop_handler .block_sync(&mut mutable_main_loop_state) @@ -2151,7 +2194,9 @@ mod test { .set_cli(mocked_cli) .await; let mut main_loop_handler = main_loop_handler.with_mocked_time(SystemTime::now()); - let mut mutable_main_loop_state = MutableMainLoopState::new(task_join_handles); + let cli_args = main_loop_handler.global_state_lock.cli(); + let mut mutable_main_loop_state = + MutableMainLoopState::new(cli_args, task_join_handles); assert!( main_loop_handler @@ -2252,7 +2297,7 @@ mod test { Ok(MainToPeerTask::TransactionNotification(tx_noti)) => { assert_eq!(merged_txid, tx_noti.txid); assert_eq!(TransactionProofQuality::SingleProof, tx_noti.proof_quality); - }, + } other => panic!("Must have sent transaction notification to peer loop after successful proof upgrade. 
Got:\n{other:?}"), } @@ -2267,6 +2312,8 @@ mod test { mod peer_discovery { use super::*; + use crate::models::peer::bootstrap_info::BootstrapInfo; + use crate::tests::shared::get_dummy_socket_address; #[tokio::test] #[traced_test] @@ -2344,8 +2391,9 @@ mod test { .global_state_lock .set_cli(mocked_cli) .await; + let cli_args = main_loop_handler.global_state_lock.cli(); main_loop_handler - .discover_peers(&mut MutableMainLoopState::new(task_join_handles)) + .discover_peers(&mut MutableMainLoopState::new(cli_args, task_join_handles)) .await .unwrap(); @@ -2374,8 +2422,9 @@ mod test { .global_state_lock .set_cli(mocked_cli) .await; + let cli_args = main_loop_handler.global_state_lock.cli(); main_loop_handler - .discover_peers(&mut MutableMainLoopState::new(task_join_handles)) + .discover_peers(&mut MutableMainLoopState::new(cli_args, task_join_handles)) .await .unwrap(); @@ -2383,6 +2432,66 @@ mod test { assert!(peer_discovery_sent_messages_on_peer_channel); assert!(logs_contain("Performing peer discovery")); } + + #[tokio::test] + #[traced_test] + async fn poorly_connected_node_tries_to_connect_to_bootstrap_node() { + let mut global_state_lock = setup(1, 1).await.main_loop_handler.global_state_lock; + let cli_args = cli_args::Args { + max_num_peers: 100, + ..Default::default() + }; + global_state_lock.set_cli(cli_args).await; + let (peer_candidate, peer_state) = + set_up_peer_candidate_test(global_state_lock.clone()).await; + + let net_state = &global_state_lock.lock_guard().await.net; + let chosen_candidate = peer_state.peer_candidate(net_state); + assert_eq!(Some(peer_candidate), chosen_candidate); + } + + #[tokio::test] + #[traced_test] + async fn well_connected_node_does_not_try_to_connect_to_bootstrap_nodes() { + let mut global_state_lock = setup(20, 20).await.main_loop_handler.global_state_lock; + let cli_args = cli_args::Args { + max_num_peers: 42, + ..Default::default() + }; + global_state_lock.set_cli(cli_args).await; + let (_, peer_state) = 
set_up_peer_candidate_test(global_state_lock.clone()).await; + + let net_state = &global_state_lock.lock_guard().await.net; + let chosen_candidate = peer_state.peer_candidate(net_state); + assert!(chosen_candidate.is_none()); + } + + /// Create a [PotentialPeersState] and [add](PotentialPeersState::add) + /// one [peer candidate](PeerCandidate) that is a + /// [bootstrap node](BootstrapStatus::Bootstrap). The added peer + /// candidate as well as the resulting [PotentialPeersState] are + /// returned. + /// + /// * acquires `global_state_lock` for write + async fn set_up_peer_candidate_test( + mut global_state_lock: GlobalStateLock, + ) -> (PeerCandidate, PotentialPeersState) { + // try to avoid collisions: peer ids are implicit in the setup + let peer_id = 255; + let peer_socket_address = get_dummy_socket_address(peer_id as u8); + let peer_bootstrap_info = BootstrapInfo::new(BootstrapStatus::Bootstrap); + global_state_lock + .lock_guard_mut() + .await + .net + .bootstrap_status + .insert(peer_socket_address, peer_bootstrap_info); + + let peer_candidate = PeerCandidate::new(peer_socket_address, peer_id, 1); + let mut peer_state = PotentialPeersState::new(global_state_lock.cli()); + peer_state.add(peer_candidate); + (peer_candidate, peer_state) + } } #[test] @@ -2403,8 +2512,8 @@ mod test { instants.iter().copied().min_by(|l, r| l.cmp(r)).unwrap() ); } - mod bootstrapper_mode { + mod bootstrap_mode { use rand::Rng; use super::*; @@ -2416,7 +2525,7 @@ mod test { #[tokio::test] #[traced_test] async fn disconnect_from_oldest_peer_upon_connection_request() { - // Set up a node in bootstrapper mode and connected to a given + // Set up a node in bootstrap mode and connected to a given // number of peers, which is one less than the maximum. Initiate a // connection request. Verify that the oldest of the existing // connections is dropped. 
@@ -2427,11 +2536,10 @@ mod test { let test_setup = setup(num_init_peers_outgoing, num_init_peers_incoming).await; let TestSetup { mut peer_to_main_rx, - miner_to_main_rx: _, - rpc_server_to_main_rx: _, task_join_handles, mut main_loop_handler, mut main_to_peer_rx, + .. } = test_setup; let mocked_cli = cli_args::Args { @@ -2445,7 +2553,9 @@ mod test { .set_cli(mocked_cli) .await; - let mut mutable_main_loop_state = MutableMainLoopState::new(task_join_handles); + let cli_args = main_loop_handler.global_state_lock.cli(); + let mut mutable_main_loop_state = + MutableMainLoopState::new(cli_args, task_join_handles); // check sanity: at startup, we are connected to the initial number of peers assert_eq!( @@ -2460,9 +2570,13 @@ mod test { ); // randomize "connection established" timestamps - let mut rng = rand::rng(); - let now = SystemTime::now(); - let now_as_unix_timestamp = now.duration_since(UNIX_EPOCH).unwrap(); + let random_offset = || { + let unix_epoch_to_now_in_millis = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis() as u64; + Duration::from_millis(rand::rng().random_range(0..unix_epoch_to_now_in_millis)) + }; main_loop_handler .global_state_lock .lock_guard_mut() @@ -2470,13 +2584,8 @@ mod test { .net .peer_map .iter_mut() - .for_each(|(_socket_address, peer_info)| { - peer_info.set_connection_established( - UNIX_EPOCH - + Duration::from_millis( - rng.random_range(0..(now_as_unix_timestamp.as_millis() as u64)), - ), - ); + .for_each(|(_, peer_info)| { + peer_info.set_connection_established(UNIX_EPOCH + random_offset()); }); // compute which peer will be dropped, for later reference @@ -2487,11 +2596,11 @@ mod test { .net .peer_map .iter() - .min_by(|l, r| { - l.1.connection_established() - .cmp(&r.1.connection_established()) + .min_by(|(_, left), (_, right)| { + left.connection_established() + .cmp(&right.connection_established()) }) - .map(|(socket_address, _peer_info)| socket_address) + .map(|(socket_address, _)| socket_address) 
.copied() .unwrap(); @@ -2503,8 +2612,8 @@ mod test { .lock_guard() .await .get_own_handshakedata(); - assert_eq!(peer_handshake_data.network, own_handshake_data.network,); - assert_eq!(peer_handshake_data.version, own_handshake_data.version,); + assert_eq!(peer_handshake_data.network, own_handshake_data.network); + assert_eq!(peer_handshake_data.version, own_handshake_data.version); let mock_stream = tokio_test::io::Builder::new() .read( &to_bytes(&PeerMessage::Handshake(Box::new(( @@ -2529,23 +2638,20 @@ mod test { .build(); let peer_to_main_tx_clone = main_loop_handler.peer_task_to_main_tx.clone(); let global_state_lock_clone = main_loop_handler.global_state_lock.clone(); - let (_main_to_peer_tx_mock, main_to_peer_rx_mock) = tokio::sync::broadcast::channel(10); + let main_to_peer_rx_clone = main_loop_handler.main_to_peer_broadcast_tx.subscribe(); let incoming_peer_task_handle = tokio::task::Builder::new() .name("answer_peer_wrapper") .spawn(async move { - match answer_peer( + answer_peer( mock_stream, global_state_lock_clone, peer_socket_address, - main_to_peer_rx_mock, + main_to_peer_rx_clone, peer_to_main_tx_clone, own_handshake_data, ) .await - { - Ok(()) => (), - Err(err) => error!("Got error: {:?}", err), - } + .unwrap(); }) .unwrap(); diff --git a/src/models/channel.rs b/src/models/channel.rs index ca5836072..079e64cf7 100644 --- a/src/models/channel.rs +++ b/src/models/channel.rs @@ -14,6 +14,7 @@ use super::peer::transaction_notification::TransactionNotification; use super::proof_abstractions::mast_hash::MastHash; use super::state::wallet::expected_utxo::ExpectedUtxo; use super::state::wallet::monitored_utxo::MonitoredUtxo; +use crate::models::peer::InstanceId; #[derive(Clone, Debug)] pub(crate) enum MainToMiner { @@ -162,8 +163,8 @@ pub(crate) enum PeerTaskToMain { }, RemovePeerMaxBlockHeight(SocketAddr), - /// (\[(peer_listen_address)\], reported_by, distance) - PeerDiscoveryAnswer((Vec<(SocketAddr, u128)>, SocketAddr, u8)), + /// list of peer's peers, 
and their distance from “self” + PeerDiscoveryAnswer(Vec<(SocketAddr, InstanceId)>, u8), Transaction(Box), BlockProposal(Box), @@ -182,7 +183,7 @@ impl PeerTaskToMain { PeerTaskToMain::NewBlocks(_) => "new blocks", PeerTaskToMain::AddPeerMaxBlockHeight { .. } => "add peer max block height", PeerTaskToMain::RemovePeerMaxBlockHeight(_) => "remove peer max block height", - PeerTaskToMain::PeerDiscoveryAnswer(_) => "peer discovery answer", + PeerTaskToMain::PeerDiscoveryAnswer(..) => "peer discovery answer", PeerTaskToMain::Transaction(_) => "transaction", PeerTaskToMain::BlockProposal(_) => "block proposal", PeerTaskToMain::DisconnectFromLongestLivedPeer => "disconnect from longest lived peer", diff --git a/src/models/peer.rs b/src/models/peer.rs index 32d51daf0..d19b4a1bd 100644 --- a/src/models/peer.rs +++ b/src/models/peer.rs @@ -1,3 +1,4 @@ +pub(crate) mod bootstrap_info; pub(crate) mod handshake_data; pub mod peer_block_notifications; pub mod peer_info; @@ -42,6 +43,7 @@ use super::proof_abstractions::timestamp::Timestamp; use super::state::transaction_kernel_id::TransactionKernelId; use crate::config_models::network::Network; use crate::models::blockchain::block::difficulty_control::max_cumulative_pow_after; +use crate::models::peer::bootstrap_info::BootstrapStatus; use crate::models::peer::transfer_block::TransferBlock; use crate::prelude::twenty_first; @@ -97,6 +99,17 @@ pub enum NegativePeerSanction { UnwantedMessage, NoStandingFoundMaybeCrash, + + /// Nodes can change their bootstrap status and notify their peers about + /// such updates. Under normal circumstances, these updates are rare. + /// Frequent updates are an indicator of malicious behavior. + BootstrapStatusUpdateSpam, + + /// It is valid for a peer to update its bootstrap status. However, each + /// update requires write-access to the global state lock, which is to be + /// avoided. In order to disincentivize updates, they cause a mild + /// punishment. 
+ BootstrapStatusUpdate, } /// The reason for improving a peer's standing @@ -164,6 +177,8 @@ impl Display for NegativePeerSanction { } NegativePeerSanction::FishyPowEvolutionChallengeResponse => "fishy pow evolution", NegativePeerSanction::FishyDifficultiesChallengeResponse => "fishy difficulties", + NegativePeerSanction::BootstrapStatusUpdateSpam => "bootstrap status update spam", + NegativePeerSanction::BootstrapStatusUpdate => "bootstrap status update", }; write!(f, "{string}") } @@ -238,6 +253,8 @@ impl Sanction for NegativePeerSanction { NegativePeerSanction::BatchBlocksRequestTooManyDigests => -50, NegativePeerSanction::FishyPowEvolutionChallengeResponse => -51, NegativePeerSanction::FishyDifficultiesChallengeResponse => -51, + NegativePeerSanction::BootstrapStatusUpdateSpam => -20, + NegativePeerSanction::BootstrapStatusUpdate => -2, } } } @@ -285,6 +302,7 @@ pub struct PeerStanding { pub latest_reward: Option<(PositivePeerSanction, SystemTime)>, peer_tolerance: i32, } + #[derive(Debug, Clone, Copy, Default)] pub(crate) struct StandingExceedsBanThreshold; @@ -467,6 +485,7 @@ pub(crate) enum PeerMessage { /// Inform peer that we are disconnecting them. 
Bye, ConnectionStatus(TransferConnectionStatus), + BootstrapStatus(BootstrapStatus), } impl PeerMessage { @@ -493,6 +512,7 @@ impl PeerMessage { PeerMessage::UnableToSatisfyBatchRequest => "unable to satisfy batch request", PeerMessage::SyncChallenge(_) => "sync challenge", PeerMessage::SyncChallengeResponse(_) => "sync challenge response", + PeerMessage::BootstrapStatus(_) => "bootstrap status", } .to_string() } @@ -520,6 +540,7 @@ impl PeerMessage { PeerMessage::UnableToSatisfyBatchRequest => true, PeerMessage::SyncChallenge(_) => false, PeerMessage::SyncChallengeResponse(_) => false, + PeerMessage::BootstrapStatus(_) => false, } } @@ -547,6 +568,7 @@ impl PeerMessage { PeerMessage::UnableToSatisfyBatchRequest => false, PeerMessage::SyncChallenge(_) => false, PeerMessage::SyncChallengeResponse(_) => false, + PeerMessage::BootstrapStatus(_) => false, } } } diff --git a/src/models/peer/bootstrap_info.rs b/src/models/peer/bootstrap_info.rs new file mode 100644 index 000000000..597063964 --- /dev/null +++ b/src/models/peer/bootstrap_info.rs @@ -0,0 +1,50 @@ +use std::fmt::Display; +use std::time::SystemTime; + +use serde_derive::Deserialize; +use serde_derive::Serialize; + +/// A node's [BootstrapStatus] and some metadata. +#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)] +pub(crate) struct BootstrapInfo { + pub status: BootstrapStatus, + + /// The time when the status was last set. + pub last_set: SystemTime, +} + +/// Does the node help bootstrapping the network? +#[derive(Clone, Copy, Debug, Default, Serialize, Deserialize, PartialEq, Eq, Hash)] +#[cfg_attr(test, derive(test_strategy::Arbitrary))] +pub(crate) enum BootstrapStatus { + /// The node is not a bootstrap node. + /// + /// If no further information is known about a peer, it is assumed that it is an + /// ordinary node. + #[default] + Ordinary, + + /// The node _is_ a bootstrap node. 
+ Bootstrap, +} + +impl Display for BootstrapStatus { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let display = match self { + Self::Ordinary => "ordinary node", + Self::Bootstrap => "bootstrap node", + }; + + write!(f, "{display}") + } +} + +impl BootstrapInfo { + /// Create new [BootstrapInfo] right [now](SystemTime::now). + pub fn new(status: BootstrapStatus) -> Self { + Self { + status, + last_set: SystemTime::now(), + } + } +} diff --git a/src/models/state/networking_state.rs b/src/models/state/networking_state.rs index 0d6b65c98..cfbc55ebf 100644 --- a/src/models/state/networking_state.rs +++ b/src/models/state/networking_state.rs @@ -14,6 +14,7 @@ use crate::database::WriteBatchAsync; use crate::models::blockchain::block::block_height::BlockHeight; use crate::models::blockchain::block::difficulty_control::ProofOfWork; use crate::models::database::PeerDatabases; +use crate::models::peer::bootstrap_info::BootstrapInfo; use crate::models::peer::peer_info::PeerInfo; use crate::models::peer::InstanceId; use crate::models::peer::PeerStanding; @@ -94,7 +95,7 @@ pub struct NetworkingState { /// Timestamp for when the last tx-proof upgrade was attempted. Does not /// record latest successful upgrade, merely latest attempt. This is to /// prevent excessive runs of the proof-upgrade functionality. - pub last_tx_proof_upgrade_attempt: std::time::SystemTime, + pub last_tx_proof_upgrade_attempt: SystemTime, /// Disconnection times of past peers. Can be used to determine if a connection /// request should be accepted or rejected. @@ -106,6 +107,16 @@ pub struct NetworkingState { /// /// Only the peer tasks may update this map. disconnection_times: HashMap, + + /// Helps identify whether some peer helps to bootstrap the network. + /// + /// Once enough peers are found, no additional connection to bootstrap nodes + /// are made. 
+ // + // This information is recorded here as opposed to the `PeerInfo` in the + // `PeerMap` because it must outlive connections to the peer. Otherwise, it + // would be impossible to suppress connection attempts to bootstrap nodes. + pub(crate) bootstrap_status: HashMap, } impl NetworkingState { @@ -120,6 +131,7 @@ impl NetworkingState { // after startup of the client. last_tx_proof_upgrade_attempt: SystemTime::now(), disconnection_times: HashMap::new(), + bootstrap_status: HashMap::new(), } } diff --git a/src/peer_loop.rs b/src/peer_loop.rs index f3d1e8a6d..362685efd 100644 --- a/src/peer_loop.rs +++ b/src/peer_loop.rs @@ -1,6 +1,7 @@ use std::cmp; use std::marker::Unpin; use std::net::SocketAddr; +use std::time::Duration; use std::time::SystemTime; use anyhow::bail; @@ -39,6 +40,8 @@ use crate::models::blockchain::transaction::Transaction; use crate::models::channel::MainToPeerTask; use crate::models::channel::PeerTaskToMain; use crate::models::channel::PeerTaskToMainTransaction; +use crate::models::peer::bootstrap_info::BootstrapInfo; +use crate::models::peer::bootstrap_info::BootstrapStatus; use crate::models::peer::handshake_data::HandshakeData; use crate::models::peer::peer_info::PeerConnectionInfo; use crate::models::peer::peer_info::PeerInfo; @@ -203,6 +206,87 @@ impl PeerLoopHandler { sanction_result.map_err(|err| anyhow::anyhow!("Cannot reward banned peer: {err}")) } + /// Potentially update a peer's bootstrap status. + /// + /// Per connection, the bootstrap status can be set once without + /// incurring [punishment](Self::punish). After a cooldown period, the + /// bootstrap status can be set again. The cooldown exists to prevent peers + /// from forcing many write-lock acquisition of the global state lock. + /// + /// This method only acquires a write lock for the global state lock if + /// necessary. 
+ /// + /// # Locking: + /// * acquires `global_state_lock` for read + /// * might acquire `global_state_lock` for write + async fn handle_bootstrap_status_message(&mut self, status: BootstrapStatus) -> Result<()> { + const BOOTSTRAP_STATUS_UPDATE_COOLDOWN_PERIOD: Duration = Duration::from_secs(5 * 60); + + let peer_address = self.peer_address; + let Some(connection_start_time) = self + .global_state_lock + .lock_guard() + .await + .net + .peer_map + .get(&peer_address) + .map(|info| info.own_timestamp_connection_established) + else { + warn!("Peer {peer_address} not found in peer map when updating bootstrap status."); + return Ok(()); // not great, not ban-worthy + }; + let last_update_time = self + .global_state_lock + .lock_guard() + .await + .net + .bootstrap_status + .get(&peer_address) + .map(|info| info.last_set) + .unwrap_or(SystemTime::UNIX_EPOCH); + + if last_update_time < connection_start_time { + debug!("Setting bootstrap status of peer {peer_address} to \"{status}\""); + self.global_state_lock + .lock_guard_mut() + .await + .net + .bootstrap_status + .insert(peer_address, BootstrapInfo::new(status)); + return Ok(()); + } + + let Ok(duration_since_last_update) = SystemTime::now().duration_since(last_update_time) + else { + warn!("Last bootstrap update of peer {peer_address} is in the future."); + return Ok(()); // not great, not ban-worthy + }; + if duration_since_last_update < BOOTSTRAP_STATUS_UPDATE_COOLDOWN_PERIOD { + info!( + "Punishing peer {peer_address} for bootstrap update within cooldown period. 
\ + Update received after {duration} seconds of last update, \ + cooldown period is {cooldown} seconds.", + duration = duration_since_last_update.as_secs(), + cooldown = BOOTSTRAP_STATUS_UPDATE_COOLDOWN_PERIOD.as_secs(), + ); + self.punish(NegativePeerSanction::BootstrapStatusUpdateSpam) + .await?; + return Ok(()); + } + + debug!("Updating bootstrap status of peer {peer_address} to \"{status}\""); + self.global_state_lock + .lock_guard_mut() + .await + .net + .bootstrap_status + .insert(peer_address, BootstrapInfo::new(status)); + self.punish(NegativePeerSanction::BootstrapStatusUpdate) + .await?; + + Ok(()) + } + /// Construct a batch response, with blocks and their MMR membership proofs /// relative to a specified anchor. /// @@ -605,12 +689,10 @@ impl PeerLoopHandler { .await?; } self.to_main_tx - .send(PeerTaskToMain::PeerDiscoveryAnswer(( + .send(PeerTaskToMain::PeerDiscoveryAnswer( peers, - self.peer_address, - // The distance to the revealed peers is 1 + this peer's distance self.distance + 1, - ))) + )) .await?; Ok(KEEP_CONNECTION_ALIVE) } @@ -1544,6 +1626,10 @@ impl PeerLoopHandler { Ok(KEEP_CONNECTION_ALIVE) } + PeerMessage::BootstrapStatus(status) => { + self.handle_bootstrap_status_message(status).await?; + Ok(KEEP_CONNECTION_ALIVE) + } } }