flink问题:esotericsoftware.kryo.serializers.CollectionSerializer.read NullPointerException
(flink)问题:esotericsoftware.kryo.serializers.CollectionSerializer.read NullPointerException 异常如下 123456789101112131415161718192021222324252627282930313233com.esotericsoftware.kryo.KryoException: java.lang.NullPointerExceptionSerialization trace:values (org.apache.avro.generic.GenericData$Record) at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) at com.esotericsoftware...
记录一次flink的数据转换解决方式
12345678910111213/** * { * 1:[2,3,4], * 2:[3,4,5] * } * ↓ * { * 2:[1], * 3:[1,2], * 4:[1,2], * 5:[2] * } */ flink由上转成下面的数据格式 打散 1234562:1,3:1,4:1,3:2,4:2,5:2 2.再根据key分组并且reduce 12345678910111213141516171819202122232425262728DataSet<Tuple2<String, String>> test = ...test.flatMap(new FlatMapFunction<Tuple2<String, String>, Tuple2<String, String>>() { @Override public void flatMap(Tuple2<String, String> ...
flink简述
Flink什么是Flink?优势是什么,为什么选择Flink?ApacheFlink是一个框架和分布式处理引擎,用于在无界和有界数据流上进行有状态计算。Flink被设计为在所有通用集群环境中运行,以内存速度和任何规模执行计算。 经典的MapReduce程序 算子的概念:可以并行执行的方法称之为算子。 hadoop:在MR程序执行时,基本上每一次的数据交换都会读写磁盘。在复杂的业务逻辑下,由于执行的job任务多造成多次的磁盘读写,严重的影响了计算的时效性。这就是第一代计算框架hadoop的不足。 Tez:Tez是一个Hive的运行引擎,优化业务之间的读写磁盘次数,性能优于MR。 Spark:Spark的设计模式是读取集群中的数据后,在内存中存储和运算,直到全部运算完毕后,再存储到集群中。 Flink:在批处理的性能与Spark是差不多的,在底层设计的模式上Spark是以批处理为出发点,流是批处理的一种特例,Flink是以流处理为出发点,批处理是流的一种特例。 总结:在处理的的效率上MR的计算方式基本上被淘汰了。在批处理方面Spark&Flink性能是差不...
关于DataRowException, internal schema representation is probably ...异常的调试记录
异常:DataRowException, internal schema representation is probably out of sync with real database schema 123456789起初异常信息打印的不全, 但是浑然不知,后来在打印信息里发现flink并没有打印出应该打印的日志,然后去查了下,是缺少依赖包包加上<dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-simple</artifactId> <version>1.7.25</version> <!-- 注意,若无type为jar则报错--> <type>jar</type></dependency> 全部异常信息 1234567891011121314151617181920212223242526272829303132333435ERROR ...
记录flink的安装及简单使用
先查看centos中自带的jdk并卸载 1234567[root@root ~]# rpm -qa | grep java //查看tzdata-java-2016c-1.el6.noarchjava-1.6.0-openjdk-1.6.0.38-1.13.10.4.el6.x86_64java-1.7.0-openjdk-1.7.0.99-2.6.5.1.el6.x86_64[root@root ~]# rpm -e --allmatches --nodeps java-1.6.0-openjdk-1.6.0.38-1.13.10.4.el6.x86_64 //卸载[root@root ~]# rpm -e --allmatches --nodeps java-1.7.0-openjdk-1.7.0.99-2.6.5.1.el6.x86_64 //卸载[root@root ~]# rpm -qa | grep java //再次查看 开发版openjdk(非开发版还需单独额外安装jps等工具) 1yum install -y java-1.8.0-o...
记一次利用Semaphore处理大批次数据计算的解决方案
先描述下信号量的意义 1234567891011Semaphore是一个计数信号量。 在概念上,信号量维持一组许可证。如果有必要,每个acquire()都会阻塞,直到许可证可用,然后才能使用它。 每个release()添加许可证,潜在地释放阻塞获取方。但是,没有使用实际的许可证对象; Semaphore只保留可用数量的计数,并相应地执行。 信号量通常用于限制线程数,而不是访问某些(物理或逻辑)资源。在这里插入图片描述Semaphore实现的功能就类似有3个停车位,假如有6个人要停车,那么同时只能停多少辆车?同时只能有3个人能够占用,当3个人中 的任何一个人开车离开后,其中等待的另外3个人中又有一个人可以来停车了。另外等待的2个人中可以是随机获得优先机会,也可以是按照先来后到的顺序获得机会,这取决于构造Semaphore对象时传入的参数选项。单个信号量的Semaphore对象可以实现互斥锁的功能,并且可以是由一个线程获得了“锁”,再由另一个线程释放“锁”,这可应用于死锁恢复的一些场合。 批量处理集合util123456789101112131415161718192021222...
