[X] Parallelisierung mit .NET 4.0 - Teil I



  • Parallelisierung mit .NET 4.0 - Teil 1

    Inhalt:

    • 1 Einleitung
    • 2 Parallelisierung vor .NET 4.0
    • 3 Parallelisierung von Schleifen
    • 3.1 Simple Parallelisierung von Schleifen
    • 3.2 Abbruch von parallelisierten Schleifen
    • 3.3 Partitioner verwenden
    • 3.4 Thread-lokale Variablen
    • 3.5 Fallstricke und Hinweise
    • 4 Parallelisierung von Methoden mit Tasks
    • 4.1 Tasks erstellen und verwalten
    • 4.2 Tasks verknüpfen
    • 4.3 Exceptionhandling bei Tasks
    • 5 Ausblick

    1 Einleitung

    Parallelisierung ist ein Thema, das schon länger als wichtig angesehen wird und es ist offensichtlich, dass es durch die immer stärkere Verbreitung von Mehrkernprozessoren weiterhin Auftrieb erhält. Abgesehen von den Verlockungen einer höheren Performance bietet das Thema aber auch eins: Nämlich sehr viele Dinge, an die man als Programmierer denken muss und die einem das Leben schwer machen... Synchronistation, (Dead)Locks, Race conditions, nur um mal ein paar Stichwörter zu nennen. Und in der Tat birgt Parallelprogrammierung mit reinen Betriebssystemmitteln, also Prozessen, eine sehr hohe Komplexität. Diese Komplexität wirkt oft abschreckend und führt schnell zu Fehlern. Aus diesem Grund existieren für so gut wie jede Sprache diverse Parallelisierungsbibliotheken, die uns das Leben als Programmierer erleichtern (sollen ;)).

    War Parallelisierung mit .NET bisher auch schon möglich, gibt es durch die Erweiterungen durch die Task Parallel Library und Parallel LINQ in .NET 4.0 eine ganze Menge an neuer Funktionalität.
    Das Hauptaugenmerk dieses Artikels liegt auf der TPL, während in Teil 2 dann auf Parallel LINQ eingegangen wird. Was nicht behandelt wird sind Themen wie: "Was ist ein Thread?", "Wozu ist Multithreading gut?", "Was bedeutet thread-safe?", "Was ist ein Mutex?" usw. Es wird also eine grundsätzliche Vetrautheit mit Multithreading bzw. Parallelisierung und seinen Begriffen vorausgesetzt. Ebenso wird angenommen, dass man mit C# dem Enumerable-Namensraum und LINQ sehr gut vertraut ist.

    Bei den Codeausschnitten wurde häufig auf eine copy & paste & compile - Version des Codes verzichtet.
    Deshalb sei an dieser Stelle gesagt, dass die Beispiele die Namensräume System.Threading sowie System.Threading.Task brauchen, um in einem kompletten Programm kompiliert werden zu können. Diese Namensräume also vorher per import-Statement einbinden.

    2 Parallelisierung vor .NET 4.0

    Auch in vorherigen .NET-Versionen war es natürlich möglich, parallele Programmierung bzw. Multithreading einzusetzen. Der wichtigste Namensraum dafür ist System.Threading. Dieser beherbergt die Klassen Thread und ThreadPool. Während die Thread-Klasse einen einzelnen Thread repräsentiert, ist der ThreadPool als eine Ansammlung von Threads zu verstehen, die man beauftragen kann, Operationen asynchron auszuführen.
    Der Thread-Klasse gibt man im Konstruktor ein Delegate mit, dass die auszuführende Aktion enthält:

    Thread t = new Thread(this.DoLotsOfWork);
    t.Start();
    

    Die Thread-Klasse stellt auch Methoden zum Abbrechen, Stoppen und Warten bereit. Eben alles, was man von einer Thread-Klasse erwartet und kennt.

    Während man mit der Thread-Klasse einzelne Threads erzeugen kann und diese dann auch selbst verwalten muss, nimmt einem der ThreadPool wenigstens ein ganzes Stück Verwaltungsaufwand ab. Die Benutzung des ThreadPool ist recht simpel:

    ThreadPool.QueueUserWorkItem(this.DoLotsOfWork);
    

    Die Methode DoLotsOfWork wird ausgeführt, sobald ein Thread verfügbar ist. Die Anzahl an Threads kann man mit SetMaxThreads bzw. SetMinThreads steuern, um den ThreadPool so an seine Bedürfnisse anzupassen.

    Als zusätzliche Möglichkeit sei noch der BackgroundWorker (Namensraum: System.ComponentModel) erwähnt, der hauptsächlich verwendet wird, um die GUI bei längeren Hintergrundoperationen nicht einfrieren zu lassen. Der BackgroundWorker benutzt intern auch die normalen Threadklassen, bietet aber noch ein paar hilfreiche Methoden wie ReportProgress(percentage) oder CancelAsync() an.

    Mit diesen Mitteln war es möglich, Multithreading einzusetzen und es gab auch schon ein gewisses Level an Komfort. In den nächsten Kapiteln werden wir aber sehen, dass dazu deutliche Steigerungen möglich sind.

    3 Parallelisierung von Schleifen

    Schleifen sind in imperativen Sprachen ein Brot und Butter-Konstrukt, das sehr häufig Anwendung findet. Zudem sind Schleifen oft hervorragende Kandidaten für parallele Programmierung. Wenn eine Schleife z.B. 1000 Iterationen durchführt und man diese Schleife parallelisiert und dann auf einem Vierkernprozessor ausführt, wird sich die Laufzeit im besten Fall erheblich reduzieren, und zwar auf ca. 25%. Eine generelle Aussage "Auf genau 25%" ist nicht möglich, da es von vielen Faktoren abhängt, was man für einen Geschwindigkeitszuwachs erhält. Diese Faktoren umfassen z.B. die gleichmäßige Verteilung der Iterationen auf die Prozessorkerne oder ob manche Elemente länger zur Verarbeitung brauchen oder was die CPUs sonst noch erledigen müssen etc etc.

    3.1 Simple Parallelisierung von Schleifen

    Mit der TPL ist es möglich, for- und foreach-Schleifen auf einfache Art und Weise zu parallelisieren. Schauen wir uns das ganze mal bei einer foreach-Schleife an:

    //GetOrders() liefert ein IEnumerable<Order>
    
    //Sequentielle Version:
    foreach (var o in GetOrders())
        ProcessOrder(o);
    
    //Parallelisierte Version
    Parallel.ForEach(GetOrders(), o => ProcessOrder(o));
    

    In diesem Beispiel werden Bestellungen einmal sequentiell und einmal parallel verarbeitet. Der erste Parameter ist die Bestellungsliste und der zweite ist ein Delegate (bzw. hier ein Lambda-Ausdruck), der angibt was mit den Elementen zu tun ist. Die Parallel.ForEach-Methode ist in einigen (zwanzig an der Zahl) Varianten überladen, wobei die gerade gezeigte als die minimalste angesehen werden darf.

    Im Hintergrund passiert folgendes: Die GetOrders-Liste wird in n Sublisten gesplittet und jede dieser Sublisten wird dann von einem Thread verarbeitet. Im Idealfall wird automatisch sinnvoll gesplittet, es gibt jedoch auch die Möglichkeit, der TPL mit so genannten Partitionern zu sagen, wie gesplittet werden soll (mehr dazu in Abschnit 3.3).

    Die parallelisierte for-Schleife benutzt sich ähnlich unkompliziert:

    //Aus einer IEnumerable<Order> in ein Order[] konvertieren
    Order[] orders = GetOrders().ToArray();
    
    //Sequentielle Version
    for (int i = 0; i < orders.Length; ++i)
        ProcessOrder(orders);
    
    //Parallelisierte Version
    Parallel.For(0, orders.Length, i => ProcessOrder(orders[i]));
    

    Der erste Parameter ist der Von-Index (inklusive), der zweite der Bis-Index (exklusive) und der dritte wieder ein Delegate, dass angibt was zu tun ist. Im Gegensatz zur foreach-Schleife bekommen wir hier einen Index als Parameter des Delegates und nicht das Objekt selbst.

    3.2 Abbruch von parallelisierten Schleifen

    Soweit so gut. Aber was ist wenn man eine parallelisierte for/foreach-Schleife abbrechen will? Ein break-Statement wird nicht mehr reichen, weil dies nur für einen Thread und nicht für alle gilt. Die Lösung ist gar nicht so kompliziert, man gibt der Schleife ein "Zustandsflag" vom Typ ParallelLoopState mit:

    Parallel.ForEach(GetOrders(), (o, loopState) => {
        if (!o.isValid)    //Abbrechen, wenn eine Bestellung ungültig ist
            loopState.Stop();
    
        ProcessOrder(o);
    });
    

    Nun wird die Abarbeitung der Schleife sobald wie möglich (wenn alle Threads angehalten werden können) gestoppt. Neben Stop() gibt es auch Break(), welches jedoch die aktuelle Iteration zu Ende führt und die Schleife erst danach anhält. Das Vorgehen ist übrigens für For- und ForEach-Methoden identisch, außer dass For-Methoden eben zuerst den Von- und Bis-Index erwarten.

    Wir haben gesehen, wie man eine parallelisierte Schleife von innen anhält. Manchmal hat man aber auch den Wunsch, eine Schleife von außen anzuhalten, beispielsweise wenn man dem Benutzer die Möglichkeit geben will, eine umfangreiche Operation abzubrechen. Das folgende, zur Abwechslung vollständige, Beispiel zeigt wie das mit der Klasse CancellationTokenSource geht:

    using System;
    using System.Collections.Generic;
    using System.Threading;
    using System.Threading.Tasks;
    
    namespace ConsoleApplication1 {
        class Program {
            //Eine Dummy-Klasse, die Bestellungen symbolisiert
            class Order {
                public int Id {
                    get;
                    private set;
                }
    
                public Order(int id) {
                    this.Id = id;
                }
    
                public bool isValid { 
                    get { return this.Id >= 0; } 
                }
            }
    
            //Erstellt 10000 Order-Instanzen
            static IEnumerable<Order> GetOrders() {         
                List<Order> orders = new List<Order>();
                for (int i = 0; i < 10000; ++i)
                    orders.Add(new Order(i));
    
                return orders;
            }
    
            //"Verarbeitet" eine Bestellung
            static void ProcessOrder(Order o) {
                Console.WriteLine("Processing order: " + o.Id);
            }
    
            static void Main(string[] args) {
                CancellationTokenSource cancelToken = new CancellationTokenSource();
    
                ParallelOptions parOpts = new ParallelOptions();
                parOpts.CancellationToken = cancelToken.Token;
                parOpts.MaxDegreeOfParallelism = Environment.ProcessorCount;    //Nutze alle Prozessoren
    
                //Dieser Thread simuliert den Benutzerabbruch
                Thread t = new Thread(() => {
                    Thread.Sleep(100);
                    cancelToken.Cancel();
                });
                t.Start();
    
                try {
                    //Hier werden die parallel options (und somit auch das CancellationToken) als Parameter übergeben
                    Parallel.ForEach(GetOrders(), parOpts, o => ProcessOrder(o));
                }
                catch (OperationCanceledException ex) {
                    Console.Error.WriteLine(ex.Message);
                }
            }
        }
    }
    

    In diesem Beispiel verwenden wir auch erstmals die Klasse ParallelOptions, die uns einige für die [i]Ausführung* der Schleife relevanten Optionen angeben lässt.
    Sobald der Abbruch von außen erfolgt, wirft die ForEach-Methode eine Exception, auf die man natürlich reagieren sollte. Auch hier gilt, dass das Vorgehen für For-Methoden fast identisch ist.

    3.3 Partitioner verwenden

    Bisher haben wir die Parallel.For/Parallel.ForEach-Methoden einfach auf unsere Datenquellen losgelassen und uns darauf verlassen, dass die Datenquelle sinnvoll aufgeteilt und verarbeitet wird. In der überwältigenden Anzahl der Fälle klappt das auch hervorragend, nur will man manchmal selbst Hand anlegen und die Aufteilung vorgeben. Das Werkzeug dazu nennt sich Partitioner.

    Grundsätzlich können wir damit zwei Arten von Partitionierung vornehmen:
    Range-Partitionierung und Chunk-Partitionierung.

    Range-Partitionierung bezieht sich darauf, dass wir eine Datenquelle per Indizes aufteilen und so die aufzuteilenden Bereiche vorgeben. Das bedeutet dass jeder Thread einen festgelegten Bereich aus der Datenquelle abarbeitet. Der Vorteil ist, dass wir kaum Overhead für die Aufteilung haben. Sind die Bereiche einmal festgelegt, ist die Aufteilung abgeschlossen. Der Nachteil ist, dass die CPUs ungünstig ausgelastet werden können. Wenn ein Thread seinen Bereich abgearbeitet hat und vor den anderen Threads fertig ist, kann er keine weitere Arbeit übernehmen, schließlich würde er sonst ja in Bereichen von anderen Threads wildern. Dieses Szenario tritt besonders dann auf, wenn die Dauer einer Operation von den Datenelementen abhängt.
    Eine Range-Partitionierung wäre also für die parallele Ausführung einer Fakultätsberechnung nicht optimal, denn große Zahlen brauchen länger zur Berechnung als kleine und in negativen Szenarien bekommt ein Thread dann den Bereich zugeteilt, der die größten Zahlen enthält. Dieser Thread braucht natürlich am längsten bis er fertig ist, während alle anderen schon fertig sind und warten. Das hebelt den Sinn von Parallelisierung natürlich in gewisser Weise aus.

    Chunk-Partitionierung wiederum bedeutet, dass sich mehrere Threads immer häppchenweise Daten von der Datenquelle holen, diese verarbeiten und danach zur Datenquelle zurückkehren, um weitere Elemente zur Verarbeitung abzuholen. Dies wiederholt sich solange, bis alle Elemente der Datenquelle abgearbeitet sind. Damit hätten wir schon mal den ersten Vorteil erwähnt: Kein Thread hört auf, nur weil er den ersten Satz an Elementen abgearbeitet hat.
    Außerdem ist es sehr wahrscheinlich, dass sich die aufwändigen Elemente besser unter den Threads verteilen und nicht nur einem Thread zugeteilt werden, da sich die Threads ja immer nur einen Satz Elemente holen und diese abarbeiten. Somit wird eher jeder Thread ein paar aufwändige Elemente abarbeiten. Der Nachteil von Chunk-Partitionierung ist ein etwas höherer Overhead bei der Verteilung der Elemente aus der Datenquelle.
    Zusätzlich haben wir bei Chunk-Partitionierung noch die Möglichkeit, load-balancing durchzuführen. Dies veranlasst den Partitioner, die Arbeitslast möglichst gut unter den Threads zu verteilen. Der Nutzen hängt vom Anwendungsfall ab und es sollte gemessen werden, ob man durch load-balancing schneller zum Ziel kommt oder nicht. Schließlich ist load-balancing nicht gratis, sondern bedingt einen kleinen Overhead. Der Default-Partitioner von Parallel LINQ benutzt übrigens kein load-partitioning.

    Allgemein liefert Chunk-Partitionierung aus oben genannten Gründen die bessere Performance als Range-Partitionierung. Wenn man überschaubare Datenmengen hat, die alle ungefähr gleich aufwändig zu verabeiten sind, kann Range-Partitionierung schneller werden, ansonsten hat Chunk-Partitionierung fast immer die Nase vorn.
    Die Partitioner-Klasse stellt einige Create-Methoden zur Verfügung, um einen Partitioner zu erstellen. Die Überladungen, die numerische Parameter erwarten, nutzen Range-Partitionierung. Die anderen nutzen Chunk-Partitionierung. Create<TSource>(IEnumerable<TSource>) nutzt load-balancing, bei den anderen beiden Chunk-Partitionern kann man per bool-flag angeben, ob sie load balancing anwenden sollen. Die Range-Partitioner nutzen niemals load-balancing (wie auch, es sind ja feste Bereiche zugeteilt).

    Nach dem ganzen Text soll ein kleines Beispiel die Verwendung eines Partitioners zeigen (Der namespace System.Collections.Concurrent wird benötigt):

    var numbers = Enumerable.Range(0, 1000);
    
    //Chunk-Partitionierung mit load-balancing
    var chunkPartitioner = Partitioner.Create(numbers);
    Parallel.ForEach(chunkPartitioner, a => {
        DoOperation(a);
    });
    
    Console.Out.WriteLine();
    
    //Die Zahlenliste in ein Array umwandeln
    var array = numbers.ToArray();
    //Range-Partitionierung über das gesamte Array
    var rangePartitioner = Partitioner.Create(0, array.Length);
    
    //Über die Bereiche iterieren
    Parallel.ForEach(rangePartitioner, range => {
        //Einen einzelnen Bereich abarbeiten
        for (int i = range.Item1; i < range.Item2; ++i)
            DoOperation(array);
    });
    

    Bei der Chunk-Partitionierung ändert sich recht wenig, abgesehen davon dass der Partitioner nun als Datenquelle fungiert. Ansonsten geht alles seine gewohnten Wege.
    Die Range-Partitionierung ist mit mehr Aufwand verbunden. Zuerst sagen wir dem Partitioner, dass er insgesamt einen Bereich von 0 - array.Length aufteilen muss. Wichtig ist, dass es hier nur um einen allgemeinen Bereich geht, nicht spezifisch um den Bereich unseres Arrays. Wir könnten diesen Partitioner also auch für eine andere Datenquelle gleicher Größe verwenden.
    Im zweiten Schritt benutzen wir eine parallele ForEach-Schleife, um über die vom Partitioner erstellten Bereiche zu iterieren. Diese sind als Tupel<int, int> vorhanden und definieren die untere und obere Grenze des Bereichs als Index.
    Schließlich führen wir eine Operation auf die Elemente dieses Bereichs aus.

    Möchte man steuern wie viele Elemente ein Thread bei einer Chunk-Partitionierung jeweils abholt, kann man seinen eigenen Partitioner implementieren, allerdings besprechen wir das hier nicht weiter, da das Beispiel in der MSDN alles erläutert.

    3.4 Thread-lokale Variablen

    Als letztes Beispiel wollen wir uns anschauen, wie man thread-lokale Variablen realisiert. Im Gegensatz zu gemeinsamen Variablen für alle Threads haben diese den Vorteil, dass man den Zugriff darauf nicht synchronisieren bzw. sie nicht locken muss, schließlich hat ja jeder Thread seine eigenen lokalen Variablen.

    Hier bitte auch den Namensraum System.Linq einbinden, da wir die Range-Erweiterungsmethode benutzen.

    //Alle Zahlen von 0-99999 generieren
    var numbers = Enumerable.Range(0, 100000);
    
    Parallel.ForEach<int, long>                 //Jetzt das ganze parallel
    (
            Partitioner.Create<int>(numbers),       //Datenquelle per Partitioner
            () => 0,                                //Initialisierung der lokalen Variablen
            (num, loopState, subSum) =>             //Schleifenkörper
            {
                subSum += num;                      //Bilde lokale Summe - pro Thread
                return subSum;
            },
            subSum => Console.Out.WriteLine(subSum) //Lokale Summe ausgeben
    );
    

    Nun gut, hier geht sehr viel mehr vor als in den vorherigen Beispielen. Schauen wir uns das genauer an.
    Neu ist, dass ForEach jetzt zwei Typparameter mitbekommt. Der erste gibt den Typ der Elemente von numbers an und der zweite den Typ der lokalen Variablen. Wenn man sich die diversen Überladungen von For- und ForEach ansieht, wird man feststellen dass alle ForEach-Methoden und die meisten For-Methoden mindestens einen Typparameter haben, wir aber bisher nie einen angegeben haben. Hier kommt die automatische Typinferenz bei Generics in's Spiel. Sie versucht, den Typ der Elemente der Datenquelle automatisch herzuleiten und ermöglicht es uns so, diesen Typparameter wegzulassen.

    Dann verwenden wir dieses mal einen eigenen Partitioner, wobei wir den einfachste Variante verwenden, wir geben ihm nämlich nur unsere Zahlenliste mit und überlassen dem Partitioner den Rest (siehe auch Abschnitt 3.3).

    Ebenso neu ist die Initialisierung der lokalen Variable (zweiter Parameter), die per Lambda realisiert wird. Der Schleifenkörper (dritter Parameter) wurde um eine weitere Variable erweitert, nämlich unsere lokale Variable. Den Abschluss schließlich bildet ein Lambda-Ausdruck, welches am Ende eines Threads ausgeführt wird. In dem Fall entscheiden wir uns dafür, einfach alle lokalen Summe auf der Konsole auszugeben. Hier kann man anhand der Anzahl der lokalen Summen auch sehen, wie der Partitioner aufgeteilt hat.

    Wer mehr zum Thema Parallelisierung von Schleifen erfahren will, dem sei die MSDN an's Herz gelegt, in der das Thema unter "Data Parallelism" firmiert.

    3.5 Fallstricke und Hinweise

    Obwohl .NET uns bei der Parallelprogrammierung nun ganz schön unter die Arme greift, kann man sich nach wie vor im Multithreadinglabyrinth verirren und böse hinfallen.

    Was man beispielsweise vermeiden sollte ist mit goto aus einer Parallel.For/ForEach-Schleife herauszuspringen. Der Ratschlag gilt zwar auch bei sequentieller Programmierung, allerdings führt er da nur aus stilistischen Gründen zu einem fahlen Beigeschmack. In der parallelen Variante fliegt einem aber im schlechtesten Fall die ganze Geschichte um die Ohren und im besten Fall erhält man einfach falsche Ergebnisse.
    Dass ein break-Statement nicht die komplette Schleife anhält sondern man dazu ein CancellationToken benötigt, wurde ja besprochen. Insofern sollte man auch von break absehen.

    In der MSDN gibt es weitere Ratschläge, die man beachten sollte.

    Zum Abschluss noch ein Hinweis, wie man parallelisierten For/ForEach-Schleifen einen kleinen Extra-Turbo verpasst. Indem man nämlich aufwändig zu verarbeitende Elemente zu Beginn verarbeitet, wird die Schleife oftmals schneller fertig sein als wie wenn man diese Elemente irgendwann oder ganz am Ende verarbeitet. Der Grund dafür ist, dass der Scheduler die Elemente schön in die "Lücken" der CPU auffüllen kann und somit Leerlauf vermieden wird. Im nächsten Artikel werden wir diese Vorgehensweise auf Parallel LINQ übertragen und damit auch einiges an Geschwindigkeit gewinnen.

    4 Parallelisierung von Methoden mit Tasks

    Dieser Abschnitt wird sich mit dem Herzstück der TPL befassen, nämlich der Klasse Task (daher auch der Name TPL). Diese Klasse stellt eine Operation (man könnte auch Vorgang sagen) dar, welche asynchron ausgeführt werden kann. Und davon natürlich mehrere parallel.

    4.1 Tasks erstellen und verwalten

    Es gibt grundsätzlich zwei Arten, mit Tasks zu arbeiten. Einmal implizit und einmal explizit.
    Der implizite Weg sieht so aus:

    Parallel.Invoke(() => OperationA(), () => OperationB());
    

    Der Invoke-Methode können beliebig viele Action-Delegates übergeben werden, da sie den params-Modifizierer benutzt. Eine zweite Überladung ermöglicht die Übergabe von ParallelOptions (In Abschnitt 3.2 haben wir diese benutzt, um eine ForEach-Schleife abzubrechen). Was mit Invoke leider nicht möglich ist, ist Methoden mit Parametern oder Rückgabewerten zu übergeben. Dafür müssen die Tasks explizit erstellt werden.

    Der Aufruf von Parallel.Invoke erinnert ein bisschen an den ThreadPool und seine QueueUserWorkItem-Methode. Und tatsächlich benutzen Tasks intern den ThreadPool, um die Delegates auszuführen. Allerdings können die Tasks den ThreadPool effizienter nutzen als es uns normalsterblichen Programmierern möglich ist, da sie Zugang zu neuen Methoden und Algorithmen haben, die z.B. die möglichst optimale Anzahl an Threads ermitteln.
    Als weiteren Vorteil erhält man mit der Task-Klasse mehr Kontrolle über den Thread. Dies zeigt sich aber erst, wenn wir die Task-Klasse explizit benutzen:

    Task t = new Task(() => OpenDatabaseConnections());
    t.Start();
    

    Auch hier gibt es diverse Überladungen des Konstruktors, bei denen man beispielsweise ein CancellationToken oder einen TaskScheduler übergeben kann.
    Wie bereits erwähnt gibt es einige grundlegende Methoden bzw. Eigenschaften, die uns die Task-Klasse anbietet:

    • Start() - Startet die Ausführung der Operation
    • Wait() - Wartet bis die Operation beendet wurde
    • Id - Eine eindeutige Id der Instanz, besonders zum Debuggen sinnvoll
    • Status - Gibt Auskunft über den momentanen Status der Taskinstanz
    • IsFaulted - Gibt an, ob die Operation durch eine Exception abgebrochen wurde
    • IsCanceled - Gibt an, ob die Operation abgebrochen wurde
    • IsCompleted - Gibt an, ob die Operation abgeschlossen wurde

    Interessant sind auch die statischen Methoden WaitAll und WaitAny der Task-Klasse. WaitAll wartet bis alle übergebenen Tasks beendet sind und WaitAny wartet solange, bis eine der übergebenen Tasks beendet ist.

    Unsere bisherigen Beispiele waren allesamt Methoden, die void als Rückgabetyp hatten. Jedoch haben sehr viele Methoden einen Rückgabetyp wie int, double, IEnumeralbe<T> etc. Natürlich kann man auch diese mittels Tasks ausführen:

    using System;
    using System.Threading.Tasks;
    
    namespace ConsoleApplication1 {
        class Program {
            static long ComputeBigValue() {
                long value = 0L;
                //Berechnen und dann zurückgeben
                return value;
            }
    
            static void Main(string[] args) {
                Task<long> t = new Task<long>(ComputeBigValue);
                t.Start();
    
                Console.Out.WriteLine("Result of computation: {0}", t.Result);
            }
        }
    }
    

    Die Task-Klasse hat hier einen Typparameter, welcher dem Rückgabetyp der Methode ComputeBigValue entspricht. Das Ergebnis der Operation kann man mit der Result-Eigenschaft abfragen. Was ist aber, wenn man auf Result zugreift, obwohl die Operation noch gar nicht abgeschlossen ist? Ganz einfach, es wird gewartet bis die Operation abgeschlossen wurde und man sicher auf Result zugreifen kann. Das ist als ob wir noch ein t.Wait() vor dem Zugriff auf t.Result eingebaut hätten.

    Es bietet sich noch eine zweite Möglichkeit, Tasks explizit zu erstellen bzw. zu starten. Die TPL stellt nämlich eine TaskFactory zur Verfügung, die offensichtlich auf Basis des Factory-Patterns modelliert wurde. Glücklicherweise haben die Task-Klassen eine TaskFactory als statisches Member, man muss sie also nichtmal selbst erstellen. Und auch hierzu haben wir ein kleines Beispiel:

    Task t = Task.Factory.StartNew(() => Console.Out.WriteLine("IM IN YR TASK"));
    Console.Out.WriteLine("IM IN MAIN");
    t.Wait();   //Auf jeden Fall warten, bis Ausgabe des Threads erfolgt ist
    

    Die Factory-Methode bietet alle Überladungen an, die auch der Task-Konstruktor anbietet, aber sie erspart einem nochmal ein bisschen Schreibarbeit, da sie eine Task-Instanz erstellt und diese sofort startet.

    Was wir bisher noch nicht gesehen haben, ist wie man per Task eine Methode ausführt die sowohl einen Rückgabewert als auch einen Parameter hat:

    using System;
    using System.Threading.Tasks;
    
    namespace ConsoleApplication1 {
        class Program {
            static long faculty(Object o) {
                long n = (long)o;
    
                if (n == 1)
                    return n;
                else
                    return n * faculty(n - 1);
            }
    
            static void Main(string[] args) {
                Task<long> t = Task<long>.Factory.StartNew(faculty, 10L);
                Console.Out.WriteLine(t.Result);
            }
        }
    }
    

    Das einzige das hier neu ist, ist der zusätzliche Parameter für StartNew, der in dem Fall die Daten für faculty enthält. Etwas unschön ist auf den ersten Blick, dass man hier mit Object arbeiten muss. Aber es hat den Vorteil, dass man Instanzen eines beliebigen Typs an eine Methode übergeben kann und somit auch die Frage erschlägt: Wie übergebe ich mehrere Parameter an eine Methode? Die Antwort lautet: Mit einer Hilfsklasse die z.B. so aussehen könnte:

    public class PropertyBag {
        private Dictionary<string, object> items;
    
        public PropertyBag() { this.items = new Dictionary<string, object>(); }
    
        public Dictionary<string, object> Items { get { return this.items; } }
    }
    

    Im Prinzip ist das nur ein weggekapseltes Dictionary, in dem wir die Parameter ablegen können. Der Einsatz dieser Klasse gestaltet sich dann auch recht simpel:

    using System;
    using System.Collections.Generic;
    using System.Threading.Tasks;
    
    namespace ConsoleApplication1 {
        class Program {
            /// <summary>
            /// Legt das Exportformat fest
            /// </summary>
            enum ExportFormat { Excel, CSV, HTML };
    
            //Stub das ein Dokument mit Daten repräsentiert
            class Document { }      
    
            //Führt den eigentlichen Export durch
            static void Export(ExportFormat format, string path, Document d) {
                Console.Out.WriteLine("Starting to export to path '{0}' in {1}-format", path, format);
            }
    
            //Wrapper für Export
            static void Export(Object o) {
                PropertyBag bag = o as PropertyBag;
                if (bag == null)
                    throw new ArgumentException("Parameter must be of type PropertyBag");
    
                Export((ExportFormat)bag.Items["format"], (string)bag.Items["path"], (Document)bag.Items["doc"]);
            }
    
            static void Main(string[] args) {
                PropertyBag bag = new PropertyBag();
                bag.Items.Add("format", ExportFormat.HTML);
                bag.Items.Add("path", "export/html/output.html");
                bag.Items.Add("doc", new Document());
    
                Task t = Task.Factory.StartNew(Export, bag);
                t.Wait();       //Damit man die Ausgabe von Export noch sieht
            }
        }
    
        //Hilfsklasse
        class PropertyBag {
            private Dictionary<string, object> items;
    
            public PropertyBag() { this.items = new Dictionary<string, object>(); }
    
            public Dictionary<string, object> Items { get { return this.items; } }
        }
    }
    

    Damit wären die grundlegendsten Funktionen der Task-Klasse besprochen und wie man gesehen hat, stellt sie ein mächtiges und flexibles Konstrukt dar, um unkomplizierte parallel zu programmieren. Im Hinterkopf sollte man behalten, dass auch Tasks intern Threads verwenden, also auch nur mit Wasser kochen, obwohl alles auf einer höheren Abstraktionsebene passiert. Genau dies kommt uns als Programmier aber im Endeffekt zugute.

    4.2 Tasks verknüpfen

    Wer mittels IntelliSense (oder bei einem Blick in die MSDN) einmal die Methoden von der Task-Klasse durchgesehen hat, dem sind vielleicht die ContinueWith bzw. ContinueWith<TResult>-Methoden in's Auge gesprungen. Sie erlauben uns auf elegante Weise, zwei Tasks so miteinander zu verknüpfen, dass Task Nr. 2 ausgeführt wird, wenn Task Nr. 1 fertig ist.

    Leser, die Erfahrung in funktionalen Programmiersprachen haben, wird vermutlich der Begriff Continuation im Kopf rumspringen. Auch in imperativen Sprachen findet sich (mehr oder weniger intuitive) Möglichkeiten für diese Art der Programmierung. C# bildet da keine Ausnahme. Die ContinueWith-Methoden bauen genau auf diesem Ansatz auf.

    Bevor wir aber zum Beispiel kommen, sollen die beiden ContinueWith-Methoden kurz erläutert werden. Die ContinueWith-Methode ist für Operationen zuständig, die keinen Rückgabewert haben, während die ContinueWith<TResult>-Methode für Operationen mit Rückgabewert benutzt wird.
    Wie funktioniert das Verknüpfen jetzt aber genau? Es ist im Grunde ganz einfach: Die ContinueWith/ContinueWith<TResult>-Methode nimmt ein Delegate als Parmeter entgegen (das ist der minimale Fall, natürlich existieren noch Überladungen in diversen Farben und Formen). Dieses Delegate gibt an, welche Operation auszuführen ist, [i]nachdem* die erste Operation beendet wurde.
    Ein weiterer wichtiger Mechanismus ist, dass man das Resultat einer Operation als Eingabe für die folgende Operation verwenden kann. Dies wird dadurch realisiert, dass ContinueWith<TResult> die gerade durchgeführte Operation als Task<TResult> zurückgibt. Wir erinnern uns, Task<TResult> speichert das Ergebnis der Operation in der Result-Eigenschaft

    Die Rückgabe einer Task-Instanz hat den angenehmen Nebeneffekt, dass man an diese Task gleich wieder ein ContinueWith dranhängen kann. Ein etwas umfangreicheres Beispiel soll dies verdeutlichen:

    using System;
    using System.Collections.Generic;
    using System.Threading.Tasks;
    
    namespace ConsoleApplication1 {
        class Program {
            /// <summary>
            /// Eine Dummy-Klasse, die Bestellungen symbolisiert
            /// </summary>
            class Order {
                public int Id {
                    get;
                    set;
                }
    
                public Order(int id) {
                    this.Id = id;
                }
    
                public void PreProcess() {
                    Console.Out.WriteLine("Preprocessing order #{0}", this.Id);
                }
    
                public void Process() {
                    Console.Out.WriteLine("Processing order #{0}", this.Id);
                }
            }
    
            /// <summary>
            /// Erstellt eine Bestellungsliste
            /// </summary>
            static IEnumerable<Order> GetOrders() {
                List<Order> orders = new List<Order>();
                for (int i = 0; i < 100; ++i)
                    orders.Add(new Order(i));
    
                PrintStatusMsg("Assembling orders finished");
                return orders;
            }
    
            /// <summary>
            /// Bereitet eine Bestellungsliste auf
            /// </summary>
            static IEnumerable<Order> PreProcessOrders(IEnumerable<Order> orders) {
                Parallel.ForEach(orders, o => o.PreProcess());
                PrintStatusMsg("Preprocessing orders finished");
                return orders;
            }
    
            /// <summary>
            /// Verarbeitet eine Bestellungsliste
            /// </summary>
            static void ProcessOrders(IEnumerable<Order> orders) {
                Parallel.ForEach(orders, o => o.Process());
                PrintStatusMsg("Processing orders finished");
            }
    
            /// <summary>
            /// Main
            /// </summary>
            static void Main(string[] args) {
                var t1 = Task<IEnumerable<Order>>.Factory.StartNew(GetOrders);
                var t2 = t1.ContinueWith(x => PreProcessOrders(x.Result));
                var t3 = t2.ContinueWith(y => ProcessOrders(y.Result));
    
                t3.Wait();
    
                //Alternative Schreibweise:
                //var t = Task<IEnumerable<Order>>.Factory.StartNew(GetOrders)
                //                                        .ContinueWith(x => PreProcessOrders(x.Result))
                //                                        .ContinueWith(y => ProcessOrders(y.Result));
    
                //t.Wait();
            }
    
            /// <summary>
            /// Hilfsmethode, um Statusmeldungen auszugeben
            /// </summary>
            static void PrintStatusMsg(string msg) {
                string dashes = new string('=', 10);
                Console.Out.WriteLine(dashes);
                Console.Out.WriteLine(msg);
                Console.Out.WriteLine(dashes);
            }
        }
    }
    

    Da ist doch ein bisschen was zusammengekommen, aber der Reihe nach. Die Klasse Order wurde um die Dummy-Methoden PreProcess und Process erweitert. Des Weiteren gibt es nun die neuen Methoden PreProcessOrders und ProcessOrders, welche jeweils eine Bestellungsliste erwarten und diese unter Verwendung von Parallel.ForEach-abarbeiten. Die Methode PrintStatusMsg sollte selbsterklärend sein. Die GetOrders-Funktion liefert im Moment nur 100 Bestellungen, da es ja alles auf der Konsole nachvollziehbar sein soll. Ein Geschwindigkeitszuwachs durch den Aufruf der Parallel.ForEach-Methoden wir bei solch kleinen Datenmengen und Dummy-Methoden allerdings nicht erzielt. Eventuell wird es durch den Overhead, der bei der Threaderstellung- und Verwaltung entsteht, sogar langsamer. Steht man in einem realen Szenario vor dieser Unklarheit, hilft nur Geschwindigkeitsmessung.

    In der Main-Methode passiert der interessante Teil, hier werden die Tasks miteinander verknüpft. Als erstes wird mit t1 per Factory eine Task-Instanz erzeugt, die die GetOrders-Methode ausführt. Bei t2 wenden wir nun die ContinueWith-Methode an, bzw. eigentlich die ContinueWith<IEnumerable<Order>>-Methode, aber die Typinferenz ermöglicht das Weglassen des Typparameters. Der Lambda-Ausdruck nimmt ein x vom Typ Task<IEnumerable<Order>> entgegen und reicht Result (das ist in dem Fall die Bestellungsliste) dann an PreProcessOrders weiter. Dieser Lambda-Ausdruck wird ausgewertet, sobald GetOrders() von t1 ausgeführt wurde. Die Prozedur wiederholt sich für t3, nur dass eben ProcessOrders dann das Ergebnis von PreProcessOrders von t2 erhält. Schließlich warten wir noch darauf, dass t3 abgeschlossen wird, damit wir in den Genuss der Ausgabe kommen.
    Die auskommentierte Variante tut übrigens dasselbe und ist kürzer, aber für Einsteiger nicht ganz so einfach nachzuvollziehen.

    Wenn man sich die Ausgabe dieses kleinen Programms ansieht, kann man sehr gut die Nebenläufigkeit feststellen, da in den Methoden PreProcessOrders und ProcessOrders zwischen den (internen) Threads gewechselt wird.

    Werfen wir noch einen kurzen Blick auf eine wichtige Überladung der ContinueWith-Methoden, nämlich die Methoden, die TaskContinuationOptions als Parameter erwarten. Mit dieser Klasse kann man u.a. festlegen, was für eine Art von Operation die Continuation ist (z.B. eine die sehr lange dauert und daher entsprechend vom TaskScheduler eingeplant werden soll) und unter welchen Umständen die Continuation tatsächlich ausgeführt werden soll. Soll eine Continuation beispielsweise nur ausgeführt werden, wenn die vorherige Operation nicht abgebrochen wurde, dann gibt man NotOnCanceled als Option mit (im nächsten Abschnitt sehen wir ein Beispiel für die Verwendung der TaskContinuationOptions).

    4.3 Exceptionhandling bei Tasks

    Das Exceptionhandling bei Tasks unterscheidet sich nur geringfügig vom normalen Exceptionhandling in sequentiellem Code.
    Sollte eine Task Exception(s) auslösen, werden diese in einer AggregateException gesammelt und dann an den Thread weitergereicht, der die Task erstellt hat. Dort kann die Exception dann wie gewohnt behandelt werden. Beachten sollte man, dass man Wait-Methoden unbedingt in den try-catch-Block einbindet, da diese die AggregateException werfen.
    Abgesehen von try-catch bietet die Task-Klasse über die Exception-Eigenschaft die Möglichkeit, auf die geworfene(n) Exception(s) zuzugreifen. Dieser Zugriff verhindert jedoch, dass die Exception(s) bis zum aufrufenden Thread durchgereicht wird:

    using System;
    using System.Threading.Tasks;
    
    namespace ConsoleApplication1 {
        class Program {
            class CustomException : Exception {
                public CustomException(string msg) : base(msg) { }
            }
    
            static void FailMethod() {
                throw new CustomException("Epic fail");
            }
    
            /// <summary>
            /// Main
            /// </summary>
            static void Main(string[] args) {
                Task t = Task.Factory.StartNew(FailMethod);
    
                //Die Exception wird durchgereicht und in Main gefangen
                try {
                    t.Wait();
                }
                catch (AggregateException ex) {
                    foreach (var ie in ex.InnerExceptions) {
                        if (ie is CustomException)
                            Console.Error.WriteLine("CustomException caught in Main: " + ex.InnerException.Message);
                        else
                            throw;      //Exception ist nicht von uns, also weiterreichen
                    }               
                }
    
                //Hier wird die Exception-Eigenschaft abgerufen und ein Durchreichen der Exception an Main verhindert
                t = Task.Factory.StartNew(FailMethod)
                                .ContinueWith(x => Console.Error.WriteLine(x.Exception.InnerException.Message), TaskContinuationOptions.OnlyOnFaulted);
                t.Wait();
            }
        }
    }
    

    Beim ersten Aufruf von FailMethod fangen wir die Exception in einem try-catch-Block ab. Hierbei hat es sich eingebürgert, dass man nur die Exceptions behandelt, die man auch selbst geworfen hat bzw. auf die man vernünftig reagieren kann. Alles andere sollte nach oben durchgereicht werden (was hier natürlich zu einer unbehandelten Exception mit folgendem Programmabsturz führt).
    Beim zweiten Aufruf von FailMethod wird die Exception nicht durchgereicht, da wir die Exception-Eigenschaft schon vorher in einer Continuation abfragen, die nur dann ausgeführt wird, wenn die vorhergehende Operation tatsächlich eine Exception geworfen hat.

    5 Ausblick

    Dieser Artikel hat sich mit der TPL, insbesondere mit der Parallelisierung von Schleifen und von Methoden, beschäftigt. Im nächsten Teil wird Parallel LINQ vorgestellt, welches dem allseits heißgeliebten LINQ einen ordentlichen Performanceboost beschert.



  • Da sind wir ja jetzt voll parallelisiert, ich poste auch noch die restlichen Themen von der Intel Conf in den nächsten Tagen.

    Noch ein Hinweis zu dem parallel_for:

    Steve Teixeira hat in seinem Vortrag darauf hingewiesen, daß man bei for-Schleifen die Reihenfolge der Schleife auch prüfen sollte: bei vielen Schleifen gibt es die Situation, daß die geleistete Arbeit pro Durchlauf nicht konstant ist, sondern sich ändert. Bei parallelen Schleifen sollte die Schleife so durchlaufen werden, daß die Durchläufe mit der größten Arbeit (soweit möglich) zu Beginn ausgeführt werden, da auf diese Weise der Scheduler die kleineren Stücke später schöner "in die Lücken" der CPUs füllen könne. Dies kann man dadurch erreichen, daß man z.B. den Schleifendurchlauf herumdreht (mit dem größten Element beginnt), oder den Schleifenzähler durch eine Funktion auf einem anderen Counter abbildet und diesen zählen lässt.

    Du könntest vielleicht noch kurz erläutern, warum Partitioner sinnvoll sind und was deren Bedeutung ist.

    Und mir fällt noch ein: die parallele Schleife stellt einige Bedingungen an ihren Schleifenrumpf, u.a. den Verzicht auf gegenseitige Verriegelung und Disjunktheit der Durchläufe. Sollte man als Vorbedingung noch erwähnen.



  • Marc++us schrieb:

    Steve Teixeira hat in seinem Vortrag darauf hingewiesen, daß man bei for-Schleifen die Reihenfolge der Schleife auch prüfen sollte: bei vielen Schleifen gibt es die Situation, daß die geleistete Arbeit pro Durchlauf nicht konstant ist, sondern sich ändert. Bei parallelen Schleifen sollte die Schleife so durchlaufen werden, daß die Durchläufe mit der größten Arbeit (soweit möglich) zu Beginn ausgeführt werden, da auf diese Weise der Scheduler die kleineren Stücke später schöner "in die Lücken" der CPUs füllen könne. Dies kann man dadurch erreichen, daß man z.B. den Schleifendurchlauf herumdreht (mit dem größten Element beginnt), oder den Schleifenzähler durch eine Funktion auf einem anderen Counter abbildet und diesen zählen lässt.

    Das stimmt allerdings. Hast du das auch irgendwie schriftlich, damit ich auf was verlinken kann?

    Die restlichen Anmerkungen bau ich ein, danke 😉



  • Den Beitrag schreibe ich noch bis zur nächsten Veröffentlichung (Intel Conf Teil III). Ich warte noch auf einen Quellcode zum Vortrag.



  • Bitte um inhaltliches Feedback



  • Dieser Thread wurde von Moderator/in GPC aus dem Forum Die Redaktion in das Forum Archiv verschoben.

    Im Zweifelsfall bitte auch folgende Hinweise beachten:
    C/C++ Forum :: FAQ - Sonstiges :: Wohin mit meiner Frage?

    Dieses Posting wurde automatisch erzeugt.


Log in to reply