通过 es 的 api 接口查询日志,使用了 elasticsearch 和
elasticsearch_dsl 模块,对于不想组装 json 的人 dsl 真是方便~~~

pip install elasticsearch 
pip install elasticsearch_dsl

 

# -*- coding: utf-8 -*-
# @Author: richard
# @Date:   2017-09-26 10:24:20
# @Last Modified by:   richard
# @Last Modified time: 2017-09-26 18:12:32

# usage: python $0 jjjj[index_name:jjjj-2017.09.27] <keyword|status> "string|[200]"

from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search,Q
import datetime,time
import sys

BEFORE_MINS = 15

def dtToStr(time_dt):
    time_str = (time_dt).strftime('%Y-%m-%dT%H:%M:%S+08:00')
    return time_str


before_dt = datetime.datetime.now()+datetime.timedelta(seconds=-BEFORE_MINS*60)
now_dt = datetime.datetime.now()
before_str = dtToStr(before_dt)
now_str = dtToStr(now_dt)
# print before_str,now_str


class Es(object):
    """docstring for es"""
    def __init__(self, prod):
        super(Es, self).__init__()
        self.prod = prod
        es = Elasticsearch('http://1.1.1.1:9200')
        self.s = Search(using=es,index=self.today_index())

    def today_index(self):
        today = (datetime.datetime.now()).strftime("%Y.%m.%d")
        # today = "2017.09.25"
        today_index = "%s-%s" % (self.prod,today)
        return today_index


def main(args):
    _,prod_env,qtype,keyword=args
    es = Es(prod_env)
    # 监控status
    if qtype == 'status':
        s = es.s \
            .query(Q("match",type="nginx_access")& \
                (Q("match",status=keyword))& \
                (Q("range",time_local={'gte':before_str,'lte':now_str})))

    # 监控关键字
    elif qtype == 'keyword':
        print keyword
        s = es.s \
            .query(Q("match",type="nginx_access")& \
            (Q("match",status=200))& \
            (Q("match",request=keyword))& \
            (Q("range",time_local={'gte':before_str,'lte':now_str})))


    response = s.execute()
    print(response['hits']['total'])


if __name__ == '__main__':
    if len(sys.argv) == 4:
        main(sys.argv)
    else:
        print("parameter error")