Skip to content

Commit

Permalink
Implement high integrity mode for commands (#2741)
Browse files Browse the repository at this point in the history
* initial implementation of #2706

* build: net8 compat

* Docs: add HighIntegrity initial docs

* PR fixups

* Options fixes

* Update tests/StackExchange.Redis.Tests/TestBase.cs

* add tests that result boxes / continuations work correctly for high-integrity-mode

* - switch to counter rather than entropy
- add transaction work to basic tests, to ensure handled

* naming is hard

* - add explicit connection failure type
- burn the connection on failure
- add initial metrics

* benchmark impact of high performance mode

* be more flexible in HeartbeatConsistencyCheckPingsAsync

* add config tests

---------

Co-authored-by: Nick Craver <[email protected]>
Co-authored-by: Nick Craver <[email protected]>
  • Loading branch information
3 people committed Jun 24, 2024
1 parent 39b992f commit 1de8dac
Show file tree
Hide file tree
Showing 18 changed files with 485 additions and 46 deletions.
5 changes: 5 additions & 0 deletions docs/Configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ The `ConfigurationOptions` object has a wide range of properties, all of which a
| tunnel={string} | `Tunnel` | `null` | Tunnel for connections (use `http:{proxy url}` for "connect"-based proxy server) |
| setlib={bool} | `SetClientLibrary` | `true` | Whether to attempt to use `CLIENT SETINFO` to set the library name/version on the connection |
| protocol={string} | `Protocol` | `null` | Redis protocol to use; see section below |
| highIntegrity={bool} | `HighIntegrity` | `false` | High integrity (incurs overhead) sequence checking on every command; see section below |

Additional code-only options:
- LoggerFactory (`ILoggerFactory`) - Default: `null`
Expand All @@ -115,6 +116,10 @@ Additional code-only options:
- The thread pool to use for scheduling work to and from the socket connected to Redis, one of...
- `SocketManager.Shared`: Use a shared dedicated thread pool for _all_ multiplexers (defaults to 10 threads) - best balance for most scenarios.
- `SocketManager.ThreadPool`: Use the build-in .NET thread pool for scheduling. This can perform better for very small numbers of cores or with large apps on large machines that need to use more than 10 threads (total, across all multiplexers) under load. **Important**: this option isn't the default because it's subject to thread pool growth/starvation and if for example synchronous calls are waiting on a redis command to come back to unblock other threads, stalls/hangs can result. Use with caution, especially if you have sync-over-async work in play.
- HighIntegrity - Default: `false`
- This enables sending a sequence check command after _every single command_ sent to Redis. This is an opt-in option that incurs overhead to add this integrity check which isn't in the Redis protocol (RESP2/3) itself. The impact on this for a given workload depends on the number of commands, size of payloads, etc. as to how proportionately impactful it will be - you should test with your workloads to assess this.
- This is especially relevant if your primary use case is all strings (e.g. key/value caching) where the protocol would otherwise not error.
- Intended for cases where network drops (e.g. bytes from the Redis stream, not packet loss) are suspected and integrity of responses is critical.
- HeartbeatConsistencyChecks - Default: `false`
- Allows _always_ sending keepalive checks even if a connection isn't idle. This trades extra commands (per `HeartbeatInterval` - default 1 second) to check the network stream for consistency. If any data was lost, the result won't be as expected and the connection will be terminated ASAP. This is a check to react to any data loss at the network layer as soon as possible.
- HeartbeatInterval - Default: `1000ms`
Expand Down
11 changes: 11 additions & 0 deletions src/StackExchange.Redis/Configuration/DefaultOptionsProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,17 @@ public static DefaultOptionsProvider GetProvider(EndPoint endpoint)
/// </summary>
public virtual bool CheckCertificateRevocation => true;

/// <summary>
/// A Boolean value that specifies whether to use per-command validation of strict protocol validity.
/// This sends an additional command after EVERY command which incurs measurable overhead.
/// </summary>
/// <remarks>
/// The regular RESP protocol does not include correlation identifiers between requests and responses; in exceptional
/// scenarios, protocol desynchronization can occur, which may not be noticed immediately; this option adds additional data
/// to ensure that this cannot occur, at the cost of some (small) additional bandwidth usage.
/// </remarks>
public virtual bool HighIntegrity => false;

/// <summary>
/// The number of times to repeat the initial connect cycle if no servers respond promptly.
/// </summary>
Expand Down
28 changes: 25 additions & 3 deletions src/StackExchange.Redis/ConfigurationOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ internal const string
CheckCertificateRevocation = "checkCertificateRevocation",
Tunnel = "tunnel",
SetClientLibrary = "setlib",
Protocol = "protocol";
Protocol = "protocol",
HighIntegrity = "highIntegrity";

private static readonly Dictionary<string, string> normalizedOptions = new[]
{
Expand Down Expand Up @@ -141,6 +142,7 @@ internal const string
WriteBuffer,
CheckCertificateRevocation,
Protocol,
HighIntegrity,
}.ToDictionary(x => x, StringComparer.OrdinalIgnoreCase);

public static string TryNormalize(string value)
Expand All @@ -156,7 +158,7 @@ public static string TryNormalize(string value)
private DefaultOptionsProvider? defaultOptions;

private bool? allowAdmin, abortOnConnectFail, resolveDns, ssl, checkCertificateRevocation, heartbeatConsistencyChecks,
includeDetailInExceptions, includePerformanceCountersInExceptions, setClientLibrary;
includeDetailInExceptions, includePerformanceCountersInExceptions, setClientLibrary, highIntegrity;

private string? tieBreaker, sslHost, configChannel, user, password;

Expand Down Expand Up @@ -279,6 +281,21 @@ public bool CheckCertificateRevocation
set => checkCertificateRevocation = value;
}

