Note: this article is reproduced from serverfault.com.

java - MapReduce Hadoop on Linux

Posted on 2020-12-01 02:53:07

I've been searching online for a proper tutorial on how to use map and reduce, but nearly every piece of code about WordCount sucks and doesn't really explain how to use each function. I've seen everything about the theory, the keys, the map and so on, but there is no CODE that does something different from WordCount.

I'm using Ubuntu 20.10 on VirtualBox and Hadoop version 3.2.1 (comment if you need any more info).

My task is to work with a file that holds some data about athletes at the Olympic games.

You will see it holds all kinds of information, like name, sex, age, weight, height, etc.

I will show a sample here (hope you understand it):

ID  Name       Sex  Age  Height  Weight  Team   NOC  Games  Year         Season  City       Sport       Event                        Medal
1   A Dijiang  M    24   180     80      China  CHN  1992   Summer 1992  Summer  Barcelona  Basketball  Basketball Men's Basketball  NA

So far I have had to handle data that is the same across all records, like the name or the ID, which are similar to each other.
(Imagine the same participant appearing more than once; that is my problem with the different time periods, so reduce cannot recognize the records as the same.)
If I could change the key/identification of the reduce function to the participant's name, I should get the correct result.
In this code I am searching for the players that have won at least one medal.
My main is:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class NewWordCount {

        public static void main(String[] args) throws Exception {
            
            if(args.length != 3) {
                System.err.println("Give the correct arguments.");
                System.exit(3);
            }
    
            // Job 1.
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf, "count");
            job.setJarByClass(NewWordCount.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            job.setMapperClass(NewWordMapper.class);
            job.setCombinerClass(NewWordReducer.class);
            job.setReducerClass(NewWordReducer.class);
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job,new Path(args[1]));
            job.waitForCompletion(true);
       }
}

My mapper is:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class NewWordMapper extends Mapper <LongWritable, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable();
    private Text word = new Text();

    private String name = new String();
    private String sex = new String();
    private String age = new String();
    private String team = new String();
    private String sport = new String();
    private String games = new String();
    private String sum = new String();

    private String gold = "Gold";
    private String silver = "Silver";
    private String bronze = "Bronze";

    public void map(LongWritable key, Text value, Context context) throws IOException,InterruptedException {
    
        if(((LongWritable)key).get() == 0) {
            return;
        }
    
        String line = value.toString();
        String[] arrOfStr = line.split(",");
        int counter = 0;
    
        for(String a : arrOfStr) {
            if(counter == 14) {             
                // setting the type of medal each player has won.
                word.set(a);
            
                // checking if the medal is gold.
                if(a.compareTo(gold) == 0 || a.compareTo(silver) == 0 || a.compareTo(bronze) == 0) {
                    String[] goldenStr = line.split(",");
                    name = goldenStr[1];
                    sex = goldenStr[2];
                    age = goldenStr[3];
                    team = goldenStr[6];
                    sport = goldenStr[12];
                    games = goldenStr[8];
                    sum = name + "," + sex + "," + age + "," + team + "," + sport + "," + games;
                    word.set(sum);
                    context.write(word, one);
                }
            }
            counter++;
        }
    }
}

My reducer is:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class NewWordReducer extends Reducer <Text, IntWritable, Text, IntWritable> {
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
    
        int count = 0;
        for(IntWritable val : values) {
        
            String line  = val.toString();
            String[] arrOfStr = line.split(",");
            String name = arrOfStr[0];
        
            count += val.get();
        }
        context.write(key, new IntWritable(count));
    }
}
Questioner: Gyftonikolos Nicholas
Viewed: 11

Answer by Coursal, 2020-12-02 01:37:25:

The core idea behind a MapReduce job is that the Map function is used to extract valuable information from the input and "transform" it into key-value pairs, on which the Reduce function is then executed separately for each key. Your code seems to show a misunderstanding about the way the latter is executed, but that's no big deal, because this is a proper use case for the WordCount pattern.
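
To make that contract concrete before going into the actual job below, here is a minimal plain-Java sketch (no Hadoop involved; the class name MapReduceContractSketch and the toy records are made up purely for illustration): each record is turned into (key, value) pairs, the pairs are grouped by key, and a per-key summation plays the role of the reducer.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MapReduceContractSketch {
    public static void main(String[] args) {
        // toy records: just a name and a medal indication, not the full Olympic dataset
        List<String[]> records = Arrays.asList(
                new String[]{"T Kekempourdas", "Gold"},
                new String[]{"A Jones", "Gold"},
                new String[]{"A Jones", "Gold"},
                new String[]{"K Stefanidi", "Silver"});

        // "map" phase: emit the pair (name, 1) for every gold-medal record
        Map<String, List<Integer>> grouped = new HashMap<>();
        for (String[] record : records) {
            if (record[1].equals("Gold")) {
                grouped.computeIfAbsent(record[0], k -> new ArrayList<>()).add(1);
            }
        }

        // "reduce" phase: one call per distinct key, summing all of that key's values
        for (Map.Entry<String, List<Integer>> entry : grouped.entrySet()) {
            int sum = 0;
            for (int one : entry.getValue()) {
                sum += one;
            }
            System.out.println(entry.getKey() + "\t" + sum);
        }
    }
}

In Hadoop, the grouping and sorting by key happen inside the framework between the map and reduce phases; the sketch only mimics that with an in-memory HashMap.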

Suppose we have a file with statistics of Olympic athletes and their medal performances, stored in HDFS under a directory named /olympic_stats as you showed, like the one below (you can see I included records of the same athlete appearing more than once, which this example needs to handle):

1,A Dijiang,M,24,180,80,China,CHN,1992,Summer 1992,Summer,Barcelona,Basketball,Men's Basketball,NA
2,T Kekempourdas,M,33,189,85,Greece,GRE,2004,Summer 2004,Summer,Athens,Judo,Men's Judo,Gold
3,T Kekempourdas,M,33,189,85,Greece,GRE,2000,Summer 2000,Summer,Sydney,Judo,Men's Judo,Bronze
4,K Stefanidi,F,29,183,76,Greece,GRE,2016,Summer 2016,Summer,Rio,Pole Vault, Women's Pole Vault,Silver
5,A Jones,F,26,160,56,Canada,CAN,2012,Summer 2012,Summer,London,Acrobatics,Women's Acrobatics,Gold
5,A Jones,F,26,160,56,Canada,CAN,2016,Summer 2012,Summer,Rio,Acrobatics,Women's Acrobatics,Gold
6,C Glover,M,33,175,80,USA,USA,2008,Summer 2008,Summer,Beijing,Archery,Men's Archery,Gold
7,C Glover,M,33,175,80,USA,USA,2012,Summer 2012,Summer,London,Archery,Men's Archery,Gold
8,C Glover,M,33,175,80,USA,USA,2016,Summer 2016,Summer,Rio,Archery,Men's Archery,Gold

For the Map function, we need to find a column of the data that is suitable to use as the key, so that we can count how many gold medals each athlete has. From the data above we can easily see that every athlete can have one or more records, and that their name appears in the second column of all of them, so we can be certain that we will use the athlete's name as the key of the key-value pairs. As for the value, we want to count how many gold medals an athlete has, so we have to check the last column (index 14 after splitting the record on commas), which indicates whether and which medal this athlete won. If that column of the record equals the String Gold, we can be certain that this athlete has at least 1 gold medal in his/her career so far. So here, as the value, we can simply put 1. For example, for the second record above the mapper emits (T Kekempourdas, 1), while for the first record (A Dijiang, whose medal column is NA) it emits nothing.

Now, for the Reduce function: since it is executed separately for each distinct key, we know that the input values it receives from the mapper all belong to one and the same athlete. And since the key-value pairs generated by the mapper carry the value 1 for every gold medal of a given athlete, we can simply add all of these 1s together and get the total number of gold medals for each athlete. For example, the mapper emits (C Glover, 1) three times for the records above, so the reducer for that key sums them into (C Glover, 3).

So the code looks like the one below (for the sake of simplicity, I put the mapper, the reducer, and the driver in the same file):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.fs.FileSystem;

import java.io.IOException;

public class GoldMedals 
{
    /* input:  <byte_offset, line_of_dataset>
     * output: <Athlete's Name, 1>
     */
    public static class Map extends Mapper<Object, Text, Text, IntWritable> 
    {
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException 
        {
            String record = value.toString();
            String[] columns = record.split(",");

            // extract the athlete's name and his/hers medal indication
            String athlete_name = columns[1];
            String medal = columns[14];

            // only hold the gold medal athletes, with their name as the key
            // and 1 as the least number of gold medals they have so far
            if(medal.equals("Gold")) 
                context.write(new Text(athlete_name), new IntWritable(1));
        }
    }

    /* input:  <Athlete's Name, 1>
     * output: <Athlete's Name, Athlete's Total Gold Medals>
     */
    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable>
    {
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException 
        {
            int sum = 0;
            
            // for a single athlete, add all of the gold medals they had so far...
            for(IntWritable value : values)
                    sum += value.get();

            // and write the result as the value on the output key-value pairs
            context.write(key, new IntWritable(sum));
        }
    }


    public static void main(String[] args) throws Exception
    {
        // set the paths of the input and output directories in the HDFS
        Path input_dir = new Path("olympic_stats");
        Path output_dir = new Path("gold_medals");

        // in case the output directory already exists, delete it
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        if(fs.exists(output_dir))
            fs.delete(output_dir, true);

        // configure the MapReduce job
        Job goldmedals_job = Job.getInstance(conf, "Gold Medals Counter");
        goldmedals_job.setJarByClass(GoldMedals.class);
        goldmedals_job.setMapperClass(Map.class);
        goldmedals_job.setCombinerClass(Reduce.class);
        goldmedals_job.setReducerClass(Reduce.class);    
        goldmedals_job.setMapOutputKeyClass(Text.class);
        goldmedals_job.setMapOutputValueClass(IntWritable.class);
        goldmedals_job.setOutputKeyClass(Text.class);
        goldmedals_job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(goldmedals_job, input_dir);
        FileOutputFormat.setOutputPath(goldmedals_job, output_dir);
        goldmedals_job.waitForCompletion(true);
    }
}

The output of the program above is stored in the job's output directory in HDFS (gold_medals in the code above), and it confirms that the MapReduce job was designed correctly.

[Screenshot of the output directory's contents omitted.]
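
For reference, based on the sample input above one would expect the gold-medal counts in that directory to come out roughly as follows (Hadoop sorts the keys before they reach the reducer):

A Jones         2
C Glover        3
T Kekempourdas  1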