Probleme beim Synchorniesieren von Threads
-
(Hoffe bin im Richtigen Bereich..)
Hallo ich habe gerade folgende Situation.
Ich besitze eine beliebiege Anzahl von Threads, welche "paralel" ihre Berechnungen machen und die berechneten Werte in eine gemeinsame Datenbasis Schreiben.
Diese Threads müssen miteinander synchroniesiert werden, da sie untereinander auf die Werte der anderen Threads aus dem letzten Rechendurchgang zurückgreifen.
Daher müssen alle Treads einen Rechendurchgang beendet haben, ihre Daten an die Datenbasis übermittelt haben befor der nächste Rechendurchgang starten darf.Im Moment versuche ich dies mittels EVENTS und einer Statusverwaltenen Klasse zu realiesieren. Betohnung liegt leider auf Versuche....

Denn leider funktioniert das einfach nicht so wie es soll.
Und ich sehe den Wald vor lauter Bäumen nicht mehr......
Im Prinziep suche ich eine Möglichkeit die beliebig vielen Threads
auf ein oder mehrere Events schlafen zu legen und alle gleichzeitig mit einem Signal(en) wieder zu wecken.
Bisher verweisen dafür alle Threads mittels Zeiger auf die eine Statusverwaltene Klasse. Diese zählt die gesammt Anzahl der Threads, sowie jene die sich als Aktiv bei ihr melden und jene die warten auf synch.Weiter hat die VerwalterKlasse zwei Instanz auf die Klasse EVENT und schickt Threads über diese Instanz schlafen wenn bei einer Prüfung der internen Zähler noch nicht alle Threads mit einem Rechnendurchgang fertig sind. Zu welchem Event die Threads geschickt werden ist abhängig vom Aktuellen Rechendurchlauf. Hintergrund es kamm vor das ein Thread länger zum reaktivieren brauchte und das anstehende Signal von einem anderen Thread der eigendlich warten sollte ,weil er den folgenden Rechendurchgang schon beendet hatte, genutzt wurde, daher 2 verschiedene Events
Der letzte Thread der fertig wird sollte dann alle anderen wieder wecken ohne sich selber schlafen zu legen. *tolle Therorie
*
Nur leider bekomme ich es nicht hin alle Threads zu wecken.
D.H. es kommt vor das ein Thread sein Signal einfach Tja überhört... und weiter schläft. Was einen Deadlock zur Folge hat. Dies geschieht sporadisch mal im 10 mal im 100 mal im 1000 Rechendurchlauf...Wenn Jemand einen Ansatz hat wie man das Problem lösen kann Bitte her damit,
wenn ihr noch mehr infos braucht bitte fragen.
Bin für jeden Tipp dankbar, denn solangsam bekomme ich HASS auf diesen Teil meines Programms

Und schon mal danke an jeden der sich das bis hier her durchgelesen hat

