Skip to content

Commit 7d1618e

Browse files
authored
Merge pull request #465 from CloudKarafka/sasl-scram
SASL SCRAM support
2 parents 9bb8ce9 + 609b840 commit 7d1618e

11 files changed

+514
-62
lines changed

README.md

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -901,8 +901,9 @@ Typically, Kafka certificates come in the JKS format, which isn't supported by r
901901

902902
#### Authentication using SASL
903903

904-
Kafka has support for using SASL to authenticate clients. Currently GSSAPI and PLAIN mechanisms are supported by ruby-kafka.
904+
Kafka has support for using SASL to authenticate clients. Currently GSSAPI, SCRAM and PLAIN mechanisms are supported by ruby-kafka.
905905

906+
##### GSSAPI
906907
In order to authenticate using GSSAPI, set your principal and optionally your keytab when initializing the Kafka client:
907908

908909
```ruby
@@ -913,6 +914,7 @@ kafka = Kafka.new(
913914
)
914915
```
915916

917+
##### PLAIN
916918
In order to authenticate using PLAIN, you must set your username and password when initializing the Kafka client:
917919

918920
```ruby
@@ -926,6 +928,18 @@ kafka = Kafka.new(
926928

927929
**NOTE**: It is __highly__ recommended that you use SSL for encryption when using SASL_PLAIN
928930

931+
##### SCRAM
932+
Since 0.11 kafka supports [SCRAM](https://kafka.apache.org/documentation.html#security_sasl_scram).
933+
934+
```ruby
935+
kafka = Kafka.new(
936+
sasl_scram_username: 'username',
937+
sasl_scram_password: 'password',
938+
sasl_scram_mechanism: 'sha256',
939+
# ...
940+
)
941+
```
942+
929943
## Design
930944

931945
The library has been designed as a layered system, with each layer having a clear responsibility:

lib/kafka.rb

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,12 @@ class OffsetCommitError < Error
229229
class FetchError < Error
230230
end
231231

232+
class SaslScramError < Error
233+
end
234+
235+
class FailedScramAuthentication < SaslScramError
236+
end
237+
232238
# Initializes a new Kafka client.
233239
#
234240
# @see Client#initialize

lib/kafka/client.rb

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,23 +49,36 @@ class Client
4949
#
5050
# @param sasl_gssapi_keytab [String, nil] a KRB5 keytab filepath
5151
#
52+
# @param sasl_scram_username [String, nil] SCRAM username
53+
#
54+
# @param sasl_scram_password [String, nil] SCRAM password
55+
#
56+
# @param sasl_scram_mechanism [String, nil] Scram mechanism ("sha256", "sha512")
57+
#
58+
# @param use_ssl [Booleanm false] Use SSL
59+
#
5260
# @return [Client]
5361
def initialize(seed_brokers:, client_id: "ruby-kafka", logger: nil, connect_timeout: nil, socket_timeout: nil,
5462
ssl_ca_cert_file_path: nil, ssl_ca_cert: nil, ssl_client_cert: nil, ssl_client_cert_key: nil,
5563
sasl_gssapi_principal: nil, sasl_gssapi_keytab: nil,
56-
sasl_plain_authzid: '', sasl_plain_username: nil, sasl_plain_password: nil)
64+
sasl_plain_authzid: '', sasl_plain_username: nil, sasl_plain_password: nil,
65+
use_ssl: false, sasl_scram_username: nil, sasl_scram_password: nil, sasl_scram_mechanism: nil)
5766
@logger = logger || Logger.new(nil)
5867
@instrumenter = Instrumenter.new(client_id: client_id)
5968
@seed_brokers = normalize_seed_brokers(seed_brokers)
6069

6170
ssl_context = build_ssl_context(ssl_ca_cert_file_path, ssl_ca_cert, ssl_client_cert, ssl_client_cert_key)
71+
ssl_context = OpenSSL::SSL::SSLContext.new if use_ssl and !ssl_context
6272

6373
sasl_authenticator = SaslAuthenticator.new(
6474
sasl_gssapi_principal: sasl_gssapi_principal,
6575
sasl_gssapi_keytab: sasl_gssapi_keytab,
6676
sasl_plain_authzid: sasl_plain_authzid,
6777
sasl_plain_username: sasl_plain_username,
6878
sasl_plain_password: sasl_plain_password,
79+
sasl_scram_username: sasl_scram_username,
80+
sasl_scram_password: sasl_scram_password,
81+
sasl_scram_mechanism: sasl_scram_mechanism,
6982
logger: @logger
7083
)
7184

lib/kafka/protocol/sasl_handshake_request.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ module Protocol
66

77
class SaslHandshakeRequest
88

9-
SUPPORTED_MECHANISMS = %w(GSSAPI PLAIN)
9+
SUPPORTED_MECHANISMS = %w(GSSAPI PLAIN SCRAM-SHA-256 SCRAM-SHA-512)
1010

1111
def initialize(mechanism)
1212
unless SUPPORTED_MECHANISMS.include?(mechanism)

lib/kafka/sasl_authenticator.rb

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,50 +1,74 @@
11
require 'kafka/sasl_gssapi_authenticator'
22
require 'kafka/sasl_plain_authenticator'
3+
require 'kafka/sasl_scram_authenticator'
34

45
module Kafka
56
class SaslAuthenticator
6-
def initialize(logger:, sasl_gssapi_principal:, sasl_gssapi_keytab:, sasl_plain_authzid:, sasl_plain_username:, sasl_plain_password:)
7+
def initialize(logger:, sasl_gssapi_principal:, sasl_gssapi_keytab:,
8+
sasl_plain_authzid:, sasl_plain_username:, sasl_plain_password:,
9+
sasl_scram_username:, sasl_scram_password:, sasl_scram_mechanism:)
710
@logger = logger
811
@sasl_gssapi_principal = sasl_gssapi_principal
912
@sasl_gssapi_keytab = sasl_gssapi_keytab
1013
@sasl_plain_authzid = sasl_plain_authzid
1114
@sasl_plain_username = sasl_plain_username
1215
@sasl_plain_password = sasl_plain_password
16+
@sasl_scram_username = sasl_scram_username
17+
@sasl_scram_password = sasl_scram_password
18+
@sasl_scram_mechanism = sasl_scram_mechanism
1319
end
1420

1521
def authenticate!(connection)
1622
if authenticate_using_sasl_gssapi?
1723
sasl_gssapi_authenticate(connection)
1824
elsif authenticate_using_sasl_plain?
1925
sasl_plain_authenticate(connection)
26+
elsif authenticate_using_sasl_scram?
27+
sasl_scram_authenticate(connection)
2028
end
2129
end
2230

2331
private
2432

33+
def sasl_scram_authenticate(connection)
34+
auth = SaslScramAuthenticator.new(
35+
@sasl_scram_username,
36+
@sasl_scram_password,
37+
logger: @logger,
38+
mechanism: @sasl_scram_mechanism,
39+
connection: connection
40+
)
41+
42+
auth.authenticate!
43+
end
44+
2545
def sasl_gssapi_authenticate(connection)
2646
auth = SaslGssapiAuthenticator.new(
27-
connection: connection,
2847
logger: @logger,
2948
sasl_gssapi_principal: @sasl_gssapi_principal,
30-
sasl_gssapi_keytab: @sasl_gssapi_keytab
49+
sasl_gssapi_keytab: @sasl_gssapi_keytab,
50+
connection: connection
3151
)
3252

3353
auth.authenticate!
3454
end
3555

3656
def sasl_plain_authenticate(connection)
3757
auth = SaslPlainAuthenticator.new(
38-
connection: connection,
3958
logger: @logger,
4059
authzid: @sasl_plain_authzid,
4160
username: @sasl_plain_username,
42-
password: @sasl_plain_password
61+
password: @sasl_plain_password,
62+
connection: connection
4363
)
4464

4565
auth.authenticate!
4666
end
4767

68+
def authenticate_using_sasl_scram?
69+
@sasl_scram_username && @sasl_scram_password
70+
end
71+
4872
def authenticate_using_sasl_gssapi?
4973
!@ssl_context && @sasl_gssapi_principal && !@sasl_gssapi_principal.empty?
5074
end
Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
require 'securerandom'
2+
require 'base64'
3+
4+
module Kafka
5+
6+
class SaslScramAuthenticator
7+
MECHANISMS = {
8+
sha256: {
9+
mechanism: 'SHA-256'
10+
},
11+
sha512: {
12+
mechanism: 'SHA-512'
13+
}
14+
}.freeze
15+
16+
VALID_MECHANISMS = %w{sha256 sha512}.freeze
17+
18+
def initialize(username, password, mechanism: 'sha256', logger: nil, connection:)
19+
unless VALID_MECHANISMS.include?(mechanism)
20+
raise Kafka::SaslScramError, "SCRAM mechanism #{mechanism} is not supported."
21+
end
22+
@username = username
23+
@password = password
24+
@mechanism = MECHANISMS[mechanism.to_sym][:mechanism]
25+
@logger = logger
26+
@connection = connection
27+
end
28+
29+
def authenticate!
30+
response = @connection.send_request(Kafka::Protocol::SaslHandshakeRequest.new('SCRAM-' + @mechanism))
31+
32+
unless response.error_code == 0 && response.enabled_mechanisms.include?('SCRAM-' + @mechanism)
33+
raise Kafka::SaslScramError, "SCRAM-#{@mechanism} is not supported."
34+
end
35+
36+
log_debug "Authenticating #{@username} with SASL SCRAM #{@mechanism}"
37+
38+
@encoder = @connection.encoder
39+
@decoder = @connection.decoder
40+
41+
begin
42+
msg = first_message
43+
log_debug "Sending first client SASL SCRAM message: #{msg}"
44+
@encoder.write_bytes(msg)
45+
46+
@server_first_message = @decoder.bytes
47+
log_debug "Received first server SASL SCRAM message: #{@server_first_message}"
48+
49+
msg = final_message
50+
log_debug "Sending final client SASL SCRAM message: #{msg}"
51+
@encoder.write_bytes(msg)
52+
53+
response = parse_response(@decoder.bytes)
54+
log_debug "Received last server SASL SCRAM message: #{response}"
55+
56+
raise FailedScramAuthentication, response['e'] if response['e']
57+
raise FailedScramAuthentication, 'Invalid server signature' if response['v'] != server_signature
58+
rescue EOFError => e
59+
raise FailedScramAuthentication, e.message
60+
end
61+
log_debug "SASL SCRAM authentication successful"
62+
end
63+
64+
private
65+
66+
def log_debug(str)
67+
@logger.debug str if @logger
68+
end
69+
70+
def first_message
71+
"n,,#{first_message_bare}"
72+
end
73+
74+
def first_message_bare
75+
"n=#{encoded_username},r=#{nonce}"
76+
end
77+
78+
def final_message_without_proof
79+
"c=biws,r=#{rnonce}"
80+
end
81+
82+
def final_message
83+
"#{final_message_without_proof},p=#{client_proof}"
84+
end
85+
86+
def server_data
87+
parse_response(@server_first_message)
88+
end
89+
90+
def rnonce
91+
server_data['r']
92+
end
93+
94+
def salt
95+
Base64.strict_decode64(server_data['s'])
96+
end
97+
98+
def iterations
99+
server_data['i'].to_i
100+
end
101+
102+
def auth_message
103+
msg = [first_message_bare, @server_first_message, final_message_without_proof].join(',')
104+
end
105+
106+
def salted_password
107+
hi(@password, salt, iterations)
108+
end
109+
110+
def client_key
111+
hmac(salted_password, 'Client Key')
112+
end
113+
114+
def stored_key
115+
h(client_key)
116+
end
117+
118+
def server_key
119+
hmac(salted_password, 'Server Key')
120+
end
121+
122+
def client_signature
123+
hmac(stored_key, auth_message)
124+
end
125+
126+
def server_signature
127+
Base64.strict_encode64(hmac(server_key, auth_message))
128+
end
129+
130+
def client_proof
131+
Base64.strict_encode64(xor(client_key, client_signature))
132+
end
133+
134+
def h(str)
135+
digest.digest(str)
136+
end
137+
138+
def hi(str, salt, iterations)
139+
OpenSSL::PKCS5.pbkdf2_hmac(
140+
str,
141+
salt,
142+
iterations,
143+
digest.size,
144+
digest
145+
)
146+
end
147+
148+
def hmac(data, key)
149+
OpenSSL::HMAC.digest(digest, data, key)
150+
end
151+
152+
def xor(first, second)
153+
first.bytes.zip(second.bytes).map { |(a, b)| (a ^ b).chr }.join('')
154+
end
155+
156+
def parse_response(data)
157+
data.split(',').map { |s| s.split('=', 2) }.to_h
158+
end
159+
160+
def encoded_username
161+
safe_str(@username.encode(Encoding::UTF_8))
162+
end
163+
164+
def nonce
165+
@nonce ||= SecureRandom.urlsafe_base64(32)
166+
end
167+
168+
def digest
169+
@digest ||= case @mechanism
170+
when 'SHA-256'
171+
OpenSSL::Digest::SHA256.new.freeze
172+
when 'SHA-512'
173+
OpenSSL::Digest::SHA512.new.freeze
174+
else
175+
raise StandardError, "Unknown mechanism '#{@mechanism}'"
176+
end
177+
end
178+
179+
def safe_str(val)
180+
val.gsub('=', '=3D').gsub(',', '=2C')
181+
end
182+
end
183+
end

0 commit comments

Comments
 (0)