@@ -9,6 +9,7 @@ use std::{
99
1010use async_trait:: async_trait;
1111use bytes:: Bytes ;
12+ use futures:: future:: { self , AbortHandle } ;
1213use log:: { debug, error, trace, warn} ;
1314use lru_time_cache:: LruCache ;
1415use shadowsocks:: {
@@ -23,7 +24,6 @@ use spin::Mutex as SpinMutex;
2324use tokio:: {
2425 net:: UdpSocket ,
2526 sync:: { mpsc, Mutex } ,
26- task:: JoinHandle ,
2727 time,
2828} ;
2929
5353 respond_writer : W ,
5454 context : Arc < ServiceContext > ,
5555 assoc_map : SharedAssociationMap < W > ,
56- cleanup_abortable : JoinHandle < ( ) > ,
57- keepalive_abortable : JoinHandle < ( ) > ,
56+ cleanup_abortable : AbortHandle ,
57+ keepalive_abortable : AbortHandle ,
5858 keepalive_tx : mpsc:: Sender < SocketAddr > ,
5959 balancer : PingBalancer ,
6060}
@@ -89,25 +89,29 @@ where
8989
9090 let cleanup_abortable = {
9191 let assoc_map = assoc_map. clone ( ) ;
92- tokio :: spawn ( async move {
92+ let ( cleanup_task , cleanup_abortable ) = future :: abortable ( async move {
9393 loop {
9494 time:: sleep ( time_to_live) . await ;
9595
9696 // cleanup expired associations. iter() will remove expired elements
9797 let _ = assoc_map. lock ( ) . await . iter ( ) ;
9898 }
99- } )
99+ } ) ;
100+ tokio:: spawn ( cleanup_task) ;
101+ cleanup_abortable
100102 } ;
101103
102- let ( keepalive_tx, mut keepalive_rx) = mpsc:: channel ( 64 ) ;
104+ let ( keepalive_tx, mut keepalive_rx) = mpsc:: channel ( 256 ) ;
103105
104106 let keepalive_abortable = {
105107 let assoc_map = assoc_map. clone ( ) ;
106- tokio :: spawn ( async move {
108+ let ( keepalive_task , keepalive_abortable ) = future :: abortable ( async move {
107109 while let Some ( peer_addr) = keepalive_rx. recv ( ) . await {
108110 assoc_map. lock ( ) . await . get ( & peer_addr) ;
109111 }
110- } )
112+ } ) ;
113+ tokio:: spawn ( keepalive_task) ;
114+ keepalive_abortable
111115 } ;
112116
113117 UdpAssociationManager {
@@ -195,7 +199,7 @@ enum UdpAssociationBypassState {
195199 Empty ,
196200 Connected {
197201 socket : Arc < UdpSocket > ,
198- abortable : JoinHandle < io :: Result < ( ) > > ,
202+ abortable : AbortHandle ,
199203 } ,
200204 Aborted ,
201205}
@@ -213,7 +217,7 @@ impl UdpAssociationBypassState {
213217 UdpAssociationBypassState :: Empty
214218 }
215219
216- fn set_connected ( & mut self , socket : Arc < UdpSocket > , abortable : JoinHandle < io :: Result < ( ) > > ) {
220+ fn set_connected ( & mut self , socket : Arc < UdpSocket > , abortable : AbortHandle ) {
217221 * self = UdpAssociationBypassState :: Connected { socket, abortable } ;
218222 }
219223
@@ -226,7 +230,7 @@ enum UdpAssociationProxyState {
226230 Empty ,
227231 Connected {
228232 socket : Arc < MonProxySocket > ,
229- abortable : JoinHandle < io :: Result < ( ) > > ,
233+ abortable : AbortHandle ,
230234 } ,
231235 Aborted ,
232236}
@@ -247,7 +251,7 @@ impl UdpAssociationProxyState {
247251 * self = UdpAssociationProxyState :: Empty ;
248252 }
249253
250- fn set_connected ( & mut self , socket : Arc < MonProxySocket > , abortable : JoinHandle < io :: Result < ( ) > > ) {
254+ fn set_connected ( & mut self , socket : Arc < MonProxySocket > , abortable : AbortHandle ) {
251255 self . abort_inner ( ) ;
252256 * self = UdpAssociationProxyState :: Connected { socket, abortable } ;
253257 }
@@ -390,12 +394,13 @@ where
390394 ShadowUdpSocket :: connect_any_with_opts ( & target_addr, self . context . connect_opts_ref ( ) ) . await ?;
391395 let socket: Arc < UdpSocket > = Arc :: new ( socket. into ( ) ) ;
392396
393- // CLIENT <- REMOTE
394- let r2l_abortable = {
397+ let ( r2l_fut, r2l_abortable) = {
395398 let assoc = self . clone ( ) ;
396- tokio :: spawn ( assoc. copy_bypassed_r2l ( socket. clone ( ) ) )
399+ future :: abortable ( assoc. copy_bypassed_r2l ( socket. clone ( ) ) )
397400 } ;
398401
402+ // CLIENT <- REMOTE
403+ tokio:: spawn ( r2l_fut) ;
399404 debug ! (
400405 "created udp association for {} (bypassed) with {:?}" ,
401406 self . peer_addr,
@@ -444,12 +449,13 @@ where
444449 ShadowUdpSocket :: connect_any_with_opts ( & target_addr, self . context . connect_opts_ref ( ) ) . await ?;
445450 let socket: Arc < UdpSocket > = Arc :: new ( socket. into ( ) ) ;
446451
447- // CLIENT <- REMOTE
448- let r2l_abortable = {
452+ let ( r2l_fut, r2l_abortable) = {
449453 let assoc = self . clone ( ) ;
450- tokio :: spawn ( assoc. copy_bypassed_r2l ( socket. clone ( ) ) )
454+ future :: abortable ( assoc. copy_bypassed_r2l ( socket. clone ( ) ) )
451455 } ;
452456
457+ // CLIENT <- REMOTE
458+ tokio:: spawn ( r2l_fut) ;
453459 debug ! (
454460 "created udp association for {} (bypassed) with {:?}" ,
455461 self . peer_addr,
@@ -509,12 +515,14 @@ where
509515 let socket = MonProxySocket :: from_socket ( socket, self . context . flow_stat ( ) ) ;
510516 let socket = Arc :: new ( socket) ;
511517
512- // CLIENT <- REMOTE
513- let r2l_abortable = {
518+ let ( r2l_fut, r2l_abortable) = {
514519 let assoc = self . clone ( ) ;
515- tokio :: spawn ( assoc. copy_proxied_r2l ( socket. clone ( ) ) )
520+ future :: abortable ( assoc. copy_proxied_r2l ( socket. clone ( ) ) )
516521 } ;
517522
523+ // CLIENT <- REMOTE
524+ tokio:: spawn ( r2l_fut) ;
525+
518526 debug ! (
519527 "created udp association for {} <-> {} (proxied) with {:?}" ,
520528 self . peer_addr,
0 commit comments