
The Kademlia Protocol Succinctly®
by Marc Clifton

CHAPTER 9

Bucket Management


It is time now to deal with the various issues of bucket management.

Eviction and queuing new contacts

When a bucket is full, the AddContact method has to deal with evicting unresponsive contacts and with queuing pending contacts. To exercise this in unit tests, our VirtualProtocol carries a flag (Responds) that indicates whether the node responds; when the flag is false, Ping returns a timeout error.

Code Listing 66: VirtualProtocol Ping

public RpcError Ping(Contact sender)
{
  // Ping still adds/updates the sender's contact.
  if (Responds)
  {
    Node.Ping(sender);
  }

  return new RpcError() { TimeoutError = !Responds };
}

We can then implement the handling of a nonresponsive node.

Code Listing 67: Handling Nonresponsive Nodes (fragment)

else if (kbucket.IsBucketFull)
{
  if (CanSplit(kbucket))
  {
    // Split the bucket and try again.
    (KBucket k1, KBucket k2) = kbucket.Split();
    int idx = GetKBucketIndex(contact.ID);
    buckets[idx] = k1;
    buckets.Insert(idx + 1, k2);
    buckets[idx].Touch();
    buckets[idx + 1].Touch();
    AddContact(contact);
  }
  else
  {
    // Ping the least recently seen contact in the full bucket.
    Contact lastSeenContact = kbucket.Contacts.OrderBy(c => c.LastSeen).First();
    RpcError error = lastSeenContact.Protocol.Ping(ourContact);

    if (error.HasError)
    {
      // The null-conditional operator is used because unit tests may not initialize a DHT.
      dht?.DelayEviction(lastSeenContact, contact);
    }
    else
    {
      // Still can't add the contact, so put it into the pending list.
      dht?.AddToPending(contact);
    }
  }
}
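
Listing 67 calls DelayEviction and AddToPending without showing them. The following is a minimal sketch of the bookkeeping those calls imply, not the book's implementation: the EvictionTracker class, its use of string contact IDs, and the value of EvictionLimit (a stand-in for Constants.EVICTION_LIMIT) are all assumptions made for illustration. The behavior is chosen to agree with the unit tests later in this chapter: a failed ping queues the new contact and increments a counter, and the stale contact is only replaced once the counter reaches the limit.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: counts failed pings per contact and decides when to evict.
public class EvictionTracker
{
    // Assumed stand-in for Constants.EVICTION_LIMIT; the real value is not shown here.
    public const int EvictionLimit = 5;

    // Failed-ping counts keyed by contact ID.
    public Dictionary<string, int> EvictionCount { get; } = new Dictionary<string, int>();

    // Contacts waiting for room in a full bucket.
    public List<string> PendingContacts { get; } = new List<string>();

    // Called when the least recently seen contact fails to respond.
    // Returns true when the stale contact should now be replaced by the new one.
    public bool DelayEviction(string staleId, string newId)
    {
        if (!PendingContacts.Contains(newId))
        {
            PendingContacts.Add(newId);
        }

        // TryGetValue leaves count at 0 when the contact has no failures yet.
        EvictionCount.TryGetValue(staleId, out int count);
        count++;

        if (count < EvictionLimit)
        {
            EvictionCount[staleId] = count;
            return false;
        }

        // Limit reached: clear the bookkeeping so the caller can swap the contacts.
        EvictionCount.Remove(staleId);
        PendingContacts.Remove(newId);
        return true;
    }
}
```

With this sketch, one failed ping leaves one pending contact and one eviction counter, while EvictionLimit failed pings leave both collections empty and signal that the swap should happen.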

Bucket refresh

From the spec: “Buckets are generally kept fresh by the traffic of requests traveling through nodes. To handle pathological cases in which there are no lookups for a particular ID range, each node refreshes any bucket to which it has not performed a node lookup in the past hour. Refreshing means picking a random ID in the bucket’s range and performing a node search for that ID.”

The phrase “any bucket to which it has not performed a node lookup” is subject to at least two interpretations. One is “the bucket whose range contains the key in the key-value pair for a Store or FindValue operation.” Another is “the k-bucket containing the range for any contact ID queried during the lookup process.” The second approach might seem more correct because the original alpha contacts are determined from the list of closest contacts across all buckets, but it then becomes arbitrary whether to also touch the buckets containing the contacts returned by the FindNodes query that are then queried further.

I am choosing the first interpretation, which means that the bucket containing the key gets touched in the Store and FindValue methods of the Dht class.

Code Listing 68: TouchBucketWithKey

public class Dht
{
  ...
  public void Store(ID key, string val)
  {
    TouchBucketWithKey(key);
    ...
  }

  public (bool found, List<Contact> contacts, string val) FindValue(ID key)
  {
    TouchBucketWithKey(key);
    ...
  }

  protected void TouchBucketWithKey(ID key)
  {
    node.BucketList.GetKBucket(key).Touch();
  }
}
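
To make the effect of TouchBucketWithKey concrete, here is a small sketch of the lookup-and-touch step. SimpleBucket, SimpleBucketList, the half-open range convention, and the use of BigInteger for IDs are assumptions made for illustration; the book's KBucket and BucketList classes are not reproduced here. The timestamp is passed in explicitly so the behavior can be checked deterministically.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Numerics;

// Hypothetical bucket covering the half-open ID range [Low, High).
public class SimpleBucket
{
    public BigInteger Low { get; }
    public BigInteger High { get; }
    public DateTime TimeStamp { get; private set; }

    public SimpleBucket(BigInteger low, BigInteger high, DateTime created)
    {
        Low = low;
        High = high;
        TimeStamp = created;
    }

    public bool HasInRange(BigInteger id) => Low <= id && id < High;

    // Touching a bucket records when it last took part in a lookup.
    public void Touch(DateTime now) => TimeStamp = now;
}

public class SimpleBucketList
{
    public List<SimpleBucket> Buckets { get; } = new List<SimpleBucket>();

    // Returns the bucket whose range contains the key.
    // Throws if no bucket covers the key, which should not happen when
    // the buckets partition the whole ID space.
    public SimpleBucket GetKBucket(BigInteger key) =>
        Buckets.First(b => b.HasInRange(key));

    public void TouchBucketWithKey(BigInteger key, DateTime now) =>
        GetKBucket(key).Touch(now);
}
```

Only the bucket that contains the key has its timestamp updated; every other bucket keeps its old timestamp and so remains a candidate for refresh.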

Next, we set up a refresh timer in the Dht class.

Code Listing 69: SetupBucketRefreshTimer

protected void SetupBucketRefreshTimer()
{
  bucketRefreshTimer = new Timer(Constants.BUCKET_REFRESH_INTERVAL);
  bucketRefreshTimer.AutoReset = true;
  bucketRefreshTimer.Elapsed += BucketRefreshTimerElapsed;
  bucketRefreshTimer.Start();
}

protected void BucketRefreshTimerElapsed(object sender, ElapsedEventArgs e)
{
  DateTime now = DateTime.Now;

  // Put into a separate list as bucket collections may be modified.
  List<KBucket> currentBuckets =
    new List<KBucket>(node.BucketList.Buckets.
    Where(b => (now - b.TimeStamp).TotalMilliseconds >=
      Constants.BUCKET_REFRESH_INTERVAL));

  currentBuckets.ForEach(b => RefreshBucket(b));
}

protected void RefreshBucket(KBucket bucket)
{
  bucket.Touch();
  ID rndId = ID.RandomIDWithinBucket(bucket);

  // Isolate in a separate list as contacts collection for this bucket might change.
  List<Contact> contacts = bucket.Contacts.ToList();

  contacts.ForEach(c =>
  {
    var (newContacts, timeoutError) = c.Protocol.FindNode(ourContact, rndId);
    HandleError(timeoutError, c);
    newContacts?.ForEach(otherContact => node.BucketList.AddContact(otherContact));
  });
}

Note that now when a bucket is refreshed, it is always touched, which updates its “last seen” timestamp.
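
The two calculations at the heart of the refresh pass, selecting stale buckets and picking a random ID inside a bucket's range, can be sketched in isolation. This is an illustration under stated assumptions rather than the book's code: RefreshSketch and both method names are hypothetical, IDs are modeled as BigInteger values in a half-open range [low, high), and ID.RandomIDWithinBucket may well be implemented differently. System.Random is used for simplicity and is not cryptographically secure.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Numerics;

public static class RefreshSketch
{
    // Returns the indexes of buckets whose last-touched time is at least
    // intervalMs milliseconds before now.
    public static List<int> StaleBucketIndexes(
        IList<DateTime> timeStamps, DateTime now, double intervalMs) =>
        Enumerable.Range(0, timeStamps.Count)
            .Where(i => (now - timeStamps[i]).TotalMilliseconds >= intervalMs)
            .ToList();

    // Picks an ID in the half-open range [low, high).
    public static BigInteger RandomIdInRange(BigInteger low, BigInteger high, Random rng)
    {
        BigInteger span = high - low;
        if (span.Sign <= 0)
        {
            throw new ArgumentException("high must be greater than low");
        }

        // Draw several more bytes than the span needs so that reducing
        // modulo span introduces only a negligible bias.
        byte[] bytes = new byte[span.ToByteArray().Length + 8];
        rng.NextBytes(bytes);

        // BigInteger reads the array as little-endian two's complement;
        // clearing the top bit of the last byte keeps the value non-negative.
        bytes[bytes.Length - 1] &= 0x7F;

        return low + (new BigInteger(bytes) % span);
    }
}
```

Passing the current time as a parameter, instead of reading DateTime.Now inside the method, is a design choice that makes the staleness check easy to unit test.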

Unit tests

Two unit tests for eviction verify the two possible conditions.

  • A nonresponding contact is evicted after a preset number of ping attempts.
  • A new contact is placed into a delayed eviction buffer.

Code Listing 70: NonRespondingContactEvictedTest

/// <summary>
/// Tests that a nonresponding contact is evicted after
/// Constants.EVICTION_LIMIT tries.
/// </summary>
[TestMethod]
public void NonRespondingContactEvictedTest()
{
  // Create a DHT so we have an eviction handler.
  Dht dht = new Dht(ID.Zero, new VirtualProtocol(), () => null, new Router());
  IBucketList bucketList = SetupSplitFailure(dht.Node.BucketList);

  Assert.IsTrue(bucketList.Buckets.Count == 2, "Bucket split should have occurred.");
  Assert.IsTrue(bucketList.Buckets[0].Contacts.Count == 1,
    "Expected 1 contact in bucket 0.");
  Assert.IsTrue(bucketList.Buckets[1].Contacts.Count == 20,
    "Expected 20 contacts in bucket 1.");

  // The bucket is now full. Pick the first contact, as it is the least
  // recently seen (contacts are added in chronological order).
  Contact nonRespondingContact = bucketList.Buckets[1].Contacts[0];

  // Since the protocols are shared, we need to assign a unique protocol
  // for this node for testing.
  VirtualProtocol vpUnresponding =
    new VirtualProtocol(((VirtualProtocol)nonRespondingContact.Protocol).Node,
      false);
  nonRespondingContact.Protocol = vpUnresponding;

  // Set up the next new contact (it can respond).
  Contact nextNewContact = new Contact(dht.Contact.Protocol, ID.Zero.SetBit(159));

  // Hit the nonresponding contact EVICTION_LIMIT times, which will trigger
  // the eviction algorithm.
  Constants.EVICTION_LIMIT.ForEach(() => bucketList.AddContact(nextNewContact));

  Assert.IsTrue(bucketList.Buckets[1].Contacts.Count == 20,
    "Expected 20 contacts in bucket 1.");

  // Verify that the eviction happened and nothing is left pending.
  Assert.IsTrue(dht.PendingContacts.Count == 0,
    "Pending contact list should now be empty.");
  Assert.IsFalse(bucketList.Buckets.SelectMany(
    b => b.Contacts).Contains(nonRespondingContact),
      "Expected bucket to NOT contain non-responding contact.");
  Assert.IsTrue(bucketList.Buckets.SelectMany(
    b => b.Contacts).Contains(nextNewContact),
      "Expected bucket to contain new contact.");
  Assert.IsTrue(dht.EvictionCount.Count == 0,
    "Expected no contacts to be pending eviction.");
}

Code Listing 71: NonRespondingContactDelayedEvictionTest

/// <summary>
/// Tests that a nonresponding contact puts the new contact into a pending list.
/// </summary>
[TestMethod]
public void NonRespondingContactDelayedEvictionTest()
{
  // Create a DHT so we have an eviction handler.
  Dht dht = new Dht(ID.Zero, new VirtualProtocol(), () => null, new Router());
  IBucketList bucketList = SetupSplitFailure(dht.Node.BucketList);

  Assert.IsTrue(bucketList.Buckets.Count == 2,
    "Bucket split should have occurred.");
  Assert.IsTrue(bucketList.Buckets[0].Contacts.Count == 1,
    "Expected 1 contact in bucket 0.");
  Assert.IsTrue(bucketList.Buckets[1].Contacts.Count == 20,
    "Expected 20 contacts in bucket 1.");

  // The bucket is now full. Pick the first contact, as it is the least
  // recently seen (contacts are added in chronological order).
  Contact nonRespondingContact = bucketList.Buckets[1].Contacts[0];

  // Since the protocols are shared, we need to assign a unique protocol
  // for this node for testing.
  VirtualProtocol vpUnresponding = new
    VirtualProtocol(((VirtualProtocol)nonRespondingContact.Protocol).Node, false);
  nonRespondingContact.Protocol = vpUnresponding;

  // Set up the next new contact (it can respond).
  Contact nextNewContact = new Contact(dht.Contact.Protocol, ID.Zero.SetBit(159));

  bucketList.AddContact(nextNewContact);

  Assert.IsTrue(bucketList.Buckets[1].Contacts.Count == 20,
    "Expected 20 contacts in bucket 1.");

  // Verify that the new contact is pending and the eviction is delayed.
  Assert.IsTrue(dht.PendingContacts.Count == 1, "Expected one pending contact.");
  Assert.IsTrue(dht.PendingContacts.Contains(nextNewContact),
    "Expected pending contact to be the 21st contact.");
  Assert.IsTrue(dht.EvictionCount.Count == 1,
    "Expected one contact to be pending eviction.");
}

Both unit tests set up a “failed split” so that the eviction routines are triggered.

Code Listing 72: SetupSplitFailure

protected IBucketList SetupSplitFailure(IBucketList bucketList = null)
{
  // Force the host node ID to < 2^159 so the node ID is not in the
  // 2^159 ... 2^160 range.
  byte[] bhostID = new byte[20];
  bhostID[19] = 0x7F;
  ID hostID = new ID(bhostID);

  Contact dummyContact = new Contact(new VirtualProtocol(), hostID);
  ((VirtualProtocol)dummyContact.Protocol).Node =
    new Node(dummyContact, new VirtualStorage());
  bucketList = bucketList ?? new BucketList(hostID, dummyContact);

  // Also add a contact in this 0 - 2^159 range, arbitrarily something
  // not our host ID. This ensures that only one bucket split will occur
  // after 20 nodes with ID >= 2^159 are added;
  // otherwise, buckets will split in the 2^159 ... 2^160 space.
  dummyContact = new Contact(new VirtualProtocol(), ID.One);
  ((VirtualProtocol)dummyContact.Protocol).Node =
    new Node(dummyContact, new VirtualStorage());
  bucketList.AddContact(new Contact(dummyContact.Protocol, ID.One));

  Assert.IsTrue(bucketList.Buckets.Count == 1,
    "Bucket split should not have occurred.");
  Assert.IsTrue(bucketList.Buckets[0].Contacts.Count == 1,
    "Expected 1 contact in bucket 0.");

  // Make sure contact IDs all have the same 5-bit prefix
  // and are in the 2^159 ... 2^160 - 1 space.
  byte[] bcontactID = new byte[20];
  bcontactID[19] = 0x80;

  // 1000 xxxx prefix, xxxx starts at 1000 (8)
  // this ensures that all the contacts in a bucket match only the prefix as
  // only the first 5 bits are shared.
  // |----| shared range
  // 1000 1000 ...
  // 1000 1100 ...
  // 1000 1110 ...
  byte shifter = 0x08;
  int pos = 19;

  Constants.K.ForEach(() =>
  {
    bcontactID[pos] |= shifter;
    ID contactID = new ID(bcontactID);
    dummyContact = new Contact(new VirtualProtocol(), ID.One);
    ((VirtualProtocol)dummyContact.Protocol).Node =
      new Node(dummyContact, new VirtualStorage());
    bucketList.AddContact(new Contact(dummyContact.Protocol, contactID));
    shifter >>= 1;

    if (shifter == 0)
    {
      shifter = 0x80;
      --pos;
    }
  });

  return bucketList;
}
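
The bit manipulation in SetupSplitFailure is easier to follow when the generated IDs are inspected on their own. The sketch below repeats the same shifter loop and converts each byte array to a BigInteger. It assumes, as the comments in the listing imply, that byte 19 is the most significant byte of the ID; SplitFailureIds and Generate are hypothetical names, and the book's ID class is not used.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Numerics;

public static class SplitFailureIds
{
    // Reproduces the ID pattern of the loop in SetupSplitFailure:
    // start at 2^159 and set one additional lower bit per contact.
    public static List<BigInteger> Generate(int count)
    {
        var ids = new List<BigInteger>();
        byte[] bytes = new byte[20];
        bytes[19] = 0x80;      // Assumed little-endian: byte 19 holds bits 152-159.
        byte shifter = 0x08;
        int pos = 19;

        for (int i = 0; i < count; i++)
        {
            bytes[pos] |= shifter;

            // Append a zero byte so BigInteger treats the value as unsigned.
            ids.Add(new BigInteger(bytes.Concat(new byte[] { 0 }).ToArray()));

            shifter >>= 1;
            if (shifter == 0)
            {
                // Out of bits in this byte: continue with the next lower byte.
                shifter = 0x80;
                --pos;
            }
        }

        return ids;
    }
}
```

For 20 contacts, the loop uses the low four bits of byte 19 and then all eight bits of bytes 18 and 17. Every ID therefore lies in the 2^159 ... 2^160 - 1 range, every ID is distinct, and shifting any of them right by 155 bits yields the same 5-bit prefix, binary 10001.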
