前言

在工作中,经常需要扫描内网设备上开启的某个服务端口,但现有的很多 IP 扫描器并不具备端口扫描功能。本程序实现了这一功能:它可以扫描指定 IP 范围内的所有设备,检查它们是否开启了某个指定的端口。

功能描述

扫描指定 IP 范围内的设备,检测它们是否开启了指定的端口号。

代码

import tkinter as tk
from tkinter import ttk, messagebox
import socket
import ipaddress
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import queue
import threading
import psutil
​
class PortScanner:
    """Check one TCP port on every address of an inclusive IP range.

    Progress is reported through two queues handed to :meth:`scan_range`,
    so the scan can run on a worker thread while another thread (typically
    a GUI loop) consumes the messages.

    Attributes:
        start_ip, end_ip: Parsed first and last address of the range.
        port: TCP port that is probed on every address.
        max_workers: Size of the thread pool used for probing.
        results: One dict per scanned address (see :meth:`scan_range`).
        total_ips: Number of addresses in the range (set by ``scan_range``).
        scanned_ips: Number of addresses whose probe has finished.
        open_count: Number of addresses where the port accepted a connection.
    """

    def __init__(self, start_ip, end_ip, port, max_workers=100):
        """Parse the range boundaries and initialise the counters.

        Raises:
            ValueError: If ``start_ip`` or ``end_ip`` is not a valid address.
        """
        self.start_ip = ipaddress.ip_address(start_ip)
        self.end_ip = ipaddress.ip_address(end_ip)
        self.port = port
        self.max_workers = max_workers
        self.results = []
        self.total_ips = 0
        self.scanned_ips = 0
        self.open_count = 0
        # Set by cancel(); checked cooperatively inside scan_range().
        self._cancel_event = threading.Event()

    def cancel(self):
        """Ask a running (or future) :meth:`scan_range` call to stop early."""
        self._cancel_event.set()

    def scan_port(self, ip):
        """Try to open a TCP connection to ``ip`` on ``self.port``.

        Returns:
            A tuple ``(ip, is_open, timestamp)`` where ``is_open`` is True
            only if the connection succeeded within one second and
            ``timestamp`` is ``time.time()`` taken after the attempt.
        """
        # Pick the socket family from the address object; plain strings are
        # treated as IPv4, as before.
        family = socket.AF_INET6 if getattr(ip, 'version', 4) == 6 else socket.AF_INET
        try:
            # The context manager closes the socket even if connect_ex raises.
            with socket.socket(family, socket.SOCK_STREAM) as sock:
                sock.settimeout(1)
                is_open = sock.connect_ex((str(ip), self.port)) == 0
        except (OSError, OverflowError, TypeError, ValueError):
            # Best effort: any failure to probe is reported as "closed".
            is_open = False
        return ip, is_open, time.time()

    def scan_range(self, task_queue, result_queue):
        """Probe every address from ``start_ip`` to ``end_ip`` inclusive.

        For each finished probe the following is emitted, in this order:

        * ``result_queue``: ``(0, [result])`` where ``result`` is a dict
          with the keys ``'ip'``, ``'port'``, ``'status'`` and
          ``'timestamp'``;
        * ``task_queue``: the string ``'update'``;
        * ``task_queue``: ``('progress', (percent, "open/total"))``.

        ``'done'`` is always put on ``task_queue`` last, even when the scan
        is cancelled or fails, so a consumer never waits forever.

        Raises:
            ValueError: If ``start_ip`` is greater than ``end_ip``.
            TypeError: If the two addresses are of different IP versions.
        """
        try:
            networks = list(
                ipaddress.summarize_address_range(self.start_ip, self.end_ip)
            )
            # num_addresses counts without materialising every address.
            self.total_ips = sum(network.num_addresses for network in networks)
            self.scanned_ips = 0
            self.open_count = 0

            addresses = (ip for network in networks for ip in network)
            with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
                futures = []
                for ip in addresses:
                    if self._cancel_event.is_set():
                        break
                    futures.append(executor.submit(self.scan_port, ip))

                # Consume the futures exactly once, after all of them have
                # been submitted, so no result is reported twice.
                for future in as_completed(futures):
                    if self._cancel_event.is_set():
                        # Drop probes that have not started yet; running
                        # ones finish within their one-second timeout.
                        for pending in futures:
                            pending.cancel()
                        break
                    self._report(future.result(), task_queue, result_queue)
        finally:
            task_queue.put('done')

    def _report(self, outcome, task_queue, result_queue):
        """Record one probe outcome and publish it on both queues."""
        ip, status, timestamp = outcome
        self.scanned_ips += 1
        if status:
            self.open_count += 1
        result = {
            'ip': str(ip),
            'port': self.port,
            'status': '开启' if status else '关闭',
            'timestamp': timestamp
        }
        self.results.append(result)
        result_queue.put((0, [result]))
        task_queue.put('update')
        progress = (self.scanned_ips / self.total_ips) * 100
        task_queue.put(('progress', (progress, f"{self.open_count}/{self.total_ips}")))
