经过一段时间的研究和学习,大致了解了DHT网络的一些信息,大部分还是参会别人的相关代码,一方面主要对DHT爬虫原理感兴趣,最主要的是为了学习python,大部分是别人的东西原理还是引用别人的吧

DHT网络爬虫的实现 | 学步园   http://www.xuebuyuan.com/1287052.html

DHT协议原理以及一些重点分析:

   要做DHT的爬虫,首先得透彻理解DHT,这样才能知道在什么地方究竟该应用什么算法去解决问题。关于DHT协议的细节以及重要的参考文章,请参考文末1

   DHT协议作为BT协议的一个辅助,是非常好玩的。它主要是为了在BT正式下载时得到种子或者BT资源。传统的网络,需要一台中央服务器存放种子或者BT资源,不仅浪费服务器资源,还容易出现单点的各种问题,而DHT网络则是为了去中心化,也就是说任意时刻,这个网络总有节点是亮的,你可以去询问问这些亮的节点,从而将自己加入DHT网络。

   要实现DHT协议的网络爬虫,主要分3步,第一步是得到资源信息(infohash,160bit,20字节,可以编码为40字节的十六进制字符串),第二步是确认这些infohash是有效的,第三步是通过有效的infohash下载到BT的种子文件,从而得到对这个资源的完整描述。

   其中第一步是其他节点用DHT协议中的get_peers方法向爬虫发送请求得到的,第二步是其他节点用DHT协议中的announce_peer向爬虫发送请求得到的,第三步可以有几种方式得到,比如可以去一些保存种子的网站根据infohash直接下载到,或者通过announce_peer的节点来下载到,具体如何实现,可以取决于你自己的爬虫。

   DHT协议中的主要几个操作:

 

   主要负责通过UDP与外部节点交互,封装4种基本操作的请求以及相应。

   ping:检查一个节点是否“存活”

   在一个爬虫里主要有两个地方用到ping,第一是初始路由表时,第二是验证节点是否存活时

   find_node:向一个节点发送查找节点的请求

   在一个爬虫中主要也是两个地方用到find_node,第一是初始路由表时,第二是验证桶是否存活时

   get_peers:向一个节点发送查找资源的请求

   在爬虫中有节点向自己请求时不仅像个正常节点一样做出回应,还需要以此资源的info_hash为机会尽可能多的去认识更多的节点。如图,get_peers实际上最后一步是announce_peer,但是因为爬虫不能announce_peer,所以实际上get_peers退化成了find_node操作。

开博第一篇:DHT 爬虫的学习记录

   announce_peer:向一个节点发送自己已经开始下载某个资源的通知

   爬虫中不能用announce_peer,因为这就相当于通报虚假资源,对方很容易从上下文中判断你是否通报了虚假资源从而把你禁掉

   DHT协议中有几个重点的需要澄清的地方:

   1. node与infohash同样使用160bit的表示方式,160bit意味着整个节点空间有2^160 = 730750818665451459101842416358141509827966271488,是48位10进制,也就是说有百亿亿亿亿亿个节点空间,这么大的节点空间,是足够存放你的主机节点以及任意的资源信息的。

   2. 每个节点有张路由表。每张路由表由一堆K桶组成,所谓K桶,就是桶中最多只能放K个节点,默认是8个。而桶的保存则是类似一颗前缀树的方式。相当于一张8桶的路由表中最多有160-4个K桶。

   3. 根据DHT协议的规定,每个infohash都是有位置的,因此,两个infohash之间就有距离一说,而两个infohash的距离就可以用异或来表示,即infohash1 xor infohash2,也就是说,高位一样的话,他们的距离就近,反之则远,这样可以快速的计算两个节点的距离。计算这个距离有什么用呢,在DHT网络中,如果一个资源的infohash与一个节点的infohash越近则该节点越有可能拥有该资源的信息,为什么呢?可以想象,因为人人都用同样的距离算法去递归的询问离资源接近的节点,并且只要该节点做出了回应,那么就会得到一个announce信息,也就是说跟资源infohash接近的节点就有更大的概率拿到该资源的infohash

   4. 根据上述算法,DHT中的查询是跳跃式查询,可以迅速的跨越的的节点桶而接近目标节点桶。之所以在远处能够大幅度跳跃,而在近处只能小幅度跳跃,原因是每个节点的路由表中离自身越接近的节点保存得越多,如下图

