Unit Testing

  • java
  • flink
  • unit testing

1. Adding the test dependencies

testImplementation 'org.mockito:mockito-junit-jupiter:4.6.1'
testImplementation "org.apache.flink:flink-test-utils:${flinkVersion}"
testImplementation "org.apache.flink:flink-test-utils-junit:${flinkVersion}"
testImplementation "org.apache.flink:flink-runtime:${flinkVersion}:tests"
testImplementation "org.apache.flink:flink-streaming-java:${flinkVersion}:tests"

2. Testing stateless operators

2.1 map

A stateless operator and its test:

public class MyMapFunction implements MapFunction<String, String> {
    @Override
    public String map(String s) throws Exception {
        //...
    }
}
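// Test class (JUnit 5)
import static org.junit.jupiter.api.Assertions.assertEquals;
import org.junit.jupiter.api.Test;

class MyMapFunctionTest {
    @Test
    void testMap() throws Exception {
        var function = new MyMapFunction();
        var json = "a";
        // map() declares "throws Exception", so the test method must declare it too
        var result = function.map(json);
        assertEquals("b", result);
    }
}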

2.2 flatMap

public class MyFlatMap implements FlatMapFunction<String, String> {
    @Override
    public void flatMap(String s, Collector<String> collector) throws Exception {
        // ...
        collector.collect(out);
    }
}

2.2.1 Using Mockito

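// Test class (JUnit 5 + Mockito)
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import org.apache.flink.util.Collector;
import org.junit.jupiter.api.Test;

class MyFlatMapTest {
    @Test
    @SuppressWarnings("unchecked") // mock(Collector.class) yields a raw Collector
    void testFlatMap() throws Exception {
        var function = new MyFlatMap();
        Collector<String> collector = mock(Collector.class);
        var json = "a";
        // flatMap() returns void, so the result is checked through the collector
        function.flatMap(json, collector);
        verify(collector, times(1)).collect("b");
    }
}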

2.2.2 ListCollector

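// Test class (JUnit 5)
import static org.junit.jupiter.api.Assertions.assertEquals;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.functions.util.ListCollector;
import org.junit.jupiter.api.Test;

class MyFlatMapTest {
    @Test
    void testFlatMap() throws Exception {
        var function = new MyFlatMap();
        List<String> out = new ArrayList<>();
        // ListCollector appends every collected record to the backing list
        ListCollector<String> collector = new ListCollector<>(out);
        var json = "a";
        // flatMap() returns void, so the result is read from the list
        function.flatMap(json, collector);
        assertEquals(List.of("b"), out);
    }
}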
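For comparison, the same collector-based check can be sketched without Flink or Mockito on the classpath. This is only an illustration of the pattern: Collector and FlatMapFunction below are local stand-ins that mirror the Flink interfaces of the same name, and SplitFlatMap, RecordingCollector and RecordingCollectorSketch are hypothetical names that do not come from the code above.

```java
import java.util.ArrayList;
import java.util.List;

// Local stand-in for Flink's Collector, declared so the sketch compiles without Flink.
interface Collector<T> {
    void collect(T record);
    void close();
}

// Local stand-in for Flink's FlatMapFunction.
interface FlatMapFunction<T, O> {
    void flatMap(T value, Collector<O> out) throws Exception;
}

// Hypothetical function under test: emits one record per non-empty comma-separated token.
class SplitFlatMap implements FlatMapFunction<String, String> {
    @Override
    public void flatMap(String value, Collector<String> out) {
        for (String token : value.split(",")) {
            if (!token.isEmpty()) {
                out.collect(token);
            }
        }
    }
}

// Hand-written collector that records every emitted element in order.
class RecordingCollector<T> implements Collector<T> {
    final List<T> records = new ArrayList<>();

    @Override
    public void collect(T record) {
        records.add(record);
    }

    @Override
    public void close() {
        // nothing to release in this sketch
    }
}

class RecordingCollectorSketch {
    // Runs the function once and returns everything it emitted.
    static List<String> run(String input) {
        SplitFlatMap function = new SplitFlatMap();
        RecordingCollector<String> collector = new RecordingCollector<>();
        function.flatMap(input, collector);
        return collector.records;
    }

    public static void main(String[] args) {
        System.out.println(run("a,b,,c")); // prints [a, b, c]
    }
}
```

A recording collector like this checks the order and number of emitted records in one assertion, which is the same thing ListCollector provides out of the box.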

3. Testing stateful operators

The following example tests a two-stream join:

public class JoinVideoResolutionFunctionTest {

    @Test
    @DisplayName("Join AA and BB")
    public void test1() throws Exception {
        var function = new JoinAAAndBBFunction();
        // Lombok's @Cleanup closes the harness when the enclosing scope exits
        @Cleanup
        var testHarness = new KeyedTwoInputStreamOperatorTestHarness<>(
                new CoStreamFlatMap<>(function),
                x -> null,      // key selector for the first input
                x -> null,      // key selector for the second input
                Types.STRING    // type information of the key
        );
        testHarness.open();

        // AA
        testHarness.processElement1(...);
        // BB
        testHarness.processElement2(...);

        var expectedOutput = new ConcurrentLinkedQueue<>();
        expectedOutput.add(...);
        expectedOutput.add(...);
        // Unwrap each StreamRecord so that only the emitted values are compared
        var output = testHarness.getOutput()
                .stream()
                .map(item -> (StreamRecord) item)
                .map(StreamRecord::getValue)
                .collect(Collectors.toList());
        TestHarnessUtil.assertOutputEquals(
                "Output was not correct.",
                expectedOutput,
                new ArrayDeque<>(output)
        );
    }
}
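A harness is needed here because a join has to remember elements per key until the matching element from the other stream arrives. As a rough illustration only, the plain-Java sketch below models that behavior with HashMaps standing in for Flink keyed state. KeyedJoinSketch and its join rule (emit aa + "+" + bb once both sides are present) are assumptions made for the sketch; the actual logic of JoinAAAndBBFunction is not shown in this article.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of a keyed two-stream join; this is not Flink code.
class KeyedJoinSketch {
    // Stand-ins for keyed state: at most one buffered value per key and per side.
    private final Map<String, String> pendingAA = new HashMap<>();
    private final Map<String, String> pendingBB = new HashMap<>();
    private final List<String> output = new ArrayList<>();

    // Called for each element of the first stream.
    void processElement1(String key, String aa) {
        String bb = pendingBB.remove(key);
        if (bb == null) {
            pendingAA.put(key, aa);      // other side not seen yet: buffer AA
        } else {
            output.add(aa + "+" + bb);   // both sides present: emit the joined record
        }
    }

    // Called for each element of the second stream.
    void processElement2(String key, String bb) {
        String aa = pendingAA.remove(key);
        if (aa == null) {
            pendingBB.put(key, bb);      // other side not seen yet: buffer BB
        } else {
            output.add(aa + "+" + bb);
        }
    }

    List<String> getOutput() {
        return output;
    }

    public static void main(String[] args) {
        KeyedJoinSketch join = new KeyedJoinSketch();
        join.processElement1("k1", "aa1");
        join.processElement2("k2", "bb2"); // different key, stays buffered
        join.processElement2("k1", "bb1"); // completes the pair for k1
        System.out.println(join.getOutput()); // prints [aa1+bb1]
    }
}
```

The test above follows the same rhythm: feed elements into each input, then compare the collected output with the expected records.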