​
class PortScannerApp:
    """Tkinter front end that drives a ``PortScanner`` on a worker thread.

    The worker publishes messages on ``task_queue`` / ``result_queue``; the
    GUI thread polls them every 100 ms in :meth:`update_ui` and is the only
    thread that touches widgets.
    """

    # Result-table columns whose cell text is compared as integers.
    NUMERIC_COLUMNS = ('序号', '端口号')

    def __init__(self, root):
        """Build all widgets inside ``root`` and pre-fill the local subnet."""
        self.root = root
        self.root.title("端口扫描器")
        self.scanning = False
        self.cancel_scan = False
        self.task_queue = queue.Queue()
        self.result_queue = queue.Queue()
        self.worker_thread = None
        self.scanner = None
        # Identifier of the pending ``after`` poll, or None when idle.
        self._poll_id = None

        # Discover the local address and the subnet it belongs to.
        self.local_ip = self.get_local_ip()
        self.network = self.get_network_range()

        # IP range inputs.
        ttk.Label(root, text="起始IP:").grid(row=0, column=0, padx=5, pady=5)
        self.start_ip = ttk.Entry(root)
        self.start_ip.grid(row=0, column=1, padx=5, pady=5)

        ttk.Label(root, text="结束IP:").grid(row=1, column=0, padx=5, pady=5)
        self.end_ip = ttk.Entry(root)
        self.end_ip.grid(row=1, column=1, padx=5, pady=5)

        # Default to the whole local subnet when it could be determined.
        if self.network:
            self.start_ip.insert(0, str(self.network.network_address))
            self.end_ip.insert(0, str(self.network.broadcast_address))

        # Port input.
        ttk.Label(root, text="端口号:").grid(row=0, column=2, padx=5, pady=5)
        self.port_entry = ttk.Entry(root)
        self.port_entry.grid(row=0, column=3, padx=5, pady=5)
        self.port_entry.insert(0, "80")

        # Control buttons.
        self.scan_button = ttk.Button(root, text="开始扫描", command=self.start_scan)
        self.scan_button.grid(row=2, column=0, pady=10)

        self.cancel_button = ttk.Button(root, text="取消扫描", command=self.cancel_scanning, state=tk.DISABLED)
        self.cancel_button.grid(row=2, column=1, pady=10)

        # Progress bar and "open/total" statistics.
        progress_frame = ttk.Frame(root)
        progress_frame.grid(row=4, column=0, columnspan=4, padx=5, pady=10, sticky='ew')

        self.progress = ttk.Progressbar(progress_frame, orient="horizontal", mode="determinate")
        self.progress.pack(side='left', fill='x', expand=True)

        self.stats_label = ttk.Label(progress_frame, text="0/0")
        self.stats_label.pack(side='right', padx=5)

        # Result table.
        self.tree = ttk.Treeview(root, columns=('序号', 'IP地址', '端口号', '状态'), show='headings')
        self.tree.grid(row=3, column=0, columnspan=4, padx=5, pady=5, sticky='nsew')

        self.tree.column('序号', width=50, anchor='center')
        self.tree.column('IP地址', width=120, anchor='center')
        self.tree.column('端口号', width=80, anchor='center')
        self.tree.column('状态', width=80, anchor='center')

        # Heading text plus click-to-sort; ``c=col`` binds the current column
        # name to each lambda instead of the loop variable.
        for col in self.tree['columns']:
            self.tree.heading(col, text=col, command=lambda c=col: self.sort_tree(c, False))

        # Row colours for open / closed ports.
        self.tree.tag_configure('open', foreground='green')
        self.tree.tag_configure('closed', foreground='gray')

        scrollbar = ttk.Scrollbar(root, orient="vertical", command=self.tree.yview)
        scrollbar.grid(row=3, column=4, sticky='ns')
        self.tree.configure(yscrollcommand=scrollbar.set)

    def get_local_ip(self):
        """Return the local address used for outbound traffic, or None.

        Connecting a UDP socket sends no packet; it only makes the OS pick
        the outgoing interface, whose address is then read back.
        """
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
                probe.connect(('8.8.8.8', 80))
                return probe.getsockname()[0]
        except OSError:
            return None

    def get_network_range(self):
        """Return the ``IPv4Network`` that contains ``self.local_ip``.

        Returns None when the local address is unknown, when no interface
        reports that address, or when the lookup fails for any reason.
        """
        if not self.local_ip:
            return None

        try:
            for addresses in psutil.net_if_addrs().values():
                for addr in addresses:
                    if addr.family == socket.AF_INET and addr.address == self.local_ip:
                        return ipaddress.IPv4Network(
                            f"{self.local_ip}/{addr.netmask}", strict=False
                        )
        except Exception:
            # Best effort: the default range is optional, so never let a
            # lookup failure prevent the window from opening.
            return None
        return None

    @staticmethod
    def _sort_key(col, value):
        """Return a sortable key for one cell of column ``col``.

        Numeric columns compare as integers and the IP column compares by
        address value, so "192.168.1.2" sorts before "192.168.1.10". Cells
        that cannot be parsed sort after the parsable ones, by their text.
        """
        text = str(value)
        try:
            if col in PortScannerApp.NUMERIC_COLUMNS:
                return (0, int(text), text)
            if col == 'IP地址':
                return (0, int(ipaddress.ip_address(text)), text)
        except ValueError:
            return (1, 0, text)
        return (0, 0, text)

    @staticmethod
    def _parse_scan_inputs(start_text, end_text, port_text):
        """Validate the three entry values before a scan is started.

        Returns:
            ``(start_ip, end_ip, port)`` with the addresses as normalised
            strings and the port as an int.

        Raises:
            ValueError: With a user-facing message when an address is
                invalid, the range is reversed or mixes IP versions, or
                the port is not an integer between 1 and 65535.
        """
        parsed = []
        for text in (start_text, end_text):
            try:
                parsed.append(ipaddress.ip_address(text.strip()))
            except ValueError:
                raise ValueError(f"无效的IP地址: {text}") from None
        first, last = parsed
        if first.version != last.version:
            raise ValueError("起始IP和结束IP必须是同一类型(IPv4或IPv6)")
        if first > last:
            raise ValueError("起始IP不能大于结束IP")

        try:
            port = int(port_text)
        except ValueError:
            port = 0
        if not 1 <= port <= 65535:
            raise ValueError("端口号必须是1-65535之间的整数")
        return str(first), str(last), port

    def sort_tree(self, col, reverse):
        """Sort the table rows by ``col`` and flip the order for the next click."""
        rows = [
            (self._sort_key(col, self.tree.set(item, col)), item)
            for item in self.tree.get_children('')
        ]
        rows.sort(key=lambda row: row[0], reverse=reverse)

        for index, (_, item) in enumerate(rows):
            self.tree.move(item, '', index)

        self.tree.heading(col, command=lambda: self.sort_tree(col, not reverse))

    def start_scan(self):
        """Validate the inputs, reset the view and launch the worker thread."""
        if self.scanning:
            return

        # Validate first: failing after the buttons were switched would
        # leave the window stuck in the "scanning" state.
        try:
            start_ip, end_ip, port = self._parse_scan_inputs(
                self.start_ip.get(), self.end_ip.get(), self.port_entry.get()
            )
        except ValueError as exc:
            messagebox.showerror("错误", str(exc))
            return

        self.scanning = True
        self.cancel_scan = False
        self.scan_button.config(state=tk.DISABLED)
        self.cancel_button.config(state=tk.NORMAL)
        self.progress['value'] = 0
        self.stats_label.config(text="0/0")

        for item in self.tree.get_children():
            self.tree.delete(item)

        # Fresh queues per scan: a worker from a cancelled scan may still be
        # writing to the old ones, and its messages must not leak into
        # this scan.
        self.task_queue = queue.Queue()
        self.result_queue = queue.Queue()

        self.scanner = PortScanner(start_ip, end_ip, port)
        self.worker_thread = threading.Thread(
            target=self.scanner.scan_range,
            args=(self.task_queue, self.result_queue),
            daemon=True
        )
        self.worker_thread.start()

        # Drop a poll left over from a just-cancelled scan so that only one
        # polling chain is ever active.
        if self._poll_id is not None:
            self.root.after_cancel(self._poll_id)
            self._poll_id = None
        self.update_ui()

    def update_ui(self):
        """Drain the worker's messages and re-arm the poll while scanning."""
        self._poll_id = None
        try:
            while not self.task_queue.empty():
                task = self.task_queue.get_nowait()
                if isinstance(task, tuple) and task[0] == 'progress':
                    progress, stats = task[1]
                    self.progress['value'] = progress
                    self.stats_label.config(text=stats)
                elif task == 'update':
                    # Collect everything that is available in one batch.
                    results = []
                    while not self.result_queue.empty():
                        _priority, batch = self.result_queue.get_nowait()
                        results.extend(batch)
                    if results:
                        self.process_results(results)
                elif task == 'done':
                    self.scanning = False
                    self.scan_button.config(state=tk.NORMAL)
                    self.cancel_button.config(state=tk.DISABLED)
                    self.progress['value'] = 100
                    messagebox.showinfo("完成", "扫描已完成!")
        except Exception as e:
            messagebox.showerror("错误", f"更新UI时出错: {str(e)}")
        if self.scanning:
            self._poll_id = self.root.after(100, self.update_ui)

    def process_results(self, results):
        """Append one table row per result dict, numbered consecutively."""
        # Count the existing rows once instead of once per inserted row.
        next_row = len(self.tree.get_children()) + 1
        for row_number, result in enumerate(results, start=next_row):
            is_open = result['status'] == '开启'
            self.tree.insert('', 'end', values=(
                row_number,
                result['ip'],
                result['port'],
                "●" if is_open else "○"
            ), tags=('open' if is_open else 'closed',))
        self.root.update_idletasks()

    def cancel_scanning(self):
        """Stop polling, restore the buttons and ask the scanner to stop."""
        self.cancel_scan = True
        self.scanning = False
        # Only signal scanners that offer a cancel() hook; others simply
        # run to completion in the background on their own queues.
        stop = getattr(self.scanner, 'cancel', None)
        if callable(stop):
            stop()
        self.scan_button.config(state=tk.NORMAL)
        self.cancel_button.config(state=tk.DISABLED)
​
def main():
    """Create the Tk root window, attach the scanner UI and run the event loop."""
    window = tk.Tk()
    # Keep a reference to the application object while the loop is running.
    application = PortScannerApp(window)
    window.mainloop()
    return application


if __name__ == "__main__":
    main()
​

功能截图

扫描当前网段内所有启用 adb 服务的设备

下载

端口扫描器.exe