这篇文章主要为大家详细介绍了Python3+PyQt5实现支持多线程的页面索引器应用程序,具有一定的参考价值,感兴趣的小伙伴们可以参考一下。
本文通过Python3+PyQt5实现了《Python Qt GUI快速编程》第19章中的页面索引器应用程序例子。
/home/yrd/eric_workspace/chap19/walker_ans.py
#!/usr/bin/env python3
import codecs
import html.entities
import re
import sys

from PyQt5.QtCore import (QMutex, QThread, pyqtSignal, Qt)


class walker(QThread):
    """Worker thread that indexes the words found in a batch of HTML files.

    For every file in ``files`` the markup is stripped, character entities
    are decoded, the text is lower-cased and split into words.  Acceptable
    words are recorded in the shared ``filenamesforwords`` mapping; a word
    seen in more than ``common_words_threshold`` files is moved to the
    shared ``commonwords`` set instead.

    Signals:
        indexed(str, int): emitted after each file with the file name and
            this walker's ``index``.
        finished(bool, int): emitted once at the end of ``run()`` with the
            ``completed`` flag and this walker's ``index``.

    Args:
        index: Integer identifying this walker; echoed back in both signals.
        lock: Read/write lock guarding ``filenamesforwords`` and
            ``commonwords``; must offer ``lockForRead()``, ``lockForWrite()``
            and ``unlock()``.
        files: Iterable of file names to index.
        filenamesforwords: Mapping word -> set of file names.  It is indexed
            with words that may not be present yet, so it has to create the
            set on demand (e.g. ``collections.defaultdict(set)``).
        commonwords: Set of words considered too common to index.
        parent: Optional parent object passed to ``QThread``.
    """

    # NOTE: this deliberately re-declares ``finished`` with a (bool, int)
    # signature, hiding the argument-less signal inherited from QThread.
    finished = pyqtSignal(bool, int)
    indexed = pyqtSignal(str, int)

    # A word found in more files than this is treated as "common".
    common_words_threshold = 250
    # Only words whose length lies in [min_word_len, max_word_len] are kept.
    min_word_len = 3
    max_word_len = 25
    # Words starting or ending with one of these characters are skipped.
    invalid_first_or_last = frozenset("0123456789_")
    # Matches a single HTML tag.
    striphtml_re = re.compile(r"<[^>]*?>", re.IGNORECASE | re.MULTILINE)
    # Matches named (&name;) and numeric (&#123;) character entities.
    entity_re = re.compile(r"&(\w+?);|&#(\d+?);")
    # Splits text on runs of non-word characters.
    split_re = re.compile(r"\W+", re.IGNORECASE | re.MULTILINE)

    def __init__(self, index, lock, files, filenamesforwords,
                 commonwords, parent=None):
        super().__init__(parent)
        self.index = index
        self.lock = lock
        self.files = files
        self.filenamesforwords = filenamesforwords
        self.commonwords = commonwords
        self.stopped = False
        # Protects ``self.stopped``, which is written by stop() and read by
        # isstopped().
        self.mutex = QMutex()
        self.completed = False

    def stop(self):
        """Ask the walker to stop; processfiles() returns at its next check."""
        self.mutex.lock()
        try:
            self.stopped = True
        finally:
            self.mutex.unlock()

    def isstopped(self):
        """Return True once stop() has been called."""
        self.mutex.lock()
        try:
            return self.stopped
        finally:
            self.mutex.unlock()

    def run(self):
        """Thread entry point: index the files, then emit ``finished``."""
        self.processfiles()
        self.stop()
        self.finished.emit(self.completed, self.index)

    def processfiles(self):
        """Index every file in ``self.files``.

        Returns early, leaving ``self.completed`` False, as soon as a stop
        request is noticed.  Files that cannot be read are reported on
        stderr and skipped.  ``self.completed`` becomes True only after the
        whole batch has been processed.
        """
        for fname in self.files:
            if self.isstopped():
                return
            text = self._read_text(fname)
            if text is None:
                continue
            if self.isstopped():
                return
            words = self._extract_words(text)
            if self.isstopped():
                return
            self._record_words(words, fname)
            self.indexed.emit(fname, self.index)
        self.completed = True

    @staticmethod
    def _read_text(fname):
        """Return the content of *fname*, or None if it cannot be read."""
        try:
            # Undecodable bytes are dropped ("ignore") instead of raising.
            with codecs.open(fname, "r", "utf8", "ignore") as fh:
                return fh.read()
        except OSError as e:
            sys.stderr.write("error: {0}\n".format(e))
            return None

    @staticmethod
    def _unichr_from_entity(match):
        """Return the character for an ``entity_re`` match, or "" if unknown."""
        text = match.group(match.lastindex)
        if text.isdigit():
            try:
                return chr(int(text))
            except (ValueError, OverflowError):
                # Not a valid code point (e.g. larger than 0x10FFFF).
                return ""
        codepoint = html.entities.name2codepoint.get(text)
        return chr(codepoint) if codepoint is not None else ""

    def _extract_words(self, text):
        """Return the set of indexable, not-yet-common words in *text*."""
        text = self.striphtml_re.sub("", text)
        text = self.entity_re.sub(self._unichr_from_entity, text)
        text = text.lower()
        words = set()
        for word in self.split_re.split(text):
            if (self.min_word_len <= len(word) <= self.max_word_len
                    and word[0] not in self.invalid_first_or_last
                    and word[-1] not in self.invalid_first_or_last):
                # Acquire before ``try`` so a failed acquire is never
                # followed by an unlock.
                self.lock.lockForRead()
                try:
                    new = word not in self.commonwords
                finally:
                    self.lock.unlock()
                if new:
                    words.add(word)
        return words

    def _record_words(self, words, fname):
        """Add *fname* to the index entry of every word in *words*."""
        for word in words:
            self.lock.lockForWrite()
            try:
                files = self.filenamesforwords[word]
                if len(files) > self.common_words_threshold:
                    # Too widespread to be useful: stop indexing this word.
                    del self.filenamesforwords[word]
                    self.commonwords.add(word)
                else:
                    files.add(str(fname))
            finally:
                self.lock.unlock()
