如何配置firehose客户端将记录放入流中

问题描述 投票:0回答:1

我正在尝试使用 firehose 数据流,特别是 firehose 服务的

putRecord
api。我创建了一个类,它在构造函数中采用 firehose 客户端和传输流名称,以及一个使用 api

的方法
class RecordPublisherImpl @Inject constructor(
    private val deliveryStreamName: String,
    private val firehoseClient: FirehoseClient,
) : EventRecordPublisher {
   // some other functions
   fun publishRecord(*params*){
       //pre-processing - params get converted to recordString

       publishRecordToDataFirehose(recordString)
   }

   private fun publishRecordToDataFirehose(
        recordString: String
    ): Boolean {
        val eventRecordBytes = eventRecordString.toByteArray()
        val eventRecordSdkBytes = SdkBytes.fromByteArray(eventRecordBytes)
        val eventRecordAsDataFirehoseRecord = Record.builder().data(eventRecordSdkBytes).build()
        val eventRecordAsPutRecordRequest = PutRecordRequest.builder().deliveryStreamName(deliveryStreamName)
            .record(eventRecordAsDataFirehoseRecord).build()
        try {
            val putRecordResponse = firehoseClient.putRecord(eventRecordAsPutRecordRequest)
            log.info("Putting records into $deliveryStreamName successful with response - $putRecordResponse")
        } catch (e: RuntimeException) {
            throw IllegalStateException("Exception occurred while putting record for stream $deliveryStreamName", e)
        }
        return true
    }
}

我正在调用这个 Impl 类以及另一个组件的 lambda 的

execute
函数中的函数,并使用 dagger 注入 firehose 客户端

模块-

@Module
class AccessorModule {

    @Provides
    @Singleton
    fun provideAwsCredentialsProvider(): AwsCredentialsProvider {
        return DefaultCredentialsProvider.create()
    }

    @Provides
    @Singleton
    fun provideFirehoseClient(
        awsCredentialsProvider: AwsCredentialsProvider
    ): FirehoseClient{
        return FirehoseClient
            .builder()
            .region(Region.of("region"))
            .credentialsProvider(awsCredentialsProvider)
            .build()
    }

    @Provides
    @Singleton
    @Named("DeliveryStreamName")
    fun providesDeliveryStreamName(): String = "streamName"

    @Provides
    @Singleton
    fun provideEventRecordPublisherImpl(@Named("DeliveryStreamName") deliveryStreamName: String, firehoseClient: FirehoseClient): EventRecordPublisherImpl{
        return EventRecordPublisherImpl(deliveryStreamName, firehoseClient)
    }
}

组件 lambda -

class GenerateActivity
@Inject constructor(
    private val recordPublisher: RecordPublisherImpl
) {
    override fun execute(input: GenerateInput) {
        log.debug("Received input: {}", input)

        log.debug("Calling recordPublisher...")
        val putRecordResponse = recordPublisher.publishRecord(
            "123",
            "String1",
            "SomeString",
            "SomethingElse"
        )
        log.debug("Received response: {}", putRecordResponse)
    }
}

现在的问题是如何使用

arn
或使用
access_key
secret_key
配置 firehose 客户端?我在运行服务时收到以下错误
software.amazon.awssdk.core.exception.SdkClientException: Unable to execute HTTP request: No X509TrustManager implementation available
,我认为这与未正确设置客户端有关,即缺少凭据,但我不知道如何使用凭据。如果有人有任何想法或有任何经验,请提供帮助。任何帮助,将不胜感激。先谢谢你了

附注所有代码都在 Kotlin 中

java amazon-web-services kotlin aws-lambda amazon-kinesis-firehose
1个回答
0
投票

因此您正在 Lambda 函数中使用 Java V2 Firehose 客户端。我假设您正在使用 software.amazon.awssdk.services.firehose.FirehoseClient。我不清楚您尝试将哪些附加框架与 AWS Java Lambda 函数一起使用。

无论如何在 AWS Lambda 函数中使用 Firehose 客户端,您都可以使用 com.amazonaws.services.lambda.runtime.RequestHandler。然后在您的项目中,您需要做的就是开发一个使用 FirehoseClient 的 POJO(普通的旧 jave 对象)。

类似:

公共类 FirehostOps {

   public static void putSingleRecord(FirehoseClient firehoseClient, String textValue, String streamName) {
        try {
            SdkBytes sdkBytes = SdkBytes.fromByteArray(textValue.getBytes());
            Record record = Record.builder()
                    .data(sdkBytes)
                    .build();

            PutRecordRequest recordRequest = PutRecordRequest.builder()
                    .deliveryStreamName(streamName)
                    .record(record)
                    .build();

            PutRecordResponse recordResponse = firehoseClient.putRecord(recordRequest);
            System.out.println("The record ID is " + recordResponse.recordId());

        } catch (FirehoseException e) {
            System.out.println(e.getLocalizedMessage());
            System.exit(1);
        }
    }

}

您的 Lambda 处理程序:

public class PPEHandler implements RequestHandler<Map<String,String>, String> {

    @Override
    public String handleRequest(Map<String, String> event, Context context) {
        LambdaLogger logger = context.getLogger();
        String textMessage = " I am a message" ;
        FirehostOps fireOps = new FirehostOps(); 

       FirehoseClient firehoseClient = FirehoseClient.builder()
                .region(region)
                .build();

        fireOps.putSingleRecord(firehoseClient, textMessage ) ;  

}

就 CREDS 在 Lamnbda 函数中使用 AWS Java V2 SDK 而言,没有必要。在 AWS Lambda 函数中使用 AWS 开发工具包时,您无需使用访问密钥显式配置 Firehose 客户端。 AWS Lambda 执行角色将自动为 Firehose 客户端提供访问 Firehose 传输流所需的凭证。

© www.soinside.com 2019 - 2024. All rights reserved.