How to create an empty PCollection<KV<String, Object>>?


I am trying to create an empty PCollection of a custom class called Incident:

import java.io.Serializable;

public class Incident implements Serializable {
    private Integer incidentId;
    private String appId;
    private Long minutes;

    // getters and setters
}
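The question elides the accessors, but the coder in the answer calls getAppId, getMinutes and getIncidentId plus the matching setters. A complete Incident is therefore assumed to look roughly like this minimal sketch (plain JavaBean accessors; the serialVersionUID field is an addition, not from the question):

```java
import java.io.Serializable;

// Assumed complete form of the question's Incident class. The accessor names
// match the ones IncidentCoder (in the answer) calls.
public class Incident implements Serializable {
    private static final long serialVersionUID = 1L;

    private Integer incidentId;
    private String appId;
    private Long minutes;

    public Integer getIncidentId() { return incidentId; }
    public void setIncidentId(Integer incidentId) { this.incidentId = incidentId; }

    public String getAppId() { return appId; }
    public void setAppId(String appId) { this.appId = appId; }

    public Long getMinutes() { return minutes; }
    public void setMinutes(Long minutes) { this.minutes = minutes; }
}
```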

Here is what I tried:

PCollection<KV<String, Incident>> incidents = pipeline.apply("Create Empty Collection", Create.empty(TypeDescriptor.of(KV.class)))
                .setTypeDescriptor(TypeDescriptor.of(KV.class, String.class, Incident.class));

But it fails to compile with:

Cannot resolve method 'of(Class<KV>, Class<String>, Class<Incident>)'
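The compile error arises because TypeDescriptor.of only has single-argument overloads (one taking a Class, one taking a java.lang.reflect.Type), so it cannot accept type arguments. If you want to keep the TypeDescriptor approach, TypeDescriptors.kvs can compose a KV descriptor from a key and a value descriptor. This is a sketch under assumptions: the class name EmptyKvFromTypeDescriptor is hypothetical, the Incident class above must be on the classpath, and running it needs a runner such as the DirectRunner. Whether coder inference succeeds depends on the pipeline's CoderRegistry, so passing an explicit coder is the more predictable option.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;

// Hypothetical example class; assumes the question's Incident class exists.
public class EmptyKvFromTypeDescriptor {

    // TypeDescriptor.of has no (Class, Class, Class) overload; TypeDescriptors.kvs
    // builds TypeDescriptor<KV<String, Incident>> from key and value descriptors.
    static TypeDescriptor<KV<String, Incident>> incidentKvType() {
        return TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptor.of(Incident.class));
    }

    public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        // With only a TypeDescriptor, the coder is inferred from the CoderRegistry.
        // For a Serializable Incident this is expected to fall back to
        // SerializableCoder unless another coder is registered for Incident.
        PCollection<KV<String, Incident>> incidents =
            pipeline.apply("Create Empty Collection", Create.empty(incidentKvType()));

        // Printing the coder shows which coder inference picked.
        System.out.println(incidents.getCoder());
        pipeline.run().waitUntilFinish();
    }
}
```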

Answer:

One approach:

  1. Create a custom coder for your custom class:
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public class IncidentCoder extends AtomicCoder<Incident> {

    // Each field coder is wrapped in NullableCoder because Incident uses boxed types that may be null.
    private final Coder<Integer> incidentIdCoder = NullableCoder.of(BigEndianIntegerCoder.of());
    // VarLongCoder gives a compact, deterministic encoding for Long.
    private final Coder<Long> minutesCoder = NullableCoder.of(VarLongCoder.of());
    private final Coder<String> appIdCoder = NullableCoder.of(StringUtf8Coder.of());

    @Override
    public void encode(Incident value, OutputStream outStream) throws CoderException, IOException {
        // The field order here must match the order used in decode().
        appIdCoder.encode(value.getAppId(), outStream);
        minutesCoder.encode(value.getMinutes(), outStream);
        incidentIdCoder.encode(value.getIncidentId(), outStream);
    }

    @Override
    public Incident decode(InputStream inStream) throws CoderException, IOException {
        String appId = appIdCoder.decode(inStream);
        Long minutes = minutesCoder.decode(inStream);
        Integer incidentId = incidentIdCoder.decode(inStream);
        Incident incident = new Incident();
        incident.setIncidentId(incidentId);
        incident.setMinutes(minutes);
        incident.setAppId(appId);
        return incident;
    }
}
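Before wiring the coder into a pipeline, it can be sanity-checked with an encode/decode round trip through Beam's CoderUtils (org.apache.beam.sdk.util.CoderUtils). The class name IncidentCoderRoundTrip and the sample values are illustrative only; the sketch assumes the Incident and IncidentCoder classes from this thread are on the classpath.

```java
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.util.CoderUtils;

// Hypothetical helper for checking IncidentCoder in isolation.
public class IncidentCoderRoundTrip {

    // Encodes an Incident to bytes with IncidentCoder, then decodes it back.
    static Incident roundTrip(Incident in) throws CoderException {
        IncidentCoder coder = new IncidentCoder();
        byte[] bytes = CoderUtils.encodeToByteArray(coder, in);
        return CoderUtils.decodeFromByteArray(coder, bytes);
    }

    public static void main(String[] args) throws CoderException {
        Incident in = new Incident();
        in.setIncidentId(42);
        in.setAppId("billing");
        in.setMinutes(null); // NullableCoder lets a null field survive the round trip

        Incident out = roundTrip(in);
        System.out.println(out.getIncidentId() + " " + out.getAppId() + " " + out.getMinutes());
    }
}
```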

  2. Use the coder to create the empty PCollection:
Coder<Incident> incidentCoder = new IncidentCoder();
// An empty map gives Create.of(...) no elements to infer a coder from,
// so withCoder must supply the KV coder explicitly.
Map<String, Incident> map = new HashMap<>();
PCollection<KV<String, Incident>> incidents = pipeline.apply("Creating empty PCollection",
        Create.of(map).withCoder(KvCoder.of(StringUtf8Coder.of(), incidentCoder)));
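A somewhat simpler variant uses Create.empty(Coder), which builds the empty collection directly from the coder, without a placeholder HashMap. This is a sketch assuming the Incident and IncidentCoder classes above; the class and method names are hypothetical, and running main needs a runner such as the DirectRunner on the classpath.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

// Hypothetical example class; assumes Incident and IncidentCoder exist.
public class EmptyIncidentsExample {

    // Create.empty(Coder) yields a PCollection with no elements whose coder is
    // the KvCoder passed in, so nothing needs to be inferred.
    static PCollection<KV<String, Incident>> emptyIncidents(Pipeline pipeline) {
        return pipeline.apply(
            "Creating empty PCollection",
            Create.empty(KvCoder.of(StringUtf8Coder.of(), new IncidentCoder())));
    }

    public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
        PCollection<KV<String, Incident>> incidents = emptyIncidents(pipeline);
        // Downstream transforms can be applied to `incidents` here as usual.
        pipeline.run().waitUntilFinish();
    }
}
```

Because the coder is passed explicitly, this behaves the same whether or not Incident is Serializable.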
© www.soinside.com 2019 - 2024. All rights reserved.