class EmittingStream(QObject):
textWritten = Signal(str)
progress = Signal(int)
def write(self, text):
self.textWritten.emit(text)
def set_progress(self, value):
self.progress.emit(value)
def flush(self):
pass
sys.stdout has attribute write
I overwrote it
stream = EmittingStream()
sys.stdout = stream
stream.textWritten.connect(self.showOuput)
stream.progress.connect(self.setProgress)
Ref:
from PySide6.QtCore import QObject, QThreadPool, Signal
class LogPyGUI(QMainWindow):
def __init__(self):
...
self.threadpool = QThreadPool()
...
self.ui.start_button.clicked.connect(self.start_analysis)
...
def start_analysis(self):
...
command = ["logpy"]
command.extend(["-l", self.args.log_file])
command.extend(["-o", self.args.out_file])
...
command.extend(["--GUI"])
sys.argv = command
stream = EmittingStream()
sys.stdout = stream
stream.textWritten.connect(self.showOuput)
stream.progress.connect(self.setProgress)
analysis_thread = threading.Thread(target=main())
# Where main.py takes command line arguments
analysis_thread.start()
# To reset the sys.stdout back to default
def __del__(self):
sys.stdout = sys.__stdout__
Reference: Multithreading PySide6 applications with QThreadPool (pythonguis.com)
from PySide6.QtCore import QObject, QRunnable, QThreadPool, Signal, Slot, QThread
class WorkerSignals(QObject):
finished = Signal()
error = Signal(tuple)
result = Signal(object)
progress = Signal(int)
class Worker(QRunnable):
def __init__(self, args):
super(Worker, self).__init__()
self.p = args
self.signals = WorkerSignals()
def AnalyzeLogs(self, p, progress_callback):
log_analyzer = LogAnalyzer(p)
...
return log_analyzer.print_summary()
@Slot()
def run(self):
try:
output = self.AnalyzeLogs(self.p, self.signals.progress)
except:
traceback.print_exc()
exctype, value = sys.exc_info()[:2]
self.signals.error.emit((exctype, value, traceback.format_exc()))
else:
self.signals.result.emit(output)
finally:
self.signals.finished.emit()
class LogPyGUI(QMainWindow):
def __init__(self):
super().__init__()
...
self.threadpool = QThreadPool()
...
self.ui.start_button.clicked.connect(self.start_a)
...
def finished(self):
QMessageBox.about(self, "Finished", "Log Analysis Finished")
@Slot()
def start(self):
if not self.get_args():
return
self.worker = Worker(self.args)
self.worker.signals.result.connect(self.showOuput)
self.worker.signals.finished.connect(self.finished)
self.worker.signals.progress.connect(self.setProgress)
self.threadpool.start(self.worker)
Reference: QThread - Qt for Python