Skip to content

Commit 66075b6

Browse files
authored
Merge pull request #528 from nguyenquangminh0711/delete-topics-api
Delete Topic API
2 parents af09bdd + 19ae726 commit 66075b6

File tree

9 files changed

+127
-3
lines changed

9 files changed

+127
-3
lines changed

.circleci/config.yml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,18 +23,21 @@ jobs:
2323
KAFKA_ADVERTISED_PORT: 9092
2424
KAFKA_PORT: 9092
2525
KAFKA_ZOOKEEPER_CONNECT: localhost:2181
26+
KAFKA_DELETE_TOPIC_ENABLE: true
2627
- image: wurstmeister/kafka:0.10.2.1
2728
environment:
2829
KAFKA_ADVERTISED_HOST_NAME: localhost
2930
KAFKA_ADVERTISED_PORT: 9093
3031
KAFKA_PORT: 9093
3132
KAFKA_ZOOKEEPER_CONNECT: localhost:2181
33+
KAFKA_DELETE_TOPIC_ENABLE: true
3234
- image: wurstmeister/kafka:0.10.2.1
3335
environment:
3436
KAFKA_ADVERTISED_HOST_NAME: localhost
3537
KAFKA_ADVERTISED_PORT: 9094
3638
KAFKA_PORT: 9094
3739
KAFKA_ZOOKEEPER_CONNECT: localhost:2181
40+
KAFKA_DELETE_TOPIC_ENABLE: true
3841
steps:
3942
- checkout
4043
- run: bundle install --path vendor/bundle
@@ -52,18 +55,21 @@ jobs:
5255
KAFKA_ADVERTISED_PORT: 9092
5356
KAFKA_PORT: 9092
5457
KAFKA_ZOOKEEPER_CONNECT: localhost:2181
58+
KAFKA_DELETE_TOPIC_ENABLE: true
5559
- image: wurstmeister/kafka:0.11.0.1
5660
environment:
5761
KAFKA_ADVERTISED_HOST_NAME: localhost
5862
KAFKA_ADVERTISED_PORT: 9093
5963
KAFKA_PORT: 9093
6064
KAFKA_ZOOKEEPER_CONNECT: localhost:2181
65+
KAFKA_DELETE_TOPIC_ENABLE: true
6166
- image: wurstmeister/kafka:0.11.0.1
6267
environment:
6368
KAFKA_ADVERTISED_HOST_NAME: localhost
6469
KAFKA_ADVERTISED_PORT: 9094
6570
KAFKA_PORT: 9094
6671
KAFKA_ZOOKEEPER_CONNECT: localhost:2181
72+
KAFKA_DELETE_TOPIC_ENABLE: true
6773
steps:
6874
- checkout
6975
- run: bundle install --path vendor/bundle
@@ -81,18 +87,21 @@ jobs:
8187
KAFKA_ADVERTISED_PORT: 9092
8288
KAFKA_PORT: 9092
8389
KAFKA_ZOOKEEPER_CONNECT: localhost:2181
90+
KAFKA_DELETE_TOPIC_ENABLE: true
8491
- image: wurstmeister/kafka:1.0.0
8592
environment:
8693
KAFKA_ADVERTISED_HOST_NAME: localhost
8794
KAFKA_ADVERTISED_PORT: 9093
8895
KAFKA_PORT: 9093
8996
KAFKA_ZOOKEEPER_CONNECT: localhost:2181
97+
KAFKA_DELETE_TOPIC_ENABLE: true
9098
- image: wurstmeister/kafka:1.0.0
9199
environment:
92100
KAFKA_ADVERTISED_HOST_NAME: localhost
93101
KAFKA_ADVERTISED_PORT: 9094
94102
KAFKA_PORT: 9094
95103
KAFKA_ZOOKEEPER_CONNECT: localhost:2181
104+
KAFKA_DELETE_TOPIC_ENABLE: true
96105
steps:
97106
- checkout
98107
- run: bundle install --path vendor/bundle

lib/kafka/broker.rb

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,12 @@ def create_topics(**options)
115115
send_request(request)
116116
end
117117

118+
def delete_topics(**options)
119+
request = Protocol::DeleteTopicsRequest.new(**options)
120+
121+
send_request(request)
122+
end
123+
118124
def api_versions
119125
request = Protocol::ApiVersionsRequest.new
120126

lib/kafka/client.rb

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -463,6 +463,16 @@ def create_topic(name, num_partitions: 1, replication_factor: 1, timeout: 30)
463463
@cluster.create_topic(name, num_partitions: num_partitions, replication_factor: replication_factor, timeout: timeout)
464464
end
465465

466+
# Delete a topic in the cluster.
467+
#
468+
# @param name [String] the name of the topic.
469+
# @param timeout [Integer] a duration of time to wait for the topic to be
470+
# completely marked deleted.
471+
# @return [nil]
472+
def delete_topic(name, timeout: 30)
473+
@cluster.delete_topic(name, timeout: timeout)
474+
end
475+
466476
# Lists all topics in the cluster.
467477
#
468478
# @return [Array<String>] the list of topic names.

