Skip to content

Commit 6ba52b1

Browse files
authored
Merge pull request #508 from zendesk/dasch/fix-list-topics
Fix the list topics API
2 parents 6a25911 + 95a841a commit 6ba52b1

File tree

4 files changed

+19
-7
lines changed

4 files changed

+19
-7
lines changed

lib/kafka/client.rb

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -467,8 +467,7 @@ def create_topic(name, num_partitions: 1, replication_factor: 1, timeout: 30)
467467
#
468468
# @return [Array<String>] the list of topic names.
469469
def topics
470-
@cluster.clear_target_topics
471-
@cluster.topics
470+
@cluster.list_topics
472471
end
473472

474473
def has_topic?(topic)

lib/kafka/cluster.rb

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,12 @@ def topics
232232
cluster_info.topics.map(&:topic_name)
233233
end
234234

235+
# Lists all topics in the cluster.
236+
def list_topics
237+
response = random_broker.fetch_metadata(topics: nil)
238+
response.topics.map(&:topic_name)
239+
end
240+
235241
def disconnect
236242
@broker_pool.close
237243
end

lib/kafka/protocol/encoder.rb

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,13 @@ def write_int64(int)
7474
# @param array [Array]
7575
# @return [nil]
7676
def write_array(array, &block)
77-
write_int32(array.size)
78-
array.each(&block)
77+
if array.nil?
78+
# An array can be null, which is different from it being empty.
79+
write_int32(-1)
80+
else
81+
write_int32(array.size)
82+
array.each(&block)
83+
end
7984
end
8085

8186
# Writes a string to the IO object.

spec/functional/client_spec.rb

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,13 @@
44
let!(:topic) { create_random_topic(num_partitions: 3) }
55

66
example "listing all topics in the cluster" do
7-
expect(kafka.has_topic?(topic)).to eq true
7+
# Use a clean Kafka instance to avoid hitting caches.
8+
kafka = Kafka.new(seed_brokers: KAFKA_BROKERS, logger: LOGGER)
89

9-
topic2 = create_random_topic
10+
topics = kafka.topics
1011

11-
expect(kafka.has_topic?(topic2)).to eq true
12+
expect(topics).to include topic
13+
expect(kafka.has_topic?(topic)).to eq true
1214
end
1315

1416
example "fetching the partition count for a topic" do

0 commit comments

Comments
 (0)