18
18
* limitations under the License.
19
19
*/
20
20
21
+ #include " flow/IRandom.h"
21
22
#include " fmt/format.h"
22
23
#include " fdbserver/NetworkTest.h"
23
24
#include " flow/Knobs.h"
@@ -358,6 +359,10 @@ struct P2PNetworkTest {
358
359
// Random delay before socket writes
359
360
RandomIntRange waitWriteMilliseconds;
360
361
362
+ double randomCloseMaxDelay = -1 ; // Maximum delay before closing a connection
363
+
364
+ double probabilityNotCloseConn = 0 ;
365
+
361
366
double startTime;
362
367
int64_t bytesSent;
363
368
int64_t bytesReceived;
@@ -400,10 +405,13 @@ struct P2PNetworkTest {
400
405
RandomIntRange requests,
401
406
RandomIntRange idleMilliseconds,
402
407
RandomIntRange waitReadMilliseconds,
403
- RandomIntRange waitWriteMilliseconds)
408
+ RandomIntRange waitWriteMilliseconds,
409
+ double randomCloseMaxDelay,
410
+ double probabilityNotCloseConn)
404
411
: connectionsOut(connectionsOut), requestBytes(sendMsgBytes), replyBytes(recvMsgBytes), requests(requests),
405
412
idleMilliseconds (idleMilliseconds), waitReadMilliseconds(waitReadMilliseconds),
406
- waitWriteMilliseconds(waitWriteMilliseconds) {
413
+ waitWriteMilliseconds(waitWriteMilliseconds), randomCloseMaxDelay(randomCloseMaxDelay),
414
+ probabilityNotCloseConn(probabilityNotCloseConn) {
407
415
bytesSent = 0 ;
408
416
bytesReceived = 0 ;
409
417
sessionsIn = 0 ;
@@ -494,18 +502,30 @@ struct P2PNetworkTest {
494
502
return Void ();
495
503
}
496
504
505
+ ACTOR static Future<Void> randomDelayedConnClose (P2PNetworkTest* self, Reference<IConnection> conn) {
506
+ wait (delay (deterministicRandom ()->random01 () * self->randomCloseMaxDelay / 1000.0 ));
507
+ conn->close ();
508
+ return Void ();
509
+ }
510
+
497
511
ACTOR static Future<Void> doSession (P2PNetworkTest* self, Reference<IConnection> conn, bool incoming) {
498
512
state int numRequests;
513
+ state Future<Void> randomClose;
499
514
500
515
try {
501
516
if (incoming) {
517
+ if (self->randomCloseMaxDelay > -1 ) {
518
+ randomClose = randomDelayedConnClose (self, conn);
519
+ }
502
520
wait (conn->acceptHandshake ());
503
-
504
521
// Read the number of requests for the session
505
522
Standalone<StringRef> buf = wait (readMsg (self, conn));
506
523
ASSERT (buf.size () == sizeof (int ));
507
524
numRequests = *(int *)buf.begin ();
508
525
} else {
526
+ if (self->randomCloseMaxDelay > -1 ) {
527
+ randomClose = randomDelayedConnClose (self, conn);
528
+ }
509
529
wait (conn->connectHandshake ());
510
530
511
531
// Pick the number of requests for the session and send it to remote
@@ -532,7 +552,9 @@ struct P2PNetworkTest {
532
552
}
533
553
534
554
wait (delay (self->idleMilliseconds .get () / 1e3 ));
535
- conn->close ();
555
+ if (deterministicRandom ()->random01 () > self->probabilityNotCloseConn ) {
556
+ conn->close ();
557
+ }
536
558
537
559
if (incoming) {
538
560
++self->sessionsIn ;
@@ -653,7 +675,9 @@ TEST_CASE(":/network/p2ptest") {
653
675
params.get (" requests" ).orDefault (" 10:10000" ),
654
676
params.get (" idleMilliseconds" ).orDefault (" 0" ),
655
677
params.get (" waitReadMilliseconds" ).orDefault (" 0" ),
656
- params.get (" waitWriteMilliseconds" ).orDefault (" 0" ));
678
+ params.get (" waitWriteMilliseconds" ).orDefault (" 0" ),
679
+ params.getDouble (" randomCloseMaxDelay" ).orDefault (-1 ),
680
+ params.getDouble (" probabilityNotCloseConn" ).orDefault (0.0 ));
657
681
658
682
wait (p2p.run ());
659
683
return Void ();
0 commit comments