PythonSocket套接字编程

网络模型的简介

网络技术是从1990年代中期发展起来的新技术,它把互联网上分散的资源融为有机整体,实现资源的全面共享和有机协作,使人们能够透明地使用资源的整体能力并按需获取信息,资源包括高性能计算机、存储资源、数据资源、信息资源、知识资源、专家资源、大型数据库、网络、传感器等.

网络编程是实现各类网络应用程序的基础,现在大部分的网络应用程序都基于Socket通信的,网络通信协议一共有两种,一种是基于OSI的7层参考模型,而另一种则是TCP/IP协议的4层通信模型,在我们开发环境中会使用TCP/IP作为通信基础,接下来将具体介绍两种通信协议模型.

网络编程最主要的工作就是在发送端把信息通过规定好的协议进行组装包,在接收端按照规定好的协议把包进行解析,从而提取出对应的信息,达到通信的目的.中间最主要的就是数据包的组装,数据包的过滤,数据包的捕获,数据包的分析,当然最后再做一些处理.

◆OSI 参考模型◆

1983年,国际标准化组织(International Organization for standardization,ISO)发布了著名的ISO/IEC 7498标准,也就是开放式系统互连参考模型OSI,这个标准定义了网络的七层框架,试图使计算机在整个世界范围内实现互联,其中这七层分别的含义如下:

物理层次 协议作用
应用层 为用户提供服务,给用户一个接口
表示层 为数据提供表示,加密与压缩
会话层 确定数据是否需要传输,建立管理与终止会话
传输层 可靠与不可靠的数据传输,以及后期的错误检测
网络层 进行逻辑地址编址,实现不同网络之间的路由选择
数据链路层 进行硬件(MAC)地址编址有,差错的校验
物理层 电气特性,设备之间进行比特流的传输

以上的列表是一个通用的网络系统模型,并不是一个协议定义.实际上OSI模型从来没有被真正实现过,但是,出于其模型的广泛指导性,现在的网络协议都已经纳入了OSI参考模型的范围之内,OSI参考模型一共有7层,每层的作用在上面有说明,这也是网络方面的基础知识.

◆TCP/IP 参考模型◆

其实早在OSI模型出现之前,就已经有了TCP/IP的研究和实现,时间最早可以追溯到20世纪70年代,为互联网的最旱的通信协议,TCP为传输层的协议,而IP则为网络层的协议,两个层次中有代表性的协议组合代表了一系列的协议族,还包括有 ARP、ICMP 和 UDP 协议等,山于TCP/IP协议出现的比OSI早,所以井不符合OSI模型,他们的对应关系是这样的,如下表所示.

OSI 7层参考模型 TCP/IP 4层参考模型
应用层 应用层
表示层 应用层
会话层 应用层
传输层 传输层
网络层 互联网层
数据链路层 网络接口层
物理层 网络接口层

以上的两个列表左边是OSI模型,右边是TCP/IP模型,从列表可以看到,TCP/IP模型并不关心IP层以下的组成,而是将数据输出统一成了网络接口层,这样一来,IP层只需要将数据发往网络接口层就可以了,而不需要关心下层具体的操作.而在OSI模型中,则将这些功能分成了数据链路层和物理层,而且还进行了进一步的划分,在传输层和网络层大部分还是一致的,而对于OSI中的上面三层,则在TCP/IP模型中合将其合并成了应用层.

在现在的互联网中,主要采用TCP/IP协议,这已经成为了互联网上通信的事实标准,现在TCP/IP协议已经可以运行在各种信道和底层协议之上.

◆Socket 基础知识◆

套接字(Sockct)随着 TCP/IP协议的使用,也越来越多地被使用在网络应用程序的构建中,实际上 Socket编程也已经成为了网络中传送和接收数据的首选方法,套接字最早是由伯克利在BSD中推出的一种进程间通信方案和网络互联的基本机制,现在已经有多种相关的套接字实现,但大部分还是遵循着最初的设计要求.

Socket通常也称作”套接字”,用于描述IP地址和端口,是一个通信链的句柄,应用程序通常通过”套接字”向网络发出请求或者应答网络请求,Socket起源于Unix而Unix/Linux基本哲学之一就是”一切皆文件”,Socket就是文件模式的一个实现,Socket即是一种特殊的文件,一些Socket函数就是对其进行的操作(读/写IO、打开、关闭)等.

Pythhon 标准库中支持套接口的模块是Socket,其中包含生成套接字、等待连接、建立连接和传输数据的方法,任何应用程序需要使用套接字,都必须调用Socket方法生成一个套接字对象,对于服务器端而言,首先需要调用 bind 方法绑定一个套接口地址,接着使用 listen 方法开始监听客户端请求.当有客户端请求过来的时候,将通过 accept 方法来生成一个连接对象,然后就可以通过此连接对发送和接收数据了,数据传输完毕后可以调用 close 方法将生成的连接关闭.

