我是tfx的新手,正在学习将管道整合在一起。我已经在GCP的Kubeflow中成功建立了一个管道。我想知道如何运行多个训练器,这些训练器将使用相同的csvexamplegen,transform和schemagen在管道中生成不同的输出/推子。有人做过吗?请提前告知并感谢。
def create_pipeline(
pipeline_name: Text,
pipeline_root: Text,
data_path: Text,
preprocessing_fn: Text,
run_fn: Text,
train_args: trainer_pb2.TrainArgs,
eval_args: trainer_pb2.EvalArgs,
eval_accuracy_threshold: float,
serving_model_dir: Text,
metadata_connection_config: Optional[
metadata_store_pb2.ConnectionConfig] = None
) -> pipeline.Pipeline:
trainer_args = {
'run_fn': run_fn,
'transformed_examples': transform.outputs['transformed_examples'],
'schema': schema_gen.outputs['schema'],
'transform_graph': transform.outputs['transform_graph'],
'train_args': train_args,
'eval_args': eval_args,
'custom_executor_spec':
executor_spec.ExecutorClassSpec(trainer_executor.GenericExecutor)}
trainer = Trainer(**trainer_args)
components.append(trainer)
return pipeline.Pipeline(
pipeline_name=pipeline_name,
pipeline_root=pipeline_root,
components=components,
enable_cache=True,
metadata_connection_config=metadata_connection_config,
beam_pipeline_args=beam_pipeline_args,
)
为了使培训师具有唯一的Kubeflow,我必须输入instance_name来定义培训师。
trainer2 = Trainer(
run_fn=run_fn2,
examples=transform.outputs['transformed_examples'],
schema=schema_gen.outputs['schema'],
transform_graph= transform.outputs['transform_graph'],
train_args= train_args2,
eval_args= eval_args2,
custom_executor_spec= executor_spec.ExecutorClassSpec(trainer_executor.GenericExecutor),
instance_name='trainer2'
)