Hier die Event Klasse HEADER
#if !defined EVENTS_H #define EVENTS_H #include <windows.h> class TEvent { public: TEvent(int = TRUE); ~TEvent(void); void EventSignal(void); void EventWait(void); void EventReset(void); int EventTest(void); private: HANDLE Event; }; #endifDie Event Klasse CPP
#include "Events.h" TEvent::TEvent(int automatic) { Event = CreateEvent(NULL,!automatic,FALSE,NULL); } TEvent::~TEvent(void) { CloseHandle(Event); } void TEvent::EventSignal(void) { SetEvent(Event); } void TEvent::EventReset(void) { ResetEvent(Event); } void TEvent::EventWait(void) { WaitForSingleObject(Event,INFINITE); } int TEvent::EventTest(void) { return (WaitForSingleObject(Event,0) != WAIT_TIMEOUT); }
-
Hat keiner einen Tipp oder auch nur eine Frage ?
*aaargh* Gruß wastman
-
der code den du gepostet hast stimmt so, also schwer zu sagen was nicht läuft
-
Hmmm
habe jetzt mal alternativ versucht statt alle Threads in 2 Events am ende der Berechnung zu halten, jeden Thread in seinem eigenen Event anzuhalten. Und natürlich dann auch jedem sein eigenes Signal zu schicken.
Hat auch nicht geklappt.
Denke mal der Fehler liegt in meiner ThreadStatus Klasse...
werd die gleich mal im Anschluß posten. Sieht leider Mittlerweile ein bissel
öhm unordentlich aus, da ich da nun schon X Sachen ausprobiert habe ....
Vielleicht findet ihr da einen gedanklichen, form, sonstigen Fehler meinerseits.
Eine 2. Meinung ist da ja manchmal ziemlich hielfreich :pThreadStatus Header
#if !defined THREADSTATUS_H #define THREADSTATUS_H #include "resource.h" #include <iostream.h> #include <windows.h> #include <direct.h> #include "Mutex.h" #include "Events.h" #include "GlobalData.h" #include <vector> #include <iterator> using namespace std; class TThreadStatus { public: TThreadStatus(bool DEBUG =true); void ThreadAlsAktivAnmelden(bool ThreadAnmeldeStatus, int FahrzeugNummer); void ThreadWartetAufSynch(bool ThreadAnmeldeStatus ,int FahrzeugNummer); void ThreadAusloggen(bool ThreadZustand , int FahrzuegNummer); void StatusReset(); bool SynchThreadStart(int FahrzuegNummer); TGlobalData *GlobalData; int FahrzeugeImSystem; int RechenDurchlauf; int ThreadAktiv ; int ThreadWartend ; int ThreadInsgesammt ; bool ErsteThreadAnmeldung; private: bool SynchDebug; void OnlineAuswertung(HWND hwnd); bool FileAuswertung(int FahrzeugNummer); void CreateNewEventGerade(); void CreateNewEventUngerade(); void SignalEvents(); void SetWaitEvent(); vector<TEvent> EventGerade; vector<TEvent> EventUngerade; vector<TEvent>::iterator EventPos; TMutex Mutex; }; #endifThreadStatus CPP
#include "ThreadStatus.h" TThreadStatus::TThreadStatus(bool Debug) // Konstructor { SynchDebug = false; ErsteThreadAnmeldung = false; ThreadInsgesammt =0; ThreadAktiv =0; ThreadWartend =0; RechenDurchlauf =1; TS_Dialog_Offen = false; } void TThreadStatus::CreateNewEventGerade() // Eine neue Instantz auf Event anlegen und im Vector speichern // für die geraden Rechendurchläufe { TEvent NewEvent; EventGerade.push_back(NewEvent); } void TThreadStatus::CreateNewEventUngerade() // Eine neue Instantz auf Event anlegen und im Vector speichern // Für die ungeraden Rechenduchläufe { TEvent NewEvent; EventUngerade.push_back(NewEvent); } void TThreadStatus::SetWaitEvent() // Fahrzeug auf sein Event warten lassen { if((RechenDurchlauf % 2) == 0) { CreateNewEventGerade(); EventPos = EventGerade.end(); } else { CreateNewEventUngerade(); EventPos = EventUngerade.end(); } EventPos--; // auf neu angelegtes Element bringen Mutex.LeaveMutex(); EventPos->EventWait(); } void TThreadStatus::SignalEvents() // Schickt an alle Events das Wecksignal // je nach RechenDurchlauf an Gerade oder Ungerade { int AnzahlEvents = 0; if((RechenDurchlauf % 2) == 0) { AnzahlEvents = EventGerade.size(); EventPos = EventGerade.begin(); } else { AnzahlEvents = EventUngerade.size(); EventPos = EventUngerade.begin(); } for (int i = 0; i < AnzahlEvents; i++) { EventPos->EventSignal(); EventPos++; } if((RechenDurchlauf % 2) == 0) EventGerade.clear(); else EventUngerade.clear(); } void TThreadStatus::ThreadAlsAktivAnmelden(bool ThreadAnmeldeStatus ,int FahrzeugNummer) // Thread Im Status System auf Aktiv setzen { Mutex.EnterMutex(); if(this->ErsteThreadAnmeldung == false) this->ErsteThreadAnmeldung = true; if(ThreadAnmeldeStatus == false) { this->ThreadInsgesammt++; Sleep(100); } //if((ThreadWartend >0) && (ThreadAnmeldeStatus == true)) // this->ThreadWartend--; this->ThreadAktiv++; this->FileAuswertung(FahrzeugNummer); Mutex.LeaveMutex(); } void TThreadStatus::ThreadWartetAufSynch(bool ThreadAnmeldeStatus , int FahrzeugNummer ) // Thread in wartezustand bis zur herstellunfg der Synchonisation versetzen { Mutex.EnterMutex(); if(this->ErsteThreadAnmeldung == false) ErsteThreadAnmeldung = true; if(ThreadAnmeldeStatus == false) { this->ThreadInsgesammt++; Sleep(100); } if((this->ThreadAktiv >0) && (ThreadAnmeldeStatus == true)) ThreadAktiv--; this->ThreadWartend++; this->FileAuswertung(FahrzeugNummer); if(this->SynchThreadStart(FahrzeugNummer) == FALSE) { // Mutex Freigeben in SetWaitEvent Funktion this->SetWaitEvent(); } else Mutex.LeaveMutex(); } bool TThreadStatus::SynchThreadStart(int FahrzeugNummer) // Gibt alle angehaltenen Threads für einen weiteren Berechnungsdurchgang frei { if((this->ThreadInsgesammt == this->ThreadWartend) && (this->ThreadAktiv == 0) && (this->ErsteThreadAnmeldung==true) && (this->ThreadInsgesammt == this->FahrzeugeImSystem)) { // Tauschen der Akt. liste mit der Use Liste GlobalData->GDListenTauschen(); this->SignalEvents(); this->RechenDurchlauf++; this->ThreadWartend = 0; this->FileAuswertung(FahrzeugNummer); return TRUE; } return FALSE; } void TThreadStatus::StatusReset() // Status in NULL Zustand zurückversetzen { Mutex.EnterMutex(); ThreadInsgesammt =0; ThreadAktiv =0; ThreadWartend =0; Mutex.LeaveMutex(); } void TThreadStatus::ThreadAusloggen(bool ThreadZustand ,int FahrzeugNummer) // Abmelden eines Threads aus dem Status { // was ist mit thread die terminiert werden ???? hmmm Mutex.EnterMutex(); ThreadInsgesammt--; if(this->ThreadInsgesammt == 0) this->ErsteThreadAnmeldung = false; if(ThreadZustand == true) if(this->ThreadAktiv > 0) this->ThreadAktiv--; else if(this->ThreadWartend > 0 ) this->ThreadWartend--; this->FileAuswertung(FahrzeugNummer); Mutex.LeaveMutex(); } bool TThreadStatus::FileAuswertung(int FahrzeugNummer) { FILE *pf; string sDateipfad = "\\TS.txt"; const char * cDateipfad; string Pfad; int curdrive; char path[600]; curdrive = _getdrive(); _getdcwd( curdrive, path, 600 ); string Programmpfad = path; Pfad += Programmpfad; Pfad += sDateipfad; cDateipfad = Pfad.c_str(); pf = fopen( cDateipfad, "a"); if (! pf) return FALSE; fprintf(pf,"===================FzN:%d\n",FahrzeugNummer); fprintf(pf,"Aktiv: %d\n", this->ThreadAktiv); fprintf(pf,"Wartend: %d\n",this->ThreadWartend); fprintf(pf,"Insgesammt: %d\n",this->ThreadInsgesammt); fprintf(pf,"SWABs: %d\n", this->RechenDurchlauf-1); fclose(pf); return TRUE; }Mutex Header
// stellt eine Klasse zur Behandlung von Mutexen zur Verfügung #if !defined MUTEX_H #define MUTEX_H #include <windows.h> class TMutex { public: TMutex(void); virtual ~TMutex(void); virtual void EnterMutex(void); virtual int EnterMutexWithTimeout(void); virtual void LeaveMutex(void); private: HANDLE Mutex; }; #endifMutex Cpp
#include <windows.h> #include "Mutex.h" TMutex::TMutex(void) { Mutex = CreateMutex(NULL, FALSE, NULL); } TMutex::~TMutex(void) { CloseHandle(Mutex); } void TMutex::EnterMutex(void) { WaitForSingleObject(Mutex,INFINITE); } int TMutex::EnterMutexWithTimeout(void) { return (WaitForSingleObject(Mutex,0) != WAIT_TIMEOUT); } void TMutex::LeaveMutex(void) { ReleaseMutex(Mutex); }Hoffe mal irgend jemand sieht meinen/einen Fehler .....

