如何访问 Par.Do 转换中的管道选项?

问题描述 投票:0回答:2

TL;DR:如何在 Par.Do 转换中访问创建作业时传递给作业的参数?

我有两个模板,一个用于开发,一个用于生产,它们都工作正常,只是每个模板中有一个值需要不同。到目前为止,我一直在“硬编码”这个值,然后“运行”java 程序来构建模板(使用 DataflowRunner 运行程序)。但这很容易出错,如果我不太小心,我会尝试更新开发模板中的一些代码,并且无意中仍然从产品模板中设置了这个值。不好。

所以,我认为管道选项会很好,我只是在模板编译时甚至在模板运行时传递一个不同的参数,但我很难访问 Par.Do 转换中的值我需要它的地方。 如果我使用默认的运行器并在本地运行管道,它工作得很好,但是当我切换并构建模板时,该值始终是

null
。我可以使用以下代码重现此内容:

/* 
imports...
*/

@SuppressWarnings("serial")
public class StarterPipeline {
  private static final Logger LOG = LoggerFactory.getLogger(StarterPipeline.class);

  static String orgId;

  public interface MyOptions extends PipelineOptions {


     @Description("Org Id")
     @Default.String("123-984-a")
     String getOrgId();
     void setOrgId( String orgID );

  }

  public static void main(String[] args) {


     PipelineOptionsFactory.register(MyOptions.class);


     final MyOptions options = PipelineOptionsFactory.fromArgs( args ).withValidation().create()
        .as( MyOptions.class );


     orgId = options.getOrgId();

     LOG.info( "orgId: " + orgId );

     Pipeline p = Pipeline.create( options );


     PCollection<String> someDataRows = p.apply("Get data from BQ", Create.of(

      "string 1", "string2", "string 3"

     ) );


     someDataRows.apply( "Package into a list", ParDo.of( new DoFn<String, String>() {

           @ProcessElement
           public void processElement( ProcessContext c ) {

              LOG.info( "Hello? " );
              LOG.info( "ORG ID: " + orgId );
           }

           }));


    p.run();
  }
}

云端的输出是:

 2018-09-20 (16:16:49) Hello?
 2018-09-20 (16:16:49) ORG ID: null
 2018-09-20 (16:16:51) Hello?
 2018-09-20 (16:16:51) ORG ID: null
 2018-09-20 (16:16:53) Hello?
 2018-09-20 (16:16:53) ORG ID: null
 ...

但是本地:

Sep 20, 2018 4:15:32 PM simplepipeline.StarterPipeline main
INFO: orgId: jomama47
Sep 20, 2018 4:15:35 PM simplepipeline.StarterPipeline$1 processElement
INFO: Hello? 
Sep 20, 2018 4:15:35 PM simplepipeline.StarterPipeline$1 processElement
INFO: ORG ID: jomama47
Sep 20, 2018 4:15:35 PM simplepipeline.StarterPipeline$1 processElement
INFO: Hello? 
Sep 20, 2018 4:15:35 PM simplepipeline.StarterPipeline$1 processElement
INFO: Hello? 
Sep 20, 2018 4:15:35 PM simplepipeline.StarterPipeline$1 processElement
INFO: ORG ID: jomama47
Sep 20, 2018 4:15:35 PM simplepipeline.StarterPipeline$1 processElement
INFO: ORG ID: jomama47

这些是我用于模板的构建参数:

--project=the-project
--stagingLocation=gs://staging.the-project.appspot.com/staging/
--tempLocation=gs://staging.the-project.appspot.com/temp/
--runner=DataflowRunner
--region=us-west1
--templateLocation=gs://staging.the-project.appspot.com/templates/NoobPipelineDev
--orgId=jomama47

本地:

--project=the-project
--tempLocation=gs://staging.the-project.appspot.com
--orgId=jomama47

当我在 Dataflow 控制台(浏览器)中的参数字段中创建作业时,我尝试将参数传递给作业,参数字段为

orgId
jomama77
,但它仍然显示为 null。

抱歉帖子太长。

java google-cloud-dataflow
2个回答
3
投票

这里有两件事。首先,我建议使用

ValueProvider
,这样你就可以在运行时为不同的
orgId
传递参数:

public interface MyOptions extends PipelineOptions {    
     @Description("Org Id")
     @Default.String("123-984-a")
     ValueProvider<String> getOrgId();
     void setOrgId(ValueProvider<String> orgID);   
}

然后从选项中读取它:

ValueProvider<String> orgId = options.getOrgId();

为了在 ParDo 中访问它,您可以将其作为参数传递给构造函数,例如 docs 中的示例:

someDataRows.apply( "Package into a list", ParDo.of( new CustomFn(orgId)));

其中

CustomFn
的构造函数将其作为参数并将其存储在
ValueProvider
中,以便可以从 ParDo 中访问它。请注意,现在您需要使用
orgId.get()
:

static class CustomFn extends DoFn<String, String> {
    // access options from wihtin the ParDo
    ValueProvider<String> orgId;
    public CustomFn(ValueProvider<String> orgId) {
        this.orgId = orgId;
    }

    @ProcessElement
    public void processElement( ProcessContext c ) {
      LOG.info( "Hello? " );
      LOG.info( "ORG ID: " + orgId.get() );
    }
}

现在您可以暂存模板并使用以下命令调用它:

gcloud dataflow jobs run $JOB_NAME \
    --gcs-location gs://$BUCKET/templates/$TEMPLATE_NAME \
    --parameters orgId=jomama47

这应该按预期工作:


0
投票

使用

ValueProvider
当然是一种可能性(请参阅 Guillem 的回答),但是您可以通过直接使用该值来简化代码:

public interface MyOptions extends PipelineOptions {    
     @Description("Org Id")
     @Default.String("123-984-a")
     String getOrgId(); // Actual type here
     void setOrgId(String orgID);   
}

static class CustomFn extends DoFn<String, String> {
    // This will serialize (String is Serializable)
    private final String orgId;
    public CustomFn(String orgId) {
        this.orgId = orgId;
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
      LOG.info("ORG ID: " + orgId);
    }
}

存储在

DoFn
实例中的所有数据(不是
transient
)都需要是
Serializable
String
和所有原始类型都是。
ValueProvider
有效负载也需要是
Serializable

您也可以直接在您的

DoFn
中获取选项:

static class CustomFn extends DoFn<String, String> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      LOG.info( "ORG ID: " + c.getPipelineOptions().as(MyOptions.class).getOrgId();
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.