我正在编写一个 AWS Lambda,它将侦听 DynamoDB Streams 事件并使用这些事件来更新 Elasticsearch 索引。我的代码是使用 Quarkus 框架编写的,我发现了指南和示例展示了如何执行此操作,所以我的代码如下所示:
import javax.inject.Named;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent;
/**
* The AWS Lambda used to update an Elasticsearch index when new records are
* written to DynamoDB
*
* @author XXX
*
*/
@Named("dynamoDBSearchIndexEventHandler")
public class DynamoDBSearchIndexEventHandler implements RequestHandler<DynamodbEvent, Void> {
/**
* Logger for this class
*/
private static final Logger logger = LoggerFactory.getLogger(DynamoDBSearchIndexEventHandler.class);
/*
* (non-Javadoc)
*
* @see
* com.amazonaws.services.lambda.runtime.RequestHandler#handleRequest(java.lang.
* Object, com.amazonaws.services.lambda.runtime.Context)
*/
@Override
public Void handleRequest(DynamodbEvent input, Context context) {
if (logger.isDebugEnabled()) {
logger.debug("handleRequest(DynamodbEvent, Context) - start"); //$NON-NLS-1$
}
if (logger.isDebugEnabled()) {
logger.debug("handleRequest(DynamodbEvent, Context) - input={}", input); //$NON-NLS-1$
}
if (logger.isDebugEnabled()) {
logger.debug("handleRequest(DynamodbEvent, Context) - end"); //$NON-NLS-1$
}
return null;
}
}
当我向 DynamoDB 表写入项目时,我看到我的 Lambda 正在被调用,但出现以下异常:
com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot deserialize value of type `java.util.Date` from Floating-point value (token `JsonToken.VALUE_NUMBER_FLOAT`)
我不知道为什么序列化失败。如果我修改 Lambda 以将
Map<String, AttributeValue>
作为输入而不是 DynamodbEvent
,它就可以正常工作。我可以看到 Map
填充了所有预期值。但是,我无法获得 DynamodbEvent
,然后我可以使用它来迭代其 DynamodbStreamRecord
对象以更新我的索引。
知道这里可能出现什么问题吗?
我经常遇到 Java Lambda 和反序列化数据的问题。此时我已经放弃并只使用 JsonPath:
public class S3EventLambdaHandler implements RequestStreamHandler {
public void handleRequest(InputStream inputStream, OutputStream outputStream, Context context) {
try {
List<String> keys = JsonPath.read(inputStream, "$.Records[*].s3.object.key");
for( String nextKey: keys )
System.out.println(nextKey);
}
catch( IOException ioe ) {
context.getLogger().log("caught IOException reading input stream");
}
}
这是处理 S3 事件,但概念应该是相同的。
事实证明,这是 Quarkus 的一个“已知问题”。我使用了问题记录中提供的ObjectMapperCustomizer
,现在一切正常。