前言
在日常工作中,经常需要检查内网设备上是否开启了某些服务端口,但现有的很多IP扫描器并不具备端口扫描功能。本程序实现了这一功能:可以扫描指定IP范围内的所有地址,检测它们是否开启了某个指定的端口。
功能描述
扫描指定IP范围内的设备,检测每台设备是否开启了指定的端口号
代码
import tkinter as tk
from tkinter import ttk, messagebox
import socket
import ipaddress
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import queue
import threading
import psutil
class PortScanner:
    """Check whether one TCP port is open on every address of an IP range.

    The range is inclusive on both ends. Each probe result is appended to
    ``results`` and also streamed to the queues given to :meth:`scan_range`.
    """

    def __init__(self, start_ip, end_ip, port, max_workers=100):
        """Store the scan parameters.

        Args:
            start_ip: First address of the range (any value accepted by
                ``ipaddress.ip_address``).
            end_ip: Last address of the range, inclusive.
            port: TCP port number probed on every address.
            max_workers: Maximum number of concurrent probe threads.

        Raises:
            ValueError: If either address cannot be parsed.
        """
        self.start_ip = ipaddress.ip_address(start_ip)
        self.end_ip = ipaddress.ip_address(end_ip)
        self.port = port
        self.max_workers = max_workers
        self.results = []
        self.total_ips = 0
        self.scanned_ips = 0
        self.open_count = 0

    def scan_port(self, ip):
        """Try a TCP connect to ``self.port`` on ``ip``.

        Args:
            ip: Address to probe; converted with ``str()`` before connecting.

        Returns:
            A tuple ``(ip, is_open, timestamp)`` where ``is_open`` is True
            only when the connection succeeded and ``timestamp`` is
            ``time.time()`` taken after the attempt.
        """
        # Address objects expose ``version``; anything else is treated as IPv4.
        if getattr(ip, "version", 4) == 6:
            family = socket.AF_INET6
        else:
            family = socket.AF_INET
        try:
            # The context manager closes the socket even if connect_ex raises.
            with socket.socket(family, socket.SOCK_STREAM) as sock:
                sock.settimeout(1)
                is_open = sock.connect_ex((str(ip), self.port)) == 0
        except (OSError, OverflowError):
            # OSError: socket/address failure; OverflowError: port outside
            # 0-65535. Either way the port is reported as not open.
            is_open = False
        return ip, is_open, time.time()

    def scan_range(self, task_queue, result_queue):
        """Probe every address from ``start_ip`` to ``end_ip`` inclusive.

        For each finished probe this puts ``(0, [result])`` on
        ``result_queue`` and then ``'update'`` followed by
        ``('progress', (percent, "open/total"))`` on ``task_queue``.
        ``'done'`` is always the last item put on ``task_queue``, even when
        the scan aborts with an exception, so a polling consumer can never
        wait forever.

        Args:
            task_queue: Queue receiving the control messages above.
            result_queue: Queue receiving the per-address result dicts.

        Raises:
            ValueError: If ``start_ip`` is greater than ``end_ip``.
            TypeError: If the two addresses are of different IP versions.
        """
        # Every scan starts from a clean slate, results included.
        self.results = []
        self.scanned_ips = 0
        self.open_count = 0
        try:
            networks = list(
                ipaddress.summarize_address_range(self.start_ip, self.end_ip)
            )
            # num_addresses is arithmetic; no need to materialise each address.
            self.total_ips = sum(net.num_addresses for net in networks)

            with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
                futures = []
                for net in networks:
                    for ip in net:
                        futures.append(executor.submit(self.scan_port, ip))

                for future in as_completed(futures):
                    ip, status, timestamp = future.result()
                    self.scanned_ips += 1
                    if status:
                        self.open_count += 1
                    result = {
                        'ip': str(ip),
                        'port': self.port,
                        'status': '开启' if status else '关闭',
                        'timestamp': timestamp
                    }
                    self.results.append(result)
                    result_queue.put((0, [result]))
                    task_queue.put('update')
                    progress = (self.scanned_ips / self.total_ips) * 100
                    task_queue.put(
                        ('progress',
                         (progress, f"{self.open_count}/{self.total_ips}"))
                    )
        finally:
            task_queue.put('done')
