26 changes: 22 additions & 4 deletions Sources/Valkey/Cluster/ValkeyClusterClient.swift
@@ -73,6 +73,8 @@ public final class ValkeyClusterClient: Sendable {
/* private */ let stateLock: Mutex<StateMachine>
@usableFromInline
/* private */ let nextRequestIDGenerator = Atomic(0)
@usableFromInline
/* private */ let clientConfiguration: ValkeyClientConfiguration

private enum RunAction {
case runClusterDiscovery(runNodeDiscovery: Bool)
@@ -101,6 +103,7 @@ public final class ValkeyClusterClient: Sendable {
connectionFactory: (@Sendable (ValkeyServerAddress, any EventLoop) async throws -> any Channel)? = nil
) {
self.logger = logger
self.clientConfiguration = clientConfiguration

(self.actionStream, self.actionStreamContinuation) = AsyncStream.makeStream(of: RunAction.self)

@@ -149,10 +152,12 @@
}

var asking = false
var attempt = 0
while !Task.isCancelled {
do {
let client = try await clientSelector()
if asking {
asking = false
// if asking, we need to call ASKING beforehand, otherwise we will get a MOVED error
return try await client.execute(
ASKING(),
@@ -164,12 +169,25 @@
} catch ValkeyClusterError.noNodeToTalkTo {
// TODO: Rerun node discovery!
} catch let error as ValkeyClientError where error.errorCode == .commandError {
guard let errorMessage = error.message, let redirectError = ValkeyClusterRedirectionError(errorMessage) else {
guard let errorMessage = error.message else {
throw error
}
self.logger.trace("Received redirect error", metadata: ["error": "\(redirectError)"])
clientSelector = { try await self.nodeClient(for: redirectError) }
asking = (redirectError.redirection == .ask)
attempt += 1
if let redirectError = ValkeyClusterRedirectionError(errorMessage) {
self.logger.trace("Received redirect error", metadata: ["error": "\(redirectError)"])
clientSelector = { try await self.nodeClient(for: redirectError) }
asking = (redirectError.redirection == .ask)
} else {
let prefix = errorMessage.prefix { $0 != " " }
switch prefix {
case "TRYAGAIN", "MASTERDOWN", "CLUSTERDOWN", "LOADING":
Comment (Collaborator):

Something to consider for the future: we would generally want to retry on most errors (connection timeouts, read timeouts, instance address not resolvable, TRYAGAIN, LOADING, etc.), so it is worth considering retrying for most of them.

The errors where we want to fail immediately and not retry are, for example, AUTH errors, because they won't go away without the user fixing something. Not retrying on such errors prevents an unnecessary connection storm on the server.

The errors that need special handling are the ASK and MOVED redirections, but that is something that can be added in a future PR.

Reply (Collaborator, author):

The list of errors I retry on is based on the list that Valkey-glide uses.

For connection timeouts and instance-not-resolvable errors, do we want to kick off cluster topology discovery, given that a node our view of the topology says is available turns out not to be?

MOVED and ASK errors are dealt with in the code above this, when we create a redirection error.

self.logger.trace("Received cluster error", metadata: ["error": "\(prefix)"])
let wait = self.clientConfiguration.retryParameters.calculateWaitTime(retry: attempt)
try await Task.sleep(for: wait)
Comment (Collaborator):

Also something for the future: this wait via sleep could be wrapped inside the retry library itself to make it cleaner.

default:
throw error
}
}
}
}
throw CancellationError()
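For reference, a minimal, self-contained sketch of the classification this retry loop performs and that the review thread above discusses. The enum and function names are hypothetical illustrations, not part of this PR; only the error prefixes are taken from the diff.

// Hypothetical names, for illustration only.
enum RetryDecision {
    case followRedirect    // MOVED / ASK: retry against the node the error points at
    case retryWithBackoff  // transient cluster errors: wait, then retry
    case fail              // anything else (for example AUTH errors) is surfaced immediately
}

func classify(errorMessage: String) -> RetryDecision {
    // The prefix is the first space-delimited token of the error message.
    let prefix = errorMessage.prefix { $0 != " " }
    switch prefix {
    case "MOVED", "ASK":
        return .followRedirect
    case "TRYAGAIN", "MASTERDOWN", "CLUSTERDOWN", "LOADING":
        return .retryWithBackoff
    default:
        return .fail
    }
}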
56 changes: 56 additions & 0 deletions Sources/Valkey/ValkeyClientConfiguration.swift
@@ -9,6 +9,20 @@
import NIOSSL
import _ValkeyConnectionPool

#if canImport(Darwin)
import Darwin
#elseif canImport(Glibc)
import Glibc
#elseif canImport(Musl)
import Musl
#elseif canImport(WinSDK)
import WinSDK
#elseif canImport(Bionic)
import Bionic
#else
#error("Unsupported platform")
#endif

/// Configuration for the Valkey client.
@available(valkeySwift 1.0, *)
public struct ValkeyClientConfiguration: Sendable {
@@ -64,6 +78,43 @@ public struct ValkeyClientConfiguration: Sendable {
}
}

/// Retry parameters for when a client needs to retry a command
public struct RetryParameters: Sendable {
let exponentBase: Double
let factor: Double
let minWaitTime: Double
let maxWaitTime: Double

/// Initialize RetryParameters
/// - Parameters:
/// - exponentBase: Base of the exponential used to grow the wait time
/// - factor: Duration to multiply the exponential by to get the base wait value
/// - minWaitTime: Minimum wait time
/// - maxWaitTime: Maximum wait time
public init(
exponentBase: Double = 2,
factor: Duration = .milliseconds(10.0),
minWaitTime: Duration = .seconds(1.28),
maxWaitTime: Duration = .seconds(655.36)
) {
self.exponentBase = exponentBase
self.factor = factor / .milliseconds(1)
self.minWaitTime = minWaitTime / .milliseconds(1)
self.maxWaitTime = maxWaitTime / .milliseconds(1)
}

/// Calculate the wait time for a given retry attempt
///
/// This code is a copy of the `RetryParam` type in cluster_clients.rs of valkey-glide.
@usableFromInline
func calculateWaitTime(retry: Int) -> Duration {
let baseWait = pow(self.exponentBase, Double(retry)) * self.factor
let clampedWait = max(min(baseWait, self.maxWaitTime), self.minWaitTime)
let jitteredWait = Double.random(in: minWaitTime...clampedWait)
return .milliseconds(jitteredWait)
}
}

/// The connection pool definition for Valkey connections.
public struct ConnectionPool: Hashable, Sendable {
/// The minimum number of connections to preserve in the pool.
Expand Down Expand Up @@ -108,6 +159,8 @@ public struct ValkeyClientConfiguration: Sendable {
public var connectionPool: ConnectionPool
/// The keep alive behavior for the connection.
public var keepAliveBehavior: KeepAliveBehavior
/// Retry parameters for when a client needs to retry a command
public var retryParameters: RetryParameters
/// The timeout the client uses to determine if a connection is considered dead.
///
/// The connection is considered dead if a response isn't received within this time.
@@ -130,20 +183,23 @@
/// - authentication: The authentication credentials.
/// - connectionPool: The connection pool configuration.
/// - keepAliveBehavior: The connection keep alive behavior.
/// - retryParameters: Retry parameters for when the client receives an error that requires a retry
/// - commandTimeout: The timeout for a connection response.
/// - blockingCommandTimeout: The timeout for a blocking command response.
/// - tls: The TLS configuration.
public init(
authentication: Authentication? = nil,
connectionPool: ConnectionPool = .init(),
keepAliveBehavior: KeepAliveBehavior = .init(),
retryParameters: RetryParameters = .init(),
commandTimeout: Duration = .seconds(30),
blockingCommandTimeout: Duration = .seconds(120),
tls: TLS = .disable
) {
self.authentication = authentication
self.connectionPool = connectionPool
self.keepAliveBehavior = keepAliveBehavior
self.retryParameters = retryParameters
self.commandTimeout = commandTimeout
self.blockingCommandTimeout = blockingCommandTimeout
self.tls = tls
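As a usage sketch (not part of the PR): the new retryParameters field slots into the existing configuration initializer, and the values below simply restate the defaults. With these settings the n-th retry waits pow(exponentBase, n) * factor, clamped to [minWaitTime, maxWaitTime], and the actual delay is drawn at random between minWaitTime and that clamped value. The function name is illustrative.

import Valkey

// Illustrative configuration; the parameter values mirror the defaults above.
@available(valkeySwift 1.0, *)
func makeClusterClientConfiguration() -> ValkeyClientConfiguration {
    ValkeyClientConfiguration(
        retryParameters: .init(
            exponentBase: 2,
            factor: .milliseconds(10),
            minWaitTime: .seconds(1.28),
            maxWaitTime: .seconds(655.36)
        ),
        commandTimeout: .seconds(30)
    )
}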
29 changes: 29 additions & 0 deletions Tests/ClusterIntegrationTests/ClusterIntegrationTests.swift
@@ -112,11 +112,35 @@ struct ClusterIntegrationTests {
}
}

@Test
@available(valkeySwift 1.0, *)
func testHashSlotMigrationAndTryAgain() async throws {
var logger = Logger(label: "ValkeyCluster")
logger.logLevel = .trace
let firstNodeHostname = clusterFirstNodeHostname!
let firstNodePort = clusterFirstNodePort ?? 6379
try await Self.withValkeyCluster([(host: firstNodeHostname, port: firstNodePort, tls: false)], logger: logger) { client in
let keySuffix = "{\(UUID().uuidString)}"
try await Self.withKey(connection: client, suffix: keySuffix) { key in
try await Self.withKey(connection: client, suffix: keySuffix) { key2 in
let hashSlot = HashSlot(key: key)
try await client.lpush(key, elements: ["testing"])

try await testMigratingHashSlot(hashSlot, client: client) {
} duringMigrate: {
try await client.rpoplpush(source: key, destination: key2)
}
}
}
}
}

@available(valkeySwift 1.0, *)
func testMigratingHashSlot(
_ hashSlot: HashSlot,
client: ValkeyClusterClient,
beforeMigrate: () async throws -> Void,
duringMigrate: sending () async throws -> Void = {},
afterMigrate: () async throws -> Void = {},
finished: () async throws -> Void = {}
) async throws {
@@ -139,6 +163,8 @@ struct ClusterIntegrationTests {
_ = try await nodeBClient.execute(CLUSTER.SETSLOT(slot: numericCast(hashSlot.rawValue), subcommand: .importing(clientAID)))
_ = try await nodeAClient.execute(CLUSTER.SETSLOT(slot: numericCast(hashSlot.rawValue), subcommand: .migrating(clientBID)))

async let duringMigrateTask: Void = duringMigrate()

try await beforeMigrate()

// get keys associated with slot and migrate them
@@ -156,6 +182,9 @@
_ = try await nodeAClient.execute(CLUSTER.SETSLOT(slot: numericCast(hashSlot.rawValue), subcommand: .node(clientBID)))
_ = try await nodeBClient.execute(CLUSTER.SETSLOT(slot: numericCast(hashSlot.rawValue), subcommand: .node(clientBID)))

// wait for during migrate
try await duringMigrateTask

try await finished()
result = .success(())
} catch {
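The new test (named for the TRYAGAIN path) drives traffic at the slot while it is being migrated: duringMigrate is started with async let so it runs as a child task concurrently with the migration steps, and is awaited again before the helper returns. A minimal sketch of that pattern, with hypothetical helper names standing in for the test's closures:

// Hypothetical stand-ins for the test's closures.
func issueCommandsAgainstMigratingSlot() async throws { /* e.g. RPOPLPUSH on a key in the slot */ }
func migrateSlot() async throws { /* SETSLOT importing/migrating, move keys, SETSLOT node */ }

func migrateWhileTrafficRuns() async throws {
    // `async let` starts the closure as a child task immediately.
    async let traffic: Void = issueCommandsAgainstMigratingSlot()
    // The migration runs concurrently on the current task.
    try await migrateSlot()
    // Awaiting the binding joins the child task and rethrows any error it produced.
    try await traffic
}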