diff --git a/apps/flutter_client_contract_test_service/pubspec.lock b/apps/flutter_client_contract_test_service/pubspec.lock index 6a5d1fef..a226a7a5 100644 --- a/apps/flutter_client_contract_test_service/pubspec.lock +++ b/apps/flutter_client_contract_test_service/pubspec.lock @@ -388,26 +388,29 @@ packages: source: hosted version: "6.8.0" launchdarkly_common_client: - dependency: "direct overridden" + dependency: transitive description: - path: "../../packages/common_client" - relative: true - source: path + name: launchdarkly_common_client + sha256: e691f25676dfb659975843d7ef5e178297939bf3f4a53cab75381be07f27feec + url: "https://pub.dev" + source: hosted version: "1.6.1" launchdarkly_dart_common: - dependency: "direct overridden" + dependency: transitive description: - path: "../../packages/common" - relative: true - source: path - version: "1.6.1" + name: launchdarkly_dart_common + sha256: "323b0ae2bc756c7c83e95494983b72b190e012e090758a920a992358cbc025a2" + url: "https://pub.dev" + source: hosted + version: "1.6.0" launchdarkly_event_source_client: - dependency: "direct overridden" + dependency: transitive description: - path: "../../packages/event_source_client" - relative: true - source: path - version: "1.2.1" + name: launchdarkly_event_source_client + sha256: "3506de716320c80898e12b825063a69a9a7169042902cdd6eb164f46b3ec60e3" + url: "https://pub.dev" + source: hosted + version: "1.2.0" launchdarkly_flutter_client_sdk: dependency: "direct main" description: @@ -872,10 +875,10 @@ packages: dependency: transitive description: name: vector_math - sha256: "80b3257d1492ce4d091729e3a67a60407d227c27241d6927be0130c98e741803" + sha256: d530bd74fea330e6e364cda7a85019c434070188383e1cd8d9777ee586914c5b url: "https://pub.dev" source: hosted - version: "2.1.4" + version: "2.2.0" vm_service: dependency: transitive description: @@ -957,5 +960,5 @@ packages: source: hosted version: "3.1.3" sdks: - dart: ">=3.7.0-0 <4.0.0" + dart: ">=3.8.0-0 <4.0.0" flutter: ">=3.22.0" diff --git a/apps/sse_contract_test_service/bin/sse_contract_test_service.dart b/apps/sse_contract_test_service/bin/sse_contract_test_service.dart index 071fa353..b34943af 100644 --- a/apps/sse_contract_test_service/bin/sse_contract_test_service.dart +++ b/apps/sse_contract_test_service/bin/sse_contract_test_service.dart @@ -65,13 +65,18 @@ class TestApiImpl extends TestApi { httpMethod: method, headers: headers); final subscription = client.stream.listen((event) { - callbackClient.callbackNumberPost( - PostCallback( - kind: 'event', - event: PostCallbackEvent( - type: event.type, data: event.data, id: event.id)), - callbackNumber: callbackId); - callbackId++; + switch (event) { + case MessageEvent(): + callbackClient.callbackNumberPost( + PostCallback( + kind: 'event', + event: PostCallbackEvent( + type: event.type, data: event.data, id: event.id)), + callbackNumber: callbackId); + callbackId++; + default: + break; + } }, onError: (error) { callbackClient.callbackNumberPost( PostCallback(kind: 'error', comment: error.toString()), diff --git a/melos.yaml b/melos.yaml index 303547cb..9fbc4a45 100644 --- a/melos.yaml +++ b/melos.yaml @@ -4,7 +4,9 @@ environment: sdk: '>=3.4.0 <4.0.0' packages: - - packages/* + # Remove the event_source_client from the workspace temporarily to allow a breaking change. + - packages/common + - packages/common_client - packages/flutter_client_sdk/example - apps/* @@ -19,7 +21,9 @@ scripts: # Add more packages as more of them have tests. # Tests are ran with flutter as it supports coverage. Some packages may also include flutter # dependencies. - run: MELOS_PACKAGES="launchdarkly_dart_common,launchdarkly_common_client,launchdarkly_flutter_client_sdk" melos exec -- flutter test . --coverage + run: > + MELOS_PACKAGES="launchdarkly_dart_common,launchdarkly_common_client,launchdarkly_flutter_client_sdk" melos exec -- flutter test . --coverage && + cd packages/event_source_client && dart test merge-trace-files: description: Merge all packages coverage trace files ignoring data related to generated files. diff --git a/packages/event_source_client/lib/launchdarkly_event_source_client.dart b/packages/event_source_client/lib/launchdarkly_event_source_client.dart index 6ba2af5b..ab92bcde 100644 --- a/packages/event_source_client/lib/launchdarkly_event_source_client.dart +++ b/packages/event_source_client/lib/launchdarkly_event_source_client.dart @@ -2,15 +2,18 @@ library launchdarkly_sse; import 'dart:async'; +import 'dart:collection'; import 'src/http_consts.dart'; import 'src/logging.dart'; -import 'src/message_event.dart'; +import 'src/events.dart'; import 'src/sse_client_stub.dart' if (dart.library.io) 'src/sse_client_http.dart' if (dart.library.js_interop) 'src/sse_client_html.dart'; +import 'src/test_sse_client.dart'; -export 'src/message_event.dart' show MessageEvent; +export 'src/events.dart' show Event, MessageEvent, OpenEvent; +export 'src/test_sse_client.dart' show TestSseClient; export 'src/logging.dart' show EventSourceLogger, LogLevel, NoOpLogger, PrintLogger; @@ -32,10 +35,11 @@ enum SseHttpMethod { /// An [SSEClient] that works to maintain a SSE connection to a server. /// -/// You can receive [MessageEvent]s by listening to the [stream] object. The SSEClient will -/// connect when there is a nonzero number of subscribers on [stream] and will disconnect when -/// there are zero subscribers on [stream]. In certain cases, unrecoverable errors will be -/// reported on the [stream] at which point the stream will be done. +/// You can receive [Events]s by listening to the [stream] object. The SSEClient +/// will connect when there is a nonzero number of subscribers on the [stream] +/// and will disconnect when there are zero subscribers on the [stream]. +/// In certain cases, unrecoverable errors will be reported on the [stream] at +/// which point the stream will be done. /// /// The [SSEClient] will make best effort to maintain the streaming connection. abstract class SSEClient { @@ -47,9 +51,9 @@ abstract class SSEClient { static const defaultConnectTimeout = Duration(seconds: 30); static const defaultReadTimeout = Duration(minutes: 5); - /// Subscribe to this [stream] to receive events and sometimes errors. The first + /// Subscribe to this [stream] to receive events and sometimes errors. /// subscribe triggers the connection, so expect network delay initially. - Stream get stream; + Stream get stream; /// Closes the SSEClient and tears down connections and resources. Do not use the /// SSEClient after close is called, behavior is undefined at that point. @@ -97,4 +101,33 @@ abstract class SSEClient { return getSSEClient(uri, eventTypes, mergedHeaders, connectTimeout, readTimeout, body, httpMethod.toString(), logger); } + + /// Get an SSE client for use in unit tests. + /// + /// Most parameters are the same as those of the main SSEClient factory, but + /// the test client supports an additional property which is the [sourceStream]. + /// Events sent to the [sourceStream] will also be emitted by the event source + /// if the event source has listeners. When a user unsubscribes from the event + /// stream, then the test client will unsubscribe from the source stream. + /// + /// This method is primarily for use the the LaunchDarkly SDK implementation. + /// Changes may be made to this API without following semantic conventions. + static TestSseClient testClient( + Uri uri, + Set eventTypes, { + Map headers = defaultHeaders, + Duration connectTimeout = defaultConnectTimeout, + Duration readTimeout = defaultReadTimeout, + String? body, + SseHttpMethod httpMethod = SseHttpMethod.get, + Stream? sourceStream, + }) { + return TestSseClient.internal( + headers: UnmodifiableMapView(headers), + connectTimeout: connectTimeout, + readTimeout: readTimeout, + body: body, + httpMethod: httpMethod, + sourceStream: sourceStream); + } } diff --git a/packages/event_source_client/lib/src/events.dart b/packages/event_source_client/lib/src/events.dart new file mode 100644 index 00000000..633c5fc1 --- /dev/null +++ b/packages/event_source_client/lib/src/events.dart @@ -0,0 +1,102 @@ +import 'dart:collection'; + +/// Base class for event source events. +final class Event {} + +// Implementation note: Any new constructor parameters should be added as +// optional parameters unless added in a major version. + +/// Represents a message that came across the SSE stream. +final class MessageEvent implements Event { + /// The type of the message. + final String type; + + /// The data sent in the message. + final String data; + + /// An optional message id that was provided. + final String? id; + + /// Creates the message with the provided values. + const MessageEvent(this.type, this.data, this.id); + + @override + String toString() { + return 'MessageEvent{type:$type,data:$data,id:$id}'; + } + + @override + bool operator ==(Object other) => + identical(this, other) || + other is MessageEvent && + runtimeType == other.runtimeType && + type == other.type && + data == other.data && + id == other.id; + + @override + int get hashCode => type.hashCode ^ data.hashCode ^ id.hashCode; +} + +/// Event emitted when the SSE client connects. +final class OpenEvent implements Event { + /// Any headers associated with the connection. + final UnmodifiableMapView? headers; + + /// Create a connected event with the specified headers. + const OpenEvent({this.headers}); + + @override + String toString() { + return 'OpenEvent{headers:$headers}'; + } + + bool _compareHeaders(UnmodifiableMapView? otherHeaders) { + if (headers == null && otherHeaders == null) { + return true; + } + if (headers != null && otherHeaders == null) { + return false; + } + if (headers == null && otherHeaders != null) { + return false; + } + var self = headers!; + var other = otherHeaders!; + if (self.length != other.length) { + return false; + } + for (var pair in self.entries) { + if (!other.containsKey(pair.key)) { + return false; + } + if (pair.value != other[pair.key]) { + return false; + } + } + return true; + } + + @override + bool operator ==(Object other) { + return identical(this, other) || + other is OpenEvent && _compareHeaders(other.headers); + } + + @override + int get hashCode => headers != null + ? Object.hashAllUnordered( + headers!.entries.map((item) => Object.hash(item.key, item.value))) + : null.hashCode; +} + +bool isMessageEvent(Event event) { + { + switch (event) { + case MessageEvent(): + return true; + default: + return false; + } + } +} diff --git a/packages/event_source_client/lib/src/message_event.dart b/packages/event_source_client/lib/src/message_event.dart deleted file mode 100644 index 0913a31e..00000000 --- a/packages/event_source_client/lib/src/message_event.dart +++ /dev/null @@ -1,31 +0,0 @@ -/// Represents a message that came across the SSE stream. -class MessageEvent { - /// The type of the message. - final String type; - - /// The data sent in the message. - final String data; - - /// An optional message id that was provided. - final String? id; - - /// Creates the message with the provided values. - MessageEvent(this.type, this.data, this.id); - - @override - String toString() { - return '{type:$type,data:$data,id:$id}'; - } - - @override - bool operator ==(Object other) => - identical(this, other) || - other is MessageEvent && - runtimeType == other.runtimeType && - type == other.type && - data == other.data && - id == other.id; - - @override - int get hashCode => type.hashCode ^ data.hashCode ^ id.hashCode; -} diff --git a/packages/event_source_client/lib/src/sse_client_html.dart b/packages/event_source_client/lib/src/sse_client_html.dart index f89ec425..55c91b9c 100644 --- a/packages/event_source_client/lib/src/sse_client_html.dart +++ b/packages/event_source_client/lib/src/sse_client_html.dart @@ -6,7 +6,7 @@ import 'dart:math' as math; import '../launchdarkly_event_source_client.dart'; import 'backoff.dart'; -import 'message_event.dart' as ld_message_event; +import 'events.dart' as ld_message_event; /// An [SSEClient] that uses the [web.EventSource] available on most browsers for web platform support. class HtmlSseClient implements SSEClient { @@ -14,8 +14,7 @@ class HtmlSseClient implements SSEClient { web.EventSource? _eventSource; /// This controller is for the events going to the subscribers of this client. - late final StreamController - _messageEventsController; + late final StreamController _messageEventsController; late final EventSourceLogger _logger; @@ -34,7 +33,7 @@ class HtmlSseClient implements SSEClient { _eventTypes = eventTypes { _logger = logger ?? NoOpLogger(); _messageEventsController = - StreamController.broadcast( + StreamController.broadcast( onListen: () { // this is triggered when first listener subscribes @@ -63,6 +62,7 @@ class HtmlSseClient implements SSEClient { _eventSource?.addEventListener(eventType, _handleMessageEvent.toJS); } _eventSource?.addEventListener('error', _handleError.toJS); + _eventSource?.addEventListener('open', _handleOpen.toJS); } void _handleError(web.Event event) { @@ -72,6 +72,11 @@ class HtmlSseClient implements SSEClient { restart(); } + void _handleOpen(web.Event event) { + // The browser event source doesn't have header support. + _messageEventsController.sink.add(OpenEvent()); + } + void _handleMessageEvent(web.Event event) { _activeSince = DateTime.now().millisecondsSinceEpoch; final messageEvent = event as web.MessageEvent; @@ -85,8 +90,7 @@ class HtmlSseClient implements SSEClient { /// Subscribe to this [stream] to receive events and sometimes errors. The first /// subscribe triggers the connection, so expect a network delay initially. @override - Stream get stream => - _messageEventsController.stream; + Stream get stream => _messageEventsController.stream; @override Future close() async { diff --git a/packages/event_source_client/lib/src/sse_client_http.dart b/packages/event_source_client/lib/src/sse_client_http.dart index 47d392fa..29832372 100644 --- a/packages/event_source_client/lib/src/sse_client_http.dart +++ b/packages/event_source_client/lib/src/sse_client_http.dart @@ -4,6 +4,7 @@ import 'dart:math' as math; import 'package:http/http.dart' as http; import '../launchdarkly_event_source_client.dart'; +import 'events.dart' show isMessageEvent; import 'state_idle.dart'; import 'state_value_object.dart'; @@ -18,7 +19,7 @@ class HttpSseClient implements SSEClient { static const defaultReadTimeout = Duration(minutes: 5); /// This controller is for the events going to the subscribers of this client. - late final StreamController _messageEventsController; + late final StreamController _messageEventsController; late final EventSourceLogger _logger; /// This controller is for controlling the internal state machine when subscribers @@ -67,7 +68,7 @@ class HttpSseClient implements SSEClient { String httpMethod, EventSourceLogger? logger) { _logger = logger ?? NoOpLogger(); - _messageEventsController = StreamController.broadcast( + _messageEventsController = StreamController.broadcast( // this is triggered when first listener subscribes onListen: () => _connectionDesiredStateController.add(true), // this is triggered when last listener unsubscribes @@ -94,7 +95,7 @@ class HttpSseClient implements SSEClient { /// Subscribe to this [stream] to receive events and sometimes errors. The first /// subscribe triggers the connection, so expect a network delay initially. @override - Stream get stream => _messageEventsController.stream; + Stream get stream => _messageEventsController.stream; @override Future close() async { diff --git a/packages/event_source_client/lib/src/state_connected.dart b/packages/event_source_client/lib/src/state_connected.dart index e3875672..2789f870 100644 --- a/packages/event_source_client/lib/src/state_connected.dart +++ b/packages/event_source_client/lib/src/state_connected.dart @@ -1,7 +1,9 @@ import 'dart:async'; +import 'dart:collection'; import 'dart:convert'; import 'package:http/http.dart' as http; +import 'events.dart'; import 'state_backoff.dart'; import 'state_idle.dart'; import 'state_value_object.dart'; @@ -16,6 +18,10 @@ class StateConnected { // record transition to this state for testing/logging svo.transitionSink.add(StateConnected); svo.logger.debug('Transitioned to StateConnected'); + svo.eventSink.add(OpenEvent( + headers: svo.connectHeaders != null + ? UnmodifiableMapView(svo.connectHeaders!) + : null)); // wait for either the stream to terminate or desired connection change to transition final transition = await Future.any([ @@ -47,12 +53,20 @@ class StateConnected { recordedActiveSince = true; } - // hold on to most recent id if there is one so we can use it for session resumption - svo.lastId = event.id ?? svo.lastId; + // Implementation note: Currently only message events are supported + // by the parser, but we could potentially extend that to support + // emitting comment events. + switch (event) { + case MessageEvent(): + // hold on to most recent id if there is one so we can use it for session resumption + svo.lastId = event.id ?? svo.lastId; - // only emit events that have event types the sse client was configured to use - if (svo.eventTypes.contains(event.type)) { - svo.eventSink.add(event); + // only emit events that have event types the sse client was configured to use + if (svo.eventTypes.contains(event.type)) { + svo.eventSink.add(event); + } + default: + break; } } diff --git a/packages/event_source_client/lib/src/state_connecting.dart b/packages/event_source_client/lib/src/state_connecting.dart index e9a3bc1a..85364047 100644 --- a/packages/event_source_client/lib/src/state_connecting.dart +++ b/packages/event_source_client/lib/src/state_connecting.dart @@ -85,6 +85,8 @@ class StateConnecting { }; } + svo.connectHeaders = response.headers; + return () => StateConnected.run(svo, client, response.stream); } on TimeoutException { // didn't connect in a timely manner, so backoff then we'll try again diff --git a/packages/event_source_client/lib/src/state_value_object.dart b/packages/event_source_client/lib/src/state_value_object.dart index 7263732e..ed419e82 100644 --- a/packages/event_source_client/lib/src/state_value_object.dart +++ b/packages/event_source_client/lib/src/state_value_object.dart @@ -4,7 +4,7 @@ import 'dart:math' as math; import 'backoff.dart'; import 'logging.dart'; -import 'message_event.dart'; +import 'events.dart'; typedef ClientFactory = http.Client Function(); @@ -32,7 +32,7 @@ class StateValues { // This is a broadcast stream, and the request is only posted if there are // listeners, so it is an ephemeral trigger. final Stream resetRequest; - final EventSink eventSink; + final EventSink eventSink; final Sink transitionSink; // for testing transitions final ClientFactory clientFactory; final math.Random random; @@ -47,6 +47,9 @@ class StateValues { /// The most recently received event ID from the server. Used for resumption. String lastId = ''; + /// Headers received from the connection. + Map? connectHeaders; + /// Creates a [_StateValues] instance. Used by the state machine. StateValues( this.uri, diff --git a/packages/event_source_client/lib/src/stateful_sse_parser.dart b/packages/event_source_client/lib/src/stateful_sse_parser.dart index 5147cda0..c0565d9d 100644 --- a/packages/event_source_client/lib/src/stateful_sse_parser.dart +++ b/packages/event_source_client/lib/src/stateful_sse_parser.dart @@ -1,6 +1,6 @@ import 'dart:async'; -import 'message_event.dart'; +import 'events.dart'; /// Enum for tracking the category of the last parsed rune. enum _LastParsed { @@ -47,7 +47,7 @@ class StatefulSSEParser { /// This function will iterate over the SSE stream [chunk] provided and statefully process it. /// When warranted, [MessageEvent]s may be sent to the provided [sink]. Subsequent calls /// with subsequent [chunk]s provided will be treated as a continuation of the data stream. - void parse(String chunk, EventSink sink) { + void parse(String chunk, EventSink sink) { // switch statements are used instead of a state machine for memory and performance reasons for (var rune in chunk.runes) { switch (_lastParsed) { @@ -207,7 +207,7 @@ class StatefulSSEParser { /// This function is intended to send a [MessageEvent] to the provided [sink] when invoked. /// There are some edge cases in which dispatching will be cancelled, such as invalid data. - void _dispatchEvent(EventSink sink) { + void _dispatchEvent(EventSink sink) { // if data is empty, ignore this dispatch and reset, but don't clear id if (_dataBuffer.isEmpty) { _eventType = ''; diff --git a/packages/event_source_client/lib/src/test_sse_client.dart b/packages/event_source_client/lib/src/test_sse_client.dart new file mode 100644 index 00000000..e58f0d3a --- /dev/null +++ b/packages/event_source_client/lib/src/test_sse_client.dart @@ -0,0 +1,85 @@ +import 'dart:async'; +import 'dart:collection'; + +import '../launchdarkly_event_source_client.dart'; + +const String _simulatedErrorString = + 'an error has occurred, any potential string may be provided here and it should not be treated as an interface'; + +/// An SSE client to use for testing. +/// +/// Changes may be made to this class without following semantic conventions. +final class TestSseClient implements SSEClient { + final UnmodifiableMapView headers; + final Duration connectTimeout; + final Duration readTimeout; + final String? body; + final SseHttpMethod httpMethod; + late final Stream? _sourceStream; + StreamSubscription? _sourceStreamSubscription; + + /// This controller is for the events going to the subscribers of this client. + late final StreamController _messageEventsController; + + @override + Future close() async { + _messageEventsController.close(); + } + + @override + void restart() {} + + @override + Stream get stream => _messageEventsController.stream; + + /// Emit an event on the stream. + /// Has no effect if the client has been closed. + /// + /// [event] The event to emit. + void emitEvent(Event event) { + if (_messageEventsController.isClosed) { + return; + } + _messageEventsController.sink.add(event); + } + + /// Emit an error event. + /// + /// [error] The error to emit. The event source makes no contract about the + /// type of errors it will emit. If not error is provided, then a default + /// error will be emitted. + void emitError({Object? error}) { + if (_messageEventsController.isClosed) { + return; + } + if (error != null) { + _messageEventsController.sink.addError(error); + } else { + _messageEventsController.sink.addError(Exception(_simulatedErrorString)); + } + } + + TestSseClient.internal({ + required this.headers, + required this.connectTimeout, + required this.readTimeout, + required this.body, + required this.httpMethod, + Stream? sourceStream, + }) { + _sourceStream = sourceStream; + _messageEventsController = StreamController.broadcast( + onListen: () { + _sourceStreamSubscription = _sourceStream?.listen((event) { + emitEvent(event); + }); + _sourceStreamSubscription?.onError((error) { + emitError(); + }); + }, + onCancel: () { + _sourceStreamSubscription?.cancel(); + }, + ); + } +} diff --git a/packages/event_source_client/test/events_test.dart b/packages/event_source_client/test/events_test.dart new file mode 100644 index 00000000..98fe5460 --- /dev/null +++ b/packages/event_source_client/test/events_test.dart @@ -0,0 +1,63 @@ +import 'dart:collection'; + +import 'package:test/test.dart'; +import 'package:launchdarkly_event_source_client/src/events.dart'; + +class CompareCase { + final Event a; + final Event b; + final bool result; + + const CompareCase(this.a, this.b, this.result); + + @override + String toString() { + return '$a, $b, $result'; + } +} + +void main() { + group('given different connect messages', () { + var cases = [ + CompareCase(OpenEvent(), OpenEvent(), true), + CompareCase(OpenEvent(), + OpenEvent(headers: UnmodifiableMapView({'key': 'value'})), false), + CompareCase(OpenEvent(headers: UnmodifiableMapView({'key': 'value'})), + OpenEvent(), false), + CompareCase(OpenEvent(headers: UnmodifiableMapView({'key': 'value'})), + OpenEvent(headers: UnmodifiableMapView({'key': 'value'})), true), + CompareCase(OpenEvent(headers: UnmodifiableMapView({'key': 'valueA'})), + OpenEvent(headers: UnmodifiableMapView({'key': 'value'})), false), + CompareCase( + OpenEvent( + headers: + UnmodifiableMapView({'key': 'value', 'second': 'value'})), + OpenEvent(headers: UnmodifiableMapView({'key': 'value'})), + false), + CompareCase( + OpenEvent( + headers: + UnmodifiableMapView({'key': 'value', 'second': 'value'})), + OpenEvent( + headers: + UnmodifiableMapView({'second': 'value', 'key': 'value'})), + true), + ]; + + for (var testCase in cases) { + test('Compare $testCase', () { + expect(testCase.a == testCase.b, equals(testCase.result)); + }); + + test('HashCode $testCase', () { + var codeA = testCase.a.hashCode; + var codeB = testCase.b.hashCode; + if (testCase.result) { + expect(codeA, equals(codeB)); + } else { + expect(codeA, isNot(equals(codeB))); + } + }); + } + }); +} diff --git a/packages/event_source_client/test/sse_client_http_test.dart b/packages/event_source_client/test/sse_client_http_test.dart index bd4ab2c5..f2657e89 100644 --- a/packages/event_source_client/test/sse_client_http_test.dart +++ b/packages/event_source_client/test/sse_client_http_test.dart @@ -7,6 +7,7 @@ import 'dart:math' as math; import 'package:http/http.dart'; import 'package:launchdarkly_event_source_client/launchdarkly_event_source_client.dart'; import 'package:launchdarkly_event_source_client/src/http_consts.dart'; +import 'package:launchdarkly_event_source_client/src/events.dart'; import 'package:launchdarkly_event_source_client/src/sse_client_http.dart'; import 'package:launchdarkly_event_source_client/src/state_connected.dart'; import 'package:launchdarkly_event_source_client/src/state_connecting.dart'; @@ -42,8 +43,10 @@ void main() { // this expect statement will register a listener on the stream triggering the client to // connect to the mock client. The mock client is set up to send a message. - expectLater(sseClientUnderTest.stream, - emitsInOrder([MessageEvent('put', 'helloworld', '')])); + expectLater( + sseClientUnderTest.stream, + emitsInOrder( + [isA(), MessageEvent('put', 'helloworld', '')])); }); test('Test disconnects when stream.first unsubscribes', () async { @@ -68,8 +71,16 @@ void main() { // this expect statement will register a listener on the stream triggering the client to // connect to the mock client. The mock client is set up to send a message. - var messageEvent = await sseClientUnderTest.stream.first; - expect(messageEvent.data, equals('helloworld')); + var events = []; + await for (final event in sseClientUnderTest.stream) { + events.add(event); + if (events.length >= 2) break; // Collect OpenEvent and MessageEvent + } + + expect(events.length, equals(2)); + expect(events[0], isA()); + expect(events[1], isA()); + expect((events[1] as MessageEvent).data, equals('helloworld')); }); test('Test close', () async { @@ -94,8 +105,16 @@ void main() { // this expect statement will register a listener on the stream triggering the client to // connect to the mock client. - var messageEvent = await sseClientUnderTest.stream.first; - expect(messageEvent.data, equals('helloworld')); + var events = []; + await for (final event in sseClientUnderTest.stream) { + events.add(event); + if (events.length >= 2) break; // Collect OpenEvent and MessageEvent + } + + expect(events.length, equals(2)); + expect(events[0], isA()); + expect(events[1], isA()); + expect((events[1] as MessageEvent).data, equals('helloworld')); sseClientUnderTest.close(); }); diff --git a/packages/event_source_client/test/state_connected_test.dart b/packages/event_source_client/test/state_connected_test.dart index f19a7d88..73a24272 100644 --- a/packages/event_source_client/test/state_connected_test.dart +++ b/packages/event_source_client/test/state_connected_test.dart @@ -1,10 +1,11 @@ // ignore_for_file: close_sinks import 'dart:async'; +import 'dart:collection'; import 'dart:convert'; import 'package:http/http.dart'; -import 'package:launchdarkly_event_source_client/src/message_event.dart'; +import 'package:launchdarkly_event_source_client/src/events.dart'; import 'package:launchdarkly_event_source_client/src/state_backoff.dart'; import 'package:launchdarkly_event_source_client/src/state_connected.dart'; import 'package:launchdarkly_event_source_client/src/state_idle.dart'; @@ -16,9 +17,57 @@ import 'test_utils.dart'; class MockClient extends Mock implements Client {} void main() { + test('Test connected emits OpenEvent without headers when entered', () async { + final transitionController = StreamController.broadcast(); + final eventController = StreamController.broadcast(); + final dataController = StreamController>.broadcast(); + final mockClient = MockClient(); + + final svo = TestUtils.makeMockStateValues( + eventTypes: {'put'}, + transitionSink: transitionController, + eventSink: eventController.sink, + clientFactory: () => mockClient); + + // connectHeaders is null, so OpenEvent should have no headers + svo.connectHeaders = null; + + expectLater(transitionController.stream, emitsInOrder([StateConnected])); + expectLater(eventController.stream, emitsInOrder([OpenEvent()])); + + StateConnected.run(svo, mockClient, dataController.stream); + }); + + test('Test connected emits OpenEvent with headers when entered', () async { + final transitionController = StreamController.broadcast(); + final eventController = StreamController.broadcast(); + final dataController = StreamController>.broadcast(); + final mockClient = MockClient(); + + final svo = TestUtils.makeMockStateValues( + eventTypes: {'put'}, + transitionSink: transitionController, + eventSink: eventController.sink, + clientFactory: () => mockClient); + + // Set connectHeaders to simulate headers received from connection + svo.connectHeaders = { + 'x-custom-header': 'custom-value', + 'content-type': 'text/event-stream' + }; + + final expectedOpenEvent = + OpenEvent(headers: UnmodifiableMapView(svo.connectHeaders!)); + + expectLater(transitionController.stream, emitsInOrder([StateConnected])); + expectLater(eventController.stream, emitsInOrder([expectedOpenEvent])); + + StateConnected.run(svo, mockClient, dataController.stream); + }); + test('Test connected emits events', () async { final transitionController = StreamController.broadcast(); - final eventController = StreamController.broadcast(); + final eventController = StreamController.broadcast(); final dataController = StreamController>.broadcast(); final mockClient = MockClient(); // this mock client doesn't do anything in this test case @@ -32,7 +81,7 @@ void main() { expectLater(transitionController.stream, emitsInOrder([StateConnected])); expectLater(eventController.stream, - emitsInOrder([MessageEvent('put', 'helloworld', '')])); + emitsInOrder([OpenEvent(), MessageEvent('put', 'helloworld', '')])); StateConnected.run(svo, mockClient, dataController.stream); dataController.add(utf8.encode('event:put\ndata:helloworld\n\n')); }); @@ -78,4 +127,40 @@ void main() { resetController.sink.add(null); await StateConnected.run(svo, mockClient, dataController.stream); }); + + test('Test OpenEvent is emitted before MessageEvents in order', () async { + final transitionController = StreamController.broadcast(); + final eventController = StreamController.broadcast(); + final dataController = StreamController>.broadcast(); + final mockClient = MockClient(); + + final svo = TestUtils.makeMockStateValues( + eventTypes: {'put', 'patch'}, + transitionSink: transitionController, + eventSink: eventController.sink, + clientFactory: () => mockClient); + + // Set connectHeaders to simulate headers received from connection + svo.connectHeaders = {'x-request-id': 'abc123', 'server': 'nginx/1.18.0'}; + + final expectedOpenEvent = + OpenEvent(headers: UnmodifiableMapView(svo.connectHeaders!)); + + // Verify that OpenEvent comes first, followed by MessageEvents in order + expectLater(transitionController.stream, emitsInOrder([StateConnected])); + expectLater( + eventController.stream, + emitsInOrder([ + expectedOpenEvent, + MessageEvent('put', 'first-message', '1'), + MessageEvent('patch', 'second-message', '2') + ])); + + StateConnected.run(svo, mockClient, dataController.stream); + + // Send multiple events to verify ordering + dataController.add(utf8.encode('id:1\nevent:put\ndata:first-message\n\n')); + dataController + .add(utf8.encode('id:2\nevent:patch\ndata:second-message\n\n')); + }); } diff --git a/packages/event_source_client/test/stateful_sse_parser_test.dart b/packages/event_source_client/test/stateful_sse_parser_test.dart index f1dffbd1..e528584a 100644 --- a/packages/event_source_client/test/stateful_sse_parser_test.dart +++ b/packages/event_source_client/test/stateful_sse_parser_test.dart @@ -1,18 +1,18 @@ import 'dart:async'; -import 'package:launchdarkly_event_source_client/src/message_event.dart'; +import 'package:launchdarkly_event_source_client/src/events.dart'; import 'package:launchdarkly_event_source_client/src/stateful_sse_parser.dart'; import 'package:test/test.dart'; import 'package:mocktail/mocktail.dart'; -class MockSink extends Mock implements EventSink {} +class MockSink extends Mock implements EventSink {} void main() { setUpAll(() { registerFallbackValue(MessageEvent('fallback', 'fallback', 'fallback')); }); - void testCase(String input, List expected) { + void testCase(String input, List expected) { final parserUnderTest = StatefulSSEParser(); final mockSink = MockSink(); parserUnderTest.parse(input, mockSink); @@ -23,8 +23,10 @@ void main() { reason: 'Captured:{$captured}, Expected:{$expected}'); for (var i = 0; i < captured.length; i++) { - final expectedMessageEvent = expected[i]; + expect(expected[i], isA()); + final expectedMessageEvent = expected[i] as MessageEvent; final messageEvent = captured[i] as MessageEvent; + expect(messageEvent, isA()); expect(messageEvent.type, equals(expectedMessageEvent.type)); expect(messageEvent.data, equals(expectedMessageEvent.data)); expect(messageEvent.id, equals(expectedMessageEvent.id)); diff --git a/packages/event_source_client/test/test_utils.dart b/packages/event_source_client/test/test_utils.dart index 8ca31714..50c17c2a 100644 --- a/packages/event_source_client/test/test_utils.dart +++ b/packages/event_source_client/test/test_utils.dart @@ -8,7 +8,7 @@ import 'package:http/http.dart'; import 'package:http/testing.dart'; import 'package:launchdarkly_event_source_client/src/http_consts.dart'; import 'package:launchdarkly_event_source_client/src/logging.dart'; -import 'package:launchdarkly_event_source_client/src/message_event.dart'; +import 'package:launchdarkly_event_source_client/src/events.dart'; import 'package:launchdarkly_event_source_client/src/state_value_object.dart'; class TestUtils { @@ -25,7 +25,7 @@ class TestUtils { Duration? connectTimeout, Duration? readTimeout, Stream? connectionDesired, - EventSink? eventSink, + EventSink? eventSink, Sink? transitionSink, ClientFactory? clientFactory, math.Random? random, @@ -38,7 +38,7 @@ class TestUtils { connectTimeout ?? Duration.zero, readTimeout ?? Duration.zero, connectionDesired ?? StreamController.broadcast().stream, - eventSink ?? StreamController.broadcast(), + eventSink ?? StreamController.broadcast(), transitionSink ?? StreamController.broadcast(), clientFactory ?? makeMockHttpClient, math.Random(),