Code first:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple2<Long, String>> source1 = env.fromElements(
        Tuple2.of(1L, "xiaoming"),
        Tuple2.of(1L, "xiaoming111111"),
        Tuple2.of(4L, "4"),
        Tuple2.of(3L, "3"),
        Tuple2.of(2L, "xiaowang"));

DataSet<Tuple3<Long, String, String>> source2 = env.fromElements(
        Tuple3.of(2L, "xiaoli", "2"),
        Tuple3.of(1L, "shinelon", "3"),
        Tuple3.of(3L, "hhhhhh", "4"));

source1.coGroup(source2)
        .where(0)
        .equalTo(0)
        .with(new CoGroupFunction<Tuple2<Long, String>, Tuple3<Long, String, String>, Tuple2<Long, String>>() {
            @Override
            public void coGroup(Iterable<Tuple2<Long, String>> first,
                                Iterable<Tuple3<Long, String, String>> second,
                                Collector<Tuple2<Long, String>> out) throws Exception {
                Tuple2<Long, String> map = new Tuple2<>();
                for (Tuple2<Long, String> left : first) {
                    boolean isMatched = false;
                    for (Tuple3<Long, String, String> right : second) {
                        if (left.f0.equals(right.f0)) {
                            map.setField(left.f0, 0);
                            map.setField(String.join("-", left.f1, right.f1), 1);
                        }
                        out.collect(map);
                    }
                    if (!isMatched && ((NonReusingKeyGroupedIterator.ValuesIterator) first).hasNext()) {
                        // No matching record in the right set
                        out.collect(left);
                    }
                }
            }
        }).print();

As shown above, the goal is to use coGroup on source1 and source2 to get the effect of a left join.

Written this way, the loop over second fails with: Caused by: org.apache.flink.util.TraversableOnceException: The Iterable can be iterated over only once. Only the first call to 'iterator()' will succeed

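The cause is that source1 contains two records with key 1L, so for that key the outer loop runs twice and second gets iterated a second time.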
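
One common way around this is to copy second into a list once and loop over that list for every left record. Below is a minimal sketch of the idea in plain Java, with no Flink dependency. Rec, OnceIterable, once and leftJoinGroup are hypothetical names made up for this illustration; OnceIterable only imitates the "iterate once" behaviour and is not Flink's actual class.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class CoGroupLeftJoinSketch {

    /** Hypothetical record type standing in for the Flink tuples (key, name). */
    static final class Rec {
        final long key;
        final String name;

        Rec(long key, String name) {
            this.key = key;
            this.name = name;
        }
    }

    /** Hypothetical stand-in for a one-shot group iterable: iterator() succeeds only once. */
    static final class OnceIterable<T> implements Iterable<T> {
        private final List<T> data;
        private boolean consumed = false;

        OnceIterable(List<T> data) {
            this.data = data;
        }

        @Override
        public Iterator<T> iterator() {
            if (consumed) {
                throw new IllegalStateException("The Iterable can be iterated over only once.");
            }
            consumed = true;
            return data.iterator();
        }
    }

    /** Helper: wraps the given records in a one-shot iterable. */
    static Iterable<Rec> once(Rec... recs) {
        return new OnceIterable<>(Arrays.asList(recs));
    }

    /** Left-join logic for a single key group, as it could look inside coGroup(). */
    static List<String> leftJoinGroup(Iterable<Rec> first, Iterable<Rec> second) {
        // Copy the right side exactly once, so it can be scanned again for every left record.
        List<Rec> rights = new ArrayList<>();
        for (Rec right : second) {
            rights.add(right);
        }

        List<String> out = new ArrayList<>();
        for (Rec left : first) {
            if (rights.isEmpty()) {
                // No matching record on the right: emit the left record unchanged.
                out.add("(" + left.key + "," + left.name + ")");
            } else {
                // All records in one group share the key, so no key comparison is needed here.
                for (Rec right : rights) {
                    out.add("(" + left.key + "," + left.name + "-" + right.name + ")");
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Key 1: two left records, one right record.
        System.out.println(leftJoinGroup(
                once(new Rec(1L, "xiaoming"), new Rec(1L, "xiaoming111111")),
                once(new Rec(1L, "shinelon"))));
        // prints [(1,xiaoming-shinelon), (1,xiaoming111111-shinelon)]

        // Key 4: one left record, nothing on the right.
        System.out.println(leftJoinGroup(once(new Rec(4L, "4")), once()));
        // prints [(4,4)]
    }
}
```

The same pattern should carry over to the CoGroupFunction above: fill a list from second at the start of coGroup, then loop over the list instead of second. The trade-off is that the right side of each key group is held in memory, which may matter for very large groups.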