Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use seed in Kademlia peer discovery #3568

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions Libplanet.Net/Protocols/IProtocol.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,19 @@ public interface IProtocol
/// <returns>An awaitable task without value.</returns>
Task RebuildConnectionAsync(int depth, CancellationToken cancellationToken);

/// <summary>
/// Reconstructs network connection between peers on network using seed peers.
/// </summary>
/// <param name="seedPeers">The list of the see peers.</param>
/// <param name="depth">Recursive operation depth to search peers from network.</param>
/// <param name="cancellationToken">A cancellation token used to propagate notification
/// that this operation should be canceled.</param>
/// <returns>An awaitable task without value.</returns>
Task RebuildConnectionAsync(
IEnumerable<BoundPeer> seedPeers,
int depth,
CancellationToken cancellationToken);

/// <summary>
/// Checks the <see cref="KBucket"/> in the <see cref="RoutingTable"/> and if
/// there is an empty <see cref="KBucket"/>, fill it with <see cref="BoundPeer"/>s
Expand Down
34 changes: 34 additions & 0 deletions Libplanet.Net/Protocols/KademliaProtocol.cs
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,34 @@ public async Task RebuildConnectionAsync(int depth, CancellationToken cancellati
}
}

/// <inheritdoc />
public async Task RebuildConnectionAsync(
IEnumerable<BoundPeer> seedPeers,
int depth,
CancellationToken cancellationToken)
{
_logger.Verbose("Rebuilding connection using seed peers...");
var history = new ConcurrentBag<BoundPeer>();
var dialHistory = new ConcurrentBag<BoundPeer>();
var tasks = seedPeers.Select(seed =>
FindPeerAsync(
history,
dialHistory,
_address,
seed,
depth,
_requestTimeout,
cancellationToken)).ToList();

try
{
await Task.WhenAll(tasks).ConfigureAwait(false);
}
catch (TimeoutException)
{
}
}

/// <inheritdoc />
public async Task CheckReplacementCacheAsync(CancellationToken cancellationToken)
{
Expand Down Expand Up @@ -722,6 +750,12 @@ await _transport.ReplyMessageAsync(pong, message.Identity, default)
"Some responses from neighbors found unexpectedly terminated");
}

if (depth == 1)
{
// depth 1 means spawn FindPeerAsync task of depth 0, and it does nothing.
return;
}

