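I have been looking online for a proper tutorial on how to use map and reduce, but almost every WordCount example is poor and does not really explain how to use each function. I have read all about the theory, the keys, map and so on, but there is no CODE that, for example, does something different from WordCount.
I am using Ubuntu 20.10 in VirtualBox with Hadoop 3.2.1 (leave me a comment if you need more information).
My task is to process a file that contains data about athletes at the Olympic Games.
As you will see, it contains various pieces of information such as name, sex, age, weight, height and so on.
I will show a sample here (I hope it is understandable):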
ID Name Sex Age Height Weight Team NOC Games Year Season City
Sport Event Medal
1 A Dijiang M 24 180 80 China CHN 1992 Summer 1992 Summer Barcelona
Basketball Basketball Men's Basketball NA
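So far I have to deal with data that is the same across several records, such as the name or the ID, which repeat from one record to the next.
(Imagine the same participant appearing more than once, in different years. That is my problem, because reduce does not recognize those records as belonging to the same person.)
If I could change the key that the reduce function groups on to the participant's name, I should get the correct result.
In this code I search for athletes who have won at least one medal.
My main is: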
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class NewWordCount {
    public static void main(String[] args) throws Exception {
        if(args.length != 3) {
            System.err.println("Give the correct arguments.");
            System.exit(3);
        }
        // Job 1.
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "count");
        job.setJarByClass(NewWordCount.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setMapperClass(NewWordMapper.class);
        job.setCombinerClass(NewWordReducer.class);
        job.setReducerClass(NewWordReducer.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job,new Path(args[1]));
        job.waitForCompletion(true);
    }
}
My mapper is:
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class NewWordMapper extends Mapper <LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable();
    private Text word = new Text();
    private String name = new String();
    private String sex = new String();
    private String age = new String();
    private String team = new String();
    private String sport = new String();
    private String games = new String();
    private String sum = new String();
    private String gold = "Gold";
    private String silver = "Silver";
    private String bronze = "Bronze";

    public void map(LongWritable key, Text value, Context context) throws IOException,InterruptedException {
        if(((LongWritable)key).get() == 0) {
            return;
        }
        String line = value.toString();
        String[] arrOfStr = line.split(",");
        int counter = 0;
        for(String a : arrOfStr) {
            if(counter == 14) {
                // setting the type of medal each player has won.
                word.set(a);
                // checking if the medal is gold.
                if(a.compareTo(gold) == 0 || a.compareTo(silver) == 0 || a.compareTo(bronze) == 0) {
                    String[] goldenStr = line.split(",");
                    name = goldenStr[1];
                    sex = goldenStr[2];
                    age = goldenStr[3];
                    team = goldenStr[6];
                    sport = goldenStr[12];
                    games = goldenStr[8];
                    sum = name + "," + sex + "," + age + "," + team + "," + sport + "," + games;
                    word.set(sum);
                    context.write(word, one);
                }
            }
            counter++;
        }
    }
}
My reducer is:
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class NewWordReducer extends Reducer <Text, IntWritable, Text, IntWritable> {
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int count = 0;
        for(IntWritable val : values) {
            String line = val.toString();
            String[] arrOfStr = line.split(",");
            String name = arrOfStr[0];
            count += val.get();
        }
        context.write(key, new IntWritable(count));
    }
}
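The core idea of a MapReduce job is that the Map function extracts the valuable information from the input and "transforms" it into key-value pairs, and the Reduce function is then executed separately for each key on those pairs. Your code seems to show a misunderstanding of how the latter is executed, but that is no big deal, because it is a proper example of the WordCount type. In particular, the values your reducer iterates over are the IntWritable counts emitted by the mapper, not the original lines, so splitting them on commas inside reduce has no effect.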
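To make the "executed separately for each key" part concrete, here is a minimal plain-Java sketch (no Hadoop involved) of the grouping that happens between the two phases. The class `ShuffleSketch` and the method `groupByKey` are hypothetical names used only for illustration; in a real job the framework does this grouping for you.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class ShuffleSketch {
    // Groups the (key, value) pairs emitted by the map phase so that each
    // key ends up with the list of all its values, ordered by key.
    static Map<String, List<Integer>> groupByKey(List<Map.Entry<String, Integer>> pairs) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> pair : pairs) {
            grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>())
                   .add(pair.getValue());
        }
        return grouped;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> emitted = new ArrayList<>();
        emitted.add(new SimpleEntry<>("C Glover", 1));
        emitted.add(new SimpleEntry<>("A Jones", 1));
        emitted.add(new SimpleEntry<>("C Glover", 1));

        // reduce would be called once per entry of this map,
        // receiving the key and only the list of values for that key
        System.out.println(groupByKey(emitted)); // {A Jones=[1], C Glover=[1, 1]}
    }
}
```

The point to notice is that reduce never sees the original input lines, only one key together with the values that were emitted for it.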
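Let's say we have a file with statistics about Olympic athletes and their results, like the one you showed, stored in an HDFS directory named /olympic_stats and looking like this (you will see that I included several records for the same athlete, since that is the case this example has to handle):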
1,A Dijiang,M,24,180,80,China,CHN,1992,Summer 1992,Summer,Barcelona,Basketball,Men's Basketball,NA
2,T Kekempourdas,M,33,189,85,Greece,GRE,2004,Summer 2004,Summer,Athens,Judo,Men's Judo,Gold
3,T Kekempourdas,M,33,189,85,Greece,GRE,2000,Summer 2000,Summer,Sydney,Judo,Men's Judo,Bronze
4,K Stefanidi,F,29,183,76,Greece,GRE,2016,Summer 2016,Summer,Rio,Pole Vault, Women's Pole Vault,Silver
5,A Jones,F,26,160,56,Canada,CAN,2012,Summer 2012,Summer,London,Acrobatics,Women's Acrobatics,Gold
5,A Jones,F,26,160,56,Canada,CAN,2016,Summer 2012,Summer,Rio,Acrobatics,Women's Acrobatics,Gold
6,C Glover,M,33,175,80,USA,USA,2008,Summer 2008,Summer,Beijing,Archery,Men's Archery,Gold
7,C Glover,M,33,175,80,USA,USA,2012,Summer 2012,Summer,London,Archery,Men's Archery,Gold
8,C Glover,M,33,175,80,USA,USA,2016,Summer 2016,Summer,Rio,Archery,Men's Archery,Gold
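For the Map function, we need to find one column of the data that is suitable as the key in order to calculate how many gold medals each athlete has. We can easily see from the above that every athlete can have one or more records and that their name always appears in the second column, so we will use the name as the key of the key-value pairs. As for the value, we want to count how many gold medals an athlete has, so we have to check the last column (index 14 once the line is split on commas), which indicates whether the athlete won a medal and which one. If that column equals the String Gold, we can be sure that this athlete has won at least 1 gold medal in their career so far. So here, as the value, we can put 1.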
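The decision described above can be sketched for a single line in plain Java, without any Hadoop classes. `MapStepSketch`, `mapLine` and the two column constants are hypothetical names for illustration, and the column positions assume the comma-separated layout of the sample records.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;
import java.util.Optional;

class MapStepSketch {
    // Column positions after splitting a line on commas (0-based).
    static final int NAME_COLUMN = 1;
    static final int MEDAL_COLUMN = 14;

    // Mirrors the mapper's decision for one line:
    // emit (name, 1) for a gold medal, emit nothing otherwise.
    static Optional<Map.Entry<String, Integer>> mapLine(String line) {
        String[] columns = line.split(",");
        if (columns.length <= MEDAL_COLUMN) {
            return Optional.empty(); // blank or malformed line
        }
        if (!"Gold".equals(columns[MEDAL_COLUMN])) {
            return Optional.empty(); // NA, Silver and Bronze are ignored here
        }
        Map.Entry<String, Integer> pair = new SimpleEntry<>(columns[NAME_COLUMN], 1);
        return Optional.of(pair);
    }

    public static void main(String[] args) {
        String gold = "2,T Kekempourdas,M,33,189,85,Greece,GRE,2004,Summer 2004,"
                + "Summer,Athens,Judo,Men's Judo,Gold";
        String none = "1,A Dijiang,M,24,180,80,China,CHN,1992,Summer 1992,"
                + "Summer,Barcelona,Basketball,Men's Basketball,NA";
        System.out.println(mapLine(gold).isPresent()); // a pair is emitted
        System.out.println(mapLine(none).isPresent()); // nothing is emitted
    }
}
```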
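Now for the Reduce function: since it is executed separately for each distinct key, the input values it receives from the mappers all belong to exactly one athlete. Because every key-value pair generated by the mappers carries a value of just 1 for each gold medal of a given athlete, we can simply add up all of these 1s to get the total number of gold medals for each athlete.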
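As a minimal sketch of that summation for one key, again in plain Java rather than with Hadoop types (`ReduceStepSketch` and `sumValues` are hypothetical names):

```java
import java.util.Arrays;
import java.util.List;

class ReduceStepSketch {
    // Mirrors what the reducer does for ONE key: add up every value
    // that the map phase emitted for that athlete.
    static int sumValues(Iterable<Integer> values) {
        int sum = 0;
        for (int value : values) {
            sum += value;
        }
        return sum;
    }

    public static void main(String[] args) {
        // three gold-medal records for the same athlete give three 1s
        List<Integer> valuesForOneAthlete = Arrays.asList(1, 1, 1);
        System.out.println("C Glover\t" + sumValues(valuesForOneAthlete));
    }
}
```

Because addition gives the same total no matter how the 1s are grouped, partial sums can be summed again, which is why a summing reducer like this one can also be registered as the combiner.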
So the code looks like the one below (for simplicity I put the mapper, the reducer and the driver in the same file):
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;

public class GoldMedals
{
    /* input:  <byte_offset, line_of_dataset>
     * output: <Athlete's Name, 1>
     */
    public static class Map extends Mapper<Object, Text, Text, IntWritable>
    {
        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException
        {
            String record = value.toString();
            String[] columns = record.split(",");

            // skip blank or malformed lines that do not reach the medal column
            if(columns.length < 15)
                return;

            // extract the athlete's name and their medal indication
            String athlete_name = columns[1];
            String medal = columns[14];

            // only keep the gold medal records, with the athlete's name as the key
            // and 1 as the value, i.e. one gold medal per matching record
            if(medal.equals("Gold"))
                context.write(new Text(athlete_name), new IntWritable(1));
        }
    }

    /* input:  <Athlete's Name, 1>
     * output: <Athlete's Name, Athlete's Total Gold Medals>
     */
    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable>
    {
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
        {
            int sum = 0;

            // for a single athlete, add up all of the gold medals they have so far...
            for(IntWritable value : values)
                sum += value.get();

            // and write the result as the value of the output key-value pair
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception
    {
        // set the paths of the input and output directories in the HDFS
        Path input_dir = new Path("olympic_stats");
        Path output_dir = new Path("gold_medals");

        // in case the output directory already exists, delete it
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        if(fs.exists(output_dir))
            fs.delete(output_dir, true);

        // configure the MapReduce job
        Job goldmedals_job = Job.getInstance(conf, "Gold Medals Counter");
        goldmedals_job.setJarByClass(GoldMedals.class);
        goldmedals_job.setMapperClass(Map.class);
        // summing is associative, so the reducer can double as the combiner
        goldmedals_job.setCombinerClass(Reduce.class);
        goldmedals_job.setReducerClass(Reduce.class);
        goldmedals_job.setMapOutputKeyClass(Text.class);
        goldmedals_job.setMapOutputValueClass(IntWritable.class);
        goldmedals_job.setOutputKeyClass(Text.class);
        goldmedals_job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(goldmedals_job, input_dir);
        FileOutputFormat.setOutputPath(goldmedals_job, output_dir);

        // report success or failure of the job through the exit code
        System.exit(goldmedals_job.waitForCompletion(true) ? 0 : 1);
    }
}
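The output of the program above is stored in the gold_medals directory in HDFS (the output_dir set in the driver). For the sample input it should list A Jones with 2, C Glover with 3 and T Kekempourdas with 1 gold medal, which confirms that the MapReduce job is designed correctly.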
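One way to sanity-check the job's logic without a cluster is to replay the sample records in plain Java. This is only a local stand-in under stated assumptions, not the Hadoop job itself: `GoldMedalsLocalCheck` and `countGoldMedals` are hypothetical names, and the map, shuffle and reduce steps are collapsed into a single loop.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class GoldMedalsLocalCheck {
    // The sample records from the answer, copied as-is.
    static final List<String> SAMPLE = Arrays.asList(
        "1,A Dijiang,M,24,180,80,China,CHN,1992,Summer 1992,Summer,Barcelona,Basketball,Men's Basketball,NA",
        "2,T Kekempourdas,M,33,189,85,Greece,GRE,2004,Summer 2004,Summer,Athens,Judo,Men's Judo,Gold",
        "3,T Kekempourdas,M,33,189,85,Greece,GRE,2000,Summer 2000,Summer,Sydney,Judo,Men's Judo,Bronze",
        "4,K Stefanidi,F,29,183,76,Greece,GRE,2016,Summer 2016,Summer,Rio,Pole Vault, Women's Pole Vault,Silver",
        "5,A Jones,F,26,160,56,Canada,CAN,2012,Summer 2012,Summer,London,Acrobatics,Women's Acrobatics,Gold",
        "5,A Jones,F,26,160,56,Canada,CAN,2016,Summer 2012,Summer,Rio,Acrobatics,Women's Acrobatics,Gold",
        "6,C Glover,M,33,175,80,USA,USA,2008,Summer 2008,Summer,Beijing,Archery,Men's Archery,Gold",
        "7,C Glover,M,33,175,80,USA,USA,2012,Summer 2012,Summer,London,Archery,Men's Archery,Gold",
        "8,C Glover,M,33,175,80,USA,USA,2016,Summer 2016,Summer,Rio,Archery,Men's Archery,Gold"
    );

    // map (keep gold records, key by name), shuffle (group by key)
    // and reduce (sum the 1s) collapsed into one pass over the lines.
    static Map<String, Integer> countGoldMedals(List<String> lines) {
        Map<String, Integer> totals = new TreeMap<>();
        for (String line : lines) {
            String[] columns = line.split(",");
            if (columns.length > 14 && "Gold".equals(columns[14])) {
                totals.merge(columns[1], 1, Integer::sum);
            }
        }
        return totals;
    }

    public static void main(String[] args) {
        // print one key<TAB>value line per athlete, sorted by name
        for (Map.Entry<String, Integer> e : countGoldMedals(SAMPLE).entrySet()) {
            System.out.println(e.getKey() + "\t" + e.getValue());
        }
    }
}
```

Athletes with no gold medal, such as A Dijiang and K Stefanidi, do not appear at all, because the map step never emits a pair for them.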
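I do not have enough words right now to express my gratitude. You taught me more than a month of classes in my degree program did. Thank you very much!
The problem is that I have more data than just the name. I need, for example, the different dates, and the Reduce function cannot recognize the names and mixes up the records.
What is the final goal of your application? As in, a sample of the result you are supposed to get? I answered based on the information you gave, so I would need more of it to make the necessary changes to the code for your final goal.
A sample of the final output should be A Dijiang,M,24,China,Basketball,Summer 1992,0,0,0 (because this participant won no gold, silver or bronze medal).
Since this goes beyond the original question you asked here, I suggest you post a new question about how to achieve this particular output. I will look into it if you would like.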
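For anyone landing here with the same follow-up, one possible direction is to use a composite key and count each medal type per key. The plain-Java sketch below only illustrates that idea and is not the answerer's solution: `MedalTallySketch`, `compositeKey`, `tally` and `format` are hypothetical names, and it assumes the comma-separated layout of the answer's sample records, with the games text such as "Summer 1992" at index 9.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class MedalTallySketch {
    // Key built from name, sex, age, team, sport and games, so records of
    // the same athlete at different games stay separate (an assumption
    // about what the desired output groups on).
    static String compositeKey(String[] c) {
        return String.join(",", c[1], c[2], c[3], c[6], c[12], c[9]);
    }

    // For every key, counts {gold, silver, bronze}; "NA" leaves the zeros.
    static Map<String, int[]> tally(List<String> lines) {
        Map<String, int[]> counts = new TreeMap<>();
        for (String line : lines) {
            String[] c = line.split(",");
            if (c.length < 15) {
                continue; // blank or malformed line
            }
            int[] medals = counts.computeIfAbsent(compositeKey(c), k -> new int[3]);
            switch (c[14]) {
                case "Gold":   medals[0]++; break;
                case "Silver": medals[1]++; break;
                case "Bronze": medals[2]++; break;
                default: break;
            }
        }
        return counts;
    }

    // Renders one result as key,gold,silver,bronze.
    static String format(Map.Entry<String, int[]> e) {
        int[] m = e.getValue();
        return e.getKey() + "," + m[0] + "," + m[1] + "," + m[2];
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
            "1,A Dijiang,M,24,180,80,China,CHN,1992,Summer 1992,Summer,Barcelona,Basketball,Men's Basketball,NA");
        for (Map.Entry<String, int[]> e : tally(lines).entrySet()) {
            System.out.println(format(e));
        }
    }
}
```

In a real job the composite key would be the mapper's output key and the medal type its value, with the counting done in the reducer. Note that a reducer producing three counts would no longer have the same input and output value types, so it could not simply be reused as the combiner.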