服务端与客户端

Socket 常用地址簇:
socket.AF_UNIX unix进程间通信
socket.AF_INET ipv4
socket.AF_INET6 ipv6

Socket 常用地址簇:
socket.SOCK 裸套接字
socket.SOCK_STREAM TCP通信
socket.SOCK_DGRAM UDP通信
SOCK_RAW 原始套接字,可处理ICMP、IGMP等网络报文
SOCK_RDM 是一种可靠的UDP形式,即保证交付数据报但不保证顺序
SOCK_SEQPACKET 可靠的连续数据包服务

常用方法:

方法名称 方法说明
sk.socket() 创建一个套接字对象,并返回套接字相关内容:(socket.AF_INET,socket.SOCK_STREAM)
sk.bind(address) 将套接字绑定到地址,在AF_INET下,以元组(host,port)的形式表示地址
sk.listen(5) 开始监听传入连接,且最大接收5个链接请求,请求数可根据机器性能动态调整
sk.setblocking(bool) 是否阻塞,如果设置为False,那accept和recv时一旦无数据,则报错
sk.accept() 接受连接并返回(conn,address),其中conn是新的套接字对象,可以用来接收和发送数据,address是连接客户端的地址,接收TCP客户的连接(阻塞式)等待连接的到来
sk.connect(address) 连接到address处的套接字,一般address的格式为元组(hostname,port),如果连接出错,返回socket.error错误
sk.connect_ex(address) 同上,只不过会有返回值,连接成功时返回0,连接失败时候返回编码,例如:10061
sk.close() 关闭套接字
sk.recv(bufsize[,flag]) 接受套接字的数据.数据以字符串形式返回,bufsize指定最多可以接收的数量,flag提供有关消息的其他信息,通常可以忽略
sk.recvfrom(bufsize[.flag]) 与recv()类似,但返回值是(data,address),其中data是包含接收数据的字符串,address是发送数据的套接字地址
sk.send(bytes[,flag]) 将string中的数据发送到连接的套接字.返回值是要发送的字节数量,该数量可能小于string的字节大小.即:可能未将指定内容全部发送
sk.sendall(bytes[,flag]) 将string中的数据发送到连接的套接字,但在返回之前会尝试发送所有数据.成功返回None,失败则抛出异常,内部通过递归调用send,将所有内容发送出去
sk.sendto(bytes[,flag],address) 将数据发送到套接字,address是形式为(ipaddr,port)的元组,指定远程地址.返回值是发送的字节数,该函数主要用于UDP协议
sk.settimeout(timeout) 设置套接字操作的超时期,timeout是一个浮点数,单位是秒.值为None表示没有超时期.一般超时期应该在刚创建套接字时设置,因为它们可能用于连接的操作(如 client 连接最多等待5s)
sk.getpeername() 返回连接套接字的远程地址,返回值通常是元组(ipaddr,port)
sk.getsockname() 返回套接字自己的地址,通常是一个元组(ipaddr,port)
sk.fileno() 套接字的文件描述符

◆实现TCP传输◆

服务端: 首先启动服务端,然后服务端会创建套接字,并绑定localhost:9999端口,设置最大连接数为5,然后发送数据.

import socket

ip_addr=("localhost",9999)

server = socket.socket() #创建套接字对象
server.bind(ip_addr) #绑定IP地址和端口,必须是元组形式
server.listen(5) #设置连接池挂起的数量,并发选项

while True:
conn,addr=server.accept() #接收客户连接,conn是客户端连接服务端的信号,addr客户端ip,port
while True:
try:
send_data=input("请输入要发送的命令:").strip()
if len(send_data) == 0: continue #如果发送数据为空,则继续下次循环
conn.send(bytes(send_data,encoding="utf-8")) #发送一条消息,格式是bytes,utf-8
recv_data=conn.recv(1024) #conn.recv接收客户端信息前1024字节
print(str(recv_data,encoding="utf-8")) #将数据转换成字符串,并使用UTF8编码

except Exception:
break
server.close()

客户端: 客户端启动后,创建套接字,并主动连接localhost:9999端口,等待接收数据,并根据接收到的数据做回复.

import socket
import os

ip_addr=("localhost",9999)

client=socket.socket() #创建套接字对象
client.connect(ip_addr) #指定连接的服务器地址

while True:
try:
recv_data=client.recv(1024) #接收服务端前1024字节数据
data = os.popen(str(recv_data, encoding="utf-8")).read()
print("执行命令:",recv_data) #客户端返回执行命令
if len(data) == 0: continue #发送数据为空,则退出
client.send(bytes(data, encoding="utf-8")) #返回执行结果,反向发送
except Exception:
break
client.close()

◆实现UDP传输◆

UDP服务端: UDP服务端绑定端口,并等待接收数据,收到数据后返回一个OK作为结束.

import socket

