Flow 模型是一组以 XML 格式定义的 SQL 数据流,由 source、transform、target 三部分组成:source 定义数据加载,transform 做数据关联和转换,target 控制数据输出。
XML 样例如下:
<?xml version="1.0" encoding="UTF-8"?>
<flow>
  <!--
    Sample flow definition. Sections appear in this order:
    properties, methods, prepare, sources, transformers, targets, after.
    NOTE(review): the engine presumably executes them in document order; confirm.
  -->

  <!-- Flow-level key/value settings. Several are referenced further down as ${key} placeholders. -->
  <properties>
    <name>QUA_DATA</name>
    <version>1.2</version>
    <root.path>/opt/software/spark</root.path>
    <!--
      NOTE(review): com.mysql.jdbc.Driver is the legacy class name; MySQL Connector/J 8.x
      ships com.mysql.cj.jdbc.Driver. Keep this value in sync with the connector on the classpath.
    -->
    <jdbc.driverclass>com.mysql.jdbc.Driver</jdbc.driverclass>
    <jdbc.url>jdbc:mysql://localhost:3306/tdb?characterEncoding=UTF-8</jdbc.url>
    <jdbc.user>root</jdbc.user>
    <!-- NOTE(review): sample credentials only; avoid keeping a real plain-text password in this file. -->
    <jdbc.password>123456</jdbc.password>
    <task.type>QUA_CUST_RULE</task.type>
    <catalog.model>QUA</catalog.model>
  </properties>

  <!--
    User-defined functions, declared by SQL name and implementing class.
    CONCAT_VAL is called by the transform below; XCOUNT is declared but not used in this sample.
  -->
  <methods>
    <udaf name="CONCAT_VAL" class="com.yiidata.etl.udf.ConcatUDAF"/>
    <udf name="XCOUNT" class="com.yiidata.etl.udf.XCount" return="int"/>
  </methods>

  <!-- NOTE(review): presumably runs before the sources are loaded; confirm against the engine. -->
  <prepare>
    <sql>use default</sql>
    <!--
      Inserts a cpic_task_history row over JDBC, using the connection settings from <properties>.
      ${uuid}, ${starttime} and ${status} are not defined in <properties>; presumably the engine
      supplies them at runtime (TODO confirm).
      NOTE(review): the placeholders are not quoted in the SQL text; verify that the engine
      quotes/escapes the substituted values.
      The statement is kept on one line: XML attribute-value normalization turns each line
      break inside an attribute into a space anyway.
    -->
    <round type="mysql"
           url="${jdbc.url}"
           driver="${jdbc.driverclass}"
           user="${jdbc.user}"
           password="${jdbc.password}"
           sql="insert into cpic_task_history(id, task_type, catalog_model, start_time, retry_count, final_status, created_at) values(${uuid}, ${task.type}, ${catalog.model}, ${starttime}, 0, ${status}, now())"/>
  </prepare>

  <!--
    Input data sets; each one is exposed to later SQL under its table_name.
    Only user_concat_testx is read by the transform below; et_pty_cust_flg and user_infox
    are loaded but not referenced anywhere else in this sample.
  -->
  <sources>
    <!-- Comma-delimited text file. NOTE(review): "age:int" presumably declares a column type; confirm. -->
    <source type="textfile"
            table_name="et_pty_cust_flg"
            fields="cust_id,name,gender,age:int"
            delimiter=","
            path="file:///Users/zhenqin/software/hive/user.txt"/>
    <source type="hive"
            table_name="user_infox"
            sql="select * from user_info"/>
    <source type="hive"
            table_name="user_concat_testx"
            sql="select * from user_concat_test"/>
  </sources>

  <!-- SQL transforms; the result of each is named by its table_name. -->
  <transformers>
    <!--
      Groups user_concat_testx by (c_phone, c_type, c_num) and aggregates cust_id with the
      CONCAT_VAL UDAF declared in <methods>.
      NOTE(review): cached="true" presumably keeps the result in memory for the two targets; confirm.
    -->
    <transform type="sql" table_name="cust_id_agmt_id_t" cached="true">
      SELECT c_phone,c_type,c_num, CONCAT_VAL(cust_id) as cust_ids
      FROM user_concat_testx
      group by c_phone,c_type,c_num
    </transform>
  </transformers>

  <!-- Outputs; both targets read the cust_id_agmt_id_t transform result. -->
  <targets>
    <!-- NOTE(review): "show" presumably prints the table for inspection; confirm. -->
    <target type="show" table_name="cust_id_agmt_id_t"/>
    <target type="jsonfile"
            table_name="cust_id_agmt_id_t"
            path="file:///Users/zhenqin/temp/output3.json"
            partition="1"
            savemode="overwrite"/>
  </targets>

  <!-- NOTE(review): presumably runs once the targets have finished; confirm against the engine. -->
  <after>
    <!--
      Updates the cpic_task_history row identified by ${uuid} with end time, final status and
      error text. ${endtime}, ${status}, ${error} and ${uuid} are not defined in <properties>;
      presumably the engine supplies them at runtime (TODO confirm).
    -->
    <round type="mysql"
           url="${jdbc.url}"
           driver="${jdbc.driverclass}"
           user="${jdbc.user}"
           password="${jdbc.password}"
           sql="update cpic_task_history set end_time = ${endtime}, final_status = ${status}, error_text = ${error} where id = ${uuid}"/>
  </after>
</flow>