Creating a Distributed Computing Engine with the Actor Model and .NET Core

Home / Creating a Distributed Computing Engine with the Actor Model and .NET Core

Over the past year and a few months, we, my coworkers and I that is, have been using an Actor Model framework to implement a sort-of state-engine-based distributing computing engine. Using a generalized framework for this seemed to introduce many complexities that, in my mind, could potentially be simplified. My weekend project is centered around exploring what can be done out-of-the-box with .NET Core.


An Actor model pattern seems like a viable method for handling computing tasks. According to Wikipedia, the basic Actor model (pattern) is this:

The Actor model adopts the philosophy that everything is an actor.

This is similar to the everything is an object philosophy used by some object-oriented programming languages.

An actor is a computational entity that, in response to a message it receives, can concurrently:

  • send a finite number of messages to other actors;
  • create a finite number of new actors;
  • designate the behavior to be used for the next message it receives.

I break away from this a bit in that I don’t consider everything an Actor.

When computing involves a work flow, like if all tasks in a given list of tasks must be completed before the next “step” can be performed, a state engine is needed to manage the work flow. I think of a cluster of nodes where each node contains actors/tasks. The cluster can accept any number of nodes. In this regard, each node is identical. The idea of a centralized cluster-control mechanism with state management is outside of the purview of the Actor model. In the clustering idea, my thought is that each node can contain the finite number of actors and it is the node that will receive messages from the cluster and tell the actors what to do. Conversely, each node can relay status back to the cluster. The actors can receive strongly typed messages and perform their computation as needed based on the type of message. In the context of Actors, each actor will have an inbox and the cluster will have an outbox for tracking tasks/work it has requested. The outbox concept will facilitate letting the nodes convey to the cluster that a particular request to an actors inbox was completed. The structure might look like this:

One of features touted by the latest .NET Core is a lighter pipeline that can send and receive messages at a faster rate. That is to say, scale-ability has been improved and .NET Core is decoupled from IIS. With the decoupling comes the ability to self-host with the Kestrel server as a Windows service or as a web site. Effectively, this means we can create our services just as we would a web site and use .NET Core’s MVC framework for our API communications between the cluster and nodes. I like using the built-in features of .NET, personally, for communications.

Jumping right in, I created (3) ASP.NET Core Websites and a class library within a Visual Studio solution. The ClusterMaster is, well, my cluster and the two worker projects are my nodes.

I wanted to keep everything reusable, so nearly all of the “meat” of the solution is the Common library. I have .NET Controllers, helpers, Actor code, and all messages/services/etc in this project. Hooking that code into a Web Site (Service) application, becomes very easy.

Part of this side-project was born out of another services-related project I was working in which I wanted to track service status. I wrote a little status tracker with callbacks (Action<T>) type stuff where T defined the status message that is sent back and forth. It become the ground-work, or idea, for tracking state. You’ll see later on that IWorkerStatus is injected as a singleton into each node and the cluster. Additionally, the WorkerStatus provides the WorkerNodes with the capability of defining their actors and passing messages to them in a round-robin queuing manner.

The WorkerStatus implementation provides options to add Actors, remove Actors, and add callbacks for status updates. I sometimes use these callbacks to trigger workflow steps and such.

public interface IWorkerSatus<T> : IDisposable where T : class
{
    T CurrentStatus { get; }
    DateTime Date { get; set; }
    ConcurrentQueue<T> StatusMessages { get; }
    TimeSpan TimeSpanSinceLastStatus { get; }
    string TimeSinceLastStatus { get; }
    int MaxMessages { get; set; }

    void AddStatus(T message);
    void AddCallback(string id, Action<T> action);
    void RemoveCallback(string id);

    void RemoveActor(string id);
    void AddActor(string id, IActor<T> actor);
    void AddActorTell(string id, IActor<T> action);

    IActor<T> Next { get; }
}