ip_addr = ("localhost",9999)
server = socket.socket(socket.AF_INET,socket.SOCK_DGRAM,0)
server.bind(ip_addr)

while True:
data,(host,port) = server.recvfrom(1024) #等待接收数据
print(data,host,port) #打印数据
server.sendto(bytes("ok",encoding="utf-8"),(host,port))

UDP客户端: 客户端绑定IP地址,然后发送数据,并等待服务端回应OK作为结束.

import socket

ip_addr = ("localhost",9999)
client = socket.socket(socket.AF_INET,socket.SOCK_DGRAM,0)

while True:
temp = input("请输入发送数据:").strip()
if temp == "exit": break
client.sendto(bytes(temp,encoding="utf-8"),ip_addr) #UDP发送数据
data = client.recvfrom(1024) #等待接收数据
print(data)

client.close()

◆实现文件传输◆

服务端: 首先服务端打开文件并计算大小后,发送给客户端文件的大小,并等待传输数据.

import socket

ip_addr=("localhost",8888)

server=socket.socket()
server.bind(ip_addr)
server.listen(5)

conn,addr=server.accept()

# 打开文件,并发送文件大小...
f=open("wang.txt","r").read()
conn.send(bytes(str(len(f)),encoding="utf-8")) #计算文件大小,并发送给远程
print("发送文件大小完成:",len(f)) #打印一下,看结果...

conn.send(bytes(f,encoding="utf-8")) #开始发送数据

客户端: 客户端收到文件大小后,开始接收数据,每次接收完数据以后累加,直到全部传输完成以后退出循环.

import socket
import os

ip_addr=("localhost",8888)

client=socket.socket()
client.connect(ip_addr)

#接收文件大小,并保存在变量里
file_size=int(client.recv(1024)) #保存文件的大小
max_size=0 #定义一个变量,用于做比较
print("这个文件大小为:",file_size)

f=open("file.txt","wb")

while file_size != max_size: #如果不相等,则说明文件没有传输完
data=client.recv(1024) #每次传输1024字节
f.write(data)
max_size=max_size+len(data) #将传输数据累加到变量
print("本次传输:",max_size)

print("传输完成....")

传输文件(简单写法): Socket也可以实现远程文件的传输,比如以下代码就实现了服务端向客户端传递数据的过程.

#===============================================================
# 传输服务端
import socket

server = socket.socket()
server.bind(("localhost",8888))
server.listen(5)

conn,addr=server.accept()
f=open("lyshark.mp4","r").read()
conn.sendall(bytes(f,encoding="utf-8"))
#===============================================================
# 传输客户端
import socket
import os

client=socket.socket()
client.connect(("localhost",8888))

f=open("lyshark.mp4","wb")
data=client.recv(10240)
f.write(data)

传输文件(完整写法):

#===============================================================
# 客户端

import socket
import os
import json

ip_addr=("127.0.0.1",9999)
client=socket.socket()
client.connect(ip_addr)

def file_put(filedir):
if os.path.isfile(filedir):
file_name = filedir #指定文件名称
file_size = os.stat(file_name).st_size #计算文件大小
file_msg = {"action":"put","name":file_name,"size":file_size} #构建一个json文件
client.send(bytes(json.dumps(file_msg),encoding="utf-8")) #发送传输需要的数据
print("文件名: %s --> 文件大小: %s "%(file_name,file_size))
with open(file_name,"rb") as f:
for line in f:
client.send(line)
print("文件已发送: %s" % len(line))
print("文件发送完成...")


file_put("文件名称.txt") #调用函数,传输文件


#===============================================================
# 服务端
import socket
import json

ip_addr=("127.0.0.1",9999)

server=socket.socket()
server.bind(ip_addr)
server.listen(5)

conn,addr=server.accept()

file_msg=conn.recv(1024)
msg_data = json.loads(file_msg)

if msg_data.get("action") == "put": #是PUT则执行上传
file_name = msg_data.get("name")
file_size = msg_data.get("size")
recv_size = 0
with open(file_name, "wb") as f:
while recv_size != file_size:
data = conn.recv(1024)
f.write(data)
recv_size += len(data)
print("文件大小: %s 传输大小: %s" % (file_size, recv_size))
print("文件 %s 传输成功..." % file_size)

elif msg_data.get("action") == "get": #是GET则执行上传
print("这里写下载的过程..")

Socket Server

SocketServer 是标准库中一个高级的网络处理模块,用于简化网络客户与服务器的实现(在前面使用Socket的过程中,我们先设置了Socket的类型,然后依次调用bind(),listen(),accept()最后使用while循环来让服务器不断的接受请求,而这些步骤可以通过SocketServer包来简化),模块中已经实现了一些可供使用的类.

SocketServer 内部使用IO多路复用以及”多线程”和”多进程”,从而实现并发处理多个客户端请求的Socket服务端,即每个客户端请求连接到服务器时,Socket服务端都会在服务器是创建一个”线程”或”进程”专门负责处理当前客户端的所有请求,底层还是对socket进行了封装和加入线程、进程就实现了进一步对Socket函数的二次封装.

