Wire – Writing one of the fastest .NET serializers


First of all, there is no such thing as “the fastest” serializer; it is all contextual.
Under some conditions, however, I would argue that Wire is, by far, the fastest of all the .NET serializers out there.

Consider the following POCO type:

public class Poco
{
   public string StringProp { get; set; }
   public int IntProp { get; set; }
   public Guid GuidProp { get; set; }
   public DateTime DateProp { get; set; }
}

Round-tripping one million objects of this type (that is, serializing and then deserializing a million objects) using Wire with all optimizations on completes in about 550 milliseconds on my personal laptop.

Doing the same using MS Bond, which is the second fastest serializer in the benchmark, takes about 830 milliseconds, and that is while being very generous to Bond, as it has some very specific prerequisites.
Protobuf.NET, which comes third in this benchmark, completes in about 1360 milliseconds.

Other serializers included in the same benchmark were Jil, NetSerializer, FS Pickler, Json.NET and the .NET BinaryFormatter.

Just to clarify: this is very selective benchmarking, the old “lies, damned lies, and statistics” and all that.
Running the same benchmark with smaller types, e.g. a POCO with only one or two properties, favors Jil and NetSerializer a lot; NetSerializer beats Wire under those conditions.

I would also imagine that running benchmarks with much bigger types might favor e.g. Protobuf.NET, as it does some clever byte buffer pooling.

Wire was originally built as a replacement for the Json.NET based serializer we use in Akka.NET.
Akka.NET is a concurrency and distributed computing framework based on messaging.

As such, our requirements were that it should support polymorphic types, “surrogate” types, and the plethora of standard types we have in the .NET ecosystem, such as immutable collections, F# discriminated unions and so on: all of the things that had been bugging us with Json.NET.
Note: there is nothing wrong with Json.NET; it is a fantastic serializer, it is just not the right choice for Akka.NET.

Messages in Akka.NET are typically quite small, so the shape of the POCO used in the benchmark above is fairly representative of a message.

This post is not intended to show how fast and fancy Wire is, but rather about some of the lessons learned while building and optimizing it.

When I started to build Wire, speed was not my main concern; I just wanted it to be “fast enough”. The main concerns were the requirements mentioned above.

Having never built a serializer before, I did not know much about the topic.
I had a rough idea of how I wanted to go about this. I knew I needed to preserve type information, even for primitives in some cases, e.g. if you serialize an array of objects containing different primitives, which is exactly what we do for actor constructor arguments in Akka.NET remote deployment.

It was also fairly obvious that it would be inefficient to write the entire type name for every such occurrence.

Therefore I introduced the idea of a ValueSerializer: a type that can serialize and deserialize the content of a given type, be it complex or primitive.

Looking up value serializers by type

The very early attempts used a concurrent dictionary of value serializers, so that the serializer could check whichever value was about to be serialized or deserialized and then look up the correct value serializer.

This worked, but doing dictionary lookups is fairly costly, so this is where I first started to introduce some optimizations.

Instead of having code like:

public ValueSerializer GetSerializerByType(Type type)
{
  ValueSerializer serializer;

  if (_serializers.TryGetValue(type, out serializer))
    return serializer;

  //more code to build custom type serializers.. ignore for now.
}

I turned the code into something like:

public ValueSerializer GetSerializerByType(Type type)
{
  if (type == typeof(string))
    return StringSerializer.Instance;

  if (type == typeof(Int32))
    return Int32Serializer.Instance;

  if (type == typeof(Int64))
    return Int64Serializer.Instance;
  ....

This was faster for primitives: no hashing or lookup needed, only reference checks.
But there is still one method call hidden in each comparison. Can you spot it?

Each call to typeof() actually generates a bit of IL code:

ldtoken      [mscorlib]System.String
call         class [mscorlib]System.Type [mscorlib]System.Type::GetTypeFromHandle(valuetype [mscorlib]System.RuntimeTypeHandle)

We can prevent these extra calls simply by storing references to each primitive type in advance:

public ValueSerializer GetSerializerByType(Type type)
{
  if (ReferenceEquals(type.GetTypeInfo().Assembly, ReflectionEx.CoreAssembly))
  {
    if (type == TypeEx.StringType) //we simply keep a reference to each primitive type
      return StringSerializer.Instance;

    if (type == TypeEx.Int32Type)
      return Int32Serializer.Instance;

    if (type == TypeEx.Int64Type)
      return Int64Serializer.Instance;

Another optimization introduced here was to only do these primitive lookups if the type belongs to the core library assembly (mscorlib on .NET Framework), which is where the primitive types are defined.

This prevents unnecessary comparisons for any user defined type.
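To make the gating concrete, here is a small self-contained sketch of the same lookup. It assumes that ReflectionEx.CoreAssembly refers to the assembly that defines the primitive types (obtained here via typeof(object).Assembly); the class and method names are hypothetical, and the serializers are represented by strings so the example runs on its own.

```csharp
using System;
using System.Reflection;

// Hypothetical sketch: strings stand in for the real ValueSerializer instances.
public static class PrimitiveLookupSketch
{
    // Cached once, so each comparison is a plain reference check instead of
    // a typeof(...) -> Type.GetTypeFromHandle call.
    private static readonly Type StringType = typeof(string);
    private static readonly Type Int32Type = typeof(int);
    private static readonly Type Int64Type = typeof(long);

    // The assembly defining the primitives (mscorlib or System.Private.CoreLib).
    private static readonly Assembly CoreAssembly = typeof(object).Assembly;

    public static string GetSerializerName(Type type)
    {
        // User-defined types never enter this block, so they skip every comparison.
        if (ReferenceEquals(type.Assembly, CoreAssembly))
        {
            if (type == StringType) return "StringSerializer";
            if (type == Int32Type) return "Int32Serializer";
            if (type == Int64Type) return "Int64Serializer";
        }

        // Hypothetical fallback: where a serializer for a complex type would be built.
        return "ComplexTypeSerializer";
    }
}
```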

Once we had this, we could do fast serializer lookups for primitive types.
The lesson from this part: never assume the cost of anything; always profile, always decompile.

Looking up types when deserializing

Another issue that bit me big time early on was looking up types via fully qualified names during deserialization.

If we want to deserialize a complex type, we first need to:

  1. Read the length of the type name
  2. Read a UTF-8 encoded byte array containing the type name
  3. Translate the byte array to a string
  4. Lookup the type with this name
  5. Then finally lookup the value serializer for this type.

It turns out that looking up types by name, e.g. via Type.GetType(name), is really slow.
Another thing that is horribly slow is translating strings to and from UTF-8 encoded byte arrays.

To avoid both of these issues, I introduced the idea of a ByteArrayKey, a struct that contains a byte array with a pre-computed hash code.

This way, we can have a concurrent dictionary from ByteArrayKey to Type for fast lookups.
So instead of doing step 3 and 4, I could simply take the byte array and lookup the type directly.

The only time we need to execute steps 3 and 4 is on a cache miss, when the type has not been used before. This is a one-time operation per type and process.

This had a really nice effect on deserialization performance, and I’m pretty sure most other serializers do not do this trick yet.
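A minimal sketch of the ByteArrayKey idea follows. The FNV-1a hash, the TypeNameCache class and its Resolve method are assumptions made for illustration, not Wire's actual code; the point is that a cache hit never decodes UTF-8 and never calls Type.GetType.

```csharp
using System;
using System.Collections.Concurrent;
using System.Text;

// A byte-array key with a precomputed hash (FNV-1a is an assumed choice).
public readonly struct ByteArrayKey : IEquatable<ByteArrayKey>
{
    public readonly byte[] Bytes;
    private readonly int _hash;

    public ByteArrayKey(byte[] bytes)
    {
        Bytes = bytes;
        _hash = ComputeHash(bytes);
    }

    private static int ComputeHash(byte[] bytes)
    {
        unchecked
        {
            uint hash = 2166136261;
            foreach (var b in bytes)
            {
                hash ^= b;
                hash *= 16777619;
            }
            return (int)hash;
        }
    }

    public bool Equals(ByteArrayKey other)
    {
        // Cheap hash and length checks first, then a byte-by-byte comparison.
        if (_hash != other._hash || Bytes.Length != other.Bytes.Length) return false;
        for (var i = 0; i < Bytes.Length; i++)
        {
            if (Bytes[i] != other.Bytes[i]) return false;
        }
        return true;
    }

    public override bool Equals(object obj) => obj is ByteArrayKey other && Equals(other);
    public override int GetHashCode() => _hash;
}

public static class TypeNameCache
{
    private static readonly ConcurrentDictionary<ByteArrayKey, Type> Cache =
        new ConcurrentDictionary<ByteArrayKey, Type>();

    // nameBytes holds the UTF-8 encoded type name exactly as read from the stream.
    public static Type Resolve(byte[] nameBytes)
    {
        var key = new ByteArrayKey(nameBytes);
        if (Cache.TryGetValue(key, out var cached))
            return cached; // fast path: no UTF-8 decoding, no Type.GetType

        // Slow path, once per type and process: decode the name and resolve it.
        var type = Type.GetType(Encoding.UTF8.GetString(nameBytes), throwOnError: true);

        // Cache a copy, in case the caller reuses its buffer for the next read.
        Cache[new ByteArrayKey((byte[])nameBytes.Clone())] = type;
        return type;
    }
}
```

Because the key only holds a reference to the array, the sketch caches a copy of the name bytes; a reused read buffer (such as a session buffer) would otherwise corrupt the cached key.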

Byte buffers, allocations and GC

In the very early code of Wire, I simply ignored how many allocations were made. If I needed to write data into a buffer, I allocated the buffer in place and used it.

For example, when deserializing a string, the code looked something like:

//StringValueSerializer.cs
public override object ReadValue(Stream stream)
{
    var length = stream.ReadInt();
    var buffer = new byte[length];  //allocate a new buffer
    stream.Read(buffer,0,length);
    return Encoding.UTF8.GetString(buffer);
}

The above might be a bit pseudo, but you get the gist of it.
This is clearly inefficient: it takes time to allocate new buffers, and it will hit the GC hard if we have a lot of unused byte arrays floating around.

To solve this issue, I introduced the concept of “sessions”: there is a SerializerSession and a DeserializerSession.

In the beginning, only the deserializer session contained code to deal with buffer recycling.
This allowed us to do something like this:

//StringValueSerializer.cs
public override object ReadValue(Stream stream, DeserializerSession session)
{
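    var length = stream.ReadInt();           //length of the string in bytes
    var buffer = session.GetBuffer(length);  //fetch a preallocated buffer
    stream.Read(buffer, 0, length);          //fill it with the encoded string
    return Encoding.UTF8.GetString(buffer, 0, length); //the buffer may be larger than length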
}

The session contains a small pre-allocated buffer, which can be re-used whenever a buffer is needed.
Only when a buffer larger than the existing one is requested is the buffer re-allocated.

This saves us a lot of allocations and execution time overhead.
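The session buffer can be sketched roughly like this. The class names and the initial buffer size of 32 bytes are hypothetical, and a BinaryReader stands in for Wire's own stream helpers such as ReadInt.

```csharp
using System;
using System.IO;
using System.Text;

// Hypothetical, simplified session: one reusable buffer per (de)serialization call.
public sealed class DeserializerSessionSketch
{
    private byte[] _buffer = new byte[32]; // assumed initial size

    // Returns a buffer of at least `length` bytes; it is reallocated only when
    // the current one is too small. Contents are not cleared between calls.
    public byte[] GetBuffer(int length)
    {
        if (length > _buffer.Length)
            _buffer = new byte[length];
        return _buffer;
    }
}

public static class StringReaderSketch
{
    // Reads a length-prefixed UTF-8 string through the session's buffer.
    public static string ReadString(BinaryReader reader, DeserializerSessionSketch session)
    {
        var length = reader.ReadInt32();
        var buffer = session.GetBuffer(length);

        // Streams may return fewer bytes than requested, so keep reading.
        var read = 0;
        while (read < length)
        {
            var n = reader.Read(buffer, read, length - read);
            if (n == 0) throw new EndOfStreamException();
            read += n;
        }

        // Decode only the first `length` bytes: the buffer may be larger.
        return Encoding.UTF8.GetString(buffer, 0, length);
    }
}
```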

Recently, Szymon Kulec (http://blog.scooletz.com, https://twitter.com/Scooletz), part of the Particular Software team, started contributing to Wire.

He has added some truly awesome optimizations to Wire.
One of the things that he did was to introduce the same buffer concept for serializer sessions.

So when data is being written to a stream, we can now use the same trick.
He created an allocation-free bit converter, much like the built-in BitConverter, but writing the bytes into an existing array instead.

This allowed us to go from code that looked like this:

//Int64ValueSerializer.cs
public override void WriteValue(Stream stream, object value)
{
     long l = (long)value;
     var bytes = BitConverter.GetBytes(l); //this allocates a new byte array every time
     stream.Write(bytes, 0, bytes.Length);
}

To something like this:

//Int64ValueSerializer.cs
public override void WriteValue(Stream stream, object value, SerializerSession session)
{
     const int size = 8;
     long l = (long)value;
     var buffer = session.GetBuffer(size); //fetch a preallocated buffer
     NoBitConverter.GetBytes(l, buffer);   //write the Int64 to the buffer
     stream.Write(buffer, 0, size);
}

This way, we eliminate the same allocation and execution-time overhead on the serialization side.
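A possible shape for such an allocation-free converter, sketched under the assumption of a little-endian layout (which matches BitConverter.GetBytes on little-endian machines). NoAllocBitConverter and its methods are hypothetical names, not the actual NoBitConverter API.

```csharp
using System;
using System.IO;

// Hypothetical converter: writes into a caller-supplied buffer instead of
// returning a freshly allocated array like BitConverter.GetBytes does.
public static class NoAllocBitConverter
{
    // Writes `value` into buffer[offset..offset+8) in little-endian order.
    public static void WriteInt64(long value, byte[] buffer, int offset)
    {
        unchecked
        {
            for (var i = 0; i < 8; i++)
            {
                buffer[offset + i] = (byte)(value >> (8 * i));
            }
        }
    }

    // Inverse of WriteInt64, handy for checking the round trip.
    public static long ReadInt64(byte[] buffer, int offset)
    {
        long result = 0;
        for (var i = 0; i < 8; i++)
        {
            result |= (long)buffer[offset + i] << (8 * i);
        }
        return result;
    }

    // Mirrors the WriteValue example: reuse a session buffer, then write 8 bytes.
    public static void WriteValue(Stream stream, long value, byte[] sessionBuffer)
    {
        const int size = 8;
        WriteInt64(value, sessionBuffer, 0);
        stream.Write(sessionBuffer, 0, size);
    }
}
```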

There are some interesting trade-offs here as well.

Buffer recycling

In Protobuf.NET, Marc Gravell uses a BufferPool, which contains a lock free pool of byte arrays.
These arrays are fairly large (IIRC, 1024 bytes), and they can be recycled: once you are done with one of them, you release it back to the pool.

This is obviously good if you need large buffers as you avoid allocations and execution time overhead from creating them.
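For illustration only, here is a deliberately tiny pool that recycles a single buffer. It is a hypothetical sketch, not protobuf-net's BufferPool, but it shows the kind of Interlocked traffic a lock-free pool adds to every rent and every return.

```csharp
using System;
using System.Threading;

// Hypothetical single-slot pool (NOT protobuf-net's BufferPool).
public static class SingleSlotBufferPool
{
    private const int BufferSize = 1024; // the size recalled above
    private static byte[] _slot;

    public static byte[] Rent()
    {
        // Atomically take the pooled buffer, leaving the slot empty (null).
        var buffer = Interlocked.Exchange(ref _slot, null);
        return buffer ?? new byte[BufferSize];
    }

    public static void Return(byte[] buffer)
    {
        // Put the buffer back only if the slot is empty; otherwise drop it for the GC.
        Interlocked.CompareExchange(ref _slot, buffer, null);
    }
}
```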

It is fairly easy to use those from within Wire and make the session types use the same BufferPool type; I have tried this myself.
However, it turns out that merely touching the Interlocked members used inside the buffer pool hurts performance in Wire for the kind of objects we aim to optimize for.

Therefore, we do not do this. Instead, we allocate a small byte array for each call to Serialize or Deserialize and resize it if needed.

Clever allocations

The session types contain different kinds of lookups, e.g. from identifier to object, from identifier to type, from type to identifier and so forth: the things the different sessions need to keep track of while serializing or deserializing.

One such lookup that is being hammered pretty hard during serialization is for checking if we need to output the type manifest for the object that is about to be written.

The type manifest should only be written once per session; after that, an identifier referring to the already written manifest should be output instead.

This was originally done using a Dictionary.
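A sketch of this manifest-or-identifier scheme, using a plain Dictionary as in the original approach. The marker bytes, the use of the assembly-qualified name and the class name are all assumptions and need not match Wire's actual format.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

// Hypothetical per-session manifest writer. Assumed format:
//   byte 0 + type name -> first time this type is written in the session
//   byte 1 + ushort id -> type already written earlier in the same session
public sealed class ManifestWriterSketch
{
    private readonly Dictionary<Type, ushort> _written = new Dictionary<Type, ushort>();

    public void WriteManifest(BinaryWriter writer, Type type)
    {
        if (_written.TryGetValue(type, out var id))
        {
            writer.Write((byte)1);
            writer.Write(id); // 2 bytes instead of the full type name
            return;
        }

        // Ids are handed out in order of first appearance: 0, 1, 2, ...
        var newId = (ushort)_written.Count;
        _written.Add(type, newId);
        writer.Write((byte)0);
        writer.Write(type.AssemblyQualifiedName); // length-prefixed UTF-8 string
    }
}
```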

There are two costs here: first, we need to allocate this dictionary object for each session, as it keeps track of types per session, and second, we need to perform lookups against it.

Both of those operations are a bit costly, and most messages that we want to serialize are simple POCOs with only a few primitive properties.

Do we really need to allocate and use this dictionary when most of the time it will only contain a single type?

No, we can simply cheat and allocate it later.
Like this:

public class FastTypeUShortDictionary
{
  private int _length; //this keeps track of how many types have been added
  private Type _firstType; //at first, just set this member field
  private Dictionary<Type, ushort> _all; //this is only allocated once there are two types

  //...lookup and add methods omitted
}

The lookup will have 0 to n entries.
When there are 0 entries, we know there is nothing in it, so there are no allocations and any lookup will just return directly.
When there is 1 entry, the _firstType field is set, so any lookup just compares the lookup type with _firstType; still no allocations or hash lookups.
Only when a second type is added to the lookup do we fall back and allocate the dictionary.

This saves us a lot of allocations and expensive hash lookups, as most messages consist of just a single user type and a few primitives.
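One way the rest of the class could look, as a sketch: the TryGetValue and Add method names and the extra _firstValue field are assumptions, since the snippet above only shows the fields.

```csharp
using System;
using System.Collections.Generic;

// Sketch of the lazily allocated Type -> ushort lookup described above.
public class FastTypeUShortDictionary
{
    private int _length;                    // number of types added so far
    private Type _firstType;                // the only entry while _length == 1
    private ushort _firstValue;             // assumed companion field for _firstType
    private Dictionary<Type, ushort> _all;  // allocated only when a second type arrives

    public int Count => _length;

    public bool TryGetValue(Type key, out ushort value)
    {
        switch (_length)
        {
            case 0:
                // Empty: nothing to compare, nothing allocated.
                value = 0;
                return false;
            case 1:
                // Single entry: one reference comparison, no hashing.
                if (key == _firstType) { value = _firstValue; return true; }
                value = 0;
                return false;
            default:
                return _all.TryGetValue(key, out value);
        }
    }

    public void Add(Type key, ushort value)
    {
        if (_length == 0)
        {
            _firstType = key;
            _firstValue = value;
        }
        else
        {
            if (_all == null)
            {
                // Second type: fall back to a real dictionary, seeded with the first entry.
                _all = new Dictionary<Type, ushort> { { _firstType, _firstValue } };
            }
            _all.Add(key, value);
        }
        _length++;
    }
}
```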

Boxing, Unboxing and Virtual calls

As you might have seen already, the ValueSerializer type declares methods like abstract object ReadValue(...) and abstract void WriteValue(..., object value, ...).

This causes boxing to occur for any value type being written or read.
I was skeptical that there would be any good solution to this due to the shape of the value serializer type that I defined very early on in the project.

Szymon however figured out that as we already do code generation for complex types, we could just as well let the value serializer join the code generation process.

He introduced the idea of EmitWriter and EmitReader into the value serializer.
This allows us to have typed implementations for each primitive and let the value serializer hook into the code generation process to inject the correct code to read and write the primitive, without calling any virtual method and without boxing.

We let the value serializer emit its code using an ICompiler abstraction, like so:

public sealed override void EmitWriteValue(ICompiler<ObjectWriter> c, int stream, int fieldValue, int session)
{
    var byteArray = c.GetVariable<byte[]>(DefaultCodeGenerator.PreallocatedByteBuffer);
    c.EmitStaticCall(_write, stream, fieldValue, byteArray);
}
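Wire does this with its own ICompiler abstraction that emits IL. The sketch below swaps in System.Linq.Expressions to show the same principle on the Poco type from the start of the post: the compiled delegate reads IntProp as an int and passes it straight to a typed static method, so there is no object parameter, no boxing and no virtual call. TypedWriters, WriteInt32 and CompileIntPropWriter are hypothetical names.

```csharp
using System;
using System.IO;
using System.Linq.Expressions;

// The Poco type from the benchmark at the start of the post.
public class Poco
{
    public string StringProp { get; set; }
    public int IntProp { get; set; }
    public Guid GuidProp { get; set; }
    public DateTime DateProp { get; set; }
}

public static class TypedWriters
{
    // Typed, static writer: takes an int, not an object, so nothing is boxed.
    // Writes the value little-endian through a reusable buffer.
    public static void WriteInt32(Stream stream, int value, byte[] buffer)
    {
        unchecked
        {
            buffer[0] = (byte)value;
            buffer[1] = (byte)(value >> 8);
            buffer[2] = (byte)(value >> 16);
            buffer[3] = (byte)(value >> 24);
        }
        stream.Write(buffer, 0, 4);
    }

    // Builds and compiles: (stream, poco, buffer) => WriteInt32(stream, poco.IntProp, buffer)
    public static Action<Stream, Poco, byte[]> CompileIntPropWriter()
    {
        var stream = Expression.Parameter(typeof(Stream), "stream");
        var poco = Expression.Parameter(typeof(Poco), "poco");
        var buffer = Expression.Parameter(typeof(byte[]), "buffer");

        var body = Expression.Call(
            typeof(TypedWriters).GetMethod(nameof(WriteInt32)),
            stream,
            Expression.Property(poco, nameof(Poco.IntProp)), // typed as int, never converted to object
            buffer);

        return Expression.Lambda<Action<Stream, Poco, byte[]>>(body, stream, poco, buffer).Compile();
    }
}
```

In a real serializer, such a delegate would be compiled once per type and cached, so the cost of building it is paid only on first use.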

Fast creation of empty objects

Wire relies on the old FormatterServices.GetUninitializedObject(type) to create empty instances of objects, because not all types have a default constructor, and we can’t know whether a constructor has side effects.

But it turns out that calling a constructor is actually faster; the problem is that we need to know whether it has side effects.

You can however extract this information:

var defaultCtor = type.GetTypeInfo().GetConstructor(new Type[] {});
var il = defaultCtor?.GetMethodBody()?.GetILAsByteArray();
var sideEffectFreeCtor = il != null && il.Length <= 8; //this is the size of an empty ctor
if (sideEffectFreeCtor)
{
    //safe to create instances by calling the constructor instead of GetUninitializedObject
}

By extracting the constructor’s method body as IL bytes and checking whether it is 8 bytes or less, we know it is an empty constructor, and thus side-effect free.
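The threshold follows from the IL of an empty constructor: ldarg.0 (1 byte), a call to the base constructor (1 opcode byte plus a 4-byte metadata token) and ret (1 byte) add up to 7 bytes, and debug builds can add 1-byte nop instructions, so the exact size depends on compiler settings. Below is a sketch of the check as a reusable helper; the CtorInspector name is hypothetical. One caveat: it only inspects the type's own constructor body, so a base constructor that it calls could still have side effects.

```csharp
using System;
using System.Reflection;

public static class CtorInspector
{
    // Heuristic from the post: a public parameterless constructor whose IL body
    // is at most 8 bytes does nothing beyond calling the base constructor.
    // The base constructor itself is not inspected.
    public static bool HasTrivialDefaultCtor(Type type)
    {
        ConstructorInfo ctor = type.GetConstructor(Type.EmptyTypes); // null if there is none
        byte[] il = ctor?.GetMethodBody()?.GetILAsByteArray();
        return il != null && il.Length <= 8;
    }
}
```

A serializer could run this once per type, cache the result, and only call the constructor when it returns true, falling back to GetUninitializedObject otherwise.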

There are of course a lot of other optimizations and interesting things going on in Wire, but hopefully this post gives some insight into the main issues we ran into while building it and how we solved them.

Random things learned building Akka.NET – Part 1


In this short post I will explain some of the things I’ve learned building Akka.NET.
I will describe some of the friction points I have noticed and why I personally don’t use features like Akka Cluster to build entire systems.
Some of these thoughts might be obvious, some might be naive, but they do reflect my current view on building distributed systems.

Location transparency and Message contracts

Actors are supposed to be location transparent.
DCOM, CORBA and .NET Remoting were all based on a local model, trying to make remote calls appear as local, in-process calls, and they all failed for this very reason.
Never try to make the explicit implicit.
The Actor Model is remote first and can be optimized when used locally.

BUT:
We constantly fall into the trap of trying to make local message objects somehow become wire friendly.
Taking something that is designed to work locally only and trying to shoehorn it into a world of network calls will result in problems.

Messages should be designed with a remote first mindset, explicit contracts that can support versioning and be maintained over time.

In Akka.NET we have tried very hard to make serialization just work magically for any message type, currently using the Json.NET serializer and soon the Wire serializer.
This gives you low friction when getting started with Akka.NET, but it is the wrong design to use for real systems.

You should swap that out for a custom serializer using e.g. ProtoBuf, where your message contracts are explicit and there is no magic or unexpected behavior involved.

Distributed monolith and the Unix philosophy

Actor model frameworks and languages do not play nicely between platforms.
Erlang, Pony, Akka, Akka.NET, Project Orleans, Service Fabric ActFab, Orbit, none of those can communicate with any of the others.
If you base your entire infrastructure on such a framework or language, you are building a distributed monolith.

You pay a very high price when you decide to build your systems this way: none of the components or services in your systems can be replaced with another tech stack, and you are forever bound to the same stack until you replace the entire thing or surgically cut one of the parts out of it.

Instead, architect your systems as isolated islands, bounded contexts, and connect them using standard protocols and contracts, HTTP, JSON, XML, AMQP etc.
Then inside each of those isolated islands, feel free to use any of the above technologies.

The above does not just apply to actor model frameworks; it applies to any RPC or microservice framework that has its own special service discovery, clustering support or protocols.

The Unix philosophy should be applied.
Use tools like Consul, Etcd, Zookeeper for service discovery, Docker using Swarm, Rancher, Kubernetes for deployment and clustering.
This gives you a lot more flexibility and options.
If it turns out that one of your choices didn’t work out, there are plenty of others to solve the same problem without completely redesigning your system.

There are of course cases where your system might have special requirements, such as extremely high throughput and/or low latency requirements, in which case other rules apply.
Maybe you need to build a distributed monolith for such reasons, but it should not be the default when designing a new system.

More on this in the next post.

//Roger

Building a framework – The early Akka.NET history


In this post, I will try to cover some of the early history of Akka.NET and how and why things turned out the way they did.
Akka.NET of course has some parallel histories going, as there are many contributors on the project.
But the post is written from my own point of view and my reasons for getting involved in this.

The butterfly effect

Back in 2005, I attended an architecture workshop initiated by Jimmy Nilsson, hosted in Lillehammer, Norway.
One of the attendees there was Einar Landre, who worked for Statoil at the time. He talked about how they used asynchronous systems and how you could build eventually consistent systems using message passing.
I was totally sold on the concepts, and as soon as I got back from the workshop, I introduced them at my work and built my first asynchronous message passing application, which is actually still in use today, ten years later.
This had a huge impact on me, and changed how I came to reason about systems and integrations and why I almost a decade later thought it was a good idea to port Akka.

I also met Mats Helander, who had just started developing an object-relational mapper called NPersist.
NPersist was based on code generation, so I showed him my framework for aspect oriented programming I was building at the time, and explained how that would be able to get rid of all the code generation and make NPersist persistence ignorant via POCOs.
Mats and I started working on these two tools together, and we packaged them under an umbrella project called Puzzle Framework.
Back then, our competitor NHibernate was in alpha stage and featurewise we were way ahead of them.

But as time passed, it turned out that NHibernate would become the winner, not because it was better, but because it attracted a lot more people due to its well-known sister project, Hibernate on the JVM.
Hibernate had a lot of learning material: books, videos and tutorials.
So having the same framework on .NET of course meant that you could re-use existing knowledge or learn from the vast set of resources.

Eventually Mats and I dropped the development of NPersist. By then, NHibernate was already the de facto standard and LINQ to SQL had just been released; there was simply no reason for us to keep the project alive any more.

The most important thing that I learned in this process, was that adoption will always outweigh features, that is; documentation, ease of use and familiarity are worth more than shiny features if no one knows how to use them.

Laying the foundation

Now fast forward to 2013.
I was doing a consultancy gig for a Swedish agency. That project involved a fair deal of concurrency: multiple systems integrating with each other, all touching the same data, possibly at the same time.
During this project, I got more and more frustrated with the lack of concurrency tools for .NET, I started reading up on this topic and eventually stumbled upon the actor model and Akka on the JVM.

As often when I find an interesting programming topic, I had to try to implement some of these concepts myself, as this is my way of learning.
I did some weekend hacking, first using F# with pattern matching, mailbox processors and all the goodness that exists there.
I played around with some proof of concept implementations of the core concepts of Akka, as a learning experience and with the intent to make something I might be able to use in my everyday work.

However, I knew that if I were ever to have a chance of using any of this in my client projects, I would have to switch over to C#, and to a large extent the same was true for attracting contributors, simply because C# has a much larger market share than F#.

I also remembered the lesson learned from NPersist vs NHibernate. There were already a handful of small hobby hacks or abandoned actor frameworks on .NET, but I knew that if I contributed to one of those, or rolled my own, the result would still be something new, unproven and untrusted, and it would be extremely hard to get any adoption for such an effort.

Porting Akka

A few weeks passed and eventually I actually had something that worked pretty well: quite a few of the core Akka-Actor features, some rudimentary Akka-Remote support like remote deployment, and a fairly complete HOCON configuration parser were now in place.
The code was published on Github and the project was named “Pigeon” in a lame attempt to play on carrier pigeons for message passing.
(The name Pigeon can still be seen in the Akka.NET source code, as the main configuration file is still called “Pigeon.conf”)

The networking layer was a problem. I didn’t have much experience writing low-level networking code, so the first early attempts at Akka-Remote used SignalR for communication, which was later replaced with a very naive socket implementation.

First class support for F#

Even if I decided to go for C# as the language of implementation, I still wanted to involve the F# community.
F# has a truly awesome open source community around it, and I had seen that there was a genuine interest in the actor model over in the F# camp.
So I sent out a few requests on the F# forums, looking for someone who could help me build an idiomatic F# API on top of the C# code.

The Co-Pilot

One day in February 2014, I got an email from a guy named Aaron.

This is the actual letter:

Hello Roger!

My name is Aaron Stannard – I’m the Founder of MarkedUp Analytics, a .NET startup in Los Angeles. We build analytics and marketing automation tools for developers who author native applications for Microsoft platforms, including native Windows.
I began my own port of Akka to C# beginning in early December, and took a break right around Christmas. I just got back to it this week and discovered Pigeon when I was researching some details about the TPL Dataflow! I wish you had started this project a few weeks earlier ;)
My implementation of Akka is right around the same stage / maturity as yours, but Pigeon offers much better performance (3.5-5x), is more simply designed than mine, and you’ve already made a lot of headway on features that I haven’t even started on like Remoting and Configuration.
Therefore, I would like to stop working on my own implementation of Akka and support Pigeon instead. I think that would be a much better use of my time than trying to invent it all on my own.
I’m an experienced .NET OSS contributor – I currently maintain FluentCassandra (popular C# Cassandra driver) and have a bunch of projects of my own that I’ve open-sourced.

I plan on using Pigeon in at least two of our services, both of which operate under high loads.

Please let me know how I can help!
Best,
Aaron Stannard • Founder • MarkedUp

“I plan on using Pigeon in at least two of our services, both of which operate under high loads.”

Say WAAT!?

It turned out that Aaron had created his own networking lib called Helios, which was exactly what Akka-Remote needed.
Aaron joined the effort and started working on the akka-remote bits while I focused mostly on akka-actor and akka-testkit.
We had some nice progress going, and we contacted Jonas Bonér of Typesafe to see if we could use the name Akka as we aimed to be a pure port, which we got an OK to do.

Lift off

Now the project started to gain some real attention.
Håkan Canberger joined the team and Jérémie Chassaing contributed the first seed of the F# API.

At the same time, my youngest son Theo was born, 10 weeks too early and weighing only 1195 grams, so I spent the next two months full time in the hospital, managing pull requests and issues on my phone.

This turned out to be a good thing for the project. Up until that point, I had seen the project as “mine”, which is not a good mindset to have when trying to run a community project.

Meanwhile we gained more users and contributors and Aaron and Håkan were busy pushing new features.
Now all of a sudden we have people like the F# language inventor Don Syme retweeting our tweets.

Bartosz joins the team and sets out to complete the F# API.
This results in even more attention from the F# community, and Don Syme even did a code review of one of the F# API pull requests.

From that point on, the project has been pretty much self-sustaining, with new contributors stepping up and contributing entire modules or integrations.

A lot more has of course happened since then, which may be the subject of another post, but I hope this post gives some insight into why Akka.NET came to be and why some of the early design choices were made.

With that being said, I’m sure the other developers have some interesting stories to share on why they got involved and what led them down this route.

//Roger

Akka.NET + Azure: Azure ServiceBus integration


I know that there is some confusion out there about how Akka.NET relates to products like NServiceBus and Azure ServiceBus. I think Akka.NET co-founder Aaron Stannard said it best:

they’re very complimentary Akka.NET makes a great consumer or producer for NServiceBus

Another closely related question that comes up from time to time is how to integrate Akka.NET actors with service buses.

How can we pull messages from a service bus and pass those to a number of worker actors w/o message loss?

One approach to this relies on the fact that actors in Akka.NET support the Ask operator.
We can pass a message to an actor and expect a response, this response will be delivered in form of a Task.

As the response is a task, we can pipe this task into a continuation and depending on if the response represents a processing success or failure from the worker actor, we can then decide what we want to do with the service bus message.

If the response represents success, we want to ack the service bus message, telling the service bus that we are done with this message and that it can be removed from the queue.

If the response was a failure, just ignore the failure and continue processing other messages.
As we haven’t acked the message to the service bus, the service bus will try to re-deliver the message to our client and we get the chance to try again some time later.

A simple implementation of this approach using Azure Service bus could look something like this:

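using System;
using System.Linq;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Routing;
using Microsoft.ServiceBus.Messaging;

namespace ConsoleApplication13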
{
    //define your worker actor
    public class MyBusinessActor : ReceiveActor
    {
        public MyBusinessActor()
        {
            //here is where you should receive your business messages
            //apply domain logic, store to DB etc.
            Receive<string>(str =>
            {
                Console.WriteLine("{0} Processed {1}", Self.Path, str);

                //reply to the sender that everything went well
                //in this example, we pass back the message we received in a built in `Success` message
                //you can send back a Status.Failure in case of exceptions if you desire to
                //or just let it fail by timeout as we do in this example
                Sender.Tell(new Status.Success(str));
            });
        }
    }

    internal class Program
    {
        private static void Main(string[] args)
        {
            CreateMessages();

            using (var system = ActorSystem.Create("mysys"))
            {
                //spin up our workers
                //this should be done via config, but here we use a
                //hardcoded setup for simplicity

                //Do note that the workers can be spread across multiple
                //servers using Akka.Remote or Akka.Cluster
                var businessActor =
                    system.ActorOf(Props
                       .Create<MyBusinessActor>()
                       .WithRouter(new ConsistentHashingPool(10)));

                //start the message processor
                ProcessMessages(businessActor);

                //wait for user to end the application
                Console.ReadLine();
            }
        }

        private static async void ProcessMessages(IActorRef myBusinessActor)
        {
            //set up an Azure SB subscription client
            //(or use a Queue client, or whatever client your specific MQ supports)
            var subscriptionClient = SubscriptionClient.Create("service1","service1");

            while (true)
            {
                //fetch a batch of messages
                var batch = await subscriptionClient.ReceiveBatchAsync(100, TimeSpan.FromSeconds(1));

                //transform the messages into a list of tasks
                //the tasks will either be successful and ack the MQ message
                //or they will timeout and do nothing
                var tasks = (
                    from res in batch
                    let importantMessage = res.GetBody<string>()
                    let ask = myBusinessActor
                        .Ask<Status.Success>(new ConsistentHashableEnvelope(importantMessage,
                            importantMessage.GetHashCode()),TimeSpan.FromSeconds(1))
                    let done = ask.ContinueWith(t =>
                    {
                        if (t.IsCanceled || t.IsFaulted) //timeout or failure reply: do not ack
                        {
                            Console.WriteLine("Failed to ack {0}", importantMessage);
                        }
                        else
                        {
                            res.Complete();
                            Console.WriteLine("Completed {0}", importantMessage);
                        }
                    },TaskContinuationOptions.None)
                    select done).ToList();

                //wait for all messages to either succeed or timeout
                await Task.WhenAll(tasks);
                Console.WriteLine("All messages handled (acked or failed)");
                //continue with the next batch
            }
        }

        //dummy method only used to prefill the msgqueue with data for this example
        private static void CreateMessages()
        {
            var client = TopicClient.Create("service1");

            for (var i = 0; i < 100; i++)
            {
                client.SendAsync(new BrokeredMessage("hello" + i)
                {
                    MessageId = Guid.NewGuid().ToString()
                });
            }
        }
    }
}

But do note that when applying this pattern, we go from the default Akka.NET “At most once” delivery to “At least once”.

Why?

Because if we fail to ack the message back to the service bus, we will eventually receive the same message again at a later time.

It could be that our worker actor has processed the message correctly and stored it in some persistent store, but the ack back to the service bus failed due to network problems, a timeout or something similar.

Thus, the service bus message will not be removed from the queue, and the client will receive it again as soon as whatever locking mechanism is in place releases the message.
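Since the same message can now show up more than once, the worker has to tolerate duplicates. Below is a minimal sketch of one common way to do that: deduplicating on a unique message id, such as the MessageId the example sets on each BrokeredMessage. The IdempotentHandler class is hypothetical, and it keeps the processed ids in memory only; a real system would have to persist them together with the result of the processing, otherwise a restart would forget them.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical idempotent handler for "at least once" delivery.
// Assumes every message carries a unique id; processed ids are kept in memory only.
public sealed class IdempotentHandler
{
    private readonly HashSet<string> _processedIds = new HashSet<string>();
    private readonly Action<string> _process;

    public IdempotentHandler(Action<string> process)
    {
        _process = process;
    }

    // Returns true if the message was processed, false if it was a redelivered duplicate.
    public bool Handle(string messageId, string body)
    {
        if (_processedIds.Contains(messageId))
        {
            // Already handled; the earlier ack must have been lost.
            // Skip the side effect (the caller should still ack the message again).
            return false;
        }

        // Process first and record the id afterwards, so a failure during processing
        // leaves the message eligible for another attempt.
        _process(body);
        _processedIds.Add(messageId);
        return true;
    }
}
```

A redelivered duplicate should still be acked, so that the service bus finally removes it from the queue.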

One extremely nice feature in Akka.NET is the cluster support. Cluster nodes can be added to or removed from a live application, so we can easily spread our load over multiple worker actors on remote nodes.

This requires no special code at all; we just need to configure our actor system to be part of an Akka.NET cluster.

HTH

Learning Azure, Day 2 | Servicebus


This is a continuation of my completely random learning experiences while trying to learn the Azure platform.

Future messages

When you send a message on the Azure Servicebus, you have the option to set a ScheduledEnqueueTimeUtc property.
This value decides when the message will be visible to the receivers, it is kind of like sending a message in the future.

So when is this useful?

Let’s say we want to send a payment reminder 20 days after an order was placed, but only if no payments have arrived.
We could solve this using future messages.

Consider the following flow of messages

Receive PlaceOrder message -> Send Reminder (set ScheduledEnqueueTimeUtc to 20 days in the future)

..20 days pass

Receive Reminder message -> Check if the order has been paid; if not, send the reminder to the buyer using snail mail or email

This way, you don’t have to add extra scheduling logic to your system or polling tables to your database; it is handled by the Servicebus itself.
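To make the visibility rule concrete, here is a small in-memory model of the reminder flow. ScheduledQueue and PaymentReminders are hypothetical names, and the queue only mimics how ScheduledEnqueueTimeUtc hides a message until its time has come; it is not the Service Bus client. With the real client you would instead set ScheduledEnqueueTimeUtc = DateTime.UtcNow.AddDays(20) on the BrokeredMessage before sending it.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory queue with scheduled messages. It only models the rule that a
// message stays invisible until its scheduled enqueue time; it is not the Service Bus API.
public sealed class ScheduledQueue
{
    private readonly List<(DateTime VisibleAtUtc, string Body)> _messages =
        new List<(DateTime VisibleAtUtc, string Body)>();

    public void Send(string body, DateTime scheduledEnqueueTimeUtc)
    {
        _messages.Add((scheduledEnqueueTimeUtc, body));
    }

    // Removes and returns the bodies of all messages whose scheduled time has been reached.
    public List<string> ReceiveVisible(DateTime nowUtc)
    {
        var visible = _messages.Where(m => m.VisibleAtUtc <= nowUtc).Select(m => m.Body).ToList();
        _messages.RemoveAll(m => m.VisibleAtUtc <= nowUtc);
        return visible;
    }
}

public static class PaymentReminders
{
    // Handling PlaceOrder: schedule a Reminder 20 days into the future.
    public static void OnPlaceOrder(ScheduledQueue queue, string orderId, DateTime placedAtUtc)
    {
        queue.Send(orderId, placedAtUtc.AddDays(20));
    }

    // Handling Reminder: only remind the buyer if the order is still unpaid.
    public static bool ShouldSendReminder(string orderId, ISet<string> paidOrders)
    {
        return !paidOrders.Contains(orderId);
    }
}
```

Note that no component polls the database for due orders; the Reminder message simply becomes visible when its time is up, and the handler decides whether a reminder is still needed.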

Looks super useful. I still have to look into how this scales, and whether messages can be kept for years without side effects.

That’s all for now

Learning Azure, Day 1 | Servicebus


I’ve decided to bite the bullet and finally dig into Azure.

My first learning experience was to toy around with the Servicebus.
Azure Servicebus is a message queue pretty much like the old MSMQ, but with a few more nifty features.
For example, messages can have a time-to-live, and you can choose between At most once and At least once delivery (ReceiveMode.ReceiveAndDelete vs. ReceiveMode.PeekLock).

The first gotcha was that the sample I downloaded from MS recreated the queue I intended to communicate with,
which in turn causes your credentials for the queue to disappear. Don’t fall for that :-)

It also turns out that the built-in batch operations perform far better than single operations.
So if possible, go for batching if you need high throughput.
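My guess (not measured here) is that most of the gain comes from paying the network round-trip once per batch instead of once per message: sending 1000 messages one at a time means 1000 round-trips, while batches of 100 need only 10. If you start out with a big list of messages and want to keep each batch reasonably small, for example to stay below the service's size limits, a helper like the hypothetical ToBatches below can split the list before calling SendBatch.

```csharp
using System;
using System.Collections.Generic;

public static class Batching
{
    // Splits a sequence into consecutive batches of at most batchSize items,
    // preserving order; the last batch may be smaller.
    public static List<List<T>> ToBatches<T>(IEnumerable<T> items, int batchSize)
    {
        if (batchSize <= 0)
            throw new ArgumentOutOfRangeException(nameof(batchSize));

        var batches = new List<List<T>>();
        var current = new List<T>(batchSize);
        foreach (var item in items)
        {
            current.Add(item);
            if (current.Count == batchSize)
            {
                batches.Add(current);
                current = new List<T>(batchSize);
            }
        }

        if (current.Count > 0)
            batches.Add(current);
        return batches;
    }
}
```

Usage against the example below would look like foreach (var batch in Batching.ToBatches(messageList, 100)) _queueClient.SendBatch(batch);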

Another thing that was expected, but maybe not to that extent, was that ReceiveAndDelete gives a lot better throughput than PeekLock.
Using PeekLock, you have to Ack back to the queue yourself to notify the queue that you have dealt with the message.

This was using a single-threaded client, so maybe the difference is not as big when processing asynchronously. I don’t know yet, but in the very naive example I’m using, there was a huge difference.
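The difference in guarantees between the two receive modes can be illustrated with a toy in-memory queue. ToyQueue and ToyReceiveMode are hypothetical and only model delivery semantics, not the real QueueClient. With ReceiveAndDelete a message is gone as soon as it is handed out, so a receiver that crashes loses it (at most once). With PeekLock the message is only locked, and it comes back unless the receiver completes it (at least once). That explicit Complete call is one plausible reason for the throughput gap.

```csharp
using System.Collections.Generic;

// Hypothetical stand-ins for ReceiveMode.ReceiveAndDelete and ReceiveMode.PeekLock.
public enum ToyReceiveMode { ReceiveAndDelete, PeekLock }

// Toy in-memory queue that only models delivery guarantees; it is not the Service Bus client.
// Message bodies double as identifiers, so they are assumed to be unique.
public sealed class ToyQueue
{
    private readonly Queue<string> _available = new Queue<string>();
    private readonly List<string> _locked = new List<string>();
    private readonly ToyReceiveMode _mode;

    public ToyQueue(ToyReceiveMode mode)
    {
        _mode = mode;
    }

    public int AvailableCount => _available.Count;

    public void Send(string body) => _available.Enqueue(body);

    // Hands out the next message, or null if the queue is empty.
    public string Receive()
    {
        if (_available.Count == 0) return null;
        var body = _available.Dequeue();

        // PeekLock: the message is only locked and stays on the broker until completed.
        // ReceiveAndDelete: nothing is kept, so the message is gone from this point on.
        if (_mode == ToyReceiveMode.PeekLock) _locked.Add(body);
        return body;
    }

    // The explicit ack that PeekLock requires; removes the message for good.
    public void Complete(string body) => _locked.Remove(body);

    // Simulates the lock timing out, e.g. because the receiver crashed before acking:
    // every un-acked message becomes visible again and will be redelivered.
    public void ExpireLocks()
    {
        foreach (var body in _locked) _available.Enqueue(body);
        _locked.Clear();
    }
}
```

Running the same "receive, then crash" scenario against both modes shows the trade-off: the ReceiveAndDelete queue ends up empty and the message is lost, while the PeekLock queue hands the message out again after the lock expires.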

And for those who are just looking for some example code, here it is:

using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using Microsoft.ServiceBus.Messaging;

namespace ConsoleApplication5
{
    class Program
    {
        private static QueueClient _queueClient;
        static void Main(string[] args)
        {          
            Console.WriteLine("Press any key to start sending messages ...");
            Console.ReadKey();
            SendMessages();
            Console.WriteLine("Press any key to start receiving the messages you just sent ...");
            Console.ReadKey();
            ReceiveMessages();
            Console.WriteLine("\nEnd of scenario, press any key to exit.");
            Console.ReadKey();
        }

        private static void SendMessages()
        {
            // connect to queue named "queue1"
            // use At most once delivery
            _queueClient = QueueClient.Create("queue1", ReceiveMode.ReceiveAndDelete);

            var messageList = new List<BrokeredMessage>();

            for (int i = 0; i < 1000; i++)
            {
                messageList.Add(new BrokeredMessage("Some message")
                {
                    MessageId = i.ToString(CultureInfo.InvariantCulture)
                });
            }

            Console.WriteLine("\nSending messages to Queue...");

            // send 1000 messages to the queue
            _queueClient.SendBatch(messageList);
        }

        private static void ReceiveMessages()
        {
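            Console.WriteLine("\nReceiving messages from Queue...");

            while (true)
            {
                //receive a batch of messages from Queue
                //(the ?? guards against a null result when nothing arrives in time)
                var messages = (_queueClient.ReceiveBatch(100,
                    TimeSpan.FromMilliseconds(500)) ?? Enumerable.Empty<BrokeredMessage>()).ToList();

                //stop once a receive comes back empty, i.e. the queue has been drained
                if (messages.Count == 0)
                {
                    break;
                }

                foreach (var message in messages)
                {
                    if (message != null)
                    {
                        Console.WriteLine("Message received: Id = {0}, Body = {1}",
                            message.MessageId, message.GetBody<string>());
                    }
                }
            }

            Console.WriteLine("\nDone...");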
        }
    }
}

Actor based distributed transactions


One question that often shows up when talking about the Actor Model is how to deal with distributed transactions.

In .NET there is the concept of MSDTC, Microsoft Distributed Transaction Coordinator, that can be used to solve this problem when working with things like SQL Server etc.

The MS Research project Orleans (MS Azure Actor framework) also supports distributed transactions.
See section 3.8 of http://research.microsoft.com/pubs/153347/socc125-print.pdf.
[Edit] The transactions described in that paper were withdrawn; the Orleans team is currently working on a new implementation.

The problem with distributed transactions is that they are expensive, very expensive, and they do not scale very well.

We as programmers are also trained to think of transactions as some sort of binary, instant event: it either succeeds or fails, and the time span is very short, but during this time span you freeze and lock everything involved in it.

In the real world however, transactions are more fuzzy, they don’t necessarily succeed or fail in a binary fashion, and they are far from instant.

And just to avoid confusion here, let’s think of this as technical transactions vs. business transactions. In the end, they both ensure a degree of consistency, even though the concepts are different.

In the actor world, we can approach this the same way that the Saga pattern works.
See @Kellabyte’s excellent post on this topic: http://kellabyte.com/2012/05/30/clarifying-the-saga-pattern/

In Kelly’s post, she talks about failures in a technical context, but the failure could just as well be a failure to comply with an agreement; there would be no distinction between technical and business failures.

Let’s say that you purchase something on credit in a store, and you set up a payment plan that stretches over X months.
During this time, you agree to pay Y amount of money at the end of each month for example.
If you do so, everything is fine, if you don’t, the store will send you a reminder, and if you still don’t pay, you will get fined.
This is a transaction that stretches over a very long time, and it can partially succeed.

Happy path:

[Figure: happy path]

Partial failure with compensating action:

[Figure: overdue]

Complete failure with compensating action:

[Figure: failure]

This is how you could model business transactions in a distributed system, it is asynchronous and it scales extremely well.
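The three paths of the credit purchase example can be sketched as a small state machine driven by domain events. PaymentPlanSaga, PlanState and the event and action names are hypothetical; in Akka.NET the same logic would typically live inside an actor that receives PaymentReceived and DeadlinePassed messages, but plain methods keep the sketch self-contained. For simplicity, each installment gets exactly one reminder, and a second missed deadline ends the plan with a fine.

```csharp
using System.Collections.Generic;

public enum PlanState { Active, Completed, Failed }

// Hypothetical saga for a payment plan of N installments.
// Each DeadlinePassed event is one deadline: the monthly due date or the reminder's due date.
public sealed class PaymentPlanSaga
{
    private readonly int _installments;
    private int _paid;
    private bool _paidCurrent;
    private bool _reminded;

    public PaymentPlanSaga(int installments)
    {
        _installments = installments;
    }

    public PlanState State { get; private set; } = PlanState.Active;

    // Actions the saga decided to take, e.g. messages it would send.
    public List<string> Actions { get; } = new List<string>();

    // Domain event: the buyer paid the current installment.
    public void OnPaymentReceived()
    {
        if (State == PlanState.Active) _paidCurrent = true;
    }

    // Domain event: the deadline for the current installment has passed.
    public void OnDeadlinePassed()
    {
        if (State != PlanState.Active) return;

        if (_paidCurrent)
        {
            // Happy path: the installment is settled, move on to the next one.
            _paid++;
            _paidCurrent = false;
            _reminded = false;
            if (_paid == _installments)
            {
                State = PlanState.Completed;
                Actions.Add("PlanCompleted");
            }
        }
        else if (!_reminded)
        {
            // Partial failure: the compensating action is a reminder.
            _reminded = true;
            Actions.Add("SendReminder");
        }
        else
        {
            // Still unpaid after the reminder: fine the buyer and end the plan.
            State = PlanState.Failed;
            Actions.Add("IssueFine");
        }
    }
}
```

Nothing is locked while the saga waits for the buyer; it simply reacts to each domain event as it arrives.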

When two or more parties begin a transaction, all parties have to agree to some sort of contract before the transaction starts. This contract is your message flow: much like a protocol, it defines what happens if you violate the contract. It is a scheme of messages and actions that describes exactly how your (business) transaction is supposed to be resolved, and which of the involved parties needs to ensure that a specific part of the agreement is handled.

This will make your transactional flow very business oriented, it goes very well with the concept of domain driven design. The transactional flow is actually just a process of domain events.