Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions api/v1beta1/queue_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ type QueueSpec struct {
Durable bool `json:"durable,omitempty"`
// when set to true, queues that have had at least one consumer before are deleted after the last consumer unsubscribes.
AutoDelete bool `json:"autoDelete,omitempty"`
// when set to true, queues are deleted only if empty.
DeleteIfEmpty bool `json:"deleteIfEmpty,omitempty"`
// when set to true, queues are delete only if they have no consumer.
DeleteIfUnused bool `json:"deleteIfUnused,omitempty"`
// Queue arguments in the format of KEY: VALUE. e.g. x-delivery-limit: 10000.
// Configuring queues through arguments is not recommended because they cannot be updated once set; we recommend configuring queues through policies instead.
// +kubebuilder:validation:Type=object
Expand Down
12 changes: 7 additions & 5 deletions api/v1beta1/queue_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@ var _ = Describe("queue webhook", func() {
Name: "test-queue",
},
Spec: QueueSpec{
Name: "test",
Vhost: "/a-vhost",
Type: "quorum",
Durable: false,
AutoDelete: true,
Name: "test",
Vhost: "/a-vhost",
Type: "quorum",
Durable: false,
AutoDelete: true,
DeleteIfEmpty: true,
DeleteIfUnused: false,
RabbitmqClusterReference: RabbitmqClusterReference{
Name: "some-cluster",
},
Expand Down
7 changes: 7 additions & 0 deletions config/crd/bases/rabbitmq.com_queues.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,13 @@ spec:
description: when set to true, queues that have had at least one consumer
before are deleted after the last consumer unsubscribes.
type: boolean
deleteIfEmpty:
description: when set to true, queues are deleted only if empty.
type: boolean
deleteIfUnused:
description: when set to true, queues are delete only if they have
no consumer.
type: boolean
durable:
description: When set to false queues does not survive server restart.
type: boolean
Expand Down
27 changes: 23 additions & 4 deletions controllers/queue_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,30 @@ func (r *QueueReconciler) DeleteFunc(ctx context.Context, client rabbitmqclient.
logger.Info("Deleting queues from ReconcilerFunc DeleteObj")

queue := obj.(*topology.Queue)
err := validateResponseForDeletion(client.DeleteQueue(queue.Spec.Vhost, queue.Spec.Name))
if errors.Is(err, NotFound) {
queueDeleteOptions, err := internal.GenerateQueueDeleteOptions(queue)
if err != nil {
return fmt.Errorf("failed to generate queue delete options: %w", err)
}
// Manage Quorum queue deletion if DeleteIfEmpty or DeleteIfUnused is true
if queue.Spec.Type == "quorum" && (queue.Spec.DeleteIfEmpty || queue.Spec.DeleteIfUnused) {
qInfo, err := client.GetQueue(queue.Spec.Vhost, queue.Spec.Name)
if err != nil {
return fmt.Errorf("cannot get %w queue information to verify queue is empty/unused: %s", err, queue.Spec.Name)
}
if qInfo.Messages > 0 && queue.Spec.DeleteIfEmpty {
return fmt.Errorf("cannot delete queue %s because it has ready messages", queue.Spec.Name)
}
if qInfo.Consumers > 0 && queue.Spec.DeleteIfUnused {
return fmt.Errorf("cannot delete queue %s because queue has consumers", queue.Spec.Name)
}

}

errdel := validateResponseForDeletion(client.DeleteQueue(queue.Spec.Vhost, queue.Spec.Name, *queueDeleteOptions))
if errors.Is(errdel, NotFound) {
logger.Info("cannot find queue in rabbitmq server; already deleted", "queue", queue.Spec.Name)
} else if err != nil {
return err
} else if errdel != nil {
return errdel
}
return nil
}
2 changes: 2 additions & 0 deletions docs/api/rabbitmq.com.ref.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -902,6 +902,8 @@ QueueSpec defines the desired state of Queue
| *`type`* __string__ |
| *`durable`* __boolean__ | When set to false queues does not survive server restart.
| *`autoDelete`* __boolean__ | when set to true, queues that have had at least one consumer before are deleted after the last consumer unsubscribes.
| *`deleteIfEmpty`* __boolean__ | when set to true, queues are deleted only if empty.
| *`deleteIfUnused`* __boolean__ | when set to true, queues are delete only if they have no consumer.
| *`arguments`* __xref:{anchor_prefix}-k8s-io-apimachinery-pkg-runtime-rawextension[$$RawExtension$$]__ | Queue arguments in the format of KEY: VALUE. e.g. x-delivery-limit: 10000.
Configuring queues through arguments is not recommended because they cannot be updated once set; we recommend configuring queues through policies instead.
| *`rabbitmqClusterReference`* __xref:{anchor_prefix}-gh.seave.top-rabbitmq-messaging-topology-operator-api-v1beta1-rabbitmqclusterreference[$$RabbitmqClusterReference$$]__ | Reference to the RabbitmqCluster that the queue will be created in.
Expand Down
26 changes: 26 additions & 0 deletions internal/queue_delete_options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
RabbitMQ Messaging Topology Kubernetes Operator
Copyright 2021 VMware, Inc.

This product is licensed to you under the Mozilla Public License 2.0 license (the "License"). You may not use this product except in compliance with the Mozilla 2.0 License.

This product may include a number of subcomponents with separate copyright notices and license terms. Your use of these subcomponents is subject to the terms and conditions of the subcomponent's license, as noted in the LICENSE file.
*/

package internal

import (
rabbithole "github.com/michaelklishin/rabbit-hole/v2"
topology "github.com/rabbitmq/messaging-topology-operator/api/v1beta1"
)

// GenerateQueueDeleteOptions generates rabbithole.QueueDeleteOptions for a given Queue
// queue.Spec.Arguments (type k8s runtime.RawExtensions) is unmarshalled
func GenerateQueueDeleteOptions(q *topology.Queue) (*rabbithole.QueueDeleteOptions, error) {

return &rabbithole.QueueDeleteOptions{
// Set these values to false if q.Spec.Type = Quorum, not supported by the API
IfEmpty: q.Spec.Type != "quorum" && q.Spec.DeleteIfEmpty,
IfUnused: q.Spec.Type != "quorum" && q.Spec.DeleteIfUnused,
}, nil
}
73 changes: 73 additions & 0 deletions internal/queue_delete_options_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package internal_test

import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
topology "github.com/rabbitmq/messaging-topology-operator/api/v1beta1"
"github.com/rabbitmq/messaging-topology-operator/internal"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

var _ = Describe("GenerateQueueDeleteOptionsQuorum", func() {
var q *topology.Queue

BeforeEach(func() {
q = &topology.Queue{
ObjectMeta: metav1.ObjectMeta{
Name: "a-queue",
},
Spec: topology.QueueSpec{
Type: "quorum",
AutoDelete: false,
Durable: true,
DeleteIfEmpty: true,
DeleteIfUnused: false,
},
}
})

It("sets QueueDeleteOptions.IfEmpty to false because we handle a quorum queue", func() {
options, err := internal.GenerateQueueDeleteOptions(q)
Expect(err).NotTo(HaveOccurred())
Expect(options.IfEmpty).To(BeFalse())
})

It("sets QueueDeleteOptions.IfUnused to false because we handle a quorum queue", func() {
options, err := internal.GenerateQueueDeleteOptions(q)
Expect(err).NotTo(HaveOccurred())
Expect(options.IfUnused).To(BeFalse())
})

})

var _ = Describe("GenerateQueueDeleteOptionsClassic", func() {
var q *topology.Queue

BeforeEach(func() {
q = &topology.Queue{
ObjectMeta: metav1.ObjectMeta{
Name: "a-queue",
},
Spec: topology.QueueSpec{
Type: "classic",
AutoDelete: false,
Durable: true,
DeleteIfEmpty: true,
DeleteIfUnused: false,
},
}
})

It("sets QueueDeleteOptions.IfEmpty according to queue.spec", func() {
options, err := internal.GenerateQueueDeleteOptions(q)
Expect(err).NotTo(HaveOccurred())
Expect(options.IfEmpty).To(BeTrue())
})

It("sets QueueDeleteOptions.IfUnused according to queue.spec", func() {
options, err := internal.GenerateQueueDeleteOptions(q)
Expect(err).NotTo(HaveOccurred())
Expect(options.IfUnused).To(BeFalse())
})

})
1 change: 1 addition & 0 deletions rabbitmqclient/rabbitmq_client_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type Client interface {
DeletePolicy(string, string) (*http.Response, error)
DeclareQueue(string, string, rabbithole.QueueSettings) (*http.Response, error)
DeleteQueue(string, string, ...rabbithole.QueueDeleteOptions) (*http.Response, error)
GetQueue(string, string) (*rabbithole.DetailedQueueInfo, error)
DeclareExchange(string, string, rabbithole.ExchangeSettings) (*http.Response, error)
DeleteExchange(string, string) (*http.Response, error)
PutVhost(string, rabbithole.VhostSettings) (*http.Response, error)
Expand Down
81 changes: 81 additions & 0 deletions rabbitmqclient/rabbitmqclientfakes/fake_client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.