Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 20 additions & 17 deletions apps/flutter_client_contract_test_service/pubspec.lock
Original file line number Diff line number Diff line change
Expand Up @@ -388,26 +388,29 @@ packages:
source: hosted
version: "6.8.0"
launchdarkly_common_client:
dependency: "direct overridden"
dependency: transitive
description:
path: "../../packages/common_client"
relative: true
source: path
name: launchdarkly_common_client
sha256: e691f25676dfb659975843d7ef5e178297939bf3f4a53cab75381be07f27feec
url: "https://pub.dev"
source: hosted
version: "1.6.1"
launchdarkly_dart_common:
dependency: "direct overridden"
dependency: transitive
description:
path: "../../packages/common"
relative: true
source: path
version: "1.6.1"
name: launchdarkly_dart_common
sha256: "323b0ae2bc756c7c83e95494983b72b190e012e090758a920a992358cbc025a2"
url: "https://pub.dev"
source: hosted
version: "1.6.0"
launchdarkly_event_source_client:
dependency: "direct overridden"
dependency: transitive
description:
path: "../../packages/event_source_client"
relative: true
source: path
version: "1.2.1"
name: launchdarkly_event_source_client
sha256: "3506de716320c80898e12b825063a69a9a7169042902cdd6eb164f46b3ec60e3"
url: "https://pub.dev"
source: hosted
version: "1.2.0"
launchdarkly_flutter_client_sdk:
dependency: "direct main"
description:
Expand Down Expand Up @@ -872,10 +875,10 @@ packages:
dependency: transitive
description:
name: vector_math
sha256: "80b3257d1492ce4d091729e3a67a60407d227c27241d6927be0130c98e741803"
sha256: d530bd74fea330e6e364cda7a85019c434070188383e1cd8d9777ee586914c5b
url: "https://pub.dev"
source: hosted
version: "2.1.4"
version: "2.2.0"
vm_service:
dependency: transitive
description:
Expand Down Expand Up @@ -957,5 +960,5 @@ packages:
source: hosted
version: "3.1.3"
sdks:
dart: ">=3.7.0-0 <4.0.0"
dart: ">=3.8.0-0 <4.0.0"
flutter: ">=3.22.0"
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,18 @@ class TestApiImpl extends TestApi {
httpMethod: method,
headers: headers);
final subscription = client.stream.listen((event) {
callbackClient.callbackNumberPost(
PostCallback(
kind: 'event',
event: PostCallbackEvent(
type: event.type, data: event.data, id: event.id)),
callbackNumber: callbackId);
callbackId++;
switch (event) {
case MessageEvent():
callbackClient.callbackNumberPost(
PostCallback(
kind: 'event',
event: PostCallbackEvent(
type: event.type, data: event.data, id: event.id)),
callbackNumber: callbackId);
callbackId++;
default:
break;
}
}, onError: (error) {
callbackClient.callbackNumberPost(
PostCallback(kind: 'error', comment: error.toString()),
Expand Down
8 changes: 6 additions & 2 deletions melos.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ environment:
sdk: '>=3.4.0 <4.0.0'

packages:
- packages/*
# Remove the event_source_client from the workspace temporarily to allow a breaking change.
- packages/common
- packages/common_client
- packages/flutter_client_sdk/example
- apps/*

Expand All @@ -19,7 +21,9 @@ scripts:
# Add more packages as more of them have tests.
# Tests are ran with flutter as it supports coverage. Some packages may also include flutter
# dependencies.
run: MELOS_PACKAGES="launchdarkly_dart_common,launchdarkly_common_client,launchdarkly_flutter_client_sdk" melos exec -- flutter test . --coverage
run: >
MELOS_PACKAGES="launchdarkly_dart_common,launchdarkly_common_client,launchdarkly_flutter_client_sdk" melos exec -- flutter test . --coverage &&
cd packages/event_source_client && dart test

merge-trace-files:
description: Merge all packages coverage trace files ignoring data related to generated files.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,18 @@
library launchdarkly_sse;

import 'dart:async';
import 'dart:collection';

import 'src/http_consts.dart';
import 'src/logging.dart';
import 'src/message_event.dart';
import 'src/events.dart';
import 'src/sse_client_stub.dart'
if (dart.library.io) 'src/sse_client_http.dart'
if (dart.library.js_interop) 'src/sse_client_html.dart';
import 'src/test_sse_client.dart';

export 'src/message_event.dart' show MessageEvent;
export 'src/events.dart' show Event, MessageEvent, OpenEvent;
export 'src/test_sse_client.dart' show TestSseClient;
export 'src/logging.dart'
show EventSourceLogger, LogLevel, NoOpLogger, PrintLogger;

Expand All @@ -32,10 +35,11 @@ enum SseHttpMethod {

/// An [SSEClient] that works to maintain a SSE connection to a server.
///
/// You can receive [MessageEvent]s by listening to the [stream] object. The SSEClient will
/// connect when there is a nonzero number of subscribers on [stream] and will disconnect when
/// there are zero subscribers on [stream]. In certain cases, unrecoverable errors will be
/// reported on the [stream] at which point the stream will be done.
/// You can receive [Events]s by listening to the [stream] object. The SSEClient
/// will connect when there is a nonzero number of subscribers on the [stream]
/// and will disconnect when there are zero subscribers on the [stream].
/// In certain cases, unrecoverable errors will be reported on the [stream] at
/// which point the stream will be done.
///
/// The [SSEClient] will make best effort to maintain the streaming connection.
abstract class SSEClient {
Expand All @@ -47,9 +51,9 @@ abstract class SSEClient {
static const defaultConnectTimeout = Duration(seconds: 30);
static const defaultReadTimeout = Duration(minutes: 5);

/// Subscribe to this [stream] to receive events and sometimes errors. The first
/// Subscribe to this [stream] to receive events and sometimes errors.
/// subscribe triggers the connection, so expect network delay initially.
Stream<MessageEvent> get stream;
Stream<Event> get stream;

/// Closes the SSEClient and tears down connections and resources. Do not use the
/// SSEClient after close is called, behavior is undefined at that point.
Expand Down Expand Up @@ -97,4 +101,33 @@ abstract class SSEClient {
return getSSEClient(uri, eventTypes, mergedHeaders, connectTimeout,
readTimeout, body, httpMethod.toString(), logger);
}

/// Get an SSE client for use in unit tests.
///
/// Most parameters are the same as those of the main SSEClient factory, but
/// the test client supports an additional property which is the [sourceStream].
/// Events sent to the [sourceStream] will also be emitted by the event source
/// if the event source has listeners. When a user unsubscribes from the event
/// stream, then the test client will unsubscribe from the source stream.
///
/// This method is primarily for use the the LaunchDarkly SDK implementation.
/// Changes may be made to this API without following semantic conventions.
static TestSseClient testClient(
Uri uri,
Set<String> eventTypes, {
Map<String, String> headers = defaultHeaders,
Duration connectTimeout = defaultConnectTimeout,
Duration readTimeout = defaultReadTimeout,
String? body,
SseHttpMethod httpMethod = SseHttpMethod.get,
Stream<Event>? sourceStream,
}) {
return TestSseClient.internal(
headers: UnmodifiableMapView(headers),
connectTimeout: connectTimeout,
readTimeout: readTimeout,
body: body,
httpMethod: httpMethod,
sourceStream: sourceStream);
}
}
102 changes: 102 additions & 0 deletions packages/event_source_client/lib/src/events.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
import 'dart:collection';

/// Base class for event source events.
final class Event {}

// Implementation note: Any new constructor parameters should be added as
// optional parameters unless added in a major version.

/// Represents a message that came across the SSE stream.
final class MessageEvent implements Event {
/// The type of the message.
final String type;

/// The data sent in the message.
final String data;

/// An optional message id that was provided.
final String? id;

/// Creates the message with the provided values.
const MessageEvent(this.type, this.data, this.id);

@override
String toString() {
return 'MessageEvent{type:$type,data:$data,id:$id}';
}

@override
bool operator ==(Object other) =>
identical(this, other) ||
other is MessageEvent &&
runtimeType == other.runtimeType &&
type == other.type &&
data == other.data &&
id == other.id;

@override
int get hashCode => type.hashCode ^ data.hashCode ^ id.hashCode;
}

/// Event emitted when the SSE client connects.
final class OpenEvent implements Event {
/// Any headers associated with the connection.
final UnmodifiableMapView<String, String>? headers;

/// Create a connected event with the specified headers.
const OpenEvent({this.headers});

@override
String toString() {
return 'OpenEvent{headers:$headers}';
}

bool _compareHeaders(UnmodifiableMapView<String, String>? otherHeaders) {
if (headers == null && otherHeaders == null) {
return true;
}
if (headers != null && otherHeaders == null) {
return false;
}
if (headers == null && otherHeaders != null) {
return false;
}
var self = headers!;
var other = otherHeaders!;
if (self.length != other.length) {
return false;
}
for (var pair in self.entries) {
if (!other.containsKey(pair.key)) {
return false;
}
if (pair.value != other[pair.key]) {
return false;
}
}
return true;
}

@override
bool operator ==(Object other) {
return identical(this, other) ||
other is OpenEvent && _compareHeaders(other.headers);
}

@override
int get hashCode => headers != null
? Object.hashAllUnordered(
headers!.entries.map((item) => Object.hash(item.key, item.value)))
: null.hashCode;
}

bool isMessageEvent(Event event) {
{
switch (event) {
case MessageEvent():
return true;
default:
return false;
}
}
}
31 changes: 0 additions & 31 deletions packages/event_source_client/lib/src/message_event.dart

This file was deleted.

16 changes: 10 additions & 6 deletions packages/event_source_client/lib/src/sse_client_html.dart
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,15 @@ import 'dart:math' as math;
import '../launchdarkly_event_source_client.dart';

import 'backoff.dart';
import 'message_event.dart' as ld_message_event;
import 'events.dart' as ld_message_event;

/// An [SSEClient] that uses the [web.EventSource] available on most browsers for web platform support.
class HtmlSseClient implements SSEClient {
/// The underlying eventsource
web.EventSource? _eventSource;

/// This controller is for the events going to the subscribers of this client.
late final StreamController<ld_message_event.MessageEvent>
_messageEventsController;
late final StreamController<ld_message_event.Event> _messageEventsController;

late final EventSourceLogger _logger;

Expand All @@ -34,7 +33,7 @@ class HtmlSseClient implements SSEClient {
_eventTypes = eventTypes {
_logger = logger ?? NoOpLogger();
_messageEventsController =
StreamController<ld_message_event.MessageEvent>.broadcast(
StreamController<ld_message_event.Event>.broadcast(
onListen: () {
// this is triggered when first listener subscribes

Expand Down Expand Up @@ -63,6 +62,7 @@ class HtmlSseClient implements SSEClient {
_eventSource?.addEventListener(eventType, _handleMessageEvent.toJS);
}
_eventSource?.addEventListener('error', _handleError.toJS);
_eventSource?.addEventListener('open', _handleOpen.toJS);
}

void _handleError(web.Event event) {
Expand All @@ -72,6 +72,11 @@ class HtmlSseClient implements SSEClient {
restart();
}

void _handleOpen(web.Event event) {
// The browser event source doesn't have header support.
_messageEventsController.sink.add(OpenEvent());
}

void _handleMessageEvent(web.Event event) {
_activeSince = DateTime.now().millisecondsSinceEpoch;
final messageEvent = event as web.MessageEvent;
Expand All @@ -85,8 +90,7 @@ class HtmlSseClient implements SSEClient {
/// Subscribe to this [stream] to receive events and sometimes errors. The first
/// subscribe triggers the connection, so expect a network delay initially.
@override
Stream<ld_message_event.MessageEvent> get stream =>
_messageEventsController.stream;
Stream<ld_message_event.Event> get stream => _messageEventsController.stream;

@override
Future close() async {
Expand Down
Loading