I am trying to do two things in a single Hadoop job. Given this sample text:
The Paris Commune was a government that briefly ruled Paris from 18 March (more formally, from 28 March) to 28 May 1871.
I want to extract both the single words:
paris, commune, government, briefly, ruled, paris, march, formally, march
and the adjacent word pairs (bigrams):
paris commune, commune government, government briefly, briefly ruled, ruled paris, paris march, march formally, formally march
I would like to combine both counts in one job to speed up processing. So far I have come up with code that counts the pairs, but I am not sure how to extend it so that it also counts the single words at the same time.
Mapper:
public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final Text lastWord = new Text();      // previous matched word (persists across calls)
    private final Text pair = new Text();
    private final IntWritable one = new IntWritable(1);

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String inputText = value.toString().toLowerCase(); // normalize the input line to lowercase
        // a word is a run of 5 to 25 lowercase letters
        String wordPattern = "[a-z]{5,25}";
        Pattern pattern = Pattern.compile(wordPattern);
        Matcher matcher = pattern.matcher(inputText);
        // emit each adjacent word pair with a count of 1
        while (matcher.find()) {
            String word = matcher.group();
            if (lastWord.getLength() > 0) {
                pair.set(lastWord + " " + word);
                context.write(pair, one);
            }
            lastWord.set(word);
        }
    }
}
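To also count the single words in the same pass, one common approach is to emit every key from the one mapper and tag it so word counts and pair counts cannot collide in the reducer. Below is a minimal sketch of that emission logic in plain Java, with the Hadoop plumbing omitted; the `W:`/`P:` prefix convention is an assumption for illustration, not part of the original code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class CombinedEmitSketch {
    // One pass over the line emits both single-word keys and adjacent
    // word-pair keys, tagged with "W:" / "P:" so they stay distinct.
    static List<String> emit(String line) {
        List<String> keys = new ArrayList<>();
        Pattern pattern = Pattern.compile("[a-z]{5,25}");
        Matcher matcher = pattern.matcher(line.toLowerCase());
        String lastWord = "";
        while (matcher.find()) {
            String word = matcher.group();
            keys.add("W:" + word);                       // single-word key
            if (!lastWord.isEmpty()) {
                keys.add("P:" + lastWord + " " + word);  // word-pair key
            }
            lastWord = word;
        }
        return keys;
    }

    public static void main(String[] args) {
        // prints: [W:paris, W:commune, P:paris commune, W:government, P:commune government]
        System.out.println(emit("The Paris Commune was a government"));
    }
}
```

In the real mapper, each of these strings would become a `Text` key written with `context.write(key, one)`; the same `Reduce` class can then sum both kinds of keys unchanged.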
Reducer:
public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
    private int countThreshold = 100;

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values)
            sum += value.get();
        // keep only keys whose total count reaches the threshold
        // (>= rather than ==, which would drop counts above 100)
        if (sum >= countThreshold) {
            context.write(key, new IntWritable(sum));
        }
    }
}
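If the mapper tags its keys (for example with assumed `W:`/`P:` prefixes for words and pairs), the two count families can be split back apart after the reduce, either in the reducer itself or in a post-processing step. A plain-Java sketch of that routing, Hadoop types omitted:

```java
import java.util.HashMap;
import java.util.Map;

public class TaggedKeyRouter {
    // Splits tagged counts into separate "words" and "pairs" buckets.
    // The "W:"/"P:" prefix convention is an assumption for illustration.
    static Map<String, Map<String, Integer>> route(Map<String, Integer> counts) {
        Map<String, Map<String, Integer>> out = new HashMap<>();
        out.put("words", new HashMap<>());
        out.put("pairs", new HashMap<>());
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            String key = e.getKey();
            String bucket = key.startsWith("W:") ? "words" : "pairs";
            // strip the 2-character tag before storing the bare key
            out.get(bucket).put(key.substring(2), e.getValue());
        }
        return out;
    }
}
```

Inside a real job, the same idea is usually expressed with Hadoop's `MultipleOutputs` so word counts and pair counts land in separate output files.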
Job configuration:
public int run(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "Hadoop1000WordCount1000WordPairs");
    job.setJarByClass(HadoopWordCountWordPairs.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    job.setMapperClass(Map.class);
    job.setCombinerClass(Reduce.class);
    job.setReducerClass(Reduce.class);
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    FileInputFormat.setInputPaths(job, args[0]);
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    // propagate the job's success or failure as the exit code
    return job.waitForCompletion(true) ? 0 : 1;
}
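One caveat about the configuration above: `setCombinerClass(Reduce.class)` reuses the threshold-filtering reducer as a combiner, but a combiner only sees partial counts from a single mapper, so filtering there can discard partial sums that would have crossed the threshold once merged across all mappers. This plain-JDK sketch (Hadoop types omitted) separates the two roles:

```java
import java.util.Optional;

public class CombinerVsReducer {
    // Combiner role: pure aggregation only, always safe on partial data.
    static int combine(int[] partialCounts) {
        int sum = 0;
        for (int c : partialCounts) sum += c;
        return sum;
    }

    // Reducer role: sees the complete set of counts for a key,
    // so it is the only place the threshold may be applied.
    static Optional<Integer> reduce(int[] counts, int threshold) {
        int sum = combine(counts);
        return sum >= threshold ? Optional.of(sum) : Optional.empty();
    }
}
```

In Hadoop terms, this means using a sum-only combiner class and keeping the threshold check in a separate reducer class passed to `setReducerClass`.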