@@ -207,80 +207,70 @@ where
207
207
async fn answer_peer_inner < S > (
208
208
stream : S ,
209
209
state : GlobalStateLock ,
210
- peer_address : std :: net :: SocketAddr ,
210
+ peer_address : SocketAddr ,
211
211
main_to_peer_task_rx : broadcast:: Receiver < MainToPeerTask > ,
212
212
peer_task_to_main_tx : mpsc:: Sender < PeerTaskToMain > ,
213
213
own_handshake_data : HandshakeData ,
214
214
) -> Result < ( ) >
215
215
where
216
- S : AsyncRead + AsyncWrite + std :: fmt :: Debug + std :: marker :: Unpin ,
216
+ S : AsyncRead + AsyncWrite + Debug + Unpin ,
217
217
{
218
218
info ! ( "Established incoming TCP connection with {peer_address}" ) ;
219
219
220
220
// Build the communication/serialization/frame handler
221
221
let length_delimited = Framed :: new ( stream, get_codec_rules ( ) ) ;
222
- let mut peer: tokio_serde :: Framed <
222
+ let mut peer = SymmetricallyFramed :: <
223
223
Framed < S , LengthDelimitedCodec > ,
224
224
PeerMessage ,
225
- PeerMessage ,
226
225
Bincode < PeerMessage , PeerMessage > ,
227
- > = SymmetricallyFramed :: new ( length_delimited, SymmetricalBincode :: default ( ) ) ;
226
+ > :: new ( length_delimited, SymmetricalBincode :: default ( ) ) ;
228
227
229
228
// Complete Neptune handshake
230
- let ( peer_handshake_data, acceptance_code) = match peer. try_next ( ) . await ? {
231
- Some ( PeerMessage :: Handshake ( payload) ) => {
232
- let ( v, peer_handshake_data) = * payload;
233
- if v != crate :: MAGIC_STRING_REQUEST {
234
- bail ! ( "Expected magic value, got {:?}" , v) ;
235
- }
236
-
237
- peer. send ( PeerMessage :: Handshake ( Box :: new ( (
238
- crate :: MAGIC_STRING_RESPONSE . to_vec ( ) ,
239
- own_handshake_data. clone ( ) ,
240
- ) ) ) )
241
- . await ?;
242
-
243
- // Verify peer network before moving on
244
- if peer_handshake_data. network != own_handshake_data. network {
245
- bail ! (
246
- "Cannot connect with {}: Peer runs {}, this client runs {}." ,
247
- peer_address,
248
- peer_handshake_data. network,
249
- own_handshake_data. network,
250
- ) ;
251
- }
252
-
253
- // Check if incoming connection is allowed
254
- let connection_status = check_if_connection_is_allowed (
255
- state. clone ( ) ,
256
- & own_handshake_data,
257
- & peer_handshake_data,
258
- & peer_address,
259
- )
260
- . await ;
229
+ let Some ( PeerMessage :: Handshake ( payload) ) = peer. try_next ( ) . await ? else {
230
+ bail ! ( "Didn't get handshake on connection attempt" ) ;
231
+ } ;
232
+ let ( magic_string_request, peer_handshake_data) = * payload;
233
+ if magic_string_request != MAGIC_STRING_REQUEST {
234
+ bail ! ( "Expected magic value, got {magic_string_request:?}" ) ;
235
+ }
261
236
262
- peer. send ( PeerMessage :: ConnectionStatus ( connection_status. into ( ) ) )
263
- . await ?;
237
+ let handshake_response = Box :: new ( ( MAGIC_STRING_RESPONSE . to_vec ( ) , own_handshake_data. clone ( ) ) ) ;
238
+ peer. send ( PeerMessage :: Handshake ( handshake_response) )
239
+ . await ?;
264
240
265
- if let InternalConnectionStatus :: Refused ( refused_reason) = connection_status {
266
- warn ! ( "Incoming connection refused: {:?}" , refused_reason) ;
267
- bail ! ( "Refusing incoming connection. Reason: {:?}" , refused_reason) ;
268
- }
241
+ // Verify peer network before moving on
242
+ let peer_network = peer_handshake_data. network ;
243
+ let own_network = own_handshake_data. network ;
244
+ if peer_network != own_network {
245
+ bail ! (
246
+ "Cannot connect with {peer_address}: \
247
+ Peer runs {peer_network}, this client runs {own_network}."
248
+ ) ;
249
+ }
269
250
270
- ( peer_handshake_data, connection_status)
271
- }
272
- _ => {
273
- bail ! ( "Didn't get handshake on connection attempt" ) ;
274
- }
275
- } ;
251
+ // Check if incoming connection is allowed
252
+ let connection_status = check_if_connection_is_allowed (
253
+ state. clone ( ) ,
254
+ & own_handshake_data,
255
+ & peer_handshake_data,
256
+ & peer_address,
257
+ )
258
+ . await ;
259
+ peer. send ( PeerMessage :: ConnectionStatus ( connection_status. into ( ) ) )
260
+ . await ?;
261
+ if let InternalConnectionStatus :: Refused ( reason) = connection_status {
262
+ let refusal_reason = "Refusing incoming connection. Reason: {reason:?}" ;
263
+ warn ! ( refusal_reason) ;
264
+ bail ! ( refusal_reason) ;
265
+ }
276
266
277
267
// Whether the incoming connection comes from a peer in bad standing is
278
268
// checked in `check_if_connection_is_allowed`. So if we get here, we are
279
269
// good to go.
280
- info ! ( "Connection accepted from {}" , peer_address ) ;
270
+ info ! ( "Connection accepted from {peer_address}" ) ;
281
271
282
272
// If necessary, disconnect from another, existing peer.
283
- if acceptance_code == InternalConnectionStatus :: AcceptedMaxReached && state. cli ( ) . bootstrap {
273
+ if connection_status == InternalConnectionStatus :: AcceptedMaxReached && state. cli ( ) . bootstrap {
284
274
info ! ( "Maximum # peers reached, so disconnecting from an existing peer." ) ;
285
275
peer_task_to_main_tx
286
276
. send ( PeerTaskToMain :: DisconnectFromLongestLivedPeer )
0 commit comments