| from PyQt5 import QtCore, QtGui, QtWidgets
|
| import socket
|
| import os
|
| import platform
|
| import threading
|
| import time
|
| import subprocess
|
| import queue
|
|
|
"""
The 'probe_network' function pulls each IP address from a queue in a while loop, picks the ping count flag for the
current operating system, runs the ping command with subprocess and reports back when the IP is up. Threading is
handled by the 'probe_thread' function, which starts 10 threads that ping the network in parallel; the results are
then shown in a PyQt5 GUI label. The port-sniffing option implemented in 'scan_hosts' and 'scan_ports' uses
multi-threading to scan the given IP for open ports in the range 1-65535 almost instantaneously. My next objective
is to remove the hard-coded 192.168.1.x address range used by 'probe_thread' and have it be either configured by
the user in some way or automatically configured upon program start. I also need to find a way to update the
progress bar accurately.
"""
|
|
|
class Ui_MainWindow(object):
    """Main-window UI for a small network scanner (ping sweep, port scan, ARP).

    Scans run on background ``threading.Thread`` workers.  Workers never touch
    Qt widgets, because Qt GUI objects may only be used from the GUI thread.
    Instead they post events to ``self._ui_events``; a ``QTimer`` created in
    :meth:`setupUi` drains that queue on the GUI thread and updates the output
    label and the progress bar.
    """

    # First three octets (with trailing dot) of the network swept by
    # probe_thread.  Override on the class or on an instance to scan another
    # network.
    SUBNET_PREFIX = "192.168.1."
    # Number of worker threads used for the ping sweep.
    PING_WORKERS = 10
    # Number of worker threads used for the port scan.  Bounded on purpose:
    # one thread per port (65535 threads) can exhaust OS resources.
    PORT_WORKERS = 100
    # How often (milliseconds) the GUI thread applies pending worker events.
    UI_POLL_MS = 50

    def __init__(self):
        super().__init__()
        # Worker threads that may still be running (pruned on each new scan).
        self.threads = []
        # Kept for backward compatibility; widget access no longer needs it
        # because only the GUI thread touches widgets.
        self.lock = threading.Lock()
        # Thread-safe mailbox from workers to the GUI thread.  Items are
        # ("line", text) for an output line or ("done", None) for one
        # finished work item.
        self._ui_events = queue.Queue()
        # Work items started but not yet reported finished (GUI thread only).
        self._work_remaining = 0

    # ------------------------------------------------------------------
    # Worker -> GUI plumbing
    # ------------------------------------------------------------------
    def _post_line(self, text):
        """Queue one line of text to be appended to the output label."""
        self._ui_events.put(("line", text))

    def _post_done(self):
        """Queue a notification that one work item has finished."""
        self._ui_events.put(("done", None))

    def _drain_ui_events(self):
        """Apply pending worker events to the widgets (runs on the GUI thread)."""
        lines = []
        finished = 0
        # Bound the loop by the current size so a fast producer cannot keep
        # the GUI thread busy forever.
        for _ in range(self._ui_events.qsize()):
            try:
                kind, payload = self._ui_events.get_nowait()
            except queue.Empty:
                break
            if kind == "line":
                lines.append(payload)
            else:
                finished += 1

        if lines:
            appended = "".join(f"\n{line}" for line in lines)
            self.testOutputLabel.setText(self.testOutputLabel.text() + appended)

        if finished:
            self._work_remaining = max(0, self._work_remaining - finished)
            bar = self.progress_Bar
            # QProgressBar ignores out-of-range values, so clamp explicitly.
            bar.setValue(min(bar.value() + finished, bar.maximum()))

    def _begin_work(self, total):
        """Size the progress bar for *total* new work items (GUI thread only)."""
        if total <= 0:
            return
        bar = self.progress_Bar
        if self._work_remaining > 0:
            # Another scan is still running: extend the range instead of
            # resetting its progress.
            bar.setMaximum(bar.maximum() + total)
        else:
            bar.setRange(0, total)
            bar.setValue(0)
        self._work_remaining += total

    def _start_workers(self, target, args, count):
        """Start *count* daemon threads running ``target(*args)``."""
        # Drop references to finished threads so the list does not grow
        # without bound across scans.
        self.threads = [t for t in self.threads if t.is_alive()]
        for _ in range(count):
            # Daemon threads: closing the window should not leave the process
            # waiting for a long scan to finish.
            worker = threading.Thread(target=target, args=args, daemon=True)
            worker.start()
            self.threads.append(worker)

    # ------------------------------------------------------------------
    # Ping sweep
    # ------------------------------------------------------------------
    @staticmethod
    def _ping_command(ip, system=None):
        """Return the argv list that sends a single ping to *ip*.

        *system* defaults to ``platform.system()``; Windows uses ``-n`` for
        the packet count, every other system uses ``-c``.
        """
        system = (system or platform.system()).lower()
        count_flag = '-n' if system == 'windows' else '-c'
        return ['ping', count_flag, '1', ip]

    @staticmethod
    def _reply_received(returncode, output, ip, system=None):
        """Return True when a finished ping shows that *ip* answered.

        On Windows the exit status alone is not checked, because the output
        must also contain a reply line from *ip* itself (a "Destination host
        unreachable" reply comes from a different address).  On other systems
        a zero exit status is used.
        """
        if returncode != 0:
            return False
        system = (system or platform.system()).lower()
        if system == 'windows':
            return f"Reply from {ip}: bytes=" in output
        return True

    def probe_network(self, ip_queue):
        """Worker loop: ping every address taken from *ip_queue*.

        Runs until the queue is empty.  Each reachable address is printed and
        posted to the output label; every processed address advances the
        progress bar by one step.
        """
        while True:
            try:
                ip = ip_queue.get_nowait()
            except queue.Empty:
                return
            try:
                response = subprocess.run(
                    self._ping_command(ip),
                    stdout=subprocess.PIPE,
                    universal_newlines=True,
                )
                if self._reply_received(response.returncode, response.stdout, ip):
                    print(ip, ':', 'IP is Up')
                    self._post_line(f"{ip}: IP is Up")
            except (OSError, subprocess.SubprocessError) as e:
                # e.g. the ping executable is missing or cannot be started.
                print(f"Error pinging {ip}: {e}")
            finally:
                ip_queue.task_done()
                self._post_done()

    def probe_thread(self):
        """Start the ping sweep of ``SUBNET_PREFIX`` + 1..255 in the background."""
        ip_queue = queue.Queue()
        for octet in range(1, 256):
            ip_queue.put(f"{self.SUBNET_PREFIX}{octet}")

        self._begin_work(ip_queue.qsize())
        # The threads are deliberately not joined here: joining would block
        # the GUI thread until the sweep finishes.
        self._start_workers(self.probe_network, (ip_queue,), self.PING_WORKERS)

    # ------------------------------------------------------------------
    # Port scan
    # ------------------------------------------------------------------
    def scan_hosts(self, ip, port):
        """Try a TCP connect to ``ip:port`` and report the port when it is open.

        *port* may be an int or a numeric string.  Errors are printed, not
        raised, so one bad port does not stop a worker.
        """
        try:
            port_number = int(port)
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                sock.settimeout(1)
                if sock.connect_ex((ip, port_number)) == 0:
                    print(ip, ':', port)
                    self._post_line(f"{ip}:{port}")
        except (OSError, ValueError, OverflowError) as e:
            print(f"Error scanning {ip}:{port}: {e}")

    def _port_worker(self, ip, port_queue):
        """Worker loop: scan every port taken from *port_queue* on *ip*."""
        while True:
            try:
                port = port_queue.get_nowait()
            except queue.Empty:
                return
            try:
                self.scan_hosts(ip, port)
            finally:
                port_queue.task_done()
                self._post_done()

    def scan_ports(self):
        """Scan TCP ports 1-65535 of the address typed into the entry field."""
        target = self.ipPortEntry.text().strip()
        if not target:
            self._post_line("Enter an IP address to scan for open ports.")
            return

        # Resolve once up front instead of once per port; an IPv4 literal is
        # returned unchanged.
        try:
            ip = socket.gethostbyname(target)
        except OSError as e:
            print(f"Error resolving {target}: {e}")
            self._post_line(f"Cannot resolve {target}: {e}")
            return

        port_queue = queue.Queue()
        for port in range(1, 65536):
            port_queue.put(port)

        self._begin_work(port_queue.qsize())
        self._start_workers(self._port_worker, (ip, port_queue), self.PORT_WORKERS)

    # ------------------------------------------------------------------
    # ARP table
    # ------------------------------------------------------------------
    def arp(self):
        """Run ``arp -a`` and append its output to the output label."""
        try:
            result = subprocess.run(
                ['arp', '-a'],
                stdout=subprocess.PIPE,
                universal_newlines=True,
            )
            out = result.stdout
        except OSError as e:
            out = f"Could not run arp: {e}"

        # Called from a button click, i.e. already on the GUI thread.
        self.testOutputLabel.setText(self.testOutputLabel.text() + f"\n{out}")

    # ------------------------------------------------------------------
    # Widget construction
    # ------------------------------------------------------------------
    def setupUi(self, MainWindow):
        """Create all widgets on *MainWindow* and wire the button handlers."""
        MainWindow.setObjectName("MainWindow")
        MainWindow.resize(800, 600)
        self.centralwidget = QtWidgets.QWidget(MainWindow)
        self.centralwidget.setObjectName("centralwidget")

        self.testOutputButton = QtWidgets.QPushButton(self.centralwidget)
        self.testOutputButton.clicked.connect(self.probe_thread)
        self.testOutputButton.setGeometry(QtCore.QRect(570, 450, 201, 91))
        self.testOutputButton.setObjectName("testOutputButton")

        self.scrollArea = QtWidgets.QScrollArea(self.centralwidget)
        self.scrollArea.setGeometry(QtCore.QRect(10, 10, 781, 431))
        self.scrollArea.setWidgetResizable(True)
        self.scrollArea.setObjectName("scrollArea")
        self.scrollAreaWidgetContents = QtWidgets.QWidget()
        self.scrollAreaWidgetContents.setGeometry(QtCore.QRect(0, 0, 779, 429))
        self.scrollAreaWidgetContents.setObjectName("scrollAreaWidgetContents")

        self.verticalLayout = QtWidgets.QVBoxLayout(self.scrollAreaWidgetContents)
        self.verticalLayout.setObjectName("verticalLayout")
        self.testOutputLabel = QtWidgets.QLabel(self.scrollAreaWidgetContents)
        self.testOutputLabel.setText("")
        self.testOutputLabel.setObjectName("testOutputLabel")
        self.verticalLayout.addWidget(self.testOutputLabel)

        self.scrollArea.setWidget(self.scrollAreaWidgetContents)

        self.progress_Bar = QtWidgets.QProgressBar(self.centralwidget)
        self.progress_Bar.setGeometry(QtCore.QRect(0, 520, 561, 23))
        # Placeholder range; each scan resizes it to its number of work items.
        self.progress_Bar.setRange(0, 50)
        self.progress_Bar.setProperty("value", 0)
        self.progress_Bar.setObjectName("progress_Bar")

        self.arpButton = QtWidgets.QPushButton(self.centralwidget)
        self.arpButton.clicked.connect(self.arp)
        self.arpButton.setGeometry(QtCore.QRect(20, 450, 93, 28))
        self.arpButton.setObjectName("arpButton")

        self.scanPorts = QtWidgets.QPushButton(self.centralwidget)
        self.scanPorts.clicked.connect(self.scan_ports)
        self.scanPorts.setGeometry(QtCore.QRect(120, 450, 93, 28))
        self.scanPorts.setObjectName("scanPorts")

        self.ipPortEntry = QtWidgets.QLineEdit(self.centralwidget)
        self.ipPortEntry.setGeometry(QtCore.QRect(220, 450, 221, 21))
        self.ipPortEntry.setObjectName("ipPortEntry")
        MainWindow.setCentralWidget(self.centralwidget)
        self.menubar = QtWidgets.QMenuBar(MainWindow)
        self.menubar.setGeometry(QtCore.QRect(0, 0, 800, 26))
        self.menubar.setObjectName("menubar")
        MainWindow.setMenuBar(self.menubar)
        self.statusbar = QtWidgets.QStatusBar(MainWindow)
        self.statusbar.setObjectName("statusbar")
        MainWindow.setStatusBar(self.statusbar)

        # Timer owned by the window: applies worker results on the GUI thread.
        self._ui_timer = QtCore.QTimer(MainWindow)
        self._ui_timer.timeout.connect(self._drain_ui_events)
        self._ui_timer.start(self.UI_POLL_MS)

        self.retranslateUi(MainWindow)
        QtCore.QMetaObject.connectSlotsByName(MainWindow)

    def retranslateUi(self, MainWindow):
        """Set the translatable window title, button captions and placeholder."""
        _translate = QtCore.QCoreApplication.translate
        MainWindow.setWindowTitle(_translate("MainWindow", "Clover Scan alfa 0.6"))
        self.testOutputButton.setText(_translate("MainWindow", "Run IP Scan"))
        self.arpButton.setText(_translate("MainWindow", "Arp Table"))
        self.scanPorts.setText(_translate("MainWindow", "Scan Ports"))
        self.ipPortEntry.setPlaceholderText(_translate("MainWindow", "Enter IP to scan for open ports"))
|
|
|
|
|
# Script entry point: build the Qt application, show the scanner window and
# hand control to the Qt event loop until the window is closed.
if __name__ == "__main__":
    import sys

    application = QtWidgets.QApplication(sys.argv)
    window = QtWidgets.QMainWindow()

    # Keep a reference to the UI object for the lifetime of the event loop.
    ui = Ui_MainWindow()
    ui.setupUi(window)
    window.show()

    exit_code = application.exec_()
    sys.exit(exit_code)
|