Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Compatibility] Added CLIENT UNBLOCK command #886

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
37 changes: 37 additions & 0 deletions libs/resources/RespCommandsDocs.json
Original file line number Diff line number Diff line change
Expand Up @@ -1036,6 +1036,43 @@
"Type": "String"
}
]
},
{
"Command": "CLIENT_UNBLOCK",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment here about running CommentInfoUpdater - #864 (comment)

"Name": "CLIENT|UNBLOCK",
"Summary": "Unblocks a client blocked by a blocking command from a different connection.",
"Group": "Connection",
"Complexity": "O(log N) where N is the number of client connections",
"Arguments": [
{
"TypeDiscriminator": "RespCommandBasicArgument",
"Name": "CLIENT-ID",
"DisplayText": "client-id",
"Type": "Integer"
},
{
"TypeDiscriminator": "RespCommandContainerArgument",
"Name": "UNBLOCK-TYPE",
"Type": "OneOf",
"ArgumentFlags": "Optional",
"Arguments": [
{
"TypeDiscriminator": "RespCommandBasicArgument",
"Name": "TIMEOUT",
"DisplayText": "timeout",
"Type": "PureToken",
"Token": "TIMEOUT"
},
{
"TypeDiscriminator": "RespCommandBasicArgument",
"Name": "ERROR",
"DisplayText": "error",
"Type": "PureToken",
"Token": "ERROR"
}
]
}
]
}
]
},
Expand Down
7 changes: 7 additions & 0 deletions libs/resources/RespCommandsInfo.json
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,13 @@
"request_policy:all_nodes",
"response_policy:all_succeeded"
]
},
{
"Command": "CLIENT_UNBLOCK",
"Name": "CLIENT|UNBLOCK",
"Arity": -3,
"Flags": "Admin, Loading, NoScript, Stale",
"AclCategories": "Admin, Connection, Dangerous, Slow"
}
]
},
Expand Down
14 changes: 13 additions & 1 deletion libs/server/Objects/ItemBroker/CollectionItemBroker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ public class CollectionItemBroker : IDisposable
private bool disposed = false;
private bool isStarted = false;

internal bool TryGetObserver(int sessionId, out CollectionItemObserver observer)
TalZaccai marked this conversation as resolved.
Show resolved Hide resolved
{
return SessionIdToObserver.TryGetValue(sessionId, out observer);
}

/// <summary>
/// Asynchronously wait for item from collection object
/// </summary>
Expand Down Expand Up @@ -118,13 +123,15 @@ private async Task<CollectionItemResult> GetCollectionItemAsync(CollectionItemOb
? TimeSpan.FromMilliseconds(-1)
: TimeSpan.FromSeconds(timeoutInSeconds);

var isError = false;
TalZaccai marked this conversation as resolved.
Show resolved Hide resolved
try
{
// Wait for either the result found notification or the timeout to expire
await observer.ResultFoundSemaphore.WaitAsync(timeout, observer.CancellationTokenSource.Token);
}
catch (OperationCanceledException)
catch (OperationCanceledException) when (observer.CancellationTokenSource.IsCancellationRequested)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will also be true if the session was disposed

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That should cause any issue right? earlier its just an empty catch, Now I am assigning a bool. Do you see any issue with that?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, because if the session was disposed you wouldn't need to return a CollectionItemResult.Error

{
isError = true;
}

SessionIdToObserver.TryRemove(observer.Session.ObjectStoreSessionID, out _);
Expand All @@ -136,6 +143,11 @@ private async Task<CollectionItemResult> GetCollectionItemAsync(CollectionItemOb
observer.HandleSetResult(CollectionItemResult.Empty);
}

if (isError)
{
return CollectionItemResult.Error;
}

return observer.Result;
}

Expand Down
15 changes: 15 additions & 0 deletions libs/server/Objects/ItemBroker/CollectionItemResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ public CollectionItemResult(byte[] key, byte[][] items)
Items = items;
}

private CollectionItemResult(bool isError)
{
IsError = isError;
}

/// <summary>
/// True if item was found
/// </summary>
Expand All @@ -40,9 +45,19 @@ public CollectionItemResult(byte[] key, byte[][] items)
/// </summary>
internal byte[][] Items { get; }

/// <summary>
/// Indicates whether the result represents an error.
/// </summary>
internal readonly bool IsError { get; }
TalZaccai marked this conversation as resolved.
Show resolved Hide resolved

/// <summary>
/// Instance of empty result
/// </summary>
internal static readonly CollectionItemResult Empty = new(null, item: null);

/// <summary>
/// Instance representing an error result.
/// </summary>
internal static readonly CollectionItemResult Error = new(true);
}
}
72 changes: 72 additions & 0 deletions libs/server/Resp/ClientCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -568,5 +568,77 @@ private bool NetworkCLIENTSETINFO()

return true;
}

