1、申请ID:linjie608
2、个人邮箱:18859007210@139.com
工具名称:批量IP查询工具
主要代码:
1、界面绘制代码:
from PyQt5.QtCore import QSettings, QThread, pyqtSignal
from PyQt5.QtWidgets import QApplication, QWidget, QPushButton
from PyQt5.QtGui import QIcon
import sys
import openpyxl
from PyQt5.QtWidgets import QVBoxLayout, QHBoxLayout, QFormLayout, QButtonGroup, QPlainTextEdit
from PyQt5.QtWidgets import QGroupBox
from PyQt5.QtWidgets import QRadioButton, QLabel, QLineEdit, QRadioButton, QScrollArea
from PyQt5.QtCore import pyqtSignal, Qt
import main
import ctypes
# Register an explicit AppUserModelID for this process with the Windows shell.
myappid = "wo de app"
if sys.platform == "win32":
    # ctypes.windll exists only on Windows; guarding the call keeps this
    # module importable on other platforms.
    ctypes.windll.shell32.SetCurrentProcessExplicitAppUserModelID(myappid)
class WorkerThread(QThread):
    """Background thread that runs the IP lookup so the GUI stays responsive.

    ``params`` is a two-item sequence: ``params[0]`` is the raw text holding
    one IP per line, and ``params[1]`` is forwarded to ``main.run`` as its
    second argument (the caller passes the thread count).
    """

    # Emitted once per run with
    # {'data': <result text or error text>, 'msg': <status text>}.
    # NOTE(review): this name shadows QThread's built-in ``finished`` signal;
    # it is kept because callers connect to it under this name.
    finished = pyqtSignal(dict)

    def __init__(self, params):
        super().__init__()
        self.params = params

    def run(self):
        """Look up every non-blank input line and emit the outcome."""
        try:
            # Strip each line so stray spaces or a trailing '\r' are not sent
            # as part of the IP, then drop lines that end up empty.
            stripped = (line.strip() for line in self.params[0].split('\n'))
            ip_list = [ip for ip in stripped if ip]
            t = self.params[1]
            text = main.run(ip_list, t)
            print(text)
            self.finished.emit({'data': text, 'msg': '程序执行完毕!'})
        except Exception as e:
            # Top-level boundary of the worker: report any failure to the GUI
            # instead of letting the thread die silently.
            print(self.params)
            self.finished.emit({'data': str(e), 'msg': '未知错误'})
