Skip to content

Commit 8c673e2

Browse files
committed
Add gRPC address to WorkerInterface
1 parent cbcad00 commit 8c673e2

File tree

9 files changed

+25
-11
lines changed

9 files changed

+25
-11
lines changed

fdbcli_lib/CliCommands.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,7 @@ Future<grpc::Status> getWorkers(Reference<IDatabase> db, const GetWorkersRequest
274274
Worker* w = rep->add_workers();
275275
w->set_address(data.address.toString());
276276
w->set_process_class(data.processClass.toString());
277+
w->set_grpc_address(w->grpc_address());
277278

278279
auto* lc = w->mutable_locality();
279280
localityDataToProto(data.locality, lc);

fdbcli_lib/ExcludeCommand.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -345,7 +345,7 @@ Future<grpc::Status> exclude(Reference<IDatabase> db, const ExcludeRequest* req,
345345
else if (addr.port > 0 && worker->second.count(addr.port) == 0)
346346
absentExclusions.insert(addr);
347347
}
348-
348+
349349
co_return grpc::Status::OK;
350350
} catch (const Error& e) {
351351
co_return grpc::Status(grpc::StatusCode::INTERNAL,

fdbcli_lib/include/fdbcli_lib/CliCommands.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ Future<std::set<NetworkAddress>> getInProgressExclusion(Reference<ITransaction>
7373

7474
Future<Void> getStorageServerInterfaces(Reference<IDatabase> db,
7575
std::map<std::string, StorageServerInterface>* interfaces);
76-
Future<bool> getWorkers(Reference<IDatabase> db, std::vector<ProcessData>* workers);
76+
Future<bool> getWorkers(Reference<IDatabase> db, std::vector<ProcessData>* workers);
7777
} // namespace utils
7878

7979
namespace special_keys {

fdbclient/CoordinationInterface.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
IPAddress ClusterConnectionString::determineLocalSourceIP() const {
3838
int size = coords.size() + hostnames.size();
3939
int index = 0;
40-
loop {
40+
while (true) {
4141
try {
4242
using namespace boost::asio;
4343

fdbclient/include/fdbclient/ClientWorkerInterface.h

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#define FDBCLIENT_CLIENTWORKERINTERFACE_H
2323
#pragma once
2424

25+
#include "fdbrpc/FlowGrpc.h"
2526
#include "fdbclient/FDBTypes.h"
2627
#include "fdbrpc/FailureMonitor.h"
2728
#include "fdbclient/Status.h"
@@ -35,17 +36,24 @@ struct ClientWorkerInterface {
3536
RequestStream<struct RebootRequest> reboot;
3637
RequestStream<struct ProfilerRequest> profiler;
3738
RequestStream<struct SetFailureInjection> setFailureInjection;
39+
Optional<NetworkAddress> grpcAddress;
3840

3941
bool operator==(ClientWorkerInterface const& r) const { return id() == r.id(); }
4042
bool operator!=(ClientWorkerInterface const& r) const { return id() != r.id(); }
4143
UID id() const { return reboot.getEndpoint().token; }
4244
NetworkAddress address() const { return reboot.getEndpoint().getPrimaryAddress(); }
4345

44-
void initEndpoints() { reboot.getEndpoint(TaskPriority::ReadSocket); }
46+
void initEndpoints() {
47+
reboot.getEndpoint(TaskPriority::ReadSocket);
48+
auto grpcInstance = FlowGrpc::instance();
49+
if (grpcInstance) {
50+
grpcAddress = grpcInstance->server()->getAddress();
51+
}
52+
}
4553

4654
template <class Ar>
4755
void serialize(Ar& ar) {
48-
serializer(ar, reboot, profiler, setFailureInjection);
56+
serializer(ar, reboot, profiler, setFailureInjection, grpcAddress);
4957
}
5058
};
5159

fdbrpc/include/fdbrpc/Locality.h

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -410,16 +410,20 @@ struct ProcessData {
410410
LocalityData locality;
411411
ProcessClass processClass;
412412
NetworkAddress address;
413+
Optional<NetworkAddress> grpcAddress;
413414

414415
ProcessData() {}
415-
ProcessData(LocalityData locality, ProcessClass processClass, NetworkAddress address)
416-
: locality(locality), processClass(processClass), address(address) {}
416+
ProcessData(LocalityData locality,
417+
ProcessClass processClass,
418+
NetworkAddress address,
419+
Optional<NetworkAddress> grpcAddress)
420+
: locality(locality), processClass(processClass), address(address), grpcAddress(grpcAddress) {}
417421

418422
// To change this serialization, ProtocolVersion::WorkerListValue must be updated, and downgrades need to be
419423
// considered
420424
template <class Ar>
421425
void serialize(Ar& ar) {
422-
serializer(ar, locality, processClass, address);
426+
serializer(ar, locality, processClass, address, grpcAddress);
423427
}
424428

425429
struct sort_by_address {

fdbserver/ClusterController.actor.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -916,8 +916,9 @@ ACTOR Future<Void> workerAvailabilityWatch(WorkerInterface worker,
916916
state Future<Void> failed = (worker.address() == g_network->getLocalAddress())
917917
? Never()
918918
: waitFailureClient(worker.waitFailure, SERVER_KNOBS->WORKER_FAILURE_TIME);
919-
cluster->updateWorkerList.set(worker.locality.processId(),
920-
ProcessData(worker.locality, startingClass, worker.stableAddress()));
919+
cluster->updateWorkerList.set(
920+
worker.locality.processId(),
921+
ProcessData(worker.locality, startingClass, worker.stableAddress(), worker.grpcAddress()));
921922
// This switching avoids a race where the worker can be added to id_worker map after the workerAvailabilityWatch
922923
// fails for the worker.
923924
wait(delay(0));

fdbserver/include/fdbserver/WorkerInterface.actor.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ struct WorkerInterface {
8585
NetworkAddress stableAddress() const { return tLog.getEndpoint().getStableAddress(); }
8686
Optional<NetworkAddress> secondaryAddress() const { return tLog.getEndpoint().addresses.secondaryAddress; }
8787
NetworkAddressList addresses() const { return tLog.getEndpoint().addresses; }
88+
Optional<NetworkAddress> grpcAddress() const { return clientInterface.grpcAddress; }
8889

8990
WorkerInterface() {}
9091
WorkerInterface(const LocalityData& locality) : locality(locality) {}

fdbserver/worker.actor.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,6 @@
8282
#include "flow/ApiVersion.h"
8383
#include "fdbcli_lib/CliService.h"
8484

85-
8685
#ifdef __linux__
8786
#include <fcntl.h>
8887
#include <stdio.h>

0 commit comments

Comments
 (0)