Playing with TPL

Weihnachtszeit und etwas Zeit sich irgendwas anschauen – da muss auch ein wenig Platz für Entwicklung sein. Diesmal: Wie könnte man einen Teil eines Actor Frameworks im C# Style mit möglichst wenig Aufwand schreiben? Es gibt nichts schlimmeres im Code als ein Threading Massaker. An 1000 Stellen Thread.Run oder new Thread – da läufts jedem Entwickler kalt den Rücken runter, wenn der nächste Bug in Jira anflattert und nur ein Hauch von Race-Condition Problemen aus der Beschreibung herauszulesen ist („Nicht mehr reproduzierbar“, „passiert nur manchmal“ usw.).

http://proto.actor/ und https://petabridge.com/bootcamp/ nutzen eine Message-based Ansatz. Es gibt auch andere Frameworks, die C#-async verwenden – mich hat interessiert, wieviel Codezeilen dafür notwendig sind.

Approach 1: Tasks

public abstract class Actor : IActor
{
    private class Subscriber
    {
        private readonly object instance;
        private readonly MethodInvoker methodInvoker;

        internal Subscriber(object instance, Type type)
        {
            this.instance = instance;
            this.methodInvoker = instance.GetType().DelegateForCallMethod("Event", type);
        }

        internal void Invoke(object @event)
        {
            this.methodInvoker.Invoke(this.instance, @event);
        }
    }

    private readonly Dictionary<Type, IList<Subscriber>> subscriptions = new Dictionary<Type, IList<Subscriber>>();

    public void NotifyTo<TEvent>(IActor actor)
    {
        IList<Subscriber> mailboxList;

        var subscriber = new Subscriber(actor, typeof(TEvent));

        if (this.subscriptions.TryGetValue(typeof(Subscriber), out mailboxList))
        {
            mailboxList.Add(subscriber);
        }
        else
        {
            this.subscriptions.Add(typeof(TEvent), new List<Subscriber> { subscriber });
        }
    }

    public void Publish(object @event)
    {
        var subscriberOfInterest = this.subscriptions.FirstOrDefault(i => i.Key.IsInstanceOfType(@event)).Value;

        foreach (var subscriber in subscriberOfInterest)
        {
            Task.Run(() => subscriber.Invoke(@event));
        }
    }
}

Als erstes hab ich mir eine Actor Klasse gebaut. Diese erlaubt es sich bei einem anderen Actor zu „subscriben“ und anschließend Events zu empfangen. Das Project FastReflect hilft dabei, Methoden schnell mittels Delegates aufzurufen – nur marginal langsamer als ein direkter Aufruf. Würde man es per Reflection machen – sehr langsam.

public interface IActor
{
    void NotifyTo<TEvent>(IActor actor);
}

public interface IPerson : IActor
{
    Task<string> GetNameAsync(int id);
    Task<int> GetAgeAsync(int id);
}

public interface ISomeSubscriber : IActor
{
    Task Event(SecurityEvent @event);
}

Die Interfaces sind auch straight forward – ein Actor, bei dem man den Name und das Alter abfragen kann und ein ein Actor der auf Events hört.

public class Person : Actor, IPerson
{
    // Generated with https://www.fakenamegenerator.com/gen-random-gr-gr.php
    private readonly string[] names = {
        "Brigitte Freytag",
        "Marina Hoffmann",
        "Lea Wagner",
        "Frank Diederich",
        "Nadine Fuchs"
    };

    public async Task<string> GetNameAsync(int id)
    {
        ColoredConsole.WriteLine($"{nameof(this.GetNameAsync)} with id {id} ...");

        // Takes a long time ...
        await Task.Delay(3000);

        this.Publish(new SecurityEvent($"Id {id} was successfuly requested!"));

        return this.names[id];
    }

    public async Task<int> GetAgeAsync(int id)
    {
        ColoredConsole.WriteLine($"{nameof(this.GetAgeAsync)} with id {id} ...");

        await Task.Delay(2000);

        return id * 10;
    }
}

public class SomeSubscriber : Actor, ISomeSubscriber
{
    public Task Event(SecurityEvent @event)
    {
        ColoredConsole.WriteLine(@event.Content + " Start encrypting ...", ConsoleColor.Red);

        // Processing security event is complicated ...
        Thread.Sleep(6000);

        ColoredConsole.WriteLine(@event.Content + " Finished!", ConsoleColor.Red);

        return Task.CompletedTask;
    }
}