class MyWindow(QWidget):
    """Main window: IP input and lookup output side by side, controls below."""

    my_signal = pyqtSignal(str)  # Signals must be declared at class level.

    def __init__(self):
        # Zero-argument super() resolves relative to MyWindow; the previous
        # super(QWidget, self) started the MRO lookup *after* QWidget.
        super().__init__()
        self.init_ui()

    def init_ui(self):
        """Create the widgets, arrange the layouts and connect the buttons."""
        self.setWindowTitle("IP归属查询")
        self.resize(600, 300)

        box_group_1 = QGroupBox("IP区")
        box_group_2 = QGroupBox("运行区")
        box_1 = QHBoxLayout()
        box_2 = QHBoxLayout()
        box_1_1 = QHBoxLayout()
        box_1_2 = QHBoxLayout()
        box_group_1.setLayout(box_1)
        box_group_2.setLayout(box_2)

        # Input editor: one IP per line, wrapped in a scroll area.
        self.msg = QPlainTextEdit()
        self.msg.setPlaceholderText("输入IP(每行一IP)")
        scroll = QScrollArea()
        scroll.setMinimumWidth(255)
        scroll.setWidget(self.msg)
        box_1_1.addWidget(scroll)
        box_1.addLayout(box_1_1)

        self.copy_btn = QPushButton('复制>>>')
        self.copy_btn.clicked.connect(self.copy)

        # Output label; its text is what copy() and put_excel() read back.
        self.msg2 = QLabel("输出内容")
        self.msg2.resize(650, 15)
        self.msg2.setMinimumWidth(450)
        self.msg2.setWordWrap(True)
        self.msg2.setAlignment(Qt.AlignTop)
        self.msg2.setStyleSheet("background-color:yellow;color:grey")
        # Let the label grow to fit its content.
        self.msg2.adjustSize()
        # The label lives in its own scroll area so long results can scroll.
        scroll2 = QScrollArea()
        scroll2.setMinimumWidth(455)
        scroll2.setWidget(self.msg2)
        box_1_2.addWidget(self.copy_btn)
        box_1_2.addWidget(scroll2)
        box_1.addLayout(box_1_2)

        # ---------------------------------------------------------
        self.btn_run = QPushButton('运行')
        self.excel_btn = QPushButton('生成表格')
        self.poolLabel = QLabel("线程数量:")
        self.edit = QLineEdit()
        # NOTE(review): minimum width (100) exceeds maximum width (50); the
        # two constraints contradict each other - confirm the intended size.
        self.edit.setMinimumWidth(100)
        self.edit.setMaximumWidth(50)
        self.edit.setPlaceholderText("默认60")
        self.statusIngLbl = QLabel("等待指令...")
        self.statusIngLbl.setMinimumWidth(450)
        box_2.addWidget(self.btn_run)
        box_2.addWidget(self.excel_btn)
        box_2.addWidget(self.poolLabel)
        box_2.addWidget(self.edit)
        box_2.addWidget(self.statusIngLbl)
        # box_2 is already installed on box_group_2 above; the former second
        # setLayout(box_2) call on box_group_1 was removed because that group
        # box already owns box_1.
        self.btn_run.setFixedSize(100, 30)
        self.btn_run.clicked.connect(self.run)
        self.excel_btn.clicked.connect(self.put_excel)

        container = QVBoxLayout()
        container.addWidget(box_group_1)
        container.addWidget(box_group_2)
        self.setLayout(container)

    def run(self):
        """Validate the thread count and start a background lookup."""
        pool_text = self.edit.text().strip()
        if pool_text == '':
            pool_text = '60'
            self.edit.setText(pool_text)
        try:
            pool_size = int(pool_text)
        except ValueError:
            pool_size = 0
        if pool_size <= 0:
            # Reject bad input *before* disabling the button, so a typo can
            # never leave the button stuck in its disabled state.
            self.statusIngLbl.setText("线程数量必须是正整数")
            return

        self.btn_run.setEnabled(False)
        self.btn_run.setText('生成中')
        params = [self.msg.toPlainText(), pool_size]
        # Keep a reference on self so the thread object outlives this method.
        # NOTE(review): the attribute name shadows QObject.thread(); kept
        # unchanged in case other code reads it.
        self.thread = WorkerThread(params)
        self.thread.finished.connect(self.update_ui)
        self.thread.start()
        self.statusIngLbl.setText("程序执行中...")

    def put_excel(self):
        """Write the displayed results to ./result.xlsx (columns: IP, 归属)."""
        self.excel_btn.setEnabled(False)
        self.excel_btn.setText('生成中')
        self.statusIngLbl.setText("表格生成中...")
        try:
            wb = openpyxl.Workbook()
            ws = wb.active
            ws.cell(row=1, column=1, value='IP')
            ws.cell(row=1, column=2, value='归属')
            next_row = 2
            for line in self.msg2.text().split('\n'):
                # Split on the first space only, so a location that itself
                # contains spaces is kept whole.
                ip, sep, location = line.partition(' ')
                if not sep:
                    continue
                ws.cell(row=next_row, column=1, value=ip)
                ws.cell(row=next_row, column=2, value=location)
                # A dedicated counter keeps rows contiguous when lines are skipped.
                next_row += 1
            wb.save('./result.xlsx')
        except OSError as e:
            # e.g. the target file is open in another program.
            self.statusIngLbl.setText(f"表格生成失败: {e}")
        else:
            self.statusIngLbl.setText("表格生成完成")
        finally:
            # Always restore the button, even when saving failed.
            self.excel_btn.setText('生成表格')
            self.excel_btn.setEnabled(True)

    def update_ui(self, result):
        """Show the worker's result and status, then re-enable the run button."""
        output_text = result["data"]
        status_text = result["msg"]
        self.msg2.setText(output_text)
        self.msg2.setWordWrap(True)
        self.msg2.setAlignment(Qt.AlignTop)
        self.msg2.setStyleSheet("background-color:yellow;color:grey")
        # Resize the label to fit the new content.
        self.msg2.adjustSize()
        self.statusIngLbl.setText(status_text)
        self.btn_run.setEnabled(True)
        self.btn_run.setText('运行')

    def copy(self):
        """Copy the output label's text to the system clipboard."""
        clipboard = QApplication.clipboard()
        clipboard.setText(self.msg2.text())
