Cpp-Taskflow  2.2.0
observer.hpp
1 // 2019/06/13 - modified by Tsung-Wei Huang
2 // - added TaskView interface
3 //
4 // 2019/04/17 - created by Tsung-Wei Huang
5 
6 #pragma once
7 
8 #include <iostream>
9 #include <sstream>
10 #include <vector>
11 #include <cstdlib>
12 #include <cstdio>
13 #include <atomic>
14 #include <memory>
15 #include <deque>
16 #include <optional>
17 #include <thread>
18 #include <algorithm>
19 #include <set>
20 #include <numeric>
21 #include <cassert>
22 
23 #include "task.hpp"
24 
25 namespace tf {
26 
37 
38  public:
39 
43  virtual ~ExecutorObserverInterface() = default;
44 
49  virtual void set_up(unsigned num_workers) {};
50 
56  virtual void on_entry(unsigned worker_id, TaskView task_view) {};
57 
63  virtual void on_exit(unsigned worker_id, TaskView task_view) {};
64 };
65 
66 // ------------------------------------------------------------------
67 
75 
76  friend class Executor;
77 
78  // data structure to record each task execution
79  struct Execution {
80 
81  TaskView task_view;
82 
85 
86  Execution(
87  TaskView tv,
89  ) :
90  task_view {tv}, beg {b} {
91  }
92 
93  Execution(
94  TaskView tv,
97  ) :
98  task_view {tv}, beg {b}, end {e} {
99  }
100  };
101 
102  // data structure to store the entire execution timeline
103  struct Timeline {
106  };
107 
108  public:
109 
114  inline void dump(std::ostream& ostream) const;
115 
120  inline std::string dump() const;
121 
125  inline void clear();
126 
131  inline size_t num_tasks() const;
132 
133  private:
134 
135  inline void set_up(unsigned num_workers) override final;
136  inline void on_entry(unsigned worker_id, TaskView task_view) override final;
137  inline void on_exit(unsigned worker_id, TaskView task_view) override final;
138 
139  Timeline _timeline;
140 };
141 
142 // Procedure: set_up
143 inline void ExecutorObserver::set_up(unsigned num_workers) {
144 
145  _timeline.executions.resize(num_workers);
146 
147  for(unsigned w=0; w<num_workers; ++w) {
148  _timeline.executions[w].reserve(1024);
149  }
150 
151  _timeline.origin = std::chrono::steady_clock::now();
152 }
153 
154 // Procedure: on_entry
155 inline void ExecutorObserver::on_entry(unsigned w, TaskView tv) {
156  _timeline.executions[w].emplace_back(tv, std::chrono::steady_clock::now());
157 }
158 
159 // Procedure: on_exit
160 inline void ExecutorObserver::on_exit(unsigned w, TaskView tv) {
161  assert(_timeline.executions[w].size() > 0);
162  _timeline.executions[w].back().end = std::chrono::steady_clock::now();
163 }
164 
165 // Function: clear
166 inline void ExecutorObserver::clear() {
167  for(size_t w=0; w<_timeline.executions.size(); ++w) {
168  _timeline.executions[w].clear();
169  }
170 }
171 
172 // Procedure: dump
173 inline void ExecutorObserver::dump(std::ostream& os) const {
174 
175  os << '[';
176 
177  for(size_t w=0; w<_timeline.executions.size(); w++) {
178 
179  if(w != 0 && _timeline.executions[w].size() > 0 &&
180  _timeline.executions[w-1].size() > 0) {
181  os << ',';
182  }
183 
184  for(size_t i=0; i<_timeline.executions[w].size(); i++) {
185 
186  os << '{'
187  << "\"cat\":\"ExecutorObserver\","
188  << "\"name\":\"" << _timeline.executions[w][i].task_view.name() << "\","
189  << "\"ph\":\"X\","
190  << "\"pid\":1,"
191  << "\"tid\":" << w << ','
193  _timeline.executions[w][i].beg - _timeline.origin
194  ).count() << ','
196  _timeline.executions[w][i].end - _timeline.executions[w][i].beg
197  ).count();
198 
199  if(i != _timeline.executions[w].size() - 1) {
200  os << "},";
201  }
202  else {
203  os << '}';
204  }
205  }
206  }
207  os << "]\n";
208 }
209 
210 // Function: dump
212  std::ostringstream oss;
213  dump(oss);
214  return oss.str();
215 }
216 
217 // Function: num_tasks
218 inline size_t ExecutorObserver::num_tasks() const {
219  return std::accumulate(
220  _timeline.executions.begin(), _timeline.executions.end(), size_t{0},
221  [](size_t sum, const auto& exe){
222  return sum + exe.size();
223  }
224  );
225 }
226 
227 
228 } // end of namespace tf -------------------------------------------
229 
230 
Default executor observer to dump the execution timelines.
Definition: observer.hpp:74
virtual ~ExecutorObserverInterface()=default
virtual destructor
size_t num_tasks() const
get the number of total tasks in the observer
Definition: observer.hpp:218
T duration_cast(T... args)
Definition: taskflow.hpp:5
virtual void set_up(unsigned num_workers)
constructor-like method to call when the executor observer is fully created
Definition: observer.hpp:49
A constant wrapper class to a task node, mainly used in the tf::ExecutorObserver interface.
Definition: task.hpp:300
virtual void on_exit(unsigned worker_id, TaskView task_view)
method to call after a worker thread executed a closure
Definition: observer.hpp:63
virtual void on_entry(unsigned worker_id, TaskView task_view)
method to call before a worker thread executes a closure
Definition: observer.hpp:56
void clear()
clear the timeline data
Definition: observer.hpp:166
std::string dump() const
dump the timelines in JSON to a std::string
Definition: observer.hpp:211
The executor class to run a taskflow graph.
Definition: executor.hpp:73
The interface class for creating an executor observer.
Definition: observer.hpp:36