I'm still learning to code, and this is my first attempt at multithreading. I've read a bunch of articles on multithreading and found them very helpful:
https://stackoverflow.com/questions/11196367/processing-single-file-from-multiple-processes
https://pymotw.com/2/multiprocessing/basics.html
https://www.agiliq.com/blog/2013/10/producer-consumer-problem-in-python/
https://docs.python.org/3/library/multiprocessing.html
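There is quite a lot to take in, especially for a beginner. Unfortunately, when I tried to put this information into practice, my code didn't work.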
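The idea behind this code is to read simplified.txt, which contains lines of comma-separated numbers, for example: 0.275,0.28,0.275,0.275,36078. The producer thread reads each line and strips the newline from the end of it. Each number in the line is then split off and assigned to a variable, and value1 is put into the queue. The consumer thread picks up the items in the queue, squares each one, and adds an entry to a log file.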
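The per-line parsing step described above can be pulled out of the threads and tested on its own. This is a minimal sketch assuming every line has exactly five comma-separated fields, as in the example line; the function name `parse_first_value` is hypothetical and does not appear in the original code.

```python
def parse_first_value(line: str) -> float:
    """Strip the line ending, split on commas, and return the first field as a float."""
    stripped = line.strip('\n\r')
    # Unpacking into five names raises ValueError if a line has a different field count.
    value1, value2, value3, value4, value5 = stripped.split(',')
    return float(value1)


if __name__ == '__main__':
    value1 = parse_first_value('0.275,0.28,0.275,0.275,36078\n')
    print(value1, value1 * value1)
```

Converting to `float` here matters: the string `'0.275'` cannot be squared with `*`, which is why the consumer needs a number rather than the raw text field.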
I based my code on this template:
https://www.bogotobogo.com/python/Multithread/python_multithreading_Synchronization_Producer_Consumer_using_Queue.php
Here is my code so far:
```python
import threading
import queue
import time
import logging
import random
import sys

read_file = 'C:/temp/temp1/simplified.txt'
log1 = open('C:/temp/temp1/simplified_log1.txt', "a+")
logging.basicConfig(level=logging.DEBUG, format='(%(threadName)-9s) %(message)s',)

BUF_SIZE = 10
q = queue.Queue(BUF_SIZE)

class ProducerThread(threading.Thread):
    def __init__(self, name, read_file):
        super(ProducerThread,self).__init__()
        self.name = name
        self.read_file = read_file

    def run(self, read_file):
        while True:
            if not q.full():
                with open(read_file, 'r') as f:
                    for line in f:
                        stripped = line.strip('\n\r')
                        value1,value2,value3,value4,value5,value6,value7 = stripped.split(',')
                        q.put(value1)
                        logging.debug('Putting ' + str(value1) + ' : ' + str(q.qsize()) + ' items in queue')
                        time.sleep(random.random())
        return

class ConsumerThread(threading.Thread):
    def __init__(self, name, value1, log1):
        super(ConsumerThread,self).__init__()
        self.name = name
        self.value1 = value1
        self.log1 = log1
        return

    def run(self):
        while True:
            if not q.empty():
                value1 = q.get()
                sqr_value1 = value1 * value1
                log1.write("The square of " + str(value1) + " is " + str(sqr_value1))
                logging.debug('Getting ' + str(value1) + ' : ' + str(q.qsize()) + ' items in queue')
                time.sleep(random.random())
        return

if __name__ == '__main__':
    p = ProducerThread(name='producer')
    c = ConsumerThread(name='consumer')
    p.start()
    time.sleep(2)
    c.start()
    time.sleep(2)
```
When I run the code, I get this error:
```
Traceback (most recent call last):
  File "c:/Scripta/A_Simplified_Producer_Consumer_Queue_v0.1.py", line 60, in <module>
    p = ProducerThread(name='producer')
TypeError: __init__() missing 1 required positional argument: 'read_file'
```
I don't know where else I need to add `read_file`. Any help would be greatly appreciated. Thanks in advance.
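Your `ProducerThread` class defines 2 parameters in its `__init__` method (`name` and `read_file`), and these become the arguments of its constructor, but when you create an instance in your main block you only pass the first of them. Your second class has the same problem: `ConsumerThread.__init__` also requires `value1` and `log1`, which you never pass.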
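A stripped-down reproduction, keeping only the constructor of the question's `ProducerThread` (the rest of the class is omitted for brevity), shows where the `TypeError` comes from and that passing both arguments makes it go away. It is a sketch for illustration, not the full class; it passes `name` to `Thread.__init__` instead of assigning `self.name` afterwards, which has the same effect.

```python
import threading


class ProducerThread(threading.Thread):
    # Besides self, the constructor requires two arguments: name and read_file.
    def __init__(self, name, read_file):
        super().__init__(name=name)
        self.read_file = read_file


try:
    # Only name is supplied, exactly as in the question's main block.
    p = ProducerThread(name='producer')
except TypeError as exc:
    print('TypeError:', exc)

# Supplying both arguments constructs the thread object without error.
p = ProducerThread(name='producer', read_file='C:/temp/temp1/simplified.txt')
print(p.name, p.read_file)
```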
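You should either provide `read_file` to the constructor when you create the instances, or simply remove it from the constructor signature, since you don't appear to use it anyway (you use the `read_file` passed into the `run` method instead, but I don't think that is correct). It looks like you are overriding `run()` from the `Thread` superclass, and `Thread.start()` calls `run()` with no arguments, so a `run(self, read_file)` signature will fail with a `TypeError` in the new thread once it starts.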
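The first option in the answer, passing `read_file` to the constructor, usually means storing it on `self` in `__init__` and reading it back in a `run()` that takes no parameters. Below is a minimal sketch of that pattern. The `out_queue` parameter is an assumption (the question uses a global `q` instead), and the sample file written in the main block is hypothetical test data.

```python
import os
import queue
import tempfile
import threading


class ProducerThread(threading.Thread):
    def __init__(self, name, read_file, out_queue):
        super().__init__(name=name)
        # Keep the constructor arguments on the instance so run() can use them.
        self.read_file = read_file
        self.out_queue = out_queue

    def run(self):
        # Thread.start() calls run() with no arguments, so read everything from self.
        with open(self.read_file, 'r') as f:
            for line in f:
                value1 = line.strip('\n\r').split(',')[0]
                self.out_queue.put(float(value1))


if __name__ == '__main__':
    # Hypothetical sample file holding the example line from the question.
    with tempfile.NamedTemporaryFile('w', suffix='.txt', delete=False) as tmp:
        tmp.write('0.275,0.28,0.275,0.275,36078\n')
    q = queue.Queue()
    p = ProducerThread(name='producer', read_file=tmp.name, out_queue=q)
    p.start()
    p.join()
    print(q.get())  # prints 0.275
    os.remove(tmp.name)
```

Passing the queue in as well keeps the class independent of module-level globals, which makes it easier to test; using globals, as the self-answer below does, also works for a single script.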
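Thank you userSeventeen for setting me on the right path. I thought that in order to use variables from outside, I had to put them in the `__init__` method and then carry them into the `run` method. You've clarified that I only need to use the variables in the `run` method. I had to remove the `while True:` statements because I don't want the code to run forever. Here is the working code: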
```python
import threading
import queue
import time
import logging
import random
import sys
import os

read_file = 'C:/temp/temp1/simplified.txt'
log1 = open('C:/temp/temp1/simplified_log1.txt', "a+")
logging.basicConfig(level=logging.DEBUG, format='(%(threadName)-9s) %(message)s',)

BUF_SIZE = 10
q = queue.Queue(BUF_SIZE)

class ProducerThread(threading.Thread):
    def __init__(self, name):
        super(ProducerThread,self).__init__()
        self.name = name

    def run(self):
        # read_file is the module-level path; run() takes no extra arguments.
        with open(read_file, 'r') as f:
            for line in f:
                stripped = line.strip('\n\r')
                value1,value2,value3,value4,value5 = stripped.split(',')
                float_value1 = float(value1)
                # If the queue is full, this line's value is skipped rather than queued.
                if not q.full():
                    q.put(float_value1)
                    logging.debug('Putting ' + str(float_value1) + ' : ' + str(q.qsize()) + ' items in queue')
                    time.sleep(random.random())
        return

class ConsumerThread(threading.Thread):
    def __init__(self, name):
        super(ConsumerThread,self).__init__()
        self.name = name
        return

    def run(self):
        # The loop ends as soon as the queue is momentarily empty,
        # even if the producer is still reading the file.
        while not q.empty():
            float_value1 = q.get()
            sqr_value1 = float_value1 * float_value1
            # One entry per line in the log file.
            log1.write("The square of " + str(float_value1) + " is " + str(sqr_value1) + "\n")
            logging.debug('Getting ' + str(float_value1) + ' : ' + str(q.qsize()) + ' items in queue')
            time.sleep(random.random())
        return

if __name__ == '__main__':
    p = ProducerThread(name='producer')
    c = ConsumerThread(name='consumer')
    p.start()
    time.sleep(2)
    c.start()
    time.sleep(2)
```
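The working version above still depends on timing: the consumer's `while not q.empty()` loop ends the moment the queue is briefly empty, and the producer's `if not q.full()` check skips lines whenever the queue is full. One common alternative, sketched here under stated assumptions, is to let `q.put()` and `q.get()` block and have the producer put a sentinel value (`None` here) on the queue once the file is exhausted; the consumer stops when it receives it, so neither thread runs forever and no `time.sleep()` is needed. The function names `produce`, `consume`, and `run_pipeline` are hypothetical, the sketch assumes a single consumer, and it uses `threading.Thread(target=..., args=...)` instead of subclassing `Thread`.

```python
import logging
import queue
import threading

logging.basicConfig(level=logging.DEBUG, format='(%(threadName)-9s) %(message)s')

BUF_SIZE = 10
SENTINEL = None  # marks the end of the input; assumes no real item is ever None


def produce(read_file, q):
    """Put the first field of every line, as a float, onto the queue."""
    try:
        with open(read_file, 'r') as f:
            for line in f:
                value1 = float(line.strip('\n\r').split(',')[0])
                q.put(value1)  # blocks while the queue is full instead of dropping the line
                logging.debug('Putting %s : %d items in queue', value1, q.qsize())
    finally:
        # Always sent, even if reading fails, so the consumer never waits forever.
        q.put(SENTINEL)


def consume(log_file, q):
    """Square each queued value and log it until the sentinel arrives."""
    with open(log_file, 'a') as log1:
        while True:
            value1 = q.get()  # blocks until an item is available
            if value1 is SENTINEL:
                break
            log1.write('The square of {} is {}\n'.format(value1, value1 * value1))
            logging.debug('Getting %s : %d items in queue', value1, q.qsize())


def run_pipeline(read_file, log_file):
    q = queue.Queue(BUF_SIZE)
    p = threading.Thread(target=produce, args=(read_file, q), name='producer')
    c = threading.Thread(target=consume, args=(log_file, q), name='consumer')
    p.start()
    c.start()
    p.join()
    c.join()  # the log file has been closed by the time this returns
```

For the files from the question, this would be called as `run_pipeline('C:/temp/temp1/simplified.txt', 'C:/temp/temp1/simplified_log1.txt')`. With several consumer threads, the producer would need to put one sentinel per consumer.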