Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,21 @@ jobs:
KAFKA_ADVERTISED_PORT: 9092
KAFKA_PORT: 9092
KAFKA_ZOOKEEPER_CONNECT: localhost:2181
KAFKA_DELETE_TOPIC_ENABLE: true
- image: wurstmeister/kafka:0.10.2.1
environment:
KAFKA_ADVERTISED_HOST_NAME: localhost
KAFKA_ADVERTISED_PORT: 9093
KAFKA_PORT: 9093
KAFKA_ZOOKEEPER_CONNECT: localhost:2181
KAFKA_DELETE_TOPIC_ENABLE: true
- image: wurstmeister/kafka:0.10.2.1
environment:
KAFKA_ADVERTISED_HOST_NAME: localhost
KAFKA_ADVERTISED_PORT: 9094
KAFKA_PORT: 9094
KAFKA_ZOOKEEPER_CONNECT: localhost:2181
KAFKA_DELETE_TOPIC_ENABLE: true
steps:
- checkout
- run: bundle install --path vendor/bundle
Expand All @@ -52,18 +55,21 @@ jobs:
KAFKA_ADVERTISED_PORT: 9092
KAFKA_PORT: 9092
KAFKA_ZOOKEEPER_CONNECT: localhost:2181
KAFKA_DELETE_TOPIC_ENABLE: true
- image: wurstmeister/kafka:0.11.0.1
environment:
KAFKA_ADVERTISED_HOST_NAME: localhost
KAFKA_ADVERTISED_PORT: 9093
KAFKA_PORT: 9093
KAFKA_ZOOKEEPER_CONNECT: localhost:2181
KAFKA_DELETE_TOPIC_ENABLE: true
- image: wurstmeister/kafka:0.11.0.1
environment:
KAFKA_ADVERTISED_HOST_NAME: localhost
KAFKA_ADVERTISED_PORT: 9094
KAFKA_PORT: 9094
KAFKA_ZOOKEEPER_CONNECT: localhost:2181
KAFKA_DELETE_TOPIC_ENABLE: true
steps:
- checkout
- run: bundle install --path vendor/bundle
Expand All @@ -81,18 +87,21 @@ jobs:
KAFKA_ADVERTISED_PORT: 9092
KAFKA_PORT: 9092
KAFKA_ZOOKEEPER_CONNECT: localhost:2181
KAFKA_DELETE_TOPIC_ENABLE: true
- image: wurstmeister/kafka:1.0.0
environment:
KAFKA_ADVERTISED_HOST_NAME: localhost
KAFKA_ADVERTISED_PORT: 9093
KAFKA_PORT: 9093
KAFKA_ZOOKEEPER_CONNECT: localhost:2181
KAFKA_DELETE_TOPIC_ENABLE: true
- image: wurstmeister/kafka:1.0.0
environment:
KAFKA_ADVERTISED_HOST_NAME: localhost
KAFKA_ADVERTISED_PORT: 9094
KAFKA_PORT: 9094
KAFKA_ZOOKEEPER_CONNECT: localhost:2181
KAFKA_DELETE_TOPIC_ENABLE: true
steps:
- checkout
- run: bundle install --path vendor/bundle
Expand Down
6 changes: 6 additions & 0 deletions lib/kafka/broker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,12 @@ def create_topics(**options)
send_request(request)
end

def delete_topics(**options)
request = Protocol::DeleteTopicsRequest.new(**options)

send_request(request)
end

def api_versions
request = Protocol::ApiVersionsRequest.new

Expand Down
10 changes: 10 additions & 0 deletions lib/kafka/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,16 @@ def create_topic(name, num_partitions: 1, replication_factor: 1, timeout: 30)
@cluster.create_topic(name, num_partitions: num_partitions, replication_factor: replication_factor, timeout: timeout)
end

# Delete a topic in the cluster.
#
# @param name [String] the name of the topic.
# @param timeout [Integer] a duration of time to wait for the topic to be
# completely marked deleted.
# @return [nil]
def delete_topic(name, timeout: 30)
@cluster.delete_topic(name, timeout: timeout)
end

# Lists all topics in the cluster.
#
# @return [Array<String>] the list of topic names.
Expand Down
27 changes: 25 additions & 2 deletions lib/kafka/cluster.rb
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,25 @@ def create_topic(name, num_partitions:, replication_factor:, timeout:)
@logger.info "Topic `#{name}` was created"
end

