思路很简单:用户输入持续时间(秒)并按下一个 PyQt 按钮,按钮调用一个函数,该函数创建一个 Python 子进程并通过它运行 windump;随后用 time.sleep() 等待用户指定的时长,再调用 process.terminate() 终止该进程(代码如下):
def windump_exec(duration):
    """Capture packets with windump for ``duration`` seconds.

    Launches ``windump -i 3 -w packets.pcap``, sleeps for ``duration``
    seconds, then terminates the process and waits until it has exited.

    Args:
        duration: Capture time in seconds, passed to ``time.sleep``.
    """
    # NOTE(review): ``s`` is presumably the ``subprocess`` module imported
    # under an alias elsewhere in the file -- confirm.
    # The child's stdout is never read here, so it is discarded instead of
    # being attached to a pipe that nobody drains.
    p = s.Popen(['windump', '-i', '3', '-w', 'packets.pcap'], stdout=s.DEVNULL)
    try:
        time.sleep(duration)
    finally:
        # Always stop the capture, even if the sleep is interrupted.
        p.terminate()
        try:
            # Wait for the child to really exit before returning, so the
            # caller does not race with a process that is still shutting down.
            p.wait(timeout=5)
        except s.TimeoutExpired:
            p.kill()
            p.wait()
抓包结束后,scapy 会读取 .pcap 文件,我再把结果显示在屏幕上。整个过程中 QWaitingSpinner 一直在运行;为了处理这一点,我使用 QRunnable 来运行上述逻辑(包括 scapy 部分),代码如下:
class ThreadRunnable(QRunnable):
    """Runnable that runs a capture, parses it and hands the result to a callback.

    Args:
        _time: Value forwarded to ``windump_exec`` when the runnable executes.
        filler: Callable invoked with the value returned by ``parse_data()``.
    """

    def __init__(self, _time, filler):
        super().__init__()
        self.signal = RunnableSignal()
        self.filler = filler
        self.time = _time

    def run(self):
        """Run the capture, feed the parsed packets to ``filler``, then emit ``signal.result``."""
        windump_exec(self.time)
        self.filler(parse_data())
        self.signal.result.emit()
问题:windump 的代码单独运行时一切正常,但放到线程(上面的 QRunnable)中运行时却没有生成输出文件,因此 scapy 没有文件可以打开读取,并报出错误。
你可以用 QProcess 来代替 Popen 和线程;我在测试中使用的是 tcpdump,但我想换成 windump 行为应该是一样的。
import os
from PyQt5 import QtCore, QtGui, QtWidgets
from scapy.all import rdpcap
import psutil
# Directory that contains this script, with symbolic links resolved.
CURRENT_DIR = os.path.split(os.path.realpath(__file__))[0]
class DumpProcesor(QtCore.QObject):
    """Run a detached capture process for a fixed time, then terminate it.

    The program and its arguments are configured by the caller through
    ``process``; the capture length is configured through ``timer``.

    Signals:
        started: Emitted at the beginning of ``start()``.
        finished: Emitted when the process could not be launched, or about
            100 ms after the timer fired and termination was requested.
    """

    started = QtCore.pyqtSignal()
    finished = QtCore.pyqtSignal()

    def __init__(self, parent=None):
        super().__init__(parent)
        self._process = QtCore.QProcess()
        self._timer = QtCore.QTimer(singleShot=True)
        self._timer.timeout.connect(self.handle_timeout)
        # pid of the detached process; -1 while nothing is running.
        self._pid = -1

    @property
    def process(self):
        """The ``QProcess`` whose program and arguments are launched by ``start()``."""
        return self._process

    @property
    def timer(self):
        """The single-shot ``QTimer`` whose interval sets the capture duration."""
        return self._timer

    @QtCore.pyqtSlot()
    def start(self):
        """Launch the configured program detached and arm the timer.

        Emits ``finished`` immediately if the program could not be started.
        """
        self.started.emit()
        status, pid = self._process.startDetached()
        # Never keep a pid from a failed launch or from a previous run.
        self._pid = pid if status else -1
        if status:
            self._timer.start()
        else:
            self.finished.emit()

    @QtCore.pyqtSlot()
    def handle_timeout(self):
        """Terminate the detached process and schedule the ``finished`` signal."""
        if self._pid > 0:
            try:
                psutil.Process(self._pid).terminate()
            except psutil.NoSuchProcess:
                # The process already exited on its own; there is nothing to
                # terminate, but ``finished`` must still be emitted below.
                pass
            finally:
                self._pid = -1
        # Short delay before announcing completion, giving the process a
        # moment to exit after the terminate request.
        QtCore.QTimer.singleShot(100, self.finished.emit)
class Widget(QtWidgets.QWidget):
    """Window with a duration spin box, a start button and a read-only packet log.

    Pressing the start button runs ``tcpdump`` through a ``DumpProcesor`` for
    the selected number of seconds; when it finishes, the capture file is
    read with scapy and every packet is appended to the log.
    """

    def __init__(self, parent=None):
        super().__init__(parent)
        # Single source of truth for the capture file, used both as the
        # tcpdump output path and as the file read back in on_finished().
        self._filename = os.path.join(CURRENT_DIR, "packets.pcap")

        self.log_te = QtWidgets.QTextEdit(readOnly=True)
        self.time_sb = QtWidgets.QSpinBox(minimum=1)
        self.start_btn = QtWidgets.QPushButton(self.tr("Start"))

        grid_layout = QtWidgets.QGridLayout(self)
        grid_layout.addWidget(self.log_te, 0, 0, 1, 3)
        grid_layout.addWidget(QtWidgets.QLabel("Time (seg):"), 1, 0)
        grid_layout.addWidget(self.time_sb, 1, 1)
        grid_layout.addWidget(self.start_btn, 1, 2)

        self.dump_procesor = DumpProcesor(self)
        self.dump_procesor.process.setProgram("tcpdump")
        self.dump_procesor.process.setArguments(
            ["-i", "3", "-w", self._filename]
        )

        self.start_btn.clicked.connect(self.start)
        self.dump_procesor.finished.connect(self.on_finished)

    @QtCore.pyqtSlot()
    def start(self):
        """Clear the log, disable the button and start a capture."""
        self.log_te.clear()
        self.start_btn.setDisabled(True)
        # The spin box holds seconds; the timer interval is in milliseconds.
        self.dump_procesor.timer.setInterval(self.time_sb.value() * 1000)
        self.dump_procesor.start()

    @QtCore.pyqtSlot()
    def on_finished(self):
        """Re-enable the button and show the captured packets in the log."""
        # Local import: the file only imports ``rdpcap`` from scapy at the top.
        from scapy.error import Scapy_Exception

        self.start_btn.setDisabled(False)
        try:
            packets = rdpcap(self._filename)
        except (OSError, Scapy_Exception) as exc:
            # ``finished`` is also emitted when the capture program could not
            # be launched, in which case the file may be missing or unreadable.
            # Report that in the log instead of raising inside the slot.
            self.log_te.append(
                self.tr("Could not read capture file: {}").format(exc)
            )
            return
        for packet in packets:
            self.log_te.append(packet.show(dump=True))
def main():
    """Create the Qt application, show the capture widget and run the event loop."""
    import sys

    application = QtWidgets.QApplication(sys.argv)
    window = Widget()
    window.show()
    exit_code = application.exec_()
    sys.exit(exit_code)


# Start the GUI only when this file is executed as a script.
if __name__ == "__main__":
    main()