Akka.NET + Azure: Azure ServiceBus integration

I know that there is some confusion out there about how Akka.NET relates to products like NServiceBus and Azure Service Bus. I think Akka.NET co-founder Aaron Stannard said it best:

they’re very complimentary Akka.NET makes a great consumer or producer for NServiceBus

Another closely related question that comes up from time to time is how to integrate Akka.NET actors with service buses.

How can we pull messages from a service bus and pass them to a number of worker actors without message loss?

One approach relies on the fact that actors in Akka.NET support the Ask operator.
We can pass a message to an actor and expect a response, and this response is delivered in the form of a Task.

As the response is a task, we can pipe this task into a continuation and depending on if the response represents a processing success or failure from the worker actor, we can then decide what we want to do with the service bus message.

If the response was a success, we ack the service bus message, telling the service bus that we are done with this message and that it can be removed from the queue.

If the response was a failure, we just ignore the failure and continue processing other messages.
As we haven’t acked the message to the service bus, the service bus will try to redeliver the message to our client and we get the chance to try again some time later.

A simple implementation of this approach using Azure Service Bus could look something like this:

using System;
using System.Linq;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Routing;
using Microsoft.ServiceBus.Messaging;

namespace ConsoleApplication13
{
    //define your worker actor
    public class MyBusinessActor : ReceiveActor
    {
        public MyBusinessActor()
        {
            //here is where you should receive your business messages
            //apply domain logic, store to DB etc.
            Receive<string>(s =>
            {
                Console.WriteLine("{0} Processed {1}", Self.Path, s);

                //reply to the sender that everything went well
                //in this example, we pass back the message we received in a built-in `Success` message
                //you can send back a Status.Failure in case of exceptions if you want to
                //or just let it fail by timeout as we do in this example
                Sender.Tell(new Status.Success(s));
            });
        }
    }

    internal class Program
    {
        private static void Main(string[] args)
        {
            using (var system = ActorSystem.Create("mysys"))
            {
                //spin up our workers
                //this should be done via config, but here we use a
                //hardcoded setup for simplicity

                //Do note that the workers can be spread across multiple
                //servers using Akka.Remote or Akka.Cluster
                var businessActor =
                    system.ActorOf(Props.Create<MyBusinessActor>()
                       .WithRouter(new ConsistentHashingPool(10)));

                //start the message processor
                ProcessMessages(businessActor);

                //wait for user to end the application
                Console.ReadLine();
            }
        }

        private static async void ProcessMessages(IActorRef myBusinessActor)
        {
            //set up an Azure SB subscription client
            //(or use a Queue client, or whatever client your specific MQ supports)
            var subscriptionClient = SubscriptionClient.Create("service1","service1");

            while (true)
            {
                //fetch a batch of messages
                var batch = await subscriptionClient.ReceiveBatchAsync(100, TimeSpan.FromSeconds(1));

                //transform the messages into a list of tasks
                //the tasks will either be successful and ack the MQ message
                //or they will timeout and do nothing
                var tasks = (
                    from res in batch
                    let importantMessage = res.GetBody<string>()
                    //the hash key and the 5 second timeout are illustrative values
                    let ask = myBusinessActor
                        .Ask<Status.Success>(new ConsistentHashableEnvelope(importantMessage,
                            importantMessage.GetHashCode()), TimeSpan.FromSeconds(5))
                    let done = ask.ContinueWith(t =>
                    {
                        if (t.IsCanceled || t.IsFaulted)
                        {
                            //no ack, so the service bus will redeliver the message later
                            Console.WriteLine("Failed to ack {0}", importantMessage);
                        }
                        else
                        {
                            //ack the message so it is removed from the queue
                            res.Complete();
                            Console.WriteLine("Completed {0}", importantMessage);
                        }
                    })
                    select done).ToList();

                //wait for all messages to either succeed or timeout
                await Task.WhenAll(tasks);
                Console.WriteLine("All messages handled (acked or failed)");
                //continue with the next batch
            }
        }

        //dummy method only used to prefill the msgqueue with data for this example
        private static void CreateMessages()
        {
            var client = TopicClient.Create("service1");

            for (var i = 0; i < 100; i++)
            {
                client.SendAsync(new BrokeredMessage("hello" + i)
                {
                    MessageId = Guid.NewGuid().ToString()
                });
            }
        }
    }
}

Do note, however, that when applying this pattern we go from the default Akka.NET “at most once” delivery to “at least once”: if we fail to ack the message back to the service bus, we will eventually receive the same message again at a later time.

It could be that our worker actor has processed the message correctly and stored it in some persistent store, but the ack back to the client failed because of network problems, a timeout or something similar.

Thus, the service bus message will not be removed from the queue, and the client will receive it again as soon as whatever locking mechanism is in place frees the message.
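Since the same message can arrive more than once, the worker side usually has to tolerate duplicates. Here is a minimal sketch of one way to do that, assuming every message carries a stable id (such as the MessageId set in CreateMessages above) and that remembering handled ids in memory is good enough for illustration. The `DeduplicatingHandler` class is hypothetical and not part of Akka.NET or the Service Bus SDK.

```csharp
using System;
using System.Collections.Generic;

//hypothetical helper, not part of Akka.NET or the Azure SDK
public sealed class DeduplicatingHandler
{
    //ids of messages whose side effects have already been applied
    //(assumption: a real system would keep these in the same
    //persistent store as the business data)
    private readonly HashSet<string> handledIds = new HashSet<string>();
    private readonly Action<string> applyBusinessLogic;

    public DeduplicatingHandler(Action<string> applyBusinessLogic)
    {
        this.applyBusinessLogic = applyBusinessLogic;
    }

    //returns true if the message was processed now,
    //false if it was a redelivery that had already been handled
    public bool Handle(string messageId, string body)
    {
        if (handledIds.Contains(messageId))
            return false;

        //if this throws, the id is not recorded and a later redelivery retries the work
        applyBusinessLogic(body);
        handledIds.Add(messageId);
        return true;
    }
}
```

Either way the caller can ack the message: a `false` result only means that the work was already done on an earlier delivery.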

One extremely nice feature in Akka.NET is the cluster support. Cluster nodes can be added to or removed from a live application, so we can easily spread our load over multiple worker actors on remote nodes.

We get this completely without writing any special code; we just need to configure our actor system to be part of an Akka.NET cluster.



Learning Azure, Day 1 | Servicebus

I’ve decided to bite the bullet and finally dig into Azure.

My first learning experience was to toy around with the Service Bus.
Azure Service Bus is a message queue pretty much like the old MSMQ, but with a few more nifty features.
E.g. messages can have a time-to-live duration, and you can choose between at-most-once and at-least-once delivery (ReceiveMode.ReceiveAndDelete vs ReceiveMode.PeekLock).

The first gotcha was that the sample I downloaded from MS recreated the queue I intended to communicate with,
which in turn causes your credentials for the queue to disappear. Don’t fall for that :-)

It also turns out that the built in batch operations are way more performant than single operations.
So if possible, go for batching if you need high throughput.

Another thing that was expected but maybe not to that extent, was that using ReceiveAndDelete gives a lot better throughput than using PeekLock.
Using PeekLock, you have to Ack back to the queue yourself to notify the queue that you have dealt with the message.

This was using a single-threaded client, so maybe the difference is not that big when processing asynchronously. I don’t know yet, but in the very naive example I’m using, there was a huge difference.
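To make the difference between the two receive modes concrete, here is a toy in-memory queue. It is only a sketch of the semantics as I understand them, not how Azure Service Bus is implemented, and the `ToyQueue` class and its method names are made up for this illustration.

```csharp
using System;
using System.Collections.Generic;

//toy in-memory queue, only meant to illustrate the two receive modes
public sealed class ToyQueue
{
    private readonly Queue<string> available = new Queue<string>();
    private readonly List<string> locked = new List<string>();

    //messages the queue is still responsible for
    public int Count { get { return available.Count + locked.Count; } }

    public void Send(string message) { available.Enqueue(message); }

    //at most once: the message is gone as soon as it is handed out
    public string ReceiveAndDelete()
    {
        return available.Count > 0 ? available.Dequeue() : null;
    }

    //at least once: the message is hidden from other receivers but stays in the queue
    public string PeekLock()
    {
        if (available.Count == 0) return null;
        var message = available.Dequeue();
        locked.Add(message);
        return message;
    }

    //the ack: only now is the message really removed
    public void Complete(string message) { locked.Remove(message); }

    //give up the lock (or let it expire): the message becomes available again
    public void Abandon(string message)
    {
        if (locked.Remove(message)) available.Enqueue(message);
    }
}
```

The extra round trip for Complete is the price of PeekLock, which fits with the throughput difference I saw.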

And for those who are just looking for some example code, here it is:

using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using Microsoft.ServiceBus.Messaging;

namespace ConsoleApplication5
{
    class Program
    {
        private static QueueClient _queueClient;

        static void Main(string[] args)
        {
            Console.WriteLine("Press any key to start sending messages ...");
            Console.ReadKey();
            SendMessages();
            Console.WriteLine("Press any key to start receiving messages that you just sent ...");
            Console.ReadKey();
            ReceiveMessages();
            Console.WriteLine("\nEnd of scenario, press any key to exit.");
            Console.ReadKey();
        }

        private static void SendMessages()
        {
            // connect to queue named "queue1"
            // use At most once delivery
            _queueClient = QueueClient.Create("queue1", ReceiveMode.ReceiveAndDelete);

            var messageList = new List<BrokeredMessage>();

            for (int i = 0; i < 1000; i++)
            {
                messageList.Add(new BrokeredMessage("Some message")
                {
                    MessageId = i.ToString(CultureInfo.InvariantCulture)
                });
            }

            Console.WriteLine("\nSending messages to Queue...");

            // send 1000 messages to the queue
            _queueClient.SendBatch(messageList);
        }

        private static void ReceiveMessages()
        {
            Console.WriteLine("\nReceiving message from Queue...");

            while (true)
            {
                //receive a batch of messages from Queue
                //(the 5 second wait time is an illustrative value)
                var messages = _queueClient.ReceiveBatch(100,
                    TimeSpan.FromSeconds(5));

                foreach (var message in messages)
                {
                    if (message != null)
                    {
                        Console.WriteLine("Message received: Id = {0}, Body = {1}",
                            message.MessageId, message.GetBody<string>());
                    }
                }
            }
        }
    }
}

Introducing Akka.NET


There are a lot of things going on right now.
First, Pigeon Framework now has a new name: Akka.NET.
We got the OK from Typesafe to use the name, since we are a pure port of the real Akka.

We are also doing a lot of work on the core and remote libs.
We now have a real EndpointManager actor managing transports.
And we have an Endpoint actor for each active endpoint.
Thus, we now support multiple transports, even if only Tcp is provided out of the box right now.

There has been some progress on routers too.
We now support “Group” routers, e.g. RoundRobinGroup and ConsistentHashingGroup.
“Pool” router support is currently being developed.

Another nice feature we have ported is remote deployment.
This is IMO maybe the coolest feature we have ported so far. It means that we can now decide via configuration whether an actor should be deployed locally or remotely.
If remote deployment is used, the local actor system will send a message to the remote system, telling it to create the given actor with all of its configuration on the remote node.

For more info see: https://github.com/rogeralsing/Pigeon

Massive improvements to Pigeon – Akka Actors for .NET

The last few weeks have been busy busy.
Aaron and I have been making some massive improvements to Pigeon.
Most of the Akka features are now completed. Remoting still needs some love, and after that we will start porting Akka clustering.

One of the latest features we have added is logging.
We support the same features as real Akka, so logging can be done using the ActorSystem.EventStream, and there is also a BusLogging LoggingAdapter.

We are also leveraging the real Akka config, so we can enable logging based on the same configuration options that Akka has.
Here is a snapshot of the output from the StandardOutLogger when booting the ChatServer example:

2014-02-21 22:40:44 DebugLevel EventStream - subscribing [akka://all-systems/StandardOutLogger] to channel Pigeon.Event.Info [Thread 9]
2014-02-21 22:40:44 DebugLevel EventStream - subscribing [akka://all-systems/StandardOutLogger] to channel Pigeon.Event.Warning [Thread 9]
2014-02-21 22:40:44 DebugLevel EventStream - subscribing [akka://all-systems/StandardOutLogger] to channel Pigeon.Event.Error [Thread 9]
2014-02-21 22:40:44 DebugLevel EventStream - StandardOutLogger started [Thread 9]
2014-02-21 22:40:44 DebugLevel akka.tcp://MyServer@localhost:8081 - now supervising akka.tcp://MyServer@localhost:8081/user [Thread 10]
2014-02-21 22:40:44 DebugLevel akka.tcp://MyServer@localhost:8081 - now supervising akka.tcp://MyServer@localhost:8081/system [Thread 10]
2014-02-21 22:40:44 DebugLevel akka.tcp://MyServer@localhost:8081/system - now supervising akka.tcp://MyServer@localhost:8081/system/deadLetterListener [Thread 11]
2014-02-21 22:40:44 DebugLevel EventStream - subscribing [akka.tcp://MyServer@localhost:8081/system/deadLetterListener] to channel Pigeon.Event.DeadLetter [Thread 9]
2014-02-21 22:40:44 DebugLevel akka.tcp://MyServer@localhost:8081/system - now supervising akka.tcp://MyServer@localhost:8081/system/logMyServer-DefaultLogger [Thread 10]
2014-02-21 22:40:44 DebugLevel EventStream(MyServer) - Default Loggers started [Thread 11]
2014-02-21 22:40:44 DebugLevel EventStream - subscribing [akka.tcp://MyServer@localhost:8081/system/logMyServer-DefaultLogger] to channel Pigeon.Event.Error [Thread 11]
2014-02-21 22:40:44 DebugLevel EventStream - unsubscribing [akka.tcp://MyServer@localhost:8081/system/logMyServer-DefaultLogger] from channel Pigeon.Event.Debug [Thread 11]
2014-02-21 22:40:44 DebugLevel EventStream - unsubscribing [akka.tcp://MyServer@localhost:8081/system/logMyServer-DefaultLogger] from channel Pigeon.Event.Info [Thread 11]
2014-02-21 22:40:44 DebugLevel EventStream - unsubscribing [akka.tcp://MyServer@localhost:8081/system/logMyServer-DefaultLogger] from channel Pigeon.Event.Warning [Thread 11]
2014-02-21 22:40:44 WarningLevel ActorSystem(MyServer) - {
  akka : {
    log-config-on-start : on
    stdout-loglevel : DEBUG
    loglevel : ERROR
    actor : {
      provider : "Pigeon.Remote.RemoteActorRefProvider, Pigeon.Remote"
      debug : {
        receive : on
        autoreceive : on
        lifecycle : on
        event-stream : on
        unhandled : on
      }
    }
    remote : {
      server : {
        host : localhost
        port : 8081
      }
    }
  }
} [Thread 9]
2014-02-21 22:40:44 DebugLevel akka.tcp://MyServer@localhost:8081/user - now supervising akka.tcp://MyServer@localhost:8081/user/ChatServer [Thread 10]
2014-02-21 22:40:45 DebugLevel akka.tcp://MyServer@localhost:8081/user/ChatServer - received handled message ChatMessages.ConnectRequest [Thread 11]

A lot of the existing features have also been refined to conform even more to real Akka, e.g. ActorSelection has been rewritten to behave exactly like in Akka.
ActorRefs are now also serializable in messages, so you can send messages containing ActorRefs across the wire and use them on remote systems.
The Akka RemoteDaemon is also coming along nicely, so we can create actors on remote nodes this way. The underlying deployment features are, however, not completed yet.

The Props class has also gotten new features. We can now configure dispatchers and mailboxes via the config, and there is also a new dispatcher that lets you run actors in the main thread, to allow GUI updates in WPF/WinForms.

Well, there are a lot of new features and bugfixes, so if you are interested in Actor Model programming in .NET, be sure to check out the Pigeon repository at https://github.com/rogeralsing/Pigeon



Configuration and Remote support for Pigeon – Akka Actors for .NET

I’ve been working quite a bit on my Akka port this weekend.
Finally got a configuration system in place.
Trying to stay close to how Akka works, I decided to go for a JSON-based configuration. This is fairly close to the real Akka configuration while still not being too alien to .NET developers.

A config could look something like this:

Pigeon : {
    Actor : {
        Serializers : {
            json : "Pigeon.Serialization.JsonSerializer",
            java : "Pigeon.Serialization.JavaSerializer",
            proto : "Pigeon.Remote.Serialization.ProtobufSerializer"
        },
        DefaultDispatcher: {
            Throughput : 100
        }
    },
    Remote : {
        Server : {
            Host : "",
            Port : 8080
        }
    }
}

Remoting is now also treated as an extension to the ActorSystem, so the awkward subclass is no longer needed. A remote-enabled system is created like this:

using (var system = ActorSystem.Create("MyClient",config,new RemoteExtension()))

So now it’s possible to actually use Pigeon on two different machines using the config for host/port.

Read more at: https://github.com/rogeralsing/Pigeon

Throughput of Pigeon – Akka Actors for .NET

As some of you might know, I’m making an unofficial port of Akka for .NET.
(Why not Akka# or dotAkka? I simply assume that Typesafe, the company that makes Akka, doesn’t want to be associated with spare-time projects like this, so I try not to stain their brand name.)

One important factor of a successful actor framework is how fast you are able to process messages – less overhead on message processing means better scalability.
Akka made some noise when they managed to process 50 million messages per second in their ping-pong benchmark example on a 48-core server.

I’ve now ported this benchmark to Pigeon so that one can get a hint of how the two compare.
These are the results from an 8-core laptop:

 Worker threads: 1023
 OSVersion: Microsoft Windows NT 6.2.9200.0
 ProcessorCount: 8
 ClockSpeed: 3392 MHZ
 Actor count, Messages/sec
 2, 7073000 messages/s
 4, 11760000 messages/s
 6, 14534000 messages/s
 8, 18039000 messages/s
 10, 20161000 messages/s
 12, 18785000 messages/s
 14, 17523000 messages/s
 16, 17482000 messages/s
 18, 17931000 messages/s
 20, 18575000 messages/s
 22, 18975000 messages/s
 24, 20920000 messages/s

I’m pretty pleased with the performance so far.
The code for the benchmark actors looks like this:

private static object Msg = new object();
private static object Run = new object();

public class Destination : UntypedActor
{
    protected override void OnReceive(object message)
    {
        if (message == Msg)
            Sender.Tell(Msg);
    }
}

public class Client : UntypedActor
{
    public long received;
    public long sent;

    public long repeat;
    private ActorRef actor;
    private TaskCompletionSource<bool> latch;

    public Client(ActorRef actor,long repeat,TaskCompletionSource<bool> latch )
    {
        this.actor = actor;
        this.repeat = repeat;
        this.latch = latch;
    }

    protected override void OnReceive(object message)
    {
        if (message == Msg)
        {
            received++;
            if (sent < repeat)
            {
                actor.Tell(Msg);
                sent++;
            }
            else if (received >= repeat)
                latch.SetResult(true); //instead of java countDownLatch
        }
        if (message == Run)
        {
            for (int i = 0; i < Math.Min(1000,repeat); i++)
            {
                actor.Tell(Msg);
                sent++;
            }
        }
    }
}

If there is anyone out there who would like to join in building a high-performance Actor Model framework (staying as close to Akka as possible), let me know.

Hotswap and Supervision – Pigeon – Akka Actors for .NET

This is a follow-up to my previous post: http://rogeralsing.com/2014/01/01/pigeon-akka-actors-for-net/

The code for this project can be found here: https://github.com/rogeralsing/Pigeon

And again, before I begin, please do note that Pigeon is just my hobby project of cloning some of the Akka Actor API.

Akka is in no way affiliated with me or my project.

I’ve done some refactoring to mimic Akka’s API more closely.
e.g. I’ve removed the need for IMessage on messages, you can now pass any object as an actor message.

I’ve also added some more nifty features from Akka – Hotswap and Supervision.

Hotswap is the ability to change the message receiver at runtime. This is useful if you have some sort of state machine, e.g. if you want to handle messages differently once a certain message has been received.

Like this:

public class GreetingActor : UntypedActor
{
    protected override void OnReceive(object message)
    {
        Pattern.Match(message)
            .With<Greet>(m => {
                Console.WriteLine("Hello {0}", m.Who);
                //this could also be a lambda
                Become(OtherReceive);
            });
    }

    void OtherReceive(object message)
    {
        Pattern.Match(message)
            .With<Greet>(m => {
                Console.WriteLine("You already said hello!");
                //Unbecome() to revert to old behavior
            });
    }
}

In the example above, the actor will first output “Hello {who}” if a Greet message is passed to it.
But if we pass another Greet message to it, it will output “You already said hello!”.
This is of course a contrived example, but the feature is very useful for larger state machines.
E.g. let’s say we write a game and each player/bot has its own actor.
We could then make it respond differently if it is stunned, dead, or trapped by some spell or whatever.
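If you wonder how Become/Unbecome can work under the hood, one way to think about it is as a stack of message handlers. The following is a toy illustration of that idea using nothing but plain delegates. `BehaviorStack` and `Greeter` are made-up names, and this is not Pigeon’s actual implementation.

```csharp
using System;
using System.Collections.Generic;

//toy illustration of a behavior stack; not Pigeon's actual implementation
public class BehaviorStack
{
    private readonly Stack<Action<object>> behaviors = new Stack<Action<object>>();

    public BehaviorStack(Action<object> initial) { behaviors.Push(initial); }

    //push a new handler that will receive all following messages
    public void Become(Action<object> next) { behaviors.Push(next); }

    //go back to the previous handler, never popping the initial one
    public void Unbecome()
    {
        if (behaviors.Count > 1) behaviors.Pop();
    }

    //dispatch to whatever handler is currently on top
    public void Receive(object message) { behaviors.Peek()(message); }
}

public class Greeter
{
    public readonly List<string> Output = new List<string>();
    private readonly BehaviorStack stack;

    public Greeter() { stack = new BehaviorStack(First); }

    public void Receive(object message) { stack.Receive(message); }

    private void First(object message)
    {
        Output.Add("Hello " + message);
        stack.Become(Second);
    }

    private void Second(object message)
    {
        Output.Add("You already said hello!");
    }
}
```

Sending two messages to a `Greeter` produces the greeting first and the “already said hello” line second, just like the actor above.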

You can read more about this feature in the Akka documentation here: http://doc.akka.io/docs/akka/snapshot/java/untyped-actors.html#HotSwap

The next feature is Supervision. Supervision lets a parent actor monitor its children and decide what to do if they start failing.
E.g. you can configure it to restart, stop or escalate if an exception of a given type occurs too many times in a given timespan.

Like this:

public class MyActor : UntypedActor
{
    private ActorRef logger = Context.ActorOf<LogActor>();

    // if any child, e.g. the logger above, throws an exception
    // apply the rules below
    // e.g. Restart the child if 10 exceptions occur in 30 seconds or less
    protected override SupervisorStrategy SupervisorStrategy()
    {
        return new OneForOneStrategy(
            maxNumberOfRetries: 10,
            duration: TimeSpan.FromSeconds(30),
            decider: x =>
            {
                if (x is ArithmeticException)
                    return Directive.Resume;
                if (x is NotSupportedException)
                    return Directive.Stop;

                return Directive.Restart;
            });
    }
}


The “logger” above is a child actor of “MyActor”, which means that MyActor is the parent and supervisor of logger.
If logger starts throwing exceptions, those will be forwarded to the supervisor strategy, and depending on the configuration, the parent can resolve the issue by restarting or stopping the child etc.
You can read about this in the Akka documentation: http://doc.akka.io/docs/akka/snapshot/general/supervision.html
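The maxNumberOfRetries/duration pair in the strategy above can be thought of as a restart budget per time window. Below is a simplified sketch of that bookkeeping, assuming a window that starts at the first failure and resets once it has expired. The `RestartBudget` class is hypothetical, and this is my reading of the idea, not the actual code in Pigeon or Akka.

```csharp
using System;

//simplified sketch of "N retries within a time window" bookkeeping;
//hypothetical class, not Pigeon's or Akka's actual code
public sealed class RestartBudget
{
    private readonly int maxNumberOfRetries;
    private readonly TimeSpan duration;
    private DateTime windowStart;
    private int retriesInWindow;

    public RestartBudget(int maxNumberOfRetries, TimeSpan duration)
    {
        this.maxNumberOfRetries = maxNumberOfRetries;
        this.duration = duration;
    }

    //returns true if the child may be restarted,
    //false if it has failed too often within the current window
    public bool RequestRestart(DateTime now)
    {
        //start a fresh window on the first failure or when the old one has expired
        if (retriesInWindow == 0 || now - windowStart > duration)
        {
            windowStart = now;
            retriesInWindow = 0;
        }

        retriesInWindow++;
        return retriesInWindow <= maxNumberOfRetries;
    }
}
```

A supervisor could ask the budget before applying Directive.Restart and stop the child when the answer is false.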