封装旁路输出操作去分流kafka消息

  • java
  • flink
  • 旁路输出

一、实现效果

用filter分流有filter算子重复计算的问题,于是乎就想到了用旁路输出分流日志,设计效果如下:

// kafka topic中读取的事件流
DataStream<String> stream = ...;
// TopicStreamHolder持有stream,并通过XXXFunction将不同的事件分流
var topicHolder = new TopicStreamHolder<>(stream, new XXXFunction());
// 获取FirstAudioFrame的outPutTag
var outputTag = getOutputTag(FirstAudioFrame.class);
// LogStreamHolder使用outputTag从TopicStreamHolder的输出流中获取对应的事件流
var logStream = new LogStreamHolder<>(topicHolder, outputTag).getOutputStream();

二、具体实现

2.1 StreamHolder

定义StreamHolder接口:

public interface StreamHolder<I,O> {
    DataStream<O> getOutputStream();
    DataStream<I> getInputStream();
}

2.2 TopicStreamHolder

TopicStreamHolder实现StreamHolder接口:

@Accessors(chain = true)
@Data
public class TopicStreamHolder<I, O> implements StreamHolder<I, O> {
    // 输入流
    private DataStream<I> stream;
    // 输出流
    private DataStream<O> processedStream;
    // 分流函数
    private ProcessFunction<I, O> processFunction;
    // 分流函数并行度
    private int parallelism;

    public TopicStreamHolder() {}

    public TopicStreamHolder(
            DataStream<I> stream,
            ProcessFunction<I, O> processFunction
    ) {
        this(stream, processFunction, 1);
    }

    public TopicStreamHolder(
            DataStream<I> stream,
            ProcessFunction<I, O> processFunction,
            int parallelism
    ) {
        this.stream = stream;
        this.processFunction = processFunction;
        this.parallelism = parallelism;
    }

    @Override
    public DataStream<O> getOutputStream() {
        if (processedStream == null) {
            processedStream = getInputStream()
                    // 应用分流函数
                    .process(processFunction)
                    .setParallelism(parallelism);
        }
        return processedStream;
    }


    @Override
    public DataStream<I> getInputStream() {
        return this.stream;
    }
}

2.3 LogStreamHolder

利用装饰器模式,让LogStreamHolder持有TopicStreamHolder

// PI、PO是TopicStreamHolder的输入输出类型
// I、O是LogStreamHolder的输入输出类型
@Accessors(chain = true)
public class LogStreamHolder<I extends PO, O, PI, PO> extends TopicStreamHolder<I,O> {

    private final StreamHolder<PI, PO> holder;
    private final OutputTag<O> outputTag;

    public LogStreamHolder(
            TopicStreamHolder<PI, PO> holder,
            OutputTag<O> outputTag
    ) {
        super();
        this.holder = holder;
        this.outputTag = outputTag;
    }

    @Override
    public DataStream<O> getOutputStream() {
        // 根据outputTag获取对应的事件流
        return getInputStream().getSideOutput(outputTag);
    }

    // TopicStreamHolder的输出流就是LogStreamHolder输入流
    @Override
    public SingleOutputStreamOperator<I> getInputStream() {
        return (SingleOutputStreamOperator<I>) holder.getOutputStream();
    }
}

2.4 分流函数XXXFunction

public class XXXFunction extends ProcessFunction<String, XXXLog> {
    @Override
    public void processElement(
            String value,
            ProcessFunction<String, XXXLog>.Context ctx,
            Collector<XXXLog> out
    ) throws Exception {
        // 正则提取出事件类型
        var type = LogUtils.getMatchType(value);
        if (StringUtils.isBlank(type)) {
            return;
        }
        // 根据事件类型获取枚举 
        var typeEnum = XXXLogType.getEnumByType(type);
        if (typeEnum == null) {
            return;
        }
        output(value, ctx, typeEnum);
    }

    private void output(
            String value,
            ProcessFunction<String, PaasLog>.Context ctx,
            PaasLogType typeEnum
    ) {
        try {
            // 获取时间对应的java类
            var clazz = typeEnum.getClazz();
            // json序列化成对象
            var log = JSON.parseObject(value, clazz);
            // 获取outputTag
            var outputTag = (OutputTag<XXXLog>) typeEnum.getOutputTag();
            if (outputTag == null) {
                return;
            }
            // 旁路输出
            ctx.output(outputTag, log);
        } catch (Exception ignored) {
        }
    }
}
Loading...