Problem mit Parallel.ForEach() => Datenverlust



  • Hallo zusammen!

    ich habe ein kleines, aber schwerwiegendes Problem bezüglich Parallel.ForEach() und Datenverlust während der Verarbeitung.

    Ich bin dabei, mit meinem Programm innerhalb eines Threads parallel diverse Dateien zu überprüfen und je nach Ergebnis den entsprechenden Dateipfad in eine List<string> zu speichern.

    Theoretisch gesehen sollte das auch alles soweit funktionieren, allerdings sobald die Anzahl der zu bearbeitenden Dateien größer wird, variieren die Ergebnisse, obwohl sich an den Umständen der Testumgebung (Anzahl und Beschaffenheit der Dateien) nichts geändert hat.

    Soll heißen, dass ich mein Programm starte, klicke auf "ok" und der Prozess beginnt damit, die 1.000 Dateien zu verarbeiten. Nach Abschluss erhalte ich die Nachricht, dass z.B. alle 1.000 Dateien verarbeitet wurden. Klicke ich kurz dannach erneut mögen es diesmal evtl. nur 995 oder auch wieder 1.000 sein. Hier ist kein Muster zu erkennen. Benutze ich statt der Parallel.ForEach die normale foreach-Variante, funktioniert es einwandfrei.

    Da die Parallel.ForEach()-Funktion ja alle verfügbaren Prozessorkerne nutzt gehe ich davon aus, dass es mit der Auslastung dieser Kerne variiert. Hat einer von Euch einen Ansatz oder gar eine Lösung zu meinem Problem?

    Nachfolgend mal der Code zu der verarbeitenden Funktion:

    // Gets a list of all valid SC2 replay files and returns them
            // in a string list. Also returns the number of files in the list.
            public static int GetFileList(string strPath, ref List<string> PathList)
            {
                // Variables
                List<string> TempList = new List<string>();
                string[] fileEntries;
    
                // Clear the path list.
                PathList.Clear();
    
                // Check if the path represents a file...
                if (File.Exists(strPath) == true)
                    TempList.Add(strPath);
                // ...if not then check if it represents a directory.
                else if (Directory.Exists(strPath) == true)
                {
                    // Get a list of all "SC2Replay" files.
                    fileEntries = Directory.GetFiles(strPath, "*.SC2Replay", SearchOption.AllDirectories);
    
                    // Add all the files found to a temporary list.
                    foreach (string strTemp in fileEntries)
                    {
                        TempList.Add(strTemp);
                    }
                }
                // ...if not then return an error code.
                else
                    return -1;
    
                // Now check which of the found files are valid SC2 replay files
                // and add the valid filenames to the return value list.
                List<string> TempParallelList = new List<string>();
                Parallel.ForEach(TempList, strTemp =>
                    {
                        if (CheckFile(strTemp) == true)
                        TempParallelList.Add(strTemp);
                    });
                PathList = TempParallelList;
                TempParallelList = null;
    
                // Exit method
                return PathList.Count;
            }
    

    Vielen Dank schon mal im Voraus!

    Gruß
    Skubidus



  • List<string> ist nicht Threadsicher. Nicht nur, dass die Elementanzahl variieren kann, es können auch Exceptions auftreten. Zum Beispiel wenn zwei Threads gleichzeitig Daten einfügen aber nur durch einen von beiden die Liste für mehr Speicherplatz erweitert wurde.

    Parallel.ForEach übernimmt das Dispatchen/Partitionieren der Arbeitsaufträge auf mehrere Threads im Threadpool. Aber um das korrekt synchronisierte Einsammeln der Ergebnisse musst Du Dich selbst kümmern.

    Es gibt mehrere Möglichkeiten den Bug zu fixen. PLINQ kümmert sich im Vergleich zur Parallel-Klasse auch um das synchronisierte Rückschreiben der Daten. Wenn Du bei Parallel.ForEach bleiben willst könntest Du auf eine Collection in System.Collections.Concurrent umsteigen.

    Mach mal aus
    List<string> TempParallelList = new List<string>();
    folgendes:
    ConcurrentQueue<string> TempParallelQueue = new ConcurrentQueue<string>();

    und im ForEach-Rumpf
    TempParallelQueue.Enqueue(strTemp);

    Wenn Die Reihenfolge keine Rolle spielt und eine Queue passt, wärst Du damit fertig.



  • Du kannst den Zugriff auch über einen Index in Parallel.For synchronisieren.

    Damit bleibst Du bei einer List. Der Nachteil ist, dass die List vorher genügend Speicher reservieren muss. Das rentiert sich also nur, wenn die Arbeitslast in CheckFile liegt.

    List<string> TempList = new List<string>();
    List<string> TempParallelList = new List<string>(new string[TempList.Count]);
    
    Parallel.For(0, TempList.Count, i =>
    	{
    		string strTemp = TempList[i];
    		if(CheckFile(strTemp))
    			TempParallelList[i] = strTemp;
    	});
    

    Edit: TempParallelList ist am Ende zu groß, weil optimistisch Speicher reserviert wird.



  • Und noch in PLINQ.

    List<string> TempParallelList = 
    		  (from strTemp in TempParallelList.AsParallel()
    		  where CheckFile(strTemp)
    		  select strTemp).ToList();
    

    AsParallel erlaubt einiges an Feintuning. Das könnte eine nette Lösung sein, erfordert aber viel rumtesten.



  • Ok. Geht noch besser:

    List<string> TempParallelList =
    	TempList.AsParallel().Aggregate(
    		() => new List<string>()
    		,
    		(localList, strTemp) =>
    		{
    			if (CheckFile(strTemp))
    				localList.Add(strTemp);
    			return localList;
    		}
    		,
    		(mainList, localList) =>
    		{
    			mainList.AddRange(localList);
    			return mainList;
    		}
    		,
    		mainList => mainList
    	);
    

    Hier werden Threadlokale temporäre Listen verwendet die am Ende aggregiert werden. Das heißt teures synchronisieren findet nur am Ende statt während sich die Threads in der Zwischenzeit bei der Hauptarbeit nicht in die Quere kommen.



  • Hallo,

    vielen Dank für deine Antworten und Mühen!
    Ich werde diese Ansätze sobald ich Zeit habe mal testen und dann berichten 😉

    Gruß
    Skubidus



  • Skubidus schrieb:

    vielen Dank für deine Antworten und Mühen!
    Ich werde diese Ansätze sobald ich Zeit habe mal testen und dann berichten 😉

    Gern geschehen. War mal eine der interessanten Fragen.

    Es gibt noch 100 andere Ansätze, die es sich zu testen lohnen würde.
    Nicht, um jetzt Deinen eher kleinen Anwendungsfall zu optimieren, sondern allgemein, um ein besseres Gefühl für PLINQ, die Parallel-Klasse und die TPL zu bekommen. Allein mit dem vorhin erwähnten "Feintuning" von PLINQs AsParallel kann man Stunden und Tage verbringen.
    Das ist alles deutlich abstrakter als die klassischen Thread- und Monitor/lock/WaitHandle/Mutex-Konzepte. Die Details sind verborgen und schwierig zu erfassen.

    Mit C# 5 wird das Konzept asynchroner Methoden in die Sprache aufgenommen. Ich denke echt es lohnt, sich mit den Grundlagen zu befassen, um den Anschluß an die Parallele Entwicklung nicht zu verlieren. 👍



  • Hallo,

    habe es jetzt mit der ersten Deiner vorgeschlagenen Varianten (ConcurrentQueue<string>) gelöst. Habe erst die Geschichte mit dem AsParallel probiert, allerdings nicht hinbekommen, weil da irgendwas mit der List<string> nicht hingehauen hat.

    Diese Lösung ist völlig zufriedenstellend und funktioniert einwandfrei. Ich musste lediglich noch eine foreach-Sektion hinzufügen, mit der die Daten von der Queue in die List kommen.

    // Gets a list of all valid SC2 replay files and returns them
            // in a string list. Also returns the number of files in the list.
            public static int GetFileList(string strPath, ref List<string> PathList)
            {
                // Variables
                List<string> TempList = new List<string>();
                string[] fileEntries;
    
                // Clear the path list.
                PathList.Clear();
    
                // Check if the path represents a file...
                if (File.Exists(strPath) == true)
                    TempList.Add(strPath);
                // ...if not then check if it represents a directory.
                else if (Directory.Exists(strPath) == true)
                {
                    // Get a list of all "SC2Replay" files.
                    fileEntries = Directory.GetFiles(strPath, "*.SC2Replay", SearchOption.AllDirectories);
    
                    // Add all the files found to a temporary list.
                    foreach (string strTemp in fileEntries)
                    {
                        TempList.Add(strTemp);
                    }
                }
                // ...if not then return an error code.
                else
                    return -1;
    
                // Now check which of the found files are valid SC2 replay files
                // and add the valid filenames to the return value list.
                ConcurrentQueue<string> TempParallelQueue = new ConcurrentQueue<string>(); 
                Parallel.ForEach(TempList, strTemp =>
                {
                    if (CheckFile(strTemp) == true)
                        TempParallelQueue.Enqueue(strTemp); 
                });
    
                // Now get all the found and valid files from the "queue" into
                // the output "List<string>".
                foreach (string strTemp in TempParallelQueue)
                {
                    PathList.Add(strTemp);
                }
    
                // Clear up stuff that is not needed anymore.
                TempParallelQueue = null;
    
                // Exit method
                return PathList.Count;
            }
    

    Vielen Dank nochmal für die Hilfe!

    Gruß
    Skubi


Anmelden zum Antworten