吾爱破解 - 52pojie.cn

 找回密码
 注册[Register]

QQ登录

只需一步,快速开始

查看: 1550|回复: 12
收起左侧

[Python 原创] 腾讯云轻量应用服务器CLI防火墙管理工具

[复制链接]
zunmx 发表于 2024-5-22 17:30
本帖最后由 zunmx 于 2024-5-22 17:39 编辑

项目介绍

个人维护的腾讯云清量应用服务器,对于安全性来说,只能自己来维护,比如说SSH,或是说RDP服务,如果实在运维阶段,不得不开启,但是服务正常运行,就不需要对外开放,每次维护都需要登录到云厂商的控制台进行修改,当然会有大佬会使用API,或是说封装这个工具,我这里提供一个自己也在使用的工具

项目架构

主体部分

python:3.x

插件(包)

无额外模块

代码

注意: secret_id,secret_key 需要自行申请:https://console.cloud.tencent.com/cam/capi
region:区域
InstanceId:实例ID
区域两个可以在https://console.cloud.tencent.com/api/explorer?Product=lighthouse&Version=2020-03-24&Action=CreateFirewallRules 地址进行查阅,需要后面英文部分, 如ap-beijing


image.png

InstanceId: 在这里
image.png


import requests
import hashlib, hmac
import time
from datetime import datetime
import json

secret_id = "*******"
secret_key = "*******"
service = "lighthouse"
host = "lighthouse.tencentcloudapi.com"
endpoint = "https://" + host
region = "*******"
version = "2017-03-12"
algorithm = "TC3-HMAC-SHA256"
InstanceId = "*******"

def generate_auth(params):
    timestamp = int(time.time())
    date = datetime.utcfromtimestamp(timestamp).strftime("%Y-%m-%d")
    # ************* 步骤 1:拼接规范请求串 *************
    http_request_method = "POST"
    canonical_uri = "/"
    canonical_querystring = ""
    ct = "application/json; charset=utf-8"
    payload = json.dumps(params)
    canonical_headers = "content-type:%s\nhost:%s\n" % (ct, host)
    signed_headers = "content-type;host"
    hashed_request_payload = hashlib.sha256(payload.encode("utf-8")).hexdigest()
    canonical_request = (http_request_method + "\n" +
                         canonical_uri + "\n" +
                         canonical_querystring + "\n" +
                         canonical_headers + "\n" +
                         signed_headers + "\n" +
                         hashed_request_payload)

    # ************* 步骤 2:拼接待签名字符串 *************
    credential_scope = date + "/" + service + "/" + "tc3_request"
    hashed_canonical_request = hashlib.sha256(canonical_request.encode("utf-8")).hexdigest()
    string_to_sign = (algorithm + "\n" +
                      str(timestamp) + "\n" +
                      credential_scope + "\n" +
                      hashed_canonical_request)

    # ************* 步骤 3:计算签名 *************
    # 计算签名摘要函数
    def sign(key, msg):
        return hmac.new(key, msg.encode("utf-8"), hashlib.sha256).digest()

    secret_date = sign(("TC3" + secret_key).encode("utf-8"), date)
    secret_service = sign(secret_date, service)
    secret_signing = sign(secret_service, "tc3_request")
    signature = hmac.new(secret_signing, string_to_sign.encode("utf-8"), hashlib.sha256).hexdigest()

    # ************* 步骤 4:拼接 Authorization *************
    authorization = (algorithm + " " +
                     "Credential=" + secret_id + "/" + credential_scope + ", " +
                     "SignedHeaders=" + signed_headers + ", " +
                     "Signature=" + signature)
    return authorization

def block(port, desc):
    param = {
        "InstanceId": InstanceId,
        "FirewallRules": [
            {
                "Protocol": "TCP",
                "Port": port,
                "CidrBlock": '0.0.0.0/0',
                "Action": "DROP",
                "FirewallRuleDescription": desc
            }
        ],
    }
    post = requests.post(endpoint, headers={"Content-Type": "application/json; charset=utf-8",
                                            "Authorization": generate_auth(param), "Host": host,
                                            "X-TC-Action": "CreateFirewallRules", "X-TC-Version": "2020-03-24",
                                            "X-TC-Timestamp": str(int(time.time())), "X-TC-Region": region},
                         data=json.dumps(param))
    return post