/// <summary>
/// CLIENT UNBLOCK
/// </summary>
private bool NetworkCLIENTUNBLOCK()
{
if (parseState.Count is not (1 or 2))
{
return AbortWithWrongNumberOfArguments("client|unblock");
}

if (!parseState.TryGetLong(0, out var clientId))
{
return AbortWithErrorMessage(CmdStrings.RESP_ERR_GENERIC_VALUE_IS_NOT_INTEGER);
}

var toThrowError = false;
if (parseState.Count == 2)
{
var option = parseState.GetArgSliceByRef(1);
if (option.Span.EqualsUpperCaseSpanIgnoringCase(CmdStrings.TIMEOUT))
{
toThrowError = false;
}
else if (option.Span.EqualsUpperCaseSpanIgnoringCase(CmdStrings.ERROR))
{
toThrowError = true;
}
else
{
return AbortWithErrorMessage(CmdStrings.RESP_ERR_INVALID_CLIENT_UNBLOCK_REASON);
}
}

if (Server is GarnetServerBase garnetServer)
TalZaccai marked this conversation as resolved.
Show resolved Hide resolved
{
var session = garnetServer.ActiveConsumers().OfType<RespServerSession>().FirstOrDefault(x => x.Id == clientId);

if (session is null)
{
while (!RespWriteUtils.WriteInteger(0, ref dcurr, dend))
SendAndReset();
return true;
}

if (session.storeWrapper?.itemBroker is not null)
TalZaccai marked this conversation as resolved.
Show resolved Hide resolved
{
var isBlocked = session.storeWrapper.itemBroker.TryGetObserver(session.ObjectStoreSessionID, out var observer);

if (!isBlocked)
{
while (!RespWriteUtils.WriteInteger(0, ref dcurr, dend))
SendAndReset();
return true;
}

if (toThrowError)
{
observer.CancellationTokenSource.Cancel();
}
else
{
observer.ResultFoundSemaphore.Release();
}
}

while (!RespWriteUtils.WriteInteger(1, ref dcurr, dend))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The client may be unblocked between you checking if it's blocked and attempting to unblock it (in which case you should return 0). In other words, you should get some feedback from the observer that indicates if you initiated the unblocking or not.

Copy link
Contributor Author

@Vijay-Nirmal Vijay-Nirmal Jan 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have moved this inside the if, I think that should work as expected, because only why this like can execute observer.CancellationTokenSource.Cancel() or observer.ResultFoundSemaphore.Release() is called. Let me know, If I understood it wrongly.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can still have a context switch happen here. What you need to do is have something similar to observer.HandleSetResult (say, observer.TryForceUnblock) that locks the ObserverStatusLock, and returns a boolean that indicates if the observer was force unblocked or not.

SendAndReset();
}

return true;
}
}
}
5 changes: 5 additions & 0 deletions libs/server/Resp/CmdStrings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ static partial class CmdStrings
public static ReadOnlySpan<byte> WEIGHTS => "WEIGHTS"u8;
public static ReadOnlySpan<byte> AGGREGATE => "AGGREGATE"u8;
public static ReadOnlySpan<byte> SUM => "SUM"u8;
public static ReadOnlySpan<byte> TIMEOUT => "TIMEOUT"u8;
public static ReadOnlySpan<byte> ERROR => "ERROR"u8;

/// <summary>
/// Response strings
Expand Down Expand Up @@ -215,6 +217,8 @@ static partial class CmdStrings
public static ReadOnlySpan<byte> RESP_ERR_INCR_SUPPORTS_ONLY_SINGLE_PAIR => "ERR INCR option supports a single increment-element pair"u8;
public static ReadOnlySpan<byte> RESP_ERR_INVALID_BITFIELD_TYPE => "ERR Invalid bitfield type. Use something like i16 u8. Note that u64 is not supported but i64 is"u8;
public static ReadOnlySpan<byte> RESP_ERR_SCRIPT_FLUSH_OPTIONS => "ERR SCRIPT FLUSH only support SYNC|ASYNC option"u8;
public static ReadOnlySpan<byte> RESP_ERR_INVALID_CLIENT_UNBLOCK_REASON => "ERR CLIENT UNBLOCK reason should be TIMEOUT or ERROR"u8;
public static ReadOnlySpan<byte> RESP_UNBLOCKED_CLIENT_VIA_CLIENT_UNBLOCK => "UNBLOCKED client unblocked via CLIENT UNBLOCK"u8;

/// <summary>
/// Response string templates
Expand Down Expand Up @@ -309,6 +313,7 @@ static partial class CmdStrings
public static ReadOnlySpan<byte> KILL => "KILL"u8;
public static ReadOnlySpan<byte> GETNAME => "GETNAME"u8;
public static ReadOnlySpan<byte> SETINFO => "SETINFO"u8;
public static ReadOnlySpan<byte> UNBLOCK => "UNBLOCK"u8;
public static ReadOnlySpan<byte> USER => "USER"u8;
public static ReadOnlySpan<byte> ADDR => "ADDR"u8;
public static ReadOnlySpan<byte> LADDR => "LADDR"u8;
Expand Down
13 changes: 13 additions & 0 deletions libs/server/Resp/Objects/ListCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,13 @@ private bool ListBlockingPop(RespCommand command)