◆简单的线程交互◆

服务端:

import socketserver

class MyServer(socketserver.BaseRequestHandler): #定义方法,并继承Base基类
def handle(self): #默认执行handle里面的过程
conn = self.request
conn.sendall(bytes("服务端链接正常...",encoding="utf-8")) #发送服务端正常
print("客户端IP:%s 过来访问了..."%str(self.client_address)) #显示过来访问的IP地址
while True:
try:
data = conn.recv(1024) #接收1024字节的数据
if len(data)==0:continue
print("收到数据:%s"%str(data,encoding="utf-8")) #打印收到的数据
except Exception:
continue


if __name__ == "__main__": #程序的开头
server = socketserver.ThreadingTCPServer(("127.0.0.1",9999),MyServer)
server.serve_forever()

客户端:

import socket

ip_addr = ("127.0.0.1",9999)

client = socket.socket() #加载套接字
client.connect(ip_addr) #绑定地址

msg = client.recv(1024)
print(str(msg,encoding="utf-8"))

while True:
try:
data=input("输入发送数据:").strip()
client.send(bytes(data,encoding="utf-8"))
except Exception:
continue

◆高级多线程交互◆

服务端:

import socketserver
import os
import sys

user=["admin","guest","lyshark"]
pasd=["1233","123123","123456"]

def login(x,y):
for i in range(len(user)):
if x == user[i] and y == str(pasd[i]):
return 1
else:
continue
return 0

class MyServer(socketserver.BaseRequestHandler):
def handle(self):
conn=self.request
conn.send(bytes("服务端初始化完毕....",encoding="utf-8"))
print("客户:"+str(self.client_address)+"过来访问了...")
username=str(conn.recv(1024),encoding="utf-8")
password=str(conn.recv(1024),encoding="utf-8")
print("用户%s:登陆密码%s:"%(username,password))
ret=login(username,password) #登陆验证
if ret == 1:
print("登陆完成....")
conn.send(bytes("登陆成功,你的用户名密码正确...",encoding="utf-8"))
else:
print("登陆失败...")
conn.send(bytes("登陆失败,你的用户名密码错误...", encoding="utf-8"))

if __name__ == "__main__":
server = socketserver.ThreadingTCPServer(("127.0.0.1",9999),MyServer)
server.serve_forever()

客户端:

import socket

client=socket.socket()
client.connect(("127.0.0.1",9999))

msg=client.recv(1024) #初始化
print(str(msg,encoding="utf-8")) #接收服务端传来的消息

username=input("用户名:").strip() #输入用户名和密码
passworld=input("用户密码:").strip()

client.send(bytes(username,encoding="utf-8"))
client.send(bytes(passworld,encoding="utf-8"))

ret=client.recv(1024) #返回状态
print(str(ret,encoding="utf-8"))

异步IO数据通信

在上面的实例中除了SocketServer的内容外,其他的服务器端的实现都是同步的,也就是说,服务程序只有处理完一个连接后,才能处理另外一个连接,如果需要让服务器端应用程序能够同时处理多个连接,则需要使用异步通信方式.

当同时有多个连接的时候,采用SocketServer和线程的方式都可以,但是对于那种持续时间长且数据突发的多连接,前面的这些处理方式所占用的资源太大,一种改进的方式是在一定的时间段内查看已有的连接并处理,处理的过程包括读取数据和发送数据,在 Python 标准库中包含了一种专门的异步IO通信方式,它就是select模块.

同步机制 && 异步机制

● 同步机制:调用发出之后不会立即返回,进程会一直询问内核准备好数据没有,但一旦返回,则内核返回最终结果,数据已经从内核内存复制到进程内存了,这种方式没有通知机制.
● 异步机制:调用发出之后,被调用方立即返回消息,但返回的并非最终结果,被调用者通过状态、通知机制来通知调用者,或通过回调函数来处理,这种方式有有通知机制.

阻塞IO && 非阻塞IO

● 阻塞IO:当用户线程发起一个IO请求操作,内核会去查看要读取的数据是否就绪,如果数据没有就绪,则会一直等待,直到数据就绪,当数据就绪之后,便将数据拷贝到用户线程,最后结束整个过程.
● 非阻塞IO:当用户线程发起一个IO请求操作,内核会去查看要读取的数据是否就绪,如果数据没有就绪,如果数据没有就绪,则会返回一个标志信息告知用户线程当前要读的数据没有就绪.
● 阻塞(blocking IO),非阻塞(non-blocking IO)的区别就在于第一个阶段,如果数据没有就绪,在查看数据是否就绪的过程中是一直等待,还是直接返回一个标志信息.

常用异步机制 Select,Poll,Epoll

