@@ -261,9 +261,46 @@ def test_stream_filtering(connection: Connection, environment: Environment) -> N
261
261
management .delete_queue (stream_name )
262
262
263
263
264
- def test_stream_filtering_not_present (
265
- connection : Connection , environment : Environment
266
- ) -> None :
264
+ def test_stream_filtering_mixed (connection : Connection ) -> None :
265
+
266
+ consumer = None
267
+ stream_name = "test_stream_info_with_filtering"
268
+ messages_to_send = 10
269
+
270
+ queue_specification = StreamSpecification (
271
+ name = stream_name ,
272
+ )
273
+ management = connection .management ()
274
+ management .declare_queue (queue_specification )
275
+
276
+ addr_queue = AddressHelper .queue_address (stream_name )
277
+
278
+ # consume and then publish
279
+ try :
280
+ stream_filter_options = StreamOptions ()
281
+ stream_filter_options .filter_values (["banana" ])
282
+ connection_consumer = create_connection ()
283
+ consumer = connection_consumer .consumer (
284
+ addr_queue ,
285
+ # check we are reading just from offset 10 as just banana filtering applies
286
+ message_handler = MyMessageHandlerAcceptStreamOffset (10 ),
287
+ stream_filter_options = stream_filter_options ,
288
+ )
289
+ # send with annotations filter apple and then banana
290
+ # consumer will read just from offset 10
291
+ publish_messages (connection , messages_to_send , stream_name , ["apple" ])
292
+ publish_messages (connection , messages_to_send , stream_name , ["banana" ])
293
+ consumer .run ()
294
+ # ack to terminate the consumer
295
+ except ConsumerTestException :
296
+ pass
297
+
298
+ consumer .close ()
299
+
300
+ management .delete_queue (stream_name )
301
+
302
+
303
+ def test_stream_filtering_not_present (connection : Connection , environment : Environment ) -> None :
267
304
268
305
raised = False
269
306
stream_name = "test_stream_info_with_filtering"
0 commit comments