Skip to content

Commit 0bd76be

Browse files
authored
lrs: use JSON for locality's String representation (#4135)
1 parent ecc9a99 commit 0bd76be

File tree

7 files changed

+105
-40
lines changed

7 files changed

+105
-40
lines changed

xds/internal/balancer/edsbalancer/eds_impl.go

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -142,12 +142,17 @@ func (edsImpl *edsBalancerImpl) handleChildPolicy(name string, config json.RawMe
142142
continue
143143
}
144144
for lid, config := range bgwc.configs {
145+
lidJSON, err := lid.ToString()
146+
if err != nil {
147+
edsImpl.logger.Errorf("failed to marshal LocalityID: %#v, skipping this locality", lid)
148+
continue
149+
}
145150
// TODO: (eds) add support to balancer group to support smoothly
146151
// switching sub-balancers (keep old balancer around until new
147152
// balancer becomes ready).
148-
bgwc.bg.Remove(lid.String())
149-
bgwc.bg.Add(lid.String(), edsImpl.subBalancerBuilder)
150-
bgwc.bg.UpdateClientConnState(lid.String(), balancer.ClientConnState{
153+
bgwc.bg.Remove(lidJSON)
154+
bgwc.bg.Add(lidJSON, edsImpl.subBalancerBuilder)
155+
bgwc.bg.UpdateClientConnState(lidJSON, balancer.ClientConnState{
151156
ResolverState: resolver.State{Addresses: config.addrs},
152157
})
153158
// This doesn't need to manually update picker, because the new
@@ -285,6 +290,11 @@ func (edsImpl *edsBalancerImpl) handleEDSResponsePerPriority(bgwc *balancerGroup
285290
// One balancer for each locality.
286291

287292
lid := locality.ID
293+
lidJSON, err := lid.ToString()
294+
if err != nil {
295+
edsImpl.logger.Errorf("failed to marshal LocalityID: %#v, skipping this locality", lid)
296+
continue
297+
}
288298
newLocalitiesSet[lid] = struct{}{}
289299

290300
newWeight := locality.Weight
@@ -319,8 +329,8 @@ func (edsImpl *edsBalancerImpl) handleEDSResponsePerPriority(bgwc *balancerGroup
319329
config, ok := bgwc.configs[lid]
320330
if !ok {
321331
// A new balancer, add it to balancer group and balancer map.
322-
bgwc.stateAggregator.Add(lid.String(), newWeight)
323-
bgwc.bg.Add(lid.String(), edsImpl.subBalancerBuilder)
332+
bgwc.stateAggregator.Add(lidJSON, newWeight)
333+
bgwc.bg.Add(lidJSON, edsImpl.subBalancerBuilder)
324334
config = &localityConfig{
325335
weight: newWeight,
326336
}
@@ -343,23 +353,28 @@ func (edsImpl *edsBalancerImpl) handleEDSResponsePerPriority(bgwc *balancerGroup
343353

344354
if weightChanged {
345355
config.weight = newWeight
346-
bgwc.stateAggregator.UpdateWeight(lid.String(), newWeight)
356+
bgwc.stateAggregator.UpdateWeight(lidJSON, newWeight)
347357
rebuildStateAndPicker = true
348358
}
349359

350360
if addrsChanged {
351361
config.addrs = newAddrs
352-
bgwc.bg.UpdateClientConnState(lid.String(), balancer.ClientConnState{
362+
bgwc.bg.UpdateClientConnState(lidJSON, balancer.ClientConnState{
353363
ResolverState: resolver.State{Addresses: newAddrs},
354364
})
355365
}
356366
}
357367

358368
// Delete localities that are removed in the latest response.
359369
for lid := range bgwc.configs {
370+
lidJSON, err := lid.ToString()
371+
if err != nil {
372+
edsImpl.logger.Errorf("failed to marshal LocalityID: %#v, skipping this locality", lid)
373+
continue
374+
}
360375
if _, ok := newLocalitiesSet[lid]; !ok {
361-
bgwc.stateAggregator.Remove(lid.String())
362-
bgwc.bg.Remove(lid.String())
376+
bgwc.stateAggregator.Remove(lidJSON)
377+
bgwc.bg.Remove(lidJSON)
363378
delete(bgwc.configs, lid)
364379
edsImpl.logger.Infof("Locality %v deleted", lid)
365380
rebuildStateAndPicker = true

xds/internal/balancer/edsbalancer/eds_impl_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -786,12 +786,14 @@ func (s) TestEDS_LoadReport(t *testing.T) {
786786
// We expect the 10 picks to be split between the localities since they are
787787
// of equal weight. And since we only mark the picks routed to sc2 as done,
788788
// the picks on sc1 should show up as inProgress.
789+
locality1JSON, _ := locality1.ToString()
790+
locality2JSON, _ := locality2.ToString()
789791
wantStoreData := []*load.Data{{
790792
Cluster: testClusterNames[0],
791793
Service: "",
792794
LocalityStats: map[string]load.LocalityData{
793-
locality1.String(): {RequestStats: load.RequestData{InProgress: 5}},
794-
locality2.String(): {RequestStats: load.RequestData{Succeeded: 5}},
795+
locality1JSON: {RequestStats: load.RequestData{InProgress: 5}},
796+
locality2JSON: {RequestStats: load.RequestData{Succeeded: 5}},
795797
},
796798
}}
797799
for i := 0; i < 10; i++ {

xds/internal/balancer/lrs/balancer.go

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ import (
2727
"google.golang.org/grpc/balancer"
2828
"google.golang.org/grpc/internal/grpclog"
2929
"google.golang.org/grpc/serviceconfig"
30-
"google.golang.org/grpc/xds/internal"
3130
xdsclient "google.golang.org/grpc/xds/internal/client"
3231
"google.golang.org/grpc/xds/internal/client/load"
3332
)
@@ -101,7 +100,12 @@ func (b *lrsBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
101100
if b.lb != nil {
102101
b.lb.Close()
103102
}
104-
b.lb = bb.Build(newCCWrapper(b.cc, b.client.loadStore(), newConfig.Locality), b.buildOpts)
103+
lidJSON, err := newConfig.Locality.ToString()
104+
if err != nil {
105+
return fmt.Errorf("failed to marshal LocalityID: %#v", newConfig.Locality)
106+
}
107+
ccWrapper := newCCWrapper(b.cc, b.client.loadStore(), lidJSON)
108+
b.lb = bb.Build(ccWrapper, b.buildOpts)
105109
}
106110
b.config = newConfig
107111

@@ -134,20 +138,20 @@ func (b *lrsBalancer) Close() {
134138

135139
type ccWrapper struct {
136140
balancer.ClientConn
137-
loadStore load.PerClusterReporter
138-
localityID *internal.LocalityID
141+
loadStore load.PerClusterReporter
142+
localityIDJSON string
139143
}
140144

141-
func newCCWrapper(cc balancer.ClientConn, loadStore load.PerClusterReporter, localityID *internal.LocalityID) *ccWrapper {
145+
func newCCWrapper(cc balancer.ClientConn, loadStore load.PerClusterReporter, localityIDJSON string) *ccWrapper {
142146
return &ccWrapper{
143-
ClientConn: cc,
144-
loadStore: loadStore,
145-
localityID: localityID,
147+
ClientConn: cc,
148+
loadStore: loadStore,
149+
localityIDJSON: localityIDJSON,
146150
}
147151
}
148152

149153
func (ccw *ccWrapper) UpdateState(s balancer.State) {
150-
s.Picker = newLoadReportPicker(s.Picker, *ccw.localityID, ccw.loadStore)
154+
s.Picker = newLoadReportPicker(s.Picker, ccw.localityIDJSON, ccw.loadStore)
151155
ccw.ClientConn.UpdateState(s)
152156
}
153157

xds/internal/balancer/lrs/balancer_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,8 @@ func TestLoadReporting(t *testing.T) {
126126
if sd.Cluster != testClusterName || sd.Service != testServiceName {
127127
t.Fatalf("got unexpected load for %q, %q, want %q, %q", sd.Cluster, sd.Service, testClusterName, testServiceName)
128128
}
129-
localityData, ok := sd.LocalityStats[testLocality.String()]
129+
testLocalityJSON, _ := testLocality.ToString()
130+
localityData, ok := sd.LocalityStats[testLocalityJSON]
130131
if !ok {
131132
t.Fatalf("loads for %v not found in store", testLocality)
132133
}

xds/internal/balancer/lrs/picker.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ package lrs
2121
import (
2222
orcapb "github.com/cncf/udpa/go/udpa/data/orca/v1"
2323
"google.golang.org/grpc/balancer"
24-
"google.golang.org/grpc/xds/internal"
2524
)
2625

2726
const (
@@ -43,10 +42,10 @@ type loadReportPicker struct {
4342
loadStore loadReporter
4443
}
4544

46-
func newLoadReportPicker(p balancer.Picker, id internal.LocalityID, loadStore loadReporter) *loadReportPicker {
45+
func newLoadReportPicker(p balancer.Picker, id string, loadStore loadReporter) *loadReportPicker {
4746
return &loadReportPicker{
4847
p: p,
49-
locality: id.String(),
48+
locality: id,
5049
loadStore: loadStore,
5150
}
5251
}

xds/internal/internal.go

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@
2020
package internal
2121

2222
import (
23+
"encoding/json"
2324
"fmt"
24-
"strings"
2525
)
2626

2727
// LocalityID is xds.Locality without XXX fields, so it can be used as map
@@ -34,23 +34,22 @@ type LocalityID struct {
3434
SubZone string `json:"subZone,omitempty"`
3535
}
3636

37-
// String generates a string representation of LocalityID by adding ":" between
38-
// the components of the LocalityID.
39-
func (l LocalityID) String() string {
40-
return fmt.Sprintf("%s:%s:%s", l.Region, l.Zone, l.SubZone)
37+
// ToString generates a string representation of LocalityID by marshalling it into
38+
// json. Not calling it String() so printf won't call it.
39+
func (l LocalityID) ToString() (string, error) {
40+
b, err := json.Marshal(l)
41+
if err != nil {
42+
return "", err
43+
}
44+
return string(b), nil
4145
}
4246

43-
// LocalityIDFromString converts a string representation of locality, of the
44-
// form region:zone:sub-zone (as generated by the above String() method), into a
47+
// LocalityIDFromString converts a json representation of locality, into a
4548
// LocalityID struct.
46-
func LocalityIDFromString(l string) (LocalityID, error) {
47-
parts := strings.Split(l, ":")
48-
if len(parts) != 3 {
49-
return LocalityID{}, fmt.Errorf("%s is not a well formatted locality ID", l)
49+
func LocalityIDFromString(s string) (ret LocalityID, _ error) {
50+
err := json.Unmarshal([]byte(s), &ret)
51+
if err != nil {
52+
return LocalityID{}, fmt.Errorf("%s is not a well formatted locality ID, error: %v", s, err)
5053
}
51-
return LocalityID{
52-
Region: parts[0],
53-
Zone: parts[1],
54-
SubZone: parts[2],
55-
}, nil
54+
return ret, nil
5655
}

xds/internal/internal_test.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,3 +70,48 @@ func (s) TestLocalityMatchProtoMessage(t *testing.T) {
7070
t.Fatalf("internal type and proto message have different fields: (-got +want):\n%+v", diff)
7171
}
7272
}
73+
74+
func TestLocalityToAndFromJSON(t *testing.T) {
75+
tests := []struct {
76+
name string
77+
localityID LocalityID
78+
str string
79+
wantErr bool
80+
}{
81+
{
82+
name: "3 fields",
83+
localityID: LocalityID{Region: "r:r", Zone: "z#z", SubZone: "s^s"},
84+
str: `{"region":"r:r","zone":"z#z","subZone":"s^s"}`,
85+
},
86+
{
87+
name: "2 fields",
88+
localityID: LocalityID{Region: "r:r", Zone: "z#z"},
89+
str: `{"region":"r:r","zone":"z#z"}`,
90+
},
91+
{
92+
name: "1 field",
93+
localityID: LocalityID{Region: "r:r"},
94+
str: `{"region":"r:r"}`,
95+
},
96+
}
97+
for _, tt := range tests {
98+
t.Run(tt.name, func(t *testing.T) {
99+
gotStr, err := tt.localityID.ToString()
100+
if err != nil {
101+
t.Errorf("failed to marshal LocalityID: %#v", tt.localityID)
102+
}
103+
if gotStr != tt.str {
104+
t.Errorf("%#v.String() = %q, want %q", tt.localityID, gotStr, tt.str)
105+
}
106+
107+
gotID, err := LocalityIDFromString(tt.str)
108+
if (err != nil) != tt.wantErr {
109+
t.Errorf("LocalityIDFromString(%q) error = %v, wantErr %v", tt.str, err, tt.wantErr)
110+
return
111+
}
112+
if diff := cmp.Diff(gotID, tt.localityID); diff != "" {
113+
t.Errorf("LocalityIDFromString() got = %v, want %v, diff: %s", gotID, tt.localityID, diff)
114+
}
115+
})
116+
}
117+
}

0 commit comments

Comments
 (0)