Skip to content

Avro 序列化系统使用

何为 Avro?

Avro是一个跨语言数据序列化系统

** 优点**

  1. 跨语言(Java,Python,C++,C#等)
  2. 序列化和反序列化速度快
  3. 序列化字节码较小
  4. 兼容性、易于使用

Avro Schema 和 Protocol

  • Schema

由 record 关键字定义,相当于 Bean 类型

  • Protocol

由 protocol 关键字定义,相当于接口类型。

Avro 提供了 avro-tools-{version}.jar 来编译 IDL。只要 IDL 包含了 protocol,编译为类时就要指定为 protocol 类型。

Avro Specification

见:http://avro.apache.org/docs/current/spec.html

Avro Specification是一个由 JSON 描述的语法规范。编程可以直接定义Specification类型的 json 文件,并通过 avro-tools 编译为 Java 类源码。如:

shell
java -jar avro-tools-1.7.7.jar compile  schema|protocol  name.avro  ../java/

Avro IDL language

见:http://avro.apache.org/docs/current/idl.html

Avro IDL language 是为了描述Avro Specification的一个轻便语法,使用 avro-tools 可以直接编译为Avro Specification。由于Avro Specification使用 json 描述,语法复杂,定于对象不符合大多数面向对象的思想。因此,Avro IDL language 更像面向对象语言的描述方式。

编译方式如:

shell
java -jar avro-tools-1.7.7.jar idl  name.avdl  name.avro

Avro IDL language 定义例子:

java
/**
传输的 Avro 消息, 序列化结果(注释,类似 Java)
*/
record Message {
    /**
    Index ID(字段注释)
    */
    string indexId;
    /**
    Data Shard Key
    */
    string rowId;
    /**
    Data Bytes
    */
    bytes payload;
}

Avro DatumWriter

DatumWriter 是把 Avro 对象直接写为字节码的 Writer 类。

java
@Test
public void testWriteWithObject() throws Exception {
    File file = new File("test.avroc");
    DataFileWriter<PairAvro> fileWriter = new DataFileWriter<PairAvro>(
            new SpecificDatumWriter(PairAvro.class));
    fileWriter.create(schema, file);
    PairAvro record = new PairAvro();
    record.setLeft("L");
    record.setRight("R");
    fileWriter.append(record);
    fileWriter.close();
}

Avro DatumReader

DatumReader 是读取 Avro 对象的 Reader 类,可以把序列化的 Avro 对象文件直接读取为 Avro 对象。

java
@Test
public void testReaderWithObject() throws Exception {
    File file = new File("test.avroc");
    SpecificDatumReader datumReader = new SpecificDatumReader(PairAvro.class);
    DataFileReader<PairAvro> reader = new DataFileReader<PairAvro>(
            file, datumReader);
    while (reader.hasNext()){
        PairAvro record = reader.next();
        System.out.println(record);
    }
    reader.close();
}

快速的实现一个 RPC 接口

java
@namespace("com.sdyc.avrotest.protocol")
/**
 Hello
 */
protocol ClientProtocol {
    /**
      插入单条数据
     */
    int add(Message message);
    /**
     批次插入(List)
     */
    int addList(array<Message> messages);
}

Java Server:

java
public class TestSocketServer {
    static class ClientProtocolImpl implements ClientProtocol {
        @Override
        public int add(Message message) throws AvroRemoteException {
            System.out.println(message);
            return 1;
        }
        @Override
        public int addList(List<Message> messages) throws AvroRemoteException {
            return 0;
        }
    }

    public static void main(String[] args) throws Exception {
        try {
            Server server = new NettyServer(new SpecificResponder(ClientProtocol.class, new ClientProtocolImpl()), new InetSocketAddress("localhost", 8088));
            server.start();
            System.out.println("start avro nio socket success.");
            server.join();
        } catch (Exception e) {
        }
    }
}

Java Client:

java
public class TestClient {

    public static void main(String[] args) throws IOException {
        Transceiver t = new NettyTransceiver(new InetSocketAddress("localhost", 8088));
        ClientProtocol clientProtocol = SpecificRequestor.getClient(ClientProtocol.class,
                new SpecificRequestor(ClientProtocol.class, t));
        Message message = new Message();
        message.setIndexId("");
        message.setRowId("");
        clientProtocol.add(message);
        t.close();
    }
}

Java Server 和 Transceiver 各有4 个实现

  1. Netty
  2. Socket|SaslSocket
  3. Datagram
  4. Http