public class WorkerStatus<T> : IWorkerSatus<T> where T : class
{
    private DateTime _lastTimeReceived;
    private ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
    private ConcurrentDictionary<string, IActor<T>> _actors = new ConcurrentDictionary<string, IActor<T>>();
    private ConcurrentDictionary<string, Action<T>> _actions = new ConcurrentDictionary<string, Action<T>>();
    private int _lastIndex = 0;

    public IActor<T> Next
    {
        get
        {
            if ((_actors?.Count ?? 0) == 0)
            {
                return null;
            }
                
            if (_lastIndex == _actors.Count - 1)
            {
                _lastIndex = 0;
            }
            else
            {
                _lastIndex++;
            }

            return _actors.ElementAt(_lastIndex).Value;
        }
    }

    public string TimeSinceLastStatus
    {
        get
        {
            var timespan = DateTime.UtcNow - _lastTimeReceived;
            return timespan.ToString(@"mm\:ss\.ffff");
        }
    }

    public TimeSpan TimeSpanSinceLastStatus { get { return DateTime.UtcNow - _lastTimeReceived; } }
    public T CurrentStatus { get { return StatusMessages.LastOrDefault(); } }
    public DateTime Date { get; set; } = DateTime.UtcNow;
    public ConcurrentQueue<T> StatusMessages { get { return _queue; } }
    public int MaxMessages { get; set; } = 20;

    public WorkerStatus()
    {

    }

    public void AddStatus(T message)
    {
        _lastTimeReceived = DateTime.UtcNow;
        _queue.Enqueue(message);

        T outItem;
        while (_queue.Count > this.MaxMessages)
        {
            _queue.TryDequeue(out outItem);
        }

        T dequeuedMessage = this.CurrentStatus;

        if (_actions.Count > 0)
        {
            foreach (var action in _actions)
            {
                action.Value.Invoke(dequeuedMessage);
            }
        }
    }

    public void AddCallback(string id, Action<T> action)
    {
        RemoveCallback(id);
        _actions.TryAdd(id, action);
    }

    public void RemoveCallback(string id)
    {
        if (_actions.ContainsKey(id))
        {
            Action<T> outAction;
            _actions.TryRemove(id, out outAction);
        }
    }

    public void Dispose()
    {

    }

    public void RemoveActor(string id)
    {
        if (_actors.ContainsKey(id))
        {
            IActor<T> outActor;
            if (_actors.TryRemove(id, out outActor))
            {
                //outActor.Tell<string>(x => "Hello");
            }
        }
    }

    public void AddActor(string id, IActor<T> actor)
    {
        RemoveActor(id);
        _actors.TryAdd(id, actor);
    }

    public void AddActorTell(string id, IActor<T> action)
    {
        throw new NotImplementedException();
    }
}

For the Actor, I have implemented an interface, an abstract class, and base concrete implementation of the abstract class.

Each actor must, at a minimum implement the basics – like being able to be told what to do, provide a cancellation token since they need to be async (multi threaded), and such. They also need to provide a “Callback” to allow telling their node that they are done with a task.

public interface IActor<T>
{
    string Id { get; set; }
    CancellationToken CancellationToken { get; set; }

    // Track my own status
    void AddStatus(T message);

    // For now, actor callbacks are used to pass completion
    void Callback(T message);

    // For cross actor/object communication
    void AddCallback(string id, Action<T> action);

    // For removing tells
    void RemoveCallback(string id);

    // Incoming message
    void Tell<V>(V message);
}

Here’s the abstract class for the BaseActor. T is the status message type. We have a status message queue, an IsBusy indicator which uses locks to make it thread safe, and some other plumbing.

public abstract class BaseActor<T> : IActor<T> where T : new()
{
    private string _id;
    private bool _isBusy;

    public string Id { get { return _id; } set { _id = value; } }
    public CancellationToken CancellationToken { get { return _cancellationToken; } set { _cancellationToken = value; } }
    public int MaxMessages { get; set; } = 20;
    public ConcurrentQueue<T> StatusMessages { get { return _queue; } }
    public TimeSpan TimeSpanSinceLastStatus { get { return DateTime.UtcNow - _lastTimeReceived; } }
    public T CurrentStatus { get { return StatusMessages.LastOrDefault(); } }

    protected CancellationToken _cancellationToken;
    private DateTime _lastTimeReceived;
    private ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
    private ConcurrentDictionary<Type, Action<object>> _handlers = new ConcurrentDictionary<Type, Action<object>>();
    private ConcurrentDictionary<string, Action<T>> _actions = new ConcurrentDictionary<string, Action<T>>();
    private ConcurrentQueue<object> _mailbox = new ConcurrentQueue<object>();
    private object _lockObj = new object();

    /// <summary>
    /// Thread-safe busy indicator
    /// </summary>
    public bool IsBusy
    {
        get { lock (_lockObj) { return _isBusy; } }
        set { lock (_lockObj) { _isBusy = value; } }
    }

The _actions, _queue, RemoveCallback, and AddStatus are all provided for dealing with status messages. I’ll eventually make the the actor have a handle to its parent node with the node being an actor itself. Then, I can eliminate some of the T message type references and simply Tell the node the actors status. For now though, there is the explicit callback for status updates. That was the intent, though it changed a little bit.

    public void AddCallback(string id, Action<T> action)
    {
        RemoveCallback(id);
        _actions.TryAdd(id, action);
    }

    public void RemoveCallback(string id)
    {
        if (_actions.ContainsKey(id))
        {
            Action<T> outAction;
            _actions.TryRemove(id, out outAction);
        }
    }

    public void AddStatus(T message)
    {
        _lastTimeReceived = DateTime.UtcNow;
        //_queue.Enqueue(new StatusMessage() { Message = message.Replace(@"{", @"{{").Replace(@"}", @"}}") });
        _queue.Enqueue(message);

        T outItem;
        while (_queue.Count > this.MaxMessages)
        {
            _queue.TryDequeue(out outItem);
        }
    }

As I mentioned, the _actions dictionary was intended to provide actions to invoke when status messages are bubbled up from the actor. However, I found that I wasn’t using it through my test project and created a separate Callback so that only messages that explicitly sent to this Callback invoke an action. That action is intended to be the “actor is done” call to allow enqueuing/dequeuing of requests to proceed. The node will also relay this completion, eventually, to the cluster for state tracking.

    public void Callback(T message)
    {
        if (_actions.Count > 0)
        {
            foreach (var action in _actions)
            {
                action.Value.Invoke(message);
            }
        }
    }

The Receive “action adder” method is interesting. With this method in the abstract class, we can define our type message handlers. These handlers are then stored in our ConcurrentDictionary and keyed by message type. Having to convert the actions to Action<object> was an interesting little problem to figure out.

    protected void Receive<V>(Action<V> handler)
    {
        var key = typeof(V);
        // We have to wrap the action order for it to be Action<object>!
        var handlerObj = new Action<object>(obj => { var castObj = (V)Convert.ChangeType(obj, typeof(V)); handler(castObj); });
        _handlers.TryAdd(typeof(V), handlerObj);
    }

The Tell method is the point of entry where we can communicate with the Actor. We can tell the actor anything by passing in a strongly typed message. The actor will enqueue the message and then process it when it gets around to it. The actors process messages one at a time from the queue.

The ProcessMessage implementation dequeues the messages, sets the busy status, and checks to see is there is a specific action that is defined to handle the message. If the Action is found, then a Task is spawned to run to invoke the action.

    public void Tell<V>(V message)
    {
        _mailbox.Enqueue(message);

        // Run a separate thread to kick off the next process so that any incoming tells
        // don't wait.
        Task.Run(() =>
        {
            ProcessNextMessage();
        }, _cancellationToken);
    }

