Skip to content

Commit 5014c9e

Browse files
committed
Fix adding etcd-only node to existing cluster
Signed-off-by: Brad Davidson <[email protected]>
1 parent 7f4aa59 commit 5014c9e

File tree

13 files changed

+164
-93
lines changed

13 files changed

+164
-93
lines changed

pkg/agent/loadbalancer/loadbalancer.go

Lines changed: 33 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,17 @@ type LoadBalancer struct {
1919
dialer *net.Dialer
2020
proxy *tcpproxy.Proxy
2121

22-
configFile string
23-
localAddress string
24-
localServerURL string
25-
originalServerAddress string
26-
ServerURL string
27-
ServerAddresses []string
28-
randomServers []string
29-
currentServerAddress string
30-
nextServerIndex int
31-
Listener net.Listener
22+
serviceName string
23+
configFile string
24+
localAddress string
25+
localServerURL string
26+
defaultServerAddress string
27+
ServerURL string
28+
ServerAddresses []string
29+
randomServers []string
30+
currentServerAddress string
31+
nextServerIndex int
32+
Listener net.Listener
3233
}
3334

3435
const RandomPort = 0
@@ -55,26 +56,27 @@ func New(ctx context.Context, dataDir, serviceName, serverURL string, lbServerPo
5556
}
5657
localAddress := listener.Addr().String()
5758

58-
originalServerAddress, localServerURL, err := parseURL(serverURL, localAddress)
59+
defaultServerAddress, localServerURL, err := parseURL(serverURL, localAddress)
5960
if err != nil {
6061
return nil, err
6162
}
6263

6364
if serverURL == localServerURL {
64-
logrus.Debugf("Initial server URL for load balancer points at local server URL - starting with empty original server address")
65-
originalServerAddress = ""
65+
logrus.Debugf("Initial server URL for load balancer %s points at local server URL - starting with empty default server address", serviceName)
66+
defaultServerAddress = ""
6667
}
6768

6869
lb := &LoadBalancer{
69-
dialer: &net.Dialer{},
70-
configFile: filepath.Join(dataDir, "etc", serviceName+".json"),
71-
localAddress: localAddress,
72-
localServerURL: localServerURL,
73-
originalServerAddress: originalServerAddress,
74-
ServerURL: serverURL,
70+
serviceName: serviceName,
71+
dialer: &net.Dialer{},
72+
configFile: filepath.Join(dataDir, "etc", serviceName+".json"),
73+
localAddress: localAddress,
74+
localServerURL: localServerURL,
75+
defaultServerAddress: defaultServerAddress,
76+
ServerURL: serverURL,
7577
}
7678

77-
lb.setServers([]string{lb.originalServerAddress})
79+
lb.setServers([]string{lb.defaultServerAddress})
7880

