From 5f763d175d30f35cab5bbee86071123f41676d0c Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Thu, 11 Sep 2025 15:40:53 +0000 Subject: [PATCH 1/2] Cluster: Retry command on receiving specific errors The list of errors that cause a retry are TRYAGAIN, MASTERDOWN, CLUSTERDOWN and LOADING. This list was obtained from valkey-glide. The retry wait time calculation was also taken from valkey-glide Signed-off-by: Adam Fowler --- .../Valkey/Cluster/ValkeyClusterClient.swift | 26 +++++++-- .../Valkey/ValkeyClientConfiguration.swift | 55 +++++++++++++++++++ .../ClusterIntegrationTests.swift | 29 ++++++++++ 3 files changed, 106 insertions(+), 4 deletions(-) diff --git a/Sources/Valkey/Cluster/ValkeyClusterClient.swift b/Sources/Valkey/Cluster/ValkeyClusterClient.swift index 65283169..7eddaaa7 100644 --- a/Sources/Valkey/Cluster/ValkeyClusterClient.swift +++ b/Sources/Valkey/Cluster/ValkeyClusterClient.swift @@ -73,6 +73,8 @@ public final class ValkeyClusterClient: Sendable { /* private */ let stateLock: Mutex @usableFromInline /* private */ let nextRequestIDGenerator = Atomic(0) + @usableFromInline + /* private */ let clientConfiguration: ValkeyClientConfiguration private enum RunAction { case runClusterDiscovery(runNodeDiscovery: Bool) @@ -101,6 +103,7 @@ public final class ValkeyClusterClient: Sendable { connectionFactory: (@Sendable (ValkeyServerAddress, any EventLoop) async throws -> any Channel)? = nil ) { self.logger = logger + self.clientConfiguration = clientConfiguration (self.actionStream, self.actionStreamContinuation) = AsyncStream.makeStream(of: RunAction.self) @@ -149,10 +152,12 @@ public final class ValkeyClusterClient: Sendable { } var asking = false + var attempt = 0 while !Task.isCancelled { do { let client = try await clientSelector() if asking { + asking = false // if asking we need to call ASKING beforehand otherwise we will get a MOVE error return try await client.execute( ASKING(), @@ -164,12 +169,25 @@ public final class ValkeyClusterClient: Sendable { } catch ValkeyClusterError.noNodeToTalkTo { // TODO: Rerun node discovery! } catch let error as ValkeyClientError where error.errorCode == .commandError { - guard let errorMessage = error.message, let redirectError = ValkeyClusterRedirectionError(errorMessage) else { + guard let errorMessage = error.message else { throw error } - self.logger.trace("Received redirect error", metadata: ["error": "\(redirectError)"]) - clientSelector = { try await self.nodeClient(for: redirectError) } - asking = (redirectError.redirection == .ask) + attempt += 1 + if let redirectError = ValkeyClusterRedirectionError(errorMessage) { + self.logger.trace("Received redirect error", metadata: ["error": "\(redirectError)"]) + clientSelector = { try await self.nodeClient(for: redirectError) } + asking = (redirectError.redirection == .ask) + } else { + let prefix = errorMessage.prefix { $0 != " " } + switch prefix { + case "TRYAGAIN", "MASTERDOWN", "CLUSTERDOWN", "LOADING": + self.logger.trace("Received cluster error", metadata: ["error": "\(prefix)"]) + let wait = self.clientConfiguration.retryParameters.calculateWaitTime(retry: attempt) + try await Task.sleep(for: wait) + default: + throw error + } + } } } throw CancellationError() diff --git a/Sources/Valkey/ValkeyClientConfiguration.swift b/Sources/Valkey/ValkeyClientConfiguration.swift index 1b009f2e..cf325982 100644 --- a/Sources/Valkey/ValkeyClientConfiguration.swift +++ b/Sources/Valkey/ValkeyClientConfiguration.swift @@ -9,6 +9,20 @@ import NIOSSL import _ValkeyConnectionPool +#if canImport(Darwin) +import Darwin +#elseif canImport(Glibc) +import Glibc +#elseif canImport(Musl) +import Musl +#elseif canImport(WinSDK) +import WinSDK +#elseif canImport(Bionic) +import Bionic +#else +#error("Unsupported platform") +#endif + /// Configuration for the Valkey client. @available(valkeySwift 1.0, *) public struct ValkeyClientConfiguration: Sendable { @@ -64,6 +78,43 @@ public struct ValkeyClientConfiguration: Sendable { } } + /// Retry parameters for when a client needs to retry a command + public struct RetryParameters: Sendable { + let exponentBase: Double + let factor: Double + let minWaitTime: Double + let maxWaitTime: Double + + /// Initialize RetryParameters + /// - Parameters: + /// - exponentBase: Exponent base number + /// - factor: Duration to multiple exponent by get base wait value + /// - minWaitTime: Minimum wait time + /// - maxWaitTime: Maximum wait time + public init( + exponentBase: Double = 2, + factor: Duration = .milliseconds(10.0), + minWaitTime: Duration = .seconds(1.28), + maxWaitTime: Duration = .seconds(655.36) + ) { + self.exponentBase = exponentBase + self.factor = factor / .milliseconds(1) + self.minWaitTime = minWaitTime / .milliseconds(1) + self.maxWaitTime = maxWaitTime / .milliseconds(1) + } + + /// Calculate wait time for retry number + /// + /// This code is a copy from the `RetryParam` type in cluster_clients.rs of valkey-glide, + @usableFromInline + func calculateWaitTime(retry: Int) -> Duration { + let baseWait = pow(self.exponentBase, Double(retry)) * self.factor + let clampedWait = max(min(baseWait, self.maxWaitTime), self.minWaitTime) + let jitteredWait = Double.random(in: minWaitTime...clampedWait) + return .milliseconds(jitteredWait) + } + } + /// The connection pool definition for Valkey connections. public struct ConnectionPool: Hashable, Sendable { /// The minimum number of connections to preserve in the pool. @@ -108,6 +159,8 @@ public struct ValkeyClientConfiguration: Sendable { public var connectionPool: ConnectionPool /// The keep alive behavior for the connection. public var keepAliveBehavior: KeepAliveBehavior + /// Retry parameters for when a client needs to retry a command + public var retryParameters: RetryParameters /// The timeout the client uses to determine if a connection is considered dead. /// /// The connection is considered dead if a response isn't received within this time. @@ -131,6 +184,7 @@ public struct ValkeyClientConfiguration: Sendable { authentication: Authentication? = nil, connectionPool: ConnectionPool = .init(), keepAliveBehavior: KeepAliveBehavior = .init(), + retryParameters: RetryParameters = .init(), commandTimeout: Duration = .seconds(30), blockingCommandTimeout: Duration = .seconds(120), tls: TLS = .disable @@ -138,6 +192,7 @@ public struct ValkeyClientConfiguration: Sendable { self.authentication = authentication self.connectionPool = connectionPool self.keepAliveBehavior = keepAliveBehavior + self.retryParameters = retryParameters self.commandTimeout = commandTimeout self.blockingCommandTimeout = blockingCommandTimeout self.tls = tls diff --git a/Tests/ClusterIntegrationTests/ClusterIntegrationTests.swift b/Tests/ClusterIntegrationTests/ClusterIntegrationTests.swift index 56159817..6a2d3cab 100644 --- a/Tests/ClusterIntegrationTests/ClusterIntegrationTests.swift +++ b/Tests/ClusterIntegrationTests/ClusterIntegrationTests.swift @@ -112,11 +112,35 @@ struct ClusterIntegrationTests { } } + @Test + @available(valkeySwift 1.0, *) + func testHashSlotMigrationAndTryAgain() async throws { + var logger = Logger(label: "ValkeyCluster") + logger.logLevel = .trace + let firstNodeHostname = clusterFirstNodeHostname! + let firstNodePort = clusterFirstNodePort ?? 6379 + try await Self.withValkeyCluster([(host: firstNodeHostname, port: firstNodePort, tls: false)], logger: logger) { client in + let keySuffix = "{\(UUID().uuidString)}" + try await Self.withKey(connection: client, suffix: keySuffix) { key in + try await Self.withKey(connection: client, suffix: keySuffix) { key2 in + let hashSlot = HashSlot(key: key) + try await client.lpush(key, elements: ["testing"]) + + try await testMigratingHashSlot(hashSlot, client: client) { + } duringMigrate: { + try await client.rpoplpush(source: key, destination: key2) + } + } + } + } + } + @available(valkeySwift 1.0, *) func testMigratingHashSlot( _ hashSlot: HashSlot, client: ValkeyClusterClient, beforeMigrate: () async throws -> Void, + duringMigrate: sending () async throws -> Void = {}, afterMigrate: () async throws -> Void = {}, finished: () async throws -> Void = {} ) async throws { @@ -139,6 +163,8 @@ struct ClusterIntegrationTests { _ = try await nodeBClient.execute(CLUSTER.SETSLOT(slot: numericCast(hashSlot.rawValue), subcommand: .importing(clientAID))) _ = try await nodeAClient.execute(CLUSTER.SETSLOT(slot: numericCast(hashSlot.rawValue), subcommand: .migrating(clientBID))) + async let duringMigrateTask: Void = duringMigrate() + try await beforeMigrate() // get keys associated with slot and migrate them @@ -156,6 +182,9 @@ struct ClusterIntegrationTests { _ = try await nodeAClient.execute(CLUSTER.SETSLOT(slot: numericCast(hashSlot.rawValue), subcommand: .node(clientBID))) _ = try await nodeBClient.execute(CLUSTER.SETSLOT(slot: numericCast(hashSlot.rawValue), subcommand: .node(clientBID))) + // wait for during migrate + try await duringMigrateTask + try await finished() result = .success(()) } catch { From 211c2d29b5c47bfe48c2eea2f63363ad3097d10a Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Thu, 11 Sep 2025 15:49:55 +0000 Subject: [PATCH 2/2] Fix documentation issue Signed-off-by: Adam Fowler --- Sources/Valkey/ValkeyClientConfiguration.swift | 1 + 1 file changed, 1 insertion(+) diff --git a/Sources/Valkey/ValkeyClientConfiguration.swift b/Sources/Valkey/ValkeyClientConfiguration.swift index cf325982..c69a01cb 100644 --- a/Sources/Valkey/ValkeyClientConfiguration.swift +++ b/Sources/Valkey/ValkeyClientConfiguration.swift @@ -177,6 +177,7 @@ public struct ValkeyClientConfiguration: Sendable { /// - authentication: The authentication credentials. /// - connectionPool: The connection pool configuration. /// - keepAliveBehavior: The connection keep alive behavior. + /// - retryParameters: Retry parameters for when client returns an error that requires a retry /// - commandTimeout: The timeout for a connection response. /// - blockingCommandTimeout: The timeout for a blocking command response. /// - tls: The TLS configuration.