    private void ProcessNextMessage()
    {
        if (_mailbox.Count > 0 && _handlers.Count > 0 && !IsBusy)
        {
            IsBusy = true;
            object message = null;
            _mailbox.TryDequeue(out message);
            Action<object> handler = null;
            var key = message.GetType();
            if (_handlers.TryGetValue(key, out handler))
            {
                var task = new Task(() =>
                {
                    try
                    {
                        handler(message);
                        IsBusy = false;
                        if (!IsBusy)
                        {
                            ProcessNextMessage();
                        }
                    }
                    catch (Exception ex)
                    {
                        IsBusy = false;
                        Console.WriteLine(ex);
                    }
                }, _cancellationToken);
                task.Start();
            }                
        }
}

Last, but not least, for the Actor, I have defined my own implementation of the abstract class that actually does some work. You’ll notice the actor can receive a StartJob message or a ProcessMarket message. The actual jobs only print to the console window, but it was enough to see things working. You’ll also notice that the Callback method is called after updating the status. This will let the node know, if it has attached a callback handler, that the Actor has completed the requested Task. I should also mention that all Tasks have an identifier for tracking purposes.

public class BasicActor : BaseActor<WorkerStatusMessage>
{
    public BasicActor()
    {
        Receive<StartJob>(job =>
        {
            Task.Run(() =>
            {
                StartJob(job);
                WorkerStatusMessage message = new WorkerStatusMessage() { TaskId = job.TaskId, Message = "Complete" };
                AddStatus(message);
                Callback(message);
            }, _cancellationToken);
        });

        Receive<ProcessMarket>(job =>
        {
            Task.Run(() =>
            {
                ProcessMarket(job);
                WorkerStatusMessage message = new WorkerStatusMessage() { TaskId = job.TaskId, Message = "Complete" };
                AddStatus(message);
                Callback(message);
            }, _cancellationToken);
        });
    }

    void StartJob(StartJob job)
    {
        Console.WriteLine("Starting job.. ");
    }

