Skip to content

DataFlow 将结果集输出到 RabbitMQ exchange 交换机中。

配置方法:

xml
<target type="amqp"
        table_name="result_join"
        write_type="JSON"
        delimiter=","
        user_name="admin"
        password="123456"
        queue="xxxxx"
        host="localhost"
        fields="test_id,test_name,created_at"
        port="5672"
        virtual_host="/"
        exchange="exchangeChange"/>

字段说明:

  1. write_type 将结果以 JSON 形式写入到 exchange;支持 STRING 和 JSON 两种,STRING 则需要 delimiter 分隔符支持;
  2. virtual_host rabbitmq virtual_host, 默认: /;
  3. exchange 发送的 rabbitmq exchange,如果只制定 exchange,则默认创建exchange,并将消息发送到该exchange;
  4. queue 发送到 rabbitmq 的队列名称,queue 如果和 exchange 同时制定,则优先使用 exchange。但是 exchange 和 queue 必须存在任意一个;
  5. fields 是 flow 向 target 输出字段列表,如果制定 key(key:test_id,test_name,created_at) 则采用 test_id 作为 routerkey,否则采用第一个字段作为 routerKey;

RabbitMQ 概念

rabbitmq 的 消息provider,exchange,queue 关系如下图:

rabbitmq-provider-exchange-queue-ralea

Provider 的消息发送到 exchange,由 exchange 的路由决定向 queue 分发。而消息持久化在在 queue 中的,因此如果 exchange 和 queue 没有绑定的关系,由于接收端接收的不及时,可能造成数据丢失。

因此建议在使用 amqp target 时,同时设置 exchange 和 queue,确保消息不丢失。

必要时用户需手动绑定 exchange 和 queue 的关系。