● select 最早于1983年出现在4.2BSD中,目前所有的平台上都支持,但select的一个缺点在于单个进程能够监视的文件描述符的数量存在最大限制,在Linux上一般为1024个文件.
● poll在1986年诞生于System V Release 3,它和select在本质上没有多大差别,但是poll没有最大文件描述符数量的限制,它的开销随着文件描述符数量的增加而增大.
● epoll 直到Linux2.6才出现了由内核直接支持的实现方法,它几乎具备了之前所说的一切优点,被公认为Linux2.6下性能最好的多路I/O就绪通知方法,理论上边缘触发的性能要更高一些,但是代码实现相当复杂.

◆Select◆

Select 的使用方法是监视指定的文件描述符,并在文件描述符集改变的时候做出响应,在Python标准库中,具体的实现了Select模块中的Select方法,这实际上也是Select系统调用的一个接口.

Select 服务端: 使用Select模块构建并发环境,在单线程下实现多并发的要求.

import socket
import select
import queue

server = socket.socket()
server.bind(("0.0.0.0",9999))
server.listen(100)
server.setblocking(False)

inputs = [server,] #建立的连接请求,server是本机的意思
outputs = []
msg_dic = {} #创建一个空字典,用来存放发送的消息

while True: # ↓ input=接收,output=发送,excep=异常
read,write,excep = select.select(inputs,outputs,inputs) #select负责监听连接请求
#print("当前的连接实例:",read) #打印出当前的连接实例
for ret in read:
if ret is server: #如果ret=server实例,实例本身被访问,说明来了新的连接请求
conn,addr = server.accept()
print("来了一个新连接:",addr)
inputs.append(conn) #将这个新的连接实例,加入到select的监测列表中.
msg_dic[conn] = queue.Queue() #初始化一个队列,后面存要返回给这个客户端的数据
else: #否则准备接收老连接发来的请求
data = ret.recv(1024)
print("收到的数据:",data)
msg_dic[ret].put(data) #放入队列中,等待下次循环
outputs.append(ret) #放入发送列表中,下次循环将发送这条数据

# -----------------------------------------------------------------------
# 针对主机的发送环节.
for w in write: #要返回给客户端的连接列表
data_to_client = msg_dic[w].get() #获取需要发送的数据
w.send(data_to_client) #返回给客户端源数据
outputs.remove(w) #确保下次循环的时候writeable,不返回这个已经处理完的连接了

#-----------------------------------------------------------------------
# 异常主机自动清理环节
for e in excep: #循环检测,如果主机异常断开,则从inputs主机列表中自动移除
if e in outputs: #保证下次select 监测的时候,不在检测这个异常的链接请求
outputs.remove(e)
if e in inputs:
inputs.remove(e)

Select 客户端: 客户端直接绑定服务端的IP地址,只需要简单的实现数据的收发即可.

import socket

client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(("localhost",9999))

while True:
msg = input("发送数据:").strip()
client.sendall(bytes(msg,encoding="utf-8"))
data = client.recv(1024)
print("收到数据:",data)
client.close()

此处的Socket服务端相比与原生的Socket,他支持当某一个请求不再发送数据时,服务器端不会等待而是可以去处理其他请求的数据.但是,如果每个请求的耗时比较长,select版本的服务器端也无法完成同时操作. 

◆Selectors◆

Selectors 模块,是在Python 3.x以后加入的新模块,其实就是在Select的基础之上进行了更加深入的封装,但是需要注意的是,Selectors模块会根据用户所在的平台的不同来选择性的使用select或者是epoll,对用户来说是透明的,但是还是需要注意这一点,但是此模块如果不做后端开发,一般也很少使用,这里了解即可.

服务端:

import selectors
import socket

self = selectors.DefaultSelector()

def accept(sock,mask):
conn,addr = sock.accept()
print('accepted', conn, 'from', addr,mask)
conn.setblocking(False)
self.register(conn, selectors.EVENT_READ, read) #新连接注册read回调函数

def read(conn, mask):
data = conn.recv(1024) #返回数据
if data:
print('echoing', repr(data), 'to', conn)
conn.send(data)
else:
print('closing', conn)
self.unregister(conn)
conn.close()

sock = socket.socket()
sock.bind(('localhost',6666))
sock.listen(100)
sock.setblocking(False)
self.register(sock,selectors.EVENT_READ,accept)

while True:
events = self.select() #默认阻塞,有活动连接就返回活动的连接列表
for key, mask in events:
callback = key.data #accept
callback(key.fileobj, mask) #key.fileobj= 文件句柄

客户端:

import socket

client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(("localhost",6666))

while True:
msg = input("发送数据:").strip()
client.sendall(bytes(msg,encoding="utf-8"))
data = client.recv(1024)
print("收到数据:",data)
client.close()

TCP 实现的通信

client

import socket
import sys
import selectors
import types