    void ProcessMarket(ProcessMarket market)
    {
        Console.WriteLine("Processing market.. ");
    }
}

Let’s take a look at the WorkerNode piece.

The worker node is a stand alone class that houses the actors and provides all communicator to/from the cluster. Within the constructor, the list of BasicActors is populated and callbacks are added.

public class WorkerNode
{
private IWorkerSatus<WorkerStatusMessage> _workerStatus;
private IServiceProvider _serviceProvider { get; set; }
private string _hostUrl;
private string _clusterUrl = "http://localhost:9000";
public IServiceProvider ServiceProvider { set { _serviceProvider = value; } }
public string HostUrl { set { _hostUrl = value; } }
public string ClusterUrl { get; set; }

public WorkerNode(IWorkerSatus<WorkerStatusMessage> workerStatus)
{
    _workerStatus = workerStatus;

    var maxActors = 2;
    for (var i = 0; i < maxActors; i++)
    {
        var actor = new BasicActor();
        actor.AddCallback("node", message =>
        {
            var apiService = _serviceProvider.GetRequiredService<IApiService>();
            var url = string.Format("{0}/api/worker/taskComplete", _clusterUrl);
            apiService.PutOrPostToApi<string, string>(url, message.TaskId);
        });
        _workerStatus.AddActor(Guid.NewGuid().ToString(), actor);
    }
}

When the service/website starts up, the WorkerNode defines a TaskRepeater that continually attempts to join the cluster. It tries every 30 seconds. This may seem like overkill, but I kept it like this so that this can be seen as a heartbeat. The cluster could, for example, use a timestamp to know when the last communication occurred with the WorkerNode based on this heartbeat join request. Conversely, if the WorkerNode is stopped, we can hook in the Service/WebSite lifecycle to let the cluster know we are no longer available.

public void Started()
{
    // Connect our callback to handle all messages received from the cluster
    _workerStatus.AddCallback("test", action =>
    {
        if (action.PayLoad == null)
        {
            _workerStatus.Next.Tell(action.Message);
        }
        else
        {
            _workerStatus.Next.Tell(action.PayLoad);
        }
    });

    var cancellationTokenSource = new CancellationTokenSource();

    // Tell "cluster" I'm ready
    TaskRepeater.Interval(TimeSpan.FromSeconds(30), () =>
    {
        try
        {
            var apiService = _serviceProvider.GetRequiredService<IApiService>();
            var encodedHostUrl = WebUtility.UrlEncode(_hostUrl.Replace("http://", string.Empty));
            apiService.GetFromApi<string>(string.Format("{0}/api/worker/ready/{1}", _clusterUrl, encodedHostUrl));
        }
        catch (Exception ex)
        {
            Console.WriteLine("Couldn't join cluster.  Keep trying?");
            Console.WriteLine(ex);
        }
    }, cancellationTokenSource.Token, true);
}

public void Stopping()
{
    _workerStatus.AddCallback("test", action =>
    {
        Console.WriteLine(action.TimeStamp);
    });
    Console.WriteLine();
}

public void Stopped()
{
    // Tell "cluster" I'm gone!.
    try
    {
        var apiService = _serviceProvider.GetRequiredService<IApiService>();
        apiService.GetFromApi<string>(string.Format(string.Format("{0}/api/worker/left/{1}", _clusterUrl, WebUtility.HtmlEncode(_hostUrl))));
    }
    catch (Exception ex)
    {
        Console.WriteLine("Couldn't join cluster.  Keep trying?");
        Console.WriteLine(ex);
    }
}

At this point, you may be wondering.. so how are the cluster and node(s) communicating already? The Common library has Controller definitions that the WorkerNodes must inherit from. This eliminated some of the redundancy. At the moment, there are Status and Worker controllers. The IWorkerStatus is used to bubble up messages received from the controllers. For example, when the cluster sends “StartJob” to a WorkerNode, the implemented controller has this endpoint defined:

[HttpPost("startJob")]
public string GetStartJob(string message, [FromBody]StartJob startJob)
{
    var workerMessage = new WorkerStatusMessage();
    workerMessage.Message = string.Format("StartJob: {0}", message);
    workerMessage.Source = "cluster";
    workerMessage.PayLoadType = typeof(StartJob);
    workerMessage.PayLoad = startJob;
    _status.AddStatus(workerMessage);
    return "Ok";
}

If you didn’t see it above, the WorkerNode defines a callback for all messages bubbled up on the IWorkerStatus. If the WorkerStatusMessage has a PayLoad, the WorkerNode “Tells” that PayLoad (typed message) to the Next Actor in the round-robin queue.

Let’s take a look at the cluster piece.

The cluster is pretty simple. Note that it inherits from my BasicActor because I wanted to be able to have it receive strongly typed messages, for when jobs complete and such, and having that code-reuse was helpful.

Here is the basic cluster and its constructor. Within the constructor, the cluster uses the BasicActor’s implementation of defining handlers as Action<T> that are executed based the type of message (T) received. This helps keeps things generic with a huge amount of cruft.

/// <summary>
/// Basic cluster - inherit from BasicActor so that we can receive type messages (Tell)
/// </summary>
public class Cluster : BasicActor
{
    private IWorkerSatus<WorkerStatusMessage> _workerStatus;
    private IServiceProvider _serviceProvider { get; set; }

    private ConcurrentDictionary<string, string> _availableNodes = new ConcurrentDictionary<string, string>();
    private ConcurrentDictionary<Guid, object> _outbox = new ConcurrentDictionary<Guid, object>();
    public IServiceProvider ServiceProvider { set { _serviceProvider = value; } }
    private int _lastIndex = 0;