lib/kafka/cluster.rb

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,25 @@ def create_topic(name, num_partitions:, replication_factor:, timeout:)
180180
@logger.info "Topic `#{name}` was created"
181181
end
182182

183+
def delete_topic(name, timeout:)
184+
options = {
185+
topics: [name],
186+
timeout: timeout,
187+
}
188+
189+
broker = controller_broker
190+
191+
@logger.info "Deleting topic `#{name}` using controller broker #{broker}"
192+
193+
response = broker.delete_topics(**options)
194+
195+
response.errors.each do |topic, error_code|
196+
Protocol.handle_error(error_code)
197+
end
198+
199+
@logger.info "Topic `#{name}` was deleted"
200+
end
201+
183202
def resolve_offsets(topic, partitions, offset)
184203
add_target_topics([topic])
185204
refresh_metadata_if_necessary!
@@ -229,13 +248,17 @@ def resolve_offset(topic, partition, offset)
229248

230249
def topics
231250
refresh_metadata_if_necessary!
232-
cluster_info.topics.map(&:topic_name)
251+
cluster_info.topics.select do |topic|
252+
topic.topic_error_code == 0
253+
end.map(&:topic_name)
233254
end
234255

235256
# Lists all topics in the cluster.
236257
def list_topics
237258
response = random_broker.fetch_metadata(topics: nil)
238-
response.topics.map(&:topic_name)
259+
response.topics.select do |topic|
260+
topic.topic_error_code == 0
261+
end.map(&:topic_name)
239262
end
240263

241264
def disconnect

lib/kafka/protocol.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ module Protocol
2626
SASL_HANDSHAKE_API = 17
2727
API_VERSIONS_API = 18
2828
CREATE_TOPICS_API = 19
29+
DELETE_TOPICS_API = 20
2930

3031
# A mapping from numeric API keys to symbolic API names.
3132
APIS = {
@@ -43,6 +44,7 @@ module Protocol
4344
SASL_HANDSHAKE_API => :sasl_handshake,
4445
API_VERSIONS_API => :api_versions,
4546
CREATE_TOPICS_API => :create_topics,
47+
DELETE_TOPICS_API => :delete_topics,
4648
}
4749

4850
# A mapping from numeric error codes to exception classes.
@@ -141,3 +143,5 @@ def self.api_name(api_key)
141143
require "kafka/protocol/sasl_handshake_response"
142144
require "kafka/protocol/create_topics_request"
143145
require "kafka/protocol/create_topics_response"
146+
require "kafka/protocol/delete_topics_request"
147+
require "kafka/protocol/delete_topics_response"
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
module Kafka
2+
module Protocol
3+
4+
class DeleteTopicsRequest
5+
def initialize(topics:, timeout:)
6+
@topics, @timeout = topics, timeout
7+
end
8+
9+
def api_key
10+
DELETE_TOPICS_API
11+
end
12+
13+
def api_version
14+
0
15+
end
16+
17+
def response_class
18+
Protocol::DeleteTopicsResponse
19+
end
20+
21+
def encode(encoder)
22+
encoder.write_array(@topics) do |topic|
23+
encoder.write_string(topic)
24+
end
25+
# Timeout is in ms.
26+
encoder.write_int32(@timeout * 1000)
27+
end
28+
end
29+
30+
end
31+
end
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
module Kafka
2+
module Protocol
3+
4+
class DeleteTopicsResponse
5+
attr_reader :errors
6+
7+
def initialize(errors:)
8+
@errors = errors
9+
end
10+
11+
def self.decode(decoder)
12+
errors = decoder.array do
13+
topic = decoder.string
14+
error_code = decoder.int16
15+
16+
[topic, error_code]
17+
end
18+
19+
new(errors: errors)
20+
end
21+
end
22+
23+
end
24+
end

spec/functional/client_spec.rb

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,22 @@
22

33
describe "Producer API", functional: true do
44
let!(:topic) { create_random_topic(num_partitions: 3) }
5+
let!(:deleted_topic) { create_random_topic(num_partitions: 3) }
56

6-
example "listing all topics in the cluster" do
7+
before do
8+
kafka.delete_topic(deleted_topic)
9+
end
10+
11+
example "listing available topics in the cluster" do
712
# Use a clean Kafka instance to avoid hitting caches.
813
kafka = Kafka.new(seed_brokers: KAFKA_BROKERS, logger: LOGGER)
914

1015
topics = kafka.topics
1116

1217
expect(topics).to include topic
18+
expect(topics).not_to include deleted_topic
1319
expect(kafka.has_topic?(topic)).to eq true
20+
expect(kafka.has_topic?(deleted_topic)).to eq false
1421
end
1522

1623
example "fetching the partition count for a topic" do

spec/functional/topic_management_spec.rb

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,14 @@
99

1010
expect(partitions).to eq 3
1111
end
12+
13+
example "deleting topics" do
14+
topic = generate_topic_name
15+
16+
kafka.create_topic(topic, num_partitions: 3)
17+
expect(kafka.partitions_for(topic)).to eq 3
18+
19+
kafka.delete_topic(topic)
20+
expect(kafka.has_topic?(topic)).to eql(false)
21+
end
1222
end

0 commit comments

Comments
 (0)