Cpp-Taskflow  2.4-master-branch
observer.hpp
1 // 2019/07/31 - modified by Tsung-Wei Huang
2 // - fixed the missing comma in outputing JSON
3 //
4 // 2019/06/13 - modified by Tsung-Wei Huang
5 // - added TaskView interface
6 //
7 // 2019/04/17 - created by Tsung-Wei Huang
8 
9 #pragma once
10 
11 #include <iostream>
12 #include <sstream>
13 #include <vector>
14 #include <cstdlib>
15 #include <cstdio>
16 #include <atomic>
17 #include <memory>
18 #include <deque>
19 #include <thread>
20 #include <algorithm>
21 #include <set>
22 #include <numeric>
23 #include <cassert>
24 
25 #include "task.hpp"
26 
27 namespace tf {
28 
39 
40  public:
41 
45  virtual ~ExecutorObserverInterface() = default;
46 
51  virtual void set_up(unsigned num_workers) = 0;
52 
58  virtual void on_entry(unsigned worker_id, TaskView task_view) = 0;
59 
65  virtual void on_exit(unsigned worker_id, TaskView task_view) = 0;
66 };
67 
68 // ------------------------------------------------------------------
69 
77 
78  friend class Executor;
79 
80  // data structure to record each task execution
81  struct Execution {
82 
83  TaskView task_view;
84 
87 
88  Execution(
89  TaskView tv,
91  ) :
92  task_view {tv}, beg {b} {
93  }
94 
95  Execution(
96  TaskView tv,
99  ) :
100  task_view {tv}, beg {b}, end {e} {
101  }
102  };
103 
104  // data structure to store the entire execution timeline
105  struct Timeline {
108  };
109 
110  public:
111 
116  inline void dump(std::ostream& ostream) const;
117 
122  inline std::string dump() const;
123 
127  inline void clear();
128 
133  inline size_t num_tasks() const;
134 
135  private:
136 
137  inline void set_up(unsigned num_workers) override final;
138  inline void on_entry(unsigned worker_id, TaskView task_view) override final;
139  inline void on_exit(unsigned worker_id, TaskView task_view) override final;
140 
141  Timeline _timeline;
142 };
143 
144 // Procedure: set_up
145 inline void ExecutorObserver::set_up(unsigned num_workers) {
146 
147  _timeline.executions.resize(num_workers);
148 
149  for(unsigned w=0; w<num_workers; ++w) {
150  _timeline.executions[w].reserve(1024);
151  }
152 
153  _timeline.origin = std::chrono::steady_clock::now();
154 }
155 
156 // Procedure: on_entry
157 inline void ExecutorObserver::on_entry(unsigned w, TaskView tv) {
158  _timeline.executions[w].emplace_back(tv, std::chrono::steady_clock::now());
159 }
160 
161 // Procedure: on_exit
162 inline void ExecutorObserver::on_exit(unsigned w, TaskView tv) {
163  static_cast<void>(tv); // avoid warning from compiler
164  assert(_timeline.executions[w].size() > 0);
165  _timeline.executions[w].back().end = std::chrono::steady_clock::now();
166 }
167 
168 // Function: clear
169 inline void ExecutorObserver::clear() {
170  for(size_t w=0; w<_timeline.executions.size(); ++w) {
171  _timeline.executions[w].clear();
172  }
173 }
174 
175 // Procedure: dump
176 inline void ExecutorObserver::dump(std::ostream& os) const {
177 
178  size_t first;
179 
180  for(first = 0; first<_timeline.executions.size(); ++first) {
181  if(_timeline.executions[first].size() > 0) {
182  break;
183  }
184  }
185 
186  os << '[';
187 
188  for(size_t w=first; w<_timeline.executions.size(); w++) {
189 
190  if(w != first && _timeline.executions[w].size() > 0) {
191  os << ',';
192  }
193 
194  for(size_t i=0; i<_timeline.executions[w].size(); i++) {
195 
196  os << '{'
197  << "\"cat\":\"ExecutorObserver\","
198  << "\"name\":\"" << _timeline.executions[w][i].task_view.name() << "\","
199  << "\"ph\":\"X\","
200  << "\"pid\":1,"
201  << "\"tid\":" << w << ','
203  _timeline.executions[w][i].beg - _timeline.origin
204  ).count() << ','
206  _timeline.executions[w][i].end - _timeline.executions[w][i].beg
207  ).count();
208 
209  if(i != _timeline.executions[w].size() - 1) {
210  os << "},";
211  }
212  else {
213  os << '}';
214  }
215  }
216  }
217  os << "]\n";
218 }
219 
220 // Function: dump
222  std::ostringstream oss;
223  dump(oss);
224  return oss.str();
225 }
226 
227 // Function: num_tasks
228 inline size_t ExecutorObserver::num_tasks() const {
229  return std::accumulate(
230  _timeline.executions.begin(), _timeline.executions.end(), size_t{0},
231  [](size_t sum, const auto& exe){
232  return sum + exe.size();
233  }
234  );
235 }
236 
237 
238 } // end of namespace tf -------------------------------------------
239 
240 
virtual void on_entry(unsigned worker_id, TaskView task_view)=0
method to call before a worker thread executes a closure
virtual void set_up(unsigned num_workers)=0
constructor-like method to call when the executor observer is fully created
Default executor observer to dump the execution timelines.
Definition: observer.hpp:76
virtual ~ExecutorObserverInterface()=default
virtual destructor
size_t num_tasks() const
get the number of total tasks in the observer
Definition: observer.hpp:228
T duration_cast(T... args)
Definition: error.hpp:9
immutable accessor class to a task node used by tf::ExecutorObserver
Definition: task.hpp:475
virtual void on_exit(unsigned worker_id, TaskView task_view)=0
method to call after a worker thread executed a closure
void clear()
clear the timeline data
Definition: observer.hpp:169
std::string dump() const
dump the timelines in JSON to a std::string
Definition: observer.hpp:221
execution interface for running a taskflow graph
Definition: executor.hpp:32
The interface class for creating an executor observer.
Definition: observer.hpp:38