Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
.DS_Store
/.build
.build
.swiftpm
/Packages
xcuserdata/
DerivedData/
Expand All @@ -10,4 +11,4 @@ DerivedData/
Package.resolved
.benchmarkBaselines/
.swift-version
.docc-build
.docc-build
48 changes: 47 additions & 1 deletion Benchmarks/ValkeyBenchmarks/ValkeyConnectionBenchmark.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,17 @@ import NIOPosix
import Synchronization
import Valkey

#if DistributedTracingSupport
import Tracing
#endif

@available(valkeySwift 1.0, *)
func connectionBenchmarks() {
makeConnectionCreateAndDropBenchmark()
makeConnectionGETBenchmark()
#if DistributedTracingSupport
makeConnectionGETNoOpTracerBenchmark()
#endif
makeConnectionPipelineBenchmark()
}

Expand Down Expand Up @@ -58,9 +65,47 @@ func makeConnectionGETBenchmark() -> Benchmark? {
return Benchmark("Connection: GET benchmark", configuration: .init(metrics: defaultMetrics, scalingFactor: .kilo)) { benchmark in
let port = serverMutex.withLock { $0 }!.localAddress!.port!
let logger = Logger(label: "test")
#if DistributedTracingSupport
// explicitly set tracer to nil, if trait is enabled
var configuration = ValkeyConnectionConfiguration()
configuration.tracing.tracer = nil
#else
let configuration = ValkeyConnectionConfiguration()
#endif
try await ValkeyConnection.withConnection(
address: .hostname("127.0.0.1", port: port),
configuration: .init(),
configuration: configuration,
logger: logger
) { connection in
benchmark.startMeasurement()
for _ in benchmark.scaledIterations {
let foo = try await connection.get("foo")
precondition(foo.map { String(buffer: $0) } == "Bar")
}
benchmark.stopMeasurement()
}
} setup: {
let server = try await makeLocalServer()
serverMutex.withLock { $0 = server }
} teardown: {
try await serverMutex.withLock { $0 }?.close().get()
}
}

