
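Stream processing is configured no differently from batch processing: a streaming data source is still declared with a `source` tag. Here, `type="kafka"` marks this source as a streaming source. Whether a source supports streaming depends entirely on how the developer has defined that source type.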

```xml
<source type="kafka"
        table_name="cust_id_agmt_id_t"
        duration="30"
        topic="testtopic"
        fields="id,name,begintime,endtime,status,test_dsc,test_b,test_c,test_rand,created_at"
        key_data_type="STRING"
        value_data_type="STRING"
        delimiter=","
        group_id="mygroupid"
        broker-list="localhost:9092"/>
```

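Attributes:

  1. `table_name`: the name of the "virtual table" defined by this source;
  2. `duration`: the interval at which Spark Streaming processes each batch (window) of data;
  3. `topic`: the Kafka topic to consume;
  4. `fields`: the names of the fields in each message value of the topic, in order;
  5. `key_data_type`: the data type of the topic's message key;
  6. `value_data_type`: the data type of the topic's message value;
  7. `delimiter`: the character used to split a message value into fields;
  8. `group_id`: the Kafka consumer group ID used to read messages from the topic;
  9. `broker-list`: the address of the Kafka cluster.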
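Together, `fields`, `delimiter` and `duration` describe how raw Kafka messages become rows of the virtual table. The Python sketch below models this under stated assumptions. `parse_value` and `micro_batches` are hypothetical helpers and are not part of the framework. `duration` is assumed to be in seconds. Batching is simplified to grouping messages by arrival time, which roughly matches how Spark Streaming forms its batches.

```python
from typing import Dict, Iterable, List, Tuple

# Mirrors the `fields` attribute of the kafka <source> above.
FIELDS = "id,name,begintime,endtime,status,test_dsc,test_b,test_c,test_rand,created_at".split(",")


def parse_value(value: str, fields: List[str], delimiter: str = ",") -> Dict[str, str]:
    """Split one message value (value_data_type="STRING") into named columns.

    Hypothetical helper: a plain split, so a field that itself contains the
    delimiter would shift the columns; rows with the wrong count are rejected.
    """
    parts = value.split(delimiter)
    if len(parts) != len(fields):
        raise ValueError(f"expected {len(fields)} fields, got {len(parts)}: {value!r}")
    return dict(zip(fields, parts))


def micro_batches(records: Iterable[Tuple[float, str]], duration: float) -> Dict[int, List[str]]:
    """Group (arrival_time, value) pairs into consecutive windows of `duration`.

    Assumes both arrival_time and duration are in seconds. Simplified model:
    windows that receive no records are simply absent from the result.
    """
    batches: Dict[int, List[str]] = {}
    for arrival, value in records:
        batches.setdefault(int(arrival // duration), []).append(value)
    return batches


if __name__ == "__main__":
    row = parse_value("12,Tom,2020-01-01,2020-12-31,1,desc,b,c,0.5,2020-01-01 10:00:00", FIELDS)
    print(row["id"], row["status"])  # 12 1
    print(micro_batches([(0.0, "a"), (29.9, "b"), (30.0, "c"), (65.0, "d")], duration=30))
    # {0: ['a', 'b'], 1: ['c'], 2: ['d']}
```

This sketch rejects a message whose value splits into more or fewer parts than `fields` lists. This section does not say how the framework itself handles such messages.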

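The following Kafka StreamSource example reads data from Kafka and joins it with a dictionary table. To keep the example self-contained, the dictionary table is defined inline with a `type="data"` source; in practice it could be a MySQL table.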

```xml
<?xml version="1.0" encoding="UTF-8"?>
<flow xmlns="http://www.yiidata.com/xml/DataFlow"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://www.yiidata.com/xml/DataFlow SparkSqlFlowSchema.xsd">
    <properties>
        <name>Kafka StreamSource</name>
        <version>1.2</version>
    </properties>
    <sources>
        <source type="data"
                table_name="et_pty_cust_flg"
                fields="cust_id,name,gender,age:int"
                delimiter=",">
            <![CDATA[
            12,YY,M,15
            13,QQ,F,22
            14,WC,M,8
            15,WB,U,20
            ]]>
        </source>
        <source type="kafka"
                table_name="cust_id_agmt_id_t"
                duration="30"
                topic="testtopic"
                fields="id,name,begintime,endtime,status,test_dsc,test_b,test_c,test_rand,created_at"
                key_data_type="STRING"
                value_data_type="STRING"
                delimiter=","
                group_id="mygroupid"
                broker-list="localhost:9092"/>
    </sources>
    <transformers>
        <transform type="sql" table_name="result_show">
            <![CDATA[
            SELECT a.*, b.name AS cust_name, b.gender
            FROM cust_id_agmt_id_t a
            LEFT JOIN et_pty_cust_flg b ON (a.id = b.cust_id)
            ]]>
        </transform>
    </transformers>
    <targets>
        <target type="show" table_name="result_show" />
    </targets>
</flow>
```
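The `LEFT JOIN` in the transform is presumably evaluated once per micro-batch of `cust_id_agmt_id_t`. The minimal Python sketch below illustrates its semantics. The join key is assumed to be the stream's `id` matched against the dictionary's `cust_id`. The `left_join` helper and the sample batch are hypothetical, for illustration only. Every Kafka row is kept. A row without a dictionary match gets `None` (SQL `NULL`) in the dictionary columns.

```python
from typing import Dict, List, Optional

Row = Dict[str, Optional[str]]


def left_join(left: List[Row], right: List[Row], left_key: str, right_key: str,
              right_cols: Dict[str, str]) -> List[Row]:
    """Hash-based LEFT OUTER JOIN on left[left_key] == right[right_key].

    right_cols maps a dictionary column to its output name (like `b.name AS cust_name`).
    Every left row is kept; unmatched rows get None in the right columns, and a
    left row matching several right rows is repeated once per match.
    """
    index: Dict[Optional[str], List[Row]] = {}
    for r in right:
        index.setdefault(r[right_key], []).append(r)
    out: List[Row] = []
    for row in left:
        key = row[left_key]
        matches = index.get(key, []) if key is not None else []  # NULL never matches in SQL
        for match in matches or [None]:
            joined = dict(row)
            for src, dst in right_cols.items():
                joined[dst] = match[src] if match is not None else None
            out.append(joined)
    return out


# Dictionary table from the inline <source type="data"> (age kept as text here).
DICT_ROWS: List[Row] = [
    dict(zip(["cust_id", "name", "gender", "age"], line.split(",")))
    for line in ["12,YY,M,15", "13,QQ,F,22", "14,WC,M,8", "15,WB,U,20"]
]

if __name__ == "__main__":
    batch = [{"id": "12", "name": "Tom", "status": "1"},
             {"id": "99", "name": "Ann", "status": "0"}]  # hypothetical Kafka rows
    for r in left_join(batch, DICT_ROWS, "id", "cust_id", {"name": "cust_name", "gender": "gender"}):
        print(r)
```

The sketch builds a hash index on the dictionary side. That fits this case because the dictionary table is small and static, while the stream side changes every batch.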