开博第一篇:DHT 爬虫的学习记录

   5. 在一个DHT网络中当爬虫并不容易,不像普通爬虫一样,看到资源就可以主动爬下来,相反,因为得到资源的方式(get_peers, announce_peer)都是被动的,所以爬虫的方式就有些变化了,爬虫所要做的事就是像个正常节点一样去响应其他节点的查询,并且得到其他节点的回应,把其中的数据收集下来就算是完成工作了。而爬虫唯一能做的,是尽可能的去多认识其他节点,这样,才能有更多其他节点来向你询问。

   6. 有人说,那么我把DHT爬虫的K桶中的容量K增大是不是就能增加得到资源的机会,其实不然,之前也分析过了,DHT爬虫最重要的信息来源全是被动的,因为你不能增大别人的K,所以距离远的节点保存你自身的概率就越小,当然距离远的节点去请求你的概率相对也比较小。

   一些主要的组件(实际实现更加复杂一些,有其他的模块,这里仅列举主要几个):

   DHT crawler

   这个就是DHT爬虫的主逻辑,为了简化多线程问题,跟server用了生产者消费者模型,负责消费,并且复用server的端口。

   主要任务就是负责初始化,包括路由表的初始化,以及初始的请求。另外负责处理所有进来的消息事件,由于生产者消费者模型的使用,里面的操作都基本上是单线程的,简化了不少问题,而且相信也比上锁要提升速度(当然了,加锁这步按理是放到了queue这里了,不过对于这种生产者源源不断生产的类型,可以用ring-buffer大幅提升性能)。

   DHT server

   这里是DHT爬虫的服务器端,DHT网络中的节点不单是client,也是server,所以要有server担当生产者的角色,最初也是每个消费者对应一个生产者,但实际上发现可以利用IO多路复用来达到消息事件的目的,这样一来大大简化了系统中线程的数量,如果client可以的话,也应该用同样的方式来组织,这样系统的速度应该会快很多。(尚未验证)

   DHT route table

   主要负责路由表的操作。

   路由表有如下操作:

   init:刚创建路由表时的操作。分两种情况:

   1. 如果之前已经初始化过,并且将上次路由表的数据保存下来,则只需要读入保存数据。

   2. 如果之前没有初始化过,则首先应当初始化。

   首先,应当有一个接入点,也就是说,你要想加进这个网络,必须认识这个网络中某个节点i并将i加入路由表,接下来对i用find_node询问自己的hash_info,这里巧妙的地方就在于,理论上通过一定数量的询问就会找到离自己距离很近的节点(也就是经过一定步骤就会收敛)。find_node目的在于尽可能早的让自己有数据,并且让网络上别的节点知道自己,如果别人不认识你,就不会发送消息过来,意味着你也不能获取到想要的信息。

   search:比较重要的方法,主要使用它来定位当前infohash所在的桶的位置。会被其他各种代理方法调用到。

   findNodes:找到路由表中与传入的infohash最近的k个节点

   getPeer:找到待查资源是否有peer(即是否有人在下载,也就是是否有人announce过)

   announcePeer:通知该资源正在被下载

   DHT bucket:

   acitiveNode:逻辑比较多,分如下几点。

 

        1. 查找所要添加的节点对应路由表的桶是否已经满,如果未满,添加节点

        2. 如果已经满,检查该桶中是否包含爬虫节点自己,如果不包含,抛弃待添加节点

        3. 如果该桶中包含本节点,则平均分裂该桶

   其他的诸如locateNode
replaceNode
updateNode
removeNode
,就不一一说明了

   DHT torrent parser  

   主要从bt种子文件中解析出以下几个重要的信息:name,size,file list(sub file name, sub file size),比较简单,用bencode方向解码就行了

   Utils

   distance:计算两个资源之间的距离。在kad中用a xor b表示

 

   为了增加难度,选用了不太熟悉的语言python,结果步步为营,但是也感慨python的简洁强大。在实现中,也碰到很多有意思的问题。比如如何保存一张路由表中的所有桶,之前想出来几个办法,甚至为了节省资源,打算用bit数组+dict直接保存,但是因为估计最终的几个操作不是很方便直观容易出错而放弃,选用的结构就是前缀树,操作起来果然是没有障碍;

   在超时问题上,比如桶超时和节点超时,一直在思考一个高效但是比较优雅的做法,可以用一个同步调用然后等待它的超时,但是显然很低效,尤其我没有用更多线程的情况,一旦阻塞了就等于该端口所有事件都被阻塞了。所以必须用异步操作,但是异步操作很难去控制它的精确事件,当然,我可以在每个事件来的时候检查一遍是否超时,但是显然也是浪费和低效。那么,剩下的只有采用跟tomcat类似的方式了,增加一个线程来监控,当然,这个监控线程最好是全局的,能监控所有crawler中所有事务的超时。另外,超时如果控制不当,容易导致内存没有回收以至于内存泄露,也值得注意。超时线程是否会与其他线程互相影响也应当仔细检查。

   最初超时的控制没处理好,出现了ping storm,运行一定时间后大多数桶已经满了,如果按照协议中的方式去跑的话会发现大量的事件都是在ping以确认这个节点是否ok以至于大量的cpu用于处理ping和ping响应。深入理解后发现,检查节点状态是不需要的,因为节点状态只是为了提供给询问的人一些好的节点,既然如此,可以将每次过来的节点替换当前桶中最老的节点,如此一来,我们将总是保存着最新的节点。

   搜索算法也是比较让我困惑的地方,简而言之,搜索的目的并不是真正去找资源,而是去认识那些能够保存你的节点。为什么说是能够保存你,因为离你越远,桶的数量越少,这样一来,要想进他们的桶中去相对来说就比较困难,所以搜索的目标按理应该是附近的节点最好,但是不能排除远方节点也可能保存你的情况,这种情况会发生在远方节点初始化时或者远方节点的桶中节点超时的时候,但总而言之,概率要小些。所以搜索算法也不应该不做判断就胡乱搜索,但是也不应该将搜索的距离严格限制在附近,所以这是一个权衡问题,暂时没有想到好的方式,觉得暂时让距离远的以一定概率发生,而距离近的必然发生

   还有一点,就是搜索速度问题,因为DHT网络的这种结构,决定了一个节点所认识的其他节点必然是有限的附近节点,于是每个节点在一定时间段内能拿到的资源数必然是有限的,所以应当分配多个节点同时去抓取,而抓取资源的数量很大程度上就跟分配节点的多少有关了。

   最后一个值得优化的地方是findnodes方法,之前的方式是把一个桶中所有数据拿出来排序,然后取其中前K个返回回去,但是实际上我们做了很多额外的工作,这是经典的topN问题,使用排序明显是浪费时间的,因为这个操作非常频繁,所以即便所有保存的节点加起来很少((160 - 4) * 8),也会一定程度上增加时间。而采用的算法是在一篇论文《可扩展的DHT网络爬虫设计和优化》中找到的,基本公式是IDi = IDj xor 2 ^(160 - i),这样,已知IDi和i就能知道IDj,若已知IDi和IDj就能知道i,通过这种方式,可以快速的查找该桶A附近的其他桶(显然是离桶A层次最近的桶中的节点距离A次近),比起全部遍历再查找效率要高不少。

  开博第一篇:DHT 爬虫的学习记录

    dht协议http://www.bittorrent.org/beps/bep_0005.html 及其翻译http://gobismoon.blog.163.com/blog/static/5244280220100893055533/

 

