Stream Graph
In this chapter, we introduce DtCraft's stream graph programming model and show how to deploy applications on a cluster. Streaming is analogous to an assembly line, where one end produces data and the other end acts on the data. In real applications, data streams arrive continuously and indefinitely, and the data must be processed soon after they arrive. Our stream graph programming model is built around this behavior.
A stream graph consists of three key components: vertex, stream, and container.
A vertex synchronizes all adjacent streams and stores application-specific data at runtime.
A stream connects vertices and invokes a computation callback when data arrive.
A container requests resources (CPU, memory) for your program.
The class Graph is the main entry point for creating these components.
class Graph {
  VertexBuilder vertex();
  StreamBuilder stream(key_type, key_type);
  ContainerBuilder container();
};
Each graph component is associated with a unique integer key which is deterministically consistent throughout an application's lifetime. Since a graph may be partitioned into different portions running on different machines, we use these keys to enable partial execution and decide which component goes to which partition.
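To make the role of keys concrete, here is a minimal sketch, not DtCraft's actual implementation. It assumes that keys are plain integers handed out in creation order and that a partition is simply a set of keys. The names KeyAllocator, build_keys, and local_components are hypothetical and exist only for illustration.

```cpp
#include <cstddef>
#include <unordered_set>
#include <vector>

using key_type = std::size_t;  // assumption: keys are plain integers

// Hypothetical allocator: keys are handed out in creation order, so every
// process that builds the same graph in the same order sees the same keys.
class KeyAllocator {
 public:
  key_type next() { return _counter++; }

 private:
  key_type _counter {0};
};

// Build a toy "graph" of n components and return their keys.
inline std::vector<key_type> build_keys(std::size_t n) {
  KeyAllocator alloc;
  std::vector<key_type> keys;
  for (std::size_t i = 0; i < n; ++i) {
    keys.push_back(alloc.next());
  }
  return keys;
}

// A process keeps only the components whose keys fall in its partition.
inline std::vector<key_type> local_components(
    const std::vector<key_type>& all,
    const std::unordered_set<key_type>& partition) {
  std::vector<key_type> mine;
  for (key_type k : all) {
    if (partition.count(k) != 0) {
      mine.push_back(k);
    }
  }
  return mine;
}
```

Because the keys depend only on construction order, two machines that run the same graph-building code agree on which key names which component, and each can select its own portion by key.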
Vertex
A vertex represents persistent storage that interacts with incoming and outgoing streams.
It has a constructor-like callback to synchronize adjacent streams, and a public std::any object to store application-specific data.
This is similar to a C-style void* but safer in type conversion.
You can access a particular input stream or output stream at a vertex given a stream key.
In most situations, we only obtain the output stream from a vertex to write data.
The source code of vertex can be found in include/dtc/kernel/vertex.hpp and src/kernel/vertex.cpp.
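The type safety of std::any compared with void* can be seen in a small standalone sketch that uses only the C++17 standard library. The helper names make_storage and sum_if_int_vector are hypothetical, and the -1 return value is an arbitrary sentinel chosen for this example.

```cpp
#include <any>
#include <string>
#include <vector>

// Store a vector in a std::any slot, much as a vertex callback might
// initialize its `any` member.
inline std::any make_storage() {
  return std::vector<int>{1, 2, 3};
}

// Sum the stored integers. The pointer form of std::any_cast returns
// nullptr on a type mismatch, whereas casting a void* to the wrong
// type would silently lead to undefined behavior.
inline int sum_if_int_vector(std::any& storage) {
  if (auto* vec = std::any_cast<std::vector<int>>(&storage)) {
    int total = 0;
    for (int x : *vec) {
      total += x;
    }
    return total;
  }
  return -1;  // hypothetical sentinel: the storage holds some other type
}
```

The reference form, for example std::any_cast<std::vector<int>&>(storage), throws std::bad_any_cast on a mismatch instead of returning nullptr.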
The code below shows a list of common methods to call from a vertex.
class Vertex {
  const key_type key;  // key of this vertex
  std::any any;        // store your data of any types

  std::shared_ptr<InputStream> istream(key_type key) const;
  std::shared_ptr<OutputStream> ostream(key_type key) const;

  void remove_ostream(key_type key) const;
  void remove_istream(key_type key) const;

  template <typename... T>
  void broadcast(T&&...) const;

  template <typename K, typename... T>
  void broadcast_to(K&& keys, T&&...) const;
};
Most methods are self-explanatory.
istream returns a shared pointer to the input stream event at a given key, or nullptr if not accessible from this vertex.
ostream returns a shared pointer to the output stream event at a given key, or nullptr if not accessible from this vertex.
remove_ostream removes an output stream event from this vertex at a given key.
remove_istream removes an input stream event from this vertex at a given key.
broadcast writes data to all output streams.
broadcast_to writes data to those output streams specified in a vector of keys.
Performing stream operations through a vertex only makes sense for adjacent streams.
Operations on streams that do not belong to the vertex are skipped.
Closing one end of a stream automatically forces the other end to close.
Once a stream is closed, accessing it returns nullptr.
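The closing rules above can be modeled with a toy example. This is only a sketch of the described behavior, not DtCraft code: ToyStream and ToyVertex are hypothetical stand-ins, and a single shared flag plays the role of the connection between the two ends.

```cpp
#include <cstddef>
#include <memory>
#include <unordered_map>
#include <utility>

using key_type = std::size_t;  // assumption: keys are plain integers

// Hypothetical stand-in for a stream shared by its two end vertices.
struct ToyStream {
  bool open {true};
};

// Hypothetical stand-in for a vertex; it is not dtc::Vertex.
class ToyVertex {
 public:
  void attach(key_type key, std::shared_ptr<ToyStream> s) {
    _streams[key] = std::move(s);
  }

  // Returns nullptr when the key is not adjacent or the stream is closed.
  std::shared_ptr<ToyStream> stream(key_type key) const {
    auto it = _streams.find(key);
    if (it == _streams.end() || !it->second->open) {
      return nullptr;
    }
    return it->second;
  }

  // Closing is irreversible; a non-adjacent key is silently skipped.
  void remove_stream(key_type key) {
    auto it = _streams.find(key);
    if (it == _streams.end()) {
      return;
    }
    it->second->open = false;  // the other end observes the closure too
    _streams.erase(it);
  }

 private:
  std::unordered_map<key_type, std::shared_ptr<ToyStream>> _streams;
};
```

In this model, attaching one ToyStream to both a tail and a head vertex and then calling remove_stream on either side makes stream return nullptr on both sides, which mirrors the nullptr check used before writing to an output stream.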
Example: Access an Output Stream at a Vertex
The example below writes an integer 123456 through the output stream key at the vertex v, and then closes the stream.
auto callback = [key] (dtc::Vertex& v) {
  if(auto os = v.ostream(key); os) {
    (*os)(123456);
    v.remove_ostream(key);  // close the stream; this cannot be undone
  }
  else {
    std::cout << "stream " << key << " doesn't exist or is closed\n";
  }
};
Example: Broadcast at a Vertex
The example below writes an integer 123456 through all output streams at the vertex v.
auto callback = [] (dtc::Vertex& v) {
  v.broadcast(123456);
};
Note: Removing an input or an output stream is an irreversible operation. A closed stream cannot be reopened.
Stream
Stream is the heart of a DtCraft application. Each stream is associated with a pair of vertices and computation callbacks at both ends. The vertex on the output side of a stream is called the tail, and the vertex on the input side is called the head. We do not allow direct access to a stream object; instead, a stream is decomposed into a vertex and an I/O stream event. A stream can be either an inter-stream or an intra-stream, depending on how the scheduler partitions the graph. We have unified the callback for both cases. The callback of a stream has the following two forms:
std::function<Event::Signal(Vertex&, OutputStream&)>; // ostream callback
std::function<Event::Signal(Vertex&, InputStream&)>; // istream callback
The input stream callback is invoked when data arrive, and the output stream callback is invoked when data are sent. The kernel autonomously polls to synchronize the stream buffer with the associated device before handing control to the stream callback.
Within the callback, you can safely assume that the stream has been properly synchronized.
The callback returns an event signal, which is either dtc::Event::DEFAULT or dtc::Event::REMOVE.
Returning dtc::Event::REMOVE from a callback forces the kernel to remove the stream event.
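The effect of the two signals can be sketched with a toy dispatcher. This is not the DtCraft kernel: ToyReactor is a hypothetical class, its Signal enum only mirrors the two values named above, and the callbacks take no arguments to keep the sketch short.

```cpp
#include <cstddef>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical mirror of the two signal values described in the text.
enum class Signal { DEFAULT, REMOVE };

// Toy dispatcher that keeps or drops events based on the returned signal.
class ToyReactor {
 public:
  void insert(std::function<Signal()> callback) {
    _events.push_back(std::move(callback));
  }

  // Invoke every registered callback once and drop those asking for removal.
  void poll_once() {
    for (auto it = _events.begin(); it != _events.end();) {
      if ((*it)() == Signal::REMOVE) {
        it = _events.erase(it);
      } else {
        ++it;
      }
    }
  }

  std::size_t size() const { return _events.size(); }

 private:
  std::vector<std::function<Signal()>> _events;
};
```

A callback that returns Signal::DEFAULT stays registered and runs again on the next poll, while one that returns Signal::REMOVE is invoked exactly once more and then discarded.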
Example: Extract Data From an Input Stream
The example below defines an input stream callback that extracts a vector of integers from an input stream event.
auto callback = [] (dtc::Vertex& head, dtc::InputStream& is) {
  // is(data) yields -1 while the vector has not fully arrived
  if(std::vector<int> data; is(data) != -1) {
    std::cout << "Received a vector of integers at vertex " << head.key << '\n';
    return dtc::Event::REMOVE;  // remove this stream event
  }
  return dtc::Event::DEFAULT;  // incomplete data, try again later
};
Note: Multiple stream events can happen at the same time. It is the user's responsibility to avoid data races when a computation needs to synchronize on vertex storage, for example, in a parallel reduction.
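One common way to guard shared vertex storage is a mutex stored next to the data. The sketch below uses only the standard library and assumes the storage is a std::any, as on a vertex. Reduction, make_reduction_storage, contribute, and result are hypothetical names. Because std::any requires a copyable value and std::mutex is not copyable, the state is held through a std::shared_ptr.

```cpp
#include <any>
#include <memory>
#include <mutex>

// Hypothetical reduction state shared by several stream callbacks.
struct Reduction {
  std::mutex mtx;
  long sum {0};
  int arrivals {0};
};

// std::any needs a copyable value, so hold the state via shared_ptr.
inline std::any make_reduction_storage() {
  return std::make_shared<Reduction>();
}

// Add one contribution under the lock. Returns true once `expected`
// contributions have arrived.
inline bool contribute(std::any& storage, long value, int expected) {
  auto& red = *std::any_cast<std::shared_ptr<Reduction>&>(storage);
  std::lock_guard<std::mutex> lock(red.mtx);
  red.sum += value;
  return ++red.arrivals == expected;
}

// Read the accumulated sum under the same lock.
inline long result(std::any& storage) {
  auto& red = *std::any_cast<std::shared_ptr<Reduction>&>(storage);
  std::lock_guard<std::mutex> lock(red.mtx);
  return red.sum;
}
```

Each input stream callback would call contribute with its own value, and the call that returns true knows the reduction is complete and can forward the result.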
Container
DtCraft implements a lightweight Linux container to isolate resources of a graph. Unlike existing frameworks, our container interface allows you to request different resources for different portions of a graph. Currently we support cpu and memory resources. Resources must be requested on a vertex basis. The kernel scheduler partitions the graph based on these requests into topologies and deploys them on remote machines. By default, DtCraft creates one container to include unassigned vertices.
Graph Builder
A graph component is not instantiated when you create it; instantiation is deferred until runtime.
Instead, you get a builder object that stands in for the graph component.
Each builder object provides a rich set of member functions to initialize the associated attributes.
The source code of the graph builder can be found in src/kernel/graph.cpp and include/dtc/kernel/graph.hpp.
Vertex Builder
The vertex builder, VertexBuilder, lets you initialize the attributes of a vertex.
class VertexBuilder {
  const key_type key;  // key of the associated vertex

  operator key_type() const;

  template <typename C>
  VertexBuilder& on(C&& c);

  VertexBuilder& tag(std::string str);
  VertexBuilder& program(std::string command);
};
Each builder method returns a reference to the builder itself, so calls can be chained, similar to the JavaScript builder pattern.
operator key_type() implicitly converts the builder to the vertex key.
on sets the associated callback to c, convertible to std::function<void(dtc::Vertex&)>.
tag sets the associated tag to str.
program sets the vertex as an external program supplied by command.
Calling the same method more than once replaces the previous value with the newest one.
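The chaining behavior described above follows from each setter returning a reference to the builder. The toy class below is a minimal sketch of that idea and is not the real VertexBuilder. ToyVertexBuilder and its current_tag and current_program accessors are hypothetical.

```cpp
#include <cstddef>
#include <string>
#include <utility>

using key_type = std::size_t;  // assumption: keys are plain integers

// Hypothetical builder that mimics the chaining style described above.
class ToyVertexBuilder {
 public:
  explicit ToyVertexBuilder(key_type k) : key(k) {}

  const key_type key;

  // Implicit conversion lets the builder be passed where a key is expected.
  operator key_type() const { return key; }

  // Each setter overwrites the previous value and returns *this.
  ToyVertexBuilder& tag(std::string str) {
    _tag = std::move(str);
    return *this;
  }

  ToyVertexBuilder& program(std::string command) {
    _program = std::move(command);
    return *this;
  }

  const std::string& current_tag() const { return _tag; }
  const std::string& current_program() const { return _program; }

 private:
  std::string _tag;
  std::string _program;
};
```

With this shape, a call such as b.tag("first").program("/bin/ls -al").tag("second") leaves the tag set to "second", and the builder can be assigned directly to a key_type variable.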
Example: Set a Vertex Callback
The example below creates a vertex and assigns a callback to initialize the any object to a vector of integers.
auto vertex = G.vertex().on(
  [] (dtc::Vertex& v) {
    v.any = std::vector<int>();
  }
);
Example: Set a Vertex Program
The example below creates a vertex program that executes the command /bin/ls -al.
auto vertex = G.vertex().on(
  [] (dtc::Vertex& v) {
    std::cout << "Ready to execute '/bin/ls -al'\n";
  }
).program("/bin/ls -al");
Stream Builder
The stream builder, StreamBuilder, lets you initialize the attributes of a stream.
class StreamBuilder {
  const key_type key;  // key of the associated stream

  operator key_type() const;

  template <typename C>
  StreamBuilder& on(C&& c);

  StreamBuilder& critical(bool flag);
  StreamBuilder& tag(std::string str);
};
Each builder method returns a reference to the builder itself, so calls can be chained, similar to the JavaScript builder pattern.
operator key_type() implicitly converts the builder to the stream key.
on sets the associated callback to c, convertible to either std::function<dtc::Event::Signal(dtc::Vertex&, dtc::InputStream&)> or std::function<dtc::Event::Signal(dtc::Vertex&, dtc::OutputStream&)> at compile time.
critical sets the stream to be critical. When a critical stream is closed, the entire application will terminate.
tag sets the associated tag to str. This is useful when the stream is connected to a vertex program.
Calling the same method more than once replaces the previous value with the newest one. In most cases, we only set the input stream callback and ignore the output stream callback.
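One way a single on method can accept either callback form at compile time is to inspect the callback's signature with std::is_invocable_r_v. The sketch below shows that technique under stated assumptions. It is not the real StreamBuilder, and every Toy type plus the Signal enum are hypothetical placeholders.

```cpp
#include <functional>
#include <type_traits>
#include <utility>

// Hypothetical placeholders for the real DtCraft types.
struct ToyVertex {};
struct ToyInputStream {};
struct ToyOutputStream {};
enum class Signal { DEFAULT, REMOVE };

class ToyStreamBuilder {
 public:
  // Route the callback to the matching slot based on its signature.
  template <typename C>
  ToyStreamBuilder& on(C&& c) {
    if constexpr (std::is_invocable_r_v<Signal, C&, ToyVertex&,
                                        ToyInputStream&>) {
      _on_istream = std::forward<C>(c);
    } else {
      static_assert(
          std::is_invocable_r_v<Signal, C&, ToyVertex&, ToyOutputStream&>,
          "callback must take an input stream or an output stream");
      _on_ostream = std::forward<C>(c);
    }
    return *this;
  }

  bool has_istream_callback() const { return static_cast<bool>(_on_istream); }
  bool has_ostream_callback() const { return static_cast<bool>(_on_ostream); }

 private:
  std::function<Signal(ToyVertex&, ToyInputStream&)> _on_istream;
  std::function<Signal(ToyVertex&, ToyOutputStream&)> _on_ostream;
};
```

A callback that matches neither form fails to compile with the static_assert message, so a mistake in the parameter list is caught before the program runs.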
Example: Set a Stream Callback
The example below creates a stream and assigns an input stream callback. The callback prints the number of bytes that are immediately available to read and then closes the stream.
auto tail = G.vertex();
auto head = G.vertex();
auto edge = G.stream(tail, head).on(
  [] (dtc::Vertex& v, dtc::InputStream& is) {
    std::cout << "stream has " << is.isbuf.in_avail() << " bytes available to read\n";
    return dtc::Event::REMOVE;
  }
);
Container Builder
The container builder, ContainerBuilder, lets you initialize the attributes of a container in terms of system resources such as CPU count and memory limit.
The DtCraft kernel creates a lightweight Linux container to isolate your application.
Containers take effect only in distributed mode, where a graph is partitioned into topologies based on a user-specified container layout.
Local mode simply runs the program under the user's default resource limits.
class ContainerBuilder {
  const key_type key;  // key of the associated container

  operator key_type() const;

  ContainerBuilder& add(key_type key);
  ContainerBuilder& cpu(uintmax_t num);
  ContainerBuilder& memory(uintmax_t bytes);
  ContainerBuilder& space(uintmax_t bytes);
  ContainerBuilder& host(std::string hostname);
};
Each builder method returns a reference to the builder itself, so calls can be chained, similar to the JavaScript builder pattern.
operator key_type() implicitly converts the builder to the container key.
add adds the vertex key to this container.
cpu assigns num CPUs to this container.
memory assigns bytes of memory to this container.
space assigns bytes of disk space to this container.
host forces this container to run on the machine hostname.
A container holds only vertices, because stream connections can be determined from the vertex layout. A stream that crosses two containers is called an inter-stream and often has higher latency than a stream inside a single container, which is called an intra-stream. The DtCraft kernel deploys a graph based on the container layout.
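The distinction between the two kinds of stream can be derived from the vertex layout alone, as the following sketch illustrates. It is an illustration rather than the scheduler's actual logic: the layout map, the kDefaultContainer key for unassigned vertices, and the function names are all assumptions made for this example.

```cpp
#include <cstddef>
#include <unordered_map>

using key_type = std::size_t;  // assumption: keys are plain integers

// Hypothetical key of the default container that holds unassigned vertices.
constexpr key_type kDefaultContainer = static_cast<key_type>(-1);

enum class StreamKind { INTRA, INTER };

// Look up the container of a vertex; unassigned vertices share the default.
inline key_type container_of(
    const std::unordered_map<key_type, key_type>& layout, key_type vertex) {
  auto it = layout.find(vertex);
  return it == layout.end() ? kDefaultContainer : it->second;
}

// A stream is intra when both end vertices live in the same container.
inline StreamKind classify(
    const std::unordered_map<key_type, key_type>& layout,
    key_type tail, key_type head) {
  return container_of(layout, tail) == container_of(layout, head)
             ? StreamKind::INTRA
             : StreamKind::INTER;
}
```

Here the layout maps each vertex key to a container key, so no stream needs to be added to a container explicitly.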
Example: Containerize Vertices in a Graph
The example below creates two containers for two vertices, respectively, in a graph, and requests 1 cpu and 1 GB memory for each.
using namespace dtc::literals;
auto A = G.vertex();
auto B = G.vertex();
auto C1 = G.container().add(A).cpu(1).memory(1_GB);
auto C2 = G.container().add(B).cpu(1).memory(1_GB);
Vertex Program
DtCraft supports the execution of external programs in a graph.
This is particularly useful when you want to build a new stream application on top of existing or third-party programs.
When a vertex is specified as an external program, the kernel spawns the program given by the command.
Connections are passed to the program through the environment variable DTC_BRIDGES, where each key is a stream tag and each value is the associated file descriptor.
You can retrieve these handles and establish stream channels using your own libraries or our I/O stream API.
The file descriptor passed to an external program is restored to blocking mode and stays open across exec.
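The exact encoding of DTC_BRIDGES is not spelled out here, so the sketch below is purely illustrative. It assumes, without confirmation from the source, that the value is a whitespace-separated list of tag:fd tokens. The function parse_bridges is hypothetical, and you should check the actual format before relying on anything like it.

```cpp
#include <exception>
#include <map>
#include <sstream>
#include <string>

// Parse a bridge list under an ASSUMED encoding: whitespace-separated
// "tag:fd" tokens. The real DTC_BRIDGES format may differ.
inline std::map<std::string, int> parse_bridges(const std::string& value) {
  std::map<std::string, int> bridges;
  std::istringstream iss(value);
  std::string token;
  while (iss >> token) {
    const auto pos = token.rfind(':');
    // Skip tokens with no separator, an empty tag, or an empty descriptor.
    if (pos == std::string::npos || pos == 0 || pos + 1 == token.size()) {
      continue;
    }
    try {
      bridges[token.substr(0, pos)] = std::stoi(token.substr(pos + 1));
    } catch (const std::exception&) {
      // Skip tokens whose descriptor does not start with a number.
    }
  }
  return bridges;
}
```

An external program could pass the result of std::getenv("DTC_BRIDGES") to such a parser after checking it for nullptr, and then look up the descriptor by the tag that was set on the stream.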
Example: Execute an External Program in a Vertex
The example below demonstrates how to execute an external program and get the stream handle from its environment variable. When the external program finishes, it closes all adjacent streams which in turn forces all other ends to close.
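// Graph program.
auto v = G.vertex().program("demo");
auto u = G.vertex();
auto s = G.stream(u, v).tag("some_id_here");

// Demo program.
// getenv returns nullptr when the variable is unset, so check before printing.
if(const char* bridges = ::getenv("DTC_BRIDGES"); bridges) {
  std::cout << bridges << '\n';
}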
Execute a Stream Graph
After you create a stream graph, you can run it in either local mode or distributed mode.
Local mode runs the graph as a normal process on your operating system, and distributed mode deploys it to remote machines.
To execute a stream graph, you must create an executor object from your graph and call its run method.
The executor takes care of runtime control and interacts with the kernel to deploy your graph.
Example: Create an Executor to Run a Stream Graph
The example below demonstrates how to create an executor to execute a stream graph.
dtc::Executor(G).run();
Note: Currently, DtCraft supports only a single graph in one application. Defining more than one stream graph in your program can result in undefined behavior.
Run a Stream Graph Locally
Running a stream graph in local mode is the same as running a binary from your local console.
After the program compiles successfully, simply type ./program_name followed by the argument list you defined. Local mode does not run any master or agents.
It is always a good idea to make sure your graph runs correctly in local mode before submitting it to a cluster.
Submit a Stream Graph to a Cluster
The submit script in DtCraft's sbin directory helps you submit an application to a cluster.
Please refer to QuickStart for how to run a stream graph in distributed mode.
Where to Go from Here?
Congratulations! You have just learned how to create a DtCraft application using our stream graph programming model. Please refer to examples/ for more stream graph examples in practice.