一、socketserver实现并发

基于tcp的套接字,关键就是两个循环,一个链接循环,一个通信循环。

socketserver模块中分两大类:server类(解决链接问题)和request类(解决通信问题)

server类:

数据库:socketserver模块、MySQL(一)

request类:

数据库:socketserver模块、MySQL(一)

继承关系:

 数据库:socketserver模块、MySQL(一)

数据库:socketserver模块、MySQL(一)

数据库:socketserver模块、MySQL(一)

以下述代码为例,分析socketserver源码:

ftpserver=socketserver.ThreadingTCPServer(('127.0.0.1',8080),FtpServer)
ftpserver.serve_forever()

查找属性的顺序:ThreadingTCPServer->ThreadingMixIn->TCPServer->BaseServer

  1. 实例化得到ftpserver,先找类ThreadingTCPServer的__init__,在TCPServer中找到,进而执行server_bind,server_active
  2. 找ftpserver下的serve_forever,在BaseServer中找到,进而执行self._handle_request_noblock(),该方法同样是在BaseServer中
  3. 执行self._handle_request_noblock()进而执行request, client_address = self.get_request()(就是TCPServer中的self.socket.accept()),然后执行self.process_request(request, client_address)
  4. 在ThreadingMixIn中找到process_request,开启多线程应对并发,进而执行process_request_thread,执行self.finish_request(request, client_address)
  5. 上述四部分完成了链接循环,本部分开始进入处理通讯部分,在BaseServer中找到finish_request,触发我们自己定义的类的实例化,去找__init__方法,而我们自己定义的类没有该方法,则去它的父类也就是BaseRequestHandler中找....

源码分析总结:

基于tcp的socketserver我们自己定义的类中的

  1. self.server即套接字对象
  2. self.request即一个链接
  3. self.client_address即客户端地址

基于udp的socketserver我们自己定义的类中的

  1. self.request是一个元组(第一个元素是客户端发来的数据,第二部分是服务端的udp套接字对象),如(b'adsf', <socket.socket fd=200, family=AddressFamily.AF_INET, type=SocketKind.SOCK_DGRAM, proto=0, laddr=('127.0.0.1', 8080)>)
  2. self.client_address即客户端地址

import os
import time
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
import socket
import selectors

class selectFtpServer:

    def __init__(self):
        self.dic = {}
        self.hasReceived=0
        self.sel = selectors.DefaultSelector()
        self.create_socket()
        self.handle()

    def create_socket(self):
        server = socket.socket()
        server.bind(("127.0.0.1",8885))
        server.listen(5)
        server.setblocking(False)
        self.sel.register(server, selectors.EVENT_READ, self.accept)
        print("服务端已开启,等待用户连接...")

    def handle(self):
        while True:
            events = self.sel.select()
            for key, mask in events:
                callback = key.data
                callback(key.fileobj, mask)

    def accept(self,sock, mask):

        conn, addr = sock.accept()
        print("from %s %s connected"%addr)
        conn.setblocking(False)
        self.sel.register(conn, selectors.EVENT_READ, self.read)

        self.dic[conn] = {}

    def read(self, conn, mask):
        try:
            if not self.dic[conn] :

                data = conn.recv(1024)
                cmd,filename,filesize = str(data, encoding='utf-8').split('|')
                self.dic={conn:{"cmd": cmd, "filename": filename,"filesize": int(filesize)}}

                if cmd == 'put':
                    conn.send(bytes("OK",encoding='utf8'))

                if self.dic[conn]['cmd'] == 'get':
                    file = os.path.join(BASE_DIR,"download",filename)

                    if os.path.exists(file):
                        fileSize = os.path.getsize(file)
                        send_info = '%s|%s'%('YES',fileSize)
                        conn.send(bytes(send_info, encoding='utf8'))
                    else:
                        send_info = '%s|%s'%('NO',0)
                        conn.send(bytes(send_info, encoding='utf8'))
            else:
                if self.dic[conn].get('cmd',None):
                    cmd=self.dic[conn].get('cmd')
                    if hasattr(self, cmd):
                        func = getattr(self,cmd)
                        func(conn)
                    else:
                        print("error cmd!")
                        conn.close()
                else:
                    print("error cmd!")
                    conn.close()

        except Exception as e:
            print('error', e)
            self.sel.unregister(conn)
            conn.close()

    def put(self, conn):

        fileName = self.dic[conn]['filename']
        fileSize = self.dic[conn]['filesize']
        path = os.path.join(BASE_DIR,"upload",fileName)
        recv_data = conn.recv(1024)
        self.hasReceived += len(recv_data)

        with open(path, 'ab') as f:
            f.write(recv_data)
        if fileSize == self.hasReceived:
            if conn in self.dic.keys():
                self.dic[conn] = {}
            print("%s上传完毕!"%fileName)

    def get(self,conn):

        filename = self.dic[conn]['filename']
        path = os.path.join(BASE_DIR,"download",filename)
        if str(conn.recv(1024), 'utf-8') == "second_active":
            with open(path, 'rb') as f:
                for line in f:
                    conn.send(line)
            self.dic[conn] = {}
            print('文件下载完毕!')


if __name__ == '__main__':

    selectFtpServer()

FtpServer