Configuring stream processing is no different from batch processing: a streaming data source is still declared with a source tag. Here type="kafka" marks this source as streaming; whether a given source supports streaming depends entirely on how the developer defines that source.
```xml
<source type="kafka"
        table_name="cust_id_agmt_id_t"
        duration="30"
        topic="testtopic"
        fields="id,name,begintime,endtime,status,test_dsc,test_b,test_c,test_rand,created_at"
        key_data_type="STRING"
        value_data_type="STRING"
        delimiter=","
        group_id="mygroupid"
        broker-list="localhost:9092"/>
```
Notes:
- table_name: name of the "virtual table" this source defines;
- duration: the interval at which Spark Streaming processes each data window;
- topic: the Kafka topic to consume from;
- fields: the fields carried in each message value of the topic;
- key_data_type: data type of the message key;
- value_data_type: data type of the message value;
- delimiter: the character used to split the message value into fields;
- group_id: the consumer group id used when reading from the Kafka topic;
- broker-list: the Kafka cluster broker addresses;
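Conceptually, the fields and delimiter attributes together describe how each raw message value is split into named columns. A minimal Python sketch of that mapping (the parse_value helper is hypothetical, for illustration only; it is not part of the framework):

```python
# Column names declared in the source's `fields` attribute.
FIELDS = "id,name,begintime,endtime,status,test_dsc,test_b,test_c,test_rand,created_at".split(",")

def parse_value(value: str, delimiter: str = ",") -> dict:
    """Split a raw Kafka message value and pair each piece with its column name."""
    parts = value.split(delimiter)
    return dict(zip(FIELDS, parts))

# A made-up message value with ten delimited pieces.
row = parse_value("1,YY,2020-01-01,2020-12-31,0,desc,b,c,0.5,2020-01-01 00:00:00")
```

Each consumed message thus becomes one row of the virtual table cust_id_agmt_id_t.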
A Kafka StreamSource tutorial follows (it reads data from Kafka and joins it with a dictionary table; in this example the dictionary is supplied inline through a data source):
```xml
<?xml version="1.0" encoding="UTF-8"?>
<flow xmlns="http://www.yiidata.com/xml/DataFlow"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://www.yiidata.com/xml/DataFlow SparkSqlFlowSchema.xsd">
    <properties>
        <name>Kafka StreamSource</name>
        <version>1.2</version>
    </properties>
    <sources>
        <source type="data"
                table_name="et_pty_cust_flg"
                fields="cust_id,name,gender,age:int"
                delimiter=",">
<![CDATA[
12,YY,M,15
13,QQ,F,22
14,WC,M,8
15,WB,U,20
]]>
        </source>
        <source type="kafka"
                table_name="cust_id_agmt_id_t"
                duration="30"
                topic="testtopic"
                fields="id,name,begintime,endtime,status,test_dsc,test_b,test_c,test_rand,created_at"
                key_data_type="STRING"
                value_data_type="STRING"
                delimiter=","
                group_id="mygroupid"
                broker-list="localhost:9092"/>
    </sources>
    <transformers>
        <transform type="sql" table_name="result_show">
<![CDATA[
SELECT a.*, b.gender, b.age
FROM cust_id_agmt_id_t a
LEFT JOIN et_pty_cust_flg b ON (a.id = b.cust_id)
]]>
        </transform>
    </transformers>
    <targets>
        <target type="show" table_name="result_show" />
    </targets>
</flow>
```
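The transform stage performs a SQL LEFT JOIN between the streaming table and the inline dictionary table: every streaming row is kept, dictionary columns are attached when a key matches, and NULL is produced otherwise. That lookup can be sketched in Python; the left_join helper, the join key (customer id), and the sample streaming row are all hypothetical, for illustration only:

```python
# Inline dictionary rows from the <source type="data"> block above.
dict_rows = [
    {"cust_id": "12", "name": "YY", "gender": "M", "age": 15},
    {"cust_id": "13", "name": "QQ", "gender": "F", "age": 22},
    {"cust_id": "14", "name": "WC", "gender": "M", "age": 8},
    {"cust_id": "15", "name": "WB", "gender": "U", "age": 20},
]
# Index the dictionary side by its key for O(1) lookups.
index = {r["cust_id"]: r for r in dict_rows}

def left_join(stream_row: dict) -> dict:
    """Enrich a streaming row with dictionary columns; None when no match (LEFT JOIN)."""
    match = index.get(stream_row.get("id"))
    enriched = dict(stream_row)
    enriched["gender"] = match["gender"] if match else None
    enriched["age"] = match["age"] if match else None
    return enriched

joined = left_join({"id": "13", "status": "0"})  # made-up streaming row
```

The show target then prints the joined result_show table for each processed window.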