-
TEvent NewEvent; EventGerade.push_back(NewEvent);Sowas geht nicht. Nach dem Ende der Funktion wird der Destruktor von TEvent aufgerufen und du hast in deinem vector ein Event mit geschlossenem Handle.
Ob das was mit deinem Problem zu tun hat? Keine Ahnung.

-
Hmmm könnte vielleicht was dran sein werde mir das gleich nochmal genauer ansehen.
Auf der anderen Seite muß ich sagen das das trotz dessen der Status eine gewisse Zeit mit geschlossenen Handle recht gut läuft. Nur irgendwann läuft dann was schief.
Ich Poste mal einen entsprechenden Auszug aus einer Logdatei.Auszug wo es Klappt
===================FzN:1 Aktiv: 1 Wartend: 0 Insgesammt: 1 SWABs: 0 ===================FzN:2 Aktiv: 2 Wartend: 0 Insgesammt: 2 SWABs: 0 ===================FzN:1 Aktiv: 1 Wartend: 1 Insgesammt: 2 SWABs: 0 ===================FzN:2 Aktiv: 0 Wartend: 2 Insgesammt: 2 SWABs: 0 ===================FzN:2 Aktiv: 0 Wartend: 0 Insgesammt: 2 SWABs: 1 ===================FzN:2 Aktiv: 1 Wartend: 0 Insgesammt: 2 SWABs: 1 ===================FzN:2 Aktiv: 0 Wartend: 1 Insgesammt: 2 SWABs: 1 ===================FzN:1 Aktiv: 1 Wartend: 1 Insgesammt: 2 SWABs: 1 ===================FzN:1 Aktiv: 0 Wartend: 2 Insgesammt: 2 SWABs: 1 ===================FzN:1 Aktiv: 0 Wartend: 0 Insgesammt: 2 SWABs: 2 .....SWABs heißen eigendlich SWAP(), weil da Listen getauscht werden, und stellen im übertragenen Sinne die RechenDurchlaufNummer da.
Fehler kann sporadisch bei jedem Durchgang passieren
... ===================FzN:1 Aktiv: 0 Wartend: 2 Insgesammt: 2 SWABs: 284 ===================FzN:1 Aktiv: 0 Wartend: 0 Insgesammt: 2 SWABs: 285 ===================FzN:2 Aktiv: 1 Wartend: 0 Insgesammt: 2 SWABs: 285 ===================FzN:2 Aktiv: 0 Wartend: 1 Insgesammt: 2 SWABs: 285 ===================FzN:1 Aktiv: 1 Wartend: 1 Insgesammt: 2 SWABs: 285 [b]===================FzN:2 // Fehlerhaftes Verhalten ab hier Aktiv: 2 Wartend: 1 Insgesammt: 2 SWABs: 285[/b] ===================FzN:1 Aktiv: 1 Wartend: 2 Insgesammt: 2 SWABs: 285 ===================FzN:2 Aktiv: 0 Wartend: 3 Insgesammt: 2 SWABs: 285 ===================FzN:1 Aktiv: 1 Wartend: 3 Insgesammt: 2 SWABs: 285 ===================FzN:1 Aktiv: 0 Wartend: 4 Insgesammt: 2 SWABs: 285 ===================FzN:2 Aktiv: 1 Wartend: 4 Insgesammt: 2 SWABs: 285 ===================FzN:2 Aktiv: 0 Wartend: 5 Insgesammt: 2 SWABs: 285 ...Der gleiche Fehler tritt auch bei mehr als 2 Threads auf, Getestet mit 3 und 4 Threads.

