Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions flow/Knobs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ void FlowKnobs::initialize(Randomize randomize, IsSimulated isSimulated) {
init( TLS_HANDSHAKE_THREAD_STACKSIZE, 64 * 1024 );
init( TLS_MALLOC_ARENA_MAX, 6 );
init( TLS_HANDSHAKE_LIMIT, 1000 );
init( TLS_HANDSHAKE_FLOWLOCK_HIGH_PRIORITY, false );

init( NETWORK_TEST_CLIENT_COUNT, 30 );
init( NETWORK_TEST_REPLY_SIZE, 600e3 );
Expand Down
104 changes: 90 additions & 14 deletions flow/Net2.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -363,25 +363,38 @@ class BindPromise {
std::variant<const char*, AuditedEvent> errContext;
UID errID;
NetworkAddress peerAddr;
double createTime = 0.0;
double startTime = 0.0;
double startHandshakeTime = 0.0;

public:
BindPromise(const char* errContext, UID errID) : errContext(errContext), errID(errID) {}
BindPromise(AuditedEvent auditedEvent, UID errID) : errContext(auditedEvent), errID(errID) {}
BindPromise(BindPromise const& r) : p(r.p), errContext(r.errContext), errID(r.errID), peerAddr(r.peerAddr) {}
BindPromise(BindPromise const& r)
: p(r.p), errContext(r.errContext), errID(r.errID), peerAddr(r.peerAddr), createTime(r.createTime),
startTime(r.startTime), startHandshakeTime(r.startHandshakeTime) {}
BindPromise(BindPromise&& r) noexcept
: p(std::move(r.p)), errContext(r.errContext), errID(r.errID), peerAddr(r.peerAddr) {}
: p(std::move(r.p)), errContext(r.errContext), errID(r.errID), peerAddr(r.peerAddr), createTime(r.createTime),
startTime(r.startTime), startHandshakeTime(r.startHandshakeTime) {}

Future<Void> getFuture() const { return p.getFuture(); }

NetworkAddress getPeerAddr() const { return peerAddr; }

void setPeerAddr(const NetworkAddress& addr) { peerAddr = addr; }

void setTimeMetrics(double inputCreateTime, double inputStartTime, double inputStartHandshakeTime) {
createTime = inputCreateTime;
startTime = inputStartTime;
startHandshakeTime = inputStartHandshakeTime;
}

void operator()(const boost::system::error_code& error, size_t bytesWritten = 0) {
try {
if (error) {
// Log the error...
{
double currentTime = now();
std::optional<TraceEvent> traceEvent;
if (std::holds_alternative<AuditedEvent>(errContext))
traceEvent.emplace(SevWarn, std::get<AuditedEvent>(errContext), errID);
Expand All @@ -400,6 +413,15 @@ class BindPromise {
evt.detail("PeerAddr", peerAddr);
evt.detail("PeerAddress", peerAddr);
}
if (createTime > 0) {
evt.detail("Duration", currentTime - createTime);
}
if (startTime > 0) {
evt.detail("QueuingTime", currentTime - startTime);
}
if (startHandshakeTime > 0) {
evt.detail("ServiceTime", currentTime - startHandshakeTime);
}
}

p.sendError(connection_failed());
Expand Down Expand Up @@ -832,14 +854,24 @@ struct SSLHandshakerThread final : IThreadPoolReceiver {
return std::move(o).str();
}

void setTimeMetrics(double inputCreateTime, double inputStartTime, double inputStartHandshakeTime) {
createTime = inputCreateTime;
startTime = inputStartTime;
startHandshakeTime = inputStartHandshakeTime;
}

ThreadReturnPromise<Void> done;
ssl_socket& socket;
ssl_socket::handshake_type type;
boost::system::error_code err;
double createTime = 0.0;
double startTime = 0.0;
double startHandshakeTime = 0.0;
};

void action(Handshake& h) {
try {
double currentTime = now();
h.socket.next_layer().non_blocking(false, h.err);
if (!h.err.failed()) {
h.socket.handshake(h.type, h.err);
Expand All @@ -848,14 +880,23 @@ struct SSLHandshakerThread final : IThreadPoolReceiver {
h.socket.next_layer().non_blocking(true, h.err);
}
if (h.err.failed()) {
TraceEvent(SevWarn,
h.type == ssl_socket::handshake_type::client ? "N2_ConnectHandshakeError"_audit
: "N2_AcceptHandshakeError"_audit)
.detail("PeerAddr", h.getPeerAddress())
.detail("PeerAddress", h.getPeerAddress())
.detail("ErrorCode", h.err.value())
.detail("ErrorMsg", h.err.message().c_str())
.detail("BackgroundThread", true);
TraceEvent evt(SevWarn,
h.type == ssl_socket::handshake_type::client ? "N2_ConnectHandshakeError"_audit
: "N2_AcceptHandshakeError"_audit);
evt.detail("PeerAddr", h.getPeerAddress());
evt.detail("PeerAddress", h.getPeerAddress());
evt.detail("ErrorCode", h.err.value());
evt.detail("ErrorMsg", h.err.message().c_str());
evt.detail("BackgroundThread", true);
if (h.createTime > 0) {
evt.detail("Duration", currentTime - h.createTime);
}
if (h.startTime > 0) {
evt.detail("QueuingTime", currentTime - h.startTime);
}
if (h.startHandshakeTime > 0) {
evt.detail("ServiceTime", currentTime - h.startHandshakeTime);
}
h.done.sendError(connection_failed());
} else {
h.done.send(Void());
Expand All @@ -882,11 +923,11 @@ class SSLConnection final : public IConnection, ReferenceCounted<SSLConnection>
explicit SSLConnection(boost::asio::io_service& io_service,
Reference<ReferencedObject<boost::asio::ssl::context>> context)
: id(nondeterministicRandom()->randomUniqueID()), socket(io_service), ssl_sock(socket, context->mutate()),
sslContext(context), has_trusted_peer(false) {}
sslContext(context), has_trusted_peer(false), createTime(now()) {}

explicit SSLConnection(Reference<ReferencedObject<boost::asio::ssl::context>> context, tcp::socket* existingSocket)
: id(nondeterministicRandom()->randomUniqueID()), socket(std::move(*existingSocket)),
ssl_sock(socket, context->mutate()), sslContext(context) {}
ssl_sock(socket, context->mutate()), sslContext(context), createTime(now()) {}

// This is not part of the IConnection interface, because it is wrapped by INetwork::connect()
ACTOR static Future<Reference<IConnection>> connect(boost::asio::io_service* ios,
Expand Down Expand Up @@ -924,6 +965,7 @@ class SSLConnection final : public IConnection, ReferenceCounted<SSLConnection>

wait(onConnected);
self->init();
self->startTime = now();
return self;
} catch (Error&) {
// Either the connection failed, or was cancelled by the caller
Expand All @@ -940,6 +982,8 @@ class SSLConnection final : public IConnection, ReferenceCounted<SSLConnection>

ACTOR static void doAcceptHandshake(Reference<SSLConnection> self, Promise<Void> connected) {
state Hold<int> holder;
state double startHandshakeTime = now();
state bool doBackgroundHandshake = false;

try {
Future<Void> onHandshook;
Expand All @@ -955,18 +999,31 @@ class SSLConnection final : public IConnection, ReferenceCounted<SSLConnection>
holder = Hold(&N2::g_net2->sslPoolHandshakesInProgress);
auto handshake =
new SSLHandshakerThread::Handshake(self->ssl_sock, boost::asio::ssl::stream_base::server);
handshake->setTimeMetrics(self->createTime, self->startTime, startHandshakeTime);
onHandshook = handshake->done.getFuture();
doBackgroundHandshake = true;
N2::g_net2->sslHandshakerPool->post(handshake);
} else {
// Otherwise use flow network thread
g_net2->countServerTLSHandshakesOnMainThread++;
BindPromise p("N2_AcceptHandshakeError"_audit, self->id);
p.setPeerAddr(self->getPeerAddress());
p.setTimeMetrics(self->createTime, self->startTime, startHandshakeTime);
onHandshook = p.getFuture();
doBackgroundHandshake = false;
self->ssl_sock.async_handshake(boost::asio::ssl::stream_base::server, std::move(p));
}
wait(onHandshook);
wait(delay(0, TaskPriority::Handshake));
double currentTime = now();
TraceEvent(SevInfo, "N2_AcceptHandshakeComplete"_audit)
.suppressFor(1.0)
.detail("PeerAddr", self->getPeerAddress())
.detail("PeerAddress", self->getPeerAddress())
.detail("BackgroundThread", doBackgroundHandshake)
.detail("Duration", currentTime - self->createTime)
.detail("QueuingTime", currentTime - self->startTime)
.detail("ServiceTime", currentTime - startHandshakeTime);
connected.send(Void());
} catch (...) {
self->closeSocket();
Expand Down Expand Up @@ -1028,6 +1085,8 @@ class SSLConnection final : public IConnection, ReferenceCounted<SSLConnection>

ACTOR static void doConnectHandshake(Reference<SSLConnection> self, Promise<Void> connected) {
state Hold<int> holder;
state double startHandshakeTime = now();
state bool doBackgroundHandshake = false;

try {
Future<Void> onHandshook;
Expand All @@ -1050,18 +1109,31 @@ class SSLConnection final : public IConnection, ReferenceCounted<SSLConnection>
holder = Hold(&N2::g_net2->sslPoolHandshakesInProgress);
auto handshake =
new SSLHandshakerThread::Handshake(self->ssl_sock, boost::asio::ssl::stream_base::client);
handshake->setTimeMetrics(self->createTime, self->startTime, startHandshakeTime);
onHandshook = handshake->done.getFuture();
doBackgroundHandshake = true;
N2::g_net2->sslHandshakerPool->post(handshake);
} else {
// Otherwise use flow network thread
g_net2->countClientTLSHandshakesOnMainThread++;
BindPromise p("N2_ConnectHandshakeError"_audit, self->id);
p.setPeerAddr(self->getPeerAddress());
p.setTimeMetrics(self->createTime, self->startTime, startHandshakeTime);
onHandshook = p.getFuture();
doBackgroundHandshake = false;
self->ssl_sock.async_handshake(boost::asio::ssl::stream_base::client, std::move(p));
}
wait(onHandshook);
wait(delay(0, TaskPriority::Handshake));
double currentTime = now();
TraceEvent(SevInfo, "N2_ConnectHandshakeComplete"_audit)
.suppressFor(1.0)
.detail("PeerAddr", self->getPeerAddress())
.detail("PeerAddress", self->getPeerAddress())
.detail("BackgroundThread", doBackgroundHandshake)
.detail("Duration", currentTime - self->createTime)
.detail("QueuingTime", currentTime - self->startTime)
.detail("ServiceTime", currentTime - startHandshakeTime);
connected.send(Void());
} catch (...) {
self->closeSocket();
Expand All @@ -1070,7 +1142,8 @@ class SSLConnection final : public IConnection, ReferenceCounted<SSLConnection>
}

ACTOR static Future<Void> connectHandshakeWrapper(Reference<SSLConnection> self) {
wait(g_network->networkInfo.handshakeLock->take());
wait(g_network->networkInfo.handshakeLock->take(
FLOW_KNOBS->TLS_HANDSHAKE_FLOWLOCK_HIGH_PRIORITY ? TaskPriority::Handshake : TaskPriority::DefaultYield));
state FlowLock::Releaser releaser(*g_network->networkInfo.handshakeLock);

Promise<Void> connected;
Expand Down Expand Up @@ -1193,6 +1266,9 @@ class SSLConnection final : public IConnection, ReferenceCounted<SSLConnection>

ssl_socket& getSSLSocket() { return ssl_sock; }

double createTime = 0.0;
double startTime = 0.0;

private:
UID id;
tcp::socket socket;
Expand Down Expand Up @@ -1283,7 +1359,7 @@ class SSLListener final : public IListener, ReferenceCounted<SSLListener> {
: IPAddress(peer_endpoint.address().to_v4().to_ulong());

conn->accept(NetworkAddress(peer_address, peer_endpoint.port(), false, true));

conn->startTime = now();
return conn;
} catch (...) {
conn->close();
Expand Down
1 change: 1 addition & 0 deletions flow/include/flow/Knobs.h
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ class FlowKnobs : public KnobsImpl<FlowKnobs> {
int TLS_HANDSHAKE_THREAD_STACKSIZE;
int TLS_MALLOC_ARENA_MAX;
int TLS_HANDSHAKE_LIMIT;
bool TLS_HANDSHAKE_FLOWLOCK_HIGH_PRIORITY;

int NETWORK_TEST_CLIENT_COUNT;
int NETWORK_TEST_REPLY_SIZE;
Expand Down