@@ -32,22 +32,31 @@ impl TcpRelayClientHandshake {
3232 pub fn handshake ( self ) -> BoxIoFuture < TcpRelayClientPending > {
3333 let TcpRelayClientHandshake { s, svr_cfg } = self ;
3434
35- let peer_addr = s. peer_addr ( ) . expect ( "Failed to get peer addr for client" ) ;
36- debug ! ( "Handshaking with peer {}" , peer_addr) ;
37-
38- let timeout = * svr_cfg. timeout ( ) ;
39- let fut = proxy_handshake ( s, svr_cfg) . and_then ( move |( r_fut, w_fut) | {
40- r_fut. and_then ( move |r| {
41- let fut = Address :: read_from ( r) . map_err ( move |_| {
42- io:: Error :: new ( ErrorKind :: Other ,
43- format ! ( "failed to decode Address, may be wrong method or key, peer: {}" , peer_addr) )
44- } ) ;
45- Context :: with ( |ctx| try_timeout ( fut, timeout, ctx. handle ( ) ) )
46- } )
47- . map ( move |( r, addr) | TcpRelayClientPending { r : r,
48- addr : addr,
49- w : w_fut,
50- timeout : timeout, } )
35+ let fut = futures:: lazy ( move || s. peer_addr ( ) . map ( |p| ( s, p) ) ) . and_then ( |( s, peer_addr) | {
36+ debug ! ( "Handshaking with peer {}" , peer_addr) ;
37+
38+ let timeout = * svr_cfg. timeout ( ) ;
39+ proxy_handshake ( s, svr_cfg) . and_then ( move |( r_fut, w_fut) | {
40+ r_fut
41+ . and_then ( move |r| {
42+ let fut = Address :: read_from ( r) . map_err ( move |_| {
43+ io:: Error :: new (
44+ ErrorKind :: Other ,
45+ format ! (
46+ "failed to decode Address, may be wrong method or key, peer: {}" ,
47+ peer_addr
48+ ) ,
49+ )
50+ } ) ;
51+ Context :: with ( |ctx| try_timeout ( fut, timeout, ctx. handle ( ) ) )
52+ } )
53+ . map ( move |( r, addr) | TcpRelayClientPending {
54+ r : r,
55+ addr : addr,
56+ w : w_fut,
57+ timeout : timeout,
58+ } )
59+ } )
5160 } ) ;
5261 boxed_future ( fut)
5362 }
@@ -68,20 +77,18 @@ impl TcpRelayClientPending {
6877 debug ! ( "Connecting to remote {}" , addr) ;
6978
7079 match addr {
71- Address :: SocketAddress ( saddr) => {
72- Context :: with ( move |ctx| {
73- if ctx. forbidden_ip ( ) . contains ( & saddr. ip ( ) ) {
74- let err = io:: Error :: new ( ErrorKind :: Other ,
75- format ! ( "{} is forbidden, failed to connect {}" ,
76- saddr. ip( ) ,
77- saddr) ) ;
78- return boxed_future ( futures:: done ( Err ( err) ) ) ;
79- }
80-
81- let conn = TcpStream :: connect ( & saddr, ctx. handle ( ) ) ;
82- try_timeout ( conn, timeout, ctx. handle ( ) )
83- } )
84- }
80+ Address :: SocketAddress ( saddr) => Context :: with ( move |ctx| {
81+ if ctx. forbidden_ip ( ) . contains ( & saddr. ip ( ) ) {
82+ let err = io:: Error :: new (
83+ ErrorKind :: Other ,
84+ format ! ( "{} is forbidden, failed to connect {}" , saddr. ip( ) , saddr) ,
85+ ) ;
86+ return boxed_future ( futures:: done ( Err ( err) ) ) ;
87+ }
88+
89+ let conn = TcpStream :: connect ( & saddr, ctx. handle ( ) ) ;
90+ try_timeout ( conn, timeout, ctx. handle ( ) )
91+ } ) ,
8592 Address :: DomainNameAddress ( dname, port) => {
8693 let fut = Context :: with ( move |ctx| {
8794 let handle = ctx. handle ( ) . clone ( ) ;
@@ -101,10 +108,12 @@ impl TcpRelayClientPending {
101108 let client_pair = ( self . r , self . w ) ;
102109 let timeout = self . timeout ;
103110 let fut = TcpRelayClientPending :: connect_remote ( self . addr , self . timeout ) ;
104- let fut = fut. map ( move |stream| TcpRelayClientConnected { server : stream. split ( ) ,
105- client : client_pair,
106- addr : addr,
107- timeout : timeout, } ) ;
111+ let fut = fut. map ( move |stream| TcpRelayClientConnected {
112+ server : stream. split ( ) ,
113+ client : client_pair,
114+ addr : addr,
115+ timeout : timeout,
116+ } ) ;
108117 Box :: new ( fut)
109118 }
110119}
@@ -125,9 +134,11 @@ impl TcpRelayClientConnected {
125134 let ( r, w_fut) = self . client ;
126135 let timeout = self . timeout ;
127136
128- tunnel ( self . addr ,
129- r. copy_timeout_opt ( svr_w, self . timeout ) ,
130- w_fut. and_then ( move |w| w. copy_timeout_opt ( svr_r, timeout) ) )
137+ tunnel (
138+ self . addr ,
139+ r. copy_timeout_opt ( svr_w, self . timeout ) ,
140+ w_fut. and_then ( move |w| w. copy_timeout_opt ( svr_r, timeout) ) ,
141+ )
131142 }
132143}
133144
@@ -136,57 +147,56 @@ pub fn run() -> Box<Future<Item = (), Error = io::Error>> {
136147 let mut fut: Option < Box < Future < Item = ( ) , Error = io:: Error > > > = None ;
137148
138149 Context :: with ( |ctx| {
139- let config = ctx. config ( ) ;
140-
141- for svr_cfg in & config. server {
142- let listener = {
143- let addr = svr_cfg. addr ( ) ;
144- let addr = addr. listen_addr ( ) ;
145-
146- let listener = TcpListener :: bind ( & addr, ctx. handle ( ) )
147- . unwrap_or_else ( |err| panic ! ( "Failed to listen, {}" , err) ) ;
148-
149- info ! ( "ShadowSocks TCP Listening on {}" , addr) ;
150- listener
151- } ;
152-
153- let svr_cfg = Rc :: new ( svr_cfg. clone ( ) ) ;
154- let listening = listener. incoming ( )
155- . for_each ( move |( socket, addr) | {
156- let server_cfg = svr_cfg. clone ( ) ;
157-
158- trace ! ( "Got connection, addr: {}" , addr) ;
159- trace ! ( "Picked proxy server: {:?}" , server_cfg) ;
160-
161- let client = TcpRelayClientHandshake { s : socket,
162- svr_cfg : server_cfg, } ;
163-
164- let fut =
165- client. handshake ( )
166- . and_then ( |c| c. connect ( ) )
167- . and_then ( |c| c. tunnel ( ) )
168- . map_err ( move |err| {
169- error ! ( "Failed to handle client ({}): {}" ,
170- addr, err) ;
171- } ) ;
172-
173- Context :: with ( |ctx| ctx. handle ( ) . spawn ( fut) ) ;
174- Ok ( ( ) )
175- } )
176- . map_err ( |err| {
177- error ! ( "Server run failed: {}" , err) ;
178- err
179- } ) ;
180-
181- fut = Some ( match fut. take ( ) {
182- Some ( fut) => {
183- Box :: new ( fut. join ( listening) . map ( |_| ( ) ) )
184- as Box < Future < Item = ( ) , Error = io:: Error > >
185- }
186- None => Box :: new ( listening) as Box < Future < Item = ( ) , Error = io:: Error > > ,
187- } )
188- }
189-
190- fut. expect ( "Must have at least one server" )
191- } )
150+ let config = ctx. config ( ) ;
151+
152+ for svr_cfg in & config. server {
153+ let listener = {
154+ let addr = svr_cfg. addr ( ) ;
155+ let addr = addr. listen_addr ( ) ;
156+
157+ let listener =
158+ TcpListener :: bind ( & addr, ctx. handle ( ) ) . unwrap_or_else ( |err| panic ! ( "Failed to listen, {}" , err) ) ;
159+
160+ info ! ( "ShadowSocks TCP Listening on {}" , addr) ;
161+ listener
162+ } ;
163+
164+ let svr_cfg = Rc :: new ( svr_cfg. clone ( ) ) ;
165+ let listening = listener
166+ . incoming ( )
167+ . for_each ( move |( socket, addr) | {
168+ let server_cfg = svr_cfg. clone ( ) ;
169+
170+ trace ! ( "Got connection, addr: {}" , addr) ;
171+ trace ! ( "Picked proxy server: {:?}" , server_cfg) ;
172+
173+ let client = TcpRelayClientHandshake {
174+ s : socket,
175+ svr_cfg : server_cfg,
176+ } ;
177+
178+ let fut = client
179+ . handshake ( )
180+ . and_then ( |c| c. connect ( ) )
181+ . and_then ( |c| c. tunnel ( ) )
182+ . map_err ( move |err| {
183+ error ! ( "Failed to handle client ({}): {}" , addr, err) ;
184+ } ) ;
185+
186+ Context :: with ( |ctx| ctx. handle ( ) . spawn ( fut) ) ;
187+ Ok ( ( ) )
188+ } )
189+ . map_err ( |err| {
190+ error ! ( "Server run failed: {}" , err) ;
191+ err
192+ } ) ;
193+
194+ fut = Some ( match fut. take ( ) {
195+ Some ( fut) => Box :: new ( fut. join ( listening) . map ( |_| ( ) ) ) as Box < Future < Item = ( ) , Error = io:: Error > > ,
196+ None => Box :: new ( listening) as Box < Future < Item = ( ) , Error = io:: Error > > ,
197+ } )
198+ }
199+
200+ fut. expect ( "Must have at least one server" )
201+ } )
192202}
0 commit comments