CHAPTER 3

Threads, Tasks, and Async/Await



Before we can examine our architecture, we need to take a deep dive into the issues of threading. Along the way, we'll discover some surprising things.

There are two basic options for how to handle incoming requests:

  • Multiple listeners: We create multiple listeners and process each request on the thread allocated to the continuation of the awaited GetContextAsync call. Because there is no Windows Form, the continuation is free to run on its own thread, as opposed to the Windows application behavior, in which the continuation is marshaled onto the main application thread.
  • Single listener: A single thread listens for incoming connections and immediately queues each request so that it can go back to listening for the next connection request. A separate thread (or threads) processes the requests.

The source code presented in this section is in the folder Examples\Chapter 3\Demo-AsyncAwait in the Bitbucket repository.

Multiple Listeners

Let's instrument the StartConnectionListener function from the previous code so that we can get a sense of the processing times and the threads involved. First, we'll add a couple of basic instrumentation functions to the Program class:

protected static DateTime timestampStart;

static public void TimeStampStart()
{
  timestampStart = DateTime.Now;
}

static public void TimeStamp(string msg)
{
  long elapsed = (long)(DateTime.Now - timestampStart).TotalMilliseconds;
  Console.WriteLine("{0} : {1}", elapsed, msg);
}

Code Listing 6

Next, we add the instrumentation to StartConnectionListener, replacing the previous method with one that reports when, and on what thread, the listener starts. I have also replaced the handling of the response with a common "handler" object (described next).

/// <summary>
/// Await connections.
/// </summary>
static async void StartConnectionListener(HttpListener listener)
{
  TimeStamp("StartConnectionListener Thread ID: " + Thread.CurrentThread.ManagedThreadId);

  // Wait for a connection. Return to caller while we wait.
  HttpListenerContext context = await listener.GetContextAsync();

  // Release the semaphore so that another listener can be immediately started up.
  sem.Release();

  handler.Process(context);
}

Code Listing 7

Recall that these listeners are all initialized on a separate thread but, as noted previously, we let the .NET Framework allocate a thread for the continuation. Here again is the code from Chapter 2 that initializes the listeners:

Task.Run(() =>
{
  while (true)
  {
    sem.WaitOne();
    StartConnectionListener(listener);
  }
});

Code Listing 8

For this test, I've created a ListenerThreadHandler class:

public class ListenerThreadHandler : CommonHandler, IRequestHandler
{
  public void Process(HttpListenerContext context)
  {
    Program.TimeStamp("Process Thread ID: " + Thread.CurrentThread.ManagedThreadId);
    CommonResponse(context);
  }
}

Code Listing 9

CommonResponse (a method of ListenerThreadHandler) artificially injects a one-second delay to simulate some complex process before issuing the response:

public void CommonResponse(HttpListenerContext context)
{
  // Artificial delay.
  Thread.Sleep(1000);

  // Get the request.
  HttpListenerRequest request = context.Request;
  HttpListenerResponse response = context.Response;

  // Get the path: everything up to the first ? and excluding the leading "/".
  string path = request.RawUrl.LeftOf("?").RightOf("/");

  // Load the file and respond with a UTF8-encoded version of it.
  string text = File.ReadAllText(path);
  byte[] data = Encoding.UTF8.GetBytes(text);

  // Set the headers and status code before writing the body, because
  // writing to OutputStream sends the headers.
  response.ContentType = "text/html";
  response.ContentLength64 = data.Length;
  response.ContentEncoding = Encoding.UTF8;
  response.StatusCode = 200; // OK

  response.OutputStream.Write(data, 0, data.Length);
  response.OutputStream.Close();
}

Code Listing 10

The handler object is instantiated in Main:

static void Main(string[] args)
{
  // Supports 20 simultaneous connections.
  sem = new Semaphore(20, 20);
  handler = new ListenerThreadHandler();
…etc…

Code Listing 11

After initializing the listeners, we’ll add a test to Main to see how the server responds to 10 effectively simultaneous, asynchronous requests:

TimeStampStart();

for (int i = 0; i < 10; i++)
{
  Console.WriteLine("Request #" + i);
  MakeRequest(i);
}

Code Listing 12

and:

/// <summary>
/// Issue GET request to localhost/index.html
/// </summary>
static async void MakeRequest(int i)
{
  TimeStamp("MakeRequest " + i + " start, Thread ID: " + Thread.CurrentThread.ManagedThreadId);
  string ret = await RequestIssuer.HttpGet("http://localhost/index.html");
  TimeStamp("MakeRequest " + i + " end, Thread ID: " + Thread.CurrentThread.ManagedThreadId);
}

Code Listing 13

RequestIssuer is an "awaitable" request-and-response function, meaning that it issues a web request and returns to the caller while awaiting the response. The response is handled in the await continuation:

public class RequestIssuer
{
  public static async Task<string> HttpGet(string url)
  {
    string ret;

    try
    {
      HttpWebRequest request = (HttpWebRequest)WebRequest.Create(url);
      request.Method = "GET";

      using (WebResponse response = await request.GetResponseAsync())
      {
        using (StreamReader reader = new StreamReader(response.GetResponseStream()))
        {
          ret = await reader.ReadToEndAsync();
        }
      }
    }
    catch (Exception ex)
    {
      ret = ex.Message;
    }

    return ret;
  }
}

Code Listing 14

In the previous code, once the asynchronous operation blocks, the await returns control to the caller and the next MakeRequest is issued. When the asynchronous operation completes, MakeRequest continues.
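If that interleaving is unfamiliar, here is a minimal standalone sketch (my addition, not part of the book's source) that shows the same effect, with Task.Delay standing in for the web request. Calling it ten times in a loop prints all ten "start" lines immediately, and all ten "end" lines about a second later:

static async void MakeFakeRequest(int i)
{
  Console.WriteLine("start " + i);

  // The await returns control to the caller here...
  await Task.Delay(1000);

  // ...and the method resumes here about one second later.
  Console.WriteLine("end " + i);
}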

Test Results

What we want to know is:

  • When was the request issued?
  • How long did it take to complete?
  • Was the continuation on the same thread as the request call, or a different thread?

In the trace log, we first see that all the MakeRequest calls are issued on the same thread, which is expected, since they're all being issued by the same Task:

Request #0
3 : MakeRequest 0 start, Thread ID: 1
Request #1
55 : MakeRequest 1 start, Thread ID: 1
Request #2
57 : MakeRequest 2 start, Thread ID: 1
Request #3
58 : MakeRequest 3 start, Thread ID: 1
Request #4
59 : MakeRequest 4 start, Thread ID: 1
Request #5
61 : MakeRequest 5 start, Thread ID: 1
Request #6
62 : MakeRequest 6 start, Thread ID: 1
Request #7
63 : MakeRequest 7 start, Thread ID: 1
Request #8
63 : MakeRequest 8 start, Thread ID: 1
Request #9
63 : MakeRequest 9 start, Thread ID: 1

Code Listing 15

Next, we see the Process messages coming in, as well as the MakeRequest "end" calls (I'm omitting the StartConnectionListener messages and most of the MakeRequest messages for clarity):

78 : Process Thread ID: 11
79 : Process Thread ID: 5
80 : Process Thread ID: 9
81 : Process Thread ID: 10

783 : Process Thread ID: 12

1080 : Process Thread ID: 11
1084 : Process Thread ID: 5
1091 : Process Thread ID: 9
1106 : Process Thread ID: 10

1315 : Process Thread ID: 13

1789 : MakeRequest 7 end, Thread ID: 12

Code Listing 16

What's revealing here is that:

  • The requests appear to be processed in batches of four (the computer I'm testing on has four cores).
  • Threads are being re-used.
  • The continuation is not happening on the same thread. We expect that, because this is a console application and we haven't defined a continuation context.
  • Because only roughly four threads are active at once, the whole run takes about 2.3 seconds, close to the 2.5 seconds you would predict for 10 one-second requests spread across 4 threads.

Conversely, observe what happens on an 8-core system:

38 : Process Thread ID: 15
38 : Process Thread ID: 13
38 : Process Thread ID: 5
38 : Process Thread ID: 16
39 : Process Thread ID: 17
39 : Process Thread ID: 14
40 : Process Thread ID: 19
41 : Process Thread ID: 18

782 : Process Thread ID: 20

1039 : Process Thread ID: 15

Code Listing 17

Now we see eight requests being processed simultaneously, and the last two occurring later. What's going on?

Why Async/Await is Not the Right Solution

From the previous trace, we can surmise that the number of threads allocated for continuations is based on the number of CPU cores. This is really not the behavior we want. Many requests will involve file I/O, database queries, calls to social media APIs, and so forth, all of which leave the thread blocked waiting for a response. We certainly don't want to delay the processing of other incoming requests simply because the mechanism that allocates the continuation thread sizes itself to the available cores. Unfortunately, this mechanism lives in the bowels of how continuations are handled. It is not controllable through TaskCreationOptions, because we're dealing with how the continuation of the awaited call is handled. All we can declare here is that this is not the implementation we want.
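We can at least verify this surmise with a quick diagnostic (my addition, assuming the Demo-AsyncAwait console project). A console application has no synchronization context, so continuations are scheduled on the thread pool, whose minimum worker count defaults to the core count:

// Console apps have no SynchronizationContext, so await continuations
// run on thread pool threads.
Console.WriteLine(SynchronizationContext.Current == null);   // True

// The pool's minimum worker thread count defaults to the core count,
// which matches the "batches of four" (or eight) we observed.
int workerThreads, completionPortThreads;
ThreadPool.GetMinThreads(out workerThreads, out completionPortThreads);
Console.WriteLine("{0} workers / {1} cores", workerThreads, Environment.ProcessorCount);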

Allocating Our Own Threads

The source code presented in this section is in the Examples\Chapter 3\Demo-Threading folder in the Bitbucket repository.

What happens when we allocate the threads ourselves? Let's give that a try. First, we change the way the context listener threads are initialized, replacing Task.Run and semaphores with the creation of 20 listener threads:

for (int i = 0; i < 20; i++)
{
  Thread thread = new Thread(new ParameterizedThreadStart(WaitForConnection));
  thread.IsBackground = true;
  thread.Start(listener);
}

Code Listing 18

Then, instead of using async/await and semaphores, each thread blocks until a connection is received:

/// <summary>
/// Block until a connection is received.
/// </summary>
static void WaitForConnection(object objListener)
{
  HttpListener listener = (HttpListener)objListener;

  while (true)
  {
    TimeStamp("StartConnectionListener Thread ID: " + Thread.CurrentThread.ManagedThreadId);
    HttpListenerContext context = listener.GetContext();
    handler.Process(context);
  }
}

Code Listing 19

Now, when our requests are issued, we see immediately that they are processed by 10 unique threads:

75 : Process Thread ID: 3
75 : Process Thread ID: 9
75 : Process Thread ID: 4
75 : Process Thread ID: 5
76 : Process Thread ID: 8
75 : Process Thread ID: 10
76 : Process Thread ID: 7
76 : Process Thread ID: 6
76 : Process Thread ID: 11
76 : Process Thread ID: 12

Code Listing 20

And we also see that the responses are all in the same "one second later" block of time:

1083 : MakeRequest 4 end, Thread ID: 31
1090 : MakeRequest 2 end, Thread ID: 31
1098 : MakeRequest 3 end, Thread ID: 31
1097 : MakeRequest 1 end, Thread ID: 28
1104 : MakeRequest 0 end, Thread ID: 32
1091 : MakeRequest 8 end, Thread ID: 29
1113 : MakeRequest 6 end, Thread ID: 29
1088 : MakeRequest 5 end, Thread ID: 30
1119 : MakeRequest 7 end, Thread ID: 32
1121 : MakeRequest 9 end, Thread ID: 29

Code Listing 21

This unequivocally shows us that using async/await is not the right implementation choice! 

What about ThreadPool?

The source code presented in this section is in the Examples\Chapter 3\Demo-ThreadPool folder in the Bitbucket repository.

But is the problem with async/await itself, or with the system ThreadPool? Using a ThreadPool is not ideal because we're implementing long-running threads, but we'll try it regardless:

for (int i = 0; i < 20; i++)
{
  ThreadPool.QueueUserWorkItem(WaitForConnection, listener);
}

Code Listing 22

Look at what happens to the initialization process:

781 : StartConnectionListener Thread ID: 7
1313 : StartConnectionListener Thread ID: 8
1845 : StartConnectionListener Thread ID: 9
2377 : StartConnectionListener Thread ID: 10
2909 : StartConnectionListener Thread ID: 11
3441 : StartConnectionListener Thread ID: 12
3973 : StartConnectionListener Thread ID: 13
4505 : StartConnectionListener Thread ID: 14
5037 : StartConnectionListener Thread ID: 15
5569 : StartConnectionListener Thread ID: 16
6100 : StartConnectionListener Thread ID: 17

Code Listing 23

We certainly experience what the MSDN documentation says regarding ThreadPool: “As part of its thread-management strategy, the thread pool delays before creating threads. Therefore, when a number of tasks are queued in a short period of time, there can be a significant delay before all the tasks are started.”
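If you did want to work around this ramp-up, the pool's starting size can be raised before queueing the work items. This is a mitigation sketch (my addition, with an illustrative value), not part of the book's demo:

// Raise the minimum worker thread count so the pool creates threads
// immediately instead of throttling their creation. 25 is illustrative.
ThreadPool.SetMinThreads(25, 25);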

Fortunately, though, once the threads have been initialized, we see that the processing happens simultaneously:

12121 : Process Thread ID: 4
12123 : Process Thread ID: 5
12125 : Process Thread ID: 6
12125 : Process Thread ID: 3
12127 : Process Thread ID: 7
12127 : Process Thread ID: 10
12127 : Process Thread ID: 11
12128 : Process Thread ID: 9
12128 : Process Thread ID: 12
12128 : Process Thread ID: 8

Code Listing 24

So, while thread pools work, they are also not the correct solution. As the MSDN documentation indicates, a thread pool is not the right fit here because: 1) we're creating a number of threads in a very short time, and 2) these threads run perpetually for the life of the server. Furthermore, the threads will potentially block for long periods of time waiting for connection requests; they are not short-lived threads.

Conclusion

It is now very clear that we should not use async/await to implement asynchronous connection requests. Async/await limits request processing to a number of threads based on the number of cores, preventing you (and the CPU) from distributing request processing across more threads than you have cores. This will definitely be an issue: it is common to query a database or a third-party social media API in a request handler, and the thread spends most of that time waiting for a response, which should not stop other requests from being handled.

Single Thread Listener

The source code presented in this section is in the folder Examples\Chapter 3\Demo-SingleThreadListener in the Bitbucket repository.

Besides having determined that we need to use threads rather than the Task async/await mechanism, we should also consider whether we want multiple threads listening for requests, or a single thread. With a single listener, one and only one thread listens for incoming requests. As soon as a request is received, it is placed into a queue and the thread immediately goes back to waiting for the next request. A separate thread dequeues each request and hands it to a worker thread. We can implement different algorithms for deciding which worker thread receives a request, but in the implementation that follows, we use a simple round-robin algorithm.

We’ll begin with a helper class that allows us to create a queue for each thread and a semaphore for signaling the thread:

/// <summary>
/// Track the semaphore and context queue associated with a worker thread.
/// </summary>
public class ThreadSemaphore
{
  public int QueueCount { get { return requests.Count; } }

  protected Semaphore sem;
  protected ConcurrentQueue<HttpListenerContext> requests;

  public ThreadSemaphore()
  {
    sem = new Semaphore(0, Int32.MaxValue);
    requests = new ConcurrentQueue<HttpListenerContext>();
  }

  /// <summary>
  /// Enqueue a request context and release the semaphore that
  /// a thread is waiting on.
  /// </summary>
  public void Enqueue(HttpListenerContext context)
  {
    requests.Enqueue(context);
    sem.Release();
  }

  /// <summary>
  /// Wait for the semaphore to be released.
  /// </summary>
  public void WaitOne()
  {
    sem.WaitOne();
  }

  /// <summary>
  /// Dequeue a request.
  /// </summary>
  public bool TryDequeue(out HttpListenerContext context)
  {
    return requests.TryDequeue(out context);
  }
}

Code Listing 25

Note the use of .NET's concurrent collection class, ConcurrentQueue, in Code Listing 25. The concurrent collections are high-performance collections that handle concurrent reads and writes, relieving us of the complexity of writing our own thread-safe collections.
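As an aside, the semaphore-plus-queue pair in ThreadSemaphore is essentially what .NET's BlockingCollection<T> provides out of the box. Here is a hedged alternative sketch (my addition, not the book's implementation):

// BlockingCollection wraps a ConcurrentQueue by default; Add signals
// any thread blocked in Take, replacing the explicit semaphore.
BlockingCollection<HttpListenerContext> queue = new BlockingCollection<HttpListenerContext>();

// Producer (listener thread):
// queue.Add(context);

// Consumer (worker thread):
// HttpListenerContext context = queue.Take();   // Blocks until an item arrives.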

Instead of processing the request immediately, our handler queues the request and returns. A separate thread de-queues the request and assigns it, round-robin, to a worker thread.

public class SingleThreadedQueueingHandler
{
  protected ConcurrentQueue<HttpListenerContext> requests;
  protected Semaphore semQueue;
  protected List<ThreadSemaphore> threadPool;
  protected const int MAX_WORKER_THREADS = 20;

  public SingleThreadedQueueingHandler()
  {
    threadPool = new List<ThreadSemaphore>();
    requests = new ConcurrentQueue<HttpListenerContext>();
    semQueue = new Semaphore(0, Int32.MaxValue);
    StartThreads();
    MonitorQueue();
  }

  protected void MonitorQueue()
  {
    Task.Run(() =>
    {
      int threadIdx = 0;

      // Forever...
      while (true)
      {
        // Wait until we have received a context.
        semQueue.WaitOne();
        HttpListenerContext context;

        if (requests.TryDequeue(out context))
        {
          // In a round-robin manner, queue up the request on the current
          // thread index, then increment the index.
          threadPool[threadIdx].Enqueue(context);
          threadIdx = (threadIdx + 1) % MAX_WORKER_THREADS;
        }
      }
    });
  }

  /// <summary>
  /// Enqueue the received context rather than processing it.
  /// </summary>
  public void Process(HttpListenerContext context)
  {
    requests.Enqueue(context);
    semQueue.Release();
  }

  /// <summary>
  /// Start our worker threads.
  /// </summary>
  protected void StartThreads()
  {
    for (int i = 0; i < MAX_WORKER_THREADS; i++)
    {
      Thread thread = new Thread(new ParameterizedThreadStart(ProcessThread));
      thread.IsBackground = true;
      ThreadSemaphore ts = new ThreadSemaphore();
      threadPool.Add(ts);
      thread.Start(ts);
    }
  }

  /// <summary>
  /// As a thread, we wait until there's something to do.
  /// </summary>
  protected void ProcessThread(object state)
  {
    ThreadSemaphore ts = (ThreadSemaphore)state;

    while (true)
    {
      ts.WaitOne();
      HttpListenerContext context;

      if (ts.TryDequeue(out context))
      {
        Program.TimeStamp("Processing on thread " + Thread.CurrentThread.ManagedThreadId);
        CommonResponse(context);
      }
    }
  }
}

Code Listing 26

The result is what we should expect: our 10 requests begin processing at the same time and complete at the same time.

76 : Processing on thread 4
76 : Processing on thread 3
76 : Processing on thread 5
77 : Processing on thread 6
78 : Processing on thread 7
78 : Processing on thread 8
79 : Processing on thread 10
79 : Processing on thread 11
79 : Processing on thread 9
81 : Processing on thread 12

1086 : MakeRequest 0 end, Thread ID: 31
1086 : MakeRequest 8 end, Thread ID: 29
1093 : MakeRequest 1 end, Thread ID: 29
1094 : MakeRequest 2 end, Thread ID: 29
1102 : MakeRequest 7 end, Thread ID: 29
1102 : MakeRequest 9 end, Thread ID: 31
1109 : MakeRequest 3 end, Thread ID: 31
1110 : MakeRequest 4 end, Thread ID: 29
1111 : MakeRequest 6 end, Thread ID: 31
1113 : MakeRequest 5 end, Thread ID: 31

Code Listing 27

Conclusion

The advantage of the single-threaded connection-queuing approach is that it can consume thousands of requests very quickly, and those requests can then be queued onto a finite number of worker threads. The multiple-listener approach stops accepting requests when all the listener threads become busy. In either implementation, the client ends up waiting for its request to be serviced. The major advantage of the second approach is that you are not creating potentially thousands of threads to handle high-volume periods. In fact, the single-thread listener approach could even be extended to dynamically allocate more worker threads as volume increases, or even to spin up additional servers. This approach is a much more flexible solution.
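One small refinement along those lines: instead of round-robin, the MonitorQueue loop could assign each request to the least-loaded worker, using the QueueCount property from Code Listing 25. A sketch (my addition, assuming the threadPool and MAX_WORKER_THREADS members from Code Listing 26):

// Pick the worker with the shortest queue instead of the next one
// in round-robin order.
int bestIdx = 0;

for (int i = 1; i < MAX_WORKER_THREADS; i++)
{
  if (threadPool[i].QueueCount < threadPool[bestIdx].QueueCount)
  {
    bestIdx = i;
  }
}

threadPool[bestIdx].Enqueue(context);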
