!gcloud dataproc jobs submit pyspark - ERROR AttributeError: 'str' object has no attribute 'batch'

Problem description

How do I pass a dataset as the input to a Dataproc job?

Below is my code:

%%writefile spark_job.py 

import sys
import time
import pyspark
import argparse
import pickle

#def time_configs_rdd(test_set, batch_sizes,batch_numbers,repetitions):
def time_configs_rdd(argv):
  print(argv)
  parser = argparse.ArgumentParser() # get a parser object
  parser.add_argument('--out_bucket', metavar='out_bucket', required=True,
                      help='The bucket URL for the result.')
  parser.add_argument('--out_file', metavar='out_file', required=True,
                      help='The filename for the result.')
  parser.add_argument('--batch_size', metavar='batch_size', required=True,
                      help='The batch size(s) to test.')
  parser.add_argument('--batch_number', metavar='batch_number', required=True,
                      help='The number of batches to take.')
  parser.add_argument('--repetitions', metavar='repetitions', required=True,
                      help='The number of repetitions per configuration.')
  parser.add_argument('--test_set', metavar='test_set', required=True,
                      help='The test dataset to read from.')
  args = parser.parse_args(argv) # read the values
  # the value provided with --out_bucket is now in args.out_bucket
  time_configs_results = []
  for s in args.batch_size:
    for n in args.batch_number:
      dataset = args.test_set.batch(s).take(n) # this is the line that fails: args.test_set arrives as a string
      for r in args.repetitions:
        tt0 = time.time()
        for i in enumerate(dataset):
          totaltime = str(time.time()-tt0)
        batchtime = totaltime
        #imgpersec = s*n/totaltime
      time_configs_results.append((s,n,r,float(batchtime)))
      #time_configs_results.append((s,n,r,batchtime,imgpersec))
    sc = pyspark.SparkContext.getOrCreate() # the script needs a SparkContext; it is not defined anywhere above
    time_configs_results_rdd = sc.parallelize(time_configs_results) # create an RDD with all results for each parameter
    time_configs_results_rdd_avg = time_configs_results_rdd.map(lambda x: (x, x[0]*x[1]/x[3]))  #RDD with the average reading speeds (RDD.map)
    #mapping = time_configs_results_rdd_avg.collect()
  #print(mapping)
  return (time_configs_results_rdd_avg)

if 'google.colab' not in sys.modules: # use the real command-line arguments outside Colab
    time_configs_rdd(sys.argv[1:])
elif __name__ == "__main__": # but define them manually in Colab
    time_configs_rdd(["--out_bucket", BUCKET, "--out_file", "time_configs_rdd_out.pkl",
                      "--batch_size", batch_size, "--batch_number", batch_number,
                      "--repetitions", repetitions, "--test_set", test_set])

And the code that executes it:

FILENAME = 'file_RDD_OUT.pkl'
batch_size = [1]
batch_number = [1]
repetitions = [1]
#test_set = 1 will give the string error
test_set = dataset2 # a <ParallelMapDataset shapes: ((192, 192, None), ()), types: (tf.float32, tf.string)> cannot be passed in like this
!gcloud dataproc jobs submit pyspark --cluster $CLUSTER --region $REGION \
    ./spark_job.py \
    -- --out_bucket $BUCKET --out_file $FILENAME --batch_size $batch_size --batch_number $batch_number --repetitions $repetitions --test_set $test_set

Unfortunately, it keeps failing with the error:

AttributeError: 'str' object has no attribute 'batch'
ERROR: (gcloud.dataproc.jobs.submit.pyspark) Job [c2048c422f334b08a628af5a1aa492eb] failed with error: Job failed with message [AttributeError: 'str' object has no attribute 'batch'].

The problem is related to test_set. How should I convert dataset2 (a ParallelMapDataset) so that the job can read it?

tensorflow pyspark rdd argv google-cloud-dataproc
1 Answer

So you are trying to parse a string from a command-line argument into a ParallelMapDataset. You want to use the type parameter in the add_argument call.

Quoting from https://docs.python.org/3/library/argparse.html#type:

By default, ArgumentParser objects read command-line arguments in as simple strings. However, quite often the command-line string should instead be interpreted as another type, like a float or int. The type keyword argument of add_argument() allows any necessary type-checking and type conversions to be performed.

type= can take any callable that takes a single string argument and returns the converted value

So you probably want something like:

def parse_parallel_map_dataset(string):
  # your logic to parse the string into your desired data structure
  ...
...
parser.add_argument('--test_set', metavar='test_set', required=True, 
    type=parse_parallel_map_dataset)
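
The same applies to the numeric arguments: the posted script loops over args.batch_size, args.batch_number, and args.repetitions as if they were lists of numbers, but they also arrive as plain strings. A minimal sketch of one way to handle that, assuming each flag should accept one or more values (nargs='+' is a standard argparse option):

# Parse the numeric flags into lists of ints instead of raw strings.
parser.add_argument('--batch_size', metavar='batch_size', required=True,
                    type=int, nargs='+', help='Batch size(s) to test.')
parser.add_argument('--batch_number', metavar='batch_number', required=True,
                    type=int, nargs='+', help='Number(s) of batches to take.')
parser.add_argument('--repetitions', metavar='repetitions', required=True,
                    type=int, nargs='+', help='Repetition count(s).')

The job would then be invoked with, for example, --batch_size 1 2 4 rather than with a stringified Python list.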

Or, better yet, read test_set from a file and pass the filename as an argument.
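
For example (a sketch that is not part of the original answer, assuming TensorFlow 2.x where tf.data.experimental.save and tf.data.experimental.load are available, that the cluster workers have TensorFlow installed, and that they can read the bucket; 'gs://my-bucket/test_set' is a placeholder path):

import tensorflow as tf

# In the notebook: persist the dataset to GCS before submitting the job.
tf.data.experimental.save(dataset2, 'gs://my-bucket/test_set')

# In spark_job.py: --test_set now carries only the path, which really is a string...
parser.add_argument('--test_set', metavar='test_set', required=True,
                    help='GCS path of the saved test set.')

# ...and the dataset is reloaded inside the job. The element_spec matches the
# ParallelMapDataset printed in the question.
element_spec = (tf.TensorSpec(shape=(192, 192, None), dtype=tf.float32),
                tf.TensorSpec(shape=(), dtype=tf.string))
test_set = tf.data.experimental.load(args.test_set, element_spec=element_spec)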