/// <summary>
/// A Boolean value that specifies whether to use per-command validation of strict protocol validity.
/// This sends an additional command after EVERY command which incurs measurable overhead.
/// </summary>
/// <remarks>
/// The regular RESP protocol does not include correlation identifiers between requests and responses; in exceptional
/// scenarios, protocol desynchronization can occur, which may not be noticed immediately; this option adds additional data
/// to ensure that this cannot occur, at the cost of some (small) additional bandwidth usage.
/// </remarks>
public bool HighIntegrity
{
get => highIntegrity ?? Defaults.HighIntegrity;
set => highIntegrity = value;
}

/// <summary>
/// Create a certificate validation check that checks against the supplied issuer even when not known by the machine.
/// </summary>
Expand Down Expand Up @@ -769,6 +786,7 @@ public int ConfigCheckSeconds
Protocol = Protocol,
heartbeatInterval = heartbeatInterval,
heartbeatConsistencyChecks = heartbeatConsistencyChecks,
highIntegrity = highIntegrity,
};

/// <summary>
Expand Down Expand Up @@ -849,6 +867,7 @@ public string ToString(bool includePassword)
Append(sb, OptionKeys.ResponseTimeout, responseTimeout);
Append(sb, OptionKeys.DefaultDatabase, DefaultDatabase);
Append(sb, OptionKeys.SetClientLibrary, setClientLibrary);
Append(sb, OptionKeys.HighIntegrity, highIntegrity);
Append(sb, OptionKeys.Protocol, FormatProtocol(Protocol));
if (Tunnel is { IsInbuilt: true } tunnel)
{
Expand Down Expand Up @@ -894,7 +913,7 @@ private void Clear()
{
ClientName = ServiceName = user = password = tieBreaker = sslHost = configChannel = null;
keepAlive = syncTimeout = asyncTimeout = connectTimeout = connectRetry = configCheckSeconds = DefaultDatabase = null;
allowAdmin = abortOnConnectFail = resolveDns = ssl = setClientLibrary = null;
allowAdmin = abortOnConnectFail = resolveDns = ssl = setClientLibrary = highIntegrity = null;
SslProtocols = null;
defaultVersion = null;
EndPoints.Clear();
Expand Down Expand Up @@ -1013,6 +1032,9 @@ private ConfigurationOptions DoParse(string configuration, bool ignoreUnknown)
case OptionKeys.SetClientLibrary:
SetClientLibrary = OptionKeys.ParseBoolean(key, value);
break;
case OptionKeys.HighIntegrity:
HighIntegrity = OptionKeys.ParseBoolean(key, value);
break;
case OptionKeys.Tunnel:
if (value.IsNullOrWhiteSpace())
{
Expand Down
6 changes: 5 additions & 1 deletion src/StackExchange.Redis/Enums/ConnectionFailureType.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ public enum ConnectionFailureType
/// <summary>
/// It has not been possible to create an initial connection to the redis server(s).
/// </summary>
UnableToConnect
UnableToConnect,
/// <summary>
/// High-integrity mode was enabled, and a failure was detected
/// </summary>
ResponseIntegrityFailure,
}
}
32 changes: 32 additions & 0 deletions src/StackExchange.Redis/Message.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using Microsoft.Extensions.Logging;
using StackExchange.Redis.Profiling;
using System;
using System.Buffers.Binary;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
Expand Down Expand Up @@ -52,6 +53,8 @@ internal abstract class Message : ICompletable
{
public readonly int Db;

private uint _highIntegrityToken;

internal const CommandFlags InternalCallFlag = (CommandFlags)128;

protected RedisCommand command;
Expand Down Expand Up @@ -198,6 +201,13 @@ public bool IsAdmin

public bool IsAsking => (Flags & AskingFlag) != 0;

public bool IsHighIntegrity => _highIntegrityToken != 0;

public uint HighIntegrityToken => _highIntegrityToken;

internal void WithHighIntegrity(uint value)
=> _highIntegrityToken = value;

internal bool IsScriptUnavailable => (Flags & ScriptUnavailableFlag) != 0;

internal void SetScriptUnavailable() => Flags |= ScriptUnavailableFlag;
Expand Down Expand Up @@ -710,6 +720,28 @@ internal void WriteTo(PhysicalConnection physical)
}
}

