pipeline.h

00001 /*
00002     Copyright 2005-2012 Intel Corporation.  All Rights Reserved.
00003 
00004     The source code contained or described herein and all documents related
00005     to the source code ("Material") are owned by Intel Corporation or its
00006     suppliers or licensors.  Title to the Material remains with Intel
00007     Corporation or its suppliers and licensors.  The Material is protected
00008     by worldwide copyright laws and treaty provisions.  No part of the
00009     Material may be used, copied, reproduced, modified, published, uploaded,
00010     posted, transmitted, distributed, or disclosed in any way without
00011     Intel's prior express written permission.
00012 
00013     No license under any patent, copyright, trade secret or other
00014     intellectual property right is granted to or conferred upon you by
00015     disclosure or delivery of the Materials, either expressly, by
00016     implication, inducement, estoppel or otherwise.  Any license under such
00017     intellectual property rights must be express and approved by Intel in
00018     writing.
00019 */
00020 
00021 #ifndef __TBB_pipeline_H 
00022 #define __TBB_pipeline_H 
00023 
00024 #include "atomic.h"
00025 #include "task.h"
00026 #include "tbb_allocator.h"
00027 #include <cstddef>
00028 
00029 #if !TBB_IMPLEMENT_CPP0X
00030 #include <type_traits>
00031 #endif
00032 
00033 namespace tbb {
00034 
00035 class pipeline;
00036 class filter;
00037 
namespace internal {

// Encodes a pipeline interface version as (version-2) shifted left by one bit.
// The argument for PIPELINE_VERSION should be an integer between 2 and 9.
// Both the argument and the whole expansion are parenthesized so that the macro
// stays correct when it is given a compound expression or is used inside a
// larger expression.
#define __TBB_PIPELINE_VERSION(x) ((unsigned char)((x)-2)<<1)

// Token counter type and its signed counterpart.
typedef unsigned long Token;
typedef long tokendiff_t;

// Implementation classes; only their names are needed by this header.
class stage_task;
class input_buffer;
class pipeline_root_task;
class pipeline_cleaner;

} // namespace internal
00052 
namespace interface6 {

// Handle for a filter, or a chain of filters, taking items of type T and
// producing items of type U. The definition appears later in this header.
template<typename T, typename U>
class filter_t;

namespace internal {

// Wrapper around a tbb::pipeline; named here so that tbb::pipeline can befriend it.
class pipeline_proxy;

} // namespace internal

} // namespace interface6
00060 
00062 
00064 
00065 class filter: internal::no_copy {
00066 private:
00068     static filter* not_in_pipeline() {return reinterpret_cast<filter*>(intptr_t(-1));}
00069 protected:    
00071     static const unsigned char filter_is_serial = 0x1; 
00072 
00074 
00076     static const unsigned char filter_is_out_of_order = 0x1<<4;  
00077 
00079     static const unsigned char filter_is_bound = 0x1<<5;  
00080 
00082     static const unsigned char filter_may_emit_null = 0x1<<6;
00083 
00085     static const unsigned char exact_exception_propagation =
00086 #if TBB_USE_CAPTURED_EXCEPTION
00087             0x0;
00088 #else
00089             0x1<<7;
00090 #endif /* TBB_USE_CAPTURED_EXCEPTION */
00091 
00092     static const unsigned char current_version = __TBB_PIPELINE_VERSION(5);
00093     static const unsigned char version_mask = 0x7<<1; // bits 1-3 are for version
00094 public:
00095     enum mode {
00097         parallel = current_version | filter_is_out_of_order, 
00099         serial_in_order = current_version | filter_is_serial,
00101         serial_out_of_order = current_version | filter_is_serial | filter_is_out_of_order,
00103         serial = serial_in_order
00104     };
00105 protected:
00106     filter( bool is_serial_ ) : 
00107         next_filter_in_pipeline(not_in_pipeline()),
00108         my_input_buffer(NULL),
00109         my_filter_mode(static_cast<unsigned char>((is_serial_ ? serial : parallel) | exact_exception_propagation)),
00110         prev_filter_in_pipeline(not_in_pipeline()),
00111         my_pipeline(NULL),
00112         next_segment(NULL)
00113     {}
00114     
00115     filter( mode filter_mode ) :
00116         next_filter_in_pipeline(not_in_pipeline()),
00117         my_input_buffer(NULL),
00118         my_filter_mode(static_cast<unsigned char>(filter_mode | exact_exception_propagation)),
00119         prev_filter_in_pipeline(not_in_pipeline()),
00120         my_pipeline(NULL),
00121         next_segment(NULL)
00122     {}
00123 
00124     // signal end-of-input for concrete_filters
00125     void __TBB_EXPORTED_METHOD set_end_of_input();
00126 
00127 public:
00129     bool is_serial() const {
00130         return bool( my_filter_mode & filter_is_serial );
00131     }  
00132     
00134     bool is_ordered() const {
00135         return (my_filter_mode & (filter_is_out_of_order|filter_is_serial))==filter_is_serial;
00136     }
00137 
00139     bool is_bound() const {
00140         return ( my_filter_mode & filter_is_bound )==filter_is_bound;
00141     }
00142 
00144     bool object_may_be_null() { 
00145         return ( my_filter_mode & filter_may_emit_null ) == filter_may_emit_null;
00146     }
00147 
00149 
00150     virtual void* operator()( void* item ) = 0;
00151 
00153 
00154     virtual __TBB_EXPORTED_METHOD ~filter();
00155 
00156 #if __TBB_TASK_GROUP_CONTEXT
00158 
00160     virtual void finalize( void* /*item*/ ) {};
00161 #endif
00162 
00163 private:
00165     filter* next_filter_in_pipeline;
00166 
00168     //  (pipeline has not yet reached end_of_input or this filter has not yet
00169     //  seen the last token produced by input_filter)
00170     bool has_more_work();
00171 
00173 
00174     internal::input_buffer* my_input_buffer;
00175 
00176     friend class internal::stage_task;
00177     friend class internal::pipeline_root_task;
00178     friend class pipeline;
00179     friend class thread_bound_filter;
00180 
00182     const unsigned char my_filter_mode;
00183 
00185     filter* prev_filter_in_pipeline;
00186 
00188     pipeline* my_pipeline;
00189 
00191 
00192     filter* next_segment;
00193 };
00194 
00196 
00197 class thread_bound_filter: public filter {
00198 public:
00199     enum result_type {
00200         // item was processed
00201         success,
00202         // item is currently not available
00203         item_not_available,
00204         // there are no more items to process
00205         end_of_stream
00206     };
00207 protected:
00208     thread_bound_filter(mode filter_mode): 
00209          filter(static_cast<mode>(filter_mode | filter::filter_is_bound))
00210     {}
00211 public:
00213 
00218     result_type __TBB_EXPORTED_METHOD try_process_item(); 
00219 
00221 
00225     result_type __TBB_EXPORTED_METHOD process_item();
00226 
00227 private:
00229     result_type internal_process_item(bool is_blocking);
00230 };
00231 
00233 
00234 class pipeline {
00235 public:
00237     __TBB_EXPORTED_METHOD pipeline();
00238 
00241     virtual __TBB_EXPORTED_METHOD ~pipeline();
00242 
00244     void __TBB_EXPORTED_METHOD add_filter( filter& filter_ );
00245 
00247     void __TBB_EXPORTED_METHOD run( size_t max_number_of_live_tokens );
00248 
00249 #if __TBB_TASK_GROUP_CONTEXT
00251     void __TBB_EXPORTED_METHOD run( size_t max_number_of_live_tokens, tbb::task_group_context& context );
00252 #endif
00253 
00255     void __TBB_EXPORTED_METHOD clear();
00256 
00257 private:
00258     friend class internal::stage_task;
00259     friend class internal::pipeline_root_task;
00260     friend class filter;
00261     friend class thread_bound_filter;
00262     friend class internal::pipeline_cleaner;
00263     friend class tbb::interface6::internal::pipeline_proxy;
00264 
00266     filter* filter_list;
00267 
00269     filter* filter_end;
00270 
00272     task* end_counter;
00273 
00275     atomic<internal::Token> input_tokens;
00276 
00278     atomic<internal::Token> token_counter;
00279 
00281     bool end_of_input;
00282 
00284     bool has_thread_bound_filters;
00285 
00287     void remove_filter( filter& filter_ );
00288 
00290     void __TBB_EXPORTED_METHOD inject_token( task& self );
00291 
00292 #if __TBB_TASK_GROUP_CONTEXT
00294     void clear_filters();
00295 #endif
00296 };
00297 
00298 //------------------------------------------------------------------------
00299 // Support for lambda-friendly parallel_pipeline interface
00300 //------------------------------------------------------------------------
00301 
00302 namespace interface6 {
00303 
namespace internal {
    // Named ahead of its definition so that flow_control can befriend it.
    template<typename T, typename U, typename Body>
    class concrete_filter;
}

//! Handed to the body of an input filter so that the body can request a stop.
class flow_control {
    template<typename T, typename U, typename Body> friend class internal::concrete_filter;

    bool is_pipeline_stopped;

    // Private: only concrete_filter creates these. The flag starts out cleared.
    flow_control() : is_pipeline_stopped(false) {}
public:
    //! Raise the stop flag; the creating concrete_filter inspects it after the body returns.
    void stop() { is_pipeline_stopped = true; }
};
00316 
00318 namespace internal {
00319 
// True when a T is bigger than a void* and therefore cannot be carried inside one.
template<typename T>
struct tbb_large_object {
    enum { value = sizeof(T) > sizeof(void *) };
};

// tbb_trivially_copyable<T>::value tells whether a T may be placed as-is in the
// pipeline input_buffers.
#if TBB_IMPLEMENT_CPP0X
// SFINAE cannot be used with these compilers, so the eligible types are listed
// explicitly; everything else is treated as not trivially copyable.
template<typename T> struct tbb_trivially_copyable                  { enum { value = false }; };
template<typename T> struct tbb_trivially_copyable<T*>              { enum { value = true }; };
template<>           struct tbb_trivially_copyable<short>           { enum { value = true }; };
template<>           struct tbb_trivially_copyable<unsigned short>  { enum { value = true }; };
template<>           struct tbb_trivially_copyable<int>             { enum { value = !tbb_large_object<int>::value }; };
template<>           struct tbb_trivially_copyable<unsigned int>    { enum { value = !tbb_large_object<int>::value }; };
template<>           struct tbb_trivially_copyable<long>            { enum { value = !tbb_large_object<long>::value }; };
template<>           struct tbb_trivially_copyable<unsigned long>   { enum { value = !tbb_large_object<long>::value }; };
template<>           struct tbb_trivially_copyable<float>           { enum { value = !tbb_large_object<float>::value }; };
template<>           struct tbb_trivially_copyable<double>          { enum { value = !tbb_large_object<double>::value }; };
#elif __GNUC__==4 && __GNUC_MINOR__>=4 && __GXX_EXPERIMENTAL_CXX0X__
// This compiler range is given the older trait name.
template<typename T>
struct tbb_trivially_copyable {
    enum { value = std::has_trivial_copy_constructor<T>::value };
};
#else
template<typename T>
struct tbb_trivially_copyable {
    enum { value = std::is_trivially_copyable<T>::value };
};
#endif // TBB_IMPLEMENT_CPP0X

// A type needs a separately allocated token if it is either not trivially
// copyable or too big for a void*.
template<typename T>
struct is_large_object {
    enum { value = !tbb_trivially_copyable<T>::value || tbb_large_object<T>::value };
};
00344 
00345 template<typename T, bool> class token_helper;
00346 
00347 // large object helper (uses tbb_allocator)
00348 template<typename T>
00349 class token_helper<T, true> {
00350     public:
00351     typedef typename tbb::tbb_allocator<T> allocator;
00352     typedef T* pointer;
00353     typedef T value_type;
00354     static pointer create_token(const value_type & source) {
00355         pointer output_t = allocator().allocate(1);
00356         return new (output_t) T(source);
00357     }
00358     static value_type & token(pointer & t) { return *t;}
00359     static void * cast_to_void_ptr(pointer ref) { return (void *) ref; }
00360     static pointer cast_from_void_ptr(void * ref) { return (pointer)ref; }
00361     static void destroy_token(pointer token) {
00362         allocator().destroy(token);
00363         allocator().deallocate(token,1);
00364     }
00365 };
00366 
00367 // pointer specialization
00368 template<typename T>
00369 class token_helper<T*, false > {
00370     public:
00371     typedef T* pointer;
00372     typedef T* value_type;
00373     static pointer create_token(const value_type & source) { return source; }
00374     static value_type & token(pointer & t) { return t;}
00375     static void * cast_to_void_ptr(pointer ref) { return (void *)ref; }
00376     static pointer cast_from_void_ptr(void * ref) { return (pointer)ref; }
00377     static void destroy_token( pointer /*token*/) {}
00378 };
00379 
00380 // small object specialization (converts void* to the correct type, passes objects directly.)
00381 template<typename T>
00382 class token_helper<T, false> {
00383     typedef union {
00384         T actual_value;
00385         void * void_overlay;
00386     } type_to_void_ptr_map;
00387     public:
00388     typedef T pointer;  // not really a pointer in this case.
00389     typedef T value_type;
00390     static pointer create_token(const value_type & source) {
00391         return source; }
00392     static value_type & token(pointer & t) { return t;}
00393     static void * cast_to_void_ptr(pointer ref) { 
00394         type_to_void_ptr_map mymap; 
00395         mymap.void_overlay = NULL;
00396         mymap.actual_value = ref; 
00397         return mymap.void_overlay; 
00398     }
00399     static pointer cast_from_void_ptr(void * ref) { 
00400         type_to_void_ptr_map mymap;
00401         mymap.void_overlay = ref;
00402         return mymap.actual_value;
00403     }
00404     static void destroy_token( pointer /*token*/) {}
00405 };
00406 
00407 template<typename T, typename U, typename Body>
00408 class concrete_filter: public tbb::filter {
00409     const Body& my_body;
00410     typedef token_helper<T,is_large_object<T>::value > t_helper;
00411     typedef typename t_helper::pointer t_pointer;
00412     typedef token_helper<U,is_large_object<U>::value > u_helper;
00413     typedef typename u_helper::pointer u_pointer;
00414 
00415     /*override*/ void* operator()(void* input) {
00416         t_pointer temp_input = t_helper::cast_from_void_ptr(input);
00417         u_pointer output_u = u_helper::create_token(my_body(t_helper::token(temp_input)));
00418         t_helper::destroy_token(temp_input);
00419         return u_helper::cast_to_void_ptr(output_u);
00420     }
00421 
00422 public:
00423     concrete_filter(tbb::filter::mode filter_mode, const Body& body) : filter(filter_mode), my_body(body) {}
00424 };
00425 
00426 // input 
00427 template<typename U, typename Body>
00428 class concrete_filter<void,U,Body>: public filter {
00429     const Body& my_body;
00430     typedef token_helper<U, is_large_object<U>::value > u_helper;
00431     typedef typename u_helper::pointer u_pointer;
00432 
00433     /*override*/void* operator()(void*) {
00434         flow_control control;
00435         u_pointer output_u = u_helper::create_token(my_body(control));
00436         if(control.is_pipeline_stopped) {
00437             u_helper::destroy_token(output_u);
00438             set_end_of_input();
00439             return NULL;
00440         }
00441         return u_helper::cast_to_void_ptr(output_u);
00442     }
00443 
00444 public:
00445     concrete_filter(tbb::filter::mode filter_mode, const Body& body) : 
00446         filter(static_cast<tbb::filter::mode>(filter_mode | filter_may_emit_null)),
00447         my_body(body)
00448     {}
00449 };
00450 
00451 template<typename T, typename Body>
00452 class concrete_filter<T,void,Body>: public filter {
00453     const Body& my_body;
00454     typedef token_helper<T, is_large_object<T>::value > t_helper;
00455     typedef typename t_helper::pointer t_pointer;
00456    
00457     /*override*/ void* operator()(void* input) {
00458         t_pointer temp_input = t_helper::cast_from_void_ptr(input);
00459         my_body(t_helper::token(temp_input));
00460         t_helper::destroy_token(temp_input);
00461         return NULL;
00462     }
00463 public:
00464     concrete_filter(tbb::filter::mode filter_mode, const Body& body) : filter(filter_mode), my_body(body) {}
00465 };
00466 
00467 template<typename Body>
00468 class concrete_filter<void,void,Body>: public filter {
00469     const Body& my_body;
00470     
00472     /*override*/ void* operator()(void*) {
00473         flow_control control;
00474         my_body(control);
00475         void* output = control.is_pipeline_stopped ? NULL : (void*)(intptr_t)-1; 
00476         return output;
00477     }
00478 public:
00479     concrete_filter(filter::mode filter_mode, const Body& body) : filter(filter_mode), my_body(body) {}
00480 };
00481 
00483 
00484 class pipeline_proxy {
00485     tbb::pipeline my_pipe;
00486 public:
00487     pipeline_proxy( const filter_t<void,void>& filter_chain );
00488     ~pipeline_proxy() {
00489         while( filter* f = my_pipe.filter_list ) 
00490             delete f; // filter destructor removes it from the pipeline
00491     }
00492     tbb::pipeline* operator->() { return &my_pipe; }
00493 };
00494 
00496 
00497 class filter_node: tbb::internal::no_copy {
00499     tbb::atomic<intptr_t> ref_count;
00500 protected:
00501     filter_node() {
00502         ref_count = 0;
00503 #ifdef __TBB_TEST_FILTER_NODE_COUNT
00504         ++(__TBB_TEST_FILTER_NODE_COUNT);
00505 #endif
00506     }
00507 public:
00509     virtual void add_to( pipeline& ) = 0;
00511     void add_ref() {++ref_count;}
00513     void remove_ref() {
00514         __TBB_ASSERT(ref_count>0,"ref_count underflow");
00515         if( --ref_count==0 ) 
00516             delete this;
00517     }
00518     virtual ~filter_node() {
00519 #ifdef __TBB_TEST_FILTER_NODE_COUNT
00520         --(__TBB_TEST_FILTER_NODE_COUNT);
00521 #endif
00522     }
00523 };
00524 
00526 template<typename T, typename U, typename Body>
00527 class filter_node_leaf: public filter_node  {
00528     const tbb::filter::mode mode;
00529     const Body body;
00530     /*override*/void add_to( pipeline& p ) {
00531         concrete_filter<T,U,Body>* f = new concrete_filter<T,U,Body>(mode,body);
00532         p.add_filter( *f );
00533     }
00534 public:
00535     filter_node_leaf( tbb::filter::mode m, const Body& b ) : mode(m), body(b) {}
00536 };
00537 
00539 class filter_node_join: public filter_node {
00540     friend class filter_node; // to suppress GCC 3.2 warnings
00541     filter_node& left;
00542     filter_node& right;
00543     /*override*/~filter_node_join() {
00544        left.remove_ref();
00545        right.remove_ref();
00546     }
00547     /*override*/void add_to( pipeline& p ) {
00548         left.add_to(p);
00549         right.add_to(p);
00550     }
00551 public:
00552     filter_node_join( filter_node& x, filter_node& y ) : left(x), right(y) {
00553        left.add_ref();
00554        right.add_ref();
00555     }
00556 };
00557 
00558 } // namespace internal
00560 
00562 template<typename T, typename U, typename Body>
00563 filter_t<T,U> make_filter(tbb::filter::mode mode, const Body& body) {
00564     return new internal::filter_node_leaf<T,U,Body>(mode, body);
00565 }
00566 
00567 template<typename T, typename V, typename U>
00568 filter_t<T,U> operator& (const filter_t<T,V>& left, const filter_t<V,U>& right) {
00569     __TBB_ASSERT(left.root,"cannot use default-constructed filter_t as left argument of '&'");
00570     __TBB_ASSERT(right.root,"cannot use default-constructed filter_t as right argument of '&'");
00571     return new internal::filter_node_join(*left.root,*right.root);
00572 }
00573 
00575 template<typename T, typename U>
00576 class filter_t {
00577     typedef internal::filter_node filter_node;
00578     filter_node* root;
00579     filter_t( filter_node* root_ ) : root(root_) {
00580         root->add_ref();
00581     }
00582     friend class internal::pipeline_proxy;
00583     template<typename T_, typename U_, typename Body>
00584     friend filter_t<T_,U_> make_filter(tbb::filter::mode, const Body& );
00585     template<typename T_, typename V_, typename U_>
00586     friend filter_t<T_,U_> operator& (const filter_t<T_,V_>& , const filter_t<V_,U_>& );
00587 public:
00588     filter_t() : root(NULL) {}
00589     filter_t( const filter_t<T,U>& rhs ) : root(rhs.root) {
00590         if( root ) root->add_ref();
00591     }
00592     template<typename Body>
00593     filter_t( tbb::filter::mode mode, const Body& body ) :
00594         root( new internal::filter_node_leaf<T,U,Body>(mode, body) ) {
00595         root->add_ref();
00596     }
00597 
00598     void operator=( const filter_t<T,U>& rhs ) {
00599         // Order of operations below carefully chosen so that reference counts remain correct
00600         // in unlikely event that remove_ref throws exception.
00601         filter_node* old = root;
00602         root = rhs.root; 
00603         if( root ) root->add_ref();
00604         if( old ) old->remove_ref();
00605     }
00606     ~filter_t() {
00607         if( root ) root->remove_ref();
00608     }
00609     void clear() {
00610         // Like operator= with filter_t() on right side.
00611         if( root ) {
00612             filter_node* old = root;
00613             root = NULL;
00614             old->remove_ref();
00615         }
00616     }
00617 };
00618 
00619 inline internal::pipeline_proxy::pipeline_proxy( const filter_t<void,void>& filter_chain ) : my_pipe() {
00620     __TBB_ASSERT( filter_chain.root, "cannot apply parallel_pipeline to default-constructed filter_t"  );
00621     filter_chain.root->add_to(my_pipe);
00622 }
00623 
00624 inline void parallel_pipeline(size_t max_number_of_live_tokens, const filter_t<void,void>& filter_chain
00625 #if __TBB_TASK_GROUP_CONTEXT
00626     , tbb::task_group_context& context
00627 #endif
00628     ) {
00629     internal::pipeline_proxy pipe(filter_chain);
00630     // tbb::pipeline::run() is called via the proxy
00631     pipe->run(max_number_of_live_tokens
00632 #if __TBB_TASK_GROUP_CONTEXT
00633               , context
00634 #endif
00635     );
00636 }
00637 
#if __TBB_TASK_GROUP_CONTEXT
// Convenience overload: runs the chain in a task_group_context that exists only
// for the duration of this call.
inline void parallel_pipeline(size_t max_number_of_live_tokens, const filter_t<void,void>& filter_chain) {
    tbb::task_group_context local_context;
    parallel_pipeline(max_number_of_live_tokens, filter_chain, local_context);
}
#endif // __TBB_TASK_GROUP_CONTEXT
00644 
00645 } // interface6
00646 
00647 using interface6::flow_control;
00648 using interface6::filter_t;
00649 using interface6::make_filter;
00650 using interface6::parallel_pipeline;
00651 
00652 } // tbb
00653 
00654 #endif /* __TBB_pipeline_H */

Copyright © 2005-2012 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.