Problems in Mapper.py and Reducer.py when running code on a Hadoop cluster

Problem description

I am running this code in Hadoop to compute probabilities for my data, which is stored in a CSV file.

When I run this code on the cluster I get the error "java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1". Can anyone fix my code?

#!/usr/bin/env python3
"""mapper.py"""
import sys

# Get input lines from stdin
for line in sys.stdin:
    # Remove spaces from beginning and end of the line
    line = line.strip()

    # Split it into tokens
    tokens = line.split('')

    #Get ClassA values
    try:
        ClassA = tokens[1]
        print ('%s\t%s') % (None, ClassA)
    except ValueError: pass

#!/usr/bin/env python3
"""reducer.py"""
import sys
from collections import Counter
# Create a dictionary to map marks
Classprob = {}

# Get input from stdin
for line in sys.stdin:
    #Remove spaces from beginning and end of the line
    line = line.strip()

    # parse the input from mapper.py
    ClassA = line.split('\t', 1)

#from collections import Counter
#samples = [10,10,60,10,30]
counts = Counter(ClassA)
total = sum(counts.values())
probability_mass = {k:v/total for k,v in counts.items()}
probability_mass
probability_mass.get(4)
# Print each probability
for probability_mass in Classprob.keys():
    print ('%s\t%s') % (probability_mass, Classprob[probability_mass])
#print(str(probability_mass))
python python-3.x hadoop mapreduce
1 Answer

The real error should be available in the YARN UI, but

print ('%s\t%s') % (ClassA) needs two values to format (and the % operator belongs inside the print() call).

If you have no key to group the values on, you can use

print('%s\t%s' % (None, ClassA))
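
Putting that together, a minimal working mapper might look like the sketch below. It assumes your CSV is comma-separated and that the ClassA value sits in the second column, matching your tokens[1]:

#!/usr/bin/env python3
"""mapper.py - sketch assuming a comma-separated CSV with ClassA in the second column"""
import sys

for line in sys.stdin:
    line = line.strip()
    # Split on commas; splitting on the empty string '' raises ValueError
    tokens = line.split(',')
    try:
        class_a = tokens[1]
        # Keep the % formatting inside the print() call
        print('%s\t%s' % (None, class_a))
    except IndexError:
        # Skip malformed rows with fewer than two columns
        pass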

Also, variables should not start with a capital letter, and please refer to http://pyformat.info for string formatting.

You can test the code without Hadoop using python mapper.py | sort -u | python reducer.py
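
That local test will only be meaningful once the reducer actually accumulates every incoming value before computing the probability mass; at the moment the Counter is built outside the loop from the last split line and the final loop iterates over an empty Classprob dictionary. A sketch of a reducer that does this, assuming the mapper emits key<TAB>ClassA pairs:

#!/usr/bin/env python3
"""reducer.py - sketch that counts ClassA values and prints their probability mass"""
import sys
from collections import Counter

counts = Counter()

for line in sys.stdin:
    line = line.strip()
    # Each mapper line is "key<TAB>value"; keep only the value part
    parts = line.split('\t', 1)
    if len(parts) == 2:
        counts[parts[1]] += 1

total = sum(counts.values())
for value, count in counts.items():
    print('%s\t%s' % (value, count / total))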

Plus, mrjob or pyspark are higher-level frameworks that offer more useful features.
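
For example, roughly the same counting job written with mrjob could look like this (a sketch only; the class name and the comma-separated column assumption are illustrative, and turning the counts into probabilities would still need an extra step):

from mrjob.job import MRJob

class ClassProbability(MRJob):

    def mapper(self, _, line):
        # Assume a comma-separated row with the class value in the second column
        tokens = line.split(',')
        if len(tokens) > 1:
            yield tokens[1], 1

    def reducer(self, key, values):
        # Total occurrences of each class value
        yield key, sum(values)

if __name__ == '__main__':
    ClassProbability.run()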
