Skip to content

Flow 模型是一组以 XML 格式定义的 source、transform、target,用于开发 SQL 数据流:由 source 定义数据加载,transform 做数据关联和转换,target 控制数据输出。

样例如下:

xml
<?xml version="1.0" encoding="UTF-8"?>
<flow>
    <!--
      Flow-level properties. Several of these are referenced elsewhere in this
      flow as ${...} placeholders: the jdbc.* values by the <round> steps, and
      task.type / catalog.model by the INSERT statement in <prepare>.
    -->
    <properties>
        <name>QUA_DATA</name>
        <version>1.2</version>
        <!-- Fixed: the path previously contained a stray space after "/opt". -->
        <root.path>/opt/software/spark</root.path>
        <!--
          NOTE(review): com.mysql.jdbc.Driver is the legacy Connector/J class
          name; Connector/J 8 and later use com.mysql.cj.jdbc.Driver. Confirm
          against the driver version on the classpath before changing.
        -->
        <jdbc.driverclass>com.mysql.jdbc.Driver</jdbc.driverclass>
        <jdbc.url>jdbc:mysql://localhost:3306/tdb?characterEncoding=UTF-8</jdbc.url>
        <jdbc.user>root</jdbc.user>
        <!-- NOTE(review): sample credential in plain text; never commit a real password here. -->
        <jdbc.password>123456</jdbc.password>
        <task.type>QUA_CUST_RULE</task.type>
        <catalog.model>QUA</catalog.model>
    </properties>
    <!--
      Custom SQL functions made available to the flow's SQL under the given
      names. CONCAT_VAL is called as an aggregate in the transform's GROUP BY
      query; XCOUNT is declared with an int return type.
    -->
    <methods>
        <udaf name="CONCAT_VAL"
              class="com.yiidata.etl.udf.ConcatUDAF"/>
        <udf name="XCOUNT"
             class="com.yiidata.etl.udf.XCount"
             return="int"/>
    </methods>

    <!--
      Preparation steps, declared ahead of the sources (presumably executed
      before the data flow starts; confirm against the engine).
    -->
    <prepare>
        <sql>use default</sql>
        <!--
          Inserts a row into cpic_task_history over the JDBC connection built
          from the jdbc.* properties. ${task.type} and ${catalog.model} come
          from <properties>; ${uuid}, ${starttime} and ${status} are not defined
          in this file and are presumably supplied by the runtime (TODO confirm).
          NOTE(review): placeholders appear unquoted in the SQL; how values are
          substituted or escaped is not visible here.
        -->
        <round type="mysql"
               sql="insert into cpic_task_history(id, task_type, catalog_model, start_time, retry_count, final_status, created_at)
               values(${uuid}, ${task.type}, ${catalog.model}, ${starttime}, 0, ${status}, now())"
               url="${jdbc.url}"
               driver="${jdbc.driverclass}"
               user="${jdbc.user}"
               password="${jdbc.password}" />
    </prepare>
    <!-- Input definitions: each source is exposed to later SQL under its table_name. -->
    <sources>
        <!-- Comma-delimited text file with columns cust_id, name, gender and age (typed int). -->
        <source type="textfile"
                table_name="et_pty_cust_flg"
                fields="cust_id,name,gender,age:int"
                delimiter=","
                path="file:///Users/zhenqin/software/hive/user.txt"/>
        <!-- Hive query results registered as user_infox. -->
        <source type="hive"
                table_name="user_infox"
                sql="select * from user_info"/>
        <!-- Hive query results registered as user_concat_testx; read by the transform step. -->
        <source type="hive"
                table_name="user_concat_testx"
                sql="select * from user_concat_test"/>
    </sources>

    <!-- Transformation steps: each transform produces a new table named by table_name. -->
    <transformers>
        <!--
          Groups user_concat_testx by c_phone, c_type and c_num and aggregates
          cust_id with the CONCAT_VAL function declared in <methods>; the result
          is named cust_id_agmt_id_t. cached="true" presumably asks the engine
          to cache the result for reuse by several targets (TODO confirm).
        -->
        <transform type="sql" table_name="cust_id_agmt_id_t" cached="true">
            SELECT c_phone,c_type,c_num, CONCAT_VAL(cust_id) as cust_ids
            FROM user_concat_testx
            group by c_phone,c_type,c_num
        </transform>
    </transformers>

    <!-- Output definitions: both targets read the cust_id_agmt_id_t table built by the transform. -->
    <targets>
        <!-- "show" target; presumably prints the table for inspection (TODO confirm). -->
        <target type="show"
                table_name="cust_id_agmt_id_t"/>
        <!-- JSON file output written to the given path with savemode "overwrite". -->
        <target type="jsonfile"
                table_name="cust_id_agmt_id_t"
                path="file:///Users/zhenqin/temp/output3.json"
                partition="1"
                savemode="overwrite"/>
    </targets>

    <!--
      Finalisation steps, declared after the targets (presumably executed once
      the flow has finished; confirm against the engine).
    -->
    <after>
        <!--
          Updates the cpic_task_history row whose id equals ${uuid}, setting
          end_time, final_status and error_text. ${endtime}, ${status}, ${error}
          and ${uuid} are not defined in this file and are presumably supplied
          by the runtime (TODO confirm).
        -->
        <round type="mysql"
               sql="update cpic_task_history set
               end_time = ${endtime}, final_status = ${status}, error_text = ${error} where id = ${uuid}"
               url="${jdbc.url}"
               driver="${jdbc.driverclass}"
               user="${jdbc.user}"
               password="${jdbc.password}" />
    </after>
</flow>