@@ -364,33 +364,50 @@ struct P2PNetworkTest {
364
364
double probabilityNotCloseConn = 0 ;
365
365
366
366
double startTime;
367
- int64_t bytesSent;
368
- int64_t bytesReceived;
369
- int sessionsIn;
370
- int sessionsOut;
371
- int connectErrors;
372
- int acceptErrors;
373
- int sessionErrors;
367
+ int64_t bytesSent = 0 ;
368
+ int64_t bytesReceived = 0 ;
369
+ int sessionsIn = 0 ;
370
+ int sessionsOut = 0 ;
371
+ int connectErrors = 0 ;
372
+ int acceptErrors = 0 ;
373
+ int sessionErrors = 0 ;
374
+ int handshakeReqIn = 0 ;
375
+ int handshakeReqOut = 0 ;
376
+ int handshakeDoneIn = 0 ;
377
+ int handshakeDoneOut = 0 ;
374
378
375
379
Standalone<StringRef> msgBuffer;
376
380
377
381
std::string statsString () {
378
382
double elapsed = now () - startTime;
379
- std::string s = format (
380
- " %.2f MB/s bytes in %.2f MB/s bytes out %.2f/s completed sessions in %.2f/s completed sessions out " ,
381
- bytesReceived / elapsed / 1e6 ,
382
- bytesSent / elapsed / 1e6 ,
383
- sessionsIn / elapsed,
384
- sessionsOut / elapsed);
385
- s += format (" Total Errors %d connect=%d accept=%d session=%d" ,
386
- connectErrors + acceptErrors + sessionErrors,
387
- connectErrors,
388
- acceptErrors,
389
- sessionErrors);
383
+ std::string s = format (" %.2f MB/s in; %.2f MB/s out\n Arrive: %.2f/s in HS req; %.2f/s "
384
+ " out HS req\n Processed %.2f/s in HS req; %.2f/s out HS req\n "
385
+ " Completed Session %.2f/s in sessions; %.2f/s "
386
+ " out sessions\n " ,
387
+ bytesReceived / elapsed / 1e6 ,
388
+ bytesSent / elapsed / 1e6 ,
389
+ handshakeReqIn / elapsed,
390
+ handshakeReqOut / elapsed,
391
+ handshakeDoneIn / elapsed,
392
+ handshakeDoneOut / elapsed,
393
+ sessionsIn / elapsed,
394
+ sessionsOut / elapsed);
395
+ s += format (" Total Errors %.2f/s ConnectError=%.2f/s AcceptError=%.2f/s SessionError=%.2f/s" ,
396
+ (connectErrors + acceptErrors + sessionErrors) / elapsed,
397
+ connectErrors / elapsed,
398
+ acceptErrors / elapsed,
399
+ sessionErrors / elapsed);
390
400
bytesSent = 0 ;
391
401
bytesReceived = 0 ;
392
402
sessionsIn = 0 ;
393
403
sessionsOut = 0 ;
404
+ connectErrors = 0 ;
405
+ acceptErrors = 0 ;
406
+ sessionErrors = 0 ;
407
+ handshakeReqIn = 0 ;
408
+ handshakeReqOut = 0 ;
409
+ handshakeDoneIn = 0 ;
410
+ handshakeDoneOut = 0 ;
394
411
startTime = now ();
395
412
return s;
396
413
}
@@ -419,6 +436,10 @@ struct P2PNetworkTest {
419
436
connectErrors = 0 ;
420
437
acceptErrors = 0 ;
421
438
sessionErrors = 0 ;
439
+ handshakeReqIn = 0 ;
440
+ handshakeReqOut = 0 ;
441
+ handshakeDoneIn = 0 ;
442
+ handshakeDoneOut = 0 ;
422
443
msgBuffer = makeString (std::max (sendMsgBytes.max , recvMsgBytes.max ));
423
444
424
445
if (!remoteAddresses.empty ()) {
@@ -514,19 +535,23 @@ struct P2PNetworkTest {
514
535
515
536
try {
516
537
if (incoming) {
538
+ self->handshakeReqIn ++;
517
539
if (self->randomCloseMaxDelay > -1 ) {
518
540
randomClose = randomDelayedConnClose (self, conn);
519
541
}
520
542
wait (conn->acceptHandshake ());
543
+ self->handshakeDoneIn ++;
521
544
// Read the number of requests for the session
522
545
Standalone<StringRef> buf = wait (readMsg (self, conn));
523
546
ASSERT (buf.size () == sizeof (int ));
524
547
numRequests = *(int *)buf.begin ();
525
548
} else {
549
+ self->handshakeReqOut ++;
526
550
if (self->randomCloseMaxDelay > -1 ) {
527
551
randomClose = randomDelayedConnClose (self, conn);
528
552
}
529
553
wait (conn->connectHandshake ());
554
+ self->handshakeDoneOut ++;
530
555
531
556
// Pick the number of requests for the session and send it to remote
532
557
numRequests = self->requests .get ();
@@ -590,10 +615,8 @@ struct P2PNetworkTest {
590
615
591
616
ACTOR static Future<Void> incoming (P2PNetworkTest* self, Reference<IListener> listener) {
592
617
state ActorCollection sessions (false );
593
-
618
+ state uint64_t connectionCount = 0 ;
594
619
loop {
595
- wait (delay (0 , TaskPriority::AcceptSocket));
596
-
597
620
try {
598
621
state Reference<IConnection> conn = wait (listener->accept ());
599
622
// printf("Connected from %s\n", conn->getPeerAddress().toString().c_str());
@@ -602,6 +625,10 @@ struct P2PNetworkTest {
602
625
++self->acceptErrors ;
603
626
TraceEvent (SevError, " P2PIncomingError" ).error (e).detail (" Listener" , listener->getListenAddress ());
604
627
}
628
+ connectionCount++;
629
+ if (connectionCount % (FLOW_KNOBS->ACCEPT_BATCH_SIZE ) == 0 ) {
630
+ wait (delay (0 , TaskPriority::AcceptSocket));
631
+ }
605
632
}
606
633
}
607
634
0 commit comments