#if DistributedTracingSupport
@available(valkeySwift 1.0, *)
@discardableResult
func makeConnectionGETNoOpTracerBenchmark() -> Benchmark? {
let serverMutex = Mutex<(any Channel)?>(nil)

return Benchmark("Connection: GET benchmark – NoOpTracer", configuration: .init(metrics: defaultMetrics, scalingFactor: .kilo)) { benchmark in
let port = serverMutex.withLock { $0 }!.localAddress!.port!
let logger = Logger(label: "test")
var configuration = ValkeyConnectionConfiguration()
configuration.tracing.tracer = NoOpTracer()
try await ValkeyConnection.withConnection(
address: .hostname("127.0.0.1", port: port),
configuration: configuration,
logger: logger
) { connection in
benchmark.startMeasurement()
Expand All @@ -77,6 +122,7 @@ func makeConnectionGETBenchmark() -> Benchmark? {
try await serverMutex.withLock { $0 }?.close().get()
}
}
#endif

@available(valkeySwift 1.0, *)
@discardableResult
Expand Down
6 changes: 5 additions & 1 deletion Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@ let package = Package(
],
traits: [
.trait(name: "ServiceLifecycleSupport"),
.default(enabledTraits: ["ServiceLifecycleSupport"]),
.trait(name: "DistributedTracingSupport"),
.default(enabledTraits: ["ServiceLifecycleSupport", "DistributedTracingSupport"]),
],
dependencies: [
.package(url: "https://github.com/apple/swift-atomics.git", from: "1.0.0"),
.package(url: "https://github.com/apple/swift-collections.git", from: "1.1.4"),
.package(url: "https://github.com/apple/swift-log.git", from: "1.6.3"),
.package(url: "https://github.com/apple/swift-distributed-tracing.git", from: "1.3.0"),
.package(url: "https://github.com/apple/swift-nio.git", from: "2.81.0"),
.package(url: "https://github.com/apple/swift-nio-ssl.git", from: "2.29.0"),
.package(url: "https://github.com/apple/swift-nio-transport-services.git", from: "1.23.0"),
Expand All @@ -36,6 +38,7 @@ let package = Package(
.byName(name: "_ValkeyConnectionPool"),
.product(name: "DequeModule", package: "swift-collections"),
.product(name: "Logging", package: "swift-log"),
.product(name: "Tracing", package: "swift-distributed-tracing", condition: .when(traits: ["DistributedTracingSupport"])),
.product(name: "NIOCore", package: "swift-nio"),
.product(name: "NIOPosix", package: "swift-nio"),
.product(name: "NIOSSL", package: "swift-nio-ssl"),
Expand Down Expand Up @@ -90,6 +93,7 @@ let package = Package(
.product(name: "NIOTestUtils", package: "swift-nio"),
.product(name: "Logging", package: "swift-log"),
.product(name: "NIOEmbedded", package: "swift-nio"),
.product(name: "InMemoryTracing", package: "swift-distributed-tracing", condition: .when(traits: ["DistributedTracingSupport"])),
],
swiftSettings: defaultSwiftSettings
),
Expand Down
98 changes: 91 additions & 7 deletions Sources/Valkey/Connection/ValkeyConnection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ import Network
import NIOTransportServices
#endif

#if DistributedTracingSupport
import Tracing
#endif

/// A single connection to a Valkey database.
@available(valkeySwift 1.0, *)
public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
Expand All @@ -29,6 +33,12 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
public let id: ID
/// Logger used by Server
let logger: Logger
#if DistributedTracingSupport
@usableFromInline
let tracer: (any Tracer)?
@usableFromInline
let address: (hostOrSocketPath: String, port: Int?)?
#endif
@usableFromInline
let channel: any Channel
@usableFromInline
Expand All @@ -42,6 +52,7 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
connectionID: ID,
channelHandler: ValkeyChannelHandler,
configuration: ValkeyConnectionConfiguration,
address: ValkeyServerAddress?,
logger: Logger
) {
self.unownedExecutor = channel.eventLoop.executor.asUnownedSerialExecutor()
Expand All @@ -50,6 +61,17 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
self.configuration = configuration
self.id = connectionID
self.logger = logger
#if DistributedTracingSupport
self.tracer = configuration.tracing.tracer
switch address?.value {
case let .hostname(host, port):
self.address = (host, port)
case let .unixDomainSocket(path):
self.address = (path, nil)
case nil:
self.address = nil
}
#endif
self.isClosed = .init(false)
}

Expand Down Expand Up @@ -153,16 +175,47 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {

@inlinable
func _execute<Command: ValkeyCommand>(command: Command) async throws -> RESPToken {
#if DistributedTracingSupport
let span = self.tracer?.startSpan(Command.name, ofKind: .client)
defer { span?.end() }

span?.updateAttributes { attributes in
self.applyCommonAttributes(to: &attributes, commandName: Command.name)
}
#endif

let requestID = Self.requestIDGenerator.next()
return try await withTaskCancellationHandler {
if Task.isCancelled {
throw ValkeyClientError(.cancelled)

do {
return try await withTaskCancellationHandler {
if Task.isCancelled {
throw ValkeyClientError(.cancelled)
}
return try await withCheckedThrowingContinuation { continuation in
self.channelHandler.write(command: command, continuation: continuation, requestID: requestID)
}
} onCancel: {
self.cancel(requestID: requestID)
}
} catch let error as ValkeyClientError {
#if DistributedTracingSupport
if let span {
span.recordError(error)
span.setStatus(SpanStatus(code: .error))
if let prefix = error.simpleErrorPrefix {
span.attributes["db.response.status_code"] = "\(prefix)"
}
}
return try await withCheckedThrowingContinuation { continuation in
self.channelHandler.write(command: command, continuation: continuation, requestID: requestID)
#endif
throw error
} catch {
#if DistributedTracingSupport
if let span {
span.recordError(error)
span.setStatus(SpanStatus(code: .error))
}
} onCancel: {
self.cancel(requestID: requestID)
#endif
throw error
}
}

Expand Down Expand Up @@ -213,6 +266,18 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
}
}

#if DistributedTracingSupport
@usableFromInline
func applyCommonAttributes(to attributes: inout SpanAttributes, commandName: String) {
attributes[self.configuration.tracing.attributeNames.databaseOperationName] = commandName
attributes[self.configuration.tracing.attributeNames.databaseSystemName] = self.configuration.tracing.attributeValues.databaseSystem
attributes[self.configuration.tracing.attributeNames.networkPeerAddress] = channel.remoteAddress?.ipAddress
attributes[self.configuration.tracing.attributeNames.networkPeerPort] = channel.remoteAddress?.port
attributes[self.configuration.tracing.attributeNames.serverAddress] = address?.hostOrSocketPath
attributes[self.configuration.tracing.attributeNames.serverPort] = address?.port == 6379 ? nil : address?.port
}
#endif

@usableFromInline
nonisolated func cancel(requestID: Int) {
self.channel.eventLoop.execute {
Expand Down Expand Up @@ -278,6 +343,7 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
connectionID: connectionID,
channelHandler: handler,
configuration: configuration,
address: address,
logger: logger
)
}
Expand Down Expand Up @@ -313,6 +379,7 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
connectionID: 0,
channelHandler: handler,
configuration: configuration,
address: .hostname("127.0.0.1", port: 6379),
logger: logger
)
return channel.connect(to: try SocketAddress(ipAddress: "127.0.0.1", port: 6379)).map {
Expand Down Expand Up @@ -390,3 +457,20 @@ struct AutoIncrementingInteger {
return value - 1
}
}

#if DistributedTracingSupport
extension ValkeyClientError {
/// Extract the simple error prefix from this error.
///
/// - SeeAlso: [](https://valkey.io/topics/protocol/#simple-errors)
@usableFromInline
var simpleErrorPrefix: Substring? {
guard let message else { return nil }
var prefixEndIndex = message.startIndex
while prefixEndIndex < message.endIndex, message[prefixEndIndex] != " " {
message.formIndex(after: &prefixEndIndex)
}
return message[message.startIndex..<prefixEndIndex]
}
}
#endif
41 changes: 41 additions & 0 deletions Sources/Valkey/Connection/ValkeyConnectionConfiguration.swift
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@

import NIOSSL

#if DistributedTracingSupport
import Tracing
#endif

/// A configuration object that defines how to connect to a Valkey server.
///
/// `ValkeyConnectionConfiguration` allows you to customize various aspects of the connection,
Expand Down Expand Up @@ -112,6 +116,12 @@ public struct ValkeyConnectionConfiguration: Sendable {
/// Default value is `nil` (no client name is set).
public var clientName: String?

#if DistributedTracingSupport
/// The distributed tracing configuration to use for this connection.
/// Defaults to using the globally bootstrapped tracer with OpenTelemetry semantic conventions.
public var tracing: ValkeyTracingConfiguration = .init()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On this one maybe just a small note that picks global bootstrapped tracer at this point in time and allows configuring tracing?

#endif

/// Creates a new Valkey connection configuration.
///
/// Use this initializer to create a configuration object that can be used to establish
Expand All @@ -137,3 +147,34 @@ public struct ValkeyConnectionConfiguration: Sendable {
self.clientName = clientName
}
}

#if DistributedTracingSupport
@available(valkeySwift 1.0, *)
/// A configuration object that defines distributed tracing behavior of a Valkey client.
public struct ValkeyTracingConfiguration: Sendable {
/// The tracer to use, or `nil` to disable tracing.
/// Defaults to the globally bootstrapped tracer.
public var tracer: (any Tracer)? = InstrumentationSystem.tracer
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably worth some docs here how this is

  • by default picks up global tracer bootstrapped in InstrumentationSystem
  • can be set to nil to disable tracing entirely, even if global tracer is bootstrapped
  • can be set to specific tracer, which takes precedence over global bootstrapped tracer

?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If global tracer is NoOp tracer is it worth setting tracer to nil?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had asked about this over here #177 (comment) and had a chat with @fabianfett about it.

So there seems to be minor differences and although the performance is roughly the same it seems to have caused an odd allocation here and there, whereas without it the allocation count in valkey was a solid 0... The NoopTracer is definitely nicer to use but if you truly want to go with the Tracer? here I think that's fine -- @fabianfett strongly preferred that at least for this lib.

If we were to suggest people in libraries, I think you're very right -- doing Tracer = NoopTracer... is preferable, rather than Tracer? because of the usability aspect, but in this specific lib I think it's fine... Since Fabian really wanted to have the allocs count a strict zero here heh :)

It's not that the allocations come from the noop-spans btw, they are structs... so I'm uncertain where they were coming from; we'd need to do more digging

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@slashmo Can we get some comments on ValkeyTracingConfiguration and its members. This is currently a public symbol without any comments.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, let me add some.


/// The attribute names used in spans created by Valkey. Defaults to OpenTelemetry semantics.
public var attributeNames: AttributeNames = .init()

/// The static attribute values used in spans created by Valkey.
public var attributeValues: AttributeValues = .init()

/// Attribute names used in spans created by Valkey.
public struct AttributeNames: Sendable {
public var databaseOperationName: String = "db.operation.name"
public var databaseSystemName: String = "db.system.name"
public var networkPeerAddress: String = "network.peer.address"
public var networkPeerPort: String = "network.peer.port"
public var serverAddress: String = "server.address"
public var serverPort: String = "server.port"
}

/// Static attribute values used in spans created by Valkey.
public struct AttributeValues: Sendable {
public var databaseSystem: String = "valkey"
}
}
#endif
6 changes: 6 additions & 0 deletions Sources/Valkey/ValkeyClientConfiguration.swift
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,12 @@ public struct ValkeyClientConfiguration: Sendable {
/// The TLS to use for the Valkey connection.
public var tls: TLS

#if DistributedTracingSupport
/// The distributed tracing configuration to use for the Valkey connection.
/// Defaults to using the globally bootstrapped tracer with OpenTelemetry semantic conventions.
public var tracing: ValkeyTracingConfiguration = .init()
#endif

/// Creates a Valkey client connection configuration.
///
/// - Parameters:
Expand Down
11 changes: 10 additions & 1 deletion Sources/Valkey/ValkeyConnectionFactory.swift
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ package final class ValkeyConnectionFactory: Sendable {
connectionID: connectionID,
channelHandler: channelHandler,
configuration: connectionConfig,
address: nil,
logger: logger
)
}.get()
Expand All @@ -102,7 +103,7 @@ package final class ValkeyConnectionFactory: Sendable {
try await .enable(self.cache!.getSSLContext(), tlsServerName: clientName)
}

return ValkeyConnectionConfiguration(
let newConfig = ValkeyConnectionConfiguration(
authentication: self.configuration.authentication.flatMap {
.init(username: $0.username, password: $0.password)
},
Expand All @@ -111,5 +112,13 @@ package final class ValkeyConnectionFactory: Sendable {
tls: tls,
clientName: nil
)

#if DistributedTracingSupport
var mConfig = newConfig
mConfig.tracing = self.configuration.tracing
return mConfig
#else
return newConfig
#endif
}
}
1 change: 0 additions & 1 deletion Tests/IntegrationTests/ClientIntegrationTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ struct ClientIntegratedTests {
func testValkeyCommand() async throws {
struct GET: ValkeyCommand {
typealias Response = String?

static let name = "GET"

var key: ValkeyKey
Expand Down
Loading
Loading