-
So der Tipp mit dem geschlossenen Handle war schon mal ganz gut *Danke*

Nur Leider haben ich jetzt gleich das nächste Problem. Und zwar ein Preformance Problem. Es kann nicht sein das ich bei 5 Threads und 1000 RechenDurchläufen 5 Min und länger warten muß. Somal die Threads noch keine Arbeit verrichten außer An und Abmelden

Daher meine Vermutung , das die Threads ewig brauchen um auf die Signale zu reagieren.
Da eine fern Diagnose schwierig ist habe ich mal eine zusammgegefasste Testversion geschrieben ( mit MVS 6.0 Autoren Version).
Kann sich jeder der möchte gerne mal rein/runter ziehen. Bin immer noch für jeden Tip dankbar.
-
Soooooo
für alle die das hier vielleicht verfolgen oder über die Suche mit einem ähnlichen Problem hier landen, ich hab es für mich zumindest gelößt.
Und das beste ist mit recht guter Preformance.
Super Simpel.
Und ganz ohne Events... (<-- Blöden S#+#!"# Dinger da....)Hätte ich mir echt früher überlegen sollen, hätte mir eine menge Zeit und
Frust ersparrt. Na Ja hinterher ist man immer schlauer.
Ist dennoch schon extrem, wie langsam das Programm mit den Event wurde
ThreadStatus Header
#if !defined THREADSTATUS_H #define THREADSTATUS_H #include "resource.h" #include <iostream.h> #include <windows.h> #include <direct.h> #include "Mutex.h" #include "GlobalData.h" using namespace std; class TThreadStatus { public: TThreadStatus(bool DEBUG =true); void ThreadAlsAktivAnmelden(bool ThreadAnmeldeStatus, int FahrzeugNummer); void ThreadWartetAufSynch(bool ThreadAnmeldeStatus ,int FahrzeugNummer); void ThreadAusloggen(bool ThreadZustand , int FahrzuegNummer); void StatusReset(); bool SynchThreadStart(int FahrzuegNummer); TGlobalData *GlobalData; int FahrzeugeImSystem; int RechenDurchlauf; int ThreadAktiv ; int ThreadWartend ; int ThreadInsgesammt ; bool ErsteThreadAnmeldung; bool SperreGeradeDurchlaufe; bool SperreUngeradeDurchlaufe; HWND TS_Dialog; bool TS_Dialog_Offen; TMutex Mutex; private: bool SynchDebug; void OnlineAuswertung(HWND hwnd); bool FileAuswertung(int FahrzeugNummer);w TMutex TestMutex; }; #endifThreadStatus CPP
#include "ThreadStatus.h" TThreadStatus::TThreadStatus(bool Debug) // Konstruktor { this->SynchDebug = true; this->ErsteThreadAnmeldung = false; this->ThreadInsgesammt =0; this->ThreadAktiv =0; this->ThreadWartend =0; this->RechenDurchlauf =1; this->SperreGeradeDurchlaufe = true; this->SperreUngeradeDurchlaufe = false; } void TThreadStatus::ThreadAlsAktivAnmelden(bool ThreadAnmeldeStatus ,int FahrzeugNummer) // Thread Im Status System auf Aktiv setzen { Mutex.EnterMutex(); if(this->ErsteThreadAnmeldung == false) this->ErsteThreadAnmeldung = true; if(ThreadAnmeldeStatus == false) { this->ThreadInsgesammt++; Sleep(100); } //if((ThreadWartend >0) && (ThreadAnmeldeStatus == true)) // this->ThreadWartend--; this->ThreadAktiv++; this->FileAuswertung(FahrzeugNummer); Mutex.LeaveMutex(); } void TThreadStatus::ThreadWartetAufSynch(bool ThreadAnmeldeStatus , int FahrzeugNummer ) // Thread in wartezustand bis zur herstellunfg der Synchonisation versetzen { Mutex.EnterMutex(); if(this->ErsteThreadAnmeldung == false) ErsteThreadAnmeldung = true; if(ThreadAnmeldeStatus == false) { this->ThreadInsgesammt++; Sleep(100); } if((this->ThreadAktiv >0) && (ThreadAnmeldeStatus == true)) ThreadAktiv--; this->ThreadWartend++; this->FileAuswertung(FahrzeugNummer); if(this->SynchThreadStart(FahrzeugNummer) == FALSE) { bool test; if(this->RechenDurchlauf % 2 == 0) { TestMutex.EnterMutex(); test = this->SperreGeradeDurchlaufe; TestMutex.LeaveMutex(); Mutex.LeaveMutex(); while(test != true) { TestMutex.EnterMutex(); test = this->SperreGeradeDurchlaufe; TestMutex.LeaveMutex(); Sleep(1); } } else { TestMutex.EnterMutex(); test = this->SperreUngeradeDurchlaufe; TestMutex.LeaveMutex(); Mutex.LeaveMutex(); while(test != true) { TestMutex.EnterMutex(); test = this->SperreUngeradeDurchlaufe; TestMutex.LeaveMutex(); Sleep(1); } } } else Mutex.LeaveMutex(); } bool TThreadStatus::SynchThreadStart(int FahrzeugNummer) // Gibt alle angehaltenen Threads für einen weiteren Berechnungsdurchgang frei { if((this->ThreadInsgesammt == this->ThreadWartend) && (this->ThreadAktiv == 0) && (this->ErsteThreadAnmeldung==true) && (this->ThreadInsgesammt == this->FahrzeugeImSystem)) { // Tauschen der Akt. liste mit der Use Liste GlobalData->GDListenTauschen(); TestMutex.EnterMutex(); if(this->RechenDurchlauf % 2 == 0) // Sicherstellen das die Akt. Sperre erst freigegeben wird wenn die andere wieder gesetzt ist { this->SperreUngeradeDurchlaufe = false; this->SperreGeradeDurchlaufe = true; } else { this->SperreGeradeDurchlaufe = false; this->SperreUngeradeDurchlaufe = true; } TestMutex.LeaveMutex(); this->RechenDurchlauf++; this->ThreadWartend = 0; this->FileAuswertung(FahrzeugNummer); return TRUE; } return FALSE; } void TThreadStatus::StatusReset() // Status in NULL Zustand zurückversetzen { Mutex.EnterMutex(); ThreadInsgesammt =0; ThreadAktiv =0; ThreadWartend =0; RechenDurchlauf = 1; TestMutex.EnterMutex(); this->SperreGeradeDurchlaufe = true; this->SperreUngeradeDurchlaufe = false; TestMutex.LeaveMutex(); Mutex.LeaveMutex(); } void TThreadStatus::ThreadAusloggen(bool ThreadZustand ,int FahrzeugNummer) // Abmelden eines Threads aus dem Status { // was ist mit thread die terminiert werden ???? hmmm noch nicht beachtet Mutex.EnterMutex(); ThreadInsgesammt--; if(this->ThreadInsgesammt == 0) this->ErsteThreadAnmeldung = false; if(ThreadZustand == true) if(this->ThreadAktiv > 0) this->ThreadAktiv--; else if(this->ThreadWartend > 0 ) this->ThreadWartend--; this->FileAuswertung(FahrzeugNummer); Mutex.LeaveMutex(); } bool TThreadStatus::FileAuswertung(int FahrzeugNummer) { if(this->SynchDebug == true) { FILE *pf; string sDateipfad = "\\TS.txt"; const char * cDateipfad; string Pfad; int curdrive; char path[600]; curdrive = _getdrive(); _getdcwd( curdrive, path, 600 ); string Programmpfad = path; Pfad += Programmpfad; Pfad += sDateipfad; cDateipfad = Pfad.c_str(); pf = fopen( cDateipfad, "a"); if (! pf) return FALSE; fprintf(pf,"===================FzN:%d\n",FahrzeugNummer); fprintf(pf,"Aktiv: %d\n", this->ThreadAktiv); fprintf(pf,"Wartend: %d\n",this->ThreadWartend); fprintf(pf,"Insgesammt: %d\n",this->ThreadInsgesammt); fprintf(pf,"SWABs: %d\n", this->RechenDurchlauf-1); fclose(pf); return TRUE; } return FALSE; }OK zugegeben meine Rechtschreibung ist manchmal (öhm) komisch, einfach ignorrieren

So nochmal Danke an alle die sich das Problem angeschaut haben und vielleicht den ein oder anderen Gedanken dran mit verschwendet haben.