Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -901,8 +901,9 @@ Typically, Kafka certificates come in the JKS format, which isn't supported by r

#### Authentication using SASL

Kafka has support for using SASL to authenticate clients. Currently GSSAPI and PLAIN mechanisms are supported by ruby-kafka.
Kafka has support for using SASL to authenticate clients. Currently GSSAPI, SCRAM and PLAIN mechanisms are supported by ruby-kafka.

##### GSSAPI
In order to authenticate using GSSAPI, set your principal and optionally your keytab when initializing the Kafka client:

```ruby
Expand All @@ -913,6 +914,7 @@ kafka = Kafka.new(
)
```

##### PLAIN
In order to authenticate using PLAIN, you must set your username and password when initializing the Kafka client:

```ruby
Expand All @@ -926,6 +928,18 @@ kafka = Kafka.new(

**NOTE**: It is __highly__ recommended that you use SSL for encryption when using SASL_PLAIN

##### SCRAM
Since 0.11 kafka supports [SCRAM](https://kafka.apache.org/documentation.html#security_sasl_scram).

```ruby
kafka = Kafka.new(
sasl_scram_username: 'username',
sasl_scram_password: 'password',
sasl_scram_mechanism: 'sha256',
# ...
)
```

## Design

The library has been designed as a layered system, with each layer having a clear responsibility:
Expand Down
6 changes: 6 additions & 0 deletions lib/kafka.rb
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,12 @@ class OffsetCommitError < Error
class FetchError < Error
end

class SaslScramError < Error
end

class FailedScramAuthentication < SaslScramError
end

# Initializes a new Kafka client.
#
# @see Client#initialize
Expand Down
15 changes: 14 additions & 1 deletion lib/kafka/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -49,23 +49,36 @@ class Client
#
# @param sasl_gssapi_keytab [String, nil] a KRB5 keytab filepath
#
# @param sasl_scram_username [String, nil] SCRAM username
#
# @param sasl_scram_password [String, nil] SCRAM password
#
# @param sasl_scram_mechanism [String, nil] Scram mechanism ("sha256", "sha512")
#
# @param use_ssl [Booleanm false] Use SSL
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why add this parameter? The decision on whether to use SSL is already done here:

def build_ssl_context(ca_cert_file_path, ca_cert, client_cert, client_cert_key)
return nil unless ca_cert_file_path || ca_cert || client_cert || client_cert_key

#
# @return [Client]
def initialize(seed_brokers:, client_id: "ruby-kafka", logger: nil, connect_timeout: nil, socket_timeout: nil,
ssl_ca_cert_file_path: nil, ssl_ca_cert: nil, ssl_client_cert: nil, ssl_client_cert_key: nil,
sasl_gssapi_principal: nil, sasl_gssapi_keytab: nil,
sasl_plain_authzid: '', sasl_plain_username: nil, sasl_plain_password: nil)
sasl_plain_authzid: '', sasl_plain_username: nil, sasl_plain_password: nil,
use_ssl: false, sasl_scram_username: nil, sasl_scram_password: nil, sasl_scram_mechanism: nil)
@logger = logger || Logger.new(nil)
@instrumenter = Instrumenter.new(client_id: client_id)
@seed_brokers = normalize_seed_brokers(seed_brokers)

ssl_context = build_ssl_context(ssl_ca_cert_file_path, ssl_ca_cert, ssl_client_cert, ssl_client_cert_key)
ssl_context = OpenSSL::SSL::SSLContext.new if use_ssl and !ssl_context

sasl_authenticator = SaslAuthenticator.new(
sasl_gssapi_principal: sasl_gssapi_principal,
sasl_gssapi_keytab: sasl_gssapi_keytab,
sasl_plain_authzid: sasl_plain_authzid,
sasl_plain_username: sasl_plain_username,
sasl_plain_password: sasl_plain_password,
sasl_scram_username: sasl_scram_username,
sasl_scram_password: sasl_scram_password,
sasl_scram_mechanism: sasl_scram_mechanism,
logger: @logger
)

Expand Down
2 changes: 1 addition & 1 deletion lib/kafka/protocol/sasl_handshake_request.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ module Protocol

class SaslHandshakeRequest

SUPPORTED_MECHANISMS = %w(GSSAPI PLAIN)
SUPPORTED_MECHANISMS = %w(GSSAPI PLAIN SCRAM-SHA-256 SCRAM-SHA-512)

def initialize(mechanism)
unless SUPPORTED_MECHANISMS.include?(mechanism)
Expand Down
34 changes: 29 additions & 5 deletions lib/kafka/sasl_authenticator.rb
Original file line number Diff line number Diff line change
@@ -1,50 +1,74 @@
require 'kafka/sasl_gssapi_authenticator'
require 'kafka/sasl_plain_authenticator'
require 'kafka/sasl_scram_authenticator'

module Kafka
class SaslAuthenticator
def initialize(logger:, sasl_gssapi_principal:, sasl_gssapi_keytab:, sasl_plain_authzid:, sasl_plain_username:, sasl_plain_password:)
def initialize(logger:, sasl_gssapi_principal:, sasl_gssapi_keytab:,
sasl_plain_authzid:, sasl_plain_username:, sasl_plain_password:,
sasl_scram_username:, sasl_scram_password:, sasl_scram_mechanism:)
@logger = logger
@sasl_gssapi_principal = sasl_gssapi_principal
@sasl_gssapi_keytab = sasl_gssapi_keytab
@sasl_plain_authzid = sasl_plain_authzid
@sasl_plain_username = sasl_plain_username
@sasl_plain_password = sasl_plain_password
@sasl_scram_username = sasl_scram_username
@sasl_scram_password = sasl_scram_password
@sasl_scram_mechanism = sasl_scram_mechanism
end

def authenticate!(connection)
if authenticate_using_sasl_gssapi?
sasl_gssapi_authenticate(connection)
elsif authenticate_using_sasl_plain?
sasl_plain_authenticate(connection)
elsif authenticate_using_sasl_scram?
sasl_scram_authenticate(connection)
end
end

private

def sasl_scram_authenticate(connection)
auth = SaslScramAuthenticator.new(
@sasl_scram_username,
@sasl_scram_password,
logger: @logger,
mechanism: @sasl_scram_mechanism,
connection: connection
)

auth.authenticate!
end

def sasl_gssapi_authenticate(connection)
auth = SaslGssapiAuthenticator.new(
connection: connection,
logger: @logger,
sasl_gssapi_principal: @sasl_gssapi_principal,
sasl_gssapi_keytab: @sasl_gssapi_keytab
sasl_gssapi_keytab: @sasl_gssapi_keytab,
connection: connection
)

auth.authenticate!
end

def sasl_plain_authenticate(connection)
auth = SaslPlainAuthenticator.new(
connection: connection,
logger: @logger,
authzid: @sasl_plain_authzid,
username: @sasl_plain_username,
password: @sasl_plain_password
password: @sasl_plain_password,
connection: connection
)

auth.authenticate!
end

def authenticate_using_sasl_scram?
@sasl_scram_username && @sasl_scram_password
end

def authenticate_using_sasl_gssapi?
!@ssl_context && @sasl_gssapi_principal && !@sasl_gssapi_principal.empty?
end
Expand Down
183 changes: 183 additions & 0 deletions lib/kafka/sasl_scram_authenticator.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
require 'securerandom'
require 'base64'

module Kafka

class SaslScramAuthenticator
MECHANISMS = {
sha256: {
mechanism: 'SHA-256'
},
sha512: {
mechanism: 'SHA-512'
}
}.freeze

VALID_MECHANISMS = %w{sha256 sha512}.freeze

def initialize(username, password, mechanism: 'sha256', logger: nil, connection:)
unless VALID_MECHANISMS.include?(mechanism)
raise Kafka::SaslScramError, "SCRAM mechanism #{mechanism} is not supported."
end
@username = username
@password = password
@mechanism = MECHANISMS[mechanism.to_sym][:mechanism]
@logger = logger
@connection = connection
end

def authenticate!
response = @connection.send_request(Kafka::Protocol::SaslHandshakeRequest.new('SCRAM-' + @mechanism))

unless response.error_code == 0 && response.enabled_mechanisms.include?('SCRAM-' + @mechanism)
raise Kafka::SaslScramError, "SCRAM-#{@mechanism} is not supported."
end

log_debug "Authenticating #{@username} with SASL SCRAM #{@mechanism}"

@encoder = @connection.encoder
@decoder = @connection.decoder

begin
msg = first_message
log_debug "Sending first client SASL SCRAM message: #{msg}"
@encoder.write_bytes(msg)

@server_first_message = @decoder.bytes
log_debug "Received first server SASL SCRAM message: #{@server_first_message}"

msg = final_message
log_debug "Sending final client SASL SCRAM message: #{msg}"
@encoder.write_bytes(msg)

response = parse_response(@decoder.bytes)
log_debug "Received last server SASL SCRAM message: #{response}"

raise FailedScramAuthentication, response['e'] if response['e']
raise FailedScramAuthentication, 'Invalid server signature' if response['v'] != server_signature
rescue EOFError => e
raise FailedScramAuthentication, e.message
end
log_debug "SASL SCRAM authentication successful"
end

private

def log_debug(str)
@logger.debug str if @logger
end

def first_message
"n,,#{first_message_bare}"
end

def first_message_bare
"n=#{encoded_username},r=#{nonce}"
end

def final_message_without_proof
"c=biws,r=#{rnonce}"
end

def final_message
"#{final_message_without_proof},p=#{client_proof}"
end

def server_data
parse_response(@server_first_message)
end

def rnonce
server_data['r']
end

def salt
Base64.strict_decode64(server_data['s'])
end

def iterations
server_data['i'].to_i
end

def auth_message
msg = [first_message_bare, @server_first_message, final_message_without_proof].join(',')
end

def salted_password
hi(@password, salt, iterations)
end

def client_key
hmac(salted_password, 'Client Key')
end

def stored_key
h(client_key)
end

def server_key
hmac(salted_password, 'Server Key')
end

def client_signature
hmac(stored_key, auth_message)
end

def server_signature
Base64.strict_encode64(hmac(server_key, auth_message))
end

def client_proof
Base64.strict_encode64(xor(client_key, client_signature))
end

def h(str)
digest.digest(str)
end

def hi(str, salt, iterations)
OpenSSL::PKCS5.pbkdf2_hmac(
str,
salt,
iterations,
digest.size,
digest
)
end

def hmac(data, key)
OpenSSL::HMAC.digest(digest, data, key)
end

def xor(first, second)
first.bytes.zip(second.bytes).map { |(a, b)| (a ^ b).chr }.join('')
end

def parse_response(data)
data.split(',').map { |s| s.split('=', 2) }.to_h
end

def encoded_username
safe_str(@username.encode(Encoding::UTF_8))
end

def nonce
@nonce ||= SecureRandom.urlsafe_base64(32)
end

def digest
@digest ||= case @mechanism
when 'SHA-256'
OpenSSL::Digest::SHA256.new.freeze
when 'SHA-512'
OpenSSL::Digest::SHA512.new.freeze
else
raise StandardError, "Unknown mechanism '#{@mechanism}'"
end
end

def safe_str(val)
val.gsub('=', '=3D').gsub(',', '=2C')
end
end
end
Loading