ELK Stack收集处理Tomcat Access日志

文章首发站点OpensGalaxy

首先,接上文http://opensgalaxy.com/2016/08/26/elk-stack-%E4%BB%8B%E7%BB%8D/我下载的logstash-all-plugins-2.3.4.tar.gz包,直接解压使用。本文我是用logstash作为shipper,但实际运维场景中会使用Beats作为shipper,例如:filebeat,而logstash则作为Broker and Indexer使用,方便统一配置处理接收到的日志信息。

1、定义tomcat日志格式

[root@server25 patterns]# cat /usr/local/apache-tomcat-8.0.24/conf/server.xml 

<Valve className="org.apache.catalina.valves.AccessLogValve" directory="logs"
               prefix="localhost_access_log" suffix=".txt"
               pattern="[%a] [%u] [%t] ["%r"] [%s] [%B] [%{X-Forwarded-For}i] [%{Referer}i] [%{User-Agent}i]" />

为了方便分隔日志的字段,我给每个字段加了一个'[]’,增加%{X-Forwarded-For}i (因为是在负载均衡后,所以需要增加次字段来取到真实的客户端IP)、%{Referer}i(用户来路页面)、%{User-Agent}i(用户使用的客户端信息)三个字段。
tomcat access 生成的日志格式:
[100.97.138.51] [-] [[06/Sep/2016:15:20:43 +0800]] ["GET / HTTP/1.0"] [404] [0] [123.151.12.154] [-] [Mozilla/5.0 (Windows; U; Windows NT 5.1; zh-CN; rv:1.9b4) Gecko/2008030317 Firefox/3.0b4]

2、自定义tomcat access日志格式的pattern

logstash默认带的patterns在这里:
/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-patterns-core-2.0.5/patterns

[root@server25 patterns]# cat tomcataccess 
CLIENTIP \d+\.\d+\.\d+\.\d+|-
AWORD .+
TOMCATACCESS \[%{CLIENTIP:Dclientip}\] \[%{USER:auth}\] \[\[%{HTTPDATE:atimestamp}\]\] \[\"(?:%{WORD:verb}) (%{NOTSPACE:request}) ((?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})\"\] \[%{NUMBER:response}\] \[(?:%{NUMBER:bytes})\] \[%{CLIENTIP:Sclientip}\] \[%{AWORD:referer}\] \[%{AWORD:user_agent}\]

3、logstash 配置

[root@server05 ~]# cat /etc/logstash/conf.d/1.tomcat_access.conf 
input {
    file {
        path => "/usr/local/apache-tomcat-8.0.24/logs/localhost_access_log.*"
        type => "tomcat_access"
        start_position => "beginning"
    }
}
filter {
     if ([message] =~ "healthDetection|\/zza-sif\/sms\/recv") {
             drop {}            ##健康监测生成的日志过滤掉 不输出  
        }
     grok {
          match => ["message",  "%{TOMCATACCESS}"]
          }
     grok {
      match => ["message", "%{HTTPDATE:atimestamp}"]
      }
     date {
      match => ["atimestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
      }  ## 处理atimestamp时间字段,覆盖@timestamp 的值。


     ### 如果字段Sclientip 不是空 就覆盖Dclientip字段的值, 因为前端有负载均衡,所以通过LB访问的的ip会填充Sclientip的值,如果直接访问后端服务器,那么会填充Dclientip的值。

     if [Sclientip] != "-" {
            mutate {
                replace => {
                    "Dclientip" => "%{Sclientip}"
                   }
                }
}
    ### 如果字段Sclientip 是空,就使用Dclientip的值填充Sclientip字段值。
     if [Sclientip] == "-" {
                mutate {
                        replace => {
                                "Sclientip" => "%{Dclientip}"
                                   }
                        }
}
    ### 这两个if保证了不管是通过LB访问还是直接访问服务器,Sclientip都会得到一个真实的公网IP,方便后续根据geoip做地图分布。

    ### 获取用户客户端信息
     useragent {      
           target => "useragent"
           source => "user_agent"
    }
    ### 根据访问ip得到的所在地等信息
     geoip {
      source => "Sclientip"
       }
}

### 输出到elasticsearch
output {
    elasticsearch {
        hosts => ["10.90.100.10:9200"]
        index => "logstash-%{type}"
        }
}

Kibana 效果:

image

日志信息:

image

根据收集的日志信息建立Visualize

* 用户Agent信息

image

image

* 用户OS信息

image

image

* 用户map分布

image

image

建立Dashboard

使用刚才建立的3个Visualize 组建Dashboard

image

发表评论