Stream Graph

In this chapter, you will learn DtCraft's stream graph programming model and how to deploy applications on a cluster. The idea of streaming is analogous to an assembly line, where one end produces data and the other end acts on it. In real applications, data streams arrive continuously and indefinitely, and the data must be processed soon after they arrive. Our stream graph programming model is designed around this behavior.

A stream graph consists of three key components: vertex, stream, and container. A vertex synchronizes all of its adjacent streams and stores application-specific data at runtime. A stream connects two vertices and invokes a computation callback when data arrive. A container requests resources (CPU, memory) for your program. The class Graph is the main entry point for creating these components.

class Graph {
  VertexBuilder vertex();
  StreamBuilder stream(key_type, key_type);
  ContainerBuilder container(); 
};

Each graph component is associated with a unique integer key. The key is deterministic and stays the same throughout an application's lifetime. A graph may be partitioned into portions that run on different machines, so these keys are what enable partial execution and decide which component goes to which partition.
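
Deterministic keys let every machine that runs the same binary agree on which component is which. The self-contained C++17 sketch below shows one way to get that property. It is an assumption, not DtCraft's actual key scheme. The hypothetical ToyGraph hands out keys from a counter in creation order, so two independent runs of the same construction code produce identical keys. A hypothetical owner map then selects the components that a given partition executes.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <vector>

using key_type = std::size_t;  // assumption: keys are unsigned integers

// Hypothetical graph: each component takes the next value of a counter, so
// keys depend only on the order in which components are created.
class ToyGraph {
 public:
  key_type vertex() { return next_key_++; }
  key_type stream(key_type /*tail*/, key_type /*head*/) { return next_key_++; }

 private:
  key_type next_key_{0};
};

// The same construction code, as every machine would run it when it
// re-executes the application binary.
std::vector<key_type> build(ToyGraph& g) {
  key_type a = g.vertex();
  key_type b = g.vertex();
  key_type s = g.stream(a, b);
  return {a, b, s};
}

// Partial execution: a process keeps only the components whose key the
// (hypothetical) 'owner' map assigns to its own partition.
std::vector<key_type> local_components(const std::vector<key_type>& keys,
                                       const std::map<key_type, int>& owner,
                                       int my_partition) {
  std::vector<key_type> mine;
  for (key_type k : keys) {
    assert(owner.count(k) == 1 && "every key must be assigned a partition");
    if (owner.at(k) == my_partition) mine.push_back(k);
  }
  return mine;
}
```

For example, calling build on two fresh ToyGraph objects returns {0, 1, 2} both times. That agreement is what lets separate processes split the graph consistently without coordinating.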

Vertex

A vertex represents persistent storage that interacts with its incoming and outgoing streams. It has a constructor-like callback to synchronize adjacent streams, and a public std::any object to store application-specific data. This is similar to a C-style void* but safer, because conversions back to the stored type are checked. You can access a particular input or output stream at a vertex given its stream key. In most situations, you only obtain an output stream from a vertex to write data. The source code of the vertex can be found in include/dtc/kernel/vertex.hpp and src/kernel/vertex.cpp. The code below lists common methods to call on a vertex.
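
You can see the type-safety difference from void* with the standard library alone. This C++17 sketch does not use DtCraft. It stores a std::vector<int> in a std::any, as a vertex callback might, and shows that a cast to the wrong type is detected. The pointer form of std::any_cast returns nullptr, and the value form throws std::bad_any_cast. Casting a void* to the wrong type would compile and lead to undefined behavior instead.

```cpp
#include <any>
#include <cassert>
#include <string>
#include <vector>

// Store a vector<int> in a std::any, the way a vertex callback might
// initialize its 'any' member, then append through a typed reference.
std::any make_storage() {
  std::any storage = std::vector<int>{};
  std::any_cast<std::vector<int>&>(storage).push_back(42);
  return storage;
}

// The pointer form of std::any_cast returns nullptr on a type mismatch.
bool holds_string(const std::any& storage) {
  return std::any_cast<std::string>(&storage) != nullptr;
}

// The value form throws std::bad_any_cast on a type mismatch.
bool cast_throws_for_wrong_type(const std::any& storage) {
  try {
    (void)std::any_cast<double>(storage);
    return false;
  } catch (const std::bad_any_cast&) {
    return true;
  }
}
```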

class Vertex {
  const key_type key;  // key of this vertex
  std::any any;        // store your data of any types

  std::shared_ptr<InputStream> istream(key_type key) const;
  std::shared_ptr<OutputStream> ostream(key_type key) const;

  void remove_ostream(key_type key) const;
  void remove_istream(key_type key) const;
  
  template <typename... T>
  void broadcast(T&&...) const;
  
  template <typename K, typename... T>
  void broadcast_to(K&& keys, T&&...) const;
};

Most methods are self-explanatory from their names.

Stream operations through a vertex apply only to its adjacent streams. Operations on streams that do not belong to the vertex are skipped. Closing one end of a stream automatically forces the other end to close. Once a stream is closed, accessing it returns nullptr.
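
The lookup rules above can be modeled in a few lines. The sketch below is a hypothetical stand-in: ToyStream, ToyVertex, and their members are illustrative names, not DtCraft types. Both ends share one stream object, so removing it from either vertex marks it closed for both. Any later lookup of a closed or non-adjacent key returns nullptr, and removing a non-adjacent key is silently skipped.

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <utility>

using key_type = int;  // assumption for the sketch

// Shared state of one stream; both the tail and the head point to it.
struct ToyStream {
  bool closed{false};
};

class ToyVertex {
 public:
  void attach(key_type key, std::shared_ptr<ToyStream> s) {
    assert(s != nullptr);
    streams_[key] = std::move(s);
  }

  // Returns nullptr for keys that are not adjacent or whose stream is closed.
  std::shared_ptr<ToyStream> stream(key_type key) const {
    auto it = streams_.find(key);
    if (it == streams_.end() || it->second->closed) return nullptr;
    return it->second;
  }

  // Closing this end closes the shared stream, so the other end sees it too.
  // Keys that are not adjacent to this vertex are skipped.
  void remove(key_type key) {
    if (auto it = streams_.find(key); it != streams_.end()) {
      it->second->closed = true;
      streams_.erase(it);
    }
  }

 private:
  std::map<key_type, std::shared_ptr<ToyStream>> streams_;
};
```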

Example: Access an Output Stream at a Vertex

The example below writes the integer 123456 to the output stream identified by key at vertex v, and then removes (closes) that stream.

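auto callback = [key] (dtc::Vertex& v) {
  // ostream returns nullptr if the stream does not exist or is already closed.
  if(auto os = v.ostream(key); os) {
    (*os)(123456);          // write the integer to the stream
    v.remove_ostream(key);  // close this end; the other end closes as well
  }
  else {
    std::cout << "stream " << key << " doesn't exist or is closed\n";
  }
};
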
Example: Broadcast at a Vertex

The example below writes an integer 123456 through all output streams at the vertex v.

auto callback = [] (dtc::Vertex& v) {
  v.broadcast(123456);
};

Note: Removing an input or an output stream is irreversible. A closed stream cannot be reopened.

Stream

The stream is the heart of a DtCraft application. Each stream is associated with a pair of vertices and has computation callbacks at both ends. The vertex on the output side of a stream is called the tail, and the vertex on the input side is called the head. DtCraft does not expose the stream object directly. Instead, it decomposes the stream into a vertex and an I/O stream event. A stream can be either an inter-stream or an intra-stream, depending on the partition layout chosen by the scheduler. The callback interface is the same in both cases. A stream callback takes one of the following two forms:

std::function<Event::Signal(Vertex&, OutputStream&)>;  // ostream callback
std::function<Event::Signal(Vertex&, InputStream&)>;   // istream callback

The input stream callback is invoked when data arrive, and the output stream callback is invoked when data are sent. Before handing control to the stream callback, the kernel polls the associated device and synchronizes the stream buffer with it. Inside the callback, you can therefore assume the stream has been properly synchronized. The callback returns an event signal, either dtc::Event::DEFAULT or dtc::Event::REMOVE. Returning dtc::Event::REMOVE forces the kernel to remove the stream event.
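
You can picture how the two signals are consumed with a toy event loop. This sketch makes several assumptions and is not the DtCraft kernel. Signal mirrors dtc::Event::DEFAULT and dtc::Event::REMOVE, and each polling round simply re-invokes every registered callback. A callback that returns DEFAULT stays registered and is called again on the next round. A callback that returns REMOVE is dropped.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <vector>

// Hypothetical mirror of dtc::Event::Signal.
enum class Signal { DEFAULT, REMOVE };

using Callback = std::function<Signal()>;

// Runs one polling round: invokes every callback once and erases those that
// return REMOVE. Returns the number of callbacks still registered.
std::size_t poll_once(std::vector<Callback>& events) {
  for (auto it = events.begin(); it != events.end();) {
    if ((*it)() == Signal::REMOVE) {
      it = events.erase(it);
    } else {
      ++it;
    }
  }
  return events.size();
}
```

For example, a callback that sees complete data only on its third invocation stays registered for two rounds and is removed on the third.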

Example: Extract Data From an Input Stream

The example below defines an input stream callback that extracts a vector of integers from an input stream event.

auto callback = [] (dtc::Vertex& head, dtc::InputStream& is) {
  // is(data) returns -1 when a complete vector cannot be extracted yet.
  if(std::vector<int> data; is(data) != -1) {
    std::cout << "Received a vector of integers at vertex " << head.key << '\n';
    return dtc::Event::REMOVE;  // remove this stream event
  }
  return dtc::Event::DEFAULT;   // incomplete data, try again later
};

Note: Multiple stream events can happen at the same time. It is the user's responsibility to avoid data races when computations synchronize on vertex storage, for example, in a parallel reduction.
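
The following is a minimal sketch of the problem and one fix, using only the standard library. Several threads stand in for concurrent stream callbacks that add partial sums into the same vertex storage. Guarding the shared std::any with a std::mutex serializes the updates. VertexStorage is a hypothetical placeholder for a vertex's any member. This sketch does not claim that DtCraft runs callbacks on separate threads. It only shows that unsynchronized read-modify-write on shared storage is a data race and how a lock removes it.

```cpp
#include <any>
#include <cassert>
#include <functional>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for per-vertex storage shared by several callbacks.
struct VertexStorage {
  std::any any = 0L;  // running sum stored as a long
  std::mutex mutex;   // protects 'any'
};

// Simulates one stream callback contributing a partial result.
void contribute(VertexStorage& storage, long partial) {
  std::lock_guard<std::mutex> lock(storage.mutex);
  std::any_cast<long&>(storage.any) += partial;
}

// Runs n concurrent contributors, where contributor i adds i, and returns the sum.
long parallel_reduce(int n) {
  VertexStorage storage;
  std::vector<std::thread> workers;
  for (int i = 1; i <= n; ++i) {
    workers.emplace_back(contribute, std::ref(storage), static_cast<long>(i));
  }
  for (auto& w : workers) w.join();
  return std::any_cast<long>(storage.any);
}
```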

Container

DtCraft implements a lightweight Linux container to isolate the resources of a graph. Unlike existing frameworks, the container interface lets you request different resources for different portions of a graph. CPU and memory resources are currently supported. Resources are requested on a per-vertex basis. The kernel scheduler partitions the graph into topologies based on these requests and deploys them on remote machines. By default, DtCraft creates one container to hold all unassigned vertices.

Graph Builder

Creating a graph component does not instantiate it. Instantiation is deferred until runtime, and you get a builder object that represents the component. Each builder provides a rich set of member functions to initialize the component's attributes. The source code of the graph builders can be found in src/kernel/graph.cpp and include/dtc/kernel/graph.hpp.

Vertex Builder

The vertex builder, VertexBuilder, lets you initialize the attributes of a vertex.

class VertexBuilder {
  const key_type key;  // key of the associated vertex
  
  operator key_type() const; 
  
  template <typename C>
  VertexBuilder& on(C&& c);

  VertexBuilder& tag(std::string str);
  VertexBuilder& program(std::string command);
};

Each builder method returns a reference to the builder itself, so calls can be chained, similar to the builder pattern common in JavaScript.

Calling the same method more than once replaces the previous value with the newest one.
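
A small self-contained class can reproduce the chaining, replacement, and key-conversion behavior. ToyVertexBuilder, its getters, and connect are hypothetical names that only mirror the shape of the VertexBuilder listing. Each setter stores its argument and returns *this, so a repeated call overwrites the earlier value. operator key_type() lets a builder be passed wherever a key is expected. That conversion is why, for example, G.stream(tail, head) can take vertex builders directly.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>

using key_type = std::size_t;  // assumption for the sketch

// Hypothetical builder mirroring the shape of VertexBuilder.
class ToyVertexBuilder {
 public:
  explicit ToyVertexBuilder(key_type k) : key(k) {}

  const key_type key;

  // Implicit conversion lets the builder stand in for its key.
  operator key_type() const { return key; }

  // Each setter overwrites the previous value and returns *this for chaining.
  ToyVertexBuilder& tag(std::string str) {
    tag_ = std::move(str);
    return *this;
  }
  ToyVertexBuilder& program(std::string command) {
    program_ = std::move(command);
    return *this;
  }

  const std::string& tag_value() const { return tag_; }
  const std::string& program_value() const { return program_; }

 private:
  std::string tag_;
  std::string program_;
};

// Like Graph::stream, this only needs keys; builders convert implicitly.
std::pair<key_type, key_type> connect(key_type tail, key_type head) {
  return {tail, head};
}
```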

Example: Set a Vertex Callback

The example below creates a vertex and assigns a callback to initialize the any object to a vector of integers.

auto vertex = G.vertex().on(
  [] (dtc::Vertex& v) {
    v.any = std::vector<int>();
  }
);

Example: Set a Vertex Program

The example below creates a vertex program that executes the command /bin/ls -al.

auto vertex = G.vertex().on(
  [] (dtc::Vertex& v) {
    std::cout << "Ready to execute '/bin/ls -al'\n";
  }
).program("/bin/ls -al");

Stream Builder

The stream builder, StreamBuilder, lets you initialize the attributes of a stream.

class StreamBuilder {
  const key_type key;  // key of the associated stream
  
  operator key_type() const; 
  
  template <typename C>
  StreamBuilder& on(C&& c);

  StreamBuilder& critical(bool flag);
  StreamBuilder& tag(std::string str);
};

Each builder method returns a reference to the builder itself, so calls can be chained, similar to the builder pattern common in JavaScript.

Calling the same method more than once replaces the previous value with the newest one. In most cases, you only set the input stream callback and leave the output stream callback unset.
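
Because on is a template that accepts any callable, a builder can tell the two callback forms apart by their parameter types. The sketch below shows one hypothetical way to do that with C++17 if constexpr and std::is_invocable_v. ToyVertex, ToyInputStream, ToyOutputStream, and ToyStreamBuilder are placeholders, and nothing here is taken from DtCraft's graph.hpp. A callable that accepts neither form is rejected at compile time.

```cpp
#include <cassert>
#include <functional>
#include <type_traits>
#include <utility>

// Placeholder types standing in for dtc::Vertex and the I/O streams.
struct ToyVertex {};
struct ToyInputStream {};
struct ToyOutputStream {};
enum class Signal { DEFAULT, REMOVE };

class ToyStreamBuilder {
 public:
  // Stores the callable in the slot that matches its signature.
  template <typename C>
  ToyStreamBuilder& on(C&& c) {
    if constexpr (std::is_invocable_v<C, ToyVertex&, ToyInputStream&>) {
      on_istream = std::forward<C>(c);
    } else {
      static_assert(std::is_invocable_v<C, ToyVertex&, ToyOutputStream&>,
                    "callback must take (Vertex&, InputStream&) or (Vertex&, OutputStream&)");
      on_ostream = std::forward<C>(c);
    }
    return *this;
  }

  std::function<Signal(ToyVertex&, ToyInputStream&)> on_istream;
  std::function<Signal(ToyVertex&, ToyOutputStream&)> on_ostream;
};
```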

Example: Set a Stream Callback

The example below creates a stream and assigns an input stream callback. The callback prints the number of bytes immediately available to read and then returns dtc::Event::REMOVE to remove the stream event.

auto tail = G.vertex();
auto head = G.vertex();
auto edge = G.stream(tail, head).on(
  [] (dtc::Vertex& v, dtc::InputStream& is) {
    std::cout << "stream has " << is.isbuf.in_avail() << " bytes available to read\n";
    return dtc::Event::REMOVE;
  }
);

Container Builder

The container builder, ContainerBuilder, lets you initialize the attributes of a container in terms of system resources, such as the CPU count and the memory limit. The DtCraft kernel creates a lightweight Linux container to isolate your application. Containers take effect only in distributed mode, where a graph is partitioned into topologies based on a user-specified container layout. Local mode simply runs the program under the user's default resource limits.

class ContainerBuilder {
  const key_type key;  // key of the associated container
  
  operator key_type() const; 
 
  ContainerBuilder& add(key_type key);
  ContainerBuilder& cpu(uintmax_t num);
  ContainerBuilder& memory(uintmax_t bytes);
  ContainerBuilder& space(uintmax_t bytes);
  ContainerBuilder& host(std::string hostname);
};

Each builder method returns a reference to the builder itself, so calls can be chained, similar to the builder pattern common in JavaScript.

A container holds only vertices, because stream connections can be derived from the vertex layout. A stream that crosses two containers is called an inter-stream and often has higher latency than a stream inside a single container, called an intra-stream. The DtCraft kernel deploys a graph based on the container layout.
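
Only vertices are assigned to containers, so the kind of each stream follows directly from where its tail and head live. The sketch below uses hypothetical types and function names and is not DtCraft's scheduler. It classifies streams from a vertex-to-container map. Following the default described earlier in this chapter, it treats any vertex without an assignment as belonging to a single default container.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <vector>

using key_type = int;  // assumption for the sketch

// Hypothetical sentinel for the default container holding unassigned vertices.
constexpr key_type kDefaultContainer = -1;

struct ToyStream {
  key_type tail;
  key_type head;
};

// Looks up a vertex's container, falling back to the default container.
key_type container_of(const std::map<key_type, key_type>& layout, key_type vertex) {
  auto it = layout.find(vertex);
  return it == layout.end() ? kDefaultContainer : it->second;
}

// Labels each stream "inter" if its endpoints sit in different containers,
// otherwise "intra".
std::vector<std::string> classify(const std::vector<ToyStream>& streams,
                                  const std::map<key_type, key_type>& layout) {
  std::vector<std::string> kinds;
  for (const auto& s : streams) {
    bool same = container_of(layout, s.tail) == container_of(layout, s.head);
    kinds.push_back(same ? "intra" : "inter");
  }
  return kinds;
}
```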

Example: Containerize Vertices in a Graph

The example below creates two containers, one for each of two vertices in a graph, and requests 1 CPU and 1 GB of memory for each.

using namespace dtc::literals;
auto A = G.vertex();
auto B = G.vertex();
auto C1 = G.container().add(A).cpu(1).memory(1_GB);
auto C2 = G.container().add(B).cpu(1).memory(1_GB);

Vertex Program

DtCraft supports the execution of external programs in a graph. This is particularly useful when you want to build a new stream application on top of existing or third-party programs. When a vertex is specified as an external program, the kernel spawns a new process running the given command. Stream connections are passed to the program through the environment variable DTC_BRIDGES, in which each key is a stream tag and each value is the associated file descriptor. You can retrieve these handles and establish stream channels using your own libraries or our I/O stream API. The file descriptors passed to an external program are restored to blocking mode and kept open across exec (close-on-exec is cleared).
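
At the POSIX level, "blocking and kept open across exec" means two flags. O_NONBLOCK is cleared in the file status flags, and FD_CLOEXEC is cleared in the descriptor flags. The sketch below applies both to a descriptor with fcntl, which is roughly what a parent process must do before spawning a child that should inherit a usable descriptor. It is POSIX-only, uses a pipe as a stand-in for a stream descriptor, and illustrates the flags rather than reproducing DtCraft's code.

```cpp
#include <cassert>
#include <fcntl.h>
#include <unistd.h>

// Clears O_NONBLOCK and FD_CLOEXEC on fd. Returns false if fcntl fails.
bool make_inheritable_blocking(int fd) {
  int status = ::fcntl(fd, F_GETFL);
  if (status == -1 || ::fcntl(fd, F_SETFL, status & ~O_NONBLOCK) == -1) return false;
  int fdflags = ::fcntl(fd, F_GETFD);
  if (fdflags == -1 || ::fcntl(fd, F_SETFD, fdflags & ~FD_CLOEXEC) == -1) return false;
  return true;
}

// Reports whether fd is in blocking mode and stays open across exec.
bool is_inheritable_blocking(int fd) {
  int status = ::fcntl(fd, F_GETFL);
  int fdflags = ::fcntl(fd, F_GETFD);
  return status != -1 && fdflags != -1 &&
         !(status & O_NONBLOCK) && !(fdflags & FD_CLOEXEC);
}
```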

Example: Execute an External Program in a Vertex

The example below demonstrates how to execute an external program and read the stream handles from its environment variable. When the external program exits, all of its adjacent streams are closed, which in turn forces their other ends to close.

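// Graph program.
auto v = G.vertex().program("demo");
auto u = G.vertex();
auto s = G.stream(u, v).tag("some_id_here");

// Demo program (a separate executable): print the bridges passed by the kernel.
if(const char* bridges = std::getenv("DTC_BRIDGES"); bridges != nullptr) {
  std::cout << bridges << '\n';
}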

Execute a Stream Graph

After you have created a stream graph, you can run it in either local mode or distributed mode. Local mode runs the graph as a normal process on your operating system, while distributed mode deploys it to remote machines. To execute a stream graph, create an executor object from your graph and call its run method. The executor takes care of runtime control and interacts with the kernel to deploy your graph.

Example: Create an Executor to Run a Stream Graph

The example below demonstrates how to create an executor to execute a stream graph.

dtc::Executor(G).run();

Note: Currently, DtCraft supports only a single graph in one application. Defining more than one stream graph in your program can result in undefined behavior.

Run a Stream Graph Locally

Running a stream graph in local mode is the same as running a binary from your local console. After the program compiles successfully, type ./program_name followed by the arguments your program expects. Local mode does not run a master or any agents. It is always a good idea to make sure your graph runs correctly in local mode before submitting it to a cluster.

Submit a Stream Graph to a Cluster

The submit script in DtCraft's sbin directory helps you submit an application to a cluster. Please refer to QuickStart for how to run a stream graph in distributed mode.

Where to Go from Here?

Congratulations! You have just learned how to create a DtCraft application using the stream graph programming model. See the examples/ directory for more stream graph examples in practice.