目录
1、使用codec的multiline插件收集java日志... 1
2、收集nginx日志... 2
3、收集系统syslog日志... 3
4、使用fliter的grok模块收集mysql日志... 4
1、使用codec的multiline插件收集java日志
对于采用ELK作为应用日志来说,多行消息的友好展示是必不可少的,否则ELK的价值就大大打折了。要正确的处理多行消息,需使用multiline插件
比如,对于java日志而言,可以使用:
multiline.pattern:
'^\['
multiline.negate:
true
multiline.match:
after
这样,下面的日志就算一个事件了。
input {
file {
path => "/var/log/elasticsearch/chuck-clueser.log"
type => "es-error"
start_position => "beginning"
codec => multiline {
pattern => "^\[" #使用正则表式, 以中括号开头的就是一行日志
negate => true
what => "previous"
}
}
}
output {
if [type] == "es-error" {
elasticsearch {
hosts => ["192.168.100.163:9200"]
index => "es-error-%{+YYYY.MM.dd}"
}
}
}
2、收集nginx日志
使用codec的json插件将日志的域进行分段,使用key-value的方式,使日志格式更清晰,易于搜索,还可以降低cpu的负载
2.1 更改nginx的配置文件的日志格式,使用json
[root@linux-node1 ~]# vim
/etc/nginx/nginx.conf #添加日志格式,把自带的格式注释掉
17 http {
18
#log_format main '$remote_addr - $remote_user [$time_local]
"$request" '
19
# '$status
$body_bytes_sent "$http_referer" '
20
#
'"$http_user_agent" "$http_x_forwarded_for"';
21
#access_log /var/log/nginx/access.log main;
22
log_format json '{ "@timestamp": "$time_local", '
23 '"@fields":
{ '
24
'"remote_addr": "$remote_addr", '
25
'"remote_user": "$remote_user", '
26
'"body_bytes_sent": "$body_bytes_sent", '
27
'"request_time": "$request_time", '
28 '"status":
"$status", '
29 '"request":
"$request", '
30
'"request_method": "$request_method", '
31
'"http_referrer": "$http_referer", '
32
'"body_bytes_sent":"$body_bytes_sent", '
33
'"http_x_forwarded_for": "$http_x_forwarded_for", '
34
'"http_user_agent": "$http_user_agent" } }';
35
access_log /var/log/nginx/access_json.log json;
[root@linux-node1 ~]# nginx -t #检查配置文件
[root@linux-node1 ~]# systemctl start nginx
日志格式如下
2.2
使用logstash将nginx访问日志收集起来
[root@linux-node1 ~]# cat
log_nginx.conf 4、
input {
file {
path =>
"/var/log/nginx/access_json.log"
codec => "json"
start_position =>
"beginning"
type => "nginx-log"
}
}
output {
elasticsearch {
hosts =>
["http://192.168.100.163:9200"]
index =>
"nginx-%{+YYY.MM.dd}"
}
}
[root@linux-node1 ~]#
/usr/local/logstash/bin/logstash -f log_nginx.conf
3、收集系统syslog日志
[root@linux-node1 ~]# vim syslog.conf
input {
syslog {
type => "system-syslog"
#绑定个ip,监听个514端口,启动后,别的机器可以通过网络把日志发过来
host => "192.168.100.161"
port => "514"
}
}
output {
elasticsearch {
hosts => ["192.168.100.161:9200"]
index => "system-syslog-%{+YYYY.MM.dd}"
}
}
[root@linux-node1 ~]#
/usr/local/logstash/bin/logstash -f syslog.conf
修改服务器的syslog配置文件,把日志信息发送到514端口上
[root@linux-node2 ~]# vim /etc/rsyslog.conf
90 *.* @@192.168.100.161:514
[root@linux-node2 ~]# systemctl restart
rsyslog
4、使用fliter的grok模块收集mysql日志
filter插件有很多,在这里就学习grok插件,使用正则匹配日志里的域来拆分。在实际生产中,apache日志不支持jason,就只能使用grok插件匹配;mysql慢查询日志也是无法拆分,只能使用grok正则表达式匹配拆分。
在如下链接,github上有很多写好的grok模板,可以直接引用
https://github.com/logstash-plugins/logstash-patterns-core/blob/master/patterns/grok-patterns
在装好的logstash中也会有grok匹配规则,直接可以引用,路径如下
[root@linux-node1 patterns]# pwd
/usr/local/logstash/vendor/bundle/jruby/1.9/gems/logstash-patterns-core-4.1.0/patterns
4.1日志文件
[root@linux-node1 ~]# cat slow.log
# Time: 160108 15:46:14
# User@Host: dev_select_user[dev_select_user] @ [192.168.97.86] Id: 714519
# Query_time: 1.638396
Lock_time: 0.000163 Rows_sent: 40
Rows_examined: 939155
SET timestamp=1452239174;
SELECT DATE(create_time) as day,HOUR(create_time) as
h,round(avg(low_price),2) as low_price
FROM t_actual_ad_num_log
WHERE create_time>='2016-01-07' and ad_num<=10
GROUP BY
DATE(create_time),HOUR(create_time);
4.2编写slow.conf
[root@linux-node1 ~]# cat mysql-slow.conf
input{
file {
path =>
"/root/slow.log"
type =>
"mysql-slow-log"
start_position =>
"beginning"
codec => multiline {
pattern => "^#
User@Host:"
negate => true
what =>
"previous"
}
}
}
filter {
# drop sleep events
grok {
match => {
"message" =>"SELECT SLEEP" }
add_tag => [
"sleep_drop" ]
tag_on_failure =>
[] # prevent default _grokparsefailure tag on real records
}
if "sleep_drop"
in [tags] {
drop {}
}
grok {
match => [ "message",
"(?m)^# User@Host: %{USER:user}\[[^\]]+\] @ (?:(?<clienthost>\S*)
)?\[(?:%{IP:clientip})?\]\s+Id: %{NUMBER:row_id:int}\s*# Query_time:
%{NUMBER:query_time:float}\s+Lock_time: %{NUMBER:lock_time:float}\s+Rows_sent:
%{NUMBER:rows_sent:int}\s+Rows_examined: %{NUMBER:rows_examined:int}\s*(?:use
%{DATA:database};\s*)?SET
timestamp=%{NUMBER:timestamp};\s*(?<query>(?<action>\w+)\s+.*)\n#\s*"
]
}
date {
match => [
"timestamp", "UNIX" ]
remove_field => [
"timestamp" ]
}
}
output {
stdout{
codec =>
"rubydebug"
}
}
执行该配置文件,查看grok正则匹配结果
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:使用logstash收集java、nginx、系统等常见日志 - Python技术站