Skip to content

Commit 33bab77

Browse files
author
Damir Gainulin
committed
feat: add create user command for SCRAM user management
- Add create user subcommand with SCRAM-SHA-256/512 support - Add user configuration parsing and validation - Include example user configuration files - Implement proper SCRAM credential generation with salt - Update error messages
1 parent dfbbe28 commit 33bab77

File tree

7 files changed

+782
-4
lines changed

7 files changed

+782
-4
lines changed

cmd/topicctl/subcmd/create.go

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"github.com/segmentio/topicctl/pkg/admin"
1313
"github.com/segmentio/topicctl/pkg/cli"
1414
"github.com/segmentio/topicctl/pkg/config"
15+
"github.com/segmentio/topicctl/pkg/create"
1516
log "github.com/sirupsen/logrus"
1617
"github.com/spf13/cobra"
1718
)
@@ -55,6 +56,7 @@ func init() {
5556
addSharedFlags(createCmd, &createConfig.shared)
5657
createCmd.AddCommand(
5758
createACLsCmd(),
59+
createUsersCmd(),
5860
)
5961
RootCmd.AddCommand(createCmd)
6062
}
@@ -202,3 +204,143 @@ func clusterConfigForACLCreate(aclConfigPath string) (string, error) {
202204
),
203205
)
204206
}
207+
208+
func createUsersCmd() *cobra.Command {
209+
cmd := &cobra.Command{
210+
Use: "user [user configs]",
211+
Short: "creates SCRAM users from configuration files",
212+
Args: cobra.MinimumNArgs(1),
213+
RunE: createUserRun,
214+
PreRunE: createPreRun,
215+
}
216+
217+
return cmd
218+
}
219+
220+
func createUserRun(cmd *cobra.Command, args []string) error {
221+
ctx, cancel := context.WithCancel(context.Background())
222+
defer cancel()
223+
224+
sigChan := make(chan os.Signal, 1)
225+
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
226+
go func() {
227+
<-sigChan
228+
cancel()
229+
}()
230+
231+
// Keep a cache of the admin clients with the cluster config path as the key
232+
adminClients := map[string]admin.Client{}
233+
234+
defer func() {
235+
for _, adminClient := range adminClients {
236+
adminClient.Close()
237+
}
238+
}()
239+
240+
matchCount := 0
241+
242+
for _, arg := range args {
243+
if createConfig.pathPrefix != "" && !filepath.IsAbs(arg) {
244+
arg = filepath.Join(createConfig.pathPrefix, arg)
245+
}
246+
247+
matches, err := filepath.Glob(arg)
248+
if err != nil {
249+
return err
250+
}
251+
252+
for _, match := range matches {
253+
matchCount++
254+
if err := createUser(ctx, match, adminClients); err != nil {
255+
return err
256+
}
257+
}
258+
}
259+
260+
if matchCount == 0 {
261+
return fmt.Errorf("No user configs match the provided args (%+v)", args)
262+
}
263+
264+
return nil
265+
}
266+
267+
func createUser(
268+
ctx context.Context,
269+
userConfigPath string,
270+
adminClients map[string]admin.Client,
271+
) error {
272+
clusterConfigPath, err := clusterConfigForUserCreate(userConfigPath)
273+
if err != nil {
274+
return err
275+
}
276+
277+
userConfigs, err := config.LoadUsersFile(userConfigPath)
278+
if err != nil {
279+
return err
280+
}
281+
282+
clusterConfig, err := config.LoadClusterFile(clusterConfigPath, createConfig.shared.expandEnv)
283+
if err != nil {
284+
return err
285+
}
286+
287+
adminClient, ok := adminClients[clusterConfigPath]
288+
if !ok {
289+
adminClient, err = clusterConfig.NewAdminClient(
290+
ctx,
291+
nil,
292+
config.AdminClientOpts{
293+
ReadOnly: createConfig.dryRun,
294+
UsernameOverride: createConfig.shared.saslUsername,
295+
PasswordOverride: createConfig.shared.saslPassword,
296+
SecretsManagerArnOverride: createConfig.shared.saslSecretsManagerArn,
297+
},
298+
)
299+
if err != nil {
300+
return err
301+
}
302+
adminClients[clusterConfigPath] = adminClient
303+
}
304+
305+
for _, userConfig := range userConfigs {
306+
userConfig.SetDefaults()
307+
log.Infof(
308+
"Processing user %s in config %s with cluster config %s",
309+
userConfig.Meta.Name,
310+
userConfigPath,
311+
clusterConfigPath,
312+
)
313+
314+
userCreatorConfig := create.UserCreatorConfig{
315+
DryRun: createConfig.dryRun,
316+
SkipConfirm: createConfig.skipConfirm,
317+
UserConfig: userConfig,
318+
ClusterConfig: clusterConfig,
319+
}
320+
321+
userCreator, err := create.NewUserCreator(ctx, adminClient, userCreatorConfig)
322+
if err != nil {
323+
return err
324+
}
325+
326+
if err := userCreator.Create(ctx); err != nil {
327+
return err
328+
}
329+
}
330+
331+
return nil
332+
}
333+
334+
func clusterConfigForUserCreate(userConfigPath string) (string, error) {
335+
if createConfig.shared.clusterConfig != "" {
336+
return createConfig.shared.clusterConfig, nil
337+
}
338+
339+
return filepath.Abs(
340+
filepath.Join(
341+
filepath.Dir(userConfigPath),
342+
"..",
343+
"cluster.yaml",
344+
),
345+
)
346+
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
meta:
2+
name: test-user
3+
cluster: auth-cluster
4+
region: us-west-2
5+
environment: dev
6+
description: A sample SCRAM user for testing
7+
spec:
8+
name: test-user
9+
mechanism: scram-sha-256
10+
password: "secure-password-123"
11+
iterations: 4096
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
meta:
2+
name: app-user
3+
cluster: auth-cluster
4+
region: us-west-2
5+
environment: dev
6+
description: Application service user
7+
spec:
8+
name: app-user
9+
mechanism: scram-sha-256
10+
password: "app-secret-123"
11+
iterations: 4096
12+
---
13+
meta:
14+
name: admin-user
15+
cluster: auth-cluster
16+
region: us-west-2
17+
environment: dev
18+
description: Admin user with higher security
19+
spec:
20+
name: admin-user
21+
mechanism: scram-sha-512
22+
password: "admin-secret-456"
23+
iterations: 8192

pkg/config/load.go

Lines changed: 42 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -125,27 +125,65 @@ func LoadACLBytes(contents []byte) (ACLConfig, error) {
125125
return config, err
126126
}
127127

128-
// CheckConsistency verifies that the argument topic config is consistent with the argument
128+
// LoadUsersFile loads one or more UserConfigs from a path to a YAML file.
129+
func LoadUsersFile(path string) ([]UserConfig, error) {
130+
contents, err := os.ReadFile(path)
131+
if err != nil {
132+
return nil, err
133+
}
134+
135+
contents = []byte(os.ExpandEnv(string(contents)))
136+
137+
trimmedFile := strings.TrimSpace(string(contents))
138+
userStrs := sep.Split(trimmedFile, -1)
139+
140+
userConfigs := []UserConfig{}
141+
142+
for _, userStr := range userStrs {
143+
userStr = strings.TrimSpace(userStr)
144+
if isEmpty(userStr) {
145+
continue
146+
}
147+
148+
userConfig, err := LoadUserBytes([]byte(userStr))
149+
if err != nil {
150+
return nil, err
151+
}
152+
153+
userConfigs = append(userConfigs, userConfig)
154+
}
155+
156+
return userConfigs, nil
157+
}
158+
159+
// LoadUserBytes loads a UserConfig from YAML bytes.
160+
func LoadUserBytes(contents []byte) (UserConfig, error) {
161+
config := UserConfig{}
162+
err := unmarshalYAMLStrict(contents, &config)
163+
return config, err
164+
}
165+
166+
// CheckConsistency verifies that the argument resource config is consistent with the argument
129167
// cluster, e.g. has the same environment and region, etc.
130168
func CheckConsistency(resourceMeta ResourceMeta, clusterConfig ClusterConfig) error {
131169
var err error
132170

133171
if resourceMeta.Cluster != clusterConfig.Meta.Name {
134172
err = multierror.Append(
135173
err,
136-
errors.New("Topic cluster name does not match name in cluster config"),
174+
errors.New("Resource cluster name does not match name in cluster config"),
137175
)
138176
}
139177
if resourceMeta.Environment != clusterConfig.Meta.Environment {
140178
err = multierror.Append(
141179
err,
142-
errors.New("Topic environment does not match cluster environment"),
180+
errors.New("Resource environment does not match cluster environment"),
143181
)
144182
}
145183
if resourceMeta.Region != clusterConfig.Meta.Region {
146184
err = multierror.Append(
147185
err,
148-
errors.New("Topic region does not match cluster region"),
186+
errors.New("Resource region does not match cluster region"),
149187
)
150188
}
151189

pkg/config/user.go

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
package config
2+
3+
import (
4+
"errors"
5+
"strings"
6+
7+
"github.com/hashicorp/go-multierror"
8+
"github.com/segmentio/kafka-go"
9+
)
10+
11+
// UserConfig represents the configuration for a Kafka SCRAM user.
12+
type UserConfig struct {
13+
Meta ResourceMeta `json:"meta"`
14+
Spec UserSpec `json:"spec"`
15+
}
16+
17+
// UserSpec contains the specification for a Kafka SCRAM user.
18+
type UserSpec struct {
19+
// Name is the username for the SCRAM user
20+
Name string `json:"name"`
21+
22+
// Mechanism is the SCRAM mechanism (scram-sha-256 or scram-sha-512)
23+
Mechanism string `json:"mechanism"`
24+
25+
// Password is the plain-text password that will be used to generate SCRAM credentials
26+
Password string `json:"password"`
27+
28+
// Iterations is the number of iterations to use for SCRAM credential generation.
29+
// If not specified, defaults to 4096.
30+
Iterations int `json:"iterations,omitempty"`
31+
}
32+
33+
// SetDefaults sets default values for the user configuration.
34+
func (u *UserConfig) SetDefaults() {
35+
if u.Spec.Iterations == 0 {
36+
u.Spec.Iterations = 4096
37+
}
38+
if u.Spec.Mechanism == "" {
39+
u.Spec.Mechanism = "scram-sha-256"
40+
}
41+
}
42+
43+
// Validate evaluates whether the user config is valid.
44+
func (u *UserConfig) Validate() error {
45+
var err error
46+
47+
// Validate metadata
48+
if metaErr := u.Meta.Validate(); metaErr != nil {
49+
err = multierror.Append(err, metaErr)
50+
}
51+
52+
// Validate user spec
53+
if u.Spec.Name == "" {
54+
err = multierror.Append(err, errors.New("User name cannot be empty"))
55+
}
56+
57+
if u.Spec.Password == "" {
58+
err = multierror.Append(err, errors.New("User password cannot be empty"))
59+
}
60+
61+
// Validate SCRAM mechanism
62+
mechanism := strings.ToLower(strings.ReplaceAll(u.Spec.Mechanism, "_", "-"))
63+
if mechanism != "scram-sha-256" && mechanism != "scram-sha-512" {
64+
err = multierror.Append(
65+
err,
66+
errors.New("User mechanism must be either 'scram-sha-256' or 'scram-sha-512'"),
67+
)
68+
}
69+
70+
// Validate iterations
71+
if u.Spec.Iterations < 1000 {
72+
err = multierror.Append(
73+
err,
74+
errors.New("User iterations must be at least 1000 for security"),
75+
)
76+
}
77+
78+
return err
79+
}
80+
81+
// ToScramMechanism converts the mechanism string to kafka-go ScramMechanism.
82+
func (u *UserConfig) ToScramMechanism() kafka.ScramMechanism {
83+
mechanism := strings.ToLower(strings.ReplaceAll(u.Spec.Mechanism, "_", "-"))
84+
switch mechanism {
85+
case "scram-sha-256":
86+
return kafka.ScramMechanismSha256
87+
case "scram-sha-512":
88+
return kafka.ScramMechanismSha512
89+
default:
90+
// Default to SHA-256 if unknown
91+
return kafka.ScramMechanismSha256
92+
}
93+
}

0 commit comments

Comments
 (0)