Was wollen wir erreichen? Es soll unterbunden werden, dass mehrere Benutzer zeitgleich den Namen und das Alter abfragen. D.h. wenn 5 Threads die Methode „GetNameAsync“ und „GetAgeAsync“ aufrufen, so sollen die Aufrufe sequentiell abgearbeitet werden.

class Program
{
    static void Main(string[] args)
    {
        var personActor = CreateActor<IPerson, Person>(new Person());
        var subscriberActor = CreateActor<ISomeSubscriber, SomeSubscriber>(new SomeSubscriber());

        personActor.NotifyTo<SecurityEvent>(subscriberActor);

        for (int i = 0; i < 5; i++)
        {
            var localI = i;

            new Thread(() =>
            {
                var nameTask = personActor.GetNameAsync(localI);
                var ageTask = personActor.GetAgeAsync(localI);

                Task.WaitAll(nameTask, ageTask);

                ColoredConsole.WriteLine($"{DateTime.Now} {nameTask.Result}, {ageTask.Result}", ConsoleColor.Green);

            }).Start();
        }

        Console.ReadLine();
    }

    private static TActorType CreateActor<TActorType, TConcreteType>(TActorType instance) where TActorType : class
    {
        var proxyGenerator = new ProxyGenerator();

        return proxyGenerator.CreateInterfaceProxyWithTarget(
            instance,
            ProxyGenerationOptions.Default,
            new ActorInterceptor<TConcreteType>());
    }
}

Es bietet sich hier das Projekt Castle Proxy an, welches erlaubt, Methoden zu intercepten. Ebenfalls kann ich das Projekt AsynEx empfehlen, welches einen async lock implementiert.

internal class ActorInterceptor<TActor> : IAsyncInterceptor
{
    private readonly AsyncLock mutex = new AsyncLock();
    private readonly Dictionary<MethodInfo, MethodInvoker> cache = new Dictionary<MethodInfo, MethodInvoker>();

    public ActorInterceptor()
    {
        foreach (var @interface in typeof(TActor).GetInterfaces())
        {
            foreach (var method in @interface.GetMethods())
            {
                var methodInvoker = typeof(TActor).DelegateForCallMethod(method.Name, method.Parameters().Select(i => i.ParameterType).ToArray());

                this.cache.Add(method, methodInvoker);
            }
        }
    }

    public void InterceptSynchronous(IInvocation invocation)
    {
        invocation.Proceed();
    }

    public void InterceptAsynchronous(IInvocation invocation)
    {
        invocation.ReturnValue = this.InternalInterceptAsynchronous(invocation);
    }

    public void InterceptAsynchronous<TResult>(IInvocation invocation)
    {
        invocation.ReturnValue = this.InternalInterceptAsynchronous<TResult>(invocation);
    }

    private async Task<TResult> InternalInterceptAsynchronous<TResult>(IInvocation invocation)
    {
        using (await this.mutex.LockAsync())
        {
            MethodInvoker methodInvoker;

            if (this.cache.TryGetValue(invocation.Method, out methodInvoker))
            {
                return await ((Task<TResult>)methodInvoker.Invoke(invocation.InvocationTarget, invocation.Arguments)).ConfigureAwait(false);
            }

            throw new MissingMethodException("Method missing");
        }
    }

    private async Task InternalInterceptAsynchronous(IInvocation invocation)
    {
        using (await this.mutex.LockAsync())
        {
            MethodInvoker methodInvoker;

            if (this.cache.TryGetValue(invocation.Method, out methodInvoker))
            {
                await ((Task)methodInvoker.Invoke(invocation.InvocationTarget, invocation.Arguments)).ConfigureAwait(false);
            }
        }
    }
}

Das ganze Projekt findet ihr unter gitHub bzw. die Packages ebenfalls auf gitHub Zu beachten: der SynchronizationContext.Current – ist in diesem Fall immer null – d.h. Sachen nach dem await werden wieder im ThreadPool ausgeführt. Fazit: Die Performance ist solala – kann aber auch sein, dass ich was falsch gemacht habe ;-)

Approach 2: Performance

Da die Performance beim Approach 1 nicht so gut war, hab ich nochmal den ActionBlock getestet – er war sehr performant. Allerdings dürfen es nicht zu viele werden – bei 1000 ActionBlocks wurde es schon etwas zäh. Normal entwickelt man die Actor Sachen immer nach dem Prinzip Tell, Dont Ask. Allerdings macht es hier und da das Leben leichter, wenn man einen Request an einen anderen Actor schicken kann. 10000 Request brauchen ca. 1.7 Sekunden und es steigt bis 100 Aktoren ziemlich linear – aber dann wird’s wieder langsam.

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using Fasterflect;

namespace ActorTest
{
    class Challenge
    { }

    class Qeustion
    {
        public int A { get; }
        public int B { get; }

        public Qeustion(int a, int b)
        {
            this.A = a;
            this.B = b;
        }
    }

    class Answer
    {
        public int Result { get; }

        public Answer(int result)
        {
            this.Result = result;
        }
    }

    internal class Oracle : Mailbox
    {
        public Oracle()
        {
            this.Handle<Qeustion>();
        }

        private Answer Handle(Qeustion message)
        {
            return new Answer(message.A + message.B);
        }
    }

    internal class Interest : Mailbox
    {
        private readonly Oracle oracle;
        private int count;
        private Stopwatch sw = new Stopwatch();

        public Interest(Oracle oracle)
        {
            this.oracle = oracle;

            this.Handle<Challenge>();
        }

        private async Task Handle(Challenge message)
        {
            if (this.count == 0)
            {
                this.sw.Start();
            }            

            var answer = await this.oracle.Request<Qeustion, Answer>(new Qeustion(5, 6));

            if (answer.Result != 11)
            {
                Console.WriteLine("Fehler");
            }

            if (++this.count == 10000)
            {
                this.sw.Stop();

                Console.WriteLine(this.sw.ElapsedMilliseconds);
            }
        }
    }

    internal abstract class Mailbox
    {
        private class Handler
        {
            private readonly object instance;
            private readonly MethodInvoker methodInvoker;

            internal Handler(object instance, Type type)
            {
                this.instance = instance;
                this.methodInvoker = instance.GetType().DelegateForCallMethod("Handle", type);
            }

            internal object Invoke(object parameters)
            {
                return this.methodInvoker.Invoke(this.instance, parameters);
            }
        }

        private class Message
        {
            private readonly Action<object> callback;
            public object Instance { get; }

            public Message(object instance, Action<object> callback)
            {
                this.callback = callback;
                this.Instance = instance;
            }

            public Message(object instance)
            {
                this.Instance = instance;
            }

            public void SetResult(object result)
            {
                this.callback?.Invoke(result);
            }
        }

        private readonly ActionBlock<Message> messagesBlock;
        private readonly Dictionary<Type, Handler> handlers = new Dictionary<Type, Handler>();

        protected Mailbox()
        {
            this.messagesBlock = new ActionBlock<Message>((Action<Message>)this.Action);
        }

        private void Action(Message message)
        {
            try
            {
                var result = this.handlers[message.Instance.GetType()].Invoke(message.Instance);

                message.SetResult(result);

            }
            catch (Exception exception)
            {
                Console.WriteLine(exception.Message);
            }
        }

        protected void Handle<TMessage>()
        {
            var handler = new Handler(this, typeof(TMessage));

            this.handlers.Add(typeof(TMessage), handler);
        }

        public void Tell(object message)
        {
            this.messagesBlock.Post(new Message(message));
        }

        public Task<TResponse> Request<TRequest, TResponse>(TRequest instance)
        {
            var taskCompletionSource = new TaskCompletionSource<TResponse>();

            this.messagesBlock.Post(new Message(instance, result =>
            {
                taskCompletionSource.SetResult((TResponse)result);
            }));

            return taskCompletionSource.Task;
        }
    }

    class Program
    {
        static void Main(string[] args)
        {

            var oracle = new Oracle();

            List<Interest> interests = new List<Interest>();

            for (int i = 0; i < 10; i++)
            {
                interests.Add(new Interest(oracle));
            }

            for (int i = 0; i < 10000; i++)
            {
                foreach (var interest in interests)
                {
                    interest.Tell(new Challenge());
                }
            }



            Console.ReadKey();
        }
    }  
}

Aufpassen muss man bei der Implementierung von Request: Ein „return taskCompletionSource.Task.Result“ blockiert den aktuellen Thread und der ThreadPool wird dann relativ schnell aufgebraucht.

Fazit: die schnellste und einfachste Methode ist sicher mit simplen Messages in einen ActionBlock. Am besten nimmt man aber ein fertiges Framework und nutzt auch gleich Supervisors.