单元测试
一、添加测试所需的依赖
// Mockito with its JUnit 5 (Jupiter) integration.
testImplementation 'org.mockito:mockito-junit-jupiter:4.6.1'
// Flink test utilities; flinkVersion is expected to be defined elsewhere in the build script.
testImplementation "org.apache.flink:flink-test-utils:${flinkVersion}"
testImplementation "org.apache.flink:flink-test-utils-junit:${flinkVersion}"
// The trailing ":tests" is a classifier that selects each module's test jar.
// NOTE(review): presumably required for the operator test harness classes -- confirm.
testImplementation "org.apache.flink:flink-runtime:${flinkVersion}:tests"
testImplementation "org.apache.flink:flink-streaming-java:${flinkVersion}:tests"
二、测试无状态算子
2.1 map
无状态算子:
/**
 * Example stateless operator: a MapFunction from String to String.
 * The mapping logic itself is elided in this example.
 */
public class MyMapFunction implements MapFunction<String, String> {
@Override
public String map(String s) throws Exception {
// ... (mapping logic elided; a real implementation must return a String)
}
}
/**
 * Unit test for {@link MyMapFunction}. The function is instantiated and
 * invoked directly; no test harness is involved.
 */
class MyMapFunctionTest {

    /**
     * Maps the input "a" and expects the result "b".
     *
     * @throws Exception propagated from {@code MyMapFunction.map}, which declares it;
     *                   without this clause the test method does not compile
     */
    @Test
    void testMap() throws Exception {
        var function = new MyMapFunction();
        var input = "a";

        var result = function.map(input);

        // NOTE(review): Assert is the JUnit 4 style API; with Jupiter tests
        // Assertions.assertEquals is the usual choice -- confirm which is intended.
        Assert.assertEquals("b", result);
    }
}
2.2 flatMap
/**
 * Example stateless operator: a FlatMapFunction from String to String that
 * emits its result through the supplied Collector.
 */
public class MyFlatMap implements FlatMapFunction<String, String> {
@Override
public void flatMap(String s, Collector<String> collector) throws Exception {
// ... (logic elided; `out` is presumably computed here from `s` -- it is not declared in the visible code)
collector.collect(out);
}
}
2.2.1 使用 Mockito
/**
 * Unit test for {@link MyFlatMap} that verifies the emitted element through a
 * mocked Collector.
 *
 * NOTE(review): the class name looks copied from the map example; something
 * like MyFlatMapTest would describe it better -- confirm before renaming.
 */
class MyMapFunctionTest {

    /**
     * Feeds "a" into {@code flatMap} and verifies that exactly one element,
     * "b", was collected.
     *
     * @throws Exception propagated from {@code MyFlatMap.flatMap}, which declares it
     */
    @Test
    void testFlatMap() throws Exception {
        var function = new MyFlatMap();
        // mock(Class) cannot carry the type argument, so the assignment is
        // unchecked; the suppression is limited to this one declaration.
        @SuppressWarnings("unchecked")
        Collector<String> collector = mock(Collector.class);
        var input = "a";

        // flatMap returns void, so there is no result to capture; the output is
        // observed through the collector instead.
        function.flatMap(input, collector);

        Mockito.verify(collector, times(1)).collect("b");
    }
}
2.2.2 使用 ListCollector
/**
 * Unit test for {@link MyFlatMap} that captures the emitted elements in a
 * plain list via ListCollector instead of using a mock.
 *
 * NOTE(review): the class name looks copied from the map example; something
 * like MyFlatMapTest would describe it better -- confirm before renaming.
 */
class MyMapFunctionTest {

    /**
     * Feeds "a" into {@code flatMap} and expects the collected output to be
     * exactly ["b"].
     *
     * @throws Exception propagated from {@code MyFlatMap.flatMap}, which declares it
     */
    @Test
    void testFlatMap() throws Exception {
        var function = new MyFlatMap();
        List<String> out = new ArrayList<>();
        ListCollector<String> collector = new ListCollector<>(out);
        var input = "a";

        // flatMap returns void; everything it emits ends up in `out`.
        function.flatMap(input, collector);

        Assert.assertEquals(List.of("b"), out);
    }
}
三、测试有状态算子
下面以一个双流 join 为例，使用 KeyedTwoInputStreamOperatorTestHarness 测试有状态算子：
/**
 * Example test for a keyed two-input function, driven through
 * KeyedTwoInputStreamOperatorTestHarness. Arguments written as "..." are
 * elided in this example.
 */
public class JoinVideoResolutionFunctionTest {
@Test
@DisplayName("关联AA、BB")
public void test1() throws Exception {
var function = new JoinAAAndBBFunction();
// NOTE(review): @Cleanup is presumably Lombok's, closing the harness when the method exits -- confirm.
@Cleanup
var testHarness = new KeyedTwoInputStreamOperatorTestHarness<>(
new CoStreamFlatMap<>(function),
// NOTE(review): both key selectors return null, which looks like a placeholder;
// a real test presumably needs selectors that extract the join key from each input -- confirm.
x -> null,
x -> null,
Types.STRING
);
testHarness.open();
// AA: push a record into the first input (argument elided).
testHarness.processElement1(...);
// BB: push a record into the second input (argument elided).
testHarness.processElement2(...);
// Expected output, presumably plain values rather than StreamRecord wrappers (elements elided).
var expectedOutput = new ConcurrentLinkedQueue<>();
expectedOutput.add(...);
expectedOutput.add(...);
// Cast each harness output entry to StreamRecord and keep only its value.
// NOTE(review): the cast assumes every output entry is a StreamRecord -- confirm nothing else is emitted.
var output = testHarness.getOutput()
.stream()
.map(item -> (StreamRecord) item)
.map(StreamRecord::getValue)
.collect(Collectors.toList());
// Compare the expected queue with the extracted values.
TestHarnessUtil.assertOutputEquals(
"Output was not correct.",
expectedOutput,
new ArrayDeque<>(output)
);
}
}
Loading...