The Kademlia Protocol Succinctly®
by Marc Clifton

CHAPTER 10

Key-Value Management


In this section, you will learn something that is not at all clearly stated in the Kademlia specification—there are actually three kinds of data store:

  • Original publisher store
  • Republished store
  • Cached store

It is important to know that the Kademlia specification does not discuss this at all. The information in this section has been gleaned from closely looking at the discussion on the eMule project forum, particularly miniminime’s discussion of the roles that a node can take on.[14] Implementing this approach requires that the receiver peer knows whether to store the key-value in the republish store or in the cache store.

Original publisher store

The original publisher store is never written to as a result of an RPC store. This store is updated only by the peer itself when it is the originator of the key-value and wants to publish the key-value to other peers. Key-values from this store are republished once every 24 hours.

Republished store

This store contains key-values that have been republished by other peers as part of the process of distributing the data among peers. This store never contains key-values that the peer itself originated, only key-values received from other peers. Key-values in this store are republished to other peers only if a closer peer is found. This check occurs every hour and is optimized in two ways:

  • The lookup algorithm is skipped for a key whose bucket has already been queried for closer nodes.
  • The key is skipped entirely if it has been republished in the last hour.

Cached store

The cached store is used when the result of a successful FindValue request is stored onto the next closest node. The intention here is to avoid hotspots during lookup of popular keys by temporarily republishing them onto other nearby peers. These temporary key-values have an expiration time. Key-values in the cached store are never republished, and are removed after the expiration time.
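The rules for the three stores can be summarized in a small sketch. The names StoreKind and StorePolicy are hypothetical and are not part of the book's Dht class; the intervals are the ones stated in the descriptions above.

```csharp
using System;

// Hypothetical names for illustration only; not part of the book's code.
public enum StoreKind { Originator, Republished, Cached }

public static class StorePolicy
{
    // The originator store is never written to as a result of an RPC store.
    public static bool AcceptsRpcStore(StoreKind kind)
    {
        return kind != StoreKind.Originator;
    }

    // How often entries are considered for republishing; null means never.
    public static TimeSpan? RepublishInterval(StoreKind kind)
    {
        switch (kind)
        {
            case StoreKind.Originator:
                return TimeSpan.FromHours(24);
            case StoreKind.Republished:
                return TimeSpan.FromHours(1);
            default:
                return null; // cached key-values are never republished
        }
    }
}
```

For example, StorePolicy.RepublishInterval(StoreKind.Cached) returns null, while StorePolicy.AcceptsRpcStore(StoreKind.Originator) returns false.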

Storage mechanisms in the Dht class

The three storage mechanisms are managed in the Dht class.

Code Listing 73: Storage Mechanisms

protected IStorage originatorStorage;
protected IStorage republishStorage;
protected IStorage cacheStorage;

The Dht class implements a few constructor options, the first primarily for aiding unit testing.

Code Listing 74: Dht Constructors

/// <summary>
/// Use this constructor to initialize all three stores from the same factory.
/// </summary>
public Dht(
  ID id,
  IProtocol protocol,
  Func<IStorage> storageFactory,
  BaseRouter router)
{
  originatorStorage = storageFactory();
  republishStorage = storageFactory();
  cacheStorage = storageFactory();
  FinishInitialization(id, protocol, router);
  SetupTimers();
}

/// <summary>
/// Supports different concrete storage types. For example, you may want
/// the cacheStorage to be an in-memory store, the originatorStorage to be
/// a SQL database, and the republish store to be a key-value database.
/// </summary>
public Dht(
  ID id,
  IProtocol protocol,
  BaseRouter router,
  IStorage originatorStorage,
  IStorage republishStorage,
  IStorage cacheStorage)
{
  this.originatorStorage = originatorStorage;
  this.republishStorage = republishStorage;
  this.cacheStorage = cacheStorage;
  FinishInitialization(id, protocol, router);
  SetupTimers();
}

The ability to specify different storage mechanisms can be very useful; however, this means that a Node must store the key-value in the appropriate storage.

Code Listing 75: The Node Store Method

/// <summary>
/// Store a key-value pair in the republish or cache storage.
/// </summary>
public void Store(
  Contact sender,
  ID key,
  string val,
  bool isCached = false,
  int expirationTimeSec = 0)
{
  Validate.IsFalse<SendingQueryToSelfException>(sender.ID == ourContact.ID,
    "Sender should not be ourself!");
  bucketList.AddContact(sender);

  if (isCached)
  {
    cacheStorage.Set(key, val, expirationTimeSec);
  }
  else
  {
    SendKeyValuesIfNewContact(sender);
    storage.Set(key, val, Constants.EXPIRATION_TIME_SECONDS);
  }
}

Republishing key-values

From the spec: “To ensure the persistence of key-value pairs, nodes must periodically republish keys. Otherwise, two phenomena may cause lookups for valid keys to fail. First, some of the k nodes that initially get a key-value pair when it is published may leave the network. Second, new nodes may join the network with IDs closer to some published key than the nodes on which the key-value pair was originally published. In both cases, the nodes with a key-value pair must republish it so as once again to ensure it is available on the k nodes closest to the key.

“To compensate for nodes leaving the network, Kademlia republishes each key-value pair once an hour. A naive implementation of this strategy would require many messages—each of up to k nodes storing a key-value pair would perform a node lookup followed by k - 1 STORE RPCs every hour.”
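To get a feel for "many messages," the naive cost per key can be worked out directly. The sketch below is only arithmetic on the quoted description; the class and method names are hypothetical, and k = 20 in the usage note is an assumption (it is the example value given in the Kademlia paper).

```csharp
using System;

// Hypothetical helper for illustration only.
public static class NaiveRepublishCost
{
    // Each of the k nodes holding a key performs one node lookup
    // followed by k - 1 STORE RPCs every hour.
    public static (int Lookups, int StoreRpcs) PerKeyPerHour(int k)
    {
        if (k < 1) throw new ArgumentOutOfRangeException(nameof(k));
        return (k, k * (k - 1));
    }
}
```

With k = 20, PerKeyPerHour(20) gives 20 lookups and 380 STORE RPCs per key per hour, which is why the optimizations that follow matter.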

From Wikipedia,[15] which can be helpful for understanding the spec with different phrasing:

“Periodically, a node that stores a value will explore the network to find the k nodes that are close to the key value and replicate the value onto them. This compensates for disappeared nodes.”

And:

“The node that is providing the file [key-value] will periodically refresh the information onto the network (perform FIND_NODE and STORE messages). When all of the nodes having the file [key-value] go offline, nobody will be refreshing its values (sources and keywords) and the information will eventually disappear from the network.”

The Wikipedia write-up clarifies what is meant by “on the k nodes closest to the key.” In other words, for each key, FindNode is called to find closer nodes, and the value is republished onto them. Without optimization, this is time-consuming when a node’s store holds many key-values; the two optimizations described next address this.

First optimization

From the spec: “Fortunately, the republishing process can be heavily optimized. First, when a node receives a STORE RPC for a given key-value pair, it assumes the RPC was also issued to the other k - 1 closest nodes, and thus the recipient will not republish the key-value pair in the next hour. This ensures that as long as republication intervals are not exactly synchronized, only one node will republish a given key-value pair every hour.”

This first optimization is simple—when receiving a store, update the timestamp on the key-value. Any key-value that has been touched within the last hour is not republished, as we can assume:

  • For a new key-value, it was also published to k closer nodes.
  • If it’s been republished by another node, that node republished it to k closer nodes.
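A minimal sketch of this timestamp check follows. TimestampedStore is a hypothetical in-memory class written for illustration, not the book's IStorage implementation, and the caller passes in the current time so the behavior is easy to follow.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory store for illustration; not the book's IStorage.
public class TimestampedStore
{
    private readonly Dictionary<string, (string Value, DateTime Touched)> items =
        new Dictionary<string, (string Value, DateTime Touched)>();

    // Called when a STORE RPC arrives: save the value and reset its timestamp.
    public void Set(string key, string value, DateTime now)
    {
        items[key] = (value, now);
    }

    // Keys that have not been touched within the interval are due for republishing.
    public List<string> KeysDueForRepublish(DateTime now, TimeSpan interval)
    {
        return items
            .Where(kv => now - kv.Value.Touched >= interval)
            .Select(kv => kv.Key)
            .OrderBy(k => k, StringComparer.Ordinal)
            .ToList();
    }
}
```

A key that received a store 30 minutes ago is left out of the list for a one-hour interval, because another node is assumed to have just republished it to the other closest nodes.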

Second optimization

From the spec: “A second optimization avoids performing node lookups before republishing keys. As described in Section 2.4, to handle unbalanced trees, nodes split k-buckets as required to ensure they have complete knowledge of a surrounding subtree with at least k nodes. If, before republishing key-value pairs, a node u refreshes all k-buckets in this subtree of k nodes, it will automatically be able to figure out the k closest nodes to a given key. These bucket refreshes can be amortized over the republication of many keys.”

This second optimization is straightforward: if we’ve done a bucket refresh within the last hour, we can avoid calling FindNode (the node lookup algorithm). How do we determine which bucket to test for a recent refresh? The bucket for which the key is in range should contain some closer contacts we’ve seen for that key. While the answer might be obvious, it’s worthwhile to discuss the reasoning here.

Buckets in the bucket list are maintained in range order rather than in a tree, which naturally orders them by their prefix.

Table 1: Bucket Range(s)

State | Bucket Range(s) | Prefix(es)
Initial Bucket | 0 .. 2^160 | 1
Two Buckets | 0 .. 2^159, 2^159 .. 2^160 | 01, 1
Four Buckets | 0 .. 2^158, 2^158 .. 2^159, 2^159 .. 2^159 + 2^158, 2^159 + 2^158 .. 2^160 | 001, 01, 10, 1
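Finding the bucket for a key is then a range test. The sketch below assumes half-open ranges [Low, High) and uses the four-bucket layout from Table 1; BucketRanges and its members are hypothetical names, not the book's BucketList API.

```csharp
using System.Collections.Generic;
using System.Numerics;

// Hypothetical helper for illustration; not the book's BucketList.
public static class BucketRanges
{
    // The four-bucket layout from Table 1, as half-open ranges [Low, High).
    public static List<(BigInteger Low, BigInteger High)> FourBuckets()
    {
        BigInteger p158 = BigInteger.Pow(2, 158);
        BigInteger p159 = BigInteger.Pow(2, 159);
        BigInteger p160 = BigInteger.Pow(2, 160);

        return new List<(BigInteger Low, BigInteger High)>
        {
            (BigInteger.Zero, p158),
            (p158, p159),
            (p159, p159 + p158),
            (p159 + p158, p160),
        };
    }

    // Returns the index of the bucket whose range contains the key, or -1 if none does.
    public static int IndexOfBucketFor(
        IReadOnlyList<(BigInteger Low, BigInteger High)> buckets, BigInteger key)
    {
        for (int i = 0; i < buckets.Count; i++)
        {
            if (key >= buckets[i].Low && key < buckets[i].High)
            {
                return i;
            }
        }

        return -1;
    }
}
```

For example, a key equal to 2^159 falls in the third bucket (index 2), and a key of 1 falls in the first bucket (index 0).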

When we identify a bucket with a given key, the contacts in that bucket are closest, as per the XOR computation on the prefix. For example, looking at the four buckets with prefixes 001, 01, 10, and 1, we see that the contacts in the key’s bucket range are closest (in each row of Table 2, the smallest XOR result belongs to the bucket whose range contains the key).

Table 2: Key Prefixes

Key Prefix | Key Prefix ^ Bucket Prefixes | Explanation
1 | 101, 11, 00, 0 | Bucket with prefix 1 always has contacts that are closer
01 | 011, 00, 11, 11 | Bucket with prefix 01 always has contacts that are closer
001 | 000, 011, 101, 101 | Bucket with prefix 001 always has contacts that are closer
0001 | 0011, 0101, 1001, 1001 | Bucket with prefix 001 always has contacts that are closer

For this reason, we use the bucket for which the key is in range. Also, new key-values that are published onto closer nodes persist for 24 hours.
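The prefix XOR used in Table 2 can be reproduced with a few lines of code. This is a sketch under the assumption that the shorter prefix is padded with zeros on the right before the XOR, which is what the table entries imply; PrefixXor is a hypothetical helper, and it expects strings containing only the characters 0 and 1.

```csharp
using System;

// Hypothetical helper for illustration only.
public static class PrefixXor
{
    // XORs two bit-string prefixes after right-padding the shorter one with zeros.
    public static string Xor(string keyPrefix, string bucketPrefix)
    {
        int width = Math.Max(keyPrefix.Length, bucketPrefix.Length);
        string a = keyPrefix.PadRight(width, '0');
        string b = bucketPrefix.PadRight(width, '0');

        char[] result = new char[width];
        for (int i = 0; i < width; i++)
        {
            // Equal bits XOR to 0, differing bits XOR to 1.
            result[i] = a[i] == b[i] ? '0' : '1';
        }

        return new string(result);
    }
}
```

For the key prefix 01, PrefixXor.Xor("01", "001") returns "011" and PrefixXor.Xor("01", "01") returns "00", so the bucket with prefix 01 holds the closer contacts.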

Implementation

Key-values in the republish store are republished at a particular interval, typically every hour.

Code Listing 76: KeyValueRepublishElapsed

/// <summary>
/// Replicate key values if the key-value hasn’t been touched within
/// the republish interval. Also don't do a FindNode lookup if the bucket
/// containing the key has been refreshed within the refresh interval.
/// </summary>
protected void KeyValueRepublishElapsed(object sender, ElapsedEventArgs e)
{
  DateTime now = DateTime.Now;

  republishStorage.Keys.Where(k =>
    (now - republishStorage.GetTimeStamp(k)).TotalMilliseconds >=
      Constants.KEY_VALUE_REPUBLISH_INTERVAL).ForEach(k =>
  {
    ID key = new ID(k);
    StoreOnCloserContacts(key, republishStorage.Get(key));
    republishStorage.Touch(k);
  });
}

Note how a lookup is only performed if the bucket containing the key hasn’t itself been refreshed recently (within the past hour).

Code Listing 77: StoreOnCloserContacts

/// <summary>
/// Perform a lookup if the bucket containing the key has not been refreshed,
/// otherwise just get the k closest contacts we know about.
/// </summary>
protected void StoreOnCloserContacts(ID key, string val)
{
  DateTime now = DateTime.Now;
  KBucket kbucket = node.BucketList.GetKBucket(key);
  List<Contact> contacts;

  if ((now - kbucket.TimeStamp).TotalMilliseconds <
    Constants.BUCKET_REFRESH_INTERVAL)
  {
    // Bucket has been refreshed recently, so don't do a lookup as we
    // have the k closest contacts.
    contacts = node.BucketList.GetCloseContacts(key, node.OurContact.ID);
  }
  else
  {
    contacts = router.Lookup(key, router.RpcFindNodes).contacts;
  }

  contacts.ForEach(c =>
  {
    RpcError error = c.Protocol.Store(node.OurContact, key, val);
    HandleError(error, c);
  });
}

Expiring key-value pairs

Expired key-values are removed from the republish and cache storage, which happens in the Dht class.

Code Listing 78: ExpireKeysElapsed

/// <summary>
/// Any expired keys in the republish or node's cache are removed.
/// </summary>
protected virtual void ExpireKeysElapsed(object sender, ElapsedEventArgs e)
{
  RemoveExpiredData(cacheStorage);
  RemoveExpiredData(republishStorage);
}

protected void RemoveExpiredData(IStorage store)
{
  DateTime now = DateTime.Now;

  // ToList so our key list is resolved now as we remove keys.
  store.Keys.Where(k => (now - store.GetTimeStamp(k)).TotalSeconds >=
    store.GetExpirationTimeSec(k)).ToList().ForEach(k =>
  {
    store.Remove(k);
  });
}

Originator republishing

From the spec: “For Kademlia’s current application (file sharing), we also require the original publisher of a (key,value) pair to republish it every 24 hours. Otherwise, (key,value) pairs expire 24 hours after publication, so as to limit stale index information in the system. For other applications, such as digital certificates or cryptographic hash to value mappings, longer expiration times may be appropriate.”

Republishing originator data is handled in a timer event that resends the key-values in the originator’s storage.

Code Listing 79: OriginatorRepublishElapsed

protected void OriginatorRepublishElapsed(object sender, ElapsedEventArgs e)
{
  DateTime now = DateTime.Now;

  originatorStorage.Keys.Where(
    k => (now - originatorStorage.GetTimeStamp(k)).TotalMilliseconds >=
      Constants.ORIGINATOR_REPUBLISH_INTERVAL).ForEach(k =>
  {
    ID key = new ID(k);

    // Just use close contacts, don't do a lookup.
    var contacts = node.BucketList.GetCloseContacts(key, node.OurContact.ID);

    contacts.ForEach(c =>
    {
      RpcError error = c.Protocol.Store(ourContact, key, originatorStorage.Get(key));
      HandleError(error, c);
    });

    originatorStorage.Touch(k);
  });
}

Republished key-values persist for 24 hours.

Storing key-values onto the new node when a new node registers

From the spec: “When a new node joins the system, it must store any key-value pair to which it is one of the k closest. Existing nodes, by similarly exploiting complete knowledge of their surrounding subtrees, will know which key-value pairs the new node should store. Any node learning of a new node therefore issues STORE RPCs to transfer relevant key-value pairs to the new node. To avoid redundant STORE RPCs, however, a node only transfers a key-value pair if its own ID is closer to the key than are the IDs of other nodes.”

Interpretation:

A new node (contact) will be instructed to store key-values that exist on the bootstrapping node (the one it’s bootstrapping with) when they meet the following condition: the key XOR’d with the bootstrapping node’s ID is less than (closer than) the key XOR’d with the IDs of other nodes.

What does “other nodes” mean? Are these all other contacts the bootstrapping node knows about, or just the k closest contacts in the joining node’s bucket, or some other interpretation? We have to understand what “exploiting complete knowledge of their surrounding subtrees” means. First, this indicates that it isn’t just the joining node’s bucket. It would make sense to interpret this as “store the values onto the joining node for any key-value where the joining node will be closer to that key when there are no other nodes that are closer.” If the joining node becomes the closest node to a key-value, then it is requested to store that key-value.

It’s interesting to note that this algorithm executes regardless of whether the bootstrapping node actually added the joining node to a k-bucket. Remember also that “joining” actually means contacting another node with any one of the four RPC calls. When a new node registers, republished key-values persist for 24 hours.

The implementation is as follows.

Code Listing 80: SendKeyValuesIfNewContact

/// <summary>
/// For a new contact, we store values to that contact whose keys ^ ourContact
/// are less than stored keys ^ [otherContacts].
/// </summary>
protected void SendKeyValuesIfNewContact(Contact sender)
{
  List<Contact> contacts = new List<Contact>();

  if (IsNewContact(sender))
  {
    lock (bucketList)
    {
      // Clone so we can release the lock.
      contacts = new List<Contact>(bucketList.Buckets.SelectMany(b => b.Contacts));
    }

    if (contacts.Count() > 0)
    {
      // and our distance to the key < any other contact's distance to the key...
      storage.Keys.AsParallel().ForEach(k =>
      {
        // Minimum distance from any known contact to the key.
        var distance = contacts.Min(c => k ^ c.ID);

        // If we are closer, store the key-value on the new contact's node.
        if ((k ^ ourContact.ID) < distance)
        {
          var error = sender.Protocol.Store(ourContact, new ID(k), storage.Get(k));
          dht?.HandleError(error, sender);
        }
      });
    }
  }
}

Annoyingly, for every stored value, there just isn’t any way to avoid performing the XOR computation on every contact. This could get expensive, and it is currently optimized using LINQ’s parallel feature.
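To make the comparison concrete, here is a scaled-down sketch that uses 8-bit integer IDs instead of 160-bit IDs. NewContactTransfer and KeysToSend are hypothetical names; the logic mirrors the check in Code Listing 80 but is not the book's code.

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper for illustration; IDs are small non-negative integers.
public static class NewContactTransfer
{
    // Returns the stored keys for which our ID is strictly closer (by XOR)
    // than every contact we already know about.
    public static List<int> KeysToSend(
        int ourId, IEnumerable<int> knownContactIds, IEnumerable<int> storedKeys)
    {
        List<int> contacts = knownContactIds.ToList();

        // With no known contacts there is nothing to compare against, so send nothing.
        if (contacts.Count == 0)
        {
            return new List<int>();
        }

        return storedKeys
            .Where(k => (k ^ ourId) < contacts.Min(c => k ^ c))
            .ToList();
    }
}
```

With our ID 0x80, one known contact 0x40, and stored keys 0x01 and 0x80, only 0x80 is sent: 0x80 ^ 0x80 = 0x00 is less than 0x80 ^ 0x40 = 0xC0, whereas 0x01 ^ 0x80 = 0x81 is greater than 0x01 ^ 0x40 = 0x41.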

Determining whether a contact is new is slightly more complicated than one would think. We need to check not only whether the contact exists in any of our buckets, but also whether it’s a pending contact—one that wasn’t placed in a bucket because the bucket was full, but nonetheless has already received any closer keys.

Code Listing 81: IsNewContact

/// <summary>
/// Returns true if the contact isn't in the bucket list or the
/// pending contacts list.
/// </summary>
protected bool IsNewContact(Contact sender)
{
  bool ret;

  lock (bucketList)
  {
    // If we have a new contact...
    ret = bucketList.ContactExists(sender);
  }

  if (dht != null)            // for unit testing, dht may be null
  {
    lock (dht.PendingContacts)
    {
      ret |= dht.PendingContacts.ContainsBy(sender, c => c.ID);
    }
  }

  return !ret;
}

Over-caching

From the spec: “To avoid ‘over-caching,’ we make the expiration time of a (key,value) pair in any node’s database exponentially inversely proportional to the number of nodes between the current node and the node whose ID is closest to the key ID.”

  • Inversely proportional: Meaning that the expiration time is shorter the more nodes that are between the current node and the closest node.
  • Exponentially inversely proportional: Meaning the expiration time is a lot shorter with the more nodes that are between the current node and closest node.

The specification provides no guidance for what the calculation for “exponentially inversely proportional” should actually be. It’s also undefined as to what the time constants are—what is a baseline time for which a key-value should persist? It is assumed that this should be a maximum of 24 hours. We also need to track an expiration time that is separate from the key-value republish timestamp. Furthermore, up to this point, I haven’t implemented the concept of accelerated lookup optimization, which is where the value of b comes from. In this implementation, where we have bucket ranges, rather than a bucket-per-bit in the key space, the accelerated lookup optimization is irrelevant, so we’ll use b==5 which is the spec’s recommended value for that optimization.

Also, who does the computation “between the current node and the node whose ID is closest to the key ID”? Is the current node:

  • The sender that is caching the key-value on another node and counts the number of nodes between itself and the receiving node?
  • The receiver that is handling the store request and counts the number of nodes between itself and the sender node?

As discussed earlier, the entire concept of having separate stores (originator, republished, cached) is never discussed in the Kademlia specification. Without understanding these three different stores, trying to understand how caching works is probably impossible.

Caching occurs in only one place—when a value being looked up (and successfully found) is stored on a “close” node:

Code Listing 82: Handling Over-Caching

public (bool found, List<Contact> contacts, string val) FindValue(ID key)
{
  ...

  var lookup = router.Lookup(key, router.RpcFindValue);

  if (lookup.found)
  {
    ret = (true, null, lookup.val);

    // Find the first close contact (other than the one the value was found by)
    // in which to *cache* the key-value.
    var storeTo = lookup.contacts.Where(c => c != lookup.foundBy).
      OrderBy(c => c.ID ^ key).FirstOrDefault();

    if (storeTo != null)
    {
      int separatingNodes = GetSeparatingNodesCount(ourContact, storeTo);

      int expTimeSec = (int)(Constants.EXPIRATION_TIME_SECONDS /
        Math.Pow(2, separatingNodes));

      RpcError error = storeTo.Protocol.Store(node.OurContact, key, lookup.val,
        true, expTimeSec);

      HandleError(error, storeTo);
    }
  }

  ...
}

Note the true flag, indicating that this RPC Store call is for caching purposes.
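The halving in the listing is easy to check with a few numbers. The sketch below assumes a 24-hour baseline for Constants.EXPIRATION_TIME_SECONDS, as suggested in the discussion above; CacheExpiration is a hypothetical stand-alone helper, not part of the Dht class.

```csharp
using System;

// Hypothetical helper for illustration only.
public static class CacheExpiration
{
    // Assumed baseline: 24 hours, expressed in seconds.
    public const int BaselineSeconds = 24 * 60 * 60;

    // Halves the baseline once for every node separating the two contacts.
    public static int ExpirationSeconds(int separatingNodes)
    {
        if (separatingNodes < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(separatingNodes));
        }

        return (int)(BaselineSeconds / Math.Pow(2, separatingNodes));
    }
}
```

With no separating nodes the cached value lives for the full 86,400 seconds; with one separating node, 43,200 seconds; with three, 10,800 seconds; and with ten, only 84 seconds.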

Never expiring republished key-values

It is reasonable for a cached key-value to expire, but we may never want to expire originator or republished key-values. One good example is a distributed blockchain (or distributed ledger) where data should never disappear, even if the original publisher disappears from the peer network. There are a variety of ways to do this, such as overriding, as in Code Listing 83.

Code Listing 83: Never Expiring Republished Key-Value

protected override void ExpireKeysElapsed(object sender, ElapsedEventArgs e)
{
  RemoveExpiredData(cacheStorage);
  // RemoveExpiredData(republishStorage);
}

In a subclass of the Dht overriding this method, only the cached store expires.

Storing key-values onto the new node when a new node registers

There’s a lot of setup here for creating two existing contacts and two key-values whose IDs have been specifically set. See the comments for the XOR distance “math.”

Code Listing 84: TestNewContactGetsStoredContactsTest

[TestMethod]
public void TestNewContactGetsStoredContactsTest()
{
  // Set up a node at the midpoint.
  // The existing node has the ID 10000....
  Node existing = new Node(new Contact(null, ID.Mid), new VirtualStorage());
  string val1 = "Value 1";
  string valMid = "Value Mid";

  // The existing node stores two items, one with an ID "hash" of 1,
  // the other with ID.Mid.
  // Simple storage, rather than executing the code for Store.
  existing.SimpleStore(ID.One, val1);
  existing.SimpleStore(ID.Mid, valMid);

  Assert.IsTrue(existing.Storage.Keys.Count == 2,
    "Expected the existing node to have two key-values.");

  // Create a contact in the existing node’s bucket list that is closer
  // to one of the values.
  // This contact has the prefix 010000....
  Contact otherContact = new Contact(null, ID.Zero.SetBit(158));
  Node other = new Node(otherContact, new VirtualStorage());
  existing.BucketList.Buckets[0].Contacts.Add(otherContact);

  // The unseen contact has the prefix 001000....
  VirtualProtocol unseenvp = new VirtualProtocol();
  Contact unseenContact = new Contact(unseenvp, ID.Zero.SetBit(157));
  Node unseen = new Node(unseenContact, new VirtualStorage());
  unseenvp.Node = unseen;     // final fixup.

  Assert.IsTrue(unseen.Storage.Keys.Count == 0,
    "The unseen node shouldn't have any key-values!");

  // An unseen node pings, and it should get back valMid only
  // (see the math below).
  existing.Ping(unseenContact);

  // Contacts (c1, c2)    V1          V2
  // 10000000             00...0001   10...0000
  // 01000000

  // Math:
  // c1 ^ V1      c1 ^ V2      c2 ^ V1     c2 ^ V2
  // 100...001    000...000    010...001   110...000

  // c1 ^ V1 > c2 ^ V1, so V1 doesn’t get sent to the unseen node.
  // c1 ^ V2 < c2 ^ V2, so V2 does get sent.

  Assert.IsTrue(unseen.Storage.Keys.Count == 1,
    "Expected 1 value stored in our new node.");

  Assert.IsTrue(unseen.Storage.Contains(ID.Mid),
    "Expected valMid to be stored.");

  Assert.IsTrue(unseen.Storage.Get(ID.Mid) == valMid,
    "Expected valMid value to match.");
}

Other optimizations

Ping is simply a “respond back with the random ID” that was sent. Internally, the buckets are potentially updated, and if the contact is new, store RPC calls are made to it for any values that it should store, as discussed above when a new node registers.

Piggy-backed ping

In his paper, Bruno Spori writes:[16]

“The situation is different when the first message a node received is a request message. In this case, the receiver cannot be sure whether the sender’s contact information [is] correct. It could be that the request was faked. To determine this, the piggy-backed ping is used. The effect of the piggy-backed ping is that the original sender of the request must send a ping reply upon receiving the reply message. Thus, the receiver of the request message is able to determine the correctness of the sender as well.”

We will instead rely on the error-handling mechanism for evicting contacts that do not respond or respond incorrectly or with errors. Error handling will be discussed later.
