爬虫(五):生产者消费者方法

1.不使用锁的话,可能会产生线程的竞争:当共享一个对象(变量或者一个方法)加锁的操作

爬虫(五):生产者消费者方法

在threading模块中,定义两种类型的琐:threading.Lock和threading.RLock。它们之间有一点细微的区别,通过比较下面两段代码来说明:
import threading  
lock = threading.Lock() #Lock对象  
lock.acquire()  
lock.acquire()  #产生了死琐。  
lock.release()  

lock.release()  

 

import threading  
rLock = threading.RLock()  #RLock对象  
rLock.acquire()  
rLock.acquire() #在同一线程内,程序不会堵塞。  
rLock.release()  
rLock.release()  

  这两种琐的主要区别是:RLock允许在同一线程中被多次acquire。而Lock却不允许这种情况。注意:如果使用RLock,那么acquire和release必须成对出现,即调用了n次acquire,必须调用n次的release才能真正释放所占用的琐。threading.Condition
  可以把Condiftion理解为一把高级的琐,它提供了比Lock, RLock更高级的功能,允许我们能够控制复杂的线程同步问题。threadiong.Condition在内部维护一个琐对象(默认是RLock),可以在创建Condigtion对象的时候把琐对象作为参数传入。Condition也提供了acquire, release方法,其含义与琐的acquire, release方法一致,其实它只是简单的调用内部琐对象的对应的方法而已。Condition还提供了如下方法(特别要注意:这些方法只有在占用琐(acquire)之后才能调用,否则将会报RuntimeError异常。):
Condition.wait([timeout]):  
  wait方法释放内部所占用的琐,同时线程被挂起,直至接收到通知被唤醒或超时(如果提供了timeout参数的话)。当线程被唤醒并重新占有琐的时候,程序才会继续执行下去。
Condition.notify():
  唤醒一个挂起的线程(如果存在挂起的线程)。注意:notify()方法不会释放所占用的琐。
Condition.notify_all()
Condition.notifyAll()
  唤醒所有挂起的线程(如果存在挂起的线程)。注意:这些方法不会释放所占用的琐。

 

在这里采用threading.Condition类的方法:但是这样太麻烦,后面采用Queen的方式:

 1 #!/usr/bin/env python
 2 #----coding:utf-8----
 3 ##python 下对生产者和消费者模型的理解
 4 
 5 import threading
 6 import random
 7 import time
 8 
 9 lock=threading.Condition()
10 
11 class Produce(threading.Thread):
12     
13     def __init__(self,lock,product,filename):
14         self._lock=lock
15         self.product=product
16         self.file=filename
17         threading.Thread.__init__(self)
18     def run(self):
19         while True:
20             if self._lock.acquire():##判断是否得到锁,类似于Java中lock1.lock
21                 if (len(self.product)>100):
22                     self._lock.wait()
23                 else:
24                     ##打印现在product数字,同时将其中IO写入txt文档
25                     tmp=random.randint(0,10)
26                     self.product.append(tmp)
27                     
28                     print "add %d,product=%s: "%(tmp,str(self.product))
29                     
30                     ##通过IO口写入
31                     fp=open(self.file,'a')
32                     fp.write("add %d, product = %s\n" %(tmp,str(self.product)))
33                     fp.close()
34                 self._lock.notify()
35                 self._lock.release()
36                 time.sleep(0.2)
37 
38 class Consumer(threading.Thread):
39     def __init__(self,lock,product,filename):
40         self._lock = lock
41         self.product = product
42         self.file=filename
43         threading.Thread.__init__(self)
44         
45     def run(self):
46         while True:
47             if self._lock.acquire():
48                 if len(self.product)== 0:
49                     self._lock.wait()
50                 else:
51                     tmp = self.product[0]
52                     del self.product[0]
53                     print 'consum %d, product =%s'%(tmp,str(self.product))
54                     fp=open(self.file,'a')
55                     fp.write('consum %d, product = %s\n'%(tmp,str(self.product)))
56                     fp.close()
57                 self._lock.notify()
58                 self._lock.release()
59                 time.sleep(0.1)
60     
61  
62 
63     
64                 
65 if __name__=='__main__':
66     product=[]
67     ##假设product有五位:,消费者只有三位
68     for i in xrange(5):
69         p=Produce(lock,product,'log_in.txt')
70         p.start()
71         
72     for i in xrange(3):
73         T=Consumer(lock,product,'log_in.txt')
74         T.start()
75         
76