我有一个分支管道,其中包含多个ParDo
转换,这些转换已合并并写入GCS存储桶中的文本文件记录。
管道崩溃后,我收到以下消息:
The worker lost contact with the service.
RuntimeError: FileNotFoundError: [Errno 2] Not found: gs://MYBUCKET/JOBNAME.00000-of-00001.avro [while running 'WriteToText/WriteToText/Write/WriteImpl/WriteBundles/WriteBundles']
看起来找不到它正在写入的日志文件。直到发生错误的某个时间点似乎都没问题。我想在它的周围或一个断点周围加上一个try:
/ except:
,但是我什至不确定如何找出根本原因。
有没有办法只写一个文件?还是只打开一个文件一次写入?它会将数千个输出文件发送到该存储桶中,这是我要消除的一个因素,可能是一个因素。
with beam.Pipeline(argv=pipeline_args) as p:
csvlines = (
p | 'Read From CSV' >> beam.io.ReadFromText(known_args.input, skip_header_lines=1)
| 'Parse CSV to Dictionary' >> beam.ParDo(Split())
| 'Read Files into Memory' >> beam.ParDo(DownloadFilesDoFn())
| 'Windowing' >> beam.WindowInto(window.FixedWindows(20 * 60))
)
b1 = ( csvlines | 'Branch1' >> beam.ParDo(Branch1DoFn()) )
b2 = ( csvlines | 'Branch2' >> beam.ParDo(Branch2DoFn()) )
b3 = ( csvlines | 'Branch3' >> beam.ParDo(Branch3DoFn()) )
b4 = ( csvlines | 'Branch4' >> beam.ParDo(Branch4DoFn()) )
b5 = ( csvlines | 'Branch5' >> beam.ParDo(Branch5DoFn()) )
b6 = ( csvlines | 'Branch6' >> beam.ParDo(Branch6DoFn()) )
output = (
(b1,b2,b3,b4,b5,b6) | 'Merge PCollections' >> beam.Flatten()
| 'WriteToText' >> beam.io.Write(beam.io.textio.WriteToText(known_args.output))
)
该问题与先前的问题相关,该问题包含有关实现的更多详细信息。该解决方案建议google.cloud.storage.Client()
在start_bundle()
每次调用时在中创建的实例ParDo(DoFn)
。它连接到相同的gcs存储桶-通过args给定WriteToText(known_args.output)
class DownloadFilesDoFn(beam.DoFn):
def __init__(self):
import re
self.gcs_path_regex = re.compile(r'gs:\/\/([^\/]+)\/(.*)')
def start_bundle(self):
import google.cloud.storage
self.gcs = google.cloud.storage.Client()
def process(self, element):
self.file_match = self.gcs_path_regex.match(element['Url'])
self.bucket = self.gcs.get_bucket(self.file_match.group(1))
self.blob = self.bucket.get_blob(self.file_match.group(2))
self.f = self.blob.download_as_bytes()
此错误的原因很可能与与客户端的连接过多有关。对于好的做法,我尚不清楚,因为在其他地方建议你以这种方式为每个捆绑包建立网络连接。
将其添加到末尾以从捆绑包末尾的内存中删除客户端对象,将有助于关闭一些不必要的缠结连接。
def finish_bundle(self):
del self.gcs, self.gcs_path_regex