Question: Luigi pipeline beginning in S3


My initial files are in AWS S3. Could someone point me to how I need to set this up in a Luigi Task?

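I reviewed the documentation and found luigi.S3, but it is not clear to me what to do with it. Then I searched the web and only found links to mortar-luigi and implementations built on top of Luigi.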

UPDATE

Following the example provided by @matagus (I also created the ~/.boto file, as suggested):

# coding: utf-8

import luigi

from luigi.s3 import S3Target, S3Client

class MyS3File(luigi.ExternalTask):
    def output(self):
        return S3Target('s3://my-bucket/19170205.txt')

class ProcessS3File(luigi.Task):

    def requieres(self):
        return MyS3File()

    def output(self):
        return luigi.LocalTarget('/tmp/resultado.txt')

    def run(self):
        result = None

        for input in self.input():
           print("Doing something ...")
           with input.open('r') as f:
               for line in f:
                   result = 'This is a line'

        if result:
            out_file = self.output().open('w')
            out_file.write(result)

When I run it, nothing happens:

DEBUG: Checking if ProcessS3File() is complete
INFO: Informed scheduler that task   ProcessS3File()   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 21171] Worker Worker(salt=226574718, workers=1, host=heliodromus, username=nanounanue, pid=21171) running   ProcessS3File()
INFO: [pid 21171] Worker Worker(salt=226574718, workers=1, host=heliodromus, username=nanounanue, pid=21171) done      ProcessS3File()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   ProcessS3File()   has status   DONE
DEBUG: Asking scheduler for work...
INFO: Done
INFO: There are no more tasks to run at this time
INFO: Worker Worker(salt=226574718, workers=1, host=heliodromus, username=nanounanue, pid=21171) was stopped. Shutting down Keep-Alive thread

As you can see, the message Doing something ... is never printed. What is wrong?


2017-10-25 16:28


Comments:

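The error is in def requieres(self):. It must be requires. - matagus
Luigi calls that method to get the input files, and since no requires method is defined in your task, the default one returns an empty list. - matagus
You are absolutely right! Silly me! Thanks! - nanounanue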
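The mechanism is ordinary Python method lookup: a method named requieres is just an extra method, so the inherited default requires() is still the one being called. The sketch below reproduces this with a hypothetical BaseTask stand-in. It is not Luigi's code; it only assumes, as the comment above says, that the default requires() returns an empty list and that input() is derived from requires().

```python
# Hypothetical, simplified stand-in for luigi.Task (NOT Luigi's real code):
# it only mimics the default requires() and input() built from requires().
class BaseTask:
    def requires(self):
        # Default: no dependencies
        return []

    def input(self):
        # Map required tasks to their outputs (flat list or single task only)
        deps = self.requires()
        if isinstance(deps, list):
            return [d.output() for d in deps]
        return deps.output()


class MyS3File(BaseTask):
    def output(self):
        return "s3://my-bucket/19170205.txt"


class Misspelled(BaseTask):
    def requieres(self):  # typo: never called, does not override requires()
        return MyS3File()


class Fixed(BaseTask):
    def requires(self):
        return MyS3File()


print(Misspelled().input())  # []
print(Fixed().input())       # s3://my-bucket/19170205.txt
```

With an empty input list, the for loop in run() has nothing to iterate over, so the print is never reached, yet run() returns normally and the task is reported as DONE, matching the log in the question.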


Answer:


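The key here is to define an External Task that has no inputs and whose outputs are the files that live in S3. The Luigi docs mention this in Requiring another Task: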

Note that requires() can not return a Target object. If you have a simple Target object that is created externally you can wrap it in a Task class

So, basically, you end up with something like this:

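import luigi

# S3 support lives in luigi.contrib.s3 (luigi.s3 is an older, deprecated alias)
from luigi.contrib.s3 import S3Target

# Placeholder for your own processing function
from somewhere import do_something_with


class MyS3File(luigi.ExternalTask):
    # External task: no requires() and no run(); it only declares
    # a file that already exists in S3
    def output(self):
        return S3Target('s3://my-bucket/path/to/file')

class ProcessS3File(luigi.Task):

    def requires(self):
        return MyS3File()

    def output(self):
        return S3Target('s3://my-bucket/path/to/output-file')

    def run(self):
        result = None
        # self.input() is MyS3File().output(); open('r') returns a file
        # stream that reads the file from your AWS S3 bucket
        with self.input().open('r') as f:
            result = do_something_with(f)

        # Write the result to the output target; the "with" block closes
        # the file, which is when the upload to S3 actually happens
        with self.output().open('w') as out_file:
            # it would be better to serialize this result before writing it
            # to a file, but this is a pretty simple example
            out_file.write(result)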
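In Luigi, the value of self.input() has the same shape as whatever requires() returns, with each task replaced by its output() (Luigi does this with luigi.task.getpaths). Because requires() here returns a single MyS3File(), self.input() is a single S3Target, which is why run() calls self.input().open('r') directly instead of looping over it as the question's code does. A simplified sketch of that mapping, using a hypothetical FakeTask whose output() is just a path string; it illustrates the idea and is not Luigi's source:

```python
# Simplified illustration of the idea behind luigi.task.getpaths:
# same structure as requires(), every task replaced by its output().
def mirror_outputs(struct):
    if isinstance(struct, dict):
        return {k: mirror_outputs(v) for k, v in struct.items()}
    if isinstance(struct, (list, tuple)):
        # Keep the container type (list stays list, tuple stays tuple)
        return type(struct)(mirror_outputs(v) for v in struct)
    # Anything else is treated as a task exposing output()
    return struct.output()


class FakeTask:
    """Hypothetical stand-in for a task; output() returns a path string."""

    def __init__(self, path):
        self.path = path

    def output(self):
        return self.path


single = mirror_outputs(FakeTask("s3://my-bucket/path/to/file"))
several = mirror_outputs([FakeTask("s3://a"), FakeTask("s3://b")])
named = mirror_outputs({"raw": FakeTask("s3://a")})
print(single)   # s3://my-bucket/path/to/file
print(several)  # ['s3://a', 's3://b']
print(named)    # {'raw': 's3://a'}
```

If you need several S3 inputs, returning a list or dict of external tasks from requires() gives you a matching list or dict of targets in run().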

Update:

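Luigi uses boto to read files from and write them to AWS S3, so for this code to work you need to provide your credentials in the boto config file ~/.boto (see here for other possible config file locations):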

[Credentials]
aws_access_key_id = <your_access_key_here>
aws_secret_access_key = <your_secret_key_here>
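The file is plain INI syntax, so a quick way to check that it has the section and keys shown above is to parse it with Python's standard configparser. This is only a sanity-check sketch, not how boto itself loads the file; to check your real file you would call parser.read(os.path.expanduser("~/.boto")) instead of read_string.

```python
import configparser

# Same layout as ~/.boto above, with placeholder values
BOTO_CONFIG = """
[Credentials]
aws_access_key_id = <your_access_key_here>
aws_secret_access_key = <your_secret_key_here>
"""

parser = configparser.ConfigParser()
parser.read_string(BOTO_CONFIG)
creds = parser["Credentials"]
print(sorted(creds.keys()))  # ['aws_access_key_id', 'aws_secret_access_key']
```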

2017-10-26 17:08



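There are some problems with your code, could you fix them? (e.g. the return in the first output method is wrong; it should be return S3Target(... - nanounanue
Another question: where should I provide my AWS credentials? - nanounanue
Done, I've updated my answer. Hope it helps. - matagus
I just updated the question, could you help me? - nanounanue
@DavidJ By default Luigi uses multipart uploads in 64 MB chunks, and you can specify another size by passing the part_size parameter to S3Target. So yes, this works for large files. - matagus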
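To see what the chunk size means in practice: a file is split into ceil(size / part_size) parts, and S3 allows at most 10,000 parts per multipart upload, so the part size also caps the largest object you can upload this way. A small sketch, taking the 64 MB default from the comment above as given (your Luigi version's default may differ); the function names are hypothetical:

```python
MiB = 1024 * 1024
S3_MAX_PARTS = 10_000  # AWS S3 limit on parts per multipart upload


def multipart_parts(file_size, part_size=64 * MiB):
    """Number of parts needed to upload file_size bytes in part_size chunks."""
    # Integer ceiling division, avoids float rounding on huge sizes
    return (file_size + part_size - 1) // part_size


def max_object_size(part_size):
    """Largest upload possible with part_size chunks under the part limit."""
    return part_size * S3_MAX_PARTS


print(multipart_parts(1024 * MiB))               # 16 parts for 1 GiB
print(max_object_size(64 * MiB) / (1024 * MiB))  # 625.0 (GiB)
```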