我目前正在编写一个 MapReduce 任务来解析数据集并列出具有 500+ 5 星评级的电影。
为此,我已经有一个 mapreduce 作业,可以从电影列表中过滤动作片,还有一个可以过滤电影的 5 星评论。这个新工作和每个映射器的输入是一个 movieIDS 列表。
第一个映射器的输入是适用于计算的 MovieIDS 列表。第二个映射器的输入是一个获得单次 5 星评论的电影 ID。
然后我的任务是将这些加入到 reducer 中,通过计算特定电影(关键)获得的 5 星评论的数量,查看它是否总共获得了 500 条评论,然后查看该特定电影是否已获得在适用电影列表中过滤。
然而,我的问题是,我用来计算特定电影 ID 的 5 星评论数量的 HashMap 数据结构只被初始化为一个。
代码如下:
public class JoinRatings extends Configured implements Tool {
public static class TokenizerMapperA extends Mapper<Object, Text, Text, Text> {
private Text node1;
private Text node2 = new Text("1");
@Override
protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
//Write movie ID and int writeable 1
node1 = new Text(value.toString());
context.write(node1, node2);
}
}
public static class TokenizerMapperB extends Mapper<Object, Text, Text, Text> {
private Text node1;
private Text node2 = new Text("2");
@Override
protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
node1 = new Text(value.toString());
context.write(node1, node2);
}
}
public static class CountReducer extends Reducer<Text, Text, Text, NullWritable>{
private Text node1;
private Set<String> distinctNodes;
Map<String, Integer> map;
private final static IntWritable one = new IntWritable(1);
private final static IntWritable two = new IntWritable(2);
@Override
protected void setup(Context context) {
distinctNodes = new HashSet<String>();
map = new HashMap<String,Integer>();
}
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) {
//Add all action movie IDs to unique set distinctNodes
String keyString = key.toString().strip();
String valueString = "NULL";
int counter = 0
for (Text text : values) {
String value = text.toString().strip();
// try {
// Text testText = new Text(value);
// context.write(testText, NullWritable.get());
// } catch (IOException e) {
// e.printStackTrace();
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
if (value.equals("1")) {
distinctNodes.add(keyString);
}
else if (value.equals("2")) {
if (map.containsKey(keyString)) {
map.put(keyString, map.get(keyString) + 1);
} else {
map.put(keyString, 1);
}
}
}
}
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
String numOfDistinctNodes = Integer.toString(distinctNodes.size());
context.write(new Text(numOfDistinctNodes), NullWritable.get());
String sizeOfMap = Integer.toString(map.size());
context.write(new Text(sizeOfMap), NullWritable.get());
for(Map.Entry<String, Integer> mapElement : map.entrySet()) {
String test = Integer.toString(mapElement.getValue());
context.write(new Text(test), NullWritable.get());
}
for (String s: distinct nodes) {
if (map.containsKey(s)) {
if (map.get(s) >= 500) {
node1 = new Text(s);
context.write(node1, NullWritable.get());
}
}
}
//Order movieID by ascending
//Write all movie IDs with 500+ 5-star reviews (to context)
}
}
如您所见,我正在使用映射器中的文本值来区分减速器应该添加到哪个数据结构。这似乎适用于文本“1”,它正在将 5 星评级可以计入 HashSet 的适用电影添加到 HashSet,但我的 HashMap 不计算特定键/MovieID 的 5 星评论的数量,只计算初始化值到 1.
我的倾向是我错误地使用了 MapReduce,并且 reducer 只获得 {MovieID, 2} 的单个输入,而不是 {MovieID, 2, 2, 2, 2}。
Reducer 是分布式的,因此不能用于存储每个实例之间的状态,例如 Map。更具体地说,请注意
key
参数是一个实例 - 每个唯一键都会得到一个 Reducer 类实例;并非所有键都通过该实例函数传递。
您可以使用 Hadoop
DistributedCache
或 Counter
跨任务维护某些状态,但您应该已经在单个 Iterable<Text>
对象中拥有来自同一 ID 的所有值。例如。所有评论(含星数)按 ID。因此,按value.getStars() == 5
过滤。 (考虑定义一个可写的Movie
类,而不是使用Text
),然后定义一个简单的计数器,例如
int fiveStarCount = 0;
for (Text t : values) {
// todo: get stars
fiveStarCount += parseStars(t) == 5 ? 1 : 0;
}
if (fiveStarCount >= 500) {
context.write(movieId, NullWritable.get()); // assuming you only care about the movie id
}
然后,这里没有什么可排序的...您需要第二个 MapReduce 作业来读取该输出,然后编写
(null,id)
元组,然后 reducer 可以通过将所有数据转储到 TreeSet
对象和写回上下文。
如果你真的需要过滤、连接和排序,我建议 Hive 或 Spark 而不是纯 Mapreduce