百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术教程 > 正文

自动化漏洞猎人代码分析_自动化漏洞检测

suiw9 2025-02-17 14:04 25 浏览 0 评论

0x00 前言

安全人员可以扫描,网络上悬赏网站等的漏洞,如果能够发现其存在着安全漏洞,则可以通过提交漏洞的方式来获得一定的赏金,国外的这类悬赏的网站比较多,比如hackone,这上面列出了大量的资产信息,白帽子们可以分析这些资产,发现漏洞来获取赏金。hackone上截至到2020年6月,已经有六名白帽子获得了百万奖金。

0x01 大概了解

hackone上的资产个数是非常多的,如果人工分析起来,不光累,而且效率还低,不靠谱,所以就有了很多自动化的工具。网上也有不少开源的,找了几个研究了下,大概的流程都差不多,无非是利用各种开源工具的组合来完成漏洞的探测工作。

一般的流程就是几步:

  1. 信息收集 : 收集资产信息、详细收集域名和服务端口等。
  2. 漏洞探测 Fuzz: 即用xray等工具扫描资产是否存在漏洞;
  3. 提醒功能 : 如果自动探测到了漏洞,要提醒我们记得提交,更灵活的工具,可能包含自动提交。

我主要研究的工具是AUTO-EARN :

  1. 这个工具比较简单,但是却五脏俱全,除了hackone上采集域名没有外,其他的都有,而且这个框架比较灵活,方便各个部分的工具的升级。
  2. 界面也很酷,终端界面+一个网页统计信息展示



扫描样例

0x03 执行方法

1.执行顺序:sh start.sh --> python3 autoearn.py --> sh stop.sh 2. start.sh 即:

Bash
chmod +x ./tools/crawlergo
chmod +x ./tools/xray/xray_linux_amd64 
nohup python3 server.py > logs/server.log 2>&1 &
nohup ./tools/xray/xray_linux_amd64 webscan --listen 127.0.0.1:7777 --webhook-output http://127.0.0.1:2333/webhook > logs/xray.log 2>&1 &
nohup python3 subdomain_monitor.py > logs/subdomain_monitor.log 2>&1 &

1、前面两个增加可执行权限就不说了,看看后面,启动server.py 来获取通知等信息。

2、我们把xray启动起来,并且开启代理端口,等爬虫将爬取的网页送过来扫描;

3、开启子域名执行情况的检测程序;

4、subdomain_monitor.py 检查子域名扫描结果,将数据保存到sqlite表中。

  1. python3 autoearn.py输入1 即进行子域名扫描;
  2. 要等到子域名扫描结束,再输入2进行端口检测、完成后输入3进行waf检测(可选)
  3. 最后输入5 进行爬虫爬取网页后输入到xray进行漏洞探测、探测到漏洞后会发通知。

0x04 流程分析

开源地址已经讲代码讲的非常详细了,昨天看了一天,基本上懂点python的就可以看的懂,感谢作者这么用心,整个框架利用众多安全处理工具,如下图:




工具介绍:

Bash
信息收集:
------------
1. OneForAll:功能强大的子域名收集工具,可以根据域名获取所有子域名信息,也算是个集合工具;
利用证书透明度收集子域名、利用爬虫收集域名、利用DNS收集子域名、利用威胁情报收集子域名、利用搜索引擎来收集子域名;
2. Shodan是个搜索引擎网站,这里面利用它来搜索IP开放的端口信息;
3. masscan+nmap都是用来探测IP开放端口的,前者速度更快,后者可以发现服务名;
4. wafw00f 探测waf指纹的工具,如果有waf,我们忽略这个目标。

Fuzz
--------------
1. crawlergo 作为爬虫爬取相关链接;
2. xray: 长亭开发的免费的安全检测模块,可以进行xss漏洞、SQL注入、命令注入、目录枚举、文件上传等;
3. Server 酱: 这个工具挺有意思,可以免费进行微信通知,免费版本一天最多通知五次;
4. 利用flask框架做个简单的展示页面,利用Echarts显示统计报表信息;
5. sqlite3 这个就是一个文件的简单DB。

处理的数据流:



4.1 子域名收集

步骤说明:

  1. target.txt 里面每行保存一行域名信息,可以简单的改成从网站采集后处理。
  2. 第一个执行的插件是OneForALL,作用就是探测target中的子域名信息; 这里面是通过autoearn.py中的命令1来实现的,调用代码:
 subdomain_collect.oneforall_collect(config.target_file_path)

