我正在尝试使用 firehose 数据流,特别是 firehose 服务的
putRecord
api。我创建了一个类,它在构造函数中采用 firehose 客户端和传输流名称,以及一个使用 api 的方法
class RecordPublisherImpl @Inject constructor(
private val deliveryStreamName: String,
private val firehoseClient: FirehoseClient,
) : EventRecordPublisher {
// some other functions
fun publishRecord(*params*){
//pre-processing - params get converted to recordString
publishRecordToDataFirehose(recordString)
}
private fun publishRecordToDataFirehose(
recordString: String
): Boolean {
val eventRecordBytes = eventRecordString.toByteArray()
val eventRecordSdkBytes = SdkBytes.fromByteArray(eventRecordBytes)
val eventRecordAsDataFirehoseRecord = Record.builder().data(eventRecordSdkBytes).build()
val eventRecordAsPutRecordRequest = PutRecordRequest.builder().deliveryStreamName(deliveryStreamName)
.record(eventRecordAsDataFirehoseRecord).build()
try {
val putRecordResponse = firehoseClient.putRecord(eventRecordAsPutRecordRequest)
log.info("Putting records into $deliveryStreamName successful with response - $putRecordResponse")
} catch (e: RuntimeException) {
throw IllegalStateException("Exception occurred while putting record for stream $deliveryStreamName", e)
}
return true
}
}
我正在调用这个 Impl 类以及另一个组件的 lambda 的
execute
函数中的函数,并使用 dagger 注入 firehose 客户端
模块-
@Module
class AccessorModule {
@Provides
@Singleton
fun provideAwsCredentialsProvider(): AwsCredentialsProvider {
return DefaultCredentialsProvider.create()
}
@Provides
@Singleton
fun provideFirehoseClient(
awsCredentialsProvider: AwsCredentialsProvider
): FirehoseClient{
return FirehoseClient
.builder()
.region(Region.of("region"))
.credentialsProvider(awsCredentialsProvider)
.build()
}
@Provides
@Singleton
@Named("DeliveryStreamName")
fun providesDeliveryStreamName(): String = "streamName"
@Provides
@Singleton
fun provideEventRecordPublisherImpl(@Named("DeliveryStreamName") deliveryStreamName: String, firehoseClient: FirehoseClient): EventRecordPublisherImpl{
return EventRecordPublisherImpl(deliveryStreamName, firehoseClient)
}
}
组件 lambda -
class GenerateActivity
@Inject constructor(
private val recordPublisher: RecordPublisherImpl
) {
override fun execute(input: GenerateInput) {
log.debug("Received input: {}", input)
log.debug("Calling recordPublisher...")
val putRecordResponse = recordPublisher.publishRecord(
"123",
"String1",
"SomeString",
"SomethingElse"
)
log.debug("Received response: {}", putRecordResponse)
}
}
现在的问题是如何使用
arn
或使用 access_key
和 secret_key
配置 firehose 客户端?我在运行服务时收到以下错误software.amazon.awssdk.core.exception.SdkClientException: Unable to execute HTTP request: No X509TrustManager implementation available
,我认为这与未正确设置客户端有关,即缺少凭据,但我不知道如何使用凭据。如果有人有任何想法或有任何经验,请提供帮助。任何帮助,将不胜感激。先谢谢你了
附注所有代码都在 Kotlin 中
因此您正在 Lambda 函数中使用 Java V2 Firehose 客户端。我假设您正在使用 software.amazon.awssdk.services.firehose.FirehoseClient。我不清楚您尝试将哪些附加框架与 AWS Java Lambda 函数一起使用。
无论如何在 AWS Lambda 函数中使用 Firehose 客户端,您都可以使用 com.amazonaws.services.lambda.runtime.RequestHandler。然后在您的项目中,您需要做的就是开发一个使用 FirehoseClient 的 POJO(普通的旧 jave 对象)。
类似:
公共类 FirehostOps {
public static void putSingleRecord(FirehoseClient firehoseClient, String textValue, String streamName) {
try {
SdkBytes sdkBytes = SdkBytes.fromByteArray(textValue.getBytes());
Record record = Record.builder()
.data(sdkBytes)
.build();
PutRecordRequest recordRequest = PutRecordRequest.builder()
.deliveryStreamName(streamName)
.record(record)
.build();
PutRecordResponse recordResponse = firehoseClient.putRecord(recordRequest);
System.out.println("The record ID is " + recordResponse.recordId());
} catch (FirehoseException e) {
System.out.println(e.getLocalizedMessage());
System.exit(1);
}
}
}
您的 Lambda 处理程序:
public class PPEHandler implements RequestHandler<Map<String,String>, String> {
@Override
public String handleRequest(Map<String, String> event, Context context) {
LambdaLogger logger = context.getLogger();
String textMessage = " I am a message" ;
FirehostOps fireOps = new FirehostOps();
FirehoseClient firehoseClient = FirehoseClient.builder()
.region(region)
.build();
fireOps.putSingleRecord(firehoseClient, textMessage ) ;
}
就 CREDS 在 Lamnbda 函数中使用 AWS Java V2 SDK 而言,没有必要。在 AWS Lambda 函数中使用 AWS 开发工具包时,您无需使用访问密钥显式配置 Firehose 客户端。 AWS Lambda 执行角色将自动为 Firehose 客户端提供访问 Firehose 传输流所需的凭证。