35
35
import org .apache .activemq .artemis .api .core .ICoreMessage ;
36
36
import org .apache .activemq .artemis .api .core .Message ;
37
37
import org .apache .activemq .artemis .api .core .QueueConfiguration ;
38
+ import org .apache .activemq .artemis .api .core .RoutingType ;
38
39
import org .apache .activemq .artemis .api .core .SimpleString ;
39
40
import org .apache .activemq .artemis .core .client .impl .ClientConsumerImpl ;
40
41
import org .apache .activemq .artemis .core .protocol .openwire .OpenWireConstants ;
@@ -215,6 +216,7 @@ private SimpleString createTopicSubscription(boolean isDurable,
215
216
if (info .getDestination ().isComposite ()) {
216
217
queueName = queueName .concat (physicalName );
217
218
}
219
+ QueueConfiguration queueConfiguration = QueueConfiguration .of (queueName ).setAddress (address ).setRoutingType (RoutingType .MULTICAST ).setFilterString (selector ).setInternal (internalAddress );
218
220
QueueQueryResult result = session .getCoreSession ().executeQueueQuery (queueName );
219
221
if (result .isExists ()) {
220
222
// Already exists
@@ -235,10 +237,10 @@ private SimpleString createTopicSubscription(boolean isDurable,
235
237
session .getCoreSession ().deleteQueue (queueName );
236
238
237
239
// Create the new one
238
- session .getCoreSession ().createQueue (QueueConfiguration . of ( queueName ). setAddress ( address ). setFilterString ( selector ). setInternal ( internalAddress ) );
240
+ session .getCoreSession ().createQueue (queueConfiguration );
239
241
}
240
242
} else {
241
- session .getCoreSession ().createQueue (QueueConfiguration . of ( queueName ). setAddress ( address ). setFilterString ( selector ). setInternal ( internalAddress ) );
243
+ session .getCoreSession ().createQueue (queueConfiguration );
242
244
}
243
245
} else {
244
246
// The consumer may be using FQQN in which case the queue might already exist.
@@ -251,7 +253,7 @@ private SimpleString createTopicSubscription(boolean isDurable,
251
253
queueName = SimpleString .of (UUID .randomUUID ().toString ());
252
254
}
253
255
254
- session .getCoreSession ().createQueue (QueueConfiguration .of (queueName ).setAddress (address ).setFilterString (selector ).setDurable (false ).setTemporary (true ).setInternal (internalAddress ));
256
+ session .getCoreSession ().createQueue (QueueConfiguration .of (queueName ).setAddress (address ).setRoutingType ( RoutingType . MULTICAST ). setFilterString (selector ).setDurable (false ).setTemporary (true ).setInternal (internalAddress ));
255
257
}
256
258
257
259
return queueName ;
0 commit comments