class PortScannerApp:
    """Tkinter front end that runs a PortScanner on a background thread.

    The worker thread reports through two queues; the Tk thread polls them
    with ``after`` so that all widget access stays on the Tk thread.
    """

    # Delay between two polls of the worker queues, in milliseconds.
    POLL_INTERVAL_MS = 100

    def __init__(self, root):
        """Build the widgets and pre-fill the range with the local network.

        Args:
            root: The Tk root window that hosts the application.
        """
        self.root = root
        self.root.title("端口扫描器")
        self.scanning = False
        self.cancel_scan = False
        self.task_queue = queue.Queue()
        self.result_queue = queue.Queue()
        self.worker_thread = None
        self.scanner = None
        # Identifier of the pending ``after`` callback, if any.
        self._poll_job = None

        # Local network information used for the default range.
        self.local_ip = self.get_local_ip()
        self.network = self.get_network_range()

        # IP range inputs.
        ttk.Label(root, text="起始IP:").grid(row=0, column=0, padx=5, pady=5)
        self.start_ip = ttk.Entry(root)
        self.start_ip.grid(row=0, column=1, padx=5, pady=5)
        ttk.Label(root, text="结束IP:").grid(row=1, column=0, padx=5, pady=5)
        self.end_ip = ttk.Entry(root)
        self.end_ip.grid(row=1, column=1, padx=5, pady=5)

        # Default range: the whole network the local address belongs to.
        if self.network:
            self.start_ip.insert(0, str(self.network.network_address))
            self.end_ip.insert(0, str(self.network.broadcast_address))

        # Port input.
        ttk.Label(root, text="端口号:").grid(row=0, column=2, padx=5, pady=5)
        self.port_entry = ttk.Entry(root)
        self.port_entry.grid(row=0, column=3, padx=5, pady=5)
        self.port_entry.insert(0, "80")

        # Control buttons.
        self.scan_button = ttk.Button(root, text="开始扫描", command=self.start_scan)
        self.scan_button.grid(row=2, column=0, pady=10)
        self.cancel_button = ttk.Button(
            root, text="取消扫描", command=self.cancel_scanning, state=tk.DISABLED
        )
        self.cancel_button.grid(row=2, column=1, pady=10)

        # Progress bar and statistics label.
        progress_frame = ttk.Frame(root)
        progress_frame.grid(row=4, column=0, columnspan=4, padx=5, pady=10, sticky='ew')
        self.progress = ttk.Progressbar(progress_frame, orient="horizontal", mode="determinate")
        self.progress.pack(side='left', fill='x', expand=True)
        self.stats_label = ttk.Label(progress_frame, text="0/0")
        self.stats_label.pack(side='right', padx=5)

        # Result table.
        self.tree = ttk.Treeview(
            root, columns=('序号', 'IP地址', '端口号', '状态'), show='headings'
        )
        self.tree.grid(row=3, column=0, columnspan=4, padx=5, pady=5, sticky='nsew')
        self.tree.column('序号', width=50, anchor='center')
        self.tree.column('IP地址', width=120, anchor='center')
        self.tree.column('端口号', width=80, anchor='center')
        self.tree.column('状态', width=80, anchor='center')
        # One pass sets both the heading text and the click-to-sort handler.
        for col in self.tree['columns']:
            self.tree.heading(
                col, text=col, command=lambda c=col: self.sort_tree(c, False)
            )

        # Row colours by port state.
        self.tree.tag_configure('open', foreground='green')
        self.tree.tag_configure('closed', foreground='gray')

        scrollbar = ttk.Scrollbar(root, orient="vertical", command=self.tree.yview)
        scrollbar.grid(row=3, column=4, sticky='ns')
        self.tree.configure(yscrollcommand=scrollbar.set)

    def get_local_ip(self):
        """Return the local address used for outbound traffic, or None.

        Connecting a UDP socket makes the OS choose the outgoing interface,
        whose address is then read back with ``getsockname``.
        """
        try:
            # The context manager closes the socket even when connect fails.
            with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
                probe.connect(('8.8.8.8', 80))
                return probe.getsockname()[0]
        except Exception:
            return None

    def get_network_range(self):
        """Return the IPv4Network containing ``self.local_ip``, or None.

        The netmask comes from the interface (as listed by psutil) whose
        address equals ``self.local_ip``.
        """
        if not self.local_ip:
            return None
        try:
            for addresses in psutil.net_if_addrs().values():
                for addr in addresses:
                    if addr.family == socket.AF_INET and addr.address == self.local_ip:
                        return ipaddress.IPv4Network(
                            f"{self.local_ip}/{addr.netmask}", strict=False
                        )
        except Exception:
            # Best effort only: without a default the user types the range.
            return None
        return None

    @staticmethod
    def _sort_key(col, value):
        """Return a key that orders table cell ``value`` of column ``col``.

        Row numbers and ports sort numerically and IP addresses by numeric
        value; anything else (or an unparsable cell) sorts as text after
        the parsed values.
        """
        try:
            if col in ('序号', '端口号'):
                return (0, int(value))
            if col == 'IP地址':
                addr = ipaddress.ip_address(value)
                return (0, addr.version, int(addr))
        except ValueError:
            pass
        return (1, str(value))

    @staticmethod
    def _parse_scan_inputs(start_text, end_text, port_text):
        """Validate the three entry fields.

        Returns:
            ``(start_ip, end_ip, port)`` with the addresses as normalised
            strings and the port as an int.

        Raises:
            ValueError: With a user-facing message when an address is
                malformed, the range is reversed or mixes IP versions, or
                the port is not an integer in 1-65535.
        """
        try:
            first = ipaddress.ip_address(start_text.strip())
            last = ipaddress.ip_address(end_text.strip())
        except ValueError:
            raise ValueError("IP地址格式无效") from None
        # Addresses of different versions cannot be ordered, so check first.
        if first.version != last.version:
            raise ValueError("起始IP和结束IP必须同为IPv4或同为IPv6")
        if first > last:
            raise ValueError("起始IP不能大于结束IP")
        try:
            port = int(port_text.strip())
        except ValueError:
            raise ValueError("端口号必须是整数") from None
        if not 1 <= port <= 65535:
            raise ValueError("端口号必须在1到65535之间")
        return str(first), str(last), port

    def sort_tree(self, col, reverse):
        """Sort the table rows by ``col`` and flip the order for the next click.

        Args:
            col: Column identifier to sort by.
            reverse: True for descending order.
        """
        rows = [
            (self._sort_key(col, self.tree.set(child, col)), child)
            for child in self.tree.get_children('')
        ]
        rows.sort(key=lambda row: row[0], reverse=reverse)
        for index, (_key, child) in enumerate(rows):
            self.tree.move(child, '', index)
        self.tree.heading(col, command=lambda: self.sort_tree(col, not reverse))

    def start_scan(self):
        """Validate the inputs and start a scan on a background thread."""
        if self.scanning:
            return
        # Validate before touching any state so bad input cannot leave the
        # window stuck with the start button disabled.
        try:
            start_ip, end_ip, port = self._parse_scan_inputs(
                self.start_ip.get(), self.end_ip.get(), self.port_entry.get()
            )
        except ValueError as exc:
            messagebox.showerror("错误", str(exc))
            return

        self.scanning = True
        self.cancel_scan = False
        self.scan_button.config(state=tk.DISABLED)
        self.cancel_button.config(state=tk.NORMAL)
        self.progress['value'] = 0
        self.stats_label.config(text="0/0")
        for item in self.tree.get_children():
            self.tree.delete(item)

        # Fresh queues per scan: a worker left over from a cancelled scan
        # keeps writing to its own, now unread, queues and cannot inject
        # stale rows or a premature 'done' into this scan.
        self.task_queue = queue.Queue()
        self.result_queue = queue.Queue()

        self.scanner = PortScanner(start_ip, end_ip, port)
        self.worker_thread = threading.Thread(
            target=self.scanner.scan_range,
            args=(self.task_queue, self.result_queue),
            daemon=True
        )
        self.worker_thread.start()
        # Never run two polling loops at once.
        self._cancel_pending_poll()
        self.update_ui()

    def _cancel_pending_poll(self):
        """Cancel the scheduled ``update_ui`` call, if there is one."""
        if self._poll_job is not None:
            self.root.after_cancel(self._poll_job)
            self._poll_job = None

    def _drain_results(self):
        """Return every result dict currently waiting in the result queue."""
        drained = []
        while True:
            try:
                _priority, batch = self.result_queue.get_nowait()
            except queue.Empty:
                return drained
            drained.extend(batch)

    def update_ui(self):
        """Apply pending worker messages, then re-arm the poll while scanning."""
        self._poll_job = None
        try:
            while True:
                try:
                    task = self.task_queue.get_nowait()
                except queue.Empty:
                    break
                if isinstance(task, tuple) and task[0] == 'progress':
                    progress, stats = task[1]
                    self.progress['value'] = progress
                    self.stats_label.config(text=stats)
                elif task == 'update':
                    results = self._drain_results()
                    if results:
                        self.process_results(results)
                elif task == 'done':
                    self.scanning = False
                    self.scan_button.config(state=tk.NORMAL)
                    self.cancel_button.config(state=tk.DISABLED)
                    self.progress['value'] = 100
                    messagebox.showinfo("完成", "扫描已完成!")
        except Exception as e:
            messagebox.showerror("错误", f"更新UI时出错: {str(e)}")
        if self.scanning:
            self._poll_job = self.root.after(self.POLL_INTERVAL_MS, self.update_ui)

    def process_results(self, results):
        """Append one table row per result dict.

        Args:
            results: Iterable of dicts with 'ip', 'port' and 'status' keys.
        """
        # Count the existing rows once instead of once per inserted row.
        next_row = len(self.tree.get_children()) + 1
        for offset, result in enumerate(results):
            is_open = result['status'] == '开启'
            self.tree.insert('', 'end', values=(
                next_row + offset,
                result['ip'],
                result['port'],
                "●" if is_open else "○"
            ), tags=('open' if is_open else 'closed',))
        self.root.update_idletasks()

    def cancel_scanning(self):
        """Stop showing the current scan and re-enable the start button.

        The background worker is not interrupted; its remaining output goes
        to queues that are replaced when the next scan starts.
        """
        self.cancel_scan = True
        self.scanning = False
        self._cancel_pending_poll()
        self.scan_button.config(state=tk.NORMAL)
        self.cancel_button.config(state=tk.DISABLED)
if __name__ == "__main__":
    # Script entry point: create the Tk root, attach the scanner UI to it,
    # and hand control to the Tk event loop until the window is closed.
    main_window = tk.Tk()
    scanner_app = PortScannerApp(main_window)
    main_window.mainloop()
功能截图
扫描当前网段内所有启用adb服务的设备