爬虫源码参考别人的,非原创,只为学习

 

 

  1 #encoding: utf-8
  2 
  3 from hashlib import sha1
  4 from random import randint
  5 from struct import unpack, pack
  6 from socket import inet_aton, inet_ntoa
  7 from bisect import bisect_left
  8 from threading import Timer
  9 from time import sleep
 10 
 11 from bencode import bencode, bdecode
 12 
 13 BOOTSTRAP_NODES = [
 14     ("router.bittorrent.com", 6881),
 15     ("dht.transmissionbt.com", 6881),
 16     ("router.utorrent.com", 6881)
 17 ] 
 18 TID_LENGTH = 4
 19 KRPC_TIMEOUT = 10
 20 REBORN_TIME = 5 * 60
 21 K = 8
 22 
 23 def entropy(bytes):
 24     s = ""
 25     for i in range(bytes):
 26         s += chr(randint(0, 255))
 27     return s
 28 
 29     # """把爬虫"伪装"成正常node, 一个正常的node有ip, port, node ID三个属性, 因为是基于UDP协议,   
 30     # 所以向对方发送信息时, 即使没"明确"说明自己的ip和port时, 对方自然会知道你的ip和port,   
 31     # 反之亦然. 那么我们自身node就只需要生成一个node ID就行, 协议里说到node ID用sha1算法生成,   
 32     # sha1算法生成的值是长度是20 byte, 也就是20 * 8 = 160 bit, 正好如DHT协议里说的那范围: 0 至 2的160次方,   
 33     # 也就是总共能生成1461501637330902918203684832716283019655932542976个独一无二的node.   
 34     # ok, 由于sha1总是生成20 byte的值, 所以哪怕你写SHA1(20)或SHA1(19)或SHA1("I am a 2B")都可以,   
 35     # 只要保证大大降低与别人重复几率就行. 注意, node ID非十六进制,   
 36     # 也就是说非FF5C85FE1FDB933503999F9EB2EF59E4B0F51ECA这个样子, 即非hash.hexdigest(). """
 37 def random_id():
 38     hash = sha1()
 39     hash.update( entropy(20) )
 40     return hash.digest()
 41 
 42 def decode_nodes(nodes):
 43     n = []
 44     length = len(nodes)
 45     if (length % 26) != 0: 
 46         return n
 47     for i in range(0, length, 26):
 48         nid = nodes[i:i+20]
 49         ip = inet_ntoa(nodes[i+20:i+24])
 50         port = unpack("!H", nodes[i+24:i+26])[0]
 51         n.append( (nid, ip, port) )
 52     return n
 53 
 54 def encode_nodes(nodes):
 55     strings = []
 56     for node in nodes:
 57         s = "%s%s%s" % (node.nid, inet_aton(node.ip), pack("!H", node.port))
 58         strings.append(s)
 59 
 60     return "".join(strings)
 61 
 62 def intify(hstr):
 63     #"""这是一个小工具, 把一个node ID转换为数字. 后面会频繁用到.""" 
 64     return long(hstr.encode('hex'), 16)    #先转换成16进制, 再变成数字
 65 
 66 def timer(t, f):
 67     Timer(t, f).start()
 68 
 69 
 70 class BucketFull(Exception):
 71     pass
 72 
 73 
 74 class KRPC(object):
 75     def __init__(self):
 76         self.types = {
 77             "r": self.response_received,
 78             "q": self.query_received
 79         }
 80         self.actions = {
 81             "ping": self.ping_received,
 82             "find_node": self.find_node_received,
 83             "get_peers": self.get_peers_received,
 84             "announce_peer": self.announce_peer_received,
 85         }
 86 
 87         self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
 88         self.socket.bind(("0.0.0.0", self.port))
 89 
 90     def response_received(self, msg, address):
 91         self.find_node_handler(msg)
 92 
 93     def query_received(self, msg, address):
 94         try:
 95             self.actions[msg["q"]](msg, address)
 96         except KeyError:
 97             pass
 98 
 99     def send_krpc(self, msg, address):
