Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions src/app/firedancer/topology.c
Original file line number Diff line number Diff line change
Expand Up @@ -127,13 +127,17 @@ resolve_address( char const * address,

static int
resolve_peer( char const * peer,
fd_ip4_port_t * ip4_port ) {
char hostname[ static 256UL ],
fd_ip4_port_t * ip4_port,
int * is_https ) {

/* Split host:port */
char const * host_port = peer;
if( FD_LIKELY( strncmp( peer, "http://", 7UL )==0 ) ) {
if( FD_LIKELY( is_https ) ) *is_https = 0;
host_port += 7UL;
} else if( FD_LIKELY( strncmp( peer, "https://", 8UL )==0 ) ) {
if( FD_LIKELY( is_https ) ) *is_https = 1;
host_port += 8UL;
}

Expand All @@ -148,7 +152,9 @@ resolve_peer( char const * peer,
FD_LOG_ERR(( "invalid [gossip.entrypoints] entry \"%s\": hostname too long", host_port ));
}
fd_memcpy( fqdn, host_port, fqdn_len );
fd_memcpy( hostname, fqdn, fqdn_len );
fqdn[ fqdn_len ] = '\0';
hostname[ fqdn_len ] = '\0';

/* Parse port number */

Expand All @@ -169,7 +175,8 @@ static void
resolve_gossip_entrypoints( config_t * config ) {
ulong entrypoint_cnt = config->gossip.entrypoints_cnt;
for( ulong i=0UL; i<entrypoint_cnt; i++ ) {
if( FD_UNLIKELY( 0==resolve_peer( config->gossip.entrypoints[ i ], &config->gossip.resolved_entrypoints[ i ] ) ) ) {
char hostname[ 256UL ];
if( FD_UNLIKELY( 0==resolve_peer( config->gossip.entrypoints[ i ], hostname, &config->gossip.resolved_entrypoints[ i ], NULL ) ) ) {
FD_LOG_ERR(( "failed to resolve address of [gossip.entrypoints] entry \"%s\"", config->gossip.entrypoints[ i ] ));
}
}
Expand All @@ -184,7 +191,10 @@ resolve_snapshot_peers( config_t * config,
for( ulong j=0UL; j<peers_cnt; j++ ) {
if( FD_UNLIKELY( !config->firedancer.snapshots.sources.http.peers[ j ].enabled ) ) continue;

if( FD_UNLIKELY( 0==resolve_peer( config->firedancer.snapshots.sources.http.peers[ j ].url, &tile->snaprd.http.peers[ resolved_peers_cnt ] ) ) ) {
if( FD_UNLIKELY( 0==resolve_peer( config->firedancer.snapshots.sources.http.peers[ j ].url,
tile->snaprd.http.peers[ resolved_peers_cnt ].hostname,
&tile->snaprd.http.peers[ resolved_peers_cnt ].addr,
&tile->snaprd.http.peers[ resolved_peers_cnt ].is_https ) ) ) {
FD_LOG_ERR(( "failed to resolve address of [snapshots.sources.http.peers] entry \"%s\"", config->firedancer.snapshots.sources.http.peers[ j ].url ));
} else {
resolved_peers_cnt++;
Expand Down
7 changes: 6 additions & 1 deletion src/disco/topo/fd_topo.h
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,12 @@ struct fd_topo_tile {

struct {
ulong peers_cnt;
fd_ip4_port_t peers[ 16UL ];

struct {
fd_ip4_port_t addr;
char hostname[ 256UL ];
int is_https;
} peers[ 16UL ];
} http;

int diagnostics;
Expand Down
62 changes: 42 additions & 20 deletions src/discof/restore/fd_snaprd_tile.c
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,15 @@
#define IN_KIND_GOSSIP (1)
#define MAX_IN_LINKS (3)

struct fd_snaprd_http_peer {
char hostname[ 256UL ];
ulong hostname_len;
int is_https;
fd_ip4_port_t addr;
};

typedef struct fd_snaprd_http_peer fd_snaprd_http_peer_t;

struct fd_snaprd_tile {
fd_ssping_t * ssping;
fd_sshttp_t * sshttp;
Expand All @@ -57,7 +66,7 @@ struct fd_snaprd_tile {

long diagnostic_deadline_nanos;

fd_ip4_port_t addr;
fd_sspeer_t peer;

struct {
ulong write_buffer_pos;
Expand Down Expand Up @@ -95,6 +104,9 @@ struct fd_snaprd_tile {
uint maximum_download_retry_abort;
uint max_full_snapshots_to_keep;
uint max_incremental_snapshots_to_keep;

ulong peers_cnt;
fd_snaprd_http_peer_t peers[ 16UL ];
} config;

struct {
Expand Down Expand Up @@ -232,8 +244,8 @@ read_http_data( fd_snaprd_tile_t * ctx,
case FD_SSHTTP_ADVANCE_AGAIN: break;
case FD_SSHTTP_ADVANCE_ERROR: {
FD_LOG_NOTICE(( "error downloading snapshot from http://" FD_IP4_ADDR_FMT ":%hu/snapshot.tar.bz2",
FD_IP4_ADDR_FMT_ARGS( ctx->addr.addr ), fd_ushort_bswap( ctx->addr.port ) ));
fd_ssping_invalidate( ctx->ssping, ctx->addr, now );
FD_IP4_ADDR_FMT_ARGS( ctx->peer.addr.addr ), fd_ushort_bswap( ctx->peer.addr.port ) ));
fd_ssping_invalidate( ctx->ssping, ctx->peer, now );
fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_CTRL_RESET_FULL, 0UL, 0UL, 0UL, 0UL, 0UL );
ctx->state = FD_SNAPRD_STATE_FLUSHING_FULL_HTTP_RESET;
ctx->deadline_nanos = now;
Expand Down Expand Up @@ -457,8 +469,8 @@ after_credit( fd_snaprd_tile_t * ctx,

switch ( ctx->state ) {
case FD_SNAPRD_STATE_WAITING_FOR_PEERS: {
fd_ip4_port_t best = fd_ssping_best( ctx->ssping );
if( FD_LIKELY( best.l ) ) {
fd_sspeer_t best = fd_ssping_best( ctx->ssping );
if( FD_LIKELY( best.addr.l ) ) {
ctx->state = FD_SNAPRD_STATE_COLLECTING_PEERS;
ctx->deadline_nanos = now+500L*1000L*1000L;
}
Expand All @@ -467,8 +479,8 @@ after_credit( fd_snaprd_tile_t * ctx,
case FD_SNAPRD_STATE_COLLECTING_PEERS: {
if( FD_UNLIKELY( now<ctx->deadline_nanos ) ) break;

fd_ip4_port_t best = fd_ssping_best( ctx->ssping );
if( FD_UNLIKELY( !best.l ) ) {
fd_sspeer_t best = fd_ssping_best( ctx->ssping );
if( FD_UNLIKELY( !best.addr.l ) ) {
ctx->state = FD_SNAPRD_STATE_WAITING_FOR_PEERS;
break;
}
Expand All @@ -479,10 +491,10 @@ after_credit( fd_snaprd_tile_t * ctx,
ctx->metrics.full.bytes_total = ctx->local_in.full_snapshot_size;
ctx->state = FD_SNAPRD_STATE_READING_FULL_FILE;
} else {
FD_LOG_NOTICE(( "downloading full snapshot from http://" FD_IP4_ADDR_FMT ":%hu/snapshot.tar.bz2", FD_IP4_ADDR_FMT_ARGS( best.addr ), fd_ushort_bswap( best.port ) ));
ctx->addr = best;
FD_LOG_NOTICE(( "downloading full snapshot from http://" FD_IP4_ADDR_FMT ":%hu/snapshot.tar.bz2", FD_IP4_ADDR_FMT_ARGS( best.addr.addr ), fd_ushort_bswap( best.addr.port ) ));
ctx->peer = best;
ctx->state = FD_SNAPRD_STATE_READING_FULL_HTTP;
fd_sshttp_init( ctx->sshttp, best, "/snapshot.tar.bz2", 17UL, now );
fd_sshttp_init( ctx->sshttp, ctx->peer, "/snapshot.tar.bz2", 17UL, now );
}
break;
}
Expand Down Expand Up @@ -547,8 +559,8 @@ after_credit( fd_snaprd_tile_t * ctx,
break;
}

FD_LOG_NOTICE(( "downloading incremental snapshot from http://" FD_IP4_ADDR_FMT ":%hu/incremental-snapshot.tar.bz2", FD_IP4_ADDR_FMT_ARGS( ctx->addr.addr ), fd_ushort_bswap( ctx->addr.port ) ));
fd_sshttp_init( ctx->sshttp, ctx->addr, "/incremental-snapshot.tar.bz2", 29UL, fd_log_wallclock() );
FD_LOG_NOTICE(( "downloading incremental snapshot from http://" FD_IP4_ADDR_FMT ":%hu/incremental-snapshot.tar.bz2", FD_IP4_ADDR_FMT_ARGS( ctx->peer.addr.addr ), fd_ushort_bswap( ctx->peer.addr.port ) ));
fd_sshttp_init( ctx->sshttp, ctx->peer, "/incremental-snapshot.tar.bz2", 29UL, fd_log_wallclock() );
ctx->state = FD_SNAPRD_STATE_READING_INCREMENTAL_HTTP;
break;
case FD_SNAPRD_STATE_FLUSHING_FULL_HTTP_RESET:
Expand Down Expand Up @@ -632,12 +644,12 @@ after_frag( fd_snaprd_tile_t * ctx,
fd_contact_info_t * cur = &ctx->gossip.ci_table[ msg->contact_info.idx ];
fd_ip4_port_t cur_addr = ctx->gossip.ci_table[ msg->contact_info.idx ].sockets[ FD_CONTACT_INFO_SOCKET_RPC ];
if( cur_addr.l ){
fd_ssping_remove( ctx->ssping, cur_addr );
fd_ssping_remove( ctx->ssping, (fd_sspeer_t){ .addr = cur_addr, .hostname=NULL, .hostname_len=0UL } );
}
fd_contact_info_t * new = msg->contact_info.contact_info;
fd_ip4_port_t new_addr = new->sockets[ FD_CONTACT_INFO_SOCKET_RPC ];
if( new_addr.l ) {
fd_ssping_add( ctx->ssping, new_addr );
fd_ssping_add( ctx->ssping, (fd_sspeer_t){ .addr = new_addr, .hostname=NULL, .hostname_len=0UL } );
}
*cur = *new;
}
Expand All @@ -646,7 +658,7 @@ after_frag( fd_snaprd_tile_t * ctx,
fd_contact_info_t * cur = &ctx->gossip.ci_table[ msg->contact_info_remove.idx ];
fd_ip4_port_t addr = cur->sockets[ FD_CONTACT_INFO_SOCKET_RPC ];
if( addr.l ) {
fd_ssping_remove( ctx->ssping, addr );
fd_ssping_remove( ctx->ssping, (fd_sspeer_t){ .addr = addr, .hostname=NULL, .hostname_len=0UL } );
}
}
break;
Expand Down Expand Up @@ -675,9 +687,9 @@ after_frag( fd_snaprd_tile_t * ctx,
case FD_SNAPRD_STATE_READING_FULL_HTTP:
case FD_SNAPRD_STATE_READING_INCREMENTAL_HTTP:
FD_LOG_NOTICE(( "error downloading snapshot from http://" FD_IP4_ADDR_FMT ":%hu/snapshot.tar.bz2",
FD_IP4_ADDR_FMT_ARGS( ctx->addr.addr ), ctx->addr.port ));
FD_IP4_ADDR_FMT_ARGS( ctx->peer.addr.addr ), ctx->peer.addr.port ));
fd_sshttp_cancel( ctx->sshttp );
fd_ssping_invalidate( ctx->ssping, ctx->addr, fd_log_wallclock() );
fd_ssping_invalidate( ctx->ssping, ctx->peer, fd_log_wallclock() );
fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_CTRL_RESET_FULL, 0UL, 0UL, 0UL, 0UL, 0UL );
ctx->state = FD_SNAPRD_STATE_FLUSHING_FULL_HTTP_RESET;
break;
Expand All @@ -686,9 +698,9 @@ after_frag( fd_snaprd_tile_t * ctx,
if( FD_UNLIKELY( ctx->malformed ) ) break;

FD_LOG_NOTICE(( "error downloading snapshot from http://" FD_IP4_ADDR_FMT ":%hu/incremental-snapshot.tar.bz2",
FD_IP4_ADDR_FMT_ARGS( ctx->addr.addr ), ctx->addr.port ));
FD_IP4_ADDR_FMT_ARGS( ctx->peer.addr.addr ), ctx->peer.addr.port ));
fd_sshttp_cancel( ctx->sshttp );
fd_ssping_invalidate( ctx->ssping, ctx->addr, fd_log_wallclock() );
fd_ssping_invalidate( ctx->ssping, ctx->peer, fd_log_wallclock() );
/* We would like to transition to FULL_HTTP_RESET, but we
can't do it just yet, because we have already sent a DONE
control fragment, and need to wait for acknowledges to come
Expand Down Expand Up @@ -862,7 +874,17 @@ unprivileged_init( fd_topo_t * topo,
}
}

for( ulong i=0UL; i<tile->snaprd.http.peers_cnt; i++ ) fd_ssping_add( ctx->ssping, tile->snaprd.http.peers[ i ] );
ctx->config.peers_cnt = tile->snaprd.http.peers_cnt;
for( ulong i=0UL; i<tile->snaprd.http.peers_cnt; i++ ) {
ctx->config.peers[ i ].addr = tile->snaprd.http.peers[ i ].addr;
fd_memcpy( ctx->config.peers[ i ].hostname, tile->snaprd.http.peers[ i ].hostname, 256UL );
ctx->config.peers[ i ].hostname_len = strnlen( tile->snaprd.http.peers[ i ].hostname, 256UL );
}

for( ulong i=0UL; i<ctx->config.peers_cnt; i++ ) {
fd_sspeer_t peer = { .addr = ctx->config.peers[ i ].addr, .hostname=ctx->config.peers[ i ].hostname, .hostname_len=ctx->config.peers[ i ].hostname_len };
fd_ssping_add( ctx->ssping, peer );
}

if( FD_UNLIKELY( tile->out_cnt!=1UL ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu outs, expected 1", tile->out_cnt ));

Expand Down
Loading