记一次利用Semaphore处理大批次数据计算的解决方案
先描述下信号量的意义 1234567891011Semaphore是一个计数信号量。 在概念上,信号量维持一组许可证。如果有必要,每个acquire()都会阻塞,直到许可证可用,然后才能使用它。 每个release()添加许可证,潜在地释放阻塞获取方。但是,没有使用实际的许可证对象; Semaphore只保留可用数量的计数,并相应地执行。 信号量通常用于限制线程数,而不是访问某些(物理或逻辑)资源。在这里插入图片描述Semaphore实现的功能就类似有3个停车位,假如有6个人要停车,那么同时只能停多少辆车?同时只能有3个人能够占用,当3个人中...
记一次使用coGroup产生的问题
记一次使用coGroup产生的问题(Caused by: org.apache.flink.util.TraversableOnceException: The Iterable can be iterated over only once. Only the first call to 'iterator()' will succeed)
记录flink的安装及简单使用
先查看centos中自带的jdk并卸载 1234567[root@root ~]# rpm -qa | grep java //查看tzdata-java-2016c-1.el6.noarchjava-1.6.0-openjdk-1.6.0.38-1.13.10.4.el6.x86_64java-1.7.0-openjdk-1.7.0.99-2.6.5.1.el6.x86_64[root@root ~]# rpm -e --allmatches --nodeps java-1.6.0-openjdk-1.6.0.38-1.13.10.4.el6.x86_64 //卸载[root@root ~]# rpm -e --allmatches --nodeps java-1.7.0-openjdk-1.7.0.99-2.6.5.1.el6.x86_64 //卸载[root@root ~]# rpm -qa | grep java //再次查看 开发版openjdk(非开发版还需单独额外安装jps等工具) 1yum install -y ...
记录一次flink的数据转换解决方式
12345678910111213/** * { * 1:[2,3,4], * 2:[3,4,5] * } * ↓ * { * 2:[1], * 3:[1,2], * 4:[1,2], * 5:[2] * } */ flink由上转成下面的数据格式 打散 1234562:1,3:1,4:1,3:2,4:2,5:2 2.再根据key分组并且reduce 12345678910111213141516171819202122232425262728DataSet<Tuple2<String, String>> test = ...test.flatMap(new FlatMapFunction<Tuple2<String, String>, Tuple2<String, String>>() { @Override public void flatMap(Tuple2<String, String>...
记录开发中遇到关于flink的一些错误(长期)
2022-01-20 09:25:58,308 WARN org.apache.flink.client.deployment.application.DetachedApplicationRunner [] - Could not execute application:org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Job was submitted in detached mode. Results of job execution, such as accumulators, runtime, etc. are not available. Please make sure your program doesn't call an eager execution function [collect, print, printToErr, count] 这是因为当 flink 作业是以 detached...
git配置代理
本机vpn设置代理 设置代理 12git config --global http.proxy 代理地址 (全局)git config --local http.proxy 代理地址 (当前仓库) 取消代理 1git config --global --unset http.proxy 查看当前已设置的代理,没有打印任何东西证明没有设置代理 1git config --global --get http.proxy 配置github代理(配合服务器使用) 台湾代理: https://hub.xn--gzu630h.xn--kpry57d/ 本地代理: https://cdn.githubjs.cf/ 可以使用insteadOf关键字,替换所有https://github.com/仓库的代理 1git config --global url."https://hub.xn--gzu630h.xn--kpry57d/".insteadOf...
Git 中 merge 和 rebase 的区别
Git 中 merge 和 rebase 的区别参考: https://developer.aliyun.com/article/652579 简介: $ git pull --rebase和$ git pull区别 是git fetch + git merge FETCH_HEAD的缩写,所以默认情况下,git pull就是先fetch,然后执行merge操作,如果加-rebase参数,就是使用git rebase代替git merge 。 $ git pull --rebase和$ git pull区别是git fetch + git merge FETCH_HEAD的缩写,所以默认情况下,git pull就是先fetch,然后执行merge操作,如果加-rebase参数,就是使用git rebase代替git merge 。更新本地仓库 merge 和 rebasemerge 是合并的意思,rebase是复位基底的意思。现在我们有这样的两个分支,test和master,提交如下: 123 D---E test /A---B---C---F...
pprof调优学习
Go性能优化 测试代码: https://github.com/behappy-project/behappy-url-shortener Go语言项目中的性能优化主要有以下几个方面: CPU profile:报告程序的 CPU 使用情况,按照一定频率去采集应用程序在 CPU 和寄存器上面的数据 Memory Profile(Heap Profile):报告程序的内存使用情况 Block Profiling:报告 goroutines 不在运行状态的情况,可以用来分析和查找死锁等性能瓶颈 Goroutine Profiling:报告 goroutines 的使用情况,有哪些...
robfig_cron_v3,cron工具类
包: https://github.com/robfig/cron123go get github.com/robfig/cron/v3@v3.0.0import "github.com/robfig/cron/v3" util123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104package utilimport ( "github.com/pkg/errors" cron "github.com/robfig/cron/v3" "sync")// Crontab crontab managertype Crontab struct...
Systemd主命令-Systemctl相关用法(长期记录)
systemctl查看当前已安装服务(取代chkconfig) 命令 说明 systemctl 列出所有的系统服务 systemctl list-units 列出所有启动unit systemctl list-unit-files 列出所有启动文件 systemctl list-units –type=service –all 列出所有service类型的unit systemctl list-units –type=service –all grep cpu 列出 cpu电源管理机制的服务 systemctl list-units –type=target –all 列出所有target systemctl list-unit-files 列出所有的系统服务 systemctl is-active [unit type] 查看服务是否运行 systemctl is-enable [unit type] 查看服务是否设置为开机启动 systemctl mask [unit...