if __name__ == "__main__":
    # The QApplication must exist before any widget is created.
    app = QApplication(sys.argv)
    w = MyWindow()
    w.show()
    # Run the event loop and hand its return code to the OS as exit status.
    sys.exit(app.exec_())
2、主代码(main.py):
import grequests
import socket, struct
import json
def int_to_ip(int_val):
    """Return ``int_val`` converted with ``str()``.

    No dotted-quad formatting is applied; the value is rendered as-is.
    """
    as_text = str(int_val)
    return as_text
def int_to_ipv4(int_val):
    """Convert an unsigned 32-bit integer into dotted-quad IPv4 text.

    The integer is packed big-endian, so its most significant byte becomes
    the first octet.
    """
    packed = struct.pack('>I', int_val)
    octets = struct.unpack('>BBBB', packed)
    return '.'.join(str(octet) for octet in octets)
def run(urls, t):
    """Look up the location of every IP in ``urls`` and return a text report.

    Args:
        urls: Iterable of IP address strings (despite the name, these are
            bare IPs, not full URLs).
        t: Maximum number of concurrent requests; converted with ``int()``
            and passed to ``grequests.map`` as ``size``.

    Returns:
        One ``'\\n'``-terminated line per input IP. A successful lookup is
        formatted as ``'{ip} {country}-{region}-{city}-{isp}-{area}'``; a
        failed one as ``'{ip} 查询失败'``. An empty string is returned when
        ``urls`` is empty.
    """
    ip_list = list(urls)
    if not ip_list:
        # Nothing to query; skip building any requests.
        return ''

    host = 'https://aa72.market.alicloudapi.com'
    path = '/ip'
    # NOTE(review): the credential is hard-coded in source; consider loading
    # it from configuration or the environment instead.
    appcode = 'd2611f33235044443a8d922fb640bexxxx'
    headers = {
        'Authorization': 'APPCODE ' + appcode
    }

    def exception_handler(request, exception):
        print("Request failed:", exception)

    # Passing the IP through ``params`` lets the HTTP layer URL-encode it
    # rather than concatenating raw user text into the query string.
    pending = [
        grequests.get(host + path, params={'ip': ip}, headers=headers, timeout=20)
        for ip in ip_list
    ]
    responses = grequests.map(pending, exception_handler=exception_handler, size=int(t))

    lines = []
    for ip, response in zip(ip_list, responses):
        # A request that raised is represented by None (the handler above
        # returns None), so it must not be dereferenced.
        if response is None:
            lines.append(f'{ip} 查询失败\n')
            continue
        try:
            payload = response.json()
            print(payload)
            data = payload['data']
            line = (
                f"{data['ip']} {data['country']}-{data['region']}-"
                f"{data['city']}-{data['isp']}-{data['area']}\n"
            )
        except (ValueError, KeyError, TypeError):
            # Non-JSON body, or a payload without the expected 'data' fields:
            # record the failure for this IP and keep the rest of the batch.
            line = f'{ip} 查询失败\n'
        lines.append(line)
    return ''.join(lines)