工作流
工作流包含两部份:
- connectors: 连接器配置,定义工作流需要使用的数据源连接器
- graph: 流程图(DAG)定义
工作流使用XML格式进行定义,示例配置如下:
<?xml version="1.0" encoding="utf-8" ?>
<workflow>
<connectors>
<connector id="postgres" type="sql">
<props>
<prop key="poolName" value="postgres"/>
<prop key="maximumPoolSize" value="2"/>
<prop key="username" value="massdata"/>
<prop key="password" value="massdata"/>
<prop key="dataSourceClassName" value="org.postgresql.ds.PGSimpleDataSource"/>
<prop key="dataSource.serverName" value="localhost"/>
<prop key="dataSource.portNumber" value="5432"/>
<prop key="dataSource.databaseName" value="massdata"/>
</props>
</connector>
<connector id="mysql" type="sql">
<props>
<prop key="poolName" value="mysql"/>
<prop key="maximumPoolSize" value="2"/>
<prop key="jdbcUrl">
<value><![CDATA[jdbc:mysql://localhost:3306/massdata?useSSL=false&characterEncoding=utf8]]></value>
</prop>
<prop key="username" value="massdata"/>
<prop key="password" value="massdata"/>
</props>
</connector>
</connectors>
<!-- #graph_example -->
<graph>
<source id="postgres">
<connector ref="postgres"/>
<out>flow1</out>
</source>
<flows>
<flow id="flow1">
<script type="scala" src=""><![CDATA[]]></script>
<out>mysql</out>
</flow>
</flows>
<sink id="mysql">
<connector ref="mysql">
<prop key="" value=""/>
</connector>
</sink>
</graph>
<!-- #graph_example -->
</workflow>
Connectors
见 数据连接器
Graph
类似Akka Stream的数据流图,这里是一个DAG,定义了ETL的处理流程。整个graph必需为闭合状态才有效并可被执行,不然RDP将分析workflow抛出异常。
- source: 数据来源,它只有一个out。用来决定从哪里获得数据流。
- flow: 数据流元素将流经的处理过程。flow至少有一个in和out。
- sink: 数据收集汇,它只有一个in。用来决定数据流最终被被存储到哪里。
<graph name="mysql2pg-test">
<source name="source">
<connector ref="mysql"/>
<script type="sql" src=""><![CDATA[
select id, isbn, title, description, publish_at, created_at from t_book
]]></script>
<out>flow1</out>
</source>
<flows>
<flow name="flow1">
<script type="javascript" src=""><![CDATA[
event.data().values()
]]></script>
<out>sink</out>
</flow>
</flows>
<sink name="sink">
<!--<connector ref="postgres"/>-->
<connector ref="postgres"/>
<script type="sql" src=""><![CDATA[
insert into t_book(id, isbn, title, description, publish_at, created_at) values(?, ?, ?, ?, ?, ?)
]]></script>
</sink>
</graph>
graph.source
指Source要引用的connector,若在当前workflow配置文件内找不到connector定义,则从系统全局的connectors库寻找。
graph.flows
flow是实际对数据进行各种转换、过滤操作的阶段。
graph.flows.flow.script
script,配置flow怎样处理数据的脚本。包含以下主要功能:
- type:可以是 Scala、Java、Javascript、Python代码
- src:或者指定
jar
、zip
可执行程序包或.scala
,.py
、.js
可执行代码文件文件路径格式支持:file://
,http://
,classpath://
,ftp://
,hdfs://
若指定了package,则 script 段内嵌代码无效。