I have a short piece of code that uses multiprocessing. It is packaged up and works fine on my local machine.
When I upload it to AWS Lambda and run it there, I get the following error (stack trace trimmed):
[Errno 38] Function not implemented: OSError
Traceback (most recent call last):
  File "/var/task/recorder.py", line 41, in record
    pool = multiprocessing.Pool(10)
  File "/usr/lib64/python2.7/multiprocessing/__init__.py", line 232, in Pool
    return Pool(processes, initializer, initargs, maxtasksperchild)
  File "/usr/lib64/python2.7/multiprocessing/pool.py", line 138, in __init__
    self._setup_queues()
  File "/usr/lib64/python2.7/multiprocessing/pool.py", line 234, in _setup_queues
    self._inqueue = SimpleQueue()
  File "/usr/lib64/python2.7/multiprocessing/queues.py", line 354, in __init__
    self._rlock = Lock()
  File "/usr/lib64/python2.7/multiprocessing/synchronize.py", line 147, in __init__
    SemLock.__init__(self, SEMAPHORE, 1, 1)
  File "/usr/lib64/python2.7/multiprocessing/synchronize.py", line 75, in __init__
    sl = self._semlock = _multiprocessing.SemLock(kind, value, maxvalue)
OSError: [Errno 38] Function not implemented
Could it be that a part of Python's core packages is not implemented? I have no idea what I am running on underneath, so I cannot log in and debug it.
Any ideas how I can run multiprocessing on Lambda?
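For reference, a minimal handler that reproduces the failure (hypothetical; reconstructed from the traceback above, which shows the error being raised by the multiprocessing.Pool(10) call in recorder.py - the handler signature and workload here are made up):

# recorder.py - minimal reproduction, reconstructed from the traceback;
# the real function does more than this
import multiprocessing

def record(event, context):
    # Creating a Pool needs SemLock, which on Lambda raises
    # OSError: [Errno 38] Function not implemented
    pool = multiprocessing.Pool(10)
    results = pool.map(len, ["some", "work", "items"])
    pool.close()
    pool.join()
    return results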
As far as I can tell, multiprocessing will not work on AWS Lambda because the execution environment/container is missing /dev/shm - see https://forums.aws.amazon.com/thread.jspa?threadID=219962 (login may be required).
There is no word (that I could find) on if/when Amazon will change this, so I am looking at other libraries, e.g. https://pythonhosted.org/joblib/parallel.html, which will fall back to /tmp (which we know exists) if it cannot find /dev/shm.
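For illustration, here is a minimal sketch of the joblib Parallel API mentioned above (the square function and the inputs are made up; whether the process-based backend actually works inside Lambda depends on the /tmp fallback described):

from joblib import Parallel, delayed

def square(x):
    # Placeholder for the real per-item work
    return x * x

# joblib memmaps large inputs under /dev/shm when it is available and
# falls back to a regular temp folder (e.g. /tmp) when it is not.
results = Parallel(n_jobs=2)(delayed(square)(i) for i in range(10))
print(results)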
multiprocessing.Pool and multiprocessing.Queue do not seem to be supported natively (because of the issue with SemLock), but multiprocessing.Process and multiprocessing.Pipe etc. work properly in AWS Lambda.
That should allow you to build a workaround by manually creating/forking processes and using a multiprocessing.Pipe for communication between the parent and child processes, as in the sketch below. Hope that helps.
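A minimal sketch of that Process + Pipe pattern (the worker function and its input are made up for illustration):

from multiprocessing import Pipe, Process

def worker(x, conn):
    # Do the per-item work in the child and send the result back
    conn.send(x * x)
    conn.close()

if __name__ == "__main__":
    parent_conn, child_conn = Pipe()
    p = Process(target=worker, args=(42, child_conn))
    p.start()
    print(parent_conn.recv())  # 1764
    p.join()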
You can use Python's multiprocessing module to run routines in parallel on AWS Lambda, but not with Pool or Queue as described in the other answers. A workable solution is to use Process and Pipe as outlined in this article: https://aws.amazon.com/blogs/compute/parallel-processing-in-python-with-aws-lambda/
While the article definitely helped me get to a solution (shared below), there are a few things to be aware of. First, the Process-and-Pipe-based solution is not as fast as the built-in map function of Pool, though I did see near-linear speedups as I increased the memory/CPU resources available to my Lambda function. Second, there is a fair amount of bookkeeping to do when building multiprocessing functions this way, and I suspect that is at least part of why my solution is slower than the built-in approach. If anyone has suggestions for speeding it up, I would love to hear them! Finally, the article points out that multiprocessing is useful for offloading asynchronous processes, but there are other reasons to use it, such as the heavy math operations I was trying to run. In the end I am happy enough with the performance improvement, since it is much better than sequential execution.
The code:
# Python 3.6
import multiprocessing
import multiprocessing.connection
from multiprocessing import Pipe, Process


def myWorkFunc(data, connection):
    result = None

    # Do some work and store it in result

    if result:
        connection.send([result])
    else:
        connection.send([None])


def myPipedMultiProcessFunc(iterable):
    # Get number of available logical cores
    plimit = multiprocessing.cpu_count()

    # Setup management variables
    results = []
    parent_conns = []
    processes = []
    pcount = 0

    for data in iterable:
        # Create the pipe for parent-child process communication
        parent_conn, child_conn = Pipe()
        # Create the process, pass data to be operated on and connection
        process = Process(target=myWorkFunc, args=(data, child_conn,))
        parent_conns.append(parent_conn)
        processes.append(process)
        process.start()
        pcount += 1

        if pcount == plimit:  # There is not currently room for another process
            # Wait until there are results in the Pipes
            finishedConns = multiprocessing.connection.wait(parent_conns)
            # Collect the results and remove the connection, as processing
            # the connection again will lead to errors
            for conn in finishedConns:
                results.append(conn.recv()[0])
                parent_conns.remove(conn)
                # Decrement pcount so we can add a new process
                pcount -= 1

    # Ensure all remaining active processes have their results collected
    for conn in parent_conns:
        results.append(conn.recv()[0])
        conn.close()

    # Reap the worker processes now that their results have been collected
    for process in processes:
        process.join()

    # Process results as needed
    return results
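For example, it could be invoked from a Lambda handler along these lines (the handler name and the work items are hypothetical, and this assumes myPipedMultiProcessFunc takes its iterable as a parameter as written above):

def lambda_handler(event, context):
    # Hypothetical work items; in practice these would come from the event
    work_items = [1, 2, 3, 4, 5, 6, 7, 8]
    results = myPipedMultiProcessFunc(work_items)
    return {"results": results}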