00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #ifndef __TBB_pipeline_H
00022 #define __TBB_pipeline_H
00023
00024 #include "atomic.h"
00025 #include "task.h"
00026 #include "tbb_allocator.h"
00027 #include <cstddef>
00028
00029 #if !TBB_IMPLEMENT_CPP0X
00030 #include <type_traits>
00031 #endif
00032
00033 namespace tbb {
00034
00035 class pipeline;
00036 class filter;
00037
00039 namespace internal {
00040
00041
00042 #define __TBB_PIPELINE_VERSION(x) (unsigned char)(x-2)<<1
00043
00044 typedef unsigned long Token;
00045 typedef long tokendiff_t;
00046 class stage_task;
00047 class input_buffer;
00048 class pipeline_root_task;
00049 class pipeline_cleaner;
00050
00051 }
00052
00053 namespace interface6 {
00054 template<typename T, typename U> class filter_t;
00055
00056 namespace internal {
00057 class pipeline_proxy;
00058 }
00059 }
00060
00062
00064
00065 class filter: internal::no_copy {
00066 private:
00068 static filter* not_in_pipeline() {return reinterpret_cast<filter*>(intptr_t(-1));}
00069 protected:
00071 static const unsigned char filter_is_serial = 0x1;
00072
00074
00076 static const unsigned char filter_is_out_of_order = 0x1<<4;
00077
00079 static const unsigned char filter_is_bound = 0x1<<5;
00080
00082 static const unsigned char filter_may_emit_null = 0x1<<6;
00083
00085 static const unsigned char exact_exception_propagation =
00086 #if TBB_USE_CAPTURED_EXCEPTION
00087 0x0;
00088 #else
00089 0x1<<7;
00090 #endif
00091
00092 static const unsigned char current_version = __TBB_PIPELINE_VERSION(5);
00093 static const unsigned char version_mask = 0x7<<1;
00094 public:
00095 enum mode {
00097 parallel = current_version | filter_is_out_of_order,
00099 serial_in_order = current_version | filter_is_serial,
00101 serial_out_of_order = current_version | filter_is_serial | filter_is_out_of_order,
00103 serial = serial_in_order
00104 };
00105 protected:
00106 filter( bool is_serial_ ) :
00107 next_filter_in_pipeline(not_in_pipeline()),
00108 my_input_buffer(NULL),
00109 my_filter_mode(static_cast<unsigned char>((is_serial_ ? serial : parallel) | exact_exception_propagation)),
00110 prev_filter_in_pipeline(not_in_pipeline()),
00111 my_pipeline(NULL),
00112 next_segment(NULL)
00113 {}
00114
00115 filter( mode filter_mode ) :
00116 next_filter_in_pipeline(not_in_pipeline()),
00117 my_input_buffer(NULL),
00118 my_filter_mode(static_cast<unsigned char>(filter_mode | exact_exception_propagation)),
00119 prev_filter_in_pipeline(not_in_pipeline()),
00120 my_pipeline(NULL),
00121 next_segment(NULL)
00122 {}
00123
00124
00125 void __TBB_EXPORTED_METHOD set_end_of_input();
00126
00127 public:
00129 bool is_serial() const {
00130 return bool( my_filter_mode & filter_is_serial );
00131 }
00132
00134 bool is_ordered() const {
00135 return (my_filter_mode & (filter_is_out_of_order|filter_is_serial))==filter_is_serial;
00136 }
00137
00139 bool is_bound() const {
00140 return ( my_filter_mode & filter_is_bound )==filter_is_bound;
00141 }
00142
00144 bool object_may_be_null() {
00145 return ( my_filter_mode & filter_may_emit_null ) == filter_may_emit_null;
00146 }
00147
00149
00150 virtual void* operator()( void* item ) = 0;
00151
00153
00154 virtual __TBB_EXPORTED_METHOD ~filter();
00155
00156 #if __TBB_TASK_GROUP_CONTEXT
00158
00160 virtual void finalize( void* ) {};
00161 #endif
00162
00163 private:
00165 filter* next_filter_in_pipeline;
00166
00168
00169
00170 bool has_more_work();
00171
00173
00174 internal::input_buffer* my_input_buffer;
00175
00176 friend class internal::stage_task;
00177 friend class internal::pipeline_root_task;
00178 friend class pipeline;
00179 friend class thread_bound_filter;
00180
00182 const unsigned char my_filter_mode;
00183
00185 filter* prev_filter_in_pipeline;
00186
00188 pipeline* my_pipeline;
00189
00191
00192 filter* next_segment;
00193 };
00194
00196
00197 class thread_bound_filter: public filter {
00198 public:
00199 enum result_type {
00200
00201 success,
00202
00203 item_not_available,
00204
00205 end_of_stream
00206 };
00207 protected:
00208 thread_bound_filter(mode filter_mode):
00209 filter(static_cast<mode>(filter_mode | filter::filter_is_bound))
00210 {}
00211 public:
00213
00218 result_type __TBB_EXPORTED_METHOD try_process_item();
00219
00221
00225 result_type __TBB_EXPORTED_METHOD process_item();
00226
00227 private:
00229 result_type internal_process_item(bool is_blocking);
00230 };
00231
00233
00234 class pipeline {
00235 public:
00237 __TBB_EXPORTED_METHOD pipeline();
00238
00241 virtual __TBB_EXPORTED_METHOD ~pipeline();
00242
00244 void __TBB_EXPORTED_METHOD add_filter( filter& filter_ );
00245
00247 void __TBB_EXPORTED_METHOD run( size_t max_number_of_live_tokens );
00248
00249 #if __TBB_TASK_GROUP_CONTEXT
00251 void __TBB_EXPORTED_METHOD run( size_t max_number_of_live_tokens, tbb::task_group_context& context );
00252 #endif
00253
00255 void __TBB_EXPORTED_METHOD clear();
00256
00257 private:
00258 friend class internal::stage_task;
00259 friend class internal::pipeline_root_task;
00260 friend class filter;
00261 friend class thread_bound_filter;
00262 friend class internal::pipeline_cleaner;
00263 friend class tbb::interface6::internal::pipeline_proxy;
00264
00266 filter* filter_list;
00267
00269 filter* filter_end;
00270
00272 task* end_counter;
00273
00275 atomic<internal::Token> input_tokens;
00276
00278 atomic<internal::Token> token_counter;
00279
00281 bool end_of_input;
00282
00284 bool has_thread_bound_filters;
00285
00287 void remove_filter( filter& filter_ );
00288
00290 void __TBB_EXPORTED_METHOD inject_token( task& self );
00291
00292 #if __TBB_TASK_GROUP_CONTEXT
00294 void clear_filters();
00295 #endif
00296 };
00297
00298
00299
00300
00301
00302 namespace interface6 {
00303
00304 namespace internal {
00305 template<typename T, typename U, typename Body> class concrete_filter;
00306 }
00307
00309 class flow_control {
00310 bool is_pipeline_stopped;
00311 flow_control() { is_pipeline_stopped = false; }
00312 template<typename T, typename U, typename Body> friend class internal::concrete_filter;
00313 public:
00314 void stop() { is_pipeline_stopped = true; }
00315 };
00316
00318 namespace internal {
00319
00320 template<typename T> struct tbb_large_object {enum { value = sizeof(T) > sizeof(void *) }; };
00321
00322 #if TBB_IMPLEMENT_CPP0X
00323
00324
00325 template<typename T> struct tbb_trivially_copyable { enum { value = false }; };
00326 template<typename T> struct tbb_trivially_copyable <T*> { enum { value = true }; };
00327 template<> struct tbb_trivially_copyable <short> { enum { value = true }; };
00328 template<> struct tbb_trivially_copyable <unsigned short> { enum { value = true }; };
00329 template<> struct tbb_trivially_copyable <int> { enum { value = !tbb_large_object<int>::value }; };
00330 template<> struct tbb_trivially_copyable <unsigned int> { enum { value = !tbb_large_object<int>::value }; };
00331 template<> struct tbb_trivially_copyable <long> { enum { value = !tbb_large_object<long>::value }; };
00332 template<> struct tbb_trivially_copyable <unsigned long> { enum { value = !tbb_large_object<long>::value }; };
00333 template<> struct tbb_trivially_copyable <float> { enum { value = !tbb_large_object<float>::value }; };
00334 template<> struct tbb_trivially_copyable <double> { enum { value = !tbb_large_object<double>::value }; };
00335 #else
00336 #if __GNUC__==4 && __GNUC_MINOR__>=4 && __GXX_EXPERIMENTAL_CXX0X__
00337 template<typename T> struct tbb_trivially_copyable { enum { value = std::has_trivial_copy_constructor<T>::value }; };
00338 #else
00339 template<typename T> struct tbb_trivially_copyable { enum { value = std::is_trivially_copyable<T>::value }; };
00340 #endif //
00341 #endif // __TBB_USE_CPP0X
00342
00343 template<typename T> struct is_large_object {enum { value = tbb_large_object<T>::value || !tbb_trivially_copyable<T>::value }; };
00344
00345 template<typename T, bool> class token_helper;
00346
00347
00348 template<typename T>
00349 class token_helper<T, true> {
00350 public:
00351 typedef typename tbb::tbb_allocator<T> allocator;
00352 typedef T* pointer;
00353 typedef T value_type;
00354 static pointer create_token(const value_type & source) {
00355 pointer output_t = allocator().allocate(1);
00356 return new (output_t) T(source);
00357 }
00358 static value_type & token(pointer & t) { return *t;}
00359 static void * cast_to_void_ptr(pointer ref) { return (void *) ref; }
00360 static pointer cast_from_void_ptr(void * ref) { return (pointer)ref; }
00361 static void destroy_token(pointer token) {
00362 allocator().destroy(token);
00363 allocator().deallocate(token,1);
00364 }
00365 };
00366
00367
00368 template<typename T>
00369 class token_helper<T*, false > {
00370 public:
00371 typedef T* pointer;
00372 typedef T* value_type;
00373 static pointer create_token(const value_type & source) { return source; }
00374 static value_type & token(pointer & t) { return t;}
00375 static void * cast_to_void_ptr(pointer ref) { return (void *)ref; }
00376 static pointer cast_from_void_ptr(void * ref) { return (pointer)ref; }
00377 static void destroy_token( pointer ) {}
00378 };
00379
00380
00381 template<typename T>
00382 class token_helper<T, false> {
00383 typedef union {
00384 T actual_value;
00385 void * void_overlay;
00386 } type_to_void_ptr_map;
00387 public:
00388 typedef T pointer;
00389 typedef T value_type;
00390 static pointer create_token(const value_type & source) {
00391 return source; }
00392 static value_type & token(pointer & t) { return t;}
00393 static void * cast_to_void_ptr(pointer ref) {
00394 type_to_void_ptr_map mymap;
00395 mymap.void_overlay = NULL;
00396 mymap.actual_value = ref;
00397 return mymap.void_overlay;
00398 }
00399 static pointer cast_from_void_ptr(void * ref) {
00400 type_to_void_ptr_map mymap;
00401 mymap.void_overlay = ref;
00402 return mymap.actual_value;
00403 }
00404 static void destroy_token( pointer ) {}
00405 };
00406
00407 template<typename T, typename U, typename Body>
00408 class concrete_filter: public tbb::filter {
00409 const Body& my_body;
00410 typedef token_helper<T,is_large_object<T>::value > t_helper;
00411 typedef typename t_helper::pointer t_pointer;
00412 typedef token_helper<U,is_large_object<U>::value > u_helper;
00413 typedef typename u_helper::pointer u_pointer;
00414
00415 void* operator()(void* input) {
00416 t_pointer temp_input = t_helper::cast_from_void_ptr(input);
00417 u_pointer output_u = u_helper::create_token(my_body(t_helper::token(temp_input)));
00418 t_helper::destroy_token(temp_input);
00419 return u_helper::cast_to_void_ptr(output_u);
00420 }
00421
00422 public:
00423 concrete_filter(tbb::filter::mode filter_mode, const Body& body) : filter(filter_mode), my_body(body) {}
00424 };
00425
00426
00427 template<typename U, typename Body>
00428 class concrete_filter<void,U,Body>: public filter {
00429 const Body& my_body;
00430 typedef token_helper<U, is_large_object<U>::value > u_helper;
00431 typedef typename u_helper::pointer u_pointer;
00432
00433 void* operator()(void*) {
00434 flow_control control;
00435 u_pointer output_u = u_helper::create_token(my_body(control));
00436 if(control.is_pipeline_stopped) {
00437 u_helper::destroy_token(output_u);
00438 set_end_of_input();
00439 return NULL;
00440 }
00441 return u_helper::cast_to_void_ptr(output_u);
00442 }
00443
00444 public:
00445 concrete_filter(tbb::filter::mode filter_mode, const Body& body) :
00446 filter(static_cast<tbb::filter::mode>(filter_mode | filter_may_emit_null)),
00447 my_body(body)
00448 {}
00449 };
00450
00451 template<typename T, typename Body>
00452 class concrete_filter<T,void,Body>: public filter {
00453 const Body& my_body;
00454 typedef token_helper<T, is_large_object<T>::value > t_helper;
00455 typedef typename t_helper::pointer t_pointer;
00456
00457 void* operator()(void* input) {
00458 t_pointer temp_input = t_helper::cast_from_void_ptr(input);
00459 my_body(t_helper::token(temp_input));
00460 t_helper::destroy_token(temp_input);
00461 return NULL;
00462 }
00463 public:
00464 concrete_filter(tbb::filter::mode filter_mode, const Body& body) : filter(filter_mode), my_body(body) {}
00465 };
00466
00467 template<typename Body>
00468 class concrete_filter<void,void,Body>: public filter {
00469 const Body& my_body;
00470
00472 void* operator()(void*) {
00473 flow_control control;
00474 my_body(control);
00475 void* output = control.is_pipeline_stopped ? NULL : (void*)(intptr_t)-1;
00476 return output;
00477 }
00478 public:
00479 concrete_filter(filter::mode filter_mode, const Body& body) : filter(filter_mode), my_body(body) {}
00480 };
00481
00483
00484 class pipeline_proxy {
00485 tbb::pipeline my_pipe;
00486 public:
00487 pipeline_proxy( const filter_t<void,void>& filter_chain );
00488 ~pipeline_proxy() {
00489 while( filter* f = my_pipe.filter_list )
00490 delete f;
00491 }
00492 tbb::pipeline* operator->() { return &my_pipe; }
00493 };
00494
00496
00497 class filter_node: tbb::internal::no_copy {
00499 tbb::atomic<intptr_t> ref_count;
00500 protected:
00501 filter_node() {
00502 ref_count = 0;
00503 #ifdef __TBB_TEST_FILTER_NODE_COUNT
00504 ++(__TBB_TEST_FILTER_NODE_COUNT);
00505 #endif
00506 }
00507 public:
00509 virtual void add_to( pipeline& ) = 0;
00511 void add_ref() {++ref_count;}
00513 void remove_ref() {
00514 __TBB_ASSERT(ref_count>0,"ref_count underflow");
00515 if( --ref_count==0 )
00516 delete this;
00517 }
00518 virtual ~filter_node() {
00519 #ifdef __TBB_TEST_FILTER_NODE_COUNT
00520 --(__TBB_TEST_FILTER_NODE_COUNT);
00521 #endif
00522 }
00523 };
00524
00526 template<typename T, typename U, typename Body>
00527 class filter_node_leaf: public filter_node {
00528 const tbb::filter::mode mode;
00529 const Body body;
00530 void add_to( pipeline& p ) {
00531 concrete_filter<T,U,Body>* f = new concrete_filter<T,U,Body>(mode,body);
00532 p.add_filter( *f );
00533 }
00534 public:
00535 filter_node_leaf( tbb::filter::mode m, const Body& b ) : mode(m), body(b) {}
00536 };
00537
00539 class filter_node_join: public filter_node {
00540 friend class filter_node;
00541 filter_node& left;
00542 filter_node& right;
00543 ~filter_node_join() {
00544 left.remove_ref();
00545 right.remove_ref();
00546 }
00547 void add_to( pipeline& p ) {
00548 left.add_to(p);
00549 right.add_to(p);
00550 }
00551 public:
00552 filter_node_join( filter_node& x, filter_node& y ) : left(x), right(y) {
00553 left.add_ref();
00554 right.add_ref();
00555 }
00556 };
00557
00558 }
00560
00562 template<typename T, typename U, typename Body>
00563 filter_t<T,U> make_filter(tbb::filter::mode mode, const Body& body) {
00564 return new internal::filter_node_leaf<T,U,Body>(mode, body);
00565 }
00566
00567 template<typename T, typename V, typename U>
00568 filter_t<T,U> operator& (const filter_t<T,V>& left, const filter_t<V,U>& right) {
00569 __TBB_ASSERT(left.root,"cannot use default-constructed filter_t as left argument of '&'");
00570 __TBB_ASSERT(right.root,"cannot use default-constructed filter_t as right argument of '&'");
00571 return new internal::filter_node_join(*left.root,*right.root);
00572 }
00573
00575 template<typename T, typename U>
00576 class filter_t {
00577 typedef internal::filter_node filter_node;
00578 filter_node* root;
00579 filter_t( filter_node* root_ ) : root(root_) {
00580 root->add_ref();
00581 }
00582 friend class internal::pipeline_proxy;
00583 template<typename T_, typename U_, typename Body>
00584 friend filter_t<T_,U_> make_filter(tbb::filter::mode, const Body& );
00585 template<typename T_, typename V_, typename U_>
00586 friend filter_t<T_,U_> operator& (const filter_t<T_,V_>& , const filter_t<V_,U_>& );
00587 public:
00588 filter_t() : root(NULL) {}
00589 filter_t( const filter_t<T,U>& rhs ) : root(rhs.root) {
00590 if( root ) root->add_ref();
00591 }
00592 template<typename Body>
00593 filter_t( tbb::filter::mode mode, const Body& body ) :
00594 root( new internal::filter_node_leaf<T,U,Body>(mode, body) ) {
00595 root->add_ref();
00596 }
00597
00598 void operator=( const filter_t<T,U>& rhs ) {
00599
00600
00601 filter_node* old = root;
00602 root = rhs.root;
00603 if( root ) root->add_ref();
00604 if( old ) old->remove_ref();
00605 }
00606 ~filter_t() {
00607 if( root ) root->remove_ref();
00608 }
00609 void clear() {
00610
00611 if( root ) {
00612 filter_node* old = root;
00613 root = NULL;
00614 old->remove_ref();
00615 }
00616 }
00617 };
00618
00619 inline internal::pipeline_proxy::pipeline_proxy( const filter_t<void,void>& filter_chain ) : my_pipe() {
00620 __TBB_ASSERT( filter_chain.root, "cannot apply parallel_pipeline to default-constructed filter_t" );
00621 filter_chain.root->add_to(my_pipe);
00622 }
00623
00624 inline void parallel_pipeline(size_t max_number_of_live_tokens, const filter_t<void,void>& filter_chain
00625 #if __TBB_TASK_GROUP_CONTEXT
00626 , tbb::task_group_context& context
00627 #endif
00628 ) {
00629 internal::pipeline_proxy pipe(filter_chain);
00630
00631 pipe->run(max_number_of_live_tokens
00632 #if __TBB_TASK_GROUP_CONTEXT
00633 , context
00634 #endif
00635 );
00636 }
00637
00638 #if __TBB_TASK_GROUP_CONTEXT
00639 inline void parallel_pipeline(size_t max_number_of_live_tokens, const filter_t<void,void>& filter_chain) {
00640 tbb::task_group_context context;
00641 parallel_pipeline(max_number_of_live_tokens, filter_chain, context);
00642 }
00643 #endif // __TBB_TASK_GROUP_CONTEXT
00644
00645 }
00646
00647 using interface6::flow_control;
00648 using interface6::filter_t;
00649 using interface6::make_filter;
00650 using interface6::parallel_pipeline;
00651
00652 }
00653
00654 #endif