var findPeerTasks = new List<Task>();
BoundPeer closestKnownPeer = closestCandidate.FirstOrDefault();
var count = 0;
Expand Down
27 changes: 24 additions & 3 deletions Libplanet.Net/Protocols/RoutingTable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,16 @@ public class RoutingTable : IRoutingTable
/// <param name="address"><see cref="Address"/> of this peer.</param>
/// <param name="tableSize">The number of buckets in the table.</param>
/// <param name="bucketSize">The size of a single bucket.</param>
/// <param name="seedPeers">The list of the seed peers.
/// If null is given, <see cref="SeedPeers"/> is set to an empty list.</param>
/// <exception cref="ArgumentOutOfRangeException">
/// Thrown when <paramref name="tableSize"/> or <paramref name="bucketSize"/> is
/// less then or equal to 0.</exception>
public RoutingTable(
Address address,
int tableSize = Kademlia.TableSize,
int bucketSize = Kademlia.BucketSize)
int bucketSize = Kademlia.BucketSize,
IEnumerable<BoundPeer>? seedPeers = null)
{
if (tableSize <= 0)
{
Expand All @@ -45,6 +48,7 @@ public class RoutingTable : IRoutingTable
_address = address;
TableSize = tableSize;
BucketSize = bucketSize;
SeedPeers = seedPeers?.ToList() ?? new List<BoundPeer>();
_logger = Log
.ForContext<RoutingTable>()
.ForContext("Source", nameof(RoutingTable));
Expand All @@ -67,6 +71,13 @@ public class RoutingTable : IRoutingTable
/// </summary>
public int BucketSize { get; }

/// <summary>
/// The list of the seed peers.
/// <remarks>Seed peers are excluded from bound peer selection, and neighbor.</remarks>
/// <seealso cref="PeersToBroadcast"/>
/// </summary>
public IReadOnlyList<BoundPeer> SeedPeers { get; }

/// <inheritdoc />
public int Count => _buckets.Sum(bucket => bucket.Count);

Expand Down Expand Up @@ -144,7 +155,7 @@ public bool Contains(BoundPeer peer)
Peers.FirstOrDefault(peer => peer.Address.Equals(addr));

/// <summary>
/// Removes all peers in the table. This method does not affect static peers.
/// Removes all peers in the table.
/// </summary>
public void Clear()
{
Expand Down Expand Up @@ -179,7 +190,6 @@ public IReadOnlyList<BoundPeer> Neighbors(BoundPeer target, int k, bool includeT
/// <returns>An enumerable of <see cref="BoundPeer"/>.</returns>
public IReadOnlyList<BoundPeer> Neighbors(Address target, int k, bool includeTarget)
{
// TODO: Should include static peers?
var sorted = _buckets
.Where(b => !b.IsEmpty)
.SelectMany(b => b.Peers)
Expand All @@ -200,12 +210,17 @@ public IReadOnlyList<BoundPeer> Neighbors(Address target, int k, bool includeTar

/// <summary>
/// Marks <paramref name="peer"/> checked and refreshes last checked time of the peer.
/// If the given <paramref name="peer"/> is one of the <see cref="SeedPeers"/>,
/// it is not added.
/// </summary>
/// <param name="peer">The <see cref="BoundPeer"/> to check.</param>
/// <param name="start"><see cref="DateTimeOffset"/> at the beginning of the check.</param>
/// <param name="end"><see cref="DateTimeOffset"/> at the end of the check.</param>
/// <exception cref="ArgumentNullException">
/// Thrown when <paramref name="peer"/> is <see langword="null"/>.</exception>
/// <exception cref="ArgumentException">
/// Thrown when given <paramref name="peer"/>'s <see cref="Address"/> is equal to
/// <see cref="RoutingTable"/>'s <see cref="Address"/>.</exception>
public void Check(BoundPeer peer, DateTimeOffset start, DateTimeOffset end)
=> BucketOf(peer).Check(peer, start, end);

Expand All @@ -218,6 +233,12 @@ internal void AddPeer(BoundPeer peer, DateTimeOffset updated)
nameof(peer));
}

if (SeedPeers.Any(seed => peer.Address.Equals(seed.Address)))
{
_logger.Verbose("A seed peer is disallowed to add in the routing table.");
return;
}

_logger.Debug("Adding peer {Peer} to the routing table...", peer);
BucketOf(peer).AddPeer(peer, updated);
}
Expand Down
22 changes: 18 additions & 4 deletions Libplanet.Net/Swarm.cs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,11 @@ public partial class Swarm : IDisposable

Options = options ?? new SwarmOptions();
TxCompletion = new TxCompletion<BoundPeer>(BlockChain, GetTxsAsync, BroadcastTxs);
RoutingTable = new RoutingTable(Address, Options.TableSize, Options.BucketSize);
RoutingTable = new RoutingTable(
Address,
Options.TableSize,
Options.BucketSize,
Options.StaticPeers);

// FIXME: after the initialization of NetMQTransport is fully converted to asynchronous
// code, the portion initializing the swarm in Agent.cs in NineChronicles should be
Expand Down Expand Up @@ -1432,9 +1436,19 @@ private bool IsBlockNeeded(IBlockExcerpt target)
try
{
await Task.Delay(period, cancellationToken);
await PeerDiscovery.RebuildConnectionAsync(
Kademlia.MaxDepth,
cancellationToken);
if (RoutingTable.SeedPeers.Any())
{
await PeerDiscovery.RebuildConnectionAsync(
RoutingTable.SeedPeers,
1,
cancellationToken);
}
else
{
await PeerDiscovery.RebuildConnectionAsync(
Kademlia.MaxDepth,
cancellationToken);
}
}
catch (OperationCanceledException e)
{
Expand Down
Loading