7981
lb.proxy = &tcpproxy.Proxy{
8082
ListenFunc: func(string, string) (net.Listener, error) {
@@ -93,22 +95,27 @@ func New(ctx context.Context, dataDir, serviceName, serverURL string, lbServerPo
9395
if err := lb.proxy.Start(); err != nil {
9496
return nil, err
9597
}
96-
logrus.Infof("Running load balancer %s -> %v", lb.localAddress, lb.randomServers)
98+
logrus.Infof("Running load balancer %s %s -> %v", serviceName, lb.localAddress, lb.randomServers)
9799

98100
return lb, nil
99101
}
100102

103+
func (lb *LoadBalancer) SetDefault(serverAddress string) {
104+
logrus.Infof("Updating load balancer %s default server address -> %s", lb.serviceName, serverAddress)
105+
lb.defaultServerAddress = serverAddress
106+
}
107+
101108
func (lb *LoadBalancer) Update(serverAddresses []string) {
102109
if lb == nil {
103110
return
104111
}
105112
if !lb.setServers(serverAddresses) {
106113
return
107114
}
108-
logrus.Infof("Updating load balancer server addresses -> %v", lb.randomServers)
115+
logrus.Infof("Updating load balancer %s server addresses -> %v", lb.serviceName, lb.randomServers)
109116

110117
if err := lb.writeConfig(); err != nil {
111-
logrus.Warnf("Error updating load balancer config: %s", err)
118+
logrus.Warnf("Error updating load balancer %s config: %s", lb.serviceName, err)
112119
}
113120
}
114121

@@ -128,14 +135,14 @@ func (lb *LoadBalancer) dialContext(ctx context.Context, network, address string
128135
if err == nil {
129136
return conn, nil
130137
}
131-
logrus.Debugf("Dial error from load balancer: %s", err)
138+
logrus.Debugf("Dial error from load balancer %s: %s", lb.serviceName, err)
132139

133140
newServer, err := lb.nextServer(targetServer)
134141
if err != nil {
135142
return nil, err
136143
}
137144
if targetServer != newServer {
138-
logrus.Debugf("Dial server in load balancer failed over to %s", newServer)
145+
logrus.Debugf("Dial server in load balancer %s failed over to %s", lb.serviceName, newServer)
139146
}
140147
if ctx.Err() != nil {
141148
return nil, ctx.Err()
@@ -156,7 +163,7 @@ func onDialError(src net.Conn, dstDialErr error) {
156163
src.Close()
157164
}
158165

159-
// ResetLoadBalancer will delete the local state file for the load balacner on disk
166+
// ResetLoadBalancer will delete the local state file for the load balancer on disk
160167
func ResetLoadBalancer(dataDir, serviceName string) error {
161168
stateFile := filepath.Join(dataDir, "etc", serviceName+".json")
162169
if err := os.Remove(stateFile); err != nil {

pkg/agent/loadbalancer/servers.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import (
77
)
88

99
func (lb *LoadBalancer) setServers(serverAddresses []string) bool {
10-
serverAddresses, hasOriginalServer := sortServers(serverAddresses, lb.originalServerAddress)
10+
serverAddresses, hasOriginalServer := sortServers(serverAddresses, lb.defaultServerAddress)
1111
if len(serverAddresses) == 0 {
1212
return false
1313
}
@@ -25,7 +25,7 @@ func (lb *LoadBalancer) setServers(serverAddresses []string) bool {
2525
lb.randomServers[i], lb.randomServers[j] = lb.randomServers[j], lb.randomServers[i]
2626
})
2727
if !hasOriginalServer {
28-
lb.randomServers = append(lb.randomServers, lb.originalServerAddress)
28+
lb.randomServers = append(lb.randomServers, lb.defaultServerAddress)
2929
}
3030
lb.currentServerAddress = lb.randomServers[0]
3131
lb.nextServerIndex = 1

pkg/agent/proxy/apiproxy.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
type Proxy interface {
1616
Update(addresses []string)
1717
SetAPIServerPort(ctx context.Context, port int) error
18+
SetSupervisorDefault(address string)
1819
SupervisorURL() string
1920
SupervisorAddresses() []string
2021
APIServerURL() string
@@ -135,6 +136,28 @@ func (p *proxy) SetAPIServerPort(ctx context.Context, port int) error {
135136
return nil
136137
}
137138

139+
// SetSupervisorDefault updates the default (fallback) address for the connection to the
140+
// supervisor. This is most useful on k3s nodes without apiservers, where the local
141+
// supervisor must be used to bootstrap the agent config, but then switched over to
142+
// another node running an apiserver once one is available.
143+
func (p *proxy) SetSupervisorDefault(address string) {
144+
host, port, err := sysnet.SplitHostPort(address)
145+
if err != nil {
146+
logrus.Errorf("Failed to parse address %s, dropping: %v", address, err)
147+
return
148+
}
149+
if p.apiServerEnabled {
150+
port = p.supervisorPort
151+
address = sysnet.JoinHostPort(host, port)
152+
}
153+
p.fallbackSupervisorAddress = address
154+
if p.supervisorLB == nil {
155+
p.supervisorURL = "https://" + address
156+
} else {
157+
p.supervisorLB.SetDefault(address)
158+
}
159+
}
160+
138161
func (p *proxy) SupervisorURL() string {
139162
return p.supervisorURL
140163
}

pkg/agent/run.go

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,10 @@ package agent
33
import (
44
"context"
55
"fmt"
6-
"net/url"
6+
"net"
77
"os"
88
"path/filepath"
9+
"strconv"
910
"strings"
1011
"time"
1112

@@ -350,17 +351,8 @@ func setupTunnelAndRunAgent(ctx context.Context, nodeConfig *daemonconfig.Node,
350351
}
351352
agentRan = true
352353
}
353-
354-
select {
355-
case address := <-cfg.APIAddressCh:
356-
cfg.ServerURL = address
357-
u, err := url.Parse(cfg.ServerURL)
358-
if err != nil {
359-
logrus.Warn(err)
360-
}
361-
proxy.Update([]string{fmt.Sprintf("%s:%d", u.Hostname(), nodeConfig.ServerHTTPSPort)})
362-
case <-ctx.Done():
363-
return ctx.Err()
354+
if err := waitForAPIServerAddresses(ctx, nodeConfig, cfg, proxy); err != nil {
355+
return err
364356
}
365357
} else if cfg.ClusterReset && proxy.IsAPIServerLBEnabled() {
366358
// If we're doing a cluster-reset on RKE2, the kubelet needs to be started early to clean
@@ -379,3 +371,26 @@ func setupTunnelAndRunAgent(ctx context.Context, nodeConfig *daemonconfig.Node,
379371
}
380372
return nil
381373
}
374+
375+
func waitForAPIServerAddresses(ctx context.Context, nodeConfig *daemonconfig.Node, cfg cmds.Agent, proxy proxy.Proxy) error {
376+
for {
377+
select {
378+
case <-time.After(5 * time.Second):
379+
logrus.Info("Waiting for apiserver addresses")
380+
case addresses := <-cfg.APIAddressCh:
381+
for i, a := range addresses {
382+
host, _, err := net.SplitHostPort(a)
383+
if err == nil {
384+
addresses[i] = net.JoinHostPort(host, strconv.Itoa(nodeConfig.ServerHTTPSPort))
385+
if i == 0 {
386+
proxy.SetSupervisorDefault(addresses[i])
387+
}
388+
}
389+
}
390+
proxy.Update(addresses)
391+
return nil
392+
case <-ctx.Done():
393+
return ctx.Err()
394+
}
395+
}
396+
}

pkg/cli/cmds/agent.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ type Agent struct {
1414
TokenFile string
1515
ClusterSecret string
1616
ServerURL string
17-
APIAddressCh chan string
17+
APIAddressCh chan []string
1818
DisableLoadBalancer bool
1919
DisableServiceLB bool
2020
ETCDAgent bool

pkg/cli/server/server.go

Lines changed: 17 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -394,13 +394,6 @@ func run(app *cli.Context, cfg *cmds.Server, leaderControllers server.CustomCont
394394
serverConfig.ControlConfig.DisableScheduler = true
395395
serverConfig.ControlConfig.DisableCCM = true
396396

397-
// only close the agentReady channel in case of k3s restoration, because k3s does not start
398-
// the agent until server returns successfully, unlike rke2's agent which starts in parallel
399-
// with the server
400-
if serverConfig.ControlConfig.SupervisorPort == serverConfig.ControlConfig.HTTPSPort {
401-
close(agentReady)
402-
}
403-
404397
dataDir, err := datadir.LocalHome(cfg.DataDir, false)
405398
if err != nil {
406399
return err
@@ -484,10 +477,12 @@ func run(app *cli.Context, cfg *cmds.Server, leaderControllers server.CustomCont
484477
}
485478

486479
if serverConfig.ControlConfig.DisableAPIServer {
480+
if cfg.ServerURL != "" {
481+
agentConfig.ServerURL = cfg.ServerURL
482+
}
487483
// initialize the apiAddress Channel for receiving the api address from etcd
488-
agentConfig.APIAddressCh = make(chan string, 1)
489-
setAPIAddressChannel(ctx, &serverConfig, &agentConfig)
490-
defer close(agentConfig.APIAddressCh)
484+
agentConfig.APIAddressCh = make(chan []string)
485+
go getAPIAddressFromEtcd(ctx, serverConfig, agentConfig)
491486
}
492487
return agent.Run(ctx, agentConfig)
493488
}
@@ -533,29 +528,19 @@ func getArgValueFromList(searchArg string, argList []string) string {
533528
return value
534529
}
535530

536-
// setAPIAddressChannel will try to get the api address key from etcd and when it succeed it will
537-
// set the APIAddressCh channel with its value, the function works for both k3s and rke2 in case
538-
// of k3s we block returning back to the agent.Run until we get the api address, however in rke2
539-
// the code will not block operation and will run the operation in a goroutine
540-
func setAPIAddressChannel(ctx context.Context, serverConfig *server.Config, agentConfig *cmds.Agent) {
541-
// start a goroutine to check for the server ip if set from etcd in case of rke2
542-
if serverConfig.ControlConfig.HTTPSPort != serverConfig.ControlConfig.SupervisorPort {
543-
go getAPIAddressFromEtcd(ctx, serverConfig, agentConfig)
544-
return
545-
}
546-
getAPIAddressFromEtcd(ctx, serverConfig, agentConfig)
547-
}
548-
549-
func getAPIAddressFromEtcd(ctx context.Context, serverConfig *server.Config, agentConfig *cmds.Agent) {
550-
t := time.NewTicker(5 * time.Second)
551-
defer t.Stop()
552-
for range t.C {
553-
serverAddress, err := etcd.GetAPIServerURLFromETCD(ctx, &serverConfig.ControlConfig)
554-
if err == nil {
555-
agentConfig.ServerURL = "https://" + serverAddress
556-
agentConfig.APIAddressCh <- agentConfig.ServerURL
531+
func getAPIAddressFromEtcd(ctx context.Context, serverConfig server.Config, agentConfig cmds.Agent) {
532+
defer close(agentConfig.APIAddressCh)
533+
for {
534+
toCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
535+
defer cancel()
536+
serverAddresses, err := etcd.GetAPIServerURLsFromETCD(toCtx, &serverConfig.ControlConfig)
537+
if err == nil && len(serverAddresses) > 0 {
538+
agentConfig.APIAddressCh <- serverAddresses
557539
break
558540
}
559-
logrus.Warn(err)
541+
if !errors.Is(err, etcd.ErrAddressNotSet) {
542+
logrus.Warnf("Failed to get apiserver address from etcd: %v", err)
543+
}
544+
<-toCtx.Done()
560545
}
561546
}

pkg/cluster/bootstrap.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -356,6 +356,7 @@ func (c *Cluster) ReconcileBootstrapData(ctx context.Context, buf io.ReadSeeker,
356356
buf.Seek(0, 0)
357357
}
358358

359+
logrus.Debugf("One or more certificate directories do not exist; writing data to disk from datastore")
359360
return bootstrap.WriteToDiskFromStorage(files, crb)
360361
}
361362

pkg/daemons/control/server.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,10 +55,12 @@ func Server(ctx context.Context, cfg *config.Control) error {
5555
if err := apiServer(ctx, cfg); err != nil {
5656
return err
5757
}
58+
}
5859

59-
if err := waitForAPIServerInBackground(ctx, cfg.Runtime); err != nil {
60-
return err
61-
}
60+
// Wait for an apiserver to become available before starting additional controllers,
61+
// even if we're not running an apiserver locally.
62+
if err := waitForAPIServerInBackground(ctx, cfg.Runtime); err != nil {
63+
return err
6264
}
6365

6466
if !cfg.DisableScheduler {

0 commit comments

Comments
 (0)