8
8
use React \EventLoop \LoopInterface ;
9
9
use React \Http \Message \Response ;
10
10
use React \Http \Message \ServerRequest ;
11
+ use React \Http \Middleware \InactiveConnectionTimeoutMiddleware ;
11
12
use React \Promise ;
12
13
use React \Promise \CancellablePromiseInterface ;
13
14
use React \Promise \PromiseInterface ;
@@ -85,6 +86,7 @@ final class StreamingServer extends EventEmitter
85
86
private $ callback ;
86
87
private $ parser ;
87
88
private $ loop ;
89
+ private $ idleConnectionTimeout ;
88
90
89
91
/**
90
92
* Creates an HTTP server that invokes the given callback for each incoming HTTP request
@@ -96,15 +98,17 @@ final class StreamingServer extends EventEmitter
96
98
*
97
99
* @param LoopInterface $loop
98
100
* @param callable $requestHandler
101
+ * @param float $idleConnectTimeout
99
102
* @see self::listen()
100
103
*/
101
- public function __construct (LoopInterface $ loop , $ requestHandler )
104
+ public function __construct (LoopInterface $ loop , $ requestHandler, $ idleConnectTimeout = InactiveConnectionTimeoutMiddleware:: DEFAULT_TIMEOUT )
102
105
{
103
106
if (!\is_callable ($ requestHandler )) {
104
107
throw new \InvalidArgumentException ('Invalid request handler given ' );
105
108
}
106
109
107
110
$ this ->loop = $ loop ;
111
+ $ this ->idleConnectionTimeout = $ idleConnectTimeout ;
108
112
109
113
$ this ->callback = $ requestHandler ;
110
114
$ this ->parser = new RequestHeaderParser ();
@@ -134,7 +138,27 @@ public function __construct(LoopInterface $loop, $requestHandler)
134
138
*/
135
139
public function listen (ServerInterface $ socket )
136
140
{
137
- $ socket ->on ('connection ' , array ($ this ->parser , 'handle ' ));
141
+ $ socket ->on ('connection ' , array ($ this , 'handle ' ));
142
+ }
143
+
144
+ /** @internal */
145
+ public function handle (ConnectionInterface $ conn )
146
+ {
147
+ $ timer = $ this ->loop ->addTimer ($ this ->idleConnectionTimeout , function () use ($ conn ) {
148
+ $ conn ->close ();
149
+ });
150
+ $ loop = $ this ->loop ;
151
+ $ conn ->once ('data ' , function () use ($ loop , $ timer ) {
152
+ $ loop ->cancelTimer ($ timer );
153
+ });
154
+ $ conn ->on ('end ' , function () use ($ loop , $ timer ) {
155
+ $ loop ->cancelTimer ($ timer );
156
+ });
157
+ $ conn ->on ('close ' , function () use ($ loop , $ timer ) {
158
+ $ loop ->cancelTimer ($ timer );
159
+ });
160
+
161
+ $ this ->parser ->handle ($ conn );
138
162
}
139
163
140
164
/** @internal */
@@ -352,7 +376,7 @@ public function handleResponse(ConnectionInterface $connection, ServerRequestInt
352
376
353
377
// either wait for next request over persistent connection or end connection
354
378
if ($ persist ) {
355
- $ this ->parser -> handle ($ connection );
379
+ $ this ->handle ($ connection );
356
380
} else {
357
381
$ connection ->end ();
358
382
}
@@ -373,10 +397,10 @@ public function handleResponse(ConnectionInterface $connection, ServerRequestInt
373
397
// write streaming body and then wait for next request over persistent connection
374
398
if ($ persist ) {
375
399
$ body ->pipe ($ connection , array ('end ' => false ));
376
- $ parser = $ this -> parser ;
377
- $ body ->on ('end ' , function () use ($ connection , $ parser , $ body ) {
400
+ $ that = $ this ;
401
+ $ body ->on ('end ' , function () use ($ connection , $ that , $ body ) {
378
402
$ connection ->removeListener ('close ' , array ($ body , 'close ' ));
379
- $ parser ->handle ($ connection );
403
+ $ that ->handle ($ connection );
380
404
});
381
405
} else {
382
406
$ body ->pipe ($ connection );
0 commit comments