100         try:
101             self.socket.sendto(bencode(msg), address)
102         except:
103             pass
104 
105 
106 class Client(KRPC):
107     def __init__(self, table):
108         self.table = table
109 
110         timer(KRPC_TIMEOUT, self.timeout)
111         timer(REBORN_TIME, self.reborn)
112         KRPC.__init__(self)
113 
114     def find_node(self, address, nid=None):
115         nid = self.get_neighbor(nid) if nid else self.table.nid
116         tid = entropy(TID_LENGTH)
117         
118         msg = {
119             "t": tid,
120             "y": "q",
121             "q": "find_node",
122             "a": {"id": nid, "target": random_id()}
123         }
124         self.send_krpc(msg, address)
125 
126     def find_node_handler(self, msg):
127         try:
128             nodes = decode_nodes(msg["r"]["nodes"])
129             for node in nodes:
130                 (nid, ip, port) = node
131                 if len(nid) != 20: continue
132                 if nid == self.table.nid: continue
133                 self.find_node( (ip, port), nid )
134         except KeyError:
135             pass
136 
137     def joinDHT(self):
138         for address in BOOTSTRAP_NODES: 
139             self.find_node(address)
140 
141     def timeout(self):
142         if len( self.table.buckets ) < 2:
143             self.joinDHT()
144         timer(KRPC_TIMEOUT, self.timeout)
145 
146     def reborn(self):
147         self.table.nid = random_id()
148         self.table.buckets = [ KBucket(0, 2**160) ]
149         timer(REBORN_TIME, self.reborn)
150 
151     def start(self):
152         self.joinDHT()
153 
154         while True:
155             try:
156                 (data, address) = self.socket.recvfrom(65536)
157                 msg = bdecode(data)
158                 self.types[msg["y"]](msg, address)
159             except Exception:
160                 pass
161 
162     def get_neighbor(self, target):
163         return target[:10]+random_id()[10:]
164 
165 
166 class Server(Client):
167     def __init__(self, master, table, port):
168         self.table = table
169         self.master = master
170         self.port = port
171         Client.__init__(self, table)
172 
173     def ping_received(self, msg, address):
174         try:
175             nid = msg["a"]["id"]
176             msg = {
177                 "t": msg["t"],
178                 "y": "r",
179                 "r": {"id": self.get_neighbor(nid)}
180             }
181             self.send_krpc(msg, address)
182             self.find_node(address, nid)
183         except KeyError:
184             pass
185 
186     def find_node_received(self, msg, address):
187         try:
188             target = msg["a"]["target"]
189             neighbors = self.table.get_neighbors(target)
190             
191             nid = msg["a"]["id"]
192             msg = {
193                 "t": msg["t"],
194                 "y": "r",
195                 "r": {
196                     "id": self.get_neighbor(target), 
197                     "nodes": encode_nodes(neighbors)
198                 }
199             }
200             self.table.append(KNode(nid, *address))
201             self.send_krpc(msg, address)
202             self.find_node(address, nid)
203         except KeyError:
204             pass
205 
206     def get_peers_received(self, msg, address):
207         try:
208             infohash = msg["a"]["info_hash"]
209 
210             neighbors = self.table.get_neighbors(infohash)
211 
212             nid = msg["a"]["id"]
213             msg = {
214                 "t": msg["t"],
215                 "y": "r",
216                 "r": {
217                     "id": self.get_neighbor(infohash), 
218                     "nodes": encode_nodes(neighbors)
219                 }
220             }
221             self.table.append(KNode(nid, *address))
222             self.send_krpc(msg, address)
223             self.master.log(infohash)
224             self.find_node(address, nid)
225         except KeyError:
226             pass
227 
228     def announce_peer_received(self, msg, address):
229         try:
230             infohash = msg["a"]["info_hash"]
231             nid = msg["a"]["id"]
232 
233             msg = { 
234                 "t": msg["t"],
235                 "y": "r",
236                 "r": {"id": self.get_neighbor(infohash)}
237             }
238 
239             self.table.append(KNode(nid, *address))
240             self.send_krpc(msg, address)
241             self.master.log(infohash)
242             self.find_node(address, nid)
243         except KeyError:
244             pass
245 # 该类只实例化一次. 
246 class KTable(object):
247     # 这里的nid就是通过node_id()函数生成的自身node ID. 协议里说道, 每个路由表至少有一个bucket,   
248  #    还规定第一个bucket的min=0, max=2^160次方, 所以这里就给予了一个buckets属性来存储bucket, 这个是列表.
249     def __init__(self, nid):
250         self.nid = nid
251         self.buckets = [ KBucket(0, 2**160) ]
252 
253     def append(self, node):
254         index = self.bucket_index(node.nid)
255         try:
256             bucket = self.buckets[index]
257             bucket.append(node)
258         except IndexError:
259             return
260         except BucketFull:
261             if not bucket.in_range(self.nid): 
262                 return
263             self.split_bucket(index)
264             self.append(node)
265 
266 
267         # 返回与目标node ID或infohash的最近K个node.  
268  
269         # 定位出与目标node ID或infohash所在的bucket, 如果该bucuck有K个节点, 返回.   
270         # 如果不够到K个节点的话, 把该bucket前面的bucket和该bucket后面的bucket加起来, 只返回前K个节点.  
271         # 还是不到K个话, 再重复这个动作. 要注意不要超出最小和最大索引范围.  
272         # 总之, 不管你用什么算法, 想尽办法找出最近的K个节点.  
273     def get_neighbors(self, target):
274         nodes = []
275         if len(self.buckets) == 0: return nodes
276         if len(target) != 20 : return nodes
277 
278         index = self.bucket_index(target)
279         try:
280             nodes = self.buckets[index].nodes
281             min = index - 1
282             max = index + 1
283 
284             while len(nodes) < K and ((min >= 0) or (max < len(self.buckets))):
285                 if min >= 0:
286                     nodes.extend(self.buckets[min].nodes)
287 
288                 if max < len(self.buckets):
289                     nodes.extend(self.buckets[max].nodes)
290 
291                 min -= 1
292                 max += 1
293 
294             num = intify(target)
295             nodes.sort(lambda a, b, num=num: cmp(num^intify(a.nid), num^intify(b.nid)))
296             return nodes[:K] #K是个常量, K=8 
297         except IndexError:
298             return nodes
299 
300     def bucket_index(self, target):
301         return bisect_left(self.buckets, intify(target))
302 
303 
304         # 拆表  
305  
306         # index是待拆分的bucket(old bucket)的所在索引值.   
307         # 假设这个old bucket的min:0, max:16. 拆分该old bucket的话, 分界点是8, 然后把old bucket的max改为8, min还是0.   
308         # 创建一个新的bucket, new bucket的min=8, max=16.  
309         # 然后根据的old bucket中的各个node的nid, 看看是属于哪个bucket的范围里, 就装到对应的bucket里.   
310         # 各回各家,各找各妈.  
311         # new bucket的所在索引值就在old bucket后面, 即index+1, 把新的bucket插入到路由表里. 
312     def split_bucket(self, index):
313         old = self.buckets[index]
314         point = old.max - (old.max - old.min)/2
315         new = KBucket(point, old.max)
316         old.max = point
317         self.buckets.insert(index + 1, new)
318         for node in old.nodes[:]:
319             if new.in_range(node.nid):
320                 new.append(node)
321                 old.remove(node)
322 
323     def __iter__(self):
324         for bucket in self.buckets:
325             yield bucket
326 
327 
328 class KBucket(object):
329     __slots__ = ("min", "max", "nodes")
330 
331     # min和max就是该bucket负责的范围, 比如该bucket的min:0, max:16的话,   
332     # 那么存储的node的intify(nid)值均为: 0到15, 那16就不负责, 这16将会是该bucket后面的bucket的min值.   
333     # nodes属性就是个列表, 存储node. last_accessed代表最后访问时间, 因为协议里说到,   
334     # 当该bucket负责的node有请求, 回应操作; 删除node; 添加node; 更新node; 等这些操作时,   
335     # 那么就要更新该bucket, 所以设置个last_accessed属性, 该属性标志着这个bucket的"新鲜程度". 用linux话来说, touch一下.  
336     # 这个用来便于后面说的定时刷新路由表.  
337 
338     def __init__(self, min, max):
339         self.min = min
340         self.max = max
341         self.nodes = []
342 
343 
344     # 添加node, 参数node是KNode实例.  
345 
346     # 如果新插入的node的nid属性长度不等于20, 终止.  
347     # 如果满了, 抛出bucket已满的错误, 终止. 通知上层代码进行拆表.  
348     # 如果未满, 先看看新插入的node是否已存在, 如果存在, 就替换掉, 不存在, 就添加,  
349     # 添加/替换时, 更新该bucket的"新鲜程度".  
350     def append(self, node):
351         if node in self:
352             self.remove(node)
353             self.nodes.append(node)
354         else:
355             if len(self) < K:
356                 self.nodes.append(node)
357             else:
358                 raise BucketFull
359 
360     def remove(self, node):
361         self.nodes.remove(node)
362 
363     def in_range(self, target):
364         return self.min <= intify(target) < self.max
365 
366     def __len__(self):
367         return len(self.nodes)
368 
369     def __contains__(self, node):
370         return node in self.nodes
371 
372     def __iter__(self):
373         for node in self.nodes:
374             yield node
375 
376     def __lt__(self, target):
377         return self.max <= target
378 
379 
380 class KNode(object):
381      # """  
382   #       nid就是node ID的简写, 就不取id这么模糊的变量名了. __init__方法相当于别的OOP语言中的构造方法,   
383   #       在python严格来说不是构造方法, 它是初始化, 不过, 功能差不多就行.  
384   #       """ 
385     __slots__ = ("nid", "ip", "port")
386     
387     def __init__(self, nid, ip, port):
388         self.nid = nid
389         self.ip = ip
390         self.port = port
391 
392     def __eq__(self, other):
393         return self.nid == other.nid
394 
395 
396 
397 #using example
398 class Master(object):
399     def __init__(self, f):
400         self.f = f
401 
402     def log(self, infohash):
403         self.f.write(infohash.encode("hex")+"\n")
404         self.f.flush()
405 try:
406     f = open("infohash.log", "a")
407     m = Master(f)
408     s = Server(Master(f), KTable(random_id()), 8001)
409     s.start()     
410 except KeyboardInterrupt:
411     s.socket.close()
412     f.close()

 

