CHAPTER 9
It is time now to deal with the various issues of bucket management.
When a bucket is full and cannot be split, we have to deal with evicting unresponsive contacts and with queuing the new contacts as pending. For unit testing, we simulate a nonresponsive node with a flag in our VirtualProtocol, which its Ping method checks when AddContact pings a contact.
Code Listing 66: VirtualProtocol Ping
public RpcError Ping(Contact sender)
{
    // Ping still adds/updates the sender's contact.
    if (Responds)
    {
        Node.Ping(sender);
    }

    return new RpcError() { TimeoutError = !Responds };
}
We can then implement the handling of a nonresponsive node.
Code Listing 67: Handling Nonresponsive Nodes (fragment)
else if (kbucket.IsBucketFull)
{
    if (CanSplit(kbucket))
    {
        // Split the bucket and try again.
        (KBucket k1, KBucket k2) = kbucket.Split();
        int idx = GetKBucketIndex(contact.ID);
        buckets[idx] = k1;
        buckets.Insert(idx + 1, k2);
        buckets[idx].Touch();
        buckets[idx + 1].Touch();
        AddContact(contact);
    }
    else
    {
        // Ping the least recently seen contact in the full bucket.
        Contact lastSeenContact = kbucket.Contacts.OrderBy(c => c.LastSeen).First();
        RpcError error = lastSeenContact.Protocol.Ping(ourContact);

        if (error.HasError)
        {
            // The null-conditional operator is used because unit tests may not initialize a DHT.
            dht?.DelayEviction(lastSeenContact, contact);
        }
        else
        {
            // Still can't add the contact, so put it into the pending list.
            dht?.AddToPending(contact);
        }
    }
}
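The fragment above calls DelayEviction and AddToPending without showing what they do. The following is a minimal sketch of one way such bookkeeping could work, inferred only from the behavior that the eviction unit tests check: a failed ping queues the new contact and counts the failure, and once the count reaches the limit the stale contact is replaced. The EvictionTracker class, its string IDs, and its method bodies are hypothetical stand-ins, not the Dht class's actual implementation.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch: tracks ping failures per stale contact and a pending queue.
// Contacts are represented by plain string IDs to keep the example self-contained.
public class EvictionTracker
{
    private readonly int evictionLimit;
    private readonly Dictionary<string, int> failureCounts = new Dictionary<string, int>();
    private readonly List<string> pending = new List<string>();

    public EvictionTracker(int evictionLimit)
    {
        this.evictionLimit = evictionLimit;
    }

    public IReadOnlyList<string> Pending => pending;

    public int FailureCount(string contactId) =>
        failureCounts.TryGetValue(contactId, out int n) ? n : 0;

    // The least recently seen contact answered the ping, so the bucket is
    // left alone and the new contact waits in the pending list.
    public void AddToPending(string newContactId)
    {
        if (!pending.Contains(newContactId))
        {
            pending.Add(newContactId);
        }
    }

    // The least recently seen contact did not answer the ping.
    // Returns true once it has failed evictionLimit times, which tells the
    // caller to replace it in the bucket with the new contact.
    public bool DelayEviction(string staleContactId, string newContactId)
    {
        AddToPending(newContactId);
        int count = FailureCount(staleContactId) + 1;

        if (count < evictionLimit)
        {
            failureCounts[staleContactId] = count;
            return false;
        }

        // Limit reached: clear the bookkeeping for both contacts.
        failureCounts.Remove(staleContactId);
        pending.Remove(newContactId);
        return true;
    }
}
```

With a limit of 3, the first two failed pings leave the new contact pending, and the third reports that the stale contact should be swapped out, leaving both the pending list and the failure count empty.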
From the spec: “Buckets are generally kept fresh by the traffic of requests traveling through nodes. To handle pathological cases in which there are no lookups for a particular ID range, each node refreshes any bucket to which it has not performed a node lookup in the past hour. Refreshing means picking a random ID in the bucket’s range and performing a node search for that ID.”
The phrase “any bucket to which it has not performed a node lookup” is subject to at least two interpretations. One is “the bucket whose range contains the key in the key-value pair for a Store or FindValue operation.” Another is “the k-bucket containing the range for any contact ID queried during the lookup process.” The second approach might seem more correct because the original alpha contacts are determined from the list of closest contacts across all buckets, but it then becomes arbitrary whether to also touch the buckets containing the contacts that are returned by the FindNodes query and then queried further.
I am choosing the first interpretation, which means that the bucket containing the key gets touched in the Store and FindValue methods of the Dht class.
Code Listing 68: TouchBucketWithKey
public class Dht
{
    public void Store(ID key, string val)
    {
        TouchBucketWithKey(key);
        ...

    public (bool found, List<Contact> contacts, string val) FindValue(ID key)
    {
        TouchBucketWithKey(key);
        ...

    protected void TouchBucketWithKey(ID key)
    {
        node.BucketList.GetKBucket(key).Touch();
    }
}
Next, we set up a refresh timer in the Dht class.
Code Listing 69: SetupBucketRefreshTimer
protected void SetupBucketRefreshTimer()
{
    bucketRefreshTimer = new Timer(Constants.BUCKET_REFRESH_INTERVAL);
    bucketRefreshTimer.AutoReset = true;
    bucketRefreshTimer.Elapsed += BucketRefreshTimerElapsed;
    bucketRefreshTimer.Start();
}

protected void BucketRefreshTimerElapsed(object sender, ElapsedEventArgs e)
{
    DateTime now = DateTime.Now;

    // Put into a separate list as bucket collections may be modified.
    List<KBucket> currentBuckets = ...;

    currentBuckets.ForEach(b => RefreshBucket(b));
}

protected void RefreshBucket(KBucket bucket)
{
    bucket.Touch();
    ID rndId = ID.RandomIDWithinBucket(bucket);

    // Isolate in a separate list as contacts collection for this bucket might change.
    List<Contact> contacts = bucket.Contacts.ToList();

    contacts.ForEach(c =>
    {
        var (newContacts, timeoutError) = c.Protocol.FindNode(ourContact, rndId);
        HandleError(timeoutError, c);
        newContacts?.ForEach(otherContact => node.BucketList.AddContact(otherContact));
    });
}
Note that now when a bucket is refreshed, it is always touched, which updates its “last seen” timestamp.
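The refresh logic rests on two small steps that are easy to get wrong: choosing which buckets are due for a refresh, and picking a random ID inside a bucket's range. The sketch below illustrates both under stated assumptions. BucketInfo, its Low, High, and LastTouched members, and both helper methods are hypothetical; they are not the KBucket or ID types used in this chapter, and the random draw uses System.Random with a simple modulo reduction, which is adequate for illustration only.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Numerics;

// Hypothetical stand-in for a k-bucket: an ID range plus the time it was last touched.
public class BucketInfo
{
    public BigInteger Low;        // inclusive lower bound of the range
    public BigInteger High;       // exclusive upper bound of the range
    public DateTime LastTouched;  // updated whenever the bucket is touched
}

public static class RefreshSketch
{
    // Returns a snapshot list of the buckets not touched within the interval.
    // ToList() copies the matches so the caller can modify the bucket collection safely.
    public static List<BucketInfo> StaleBuckets(
        IEnumerable<BucketInfo> buckets, DateTime now, TimeSpan interval)
    {
        return buckets.Where(b => now - b.LastTouched >= interval).ToList();
    }

    // Picks a random ID in [Low, High).
    public static BigInteger RandomIdInRange(BucketInfo bucket, Random rnd)
    {
        BigInteger span = bucket.High - bucket.Low;

        if (span <= 0)
        {
            throw new ArgumentException("Bucket range must not be empty.");
        }

        // Draw 8 random bytes more than the span needs so that the bias
        // introduced by the modulo reduction is negligible.
        byte[] bytes = new byte[span.ToByteArray().Length + 9];
        rnd.NextBytes(bytes);

        // BigInteger reads the array as little-endian two's complement,
        // so a zero final byte forces a non-negative value.
        bytes[bytes.Length - 1] = 0;

        BigInteger offset = BigInteger.Remainder(new BigInteger(bytes), span);
        return bucket.Low + offset;
    }
}
```

A timer handler in this style would call StaleBuckets with the current time and the refresh interval, then perform a node lookup for RandomIdInRange on each bucket returned.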
Two unit tests for eviction verify the two possible conditions.
Code Listing 70: NonRespondingContactEvictedTest
/// <summary>
/// Tests that a nonresponding contact is evicted after Constants.EVICTION_LIMIT tries.
/// </summary>
[TestMethod]
public void NonRespondingContactEvictedTest()
{
    // Create a DHT so we have an eviction handler.
    Dht dht = new Dht(ID.Zero, new VirtualProtocol(), () => null, new Router());
    IBucketList bucketList = SetupSplitFailure(dht.Node.BucketList);

    Assert.IsTrue(bucketList.Buckets.Count == 2, "Bucket split should have occurred.");
    Assert.IsTrue(bucketList.Buckets[0].Contacts.Count == 1);
    Assert.IsTrue(bucketList.Buckets[1].Contacts.Count == 20);

    // The bucket is now full. Pick the first contact, as it is the least recently seen.
    Contact nonRespondingContact = bucketList.Buckets[1].Contacts[0];

    // Since the protocols are shared, we need to assign a unique protocol to this contact.
    VirtualProtocol vpUnresponding = new VirtualProtocol(...);
    nonRespondingContact.Protocol = vpUnresponding;

    // Set up the next new contact (it can respond).
    Contact nextNewContact = new Contact(dht.Contact.Protocol, ID.Zero.SetBit(159));

    // Hit the nonresponding contact EVICTION_LIMIT times, which will trigger the eviction.
    Constants.EVICTION_LIMIT.ForEach(() => bucketList.AddContact(nextNewContact));

    Assert.IsTrue(bucketList.Buckets[1].Contacts.Count == 20);

    // Verify CanSplit -> Pending eviction happened.
    Assert.IsTrue(dht.PendingContacts.Count == 0);
    Assert.IsFalse(bucketList.Buckets.SelectMany(b => b.Contacts).Contains(nonRespondingContact));
    Assert.IsTrue(bucketList.Buckets.SelectMany(b => b.Contacts).Contains(nextNewContact));
    Assert.IsTrue(dht.EvictionCount.Count == 0);
}
Code Listing 71: NonRespondingContactDelayedEvictionTest
/// <summary>
/// Tests that a nonresponding contact puts the new contact into a pending list.
/// </summary>
[TestMethod]
public void NonRespondingContactDelayedEvictionTest()
{
    // Create a DHT so we have an eviction handler.
    Dht dht = new Dht(ID.Zero, new VirtualProtocol(), () => null, new Router());
    IBucketList bucketList = SetupSplitFailure(dht.Node.BucketList);

    Assert.IsTrue(bucketList.Buckets.Count == 2);
    Assert.IsTrue(bucketList.Buckets[0].Contacts.Count == 1);
    Assert.IsTrue(bucketList.Buckets[1].Contacts.Count == 20);

    // The bucket is now full. Pick the first contact, as it is the least recently seen.
    Contact nonRespondingContact = bucketList.Buckets[1].Contacts[0];

    // Since the protocols are shared, we need to assign a unique protocol to this contact.
    VirtualProtocol vpUnresponding = new VirtualProtocol(...);
    nonRespondingContact.Protocol = vpUnresponding;

    // Set up the next new contact (it can respond).
    Contact nextNewContact = new Contact(dht.Contact.Protocol, ID.Zero.SetBit(159));

    bucketList.AddContact(nextNewContact);

    Assert.IsTrue(bucketList.Buckets[1].Contacts.Count == 20);

    // Verify CanSplit -> Evict happened.
    Assert.IsTrue(dht.PendingContacts.Count == 1, "Expected one pending contact.");
    Assert.IsTrue(dht.PendingContacts.Contains(nextNewContact));
    Assert.IsTrue(dht.EvictionCount.Count == 1);
}
Both unit tests set up a “failed split” so that the eviction routines are triggered.
Code Listing 72: SetupSplitFailure
protected IBucketList SetupSplitFailure(IBucketList bucketList = null)
{
    // force host node ID to < 2^159 so the node ID is not in the 2^159 ... 2^160 range
    byte[] bhostID = new byte[20];
    bhostID[19] = 0x7F;
    ID hostID = new ID(bhostID);

    Contact dummyContact = new Contact(new VirtualProtocol(), hostID);
    ((VirtualProtocol)dummyContact.Protocol).Node = ...;
    bucketList = bucketList ?? new BucketList(hostID, dummyContact);

    // Also add a contact in this 0 - 2^159 range, arbitrarily something ...
    // otherwise, buckets will ... in the 2^159 ... 2^160 space.
    dummyContact = new Contact(new VirtualProtocol(), ID.One);
    ((VirtualProtocol)dummyContact.Protocol).Node = ...;
    bucketList.AddContact(new Contact(dummyContact.Protocol, ID.One));

    Assert.IsTrue(bucketList.Buckets.Count == 1);
    Assert.IsTrue(bucketList.Buckets[0].Contacts.Count == 1);

    // make sure contact IDs all have the same 5-bit prefix
    byte[] bcontactID = new byte[20];
    bcontactID[19] = 0x80;

    // 1000 xxxx prefix, xxxx starts at 1000 (8)
    // this ensures that all the contacts in a bucket match only the prefix as ...
    // |----| shared range
    // 1000 1000 ...
    // 1000 1100 ...
    // 1000 1110 ...
    byte shifter = 0x08;
    int pos = 19;

    Constants.K.ForEach(() =>
    {
        bcontactID[pos] |= shifter;
        ID contactID = new ID(bcontactID);
        dummyContact = new Contact(new VirtualProtocol(), ID.One);
        ((VirtualProtocol)dummyContact.Protocol).Node = ...;
        bucketList.AddContact(new Contact(dummyContact.Protocol, contactID));
        shifter >>= 1;

        if (shifter == 0)
        {
            shifter = 0x80;
            --pos;
        }
    });

    return bucketList;
}
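To see what the shifter loop in the listing above produces, it can help to run the byte manipulation on its own. The following sketch reproduces only that loop with plain byte arrays, assuming (as the host ID comment implies) that byte 19 is the most significant byte of the ID. The SplitFailureIds class and its Generate method are hypothetical names introduced for illustration; they are not part of the test fixture.

```csharp
using System;
using System.Collections.Generic;

public static class SplitFailureIds
{
    // Generates k 20-byte IDs the same way the loop in the listing does:
    // start from 0x80 in the top byte, then set one more bit per contact,
    // moving to the next lower byte once the current byte is exhausted.
    public static List<byte[]> Generate(int k)
    {
        var ids = new List<byte[]>();
        byte[] id = new byte[20];
        id[19] = 0x80;          // 1000 xxxx prefix
        byte shifter = 0x08;    // first bit to set: 1000 1000
        int pos = 19;

        for (int i = 0; i < k; i++)
        {
            id[pos] |= shifter;
            ids.Add((byte[])id.Clone());   // snapshot, because id keeps changing
            shifter >>= 1;

            if (shifter == 0)
            {
                shifter = 0x80;
                --pos;
            }
        }

        return ids;
    }
}
```

For k = 20, the top byte takes the values 0x88, 0x8C, 0x8E, and 0x8F, after which the loop fills bytes 18 and 17 bit by bit. Every generated ID is distinct, and all of them share the top five bits 10001, which is the shared prefix the listing's comments describe.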