def delBlock(port):
    param = {
        "InstanceId": InstanceId,
        "FirewallRules": [
            {
                "Protocol": "TCP",
                "Port": port,
                "CidrBlock": "0.0.0.0/0",
                "Action": "DROP",
            }
        ],
    }
    post = requests.post(endpoint, headers={"Content-Type": "application/json; charset=utf-8",
                                            "Authorization": generate_auth(param), "Host": host,
                                            "X-TC-Action": "DeleteFirewallRules", "X-TC-Version": "2020-03-24",
                                            "X-TC-Timestamp": str(int(time.time())), "X-TC-Region": region},
                         data=json.dumps(param))
    return post

def getAllBlock(remark):
    rules = []
    param = {
        "InstanceId": InstanceId,
        "Offset": 0,
        "Limit": 100
    }
    while True:

        post = requests.post(endpoint, headers={"Content-Type": "application/json; charset=utf-8",
                                                "Authorization": generate_auth(param), "Host": host,
                                                "X-TC-Action": "DescribeFirewallRules", "X-TC-Version": "2020-03-24",
                                                "X-TC-Timestamp": str(int(time.time())), "X-TC-Region": region},
                             data=json.dumps(param))
        post_json = post.json()
        for i in post_json['Response']['FirewallRuleSet']:
            if str(i['FirewallRuleDescription']).startswith(remark):
                rules.append(i)
        if param['Offset'] > post_json['Response']['TotalCount']:
            break
        else:
            param['Offset'] += param['Limit']
    return rules

def deal(text, port, oper):
    if text == 'unknown':
        return "[×] 端口未定义在程序中"
    text = text.text
    print(text)
    if text.find("have already existed.") > -1:
        return f"[×] 已经存在此[{oper}]规则了。"
    elif text.find("are not found.") > -1:
        return "[×] 不存在此规则。"
    else:
        return f"[√] 端口 [{port}] [{oper}]成功"

def cmdhelp():
    print(f"[i] 腾讯云私人防火墙配置终端")
    print(f"[i] 腾讯云实例ID:{InstanceId}, 地域号:{region}")
    print("  [>] 参考命令:block/allow [port/sName] 阻塞/放行 访问端口")
    print("  [>] 参考命令:show 显示服务端口被封禁情况")
    print("  [>] 参考命令:custom 显示当前IP被封禁情况")
    print("  [>] 参考命令:help 显示帮助")
    print("  [>] 参考命令:exit/quit 退出服务")

if __name__ == "__main__":
    cmdhelp()
    print("[#] 当前被阻塞的服务如下所示")
    rules = getAllBlock("服务-")
    for i in rules:
        print(f"[#] {i}")
    while True:
        s = input("[$] 阁下的意图是? ")
        s = s.strip().lower()
        if s.startswith("block"):
            s = s[5:].strip()
            if s == "3306" or s == 'mysql':
                p = block('3306', "服务-MySQL")
            elif s == "6379" or s == 'redis':
                p = block('6379', "服务-Redis")
            elif s == "3389" or s == 'rdp':
                p = block('3389', "服务-RDP")
            else:
                p = "unknown"
            print("  " + deal(p, s, '阻塞'))
        elif s.startswith("allow"):
            s = s[5:].strip()
            if s == "3306" or s == 'mysql':
                p = delBlock('3306')
            elif s == "6379" or s == 'redis':
                p = delBlock('6379')
            elif s == "3389" or s == 'rdp':
                p = delBlock('3389')
            else:
                p = "unknown"
            print("  " + deal(p, s, '放行'))
        elif s == 'show':
            print("[#] 当前被阻塞的服务如下所示")
            rules = getAllBlock("服务-")
            for i in rules:
                print(f"[#] {i}")
        elif s == 'exit' or s == 'quit':
            exit(0)
        elif s == 'help':
            cmdhelp()
        elif s == 'custom':
            print("[#] 当前被阻塞的ip如下所示")
            rules = getAllBlock("个性化")
            for i in rules:
                print(f"[#] {i}")
        else:
            print("  [X] 未定义的操作符")

