I have a big file s3://my-bucket/in.tsv.gz that I would like to load and process, then write its processed version back to an s3 output file s3://my-bucket/out.tsv.gz.
How do I stream in.tsv.gz directly from s3 without loading the whole file into memory (it cannot fit in memory)? And how do I write the processed gzipped stream directly back to s3?
In the code below, I show how I was thinking of loading the input gzipped dataframe from s3, and how I would write the .tsv if it were located locally at bucket_dir_local = ./.
import pandas as pd
import s3fs
import os
import gzip
import csv
import io

bucket_dir = 's3://my-bucket/annotations/'
# pandas reads the gzipped TSV straight from s3 (via s3fs), but this
# materializes the whole dataframe in memory
df = pd.read_csv(os.path.join(bucket_dir, 'in.tsv.gz'), sep='\t', compression="gzip")

bucket_dir_local = './'
# not sure how to do it with an s3 path
with gzip.open(os.path.join(bucket_dir_local, 'out.tsv.gz'), "w") as f:
    with io.TextIOWrapper(f, encoding='utf-8') as wrapper:
        w = csv.DictWriter(wrapper, fieldnames=['test', 'testing'], extrasaction="ignore")
        w.writeheader()
        for index, row in df.iterrows():
            my_dict = {"test": index, "testing": row.iloc[6]}  # 7th column by position
            w.writerow(my_dict)
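For what it's worth, the memory problem on the read side can be bounded by asking pandas for the file in chunks instead of all at once. This is a minimal sketch under the same assumptions as the code above (the dummy 'test'/'testing' fieldnames and the 7th-column lookup are from my example, chunksize=10_000 is an arbitrary choice); the write side here is still local:

import pandas as pd
import gzip
import csv

# a minimal sketch: process the s3 TSV in fixed-size chunks so only one
# chunk is resident in memory at a time
with gzip.open('out.tsv.gz', 'wt', encoding='utf-8', newline='') as wrapper:
    w = csv.DictWriter(wrapper, fieldnames=['test', 'testing'], extrasaction='ignore')
    w.writeheader()
    for chunk in pd.read_csv('s3://my-bucket/annotations/in.tsv.gz',
                             sep='\t', compression='gzip', chunksize=10_000):
        # the default RangeIndex continues across chunks, so `index`
        # behaves the same as in the non-chunked version
        for index, row in chunk.iterrows():
            w.writerow({'test': index, 'testing': row.iloc[6]})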
EDIT: smart_open looks like the way to go.
Here is a dummy example that reads a file from s3 and writes it back to s3 using smart_open:
from smart_open import open
import os

bucket_dir = "s3://my-bucket/annotations/"
# smart_open decompresses/compresses transparently based on the .gz
# extension and streams the object instead of buffering it whole
with open(os.path.join(bucket_dir, "in.tsv.gz"), "rb") as fin:
    with open(os.path.join(bucket_dir, "out.tsv.gz"), "wb") as fout:
        for line in fin:
            fields = [i.strip() for i in line.decode().split("\t")]
            fout.write(("\t".join(fields) + "\n").encode())
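Since smart_open also accepts text mode and gzips on the fly from the .gz extension, the csv.DictWriter block from my first snippet can presumably target s3 directly, which would answer the "not sure how to do it with an s3 path" comment above. A minimal sketch, reusing the dummy 'test'/'testing' columns from my example:

import os
import csv
import pandas as pd
from smart_open import open

bucket_dir = "s3://my-bucket/annotations/"
df = pd.read_csv(os.path.join(bucket_dir, "in.tsv.gz"), sep="\t", compression="gzip")

# opening the s3 key in text mode ("w") makes smart_open gzip the stream
# as it uploads, so the DictWriter writes straight to s3
with open(os.path.join(bucket_dir, "out.tsv.gz"), "w", encoding="utf-8") as fout:
    w = csv.DictWriter(fout, fieldnames=["test", "testing"], extrasaction="ignore")
    w.writeheader()
    for index, row in df.iterrows():
        w.writerow({"test": index, "testing": row.iloc[6]})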