Skip to content

UDF

Udf 是用户自定义开发的函数,可用于 select 时对字段的转换,也可用于 where 条件作为判断。 如:

sql
select cert_type_valid(cert_type) as cert_type_valid, where get_user_age(cert_no) > 22

传统的 SQL 标准中有非常多的内置函数,但是在数据开发过程中仍然不能满足需求,因此有必要开发自定义的UDF 函数。

xml
<methods>
    <udf name="GET_USER_AGE" class="com.yiidata.etl.udf.UserAge" returnType=="int"/>
</methods>

返回值类型可为 map,array 类型,map 和 array 类型时可支持泛型。如:

java
returnType="map<String, int>"
returnType="array<String>"

UDF 开发,Spark UDF 支持传入多个值的 UDF, 继承分别从 UDF0-22:

java
public class Long2IPUDF implements UDF1<Long, String> {
    @Override
    public String call(Long ipInt) throws Exception {
        if (ipInt > 0) {
            return IpUtils.long2Ipv4(ipInt);
        } else {
            return "127.0.0.1";
        }
    }
}

UDAF

UDAF 是 sparksql 中的聚合函数,如count。编写 UDAF 需要继承 UserDefinedAggregateFunction 类。

配置方法:

xml
<methods>
    <udaf name="CONCAT_VAL" class="com.yiidata.etl.udf.ConcatUDAF"/>
</methods>

Udaf 无须配置返回值。

UDAF 开发:

java
public class ConcatUDAF extends UserDefinedAggregateFunction {
    /**
     * 默认的链接分隔符
     */
    public static final String DEFAULT_CONCAT_DELIMITED = ",";
    /**
     * 聚合函数的输入数据结构
     */
    @Override
    public StructType inputSchema() {
        return new StructType().add("input", DataTypes.StringType);
    }
    /**
     * 缓存区数据结构
     */
    @Override
    public StructType bufferSchema() {
        return new StructType().add("result", DataTypes.StringType);
    }
    /**
     * 聚合函数返回值数据类型
     */
    @Override
    public DataType dataType() {
        return DataTypes.StringType;
    }
    /**
     * 聚合函数是否是幂等的,即相同输入是否总是能得到相同输出
     * @return
     */
    @Override
    public boolean deterministic() {
        return true;
    }
    /**
     * 初始化缓冲区
     * @param buffer
     */
    @Override
    public void initialize(MutableAggregationBuffer buffer) {
        buffer.update(0, null);
    }
    /**
     * 给聚合函数传入一条新数据进行处理
     * @param buffer
     * @param input SQL UDAF 传入的参数,0 开始获取
     */
    @Override
    public void update(MutableAggregationBuffer buffer, Row input) {
        if(input.size() == 0) {
            return;
        }
        String seg = DEFAULT_CONCAT_DELIMITED;
        if(input.size() > 1) {
            seg = input.isNullAt(input.size()-1) ?
                    DEFAULT_CONCAT_DELIMITED : input.getString(input.size()-1);
        }
        if(buffer.getString(0) == null) {
            buffer.update(0, input.get(0));
        } else {
            buffer.update(0, buffer.getString(0) + seg + input.get(0));
        }
    }
    /**
     * 合并聚合函数缓冲区
     * @param buffer
     * @param input SQL UDAF 传入的参数,0 开始获取
     */
    @Override
    public void merge(MutableAggregationBuffer buffer, Row input) {
        if(input.size() == 0) {
            return;
        }
        String seg = DEFAULT_CONCAT_DELIMITED;
        if(input.size() > 1) {
            seg = input.isNullAt(input.size()-1) ?
                    DEFAULT_CONCAT_DELIMITED : input.getString(input.size()-1);
        }
        if(buffer.getString(0) == null) {
            buffer.update(0, input.get(0));
        } else {
            buffer.update(0, buffer.getString(0) + seg + input.get(0));
        }
    }
    /**
     * 计算最终结果。 返回类型应该和 {@link #dataType()} 相同
     * @param buffer
     */
    @Override
    public Object evaluate(Row buffer) {
        return buffer.getString(0);
    }
}