CHAPTER 14
One of the optimizations in the Kademlia protocol can now be implemented—delayed eviction. From the spec: “When a Kademlia node receives an RPC from an unknown contact and the k-bucket for that contact is already full with k entries, the node places the new contact in a replacement cache of nodes eligible to replace stale k-bucket entries. The next time the node queries contacts in the k-bucket, any unresponsive ones can be evicted and replaced with entries in the replacement cache. The replacement cache is kept sorted by time last seen, with the most recently seen entry having the highest priority as a replacement candidate.”
What happens when the peer throws an exception or the random ID that the peer responds with doesn’t match what was sent? To make matters simple, in this implementation we’ll handle all error conditions, including timeouts, in the same way.
The implementation for handling evictions and replacing them with contacts waiting to be added to the bucket is handled in the Dht. First, the error handler.
Code Listing 108: HandleError Method
/// <summary> /// Put the timed out contact into a collection and increment the number of /// If it has timed out a certain amount, remove it from the bucket and /// recent pending contact that are queued for that bucket. /// </summary> public void HandleError(RpcError error, Contact contact) { // For all errors: int count = AddContactToEvict(contact.ID.Value); if (count == Constants.EVICTION_LIMIT) { ReplaceContact(contact); } } |
And the implementation of DelayEviction.
Code Listing 109: DelayEviction
/// <summary> /// The contact that did not respond (or had an error) gets n tries before /// </summary> /// <param name="toEvict">The contact that didn't respond.</param> /// <param name="toReplace">The contact that can replace public void DelayEviction(Contact toEvict, Contact toReplace) { // Non-concurrent list needs locking. lock (pendingContacts) { // Add only if it's a new pending contact. pendingContacts.AddDistinctBy(toReplace, c=>c.ID); } BigInteger key = toEvict.ID.Value; int count = AddContactToEvict(key); if (count == Constants.EVICTION_LIMIT) { ReplaceContact(toEvict); } } |
Note the difference:
The rest is just the implementation of the helper methods.
Code Listing 110: Eviction Helper Methods
protected int AddContactToEvict(BigInteger key) { if (!evictionCount.ContainsKey(key)) { evictionCount[key] = 0; } int count = evictionCount[key] + 1; evictionCount[key] = count; return count; } protected void ReplaceContact(Contact toEvict) { KBucket bucket = node.BucketList.GetKBucket(toEvict.ID); // Prevent other threads from manipulating the bucket list or buckets. lock (node.BucketList) { EvictContact(bucket, toEvict); ReplaceWithPendingContact(bucket); } } protected void EvictContact(KBucket bucket, Contact toEvict) { evictionCount.TryRemove(toEvict.ID.Value, out _); Validate.IsTrue<BucketDoesNotContainContactToEvict>(bucket.Contains(toEvict.ID), bucket.EvictContact(toEvict); } /// <summary> /// Find a pending contact that goes into the bucket that now has room. /// </summary> protected void ReplaceWithPendingContact(KBucket bucket) { Contact contact; // Nonconcurrent list needs locking while we query it. lock (pendingContacts) { contact = pendingContacts.Where(c => node.BucketList.GetKBucket(c.ID) == if (contact != null) { pendingContacts.Remove(contact); bucket.AddContact(contact); } } } |