通过python子进程和PyQt Button实现Windump。

问题描述 投票:0回答:1

思路很简单:用户输入持续时间(秒)并按下一个PyQt按钮,按钮调用一个函数,该函数创建一个Python子进程并通过它运行windump;随后用time.sleep()等待用户指定的时长,再调用process.terminate()终止该进程(代码如下)。

def windump_exec(duration):
    """Capture packets with windump for ``duration`` seconds.

    Launches ``windump -i 3 -w packets.pcap``, blocks the calling thread for
    ``duration`` seconds, then terminates the process and waits for it to
    exit before returning.

    Args:
        duration: Capture time in seconds, passed straight to ``time.sleep``.
    """
    # NOTE(review): ``s`` is presumably ``subprocess`` imported under an
    # alias -- confirm against the file's imports.
    # stdout is never read in this function, so discard it instead of
    # leaving an undrained pipe that can block the child once it fills up.
    p = s.Popen(['windump', '-i', '3', '-w', 'packets.pcap'], stdout=s.DEVNULL)
    try:
        time.sleep(duration)
    finally:
        # Stop the capture even if the sleep is interrupted, and reap the
        # process so it has fully exited before the caller opens the file.
        p.terminate()
        p.wait()

上述步骤完成后,scapy会读取生成的.pcap文件,我再把部分内容显示到屏幕上。整个过程中QWaitingSpinner一直在转动;为此,我使用QRunnable来运行上面的逻辑(包括scapy部分)(代码如下)。

class ThreadRunnable(QRunnable):
    """Runnable that captures packets, parses them and passes them to a callback.

    Attributes are set from the constructor arguments:
        time: value forwarded to ``windump_exec`` when the task runs.
        filler: callable invoked with the result of ``parse_data()``.
        signal: ``RunnableSignal`` instance whose ``result`` signal is
            emitted once ``filler`` has returned.
    """

    def __init__(self, _time, filler):
        super().__init__()
        self.signal = RunnableSignal()
        self.filler = filler
        self.time = _time

    def run(self):
        """Run the capture, parse the output, fill the view, then notify."""
        windump_exec(self.time)
        captured = parse_data()
        # NOTE(review): ``filler`` is called from whatever thread executes
        # this runnable; if it touches widgets, confirm that is safe.
        self.filler(captured)
        self.signal.result.emit()

问题:windump代码单独运行时工作正常,但放到QThread中运行时却没有创建输出文件,因此scapy没有文件可读(打开),并报错。

python python-3.x pyqt pyqt5 scapy
1个回答
1
投票

你可以用QProcess代替Popen和线程;在我的测试中使用的是tcpdump,但我想换成windump后行为应该相同。

import os

import psutil
from PyQt5 import QtCore, QtGui, QtWidgets
from scapy.all import rdpcap
from scapy.error import Scapy_Exception


# Directory containing this script, with symlinks resolved.
CURRENT_DIR = os.path.split(os.path.realpath(__file__))[0]


