Note: This article is reproduced from serverfault.com.

python - ERRNO2 for WriteToText files in a Dataflow pipeline

Posted on 2020-12-01 03:52:41

I have a branching pipeline with multiple ParDo transforms whose outputs are merged and written as text file records to a GCS bucket.

After the pipeline crashes, I receive the following messages:

  • The worker lost contact with the service.
  • RuntimeError: FileNotFoundError: [Errno 2] Not found: gs://MYBUCKET/JOBNAME.00000-of-00001.avro [while running 'WriteToText/WriteToText/Write/WriteImpl/WriteBundles/WriteBundles']

It looks like it can't find the log file it has been writing to. It seems to be fine until a certain point when the error occurs. I'd like to wrap a try:/except: around it or set a breakpoint, but I'm not even sure how to find the root cause.
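One way to dig for the root cause is to wrap the per-element work of a DoFn in a try/except that logs the failing element before re-raising, so the offending input shows up next to the stack trace in the worker logs. This is only a minimal sketch; LoggingDoFn and the shape of element are assumed for illustration, not taken from the pipeline below:

import logging

import apache_beam as beam

class LoggingDoFn(beam.DoFn):
    def process(self, element):
        try:
            # ... the actual per-element work goes here ...
            yield element
        except Exception:
            # Dataflow forwards worker logs to Cloud Logging, so the
            # failing element can be located next to the traceback.
            logging.exception('Failed to process element: %r', element)
            raise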

Is there a way to write only one file? Or to open a file only once for writing? It is throwing thousands of output files into this bucket, which is something I'd like to eliminate and which may be a factor.

import apache_beam as beam
from apache_beam import window

# known_args / pipeline_args come from the usual argparse +
# pipeline-options parsing (omitted in the original question).
with beam.Pipeline(argv=pipeline_args) as p:
    csvlines = (
        p | 'Read From CSV' >> beam.io.ReadFromText(known_args.input, skip_header_lines=1)
          | 'Parse CSV to Dictionary' >> beam.ParDo(Split())
          | 'Read Files into Memory' >> beam.ParDo(DownloadFilesDoFn())
          | 'Windowing' >> beam.WindowInto(window.FixedWindows(20 * 60))
    )

    b1 = csvlines | 'Branch1' >> beam.ParDo(Branch1DoFn())
    b2 = csvlines | 'Branch2' >> beam.ParDo(Branch2DoFn())
    b3 = csvlines | 'Branch3' >> beam.ParDo(Branch3DoFn())
    b4 = csvlines | 'Branch4' >> beam.ParDo(Branch4DoFn())
    b5 = csvlines | 'Branch5' >> beam.ParDo(Branch5DoFn())
    b6 = csvlines | 'Branch6' >> beam.ParDo(Branch6DoFn())

    output = (
        (b1, b2, b3, b4, b5, b6) | 'Merge PCollections' >> beam.Flatten()
        | 'WriteToText' >> beam.io.Write(beam.io.textio.WriteToText(known_args.output))
    )
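For what it's worth, WriteToText exposes a num_shards parameter that caps the number of output files, which would address the "thousands of output files" point. A minimal sketch of the write step only; note that num_shards=1 forces a single file but serializes the write through one worker, so it can bottleneck large jobs:

output = (
    (b1, b2, b3, b4, b5, b6)
    | 'Merge PCollections' >> beam.Flatten()
    # num_shards=1 -> exactly one output file; the default (0) lets the
    # runner choose the shard count, which on Dataflow can be very large.
    | 'WriteToText' >> beam.io.WriteToText(known_args.output, num_shards=1)
)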
Questioner: lys
Viewed: 22
Answered by lys on 2020-12-02 11:42:49

This question is related to a previous question, which contains more detail about the implementation. The solution suggested there was that an instance of google.cloud.storage.Client(), created in start_bundle() on every call to the ParDo(DoFn), connects to the same GCS bucket that is given via the args in WriteToText(known_args.output).

class DownloadFilesDoFn(beam.DoFn):
    def __init__(self):
        # Imported here so the DoFn stays picklable when shipped to workers.
        import re
        self.gcs_path_regex = re.compile(r'gs:\/\/([^\/]+)\/(.*)')

    def start_bundle(self):
        # A new storage client (and its connections) is created per bundle.
        import google.cloud.storage
        self.gcs = google.cloud.storage.Client()

    def process(self, element):
        self.file_match = self.gcs_path_regex.match(element['Url'])
        self.bucket = self.gcs.get_bucket(self.file_match.group(1))
        self.blob = self.bucket.get_blob(self.file_match.group(2))
        self.f = self.blob.download_as_bytes()
        yield element  # pass the element on so the downstream branches receive input

The cause of this error is most likely related to too many open connections from the client. I'm not sure what good practice is here, since it's suggested elsewhere that you set up network connections per bundle in exactly this way.

Adding the following, to delete the client object from memory at the end of each bundle, helped close some of the unnecessary lingering connections.

    def finish_bundle(self):
        # Drop the client (and compiled regex) so their connections can be closed.
        del self.gcs, self.gcs_path_regex
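As a side note beyond the original answer: recent Beam versions (2.14+) provide DoFn.setup() and DoFn.teardown(), which run once per DoFn instance rather than once per bundle, so creating the client there reuses one connection pool across bundles instead of opening a new one per bundle. A sketch under that assumption:

import re

import apache_beam as beam
import google.cloud.storage

class DownloadFilesDoFn(beam.DoFn):
    def __init__(self):
        self.gcs_path_regex = re.compile(r'gs:\/\/([^\/]+)\/(.*)')
        self.gcs = None

    def setup(self):
        # Runs once per DoFn instance, not once per bundle, so the client
        # and its connection pool are reused across bundles.
        self.gcs = google.cloud.storage.Client()

    def teardown(self):
        # Best-effort hook when the instance is discarded.
        self.gcs = None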