
The Kademlia Protocol Succinctly®
by Marc Clifton

CHAPTER 14

RPC Error Handling and Delayed Eviction


One of the optimizations in the Kademlia protocol can now be implemented: delayed eviction. From the spec: “When a Kademlia node receives an RPC from an unknown contact and the k-bucket for that contact is already full with k entries, the node places the new contact in a replacement cache of nodes eligible to replace stale k-bucket entries. The next time the node queries contacts in the k-bucket, any unresponsive ones can be evicted and replaced with entries in the replacement cache. The replacement cache is kept sorted by time last seen, with the most recently seen entry having the highest priority as a replacement candidate.”
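The replacement cache described in the spec can be sketched in a few lines. This is a minimal illustration, not the implementation used in this book: `ReplacementCache` and `CachedContact` are hypothetical names, IDs are plain strings rather than the library's ID type, and timestamps are passed in explicitly so the behavior is easy to follow.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for a contact; the real Contact class carries more state.
public class CachedContact
{
    public string Id;
    public DateTime LastSeen;
}

// Hypothetical replacement cache for a single full k-bucket.
public class ReplacementCache
{
    private readonly List<CachedContact> entries = new List<CachedContact>();

    public int Count { get { return entries.Count; } }

    // Record a contact seen while the bucket was full. A repeat sighting
    // refreshes the timestamp instead of adding a duplicate entry.
    public void Seen(string id, DateTime when)
    {
        entries.RemoveAll(c => c.Id == id);
        entries.Add(new CachedContact { Id = id, LastSeen = when });
    }

    // Remove and return the most recently seen candidate, or null if the cache is empty.
    public CachedContact TakeBest()
    {
        CachedContact best = entries.OrderByDescending(c => c.LastSeen).FirstOrDefault();

        if (best != null)
        {
            entries.Remove(best);
        }

        return best;
    }
}
```

Sorting on demand keeps the sketch short. A cache that is read often could instead keep its entries ordered as they are inserted.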

What happens when the peer throws an exception or the random ID that the peer responds with doesn’t match what was sent? To make matters simple, in this implementation we’ll handle all error conditions, including timeouts, in the same way.
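As a rough sketch of what treating every failure the same way means, the snippet below folds several failure causes into a single flag. The type and member names (`RpcErrorSketch`, `RpcCheck`, `TimedOut`, `IdMismatch`, `PeerThrew`) are assumptions made for illustration and are not necessarily those of the library's `RpcError` class.

```csharp
using System;

// Hypothetical error record; the member names are illustrative assumptions.
public class RpcErrorSketch
{
    public bool TimedOut { get; set; }
    public bool IdMismatch { get; set; }
    public bool PeerThrew { get; set; }

    // Any failure, whatever its cause, counts as one strike against the contact.
    public bool HasError
    {
        get { return TimedOut || IdMismatch || PeerThrew; }
    }
}

public static class RpcCheck
{
    // Compare the random ID we sent with the one echoed back by the peer.
    public static RpcErrorSketch Validate(string sentId, string echoedId, bool timedOut)
    {
        return new RpcErrorSketch
        {
            TimedOut = timedOut,
            // An echoed ID is only meaningful if a response actually arrived.
            IdMismatch = !timedOut && sentId != echoedId
        };
    }
}
```

A caller only needs to test `HasError` before passing the contact to the error handler. The individual flags remain available for logging.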

Evicting contacts and replacing them with contacts waiting to be added to the bucket is handled in the Dht class. First, the error handler.

Code Listing 108: HandleError Method

/// <summary>
/// Put the timed-out contact into a collection and increment the number of
/// times it has timed out.
/// If it has timed out a certain number of times, remove it from the bucket
/// and replace it with the most recent pending contact queued for that bucket.
/// </summary>
public void HandleError(RpcError error, Contact contact)
{
  // For all errors:
  int count = AddContactToEvict(contact.ID.Value);

  if (count == Constants.EVICTION_LIMIT)
  {
    ReplaceContact(contact);
  }
}

And the implementation of DelayEviction.

Code Listing 109: DelayEviction

/// <summary>
/// The contact that did not respond (or had an error) gets n tries before
/// being evicted and replaced with the most recently seen contact that wants
/// to go into the non-responding contact's k-bucket.
/// </summary>
/// <param name="toEvict">The contact that didn't respond.</param>
/// <param name="toReplace">The contact that can replace
/// the non-responding contact.</param>
public void DelayEviction(Contact toEvict, Contact toReplace)
{
  // Non-concurrent list needs locking.
  lock (pendingContacts)
  {
    // Add only if it's a new pending contact.
    pendingContacts.AddDistinctBy(toReplace, c => c.ID);
  }

  BigInteger key = toEvict.ID.Value;
  int count = AddContactToEvict(key);

  if (count == Constants.EVICTION_LIMIT)
  {
    ReplaceContact(toEvict);
  }
}

Note the difference:

  • When a contact fails to respond to an RPC call, its eviction count is incremented. Once the count reaches EVICTION_LIMIT, the contact is removed and replaced with the most recently seen pending contact that belongs in the same bucket.
  • Alternatively, when a contact is being added to a full bucket and the least recently seen contact in that bucket fails to respond (or has an error), the unresponsive contact's eviction count is incremented in the same way, and the new contact wanting to be added is placed into the pending contacts pool.
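The two paths can be traced with a small, self-contained model. This is a sketch under stated assumptions rather than the Dht class itself: `EvictionTracker` and its members are hypothetical, contacts are reduced to their `BigInteger` IDs, and the limit is passed in because the value of `Constants.EVICTION_LIMIT` is not shown here.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Numerics;

// Hypothetical model of the two eviction paths.
public class EvictionTracker
{
    private readonly int limit;
    private readonly ConcurrentDictionary<BigInteger, int> strikes =
        new ConcurrentDictionary<BigInteger, int>();
    private readonly List<BigInteger> pending = new List<BigInteger>();

    public EvictionTracker(int limit)
    {
        this.limit = limit;
    }

    public int PendingCount
    {
        get { lock (pending) { return pending.Count; } }
    }

    // Path 1: an RPC to a known contact failed. Returns true when the
    // contact has just reached the limit and should be replaced.
    public bool OnRpcError(BigInteger contactId)
    {
        int count = strikes.AddOrUpdate(contactId, 1, (_, n) => n + 1);
        return count == limit;
    }

    // Path 2: a newcomer wants into a full bucket and the ping to an
    // existing contact failed. Queue the newcomer once, then count a strike.
    public bool OnFullBucketPingFailed(BigInteger existingId, BigInteger newcomerId)
    {
        lock (pending)
        {
            if (!pending.Contains(newcomerId))
            {
                pending.Add(newcomerId);
            }
        }

        return OnRpcError(existingId);
    }

    // Clear the strike count once a contact has been evicted.
    public void Forget(BigInteger contactId)
    {
        strikes.TryRemove(contactId, out _);
    }
}
```

Both paths share one strike counter, so failures observed through either route add up toward the same limit. The only difference is that the second path also remembers who is waiting for the slot.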

The rest is just the implementation of the helper methods.

Code Listing 110: Eviction Helper Methods

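protected int AddContactToEvict(BigInteger key)
{
  // evictionCount is used as a concurrent dictionary (EvictContact calls
  // TryRemove on it), so increment atomically instead of checking
  // ContainsKey and writing back in separate steps.
  return evictionCount.AddOrUpdate(key, 1, (_, count) => count + 1);
}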

protected void ReplaceContact(Contact toEvict)
{
  KBucket bucket = node.BucketList.GetKBucket(toEvict.ID);

  // Prevent other threads from manipulating the bucket list or buckets.
  lock (node.BucketList)
  {
    EvictContact(bucket, toEvict);
    ReplaceWithPendingContact(bucket);
  }
}

protected void EvictContact(KBucket bucket, Contact toEvict)
{
  evictionCount.TryRemove(toEvict.ID.Value, out _);
  Validate.IsTrue<BucketDoesNotContainContactToEvict>(bucket.Contains(toEvict.ID),
    "Bucket doesn't contain the contact to be evicted.");
  bucket.EvictContact(toEvict);
}

/// <summary>
/// Find a pending contact that goes into the bucket that now has room.
/// </summary>
protected void ReplaceWithPendingContact(KBucket bucket)
{
  Contact contact;

  // Non-concurrent list needs locking while we query it.
  lock (pendingContacts)
  {
    contact = pendingContacts.Where(c => node.BucketList.GetKBucket(c.ID) ==
      bucket).OrderBy(c => c.LastSeen).LastOrDefault();

    if (contact != null)
    {
      pendingContacts.Remove(contact);
      bucket.AddContact(contact);
    }
  }
}
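To see how the pending-contact query behaves in isolation, here is a small stand-alone sketch. The types `PendingContact`, `BucketRange`, and `PendingSelector` are hypothetical simplifications: a bucket is modeled as a half-open ID range, which is an assumption made for illustration, instead of being looked up through the bucket list.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Numerics;

// Hypothetical, simplified types for illustration only.
public class PendingContact
{
    public BigInteger Id;
    public DateTime LastSeen;
}

public class BucketRange
{
    public BigInteger Low;   // inclusive
    public BigInteger High;  // exclusive

    public bool Covers(BigInteger id)
    {
        return id >= Low && id < High;
    }
}

public static class PendingSelector
{
    // Pick the most recently seen pending contact whose ID falls in the
    // bucket's range. Returns null when nothing is waiting for that bucket.
    public static PendingContact Pick(List<PendingContact> pending, BucketRange bucket)
    {
        return pending
            .Where(c => bucket.Covers(c.Id))
            .OrderBy(c => c.LastSeen)
            .LastOrDefault();
    }
}
```

Because `LastOrDefault` returns null for an empty sequence of reference types, a bucket with no waiting contacts simply keeps its free slot, which matches the null check in the helper above.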
