UDF
Udf 是用户自定义开发的函数,可用于 select 时对字段的转换,也可用于 where 条件作为判断。 如:
sql
select cert_type_valid(cert_type) as cert_type_valid, where get_user_age(cert_no) > 22
传统的 SQL 标准中有非常多的内置函数,但是在数据开发过程中仍然不能满足需求,因此有必要开发自定义的UDF 函数。
xml
<methods>
<udf name="GET_USER_AGE" class="com.yiidata.etl.udf.UserAge" returnType=="int"/>
</methods>
返回值类型可为 map,array 类型,map 和 array 类型时可支持泛型。如:
java
returnType="map<String, int>"
returnType="array<String>"
UDF 开发,Spark UDF 支持传入多个值的 UDF, 继承分别从 UDF0-22:
java
public class Long2IPUDF implements UDF1<Long, String> {
@Override
public String call(Long ipInt) throws Exception {
if (ipInt > 0) {
return IpUtils.long2Ipv4(ipInt);
} else {
return "127.0.0.1";
}
}
}
UDAF
UDAF 是 sparksql 中的聚合函数,如count。编写 UDAF 需要继承 UserDefinedAggregateFunction 类。
配置方法:
xml
<methods>
<udaf name="CONCAT_VAL" class="com.yiidata.etl.udf.ConcatUDAF"/>
</methods>
Udaf 无须配置返回值。
UDAF 开发:
java
public class ConcatUDAF extends UserDefinedAggregateFunction {
/**
* 默认的链接分隔符
*/
public static final String DEFAULT_CONCAT_DELIMITED = ",";
/**
* 聚合函数的输入数据结构
*/
@Override
public StructType inputSchema() {
return new StructType().add("input", DataTypes.StringType);
}
/**
* 缓存区数据结构
*/
@Override
public StructType bufferSchema() {
return new StructType().add("result", DataTypes.StringType);
}
/**
* 聚合函数返回值数据类型
*/
@Override
public DataType dataType() {
return DataTypes.StringType;
}
/**
* 聚合函数是否是幂等的,即相同输入是否总是能得到相同输出
* @return
*/
@Override
public boolean deterministic() {
return true;
}
/**
* 初始化缓冲区
* @param buffer
*/
@Override
public void initialize(MutableAggregationBuffer buffer) {
buffer.update(0, null);
}
/**
* 给聚合函数传入一条新数据进行处理
* @param buffer
* @param input SQL UDAF 传入的参数,0 开始获取
*/
@Override
public void update(MutableAggregationBuffer buffer, Row input) {
if(input.size() == 0) {
return;
}
String seg = DEFAULT_CONCAT_DELIMITED;
if(input.size() > 1) {
seg = input.isNullAt(input.size()-1) ?
DEFAULT_CONCAT_DELIMITED : input.getString(input.size()-1);
}
if(buffer.getString(0) == null) {
buffer.update(0, input.get(0));
} else {
buffer.update(0, buffer.getString(0) + seg + input.get(0));
}
}
/**
* 合并聚合函数缓冲区
* @param buffer
* @param input SQL UDAF 传入的参数,0 开始获取
*/
@Override
public void merge(MutableAggregationBuffer buffer, Row input) {
if(input.size() == 0) {
return;
}
String seg = DEFAULT_CONCAT_DELIMITED;
if(input.size() > 1) {
seg = input.isNullAt(input.size()-1) ?
DEFAULT_CONCAT_DELIMITED : input.getString(input.size()-1);
}
if(buffer.getString(0) == null) {
buffer.update(0, input.get(0));
} else {
buffer.update(0, buffer.getString(0) + seg + input.get(0));
}
}
/**
* 计算最终结果。 返回类型应该和 {@link #dataType()} 相同
* @param buffer
*/
@Override
public Object evaluate(Row buffer) {
return buffer.getString(0);
}
}