var result = storeWrapper.itemBroker.GetCollectionItemAsync(command, keysBytes, this, timeout).Result;

if (result.IsError)
{
while (!RespWriteUtils.WriteError(CmdStrings.RESP_UNBLOCKED_CLIENT_VIA_CLIENT_UNBLOCK, ref dcurr, dend))
SendAndReset();
return true;
}

if (!result.Found)
{
while (!RespWriteUtils.WriteNullArray(ref dcurr, dend))
Expand Down Expand Up @@ -979,6 +986,12 @@ private unsafe bool ListBlockingPopMultiple()

var result = storeWrapper.itemBroker.GetCollectionItemAsync(RespCommand.BLMPOP, keysBytes, this, timeout, cmdArgs).Result;

if (result.IsError)
{
while (!RespWriteUtils.WriteError(CmdStrings.RESP_UNBLOCKED_CLIENT_VIA_CLIENT_UNBLOCK, ref dcurr, dend))
SendAndReset();
}

if (!result.Found)
{
while (!RespWriteUtils.WriteNull(ref dcurr, dend))
Expand Down
6 changes: 6 additions & 0 deletions libs/server/Resp/Parser/RespCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ public enum RespCommand : ushort
CLIENT_GETNAME,
CLIENT_SETNAME,
CLIENT_SETINFO,
CLIENT_UNBLOCK,

MONITOR,
MODULE,
Expand Down Expand Up @@ -370,6 +371,7 @@ public static class RespCommandExtensions
RespCommand.CLIENT_GETNAME,
RespCommand.CLIENT_SETNAME,
RespCommand.CLIENT_SETINFO,
RespCommand.CLIENT_UNBLOCK,
// Command
RespCommand.COMMAND,
RespCommand.COMMAND_COUNT,
Expand Down Expand Up @@ -1687,6 +1689,10 @@ private RespCommand SlowParseCommand(ref int count, ref ReadOnlySpan<byte> speci
{
return RespCommand.CLIENT_SETINFO;
}
else if (subCommand.SequenceEqual(CmdStrings.UNBLOCK))
{
return RespCommand.CLIENT_UNBLOCK;
}
}
}
else if (command.SequenceEqual(CmdStrings.AUTH))
Expand Down
1 change: 1 addition & 0 deletions libs/server/Resp/RespServerSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -727,6 +727,7 @@ private bool ProcessOtherCommands<TGarnetApi>(RespCommand command, ref TGarnetAp
RespCommand.CLIENT_GETNAME => NetworkCLIENTGETNAME(),
RespCommand.CLIENT_SETNAME => NetworkCLIENTSETNAME(),
RespCommand.CLIENT_SETINFO => NetworkCLIENTSETINFO(),
RespCommand.CLIENT_UNBLOCK => NetworkCLIENTUNBLOCK(),
RespCommand.COMMAND => NetworkCOMMAND(),
RespCommand.COMMAND_COUNT => NetworkCOMMAND_COUNT(),
RespCommand.COMMAND_DOCS => NetworkCOMMAND_DOCS(),
Expand Down
1 change: 1 addition & 0 deletions playground/CommandInfoUpdater/SupportedCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public class SupportedCommand
new("CLIENT|GETNAME", RespCommand.CLIENT_GETNAME),
new("CLIENT|SETNAME", RespCommand.CLIENT_SETNAME),
new("CLIENT|SETINFO", RespCommand.CLIENT_SETINFO),
new("CLIENT|UNBLOCK", RespCommand.CLIENT_UNBLOCK),
]),
new("CLUSTER", RespCommand.CLUSTER,
[
Expand Down
15 changes: 15 additions & 0 deletions test/Garnet.test/Resp/ACL/RespCommandTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -778,6 +778,21 @@ static async Task DoClientSetInfoAsync(GarnetClient client)
}
}

[Test]
public async Task ClientUnblockACLsAsync()
{
await CheckCommandsAsync(
"CLIENT UNBLOCK",
[DoClientUnblockAsync]
);

static async Task DoClientUnblockAsync(GarnetClient client)
{
var count = await client.ExecuteForLongResultAsync("CLIENT", ["UNBLOCK", "123"]);
ClassicAssert.AreEqual(0, count);
}
}

[Test]
public async Task ClusterAddSlotsACLsAsync()
{
Expand Down
1 change: 1 addition & 0 deletions test/Garnet.test/RespCommandTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,7 @@ public void AofIndependentCommandsTest()
RespCommand.CLIENT_GETNAME,
RespCommand.CLIENT_SETNAME,
RespCommand.CLIENT_SETINFO,
RespCommand.CLIENT_UNBLOCK,
// Command
RespCommand.COMMAND,
RespCommand.COMMAND_COUNT,
Expand Down
Loading
Loading