问题 如何将数据发送到正在运行的python线程?


我有一个在我的应用程序中的不同线程中运行的类。我可以一次运行多个线程,线程是守护进程。过了一段时间, 一些 这些线程需要接收和处理消息。我该怎么做呢?

我的代码示例如下所示:

import threading
import time 

class MyThread(threading.Thread):
    def __init__(self, args=(), kwargs=None):
        threading.Thread.__init__(self, args=(), kwargs=None)
        self.daemon = True
        self.receive_messages = args[0]

    def run(self):
        print threading.currentThread().getName(), self.receive_messages

    def do_thing_with_message(self, message):
        if self.receive_messages:
            print threading.currentThread().getName(), "Received %s".format(message)

if __name__ == '__main__':
    threads = []
    for t in range(10):
        threads.append( MyThread(args=(t % 2 == 0,)))
        threads[t].start()
        time.sleep(0.1)

    for t in threads:
        t.do_thing_with_message("Print this!")

这输出:

Thread-1 True
Thread-2 False
Thread-3 True
Thread-4 False
Thread-5 True
Thread-6 False
Thread-7 True
Thread-8 False
Thread-9 True
Thread-10 False
MainThread Received %s
MainThread Received %s
MainThread Received %s
MainThread Received %s
MainThread Received %s

然而,我期待最后五行不与之相关 MainThread而不是 %s,我期待它 Print this!,像这样:

Thread-1 True
Thread-2 False
Thread-3 True
Thread-4 False
Thread-5 True
Thread-6 False
Thread-7 True
Thread-8 False
Thread-9 True
Thread-10 False
Thread-1 Received Print this!
Thread-3 Received Print this!
Thread-5 Received Print this!
Thread-7 Received Print this!
Thread-9 Received Print this!

如何正确地将这样的消息发送到正在运行的线程?

附录:

如果我有这个块后 Print this! 阻止,并利用@ dano的代码来解决上面的问题,它似乎没有响应这些新消息。

for t in threads:
    t.queue.put("Print this again!")
    time.sleep(0.1)

在这种情况下,我希望我的输出结束看起来像这样

Thread-1 Received Print this!
Thread-3 Received Print this!
Thread-5 Received Print this!
Thread-7 Received Print this!
Thread-9 Received Print this!
Thread-1 Received Print this again!
Thread-3 Received Print this again!
Thread-5 Received Print this again!
Thread-7 Received Print this again!
Thread-9 Received Print this again!

6930
2017-09-18 05:08


起源

这是什么 任务队列 是专为。 - Burhan Khalid
@BurhanKhalid我会说使用像芹菜这样的东西在这里会有点过分。 Celery对于将工作分配到集群中的多个进程或多台机器非常有用,但更简单的东西将适合OP的需求。 - dano


答案:


你可以用一个 Queue.Queue (要么 queue.Queue 在Python 3)为此:

import threading
import time 
from Queue import Queue

print_lock = threading.Lock()

class MyThread(threading.Thread):
    def __init__(self, queue, args=(), kwargs=None):
        threading.Thread.__init__(self, args=(), kwargs=None)
        self.queue = queue
        self.daemon = True
        self.receive_messages = args[0]

    def run(self):
        print threading.currentThread().getName(), self.receive_messages
        val = self.queue.get()
        self.do_thing_with_message(val)

    def do_thing_with_message(self, message):
        if self.receive_messages:
            with print_lock:
                print threading.currentThread().getName(), "Received {}".format(message)

if __name__ == '__main__':
    threads = []
    for t in range(10):
        q = Queue()
        threads.append(MyThread(q, args=(t % 2 == 0,)))
        threads[t].start()
        time.sleep(0.1)

    for t in threads:
        t.queue.put("Print this!")

    for t in threads:
        t.join()

我们通过了 Queue 实例到每个线程,并将我们的消息发送给 Thread 运用 queue.put。我们等待消息到达 run 方法,这是的一部分 Thread 实际上在一个单独的执行线程中运行的对象。收到消息后,我们会打电话 do_thing_with_message,它将在相同的后台线程中运行。

我还添加了一个 threading.Lock 代码所以打印到stdout不会混淆。

编辑:

如果您希望能够向线程传递多条消息,只需使用循环:

def run(self):
    print threading.currentThread().getName(), self.receive_messages
    while True:
        val = self.queue.get()
        if val is None:   # If you send `None`, the thread will exit.
            return
        self.do_thing_with_message(val)

11
2017-09-18 05:21



这很棒!它解决了我问的问题,但也许并不完全是我所期待的。如果我添加另一个 Print this! 循环(或任何数量的循环,因为我希望能够在这些线程中发送消息,只要它们存在),只打印每个线程的第一条消息。如何接受任意数量的消息? (如果这很多,我可以问一个单独的问题并将其标记为我原始问题的答案) - Python Novice
感谢您提供有关Lock的信息。 - Python Novice
@PythonNoob刚刚放了一个 while True: 环绕着 val = self.queue.get() ; self.do_thing_with_message(val) 调用。 - dano
@PythonNoob我已经编辑了我的答案来演示如何接收多条消息,以及如何通过输入一个sentinel值来指示消息接收循环从主线程结束(None) 进入 Queue。 - dano