input {
  beats {
    port => 5044
  }
  kafka {
    bootstrap_servers => "192.168.1.106:9092,192.168.1.107:9092,192.168.1.108:9092"
    auto_commit_interval_ms => 5000
    group_id => "logstash"
    client_id => "logstash-0"
    consumer_threads => 2
    auto_offset_reset => "latest"
    topics => ["topic1", "topic2"]
    add_field => { "logstash" => "192.168.1.143" }
    codec => json { charset => "UTF-8" }
  }
}
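Assuming the Kafka payloads are UTF-8 JSON, the `json` codec in the input above parses each record into an event, and `add_field` then tags it with the Logstash host IP. A minimal Ruby sketch of that transformation (the sample payload is invented for illustration, not taken from real logs):

```ruby
require 'json'

# What the kafka input's json codec does: parse the raw record into
# an event hash (sample payload is an assumption).
payload = '{"logtype":"bi","message":"hello"}'
event = JSON.parse(payload)

# What add_field does: tag every event with the Logstash host IP.
event["logstash"] = "192.168.1.143"

puts event.inspect
```

Events whose payload is not valid JSON would instead be tagged `_jsonparsefailure` by the real codec rather than raising, which is why the filter section below can branch on fields like `[logtype]`.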
filter {
  if [logtype] == "bi" {
    mutate {
      # replace characters; remember to escape special ones
      gsub => [ "message", '\\x22', '"' ]
      gsub => [ "message", '\,-', '' ]
    }
    # parse the cleaned message as JSON
    json {
      source => "message"
    }
  } else if [logtype] == "acc" {
    mutate {
      split => ["message", "~"]
      add_field => { "module" => "%{[message][1]}" }
    }
  }

  # json filter; requires that message is valid JSON
  #json {
  #  source => "message"
  #  remove_field => ["message", "@version", "path"]
  #}

  # Assign the logTime field (format yyyy-MM-dd HH:mm:ss.SSS) to @timestamp.
  # Normally this is the correct configuration for a production logging system.
  #date {
  #  match => ["logTime", "yyyy-MM-dd HH:mm:ss.SSS"]
  #  target => "@timestamp"
  #}

  # Parse a millisecond UNIX timestamp field into @timestamp.
  #date {
  #  match => ["timestamp", "UNIX_MS"]
  #  target => "@timestamp"
  #}

  # The timezone issue can be solved with the blocks below; %{indexDay} in the
  # output can then be replaced with %{+yyyyMMdd} (remove the temp field with
  # remove_field if you do not want it).
  #date {
  #  match => ["timestamp", "UNIX_MS"]
  #  target => "@timestamp"
  #}
  #ruby {
  #  code => "event.set('temp', event.get('@timestamp').time.localtime + 8*60*60); event.set('@timestamp', event.get('temp'))"
  #}
  # add a field with the ruby filter
  #ruby {
  #  code => "event.set('indexDay', event.get('[@timestamp]').time.localtime('+08:00').strftime('%Y%m%d'))"
  #}

  # Equivalent to trim(): strip leading/trailing whitespace from multiple fields.
  #mutate {
  #  strip => ["module", "traceId", "localIp", "remoteAddr", "url", "httpVersion", "method", "status", "contentLength", "referer", "userAgent"]
  #}
}

output {
  # Output to Elasticsearch using the mapping defined in logstash.template.json.
  # If you would rather not write logstash.template.json and want ES to store
  # some fields with its own default mappings, see
  # https://www.cnblogs.com/duanxuan/p/6517462.html, or parse the fields with
  # the mutate, grok, etc. filters as shown above.
  elasticsearch {
    hosts => ["http://192.168.56.100:9200"]
    # note: with manage_template => false, the template* settings below are ignored
    manage_template => false
    template => "/usr/share/logstash/templates/logstash.template.json"
    template_name => "sopei"
    template_overwrite => true
    index => "xxx-%{indexDay}-%{type}"
    codec => json
    user => "logstash_system"
    password => "xiaowu"
  }
  stdout {
    codec => rubydebug
  }
}
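The two active filter branches can be checked outside Logstash with plain Ruby string methods, since `mutate`'s `gsub` and `split` behave like their Ruby counterparts. A sketch with invented sample messages (the field layout of the "acc" log is an assumption):

```ruby
require 'json'

# "bi" branch: replace the escaped quote sequence \x22 with a real
# double quote, then drop literal ",-" sequences, so the message
# becomes valid JSON (sample message is invented).
bi_raw = '{\x22level\x22:\x22INFO\x22},-'
bi_clean = bi_raw.gsub('\x22', '"').gsub(',-', '')
# bi_clean is now parseable JSON: {"level":"INFO"}

# "acc" branch: split on "~" and take element 1 as the module field,
# mirroring add_field => { "module" => "%{[message][1]}" }.
acc_raw = '2024-01-01 12:00:00~order-service~GET /api/items~200'
module_field = acc_raw.split('~')[1]

puts bi_clean
puts module_field
```

Testing the cleanup this way is useful because a `gsub` pattern that fails to match leaves the message unchanged, and the downstream `json` filter would then tag the event with `_jsonparsefailure`.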
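The commented-out ruby filter that builds `indexDay` shifts the UTC `@timestamp` to UTC+8 before formatting the day string, so events written near midnight land in the correct day's index. A standalone Ruby sketch of that calculation, with an invented sample instant:

```ruby
# Logstash stores @timestamp in UTC; the ruby filter above shifts it
# to UTC+8 and formats it as yyyyMMdd for use in the index name.
# The sample time below is invented for illustration.
utc_ts = Time.utc(2024, 1, 1, 18, 30, 0)   # 18:30 UTC on Jan 1
index_day = utc_ts.localtime('+08:00').strftime('%Y%m%d')
# 18:30 UTC is 02:30 on Jan 2 in UTC+8, so index_day is "20240102"

puts index_day
```

This is also why the simpler `%{+yyyyMMdd}` sprintf form mentioned in the comments differs: it formats the raw UTC `@timestamp`, so without the timezone shift, events between 16:00 and 24:00 UTC would be indexed under the previous local day.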