前言
在工作当中经常会涉及到查到某些指定设备的IP,网上也有很方便的IP扫描器(Advanced IP Scanner),但是吧,自己嫌找资源麻烦,这里自己复刻了一个功能一致的IP扫描器
功能描述
查找指定IP范围内在线的设备
源代码
import tkinter as tk
from tkinter import ttk, messagebox
import socket
import ipaddress
import queue
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
import psutil
import subprocess
import platform
import re
import logging
logging.basicConfig(level=logging.ERROR, filename='ip_scanner.log', filemode='a',
format='%(asctime)s - %(levelname)s - %(message)s')
class IPScannerApp:
def __init__(self, root):
self.root = root
self.root.title("IP 扫描器")
self.scanning = False
self.cancel_scan = False
self.task_queue = queue.Queue()
self.result_queue = queue.Queue()
self.worker_thread = None
self.mac_cache = {}
self.local_ip = self.get_local_ip()
self.network = self.get_network_range()
ttk.Label(root, text="起始IP:").grid(row=0, column=0, padx=5, pady=5)
self.start_ip = ttk.Entry(root)
self.start_ip.grid(row=0, column=1, padx=5, pady=5)
ttk.Label(root, text="结束IP:").grid(row=1, column=0, padx=5, pady=5)
self.end_ip = ttk.Entry(root)
self.end_ip.grid(row=1, column=1, padx=5, pady=5)
self.scan_button = ttk.Button(root, text="开始扫描", command=self.start_scan)
self.scan_button.grid(row=2, column=0, pady=10)
self.cancel_button = ttk.Button(root, text="取消扫描", command=self.cancel_scanning, state=tk.DISABLED)
self.cancel_button.grid(row=2, column=1, pady=10)
self.progress = ttk.Progressbar(root, orient="horizontal", length=300, mode="determinate")
self.progress.grid(row=4, column=0, padx=5, pady=10, sticky='w')
self.stats_label = ttk.Label(root, text="0/0")
self.stats_label.grid(row=4, column=1, padx=5, pady=10, sticky='e')
if self.network:
network_start = str(self.network.network_address)
network_end = str(self.network.broadcast_address)
self.start_ip.insert(0, network_start)
self.end_ip.insert(0, network_end)
self.tree = ttk.Treeview(root, columns=('序号', 'IP地址', '在线状态', 'Mac地址', '延时'), show='headings')
self.tree.grid(row=3, column=0, columnspan=2, padx=5, pady=5, sticky='nsew')
self.tree.heading('序号', text='序号')
self.tree.heading('IP地址', text='IP地址')
self.tree.heading('在线状态', text='在线状态')
self.tree.heading('Mac地址', text='Mac地址')
self.tree.heading('延时', text='延时(ms)')
self.tree.column('序号', width=50, anchor='center')
self.tree.column('IP地址', width=120, anchor='center')
self.tree.column('在线状态', width=80, anchor='center')
self.tree.column('Mac地址', width=120, anchor='center')
self.tree.column('延时', width=80, anchor='center')
scrollbar = ttk.Scrollbar(root, orient="vertical", command=self.tree.yview)
scrollbar.grid(row=3, column=2, sticky='ns')
self.tree.configure(yscrollcommand=scrollbar.set)
self.online_count = 0
self.total_count = 0
self.tree.bind('<<TreeviewSelect>>', lambda event: self.show_port_details(self.tree.item(self.tree.selection()[0], 'values')[1]))
for col in self.tree['columns']:
self.tree.heading(col, text=col, command=lambda c=col: self.sort_tree(self.tree, c, False))
def get_local_ip(self):
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(('8.8.8.8', 80))
ip = s.getsockname()[0]
s.close()
return ip
except Exception:
return None
def get_network_range(self):
if not self.local_ip:
return None
try:
interfaces = psutil.net_if_addrs()
for interface, addresses in interfaces.items():
for addr in addresses:
if addr.family == socket.AF_INET and addr.address == self.local_ip:
netmask = addr.netmask
network = ipaddress.IPv4Network(f"{self.local_ip}/{netmask}", strict=False)
return network
except Exception:
return None
def ping_ip(self, ip):
"""Ping IP地址并返回是否在线"""
try:
if platform.system() == "Windows":
command = ['ping', '-n', '1', '-w', '500', str(ip)] # Windows: ping -n 1 -w 500
else:
command = ['ping', '-c', '1', '-W', '1', str(ip)] # Linux/macOS: ping -c 1 -W 1
output = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=1)
if output.returncode == 0:
return True, str(ip)
else:
return False, str(ip)
except:
return False, str(ip)
def get_latency(self, ip):
"""获取延迟"""
try:
if platform.system() == "Windows":
command = ['ping', '-n', '4', str(ip)] # Windows: ping -n 4
output = subprocess.check_output(command, timeout=5).decode('gbk')
latency_values = re.findall(r'时间[\<=](\d+\.?\d*)ms', output)
else:
command = ['ping', '-c', '4', str(ip)] # Linux/macOS: ping -c 4
output = subprocess.check_output(command, timeout=5).decode('utf-8')
latency_match = re.search(r'rtt min/avg/max/mdev = (\d+\.\d+)/(\d+\.\d+)/(\d+\.\d+)/(\d+\.\d+) ms', output)
if latency_match:
avg_latency = float(latency_match.group(2))
return f"{avg_latency:.2f}"
else:
return '未知'
if latency_values:
avg_latency = sum(float(val) for val in latency_values) / len(latency_values)
return f"{avg_latency:.2f}"
else:
return '未知'
except:
return '超时'
def ping_and_scan(self, ip):
"""Ping IP并获取延迟"""
latency = self.get_latency(str(ip))
# 根据延时判断在线状态
if latency != '未知' and latency != '超时' and float(latency) > 0:
status = '在线'
else:
status = '离线'
mac = self.get_mac_address(str(ip))
return {'ip': str(ip), 'status': status, 'latency': latency, 'mac': mac}
def cancel_scanning(self):
"""取消扫描"""
self.cancel_scan = True
self.scanning = False
self.scan_button.config(state=tk.NORMAL)
self.cancel_button.config(state=tk.DISABLED)
while not self.task_queue.empty():
self.task_queue.get_nowait()
while not self.result_queue.empty():
self.result_queue.get_nowait()
def scan_worker(self):
"""扫描工作线程"""
try:
start_ip = ipaddress.ip_address(self.start_ip.get())
end_ip = ipaddress.ip_address(self.end_ip.get())
# 生成 IP 范围
ip_range = list(ipaddress.summarize_address_range(start_ip, end_ip))
total_tasks = sum(len(list(ip_network)) for ip_network in ip_range)
completed_tasks = 0
print(f"生成的 IP 范围: {ip_range}")
print(f"任务总数: {total_tasks}")
with ThreadPoolExecutor(max_workers=100) as executor:
futures = []
batch_size = 100 # 每批次提交 100 个任务
current_batch = []
for ip_network in ip_range:
for ip in ip_network:
if self.cancel_scan:
break
current_batch.append(ip)
if len(current_batch) >= batch_size:
# 提交当前批次的任务
futures.extend(executor.submit(self.ping_and_scan, ip) for ip in current_batch)
current_batch = []
if self.cancel_scan:
break
# 提交剩余的任务
if current_batch:
futures.extend(executor.submit(self.ping_and_scan, ip) for ip in current_batch)
for future in as_completed(futures):
if self.cancel_scan:
break
ip_info = future.result()
self.result_queue.put((0, [ip_info]))
completed_tasks += 1
self.progress['value'] = (completed_tasks / total_tasks) * 100
self.task_queue.put('update')
except Exception as e:
logging.error(f"扫描错误: {e}")
self.result_queue.put(f"错误: {str(e)}")
finally:
self.task_queue.put('done')
def start_scan(self):
"""开始扫描"""
if self.scanning:
return
self.scanning = True
self.cancel_scan = False
self.scan_button.config(state=tk.DISABLED)
self.cancel_button.config(state=tk.NORMAL)
self.progress['value'] = 0
for item in self.tree.get_children():
self.tree.delete(item)
self.online_count = 0
self.total_count = 0
self.stats_label.config(text="0/0")
self.worker_thread = threading.Thread(target=self.scan_worker, daemon=True)
self.worker_thread.start()
self.update_ui()
def process_results(self, results):
"""处理扫描结果"""
items_to_add = []
for ip_info in results:
ip = ip_info['ip']
status = ip_info['status']
latency = ip_info['latency']
mac = ip_info['mac']
self.total_count += 1
if status == '在线':
self.online_count += 1
status_icon = "●" if status == '在线' else "○"
status_color = "green" if status == '在线' else "gray"
items_to_add.append((ip, status_icon, status_color, mac, latency))
self.stats_label.config(text=f"{self.online_count}/{self.total_count}")
self.root.after(0, lambda: self.update_tree(items_to_add))
def update_tree(self, items):
"""更新表格"""
for item in items:
row_number = len(self.tree.get_children()) + 1
self.tree.insert('', 'end', values=(row_number, item[0], item[1], item[3], item[4]), tags=(item[2],))
self.tree.tag_configure(item[2], foreground=item[2])
self.root.update_idletasks()
def show_port_details(self, ip):
"""显示IP的端口详情"""
pass
def get_mac_address(self, ip):
"""获取MAC地址"""
if ip in self.mac_cache:
return self.mac_cache[ip]
try:
if platform.system() == "Windows":
output = subprocess.check_output(['arp', '-a', ip]).decode('gbk')
else:
output = subprocess.check_output(['arp', '-n', ip]).decode('utf-8')
for line in output.split('\n'):
if ip in line:
parts = line.split()
if len(parts) > 2:
mac = parts[1]
self.mac_cache[ip] = mac
return mac
return '未知'
except Exception:
return '未知'
def sort_tree(self, tree, col, reverse):
"""表格排序功能"""
data = []
for child in tree.get_children(''):
value = tree.set(child, col)
if col == '延时':
try:
value = float(value)
except ValueError:
value = float('inf')
elif col == '序号':
value = int(value)
data.append((value, child))
data.sort(reverse=reverse, key=lambda x: (isinstance(x[0], str), x[0]))
for index, (val, child) in enumerate(data):
tree.move(child, '', index)
tree.heading(col, command=lambda: self.sort_tree(tree, col, not reverse))
def update_ui(self):
"""更新UI"""
try:
if not self.task_queue.empty():
task = self.task_queue.get_nowait()
if task == 'update':
results = []
while not self.result_queue.empty():
priority, result = self.result_queue.get_nowait()
results.extend(result)
if results:
self.process_results(results)
elif task == 'done':
self.scanning = False
self.scan_button.config(state=tk.NORMAL)
self.cancel_button.config(state=tk.DISABLED)
except Exception as e:
messagebox.showerror("错误", f"更新UI时出错: {str(e)}")
if self.scanning:
self.root.after(100, self.update_ui)
if __name__ == "__main__":
root = tk.Tk()
app = IPScannerApp(root)
root.mainloop()打包成可执行文件
pyinstaller --onefile --noconsole --clean ip_scanner.py功能截图
查找192.168.8网段内所有在线的IP:
