From caecb0dc51a7b1c35eca69afce91e020b80a2ecd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jon=20B=C3=B6rjesson?= Date: Tue, 10 Oct 2017 07:02:25 +0200 Subject: [PATCH 01/17] Don't validate sasl type It must be possible to implement new sasl types without having to modify ruby-kafka. Made authenticator parameter name more generic (remove sasl_) --- lib/kafka/client.rb | 9 +++++---- lib/kafka/connection.rb | 6 +++--- lib/kafka/connection_builder.rb | 6 +++--- lib/kafka/protocol/sasl_handshake_request.rb | 5 ----- spec/connection_spec.rb | 6 +++--- spec/protocol/sasl_handshake_request_spec.rb | 16 ---------------- spec/sasl_plain_authenticator_spec.rb | 4 ++-- 7 files changed, 16 insertions(+), 36 deletions(-) diff --git a/lib/kafka/client.rb b/lib/kafka/client.rb index 614080b2b..5a49c10f4 100644 --- a/lib/kafka/client.rb +++ b/lib/kafka/client.rb @@ -53,14 +53,15 @@ class Client def initialize(seed_brokers:, client_id: "ruby-kafka", logger: nil, connect_timeout: nil, socket_timeout: nil, ssl_ca_cert_file_path: nil, ssl_ca_cert: nil, ssl_client_cert: nil, ssl_client_cert_key: nil, sasl_gssapi_principal: nil, sasl_gssapi_keytab: nil, - sasl_plain_authzid: '', sasl_plain_username: nil, sasl_plain_password: nil) + sasl_plain_authzid: '', sasl_plain_username: nil, sasl_plain_password: nil, + authenticator: nil, ssl_context: nil) @logger = logger || Logger.new(nil) @instrumenter = Instrumenter.new(client_id: client_id) @seed_brokers = normalize_seed_brokers(seed_brokers) - ssl_context = build_ssl_context(ssl_ca_cert_file_path, ssl_ca_cert, ssl_client_cert, ssl_client_cert_key) + ssl_context = ssl_context || build_ssl_context(ssl_ca_cert_file_path, ssl_ca_cert, ssl_client_cert, ssl_client_cert_key) - sasl_authenticator = SaslAuthenticator.new( + authenticator ||= SaslAuthenticator.new( sasl_gssapi_principal: sasl_gssapi_principal, sasl_gssapi_keytab: sasl_gssapi_keytab, sasl_plain_authzid: sasl_plain_authzid, @@ -76,7 +77,7 @@ def initialize(seed_brokers:, client_id: "ruby-kafka", logger: nil, connect_time ssl_context: ssl_context, logger: @logger, instrumenter: @instrumenter, - sasl_authenticator: sasl_authenticator + authenticator: authenticator ) @cluster = initialize_cluster diff --git a/lib/kafka/connection.rb b/lib/kafka/connection.rb index f04ec8698..48a2f295d 100644 --- a/lib/kafka/connection.rb +++ b/lib/kafka/connection.rb @@ -48,7 +48,7 @@ class Connection # broker. Default is 10 seconds. # # @return [Connection] a new connection. - def initialize(host:, port:, client_id:, logger:, instrumenter:, sasl_authenticator:, connect_timeout: nil, socket_timeout: nil, ssl_context: nil) + def initialize(host:, port:, client_id:, logger:, instrumenter:, authenticator:, connect_timeout: nil, socket_timeout: nil, ssl_context: nil) @host, @port, @client_id = host, port, client_id @logger = logger @instrumenter = instrumenter @@ -56,7 +56,7 @@ def initialize(host:, port:, client_id:, logger:, instrumenter:, sasl_authentica @connect_timeout = connect_timeout || CONNECT_TIMEOUT @socket_timeout = socket_timeout || SOCKET_TIMEOUT @ssl_context = ssl_context - @sasl_authenticator = sasl_authenticator + @authenticator = authenticator end def address_match?(host, port) @@ -137,7 +137,7 @@ def open @correlation_id = 0 @last_request = nil - @sasl_authenticator.authenticate!(self) + @authenticator.authenticate!(self) rescue Errno::ETIMEDOUT => e @logger.error "Timed out while trying to connect to #{self}: #{e}" raise ConnectionError, e diff --git a/lib/kafka/connection_builder.rb b/lib/kafka/connection_builder.rb index 59448393a..31d36d1e8 100644 --- a/lib/kafka/connection_builder.rb +++ b/lib/kafka/connection_builder.rb @@ -1,13 +1,13 @@ module Kafka class ConnectionBuilder - def initialize(client_id:, logger:, instrumenter:, connect_timeout:, socket_timeout:, ssl_context:, sasl_authenticator:) + def initialize(client_id:, logger:, instrumenter:, connect_timeout:, socket_timeout:, ssl_context:, authenticator:) @client_id = client_id @logger = logger @instrumenter = instrumenter @connect_timeout = connect_timeout @socket_timeout = socket_timeout @ssl_context = ssl_context - @sasl_authenticator = sasl_authenticator + @authenticator = authenticator end def build_connection(host, port) @@ -20,7 +20,7 @@ def build_connection(host, port) logger: @logger, instrumenter: @instrumenter, ssl_context: @ssl_context, - sasl_authenticator: @sasl_authenticator + authenticator: @authenticator ) connection diff --git a/lib/kafka/protocol/sasl_handshake_request.rb b/lib/kafka/protocol/sasl_handshake_request.rb index c34ae5667..9c49ee139 100644 --- a/lib/kafka/protocol/sasl_handshake_request.rb +++ b/lib/kafka/protocol/sasl_handshake_request.rb @@ -6,12 +6,7 @@ module Protocol class SaslHandshakeRequest - SUPPORTED_MECHANISMS = %w(GSSAPI PLAIN) - def initialize(mechanism) - unless SUPPORTED_MECHANISMS.include?(mechanism) - raise Kafka::Error, "Unsupported SASL mechanism #{mechanism}. Supported are #{SUPPORTED_MECHANISMS.join(', ')}" - end @mechanism = mechanism end diff --git a/spec/connection_spec.rb b/spec/connection_spec.rb index f1331cb9c..2f7ab6b23 100644 --- a/spec/connection_spec.rb +++ b/spec/connection_spec.rb @@ -6,7 +6,7 @@ let(:server) { TCPServer.new(host, 0) } let(:port) { server.addr[1] } - let(:sasl_authenticator) { + let(:authenticator) { instance_double(Kafka::SaslAuthenticator, authenticate!: true) } let(:connection) { @@ -18,7 +18,7 @@ instrumenter: Kafka::Instrumenter.new(client_id: "test"), connect_timeout: 0.1, socket_timeout: 0.1, - sasl_authenticator: sasl_authenticator + authenticator: authenticator ) } @@ -91,7 +91,7 @@ end it "calls authenticate when a new connection is open" do - expect(sasl_authenticator).to receive(:authenticate!).with(connection).once + expect(authenticator).to receive(:authenticate!).with(connection).once response = connection.send_request(request) connection.send_request(request) diff --git a/spec/protocol/sasl_handshake_request_spec.rb b/spec/protocol/sasl_handshake_request_spec.rb index bd0901e96..1e019355a 100644 --- a/spec/protocol/sasl_handshake_request_spec.rb +++ b/spec/protocol/sasl_handshake_request_spec.rb @@ -5,20 +5,4 @@ expect(request.api_key).to eq 17 end end - - describe "#initialize" do - context "#supported" do - it "allows GSSAPI" do - expect { Kafka::Protocol::SaslHandshakeRequest.new('GSSAPI') }.not_to raise_error - end - it "allows PLAIN" do - expect { Kafka::Protocol::SaslHandshakeRequest.new('PLAIN') }.not_to raise_error - end - end - context "#unsupported" do - it "reject unknown handshake" do - expect { Kafka::Protocol::SaslHandshakeRequest.new('Unsupported') }.to raise_error Kafka::Error - end - end - end end diff --git a/spec/sasl_plain_authenticator_spec.rb b/spec/sasl_plain_authenticator_spec.rb index 4585bf6d3..dfd8752ff 100644 --- a/spec/sasl_plain_authenticator_spec.rb +++ b/spec/sasl_plain_authenticator_spec.rb @@ -5,7 +5,7 @@ let(:host) { "127.0.0.1" } let(:server) { TCPServer.new(host, 0) } let(:port) { server.addr[1] } - let(:sasl_authenticator) { + let(:authenticator) { instance_double(Kafka::SaslAuthenticator, authenticate!: true) } @@ -18,7 +18,7 @@ instrumenter: Kafka::Instrumenter.new(client_id: "test"), connect_timeout: 0.1, socket_timeout: 0.1, - sasl_authenticator: sasl_authenticator + authenticator: authenticator ) } From 984d4519f61fe59a5e3afed4e01d196b1c5a08cb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jon=20B=C3=B6rjesson?= Date: Tue, 10 Oct 2017 08:01:15 +0200 Subject: [PATCH 02/17] Added sasl scram authenticator --- lib/kafka.rb | 9 ++ lib/kafka/sasl_scram_authenticator.rb | 170 ++++++++++++++++++++++++++ 2 files changed, 179 insertions(+) create mode 100644 lib/kafka/sasl_scram_authenticator.rb diff --git a/lib/kafka.rb b/lib/kafka.rb index 4cbb7143c..ddeafab04 100644 --- a/lib/kafka.rb +++ b/lib/kafka.rb @@ -225,6 +225,15 @@ class OffsetCommitError < Error class FetchError < Error end + class NoPartitionsAssignedError < Error + end + + class SaslScramError < Error + end + + class FailedScramAuthentication < SaslScramError + end + # Initializes a new Kafka client. # # @see Client#initialize diff --git a/lib/kafka/sasl_scram_authenticator.rb b/lib/kafka/sasl_scram_authenticator.rb new file mode 100644 index 000000000..2e35b7a36 --- /dev/null +++ b/lib/kafka/sasl_scram_authenticator.rb @@ -0,0 +1,170 @@ +require 'securerandom' +require 'base64' + +module Kafka + SCRAM_SHA256 = 'SHA-256'.freeze + SCRAM_SHA512 = 'SHA-512'.freeze + class SaslScramAuthenticator + def initialize(username, password, mechanism: SCRAM_SHA256, logger: nil) + @username = username + @password = password + @mechanism = mechanism + @logger = logger + end + + def authenticate!(connection) + @connection = connection + response = @connection.send_request(Kafka::Protocol::SaslHandshakeRequest.new('SCRAM-' + @mechanism)) + + unless response.error_code == 0 && response.enabled_mechanisms.include?('SCRAM-' + @mechanism) + raise Kafka::SaslScramError, "SCRAM-#{@mechanism} is not supported." + end + + log_debug "authenticating #{@username} with scram, mechanism: #{@mechanism}" + + @encoder = @connection.encoder + @decoder = @connection.decoder + + msg = first_message + log_debug "[scram] Sending client's first message: #{msg}" + @encoder.write_bytes(msg) + + @server_first_message = @decoder.bytes + log_debug "[scram] Received server's first message: #{@server_first_message}" + + msg = final_message + log_debug "[scram] Sending client's final message: #{msg}" + @encoder.write_bytes(msg) + + response = parse_response(@decoder.bytes) + log_debug "[scram] Received server's final msg: #{response}" + log_debug "[scram] Client calculated server signature: #{server_signature}" + + raise FailedScramAuthentication, response['e'] if response['e'] + raise FailedScramAuthentication, 'Invalid server signature' if response['v'] != server_signature + rescue FailedScramAuthentication + raise + rescue StandardError => e + @logger.error "authentication error #{e.inspect}\n\n#{e.backtrace.join("\n")}" + raise FailedScramAuthentication, 'Authentication failed: Unknown reason' + end + + private + + def log_debug(str) + @logger.debug str if @logger + end + + def first_message + "n,,#{first_message_bare}" + end + + def first_message_bare + "n=#{encoded_username},r=#{nonce}" + end + + def final_message_without_proof + "c=biws,r=#{rnonce}" + end + + def final_message + "#{final_message_without_proof},p=#{client_proof}" + end + + def server_data + parse_response(@server_first_message) + end + + def rnonce + server_data['r'] + end + + def salt + Base64.strict_decode64(server_data['s']) + end + + def iterations + server_data['i'].to_i + end + + def auth_message + [first_message_bare, @server_first_message, final_message_without_proof].join(',') + end + + def salted_password + hi(@password, salt, iterations) + end + + def client_key + hmac(salted_password, 'Client Key') + end + + def stored_key + h(client_key) + end + + def server_key + hmac(salted_password, 'Server Key') + end + + def client_signature + hmac(stored_key, auth_message) + end + + def server_signature + Base64.strict_encode64(hmac(server_key, auth_message)) + end + + def client_proof + Base64.strict_encode64(xor(client_key, client_signature)) + end + + def h(str) + digest.digest(str) + end + + def hi(str, salt, iterations) + OpenSSL::PKCS5.pbkdf2_hmac( + str, + salt, + iterations, + digest.size, + digest) + end + + def hmac(data, key) + OpenSSL::HMAC.digest(digest, data, key) + end + + def xor(first, second) + first.bytes.zip(second.bytes).map{ |(a,b)| (a ^ b).chr }.join('') + end + + def parse_response(data) + data.split(',').map { |s| s.split('=', 2) }.to_h + end + + def encoded_username + safe_str(@username.encode(Encoding::UTF_8)) + end + + def nonce + @nonce ||= SecureRandom.urlsafe_base64(32) + end + + def digest + @digest ||= case @mechanism + when SCRAM_SHA256 + OpenSSL::Digest::SHA256.new.freeze + when SCRAM_SHA512 + OpenSSL::Digest::SHA512.new.freeze + else + raise StandardError, "Unknown mechanism '#{@mechanism}'" + end + end + + def safe_str(val) + val.gsub('=', '=3D').gsub(',', '=2C') + end + end +end From a54e9bb6ce19f8f89b9cc5fc494bf67c620480e9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jon=20B=C3=B6rjesson?= Date: Wed, 25 Oct 2017 12:26:56 +0200 Subject: [PATCH 03/17] Pass connection to authenticate! instead of constructor --- lib/kafka/sasl_authenticator.rb | 6 ++---- lib/kafka/sasl_gssapi_authenticator.rb | 8 ++++---- lib/kafka/sasl_plain_authenticator.rb | 12 ++++++------ spec/sasl_plain_authenticator_spec.rb | 6 ++---- 4 files changed, 14 insertions(+), 18 deletions(-) diff --git a/lib/kafka/sasl_authenticator.rb b/lib/kafka/sasl_authenticator.rb index d1f889524..f56c00d74 100644 --- a/lib/kafka/sasl_authenticator.rb +++ b/lib/kafka/sasl_authenticator.rb @@ -24,25 +24,23 @@ def authenticate!(connection) def sasl_gssapi_authenticate(connection) auth = SaslGssapiAuthenticator.new( - connection: connection, logger: @logger, sasl_gssapi_principal: @sasl_gssapi_principal, sasl_gssapi_keytab: @sasl_gssapi_keytab ) - auth.authenticate! + auth.authenticate!(connection) end def sasl_plain_authenticate(connection) auth = SaslPlainAuthenticator.new( - connection: connection, logger: @logger, authzid: @sasl_plain_authzid, username: @sasl_plain_username, password: @sasl_plain_password ) - auth.authenticate! + auth.authenticate!(connection) end def authenticate_using_sasl_gssapi? diff --git a/lib/kafka/sasl_gssapi_authenticator.rb b/lib/kafka/sasl_gssapi_authenticator.rb index 5ddd3fcd6..ecd60891c 100644 --- a/lib/kafka/sasl_gssapi_authenticator.rb +++ b/lib/kafka/sasl_gssapi_authenticator.rb @@ -3,17 +3,17 @@ class SaslGssapiAuthenticator GSSAPI_IDENT = "GSSAPI" GSSAPI_CONFIDENTIALITY = false - def initialize(connection:, logger:, sasl_gssapi_principal:, sasl_gssapi_keytab:) - @connection = connection + def initialize(logger:, sasl_gssapi_principal:, sasl_gssapi_keytab:) @logger = logger @principal = sasl_gssapi_principal @keytab = sasl_gssapi_keytab load_gssapi - initialize_gssapi_context end - def authenticate! + def authenticate!(connection) + @connection = connection + initialize_gssapi_context proceed_sasl_gssapi_negotiation end diff --git a/lib/kafka/sasl_plain_authenticator.rb b/lib/kafka/sasl_plain_authenticator.rb index b68581fbd..6acdc87be 100644 --- a/lib/kafka/sasl_plain_authenticator.rb +++ b/lib/kafka/sasl_plain_authenticator.rb @@ -2,19 +2,19 @@ module Kafka class SaslPlainAuthenticator PLAIN_IDENT = "PLAIN" - def initialize(connection:, logger:, authzid:, username:, password:) - @connection = connection + def initialize(logger:, authzid:, username:, password:) @logger = logger @authzid = authzid @username = username @password = password end - def authenticate! - response = @connection.send_request(Kafka::Protocol::SaslHandshakeRequest.new(PLAIN_IDENT)) + def authenticate!(connection) + @logger.debug 'Authenticating SASL PLAIN' + response = connection.send_request(Kafka::Protocol::SaslHandshakeRequest.new(PLAIN_IDENT)) - @encoder = @connection.encoder - @decoder = @connection.decoder + @encoder = connection.encoder + @decoder = connection.decoder unless response.error_code == 0 && response.enabled_mechanisms.include?(PLAIN_IDENT) raise Kafka::Error, "#{PLAIN_IDENT} is not supported." diff --git a/spec/sasl_plain_authenticator_spec.rb b/spec/sasl_plain_authenticator_spec.rb index dfd8752ff..7fa3c1002 100644 --- a/spec/sasl_plain_authenticator_spec.rb +++ b/spec/sasl_plain_authenticator_spec.rb @@ -28,7 +28,6 @@ context 'when correct username/password' do let(:sasl_plain_authenticator) { Kafka::SaslPlainAuthenticator.new( - connection: connection, logger: logger, authzid: 'spec_authzid', username: 'spec_username', @@ -37,14 +36,13 @@ } it 'successfully authenticates' do - expect(sasl_plain_authenticator.authenticate!).to be_truthy + expect(sasl_plain_authenticator.authenticate!(connection)).to be_truthy end end context 'when incorrect username/password' do let(:sasl_plain_authenticator) { Kafka::SaslPlainAuthenticator.new( - connection: connection, logger: logger, authzid: '', username: 'bad_username', @@ -53,7 +51,7 @@ } it 'raises Kafka::Error with EOFError' do - expect { sasl_plain_authenticator.authenticate! }.to raise_error(Kafka::Error, 'SASL PLAIN authentication failed: EOFError') + expect { sasl_plain_authenticator.authenticate!(connection) }.to raise_error(Kafka::Error, 'SASL PLAIN authentication failed: EOFError') end end end From 14749ffc5c5c12dc642507348cb79dc36d586f11 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jon=20B=C3=B6rjesson?= Date: Thu, 2 Nov 2017 07:39:23 +0100 Subject: [PATCH 04/17] No need for connection to be instance variable --- lib/kafka/sasl_scram_authenticator.rb | 59 ++++++++++++++------------- 1 file changed, 31 insertions(+), 28 deletions(-) diff --git a/lib/kafka/sasl_scram_authenticator.rb b/lib/kafka/sasl_scram_authenticator.rb index 2e35b7a36..4424f8549 100644 --- a/lib/kafka/sasl_scram_authenticator.rb +++ b/lib/kafka/sasl_scram_authenticator.rb @@ -13,8 +13,8 @@ def initialize(username, password, mechanism: SCRAM_SHA256, logger: nil) end def authenticate!(connection) - @connection = connection - response = @connection.send_request(Kafka::Protocol::SaslHandshakeRequest.new('SCRAM-' + @mechanism)) + connection = connection + response = connection.send_request(Kafka::Protocol::SaslHandshakeRequest.new('SCRAM-' + @mechanism)) unless response.error_code == 0 && response.enabled_mechanisms.include?('SCRAM-' + @mechanism) raise Kafka::SaslScramError, "SCRAM-#{@mechanism} is not supported." @@ -22,31 +22,34 @@ def authenticate!(connection) log_debug "authenticating #{@username} with scram, mechanism: #{@mechanism}" - @encoder = @connection.encoder - @decoder = @connection.decoder - - msg = first_message - log_debug "[scram] Sending client's first message: #{msg}" - @encoder.write_bytes(msg) - - @server_first_message = @decoder.bytes - log_debug "[scram] Received server's first message: #{@server_first_message}" - - msg = final_message - log_debug "[scram] Sending client's final message: #{msg}" - @encoder.write_bytes(msg) - - response = parse_response(@decoder.bytes) - log_debug "[scram] Received server's final msg: #{response}" - log_debug "[scram] Client calculated server signature: #{server_signature}" - - raise FailedScramAuthentication, response['e'] if response['e'] - raise FailedScramAuthentication, 'Invalid server signature' if response['v'] != server_signature - rescue FailedScramAuthentication - raise - rescue StandardError => e - @logger.error "authentication error #{e.inspect}\n\n#{e.backtrace.join("\n")}" - raise FailedScramAuthentication, 'Authentication failed: Unknown reason' + @encoder = connection.encoder + @decoder = connection.decoder + + begin + msg = first_message + log_debug "[scram] Sending client's first message: #{msg}" + @encoder.write_bytes(msg) + + @server_first_message = @decoder.bytes + log_debug "[scram] Received server's first message: #{@server_first_message}" + + msg = final_message + log_debug "[scram] Sending client's final message: #{msg}" + @encoder.write_bytes(msg) + + response = parse_response(@decoder.bytes) + log_debug "[scram] Received server's final msg: #{response}" + log_debug "[scram] Client calculated server signature: #{server_signature}" + + raise FailedScramAuthentication, response['e'] if response['e'] + raise FailedScramAuthentication, 'Invalid server signature' if response['v'] != server_signature + rescue FailedScramAuthentication + raise + rescue => e + @logger.error "authentication error #{e.inspect}\n\n#{e.backtrace.join("\n")}" + raise FailedScramAuthentication, 'Authentication failed: Unknown reason' + end + @logger.debug "[scram] Authenticated" end private @@ -88,7 +91,7 @@ def iterations end def auth_message - [first_message_bare, @server_first_message, final_message_without_proof].join(',') + msg = [first_message_bare, @server_first_message, final_message_without_proof].join(',') end def salted_password From 1a3112c426619767d45f2e1851dec109e69a4280 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jon=20B=C3=B6rjesson?= Date: Thu, 2 Nov 2017 07:48:24 +0100 Subject: [PATCH 05/17] Added specs for scram --- spec/fake_server.rb | 155 ++++++++++++++++++++------ spec/sasl_scram_authenticator_spec.rb | 86 ++++++++++++++ 2 files changed, 207 insertions(+), 34 deletions(-) create mode 100644 spec/sasl_scram_authenticator_spec.rb diff --git a/spec/fake_server.rb b/spec/fake_server.rb index fb669efa2..b361cef55 100644 --- a/spec/fake_server.rb +++ b/spec/fake_server.rb @@ -1,5 +1,5 @@ class FakeServer - SUPPORTED_MECHANISMS = ['PLAIN'] + SUPPORTED_MECHANISMS = ['PLAIN','SCRAM-SHA-256', 'SCRAM-SHA-512'] def self.start(server) thread = Thread.new { new(server).start } @@ -29,49 +29,136 @@ def start end def handle_client(client) - loop do - request_bytes = Kafka::Protocol::Decoder.new(client).bytes - encoder = Kafka::Protocol::Encoder.new(client) - - # Special case for authentication - if @authenticating - case @auth_mechanism - when 'PLAIN' - _authzid, username, password = request_bytes.split("\000") - - if username == 'spec_username' && password == 'spec_password' - # Successfully Authenticated, send back empty string - encoder.write_bytes('') - @authenticating = false - end - else - # Unknown mechanism - end - - break - end + encoder = Kafka::Protocol::Encoder.new(client) + decoder = Kafka::Protocol::Decoder.new(client) - request_decoder = Kafka::Protocol::Decoder.new(StringIO.new(request_bytes)) - api_key = request_decoder.int16 - api_version = request_decoder.int16 - correlation_id = request_decoder.int32 - client_id = request_decoder.string + loop do + request_bytes = decoder.bytes + request_data = Kafka::Protocol::Decoder.new(StringIO.new(request_bytes)); + api_key = request_data.int16 + api_version = request_data.int16 + correlation_id = request_data.int32 + client_id = request_data.string - message = request_decoder.string + message = request_data.string response = StringIO.new response_encoder = Kafka::Protocol::Encoder.new(response) response_encoder.write_int32(correlation_id) - if api_key == 17 # SASL Authentication - response_encoder.write_int16(0) # no errors - response_encoder.write_array(SUPPORTED_MECHANISMS) { |msg| response_encoder.write_string(msg) } - @authenticating = true - @auth_mechanism = message + case api_key + when 17 then + response_encoder.write_int16(0) + response_encoder.write_array(SUPPORTED_MECHANISMS) { |m| response_encoder.write_string(m) } + encoder.write_bytes(response.string) + auth(message, encoder, decoder) + break else response_encoder.write_string(message) + encoder.write_bytes(response.string) + end + end + end + + def auth(auth_mechanism, encoder, decoder) + case auth_mechanism + when 'PLAIN' + message = decoder.bytes + _authzid, username, password = message.split("\000") + if username == 'spec_username' && password == 'spec_password' + encoder.write_bytes('') end - encoder.write_bytes(response.string) + when 'SCRAM-SHA-256', 'SCRAM-SHA-512' + scram_sasl_authenticate(auth_mechanism[6..-1],encoder, decoder) + else + puts "UNKNOWN AUTH MECHANISM" end end + + def scram_sasl_authenticate (algorithm, encoder, decoder) + zk_username = 'spec_username' + zk_data = { + 'SHA-512': { + salt: 'ODVhbzNqcGdneDR5ZzIzbmJpcnpodmdxcg==', + stored_key: 'kfUpWelvXn406F1rKx3gE9Nz6qBBI+7v1Dg2n8QSNy9ZA1vU1jxYKOMRVV9188TDxhQe6Te0D8R2t0r5YFILnA==', + server_key: 'CDkccMty/z9z7KUciVixhIuPLV53QtMHT2SbJUbvNqdaqGvtkTwDgMCLjWKqMKkUvnInYziJh/YfRKYNoLEnaQ==', + iterations: 4096 + }, + 'SHA-256': { + salt: 'MWVkNGdvam9qNG4yYmt1dG82ZGxrY3ppM3c=', + stored_key: 'W28WpOjPl87SPMfFZsuyA5Yor0Z/q4+VZJlZqzDfgsI=', + server_key: '17y/jubvVV8cWGxhaMN/8eOFTvnaYQ9f/JJmNszmOFI=', + iterations: 4096 + } + } + @scram_mechanism = algorithm + algorithm = algorithm.to_sym + request_bytes = decoder.bytes + _, _, userdata, nouncedata = request_bytes.split(',') + _, username = userdata.split('=') + _, client_nounce = nouncedata.split('=') + + return if username != zk_username + + client_first_message_bare = "#{userdata},#{nouncedata}" + server_nounce = SecureRandom.urlsafe_base64(8) + + salt64 = zk_data[algorithm][:salt] + iterations = zk_data[algorithm][:iterations] + stored_key = Base64.strict_decode64(zk_data[algorithm][:stored_key]) + server_key = Base64.strict_decode64(zk_data[algorithm][:server_key]) + salt = Base64.strict_decode64(salt64) + + server_first_message = "r=#{client_nounce}#{server_nounce},s=#{salt64},i=#{iterations}" + encoder.write_bytes(server_first_message) + + request_bytes = decoder.bytes + c, r, proofdata = request_bytes.split(",") + _, proof = proofdata.split("=",2) + + client_last_message_without_proof = "#{c},#{r}" + auth_message = [client_first_message_bare, server_first_message, client_last_message_without_proof].join(',') + salted_password = hi('spec_password', salt, iterations) + client_key = hmac(salted_password, 'Client Key') + client_signature = hmac(stored_key, auth_message) + server_proof = Base64.strict_encode64(xor(client_key, client_signature)) + + return if server_proof != proof + + server_signature = Base64.strict_encode64(hmac(server_key, auth_message)) + encoder.write_bytes("v=#{server_signature}") + end + + def digest + @digest ||= case @scram_mechanism + when Kafka::SCRAM_SHA256 + OpenSSL::Digest::SHA256.new.freeze + when Kafka::SCRAM_SHA512 + OpenSSL::Digest::SHA512.new.freeze + else + raise StandardError, "Unknown mechanism '#{@scram_mechanism}'" + end + end + + def xor(first, second) + first.bytes.zip(second.bytes).map{ |(a,b)| (a ^ b).chr }.join('') + end + + def hi(str, salt, iterations) + OpenSSL::PKCS5.pbkdf2_hmac( + str, + salt, + iterations, + digest.size, + digest) + end + + def hmac(data, key) + OpenSSL::HMAC.digest(digest, data, key) + end + + def h(str) + digest.digest(str) + end + end diff --git a/spec/sasl_scram_authenticator_spec.rb b/spec/sasl_scram_authenticator_spec.rb new file mode 100644 index 000000000..3db0c7375 --- /dev/null +++ b/spec/sasl_scram_authenticator_spec.rb @@ -0,0 +1,86 @@ +require 'fake_server' +require 'kafka/sasl_scram_authenticator' + +describe Kafka::SaslScramAuthenticator do + let(:logger) { LOGGER } + let(:host) { "127.0.0.1" } + let(:server) { TCPServer.new(host, 0) } + let(:port) { server.addr[1] } + let(:authenticator) { + instance_double(Kafka::SaslAuthenticator, authenticate!: true) + } + + let(:connection) { + Kafka::Connection.new( + host: host, + port: port, + client_id: "test", + logger: logger, + instrumenter: Kafka::Instrumenter.new(client_id: "test"), + connect_timeout: 0.1, + socket_timeout: 0.1, + authenticator: authenticator + ) + } + + let!(:broker) { FakeServer.start(server) } + + describe '#authenticate!' do + context 'when correct username/password using SHA-256' do + let(:sasl_scram_authenticator) { + Kafka::SaslScramAuthenticator.new( + 'spec_username', + 'spec_password', + logger: logger, + mechanism: Kafka::SCRAM_SHA256 + ) + } + + it 'successfully authenticates' do + expect(sasl_scram_authenticator.authenticate!(connection)).to be_truthy + end + end + context 'when correct username/password using SHA-512' do + let(:sasl_scram_authenticator) { + Kafka::SaslScramAuthenticator.new( + 'spec_username', + 'spec_password', + logger: logger, + mechanism: Kafka::SCRAM_SHA512 + ) + } + + it 'successfully authenticates' do + expect(sasl_scram_authenticator.authenticate!(connection)).to be_truthy + end + end + context 'when incorrect username' do + let(:sasl_scram_authenticator) { + Kafka::SaslScramAuthenticator.new( + 'spec_wrong_username', + 'spec_password', + logger: logger, + mechanism: Kafka::SCRAM_SHA256 + ) + } + + it 'raise error' do + expect { sasl_scram_authenticator.authenticate!(connection) }.to raise_error(Kafka::FailedScramAuthentication) + end + end + context 'when incorrect password' do + let(:sasl_scram_authenticator) { + Kafka::SaslScramAuthenticator.new( + 'spec_username', + 'spec_wrong_password', + logger: logger, + mechanism: Kafka::SCRAM_SHA256 + ) + } + + it 'raise error' do + expect { sasl_scram_authenticator.authenticate!(connection) }.to raise_error(Kafka::FailedScramAuthentication) + end + end + end +end From 0be25629a771a09931c331a6adba41aa7c881887 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jon=20B=C3=B6rjesson?= Date: Thu, 2 Nov 2017 09:26:50 +0100 Subject: [PATCH 06/17] Documented scram --- README.md | 60 ++++++++++++++++++++++++++++++++++++++++++++- lib/kafka/client.rb | 4 +++ 2 files changed, 63 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 2ff69d82d..e02f48e14 100644 --- a/README.md +++ b/README.md @@ -901,8 +901,9 @@ Typically, Kafka certificates come in the JKS format, which isn't supported by r #### Authentication using SASL -Kafka has support for using SASL to authenticate clients. Currently GSSAPI and PLAIN mechanisms are supported by ruby-kafka. +Kafka has support for using SASL to authenticate clients. Currently GSSAPI, SCRAM and PLAIN mechanisms are supported by ruby-kafka. +##### GSSAPI In order to authenticate using GSSAPI, set your principal and optionally your keytab when initializing the Kafka client: ```ruby @@ -911,8 +912,21 @@ kafka = Kafka.new( sasl_gssapi_keytab: '/etc/keytabs/kafka.keytab', # ... ) + +# or + +require 'kafka/sasl_gssapi_authenticator' +kafka = Kafka.new( + authenticator: Kafka::SaslGssapiAuthenticator( + sasl_gssapi_principal: 'kafka/kafka.example.com@EXAMPLE.COM', + sasl_gssapi_keytab: '/etc/keytabs/kafka.keytab', + logger: someLogger # optional + ), + # ... +) ``` +##### PLAIN In order to authenticate using PLAIN, you must set your username and password when initializing the Kafka client: ```ruby @@ -922,10 +936,54 @@ kafka = Kafka.new( sasl_plain_password: 'password' # ... ) + +# or + +require 'kafka/sasl_plain_authenticator' +kafka = Kafka.new( + ssl_ca_cert: File.read('/etc/openssl/cert.pem'), # Optional but highly recommended + authenticator: Kafka::SaslPlainAuthenticator( + username: 'username', + password: 'password', + logger: someLogger, # optional + authzid: 'auth_identity' # optional. if nil, username will be used by kafka + ), + # ... +) ``` **NOTE**: It is __highly__ recommended that you use SSL for encryption when using SASL_PLAIN +##### SCRAM + +Since 0.11 kafka supports [SCRAM](https://kafka.apache.org/documentation.html#security_sasl_scram). + +```ruby +require 'kafka/sasl_scram_authenticator' +kafka = Kafka.new( + authenticator: Kafka::SaslScramAuthenticator.new( + 'username', + 'password', + logger: someLogger, # optional + mechanism: Kafka::SCRAM_SHA256 # optional Supported values are Kafka::SCRAM_SHA256 (default) and Kafka::SCRAM_SHA512 + ), + # ... +) +``` + +To use SCRAM over SSL and the server cert is signed by a CA in your default cert store, add a SSL context to enable SSL: +```ruby +require 'openssl' +require 'kafka/sasl_scram_authenticator' +kafka = Kafka.new( + ssl_context: OpenSSL::SSL::SSLContext.new, + authenticator: Kafka::SaslScramAuthenticator.new( + # ... + ), + # ... +) +``` + ## Design The library has been designed as a layered system, with each layer having a clear responsibility: diff --git a/lib/kafka/client.rb b/lib/kafka/client.rb index 5a49c10f4..66ff6383d 100644 --- a/lib/kafka/client.rb +++ b/lib/kafka/client.rb @@ -49,6 +49,10 @@ class Client # # @param sasl_gssapi_keytab [String, nil] a KRB5 keytab filepath # + # @param authenticator [Authenticator, nil] an authenticator (responds to authenticate!) + # + # @param ssl_context [OpenSSL::SSL::SSLContext, nil] a SSL context to use for the connection + # # @return [Client] def initialize(seed_brokers:, client_id: "ruby-kafka", logger: nil, connect_timeout: nil, socket_timeout: nil, ssl_ca_cert_file_path: nil, ssl_ca_cert: nil, ssl_client_cert: nil, ssl_client_cert_key: nil, From 8b3e751b5a3ea5200d2f10e85966c09acecf4a4c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jon=20B=C3=B6rjesson?= Date: Thu, 2 Nov 2017 11:07:23 +0100 Subject: [PATCH 07/17] Code formatting and clean up --- lib/kafka/sasl_scram_authenticator.rb | 7 ++++--- spec/fake_server.rb | 25 ++++++++++++------------- spec/sasl_scram_authenticator_spec.rb | 2 +- 3 files changed, 17 insertions(+), 17 deletions(-) diff --git a/lib/kafka/sasl_scram_authenticator.rb b/lib/kafka/sasl_scram_authenticator.rb index 4424f8549..4991fc5ad 100644 --- a/lib/kafka/sasl_scram_authenticator.rb +++ b/lib/kafka/sasl_scram_authenticator.rb @@ -40,7 +40,7 @@ def authenticate!(connection) response = parse_response(@decoder.bytes) log_debug "[scram] Received server's final msg: #{response}" log_debug "[scram] Client calculated server signature: #{server_signature}" - + raise FailedScramAuthentication, response['e'] if response['e'] raise FailedScramAuthentication, 'Invalid server signature' if response['v'] != server_signature rescue FailedScramAuthentication @@ -132,7 +132,8 @@ def hi(str, salt, iterations) salt, iterations, digest.size, - digest) + digest + ) end def hmac(data, key) @@ -140,7 +141,7 @@ def hmac(data, key) end def xor(first, second) - first.bytes.zip(second.bytes).map{ |(a,b)| (a ^ b).chr }.join('') + first.bytes.zip(second.bytes).map { |(a, b)| (a ^ b).chr }.join('') end def parse_response(data) diff --git a/spec/fake_server.rb b/spec/fake_server.rb index b361cef55..b34033d03 100644 --- a/spec/fake_server.rb +++ b/spec/fake_server.rb @@ -1,5 +1,5 @@ class FakeServer - SUPPORTED_MECHANISMS = ['PLAIN','SCRAM-SHA-256', 'SCRAM-SHA-512'] + SUPPORTED_MECHANISMS = ['PLAIN', 'SCRAM-SHA-256', 'SCRAM-SHA-512'] def self.start(server) thread = Thread.new { new(server).start } @@ -36,9 +36,9 @@ def handle_client(client) request_bytes = decoder.bytes request_data = Kafka::Protocol::Decoder.new(StringIO.new(request_bytes)); api_key = request_data.int16 - api_version = request_data.int16 + _api_version = request_data.int16 correlation_id = request_data.int32 - client_id = request_data.string + _client_id = request_data.string message = request_data.string @@ -69,22 +69,22 @@ def auth(auth_mechanism, encoder, decoder) encoder.write_bytes('') end when 'SCRAM-SHA-256', 'SCRAM-SHA-512' - scram_sasl_authenticate(auth_mechanism[6..-1],encoder, decoder) + scram_sasl_authenticate(auth_mechanism[6..-1], encoder, decoder) else puts "UNKNOWN AUTH MECHANISM" end end - def scram_sasl_authenticate (algorithm, encoder, decoder) + def scram_sasl_authenticate(algorithm, encoder, decoder) zk_username = 'spec_username' zk_data = { - 'SHA-512': { + 'SHA-512' => { salt: 'ODVhbzNqcGdneDR5ZzIzbmJpcnpodmdxcg==', stored_key: 'kfUpWelvXn406F1rKx3gE9Nz6qBBI+7v1Dg2n8QSNy9ZA1vU1jxYKOMRVV9188TDxhQe6Te0D8R2t0r5YFILnA==', server_key: 'CDkccMty/z9z7KUciVixhIuPLV53QtMHT2SbJUbvNqdaqGvtkTwDgMCLjWKqMKkUvnInYziJh/YfRKYNoLEnaQ==', iterations: 4096 }, - 'SHA-256': { + 'SHA-256' => { salt: 'MWVkNGdvam9qNG4yYmt1dG82ZGxrY3ppM3c=', stored_key: 'W28WpOjPl87SPMfFZsuyA5Yor0Z/q4+VZJlZqzDfgsI=', server_key: '17y/jubvVV8cWGxhaMN/8eOFTvnaYQ9f/JJmNszmOFI=', @@ -92,9 +92,8 @@ def scram_sasl_authenticate (algorithm, encoder, decoder) } } @scram_mechanism = algorithm - algorithm = algorithm.to_sym request_bytes = decoder.bytes - _, _, userdata, nouncedata = request_bytes.split(',') + _, _, userdata, nouncedata = request_bytes.split(',') _, username = userdata.split('=') _, client_nounce = nouncedata.split('=') @@ -114,7 +113,7 @@ def scram_sasl_authenticate (algorithm, encoder, decoder) request_bytes = decoder.bytes c, r, proofdata = request_bytes.split(",") - _, proof = proofdata.split("=",2) + _, proof = proofdata.split("=", 2) client_last_message_without_proof = "#{c},#{r}" auth_message = [client_first_message_bare, server_first_message, client_last_message_without_proof].join(',') @@ -141,7 +140,7 @@ def digest end def xor(first, second) - first.bytes.zip(second.bytes).map{ |(a,b)| (a ^ b).chr }.join('') + first.bytes.zip(second.bytes).map { |(a, b)| (a ^ b).chr }.join('') end def hi(str, salt, iterations) @@ -150,7 +149,8 @@ def hi(str, salt, iterations) salt, iterations, digest.size, - digest) + digest + ) end def hmac(data, key) @@ -160,5 +160,4 @@ def hmac(data, key) def h(str) digest.digest(str) end - end diff --git a/spec/sasl_scram_authenticator_spec.rb b/spec/sasl_scram_authenticator_spec.rb index 3db0c7375..9e3a797c7 100644 --- a/spec/sasl_scram_authenticator_spec.rb +++ b/spec/sasl_scram_authenticator_spec.rb @@ -26,7 +26,7 @@ let!(:broker) { FakeServer.start(server) } describe '#authenticate!' do - context 'when correct username/password using SHA-256' do + context 'when correct username/password using SHA-256' do let(:sasl_scram_authenticator) { Kafka::SaslScramAuthenticator.new( 'spec_username', From e08cfc407f0970e9f6e75c35f1f5abd7db6ae40d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jon=20B=C3=B6rjesson?= Date: Thu, 2 Nov 2017 12:01:09 +0100 Subject: [PATCH 08/17] Don't catch all errors --- lib/kafka/sasl_scram_authenticator.rb | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/lib/kafka/sasl_scram_authenticator.rb b/lib/kafka/sasl_scram_authenticator.rb index 4991fc5ad..6cc122b45 100644 --- a/lib/kafka/sasl_scram_authenticator.rb +++ b/lib/kafka/sasl_scram_authenticator.rb @@ -43,13 +43,10 @@ def authenticate!(connection) raise FailedScramAuthentication, response['e'] if response['e'] raise FailedScramAuthentication, 'Invalid server signature' if response['v'] != server_signature - rescue FailedScramAuthentication - raise - rescue => e - @logger.error "authentication error #{e.inspect}\n\n#{e.backtrace.join("\n")}" - raise FailedScramAuthentication, 'Authentication failed: Unknown reason' + rescue EOFError => e + raise FailedScramAuthentication, e.message end - @logger.debug "[scram] Authenticated" + log_debug "[scram] Authenticated" end private From 062c11716e93a294ea9ded8e3263e322c78d7034 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jon=20B=C3=B6rjesson?= Date: Thu, 2 Nov 2017 12:59:30 +0100 Subject: [PATCH 09/17] Refactorized to use parameters instead of DI --- README.md | 50 ++------------------ lib/kafka/client.rb | 20 +++++--- lib/kafka/connection.rb | 6 +-- lib/kafka/connection_builder.rb | 6 +-- lib/kafka/protocol/sasl_handshake_request.rb | 5 ++ lib/kafka/sasl_authenticator.rb | 36 ++++++++++++-- lib/kafka/sasl_gssapi_authenticator.rb | 8 ++-- lib/kafka/sasl_plain_authenticator.rb | 12 ++--- lib/kafka/sasl_scram_authenticator.rb | 12 ++--- spec/connection_spec.rb | 6 +-- spec/protocol/sasl_handshake_request_spec.rb | 21 ++++++++ spec/sasl_authenticator_spec.rb | 38 +++++++++++++-- spec/sasl_plain_authenticator_spec.rb | 14 +++--- spec/sasl_scram_authenticator_spec.rb | 24 ++++++---- 14 files changed, 156 insertions(+), 102 deletions(-) diff --git a/README.md b/README.md index e02f48e14..3d84abce2 100644 --- a/README.md +++ b/README.md @@ -912,18 +912,6 @@ kafka = Kafka.new( sasl_gssapi_keytab: '/etc/keytabs/kafka.keytab', # ... ) - -# or - -require 'kafka/sasl_gssapi_authenticator' -kafka = Kafka.new( - authenticator: Kafka::SaslGssapiAuthenticator( - sasl_gssapi_principal: 'kafka/kafka.example.com@EXAMPLE.COM', - sasl_gssapi_keytab: '/etc/keytabs/kafka.keytab', - logger: someLogger # optional - ), - # ... -) ``` ##### PLAIN @@ -936,50 +924,18 @@ kafka = Kafka.new( sasl_plain_password: 'password' # ... ) - -# or - -require 'kafka/sasl_plain_authenticator' -kafka = Kafka.new( - ssl_ca_cert: File.read('/etc/openssl/cert.pem'), # Optional but highly recommended - authenticator: Kafka::SaslPlainAuthenticator( - username: 'username', - password: 'password', - logger: someLogger, # optional - authzid: 'auth_identity' # optional. if nil, username will be used by kafka - ), - # ... -) ``` **NOTE**: It is __highly__ recommended that you use SSL for encryption when using SASL_PLAIN ##### SCRAM - Since 0.11 kafka supports [SCRAM](https://kafka.apache.org/documentation.html#security_sasl_scram). ```ruby -require 'kafka/sasl_scram_authenticator' -kafka = Kafka.new( - authenticator: Kafka::SaslScramAuthenticator.new( - 'username', - 'password', - logger: someLogger, # optional - mechanism: Kafka::SCRAM_SHA256 # optional Supported values are Kafka::SCRAM_SHA256 (default) and Kafka::SCRAM_SHA512 - ), - # ... -) -``` - -To use SCRAM over SSL and the server cert is signed by a CA in your default cert store, add a SSL context to enable SSL: -```ruby -require 'openssl' -require 'kafka/sasl_scram_authenticator' kafka = Kafka.new( - ssl_context: OpenSSL::SSL::SSLContext.new, - authenticator: Kafka::SaslScramAuthenticator.new( - # ... - ), + sasl_scram_username: 'username', + sasl_scram_password: 'password, + sasl_scram_mechanism: Kafka::SCRAM_SHA256, # ... ) ``` diff --git a/lib/kafka/client.rb b/lib/kafka/client.rb index 66ff6383d..63ea543e4 100644 --- a/lib/kafka/client.rb +++ b/lib/kafka/client.rb @@ -49,28 +49,36 @@ class Client # # @param sasl_gssapi_keytab [String, nil] a KRB5 keytab filepath # - # @param authenticator [Authenticator, nil] an authenticator (responds to authenticate!) + # @param sasl_scram_username [String, nil] SCRAM username # - # @param ssl_context [OpenSSL::SSL::SSLContext, nil] a SSL context to use for the connection + # @param sasl_scram_password [String, nil] SCRAM password + # + # @param sasl_scram_mechanism [String, nil] Scram mechanism (Kafka::SCRAM_SHA256, Kafka::SCRAM_SHA512) + # + # @param use_ssl [Booleanm false] Use SSL # # @return [Client] def initialize(seed_brokers:, client_id: "ruby-kafka", logger: nil, connect_timeout: nil, socket_timeout: nil, ssl_ca_cert_file_path: nil, ssl_ca_cert: nil, ssl_client_cert: nil, ssl_client_cert_key: nil, sasl_gssapi_principal: nil, sasl_gssapi_keytab: nil, sasl_plain_authzid: '', sasl_plain_username: nil, sasl_plain_password: nil, - authenticator: nil, ssl_context: nil) + use_ssl: false, sasl_scram_username: nil, sasl_scram_password: nil, sasl_scram_mechanism: nil) @logger = logger || Logger.new(nil) @instrumenter = Instrumenter.new(client_id: client_id) @seed_brokers = normalize_seed_brokers(seed_brokers) - ssl_context = ssl_context || build_ssl_context(ssl_ca_cert_file_path, ssl_ca_cert, ssl_client_cert, ssl_client_cert_key) + ssl_context = build_ssl_context(ssl_ca_cert_file_path, ssl_ca_cert, ssl_client_cert, ssl_client_cert_key) + ssl_context = OpenSSL::SSL::SSLContext.new if use_ssl and !ssl_context - authenticator ||= SaslAuthenticator.new( + sasl_authenticator ||= SaslAuthenticator.new( sasl_gssapi_principal: sasl_gssapi_principal, sasl_gssapi_keytab: sasl_gssapi_keytab, sasl_plain_authzid: sasl_plain_authzid, sasl_plain_username: sasl_plain_username, sasl_plain_password: sasl_plain_password, + sasl_scram_username: sasl_scram_username, + sasl_scram_password: sasl_scram_password, + sasl_scram_mechanism: sasl_scram_mechanism, logger: @logger ) @@ -81,7 +89,7 @@ def initialize(seed_brokers:, client_id: "ruby-kafka", logger: nil, connect_time ssl_context: ssl_context, logger: @logger, instrumenter: @instrumenter, - authenticator: authenticator + sasl_authenticator: sasl_authenticator ) @cluster = initialize_cluster diff --git a/lib/kafka/connection.rb b/lib/kafka/connection.rb index 48a2f295d..f04ec8698 100644 --- a/lib/kafka/connection.rb +++ b/lib/kafka/connection.rb @@ -48,7 +48,7 @@ class Connection # broker. Default is 10 seconds. # # @return [Connection] a new connection. - def initialize(host:, port:, client_id:, logger:, instrumenter:, authenticator:, connect_timeout: nil, socket_timeout: nil, ssl_context: nil) + def initialize(host:, port:, client_id:, logger:, instrumenter:, sasl_authenticator:, connect_timeout: nil, socket_timeout: nil, ssl_context: nil) @host, @port, @client_id = host, port, client_id @logger = logger @instrumenter = instrumenter @@ -56,7 +56,7 @@ def initialize(host:, port:, client_id:, logger:, instrumenter:, authenticator:, @connect_timeout = connect_timeout || CONNECT_TIMEOUT @socket_timeout = socket_timeout || SOCKET_TIMEOUT @ssl_context = ssl_context - @authenticator = authenticator + @sasl_authenticator = sasl_authenticator end def address_match?(host, port) @@ -137,7 +137,7 @@ def open @correlation_id = 0 @last_request = nil - @authenticator.authenticate!(self) + @sasl_authenticator.authenticate!(self) rescue Errno::ETIMEDOUT => e @logger.error "Timed out while trying to connect to #{self}: #{e}" raise ConnectionError, e diff --git a/lib/kafka/connection_builder.rb b/lib/kafka/connection_builder.rb index 31d36d1e8..59448393a 100644 --- a/lib/kafka/connection_builder.rb +++ b/lib/kafka/connection_builder.rb @@ -1,13 +1,13 @@ module Kafka class ConnectionBuilder - def initialize(client_id:, logger:, instrumenter:, connect_timeout:, socket_timeout:, ssl_context:, authenticator:) + def initialize(client_id:, logger:, instrumenter:, connect_timeout:, socket_timeout:, ssl_context:, sasl_authenticator:) @client_id = client_id @logger = logger @instrumenter = instrumenter @connect_timeout = connect_timeout @socket_timeout = socket_timeout @ssl_context = ssl_context - @authenticator = authenticator + @sasl_authenticator = sasl_authenticator end def build_connection(host, port) @@ -20,7 +20,7 @@ def build_connection(host, port) logger: @logger, instrumenter: @instrumenter, ssl_context: @ssl_context, - authenticator: @authenticator + sasl_authenticator: @sasl_authenticator ) connection diff --git a/lib/kafka/protocol/sasl_handshake_request.rb b/lib/kafka/protocol/sasl_handshake_request.rb index 9c49ee139..5f9ab3971 100644 --- a/lib/kafka/protocol/sasl_handshake_request.rb +++ b/lib/kafka/protocol/sasl_handshake_request.rb @@ -6,7 +6,12 @@ module Protocol class SaslHandshakeRequest + SUPPORTED_MECHANISMS = %w(GSSAPI PLAIN SCRAM-SHA-256 SCRAM-SHA-512) + def initialize(mechanism) + unless SUPPORTED_MECHANISMS.include?(mechanism) + raise Kafka::Error, "Unsupported SASL mechanism #{mechanism}. Supported are #{SUPPORTED_MECHANISMS.join(', ')}" + end @mechanism = mechanism end diff --git a/lib/kafka/sasl_authenticator.rb b/lib/kafka/sasl_authenticator.rb index f56c00d74..7e7a73ee5 100644 --- a/lib/kafka/sasl_authenticator.rb +++ b/lib/kafka/sasl_authenticator.rb @@ -1,15 +1,21 @@ require 'kafka/sasl_gssapi_authenticator' require 'kafka/sasl_plain_authenticator' +require 'kafka/sasl_scram_authenticator' module Kafka class SaslAuthenticator - def initialize(logger:, sasl_gssapi_principal:, sasl_gssapi_keytab:, sasl_plain_authzid:, sasl_plain_username:, sasl_plain_password:) + def initialize(logger:, sasl_gssapi_principal:, sasl_gssapi_keytab:, + sasl_plain_authzid:, sasl_plain_username:, sasl_plain_password:, + sasl_scram_username:, sasl_scram_password:, sasl_scram_mechanism:) @logger = logger @sasl_gssapi_principal = sasl_gssapi_principal @sasl_gssapi_keytab = sasl_gssapi_keytab @sasl_plain_authzid = sasl_plain_authzid @sasl_plain_username = sasl_plain_username @sasl_plain_password = sasl_plain_password + @sasl_scram_username = sasl_scram_username + @sasl_scram_password = sasl_scram_password + @sasl_scram_mechanism = sasl_scram_mechanism end def authenticate!(connection) @@ -17,19 +23,34 @@ def authenticate!(connection) sasl_gssapi_authenticate(connection) elsif authenticate_using_sasl_plain? sasl_plain_authenticate(connection) + elsif authenticate_using_sasl_scram? + sasl_scram_authenticate(connection) end end private + def sasl_scram_authenticate(connection) + auth = SaslScramAuthenticator.new( + @sasl_scram_username, + @sasl_scram_password, + logger: @logger, + mechanism: @sasl_scram_mechanism, + connection: connection + ) + + auth.authenticate! + end + def sasl_gssapi_authenticate(connection) auth = SaslGssapiAuthenticator.new( logger: @logger, sasl_gssapi_principal: @sasl_gssapi_principal, - sasl_gssapi_keytab: @sasl_gssapi_keytab + sasl_gssapi_keytab: @sasl_gssapi_keytab, + connection: connection ) - auth.authenticate!(connection) + auth.authenticate! end def sasl_plain_authenticate(connection) @@ -37,10 +58,15 @@ def sasl_plain_authenticate(connection) logger: @logger, authzid: @sasl_plain_authzid, username: @sasl_plain_username, - password: @sasl_plain_password + password: @sasl_plain_password, + connection: connection ) - auth.authenticate!(connection) + auth.authenticate! + end + + def authenticate_using_sasl_scram? + @sasl_scram_username && @sasl_scram_password end def authenticate_using_sasl_gssapi? diff --git a/lib/kafka/sasl_gssapi_authenticator.rb b/lib/kafka/sasl_gssapi_authenticator.rb index ecd60891c..bfe1848fc 100644 --- a/lib/kafka/sasl_gssapi_authenticator.rb +++ b/lib/kafka/sasl_gssapi_authenticator.rb @@ -3,17 +3,17 @@ class SaslGssapiAuthenticator GSSAPI_IDENT = "GSSAPI" GSSAPI_CONFIDENTIALITY = false - def initialize(logger:, sasl_gssapi_principal:, sasl_gssapi_keytab:) + def initialize(conncetion:, logger:, sasl_gssapi_principal:, sasl_gssapi_keytab:) + @connection = connection @logger = logger @principal = sasl_gssapi_principal @keytab = sasl_gssapi_keytab load_gssapi + initialize_gssapi_context end - def authenticate!(connection) - @connection = connection - initialize_gssapi_context + def authenticate! proceed_sasl_gssapi_negotiation end diff --git a/lib/kafka/sasl_plain_authenticator.rb b/lib/kafka/sasl_plain_authenticator.rb index 6acdc87be..b68581fbd 100644 --- a/lib/kafka/sasl_plain_authenticator.rb +++ b/lib/kafka/sasl_plain_authenticator.rb @@ -2,19 +2,19 @@ module Kafka class SaslPlainAuthenticator PLAIN_IDENT = "PLAIN" - def initialize(logger:, authzid:, username:, password:) + def initialize(connection:, logger:, authzid:, username:, password:) + @connection = connection @logger = logger @authzid = authzid @username = username @password = password end - def authenticate!(connection) - @logger.debug 'Authenticating SASL PLAIN' - response = connection.send_request(Kafka::Protocol::SaslHandshakeRequest.new(PLAIN_IDENT)) + def authenticate! + response = @connection.send_request(Kafka::Protocol::SaslHandshakeRequest.new(PLAIN_IDENT)) - @encoder = connection.encoder - @decoder = connection.decoder + @encoder = @connection.encoder + @decoder = @connection.decoder unless response.error_code == 0 && response.enabled_mechanisms.include?(PLAIN_IDENT) raise Kafka::Error, "#{PLAIN_IDENT} is not supported." diff --git a/lib/kafka/sasl_scram_authenticator.rb b/lib/kafka/sasl_scram_authenticator.rb index 6cc122b45..95b88cb73 100644 --- a/lib/kafka/sasl_scram_authenticator.rb +++ b/lib/kafka/sasl_scram_authenticator.rb @@ -5,16 +5,16 @@ module Kafka SCRAM_SHA256 = 'SHA-256'.freeze SCRAM_SHA512 = 'SHA-512'.freeze class SaslScramAuthenticator - def initialize(username, password, mechanism: SCRAM_SHA256, logger: nil) + def initialize(username, password, mechanism: SCRAM_SHA256, logger: nil, connection:) @username = username @password = password @mechanism = mechanism @logger = logger + @connection = connection end - def authenticate!(connection) - connection = connection - response = connection.send_request(Kafka::Protocol::SaslHandshakeRequest.new('SCRAM-' + @mechanism)) + def authenticate! + response = @connection.send_request(Kafka::Protocol::SaslHandshakeRequest.new('SCRAM-' + @mechanism)) unless response.error_code == 0 && response.enabled_mechanisms.include?('SCRAM-' + @mechanism) raise Kafka::SaslScramError, "SCRAM-#{@mechanism} is not supported." @@ -22,8 +22,8 @@ def authenticate!(connection) log_debug "authenticating #{@username} with scram, mechanism: #{@mechanism}" - @encoder = connection.encoder - @decoder = connection.decoder + @encoder = @connection.encoder + @decoder = @connection.decoder begin msg = first_message diff --git a/spec/connection_spec.rb b/spec/connection_spec.rb index 2f7ab6b23..f1331cb9c 100644 --- a/spec/connection_spec.rb +++ b/spec/connection_spec.rb @@ -6,7 +6,7 @@ let(:server) { TCPServer.new(host, 0) } let(:port) { server.addr[1] } - let(:authenticator) { + let(:sasl_authenticator) { instance_double(Kafka::SaslAuthenticator, authenticate!: true) } let(:connection) { @@ -18,7 +18,7 @@ instrumenter: Kafka::Instrumenter.new(client_id: "test"), connect_timeout: 0.1, socket_timeout: 0.1, - authenticator: authenticator + sasl_authenticator: sasl_authenticator ) } @@ -91,7 +91,7 @@ end it "calls authenticate when a new connection is open" do - expect(authenticator).to receive(:authenticate!).with(connection).once + expect(sasl_authenticator).to receive(:authenticate!).with(connection).once response = connection.send_request(request) connection.send_request(request) diff --git a/spec/protocol/sasl_handshake_request_spec.rb b/spec/protocol/sasl_handshake_request_spec.rb index 1e019355a..00774e2ec 100644 --- a/spec/protocol/sasl_handshake_request_spec.rb +++ b/spec/protocol/sasl_handshake_request_spec.rb @@ -4,5 +4,26 @@ it 'expects correct api_key' do expect(request.api_key).to eq 17 end + describe "#initialize" do + context "#supported" do + it "allows GSSAPI" do + expect { Kafka::Protocol::SaslHandshakeRequest.new('GSSAPI') }.not_to raise_error + end + it "allows PLAIN" do + expect { Kafka::Protocol::SaslHandshakeRequest.new('PLAIN') }.not_to raise_error + end + it "allows SCRAM-SHA-256" do + expect { Kafka::Protocol::SaslHandshakeRequest.new('SCRAM-SHA-256') }.not_to raise_error + end + it "allows SCRAM-SHA-512" do + expect { Kafka::Protocol::SaslHandshakeRequest.new('SCRAM-SHA-512') }.not_to raise_error + end + end + context "#unsupported" do + it "reject unknown handshake" do + expect { Kafka::Protocol::SaslHandshakeRequest.new('Unsupported') }.to raise_error Kafka::Error + end + end + end end end diff --git a/spec/sasl_authenticator_spec.rb b/spec/sasl_authenticator_spec.rb index c31635b46..aee2dbbae 100644 --- a/spec/sasl_authenticator_spec.rb +++ b/spec/sasl_authenticator_spec.rb @@ -19,7 +19,10 @@ sasl_gssapi_keytab: nil, sasl_plain_authzid: nil, sasl_plain_username: nil, - sasl_plain_password: nil + sasl_plain_password: nil, + sasl_scram_username: nil, + sasl_scram_password: nil, + sasl_scram_mechanism: nil } } @@ -35,7 +38,10 @@ sasl_gssapi_keytab: nil, sasl_plain_authzid: "", sasl_plain_username: "user", - sasl_plain_password: "pass" + sasl_plain_password: "pass", + sasl_scram_username: nil, + sasl_scram_password: nil, + sasl_scram_mechanism: nil } } let(:auth) { instance_double(Kafka::SaslPlainAuthenticator) } @@ -55,7 +61,10 @@ sasl_gssapi_keytab: "bar", sasl_plain_authzid: "", sasl_plain_username: nil, - sasl_plain_password: nil + sasl_plain_password: nil, + sasl_scram_username: nil, + sasl_scram_password: nil, + sasl_scram_mechanism: nil } } @@ -68,5 +77,28 @@ sasl_authenticator.authenticate!(connection) end end + + context "when sasl scram authentication" do + let(:auth_options) { + { + sasl_gssapi_principal: nil, + sasl_gssapi_keytab: nil, + sasl_plain_authzid: nil, + sasl_plain_username: nil, + sasl_plain_password: nil, + sasl_scram_username: "username", + sasl_scram_password: "password", + sasl_scram_mechanism: Kafka::SCRAM_SHA256 + } + } + let(:auth) { instance_double(Kafka::SaslScramAuthenticator) } + + it "uses sasl scram authentication strategy" do + expect(Kafka::SaslScramAuthenticator).to receive(:new).and_return(auth) + expect(auth).to receive(:authenticate!) + + sasl_authenticator.authenticate!(connection) + end + end end end diff --git a/spec/sasl_plain_authenticator_spec.rb b/spec/sasl_plain_authenticator_spec.rb index 7fa3c1002..45587da39 100644 --- a/spec/sasl_plain_authenticator_spec.rb +++ b/spec/sasl_plain_authenticator_spec.rb @@ -5,7 +5,7 @@ let(:host) { "127.0.0.1" } let(:server) { TCPServer.new(host, 0) } let(:port) { server.addr[1] } - let(:authenticator) { + let(:sasl_authenticator) { instance_double(Kafka::SaslAuthenticator, authenticate!: true) } @@ -18,7 +18,7 @@ instrumenter: Kafka::Instrumenter.new(client_id: "test"), connect_timeout: 0.1, socket_timeout: 0.1, - authenticator: authenticator + sasl_authenticator: sasl_authenticator ) } @@ -31,12 +31,13 @@ logger: logger, authzid: 'spec_authzid', username: 'spec_username', - password: 'spec_password' + password: 'spec_password', + connection: connection ) } it 'successfully authenticates' do - expect(sasl_plain_authenticator.authenticate!(connection)).to be_truthy + expect(sasl_plain_authenticator.authenticate!).to be_truthy end end @@ -46,12 +47,13 @@ logger: logger, authzid: '', username: 'bad_username', - password: 'bad_password' + password: 'bad_password', + connection: connection ) } it 'raises Kafka::Error with EOFError' do - expect { sasl_plain_authenticator.authenticate!(connection) }.to raise_error(Kafka::Error, 'SASL PLAIN authentication failed: EOFError') + expect { sasl_plain_authenticator.authenticate! }.to raise_error(Kafka::Error, 'SASL PLAIN authentication failed: EOFError') end end end diff --git a/spec/sasl_scram_authenticator_spec.rb b/spec/sasl_scram_authenticator_spec.rb index 9e3a797c7..93cfda10e 100644 --- a/spec/sasl_scram_authenticator_spec.rb +++ b/spec/sasl_scram_authenticator_spec.rb @@ -6,7 +6,7 @@ let(:host) { "127.0.0.1" } let(:server) { TCPServer.new(host, 0) } let(:port) { server.addr[1] } - let(:authenticator) { + let(:sasl_authenticator) { instance_double(Kafka::SaslAuthenticator, authenticate!: true) } @@ -19,7 +19,7 @@ instrumenter: Kafka::Instrumenter.new(client_id: "test"), connect_timeout: 0.1, socket_timeout: 0.1, - authenticator: authenticator + sasl_authenticator: sasl_authenticator ) } @@ -32,12 +32,13 @@ 'spec_username', 'spec_password', logger: logger, - mechanism: Kafka::SCRAM_SHA256 + mechanism: Kafka::SCRAM_SHA256, + connection: connection ) } it 'successfully authenticates' do - expect(sasl_scram_authenticator.authenticate!(connection)).to be_truthy + expect(sasl_scram_authenticator.authenticate!).to be_truthy end end context 'when correct username/password using SHA-512' do @@ -46,12 +47,13 @@ 'spec_username', 'spec_password', logger: logger, - mechanism: Kafka::SCRAM_SHA512 + mechanism: Kafka::SCRAM_SHA512, + connection: connection ) } it 'successfully authenticates' do - expect(sasl_scram_authenticator.authenticate!(connection)).to be_truthy + expect(sasl_scram_authenticator.authenticate!).to be_truthy end end context 'when incorrect username' do @@ -60,12 +62,13 @@ 'spec_wrong_username', 'spec_password', logger: logger, - mechanism: Kafka::SCRAM_SHA256 + mechanism: Kafka::SCRAM_SHA256, + connection: connection ) } it 'raise error' do - expect { sasl_scram_authenticator.authenticate!(connection) }.to raise_error(Kafka::FailedScramAuthentication) + expect { sasl_scram_authenticator.authenticate! }.to raise_error(Kafka::FailedScramAuthentication) end end context 'when incorrect password' do @@ -74,12 +77,13 @@ 'spec_username', 'spec_wrong_password', logger: logger, - mechanism: Kafka::SCRAM_SHA256 + mechanism: Kafka::SCRAM_SHA256, + connection: connection ) } it 'raise error' do - expect { sasl_scram_authenticator.authenticate!(connection) }.to raise_error(Kafka::FailedScramAuthentication) + expect { sasl_scram_authenticator.authenticate! }.to raise_error(Kafka::FailedScramAuthentication) end end end From 9fb20e16b1eaeffa94d9e179866529e7a47f6ebb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jon=20B=C3=B6rjesson?= Date: Thu, 2 Nov 2017 13:24:07 +0100 Subject: [PATCH 10/17] Added missing quote to code example --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 3d84abce2..5dfac7678 100644 --- a/README.md +++ b/README.md @@ -934,7 +934,7 @@ Since 0.11 kafka supports [SCRAM](https://kafka.apache.org/documentation.html#se ```ruby kafka = Kafka.new( sasl_scram_username: 'username', - sasl_scram_password: 'password, + sasl_scram_password: 'password', sasl_scram_mechanism: Kafka::SCRAM_SHA256, # ... ) From 216e5cf889e1119e437678916006bf1130b73d9b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jon=20B=C3=B6rjesson?= Date: Thu, 2 Nov 2017 13:45:06 +0100 Subject: [PATCH 11/17] Removed unused exception --- lib/kafka.rb | 3 --- 1 file changed, 3 deletions(-) diff --git a/lib/kafka.rb b/lib/kafka.rb index ddeafab04..c4ddc0a18 100644 --- a/lib/kafka.rb +++ b/lib/kafka.rb @@ -225,9 +225,6 @@ class OffsetCommitError < Error class FetchError < Error end - class NoPartitionsAssignedError < Error - end - class SaslScramError < Error end From 16138a8443a49e49639f92d7e6aed8ba95cdb37b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jon=20B=C3=B6rjesson?= Date: Thu, 2 Nov 2017 13:45:23 +0100 Subject: [PATCH 12/17] Fixed typo --- lib/kafka/sasl_gssapi_authenticator.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/kafka/sasl_gssapi_authenticator.rb b/lib/kafka/sasl_gssapi_authenticator.rb index bfe1848fc..5ddd3fcd6 100644 --- a/lib/kafka/sasl_gssapi_authenticator.rb +++ b/lib/kafka/sasl_gssapi_authenticator.rb @@ -3,7 +3,7 @@ class SaslGssapiAuthenticator GSSAPI_IDENT = "GSSAPI" GSSAPI_CONFIDENTIALITY = false - def initialize(conncetion:, logger:, sasl_gssapi_principal:, sasl_gssapi_keytab:) + def initialize(connection:, logger:, sasl_gssapi_principal:, sasl_gssapi_keytab:) @connection = connection @logger = logger @principal = sasl_gssapi_principal From d9549350f2900b99c3fcccb907a2e96a1126dd99 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jon=20B=C3=B6rjesson?= Date: Thu, 2 Nov 2017 13:46:26 +0100 Subject: [PATCH 13/17] Changed log message --- lib/kafka/sasl_scram_authenticator.rb | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/lib/kafka/sasl_scram_authenticator.rb b/lib/kafka/sasl_scram_authenticator.rb index 95b88cb73..49eb80dba 100644 --- a/lib/kafka/sasl_scram_authenticator.rb +++ b/lib/kafka/sasl_scram_authenticator.rb @@ -20,33 +20,32 @@ def authenticate! raise Kafka::SaslScramError, "SCRAM-#{@mechanism} is not supported." end - log_debug "authenticating #{@username} with scram, mechanism: #{@mechanism}" + log_debug "Authenticating #{@username} with SASL SCRAM #{@mechanism}" @encoder = @connection.encoder @decoder = @connection.decoder begin msg = first_message - log_debug "[scram] Sending client's first message: #{msg}" + log_debug "Sending first client SASL SCRAM message: #{msg}" @encoder.write_bytes(msg) @server_first_message = @decoder.bytes - log_debug "[scram] Received server's first message: #{@server_first_message}" + log_debug "Received first server SASL SCRAM message: #{@server_first_message}" msg = final_message - log_debug "[scram] Sending client's final message: #{msg}" + log_debug "Sending final client SASL SCRAM message: #{msg}" @encoder.write_bytes(msg) response = parse_response(@decoder.bytes) - log_debug "[scram] Received server's final msg: #{response}" - log_debug "[scram] Client calculated server signature: #{server_signature}" + log_debug "Received last server SASL SCRAM message: #{response}" raise FailedScramAuthentication, response['e'] if response['e'] raise FailedScramAuthentication, 'Invalid server signature' if response['v'] != server_signature rescue EOFError => e raise FailedScramAuthentication, e.message end - log_debug "[scram] Authenticated" + log_debug "SASL SCRAM authentication successful" end private From 4f00aa6c7c2327df1621dd7075188fda3899b90b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jon=20B=C3=B6rjesson?= Date: Fri, 3 Nov 2017 19:28:27 +0100 Subject: [PATCH 14/17] Changed scram mechanism parameter values --- README.md | 2 +- lib/kafka/client.rb | 2 +- lib/kafka/sasl_scram_authenticator.rb | 25 +++++++++++++++++++------ spec/fake_server.rb | 4 ++-- spec/sasl_authenticator_spec.rb | 2 +- spec/sasl_scram_authenticator_spec.rb | 12 ++++++------ 6 files changed, 30 insertions(+), 17 deletions(-) diff --git a/README.md b/README.md index 5dfac7678..0d92efdc7 100644 --- a/README.md +++ b/README.md @@ -935,7 +935,7 @@ Since 0.11 kafka supports [SCRAM](https://kafka.apache.org/documentation.html#se kafka = Kafka.new( sasl_scram_username: 'username', sasl_scram_password: 'password', - sasl_scram_mechanism: Kafka::SCRAM_SHA256, + sasl_scram_mechanism: 'SHA-256', # ... ) ``` diff --git a/lib/kafka/client.rb b/lib/kafka/client.rb index 63ea543e4..8248a3ba2 100644 --- a/lib/kafka/client.rb +++ b/lib/kafka/client.rb @@ -53,7 +53,7 @@ class Client # # @param sasl_scram_password [String, nil] SCRAM password # - # @param sasl_scram_mechanism [String, nil] Scram mechanism (Kafka::SCRAM_SHA256, Kafka::SCRAM_SHA512) + # @param sasl_scram_mechanism [String, nil] Scram mechanism ("sha256", "sha512") # # @param use_ssl [Booleanm false] Use SSL # diff --git a/lib/kafka/sasl_scram_authenticator.rb b/lib/kafka/sasl_scram_authenticator.rb index 49eb80dba..1f85c5acb 100644 --- a/lib/kafka/sasl_scram_authenticator.rb +++ b/lib/kafka/sasl_scram_authenticator.rb @@ -2,13 +2,26 @@ require 'base64' module Kafka - SCRAM_SHA256 = 'SHA-256'.freeze - SCRAM_SHA512 = 'SHA-512'.freeze + class SaslScramAuthenticator - def initialize(username, password, mechanism: SCRAM_SHA256, logger: nil, connection:) + MECHANISMS = { + sha256: { + mechanism: 'SHA-256' + }, + sha512: { + mechanism: 'SHA-512' + } + }.freeze + + VALID_MECHANISMS = %w{sha256 sha512}.freeze + + def initialize(username, password, mechanism: 'sha256', logger: nil, connection:) + unless VALID_MECHANISMS.include?(mechanism) + raise Kafka::SaslScramError, "SCRAM mechanism #{mechanism} is not supported." + end @username = username @password = password - @mechanism = mechanism + @mechanism = MECHANISMS[mechanism.to_sym][:mechanism] @logger = logger @connection = connection end @@ -154,9 +167,9 @@ def nonce def digest @digest ||= case @mechanism - when SCRAM_SHA256 + when 'SHA-256' OpenSSL::Digest::SHA256.new.freeze - when SCRAM_SHA512 + when 'SHA-512' OpenSSL::Digest::SHA512.new.freeze else raise StandardError, "Unknown mechanism '#{@mechanism}'" diff --git a/spec/fake_server.rb b/spec/fake_server.rb index b34033d03..545b50e96 100644 --- a/spec/fake_server.rb +++ b/spec/fake_server.rb @@ -130,9 +130,9 @@ def scram_sasl_authenticate(algorithm, encoder, decoder) def digest @digest ||= case @scram_mechanism - when Kafka::SCRAM_SHA256 + when 'SHA-256' OpenSSL::Digest::SHA256.new.freeze - when Kafka::SCRAM_SHA512 + when 'SHA-512' OpenSSL::Digest::SHA512.new.freeze else raise StandardError, "Unknown mechanism '#{@scram_mechanism}'" diff --git a/spec/sasl_authenticator_spec.rb b/spec/sasl_authenticator_spec.rb index aee2dbbae..96e9695c6 100644 --- a/spec/sasl_authenticator_spec.rb +++ b/spec/sasl_authenticator_spec.rb @@ -88,7 +88,7 @@ sasl_plain_password: nil, sasl_scram_username: "username", sasl_scram_password: "password", - sasl_scram_mechanism: Kafka::SCRAM_SHA256 + sasl_scram_mechanism: "SHA-256" } } let(:auth) { instance_double(Kafka::SaslScramAuthenticator) } diff --git a/spec/sasl_scram_authenticator_spec.rb b/spec/sasl_scram_authenticator_spec.rb index 93cfda10e..946a38705 100644 --- a/spec/sasl_scram_authenticator_spec.rb +++ b/spec/sasl_scram_authenticator_spec.rb @@ -26,13 +26,13 @@ let!(:broker) { FakeServer.start(server) } describe '#authenticate!' do - context 'when correct username/password using SHA-256' do + context 'when correct username/password using sha256' do let(:sasl_scram_authenticator) { Kafka::SaslScramAuthenticator.new( 'spec_username', 'spec_password', logger: logger, - mechanism: Kafka::SCRAM_SHA256, + mechanism: 'sha256', connection: connection ) } @@ -41,13 +41,13 @@ expect(sasl_scram_authenticator.authenticate!).to be_truthy end end - context 'when correct username/password using SHA-512' do + context 'when correct username/password using sha512' do let(:sasl_scram_authenticator) { Kafka::SaslScramAuthenticator.new( 'spec_username', 'spec_password', logger: logger, - mechanism: Kafka::SCRAM_SHA512, + mechanism: 'sha512', connection: connection ) } @@ -62,7 +62,7 @@ 'spec_wrong_username', 'spec_password', logger: logger, - mechanism: Kafka::SCRAM_SHA256, + mechanism: 'sha256', connection: connection ) } @@ -77,7 +77,7 @@ 'spec_username', 'spec_wrong_password', logger: logger, - mechanism: Kafka::SCRAM_SHA256, + mechanism: 'sha256', connection: connection ) } From ce5c3bd146b6df22b291611acd1a1bddb1aa9113 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jon=20B=C3=B6rjesson?= Date: Fri, 3 Nov 2017 19:32:22 +0100 Subject: [PATCH 15/17] Updated scram code example --- README.md | 2 +- lib/kafka/sasl_scram_authenticator.rb | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 0d92efdc7..c9b5a2c00 100644 --- a/README.md +++ b/README.md @@ -935,7 +935,7 @@ Since 0.11 kafka supports [SCRAM](https://kafka.apache.org/documentation.html#se kafka = Kafka.new( sasl_scram_username: 'username', sasl_scram_password: 'password', - sasl_scram_mechanism: 'SHA-256', + sasl_scram_mechanism: 'sha256', # ... ) ``` diff --git a/lib/kafka/sasl_scram_authenticator.rb b/lib/kafka/sasl_scram_authenticator.rb index 1f85c5acb..9c53ab8d4 100644 --- a/lib/kafka/sasl_scram_authenticator.rb +++ b/lib/kafka/sasl_scram_authenticator.rb @@ -14,7 +14,7 @@ class SaslScramAuthenticator }.freeze VALID_MECHANISMS = %w{sha256 sha512}.freeze - + def initialize(username, password, mechanism: 'sha256', logger: nil, connection:) unless VALID_MECHANISMS.include?(mechanism) raise Kafka::SaslScramError, "SCRAM mechanism #{mechanism} is not supported." From a6233f4db9c43999bec31723f6ae3b239292330d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jon=20B=C3=B6rjesson?= Date: Mon, 6 Nov 2017 12:02:36 +0100 Subject: [PATCH 16/17] Removed unwanted and unnecessary characters --- lib/kafka/client.rb | 2 +- spec/fake_server.rb | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/kafka/client.rb b/lib/kafka/client.rb index 8248a3ba2..210322e5c 100644 --- a/lib/kafka/client.rb +++ b/lib/kafka/client.rb @@ -70,7 +70,7 @@ def initialize(seed_brokers:, client_id: "ruby-kafka", logger: nil, connect_time ssl_context = build_ssl_context(ssl_ca_cert_file_path, ssl_ca_cert, ssl_client_cert, ssl_client_cert_key) ssl_context = OpenSSL::SSL::SSLContext.new if use_ssl and !ssl_context - sasl_authenticator ||= SaslAuthenticator.new( + sasl_authenticator = SaslAuthenticator.new( sasl_gssapi_principal: sasl_gssapi_principal, sasl_gssapi_keytab: sasl_gssapi_keytab, sasl_plain_authzid: sasl_plain_authzid, diff --git a/spec/fake_server.rb b/spec/fake_server.rb index 545b50e96..802aba625 100644 --- a/spec/fake_server.rb +++ b/spec/fake_server.rb @@ -34,7 +34,7 @@ def handle_client(client) loop do request_bytes = decoder.bytes - request_data = Kafka::Protocol::Decoder.new(StringIO.new(request_bytes)); + request_data = Kafka::Protocol::Decoder.new(StringIO.new(request_bytes)) api_key = request_data.int16 _api_version = request_data.int16 correlation_id = request_data.int32 From 609b84085fc6f45fed1989f664e56dc3a1ea110b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jon=20B=C3=B6rjesson?= Date: Tue, 7 Nov 2017 20:57:30 +0100 Subject: [PATCH 17/17] Changed case to if and removed underscore from unused variables --- spec/fake_server.rb | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/spec/fake_server.rb b/spec/fake_server.rb index 802aba625..2f5b569f2 100644 --- a/spec/fake_server.rb +++ b/spec/fake_server.rb @@ -36,9 +36,9 @@ def handle_client(client) request_bytes = decoder.bytes request_data = Kafka::Protocol::Decoder.new(StringIO.new(request_bytes)) api_key = request_data.int16 - _api_version = request_data.int16 + api_version = request_data.int16 correlation_id = request_data.int32 - _client_id = request_data.string + client_id = request_data.string message = request_data.string @@ -46,8 +46,7 @@ def handle_client(client) response_encoder = Kafka::Protocol::Encoder.new(response) response_encoder.write_int32(correlation_id) - case api_key - when 17 then + if api_key == 17 response_encoder.write_int16(0) response_encoder.write_array(SUPPORTED_MECHANISMS) { |m| response_encoder.write_string(m) } encoder.write_bytes(response.string)