def delete_topic(name, timeout:)
options = {
topics: [name],
timeout: timeout,
}

broker = controller_broker

@logger.info "Deleting topic `#{name}` using controller broker #{broker}"

response = broker.delete_topics(**options)

response.errors.each do |topic, error_code|
Protocol.handle_error(error_code)
end

@logger.info "Topic `#{name}` was deleted"
end

def resolve_offsets(topic, partitions, offset)
add_target_topics([topic])
refresh_metadata_if_necessary!
Expand Down Expand Up @@ -229,13 +248,17 @@ def resolve_offset(topic, partition, offset)

def topics
refresh_metadata_if_necessary!
cluster_info.topics.map(&:topic_name)
cluster_info.topics.select do |topic|
topic.topic_error_code == 0
end.map(&:topic_name)
end

# Lists all topics in the cluster.
def list_topics
response = random_broker.fetch_metadata(topics: nil)
response.topics.map(&:topic_name)
response.topics.select do |topic|
topic.topic_error_code == 0
end.map(&:topic_name)
end

def disconnect
Expand Down
4 changes: 4 additions & 0 deletions lib/kafka/protocol.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ module Protocol
SASL_HANDSHAKE_API = 17
API_VERSIONS_API = 18
CREATE_TOPICS_API = 19
DELETE_TOPICS_API = 20

# A mapping from numeric API keys to symbolic API names.
APIS = {
Expand All @@ -43,6 +44,7 @@ module Protocol
SASL_HANDSHAKE_API => :sasl_handshake,
API_VERSIONS_API => :api_versions,
CREATE_TOPICS_API => :create_topics,
DELETE_TOPICS_API => :delete_topics,
}

# A mapping from numeric error codes to exception classes.
Expand Down Expand Up @@ -141,3 +143,5 @@ def self.api_name(api_key)
require "kafka/protocol/sasl_handshake_response"
require "kafka/protocol/create_topics_request"
require "kafka/protocol/create_topics_response"
require "kafka/protocol/delete_topics_request"
require "kafka/protocol/delete_topics_response"
31 changes: 31 additions & 0 deletions lib/kafka/protocol/delete_topics_request.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
module Kafka
module Protocol

class DeleteTopicsRequest
def initialize(topics:, timeout:)
@topics, @timeout = topics, timeout
end

def api_key
DELETE_TOPICS_API
end

def api_version
0
end

def response_class
Protocol::DeleteTopicsResponse
end

def encode(encoder)
encoder.write_array(@topics) do |topic|
encoder.write_string(topic)
end
# Timeout is in ms.
encoder.write_int32(@timeout * 1000)
end
end

end
end
24 changes: 24 additions & 0 deletions lib/kafka/protocol/delete_topics_response.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
module Kafka
module Protocol

class DeleteTopicsResponse
attr_reader :errors

def initialize(errors:)
@errors = errors
end

def self.decode(decoder)
errors = decoder.array do
topic = decoder.string
error_code = decoder.int16

[topic, error_code]
end

new(errors: errors)
end
end

end
end
9 changes: 8 additions & 1 deletion spec/functional/client_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,22 @@

describe "Producer API", functional: true do
let!(:topic) { create_random_topic(num_partitions: 3) }
let!(:deleted_topic) { create_random_topic(num_partitions: 3) }

example "listing all topics in the cluster" do
before do
kafka.delete_topic(deleted_topic)
end

example "listing available topics in the cluster" do
# Use a clean Kafka instance to avoid hitting caches.
kafka = Kafka.new(seed_brokers: KAFKA_BROKERS, logger: LOGGER)

topics = kafka.topics

expect(topics).to include topic
expect(topics).not_to include deleted_topic
expect(kafka.has_topic?(topic)).to eq true
expect(kafka.has_topic?(deleted_topic)).to eq false
end

example "fetching the partition count for a topic" do
Expand Down
10 changes: 10 additions & 0 deletions spec/functional/topic_management_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,14 @@

expect(partitions).to eq 3
end

example "deleting topics" do
topic = generate_topic_name

kafka.create_topic(topic, num_partitions: 3)
expect(kafka.partitions_for(topic)).to eq 3

kafka.delete_topic(topic)
expect(kafka.has_topic?(topic)).to eql(false)
end
end