通过 es 的 api 接口查询日志,使用了 elasticsearch 和
elasticsearch_dsl 模块,对于不想组装 json 的人 dsl 真是方便~~~
pip install elasticsearch pip install elasticsearch_dsl
# -*- coding: utf-8 -*- # @Author: richard # @Date: 2017-09-26 10:24:20 # @Last Modified by: richard # @Last Modified time: 2017-09-26 18:12:32 # usage: python $0 jjjj[index_name:jjjj-2017.09.27] <keyword|status> "string|[200]" from elasticsearch import Elasticsearch from elasticsearch_dsl import Search,Q import datetime,time import sys BEFORE_MINS = 15 def dtToStr(time_dt): time_str = (time_dt).strftime('%Y-%m-%dT%H:%M:%S+08:00') return time_str before_dt = datetime.datetime.now()+datetime.timedelta(seconds=-BEFORE_MINS*60) now_dt = datetime.datetime.now() before_str = dtToStr(before_dt) now_str = dtToStr(now_dt) # print before_str,now_str class Es(object): """docstring for es""" def __init__(self, prod): super(Es, self).__init__() self.prod = prod es = Elasticsearch('http://1.1.1.1:9200') self.s = Search(using=es,index=self.today_index()) def today_index(self): today = (datetime.datetime.now()).strftime("%Y.%m.%d") # today = "2017.09.25" today_index = "%s-%s" % (self.prod,today) return today_index def main(args): _,prod_env,qtype,keyword=args es = Es(prod_env) # 监控status if qtype == 'status': s = es.s \ .query(Q("match",type="nginx_access")& \ (Q("match",status=keyword))& \ (Q("range",time_local={'gte':before_str,'lte':now_str}))) # 监控关键字 elif qtype == 'keyword': print keyword s = es.s \ .query(Q("match",type="nginx_access")& \ (Q("match",status=200))& \ (Q("match",request=keyword))& \ (Q("range",time_local={'gte':before_str,'lte':now_str}))) response = s.execute() print(response['hits']['total']) if __name__ == '__main__': if len(sys.argv) == 4: main(sys.argv) else: print("parameter error")
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:[ELK] 通过 es 接口监控 nginx 日志 - Python技术站