private static ReadOnlySpan<byte> ChecksumTemplate => "$4\r\nXXXX\r\n"u8;

internal void WriteHighIntegrityChecksumRequest(PhysicalConnection physical)
{
Debug.Assert(IsHighIntegrity, "should only be used for high-integrity");
try
{
physical.WriteHeader(RedisCommand.ECHO, 1); // use WriteHeader to allow command-rewrite

Span<byte> chk = stackalloc byte[10];
Debug.Assert(ChecksumTemplate.Length == chk.Length, "checksum template length error");
ChecksumTemplate.CopyTo(chk);
BinaryPrimitives.WriteUInt32LittleEndian(chk.Slice(4, 4), _highIntegrityToken);
physical.WriteRaw(chk);
}
catch (Exception ex)
{
physical?.OnInternalError(ex);
Fail(ConnectionFailureType.InternalFailure, ex, null, physical?.BridgeCouldBeNull?.Multiplexer);
}
}

internal static Message CreateHello(int protocolVersion, string? username, string? password, string? clientName, CommandFlags flags)
=> new HelloMessage(protocolVersion, username, password, clientName, flags);

Expand Down
42 changes: 42 additions & 0 deletions src/StackExchange.Redis/PhysicalBridge.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ internal sealed class PhysicalBridge : IDisposable

internal string? PhysicalName => physical?.ToString();

private uint _nextHighIntegrityToken; // zero means not enabled

public DateTime? ConnectedAt { get; private set; }

public PhysicalBridge(ServerEndPoint serverEndPoint, ConnectionType type, int timeoutMilliseconds)
Expand All @@ -82,6 +84,11 @@ public PhysicalBridge(ServerEndPoint serverEndPoint, ConnectionType type, int ti
#if !NETCOREAPP
_singleWriterMutex = new MutexSlim(timeoutMilliseconds: timeoutMilliseconds);
#endif
if (type == ConnectionType.Interactive && Multiplexer.RawConfig.HighIntegrity)
{
// we just need this to be non-zero to enable tracking
_nextHighIntegrityToken = 1;
}
}

private readonly int TimeoutMilliseconds;
Expand Down Expand Up @@ -1546,10 +1553,30 @@ private WriteResult WriteMessageToServerInsideWriteLock(PhysicalConnection conne
break;
}

if (_nextHighIntegrityToken is not 0
&& !connection.TransactionActive // validated in the UNWATCH/EXEC/DISCARD
&& message.Command is not RedisCommand.AUTH or RedisCommand.HELLO // if auth fails, ECHO may also fail; avoid confusion
)
{
// make sure this value exists early to avoid a race condition
// if the response comes back super quickly
message.WithHighIntegrity(NextHighIntegrityTokenInsideLock());
Debug.Assert(message.IsHighIntegrity, "message should be high integrity");
}
else
{
Debug.Assert(!message.IsHighIntegrity, "prior high integrity message found during transaction?");
}
connection.EnqueueInsideWriteLock(message);
isQueued = true;
message.WriteTo(connection);

if (message.IsHighIntegrity)
{
message.WriteHighIntegrityChecksumRequest(connection);
IncrementOpCount();
}

message.SetRequestSent();
IncrementOpCount();

Expand Down Expand Up @@ -1602,6 +1629,21 @@ private WriteResult WriteMessageToServerInsideWriteLock(PhysicalConnection conne
}
}

private uint NextHighIntegrityTokenInsideLock()
{
// inside lock: no concurrency concerns here
switch (_nextHighIntegrityToken)
{
case 0: return 0; // disabled
case uint.MaxValue:
// avoid leaving the value at zero due to wrap-around
_nextHighIntegrityToken = 1;
return ushort.MaxValue;
default:
return _nextHighIntegrityToken++;
}
}

/// <summary>
/// For testing only
/// </summary>
Expand Down
Loading

0 comments on commit 1de8dac

Please sign in to comment.