经过一段时间的研究和学习,大致了解了DHT网络的一些信息,大部分还是参会别人的相关代码,一方面主要对DHT爬虫原理感兴趣,最主要的是为了学习python,大部分是别人的东西原理还是引用别人的吧
DHT网络爬虫的实现 | 学步园 http://www.xuebuyuan.com/1287052.html
DHT协议原理以及一些重点分析:
要做DHT的爬虫,首先得透彻理解DHT,这样才能知道在什么地方究竟该应用什么算法去解决问题。关于DHT协议的细节以及重要的参考文章,请参考文末1
DHT协议作为BT协议的一个辅助,是非常好玩的。它主要是为了在BT正式下载时得到种子或者BT资源。传统的网络,需要一台中央服务器存放种子或者BT资源,不仅浪费服务器资源,还容易出现单点的各种问题,而DHT网络则是为了去中心化,也就是说任意时刻,这个网络总有节点是亮的,你可以去询问问这些亮的节点,从而将自己加入DHT网络。
要实现DHT协议的网络爬虫,主要分3步,第一步是得到资源信息(infohash,160bit,20字节,可以编码为40字节的十六进制字符串),第二步是确认这些infohash是有效的,第三步是通过有效的infohash下载到BT的种子文件,从而得到对这个资源的完整描述。
其中第一步是其他节点用DHT协议中的get_peers方法向爬虫发送请求得到的,第二步是其他节点用DHT协议中的announce_peer向爬虫发送请求得到的,第三步可以有几种方式得到,比如可以去一些保存种子的网站根据infohash直接下载到,或者通过announce_peer的节点来下载到,具体如何实现,可以取决于你自己的爬虫。
DHT协议中的主要几个操作:
主要负责通过UDP与外部节点交互,封装4种基本操作的请求以及相应。
ping:检查一个节点是否“存活”
在一个爬虫里主要有两个地方用到ping,第一是初始路由表时,第二是验证节点是否存活时
find_node:向一个节点发送查找节点的请求
在一个爬虫中主要也是两个地方用到find_node,第一是初始路由表时,第二是验证桶是否存活时
get_peers:向一个节点发送查找资源的请求
在爬虫中有节点向自己请求时不仅像个正常节点一样做出回应,还需要以此资源的info_hash为机会尽可能多的去认识更多的节点。如图,get_peers实际上最后一步是announce_peer,但是因为爬虫不能announce_peer,所以实际上get_peers退化成了find_node操作。
announce_peer:向一个节点发送自己已经开始下载某个资源的通知
爬虫中不能用announce_peer,因为这就相当于通报虚假资源,对方很容易从上下文中判断你是否通报了虚假资源从而把你禁掉
DHT协议中有几个重点的需要澄清的地方:
1. node与infohash同样使用160bit的表示方式,160bit意味着整个节点空间有2^160 = 730750818665451459101842416358141509827966271488,是48位10进制,也就是说有百亿亿亿亿亿个节点空间,这么大的节点空间,是足够存放你的主机节点以及任意的资源信息的。
2. 每个节点有张路由表。每张路由表由一堆K桶组成,所谓K桶,就是桶中最多只能放K个节点,默认是8个。而桶的保存则是类似一颗前缀树的方式。相当于一张8桶的路由表中最多有160-4个K桶。
3. 根据DHT协议的规定,每个infohash都是有位置的,因此,两个infohash之间就有距离一说,而两个infohash的距离就可以用异或来表示,即infohash1 xor infohash2,也就是说,高位一样的话,他们的距离就近,反之则远,这样可以快速的计算两个节点的距离。计算这个距离有什么用呢,在DHT网络中,如果一个资源的infohash与一个节点的infohash越近则该节点越有可能拥有该资源的信息,为什么呢?可以想象,因为人人都用同样的距离算法去递归的询问离资源接近的节点,并且只要该节点做出了回应,那么就会得到一个announce信息,也就是说跟资源infohash接近的节点就有更大的概率拿到该资源的infohash
4. 根据上述算法,DHT中的查询是跳跃式查询,可以迅速的跨越的的节点桶而接近目标节点桶。之所以在远处能够大幅度跳跃,而在近处只能小幅度跳跃,原因是每个节点的路由表中离自身越接近的节点保存得越多,如下图
5. 在一个DHT网络中当爬虫并不容易,不像普通爬虫一样,看到资源就可以主动爬下来,相反,因为得到资源的方式(get_peers, announce_peer)都是被动的,所以爬虫的方式就有些变化了,爬虫所要做的事就是像个正常节点一样去响应其他节点的查询,并且得到其他节点的回应,把其中的数据收集下来就算是完成工作了。而爬虫唯一能做的,是尽可能的去多认识其他节点,这样,才能有更多其他节点来向你询问。
6. 有人说,那么我把DHT爬虫的K桶中的容量K增大是不是就能增加得到资源的机会,其实不然,之前也分析过了,DHT爬虫最重要的信息来源全是被动的,因为你不能增大别人的K,所以距离远的节点保存你自身的概率就越小,当然距离远的节点去请求你的概率相对也比较小。
一些主要的组件(实际实现更加复杂一些,有其他的模块,这里仅列举主要几个):
DHT crawler:
这个就是DHT爬虫的主逻辑,为了简化多线程问题,跟server用了生产者消费者模型,负责消费,并且复用server的端口。
主要任务就是负责初始化,包括路由表的初始化,以及初始的请求。另外负责处理所有进来的消息事件,由于生产者消费者模型的使用,里面的操作都基本上是单线程的,简化了不少问题,而且相信也比上锁要提升速度(当然了,加锁这步按理是放到了queue这里了,不过对于这种生产者源源不断生产的类型,可以用ring-buffer大幅提升性能)。
DHT server:
这里是DHT爬虫的服务器端,DHT网络中的节点不单是client,也是server,所以要有server担当生产者的角色,最初也是每个消费者对应一个生产者,但实际上发现可以利用IO多路复用来达到消息事件的目的,这样一来大大简化了系统中线程的数量,如果client可以的话,也应该用同样的方式来组织,这样系统的速度应该会快很多。(尚未验证)
DHT route table:
主要负责路由表的操作。
路由表有如下操作:
init:刚创建路由表时的操作。分两种情况:
1. 如果之前已经初始化过,并且将上次路由表的数据保存下来,则只需要读入保存数据。
2. 如果之前没有初始化过,则首先应当初始化。
首先,应当有一个接入点,也就是说,你要想加进这个网络,必须认识这个网络中某个节点i并将i加入路由表,接下来对i用find_node询问自己的hash_info,这里巧妙的地方就在于,理论上通过一定数量的询问就会找到离自己距离很近的节点(也就是经过一定步骤就会收敛)。find_node目的在于尽可能早的让自己有数据,并且让网络上别的节点知道自己,如果别人不认识你,就不会发送消息过来,意味着你也不能获取到想要的信息。
search:比较重要的方法,主要使用它来定位当前infohash所在的桶的位置。会被其他各种代理方法调用到。
findNodes:找到路由表中与传入的infohash最近的k个节点
getPeer:找到待查资源是否有peer(即是否有人在下载,也就是是否有人announce过)
announcePeer:通知该资源正在被下载
DHT bucket:
acitiveNode:逻辑比较多,分如下几点。
1. 查找所要添加的节点对应路由表的桶是否已经满,如果未满,添加节点
2. 如果已经满,检查该桶中是否包含爬虫节点自己,如果不包含,抛弃待添加节点
3. 如果该桶中包含本节点,则平均分裂该桶
其他的诸如locateNode,
replaceNode, updateNode,
removeNode,就不一一说明了
DHT torrent parser:
主要从bt种子文件中解析出以下几个重要的信息:name,size,file list(sub file name, sub file size),比较简单,用bencode方向解码就行了
Utils:
distance:计算两个资源之间的距离。在kad中用a xor b表示
为了增加难度,选用了不太熟悉的语言python,结果步步为营,但是也感慨python的简洁强大。在实现中,也碰到很多有意思的问题。比如如何保存一张路由表中的所有桶,之前想出来几个办法,甚至为了节省资源,打算用bit数组+dict直接保存,但是因为估计最终的几个操作不是很方便直观容易出错而放弃,选用的结构就是前缀树,操作起来果然是没有障碍;
在超时问题上,比如桶超时和节点超时,一直在思考一个高效但是比较优雅的做法,可以用一个同步调用然后等待它的超时,但是显然很低效,尤其我没有用更多线程的情况,一旦阻塞了就等于该端口所有事件都被阻塞了。所以必须用异步操作,但是异步操作很难去控制它的精确事件,当然,我可以在每个事件来的时候检查一遍是否超时,但是显然也是浪费和低效。那么,剩下的只有采用跟tomcat类似的方式了,增加一个线程来监控,当然,这个监控线程最好是全局的,能监控所有crawler中所有事务的超时。另外,超时如果控制不当,容易导致内存没有回收以至于内存泄露,也值得注意。超时线程是否会与其他线程互相影响也应当仔细检查。
最初超时的控制没处理好,出现了ping storm,运行一定时间后大多数桶已经满了,如果按照协议中的方式去跑的话会发现大量的事件都是在ping以确认这个节点是否ok以至于大量的cpu用于处理ping和ping响应。深入理解后发现,检查节点状态是不需要的,因为节点状态只是为了提供给询问的人一些好的节点,既然如此,可以将每次过来的节点替换当前桶中最老的节点,如此一来,我们将总是保存着最新的节点。
搜索算法也是比较让我困惑的地方,简而言之,搜索的目的并不是真正去找资源,而是去认识那些能够保存你的节点。为什么说是能够保存你,因为离你越远,桶的数量越少,这样一来,要想进他们的桶中去相对来说就比较困难,所以搜索的目标按理应该是附近的节点最好,但是不能排除远方节点也可能保存你的情况,这种情况会发生在远方节点初始化时或者远方节点的桶中节点超时的时候,但总而言之,概率要小些。所以搜索算法也不应该不做判断就胡乱搜索,但是也不应该将搜索的距离严格限制在附近,所以这是一个权衡问题,暂时没有想到好的方式,觉得暂时让距离远的以一定概率发生,而距离近的必然发生
还有一点,就是搜索速度问题,因为DHT网络的这种结构,决定了一个节点所认识的其他节点必然是有限的附近节点,于是每个节点在一定时间段内能拿到的资源数必然是有限的,所以应当分配多个节点同时去抓取,而抓取资源的数量很大程度上就跟分配节点的多少有关了。
最后一个值得优化的地方是findnodes方法,之前的方式是把一个桶中所有数据拿出来排序,然后取其中前K个返回回去,但是实际上我们做了很多额外的工作,这是经典的topN问题,使用排序明显是浪费时间的,因为这个操作非常频繁,所以即便所有保存的节点加起来很少((160 - 4) * 8),也会一定程度上增加时间。而采用的算法是在一篇论文《可扩展的DHT网络爬虫设计和优化》中找到的,基本公式是IDi = IDj xor 2 ^(160 - i),这样,已知IDi和i就能知道IDj,若已知IDi和IDj就能知道i,通过这种方式,可以快速的查找该桶A附近的其他桶(显然是离桶A层次最近的桶中的节点距离A次近),比起全部遍历再查找效率要高不少。
dht协议http://www.bittorrent.org/beps/bep_0005.html 及其翻译http://gobismoon.blog.163.com/blog/static/5244280220100893055533/
爬虫源码参考别人的,非原创,只为学习
1 #encoding: utf-8 2 3 from hashlib import sha1 4 from random import randint 5 from struct import unpack, pack 6 from socket import inet_aton, inet_ntoa 7 from bisect import bisect_left 8 from threading import Timer 9 from time import sleep 10 11 from bencode import bencode, bdecode 12 13 BOOTSTRAP_NODES = [ 14 ("router.bittorrent.com", 6881), 15 ("dht.transmissionbt.com", 6881), 16 ("router.utorrent.com", 6881) 17 ] 18 TID_LENGTH = 4 19 KRPC_TIMEOUT = 10 20 REBORN_TIME = 5 * 60 21 K = 8 22 23 def entropy(bytes): 24 s = "" 25 for i in range(bytes): 26 s += chr(randint(0, 255)) 27 return s 28 29 # """把爬虫"伪装"成正常node, 一个正常的node有ip, port, node ID三个属性, 因为是基于UDP协议, 30 # 所以向对方发送信息时, 即使没"明确"说明自己的ip和port时, 对方自然会知道你的ip和port, 31 # 反之亦然. 那么我们自身node就只需要生成一个node ID就行, 协议里说到node ID用sha1算法生成, 32 # sha1算法生成的值是长度是20 byte, 也就是20 * 8 = 160 bit, 正好如DHT协议里说的那范围: 0 至 2的160次方, 33 # 也就是总共能生成1461501637330902918203684832716283019655932542976个独一无二的node. 34 # ok, 由于sha1总是生成20 byte的值, 所以哪怕你写SHA1(20)或SHA1(19)或SHA1("I am a 2B")都可以, 35 # 只要保证大大降低与别人重复几率就行. 注意, node ID非十六进制, 36 # 也就是说非FF5C85FE1FDB933503999F9EB2EF59E4B0F51ECA这个样子, 即非hash.hexdigest(). """ 37 def random_id(): 38 hash = sha1() 39 hash.update( entropy(20) ) 40 return hash.digest() 41 42 def decode_nodes(nodes): 43 n = [] 44 length = len(nodes) 45 if (length % 26) != 0: 46 return n 47 for i in range(0, length, 26): 48 nid = nodes[i:i+20] 49 ip = inet_ntoa(nodes[i+20:i+24]) 50 port = unpack("!H", nodes[i+24:i+26])[0] 51 n.append( (nid, ip, port) ) 52 return n 53 54 def encode_nodes(nodes): 55 strings = [] 56 for node in nodes: 57 s = "%s%s%s" % (node.nid, inet_aton(node.ip), pack("!H", node.port)) 58 strings.append(s) 59 60 return "".join(strings) 61 62 def intify(hstr): 63 #"""这是一个小工具, 把一个node ID转换为数字. 后面会频繁用到.""" 64 return long(hstr.encode('hex'), 16) #先转换成16进制, 再变成数字 65 66 def timer(t, f): 67 Timer(t, f).start() 68 69 70 class BucketFull(Exception): 71 pass 72 73 74 class KRPC(object): 75 def __init__(self): 76 self.types = { 77 "r": self.response_received, 78 "q": self.query_received 79 } 80 self.actions = { 81 "ping": self.ping_received, 82 "find_node": self.find_node_received, 83 "get_peers": self.get_peers_received, 84 "announce_peer": self.announce_peer_received, 85 } 86 87 self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 88 self.socket.bind(("0.0.0.0", self.port)) 89 90 def response_received(self, msg, address): 91 self.find_node_handler(msg) 92 93 def query_received(self, msg, address): 94 try: 95 self.actions[msg["q"]](msg, address) 96 except KeyError: 97 pass 98 99 def send_krpc(self, msg, address): 100 try: 101 self.socket.sendto(bencode(msg), address) 102 except: 103 pass 104 105 106 class Client(KRPC): 107 def __init__(self, table): 108 self.table = table 109 110 timer(KRPC_TIMEOUT, self.timeout) 111 timer(REBORN_TIME, self.reborn) 112 KRPC.__init__(self) 113 114 def find_node(self, address, nid=None): 115 nid = self.get_neighbor(nid) if nid else self.table.nid 116 tid = entropy(TID_LENGTH) 117 118 msg = { 119 "t": tid, 120 "y": "q", 121 "q": "find_node", 122 "a": {"id": nid, "target": random_id()} 123 } 124 self.send_krpc(msg, address) 125 126 def find_node_handler(self, msg): 127 try: 128 nodes = decode_nodes(msg["r"]["nodes"]) 129 for node in nodes: 130 (nid, ip, port) = node 131 if len(nid) != 20: continue 132 if nid == self.table.nid: continue 133 self.find_node( (ip, port), nid ) 134 except KeyError: 135 pass 136 137 def joinDHT(self): 138 for address in BOOTSTRAP_NODES: 139 self.find_node(address) 140 141 def timeout(self): 142 if len( self.table.buckets ) < 2: 143 self.joinDHT() 144 timer(KRPC_TIMEOUT, self.timeout) 145 146 def reborn(self): 147 self.table.nid = random_id() 148 self.table.buckets = [ KBucket(0, 2**160) ] 149 timer(REBORN_TIME, self.reborn) 150 151 def start(self): 152 self.joinDHT() 153 154 while True: 155 try: 156 (data, address) = self.socket.recvfrom(65536) 157 msg = bdecode(data) 158 self.types[msg["y"]](msg, address) 159 except Exception: 160 pass 161 162 def get_neighbor(self, target): 163 return target[:10]+random_id()[10:] 164 165 166 class Server(Client): 167 def __init__(self, master, table, port): 168 self.table = table 169 self.master = master 170 self.port = port 171 Client.__init__(self, table) 172 173 def ping_received(self, msg, address): 174 try: 175 nid = msg["a"]["id"] 176 msg = { 177 "t": msg["t"], 178 "y": "r", 179 "r": {"id": self.get_neighbor(nid)} 180 } 181 self.send_krpc(msg, address) 182 self.find_node(address, nid) 183 except KeyError: 184 pass 185 186 def find_node_received(self, msg, address): 187 try: 188 target = msg["a"]["target"] 189 neighbors = self.table.get_neighbors(target) 190 191 nid = msg["a"]["id"] 192 msg = { 193 "t": msg["t"], 194 "y": "r", 195 "r": { 196 "id": self.get_neighbor(target), 197 "nodes": encode_nodes(neighbors) 198 } 199 } 200 self.table.append(KNode(nid, *address)) 201 self.send_krpc(msg, address) 202 self.find_node(address, nid) 203 except KeyError: 204 pass 205 206 def get_peers_received(self, msg, address): 207 try: 208 infohash = msg["a"]["info_hash"] 209 210 neighbors = self.table.get_neighbors(infohash) 211 212 nid = msg["a"]["id"] 213 msg = { 214 "t": msg["t"], 215 "y": "r", 216 "r": { 217 "id": self.get_neighbor(infohash), 218 "nodes": encode_nodes(neighbors) 219 } 220 } 221 self.table.append(KNode(nid, *address)) 222 self.send_krpc(msg, address) 223 self.master.log(infohash) 224 self.find_node(address, nid) 225 except KeyError: 226 pass 227 228 def announce_peer_received(self, msg, address): 229 try: 230 infohash = msg["a"]["info_hash"] 231 nid = msg["a"]["id"] 232 233 msg = { 234 "t": msg["t"], 235 "y": "r", 236 "r": {"id": self.get_neighbor(infohash)} 237 } 238 239 self.table.append(KNode(nid, *address)) 240 self.send_krpc(msg, address) 241 self.master.log(infohash) 242 self.find_node(address, nid) 243 except KeyError: 244 pass 245 # 该类只实例化一次. 246 class KTable(object): 247 # 这里的nid就是通过node_id()函数生成的自身node ID. 协议里说道, 每个路由表至少有一个bucket, 248 # 还规定第一个bucket的min=0, max=2^160次方, 所以这里就给予了一个buckets属性来存储bucket, 这个是列表. 249 def __init__(self, nid): 250 self.nid = nid 251 self.buckets = [ KBucket(0, 2**160) ] 252 253 def append(self, node): 254 index = self.bucket_index(node.nid) 255 try: 256 bucket = self.buckets[index] 257 bucket.append(node) 258 except IndexError: 259 return 260 except BucketFull: 261 if not bucket.in_range(self.nid): 262 return 263 self.split_bucket(index) 264 self.append(node) 265 266 267 # 返回与目标node ID或infohash的最近K个node. 268 269 # 定位出与目标node ID或infohash所在的bucket, 如果该bucuck有K个节点, 返回. 270 # 如果不够到K个节点的话, 把该bucket前面的bucket和该bucket后面的bucket加起来, 只返回前K个节点. 271 # 还是不到K个话, 再重复这个动作. 要注意不要超出最小和最大索引范围. 272 # 总之, 不管你用什么算法, 想尽办法找出最近的K个节点. 273 def get_neighbors(self, target): 274 nodes = [] 275 if len(self.buckets) == 0: return nodes 276 if len(target) != 20 : return nodes 277 278 index = self.bucket_index(target) 279 try: 280 nodes = self.buckets[index].nodes 281 min = index - 1 282 max = index + 1 283 284 while len(nodes) < K and ((min >= 0) or (max < len(self.buckets))): 285 if min >= 0: 286 nodes.extend(self.buckets[min].nodes) 287 288 if max < len(self.buckets): 289 nodes.extend(self.buckets[max].nodes) 290 291 min -= 1 292 max += 1 293 294 num = intify(target) 295 nodes.sort(lambda a, b, num=num: cmp(num^intify(a.nid), num^intify(b.nid))) 296 return nodes[:K] #K是个常量, K=8 297 except IndexError: 298 return nodes 299 300 def bucket_index(self, target): 301 return bisect_left(self.buckets, intify(target)) 302 303 304 # 拆表 305 306 # index是待拆分的bucket(old bucket)的所在索引值. 307 # 假设这个old bucket的min:0, max:16. 拆分该old bucket的话, 分界点是8, 然后把old bucket的max改为8, min还是0. 308 # 创建一个新的bucket, new bucket的min=8, max=16. 309 # 然后根据的old bucket中的各个node的nid, 看看是属于哪个bucket的范围里, 就装到对应的bucket里. 310 # 各回各家,各找各妈. 311 # new bucket的所在索引值就在old bucket后面, 即index+1, 把新的bucket插入到路由表里. 312 def split_bucket(self, index): 313 old = self.buckets[index] 314 point = old.max - (old.max - old.min)/2 315 new = KBucket(point, old.max) 316 old.max = point 317 self.buckets.insert(index + 1, new) 318 for node in old.nodes[:]: 319 if new.in_range(node.nid): 320 new.append(node) 321 old.remove(node) 322 323 def __iter__(self): 324 for bucket in self.buckets: 325 yield bucket 326 327 328 class KBucket(object): 329 __slots__ = ("min", "max", "nodes") 330 331 # min和max就是该bucket负责的范围, 比如该bucket的min:0, max:16的话, 332 # 那么存储的node的intify(nid)值均为: 0到15, 那16就不负责, 这16将会是该bucket后面的bucket的min值. 333 # nodes属性就是个列表, 存储node. last_accessed代表最后访问时间, 因为协议里说到, 334 # 当该bucket负责的node有请求, 回应操作; 删除node; 添加node; 更新node; 等这些操作时, 335 # 那么就要更新该bucket, 所以设置个last_accessed属性, 该属性标志着这个bucket的"新鲜程度". 用linux话来说, touch一下. 336 # 这个用来便于后面说的定时刷新路由表. 337 338 def __init__(self, min, max): 339 self.min = min 340 self.max = max 341 self.nodes = [] 342 343 344 # 添加node, 参数node是KNode实例. 345 346 # 如果新插入的node的nid属性长度不等于20, 终止. 347 # 如果满了, 抛出bucket已满的错误, 终止. 通知上层代码进行拆表. 348 # 如果未满, 先看看新插入的node是否已存在, 如果存在, 就替换掉, 不存在, 就添加, 349 # 添加/替换时, 更新该bucket的"新鲜程度". 350 def append(self, node): 351 if node in self: 352 self.remove(node) 353 self.nodes.append(node) 354 else: 355 if len(self) < K: 356 self.nodes.append(node) 357 else: 358 raise BucketFull 359 360 def remove(self, node): 361 self.nodes.remove(node) 362 363 def in_range(self, target): 364 return self.min <= intify(target) < self.max 365 366 def __len__(self): 367 return len(self.nodes) 368 369 def __contains__(self, node): 370 return node in self.nodes 371 372 def __iter__(self): 373 for node in self.nodes: 374 yield node 375 376 def __lt__(self, target): 377 return self.max <= target 378 379 380 class KNode(object): 381 # """ 382 # nid就是node ID的简写, 就不取id这么模糊的变量名了. __init__方法相当于别的OOP语言中的构造方法, 383 # 在python严格来说不是构造方法, 它是初始化, 不过, 功能差不多就行. 384 # """ 385 __slots__ = ("nid", "ip", "port") 386 387 def __init__(self, nid, ip, port): 388 self.nid = nid 389 self.ip = ip 390 self.port = port 391 392 def __eq__(self, other): 393 return self.nid == other.nid 394 395 396 397 #using example 398 class Master(object): 399 def __init__(self, f): 400 self.f = f 401 402 def log(self, infohash): 403 self.f.write(infohash.encode("hex")+"\n") 404 self.f.flush() 405 try: 406 f = open("infohash.log", "a") 407 m = Master(f) 408 s = Server(Master(f), KTable(random_id()), 8001) 409 s.start() 410 except KeyboardInterrupt: 411 s.socket.close() 412 f.close()
种子从迅雷下,初期为学习从http://torrage.com/sync/下的infohash,去重用了别人写的Bloom Filter算法,数据库用Mysql,建表语句如下,其中uinthash是根据infohash的头四个字节和最后四个字节组成的一个int整数,先这样设计,看后期查询的时候用得到不,总觉得用infohash来查很慢
1 CREATE TABLE `torrentinfo` ( 2 `id` int(11) NOT NULL AUTO_INCREMENT, 3 `infohash` char(40) NOT NULL DEFAULT '', 4 `filename` varchar(128) DEFAULT NULL, 5 `filelength` bigint(11) DEFAULT NULL, 6 `recvtime` datetime DEFAULT NULL, 7 `filecontent` text, 8 `uinthash` int(11) unsigned NOT NULL DEFAULT '0', 9 PRIMARY KEY (`id`), 10 KEY `uinthash_index` (`uinthash`) 11 ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
Thunder.py
1 # _*_ coding: utf-8 _*_ 2 import socket 3 import os,glob 4 import time as time_p 5 import requests 6 from bencode import bdecode, BTL 7 from torrent import * 8 import threading, signal 9 import MySQLdb 10 from BloomFilter import * 11 12 class Thunder(object): 13 def __init__(self): 14 self.connstr={'host':'127.0.0.1','user':'root','passwd':'123456','port':3306,'charset':"UTF8"} 15 def download(self, infohash): 16 try: 17 tc = self._download(infohash) 18 if(tc==-1): 19 return 20 tc = bdecode(tc) 21 info = torrentInfo(tc) 22 # print info['name'] 23 # print info['length'] 24 # print info['files'] 25 uint=int(infohash[:4]+infohash[-4:],16) 26 time_now=time_p.strftime('%Y-%m-%d %H:%M:%S',time_p.localtime(time_p.time())) 27 sql="insert into torrentinfo(infohash,filename,filelength,recvtime,filecontent,uinthash) values('%s','%s','%d','%s','%s','%d')"%(infohash,MySQLdb.escape_string(info['name']),info['length'],time_now,MySQLdb.escape_string(info['files']),uint) 28 self.executeSQL(sql) 29 except Exception,e: 30 print e 31 pass 32 33 def openConnection(self): 34 try: 35 self.conn=MySQLdb.connect(**self.connstr) 36 self.cur=self.conn.cursor() 37 self.conn.select_db('dht') 38 except MySQLdb.Error,e: 39 print 'mysql error %d:%s'%(e.args[0],e.args[1]) 40 41 42 def executeSQL(self,sql): 43 try: 44 self.cur.execute(sql) 45 self.conn.commit() 46 except MySQLdb.Error,e: 47 print 'mysql error %d:%s'%(e.args[0],e.args[1]) 48 def closeConnection(self): 49 try: 50 self.cur.close() 51 self.conn.close() 52 except MySQLdb.Error,e: 53 print 'mysql error %d:%s'%(e.args[0],e.args[1]) 54 55 def _download(self, infohash): 56 infohash = infohash.upper() 57 start = infohash[0:2] 58 end = infohash[-2:] 59 url = "http://bt.box.n0808.com/%s/%s/%s.torrent" % (start, end, infohash) 60 headers = { 61 "Referer": "http://bt.box.n0808.com" 62 } 63 try: 64 r = requests.get(url, headers=headers, timeout=10) 65 if r.status_code == 200: 66 # f=open("d:\\"+infohash+'.torrent','wb') 67 # f.write(r.content) 68 # f.close() 69 return r.content 70 except (socket.timeout, requests.exceptions.Timeout), e: 71 pass 72 return -1 73 74 class torrentBean(object): 75 """docstring for torrentBean""" 76 __slots__=('infohash','filename','recvtime','filecontent','uinthash') 77 78 def __init__(self, infohash,filename,recvtime,filecontent,uinthash): 79 super(torrentBean, self).__init__() 80 self.infohash = infohash 81 self.filename = filename 82 self.recvtime = recvtime 83 self.filecontent = filecontent 84 self.uinthash = uinthash 85 86 87 bf = BloomFilter(0.001, 1000000) 88 a=Thunder() 89 a.openConnection() 90 # info_hash="a02d2735e6e1daa6f7d58f21bd7340a7b7c4b7a5" 91 # info_hash='cf3a6a4f07da0b90beddae838462ca0012bef285' 92 # a.download('cf3a6a4f07da0b90beddae838462ca0012bef285') 93 94 95 files=glob.glob('./*.txt') 96 for fl in files: 97 print os.path.basename(fl) 98 f=open(fl,'r') 99 for line in f: 100 infohash=line.strip('\n') 101 if not bf.is_element_exist(infohash): 102 bf.insert_element(infohash) 103 a.download(infohash) 104 a.closeConnection()
torrent种子文件经过bencode解析,获取key为info对应value值,种子大致的格式如下,有乱码,不影响观看
{ 'files': [{ 'path': ['PGD660.avi'], 'length': 1367405512, 'filehash': 'J\xef\xfe\xb3K\xd4g\x8d\x07m\x03\xbb\xb3\xadt\xa1\xa0\xf0\xec\xab', 'ed2k': '/\xfb\xe55#n\xbd1\xb6\x1c\x0f\xf3\xe4\x9dP\xfb', 'path.utf-8': ['PGD660.avi'] }, { 'path': ['PGD660B.jpg'], 'length': 135899, 'filehash': '*$O\x17w\xe9E\x95>O\x1f\xfb\x0e\x9b\x16\x15B\\Q\x9d', 'ed2k': 'T/L*\xbb\x8e.\xe2d\xddu\nR\x07\xca\x19', 'path.utf-8': ['PGD660B.jpg'] }, { 'path': ['yoy123@\xe8\x8d\x89\xe6\xa6\xb4\xe7\xa4\xbe\xe5\x8c\xba@\xe6\x9c\x80\xe6\x96\xb0\xe5\x9c\xb0\xe5\x9d\x80.mht'], 'length': 472, 'filehash': '&\xa92\xb7\xdd8\xeel3\xcc-S\x07\xb5e\xd35\xc0\xb7r', 'ed2k': '\x13\xd2 a\x0cA\xb4\xf2X\x12\xea\xd4\xe8\xac`\x92', 'path.utf-8': ['yoy123@\xe8\x8d\x89\xe6\xa6\xb4\xe7\xa4\xbe\xe5\x8c\xba@\xe6\x9c\x80\xe6\x96\xb0\xe5\x9c\xb0\xe5\x9d\x80.mht'] }, { 'path': ['yoy123@\xe8\x8d\x89\xe6\xa6\xb4\xe7\xa4\xbe\xe5\x8c\xba\xe5\xae\xa3\xe4\xbc\xa0.txt'], 'length': 363, 'filehash': '\x96nA*\xe2\xb6Y+[\xe3\xaf\xd4\x14A\x94\xf5@\xcd\xc1\x91', 'ed2k': '8V\xa6X\xd9\x82l\xdbNO8\xe8D\xe9E\xed', 'path.utf-8': ['yoy123@\xe8\x8d\x89\xe6\xa6\xb4\xe7\xa4\xbe\xe5\x8c\xba\xe5\xae\xa3\xe4\xbc\xa0.txt'] }, { 'path': ['\xe2\x98\x85\xe5\xb0\x91\xe5\xa6\x87 \xe8\xae\xba\xe5\x9d\x9b \xe9\x99\x90\xe9\x87\x8f\xe5\xbc\x80\xe6\x94\xbe\xe4\xb8\xad\xe3\x80\x82\xe3\x80\x82.mht'], 'length': 475, 'filehash': '\xec\xde\xeb-6\x86\x1avB\xdd\xd8q\x8b\x8f\xc06\xf0XX\x0e', 'ed2k': '\xa7\x8dU\xfd\xfc=\x12\x15>yE\x8f&A\xc2u', 'path.utf-8': ['\xe2\x98\x85\xe5\xb0\x91\xe5\xa6\x87 \xe8\xae\xba\xe5\x9d\x9b \xe9\x99\x90\xe9\x87\x8f\xe5\xbc\x80\xe6\x94\xbe\xe4\xb8\xad\xe3\x80\x82\xe3\x80\x82.mht'] }, { 'path': ['\xe6\x9f\x8f\xe6\x8b\x89\xe5\x9c\x96\xe7\xa7\x98\xe5\xaf\x86\xe8\x8a\xb1\xe5\x9c\x92.mht'], 'length': 478, 'filehash': "\xe4\xb5'Td\x0b=P\xc0\x9aG\xa2\xd7\xfapg\xc6.\x8e\xa7", 'ed2k': '\xdd\x8d\xbb\x0b\x04\xcb\x03O\xb1\x18"\x03\xb1\x1d\xba\x08', 'path.utf-8': ['\xe6\x9f\x8f\xe6\x8b\x89\xe5\x9c\x96\xe7\xa7\x98\xe5\xaf\x86\xe8\x8a\xb1\xe5\x9c\x92.mht'] }, { 'path': ['\xe7\xbe\x8e\xe5\xa5\xb3\xe4\xb8\x8a\xe9\x96\x80\xe6\x8f\xb4\xe4\xba\xa4\xe6\x9c\x8d\xe5\x8b\x99.mht'], 'length': 478, 'filehash': "\xe4\xb5'Td\x0b=P\xc0\x9aG\xa2\xd7\xfapg\xc6.\x8e\xa7", 'ed2k': '\xdd\x8d\xbb\x0b\x04\xcb\x03O\xb1\x18"\x03\xb1\x1d\xba\x08', 'path.utf-8': ['\xe7\xbe\x8e\xe5\xa5\xb3\xe4\xb8\x8a\xe9\x96\x80\xe6\x8f\xb4\xe4\xba\xa4\xe6\x9c\x8d\xe5\x8b\x99.mht'] }], 'publisher': 'yoy123', 'piece length': 524288, 'name': 'PGD660 \xe6\x83\xb3\xe8\xa9\xa6\xe8\x91\x97\xe5\x85\xa8\xe5\x8a\x9b\xe6\x93\x8d\xe6\x93\x8d\xe7\x9c\x8b\xe9\x80\x99\xe5\x80\x8b\xe6\xb7\xab\xe8\x95\xa9\xe7\xbe\x8e\xe5\xa5\xb3\xe5\x97\x8e \xe5\xb0\x8f\xe5\xb7\x9d\xe3\x81\x82\xe3\x81\x95\xe7\xbe\x8e', 'publisher.utf-8': 'yoy123', }
解析代码torrent.py
1 # _*_ coding: utf-8 _*_ 2 from time import time 3 4 def torrentInfo(torrentContent): 5 metadata = torrentContent["info"] 6 print metadata 7 info = { 8 "name": getName(metadata), 9 "length": calcLength(metadata), 10 "timestamp": getCreateDate(torrentContent), 11 "files": extraFiles(metadata) 12 } 13 return info 14 15 def calcLength(metadata): 16 length = 0 17 try: 18 length = metadata["length"] 19 except KeyError: 20 try: 21 for file in metadata["files"]: 22 length += file["length"] 23 except KeyError: 24 pass 25 return length 26 27 def extraFiles(metadata): 28 files = [] 29 try: 30 for file in metadata["files"]: 31 path = file["path.utf-8"] 32 size=file['length'] 33 if len(path) > 1: 34 main = path[0] 35 for f in path[1:2]: 36 files.append("%s/%s %d bytes" % (main, f,size)) 37 else: 38 files.append("%s %d bytes" % (path[0],size) ) 39 if files: 40 return '\r\n'.join(files) 41 else: 42 return getName(metadata) 43 except KeyError: 44 return getName(metadata) 45 46 def getName(metadata): 47 try: 48 name = metadata["name.utf-8"] 49 if name.strip()=="": 50 raise KeyError 51 except KeyError: 52 try: 53 name = metadata["name"] 54 if name.strip()=="": 55 raise KeyError 56 except KeyError: 57 name = getMaxFile(metadata) 58 59 return name 60 def getMaxFile(metadata): 61 try: 62 maxFile = metadata["files"][0] 63 for file in metadata["files"]: 64 if file["length"] > maxFile["length"]: 65 maxFile = file 66 name = maxFile["path"][0] 67 return name 68 except KeyError: 69 return "" 70 71 def getCreateDate(torrentContent): 72 try: 73 timestamp = torrentContent["creation date"] 74 except KeyError: 75 timestamp = int( time() ) 76 return timestamp
最后还有别人写的BloomFilter代码
1 #encoding: utf-8 2 ''' 3 Created on 2012-11-7 4 5 @author: palydawn 6 ''' 7 import cmath 8 from BitVector import BitVector 9 10 class BloomFilter(object): 11 def __init__(self, error_rate, elementNum): 12 #计算所需要的bit数 13 self.bit_num = -1 * elementNum * cmath.log(error_rate) / (cmath.log(2.0) * cmath.log(2.0)) 14 15 #四字节对齐 16 self.bit_num = self.align_4byte(self.bit_num.real) 17 18 #分配内存 19 self.bit_array = BitVector(size=self.bit_num) 20 21 #计算hash函数个数 22 self.hash_num = cmath.log(2) * self.bit_num / elementNum 23 24 self.hash_num = self.hash_num.real 25 26 #向上取整 27 self.hash_num = int(self.hash_num) + 1 28 29 #产生hash函数种子 30 self.hash_seeds = self.generate_hashseeds(self.hash_num) 31 32 def insert_element(self, element): 33 for seed in self.hash_seeds: 34 hash_val = self.hash_element(element, seed) 35 #取绝对值 36 hash_val = abs(hash_val) 37 #取模,防越界 38 hash_val = hash_val % self.bit_num 39 #设置相应的比特位 40 self.bit_array[hash_val] = 1 41 42 #检查元素是否存在,存在返回true,否则返回false 43 def is_element_exist(self, element): 44 for seed in self.hash_seeds: 45 hash_val = self.hash_element(element, seed) 46 #取绝对值 47 hash_val = abs(hash_val) 48 #取模,防越界 49 hash_val = hash_val % self.bit_num 50 51 #查看值 52 if self.bit_array[hash_val] == 0: 53 return False 54 return True 55 56 #内存对齐 57 def align_4byte(self, bit_num): 58 num = int(bit_num / 32) 59 num = 32 * (num + 1) 60 return num 61 62 #产生hash函数种子,hash_num个素数 63 def generate_hashseeds(self, hash_num): 64 count = 0 65 #连续两个种子的最小差值 66 gap = 50 67 #初始化hash种子为0 68 hash_seeds = [] 69 for index in xrange(hash_num): 70 hash_seeds.append(0) 71 for index in xrange(10, 10000): 72 max_num = int(cmath.sqrt(1.0 * index).real) 73 flag = 1 74 for num in xrange(2, max_num): 75 if index % num == 0: 76 flag = 0 77 break 78 79 if flag == 1: 80 #连续两个hash种子的差值要大才行 81 if count > 0 and (index - hash_seeds[count - 1]) < gap: 82 continue 83 hash_seeds[count] = index 84 count = count + 1 85 86 if count == hash_num: 87 break 88 return hash_seeds 89 90 def hash_element(self, element, seed): 91 hash_val = 1 92 for ch in str(element): 93 chval = ord(ch) 94 hash_val = hash_val * seed + chval 95 return hash_val 96 97 98 def SaveBitToFile(self,f): 99 self.bit_array.write_bits_to_fileobject(f) 100 pass
表内容见下图
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:开博第一篇:DHT 爬虫的学习记录 - Python技术站