From 44cae976eb38e3d351737d5c726747785b68e6ce Mon Sep 17 00:00:00 2001 From: Jean-Damien Date: Fri, 9 Feb 2024 16:01:27 +0100 Subject: [PATCH 1/3] Allow usage of if-empty, if-unused parameters for queue deletion --- api/v1beta1/queue_types.go | 4 + api/v1beta1/queue_webhook_test.go | 12 +-- config/crd/bases/rabbitmq.com_queues.yaml | 7 ++ controllers/queue_controller.go | 27 ++++++- docs/api/rabbitmq.com.ref.asciidoc | 2 + internal/queue_delete_options.go | 34 ++++++++ internal/queue_delete_options_test.go | 73 +++++++++++++++++ rabbitmqclient/rabbitmq_client_factory.go | 1 + .../rabbitmqclientfakes/fake_client.go | 81 +++++++++++++++++++ system_tests/queue_system_test.go | 2 + 10 files changed, 234 insertions(+), 9 deletions(-) create mode 100644 internal/queue_delete_options.go create mode 100644 internal/queue_delete_options_test.go diff --git a/api/v1beta1/queue_types.go b/api/v1beta1/queue_types.go index e1b9b71a..f20f9ced 100644 --- a/api/v1beta1/queue_types.go +++ b/api/v1beta1/queue_types.go @@ -32,6 +32,10 @@ type QueueSpec struct { Durable bool `json:"durable,omitempty"` // when set to true, queues that have had at least one consumer before are deleted after the last consumer unsubscribes. AutoDelete bool `json:"autoDelete,omitempty"` + // when set to true, queues are deleted only if empty. + DeleteIfEmpty bool `json:"deleteIfEmpty,omitempty"` + // when set to true, queues are delete only if they have no consumer. + DeleteIfUnused bool `json:"deleteIfUnused,omitempty"` // Queue arguments in the format of KEY: VALUE. e.g. x-delivery-limit: 10000. // Configuring queues through arguments is not recommended because they cannot be updated once set; we recommend configuring queues through policies instead. // +kubebuilder:validation:Type=object diff --git a/api/v1beta1/queue_webhook_test.go b/api/v1beta1/queue_webhook_test.go index 6bbbbe1f..cb9790e2 100644 --- a/api/v1beta1/queue_webhook_test.go +++ b/api/v1beta1/queue_webhook_test.go @@ -16,11 +16,13 @@ var _ = Describe("queue webhook", func() { Name: "test-queue", }, Spec: QueueSpec{ - Name: "test", - Vhost: "/a-vhost", - Type: "quorum", - Durable: false, - AutoDelete: true, + Name: "test", + Vhost: "/a-vhost", + Type: "quorum", + Durable: false, + AutoDelete: true, + DeleteIfEmpty: true, + DeleteIfUnused: false, RabbitmqClusterReference: RabbitmqClusterReference{ Name: "some-cluster", }, diff --git a/config/crd/bases/rabbitmq.com_queues.yaml b/config/crd/bases/rabbitmq.com_queues.yaml index 6e66724c..18042bca 100644 --- a/config/crd/bases/rabbitmq.com_queues.yaml +++ b/config/crd/bases/rabbitmq.com_queues.yaml @@ -52,6 +52,13 @@ spec: description: when set to true, queues that have had at least one consumer before are deleted after the last consumer unsubscribes. type: boolean + deleteIfEmpty: + description: when set to true, queues are deleted only if empty. + type: boolean + deleteIfUnused: + description: when set to true, queues are delete only if they have + no consumer. + type: boolean durable: description: When set to false queues does not survive server restart. type: boolean diff --git a/controllers/queue_controller.go b/controllers/queue_controller.go index c4884292..09b84f80 100644 --- a/controllers/queue_controller.go +++ b/controllers/queue_controller.go @@ -47,11 +47,30 @@ func (r *QueueReconciler) DeleteFunc(ctx context.Context, client rabbitmqclient. logger.Info("Deleting queues from ReconcilerFunc DeleteObj") queue := obj.(*topology.Queue) - err := validateResponseForDeletion(client.DeleteQueue(queue.Spec.Vhost, queue.Spec.Name)) - if errors.Is(err, NotFound) { + queueDeleteOptions, err := internal.GenerateQueueDeleteOptions(queue) + if err != nil { + return fmt.Errorf("failed to generate queue delete options: %w", err) + } + // Manage Quorum queue deletion if DeleteIfEmpty or DeleteIfUnused is true + if queue.Spec.Type == "quorum" && (queue.Spec.DeleteIfEmpty || queue.Spec.DeleteIfUnused) { + qInfo, err := client.GetQueue(queue.Spec.Vhost, queue.Spec.Name) + if err == nil { + return fmt.Errorf("cannot get %w Queue Information to verify queue is empty/unused: %s", err, queue.Spec.Name) + } + if qInfo.Messages > 0 && queue.Spec.DeleteIfEmpty { + return fmt.Errorf("Cannot delete queue %s because messages", queue.Spec.Name) + } + if qInfo.Consumers > 0 && queue.Spec.DeleteIfUnused { + return fmt.Errorf("Cannot delete queue %s because queue has consumers", queue.Spec.Name) + } + + } + + errdel := validateResponseForDeletion(client.DeleteQueue(queue.Spec.Vhost, queue.Spec.Name, *queueDeleteOptions)) + if errors.Is(errdel, NotFound) { logger.Info("cannot find queue in rabbitmq server; already deleted", "queue", queue.Spec.Name) - } else if err != nil { - return err + } else if errdel != nil { + return errdel } return nil } diff --git a/docs/api/rabbitmq.com.ref.asciidoc b/docs/api/rabbitmq.com.ref.asciidoc index c9a9bc38..50b5c41a 100644 --- a/docs/api/rabbitmq.com.ref.asciidoc +++ b/docs/api/rabbitmq.com.ref.asciidoc @@ -902,6 +902,8 @@ QueueSpec defines the desired state of Queue | *`type`* __string__ | | *`durable`* __boolean__ | When set to false queues does not survive server restart. | *`autoDelete`* __boolean__ | when set to true, queues that have had at least one consumer before are deleted after the last consumer unsubscribes. +| *`deleteIfEmpty`* __boolean__ | when set to true, queues are deleted only if empty. +| *`deleteIfUnused`* __boolean__ | when set to true, queues are delete only if they have no consumer. | *`arguments`* __xref:{anchor_prefix}-k8s-io-apimachinery-pkg-runtime-rawextension[$$RawExtension$$]__ | Queue arguments in the format of KEY: VALUE. e.g. x-delivery-limit: 10000. Configuring queues through arguments is not recommended because they cannot be updated once set; we recommend configuring queues through policies instead. | *`rabbitmqClusterReference`* __xref:{anchor_prefix}-gh.seave.top-rabbitmq-messaging-topology-operator-api-v1beta1-rabbitmqclusterreference[$$RabbitmqClusterReference$$]__ | Reference to the RabbitmqCluster that the queue will be created in. diff --git a/internal/queue_delete_options.go b/internal/queue_delete_options.go new file mode 100644 index 00000000..c1e3c72c --- /dev/null +++ b/internal/queue_delete_options.go @@ -0,0 +1,34 @@ +/* +RabbitMQ Messaging Topology Kubernetes Operator +Copyright 2021 VMware, Inc. + +This product is licensed to you under the Mozilla Public License 2.0 license (the "License"). You may not use this product except in compliance with the Mozilla 2.0 License. + +This product may include a number of subcomponents with separate copyright notices and license terms. Your use of these subcomponents is subject to the terms and conditions of the subcomponent's license, as noted in the LICENSE file. +*/ + +package internal + +import ( + "encoding/json" + "fmt" + rabbithole "github.com/michaelklishin/rabbit-hole/v2" + topology "github.com/rabbitmq/messaging-topology-operator/api/v1beta1" +) + +// GenerateQueueDeleteOptions generates rabbithole.QueueDeleteOptions for a given Queue +// queue.Spec.Arguments (type k8s runtime.RawExtensions) is unmarshalled +func GenerateQueueDeleteOptions(q *topology.Queue) (*rabbithole.QueueDeleteOptions, error) { + arguments := make(map[string]interface{}) + if q.Spec.Arguments != nil { + if err := json.Unmarshal(q.Spec.Arguments.Raw, &arguments); err != nil { + return nil, fmt.Errorf("failed to unmarshall queue arguments: %v", err) + } + } + + return &rabbithole.QueueDeleteOptions{ + // Set these values to false if q.Spec.Type = Quorum, not supported by the API + IfEmpty: q.Spec.Type != "quorum" && q.Spec.DeleteIfEmpty, + IfUnused: q.Spec.Type != "quorum" && q.Spec.DeleteIfUnused, + }, nil +} diff --git a/internal/queue_delete_options_test.go b/internal/queue_delete_options_test.go new file mode 100644 index 00000000..d83b67ec --- /dev/null +++ b/internal/queue_delete_options_test.go @@ -0,0 +1,73 @@ +package internal_test + +import ( + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + topology "github.com/rabbitmq/messaging-topology-operator/api/v1beta1" + "github.com/rabbitmq/messaging-topology-operator/internal" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +var _ = Describe("GenerateQueueDeleteOptionsQuorum", func() { + var q *topology.Queue + + BeforeEach(func() { + q = &topology.Queue{ + ObjectMeta: metav1.ObjectMeta{ + Name: "a-queue", + }, + Spec: topology.QueueSpec{ + Type: "quorum", + AutoDelete: false, + Durable: true, + DeleteIfEmpty: true, + DeleteIfUnused: false, + }, + } + }) + + It("sets QueueDeleteOptions.IfEmpty to false because we handle a quorum queue", func() { + options, err := internal.GenerateQueueDeleteOptions(q) + Expect(err).NotTo(HaveOccurred()) + Expect(options.IfEmpty).To(BeFalse()) + }) + + It("sets QueueDeleteOptions.IfUnused to false because we handle a quorum queue", func() { + options, err := internal.GenerateQueueDeleteOptions(q) + Expect(err).NotTo(HaveOccurred()) + Expect(options.IfUnused).To(BeFalse()) + }) + +}) + +var _ = Describe("GenerateQueueDeleteOptionsClassic", func() { + var q *topology.Queue + + BeforeEach(func() { + q = &topology.Queue{ + ObjectMeta: metav1.ObjectMeta{ + Name: "a-queue", + }, + Spec: topology.QueueSpec{ + Type: "classic", + AutoDelete: false, + Durable: true, + DeleteIfEmpty: true, + DeleteIfUnused: false, + }, + } + }) + + It("sets QueueDeleteOptions.IfEmpty according to queue.spec", func() { + options, err := internal.GenerateQueueDeleteOptions(q) + Expect(err).NotTo(HaveOccurred()) + Expect(options.IfEmpty).To(BeTrue()) + }) + + It("sets QueueDeleteOptions.IfUnused according to queue.spec", func() { + options, err := internal.GenerateQueueDeleteOptions(q) + Expect(err).NotTo(HaveOccurred()) + Expect(options.IfUnused).To(BeFalse()) + }) + +}) diff --git a/rabbitmqclient/rabbitmq_client_factory.go b/rabbitmqclient/rabbitmq_client_factory.go index 9a3d3cb1..27b54435 100644 --- a/rabbitmqclient/rabbitmq_client_factory.go +++ b/rabbitmqclient/rabbitmq_client_factory.go @@ -33,6 +33,7 @@ type Client interface { DeletePolicy(string, string) (*http.Response, error) DeclareQueue(string, string, rabbithole.QueueSettings) (*http.Response, error) DeleteQueue(string, string, ...rabbithole.QueueDeleteOptions) (*http.Response, error) + GetQueue(string, string) (*rabbithole.DetailedQueueInfo, error) DeclareExchange(string, string, rabbithole.ExchangeSettings) (*http.Response, error) DeleteExchange(string, string) (*http.Response, error) PutVhost(string, rabbithole.VhostSettings) (*http.Response, error) diff --git a/rabbitmqclient/rabbitmqclientfakes/fake_client.go b/rabbitmqclient/rabbitmqclientfakes/fake_client.go index 8658deec..b5a9d1b8 100644 --- a/rabbitmqclient/rabbitmqclientfakes/fake_client.go +++ b/rabbitmqclient/rabbitmqclientfakes/fake_client.go @@ -236,6 +236,20 @@ type FakeClient struct { result1 *http.Response result2 error } + GetQueueStub func(string, string) (*rabbithole.DetailedQueueInfo, error) + getQueueMutex sync.RWMutex + getQueueArgsForCall []struct { + arg1 string + arg2 string + } + getQueueReturns struct { + result1 *rabbithole.DetailedQueueInfo + result2 error + } + getQueueReturnsOnCall map[int]struct { + result1 *rabbithole.DetailedQueueInfo + result2 error + } GetVhostStub func(string) (*rabbithole.VhostInfo, error) getVhostMutex sync.RWMutex getVhostArgsForCall []struct { @@ -1442,6 +1456,71 @@ func (fake *FakeClient) DeleteVhostReturnsOnCall(i int, result1 *http.Response, }{result1, result2} } +func (fake *FakeClient) GetQueue(arg1 string, arg2 string) (*rabbithole.DetailedQueueInfo, error) { + fake.getQueueMutex.Lock() + ret, specificReturn := fake.getQueueReturnsOnCall[len(fake.getQueueArgsForCall)] + fake.getQueueArgsForCall = append(fake.getQueueArgsForCall, struct { + arg1 string + arg2 string + }{arg1, arg2}) + stub := fake.GetQueueStub + fakeReturns := fake.getQueueReturns + fake.recordInvocation("GetQueue", []interface{}{arg1, arg2}) + fake.getQueueMutex.Unlock() + if stub != nil { + return stub(arg1, arg2) + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *FakeClient) GetQueueCallCount() int { + fake.getQueueMutex.RLock() + defer fake.getQueueMutex.RUnlock() + return len(fake.getQueueArgsForCall) +} + +func (fake *FakeClient) GetQueueCalls(stub func(string, string) (*rabbithole.DetailedQueueInfo, error)) { + fake.getQueueMutex.Lock() + defer fake.getQueueMutex.Unlock() + fake.GetQueueStub = stub +} + +func (fake *FakeClient) GetQueueArgsForCall(i int) (string, string) { + fake.getQueueMutex.RLock() + defer fake.getQueueMutex.RUnlock() + argsForCall := fake.getQueueArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + +func (fake *FakeClient) GetQueueReturns(result1 *rabbithole.DetailedQueueInfo, result2 error) { + fake.getQueueMutex.Lock() + defer fake.getQueueMutex.Unlock() + fake.GetQueueStub = nil + fake.getQueueReturns = struct { + result1 *rabbithole.DetailedQueueInfo + result2 error + }{result1, result2} +} + +func (fake *FakeClient) GetQueueReturnsOnCall(i int, result1 *rabbithole.DetailedQueueInfo, result2 error) { + fake.getQueueMutex.Lock() + defer fake.getQueueMutex.Unlock() + fake.GetQueueStub = nil + if fake.getQueueReturnsOnCall == nil { + fake.getQueueReturnsOnCall = make(map[int]struct { + result1 *rabbithole.DetailedQueueInfo + result2 error + }) + } + fake.getQueueReturnsOnCall[i] = struct { + result1 *rabbithole.DetailedQueueInfo + result2 error + }{result1, result2} +} + func (fake *FakeClient) GetVhost(arg1 string) (*rabbithole.VhostInfo, error) { fake.getVhostMutex.Lock() ret, specificReturn := fake.getVhostReturnsOnCall[len(fake.getVhostArgsForCall)] @@ -2198,6 +2277,8 @@ func (fake *FakeClient) Invocations() map[string][][]interface{} { defer fake.deleteUserMutex.RUnlock() fake.deleteVhostMutex.RLock() defer fake.deleteVhostMutex.RUnlock() + fake.getQueueMutex.RLock() + defer fake.getQueueMutex.RUnlock() fake.getVhostMutex.RLock() defer fake.getVhostMutex.RUnlock() fake.listExchangeBindingsBetweenMutex.RLock() diff --git a/system_tests/queue_system_test.go b/system_tests/queue_system_test.go index 3a0a9b86..d5138a77 100644 --- a/system_tests/queue_system_test.go +++ b/system_tests/queue_system_test.go @@ -100,5 +100,7 @@ var _ = Describe("Queue Controller", func() { return err }, 30).Should(HaveOccurred()) Expect(err.Error()).To(ContainSubstring("Object Not Found")) + + //FIXME implement delete queue test with ifUnused/ if Empty? }) }) From 7eab521ee30d6bfb44d0b38709cd47f394059366 Mon Sep 17 00:00:00 2001 From: Jean-Damien Date: Mon, 19 Feb 2024 11:09:42 +0100 Subject: [PATCH 2/3] Apply feedback --- controllers/queue_controller.go | 8 ++++---- internal/queue_delete_options.go | 6 ------ system_tests/queue_system_test.go | 2 -- 3 files changed, 4 insertions(+), 12 deletions(-) diff --git a/controllers/queue_controller.go b/controllers/queue_controller.go index 09b84f80..9f473fcc 100644 --- a/controllers/queue_controller.go +++ b/controllers/queue_controller.go @@ -54,14 +54,14 @@ func (r *QueueReconciler) DeleteFunc(ctx context.Context, client rabbitmqclient. // Manage Quorum queue deletion if DeleteIfEmpty or DeleteIfUnused is true if queue.Spec.Type == "quorum" && (queue.Spec.DeleteIfEmpty || queue.Spec.DeleteIfUnused) { qInfo, err := client.GetQueue(queue.Spec.Vhost, queue.Spec.Name) - if err == nil { - return fmt.Errorf("cannot get %w Queue Information to verify queue is empty/unused: %s", err, queue.Spec.Name) + if err != nil { + return fmt.Errorf("cannot get %w queue information to verify queue is empty/unused: %s", err, queue.Spec.Name) } if qInfo.Messages > 0 && queue.Spec.DeleteIfEmpty { - return fmt.Errorf("Cannot delete queue %s because messages", queue.Spec.Name) + return fmt.Errorf("cannot delete queue %s because it has ready messages", queue.Spec.Name) } if qInfo.Consumers > 0 && queue.Spec.DeleteIfUnused { - return fmt.Errorf("Cannot delete queue %s because queue has consumers", queue.Spec.Name) + return fmt.Errorf("cannot delete queue %s because queue has consumers", queue.Spec.Name) } } diff --git a/internal/queue_delete_options.go b/internal/queue_delete_options.go index c1e3c72c..137bd71d 100644 --- a/internal/queue_delete_options.go +++ b/internal/queue_delete_options.go @@ -19,12 +19,6 @@ import ( // GenerateQueueDeleteOptions generates rabbithole.QueueDeleteOptions for a given Queue // queue.Spec.Arguments (type k8s runtime.RawExtensions) is unmarshalled func GenerateQueueDeleteOptions(q *topology.Queue) (*rabbithole.QueueDeleteOptions, error) { - arguments := make(map[string]interface{}) - if q.Spec.Arguments != nil { - if err := json.Unmarshal(q.Spec.Arguments.Raw, &arguments); err != nil { - return nil, fmt.Errorf("failed to unmarshall queue arguments: %v", err) - } - } return &rabbithole.QueueDeleteOptions{ // Set these values to false if q.Spec.Type = Quorum, not supported by the API diff --git a/system_tests/queue_system_test.go b/system_tests/queue_system_test.go index d5138a77..3a0a9b86 100644 --- a/system_tests/queue_system_test.go +++ b/system_tests/queue_system_test.go @@ -100,7 +100,5 @@ var _ = Describe("Queue Controller", func() { return err }, 30).Should(HaveOccurred()) Expect(err.Error()).To(ContainSubstring("Object Not Found")) - - //FIXME implement delete queue test with ifUnused/ if Empty? }) }) From b118421bac1669ffe043f4050c8bfc40094b85c5 Mon Sep 17 00:00:00 2001 From: Jean-Damien Date: Tue, 20 Feb 2024 19:48:36 +0100 Subject: [PATCH 3/3] Apply feedback (part2) --- internal/queue_delete_options.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/internal/queue_delete_options.go b/internal/queue_delete_options.go index 137bd71d..2f801682 100644 --- a/internal/queue_delete_options.go +++ b/internal/queue_delete_options.go @@ -10,8 +10,6 @@ This product may include a number of subcomponents with separate copyright notic package internal import ( - "encoding/json" - "fmt" rabbithole "github.com/michaelklishin/rabbit-hole/v2" topology "github.com/rabbitmq/messaging-topology-operator/api/v1beta1" )