Python实现单线程多任务非阻塞TCP服务端,主要采用异步非阻塞编程模型,使用Python内建的asyncio库,同时搭配使用socket、select等标准库实现。
以下是Python 实现单线程多任务非阻塞TCP服务端的攻略:
-
创建一个协程 async def handle_client(client_socket, client_address),用来作为TCP连接的服务器端处理逻辑。其中client_socket用来处理与客户端的通信接口,client_address为客户端的地址和端口号。
-
创建一个协程 async def handle_clients(),用来作为TCP服务器的监听函数,可以接收多个客户端连接请求。该函数会不断的进行accept()操作,等待客户端连接请求,一旦有请求过来,就会回调handle_client协程,客户端的连接请求将由handle_client协程来进行处理。
-
为了避免使用阻塞型的accept()调用,我们可以使用Python标准库中的select函数来进行多路复用,从而创建一个异步非阻塞的服务端程序。在handle_clients协程中,使用selectors模块创建Selector对象sel,用来实现多路复用IO。
-
在handle_clients协程中,通过调用socket的setblocking(0)方法,将socket设置为非阻塞模式,如果出现了非阻塞问题,就会抛出socket.error异常。这些异常处理在handle_client协程中都应该考虑到。
示例一:实现一个简单的Echo服务器
import asyncio
import socket
async def handle_client(client_socket, client_address):
print(f"Connection from {client_address}")
try:
while True:
data = await asyncio.to_thread(client_socket.recv, 1024)
if not data:
break
await asyncio.to_thread(client_socket.sendall, data)
except socket.error as e:
print(f"Socket error: {e}")
finally:
client_socket.close()
print(f"Connection closed from {client_address}")
async def handle_clients():
sel = selectors.DefaultSelector()
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_socket.bind(("127.0.0.1", 8000))
server_socket.setblocking(False)
server_socket.listen(5)
sel.register(server_socket, selectors.EVENT_READ, data=None)
while True:
events = await asyncio.to_thread(sel.select)
for key, mask in events:
if key.data is None:
handle_accept(key.fileobj, sel)
else:
await asyncio.create_task(key.data.handle_client(key.fileobj, mask))
async def handle_accept(sock, mask):
client_socket, client_address = sock.accept()
client_socket.setblocking(False)
await asyncio.create_task(handle_client(client_socket, client_address))
asyncio.run(handle_clients())
示例二:实现一个异步非阻塞的Web服务器
import asyncio
import sys
def get_content_type(filename):
return {
'.html': 'text/html',
'.css': 'text/css',
'.jpg': 'image/jpeg',
'.png': 'image/png',
'.ico': 'image/x-icon'
}.get(filename, 'application/octet-stream')
async def handle_read(client_socket, request):
try:
while True:
data = await asyncio.to_thread(client_socket.recv, 1024)
request += data.decode('utf-8')
if '\r\n\r\n' in request:
break
except socket.error as e:
print(f"Error: {e}")
return request.strip()
async def handle_write(client_socket, response):
try:
await asyncio.to_thread(client_socket.sendall, response.encode('utf-8'))
finally:
client_socket.close()
async def handle_request(client_socket, client_addr):
request = await asyncio.create_task(handle_read(client_socket, ""))
filename = re.match(r'GET /(\S+)\s+', request).group(1)
filename = '.' + filename if filename != '/' else './index.html'
try:
with open(filename, 'rb') as f:
content = f.read()
content_type = get_content_type(filename)
response = f'HTTP/1.1 200 OK\r\nContent-Type: {content_type}\r\nContent-Length: {len(content)}\r\n\r\n{content.decode("utf-8")}'
except FileNotFoundError:
response = 'HTTP/1.1 404 NOT FOUND\r\nContent-Type: text/html\r\n\r\n<html><body><h1>404 Not Found!</h1></body></html>'
await asyncio.create_task(handle_write(client_socket, response))
async def handle_clients():
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_socket.bind(('127.0.0.1', 8001))
server_socket.setblocking(False)
server_socket.listen(5)
while True:
try:
client_socket, client_address = server_socket.accept()
client_socket.setblocking(False)
except socket.error as e:
print(f"Error: {e}")
continue
await asyncio.create_task(handle_request(client_socket, client_address))
asyncio.run(handle_clients())
以上示例可以用于学习Python单线程多任务非阻塞TCP服务端的核心思路和编程实现。实际使用中应将性能等因素进行详细考虑,进行相应的优化和调整。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:python实现单线程多任务非阻塞TCP服务端 - Python技术站