# 测试类
class Client:
def __init__(self, host, port, numConn):
self.host = host # 待连接的远程主机的域名
self.port = port
self.message = [b'message 1 from client', b'message 2 from client']
self.numConn = numConn
self.selector = selectors.DefaultSelector()

def connet(self): # 连接方法
server_addr = (self.host, self.port)
for i in range(0, self.numConn):
connid = i + 1
print('开始连接', connid, '到', server_addr)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setblocking(False)
sock.connect_ex(server_addr)#连接服务端
events = selectors.EVENT_READ | selectors.EVENT_WRITE
data = types.SimpleNamespace(connid=connid,
msg_total=sum(len(m) for m in self.message),
recv_total=0,
messages=list(self.message),
outb=b'')
self.selector.register(sock, events, data=data)

try:
while True:
events = self.selector.select(timeout=1)
if events:
for key, mask in events:
self.service_connection(key, mask)

finally:
self.selector.close()

def service_connection(self,key, mask):
sock = key.fileobj
data = key.data
if mask & selectors.EVENT_READ:
recv_data = sock.recv(1024)
if recv_data:
print("收到", repr(recv_data), "来自连接", data.connid)
data.recv_total += len(recv_data)
if not recv_data or data.recv_total == data.msg_total:
print("关闭连接:", data.connid)
self.selector.unregister(sock)
sock.close()
if mask & selectors.EVENT_WRITE:
if not data.outb and data.messages:
data.outb = data.messages.pop(0)
if data.outb:
print("发送", repr(data.outb), "到连接", data.connid)
sent = sock.send(data.outb) #发送数据
data.outb = data.outb[sent:]#清空数据

if __name__ == '__main__':
cl = Client('127.0.0.1', 8800, 5)
cl.connet()

server

import socket
import sys
import selectors
import types

class server:
def __init__(self,ip,port):
self.port=port
self.ip=ip
self.selector = selectors.DefaultSelector()#初始化selector

def start(self):
s = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
try:
s.bind((self.ip,self.port))
s.listen()
print('等待连接:',(self.ip,self.port))
s.setblocking(False) # 非阻塞
self.selector.register(s,selectors.EVENT_READ,None)#注册I/O对象
while True:
events = self.selector.select(timeout=None)#阻塞调用,等待新的读/写事件
for key, mask in events:
if key.data is None:#新的连接请求
self.accept_wrapper(key.fileobj)
else:#收到客户端连接发送的数据
self.service_connection(key, mask)
except socket.error as e:
print(e)
sys.exit()
finally:
s.close() #关闭服务端

def accept_wrapper(self,sock):
conn, addr = sock.accept() # Should be ready to read
print('接收客户端连接', addr)
conn.setblocking(False) #非阻塞
data = types.SimpleNamespace(addr=addr, inb=b'', outb=b'')#socket数据
events = selectors.EVENT_READ | selectors.EVENT_WRITE #监听读写
self.selector.register(conn, events, data=data)#注册客户端socket

def service_connection(self,key, mask):
sock = key.fileobj
data = key.data
if mask & selectors.EVENT_READ:
recv_data = sock.recv(1024) # 接收数据
if recv_data:
data.outb += recv_data
else:#客户端断开连接
print('关闭连接', data.addr)
self.selector.unregister(sock)#取消注册,防止出错
sock.close()
if mask & selectors.EVENT_WRITE:
if data.outb:
print('发送', repr(data.outb), '到', data.addr)
sent = sock.send(data.outb)
data.outb = data.outb[sent:]

if __name__ == '__main__':
s = server('',8800)
s.start()

lib套接字通信

server

import sys
import selectors
import json
import io
import struct

request_search = {
"morpheus": "Follow the white rabbit. \U0001f430",
"ring": "In the caves beneath the Misty Mountains. \U0001f48d",
"\U0001f436": "\U0001f43e Playing ball! \U0001f3d0",
}


class Message:
def __init__(self, selector, sock, addr):
self.selector = selector
self.sock = sock
self.addr = addr
self._recv_buffer = b""
self._send_buffer = b""
self._jsonheader_len = None
self.jsonheader = None
self.request = None
self.response_created = False

def _set_selector_events_mask(self, mode):
if mode == "r":
events = selectors.EVENT_READ
elif mode == "w":
events = selectors.EVENT_WRITE
elif mode == "rw":
events = selectors.EVENT_READ | selectors.EVENT_WRITE
else:
raise ValueError(f"Invalid events mask mode {repr(mode)}.")
self.selector.modify(self.sock, events, data=self)

def _read(self):
try:
data = self.sock.recv(4096)
except BlockingIOError:
pass
else:
if data:
self._recv_buffer += data
else:
raise RuntimeError("Peer closed.")

def _write(self):
if self._send_buffer:
print("sending", repr(self._send_buffer), "to", self.addr)
try:
sent = self.sock.send(self._send_buffer)
except BlockingIOError:
pass
else:
self._send_buffer = self._send_buffer[sent:]
if sent and not self._send_buffer:
self.close()

