Note: This article is reproduced from serverfault.com.

Untar files containing multiple unrelated csv files programmatically with Hadoop

Posted on 2020-12-03 14:03:30

I have several compressed archives (.tar.gz) in HDFS, each containing unrelated TSV files (something like the list below). I would like to untar those archives programmatically, ideally leveraging an MPP architecture (e.g. Hadoop or Spark), and save the extracted files back into HDFS.

- browser.tsv
- connection_type.tsv
- country.tsv
- color_depth.tsv
- javascript_version.tsv
- languages.tsv
- operating_systems.tsv
- plugins.tsv
- referrer_type.tsv
- resolution.tsv
- search_engine.tsv

So far I could only come up with a bash script that downloads each file from HDFS, untars it, and saves the resulting folder back into HDFS. I could even parallelize the script, but I am not happy with that solution either.

Thank you :)

Edit:

It would be interesting to see a solution done with any of the below:

  • Spark 2.4.5
  • Hive 2.3.6
  • Pig 0.17.0
  • Hadoop 2.8.5

Questioner: DanielM

Answered by DanielM on 2020-12-06 05:25:09

I finally found a solution to my problem: a mapper-only Hadoop job. Each map call receives one uncompressed file from inside the tarball and writes it to a specific path using Hadoop's MultipleOutputs utility.

I also implemented a custom non-splittable Hadoop input format, TarballInputFormat, to handle the tarball extraction.

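import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class TarballInputFormat extends FileInputFormat<Text, Text> {

    // A gzipped tarball cannot be decoded from an arbitrary offset,
    // so each archive must be handled by a single mapper.
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
    }

    @Override
    public RecordReader<Text, Text> createRecordReader(InputSplit inputSplit,
                                                       TaskAttemptContext taskAttemptContext) {
        // The framework calls initialize() on the returned reader before the first
        // record is read, so it is not called here (doing so would open the stream twice).
        return new TarballRecordReader();
    }

}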
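
To illustrate why the input format is declared non-splittable, here is a small sketch that uses only the JDK (no Hadoop). The class GzipSplitDemo and its methods are hypothetical names made up for this illustration. It compresses some data in memory and then tries to decompress it starting from an offset other than the beginning, which is roughly what a mapper would face if it were handed a split from the middle of a .tar.gz file.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical demo class, not part of the original answer.
final class GzipSplitDemo {

    // Gzip-compress a byte array in memory.
    static byte[] gzip(byte[] raw) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write(raw);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    // Try to decompress starting at the given byte offset.
    // Returns false if the data cannot be decoded from there.
    static boolean canDecodeFrom(byte[] compressed, int offset) {
        try (GZIPInputStream in = new GZIPInputStream(
                new ByteArrayInputStream(compressed, offset, compressed.length - offset))) {
            byte[] buf = new byte[4096];
            while (in.read(buf) != -1) {
                // Drain the stream; only success or failure matters here.
            }
            return true;
        } catch (IOException e) {
            return false; // no gzip header at this offset, or undecodable data
        }
    }

    public static void main(String[] args) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 1000; i++) {
            sb.append(i).append("\tChrome\n");
        }
        byte[] compressed = gzip(sb.toString().getBytes(StandardCharsets.UTF_8));
        System.out.println("from start:  " + canDecodeFrom(compressed, 0));
        System.out.println("from middle: " + canDecodeFrom(compressed, compressed.length / 2));
    }
}
```

Decoding only works from the first byte of the stream, which is why one mapper has to read each archive from start to finish.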

The TarballRecordReader handles the extraction of all the files within the original tarball file.

public class TarballRecordReader extends RecordReader<Text, Text> {

    private static final Log log = LogFactory.getLog(TarballRecordReader.class);

    private TarInputStream tarInputStream;
    private Text key;
    private Text value;
    private boolean finished = false;
    private String folderName;

    @Override
    public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) {
        key = new Text();
        value = new Text();

        try {
            FileSplit split = (FileSplit) inputSplit;
            Configuration conf = taskAttemptContext.getConfiguration();

            Path tarballPath = split.getPath();
            folderName = tarballPath.getName().split("\\.")[0];
            FileSystem fs = tarballPath.getFileSystem(conf);
            FSDataInputStream fsInputStream = fs.open(tarballPath);

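            // The codec is chosen from the file extension; null means no codec matched
            CompressionCodecFactory compressionCodecs = new CompressionCodecFactory(conf);
            CompressionCodec codec = compressionCodecs.getCodec(tarballPath);
            if (codec == null) {
                fsInputStream.close();
                throw new IOException("No compression codec found for " + tarballPath);
            }

            tarInputStream = new TarInputStream(codec.createInputStream(fsInputStream));
        }
        catch (IOException ex) {
            // Fail the task here rather than with a NullPointerException in nextKeyValue()
            log.error(ex.getMessage(), ex);
            throw new java.io.UncheckedIOException(ex);
        }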
    }

    @Override
    public boolean nextKeyValue() throws IOException {

        TarEntry tarEntry = tarInputStream.getNextEntry();
        while (tarEntry != null && tarEntry.isDirectory())
            tarEntry = tarInputStream.getNextEntry();

        finished = tarEntry == null;
        if (finished) {
            return false;
        }

        key.clear();
        value.clear();

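        long tarSize = tarEntry.getSize();

        // The whole entry is buffered in memory, so it must fit in an int-sized array
        int read;
        int offset = 0;
        int bufSize = (int) tarSize;
        byte[] buffer = new byte[bufSize];
        // read() may return fewer bytes than requested: ask only for what is still missing
        while (offset < bufSize
                && (read = tarInputStream.read(buffer, offset, bufSize - offset)) != -1) {
            offset += read;
        }

        value.set(buffer, 0, offset);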
        key.set(folderName + "/" + tarEntry.getName());

        return true;
    }

    @Override
    public Text getCurrentKey() {
        return key;
    }

    @Override
    public Text getCurrentValue() {
        return value;
    }

    @Override
    public float getProgress() {
        return finished ? 1 : 0;
    }

    @Override
    public void close() throws IOException {
        if (tarInputStream != null) {
            tarInputStream.close();
        }
    }
}
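
The record reader loads each tar entry into a byte array before handing it to the mapper. A single read call on a stream is allowed to return fewer bytes than requested, so the loop has to keep asking for the remainder. Below is a minimal JDK-only sketch of that pattern. ReadFullyDemo, readFully and trickle are hypothetical names for this illustration; trickle is a test stream that deliberately returns at most three bytes per call to simulate short reads.

```java
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

// Hypothetical demo class, not part of the original answer.
final class ReadFullyDemo {

    // Read exactly `size` bytes from the stream, tolerating short reads.
    static byte[] readFully(InputStream in, int size) throws IOException {
        byte[] buffer = new byte[size];
        int offset = 0;
        while (offset < size) {
            // Only request the bytes that are still missing.
            int read = in.read(buffer, offset, size - offset);
            if (read == -1) {
                throw new EOFException("Stream ended after " + offset + " of " + size + " bytes");
            }
            offset += read;
        }
        return buffer;
    }

    // A stream that hands out at most 3 bytes per read call.
    static InputStream trickle(byte[] data) {
        return new ByteArrayInputStream(data) {
            @Override
            public synchronized int read(byte[] b, int off, int len) {
                return super.read(b, off, Math.min(len, 3));
            }
        };
    }
}
```

Because every entry is held in memory as one array, this approach suits small lookup files like the ones listed in the question; very large entries would need a streaming write instead.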

Each tarball is extracted with its original structure preserved: every file is written relative to a folder named after its tarball. In this solution the mapper both reads and writes each extracted file in one pass. That limits write parallelism to one task per tarball, but it is a reasonable trade-off for those who need to save the extracted files in their original form (ordered output). An alternative approach could use reducers to write each line of the extracted files to the file system, which should increase write throughput at the cost of consistency (the content of each file would no longer be ordered).
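
As a quick illustration of how the output path of each file is derived, the sketch below reproduces the string handling from the record reader in plain Java. OutputKeyDemo and its methods are hypothetical names, and the archive name lookup_data.tar.gz is an assumption inferred from the output listing.

```java
// Hypothetical demo class, not part of the original answer.
final class OutputKeyDemo {

    // Same rule as the record reader: keep everything before the first dot.
    static String folderName(String tarballFileName) {
        return tarballFileName.split("\\.")[0];
    }

    // The key handed to the mapper, later used as the base output path.
    static String outputKey(String tarballFileName, String entryName) {
        return folderName(tarballFileName) + "/" + entryName;
    }

    public static void main(String[] args) {
        // Assumed archive name, inferred from the output listing.
        System.out.println(outputKey("lookup_data.tar.gz", "browser.tsv"));
        // A dotted archive name is cut at its first dot.
        System.out.println(outputKey("v1.2-data.tar.gz", "country.tsv"));
    }
}
```

Note that cutting at the first dot shortens names such as v1.2-data.tar.gz to v1, so archives with dots in their base name would need a different rule.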

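import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ExtractTarball extends Configured implements Tool {

    public static final Log log = LogFactory.getLog(ExtractTarball.class);
    private static final String LOOKUP_OUTPUT = "lookup";

    public static class MapClass extends Mapper<Text, Text, Text, Text> {

        private MultipleOutputs<Text, Text> mos;

        @Override
        protected void setup(Context context) {
            mos = new MultipleOutputs<>(context);
        }

        @Override
        protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
            // key = "<tarball name>/<entry name>", value = raw content of that entry
            String filename = key.toString();

            // getLength() is the number of valid bytes; getBytes() may return a larger backing array
            System.out.printf("%s: %d%n", filename, value.getLength());

            // With a NullWritable key, TextOutputFormat writes only the value;
            // an empty String key would still emit the tab separator before it.
            mos.write(LOOKUP_OUTPUT, NullWritable.get(), value, filename);
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            mos.close();
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        // getConf() carries the generic options (-D, -conf, ...) parsed by ToolRunner
        Job job = Job.getInstance(getConf(), "ExtractTarball");
        job.setJarByClass(this.getClass());
        job.setMapperClass(MapClass.class);

        job.setInputFormatClass(TarballInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setNumReduceTasks(0); // mapper-only job

        MultipleOutputs.addNamedOutput(job, LOOKUP_OUTPUT, TextOutputFormat.class, NullWritable.class, Text.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new ExtractTarball(), args);
        System.out.println(exitCode);
        System.exit(exitCode);
    }
}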

This is what the output folder looks like:

- output
    - lookup_data
        - .browser.tsv-m-00000.crc
        - .browser_type.tsv-m-00000.crc
        - .color_depth.tsv-m-00000.crc
        - .column_headers.tsv-m-00000.crc
        - .connection_type.tsv-m-00000.crc
        - .country.tsv-m-00000.crc
        - .event.tsv-m-00000.crc
        - .javascript_version.tsv-m-00000.crc
        - .languages.tsv-m-00000.crc
        - .operating_systems.tsv-m-00000.crc
        - .plugins.tsv-m-00000.crc
        - .referrer_type.tsv-m-00000.crc
        - .resolution.tsv-m-00000.crc
        - .search_engines.tsv-m-00000.crc
        - browser.tsv-m-00000
        - browser_type.tsv-m-00000
        - color_depth.tsv-m-00000
        - column_headers.tsv-m-00000
        - connection_type.tsv-m-00000
        - country.tsv-m-00000
        - event.tsv-m-00000
        - javascript_version.tsv-m-00000
        - languages.tsv-m-00000
        - operating_systems.tsv-m-00000
        - plugins.tsv-m-00000
        - referrer_type.tsv-m-00000
        - resolution.tsv-m-00000
        - search_engines.tsv-m-00000
    - ._SUCCESS.crc
    - .part-m-00000.crc
    - _SUCCESS
    - part-m-00000
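
The -m-00000 suffix in the listing is appended by Hadoop to every task output file (m for a map task, followed by the task's partition number), so the extracted files do not carry their exact original names. If the original names are needed, one option is to rename the files afterwards. The sketch below shows only the name mapping, in plain Java. PartSuffixDemo and originalName are hypothetical names, and the actual rename on HDFS is left out.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical demo class, not part of the original answer.
final class PartSuffixDemo {

    // Matches "<name>-m-00000" (map output) or "<name>-r-00012" (reduce output).
    private static final Pattern PART_SUFFIX = Pattern.compile("^(.*)-[mr]-\\d{5,}$");

    // Strip the task suffix if present; otherwise return the name unchanged.
    static String originalName(String hadoopName) {
        Matcher m = PART_SUFFIX.matcher(hadoopName);
        return m.matches() ? m.group(1) : hadoopName;
    }

    public static void main(String[] args) {
        System.out.println(originalName("browser.tsv-m-00000"));
        System.out.println(originalName("_SUCCESS"));
    }
}
```

If several tarballs are extracted in one job, each is handled by its own map task, so the numeric part of the suffix would likely differ from folder to folder.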