使用截图


image.png

如果板块放置不对,感谢审核调整,上一个帖子感谢hmily调整板块


免费评分

参与人数 3吾爱币 +10 热心值 +3 收起 理由
苏紫方璇 + 7 + 1 欢迎分析讨论交流,吾爱破解论坛有你更精彩!
junjia215 + 1 + 1 用心讨论,共获提升!
柒呀柒 + 2 + 1 谢谢@Thanks!

查看全部评分

本帖被以下淘专辑推荐:

发帖前要善用论坛搜索功能,那里可能会有你要找的答案或者已经有人发布过相同内容了,请勿重复发帖。

 楼主| zunmx 发表于 2024-5-22 22:22
忘记custom命令的解释了,因为我自己搭建了一个蜜罐系统,触发危险策略的话,就会封禁那个IP,封禁的原理也是通过云厂商的API添加防火墙规则,所以custom实际上是查看蜜罐封禁的IP,蜜罐系统通过记录的时间,对IP进行指定时间的封锁,到期后是会解封的。因为云厂商的防火墙规则是有最高数量限制的,如果您有兴趣,也可自己搭建一个蜜罐系统,触发防火墙规则,如果不需要,可以删除那部分代码。
 楼主| zunmx 发表于 2024-5-23 10:13
conancheng 发表于 2024-5-23 09:49
感谢分享,不过个人还是习惯装个宝塔免费版,就完事了

上次用宝塔不记得多久了,通过系统级防火墙和云厂商的防火墙是存在一定的去别的。
比如说一个端口的请求会先经过云厂商的防火墙,再到主机的防火墙,如果通过云厂商的防火墙进行设置,那么请求就到不了主机,一定程度上也降低了服务器的负载。

比如说一个敏感端口,如ssh端口,只需要在运维的时候进行访问,如果不需要,就可以阻塞,这样也提高了服务器的安全性。
柒呀柒 发表于 2024-5-22 17:54
谢谢分享, 我每次都是扫码登录去放行端口,现在赶紧抄作业了。
 楼主| zunmx 发表于 2024-5-22 18:01
柒呀柒 发表于 2024-5-22 17:54
谢谢分享, 我每次都是扫码登录去放行端口,现在赶紧抄作业了。

这个代码写了好久了,其实有些是可以优化的,
实际上delBlock是删除防火墙规则
block是添加防火墙规则

样例中输入blockrdp和block3389是相同效果,其实这里写的有些冗余了,可以封装一下的。
小三丶 发表于 2024-5-22 21:43
学到了,谢谢分享
conancheng 发表于 2024-5-23 09:49
感谢分享,不过个人还是习惯装个宝塔免费版,就完事了
conancheng 发表于 2024-5-23 12:02
zunmx 发表于 2024-5-23 10:13
上次用宝塔不记得多久了,通过系统级防火墙和云厂商的防火墙是存在一定的去别的。
比如说一个端口的请求 ...

宝塔支持系统级防火墙配置和云厂商防火墙一键放行,你可以试试,挺简单的
 楼主| zunmx 发表于 2024-5-23 12:22
conancheng 发表于 2024-5-23 12:02
宝塔支持系统级防火墙配置和云厂商防火墙一键放行,你可以试试,挺简单的

好久没用过了,变得这么强了,有时间试试
laustar 发表于 2024-5-27 08:49
学习一下。
您需要登录后才可以回帖 登录 | 注册[Register]

本版积分规则

返回列表

RSS订阅|小黑屋|处罚记录|联系我们|吾爱破解 - LCG - LSG ( 京ICP备16042023号 | 京公网安备 11010502030087号 )

GMT+8, 2024-12-15 02:29

Powered by Discuz!

Copyright © 2001-2020, Tencent Cloud.

快速回复 返回顶部 返回列表