Suche Thread-Pool für C++



  • Und zwar am besten eine Implementierung mit boost::thread und boost::thread_group.

    Hat jemand einen Link? Oder hat jemand so etwas schon implementiert?



  • Folgendes stammt mit minimaler Abänderung aus
    http://forum.hardware.fr/hardwarefr/Programmation/thread-pool-sujet-66893-1.htm

    #include <boost/smart_ptr.hpp>
    #include <boost/thread.hpp>
    #include <iostream>
    #include <vector>
    #include <windows.h>
    
    template< typename X > 
    struct fifo : private boost::noncopyable 
    { 
        fifo( const int& n ) : 
        begin( 0 ), 
        end( 0 ), 
        buffered( 0 ), 
        circular_buf( n ) 
        { 
        } 
        void write( const X& m ) 
        {  
            boost::mutex::scoped_lock lk( monitor ); 
            while ( buffered == circular_buf.size() ) 
                buffer_not_full.wait( lk ); 
            circular_buf[ end ] = m; 
            end = ( end + 1 ) % circular_buf.size(); 
            ++buffered; 
            buffer_not_empty.notify_one(); 
        } 
        X read() 
        { 
            boost::mutex::scoped_lock lk( monitor ); 
            while ( buffered == 0 ) 
                buffer_not_empty.wait( lk ); 
            X i = circular_buf[ begin ]; 
            begin = ( begin + 1 ) % circular_buf.size(); 
            --buffered; 
            buffer_not_full.notify_one(); 
            return i; 
        } 
    private: 
        unsigned int begin, end, buffered; 
        std::vector< X > circular_buf; 
        boost::condition buffer_not_full, buffer_not_empty; 
        boost::mutex monitor; 
    };
    
    template< typename task> 
    struct thread_pool : private boost::noncopyable 
    { 
    
        // one thread of the pool 
        struct one_thread 
        { 
            // constructor 
            one_thread( bool & r, fifo< task >& f ) : running( r ), task_queue( f ) {}; 
    
            // thread function 
            void operator()() const 
            { 
                while( running ) 
                {                   
                    task_queue.read()();
                }
            } 
    
        private: 
    
            bool& running; 
            fifo< task >& task_queue; 
        }; 
    
        // constructor 
        thread_pool( unsigned const & thread_number ) : 
        task_queue( thread_number * 2 ), 
        running( true ) 
        { 
            // create all the threads of the pool 
            for( unsigned index = 0; index != thread_number; ++index ) 
                tg.create_thread( one_thread( running, task_queue ) ); 
        } 
    
        // new task 
        inline void push_task( task const & t ) 
        { 
            task_queue.write( t ); 
        } 
    
        // stop 
        inline void stop() 
        { 
            running = false; 
        } 
        // destructor 
        ~thread_pool() 
        { 
            running = false; 
            tg.join_all(); 
        } 
    
    private: 
    
        // thread group 
        boost::thread_group tg; 
    
        // message queue 
        fifo< task > task_queue; 
    
        // status 
        bool running; 
    };
    
    // Und ein kleines Anwendungsbeispiel:
    
    class SomeTask
    {
    public:
      void operator()()
      {
        std::cout<< "Task::op() "<<GetCurrentThreadId()<<std::endl;
      }
    };
    
    int main()
    {
       thread_pool< SomeTask>
      pool( 1);  
    
       SomeTask
      task;
    
      pool.push_task( task);
    
      Sleep( 5000);
    
      pool.push_task( task);
    
      Sleep( 5000);
    
      return 0;
    }
    

    Ich hätte aber trotzdem noch Interesse an anderen Lösungen, insbesondere an einem Threadpool, der die Poolgröße dynamisch anpasst (neue Threads erstellt, wenn viel zu tun ist, und diese wieder freigibt, wenn weniger zu tun ist).


Anmelden zum Antworten