由于获取子域名是个非常耗时的操作,启动的是后台进程再查看:

def oneforall_collect(target):
    cmd = 'nohup python3 ' + config.oneforall_path + ' --target ' + target + ' run > logs/oneforall.log 2>&1 &'
    try:
     rsp = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
     console.print('正在后台进行子域收集', style="#ADFF2F")
    except:
        console.print('子域收集失败,请检查输入格式', style="bold red")

收集子域名放在后台后,我们需要知道什么时候执行完毕,可以通过tail -f logs/oneforall.log 可以查看执行的日志,收集完子域名后,将结果放入到OneForAll下面的reuslt下面的一个sqlite数据库里面:

如果收集的域名为:example.com 则:
example_com_origin_result        表存放每个模块最初子域收集结果。
example_com_resolve_result      表存放对子域进行解析后的结果。
example_com_last_result            表存放上一次子域收集结果(需要收集两次以上才会生成)。
example_com_now_result            表存放现在子域收集结果,一般情况关注这张表就可以了

观测到的日志信息:

04:51:14,588 [ALERT] utils:252 - GET http://114.55.181.28/check_web/databaseInfo_mainSearch.action?isSearch=true&searchType=url&term=5nine.com&pageNo=1 404 - Not Found 5042
04:51:14,589 [INFOR] module:65 - The WZPCQuery module took 1.1 seconds found 0 subdomains
04:51:14,613 [ALERT] utils:252 - GET https://searchdns.netcraft.com/ 403 - Forbidden 7540
04:51:14,617 [INFOR] module:65 - The NetCraftQuery module took 1.2 seconds found 0 subdomains
04:51:14,861 [ERROR] module:118 - (MaxRetryError('HTTPSConnectionPool(host=\'www.search.ask.com\', port=443): Max retries exceeded with url: /web?q=site%3A.5nineservice.demo.5nine.com&page=1 (Caused by SSLError(SSLError("bad handshake: SysCallError(104, \'ECONNRESET\')")))'),)
04:51:14,862 [INFOR] module:65 - The AskSearch module took 1.3 seconds found 0 subdomains
OneForAll is a powerful subdomain integration tool
             ___             _ _ 
 ___ ___ ___|  _|___ ___ ___| | | {v0.3.0 #dev}
| . |   | -_|  _| . |  _| .'| | | 
|___|_|_|___|_| |___|_| |__,|_|_| git.io/fjHT1

OneForAll is under development, please update before each use!

[*] Starting OneForAll @ 2023-06-24 06:24:41

07:13:05,615 [INFOR] oneforall:249 - Finished OneForAll


检测程序,不光会检测子域名是否收集完成,还会将其插入到sqlite表中,进行后续的流程,核心代码在通知的server_push.py中:


# 子域收集状态提醒
def subdomain_status_push():
    console.log('子域收集完成')
    sql_connect.task_sql_check()
    sql_connect.subdomain_sql_check()
    sql_connect.vuln_sql_check()
    sql_connect.insert_subdomain_sql(sql_connect.oneforall_results_sql())
    subdomain_num = len(sql_connect.read_subdomain_sql())
    content = """``` 子域收集结束```
#### 结果:  共收集到了{subdomain_num}个子域
#### 发现时间: {now_time}
""".format(subdomain_num=subdomain_num, now_time=time.strftime("%Y_%m_%d_%H_%M_%S", time.localtime()))
    try:
        resp = requests.post(config.sckey,data={"text": "子域收集完成提醒", "desp": content})
    except:
        console.print('子域提醒失败,请检查sckey是否正确配置', style="bold red")

关键代码在于:
sql_connect.insert_subdomain_sql(
sql_connect.oneforall_results_sql())
即读取oneforall的扫描结果插入到一个子域名的表中:

# 读取OneForAll数据库
def oneforall_results_sql():
    url_result = []
    oneforall_conn = sqlite3.connect(config.oneforall_sql_path)
    console.print('OneForAll数据库连接成功',style="#ADFF2F")
    oneforall_c = oneforall_conn.cursor()
    oneforall_cursor = oneforall_c.execute("select name from sqlite_master where type='table' order by name;")
    for table_name in oneforall_cursor.fetchall():
        table_name = table_name[0]
        if 'now' in table_name:
            sql_cmd = "SELECT subdomain from " + table_name
            oneforall_c.execute(sql_cmd)
            for url in oneforall_c.fetchall():
                url = url[0]
                url_result.append(url)
    oneforall_conn.close()
    return url_result

由于oneforall是一个域名建一个表的,我们将表里面的子域名信息都集合起来,然后插入到子域名表中:

# 插入SUBDOMAIN数据库
def insert_subdomain_sql(url_result):
    subdomain_conn = sqlite3.connect(config.result_sql_path)
    console.print('AUTOEARN数据库连接成功',style="#ADFF2F")
    subdomain_c = subdomain_conn.cursor()
    for url in url_result:
        now_time = time.strftime("%Y_%m_%d_%H_%M_%S", time.localtime())
        try:
            subdomain_c.execute("INSERT INTO SUBDOMAIN (URL,SUBDOMAIN_TIME) VALUES ('%s', '%s')"%(url,now_time))
            subdomain_conn.commit()
        except:
            console.print('插入子域数据库失败',style="bold red")
    console.print('插入子域数据库成功',style="#ADFF2F")
    subdomain_conn.close()

4.2 端口扫描

端口检测阶段,端口扫描是通过查询子域名表,即SUBDOMAIN 核心代码是在port_check.py中的这个函数:

def mul_subdomain_port_check(threadName, q):
    url_list = []
    while not exitFlag:
        queueLock.acquire()
        if not workQueue.empty():
            domain = q.get()
            queueLock.release()
            try:
                if len(check_cdn.check_cdn(domain[1])) == 1:
                    url_list.extend(shodan_port_check(check_cdn.check_cdn(domain[1])[0],domain[1]))
                else:
                    console.print('目标存在CDN', style="bold red")
                    url_list.append('http://'+domain[1])
            except:
                console.print('目标' + domain[1] + '查询异常', style="bold red")
            console.print("%s processing %s" % (threadName, domain[1]), style="#ADFF2F")
        else:
            queueLock.release()
    sql_connect.insert_task_sql(url_list)

在利用shodan检测端口的时候,需要先进行cdn检测,如果域名对应的ip超过1个,则说明含有cdn,直接加入域名进行后续检测,不通过shodan检测。

url_list.extend(shodan_port_check(check_cdn.check_cdn(domain[1])[0],domain[1])) 关于cdn检测,代码不复杂,但是没做过还是不容易想到.即根据利用socket.getaddrinfo来获取ip列表 信息,再保存起来,多个ip就认为有CDN(这里面还可以优化下)。

# 判断CDN函数
def check_cdn(domain):
    ip_list = []
    try:
        console.print('正在进行CDN检测', style="#ADFF2F")
        addrs = socket.getaddrinfo(domain, None, family=0)
        for item in addrs:
            if item[4][0] not in ip_list:
                if item[4][0].count('.') == 3:
                    ip_list.append(item[4][0])
                else:
                    pass
        return ip_list
    except:
        console.print('CDN检测失败,请检查输入格式', style="bold red")
        pass

扫描到端口后进入下一步是进行端口探测,是通过:
sql_connect.insert_task_sql(url_list)
将数据插入到任务表,插入任务表代码如下:

# 插入TASK数据库
def insert_task_sql(url_result):
    task_conn = sqlite3.connect(config.result_sql_path)
    console.print('AUTOEARN数据库连接成功',style="#ADFF2F")
    task_c = task_conn.cursor()
    for url in url_result:
        now_time = time.strftime("%Y_%m_%d_%H_%M_%S", time.localtime())
        try:
            task_c.execute("INSERT INTO TASK (URL,TASK_TIME) VALUES ('%s', '%s')"%(url,now_time))
            task_conn.commit()
        except:
            console.print('插入任务数据库失败',style="bold red")
    console.print('插入任务数据库成功',style="#ADFF2F")
    task_conn.close()

注意这里面有点问题: 问题在于:端口扫描没有执行,只是根据Shodan进行端口扫描,根本没有进行nmap扫描,主要是函数没调用,自己可以改下。改下: url_list.extend(shodan_port_check(check_cdn.check_cdn(domain[1])[0],domain[1]))这句即可。

4.3 WAF指纹识别

waf检测不检测都行,可选步骤,检测代码如下,调用wafw00f进行检测,更新下WAF检测到内容和STATUS为检测完成状态。

import json
import sqlite3
import subprocess
from lib import config
from rich.console import Console


console = Console()


# WAF检测函数
def waf_check(domain_list):
    console.print('正在进行WAF检测',style="#ADFF2F")
    console.print('任务数据库连接成功',style="#ADFF2F")
    conn = sqlite3.connect(config.result_sql_path)
    c = conn.cursor()
    for domain in domain_list:
        domain = domain[1]
        cmd = ['python3', config.wafw00f_path, domain]
        rsp = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        for i in (rsp.stdout.read().decode("GBK").split('\n')):
            if 'url' in i:
                url = json.loads(i.replace('\'', '\"'))['url']
                waf = json.loads(i.replace('\'', '\"'))['waf'][0]
                c.execute("UPDATE TASK set WAF = '%s' where URL = '%s' "%(waf, url))
                c.execute("UPDATE TASK set STATUS = 'WAF检测完成' where URL = '%s' "%(url,))
                conn.commit()
        while True:
            if rsp.poll() == None:
                pass
            else:
                break
    console.print('WAF检测完成',style="#ADFF2F")
    conn.close()

4.4 漏洞检测

这步骤是关键步骤,通过爬取网页,然后调用xray进行相关漏洞扫描。调用代码:

    craw_to_xray.craw_to_xray(sql_connect.read_task_sql())

爬虫代码也比较简单,主要是利用crawlergo进行网页的爬取。

crawlergo是一个使用chrome headless模式进行URL收集的浏览器爬虫。它对整个网页的关键位置与DOM渲染阶段进行HOOK,自动进行表单填充并提交,配合智能的JS事件触发,尽可能的收集网站暴露出的入口。内置URL去重模块,过滤掉了大量伪静态URL,对于大型网站仍保持较快的解析与抓取速度,最后得到高质量的请求结果集合。

import sqlite3
import subprocess
from lib import config
from rich.console import Console

console = Console()


# 爬虫爬取并且发送到XRAY
def craw_to_xray(domain_list):
    console.print('正在进行爬虫探测+漏洞检测',style="#ADFF2F")
    console.print('任务数据库连接成功',style="#ADFF2F")
    conn = sqlite3.connect(config.result_sql_path)
    c = conn.cursor()
    for domain in domain_list:
        domain = domain[1]
        # cmd = [config.crawlergo_path, "-c", config.chrome_path,"-t",config.max_tab_count, "-f", "smart", "--fuzz-path", "--push-to-proxy",config.push_to_proxy,  "--push-pool-max", config.max_send_count, domain]
        cmd = config.crawlergo_path + " -c " + config.chrome_path + " -t " + config.max_tab_count + " -f " + " smart " + " --fuzz-path " + " --push-to-proxy " + config.push_to_proxy + " --push-pool-max " + config.max_send_count + " " + domain 
        # rsp = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
        console.print('即将开启爬虫模块,可通过[bold cyan]tail -f logs/xray.log[/bold cyan]查看进度信息',style="#ADFF2F")
        rsp = subprocess.Popen(cmd, shell=True)
        while True:
            if rsp.poll() == None:
                pass
            else:
                break

采集完成后,通过--push-to-proxy推送给xray 开启的代理:# Xray被动代理地址 push_to_proxy = "http://127.0.0.1:7777"

这是start.sh 里面开启的:

nohup ./tools/xray/xray_linux_amd64 webscan --listen 127.0.0.1:7777 --webhook-output http://127.0.0.1:2333/webhook > logs/xray.log 2>&1 &

注意下webhook,这是xray的扫描结果推送给这个连接:
http://127.0.0.1:2333/webhook
这个代码处理在server.py中如下:

@app.route('/webhook', methods=['POST'])
def xray_webhook():
    vuln = request.json
    # 因为还会收到 https://chaitin.github.io/xray/#/api/statistic 的数据
    if "vuln_class" not in vuln:
        return "ok"
    content = """```xray 发现了新漏洞```
### url: {url}
### 插件: {plugin}
### 漏洞类型: {vuln_class}
### 发现时间: {create_time}
```请及时查看和处理```
""".format(url=vuln["target"]["url"], plugin=vuln["plugin"],
           vuln_class=vuln["vuln_class"] or "Default",
           create_time=str(datetime.datetime.fromtimestamp(vuln["create_time"] / 1000)))
    try:
        push_ftqq(content)
        sql_connect.insert_vuln_sql(vuln)
    except Exception as e:
        logging.exception(e)
    return 'ok'

做了两件事情:1. 将漏洞推送给微信通知;2. 将漏洞信息保存到漏洞库做备份。

0x05 总结下

整个系统来说,功能完备,比较清晰,可以作为基础的框架继续做优化,想直接使用,还是欠缺的。 后续的改造点:

  1. target.txt 的域名通过采集获取,获取有赏金的域名。
  2. 域名每次只扫描新增加的域名,全量扫描太慢了。
  3. 端口探测,要增强下,把nmap和msscan调用起来,现在没有。
  4. 整个程序还处于半自动的,想完全执行起来,还需要建立个循环。
  5. 如果说框架有什么缺点,那就是插件之间数据传递不够标准,有的通过数据库,有的不是,建议改成成统一接口,这样好插拔控制插件,更灵活。

最后如果对这方面有兴趣,还是建议看看官方文档,讲的非常好: 从零开始写自动化漏洞猎人

相关推荐

10款超实用JavaScript音频库(js播放音频代码)

HTML5提供了一种新的音频标签实现和规范用一个简单的HTML对象而无需音频插件来控制音频。这只是一个简单的整合这些新的HTML5音频特征及使用JavaScript来创建各种播放控制。下面将介绍10款...

Howler.js,一款神奇的 JavaScript 开源网络音频工具库

o...

PROFINET转Modbus网关——工业协议融合的智能枢纽

三格电子SG-PNh750-MOD-221,无缝连接Profinet与Modbus,赋能工业物联产品概述...

简单实用的Modbus类库,支持从站和DTU

一、简介...

[西门子PLC] S7-200 SMART PROFINET :通过GSD组态PLC设备

从S7-200SMARTV2.5版本开始,S7-200SMART开始支持做PROFINETIO通信的智能设备。从而,两个S7-200SMART之间可以进行PROFINETI...

Modbus(RTU / TCP)有什么异同(modbus tcp和tcp)

Modbus是一种广泛使用的工业自动化通信协议,它支持设备之间的数据交换。Modbus协议有两个主要的变体:ModbusRTU(二进制模式)和ModbusTCP(基于TCP/IP网络的模式)。尽管...

Modbus通信调试步骤详解(modbus调试工具怎么用)

Modbus通信调试步骤详解  Modbus通信分为串口和以太网,无论是串口还是以太网,只要是标准Modbus,就可以用Modbus模拟器进行调试。按以下几步进行调试。...

理解Intel手册汇编指令(intel 汇编指令手册)

指令格式...

「西门子PLC」S7-200 SMART的Modbus RTU通讯

S7-200SMART集成的RS485端口(端口0)以及SBCM01RS485/232信号板(端口1)两个通信端口可以同时做MODBUSRTU主站,或者一个做MODBUSRTU主站一个做MO...

InfiniBand网络运维全指南:从驱动安装到故障排查

一、InfiniBand网络概述InfiniBand(直译为“无限带宽”技术,缩写为IB)是一种用于高性能计算的计算机网络通信标准,具有极高的吞吐量和极低的延迟,用于计算机与计算机之间的数据互连。它...

一加回归 OPPO,背后的秘密不可告人

有这样一个手机品牌,它诞生于互联网品牌。在大众群体看来,它的身世似乎模糊不清,许多人以为它是国外品牌。它的产品定位是极客群体,深受国内发烧友,甚至国外极客玩家喜爱。...

[西门子PLC] S7-200SMART快速高效的完成Modbus通信程序的设计

一、导读Modbus通信是一种被广泛应用的通信协议,在变频器、智能仪表还有其他一些智能设备上都能见到它的身影。本文呢,就把S7-200SMART系列PLC当作Modbus主站,把...

狂肝10个月手搓GPU,他们在我的世界中玩起我的世界,梦想成真

梦晨衡宇萧箫发自凹非寺量子位|公众号QbitAI自从有人在《我的世界》里用红石电路造出CPU,就流传着一个梗:...

[西门子PLC] 博途TIA portal SCL编程基础入门:1-点动与自锁

一、S7-SCL编程语言简介...

工作原理系列之:Modbus(modbus工作过程)

MODBUS是一种在自动化工业中广泛应用的高速串行通信协议。该协议是由Modion公司(现在由施耐德电气公司获得)于1979年为自己的可编程逻辑控制器开发的。该协议充当了PLCS和智能自动化设备之间的...

取消回复欢迎 发表评论: