前言

在工作当中经常会涉及到查到某些指定设备的IP,网上也有很方便的IP扫描器(Advanced IP Scanner),但是吧,自己嫌找资源麻烦,这里自己复刻了一个功能一致的IP扫描器

功能描述

查找指定IP范围内在线的设备

源代码

import tkinter as tk
from tkinter import ttk, messagebox
import socket
import ipaddress
import queue
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
import psutil
import subprocess
import platform
import re
import logging
​
logging.basicConfig(level=logging.ERROR, filename='ip_scanner.log', filemode='a',
                    format='%(asctime)s - %(levelname)s - %(message)s')
​
class IPScannerApp:
    def __init__(self, root):
        self.root = root
        self.root.title("IP 扫描器")
        self.scanning = False
        self.cancel_scan = False
        self.task_queue = queue.Queue()
        self.result_queue = queue.Queue()
        self.worker_thread = None
        self.mac_cache = {}
​
        self.local_ip = self.get_local_ip()
        self.network = self.get_network_range()
​
        ttk.Label(root, text="起始IP:").grid(row=0, column=0, padx=5, pady=5)
        self.start_ip = ttk.Entry(root)
        self.start_ip.grid(row=0, column=1, padx=5, pady=5)
​
        ttk.Label(root, text="结束IP:").grid(row=1, column=0, padx=5, pady=5)
        self.end_ip = ttk.Entry(root)
        self.end_ip.grid(row=1, column=1, padx=5, pady=5)
​
        self.scan_button = ttk.Button(root, text="开始扫描", command=self.start_scan)
        self.scan_button.grid(row=2, column=0, pady=10)
​
        self.cancel_button = ttk.Button(root, text="取消扫描", command=self.cancel_scanning, state=tk.DISABLED)
        self.cancel_button.grid(row=2, column=1, pady=10)
​
        self.progress = ttk.Progressbar(root, orient="horizontal", length=300, mode="determinate")
        self.progress.grid(row=4, column=0, padx=5, pady=10, sticky='w')
​
        self.stats_label = ttk.Label(root, text="0/0")
        self.stats_label.grid(row=4, column=1, padx=5, pady=10, sticky='e')
​
        if self.network:
            network_start = str(self.network.network_address)
            network_end = str(self.network.broadcast_address)
            self.start_ip.insert(0, network_start)
            self.end_ip.insert(0, network_end)
​
        self.tree = ttk.Treeview(root, columns=('序号', 'IP地址', '在线状态', 'Mac地址', '延时'), show='headings')
        self.tree.grid(row=3, column=0, columnspan=2, padx=5, pady=5, sticky='nsew')
​
        self.tree.heading('序号', text='序号')
        self.tree.heading('IP地址', text='IP地址')
        self.tree.heading('在线状态', text='在线状态')
        self.tree.heading('Mac地址', text='Mac地址')
        self.tree.heading('延时', text='延时(ms)')
​
        self.tree.column('序号', width=50, anchor='center')
        self.tree.column('IP地址', width=120, anchor='center')
        self.tree.column('在线状态', width=80, anchor='center')
        self.tree.column('Mac地址', width=120, anchor='center')
        self.tree.column('延时', width=80, anchor='center')
​
        scrollbar = ttk.Scrollbar(root, orient="vertical", command=self.tree.yview)
        scrollbar.grid(row=3, column=2, sticky='ns')
        self.tree.configure(yscrollcommand=scrollbar.set)
​
        self.online_count = 0
        self.total_count = 0
​
        self.tree.bind('<<TreeviewSelect>>', lambda event: self.show_port_details(self.tree.item(self.tree.selection()[0], 'values')[1]))
​
        for col in self.tree['columns']:
            self.tree.heading(col, text=col, command=lambda c=col: self.sort_tree(self.tree, c, False))
​
    def get_local_ip(self):
        try:
            s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            s.connect(('8.8.8.8', 80))
            ip = s.getsockname()[0]
            s.close()
            return ip
        except Exception:
            return None
​
    def get_network_range(self):
        if not self.local_ip:
            return None
​
        try:
            interfaces = psutil.net_if_addrs()
            for interface, addresses in interfaces.items():
                for addr in addresses:
                    if addr.family == socket.AF_INET and addr.address == self.local_ip:
                        netmask = addr.netmask
                        network = ipaddress.IPv4Network(f"{self.local_ip}/{netmask}", strict=False)
                        return network
        except Exception:
            return None
​
    def ping_ip(self, ip):
        """Ping IP地址并返回是否在线"""
        try:
            if platform.system() == "Windows":
                command = ['ping', '-n', '1', '-w', '500', str(ip)]  # Windows: ping -n 1 -w 500
            else:
                command = ['ping', '-c', '1', '-W', '1', str(ip)]  # Linux/macOS: ping -c 1 -W 1
​
            output = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=1)
            if output.returncode == 0:
                return True, str(ip)
            else:
                return False, str(ip)
        except:
            return False, str(ip)
​
    def get_latency(self, ip):
        """获取延迟"""
        try:
            if platform.system() == "Windows":
                command = ['ping', '-n', '4', str(ip)]  # Windows: ping -n 4
                output = subprocess.check_output(command, timeout=5).decode('gbk')
                latency_values = re.findall(r'时间[\<=](\d+\.?\d*)ms', output)
            else:
                command = ['ping', '-c', '4', str(ip)]  # Linux/macOS: ping -c 4
                output = subprocess.check_output(command, timeout=5).decode('utf-8')
                latency_match = re.search(r'rtt min/avg/max/mdev = (\d+\.\d+)/(\d+\.\d+)/(\d+\.\d+)/(\d+\.\d+) ms', output)
                if latency_match:
                    avg_latency = float(latency_match.group(2))
                    return f"{avg_latency:.2f}"
                else:
                    return '未知'
​
            if latency_values:
                avg_latency = sum(float(val) for val in latency_values) / len(latency_values)
                return f"{avg_latency:.2f}"
            else:
                return '未知'
        except:
            return '超时'
​
    def ping_and_scan(self, ip):
        """Ping IP并获取延迟"""
        latency = self.get_latency(str(ip))
        # 根据延时判断在线状态
        if latency != '未知' and latency != '超时' and float(latency) > 0:
            status = '在线'
        else:
            status = '离线'
        mac = self.get_mac_address(str(ip))
        return {'ip': str(ip), 'status': status, 'latency': latency, 'mac': mac}
​
    def cancel_scanning(self):
        """取消扫描"""
        self.cancel_scan = True
        self.scanning = False
        self.scan_button.config(state=tk.NORMAL)
        self.cancel_button.config(state=tk.DISABLED)
        while not self.task_queue.empty():
            self.task_queue.get_nowait()
        while not self.result_queue.empty():
            self.result_queue.get_nowait()
​
    def scan_worker(self):
        """扫描工作线程"""
        try:
            start_ip = ipaddress.ip_address(self.start_ip.get())
            end_ip = ipaddress.ip_address(self.end_ip.get())
​
            # 生成 IP 范围
            ip_range = list(ipaddress.summarize_address_range(start_ip, end_ip))
            total_tasks = sum(len(list(ip_network)) for ip_network in ip_range)
            completed_tasks = 0
​
            print(f"生成的 IP 范围: {ip_range}")
            print(f"任务总数: {total_tasks}")
​
            with ThreadPoolExecutor(max_workers=100) as executor:
                futures = []
                batch_size = 100  # 每批次提交 100 个任务
                current_batch = []
​
                for ip_network in ip_range:
                    for ip in ip_network:
                        if self.cancel_scan:
                            break
                        current_batch.append(ip)
                        if len(current_batch) >= batch_size:
                            # 提交当前批次的任务
                            futures.extend(executor.submit(self.ping_and_scan, ip) for ip in current_batch)
                            current_batch = []
​
                    if self.cancel_scan:
                        break
​
                # 提交剩余的任务
                if current_batch:
                    futures.extend(executor.submit(self.ping_and_scan, ip) for ip in current_batch)
​
                for future in as_completed(futures):
                    if self.cancel_scan:
                        break
                    ip_info = future.result()
                    self.result_queue.put((0, [ip_info]))
                    completed_tasks += 1
                    self.progress['value'] = (completed_tasks / total_tasks) * 100
                    self.task_queue.put('update')
        except Exception as e:
            logging.error(f"扫描错误: {e}")
            self.result_queue.put(f"错误: {str(e)}")
        finally:
            self.task_queue.put('done')
​
    def start_scan(self):
        """开始扫描"""
        if self.scanning:
            return
​
        self.scanning = True
        self.cancel_scan = False
        self.scan_button.config(state=tk.DISABLED)
        self.cancel_button.config(state=tk.NORMAL)
        self.progress['value'] = 0
​
        for item in self.tree.get_children():
            self.tree.delete(item)
​
        self.online_count = 0
        self.total_count = 0
        self.stats_label.config(text="0/0")
​
        self.worker_thread = threading.Thread(target=self.scan_worker, daemon=True)
        self.worker_thread.start()
​
        self.update_ui()
​
    def process_results(self, results):
        """处理扫描结果"""
        items_to_add = []
        for ip_info in results:
            ip = ip_info['ip']
            status = ip_info['status']
            latency = ip_info['latency']
            mac = ip_info['mac']
​
            self.total_count += 1
            if status == '在线':
                self.online_count += 1
​
            status_icon = "●" if status == '在线' else "○"
            status_color = "green" if status == '在线' else "gray"
            items_to_add.append((ip, status_icon, status_color, mac, latency))
​
        self.stats_label.config(text=f"{self.online_count}/{self.total_count}")
​
        self.root.after(0, lambda: self.update_tree(items_to_add))
​
    def update_tree(self, items):
        """更新表格"""
        for item in items:
            row_number = len(self.tree.get_children()) + 1
            self.tree.insert('', 'end', values=(row_number, item[0], item[1], item[3], item[4]), tags=(item[2],))
            self.tree.tag_configure(item[2], foreground=item[2])
        self.root.update_idletasks()
​
    def show_port_details(self, ip):
        """显示IP的端口详情"""
        pass
​
    def get_mac_address(self, ip):
        """获取MAC地址"""
        if ip in self.mac_cache:
            return self.mac_cache[ip]
​
        try:
            if platform.system() == "Windows":
                output = subprocess.check_output(['arp', '-a', ip]).decode('gbk')
            else:
                output = subprocess.check_output(['arp', '-n', ip]).decode('utf-8')
            for line in output.split('\n'):
                if ip in line:
                    parts = line.split()
                    if len(parts) > 2:
                        mac = parts[1]
                        self.mac_cache[ip] = mac
                        return mac
            return '未知'
        except Exception:
            return '未知'
​
    def sort_tree(self, tree, col, reverse):
        """表格排序功能"""
        data = []
        for child in tree.get_children(''):
            value = tree.set(child, col)
            if col == '延时':
                try:
                    value = float(value)
                except ValueError:
                    value = float('inf')
            elif col == '序号':
                value = int(value)
            data.append((value, child))
​
        data.sort(reverse=reverse, key=lambda x: (isinstance(x[0], str), x[0]))
​
        for index, (val, child) in enumerate(data):
            tree.move(child, '', index)
​
        tree.heading(col, command=lambda: self.sort_tree(tree, col, not reverse))
​
    def update_ui(self):
        """更新UI"""
        try:
            if not self.task_queue.empty():
                task = self.task_queue.get_nowait()
                if task == 'update':
                    results = []
                    while not self.result_queue.empty():
                        priority, result = self.result_queue.get_nowait()
                        results.extend(result)
                    if results:
                        self.process_results(results)
                elif task == 'done':
                    self.scanning = False
                    self.scan_button.config(state=tk.NORMAL)
                    self.cancel_button.config(state=tk.DISABLED)
        except Exception as e:
            messagebox.showerror("错误", f"更新UI时出错: {str(e)}")
        if self.scanning:
            self.root.after(100, self.update_ui)
​
if __name__ == "__main__":
    root = tk.Tk()
    app = IPScannerApp(root)
    root.mainloop()

打包成可执行文件

pyinstaller --onefile --noconsole --clean ip_scanner.py

功能截图

查找192.168.8网段内所有在线的IP:

下载

IP扫描器.exe