问题 Flask:后台线程将非空队列视为空


当我在uwsgi中运行Flask应用程序时,后台线程和应用程序函数在查询同一队列的大小时会看到不同的值。

组件

  • 带有一个Flask应用程序 线程安全队列
  • 一个 GET call返回队列大小。
  • 一个 POST call将一个元素添加到Queue中。
  • 后台线程打印队列大小

问题

当应用程序来自shell时使用 python tester.py,我得到了预期的结果:

2014-06-07 14:20:50.677995 Queue size is: 0
127.0.0.1 - - [07/Jun/2014 14:20:51] "POST /addMessage/X HTTP/1.1" 200 -
2014-06-07 14:20:51.679277 Queue size is: 1
2014-06-07 14:20:52.680425 Queue size is: 1
2014-06-07 14:20:53.681566 Queue size is: 1
2014-06-07 14:20:54.682708 Queue size is: 1
127.0.0.1 - - [07/Jun/2014 14:20:55] "POST /addMessage/Y HTTP/1.1" 200 -
2014-06-07 14:20:55.687755 Queue size is: 2
2014-06-07 14:20:56.688867 Queue size is: 2

但是,使用时执行应用程序 uwsgi,我在日志中得到以下内容:

2014-06-07 14:17:42.056863 Queue size is: 0
2014-06-07 14:17:43.057952 Queue size is: 0
[pid: 9879|app: 0|req: 6/6] 127.0.0.1 () {24 vars in 280 bytes} [Sat Jun  7 14:17:43 2014] POST /addMessage/X => generated 16 bytes in 0 msecs (HTTP/1.1 200) 2 headers in 71 bytes (1 switches on core 0)
2014-06-07 14:17:44.059037 Queue size is: 0
2014-06-07 14:17:45.060118 Queue size is: 0
[pid: 9879|app: 0|req: 7/7] 127.0.0.1 () {24 vars in 280 bytes} [Sat Jun  7 14:17:45 2014] POST /addMessage/X => generated 16 bytes in 0 msecs (HTTP/1.1 200) 2 headers in 71 bytes (1 switches on core 0)
2014-06-07 14:17:46.061205 Queue size is: 0
2014-06-07 14:17:47.062286 Queue size is: 0

在uwsgi下运行时,后台线程看不到与应用程序相同的队列。这是为什么?如何让这两个线程看到同一个Queue对象?

更新

  • 即使它作为Python脚本执行,我也会看到不一致的行为:有时它无法记录消息(使用 app.logger),我只能看到 print秒。这意味着线程正在运行,但它无法执行任何操作 app.logger

uwsgi .ini 组态

[uwsgi]
http-socket    = :9002
plugin         = python
wsgi-file      = /home/ubuntu/threadtest-uwsgi.py
enable-threads = true
workers        = 1
chdir          = /home/ubuntu/thread-tester/thread_tester

from flask import Flask, jsonify
import Queue
from threading import Thread
import time
import datetime
import logging
import sys

logging.basicConfig(stream=sys.stderr,
                    format='%(asctime)s %(levelname)s - %(message)s')

app = Flask(__name__)
messages = Queue.Queue()

def print_queue_size():
    while True:
        app.logger.debug("%s Queue size is: %d" % (datetime.datetime.now(),
                                        messages.qsize()))
        time.sleep(1)

t = Thread(target=print_queue_size, args=())
t.setDaemon(True)
t.start()

@app.route("/queueSize", methods=["GET"])
def get_queue_size():
    return jsonify({"qsize": messages.qsize()}), 200

@app.route("/addMessage/<message>", methods=["POST"])
def add_message_to_queue(message):
    messages.put(message)
    return jsonify({"qsize": messages.qsize()}), 200

if __name__ == "__main__":
    app.run(port=6000)

10750
2018-06-07 14:39


起源



答案:


来自 需要了解的事项文件页面

uWSGI尝试(ab)使用的Copy On Write语义 fork() 尽可能打电话。默认情况下,它会在加载应用程序之后进行分叉,以尽可能多地共享内存。如果由于某种原因这种行为是不受欢迎的,请使用 lazy 选项。这将指示uWSGI在每个worker的fork()之后加载应用程序。延迟模式改变了正常重新加载的工作方式:不是重新加载整个实例,而是在链中重新加载每个工作程序。如果你想要“延迟应用程序加载”,但想要保持标准的uWSGI重新加载行为,从1.3开始就可以使用 lazy-apps 选项。

您的Flask应用程序在uWSGI启动时启动,然后是一个工作进程 。在分叉, Queue 对象为空,不再与原始进程共享。线程没有被带走。

尝试设置 lazy-apps 选项  延迟加载Flask应用程序,直到工作人员启动。


12
2018-06-07 23:47



+1但使用--lazy-apps作为--lazy真的是入侵的 - roberto
这似乎有效!我将在一些临时环境中测试它。 - Adam Matan
美丽。这个答案结束了一个非常痛苦的一周,它应得的每一点赏金。 - Adam Matan
@AdamMatan:谢谢!很高兴得到帮助。 :-) - Martijn Pieters♦


答案:


来自 需要了解的事项文件页面

uWSGI尝试(ab)使用的Copy On Write语义 fork() 尽可能打电话。默认情况下,它会在加载应用程序之后进行分叉,以尽可能多地共享内存。如果由于某种原因这种行为是不受欢迎的,请使用 lazy 选项。这将指示uWSGI在每个worker的fork()之后加载应用程序。延迟模式改变了正常重新加载的工作方式:不是重新加载整个实例,而是在链中重新加载每个工作程序。如果你想要“延迟应用程序加载”,但想要保持标准的uWSGI重新加载行为,从1.3开始就可以使用 lazy-apps 选项。

您的Flask应用程序在uWSGI启动时启动,然后是一个工作进程 。在分叉, Queue 对象为空,不再与原始进程共享。线程没有被带走。

尝试设置 lazy-apps 选项  延迟加载Flask应用程序,直到工作人员启动。


12
2018-06-07 23:47



+1但使用--lazy-apps作为--lazy真的是入侵的 - roberto
这似乎有效!我将在一些临时环境中测试它。 - Adam Matan
美丽。这个答案结束了一个非常痛苦的一周,它应得的每一点赏金。 - Adam Matan
@AdamMatan:谢谢!很高兴得到帮助。 :-) - Martijn Pieters♦


@Martijn Pieters答案中的文档链接指出 lazy-apps 可能比预先消耗更多的内存。如果您对此感到担忧,您可能还会考虑这个问题 @postfork 装饰器可以更精细地控制分叉后运行的内容。你可以在里面创建你的队列 @postfork-decorated功能,它将在每个工人中创建。


1
2017-11-08 19:02