High Throughput SNS with C#

The C# AWS SDK focuses mostly on async methods. These are great when you are writing an application with a UI component that needs to access AWS resources in the background, often in a one-off fashion.

Where this model falls down is when you fire off a large number of concurrent async calls. Each call returns immediately and leaves a pending task behind, so a publisher that never waits can pile up a huge number of in-flight requests. In tests, this can cause throughput to drop to single or double-digit messages per second from a publisher and memory consumption to skyrocket.
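To make the pile-up concrete, here is a minimal sketch that does not touch AWS at all. `FakeAsyncClient` and `UnboundedFanOut` are hypothetical names invented for this illustration, not part of the SDK; the fake client simply keeps every call pending until it is released, which stands in for a slow network round trip.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for an SNS client: each call stays pending until
// Release() is called, imitating a request that is waiting on the network.
class FakeAsyncClient
{
    private readonly TaskCompletionSource<bool> _gate =
        new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
    private int _inFlight;

    // Number of calls that have started but not yet completed
    public int InFlight => Volatile.Read(ref _inFlight);

    public async Task PublishAsync(string message)
    {
        Interlocked.Increment(ref _inFlight);   // request is now pending
        await _gate.Task;                       // "waiting on the network"
        Interlocked.Decrement(ref _inFlight);   // request completed
    }

    // Lets every pending call finish
    public void Release() => _gate.SetResult(true);
}

static class UnboundedFanOut
{
    // Starts one async call per message without waiting for any of them,
    // then reports how many are pending at the same time.
    public static int StartAll(FakeAsyncClient client,
                               IEnumerable<string> messages,
                               List<Task> started)
    {
        foreach (var m in messages)
        {
            started.Add(client.PublishAsync(m)); // returns immediately, still pending
        }
        return client.InFlight;
    }
}
```

Calling `StartAll` with 1,000 messages returns 1,000: nothing has completed yet, so every request is outstanding at once. The real SDK behaves differently in its details, but the shape of the problem is the same: nothing in this pattern limits how much work is in flight.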

Using a Thread Pool with SNS

One approach to this problem is to use a thread pool model where you build a collection of publishers that share a queue of work. The queue is a ConcurrentQueue from System.Collections.Concurrent, so it is thread-safe and can be accessed by all the workers in the pool. A shortened version of the code is below; the full code can be found on GitHub.

… The Thread Pool

Our thread pool is pretty simple: a single ConcurrentQueue that is passed to a handful of Publisher actors. Each Publisher polls the queue, dequeues the next message, and then uses its SNS client to fire it off. In tests, this consumes very little memory (a few MB) and can submit several hundred messages per second to SNS.

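using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

class PublisherPool
{
    // Work queue shared by every publisher in the pool
    private ConcurrentQueue<String> messageQueue;
    private List<Publisher> workerPool;
    private List<Thread> threadPool;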

    public PublisherPool(int number)
    {
        this.messageQueue = new ConcurrentQueue<string>();
        this.workerPool = new List<Publisher>(number);
        this.threadPool = new List<Thread>(number);
        for(int i = 0; i < number; i++)
        {
            Publisher p = new Publisher();
            workerPool.Add(p);

            Thread t = p.WorkOn(messageQueue);
            t.Start();
            threadPool.Add(t);
        }
    }

    public void Publish(String message)
    {
        this.messageQueue.Enqueue(message);
    }

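    public void WaitUntilEmpty()
    {
        // Sleep periodically waiting for queue to drain
        // (minimal sketch; the 100 ms interval is an arbitrary choice)
        while (!messageQueue.IsEmpty)
        {
            Thread.Sleep(100);
        }
    }

    public void StopAll()
    {
        // Stop all publishers and wait for all threads to join
        foreach (Publisher p in workerPool)
        {
            p.Stop();
        }
        foreach (Thread t in threadPool)
        {
            t.Join();
        }
    }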
}

… The Publisher

The Publisher has only one job: dequeue a message, construct an SNS request, and pass it off to SNS. It pauses only when the queue is empty, sleeping briefly to avoid busy waiting (an enterprising person could possibly eliminate the polling altogether with some sort of events…).

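using System;
using System.Collections.Concurrent;
using System.Threading;
using Amazon.SimpleNotificationService;
using Amazon.SimpleNotificationService.Model;

class Publisher
{
    private AmazonSimpleNotificationServiceClient _client;
    // volatile so that a Stop() call from another thread is seen by Run()
    private volatile Boolean _running;
    private ConcurrentQueue<String> messageQueue;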
    public string TopicArn => "TOPIC_ARN";

    public Publisher()
    {
        _running = true;  
        _client = new AmazonSimpleNotificationServiceClient 
                    ("KEY", "SECRET", Amazon.RegionEndpoint.USEast1);
    }


    public Thread WorkOn(ConcurrentQueue<String> queue)
    {
        this.messageQueue = queue; 
        return new Thread(new ThreadStart(Run));
    }

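    private void Run()
    {
        String message;

        while (_running)
        {
            // TryDequeue is atomic. Checking IsEmpty first would let another
            // worker take the message in between, leaving us with null.
            if (messageQueue.TryDequeue(out message))
            {
                PublishMessage(message);
                Console.WriteLine("Publishing: {0}", message);
            }
            else
            {
                // Queue is empty: back off briefly instead of spinning
                Thread.Sleep(200);
            }
        }

        Console.WriteLine("Done!");
    }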

    public void Stop()
    {
        _running = false;
    }

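    void PublishMessage(string message)
    {
        // Minimal request: topic plus message body. Add a subject or
        // message attributes here if your messages need them.
        var request = new PublishRequest(TopicArn, message);

        try
        {
            // Block this worker thread until the publish call completes
            _client.PublishAsync(request).GetAwaiter().GetResult();
        }
        catch (Exception e)
        {
            Console.WriteLine("Error! {0}", e);
        }
    }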
}
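For the enterprising, one way to get rid of the polling is to let the queue do the waiting. The sketch below is not the code from the GitHub repository. It swaps the ConcurrentQueue for a BlockingCollection, whose GetConsumingEnumerable() blocks the worker until a message arrives. `BlockingPublisherPool` and the `send` delegate are hypothetical names for this illustration; `send` stands in for the SNS publish call so the example runs without AWS.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

// Sketch of a pool whose workers block on the queue instead of polling it.
// The 'send' delegate is a hypothetical stand-in for the SNS publish call.
class BlockingPublisherPool
{
    private readonly BlockingCollection<string> _queue =
        new BlockingCollection<string>(new ConcurrentQueue<string>());
    private readonly List<Thread> _threads = new List<Thread>();

    public BlockingPublisherPool(int workers, Action<string> send)
    {
        for (int i = 0; i < workers; i++)
        {
            var t = new Thread(() =>
            {
                // Blocks while the queue is empty; the loop ends once
                // CompleteAdding() has been called and the queue is drained.
                foreach (var message in _queue.GetConsumingEnumerable())
                {
                    try { send(message); }
                    catch (Exception e) { Console.WriteLine("Error! {0}", e); }
                }
            });
            t.Start();
            _threads.Add(t);
        }
    }

    public void Publish(string message) => _queue.Add(message);

    // Signals that no more messages are coming, then waits for the workers
    // to finish whatever is still queued.
    public void StopAll()
    {
        _queue.CompleteAdding();
        foreach (var t in _threads) t.Join();
    }
}
```

With this shape there is no Sleep(200) and no running flag: an idle worker costs nothing until a message shows up, and StopAll() doubles as "wait until empty" because the workers only exit after the queue has drained. Publishing after StopAll() throws, so the pool is single-use as written.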