种子从迅雷下,初期为学习从http://torrage.com/sync/下的infohash,去重用了别人写的Bloom Filter算法,数据库用Mysql,建表语句如下,其中uinthash是根据infohash的头四个字节和最后四个字节组成的一个int整数,先这样设计,看后期查询的时候用得到不,总觉得用infohash来查很慢

 1 CREATE TABLE `torrentinfo` (
 2   `id` int(11) NOT NULL AUTO_INCREMENT,
 3   `infohash` char(40) NOT NULL DEFAULT '',
 4   `filename` varchar(128) DEFAULT NULL,
 5   `filelength` bigint(11) DEFAULT NULL,
 6   `recvtime` datetime DEFAULT NULL,
 7   `filecontent` text,
 8   `uinthash` int(11) unsigned NOT NULL DEFAULT '0',
 9   PRIMARY KEY (`id`),
10   KEY `uinthash_index` (`uinthash`)
11 ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

 

 

Thunder.py

  1 # _*_ coding: utf-8 _*_
  2 import socket
  3 import os,glob
  4 import time as time_p
  5 import requests
  6 from bencode import bdecode, BTL
  7 from torrent import *
  8 import threading, signal
  9 import MySQLdb
 10 from BloomFilter import *
 11 
 12 class Thunder(object):
 13     def __init__(self):
 14         self.connstr={'host':'127.0.0.1','user':'root','passwd':'123456','port':3306,'charset':"UTF8"}
 15     def download(self, infohash):
 16         try:
 17             tc = self._download(infohash)
 18             if(tc==-1):
 19                 return
 20             tc = bdecode(tc)
 21             info = torrentInfo(tc)
 22             # print info['name']
 23             # print info['length']
 24             # print info['files']
 25             uint=int(infohash[:4]+infohash[-4:],16)
 26             time_now=time_p.strftime('%Y-%m-%d %H:%M:%S',time_p.localtime(time_p.time()))
 27             sql="insert into torrentinfo(infohash,filename,filelength,recvtime,filecontent,uinthash) values('%s','%s','%d','%s','%s','%d')"%(infohash,MySQLdb.escape_string(info['name']),info['length'],time_now,MySQLdb.escape_string(info['files']),uint)
 28             self.executeSQL(sql)
 29         except Exception,e:
 30             print  e
 31             pass
 32 
 33     def openConnection(self):
 34         try:
 35             self.conn=MySQLdb.connect(**self.connstr)
 36             self.cur=self.conn.cursor()
 37             self.conn.select_db('dht')
 38         except MySQLdb.Error,e:
 39             print 'mysql error %d:%s'%(e.args[0],e.args[1])
 40 
 41 
 42     def executeSQL(self,sql):
 43         try:
 44             self.cur.execute(sql)
 45             self.conn.commit()
 46         except MySQLdb.Error,e:
 47             print 'mysql error %d:%s'%(e.args[0],e.args[1])
 48     def closeConnection(self):
 49         try:
 50             self.cur.close()
 51             self.conn.close()
 52         except MySQLdb.Error,e:
 53             print 'mysql error %d:%s'%(e.args[0],e.args[1])    
 54 
 55     def _download(self, infohash):
 56         infohash = infohash.upper()
 57         start = infohash[0:2]
 58         end = infohash[-2:]
 59         url = "http://bt.box.n0808.com/%s/%s/%s.torrent" % (start, end, infohash)
 60         headers = {
 61             "Referer": "http://bt.box.n0808.com"
 62         }
 63         try:
 64             r = requests.get(url, headers=headers, timeout=10)
 65             if r.status_code == 200:
 66                 # f=open("d:\\"+infohash+'.torrent','wb')
 67                 # f.write(r.content)
 68                 # f.close()
 69                 return r.content
 70         except (socket.timeout, requests.exceptions.Timeout), e:
 71             pass
 72         return -1
 73 
 74 class torrentBean(object):
 75     """docstring for torrentBean"""
 76     __slots__=('infohash','filename','recvtime','filecontent','uinthash')
 77 
 78     def __init__(self, infohash,filename,recvtime,filecontent,uinthash):
 79         super(torrentBean, self).__init__()
 80         self.infohash = infohash
 81         self.filename = filename
 82         self.recvtime = recvtime
 83         self.filecontent = filecontent
 84         self.uinthash = uinthash
 85 
 86 
 87 bf = BloomFilter(0.001, 1000000)
 88 a=Thunder()
 89 a.openConnection()
 90 # info_hash="a02d2735e6e1daa6f7d58f21bd7340a7b7c4b7a5"
 91 # info_hash='cf3a6a4f07da0b90beddae838462ca0012bef285'
 92 # a.download('cf3a6a4f07da0b90beddae838462ca0012bef285')
 93 
 94 
 95 files=glob.glob('./*.txt')
 96 for fl in files:
 97     print os.path.basename(fl)
 98     f=open(fl,'r')
 99     for line in f:
100         infohash=line.strip('\n')
101         if not bf.is_element_exist(infohash):
102             bf.insert_element(infohash)
103             a.download(infohash)
104 a.closeConnection()

 

  torrent种子文件经过bencode解析,获取key为info对应value值,种子大致的格式如下,有乱码,不影响观看

{
    'files': [{
        'path': ['PGD660.avi'],
        'length': 1367405512,
        'filehash': 'J\xef\xfe\xb3K\xd4g\x8d\x07m\x03\xbb\xb3\xadt\xa1\xa0\xf0\xec\xab',
        'ed2k': '/\xfb\xe55#n\xbd1\xb6\x1c\x0f\xf3\xe4\x9dP\xfb',
        'path.utf-8': ['PGD660.avi']
    }, {
        'path': ['PGD660B.jpg'],
        'length': 135899,
        'filehash': '*$O\x17w\xe9E\x95>O\x1f\xfb\x0e\x9b\x16\x15B\\Q\x9d',
        'ed2k': 'T/L*\xbb\x8e.\xe2d\xddu\nR\x07\xca\x19',
        'path.utf-8': ['PGD660B.jpg']
    }, {
        'path': ['yoy123@\xe8\x8d\x89\xe6\xa6\xb4\xe7\xa4\xbe\xe5\x8c\xba@\xe6\x9c\x80\xe6\x96\xb0\xe5\x9c\xb0\xe5\x9d\x80.mht'],
        'length': 472,
        'filehash': '&\xa92\xb7\xdd8\xeel3\xcc-S\x07\xb5e\xd35\xc0\xb7r',
        'ed2k': '\x13\xd2 a\x0cA\xb4\xf2X\x12\xea\xd4\xe8\xac`\x92',
        'path.utf-8': ['yoy123@\xe8\x8d\x89\xe6\xa6\xb4\xe7\xa4\xbe\xe5\x8c\xba@\xe6\x9c\x80\xe6\x96\xb0\xe5\x9c\xb0\xe5\x9d\x80.mht']
    }, {
        'path': ['yoy123@\xe8\x8d\x89\xe6\xa6\xb4\xe7\xa4\xbe\xe5\x8c\xba\xe5\xae\xa3\xe4\xbc\xa0.txt'],
        'length': 363,
        'filehash': '\x96nA*\xe2\xb6Y+[\xe3\xaf\xd4\x14A\x94\xf5@\xcd\xc1\x91',
        'ed2k': '8V\xa6X\xd9\x82l\xdbNO8\xe8D\xe9E\xed',
        'path.utf-8': ['yoy123@\xe8\x8d\x89\xe6\xa6\xb4\xe7\xa4\xbe\xe5\x8c\xba\xe5\xae\xa3\xe4\xbc\xa0.txt']
    }, {
        'path': ['\xe2\x98\x85\xe5\xb0\x91\xe5\xa6\x87 \xe8\xae\xba\xe5\x9d\x9b \xe9\x99\x90\xe9\x87\x8f\xe5\xbc\x80\xe6\x94\xbe\xe4\xb8\xad\xe3\x80\x82\xe3\x80\x82.mht'],
        'length': 475,
        'filehash': '\xec\xde\xeb-6\x86\x1avB\xdd\xd8q\x8b\x8f\xc06\xf0XX\x0e',
        'ed2k': '\xa7\x8dU\xfd\xfc=\x12\x15>yE\x8f&A\xc2u',
        'path.utf-8': ['\xe2\x98\x85\xe5\xb0\x91\xe5\xa6\x87 \xe8\xae\xba\xe5\x9d\x9b \xe9\x99\x90\xe9\x87\x8f\xe5\xbc\x80\xe6\x94\xbe\xe4\xb8\xad\xe3\x80\x82\xe3\x80\x82.mht']
    }, {
        'path': ['\xe6\x9f\x8f\xe6\x8b\x89\xe5\x9c\x96\xe7\xa7\x98\xe5\xaf\x86\xe8\x8a\xb1\xe5\x9c\x92.mht'],
        'length': 478,
        'filehash': "\xe4\xb5'Td\x0b=P\xc0\x9aG\xa2\xd7\xfapg\xc6.\x8e\xa7",
        'ed2k': '\xdd\x8d\xbb\x0b\x04\xcb\x03O\xb1\x18"\x03\xb1\x1d\xba\x08',
        'path.utf-8': ['\xe6\x9f\x8f\xe6\x8b\x89\xe5\x9c\x96\xe7\xa7\x98\xe5\xaf\x86\xe8\x8a\xb1\xe5\x9c\x92.mht']
    }, {
        'path': ['\xe7\xbe\x8e\xe5\xa5\xb3\xe4\xb8\x8a\xe9\x96\x80\xe6\x8f\xb4\xe4\xba\xa4\xe6\x9c\x8d\xe5\x8b\x99.mht'],
        'length': 478,
        'filehash': "\xe4\xb5'Td\x0b=P\xc0\x9aG\xa2\xd7\xfapg\xc6.\x8e\xa7",
        'ed2k': '\xdd\x8d\xbb\x0b\x04\xcb\x03O\xb1\x18"\x03\xb1\x1d\xba\x08',
        'path.utf-8': ['\xe7\xbe\x8e\xe5\xa5\xb3\xe4\xb8\x8a\xe9\x96\x80\xe6\x8f\xb4\xe4\xba\xa4\xe6\x9c\x8d\xe5\x8b\x99.mht']
    }],
    'publisher': 'yoy123',
    'piece length': 524288,
    'name': 'PGD660 \xe6\x83\xb3\xe8\xa9\xa6\xe8\x91\x97\xe5\x85\xa8\xe5\x8a\x9b\xe6\x93\x8d\xe6\x93\x8d\xe7\x9c\x8b\xe9\x80\x99\xe5\x80\x8b\xe6\xb7\xab\xe8\x95\xa9\xe7\xbe\x8e\xe5\xa5\xb3\xe5\x97\x8e \xe5\xb0\x8f\xe5\xb7\x9d\xe3\x81\x82\xe3\x81\x95\xe7\xbe\x8e',
    'publisher.utf-8': 'yoy123',
}

解析代码torrent.py

 1 # _*_ coding: utf-8 _*_
 2 from time import time
 3 
 4 def torrentInfo(torrentContent):
 5     metadata = torrentContent["info"]
 6     print metadata
 7     info = {
 8         "name": getName(metadata),
 9         "length": calcLength(metadata),
10         "timestamp": getCreateDate(torrentContent),
11         "files": extraFiles(metadata)
12     }
13     return info
14 
15 def calcLength(metadata):
16     length = 0
17     try:
18         length = metadata["length"]
19     except KeyError:
20         try:
21             for file in metadata["files"]:
22                 length += file["length"]
23         except KeyError:
24             pass
25     return length
26 
27 def extraFiles(metadata):
28     files = []
29     try:
30         for file in metadata["files"]:
31             path = file["path.utf-8"]
32             size=file['length']
33             if len(path) > 1:
34                 main = path[0]
35                 for f in path[1:2]:
36                     files.append("%s/%s    %d bytes" % (main, f,size))
37             else:
38                 files.append("%s    %d bytes" % (path[0],size) )
39         if files:
40             return '\r\n'.join(files)
41         else:
42             return getName(metadata)
43     except KeyError:
44         return getName(metadata)
45 
46 def getName(metadata):
47     try:
48         name = metadata["name.utf-8"]
49         if name.strip()=="": 
50                 raise KeyError
51     except KeyError:
52         try:
53             name = metadata["name"]
54             if name.strip()=="": 
55                 raise KeyError
56         except KeyError:
57             name = getMaxFile(metadata)
58 
59     return name
60 def getMaxFile(metadata):
61     try:
62         maxFile = metadata["files"][0]
63         for file in metadata["files"]:
64             if file["length"] > maxFile["length"]:
65                 maxFile = file
66         name = maxFile["path"][0]
67         return name
68     except KeyError:
69         return ""
70 
71 def getCreateDate(torrentContent):
72     try:
73         timestamp = torrentContent["creation date"]
74     except KeyError:
75         timestamp = int( time() )
76     return timestamp

 

  最后还有别人写的BloomFilter代码

  1 #encoding: utf-8
  2 '''
  3 Created on 2012-11-7
  4 
  5 @author: palydawn
  6 '''
  7 import cmath
  8 from BitVector import BitVector
  9 
 10 class BloomFilter(object):
 11     def __init__(self, error_rate, elementNum):
 12         #计算所需要的bit数
 13         self.bit_num = -1 * elementNum * cmath.log(error_rate) / (cmath.log(2.0) * cmath.log(2.0))
 14         
 15         #四字节对齐
 16         self.bit_num = self.align_4byte(self.bit_num.real)
 17         
 18         #分配内存
 19         self.bit_array = BitVector(size=self.bit_num)
 20         
 21         #计算hash函数个数
 22         self.hash_num = cmath.log(2) * self.bit_num / elementNum
 23         
 24         self.hash_num = self.hash_num.real
 25         
 26         #向上取整
 27         self.hash_num = int(self.hash_num) + 1
 28         
 29         #产生hash函数种子
 30         self.hash_seeds = self.generate_hashseeds(self.hash_num)
 31         
 32     def insert_element(self, element):
 33         for seed in self.hash_seeds:
 34             hash_val = self.hash_element(element, seed)
 35             #取绝对值
 36             hash_val = abs(hash_val)
 37             #取模,防越界
 38             hash_val = hash_val % self.bit_num
 39             #设置相应的比特位
 40             self.bit_array[hash_val] = 1
 41     
 42     #检查元素是否存在,存在返回true,否则返回false 
 43     def is_element_exist(self, element):
 44         for seed in self.hash_seeds:
 45             hash_val = self.hash_element(element, seed)
 46             #取绝对值
 47             hash_val = abs(hash_val)
 48             #取模,防越界
 49             hash_val = hash_val % self.bit_num
 50             
 51             #查看值
 52             if self.bit_array[hash_val] == 0:
 53                 return False
 54         return True
 55         
 56     #内存对齐    
 57     def align_4byte(self, bit_num):
 58         num = int(bit_num / 32)
 59         num = 32 * (num + 1)
 60         return num
 61     
 62     #产生hash函数种子,hash_num个素数
 63     def generate_hashseeds(self, hash_num):
 64         count = 0
 65         #连续两个种子的最小差值
 66         gap = 50
 67         #初始化hash种子为0
 68         hash_seeds = []
 69         for index in xrange(hash_num):
 70             hash_seeds.append(0)
 71         for index in xrange(10, 10000):
 72             max_num = int(cmath.sqrt(1.0 * index).real)
 73             flag = 1
 74             for num in xrange(2, max_num):
 75                 if index % num == 0:
 76                     flag = 0
 77                     break
 78             
 79             if flag == 1:
 80                 #连续两个hash种子的差值要大才行
 81                 if count > 0 and (index - hash_seeds[count - 1]) < gap:
 82                     continue
 83                 hash_seeds[count] = index
 84                 count = count + 1
 85             
 86             if count == hash_num:
 87                 break
 88         return hash_seeds
 89     
 90     def hash_element(self, element, seed):
 91         hash_val = 1
 92         for ch in str(element):
 93             chval = ord(ch)
 94             hash_val = hash_val * seed + chval
 95         return hash_val
 96 
 97 
 98     def SaveBitToFile(self,f):
 99         self.bit_array.write_bits_to_fileobject(f)
100         pass

 

表内容见下图 

 开博第一篇:DHT 爬虫的学习记录