beste game of life methode
-
Fytch schrieb:
return const_cast< T& >( static_cast< toroidal_mat const& >( *this )( x, y ) );
Dafür gibt es neuerdings std::as_const.
-
Bei meinem algorithmus komme ich zu folgendem ergebnis:
Creation: 164.986 ms
Time: 0.000389 ms
Time/Generation: 7.78e-06 ms
OHNE ausgabe. Mit ausgabe erhöhen sich die Werte drastisch.Wenn ich Fytchs algorithmus in meinen testcode einbette, dann erhalte ich folgende ergebnisse:
Creation: 144.972 ms
Time: 0.000465 ms
Time/Generation: 9.3e-06 ms
Auch hier ist keine ausgabe. Die werte sind fast immer minimal höher, aber nicht nennenswert.Getestet habe ich mit -O3 und x: 2000, y: 2000 und 50 generationen.
Wenn ich Fytchs code komplett übernehme:
2.19057 ms pro generation
Außerdem habe ich beobachtet, dass, wenn ich Fytchs thread-konstuktion genauso in meinen code übernehme, dass die zeiten um teils das 10-fache steigen. Ab über 1000x1000 verkleinert sich der abstand, aber ich habe es bisher noch überhaupt nicht schneller erlebt. Nur, wenn ich in Fytchs originalcode die thread-konstuktion herausnehme und durch single-thread-code ersetze, dann ist dieser minimal langsamer.Die fragen würde mich aber immernoch sehr interessieren.
-
Wäre es nicht zweckmäßiger, erst einmal dafür zu sorgen, dass der Code gut vektorisiert werden kann (bei Game of Life nun wirklich trivial), anstatt viele Threads mit ineffizientem Code arbeiten zu lassen?
-
quick&dirty Versuch (der autovectorizierer von gcc macht nicht mit)
#include <algorithm> #include <atomic> #include <cassert> #include <chrono> #include <cstddef> #include <exception> #include <iostream> #include <random> #include <string> #include <thread> #include <utility> #include <vector> #define block_size 16 struct gameoflife_state { std::size_t width, height; std::vector<char> data; gameoflife_state() noexcept = default; gameoflife_state(std::size_t w, std::size_t h) : width(w), height(h), data(internal_size()) { assert((width == 0) == (height == 0)); } gameoflife_state(std::size_t w, std::size_t h, char v) : width(w), height(h), data(internal_size(), v) { assert((width == 0) == (height == 0)); } std::size_t size() const noexcept { return width * height; } std::size_t internal_size() const noexcept { return ((width + 2) * (height + 2) + block_size - 1) / block_size * block_size; } char& operator()(int x, int y) noexcept { return const_cast<char&>(std::as_const(*this)(x, y)); } const char& operator()(int x, int y) const noexcept { return data[x + 1 + (width + 2) * (y + 1)]; } void swap(gameoflife_state& rhs) noexcept { using std::swap; swap(width, rhs.width); swap(height, rhs.height); swap(data, rhs.data); } void normalize() noexcept { int stride = width + 2; data[0] = data[(height + 1) * stride - 2]; for (std::size_t x = 1; x <= width; ++x) { data[x] = data[height * stride + x]; } data[width + 1] = data[height * stride + 1]; for (std::size_t y = 1; y <= height; ++y) { data[y * stride] = data[(y + 1) * stride - 2]; data[(y + 1) * stride - 1] = data[y * stride + 1]; } data[(height + 1) * stride] = data[2 * stride - 2]; for (std::size_t x = 1; x <= width; ++x) { data[(height + 1) * stride + x] = data[stride + x]; } data[(height + 2) * stride - 1] = data[stride + 1]; } }; #if 1 void compute_cells(char* __restrict__ dst, const char* __restrict__ src, int blocks, int stride) { typedef char vec_type __attribute__((vector_size(block_size),aligned(1))); for (; blocks--;) { vec_type n = (const vec_type&)src[-stride - 1] + (const vec_type&)src[-stride] + (const vec_type&)src[-stride + 1] + (const vec_type&)src[-1] + (const vec_type&)src[+1] + (const vec_type&)src[stride - 1] + (const vec_type&)src[stride] + (const vec_type&)src[stride + 1]; (vec_type&)*dst = (n == -3) | (n + (const vec_type&)*src == -3); src += block_size; dst += block_size; } } #else void compute_cells(char* __restrict__ dst, const char* __restrict__ src, int blocks, int stride) { for (; blocks--;) { for (int i = 0; i < block_size; ++i) { int n = src[i - stride - 1] + src[i - stride] + src[i - stride + 1] + src[i - 1] + src[i + 1] + src[i + stride - 1] + src[i + stride] + src[i + stride + 1]; dst[i] = -((n == -3) | (n + src[i] == -3)); } src += block_size; dst += block_size; } } #endif void compute_gen(gameoflife_state& current_gen, gameoflife_state& next_gen, std::size_t num_threads) { assert(current_gen.width == next_gen.width); assert(current_gen.height == next_gen.height); auto stride = current_gen.width + 2; std::vector<std::thread> threads; threads.reserve(num_threads - 1); char* src = ¤t_gen.data[stride+1]; char* dst = &next_gen.data[stride + 1]; auto blocks = (current_gen.height * stride + block_size - 1) / block_size; auto task_blocks = (blocks + num_threads - 1) / num_threads; for (std::size_t i = 1; i < num_threads; ++i) { threads.emplace_back([=] { compute_cells(dst, src, task_blocks, stride); }); src += task_blocks * block_size; dst += task_blocks * block_size; blocks -= task_blocks; } compute_cells(dst, src, blocks, stride); for (auto& i : threads) i.join(); compute_cells(dst, src, blocks, current_gen.width + 2); next_gen.normalize(); } int main(int argc, char** argv) { std::size_t w = 512, h = 512, num_threads = std::max<std::size_t>(std::thread::hardware_concurrency(), 1); switch (argc) { case 0: case 1: break; case 3: case 4: try { w = std::stol(argv[1]); h = std::stol(argv[2]); if ( argc == 4 ) num_threads = std::stol(argv[3]); } catch (std::exception const&) { std::cerr << "invalid arguments (must be integral)\n"; return -1; } break; default: std::cerr << "invalid number of arguments\n"; return -1; } gameoflife_state current_gen{w, h}; gameoflife_state next_gen{w, h}; { std::minstd_rand prng; std::bernoulli_distribution dist{0.3}; for (auto& i : current_gen.data) i = dist(prng) ? -1 : 0; current_gen.normalize(); } using clock_t = std::chrono::high_resolution_clock; const auto start = clock_t::now(); constexpr int gens = 256; for (int i = 0; i < gens; ++i) { compute_gen(current_gen, next_gen, num_threads); current_gen.swap(next_gen); } const auto end = clock_t::now(); const std::chrono::duration<double, std::milli> elapsed = end - start; std::cout << "average: " << elapsed.count() / gens << " ms per generation\n"; }
Bei mir ca. 10x schneller gegenüber der nicht vektorisierten Version. Wer avx2 hat, kann die Blockgröße auf 32 setzen. Theoretisch sollten größere Blöcke in jedem Fall möglich sein, aus irgendeinem Grund erzeugt der Compiler für zu große Vektoren aber ineffizienten Code.
-
ihcl schrieb:
Ich komme auf über 28 secs, wenn ich mit 2000x2000 rechne. Liegt wohl an meiner cpu.
Wenn ich 5ms messe und du 28s bekommst, dann machst du noch etwas anderes falsch.
ihcl schrieb:
- warum nutzt du ein template? Es steht doch fest, dass es nur diesen einen typ geben wird, oder nicht?
Ich habe aus Gewohnheit versucht, die Klasse allgemein und wiederverwendbar zu machen.
ihcl schrieb:
- was heißt assert( ( width == 0 ) == ( height == 0 ) )? Das ergibt doch auch true, wenn beides false ist. Wieso nicht && statt == in der mitte? Und das ist doch eigentlich im namespace std?
Es ergibt
false
, wenn nur einer der beiden Seitenlängen 0 ist. Damit wollte ich verhindern, dass man die Matrix mit z.B. 0x5 initialisieren kann. Das boolesche Äquivalent wäre übrigens XNOR (!(a^b)), nicht &&.ihcl schrieb:
- ist noexcept nicht relativ riskant immer?
Ne, wieso?
ihcl schrieb:
Wieso nutzt du das?
Resultiert teils in besserer Codegen und dokumentiert die Schnittstelle. Hier aber eigentlich nur aus Korrektheit und Gewohnheit.
ihcl schrieb:
- wieso kannst du (und machst es auch noch) das hier T const& operator()( int x, int y ) mit dem hier T& operator()( int x, int y ) überladen?
Siehe den
const
-Qualifier nach der Parameterliste.ihcl schrieb:
Vor allem auch: wofür?
Const-Correctness.
ihcl schrieb:
Auch dieses konstrukt hier const_cast< T& >( static_cast< toroidal_mat const& >( *this )( x, y ) ) finde ich irgendwie recht gewagt, oder nicht?
Ne, wieso?
ihcl schrieb:
- was soll der name toroidal_mat bedeuten?
Die Matrix verhält sich wie die Oberfläche eines 2-Torus.
ihcl schrieb:
Warum nicht einfach von dem vector erben, weil das ist doch im prinzip der vector mit ein paar zusätzlichen funktionen.
(Public) Vererbung stellt eine ist-ein-Beziehung dar. Eine Matrix ist kein Vektor.
ihcl schrieb:
- als letzte frage: warum nutzt du einen eindimensionalen vector? Ist das schneller? Weil ich sehe ansonsten keinen vorteil einem 2dvector gegenüber.
Kommt drauf an, was du mit "2D-Vektor" meinst. Km×n nennt man ja allgemeinhin Matrix und K2 wirst du kaum gemeint haben. Wenn du denkst,
std::vector<std::vector<T>>
sei eine Matrix, dann nein: Das ist eine falsche Repräsentation; Das wäre nämlich ein "jagged array".
Ich nehme x- und y-Koordinate und projiziere diese bijektiv auf den linearen Vektor, wobei man wahlweise row- oder column-major arbeiten kann. Das ist die Standard-Lösung für dieses Problem.LG
-
@camper: Sehr cool.
Ist bei mir (AVX2) 15 mal schneller als meine Version (0.36ms für 2k).
Zeile 64 hat mir zu denken gegeben, weil ich so gewohnt bin, dass(char)true == 1
. Als ich es dann verstanden habe, hat es auch Sinn ergeben, dass du die Population negativ speicherst.LG
PS: Wieso schreibst du
for(;x; )
stattwhile(x)
?
-
Fytch schrieb:
PS: Wieso schreibst du
for(;x; )
stattwhile(x)
?Das passiert für gewöhnlich wenn ich beim anfänglichen Schreiben noch nicht sicher bin, ob ich noch extra lokale Variablen brauche, die evtl. am besten im Schleifenkopf aufgehoben sind oder wenn es sich wie in
for (; blocks--;)
um eine Zählschleife handelt.
-
Ich meinte std::vector<std::vector<T>> mit 2d-vector.
Muss es denn unbedingt eine matrix sein? Ich habe in meinem code nämlich einfach einmal std::vector<std::vector<Zelle> > durch std::vector<Zelle> ersetzt und es läuft, aber das ist deutlich langsamer:Creation: 209.068 ms
Time: 11580 ms
Time/Generation: 45.2343 msDie position habe ich so berechnet:
Zelle& ZellenCoordSystem::operator ()(const std::size_t& x, const std::size_t& y)
{
return (*this)[x + y * height];
}Dagegen das ohne:
Creation: 189.572 ms
Time: 0.000957 ms
Time/Generation: 3.73828e-06 msDer sonstige code ist genau gleich. Ich habe eben nur [x][y] durch (x, y) ersetzt.
Gerechnet habe ich mit 2000x2000 auf 256 generationen.
Die klasse Zelle enthält nun nicht einmal mehr die eigenen coodinaten.campers code konnte ich noch nicht in meinem code testen, weil ich den irgendwie noch nicht so richtig verstehe. Muss ich mir noch ein wenig durchlesen. Was sind denn eigentlich die ganzen __atribute__ und __restrict__ makros? Sind die nicht eigentlich doof, weil die einen an den gcc-compiler binden?
Außerdem musste ich dort das std::as_const ersetzen, weil ich kein gcc7 habe.
-
ihcl schrieb:
Was sind denn eigentlich die ganzen __atribute__ und __restrict__ makros? Sind die nicht eigentlich doof, weil die einen an den gcc-compiler binden
Beides sind keine Makros sondern Compilererweiterungen.
__restrict__ hat die gleiche Bedeutung wie das restrict Schlüsselwort in C und bei den meisten C++ Compilern verfügbar, es wäre hier notwendig, damit der Compiler efektives Loopunrollings zwecks Autovektorisierung betreiben könnte - da er das aber sowieso nicht tut, kann es auch weggelassen werden.
Die __attribute__-Syntax wird auch von clang (und icc?) verstanden. Wenn noch andere Compiler unterstützt werden sollen, könnte man auf den <immintrin.h>-Header ausweichen. Nachteil dabei ist die schlechtere Lesbarkeit und die explizite Bindung an ein bestimmtes Instruction Set des Prozessors.Kurzer Test zeigt, dass clang auch mit größerem block_size vernünftigen Code erzeugt (solange genügend Register vorhanden sind), ein zusäzlicher Geschwindigkeitsvorteil ergibt sich dadurch aber nicht. Eine automatische Anpassung an die Verfügbarkeit von AVX2 könnte daher so aussehen:
#ifdef __AVX2__ #define block_size 32 #else #define block_size 16 #endif
wobei g++ hier auch schon recht umständlichen Code erzeugt.
-
Diese Version funktioniert mit gcc, clang und icc. icc kann nicht mit Skalaren in Vektoroperationen umgehen, deshalb der etwas umständliche Weg, um -3 zu erzeugen.
-
Ich habe wohl einen fehler gemacht. Aus irgendeinem grund werden meine berechnungen gar nicht ausgeführt, deswegen die niedrigen Werte.
Ich komme auf 24ms pro generation bei 2000x2000. Das ist eigentlich bereits schnell genug.
Ich werde mir aber trotzdem noch einmal campers code ansehen, auch um zu lernen. Kann mir da vielleicht jemand vielleicht erklären, warum der vector unnötig groß gemacht wird und was das geschiebe in normalize() soll?
Auch die compute_gen()-function ist mir irgendwie unklar. Die compute_cells()-function verstehe ich glaube ich, bin mir aber auch nicht sicher. Also ich verstehe schon den code. Aber nicht, warum.Wenn ich mir campers neue version anschaue, dann verstehe ich sogar noch weniger.
-
ihcl schrieb:
Kann mir da vielleicht jemand vielleicht erklären, warum der vector unnötig groß gemacht wird und was das geschiebe in normalize() soll?
Fytch schrieb:
3. Statt diesem komplizierten if-Gefrickel könntest du dein 2D-Array in alle Richtungen um 1 Feld vergrößern, und
false
in.lebt
der Ränder schreiben. Dann ist der gesamte Code von Zeile 1-179 hinfällig.Fytch hatte es fast richtig, nur dass statt einfach mit false zu füllen, wir die echten Werte von den Rändern kopieren. Das kostet zwar ein bisschen Zeit, ist aber nicht besonders wesentlich und je grösser das Feld wird, um so geringer wird relative Aufwand, der auf den Rand entfällt. Ausserdem arbeitet compute_cells immer auf block_size Zellen gleichzeitig, so dass ggf. ein bisschen extra Padding am Ende erforderlich ist, damit nicht auf nicht reservierten Speicher zugegriffen wird.
ihcl schrieb:
Auch die compute_gen()-function ist mir irgendwie unklar.
Von Fytchs Version adaptiert. Bei mir wird allerdings jedem Thread gleich beim Start der Bereich, auf dem dieser Thread arbeiten soll zugewiesen, anstatt dass erst später über eine progress-Variable herauszufinden.
Eine mögliche weitere Optimierung, die allerdings den Code weitere verkompliziert:
Gegenwärtig werden für jede Zelle 8 Additionen durchgeführt. Man könnte dies durch 8 Additionen + 1 Subtraktion ersetzen:void compute_cells(char* __restrict__ dst, const char* __restrict__ src, int blocks, int stride) { typedef char vec_type __attribute__((vector_size(block_size),aligned(1))); for (; blocks--;) { vec_type n = (const vec_type&)src[-stride - 1] + (const vec_type&)src[-stride] + (const vec_type&)src[-stride + 1] + (const vec_type&)src[-1] + (const vec_type&)src[0] + (const vec_type&)src[+1] + (const vec_type&)src[stride - 1] + (const vec_type&)src[stride] + (const vec_type&)src[stride + 1]; (vec_type&)*dst = (n == -3) | (n - (const vec_type&)*src == -3); src += block_size; dst += block_size; } }
was erst einmal keinen Vorteil bringt.
Wenn man genau hinschaut, sieht man aber, dass die gleiche Summe dreier horizontal benachbarter Zellen für die Berechnung von 3 Zellen, die vertikal benachbart sind, benötigt wird. Wenn die Berechnungsreihenfolge etwas umgestellt wird, könnte man hier bis zu 40% Additionen einsparen.
-
Seine Matrix ist erstmal 2 Felder größer in beide Dimensionen, damit er den Wrap-Around branchless bekommt; quasi ein Rand. Das Andere mit
block_size
in Zeile 28 (ceil div) macht er, damit der vektorisierte Code nicht auf Speicher zugreift, der ihm nicht gehört, da dieser immerblock_size
Zellen gleichzeitig verarbeitet: Padding.
normalize
bewerkstelligt, dass dieser eben erwähnte Rand rund um die Matrix korrekt gefüllt wird (z.B. muss die äußerste rechte Spalte in den linken Rand kopiert werden etc.).
compute_gen
teilt die Matrix auf (nochmals n ceil div), damit die Threads diese unabhängig bearbeiten können.
-
Bemerke gerade, dass im bisher gezeigten Code compute_cells im Hauptthread zweimal aufgerufen wird, dass ist nat. Unfug, der Aufruf nach join gehört da nicht hin. Die richtigen Zeiten sind erheblich besser.
Hier mein weiterer Optimierungsversuch.
#include <algorithm> #include <atomic> #include <cassert> #include <chrono> #include <cstddef> #include <exception> #include <iostream> #include <random> #include <string> #include <thread> #include <utility> #include <vector> #include <type_traits> #ifdef __AVX2__ #define block_size 32 #else #define block_size 16 #endif #ifndef __AVX__ #define stripe_size 4 #else #define stripe_size 8 #endif template <typename T> const T& as_const(T& x) noexcept { return x; } struct gameoflife_state { std::ptrdiff_t width, height, stride, stripes; std::vector<char> data; char* origin; gameoflife_state(std::ptrdiff_t w, std::ptrdiff_t h, char v = 0) : width(w), height(h), stride((w+2+block_size-1)/block_size*block_size), stripes((height+stripe_size-1)/stripe_size), data(internal_size(), v) { assert((width == 0) == (height == 0)); void* p = &data[stride+1]; std::size_t size = data.size(); origin = static_cast<char*>(std::align( block_size, block_size, p, size ) ); } gameoflife_state(const gameoflife_state&) = delete; gameoflife_state& operator=(const gameoflife_state&) = delete; std::size_t size() const noexcept { return width * height; } std::size_t internal_size() const noexcept { return stride * (stripes * stripe_size + 2) + block_size - 1; } char& operator()(std::ptrdiff_t x, std::ptrdiff_t y) noexcept { return const_cast<char&>(as_const(*this)(x, y)); } const char& operator()(std::ptrdiff_t x, std::ptrdiff_t y) const noexcept { return origin[x + stride * y]; } void swap(gameoflife_state& rhs) noexcept { using std::swap; swap(width, rhs.width); swap(height, rhs.height); swap(stride, rhs.stride); swap(stripes, rhs.stripes); swap(data, rhs.data); swap(origin, rhs.origin); } void normalize() noexcept { (*this)(-1, -1) = (*this)(width-1, height-1); for (std::ptrdiff_t x = 0; x < width; ++x) { (*this)(x, -1) = (*this)(x, height-1); } (*this)(width,-1) = (*this)(0, height-1); for (std::ptrdiff_t y = 0; y < height; ++y) { (*this)(-1, y) = (*this)(width-1, y); (*this)(width, y) = (*this)(0, y); } (*this)(-1, height) = (*this)(width-1, 0); for (std::ptrdiff_t x = 0; x < width; ++x) { (*this)(x, height) = (*this)(x, 0); } (*this)(width, height) = (*this)(0, 0); } }; template <int... I, int... I2, int... I_2> void compute_cells(char* __restrict__ dst, const char* __restrict__ src, int stripes, int stride, std::integer_sequence<int, I...>, std::integer_sequence<int, I2...>, std::integer_sequence<int, I_2...>) noexcept { typedef char vec_type __attribute__((vector_size(block_size))); static const vec_type _0 = {}; static const vec_type _1 = ~_0; static const vec_type _3 = _1 + _1 + _1; for (; stripes--;) { for (auto blocks = stride / block_size; blocks--; ) { vec_type l[] = { *(vec_type*)__builtin_assume_aligned(&src[(I2-1)*stride-1], block_size, block_size-1) +*(vec_type*)__builtin_assume_aligned(&src[(I2-1)*stride ], block_size) +*(vec_type*)__builtin_assume_aligned(&src[(I2-1)*stride+1], block_size, 1) ... }; vec_type p[] = { l[I_2*2+1] + l[I_2*2+2] ... }; vec_type s[] = { p[(I-1)/2] + l[I+(I%2?2:0)] ... }; char dummy[] = { ((*(vec_type*)__builtin_assume_aligned(&dst[I*stride], block_size) = (s[I] == _3) | (s[I] - *(vec_type*)__builtin_assume_aligned(&src[I*stride], block_size) == _3)),char{}) ... }; (void)dummy; src += block_size; dst += block_size; } src += stride * (stripe_size - 1); dst += stride * (stripe_size - 1); } } void compute_cells(char* __restrict__ dst, const char* __restrict__ src, int stripes, int stride) noexcept { compute_cells(dst, src, stripes, stride, std::make_integer_sequence<int, stripe_size>{}, std::make_integer_sequence<int, stripe_size+2>{}, std::make_integer_sequence<int, stripe_size/2>{}); } void compute_gen(gameoflife_state& current_gen, gameoflife_state& next_gen, std::size_t num_threads) { assert(current_gen.width == next_gen.width); assert(current_gen.height == next_gen.height); auto stride = current_gen.stride; std::vector<std::thread> threads; threads.reserve(num_threads - 1); char* src = current_gen.origin; char* dst = next_gen.origin; auto stripes = current_gen.stripes; auto task_stripes = (stripes + num_threads - 1) / num_threads; for (std::size_t i = 1; i < num_threads; ++i) { threads.emplace_back([=] { compute_cells(dst, src, task_stripes, stride); }); src += stride * task_stripes * stripe_size; dst += stride * task_stripes * stripe_size; stripes -= task_stripes; } compute_cells(dst, src, stripes, stride); for (auto& i : threads) i.join(); next_gen.normalize(); } int main(int argc, char** argv) { std::ptrdiff_t w = 512, h = 512, num_threads = std::max<std::ptrdiff_t>(std::thread::hardware_concurrency(), 1); switch (argc) { case 0: case 1: break; case 3: case 4: try { w = std::stol(argv[1]); h = std::stol(argv[2]); if ( argc == 4 ) num_threads = std::stol(argv[3]); } catch (std::exception const&) { std::cerr << "invalid arguments (must be integral)\n"; return -1; } break; default: std::cerr << "invalid number of arguments\n"; return -1; } gameoflife_state current_gen{w, h}; gameoflife_state next_gen{w, h}; { std::minstd_rand prng; std::bernoulli_distribution dist{0.3}; for (auto& i : current_gen.data) i = dist(prng) ? -1 : 0; current_gen.normalize(); } using clock_t = std::chrono::high_resolution_clock; const auto start = clock_t::now(); constexpr int gens = 256; for (int i = 0; i < gens; ++i) { compute_gen(current_gen, next_gen, num_threads); current_gen.swap(next_gen); } const auto end = clock_t::now(); const std::chrono::duration<double, std::milli> elapsed = end - start; std::cout << "average: " << elapsed.count() / gens << " ms per generation\n"; }
Konsistente Messwerte zu erhalten, gestaltet sich hier schwierig, am besten sollte wohl gens an die Matrixgröße angepasst werden und insgesamt etwas grösser sein.
In jedem Fall wird die Geschwindigkeit hier für große Matrizen weitgehend durch den Speicherdurchsatz begrenzt, so dass zusätzliche Threads kaum zusätzlichen Nutzen bringen oder sogar schädlich sind (bei mir leicht zu testen, indem die Anzahl der Threads als 3. Argument angegeben wird). Evtl. läßt sich hier noch etwas machen, indem die Matrix als Bitset gespeichert wird.
-
Nochmals Code etwas aufgeräumt und Rechenfehler beseitigt. Durch die Berechnung von jeweils 4 Zellen, die übereinander liegen, werden im Vergleich zum naiven linearen Algorithmus nur 8 statt 10 Integeroperationen pro Zelle benötigt. Dieser Unterschied schlägt bei der Geschwindigkeit voll durch.
Ein Vergrößerung von stripe_size würde die Anzahl der nötigen Operation weiter veringern, allerdings ist der Registersatz mit nur 16 Registern zu klein um dann alle Zwischenergebnisse halten zu können, so dass sich keine Verbesserung der Laufzeit ergibt.Benötigt bei mir mit einem einzigen Thread und generischem Code (nur SSE2, kein AVX) ca. 0.92ms für 2000x2000 Feld, bzw. ca. 0.8 Takte/Zelle. Ich nehme daher meine Behauptung zurück, der Algorithmus wäre durch den Speicherdurchsatz begrenzt, eine Verbesserung durch kompaktere Speicherung (bits oder nibbles) ist daher wahrscheinlich nicht zu erwarten. AVX bringt keine echte Verbesserung. Mit AVX2 sollte sich die Geschwindigkeit allerdings nahezu verdoppeln.
Ein zweiter Thread verbessert das Ergebnis bei mir nur um ca. 10-15%.#include <algorithm> #include <atomic> #include <cassert> #include <chrono> #include <cstddef> #include <exception> #include <iostream> #include <random> #include <string> #include <thread> #include <utility> #include <vector> #include <type_traits> #ifdef __AVX2__ #define block_size 32 #else #define block_size 16 #endif #define stripe_size 4 template <typename T> const T& as_const(T& x) noexcept { return x; } struct gameoflife_state { std::ptrdiff_t width, height, stride, stripes; std::vector<char> data; char* origin; gameoflife_state(std::ptrdiff_t w, std::ptrdiff_t h, char v = 0) : width(w), height(h), stride((w+2+block_size-1)/block_size*block_size), stripes((height+stripe_size-1)/stripe_size), data(internal_size(), v) { assert((width == 0) == (height == 0)); void* p = &data[stride+1]; std::size_t size = data.size(); origin = static_cast<char*>(std::align( block_size, block_size, p, size ) ); } gameoflife_state(const gameoflife_state&) = delete; gameoflife_state& operator=(const gameoflife_state&) = delete; std::size_t size() const noexcept { return width * height; } std::size_t internal_size() const noexcept { return stride * (stripes * stripe_size + 2) + block_size - 1; } char& operator()(std::ptrdiff_t x, std::ptrdiff_t y) noexcept { return const_cast<char&>(as_const(*this)(x, y)); } const char& operator()(std::ptrdiff_t x, std::ptrdiff_t y) const noexcept { return origin[x + stride * y]; } void swap(gameoflife_state& rhs) noexcept { using std::swap; swap(width, rhs.width); swap(height, rhs.height); swap(stride, rhs.stride); swap(stripes, rhs.stripes); swap(data, rhs.data); swap(origin, rhs.origin); } void normalize() noexcept { (*this)(-1, -1) = (*this)(width-1, height-1); for (std::ptrdiff_t x = 0; x < width; ++x) { (*this)(x, -1) = (*this)(x, height-1); } (*this)(width,-1) = (*this)(0, height-1); for (std::ptrdiff_t y = 0; y < height; ++y) { (*this)(-1, y) = (*this)(width-1, y); (*this)(width, y) = (*this)(0, y); } (*this)(-1, height) = (*this)(width-1, 0); for (std::ptrdiff_t x = 0; x < width; ++x) { (*this)(x, height) = (*this)(x, 0); } (*this)(width, height) = (*this)(0, 0); } }; typedef char vec_type __attribute__((vector_size(block_size),aligned(1),__may_alias__)); vec_type load(const void* src, int offset) noexcept { return *static_cast<const vec_type*>(__builtin_assume_aligned(src, block_size, offset)); } char store(void* dst, vec_type v) noexcept { *static_cast<vec_type*>(__builtin_assume_aligned(dst, block_size)) = -(v == 3); return 0; } template <int... I, int... I2, int... I_2> // 0..stripe_size-1, 0..stripe_size+1, 0..stripe_size/2 void compute_cells(char* dst, const char* src, int stripes, int stride, std::integer_sequence<int, I...>, std::integer_sequence<int, I2...>, std::integer_sequence<int, I_2...>) noexcept { for (; stripes--;) { for (auto blocks = stride / block_size; blocks--; ) { vec_type row[] = { load(&src[(I2-1)*stride-1], block_size-1) + load(&src[(I2-1)*stride+1], 1) ... }; vec_type pair[] = { row[I_2*2+1] + row[I_2*2+2] ... }; vec_type sum[] = { pair[I/2] + row[I+(I%2?2:0)] + load(&src[(I-1)*stride], 0) + load(&src[(I+1)*stride], 0)... }; char dummy[] = { store(&dst[I*stride], sum[I] | load(&src[I*stride], 0))... }; (void)dummy; src += block_size; dst += block_size; } src += stride * (stripe_size - 1); dst += stride * (stripe_size - 1); } } void compute_cells(char* __restrict__ dst, const char* __restrict__ src, int stripes, int stride) noexcept { compute_cells(dst, src, stripes, stride, std::make_integer_sequence<int, stripe_size>{}, std::make_integer_sequence<int, stripe_size+2>{}, std::make_integer_sequence<int, stripe_size/2>{}); } void compute_gen(gameoflife_state& current_gen, gameoflife_state& next_gen, std::size_t num_threads) { assert(current_gen.width == next_gen.width); assert(current_gen.height == next_gen.height); auto stride = current_gen.stride; std::vector<std::thread> threads; threads.reserve(num_threads - 1); char* src = current_gen.origin; char* dst = next_gen.origin; auto stripes = current_gen.stripes; auto task_stripes = (stripes + num_threads - 1) / num_threads; for (std::size_t i = 1; i < num_threads; ++i) { threads.emplace_back([=] { compute_cells(dst, src, task_stripes, stride); }); src += stride * task_stripes * stripe_size; dst += stride * task_stripes * stripe_size; stripes -= task_stripes; } compute_cells(dst, src, stripes, stride); for (auto& i : threads) i.join(); next_gen.normalize(); } void show(gameoflife_state& s) { for ( std::ptrdiff_t y = 0; y < s.height; ++y ) { for ( std::ptrdiff_t x = 0; x < s.width; ++x ) { std::cout << (s(x,y)?'x':'.'); } std::cout << '\n'; } std::cout << '\n'; } #if 0 #define SHOW(x) show(x) #else #define SHOW(x) #endif int main(int argc, char** argv) { std::ptrdiff_t w = 512, h = 512, num_threads = std::max<std::ptrdiff_t>(std::thread::hardware_concurrency(), 1), gens = -1; switch (argc) { case 0: case 1: break; case 3: case 4: case 5: try { w = std::stol(argv[1]); h = std::stol(argv[2]); if ( argc >= 4 ) num_threads = std::stol(argv[3]); if ( argc >= 5 ) gens = std::stol(argv[4]); } catch (std::exception const&) { std::cerr << "invalid arguments (must be integral)\n"; return -1; } break; default: std::cerr << "invalid number of arguments\n"; return -1; } gameoflife_state current_gen{w, h}; gameoflife_state next_gen{w, h}; { std::minstd_rand prng; std::bernoulli_distribution dist{0.3}; for (std::ptrdiff_t y = 0; y < current_gen.height; ++y) for (std::ptrdiff_t x = 0; x < current_gen.width; ++x) current_gen(x, y) = dist(prng) ? 1 : 0; current_gen.normalize(); } SHOW(current_gen); using clock_t = std::chrono::high_resolution_clock; const auto start = clock_t::now(); if ( gens == -1 ) gens = 30000000000/current_gen.height/current_gen.width; for (auto i = gens; i--; ) { compute_gen(current_gen, next_gen, num_threads); current_gen.swap(next_gen); SHOW(current_gen); } const auto end = clock_t::now(); const std::chrono::duration<double, std::milli> elapsed = end - start; std::cout << "average: " << elapsed.count() / gens << " ms per generation\n"; }