beste game of life methode



  • hey!

    Ich schreibe derzeit ein game of life spiel. Leider dauert jede generation ab ca. 2000 x 2000 zwei secs auf meinem lappi.

    Mein vorgehen ist bisher, dass ich eine klasse "Zelle" habe, die eine reference auf ein coord-system hat, das alle zellen beinhaltet und die eigenen coords x und y. Bei jeder generation gehe ich also jede zelle in dem coord-system durch und berechne für die dann, ob sie in der nexten generation lebt. Dazu überprüfe ich erst, ob sie am rand liegt, dann wird nämlich auf der anderen seite weitergemacht.
    Das ist aber irgendwie sehr langsam.

    if(x == 0)												// left
    	{
    		if(y == 0)										// left + top
    		{
    			if(coordsystem.get()[0][1].lebt)
    				++lebendeNachbarn;
    			if(coordsystem.get()[1][0].lebt)
    				++lebendeNachbarn;
    			if(coordsystem.get()[1][1].lebt)
    				++lebendeNachbarn;
    
    			if(coordsystem.get()[maxX][0].lebt)
    				++lebendeNachbarn;
    			if(coordsystem.get()[maxX][1].lebt)
    				++lebendeNachbarn;
    
    			if(coordsystem.get()[maxX][maxY].lebt)
    				++lebendeNachbarn;
    
    			if(coordsystem.get()[0][maxY].lebt)
    				++lebendeNachbarn;
    			if(coordsystem.get()[1][maxY].lebt)
    				++lebendeNachbarn;
    		}
    		else if(y == maxY)							// left + bottom
    		{
    			if(coordsystem.get()[0][maxY - 1].lebt)
    				++lebendeNachbarn;
    			if(coordsystem.get()[1][maxY].lebt)
    				++lebendeNachbarn;
    			if(coordsystem.get()[1][maxY - 1].lebt)
    				++lebendeNachbarn;
    
    			if(coordsystem.get()[maxX][maxY].lebt)
    				++lebendeNachbarn;
    			if(coordsystem.get()[maxX][maxY - 1].lebt)
    				++lebendeNachbarn;
    
    			if(coordsystem.get()[maxX][0].lebt)
    				++lebendeNachbarn;
    
    			if(coordsystem.get()[0][0].lebt)
    				++lebendeNachbarn;
    			if(coordsystem.get()[1][0].lebt)
    				++lebendeNachbarn;
    		}
    		else										// left + mid
    		{
    			if(coordsystem.get()[0][y - 1].lebt)
    				++lebendeNachbarn;
    			if(coordsystem.get()[0][y + 1].lebt)
    				++lebendeNachbarn;
    			if(coordsystem.get()[1][y].lebt)
    				++lebendeNachbarn;
    			if(coordsystem.get()[1][y - 1].lebt)
    				++lebendeNachbarn;
    			if(coordsystem.get()[1][y + 1].lebt)
    				++lebendeNachbarn;
    
    			if(coordsystem.get()[maxX][y].lebt)
    				++lebendeNachbarn;
    			if(coordsystem.get()[maxX][y - 1].lebt)
    				++lebendeNachbarn;
    			if(coordsystem.get()[maxX][y + 1].lebt)
    				++lebendeNachbarn;
    		}
    
    	}
    	else if(x == maxX)										// right
    	{
    		if(y == 0)									// right + top
    		{
    			if(coordsystem.get()[maxX][1].lebt)
    				++lebendeNachbarn;
    			if(coordsystem.get()[maxX - 1][0].lebt)
    				++lebendeNachbarn;
    			if(coordsystem.get()[maxX - 1][1].lebt)
    				++lebendeNachbarn;
    
    			if(coordsystem.get()[0][0].lebt)
    				++lebendeNachbarn;
    			if(coordsystem.get()[0][1].lebt)
    				++lebendeNachbarn;
    
    			if(coordsystem.get()[maxX][maxY].lebt)
    				++lebendeNachbarn;
    
    			if(coordsystem.get()[0][maxY].lebt)
    				++lebendeNachbarn;
    			if(coordsystem.get()[1][maxY].lebt)
    				++lebendeNachbarn;
    		}
    		else if(y == maxY)					// right + bottom
    		{
    			if(coordsystem.get()[maxX][maxY - 1].lebt)
    				++lebendeNachbarn;
    			if(coordsystem.get()[maxX - 1][maxY].lebt)
    				++lebendeNachbarn;
    			if(coordsystem.get()[maxX - 1][maxY - 1].lebt)
    				++lebendeNachbarn;
    
    			if(coordsystem.get()[0][maxY].lebt)
    				++lebendeNachbarn;
    			if(coordsystem.get()[0][maxY - 1].lebt)
    				++lebendeNachbarn;
    
    			if(coordsystem.get()[0][0].lebt)
    				++lebendeNachbarn;
    
    			if(coordsystem.get()[maxX][0].lebt)
    				++lebendeNachbarn;
    			if(coordsystem.get()[maxX - 1][0].lebt)
    				++lebendeNachbarn;
    		}
    		else								// right + mid
    		{
    			if(coordsystem.get()[maxX][y - 1].lebt)
    				++lebendeNachbarn;
    			if(coordsystem.get()[maxX][y + 1].lebt)
    				++lebendeNachbarn;
    			if(coordsystem.get()[maxX - 1][y].lebt)
    				++lebendeNachbarn;
    			if(coordsystem.get()[maxX - 1][y - 1].lebt)
    				++lebendeNachbarn;
    			if(coordsystem.get()[maxX - 1][y + 1].lebt)
    				++lebendeNachbarn;
    
    			if(coordsystem.get()[0][y].lebt)
    				++lebendeNachbarn;
    			if(coordsystem.get()[0][y - 1].lebt)
    				++lebendeNachbarn;
    			if(coordsystem.get()[0][y + 1].lebt)
    				++lebendeNachbarn;
    		}
    
    	}
    	else if(y == 0)								// mid + top
    	{
    		if(coordsystem.get()[x][1].lebt)
    			++lebendeNachbarn;
    		if(coordsystem.get()[x - 1][0].lebt)
    			++lebendeNachbarn;
    		if(coordsystem.get()[x - 1][1].lebt)
    			++lebendeNachbarn;
    		if(coordsystem.get()[x + 1][0].lebt)
    			++lebendeNachbarn;
    		if(coordsystem.get()[x + 1][1].lebt)
    			++lebendeNachbarn;
    
    		if(coordsystem.get()[x][maxY].lebt)
    			++lebendeNachbarn;
    		if(coordsystem.get()[x - 1][maxY].lebt)
    			++lebendeNachbarn;
    		if(coordsystem.get()[x + 1][maxY].lebt)
    			++lebendeNachbarn;
    	}
    	else if(y == maxY)							// mid + bottom
    	{
    		if(coordsystem.get()[x][maxY - 1].lebt)
    			++lebendeNachbarn;
    		if(coordsystem.get()[x - 1][maxY].lebt)
    			++lebendeNachbarn;
    		if(coordsystem.get()[x - 1][maxY - 1].lebt)
    			++lebendeNachbarn;
    		if(coordsystem.get()[x + 1][maxY].lebt)
    			++lebendeNachbarn;
    		if(coordsystem.get()[x + 1][maxY - 1].lebt)
    			++lebendeNachbarn;
    
    		if(coordsystem.get()[x][0].lebt)
    			++lebendeNachbarn;
    		if(coordsystem.get()[x - 1][0].lebt)
    			++lebendeNachbarn;
    		if(coordsystem.get()[x + 1][0].lebt)
    			++lebendeNachbarn;
    	}
    	else										// mid + mid
    	{
    		if(coordsystem.get()[x][y - 1].lebt)
    			++lebendeNachbarn;
    		if(coordsystem.get()[x][y + 1].lebt)
    			++lebendeNachbarn;
    		if(coordsystem.get()[x - 1][y].lebt)
    			++lebendeNachbarn;
    		if(coordsystem.get()[x - 1][y - 1].lebt)
    			++lebendeNachbarn;
    		if(coordsystem.get()[x - 1][y + 1].lebt)
    			++lebendeNachbarn;
    		if(coordsystem.get()[x + 1][y].lebt)
    			++lebendeNachbarn;
    		if(coordsystem.get()[x + 1][y - 1].lebt)
    			++lebendeNachbarn;
    		if(coordsystem.get()[x + 1][y + 1].lebt)
    			++lebendeNachbarn;
    	}
    
    	if(lebendeNachbarn > 3)					// more than 3 = dead
    	{
    		coordsystem.get()[x][y].naechsterStatus = false;
    	}
    	else if(lebendeNachbarn < 2)				// less than 2 = dead
    	{
    		coordsystem.get()[x][y].naechsterStatus = false;
    	}
    	else if(lebendeNachbarn == 3)				// 3 = lebt
    	{
    		coordsystem.get()[x][y].naechsterStatus = true;
    	}
    	else if(lebendeNachbarn == 2 && coordsystem.get()[x][y].lebt)	// if lebt, 2 = lebt
    	{
    		coordsystem.get()[x][y].naechsterStatus = true;
    	}
    	else										// if not lebt, 2 = dead
    	{
    		coordsystem.get()[x][y].naechsterStatus = false;
    	}
    

    (Das get() kommt von einem reference_wrapper, den ich für den copy-ctor brauche, um die Zellen in einen vector zu packen.)
    Das ist viel code für wenig wirkung. Wie geht das deshalb besser?
    Außerdem habe ich noch ein problem: wenn ich statt coordsystem.get()[x][y].naechsterStatus = true; this->naechsterStatus = true; verwende, dann wird das nicht im coordsystem verändert.
    Warum nicht? Das müsste doch eigentlich die gleiche Zelle sein.

    void Zelle::calcNextGeneration(std::vector< std::vector<Zelle> >& coordsystem)
    {
    	std::vector< std::thread*> threads;
    	for(auto& a : coordsystem)
    	{
    		for(auto& b : a)
    		{
    			threads.push_back(new std::thread(&Zelle::nextGenFuerZelle, b));
    		}
    	}
    
    	for(auto& a : threads)
    	{
    		a->join();
    		delete a;
    	}
    }
    

    So wird das nämlich aufgerufen.

    Ich wäre echt dankbar für jede hilfe!!



  • 1. deine gesamten Fallunterscheidungen sind überflüssig, wenn du einfach die vorherige Zeile und die nächste Zeile (bzw. Spalte) berechnest.

    Das geht entweder mit modulo, also z.B.

    auto prevRow = (row + nRows - 1) % nRows;
    auto nextRow = (row + 1) % nRows;
    

    oder indem du dir eine Funktion machst:

    size_t prevModN(size_t current, size_t N) {
      if (current == 0) return N-1;
      return current - 1;
    }
    

    Müsste man mal profilen, was schneller ist. Modulo kann langsam sein, könnte aber sein, dass das gut optimiert wird.

    Der zweite Punkt: wie viele CPU-Kerne hast du, wie viele Threads machst du? 2000x2000 Threads? Wirklich? Das stört sich nur gegenseitig. Warum rohes "new" und "delete"? Direkt einen vector<thread> nehmen?

    Und irgendwie scheint mir das alles etwas komisch. Warum weiß die Zelle noch, wo sie ist? Die Matrix heißt "coordsystem"? Wie sieht bei die Definition von Zelle aus? Ich hätte jetzt eher sowas gemacht wie die Anzahl der Zeilen durch NCores genommen und dann sowiele Threads gestartet, die jeweils für die entsprechenden Zeilen die Berechnung durchführen.



  • Hallo

    Paar Kommentare:
    1. Du erstellst für jede Zelle einen Thread (zudem noch mit new ?!). Insgesamt sind das 4 Mio. Threads, also 'ne ganze Menge Syscalls. Normalerweise willst du nur ungefähr std::thread::hardware_concurrency() (8 auf modernen Quadcores) Threads erstellen. Damit diese nicht dieselbe Arbeit mehrmals machen, würde ich in deinem Falle einfach eine std::atomic_size_t nehmen, wo sich jeder Thread dann selbstständig neue Arbeit nehmen kann. Meiner Erfahrung nach ist es bei so kurzen Tasks eine markante Verbesserung, wenn sich jeder Thread so etwa 25 Tasks nimmt mit .fetch_add() ; müsstest aber selbst nachmessen.
    2. All deine Zeilen mit if(coordsystem.get()[][].lebt) ++lebendeNachbarn; kannst du verkürzen zu lebendeNachbarn += coordsystem.get()[][].lebt; . (Keine Performance-Verbesserung aber deutlich lesbarer)
    3. Statt diesem komplizierten if-Gefrickel könntest du dein 2D-Array in alle Richtungen um 1 Feld vergrö­ßern, und false in .lebt der Ränder schreiben. Dann ist der gesamte Code von Zeile 1-179 hinfällig.
    4. Du hast scheinbar nur ein coordsystem , aber jede Zelle hat sowohl .lebt als auch .naechsterStatus . Ich nehme an, du wirst später dann alle .naechsterStatus in .lebt schreiben. Stattdessen könntest du zwei separate coordsystem s speichern, eins für den aktuellen und eins für den nächsten Zustand. Um das dann zu updaten, müsstest du nur die Pointer auf die beiden Arrays swappen (ähnlich wie bei double buffering).

    LG



  • Fytch schrieb:

    3. Statt diesem komplizierten if-Gefrickel könntest du dein 2D-Array in alle Richtungen um 1 Feld vergrö­ßern, und false in .lebt der Ränder schreiben. Dann ist der gesamte Code von Zeile 1-179 hinfällig.

    Nein, denn hier soll ja wie in einem Restkörper gearbeitet werden, d.h. vor Zeile 0 ist nicht eine Zeile mit toten Zellen, sondern die letzte Zeile.



  • Und irgendwie scheint mir das alles etwas komisch. Warum weiß die Zelle noch, wo sie ist? Die Matrix heißt "coordsystem"? Wie sieht bei die Definition von Zelle aus? Ich hätte jetzt eher sowas gemacht wie die Anzahl der Zeilen durch NCores genommen und dann sowiele Threads gestartet, die jeweils für die entsprechenden Zeilen die Berechnung durchführen.

    #ifndef Zelle_h
    #define Zelle_h
    
    #include <vector>
    #include <functional>		// reference_wrapper
    
    class Zelle
    {
    	bool lebt;
    	bool naechsterStatus;
    
    	unsigned int generation;
    
    	void naechsterStatusFuerZelle();
    
    	std::reference_wrapper< std::vector< std::vector<Zelle> > > coordsystem;
    
    	unsigned int x;				// x-coord in coordsystem
    	unsigned int y;				// y-coord in coordsystem
    
    	unsigned int maxX;			// = coordsystem.size() - 1
    	unsigned int maxY;			// = coordsystem[x].size() - 1
    
    public:
    	Zelle(std::vector< std::vector<Zelle> >& coordsystem, const bool& lebt, const int& x, const int& y);
    	Zelle(std::vector< std::vector<Zelle> >& coordsystem, const bool& lebt, const int& x, const int& y, const int& MAX_X, const int& MAX_Y);
    
    	void toeten();
    
    	static std::vector< std::vector<Zelle> >& naechsterStatusGen(std::vector< std::vector<Zelle> >& coordsystem);
    
    	static void calcNextGeneration(std::vector< std::vector<Zelle> >& coordsystem);
    
    	inline const bool& esLebt() const { return this->lebt; }
    	inline const bool& wirdLeben() const { return this->naechsterStatus; }
    	inline const unsigned int& getGeneration() const { return this->generation; }
    
    	inline const unsigned int& getX() const { return x; }
    	inline const unsigned int& getY() const { return y; }
    
    	inline void setLebt(const bool& newState) { this->lebt = newState; naechsterStatus = lebt; }
    	inline void setGeneration(const int& newGen) { this->generation = newGen; }
    };
    
    #endif
    

    Das mit den threads war nur ein experiment. Ich habe noch nicht so viel mit threads gemacht, wie man vielleicht sieht.
    Das hier ist die klasse. Befuellt wird das coordsystem über zwei verschachtelte for()-schleifen. Deswegen auch der zweite ctor, weil da die Endgröße des vectors noch nicht feststeht. Ich habe es mal nachgemessen: ohne die threads ist es ca. 10 mal schneller. Ich habe zwei physische und vier virtuelle kerne.

    1. Du erstellst für jede Zelle einen Thread (zudem noch mit new?!). Insgesamt sind das 4 Mio. Threads, also 'ne ganze Menge Syscalls. Normalerweise willst du nur ungefähr std::thread::hardware_concurrency() (8 auf modernen Quadcores) Threads erstellen. Damit diese nicht dieselbe Arbeit mehrmals machen, würde ich in deinem Falle einfach eine std::atomic_size_t nehmen, wo sich jeder Thread dann selbstständig neue Arbeit nehmen kann. Meiner Erfahrung nach ist es bei so kurzen Tasks eine markante Verbesserung, wenn sich jeder Thread so etwa 25 Tasks nimmt mit .fetch_add(); müsstest aber selbst nachmessen.

    Da muss ich mich noch reinlesen. Oder hast du ein kleines beispiel, wie man dieses fetch_add() hier nutzen könnte?

    2. All deine Zeilen mit if(coordsystem.get()[][].lebt) ++lebendeNachbarn; kannst du verkürzen zu lebendeNachbarn += coordsystem.get()[][].lebt;. (Keine Performance-Verbesserung aber deutlich lesbarer)

    Keine schlechte idee, danke für den vorschlag.

    3. Statt diesem komplizierten if-Gefrickel könntest du dein 2D-Array in alle Richtungen um 1 Feld vergrö­ßern, und false in .lebt der Ränder schreiben. Dann ist der gesamte Code von Zeile 1-179 hinfällig.

    Das verstehe ich nicht. Scheint laut wob aber ja anscheinend nicht zu funktionieren.

    4. Du hast scheinbar nur ein coordsystem, aber jede Zelle hat sowohl .lebt als auch .naechsterStatus. Ich nehme an, du wirst später dann alle .naechsterStatus in .lebt schreiben. Stattdessen könntest du zwei separate coordsystems speichern, eins für den aktuellen und eins für den nächsten Zustand. Um das dann zu updaten, müsstest du nur die Pointer auf die beiden Arrays swappen (ähnlich wie bei double buffering).

    In dem coordsystem sind aber ja nicht die zustände sondern die Zellen gespeichert. Die werden nämlich auch noch in anderen klassen benötigt.
    Wobei eigentlich tatsächlich überflüßige infos dort gespeichert werden, z.B. die generation ist ja bei allen Zellen gleich. Muss ich noch etwas überlegen.

    1. deine gesamten Fallunterscheidungen sind überflüssig, wenn du einfach die vorherige Zeile und die nächste Zeile (bzw. Spalte) berechnest.

    Das sollte aber doch deutlich langsamer sein, oder nicht?
    Schließlich habe ich mir bereits die mühe gemacht und das alles ausgeschrieben.



  • wob schrieb:

    Nein, denn hier soll ja wie in einem Restkörper gearbeitet werden, d.h. vor Zeile 0 ist nicht eine Zeile mit toten Zellen, sondern die letzte Zeile.

    Stimmt, hab ich nicht realisiert. Wenn er sich auf Zweierpotenzen als Dimensionen beschränkt, kann er sich das Modulo sparen und Bit-Und verwenden; das dürfte am schnellsten sein.



  • Nein, denn hier soll ja wie in einem Restkörper gearbeitet werden, d.h. vor Zeile 0 ist nicht eine Zeile mit toten Zellen, sondern die letzte Zeile.
    Stimmt, hab ich nicht realisiert. Wenn er sich auf Zweierpotenzen als Dimensionen beschränkt, kann er sich das Modulo sparen und Bit-Und verwenden; das dürfte am schnellsten sein.

    Leider sind alle kombinationen möglich. Es muss nich einmal quadratisch sein.

    Was würdet ihr sonst von einer steuerklasse für die Zellen halten und einem struct, das die Zelle darstellt, also lebt und naechsterStatus beinhaltet? Die frage ist nur, wie ich dann generation speichere. Mir ist nur aufgefallen, dass z.B. maxX, maxY und generation eigentlich die bei allen Zellen in einem coordsystem gleich sind und ich die nur ändere, wenn ich das coordsystem neu befülle.
    Ich muss wohl noch ein wenig nachdenken.



  • 2. All deine Zeilen mit if(coordsystem.get()[][].lebt) ++lebendeNachbarn; kannst du verkürzen zu lebendeNachbarn += coordsystem.get()[][].lebt;. (Keine Performance-Verbesserung aber deutlich lesbarer)

    Ich habe mehrmals nachgemessen: Die methode ohne if() ist um ca. 0.3 sekunden langsamer auf 5000 generationen in einem 10x10 field.
    Ich verstehe aber irgendwie nicht, warum.
    Ich muss noch ein bisschen herumprobieren.



  • Hallo,

    Ich hab das Programm mal schnell nachimplementiert, wobei ich die Hinweise meines letzten Posts alle berücksichtigt habe. Es ist immer noch nicht optimal und hat Potential (z.B. werden für jede Generation stets neue Threads erstellt), aber ist schon so Größenordnungen schneller als die ursprünglichen 2s/Gen.

    Messungen:

    10x  10: 0.166884 ms
     512x 512: 0.467108 ms
    2000x2000: 5.27295  ms
    2048x2048: 5.58719  ms
    

    Code:

    #include <vector>
    #include <cstddef>
    #include <cassert>
    #include <utility>
    #include <algorithm>
    #include <thread>
    #include <atomic>
    #include <string>
    #include <iostream>
    #include <exception>
    #include <random>
    #include <chrono>
    
    template< typename T >
    struct toroidal_mat {
    	std::size_t width = 0, height = 0;
    	std::vector< T > vec;
    
    	toroidal_mat() noexcept = default;
    	toroidal_mat( std::size_t width, std::size_t height )
    	: width{ width }, height{ height }, vec( size() )
    	{
    		assert( ( width == 0 ) == ( height == 0 ) );
    	}
    	toroidal_mat( std::size_t width, std::size_t height, T const& value )
    	: width{ width }, height{ height }, vec( size(), value )
    	{
    		assert( ( width == 0 ) == ( height == 0 ) );
    	}
    
    	std::size_t size() const noexcept
    	{
    		return width * height;
    	}
    
    	T const& operator()( int x, int y ) const noexcept
    	{
    		if( x < 0 )
    			x = width - ( ( -x ) % width );
    		else if( x >= width )
    			x %= width;
    		if( y < 0 )
    			y = height - ( ( -y ) % height );
    		else if( y >= height )
    			y %= height;
    		return vec[ x + width * y ];
    	}
    	T& operator()( int x, int y ) noexcept
    	{
    		return const_cast< T& >( static_cast< toroidal_mat const& >( *this )( x, y ) );
    	}
    
    	void swap( toroidal_mat& rhs ) noexcept
    	{
    		using std::swap;
    		swap( width, rhs.width );
    		swap( height, rhs.height );
    		swap( vec, rhs.vec );
    	}
    };
    
    using cell_t = char;
    using mat_t = toroidal_mat< cell_t >;
    
    void compute_cell( mat_t const& current_gen, mat_t& next_gen, int x, int y )
    {
    	assert( current_gen.width == next_gen.width );
    	assert( current_gen.height == next_gen.height );
    	assert( x < current_gen.width );
    	assert( y < current_gen.height );
    
    	int n = 0;
    	n += current_gen( x - 1, y - 1 );
    	n += current_gen( x    , y - 1 );
    	n += current_gen( x + 1, y - 1 );
    	n += current_gen( x - 1, y     );
    	n += current_gen( x + 1, y     );
    	n += current_gen( x - 1, y + 1 );
    	n += current_gen( x    , y + 1 );
    	n += current_gen( x + 1, y + 1 );
    	next_gen( x, y ) = ( n == 3 ) | ( ( n == 2 ) & current_gen( x, y ) );
    }
    
    void compute_gen( mat_t& current_gen, mat_t& next_gen )
    {
    	assert( current_gen.width == next_gen.width );
    	assert( current_gen.height == next_gen.height );
    
    	std::atomic_int progress{ 0 };
    
    	const auto task = [ & ]
    	{
    		for( int y; ( y = progress++ ) < current_gen.height; )
    			for( int x = 0; x < current_gen.width; ++x )
    				compute_cell( current_gen, next_gen, x, y );
    	};
    
    	std::vector< std::thread > threads;
    	threads.reserve( std::max< std::size_t >( std::thread::hardware_concurrency(), 1 ) - 1 );
    	while( threads.size() < threads.capacity() )
    		threads.emplace_back( task );
    	task();
    	for( auto& i : threads )
    		i.join();
    }
    
    int main( int argc, char** argv )
    {
    	std::size_t w = 512, h = 512;
    	switch( argc ) {
    	case 0:
    	case 1:
    		break;
    
    	case 3:
    		try {
    			w = std::stol( argv[ 1 ] );
    			h = std::stol( argv[ 2 ] );
    		} catch( std::exception const& ) {
    			std::cerr << "invalid arguments (must be integral)\n";
    			return -1;
    		}
    		break;
    
    	default:
    		std::cerr << "invalid number of arguments\n";
    		return -1;
    	}
    
    	mat_t current_gen{ w, h };
    	mat_t next_gen{ w, h };
    
    	{
    		std::minstd_rand prng;
    		std::bernoulli_distribution dist{ 0.3 };
    		for( auto& i : current_gen.vec )
    			i = dist( prng );
    	}
    
    	using clock_t = std::chrono::high_resolution_clock;
    	const auto start = clock_t::now();
    	constexpr int gens = 256;
    	for( int i = 0; i < gens; ++i ) {
    		compute_gen( current_gen, next_gen );
    		current_gen.swap( next_gen );
    	}
    	const auto end = clock_t::now();
    	const std::chrono::duration< double, std::milli > elapsed = end - start;
    	std::cout << "average: " << elapsed.count() / gens << " ms per generation\n";
    }
    

    EDIT: Ich habe das Programm NICHT auf Korrektheit getestet!

    EDIT 2: Code und Messungen angepasst. Siehe wob's und meinen Post weiter unten.

    LG



  • ihcl schrieb:

    Ich habe mehrmals nachgemessen: Die methode ohne if() ist um ca. 0.3 sekunden langsamer auf 5000 generationen in einem 10x10 field.
    Ich verstehe aber irgendwie nicht, warum.
    Ich muss noch ein bisschen herumprobieren.

    Hast du -O2/-O3 an? Sonst macht die Messung natürlich nur wenig Sinn.



  • Fytch schrieb:

    EDIT: Ich habe das Programm NICHT auf Korrektheit getestet!

    Ich würde auch mal behaupten, dass es falsch ist 🙂

    Und zwar liegt der Grund dafür in den current_gen( x - 1, y - 1 ); -Zeilen. x und y sind hier size_t , also vorzeichenlos.
    Wenn x oder y = 0 ist, dann bekommen wir hier einen Überlauf, wir addieren also 2 hoch irgendwas (wie groß size_t auch immer ist, bei mir 2^64). Das dürfen wir aber nicht tun.

    Beispiel:
    Unsere Matrix ist 6x6 Felder groß. Dann soll 0 - 1 ≡ 5 (mod 6) sein. Dein Code rechnet aber 0 - 1 = 18446744073709551615 ≡ 3 (mod 6).

    Ich weiß gar nicht, ob ich direkt in eine toroidal_mat erzeugt hätte. Dann musst du ja bei jedem Zugriff x und y neu berechnen. Man kann sich diese Berechnungen sparen, wenn man das nicht erlaubt und nur eine Matrix hat. An der Stelle, wo man dann "vorherige" Zeile will, muss man dann natürlich diese berechnen, vielleicht mit einer Funktion prevModN / nextModN wie in meinem ersten Post angedeutet. Diese brauchen kein mod, sondern nur ein if auf 0 bzw. die size. Aber eigentlich finde ich deine Lösung mit der toroidal_mat schön. In den operator() könnte man ein if einbauen, ob überhaupt modulo gerechnet werden muss, denn in den meisten Fällen liegt x und y ja schon im erlaubten Wertebereich und man kann sich das sparen.



  • Ups, da hast du natürlich Recht. Ich habe den Bug gefixed und zugleich noch ifs eingeführt (die fast immer korrekt predicted werden; perf gibt 0.01% missed branches). Alte Messungen:

    10x  10:  0.176063 ms
     512x 512:  1.07619  ms
    2000x2000: 13.8264   ms
    2048x2048: 14.5158   ms
    2048x2048:  4.43072  ms (& statt %)
    

    Neu:

    10x  10: 0.166884 ms
     512x 512: 0.467108 ms
    2000x2000: 5.27295  ms
    2048x2048: 5.58719  ms
    

    Gewaltige Verbesserung! 🙂
    Eine einfache Verbesserung wäre es, in toroidal_mat::operator() nur += resp. -= das Limit zu rechnen statt Modulo, denn in diesem spezifischen Anwendungsfall sind die Indizes eh nur maximal 1 außerhalb der Grenzen.
    Aber wenn man wirklich ans Limit gehen will, wird das mit toroidal_mat wahrscheinlich nix. Es wäre vermutlich am besten, wenn man für den Innenraum 1..size-1 und für die Kanten jeweils separate Funktionen hat, damit man alles branchless machen kann. Dann würde man natürlich auch nur eine normale Matrix nehmen.

    LG



  • Ich komme auf über 28 secs, wenn ich mit 2000x2000 rechne. Liegt wohl an meiner cpu.

    Außerdem ein paar fragen:
    - warum nutzt du ein template? Es steht doch fest, dass es nur diesen einen typ geben wird, oder nicht?
    - was heißt assert( ( width == 0 ) == ( height == 0 ) )? Das ergibt doch auch true, wenn beides false ist. Wieso nicht && statt == in der mitte? Und das ist doch eigentlich im namespace std?
    - ist noexcept nicht relativ riskant immer? Wieso nutzt du das?
    - wieso kannst du (und machst es auch noch) das hier T const& operator()( int x, int y ) mit dem hier T& operator()( int x, int y ) überladen? Vor allem auch: wofür? Auch dieses konstrukt hier const_cast< T& >( static_cast< toroidal_mat const& >( *this )( x, y ) ) finde ich irgendwie recht gewagt, oder nicht?
    - was soll der name toroidal_mat bedeuten? Warum nicht einfach von dem vector erben, weil das ist doch im prinzip der vector mit ein paar zusätzlichen funktionen.
    - als letzte frage: warum nutzt du einen eindimensionalen vector? Ist das schneller? Weil ich sehe ansonsten keinen vorteil einem 2dvector gegenüber.

    Außerdem danke für deine mühe.


  • Mod

    Fytch schrieb:

    return const_cast< T& >( static_cast< toroidal_mat const& >( *this )( x, y ) );
    

    Dafür gibt es neuerdings std::as_const.



  • Bei meinem algorithmus komme ich zu folgendem ergebnis:
    Creation: 164.986 ms
    Time: 0.000389 ms
    Time/Generation: 7.78e-06 ms
    OHNE ausgabe. Mit ausgabe erhöhen sich die Werte drastisch.

    Wenn ich Fytchs algorithmus in meinen testcode einbette, dann erhalte ich folgende ergebnisse:
    Creation: 144.972 ms
    Time: 0.000465 ms
    Time/Generation: 9.3e-06 ms
    Auch hier ist keine ausgabe. Die werte sind fast immer minimal höher, aber nicht nennenswert.

    Getestet habe ich mit -O3 und x: 2000, y: 2000 und 50 generationen.

    Wenn ich Fytchs code komplett übernehme:
    2.19057 ms pro generation
    Außerdem habe ich beobachtet, dass, wenn ich Fytchs thread-konstuktion genauso in meinen code übernehme, dass die zeiten um teils das 10-fache steigen. Ab über 1000x1000 verkleinert sich der abstand, aber ich habe es bisher noch überhaupt nicht schneller erlebt. Nur, wenn ich in Fytchs originalcode die thread-konstuktion herausnehme und durch single-thread-code ersetze, dann ist dieser minimal langsamer.

    Die fragen würde mich aber immernoch sehr interessieren. 🤡


  • Mod

    Wäre es nicht zweckmäßiger, erst einmal dafür zu sorgen, dass der Code gut vektorisiert werden kann (bei Game of Life nun wirklich trivial), anstatt viele Threads mit ineffizientem Code arbeiten zu lassen?


  • Mod

    quick&dirty Versuch (der autovectorizierer von gcc macht nicht mit)

    #include <algorithm>
    #include <atomic>
    #include <cassert>
    #include <chrono>
    #include <cstddef>
    #include <exception>
    #include <iostream>
    #include <random>
    #include <string>
    #include <thread>
    #include <utility>
    #include <vector>
    
    #define block_size 16
    
    struct gameoflife_state {
        std::size_t width, height;
        std::vector<char> data;
        gameoflife_state() noexcept = default;
        gameoflife_state(std::size_t w, std::size_t h) : width(w), height(h), data(internal_size()) {
            assert((width == 0) == (height == 0));
        }
        gameoflife_state(std::size_t w, std::size_t h, char v) : width(w), height(h), data(internal_size(), v) {
            assert((width == 0) == (height == 0));
        }
        std::size_t size() const noexcept { return width * height; }
        std::size_t internal_size() const noexcept {
            return ((width + 2) * (height + 2) + block_size - 1) / block_size * block_size;
        }
        char& operator()(int x, int y) noexcept { return const_cast<char&>(std::as_const(*this)(x, y)); }
        const char& operator()(int x, int y) const noexcept { return data[x + 1 + (width + 2) * (y + 1)]; }
        void swap(gameoflife_state& rhs) noexcept {
            using std::swap;
            swap(width, rhs.width);
            swap(height, rhs.height);
            swap(data, rhs.data);
        }
        void normalize() noexcept {
            int stride = width + 2;
            data[0] = data[(height + 1) * stride - 2];
            for (std::size_t x = 1; x <= width; ++x) {
                data[x] = data[height * stride + x];
            }
            data[width + 1] = data[height * stride + 1];
            for (std::size_t y = 1; y <= height; ++y) {
                data[y * stride] = data[(y + 1) * stride - 2];
                data[(y + 1) * stride - 1] = data[y * stride + 1];
            }
            data[(height + 1) * stride] = data[2 * stride - 2];
            for (std::size_t x = 1; x <= width; ++x) {
                data[(height + 1) * stride + x] = data[stride + x];
            }
            data[(height + 2) * stride - 1] = data[stride + 1];
        }
    };
    #if 1
    void compute_cells(char* __restrict__ dst, const char* __restrict__ src, int blocks, int stride) {
        typedef char vec_type __attribute__((vector_size(block_size),aligned(1)));
        for (; blocks--;) {
            vec_type n = (const vec_type&)src[-stride - 1] + (const vec_type&)src[-stride]
                         + (const vec_type&)src[-stride + 1] + (const vec_type&)src[-1] + (const vec_type&)src[+1]
                         + (const vec_type&)src[stride - 1] + (const vec_type&)src[stride]
                         + (const vec_type&)src[stride + 1];
            (vec_type&)*dst = (n == -3) | (n + (const vec_type&)*src == -3);
            src += block_size;
            dst += block_size;
        }
    }
    #else
    void compute_cells(char* __restrict__ dst, const char* __restrict__ src, int blocks, int stride) {
        for (; blocks--;) {
            for (int i = 0; i < block_size; ++i) {
                int n = src[i - stride - 1] + src[i - stride] + src[i - stride + 1] + src[i - 1] + src[i + 1]
                        + src[i + stride - 1] + src[i + stride] + src[i + stride + 1];
                dst[i] = -((n == -3) | (n + src[i] == -3));
            }
            src += block_size;
            dst += block_size;
        }
    }
    #endif
    void compute_gen(gameoflife_state& current_gen, gameoflife_state& next_gen, std::size_t num_threads) {
        assert(current_gen.width == next_gen.width);
        assert(current_gen.height == next_gen.height);
        auto stride = current_gen.width + 2;
        std::vector<std::thread> threads;
        threads.reserve(num_threads - 1);
        char* src = &current_gen.data[stride+1];
        char* dst = &next_gen.data[stride + 1];
        auto blocks = (current_gen.height * stride + block_size - 1) / block_size;
        auto task_blocks = (blocks + num_threads - 1) / num_threads;
        for (std::size_t i = 1; i < num_threads; ++i) {
            threads.emplace_back([=] { compute_cells(dst, src, task_blocks, stride); });
            src += task_blocks * block_size;
            dst += task_blocks * block_size;
            blocks -= task_blocks;
        }
        compute_cells(dst, src, blocks, stride);
        for (auto& i : threads)
            i.join();
        compute_cells(dst, src, blocks, current_gen.width + 2);
        next_gen.normalize();
    }
    
    int main(int argc, char** argv) {
        std::size_t w = 512, h = 512, num_threads = std::max<std::size_t>(std::thread::hardware_concurrency(), 1);
        switch (argc) {
        case 0:
        case 1: break;
        case 3:
        case 4:
            try {
                w = std::stol(argv[1]);
                h = std::stol(argv[2]);
                if ( argc == 4 )
                    num_threads = std::stol(argv[3]);
            } catch (std::exception const&) {
                std::cerr << "invalid arguments (must be integral)\n";
                return -1;
            }
            break;
        default: std::cerr << "invalid number of arguments\n"; return -1;
        }
        gameoflife_state current_gen{w, h};
        gameoflife_state next_gen{w, h};
        {
            std::minstd_rand prng;
            std::bernoulli_distribution dist{0.3};
            for (auto& i : current_gen.data)
                i = dist(prng) ? -1 : 0;
            current_gen.normalize();
        }
        using clock_t = std::chrono::high_resolution_clock;
        const auto start = clock_t::now();
        constexpr int gens = 256;
        for (int i = 0; i < gens; ++i) {
            compute_gen(current_gen, next_gen, num_threads);
            current_gen.swap(next_gen);
        }
        const auto end = clock_t::now();
        const std::chrono::duration<double, std::milli> elapsed = end - start;
        std::cout << "average: " << elapsed.count() / gens << " ms per generation\n";
    }
    

    Bei mir ca. 10x schneller gegenüber der nicht vektorisierten Version. Wer avx2 hat, kann die Blockgröße auf 32 setzen. Theoretisch sollten größere Blöcke in jedem Fall möglich sein, aus irgendeinem Grund erzeugt der Compiler für zu große Vektoren aber ineffizienten Code.



  • ihcl schrieb:

    Ich komme auf über 28 secs, wenn ich mit 2000x2000 rechne. Liegt wohl an meiner cpu.

    Wenn ich 5ms messe und du 28s bekommst, dann machst du noch etwas anderes falsch.

    ihcl schrieb:

    - warum nutzt du ein template? Es steht doch fest, dass es nur diesen einen typ geben wird, oder nicht?

    Ich habe aus Gewohnheit versucht, die Klasse allgemein und wiederverwendbar zu machen.

    ihcl schrieb:

    - was heißt assert( ( width == 0 ) == ( height == 0 ) )? Das ergibt doch auch true, wenn beides false ist. Wieso nicht && statt == in der mitte? Und das ist doch eigentlich im namespace std?

    Es ergibt false , wenn nur einer der beiden Seitenlängen 0 ist. Damit wollte ich verhindern, dass man die Matrix mit z.B. 0x5 initialisieren kann. Das boolesche Äquivalent wäre übrigens XNOR (!(a^b)), nicht &&.

    ihcl schrieb:

    - ist noexcept nicht relativ riskant immer?

    Ne, wieso?

    ihcl schrieb:

    Wieso nutzt du das?

    Resultiert teils in besserer Codegen und dokumentiert die Schnittstelle. Hier aber eigentlich nur aus Korrektheit und Gewohnheit.

    ihcl schrieb:

    - wieso kannst du (und machst es auch noch) das hier T const& operator()( int x, int y ) mit dem hier T& operator()( int x, int y ) überladen?

    Siehe den const -Qualifier nach der Parameterliste.

    ihcl schrieb:

    Vor allem auch: wofür?

    Const-Correctness.

    ihcl schrieb:

    Auch dieses konstrukt hier const_cast< T& >( static_cast< toroidal_mat const& >( *this )( x, y ) ) finde ich irgendwie recht gewagt, oder nicht?

    Ne, wieso?

    ihcl schrieb:

    - was soll der name toroidal_mat bedeuten?

    Die Matrix verhält sich wie die Oberfläche eines 2-Torus.

    ihcl schrieb:

    Warum nicht einfach von dem vector erben, weil das ist doch im prinzip der vector mit ein paar zusätzlichen funktionen.

    (Public) Vererbung stellt eine ist-ein-Beziehung dar. Eine Matrix ist kein Vektor.

    ihcl schrieb:

    - als letzte frage: warum nutzt du einen eindimensionalen vector? Ist das schneller? Weil ich sehe ansonsten keinen vorteil einem 2dvector gegenüber.

    Kommt drauf an, was du mit "2D-Vektor" meinst. Km×n nennt man ja allgemeinhin Matrix und K2 wirst du kaum gemeint haben. Wenn du denkst, std::vector<std::vector<T>> sei eine Matrix, dann nein: Das ist eine falsche Repräsentation; Das wäre nämlich ein "jagged array".
    Ich nehme x- und y-Koordinate und projiziere diese bijektiv auf den linearen Vektor, wobei man wahlweise row- oder column-major arbeiten kann. Das ist die Standard-Lösung für dieses Problem.

    LG



  • @camper: Sehr cool. 👍 Ist bei mir (AVX2) 15 mal schneller als meine Version (0.36ms für 2k).
    Zeile 64 hat mir zu denken gegeben, weil ich so gewohnt bin, dass (char)true == 1 . Als ich es dann verstanden habe, hat es auch Sinn ergeben, dass du die Population negativ speicherst.

    LG

    PS: Wieso schreibst du for(;x; ) statt while(x) ? 😃


  • Mod

    Fytch schrieb:

    PS: Wieso schreibst du for(;x; ) statt while(x) ? 😃

    Das passiert für gewöhnlich wenn ich beim anfänglichen Schreiben noch nicht sicher bin, ob ich noch extra lokale Variablen brauche, die evtl. am besten im Schleifenkopf aufgehoben sind oder wenn es sich wie in

    for (; blocks--;)
    

    um eine Zählschleife handelt.



  • Ich meinte std::vector<std::vector<T>> mit 2d-vector.
    Muss es denn unbedingt eine matrix sein? Ich habe in meinem code nämlich einfach einmal std::vector<std::vector<Zelle> > durch std::vector<Zelle> ersetzt und es läuft, aber das ist deutlich langsamer:

    Creation: 209.068 ms
    Time: 11580 ms
    Time/Generation: 45.2343 ms

    Die position habe ich so berechnet:
    Zelle& ZellenCoordSystem::operator ()(const std::size_t& x, const std::size_t& y)
    {
    return (*this)[x + y * height];
    }

    Dagegen das ohne:

    Creation: 189.572 ms
    Time: 0.000957 ms
    Time/Generation: 3.73828e-06 ms

    Der sonstige code ist genau gleich. Ich habe eben nur [x][y] durch (x, y) ersetzt.
    Gerechnet habe ich mit 2000x2000 auf 256 generationen.
    Die klasse Zelle enthält nun nicht einmal mehr die eigenen coodinaten.

    campers code konnte ich noch nicht in meinem code testen, weil ich den irgendwie noch nicht so richtig verstehe. Muss ich mir noch ein wenig durchlesen. Was sind denn eigentlich die ganzen __atribute__ und __restrict__ makros? Sind die nicht eigentlich doof, weil die einen an den gcc-compiler binden?
    Außerdem musste ich dort das std::as_const ersetzen, weil ich kein gcc7 habe.


Anmelden zum Antworten