Python抓取并解码原始数据包

应用Python支持的混杂模式,抓取流经网卡的数据包,并对IP以及ICMP数据包进行拆包,打印出我们所需要的字段信息。

抓取原始数据包: Python中默认的Socket模块就可以实现对原始数据包的解包操作,如下代码.

需要注意这段代码只能在Windows平台使用,因为我们需要开启网卡的IOCTL混杂模式,这是Win平台特有的.

import socket
import uuid

# 获取本机MAC地址
def GetHostMAC():
mac=uuid.UUID(int = uuid.getnode()).hex[-12:]
this_mac = ":".join([mac[e:e+2] for e in range(0,11,2)])
this_name = socket.getfqdn(socket.gethostname())
print("本机名: {} --> 本机MAC: {}".format(this_name,this_mac))

# 获取本机IP地址
def GetHostAddress():
try:
sock=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
sock.connect(('8.8.8.8',80))
address =sock.getsockname()[0]
finally:
return address
address.close()

# 开始跟踪原始数据包
def SnifferIOSock(address):
# 创建原始套接字,然后绑定在公开接口上
socket_protocol = socket.IPPROTO_IP

# 开启原始数据包模式
sniffer = socket.socket(socket.AF_INET,socket.SOCK_RAW,socket_protocol)
sniffer.bind((address,0))

# 设置在捕获的数据包中包含IP头
sniffer.setsockopt(socket.IPPROTO_IP,socket.IP_HDRINCL,1)

# Windows平台需要设置IOCTL以启动混杂模式
sniffer.ioctl(socket.SIO_RCVALL,socket.RCVALL_ON)

# 读取单个数据包
while True:
print(sniffer.recvfrom(65565))

# 退出时,关闭混杂模式
sniffer.ioctl(socket.SIO_RCVALL,socket.RCVALL_OFF)

if __name__ == "__main__":
GetHostMAC()
address = GetHostAddress()
print("本机IP: {}".format(address))
SnifferIOSock(address)

解码IP数据包头: 解码方法同样运用的是上方的方法,只不过这里我们需要找到完整的IP地址的包头封装格式,然后根据特定的包头格式对数据包进行解包操作即可.

import socket
import os
import struct
from ctypes import *

# 定义IP头部结构体
class IP(Structure):
_fields_ = [
("ihl", c_ubyte, 4),
("version", c_ubyte, 4),
("tos", c_ubyte),
("len", c_ushort),
("id", c_ushort),
("offset", c_ushort),
("ttl", c_ubyte),
("protocol_num", c_ubyte),
("sum", c_ushort),
("src", c_ulong), # linux 需要变为 c_uint32
("dst", c_ulong) # linux 需要变为 c_uint32
]
def __new__(self,socket_buffer=None):
return self.from_buffer_copy(socket_buffer)

def __init__(self, socket_buffer=None):
# 定义协议序号与名称对应关系
self.protocol_map = {1:"ICMP", 6:"TCP", 17:"UDP"}

# 将数据包解包为地址
self.src_address = socket.inet_ntoa(struct.pack("<L",self.src))
self.dst_address = socket.inet_ntoa(struct.pack("<L",self.dst))
self.this_ttl = self.ttl

# self.src_address = socket.inet_ntoa(struct.pack("@I",self.src))
# self.dst_address = socket.inet_ntoa(struct.pack("@I",self.dst))

# 协议类型
try:
self.protocol = self.protocol_map[self.protocol_num]
except:
self.protocol = str(self.protocol_num)

# 获取本机IP地址
def GetHostAddress():
try:
sock=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
sock.connect(('8.8.8.8',80))
address =sock.getsockname()[0]
finally:
return address
address.close()

# 执行解包过程,并输出
def SnifferIPAddress(address):

# 平台选择,nt代表Windows
if os.name == "nt":
socket_protocol = socket.IPPROTO_IP
else:
socket_protocol = socket.IPPROTO_ICMP

# 开启原始套接字模式
sniffer = socket.socket(socket.AF_INET,socket.SOCK_RAW,socket_protocol)
sniffer.bind((address,0))
sniffer.setsockopt(socket.IPPROTO_IP,socket.IP_HDRINCL,1)

if os.name == "nt":
sniffer.ioctl(socket.SIO_RCVALL,socket.RCVALL_ON)

# 循环接受数据包并解包
try:
while True:
# 读取数据包
raw_buffer = sniffer.recvfrom(65565)[0]

# 将缓冲区的前20个字节按IP头进行解析
ip_header = IP(raw_buffer[0:20])