def _json_encode(self, obj, encoding):
return json.dumps(obj, ensure_ascii=False).encode(encoding)

def _json_decode(self, json_bytes, encoding):
tiow = io.TextIOWrapper(
io.BytesIO(json_bytes), encoding=encoding, newline=""
)
obj = json.load(tiow)
tiow.close()
return obj

def _create_message(
self, *, content_bytes, content_type, content_encoding
):
jsonheader = {
"byteorder": sys.byteorder,
"content-type": content_type,
"content-encoding": content_encoding,
"content-length": len(content_bytes),
}
jsonheader_bytes = self._json_encode(jsonheader, "utf-8")
message_hdr = struct.pack(">H", len(jsonheader_bytes))
message = message_hdr + jsonheader_bytes + content_bytes
return message

def _create_response_json_content(self):
action = self.request.get("action")
if action == "search":
query = self.request.get("value")
answer = request_search.get(query) or f'No match for "{query}".'
content = {"result": answer}
else:
content = {"result": f'Error: invalid action "{action}".'}
content_encoding = "utf-8"
response = {
"content_bytes": self._json_encode(content, content_encoding),
"content_type": "text/json",
"content_encoding": content_encoding,
}
return response

def _create_response_binary_content(self):
response = {
"content_bytes": b"First 10 bytes of request: "
+ self.request[:10],
"content_type": "binary/custom-server-binary-type",
"content_encoding": "binary",
}
return response

def process_events(self, mask):
if mask & selectors.EVENT_READ:
self.read()
if mask & selectors.EVENT_WRITE:
self.write()

def read(self):
self._read()

if self._jsonheader_len is None:
self.process_protoheader()

if self._jsonheader_len is not None:
if self.jsonheader is None:
self.process_jsonheader()

if self.jsonheader:
if self.request is None:
self.process_request()

def write(self):
if self.request:
if not self.response_created:
self.create_response()

self._write()

def close(self):
print("closing connection to", self.addr)
try:
self.selector.unregister(self.sock)
except Exception as e:
print(
f"error: selector.unregister() exception for",
f"{self.addr}: {repr(e)}",
)

try:
self.sock.close()
except OSError as e:
print(
f"error: socket.close() exception for",
f"{self.addr}: {repr(e)}",
)
finally:
self.sock = None

def process_protoheader(self):
hdrlen = 2
if len(self._recv_buffer) >= hdrlen:
self._jsonheader_len = struct.unpack(
">H", self._recv_buffer[:hdrlen]
)[0]
self._recv_buffer = self._recv_buffer[hdrlen:]

def process_jsonheader(self):
hdrlen = self._jsonheader_len
if len(self._recv_buffer) >= hdrlen:
self.jsonheader = self._json_decode(
self._recv_buffer[:hdrlen], "utf-8"
)
self._recv_buffer = self._recv_buffer[hdrlen:]
for reqhdr in (
"byteorder",
"content-length",
"content-type",
"content-encoding",
):
if reqhdr not in self.jsonheader:
raise ValueError(f'Missing required header "{reqhdr}".')

def process_request(self):
content_len = self.jsonheader["content-length"]
if not len(self._recv_buffer) >= content_len:
return
data = self._recv_buffer[:content_len]
self._recv_buffer = self._recv_buffer[content_len:]
if self.jsonheader["content-type"] == "text/json":
encoding = self.jsonheader["content-encoding"]
self.request = self._json_decode(data, encoding)
print("received request", repr(self.request), "from", self.addr)
else:
self.request = data
print(
f'received {self.jsonheader["content-type"]} request from',
self.addr,
)
self._set_selector_events_mask("w")

def create_response(self):
if self.jsonheader["content-type"] == "text/json":
response = self._create_response_json_content()
else:
response = self._create_response_binary_content()
message = self._create_message(**response)
self.response_created = True
self._send_buffer += message

client

import sys
import selectors
import json
import io
import struct


class Message:
def __init__(self, selector, sock, addr, request):
self.selector = selector
self.sock = sock
self.addr = addr
self.request = request
self._recv_buffer = b""
self._send_buffer = b""
self._request_queued = False
self._jsonheader_len = None
self.jsonheader = None
self.response = None

def _set_selector_events_mask(self, mode):
if mode == "r":
events = selectors.EVENT_READ
elif mode == "w":
events = selectors.EVENT_WRITE
elif mode == "rw":
events = selectors.EVENT_READ | selectors.EVENT_WRITE
else:
raise ValueError(f"Invalid events mask mode {repr(mode)}.")
self.selector.modify(self.sock, events, data=self)

def _read(self):
try:
data = self.sock.recv(4096)
except BlockingIOError:
pass
else:
if data:
self._recv_buffer += data
else:
raise RuntimeError("Peer closed.")

def _write(self):
if self._send_buffer:
print("sending", repr(self._send_buffer), "to", self.addr)
try:
sent = self.sock.send(self._send_buffer)
except BlockingIOError:
pass
else:
self._send_buffer = self._send_buffer[sent:]

def _json_encode(self, obj, encoding):
return json.dumps(obj, ensure_ascii=False).encode(encoding)

def _json_decode(self, json_bytes, encoding):
tiow = io.TextIOWrapper(
io.BytesIO(json_bytes), encoding=encoding, newline=""
)
obj = json.load(tiow)
tiow.close()
return obj

def _create_message(
self, *, content_bytes, content_type, content_encoding
):
jsonheader = {
"byteorder": sys.byteorder,
"content-type": content_type,
"content-encoding": content_encoding,
"content-length": len(content_bytes),
}
jsonheader_bytes = self._json_encode(jsonheader, "utf-8")
message_hdr = struct.pack(">H", len(jsonheader_bytes))
message = message_hdr + jsonheader_bytes + content_bytes
return message

def _process_response_json_content(self):
content = self.response
result = content.get("result")
print(f"got result: {result}")

def _process_response_binary_content(self):
content = self.response
print(f"got response: {repr(content)}")

def process_events(self, mask):
if mask & selectors.EVENT_READ:
self.read()
if mask & selectors.EVENT_WRITE:
self.write()

def read(self):
self._read()

if self._jsonheader_len is None:
self.process_protoheader()

if self._jsonheader_len is not None:
if self.jsonheader is None:
self.process_jsonheader()

if self.jsonheader:
if self.response is None:
self.process_response()

def write(self):
if not self._request_queued:
self.queue_request()

self._write()

if self._request_queued:
if not self._send_buffer:
self._set_selector_events_mask("r")

def close(self):
print("closing connection to", self.addr)
try:
self.selector.unregister(self.sock)
except Exception as e:
print(
f"error: selector.unregister() exception for",
f"{self.addr}: {repr(e)}",
)

try:
self.sock.close()
except OSError as e:
print(
f"error: socket.close() exception for",
f"{self.addr}: {repr(e)}",
)
finally:
self.sock = None

def queue_request(self):
content = self.request["content"]
content_type = self.request["type"]
content_encoding = self.request["encoding"]
if content_type == "text/json":
req = {
"content_bytes": self._json_encode(content, content_encoding),
"content_type": content_type,
"content_encoding": content_encoding,
}
else:
req = {
"content_bytes": content,
"content_type": content_type,
"content_encoding": content_encoding,
}
message = self._create_message(**req)
self._send_buffer += message
self._request_queued = True

def process_protoheader(self):
hdrlen = 2
if len(self._recv_buffer) >= hdrlen:
self._jsonheader_len = struct.unpack(
">H", self._recv_buffer[:hdrlen]
)[0]
self._recv_buffer = self._recv_buffer[hdrlen:]

def process_jsonheader(self):
hdrlen = self._jsonheader_len
if len(self._recv_buffer) >= hdrlen:
self.jsonheader = self._json_decode(
self._recv_buffer[:hdrlen], "utf-8"
)
self._recv_buffer = self._recv_buffer[hdrlen:]
for reqhdr in (
"byteorder",
"content-length",
"content-type",
"content-encoding",
):
if reqhdr not in self.jsonheader:
raise ValueError(f'Missing required header "{reqhdr}".')

def process_response(self):
content_len = self.jsonheader["content-length"]
if not len(self._recv_buffer) >= content_len:
return
data = self._recv_buffer[:content_len]
self._recv_buffer = self._recv_buffer[content_len:]
if self.jsonheader["content-type"] == "text/json":
encoding = self.jsonheader["content-encoding"]
self.response = self._json_decode(data, encoding)
print("received response", repr(self.response), "from", self.addr)
self._process_response_json_content()
else:
self.response = data
print(
f'received {self.jsonheader["content-type"]} response from',
self.addr,
)
self._process_response_binary_content()
self.close()

未完待续

server

import os,time
import socket
import threading

sock = ["0", "0"]

def client_hander(socket,addr):
try:
while True:
if sock[0] != "0" and sock[1] != "0" and sock[0] == addr[0]:
aaa = socket.recv(4096)
print(aaa)
while True:
bb = input()
socket.send(bytes(bb,encoding="utf-8"))
except Exception:
socket.close()
pass

if __name__ == "__main__":
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(("0.0.0.0", 9999))
server.listen(10)
while True:
client_socket, addr = server.accept()
client_thread = threading.Thread(target=client_hander, args=(client_socket,addr,))
client_thread.start()
ip = input("aa")
sock[0] = ip
sock[1]="1"

client

import os,socket

client = socket.socket()
client.connect(("192.168.1.2",9999))
client.send(bytes("ipconfig\r\n",encoding="utf-8"))

while True:
data = client.recv(1024)
print(data)