diff --git a/lib/kafka/broker.rb b/lib/kafka/broker.rb index 5dc31af72..f3029b65b 100644 --- a/lib/kafka/broker.rb +++ b/lib/kafka/broker.rb @@ -19,12 +19,17 @@ def address_match?(host, port) # @return [String] def to_s - "#{connection} (node_id=#{@node_id.inspect})" + "#{@host}:#{@port} (node_id=#{@node_id.inspect})" end # @return [nil] def disconnect - connection.close + connection.close if connected? + end + + # @return [Boolean] + def connected? + !@connection.nil? end # Fetches cluster metadata from the broker. diff --git a/spec/broker_spec.rb b/spec/broker_spec.rb index 95d119e8f..29403d15c 100644 --- a/spec/broker_spec.rb +++ b/spec/broker_spec.rb @@ -31,6 +31,9 @@ def mock_response(response) def send_request(request) @mocked_response end + + def close + end end describe "#address_match?" do @@ -112,4 +115,21 @@ def send_request(request) expect(actual_response.topics).to eq [] end end + + describe "#disconnect" do + it "doesn't close a connection if it's not connected yet " do + expect(connection).not_to receive(:close) + broker.disconnect + end + + it "closes a connection if the connection is present" do + expect(connection).to receive(:close) + + broker.fetch_messages( + max_wait_time: 0, min_bytes: 0, max_bytes: 10 * 1024, topics: {} + ) + + broker.disconnect + end + end end