Skip to content

Commit 2a91361

Browse files
authored
chore(pubsub): Topic race condition; SMT test failure (#5381)
TestCreateSubscriptionWithSMT has been failing in repo-wide builds with a nil topic error. * update getOrCreateTopic test function to avoid race between checking topic existence and creating the topic. (tests are using t.Parallel(), so it can happen across tests) * in TestCreateSubscriptionWithSMT: * use shared getOrCreateTopic rather than local code. (local code had a bug which only occured when topic didn't already exist.) * use test specific topicID * use only one Retry block in body of the test so new topic both lives long enough to run the test and is cleaned up by defer.
1 parent af70992 commit 2a91361

File tree

1 file changed

+20
-20
lines changed

1 file changed

+20
-20
lines changed

pubsub/v1samples/subscriptions/subscription_test.go

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1078,15 +1078,18 @@ func publishMsgs(ctx context.Context, t *pubsub.Topic, numMsgs int) error {
10781078

10791079
// getOrCreateTopic gets a topic or creates it if it doesn't exist.
10801080
func getOrCreateTopic(ctx context.Context, client *pubsub.Client, topicID string) (*pubsub.Topic, error) {
1081-
topic := client.Topic(topicID)
1082-
ok, err := topic.Exists(ctx)
1081+
// avoid async race conditions by attempting to create first and checking error
1082+
// rather than checking for existence, then later creating
1083+
topic, err := client.CreateTopic(ctx, topicID)
10831084
if err != nil {
1084-
return nil, fmt.Errorf("failed to check if topic exists: %w", err)
1085-
}
1086-
if !ok {
1087-
topic, err = client.CreateTopic(ctx, topicID)
1088-
if err != nil {
1089-
return nil, fmt.Errorf("failed to create topic (%q): %w", topicID, err)
1085+
st, ok := status.FromError(err)
1086+
if !ok {
1087+
return nil, fmt.Errorf("CreateTopic failed with unknown err: %v", err)
1088+
}
1089+
if st.Code() == codes.AlreadyExists {
1090+
topic = client.Topic(topicID)
1091+
} else {
1092+
return nil, fmt.Errorf("CreateTopic: %v", err)
10901093
}
10911094
}
10921095
return topic, nil
@@ -1174,23 +1177,20 @@ func TestCreateSubscriptionWithSMT(t *testing.T) {
11741177
client := setup(t)
11751178

11761179
smtSubID := subID + "-smt"
1180+
smtTopicID := topicID + "-smt"
11771181
var topic *pubsub.Topic
1178-
var err error
1182+
11791183
testutil.Retry(t, 10, time.Second, func(r *testutil.R) {
1180-
topic, err = client.CreateTopic(ctx, topicID)
1184+
var err error
1185+
// use = to set topic in outer scope
1186+
topic, err = getOrCreateTopic(ctx, client, smtTopicID)
11811187
if err != nil {
1182-
st, ok := status.FromError(err)
1183-
if !ok {
1184-
r.Errorf("CreateTopic failed with unknown err: %v", err)
1185-
}
1186-
// Don't return error if topic already exists, just use that for the test.
1187-
if st.Code() != codes.AlreadyExists {
1188-
r.Errorf("CreateTopic: %v", err)
1189-
}
1188+
r.Errorf("getOrCreateTopic: %v", err)
1189+
return
11901190
}
1191-
})
1191+
defer topic.Delete(ctx)
1192+
defer topic.Stop()
11921193

1193-
testutil.Retry(t, 10, time.Second, func(r *testutil.R) {
11941194
buf := new(bytes.Buffer)
11951195
if err := createSubscriptionWithSMT(buf, tc.ProjectID, smtSubID, topic); err != nil {
11961196
r.Errorf("failed to create subscription with SMT: %v", err)

0 commit comments

Comments
 (0)