日志系统配置(长期补充)

elastic各产品配置/学习手册
所有的ip不要写本地ip

filebeat.yml

如果同时指定了exclude_lines和include_lines, Filebeat将会先校验include_lines,再校验exclude_lines

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
# Enable filebeat config reloading
# filebeat.config(after 6.0.0)允许input/module配置放在其他路径下(类似nginx)
# 每个外部配置文件的第一行必须是一个以- type. 确保filebeat.config.inputs从该文件中省略该行 。所有input type configuration options 必须在每个外部配置文件中指定。filebeat.config.inputs不支持在全局级别指定这些配置选项。
filebeat.config:
inputs:
enabled: true
# 这里不能写多级目录(<=1)
path: inputs.d/*.yml
reload.enabled: true
reload.period: 10s
modules:
enabled: false
path: modules.d/*.yml
reload.enabled: true
reload.period: 10s
# -----------------------------filebeat从6.x版本以后不再支持多output---------------
# -----------------------------Console输出类型配置解析---------------
output.console:
pretty: true
pretty: 如果pretty设置为true,则写入stdout的事件将被格式化。默认值为false
codec: 输出编解码器配置。如果缺少编解码器部分,事件将使用“pretty”选项进行json编码。
enabled: enabled config是用于启用或禁用输出的布尔设置。如果设置为false,则禁用输出
# ------------------------------ Logstash Output -------------------------------
# 多种output.这里配置logstash
output.logstash:
# logstash服务ip
hosts: ["ip:5044"]
# ------------------------------ Kafka Output -------------------------------
output.kafka: #输出到kafka系统
enabled: true
hosts: ["192.168.81.210:9092","192.168.81.220:9092","192.168.81.230:9092"] #kafka的地址
#指定将日志存储到kafka集群的哪个topic中,这里的topic值是引用在inputs中定义的fields,通过这种方式可以将不同路径的日志分别存储到不同的topic中,如果配置了fields_under_root: true,记得删除掉`[fields]`
topic: '%{[fields][log_topic]}'
partition.round_robin:
reachable_only: false
required_acks: 1
compression: gzip
max_message_bytes: 1000000
# 队列参数优化
queue.mem:
# 消息队列的大小,默认值是4096
events: 256
flush.min_events: 128
flush.timeout: 1s
# 限制filebeat的进程数量,其实是内核数,建议手动设为1(f)
max_procs: 1
# ================================= Processors =================================
# 处理器,有多种属性,这里去除字段
# 详细见: https://www.elastic.co/guide/en/beats/filebeat/current/filtering-and-enhancing-data.html
# 注: inputs.d/下每个yml也可添加属于各自的processors,子processors会在下面的processors之前执行
processors:
# 脚本处理
- script:
lang: javascript
source: >
function process(event) {
var message = event.Get("message");
message = message.replace(/\\x22/g,'"');
message = message.replace(/\,-/g,'');
event.Put("message", message.trim());
}
# json处理
- decode_json_fields:
fields: ["message"]
process_array: false
max_depth: 1
target: ""
overwrite_keys: false
add_error_key: true
# 删除字段
- drop_fields:
fields: ["log","host","input","agent","ecs","message"]
ignore_missing: false
# ================================== Logging ===================================
# 调试的时候,level可以配置为debug
logging.level: info
logging.to_files: true
logging.files:
path: /var/log/filebeat
name: filebeat
keepfiles: 7
permissions: 0644
# 配置定时指标报告(每5分钟检查一次看看是否服务有问题)
logging.metrics.enabled: true
logging.metrics.period: 300s

# ================================== inputs可以写在外部,也可以写在内部,如下是写在内部 ===================================
filebeat.inputs:
# Each - is an input. Most options can be set at the input level, so
# you can use different inputs for various configurations.
# Below are the input specific configurations.

- type: log

# Change to true to enable this input configuration.
enabled: true

- /usr/share/filebeat/logs/*info.log
#匹配列表中的任何正则表达式。
### Multiline options
#用于Java堆栈跟踪或C-Line继续
enabled: true

# Paths that should be crawled and fetched. Glob based paths.
paths:
- /usr/share/filebeat/logs/*info.log
#- c:\programdata\elasticsearch\logs\*

#排除内容。要匹配的正则表达式列表。它去掉了这些线
#匹配列表中的任何正则表达式。
#exclude_lines: ['^DBG']

#包括内容。要匹配的正则表达式列表。它导出
#匹配列表中的任何正则表达式。
#include_lines: ['^ERR', '^WARN', '^DBG','^INFO']

# 排除的文件。要匹配的正则表达式列表。Filebeat删除文件
# 表示匹配列表中的任何正则表达式。默认情况下,不删除任何文件。
#exclude_files: ['.gz$']

# 可选的附加字段。这些字段可以自由挑选
#将附加信息添加到抓取的日志文件中进行过滤
fields:
filetype: info
# 如果值为ture,那么fields存储在输出文档的顶级位置
# 如果与filebeat中字段冲突,自定义字段会覆盖其他字段
fields_under_root: true
### Multiline options

#Multiline可以用于跨越多行的日志消息。这是常见的
#用于Java堆栈跟踪或C-Line继续

# 必须匹配的regexp模式。示例模式匹配以[开头的所有行
#multiline.pattern: ^\[

# 定义pattern下的模式集是否应该被否定。默认是假的。
#multiline.negate: false

#匹配可以设置为"after"或"before"。它用于定义是否应该将行附加到模式中
#在匹配之前或之后,或在模式没有基于否定匹配的情况下被匹配(没有)。
#注意:After等价于previous, before等价于next
#multiline.match: after
- type: log
enabled: true
paths:
- /usr/share/filebeat/logs/*error.log
fields:
filetype: error
fields_under_root: true

inputs.d/*.yml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# log input--这里写外置对应的日志输出路径(标备注是需要改的)
- type: log
paths:
# 这里写路径
- /xxx/logs/app/debug*.log
- /xxx/logs/app/info*.log
- /xxx/logs/app/error*.log

#排除内容。要匹配的正则表达式列表。它去掉了这些线
#匹配列表中的任何正则表达式。
#exclude_lines: ['^DBG']

#包括内容。要匹配的正则表达式列表。它导出
#匹配列表中的任何正则表达式。
#正则匹配不包含某字符串: ^((?!xxx).)*$
#include_lines: ['^ERR', '^WARN', '^DBG','^INFO']

# 排除的文件。要匹配的正则表达式列表。Filebeat删除文件
# 表示匹配列表中的任何正则表达式。默认情况下,不删除任何文件。
#exclude_files: ['.gz$']

#可选的附加字段。这些字段可以自由挑选
#将附加信息添加到抓取的日志文件中进行过滤
fields:
logtype: debug
env: dev
log_topic: log-data
# 如果值为ture,那么fields存储在输出文档的顶级位置
# 如果与filebeat中字段冲突,自定义字段会覆盖其他字段
fields_under_root: true
# 检测文件更新频率
scan_frequency: 10s
# [yyyy-MM-dd hh:mm:ss.SSS]
multiline.pattern: ^\[(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.?\d{0,3})\]
# negate参数意思是是否否定多行融入
# negate参数为false,表示“否定参数=false”。multiline多行参数负负得正,表示符合pattern、match条件的行会融入多行之中、成为一条完整日志的中间部分。如果match=after,则以b开头的和前面一行将合并成一条完整日志;如果match=before,则以b开头的和后面一行将合并成一条完整日志。
# negate参数为true,表示“否定参数=true”。multiline多行参数为负,表示符合match条件的行是多行的开头,是一条完整日志的开始或结尾。如果match=after,则以b开头的行是一条完整日志的开始,它和后面多个不以b开头的行组成一条完整日志;如果match=before,则以b开头的行是一条完整日志的结束,和前面多个不以b开头的合并成一条完整日志。
multiline.negate: true
multiline.match: after
multiline.max_lines: 500 #表示如果多行信息的行数炒过该数字,则多余的都会被丢弃。默认值为500行
multiline.timeout: 500 #表示超时时间,默认为 5s,如果炒过timeout还没有新的一行日志产生,则自动结束当前的多行、形成一条日志发出去。防止合并消耗太多时间导致进程卡死
flush_pattern: ^\[(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.?\d{0,3})\] #表示符合该正则表达式的,将其从内存刷入硬盘。
- type: log
......

logstash.conf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
input {
# 以beats模式进行输入
beats {
port => 5044
}
# https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html
# 从kafka获取消息
kafka {
# 服务器列表,可动态更改不必重启logstash
bootstrap_servers => "192.168.1.106:9092,192.168.1.107:9092,192.168.1.108:9092"
# 提交消费情况 默认5000ms 一次
auto_commit_interval_ms => 5000
# 消费组,默认logstash
group_id => "logstash"
# 消费者,默认logstash-0
client_id => "logstash-0"
# 消费者线程(默认为1),保证 消费的分区数和线程数一致以达到完美效果,线程数大于消费的分区数时将会产生空闲线程
# 如果一个logstash去消费3个分区,则应该设置为3,以提高吞吐量
# 官方是这么说的,不过经过测试好像没发现有多大变化
consumer_threads => 2
# Kafka中没有初始偏移量或偏移量超出范围时该怎么办? 自动将偏移量重置为最新偏移量
auto_offset_reset => "latest"
# 主题
topics => ["topic1", "topic2"]
# 自定义字段
add_field => {"logstash" => "192.168.1.143"}
codec => json { charset => "UTF-8" }
}
}
# 比较符号可以写:==, !=, <, >, <=, >=, =~(匹配正则), !~(不匹配正则), in(包含,in ["hello", "world"]), not in(不包含,not in ["hello", "world"])
# 衔接符号可以写:and, or
filter {
# 依据字段值判断分割
# muate模块各属性有顺序,详细可以看https://www.cnblogs.com/hengistwu/p/14210381.html
if [logtype] == "bi"{
mutate {
//替换字符,记得转义
gsub => [ "message",'\\x22','"' ]
gsub => [ "message",'\,-','' ]
}
// json格式
json {
source => "message"
}
}
else if [logtype] == "acc" {
mutate{
split => ["message","~"]
add_field => {
"module" => "%{[message][1]}"
}
}
}
# json模块,前提是message是json格式的
#json {
# source => "message"
# remove_field => ["message","@version","path"]
#}
# 这里是将日志中的logTime字段以yyyy-MM-dd HH:mm:ss.SSS格式赋值到@timestamp字段,正常来说,这种才是日志系统生产环境的正确配置
#date{
# match => ["logTime", "yyyy-MM-dd HH:mm:ss.SSS"]
# target => "@timestamp"
#}
# 这里是使@timestamp转unix_ms毫秒时间戳
#date{
# match => ["timestamp", "UNIX_MS"]
# target => "@timestamp"
#}
#时区问题可以通过如下代码块解决>>然后output中的%{indexDay}可以用%{+yyyyMMdd}代替(temp字段不想要可以通过remove_field去掉)-[如果下面的做法会遇到]
# date{
# match => ["timestamp", "UNIX_MS"]
# target => "@timestamp"
# }
#ruby {
# code => "event.set('temp', event.get('@timestamp').time.localtime + 8*60*60); event.set('@timestamp', event.get('temp'))"
#}

# 用ruby模块添加字段
#ruby {
# code => "event.set('indexDay', event.get('[@timestamp]').time.localtime('+08:00').strftime('%Y%m%d'))"
#}
# 相当于trim()方法,取出多个字段的前后空格
#mutate{
# strip => ["module","traceId","localIp","remoteAddr","url","httpVersion","method","status","contentLength","referer","userAgent"]
#}
}
output {
# 输出到es,以logstash.template.json的mapping方式输出到es
# 如果不想写logstash.template.json并且需要es自定存一些字段的话,可以查看https://www.cnblogs.com/duanxuan/p/6517462.html,或者如我上面写的,用muate模块,grok模块等解析
elasticsearch {
hosts => ["http://192.168.56.100:9200"]
# 使用索引模板, 这里可以不这么写,可以在kibana中配置索引模板,相对来说更方便
manage_template => false
template => "/usr/share/logstash/templates/logstash.template.json"
template_name => "sopei"
template_overwrite => true
index => "xxx-%{indexDay}-%{type}"
# codec,编码解码用的,一般在input和output中用,这里意思就是说上面传的是json,下面输出用json解码
codec => json
user => "logstash_system"
password => "xiaowu"
}
# 生产去掉,debug专用,用来查看经过处理逻辑之后的日志
stdout {
codec => rubydebug
}
}

kibana.yml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
server.name: kibana
# kibana的主机地址 0.0.0.0可表示监听所有IP
server.host: "0.0.0.0"
# kibana访问es的URL
elasticsearch.hosts: "http://127.0.0.1:9200"
elasticsearch.username: 'kibana_system'
# 密码
elasticsearch.password: 'xiaowu'
# 显示登陆页面
xpack.monitoring.ui.container.elasticsearch.enabled: true
# 默认情况下,每次启动 Kibana 时都会为报告功能生成一个加密密钥。如果 Kibana 配置中未保留静态加密密钥,则在您重新启动 Kibana 时,任何挂起的报告都会失败。
xpack.reporting.encryptionKey: fd7c75cf-6abd-4704-a614-10a8679d64e7
如果您在多个 Kibana 实例之间进行负载平衡,则每个实例都需要具有相同的报告加密密钥
# 在Docker容器中运行Kibana时,所有容器进程都在一个usernamespace中运行,其中包含seccomp bpf和AppArmor配置文件,这些配置文件防止使用Chromium沙箱。在这些情况下,建议禁用沙盒,因为容器实现类似的安全机制
xpack.reporting.capture.browser.chromium.disableSandbox: false
# Config key [monitoring.cluster_alerts.email_notifications.email_address] will be required for email notifications to work in 8.0
monitoring.cluster_alerts.email_notifications.email_address: xxx
# 最终用户访问 Kibana 的公开可用 URL。必须包括协议、主机名、端口(如果分别http与 和https、80 和 443的默认值不同)和 server.basePath(如果已配置)。此设置不能以斜杠 ( /)结尾
server.publicBaseUrl: xxx
# 一个由 32 个或更多字符组成的字符串,用于在警报规则和操作的敏感属性存储在 Elasticsearch 之前对其进行加密
xpack.encryptedSavedObjects.encryptionKey: xxx
# 语言
i18n.locale: "zh-CN"

elasticsearch.yml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# 7.x配置单节点只需要discovery.type: single-node和去掉cluster.initial_master_nodes: ["es-node1"](https://stackoverflow.com/questions/16432300/how-to-config-single-node-for-single-cluster-standalone-cluster-elasticsearch)
# 集群名称
cluster.name: elasticsearch-cluster
# 节点名称
node.name: es-node1
# 绑定host,0.0.0.0代表当前节点的ip
network.host: 0.0.0.0
# 设置其它节点和该节点交互的ip地址,如果不设置它会自动判断,值必须是个真实的ip地址(本机ip)
network.publish_host: 192.168.56.100
# 设置对外服务的http端口,默认为9200
http.port: 9200
# 设置节点间交互的tcp端口,默认是9300
transport.tcp.port: 9300
# 是否支持跨域,默认为false
http.cors.enabled: true
# 当设置允许跨域,默认为*,表示支持所有域名,如果我们只是允许某些网站能访问,那么可以使用正则表达式。比如只允许本地地址。 /https?:\/\/localhost(:[0-9]+)?/
http.cors.allow-origin: "*"
discovery.type: single-node
# 表示这个节点是否可以充当主节点
node.master: true
# 是否充当数据节点
node.data: true
# 所有主从节点ip:port
discovery.seed_hosts: ["192.168.56.100"]
# cluster.initial_master_nodes: ["es-node1"]
# 这个参数决定了在选主过程中需要 有多少个节点通信 预防脑裂(es7+已废弃该参数,由集群自主控制,并且在启动一个新的集群的时候需要有cluster.initial_master_nodes初始化集群列表)
# discovery.zen.minimum_master_nodes: 1
# 跨域允许设置的头信息,默认为X-Requested-With,Content-Type,Content-Lengt
http.cors.allow-headers: Authorization
# 这条配置表示开启xpack认证机制
xpack.security.enabled: true
xpack.security.transport.ssl.enabled: true
##锁内存,提前占用内存
bootstrap.memory_lock: true

logstash.yml

1
2
3
4
5
xpack.monitoring.enabled: true
xpack.monitoring.elasticsearch.username: elastic
# 密码
xpack.monitoring.elasticsearch.password: xiaowu
xpack.monitoring.elasticsearch.hosts: ["http://127.0.0.1:9200"]