1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| // Execution environment that the data sets below are created from.
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Left input: (id, name) pairs. Id 1 occurs twice.
DataSet<Tuple2<Long, String>> source1 =
        env.fromElements(
                Tuple2.of(1L, "xiaoming"),
                Tuple2.of(1L, "xiaoming111111"),
                Tuple2.of(4L, "4"),
                Tuple2.of(3L, "3"),
                Tuple2.of(2L, "xiaowang"));
// Right input: (id, name, extra) triples with ids 2, 1 and 3.
DataSet<Tuple3<Long, String, String>> source2 =
        env.fromElements(
                Tuple3.of(2L, "xiaoli", "2"),
                Tuple3.of(1L, "shinelon", "3"),
                Tuple3.of(3L, "hhhhhh", "4"));
// Left outer join of source1 with source2 on field 0, expressed as a coGroup.
// Each call of the function receives the records of both inputs that share one key.
source1.coGroup(source2)
        .where(0)
        .equalTo(0)
        .with(new CoGroupFunction<Tuple2<Long, String>, Tuple3<Long, String, String>, Tuple2<Long, String>>() {
            /**
             * Joins the records of one key group.
             *
             * <p>For every left record, emits one {@code (key, left.f1 + "-" + right.f1)}
             * tuple per right record; when the right side is empty, the left record is
             * emitted unchanged.
             *
             * @param first  left-side records of the current key
             * @param second right-side records of the current key
             * @param out    collector that receives the joined records
             * @throws Exception declared by the {@code CoGroupFunction} contract
             */
            @Override
            public void coGroup(
                    Iterable<Tuple2<Long, String>> first,
                    Iterable<Tuple3<Long, String, String>> second,
                    Collector<Tuple2<Long, String>> out) throws Exception {
                // Buffer the right side once so it is never iterated a second time
                // inside the loop over the left side. NOTE(review): Flink's group
                // iterables are documented as traversable only once -- confirm.
                // Elements are copied so the buffer stays valid if the runtime
                // reuses record objects.
                java.util.List<Tuple3<Long, String, String>> rights = new java.util.ArrayList<>();
                for (Tuple3<Long, String, String> right : second) {
                    rights.add(right.copy());
                }

                for (Tuple2<Long, String> left : first) {
                    if (rights.isEmpty()) {
                        // No partner on the right side: keep the left record as-is.
                        out.collect(left);
                        continue;
                    }
                    for (Tuple3<Long, String, String> right : rights) {
                        // A fresh tuple per emission; no key comparison is needed
                        // because both sides were grouped on field 0.
                        out.collect(Tuple2.of(left.f0, String.join("-", left.f1, right.f1)));
                    }
                }
            }
        })
        .print();
|