CHAPTER 12
These entry points to the node must be re-entrant:
The first issue is with adding contacts and the bucket manipulation that occurs. Collections should not be modified or otherwise manipulated while they are being searched. We’ve seen lock statements in previous code that ensure collections are not modified by one thread while another thread is reading them.
There are potentially more optimized approaches, such as locking only the specific KBucket being manipulated and only locking the BucketList when it itself is being modified; however, I will leave those for another time.
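As a rough illustration of the per-bucket idea, here is a minimal sketch in which each bucket has its own lock object, so additions to different buckets do not block one another. The names buckets, bucketLocks, and AddContact are hypothetical stand-ins, and plain integers replace contacts; this is not the library's KBucket or BucketList code.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-ins: each "bucket" is a list of contact IDs with its own lock object.
const int bucketCount = 4;
var buckets = new List<int>[bucketCount];
var bucketLocks = new object[bucketCount];
for (int i = 0; i < bucketCount; i++)
{
    buckets[i] = new List<int>();
    bucketLocks[i] = new object();
}

// Only the bucket that receives the contact is locked; the other buckets stay available.
void AddContact(int contactId)
{
    int index = contactId % bucketCount;
    lock (bucketLocks[index])
    {
        buckets[index].Add(contactId);
    }
}

// 400 concurrent additions spread evenly over the four buckets.
Parallel.For(0, 400, i => AddContact(i));

int total = 0;
foreach (var bucket in buckets) total += bucket.Count;
int firstBucketCount = buckets[0].Count;
Console.WriteLine($"{total} contacts, {firstBucketCount} in the first bucket");
```

A real implementation would still need a separate lock for operations that change the list of buckets itself, such as splitting a bucket.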
It is also assumed that the storage implementation can be re-entrant. In the virtual storage, this is handled by ConcurrentDictionary instances, for example:
protected ConcurrentDictionary<BigInteger, StoreValue> store;
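To see why a ConcurrentDictionary is enough here, consider the following minimal sketch. It assumes a simplified store in which a string replaces StoreValue, so it can run on its own; the key values and the counter are made up for the example.

```csharp
using System;
using System.Collections.Concurrent;
using System.Numerics;
using System.Threading.Tasks;

// Stand-in for the chapter's store: string replaces StoreValue to keep the sketch self-contained.
var store = new ConcurrentDictionary<BigInteger, string>();

// Many threads write distinct keys at once; no explicit lock statement is needed.
Parallel.For(0, 1000, i => store[new BigInteger(i)] = "value" + i);

// AddOrUpdate retries until its update is applied against the current value,
// so no increment is lost (the update delegate may run more than once).
var sharedKey = new BigInteger(5000);
Parallel.For(0, 100, i =>
    store.AddOrUpdate(sharedKey, "1", (key, old) => (int.Parse(old) + 1).ToString()));

int count = store.Count;
string counter = store[sharedKey];
Console.WriteLine($"{count} entries, counter = {counter}");
```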
From the spec: “The initiator then sends parallel, asynchronous find_node RPCs to the a [sic] nodes it has chosen, a is a system-wide concurrency parameter, such as 3.”
In the lookup algorithm, Kademlia uses parallel, asynchronous queries to reduce timeout delays from failed nodes. Waiting for at least some of the nodes to respond in each batch of three closest nodes gives the system a chance to find nodes even closer than that first set of close nodes, with the hope of acquiring k closer contacts without having to explore farther contacts.
It’s not particularly clear why all the k closer contacts aren’t queried in parallel to start with. Maybe the idea is that you want to try to get closer contacts from already close contacts. Certainly all the contacts could be queried and from the ones that respond first, we can select k closer ones. On the other hand, querying all the contacts simultaneously probably results in unnecessary network traffic as many of the FindNode RPC calls will be ignored.
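The batch selection discussed above can be sketched as follows: order the known contacts by XOR distance to the target and take the first few. This is only an illustration under simplifying assumptions; the 4-bit IDs, the target value, and the alpha, known, and batch names are made up for the example and do not come from the library.

```csharp
using System;
using System.Linq;
using System.Numerics;

// Hypothetical values: small IDs and a concurrency parameter of 3.
const int alpha = 3;
BigInteger target = 11;                                  // binary 1011
var known = new BigInteger[] { 1, 4, 8, 9, 10, 15 };

// Kademlia measures distance as the XOR of two IDs; a smaller result means closer.
var batch = known.OrderBy(id => id ^ target).Take(alpha).ToArray();

Console.WriteLine(string.Join(", ", batch));             // prints "10, 9, 8"
```

Only these three contacts are queried in the first round; contacts they return may be closer still, which is the motivation for not querying everything at once.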
For unit testing, it’s useful to keep the nonparallel implementation, but ideally, both parallel and nonparallel calls to the Router should be made in the same way. An abstract BaseRouter class allows for this.
Code Listing 88: The BaseRouter Class
public abstract class BaseRouter
{
  ...
    Func<ID, Contact, (List<Contact> contacts, Contact foundBy, string val)> ...
    bool giveMeAll = false);
}
The ParallelRouter queues the contacts to query, along with the other information each thread needs when executing the RPC call.
Code Listing 89: ContactQueueItem
public class ContactQueueItem
{
  public ID Key { get; set; }
  public Contact Contact { get; set; }
  public Func<ID, Contact, (List<Contact> contacts, Contact foundBy, string val)> RpcCall { get; set; }
  public List<Contact> CloserContacts { get; set; }
  public List<Contact> FartherContacts { get; set; }
  public FindResult FindResult { get; set; }
}
The ParallelRouter also initializes an internal thread pool.
Code Listing 90: InitializeThreadPool
protected void InitializeThreadPool()
{
  threads = new List<Thread>();

  Constants.MAX_THREADS.ForEach(() =>
  {
    Thread thread = new Thread(new ThreadStart(RpcCaller));
    thread.IsBackground = true;
    thread.Start();
  });
}
Work is queued and a semaphore is released for a thread to pick up the work.
Code Listing 91: QueueWork
protected void QueueWork(
  ID key,
  Contact contact,
  Func<ID, Contact, (List<Contact> contacts, Contact foundBy, string val)> rpcCall,
  List<Contact> closerContacts,
  List<Contact> fartherContacts,
  FindResult findResult
  )
{
  contactQueue.Enqueue(new ContactQueueItem()
  {
    Key = key,
    Contact = contact,
    RpcCall = rpcCall,
    CloserContacts = closerContacts,
    FartherContacts = fartherContacts,
    FindResult = findResult
  });

  // Signal one waiting worker thread that an item is available.
  semaphore.Release();
}
Each thread dequeues an item for work in the RpcCaller method.
Code Listing 92: RpcCaller
protected void RpcCaller()
{
  while (true)
  {
    semaphore.WaitOne();
    ContactQueueItem item;

    if (contactQueue.TryDequeue(out item))
    {
      string val;
      Contact foundBy;

      if (GetCloserNodes(
        item.Key,
        item.Contact,
        item.RpcCall,
        item.CloserContacts,
        item.FartherContacts,
        out val,
        out foundBy))
      {
        if (!stopWork)
        {
          // Possible multiple "found"
          lock (locker)
          {
            item.FindResult.Found = true;
            item.FindResult.FoundBy = foundBy;
            item.FindResult.FoundValue = val;
            item.FindResult.FoundContacts = new List<Contact>(item.CloserContacts);
          }
        }
      }
    }
  }
}
The salient point with the previous code is that when a value is found, the thread takes a snapshot of the current closer contacts and stores it, together with the found value and the contact that returned it, in the FindResult instance that was queued with the work item.
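The handoff between QueueWork and RpcCaller can also be seen in isolation. The following is a minimal sketch of the same queue-plus-semaphore pattern, assuming integers as work items and a running sum as the "result"; the Worker function, the done countdown, and the thread count of 3 are made up for the example.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

var queue = new ConcurrentQueue<int>();
var semaphore = new Semaphore(0, int.MaxValue);   // starts at 0, so workers block until work arrives
var done = new CountdownEvent(10);                // lets the main thread wait for all 10 items
int sum = 0;

// Each worker blocks on the semaphore, then dequeues and processes one item.
void Worker()
{
    while (true)
    {
        semaphore.WaitOne();
        if (queue.TryDequeue(out int item))
        {
            Interlocked.Add(ref sum, item);
            done.Signal();
        }
    }
}

// Background threads do not keep the process alive once the main thread finishes.
for (int i = 0; i < 3; i++)
{
    var thread = new Thread(Worker) { IsBackground = true };
    thread.Start();
}

// Queue the work first, then release the semaphore once per item.
for (int i = 1; i <= 10; i++)
{
    queue.Enqueue(i);
    semaphore.Release();
}

done.Wait();
Console.WriteLine(sum);   // prints 55
```

Enqueueing before releasing matters: a worker that wakes up is then guaranteed to find an item in the queue.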
The ParallelRouter must terminate its search after a certain amount of time, which handles unresponsive contacts. Whenever a response is received and new contacts are added to the list of contacts that can be queried, a timer is reset. The Lookup call exits when a value is found (for FindValue), or k closer contacts have been found, or the time period expires.
Code Listing 93: SetQueryTime
/// <summary>
/// Sets the time of the query to now.
/// </summary>
protected void SetQueryTime()
{
  now = DateTime.Now;
}

/// <summary>
/// Returns true if the query time has expired.
/// </summary>
protected bool QueryTimeExpired()
{
  return (DateTime.Now - now).TotalMilliseconds > Constants.QUERY_TIME;
}
The Lookup inner loop is where the work is done, as with the nonparallel version, but notice how work is now queued and we wait for responses, particularly the check for whether we’ve waited long enough in the haveWork assignment.
Code Listing 94: haveWork
...
ret.AddRangeDistinctBy(closerContacts, (a, b) => a.ID == b.ID);

// Spec: The lookup terminates when the initiator has queried and gotten responses from the k closest nodes it has seen.
while (ret.Count < Constants.K && haveWork)
{
  Thread.Sleep(Constants.RESPONSE_WAIT_TIME);

  if (ParallelFound(findResult, ref foundReturn))
  {
    StopRemainingWork();
    return foundReturn;
  }

  List<Contact> closerUncontactedNodes = ...
  List<Contact> fartherUncontactedNodes = ...
  bool haveCloser = closerUncontactedNodes.Count > 0;
  bool haveFarther = fartherUncontactedNodes.Count > 0;

  haveWork = haveCloser || haveFarther || !QueryTimeExpired();

  // Spec: Of the k nodes the initiator has heard of closest to the target...
  if (haveCloser)
  {
    // We're about to contact these nodes.
    var alphaNodes = closerUncontactedNodes.Take(Constants.ALPHA);
    contactedNodes.AddRangeDistinctBy(alphaNodes, (a, b) => a.ID == b.ID);
    alphaNodes.ForEach(n => QueueWork(...));
    SetQueryTime();
  }
  else if (haveFarther)
  {
    // We're about to contact these nodes.
    var alphaNodes = fartherUncontactedNodes.Take(Constants.ALPHA);
    contactedNodes.AddRangeDistinctBy(alphaNodes, (a, b) => a.ID == b.ID);
    alphaNodes.ForEach(...);
    SetQueryTime();
  }
}
We can now take the Dht tests for the nonparallel version and create parallel versions of those tests, passing in the ParallelRouter instead:
Dht dht = new Dht(ID.RandomID, vp, () => new VirtualStorage(), new ParallelRouter());
The result is that the parallel router unit tests also pass.

Figure 9
A potential problem occurs when there are threads still waiting for a response, and that response possibly occurs at some point after the Lookup method exits. We deal with this in several ways:
This ensures that even if there are threads still performing work on a previous lookup, they do not affect the results of the current lookup.
While the use of a single locker object for blocking updates to collections and updating the find value is slightly inefficient, it avoids nested locks; otherwise the thread, when it finds a value, would technically have to lock both the closerContacts collection and the findResult instance. Nested locks should be avoided because they invite deadlocks when two threads acquire the same locks in different orders. Also note that the Lookup method itself is not intended to be re-entrant.