A while back I set up an ELK log-analysis platform, and it works great: I no longer have to pull assorted logs for developers, which saves a lot of time.

This post covers implementing log analysis in Python code: mrjob is used to write a Hadoop MapReduce job that can be submitted directly to a Hadoop cluster.

mrjob lets you write MapReduce jobs in Python and run them on a number of different platforms. You can:

  • Write multi-step MapReduce jobs in pure Python
  • Test them on your local machine
  • Run them on a Hadoop cluster

Installing mrjob

pip install mrjob

The nginx access log format

gamebbs.51.com 10.80.2.176 219.239.255.42 54220 [26/Dec/2016:04:34:39 +0800] "GET /forum.php?mod=ajax&action=forumchecknew&fid=752&time=1482697523&inajax=yes HTTP/1.0" 200 66 "http://gamebbs.51.com/forum.php?mod=forumdisplay&fid=752&page=1" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/47.0.2626.106 Safari/537.36 Yunhai Browser" 0.016 0.011

A log line breaks down into the following fields:

server_name (virtual host / domain): gamebbs.51.com
local_ip (server's internal IP): 10.80.2.176
client_ip (client IP): 219.239.255.42
remote_port (client's source port): 54220
time_local (request time): [26/Dec/2016:04:34:39 +0800]
method (request method): GET
request (request URL): /forum.php?mod=ajax&action=forumchecknew&fid=752&time=1482697523&inajax=yes
verb (HTTP version): HTTP/1.0
status (response status code): 200
body_bytes_sent (response body size in bytes): 66
http_referer: http://gamebbs.51.com/forum.php?mod=forumdisplay&fid=752&page=1
http_user_agent: Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/47.0.2626.106 Safari/537.36 Yunhai Browser
request_time (total time nginx spent on the request): 0.016
upstream_response_time (time the upstream server took to respond): 0.011
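Before reading the parser class, it helps to see why splitting the raw line on `"` works: the quoted request, referer, and user-agent land in their own segments, and everything else stays in the unquoted segments between them. A quick sketch with the sample line from above:

```python
# Split the sample nginx log line on '"', the same way the parser class does
line = ('gamebbs.51.com 10.80.2.176 219.239.255.42 54220 '
        '[26/Dec/2016:04:34:39 +0800] '
        '"GET /forum.php?mod=ajax&action=forumchecknew&fid=752'
        '&time=1482697523&inajax=yes HTTP/1.0" 200 66 '
        '"http://gamebbs.51.com/forum.php?mod=forumdisplay&fid=752&page=1" '
        '"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 '
        '(KHTML, like Gecko) Chrome/47.0.2626.106 Safari/537.36 '
        'Yunhai Browser" 0.016 0.011')

parts = line.split('"')

# parts[0]  holds server_name, local_ip, client_ip, remote_port and [time_local]
# parts[1]  holds method, request URL and HTTP version
# parts[2]  holds status and body_bytes_sent
# parts[3]  is http_referer, parts[-2] is http_user_agent
# parts[-1] holds request_time and upstream_response_time
print(parts[2].split())   # ['200', '66']
```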

The class that parses nginx log lines:

#!/usr/bin/env python
# coding=utf-8

import datetime
from urllib.parse import urlparse
from user_agents import parse as ua_parse

class NginxLineParser(object):

    def parse(self, line):
        """Parse an nginx log line into its component fields."""
        try:
            line_item = line.strip().split('"')
            # Everything before the first quote: server, IPs, port, then [time_local]
            self._server_name, self._local_ip, self._client_ip, self._remote_port = line_item[0].strip().split('[')[0].split()
            self._time_local = line_item[0].strip().split('[')[-1].strip(']')
            self._method, self._request, self._verb = line_item[1].strip().split()
            self._status, self._body_bytes_sent = line_item[2].strip().split()
            self._http_referer = line_item[3].strip()
            self._http_user_agent = line_item[-2].strip()
            self._request_time, self._upstream_response_time = line_item[-1].strip().split()
        except (ValueError, IndexError):
            # Don't crash on malformed lines; save them for later inspection
            with open('/tmp/parser_log_error.txt', 'a+') as f:
                f.write(line + '\n')

    def logline_to_dict(self):
        """Convert the parsed fields into a dictionary."""
        line_field = {}
        line_field['server_name'] = self.server_name
        line_field['local_ip'] = self.local_ip
        line_field['client_ip'] = self.client_ip
        line_field['remote_port'] = self.remote_port
        line_field['time_local'] = self.time_local
        line_field['method'] = self.method
        line_field['request'] = self.request
        line_field['verb'] = self.verb
        line_field['status'] = self.status
        line_field['body_bytes_sent'] = self.body_bytes_sent
        line_field['http_referer'] = self.http_referer
        line_field['http_user_agent'] = self.http_user_agent
        line_field['request_time'] = self.request_time
        line_field['upstream_response_time'] = self.upstream_response_time

        return line_field

    @property
    def server_name(self):
        return self._server_name
    
    @property
    def local_ip(self):
        return self._local_ip

    @property
    def client_ip(self):
        return self._client_ip

    @property
    def remote_port(self):
        return self._remote_port

    @property
    def time_local(self):
        return datetime.datetime.strptime(self._time_local, '%d/%b/%Y:%H:%M:%S +0800')

    @property
    def method(self):
        return self._method

    @property
    def request(self):
        return urlparse(self._request).path

    @property
    def verb(self):
        return self._verb

    @property
    def body_bytes_sent(self):
        return self._body_bytes_sent

    @property
    def http_referer(self):
        return self._http_referer

    @property
    def http_user_agent(self):
        # Browser family for real users; bots return None here
        ua_agent = ua_parse(self._http_user_agent)
        if not ua_agent.is_bot:
            return ua_agent.browser.family

    @property
    def user_agent_type(self):
        # Bot/crawler name; regular browsers return None here
        ua_agent = ua_parse(self._http_user_agent)
        if ua_agent.is_bot:
            return ua_agent.browser.family

    @property
    def status(self):
        return self._status

    @property
    def request_time(self):
        return self._request_time

    @property
    def upstream_response_time(self):
        return self._upstream_response_time

def main():
    """Entry point: parse a sample log file line by line."""
    ng_line_parser = NginxLineParser()
    with open('test.log', 'r') as f:
        for line in f:
            ng_line_parser.parse(line)

if __name__ == '__main__':
    main()


The class has two main methods:

  1. parse: splits a log line into its component fields
  2. logline_to_dict: converts the parsed fields into a dictionary
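To see the two methods end to end, here is a condensed standalone sketch of the same flow. The function below is a simplified rewrite of NginxLineParser (no user-agent handling, and `%z` instead of the hard-coded `+0800` offset), returning a subset of the fields as a dict:

```python
import datetime
from urllib.parse import urlparse

def parse_nginx_line(line):
    """Condensed parse + logline_to_dict: split the line on '"',
    then unpack each segment into named fields."""
    item = line.strip().split('"')
    head = item[0].strip()
    server_name, local_ip, client_ip, remote_port = head.split('[')[0].split()
    time_local = head.split('[')[-1].rstrip(']')
    method, request, verb = item[1].split()
    status, body_bytes_sent = item[2].split()
    request_time, upstream_response_time = item[-1].split()
    return {
        'server_name': server_name,
        'client_ip': client_ip,
        # %z parses the timezone offset instead of hard-coding +0800
        'time_local': datetime.datetime.strptime(time_local,
                                                 '%d/%b/%Y:%H:%M:%S %z'),
        'method': method,
        'request': urlparse(request).path,  # query string stripped, as in the class
        'status': status,
        'request_time': float(request_time),
    }
```

In an MRJob subclass, each mapper could call a helper like this on every input line and emit, say, (status, 1) pairs to count response codes across the cluster.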