该方案是一个基于任务调度框架 Gearman、采用 Python 开发的分布式数据统计系统。

 

项目的目录结构很简单:



# apple at localhost in ~/Develop/getui [11:24:26]
$ tree
.
├── Browser.py
├── PickleGearman.py
├── SpiderWorker.py
└── countPushNum.py


0 directories, 4 files

 

在我们的 MacBook Pro 上安装并启动 Gearman:

1 # apple at liujingyu.local in ~/Develop/getui [10:47:36]
2 $ brew install gearman
3 $ gearmand -d -L 127.0.0.1 -p 4307

Python 需要安装 gearman、mechanize 等库(pip 用于安装常用的包,具体安装方法见 https://pip.pypa.io/en/latest/installing.html#install-pip):

1 # apple at liujingyu.local in ~/Develop/getui [10:47:36]
2 $ pip install gearman mechanize

worker 与 client 之间通过 pickle 序列化来发送、接收 Python 对象:

 

 1 $ cat PickleGearman.py
 2 #!/usr/bin/env python
 3 #coding:utf-8
 4 
 5 import pickle
 6 import gearman
 7 
 8 class PickleDataEncoder(gearman.DataEncoder):
 9     @classmethod
10     def encode(cls, encodable_object):
11         return pickle.dumps(encodable_object)
12 
13     @classmethod
14     def decode(cls, decodable_string):
15         return pickle.loads(decodable_string)
16 
class PickleWorker(gearman.GearmanWorker):
    """Gearman worker whose job data is encoded with PickleDataEncoder."""

    # Swap in the pickle-based encoder for this worker's job payloads.
    data_encoder = PickleDataEncoder
19 
class PickleClient(gearman.GearmanClient):
    """Gearman client whose job data is encoded with PickleDataEncoder."""

    # Swap in the pickle-based encoder for this client's job payloads.
    data_encoder = PickleDataEncoder

运行图:

个推push数据统计(爬虫)

8个Spider运行过程图:

个推push数据统计(爬虫) 

 Spider代码:

 1 $ cat SpiderWorker.py
 2 #!/usr/bin/env python
 3 
 4 from PickleGearman import PickleWorker
 5 from Browser import Browser
 6 
 7 class GearmanWorker(PickleWorker):
 8     def on_job_execute(self, current_job):
 9         return super(GearmanWorker, self).on_job_execute(current_job)
10 
def SpiderWorker(gearman_worker, gearman_job):
    """Gearman task callback: crawl the statistics for one batch of task ids.

    Args:
        gearman_worker: The worker instance running the job (unused).
        gearman_job: The job; ``gearman_job.data`` holds the task ids that
            are handed to ``Browser``.

    Returns:
        Whatever ``Browser`` returns for the batch, or an empty list when the
        crawl raised. The empty list lets a client that checks
        ``len(result) == 3`` skip the failed batch.
    """
    # Imported locally so the block does not depend on a module-level import.
    # The original handler referenced an undefined ``config`` name, which
    # turned every crawl failure into a NameError.
    import logging

    taskIds = gearman_job.data

    try:
        return Browser(taskIds)
    except Exception:
        # Best effort: log the traceback and report "no data" rather than
        # failing on an unbound result variable.
        logging.exception('SpiderWorker failed for taskIds=%r', taskIds)
        return []
20 
# Connect to the job server, expose SpiderWorker under the task name
# "SpiderWorker", then start the worker's job loop.
servers = ['127.0.0.1:4307']
worker = GearmanWorker(servers)
worker.register_task("SpiderWorker", SpiderWorker)
worker.work()

countPushNum.py代码:

 1 # apple at localhost in ~/Develop/getui [11:30:38]
 2 $ cat countPushNum.py
 3 #!/usr/bin/python
 4 # -*- coding: utf-8 -*-
 5 
 6 import cookielib
 7 import json
 8 import socket
 9 socket.setdefaulttimeout(10)
10 import redis
11 import mechanize
12 from PickleGearman import PickleClient
13 import numpy as np
14 currency = 30
15 
def printEveryGroupMsg(groupSum):
    """Print the counter header line, then *groupSum* on the next line."""
    header = '有效可发送数    实际下发数  收到数'
    # Single-argument print(...) behaves the same on Python 2 and Python 3.
    print(header)
    print(groupSum)
20 
def _chunks(items, size):
    """Split *items* into consecutive slices of at most *size* elements."""
    return [items[i:i + size] for i in range(0, len(items), size)]


def _has_three_counters(content):
    """Return True when *content* is a sized value holding three entries."""
    try:
        return len(content) == 3
    except TypeError:
        # A result of None or a numpy scalar has no len(); treat it as
        # "no data" rather than crashing the run.
        return False


def _print_counts(conn, pattern):
    """Print each key on *conn* matching *pattern* followed by its value."""
    for key in conn.keys(pattern):
        print('%s %s' % (key, conn.get(key)))


def _print_push_stats(client, conn, pattern):
    """Crawl and print the summed push counters for each matching key."""
    for key in conn.keys(pattern):
        print(key)
        task_ids = list(conn.smembers(key))

        # Each job carries up to `currency` task ids, and jobs are submitted
        # in groups of up to `currency` jobs.
        jobs = [dict(task='SpiderWorker', data=batch)
                for batch in _chunks(task_ids, currency)]

        every_group = []
        for job_batch in _chunks(jobs, currency):
            for request in client.submit_multiple_jobs(job_batch):
                content = request.result
                if _has_three_counters(content):
                    every_group.append(content)

        # With no usable results np.sum returns the scalar 0.0.
        printEveryGroupMsg(np.sum(every_group, 0))


def main():
    """Print the day's ``:count`` values and summed push counters.

    Reads the module-level ``yesterday`` date string (set by the
    ``__main__`` block) and ``currency`` batch size. For both Redis
    instances it first prints every ``*<yesterday>:count`` key with its
    value. It then takes the members of every ``*<yesterday>:taskIds`` set,
    submits them to the ``SpiderWorker`` Gearman task in batches, and
    prints the element-wise sum of the three-element results.
    """
    gearman_clients = PickleClient(['127.0.0.1:4307'])
    r1 = redis.Redis(host='xxx.xx.xx.x', port=6379, db=0, password='pasword')
    r2 = redis.Redis(host='xx.xx.xx.xx', port=6379, db=0, password='pasword')
    connections = (r1, r2)

    # Total counts.
    count_pattern = '*' + yesterday + ':count'
    for conn in connections:
        _print_counts(conn, count_pattern)

    # Push counters.
    task_pattern = '*' + yesterday + ':taskIds'
    for conn in connections:
        _print_push_stats(gearman_clients, conn, task_pattern)
68 
if __name__ == '__main__':

    from datetime import date, timedelta

    # Python 2's input() evaluates whatever the user types, and an empty
    # line raises SyntaxError before the `or 0` default can apply. Read the
    # raw text instead and convert it explicitly.
    try:
        read_line = raw_input
    except NameError:
        # Python 3: input() already returns the raw text.
        read_line = input

    answer = read_line('请输入时间<昨天请输入1>\n>').strip()
    # Empty input means 0 days back; non-integer text raises ValueError.
    day = int(answer) if answer else 0

    # Two-digit year, month, day: the date fragment used in the Redis keys.
    yesterday = (date.today() - timedelta(day)).strftime('%y%m%d')
    today = date.today().strftime('%y%m%d')

    main()

 

 抓取模块代码:

 1 $ cat Browser.py
 2 #!/usr/bin/env python
 3 #coding:utf-8
 4 
 5 import mechanize
 6 import numpy as np
 7 import cookielib,json
 8 
 9 def Browser(taskIds):
10     url = 'http://dev.igetui.com/login.htm'
11 # Browser
12     br = mechanize.Browser()
13 
14 # Cookie Jar
15     cj = cookielib.LWPCookieJar()
16     br.set_cookiejar(cj)
17 
18 # Browser options
19     br.set_handle_equiv(True)
20     br.set_handle_gzip(True)
21     br.set_handle_redirect(True)
22     br.set_handle_referer(True)
23     br.set_handle_robots(False)
24 
25 # Follows refresh 0 but not hangs on refresh > 0
26     br.set_handle_refresh(mechanize._http.HTTPRefreshProcessor(), max_time=1)
27 
28 # Want debugging messages?
29     br.set_debug_http(False)
30     br.set_debug_redirects(False)
31     br.set_debug_responses(False)
32 
33 # User-Agent (this is cheating, ok?)
34     br.addheaders = [('User-agent', 'Mozilla/5.0 (X11; U; Linux i686; en-US; rv:1.9.0.1) \
35      Gecko/2008071615 Fedora/3.0.1-1.fc9 Firefox/3.0.1')]
36 
37 # Open some site, let's pick a random one, the first that pops in mind:
38     r = br.open(url)
39 
40     br.select_form(name = 'loginForm')
41 # 登陆用户名和密码
42     br['username'] = 'getui'
43     br['password'] = 'password'
44     br.submit()
45 
46     everyGroup = []
47     for taskId in taskIds:
48         try:
49             tsum = []
50             try:
51                 home_url = 'http://dev.getui.com/dos/statistics/apiStatistics'
52                 response = br.open('https://dev.getui.com/dos/pushRecords/queryApiPushList?curPage=1&appId=16500&taskId=%s' % taskId)
53                 html = response.read()
54 
55                 result = json.loads(html.strip())
56                 if result.has_key('resultList'):
57                     resultList = result['resultList']
58 
59                     tsum.append(int(resultList[0]['sendNum']))
60                     tsum.append(int(resultList[0]['realSendNum']))
61                     tsum.append(int(resultList[0]['receiveNum']))
62             except Exception as e:
63                 print e
64             else:
65                 print tsum
66 
67             if len(tsum) == 3:
68                 everyGroup.append(tsum)
69         except Exception as e:
70             print e
71 
72     return np.sum(everyGroup, 0)