Avro 序列化系统使用
何为 Avro?
Avro是一个跨语言数据序列化系统
** 优点**
- 跨语言(Java,Python,C++,C#等)
- 序列化和反序列化速度快
- 序列化字节码较小
- 兼容性、易于使用
Avro Schema 和 Protocol
- Schema
由 record 关键字定义,相当于 Bean 类型
- Protocol
由 protocol 关键字定义,相当于接口类型。
Avro 提供了 avro-tools-{version}.jar 来编译 IDL。只要 IDL 包含了 protocol,编译为类时就要指定为 protocol 类型。
Avro Specification
见:http://avro.apache.org/docs/current/spec.html
Avro Specification是一个由 JSON 描述的语法规范。编程可以直接定义Specification类型的 json 文件,并通过 avro-tools 编译为 Java 类源码。如:
shell
java -jar avro-tools-1.7.7.jar compile schema|protocol name.avro ../java/
Avro IDL language
见:http://avro.apache.org/docs/current/idl.html
Avro IDL language 是为了描述Avro Specification的一个轻便语法,使用 avro-tools 可以直接编译为Avro Specification。由于Avro Specification使用 json 描述,语法复杂,定于对象不符合大多数面向对象的思想。因此,Avro IDL language 更像面向对象语言的描述方式。
编译方式如:
shell
java -jar avro-tools-1.7.7.jar idl name.avdl name.avro
Avro IDL language 定义例子:
java
/**
传输的 Avro 消息, 序列化结果(注释,类似 Java)
*/
record Message {
/**
Index ID(字段注释)
*/
string indexId;
/**
Data Shard Key
*/
string rowId;
/**
Data Bytes
*/
bytes payload;
}
Avro DatumWriter
DatumWriter 是把 Avro 对象直接写为字节码的 Writer 类。
java
@Test
public void testWriteWithObject() throws Exception {
File file = new File("test.avroc");
DataFileWriter<PairAvro> fileWriter = new DataFileWriter<PairAvro>(
new SpecificDatumWriter(PairAvro.class));
fileWriter.create(schema, file);
PairAvro record = new PairAvro();
record.setLeft("L");
record.setRight("R");
fileWriter.append(record);
fileWriter.close();
}
Avro DatumReader
DatumReader 是读取 Avro 对象的 Reader 类,可以把序列化的 Avro 对象文件直接读取为 Avro 对象。
java
@Test
public void testReaderWithObject() throws Exception {
File file = new File("test.avroc");
SpecificDatumReader datumReader = new SpecificDatumReader(PairAvro.class);
DataFileReader<PairAvro> reader = new DataFileReader<PairAvro>(
file, datumReader);
while (reader.hasNext()){
PairAvro record = reader.next();
System.out.println(record);
}
reader.close();
}
快速的实现一个 RPC 接口
java
@namespace("com.sdyc.avrotest.protocol")
/**
Hello
*/
protocol ClientProtocol {
/**
插入单条数据
*/
int add(Message message);
/**
批次插入(List)
*/
int addList(array<Message> messages);
}
Java Server:
java
public class TestSocketServer {
static class ClientProtocolImpl implements ClientProtocol {
@Override
public int add(Message message) throws AvroRemoteException {
System.out.println(message);
return 1;
}
@Override
public int addList(List<Message> messages) throws AvroRemoteException {
return 0;
}
}
public static void main(String[] args) throws Exception {
try {
Server server = new NettyServer(new SpecificResponder(ClientProtocol.class, new ClientProtocolImpl()), new InetSocketAddress("localhost", 8088));
server.start();
System.out.println("start avro nio socket success.");
server.join();
} catch (Exception e) {
}
}
}
Java Client:
java
public class TestClient {
public static void main(String[] args) throws IOException {
Transceiver t = new NettyTransceiver(new InetSocketAddress("localhost", 8088));
ClientProtocol clientProtocol = SpecificRequestor.getClient(ClientProtocol.class,
new SpecificRequestor(ClientProtocol.class, t));
Message message = new Message();
message.setIndexId("");
message.setRowId("");
clientProtocol.add(message);
t.close();
}
}
Java Server 和 Transceiver 各有4 个实现
- Netty
- Socket|SaslSocket
- Datagram
- Http