    public Cluster(IWorkerSatus<WorkerStatusMessage> workerStatus)
    {
        _workerStatus = workerStatus;

        Receive<NodeReady>(n =>
            _availableNodes.TryAdd(n.IpAddress, n.IpAddress)
        );

        Receive<NodeLeft>(n =>
        {
            string node;
            _availableNodes.TryRemove(n.IpAddress, out node);
        });

        Receive<TaskComplete>(t =>
        {
            object task;
            var success = _outbox.TryRemove(new Guid(t.TaskId), out task);
            if (task == null)
            {
                // Why wasn't it found?
                Console.WriteLine();
            }
        });
    }

The NextNode of the cluster provides simple round-robin access to the nodes that have joined the cluster.

    /// <summary>
    /// Simple round-robin to get next node
    /// </summary>
    public string NextNode
    {
        get
        {
            if ((_availableNodes?.Count ?? 0) == 0)
            {
                return null;
            }

            if (_lastIndex == _availableNodes.Count - 1)
            {
                _lastIndex = 0;
            }
            else
            {
                _lastIndex++;
            }

            return _availableNodes.ElementAt(_lastIndex).Value;
        }
    }

When the service/website starts, we have a simple random task repeater sending messages to the nodes. This is illustrative test code. The TaskRepeater helper is in the Common.Utilities. Note that this bit of code uses the NextNode method to perform round-robin requests to the nodes and randomly sends one of two messages to the nodes.

    public void Started()
    {
        var cancellationTokenSource = new CancellationTokenSource();
        TaskRepeater.Interval(TimeSpan.FromMilliseconds(50), () =>
        {
            if ((_availableNodes?.Count ?? 0) > 0)
            {
                var random = new Random();
                var nodeHost = this.NextNode;

                // Randomly choose what to send to the node(s)
                if (!string.IsNullOrWhiteSpace(nodeHost))
                {
                    // Randomly choose what to send to the waiting nodes.
                    if (random.Next(0, 1000) > 500)
                    {
                        TellNodeToProcessMarket(nodeHost);
                    }
                    else
                    {
                        TellNodeToStart(nodeHost);
                    }
                }
            }
        }, cancellationTokenSource.Token, true);
    }

    public void Stopping()
    {
        // Perform some cleanup?
    }

    public void Stopped()
    {
        // Perform some cleanup?
    }

    private void TellNodeToStart(string nodeHost)
    {
        try
        {
            var taskId = Guid.NewGuid().ToString();
            var message = "start please";
            var apiService = _serviceProvider.GetRequiredService<IApiService>();
            var url = string.Format("http://{0}/api/worker/startJob/", nodeHost);
            var startJob = new StartJob() { TaskId = taskId, MarketIdentity = message };
            _outbox.TryAdd(new Guid(taskId), startJob);
            apiService.PutOrPostToApi<StartJob, string>(url, startJob);
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex);
        }
    }

    private void TellNodeToProcessMarket(string nodeHost)
    {
        try
        {
            var taskId = Guid.NewGuid().ToString();
            var apiService = _serviceProvider.GetRequiredService<IApiService>();
            var url = string.Format("http://{0}/api/worker/processMarket/{1}", nodeHost, "process");
            var processMarket = new ProcessMarket() { MarketIdentity = RandomHelper.RandMarket(), TaskId = taskId };
            _outbox.TryAdd(new Guid(taskId), processMarket);
            apiService.PutOrPostToApi<ProcessMarket, string>(url, processMarket);
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex);
        }
    }
}

Like the WorkerNodes, the Cluster has Controllers defined for receiving messages from the nodes. Unlike the WorkerNodes, since the Cluster is a type of Actor, it can be Told that it needs to perform some Task (eventually, I’ll make the the WorkerNode the same). For example, the NodeReady is bubbled up through this controller action:

[HttpGet("ready/{ip}")]
public string GetWorkerReady(string ip)
{
    var nodeReady = new NodeReady() { IpAddress = ip };
    _cluster.Tell(nodeReady);
    return "OK";
}