class DumpProcesor(QtCore.QObject):
    """Run a detached capture process for a fixed time, then terminate it.

    The caller configures the program and arguments on ``process`` and the
    capture duration (in milliseconds) on ``timer``, then calls ``start()``.

    Signals:
        started: emitted at the beginning of ``start()``, before launching.
        finished: emitted when the launch fails, or shortly after the timer
            has expired and the process has been asked to terminate.
    """

    started = QtCore.pyqtSignal()
    finished = QtCore.pyqtSignal()

    def __init__(self, parent=None):
        super().__init__(parent)

        self._process = QtCore.QProcess()
        self._timer = QtCore.QTimer(singleShot=True)
        self._timer.timeout.connect(self.handle_timeout)

        # PID of the detached process; -1 while nothing is running.
        self._pid = -1

    @property
    def process(self):
        """The ``QProcess`` holding the program and arguments to launch."""
        return self._process

    @property
    def timer(self):
        """The single-shot ``QTimer`` whose interval is the capture time."""
        return self._timer

    @QtCore.pyqtSlot()
    def start(self):
        """Launch the process detached and arm the stop timer.

        Emits ``finished`` immediately if the process could not be started.
        """
        self.started.emit()
        status, self._pid = self._process.startDetached()
        if status:
            self._timer.start()
        else:
            self.finished.emit()

    @QtCore.pyqtSlot()
    def handle_timeout(self):
        """Terminate the capture process and schedule ``finished``.

        ``finished`` is always emitted, even when the process has already
        exited on its own, so listeners are never left waiting.
        """
        # Forget the PID first so a stale value is never terminated twice.
        pid, self._pid = self._pid, -1
        if pid > 0:
            try:
                psutil.Process(pid).terminate()
            except psutil.NoSuchProcess:
                # The process exited before the timer fired; nothing to
                # stop, but listeners still need to be notified below.
                pass
        # Short grace period so the process can exit before listeners react.
        QtCore.QTimer.singleShot(100, self.finished.emit)


class Widget(QtWidgets.QWidget):
    """Window that runs a timed packet capture and lists the captured packets.

    The user picks a duration in the spin box and presses "Start"; when the
    capture is over, every packet read from the capture file is appended to
    the read-only log.
    """

    def __init__(self, parent=None):
        super().__init__(parent)

        self.log_te = QtWidgets.QTextEdit(readOnly=True)
        self.time_sb = QtWidgets.QSpinBox(minimum=1)
        self.start_btn = QtWidgets.QPushButton(self.tr("Start"))

        grid_layout = QtWidgets.QGridLayout(self)
        grid_layout.addWidget(self.log_te, 0, 0, 1, 3)
        grid_layout.addWidget(QtWidgets.QLabel("Time (seg):"), 1, 0)
        grid_layout.addWidget(self.time_sb, 1, 1)
        grid_layout.addWidget(self.start_btn, 1, 2)

        # Built once so the path written by the capture process and the
        # path read back in on_finished() cannot drift apart.
        self._filename = os.path.join(CURRENT_DIR, "packets.pcap")

        self.dump_procesor = DumpProcesor(self)
        self.dump_procesor.process.setProgram("tcpdump")
        self.dump_procesor.process.setArguments(
            ["-i", "3", "-w", self._filename]
        )

        self.start_btn.clicked.connect(self.start)
        self.dump_procesor.finished.connect(self.on_finished)

    @QtCore.pyqtSlot()
    def start(self):
        """Clear the log, lock the button and start a capture.

        The spin box value is in seconds; the timer interval is in
        milliseconds, hence the factor of 1000.
        """
        self.log_te.clear()
        self.start_btn.setDisabled(True)
        self.dump_procesor.timer.setInterval(self.time_sb.value() * 1000)
        self.dump_procesor.start()

    @QtCore.pyqtSlot()
    def on_finished(self):
        """Re-enable the button and show the packets from the capture file.

        ``finished`` is also emitted when the capture process could not be
        started, so the file may be missing or unreadable; in that case the
        error is written to the log instead of escaping from the slot.
        """
        self.start_btn.setDisabled(False)
        try:
            packets = rdpcap(self._filename)
        except (OSError, Scapy_Exception) as exc:
            self.log_te.append(
                "Could not read {}: {}".format(self._filename, exc)
            )
            return

        for packet in packets:
            # show(dump=True) returns the description instead of printing it.
            self.log_te.append(packet.show(dump=True))


def main():
    """Create the Qt application, show the main widget and run the event loop."""
    import sys

    application = QtWidgets.QApplication(sys.argv)
    window = Widget()
    window.show()
    # Hand the event loop's return code back to the interpreter.
    exit_code = application.exec_()
    sys.exit(exit_code)


# Only start the GUI when executed as a script, not when imported.
if __name__ == "__main__":
    main()
© www.soinside.com 2019 - 2024. All rights reserved.