# 输出协议和通信双方IP地址
print("传输协议: {} --> 原地址: {} --> 传输到: {} --> 生存周期: {}".format(ip_header.protocol,ip_header.src_address,ip_header.dst_address,ip_header.this_ttl))

# 如果按下Ctrl+C则退出
except KeyboardInterrupt:
# 关闭混杂模式
if os.name == "nt":
sniffer.ioctl(socket.SIO_RCVALL,socket.RCVALL_OFF)

if __name__ == "__main__":
address = GetHostAddress()
SnifferIPAddress(address)

解码ICMP数据包头: 原理与解包IP头相同,但需要注意,由于ICMP头在IP头的下方,所以我们需要先解析出IP头数据包,然后根据IP头中的protocol_num判断如果是ICMP则将其传入ICMP结构做进一步解包即可.

import socket
import os
import struct
from ctypes import *

# 定义IP头部结构体
class IP(Structure):
_fields_ = [
("ihl", c_ubyte, 4),
("version", c_ubyte, 4),
("tos", c_ubyte),
("len", c_ushort),
("id", c_ushort),
("offset", c_ushort),
("ttl", c_ubyte),
("protocol_num", c_ubyte),
("sum", c_ushort),
("src", c_ulong), # linux 需要变为 c_uint32
("dst", c_ulong) # linux 需要变为 c_uint32
]
def __new__(self,socket_buffer=None):
return self.from_buffer_copy(socket_buffer)

def __init__(self, socket_buffer=None):
# 定义协议序号与名称对应关系
self.protocol_map = {1:"ICMP", 6:"TCP", 17:"UDP"}

# 将数据包解包为地址
self.src_address = socket.inet_ntoa(struct.pack("<L",self.src))
self.dst_address = socket.inet_ntoa(struct.pack("<L",self.dst))
self.this_ttl = self.ttl

# self.src_address = socket.inet_ntoa(struct.pack("@I",self.src))
# self.dst_address = socket.inet_ntoa(struct.pack("@I",self.dst))

# 协议类型
try:
self.protocol = self.protocol_map[self.protocol_num]
except:
self.protocol = str(self.protocol_num)

# 定义ICMP结构包头
class ICMP(Structure):
_fields_ = [
("type", c_ubyte),
("code", c_ubyte),
("checksum", c_ushort),
("unused", c_ushort),
("next_hop_mtu", c_ushort)
]

def __new__(self,socket_buffer=None):
return self.from_buffer_copy(socket_buffer)

def __init__(self, socket_buffer=None):
self.icmp_type = self.type
self.icmp_code = self.code
self.icmp_checksum = self.checksum


# 获取本机IP地址
def GetHostAddress():
try:
sock=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
sock.connect(('8.8.8.8',80))
address =sock.getsockname()[0]
finally:
return address
address.close()

# 执行解包过程,并输出
def SnifferIPAddress(address):

# 平台选择,nt代表Windows
if os.name == "nt":
socket_protocol = socket.IPPROTO_IP
else:
socket_protocol = socket.IPPROTO_ICMP

# 开启原始套接字模式
sniffer = socket.socket(socket.AF_INET,socket.SOCK_RAW,socket_protocol)
sniffer.bind((address,0))
sniffer.setsockopt(socket.IPPROTO_IP,socket.IP_HDRINCL,1)

if os.name == "nt":
sniffer.ioctl(socket.SIO_RCVALL,socket.RCVALL_ON)

# 循环接受数据包并解包
try:
while True:
# 读取数据包
raw_buffer = sniffer.recvfrom(65565)[0]

# 将缓冲区的前20个字节按IP头进行解析
ip_header = IP(raw_buffer[0:20])

# 判断协议类型是否为ICMP
if ip_header.protocol == "ICMP":

# 计算ICMP包的起始位置
offset = ip_header.ihl * 4
buf = raw_buffer[offset:offset + sizeof(ICMP)]

#解析ICMP数据
icmp_header = ICMP(buf)
print("原地址: {} --> 发送到: {} --> 解包协议: {} --> 解包代码: {} --> 校验和: {}".format(ip_header.src_address, ip_header.dst_address, icmp_header.icmp_type,icmp_header.icmp_code,icmp_header.icmp_checksum))

# 如果按下Ctrl+C则退出
except KeyboardInterrupt:
# 关闭混杂模式
if os.name == "nt":
sniffer.ioctl(socket.SIO_RCVALL,socket.RCVALL_OFF)

if __name__ == "__main__":
address = GetHostAddress()
SnifferIPAddress(address)

我们运行上方的代码,然后在本机Ping测试一下www.lyshark.com此时即可看到,本机与对端的来往数据包.