Ok, that’s great, but with this bunch of code, how do we get it all working? After a WebSite is created, if we want it to be a Cluster, we only have to, technically, add a single line to our .NET Core Web application’s ConfigureServices method

// This method gets called by the runtime. Use this method to add services to the container.
public void ConfigureServices(IServiceCollection services)
{
    // Add framework services.
    services.AddMvc();

    // Add cluster
    services.AddCluster();
}

to “AddCluster”:

// This method gets called by the runtime. Use this method to add services to the container.
public void ConfigureServices(IServiceCollection services)
{
    // Add framework services.
    services.AddMvc();

    // Add cluster
    services.AddCluster();
}

The same is true if we want to create a WorkerNode:

// This method gets called by the runtime. Use this method to add services to the container.
public void ConfigureServices(IServiceCollection services)
{
    // Add framework services.
    services.AddMvc();

    // Add cluster
    services.AddWorkerNode();
}

However, I do still have the Controllers defined rather than Middleware endpoints, so the Controllers do have to be defined and simply inherit. For a first pass, it was easier to create the Controllers, but, obviously, adding Middleware handlers for specific routes will be much more flexible since they could be dynamically added based on what messages we want to support and such. As an example, here’s the WorkerController in one of the WorkerNodes

[Route("api/[controller]")]
public class WorkerController : Common.Controllers.Node.WorkerController
{
    public WorkerController(IWorkerSatus<WorkerStatusMessage> status) : base(status) { }
}

There is one finally piece to the puzzle if we want to run the Cluster and Node(s) as services rather than hosting in IIS or on some other WebServer. Within the Common.Utilities, I have defined a class that inherits from WebHostService:

public class CustomWebHostService : WebHostService
{
    public CustomWebHostService(IWebHost host) : base(host)
    {
    }

    protected override void OnStarting(string[] args)
    {
        base.OnStarting(args);
    }

    protected override void OnStarted()
    {
        base.OnStarted();
    }

    protected override void OnStopping()
    {
        base.OnStopping();
    }
}

public static class CustomWebHostServiceServiceExtensions
{
    public static void RunAsCustomService(this IWebHost host)
    {
        var webHostService = new CustomWebHostService(host);
        ServiceBase.Run(webHostService);
    }
}

We can utilize this WebHostService to self-host as a Windows Service. In our Main method, we can conditionally run as a service.

public static void Main(string[] args)
{
    var host = new WebHostBuilder()
        .UseKestrel()
        .UseContentRoot(Directory.GetCurrentDirectory())
        .UseIISIntegration()
        .UseStartup<Startup>()
        .Build();

    if (Debugger.IsAttached || args.Contains("--debug"))
    {
        host.Run();
    }
    else
    {
        host.RunAsCustomService();
    }
}

Once the Program.cs has been modified, and we have compiled the source, we can install the *.exe’s as services with “sc.” Why bother? Well, we have effectively implementing everything as WebSites which means that, in addition to being installed as services, our services will have the full .NET Core Kestrel pipeline available. This opens up opportunities to see statuses of the Cluster, WorkerNodes, or Actors in real-time with very little additional tooling. We could just pop-in some Angular views and controllers and additional API endpoints to grab statuses for a nice graphic of the health of our distributed computing system.

The full source code for this project is available on Github.

https://github.com/long2know/distributed-actor-system

To use it, download load it, build the solution, and hit F5 to start the Cluster. I generally right-click the Workers in solution explorer and use “Start New Instance” to launch them one at a time. You can put in break points to see the the WorkerNodes connecting to the Cluster, the WorkerNodes receiving Task requests, and the Actors running/completing the Tasks. I have not built any sort of UI around this project at this time.

You could also, theoretically, add as many WorkerNode projects as you want, point them to the Cluster, and it would all work in an orchestrated fashion.

One thought on “Creating a Distributed Computing Engine with the Actor Model and .NET Core”

Leave a Reply

This site uses Akismet to reduce spam. Learn how your comment data is processed.