Service Fabric services done right


So, currently I’m working on a brand-new project involving a variety of services running on a Service Fabric cluster in Azure. I had never dealt with Service Fabric before (except for one short hackathon), and neither had any of my teammates. For the last couple of weeks, we’ve been struggling to make good architectural choices for a platform we’re still learning about. For the last few days we’ve been working on a service that, on certain triggers, populates a Redis cache with certain sets of values. The logic of this service is pretty complex, as it has to keep two data sources (Redis and a SQL DB) in sync, based on some sets of business rules. Another problem is that there are millions of those values to keep in sync. I’m not going to reveal all those rules or explain the whole business domain (NDA + it would take too much time). I will just give you a quick overview of how we initially approached the problem and why we were wrong.

Initial approach

It was pretty obvious we would have some kind of concurrency in this case. As we had mostly worked with services living on regular servers, we were all stuck in that mindset. We wanted to parallelize this long-running process, but we were locked into the regular, more common way of solving this kind of problem. Something like: based on certain business rules and our ways of detecting change, choose the entities to update. Then chunk the collection up and schedule thread-safe processing. What could go wrong?
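To make that concrete, here is roughly the kind of single-machine fan-out we had in mind. This is a hedged sketch, not our actual service code – all names (`ClassicSync`, `ProcessInChunks`) are hypothetical:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// A sketch of the "classic" single-machine approach: select the entities
// to update, split them into chunks, and fan the chunks out over local
// threads. Thread-safe, but bounded by one machine's cores.
public static class ClassicSync
{
    public static int ProcessInChunks(IEnumerable<int> entities, int chunkSize, Action<int> process)
    {
        // Group consecutive items into chunks of `chunkSize`.
        var chunks = entities
            .Select((entity, index) => (entity, index))
            .GroupBy(pair => pair.index / chunkSize, pair => pair.entity)
            .Select(group => group.ToArray())
            .ToArray();

        // Parallel fan-out on the local thread pool.
        Parallel.ForEach(chunks, chunk =>
        {
            foreach (var entity in chunk)
            {
                process(entity);
            }
        });

        return chunks.Length;
    }
}
```

This works, but as the next section explains, it gains nothing from running on more than one Service Fabric node.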

Good to have people smarter than me around


Just before we were about to wrap up work on this, one of my teammates approached me and asked why the hell we are using Service Fabric at all if we just solve the problems the wrong way. A Service Fabric cluster operates on nodes, where multiple instances, replicas and partitions of multiple services can run under one parent application. That means we are not limited to one thread, one processor, or even one machine. We have a whole cluster of multiple machines (potentially – for now I will use it locally on one machine) abstracted away into nodes, where we can operate just like on a single machine. So why were we designing an application that would not use the power of more than one node? If we write the service in the classical way, deploying it to more than one node makes absolutely no sense. It will not make the processing quicker; it will have no positive effect at all. It can even cause some bugs and a huge headache for us – handling concurrency is always a pain in the ass. Sorry then, we must immediately change our mindset.

Actors to the rescue!


Microsoft kindly gave us a few (almost) out-of-the-box solutions to handle problems like this. One of them is a tool called Service Fabric Reliable Actors. Microsoft Docs describes it as:

An actor is an isolated, independent unit of compute and state with single-threaded execution. The actor pattern is a computational model for concurrent or distributed systems in which a large number of these actors can execute simultaneously and independently of each other. Actors can communicate with each other and they can create more actors. Service Fabric Reliable Actors is an implementation of the actor design pattern. As with any software design pattern, the decision whether to use a specific pattern is made based on whether or not a software design problem fits the pattern.

So, an actor is like a very kind person who will get our message with a kind request to do some simple task and just go and do it. The best part is that we can have multiple people like this doing the same task, each with their own state, all thanks to the underlying bunch of Service Fabric code, logic and magic executing at the same time on multiple nodes. Following the last sentence of the quote, I will try to assess whether the problem fits the proposed solution.

Just so you know, I’ve only just learnt about all this; I’m still discovering this stuff. So, I decided to try it out on a very simple example and see how it’s gonna work out.

Let’s write some code

For a proof of concept, I don’t need any kind of business domain or complicated set of business rules. I want to verify whether this solution will solve our problem and if it’s worth the effort. So, let’s create a new Service Fabric solution with:

  • An ASP.NET Core stateless service that will receive triggers to execute the long-running logic
  • An actor service that will contain the execution of one unit of logic with one unit of state
  • Another actor service that will supervise the processing actors, handle task distribution etc.
  • Additionally, a Redis instance to test how this approach works

In short I will need a structure somewhat like this:

All this should be fairly quick to implement and give me solid feedback on whether putting any effort into this approach makes sense. Let’s start with the last point: I will need Redis to perform any operations and verify the results.

  • I will run Redis using Docker, because I don’t want to waste my time configuring it right now
  • I need the Windows port of redis-cli to query the Redis instance and see what’s going on there

(I’m assuming you have Docker installed – if not, go there.) Let’s hit cmder with this:

docker run --name some-redis -p 6379:6379 -d redis

See how easy that is! Now we need redis-cli to actually use it. If you are using Windows, download it here (it’s not a virus, I swear! You can trust me, I’m a guy from the internet!). If you’re on Linux you can use the normal download page like a civilized human being. If you’re on a Mac, then I’m sorry (just kidding, I’m sure there is some way).

Now, the exciting part! Just execute redis-cli to get inside:

λ redis-cli
127.0.0.1:6379> SADD testSet "Test"
(integer) 1
127.0.0.1:6379> SMEMBERS testSet
1) "Test"

That’s it. Now that we have our infrastructure, let’s get to the main dish.

Add a new Service Fabric service. Note: I recommend using VS2017 with Service Fabric SDK.

As the service type I will choose ASP.NET Core Stateless service. We will need this service to receive a trigger request. As a result of this operation we should have two projects: one is the Service Fabric application project and the other one is the web service I will send triggers to. To run the application in the local SF cluster, I set the SF application project as the Startup project. When code generation is done, I can add a new controller.

using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;

namespace WebService.Controllers
{
    [Route("api/longtask")]
    public class StartLongTaskController : Controller
    {
        [HttpGet("start")]
        public async Task<IActionResult> StartLongTask()
        {
            // The actual logic comes after the actor services are implemented.
            return Accepted();
        }
    }
}

Okay, that should be good enough for now. I’ll add the logic after implementing the actor services. I think the business logic of this application can simply be inserting a huge amount of numbers into Redis. That should give me good enough results to make a decision about this approach.

Now, when I right-click on the SF app project I should see the option to add a new SF service.

Now I will choose Actor Service. It should generate two projects: one with the actor service logic and one with the interface implemented by the service. I will need that soon, to be able to reference actor instances from the web service. The interface will be used to call a specific async method on an actor instance. Actually, we will need two interfaces: one for the supervisor and the other one for the processor actor:

using System.Threading.Tasks;
using LongTaskActorService.Models;
using Microsoft.ServiceFabric.Actors;

namespace LongTaskActorService.Interfaces
{
    public interface ILongTaskActorSupervisorService : IActor
    {
        Task StartProcessingAsync(Message<int[]> message);
    }
}


using System.Threading;
using System.Threading.Tasks;
using LongTaskActorService.Models;
using Microsoft.ServiceFabric.Actors;

namespace LongTaskActorService.Interfaces
{
    public interface ILongTaskActorProcessorService : IActor
    {
        Task ProcessAsync(string supervisorId, Message<int[]> message, CancellationToken cancellationToken);
        Task<string> HelloFromActorAsync();
    }
}

Now I have to implement both actor classes:


using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using LongTaskActorService.Interfaces;
using LongTaskActorService.Models;
using Microsoft.ServiceFabric.Actors;
using Microsoft.ServiceFabric.Actors.Client;
using Microsoft.ServiceFabric.Actors.Runtime;

namespace LongTaskActorService
{
    [ActorService(Name = "LongTaskActorSupervisorService")]
    [StatePersistence(StatePersistence.Persisted)]
    public class LongTaskActorSupervisorService : Actor, ILongTaskActorSupervisorService
    {
        private const int ActorsLimit = 20;
        private readonly Uri _processorActorUri = new Uri("fabric:/LongConcurrentTasks/LongTaskActorProcessorService");

        public LongTaskActorSupervisorService(ActorService actorService, ActorId actorId) : base(actorService, actorId)
        {
        }

        public async Task StartProcessingAsync(Message<int[]> message)
        {
            try
            {
                // State values get serialized, so persist a simple marker for
                // deduplication instead of anything non-serializable
                // (like a CancellationTokenSource).
                if (await StateManager.ContainsStateAsync(message.Id.ToString()))
                {
                    ActorEventSource.Current.Message($"SupervisorActor=[{Id}] is already processing MessageId=[{message.Id}].");
                    return;
                }

                var cancellationTokenSource = new CancellationTokenSource();
                await StateManager.TryAddStateAsync(message.Id.ToString(), true, cancellationTokenSource.Token);

                foreach (var chunkedCollection in message.Payload.Chunk(ActorsLimit))
                {
                    var processingActorProxy = ActorProxy.Create<ILongTaskActorProcessorService>(ActorId.CreateRandom(), _processorActorUri);
                    await processingActorProxy.ProcessAsync(Id.ToString(), new Message<int[]>
                    {
                        Id = Guid.NewGuid(),
                        Payload = chunkedCollection.ToArray()
                    }, cancellationTokenSource.Token);
                }
            }
            catch (Exception exception)
            {
                ActorEventSource.Current.Message($"Error occurred in SupervisorActor=[{Id}]: {exception}");
            }
        }
    }
}
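One aside about the supervisor: it calls `.Chunk(ActorsLimit)` on the payload. That helper is built into LINQ only from .NET 6 onwards; on the framework this PoC targets it has to be supplied by hand. Here is a minimal sketch of the extension method I’m assuming (name and shape mirror the later built-in one):

```csharp
using System;
using System.Collections.Generic;

namespace LongTaskActorService
{
    // Minimal Chunk helper for pre-.NET 6 frameworks: splits a sequence
    // into consecutive pieces of at most `size` elements, streaming lazily.
    public static class EnumerableExtensions
    {
        public static IEnumerable<IEnumerable<T>> Chunk<T>(this IEnumerable<T> source, int size)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (size <= 0) throw new ArgumentOutOfRangeException(nameof(size));

            var chunk = new List<T>(size);
            foreach (var item in source)
            {
                chunk.Add(item);
                if (chunk.Count == size)
                {
                    yield return chunk;
                    chunk = new List<T>(size);
                }
            }

            // Emit the trailing partial chunk, if any.
            if (chunk.Count > 0)
            {
                yield return chunk;
            }
        }
    }
}
```

On .NET 6+ you can drop this file and use the framework’s own `Enumerable.Chunk` instead.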


using System;
using System.Threading;
using System.Threading.Tasks;
using LongTaskActorService.Interfaces;
using LongTaskActorService.Models;
using Microsoft.ServiceFabric.Actors;
using Microsoft.ServiceFabric.Actors.Runtime;
using StackExchange.Redis;

namespace LongTaskActorService
{
    [ActorService(Name = "LongTaskActorProcessorService")]
    [StatePersistence(StatePersistence.Volatile)]
    internal class LongTaskActorProcessorService : Actor, ILongTaskActorProcessorService, IRemindable
    {
        private const string RedisKey = "Test";
        private const string RedisConnectionString = "127.0.0.1:6379";
        private IConnectionMultiplexer _connectionMultiplexer;
        private IDatabase _redisDb;

        public LongTaskActorProcessorService(ActorService actorService, ActorId actorId)
            : base(actorService, actorId)
        {
        }

        protected override async Task OnActivateAsync()
        {
            await base.OnActivateAsync();
            _connectionMultiplexer = await ConnectionMultiplexer.ConnectAsync(RedisConnectionString);
            _redisDb = _connectionMultiplexer.GetDatabase();
            ActorEventSource.Current.ActorMessage(this, "Actor activated.");
        }

        protected override async Task OnDeactivateAsync()
        {
            await base.OnDeactivateAsync();
            _connectionMultiplexer.Close();
        }

        public async Task ProcessAsync(string supervisorId, Message<int[]> message, CancellationToken cancellationToken)
        {
            try
            {
                // Save the message in the actor state; remember it gets serialized.
                await StateManager.TryAddStateAsync(message.Id.ToString(), message, cancellationToken);

                // Register a one-shot reminder that fires immediately, so the
                // heavy work happens outside this call.
                await RegisterReminderAsync(message.Id.ToString(), null, TimeSpan.Zero, TimeSpan.FromMilliseconds(-1));
            }
            catch (Exception exception)
            {
                ActorEventSource.Current.ActorMessage(this, $"Error occurred in ProcessorActor=[{Id}]: {exception}");
            }
        }

        public async Task<string> HelloFromActorAsync()
        {
            await _redisDb.SetAddAsync(RedisKey, "Hello");
            return "Hello";
        }

        public async Task ReceiveReminderAsync(string reminderName, byte[] state, TimeSpan dueTime, TimeSpan period)
        {
            try
            {
                // The reminder has done its job, so unregister it.
                var reminder = GetReminder(reminderName);

                if (reminder != null)
                {
                    await UnregisterReminderAsync(reminder);
                }

                var message = await StateManager.TryGetStateAsync<Message<int[]>>(reminderName);

                if (message.HasValue)
                {
                    foreach (var value in message.Value.Payload)
                    {
                        await _redisDb.SetAddAsync(RedisKey, value);
                    }
                }
                else
                {
                    ActorEventSource.Current.ActorMessage(this, "Received message has no value");
                }
            }
            catch (Exception exception)
            {
                ActorEventSource.Current.ActorMessage(this, $"Error occurred in ProcessorActor=[{Id}]: {exception}");
            }
        }
    }
}


Well, in the real world I could parameterize all those private constants and use service environment parameters. But I don’t need that for this proof of concept; I just need to know how it works (assuming it works at all). Let me explain real quick what I did here.

[StatePersistence(StatePersistence.Volatile)]

This line means that the state of an actor will be held in the memory of a few nodes. So in case of a fatal failure of one of them, the state is still there and it’s not lost. But if the whole cluster fails, the state will be lost. If I needed a more durable state I would choose StatePersistence.Persisted, as it uses disk space to persist the state. But I don’t need that here, so let’s stick with the volatile type.

When an actor is activated, the following occurs:

  • When a call comes for an actor and one is not already active, a new actor is created.
  • The actor’s state is loaded if it’s maintaining state.
  • The OnActivateAsync method is called – I have overridden it to open a connection to Redis.

public async Task ProcessAsync(string supervisorId, Message<int[]> message, CancellationToken cancellationToken)

public async Task StartProcessingAsync(Message<int[]> message)

Both lines above are enforced by implementing the service interfaces I wrote before. The Message class is my custom class containing the information for an actor to process. Important note: when sending messages between services in a cluster, or when saving any information to the service state with the StateManager, the types used (the message type, or the type of the data we save to the state) have to be serializable. Every time I send a message to an actor or add/get data to/from an actor’s state, it gets serialized. So, make sure your class is serializable; don’t waste your time like me trying to figure out why the actors won’t do their job and crash. Here is my implementation of the message:

using System;
using System.Runtime.Serialization;

namespace LongTaskActorService.Models
{
    [DataContract]
    public class Message<TPayload>
    {
        [DataMember]
        public Guid Id { get; set; }
        [DataMember]
        public TPayload Payload { get; set; }
    }
}

See, those simple attributes do the required magic to make the messaging work. But be careful with the stuff you put into the state; it can add plenty of memory and computational overhead to your service, as that information gets serialized/deserialized every time you access it.
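If you want to catch serialization mistakes early, a quick round-trip through the DataContractSerializer (the serializer the actor runtime relies on for data contracts) blows up in a unit test instead of deep inside the cluster. The `SerializationCheck` helper below is my own hypothetical addition, not part of the SDK:

```csharp
using System;
using System.IO;
using System.Runtime.Serialization;

namespace LongTaskActorService.Models
{
    [DataContract]
    public class Message<TPayload>
    {
        [DataMember]
        public Guid Id { get; set; }
        [DataMember]
        public TPayload Payload { get; set; }
    }

    // Hypothetical test helper: serialize a value and read it back, so a
    // non-serializable payload fails fast in a test rather than at runtime.
    public static class SerializationCheck
    {
        public static T RoundTrip<T>(T value)
        {
            var serializer = new DataContractSerializer(typeof(T));
            using (var stream = new MemoryStream())
            {
                serializer.WriteObject(stream, value);
                stream.Position = 0;
                return (T)serializer.ReadObject(stream);
            }
        }
    }
}
```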

You might have noticed that the processor actor implements one more mysterious interface:

IRemindable

As I mentioned at the beginning, the whole point of this proof of concept is to assess the possibility of parallelizing the operations. We want it to be scalable in a Service Fabric way, so that when I deploy the service on a few more nodes, it will actually process stuff faster. Simple, classic parallelization will not do it. Well, not in my case. So, this interface comes with this method:

public async Task ReceiveReminderAsync(string reminderName, byte[] state, TimeSpan dueTime, TimeSpan period)

One way to get parallelism in actor services is to use reminders. Actor calls are turn-based (single-threaded per actor), so our main ProcessAsync contains none of the actual logic we want to execute. I use this method to save the received message in the actor state and then register a reminder, so the call can return immediately and the very same actor picks the work up as soon as the reminder fires.

await RegisterReminderAsync(message.Id.ToString(), null, TimeSpan.Zero, TimeSpan.FromMilliseconds(-1));

I’m using the message id as the reminder name, because why not – it’s fairly unique and pretty consistent with the purpose of the reminder. The second parameter is the state we want to pass with the reminder, but I don’t need it (I will use the actor state), hence the null. The third parameter tells the service to remind the actor about this message immediately after registering it. As the last parameter, I passed -1 milliseconds, which is interpreted as: fire the reminder only once, don’t repeat it periodically.

So, the flow should look like this:

Of course, in this case I will have fire-and-forget kind of parallelism. But it is enough for me for now – if I need the functionality of checking on the status of the processing (I will, for sure), then I will implement another method that sends a message to the supervisor with the processing progress. Or I could go the other way around and implement a supervisor method that checks on each actor it created. It’s all possible and easy, so whatever suits my needs – for now I want to know if the actor service is the way to go at all.
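For the first of those variants, the supervisor contract could grow into something like this. This is purely a hypothetical sketch (the two extra method names are my invention, not code from this PoC):

```csharp
using System;
using System.Threading.Tasks;
using LongTaskActorService.Models;
using Microsoft.ServiceFabric.Actors;

namespace LongTaskActorService.Interfaces
{
    // Hypothetical extension of the supervisor contract: processor actors
    // call ReportProgressAsync as they finish chunks, and external callers
    // poll GetProgressAsync to see how far a message has gotten.
    public interface ILongTaskActorSupervisorService : IActor
    {
        Task StartProcessingAsync(Message<int[]> message);
        Task ReportProgressAsync(Guid messageId, int processedCount);
        Task<int> GetProgressAsync(Guid messageId);
    }
}
```

The progress counter would live in the supervisor’s own state, so it survives failover just like the rest of the actor state.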

One last thing to explain is this line:

ActorProxy.Create<ILongTaskActorProcessorService>(ActorId.CreateRandom(), _processorActorUri);

To access the actor service that runs somewhere in my Service Fabric cluster, I need to use ActorProxy and the URI pointing to it. The URI is basically something like this:

fabric:/[ApplicationName]/[ServiceName]

So, in my case it will look like this:

fabric:/LongConcurrentTasks/LongTaskActorProcessorService

Now that I have the two basic actors implemented, I can write some logic for the web service endpoint. Let’s just create an IEnumerable with plenty of numbers and give it to the actors in chunks so they can write it to Redis. During the execution, I will watch Redis to see if it takes the numbers from the actors and if everything’s in order. I will also have a look at Service Fabric Explorer to see how the services are divided between nodes and whether it fulfills my expectations.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using LongTaskActorService.Interfaces;
using LongTaskActorService.Models;
using Microsoft.AspNetCore.Mvc;
using Microsoft.ServiceFabric.Actors;
using Microsoft.ServiceFabric.Actors.Client;
using StackExchange.Redis;

namespace WebService.Controllers
{
    [Route("api/longtask")]
    public class StartLongTaskController : Controller
    {
        [HttpGet("start")]
        public async Task<IActionResult> StartLongTask()
        {
            IEnumerable<int> aLotOfValuesToAddToRedis = Enumerable.Range(0, 1000000);
            ILongTaskActorSupervisorService actorSupervisorService = ActorProxy.Create<ILongTaskActorSupervisorService>(ActorId.CreateRandom(),
                new Uri("fabric:/LongConcurrentTasks/LongTaskActorSupervisorService"));

            try
            {
                await actorSupervisorService.StartProcessingAsync(new Message<int[]>
                {
                    Id = Guid.NewGuid(),
                    Payload = aLotOfValuesToAddToRedis.ToArray()
                });
            }
            catch (Exception exception)
            {
                return new ObjectResult(exception);
            }

            return Accepted();
        }

        [HttpGet("start/normalParallel")]
        public IActionResult StartLongTaskWithoutActor()
        {
            IEnumerable<int> aLotOfValuesToAddToRedis = Enumerable.Range(0, 1000000);
            IDatabase redisDb = ConnectionMultiplexer.Connect("localhost:6379").GetDatabase();

            Parallel.ForEach(aLotOfValuesToAddToRedis, itemToAdd => redisDb.SetAdd("Test", itemToAdd));

            return Ok();
        }
    }
}

As you can see, I’ve added two endpoints – one using the actor services and the other using a simple parallel iterator adding stuff to Redis. Now I will be able to compare the two.

Should be good enough to check how it works. Let’s play!

Comparing the results

To assess the results, I’m using Postman and redis-cli. There is a very useful Redis command:

MONITOR

Now I will see everything that happens in my local Redis. Let’s start with the actors. I will use the default settings for the services:

<Parameters>
    <Parameter Name="WebService_InstanceCount" DefaultValue="-1" />
    <Parameter Name="LongTaskActorSupervisorService_PartitionCount" DefaultValue="10" />
    <Parameter Name="LongTaskActorSupervisorService_MinReplicaSetSize" DefaultValue="3" />
    <Parameter Name="LongTaskActorSupervisorService_TargetReplicaSetSize" DefaultValue="3" />
    <Parameter Name="LongTaskActorProcessorService_PartitionCount" DefaultValue="10" />
    <Parameter Name="LongTaskActorProcessorService_MinReplicaSetSize" DefaultValue="3" />
    <Parameter Name="LongTaskActorProcessorService_TargetReplicaSetSize" DefaultValue="3" />
</Parameters>

The application started correctly and I can see all the information in the beautiful Service Fabric Explorer:

I’m sending the request to the service now.

And it’s going so fast! Man, look at this:


Well, in the gif it’s not looking that fast. But believe me, it’s working pretty well! I measured how long it took to insert 1 million integers into Redis: 156 seconds.

Let’s try with the classic way. Boom, here goes the request:

It’s going equally well, I’d say. But take into consideration that the operation performed by both methods is very basic and not very time-consuming, so I lose more time on the technicalities than on the actual logic. And I’m doing it all on one machine, so the Service Fabric cluster loses its advantage and even suffers the overhead of the Service Fabric framework. Even if the result of the classic parallel way is way better now, it will not scale as nicely as the Service Fabric services.

It took 74 seconds to process the same amount of data with the classic parallel iterator. So, I definitely have an answer for the single-machine environment: if you’re going to use a Service Fabric cluster on a single machine, don’t bother with any complications – just keep it simple. But if you’re going with multiple machines, then you probably want to go with the more scalable way. There is a point where the classic approach reaches its limit and where the actors on multiple machines work more efficiently.

Pretty cool experiment!


I was writing this post in parallel (hehe) with coding this proof of concept. A few times I had to delete whole paragraphs or a bunch of code. But it was fun! I hope you learned something along with me. If you have any comments about the solution or the code – let me know! I want to learn as much as possible about this, as I will spend many months working with this stuff. So, help me out.

I now have a better understanding of the Service Fabric actor pattern implementation. It will surely help me solve the problems in my actual work more efficiently.

I haven’t made my decision about using the actors yet, but this post will give me some arguments in the discussion with my teammates. Whatever we choose, I feel it will be a much more sensible decision after this investigation.

I had a lot of fun. Coding and writing a blog post about it at the same time boosted my productivity a lot and was a fun way to write a simple proof of concept. It kinda felt like pair programming with a rubber duck, if you know what I mean.

If you want to have a look at the whole code base I produced during this post, just go to my GitHub repo. Feel free to comment!