/home/yrd/eric_workspace/chap19/pageindexer_ans.pyw
#!/usr/bin/env python3
import collections
import os
import sys

from PyQt5.QtCore import (QDir, QReadWriteLock, QMutex, Qt)
from PyQt5.QtWidgets import (QApplication, QDialog, QFileDialog, QFrame,
        QHBoxLayout, QLCDNumber, QLabel, QLineEdit, QListWidget,
        QPushButton, QVBoxLayout)

import walker_ans as walker
def isalive(qobj):
    """Return True if *qobj* can still be unwrapped by sip.

    ``sip.unwrapinstance()`` raises ``RuntimeError`` for a wrapper it can
    no longer unwrap; that case is reported as False.
    """
    # Newer PyQt5 releases ship sip as ``PyQt5.sip``; older ones rely on the
    # stand-alone ``sip`` module.
    try:
        from PyQt5 import sip
    except ImportError:
        import sip
    try:
        sip.unwrapinstance(qobj)
    except RuntimeError:
        return False
    return True
class form(QDialog):
    """Dialog that indexes the HTML files below a directory and searches them.

    The files are split into batches of ``files_per_walker`` names; each
    batch is handed to its own ``walker.walker`` thread.  All walkers share
    ``filenamesforwords`` (word -> set of file names) and ``commonwords``,
    both guarded by ``self.lock``.
    """

    # Maximum number of files handed to a single walker thread.
    files_per_walker = 1000

    def __init__(self, parent=None):
        super().__init__(parent)

        self.mutex = QMutex()
        self.filecount = 0
        self.filenamesforwords = collections.defaultdict(set)
        self.commonwords = set()
        self.lock = QReadWriteLock()
        self.path = QDir.homePath()

        # --- widgets ---------------------------------------------------
        pathlabel = QLabel("indexing path:")
        self.pathlabel = QLabel()
        self.pathlabel.setFrameStyle(QFrame.StyledPanel | QFrame.Sunken)
        self.pathbutton = QPushButton("set &path...")
        self.pathbutton.setAutoDefault(False)
        findlabel = QLabel("&find word:")
        self.findedit = QLineEdit()
        findlabel.setBuddy(self.findedit)
        commonwordslabel = QLabel("&common words:")
        self.commonwordslistwidget = QListWidget()
        commonwordslabel.setBuddy(self.commonwordslistwidget)
        fileslabel = QLabel("files containing the &word:")
        self.fileslistwidget = QListWidget()
        fileslabel.setBuddy(self.fileslistwidget)
        filesindexedlabel = QLabel("files indexed")
        self.filesindexedlcd = QLCDNumber()
        self.filesindexedlcd.setSegmentStyle(QLCDNumber.Flat)
        wordsindexedlabel = QLabel("words indexed")
        self.wordsindexedlcd = QLCDNumber()
        self.wordsindexedlcd.setSegmentStyle(QLCDNumber.Flat)
        commonwordslcdlabel = QLabel("common words")
        self.commonwordslcd = QLCDNumber()
        self.commonwordslcd.setSegmentStyle(QLCDNumber.Flat)
        self.statuslabel = QLabel("click the 'set path' "
                                  "button to start indexing")
        self.statuslabel.setFrameStyle(QFrame.StyledPanel | QFrame.Sunken)

        # --- layout ----------------------------------------------------
        toplayout = QHBoxLayout()
        toplayout.addWidget(pathlabel)
        toplayout.addWidget(self.pathlabel, 1)
        toplayout.addWidget(self.pathbutton)
        toplayout.addWidget(findlabel)
        toplayout.addWidget(self.findedit, 1)
        leftlayout = QVBoxLayout()
        leftlayout.addWidget(fileslabel)
        leftlayout.addWidget(self.fileslistwidget)
        rightlayout = QVBoxLayout()
        rightlayout.addWidget(commonwordslabel)
        rightlayout.addWidget(self.commonwordslistwidget)
        middlelayout = QHBoxLayout()
        middlelayout.addLayout(leftlayout, 1)
        middlelayout.addLayout(rightlayout)
        bottomlayout = QHBoxLayout()
        bottomlayout.addWidget(filesindexedlabel)
        bottomlayout.addWidget(self.filesindexedlcd)
        bottomlayout.addWidget(wordsindexedlabel)
        bottomlayout.addWidget(self.wordsindexedlcd)
        bottomlayout.addWidget(commonwordslcdlabel)
        bottomlayout.addWidget(self.commonwordslcd)
        bottomlayout.addStretch()
        layout = QVBoxLayout()
        layout.addLayout(toplayout)
        layout.addLayout(middlelayout)
        layout.addLayout(bottomlayout)
        layout.addWidget(self.statuslabel)
        self.setLayout(layout)

        # One entry per started walker; ``completed[i]`` is set when the
        # walker created with index i reports that it has finished.
        self.walkers = []
        self.completed = []
        self.pathbutton.clicked.connect(self.setpath)
        self.findedit.returnPressed.connect(self.find)
        self.setWindowTitle("page indexer")

    def stopwalkers(self):
        """Stop every running walker, wait for it, then forget all walkers."""
        # Ask all threads to stop first so they wind down in parallel ...
        for thread in self.walkers:
            if isalive(thread) and thread.isRunning():
                thread.stop()
        # ... and only then block until each one has really ended.
        for thread in self.walkers:
            if isalive(thread) and thread.isRunning():
                thread.wait()
        self.walkers = []
        self.completed = []

    def setpath(self):
        """Let the user pick a directory and start indexing its HTML files."""
        self.stopwalkers()
        self.pathbutton.setEnabled(False)
        path = QFileDialog.getExistingDirectory(
            self, "choose a path to index", self.path)
        if not path:
            self.statuslabel.setText("click the 'set path' "
                                     "button to start indexing")
            self.pathbutton.setEnabled(True)
            return
        self.statuslabel.setText("scanning directories...")
        QApplication.processEvents()  # needed for windows
        self.path = QDir.toNativeSeparators(path)
        self.findedit.setFocus()
        self.pathlabel.setText(self.path)
        self.statuslabel.clear()
        self.fileslistwidget.clear()
        self.filecount = 0
        # Start from fresh containers; walkers of this run share these.
        self.filenamesforwords = collections.defaultdict(set)
        self.commonwords = set()

        nofilesfound = True
        files = []
        index = 0
        for root, dirs, fnames in os.walk(str(self.path)):
            for name in fnames:
                if not name.endswith((".htm", ".html")):
                    continue
                files.append(os.path.join(root, name))
                if len(files) == self.files_per_walker:
                    self.processfiles(index, files)
                    files = []
                    index += 1
                    nofilesfound = False
        if files:
            # Remaining files that did not fill a whole batch.
            self.processfiles(index, files)
            nofilesfound = False
        if nofilesfound:
            self.finishedindexing()
            self.statuslabel.setText(
                "no html files found in the given path")

    def processfiles(self, index, files):
        """Create, register and start a walker thread for *files*.

        Args:
            index: Position of this walker in ``self.completed``.
            files: List of file names the walker should index.
        """
        thread = walker.walker(index, self.lock, files,
                               self.filenamesforwords, self.commonwords,
                               self)
        thread.indexed[str, int].connect(self.indexed)
        thread.finished[bool, int].connect(self.finished)
        thread.finished.connect(thread.deleteLater)
        self.walkers.append(thread)
        self.completed.append(False)
        thread.start()
        thread.wait(300)  # needed for windows

    def find(self):
        """Show the indexed files containing the word typed in the line edit.

        Only the first whitespace-separated word, lower-cased, is looked up.
        """
        word = str(self.findedit.text())
        if not word:
            self.mutex.lock()
            try:
                self.statuslabel.setText("enter a word to find in files")
            finally:
                self.mutex.unlock()
            return
        self.mutex.lock()
        try:
            self.statuslabel.clear()
            self.fileslistwidget.clear()
        finally:
            self.mutex.unlock()
        word = word.lower()
        if " " in word:
            word = word.split()[0]
        self.lock.lockForRead()
        try:
            found = word in self.commonwords
        finally:
            self.lock.unlock()
        if found:
            self.mutex.lock()
            try:
                self.statuslabel.setText("common words like '{0}' "
                                         "are not indexed".format(word))
            finally:
                self.mutex.unlock()
            return
        self.lock.lockForRead()
        try:
            # .get() avoids creating an empty entry in the defaultdict;
            # copy so the result can be used after the lock is released.
            files = self.filenamesforwords.get(word, set()).copy()
        finally:
            self.lock.unlock()
        if not files:
            self.mutex.lock()
            try:
                self.statuslabel.setText("no indexed file contains "
                                         "the word '{0}'".format(word))
            finally:
                self.mutex.unlock()
            return
        files = [QDir.toNativeSeparators(name) for name in
                 sorted(files, key=str.lower)]
        self.mutex.lock()
        try:
            self.fileslistwidget.addItems(files)
            self.statuslabel.setText(
                "{0} indexed files contain the word '{1}'".format(
                    len(files), word))
        finally:
            self.mutex.unlock()

    def indexed(self, fname, index):
        """Slot for ``walker.indexed``: update progress after one file.

        The status label shows *fname*; the LCD counters are refreshed on
        every 25th file and the common-words list on every 101st file that
        is not also a multiple of 25.
        """
        self.mutex.lock()
        try:
            self.statuslabel.setText(fname)
            self.filecount += 1
            count = self.filecount
        finally:
            self.mutex.unlock()
        if count % 25 == 0:
            self.lock.lockForRead()
            try:
                indexedwordcount = len(self.filenamesforwords)
                commonwordcount = len(self.commonwords)
            finally:
                self.lock.unlock()
            self.mutex.lock()
            try:
                self.filesindexedlcd.display(count)
                self.wordsindexedlcd.display(indexedwordcount)
                self.commonwordslcd.display(commonwordcount)
            finally:
                self.mutex.unlock()
        elif count % 101 == 0:
            self.lock.lockForRead()
            try:
                words = self.commonwords.copy()
            finally:
                self.lock.unlock()
            self.mutex.lock()
            try:
                self.commonwordslistwidget.clear()
                self.commonwordslistwidget.addItems(sorted(words))
            finally:
                self.mutex.unlock()

    def finished(self, completed, index):
        """Slot for ``walker.finished``: note that walker *index* has ended.

        When every registered walker has ended (or none is registered any
        more) the status label shows "finished" and the counters are
        refreshed.  *completed* is accepted for signal compatibility but is
        not consulted.
        """
        if self.walkers:
            # Ignore an index that does not belong to the current list
            # (stopwalkers() may have replaced it) instead of raising
            # IndexError.
            if 0 <= index < len(self.completed):
                self.completed[index] = True
            done = all(self.completed)
        else:
            done = True
        if done:
            self.mutex.lock()
            try:
                self.statuslabel.setText("finished")
            finally:
                self.mutex.unlock()
            self.finishedindexing()

    def reject(self):
        """Stop indexing if still in progress; otherwise close the dialog."""
        if not all(self.completed):
            self.stopwalkers()
            self.finishedindexing()
        else:
            self.accept()

    def closeEvent(self, event=None):
        """Make sure no walker thread is left running when the dialog closes."""
        self.stopwalkers()

    # Backward-compatible alias for the lower-case spelling; Qt itself only
    # calls ``closeEvent``.
    closeevent = closeEvent

    def finishedindexing(self):
        """Show the final counts and re-enable the path button."""
        self.filesindexedlcd.display(self.filecount)
        self.wordsindexedlcd.display(len(self.filenamesforwords))
        self.commonwordslcd.display(len(self.commonwords))
        self.pathbutton.setEnabled(True)
        QApplication.processEvents()  # needed for windows
# Script entry point: create the application, show the dialog and run the
# event loop.  The instance gets its own name so the class ``form`` is not
# rebound, and the event loop's return code becomes the process exit status.
if __name__ == "__main__":
    app = QApplication(sys.argv)
    window = form()
    window.show()
    sys.exit(app.exec_())
运行结果:
相关推荐:
Python3+PyQt5+Qt Designer实现堆叠窗口部件
Python3+PyQt5+Qt Designer实现扩展对话框
以上就是Python3+PyQt5实现支持多线程的页面索引器应用程序的详细内容。