我正在尝试通过 kafka 消息监听器解析来自 kafka 主题的消息。 我可以通过生产者和卡夫卡主题看到该消息。 但在消费者流程中,转换消息无法正确解析 json。
消费流程
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:bigquery="http://www.mulesoft.org/schema/mule/bigquery" xmlns:tls="http://www.mulesoft.org/schema/mule/tls"
xmlns:ee="http://www.mulesoft.org/schema/mule/ee/core"
xmlns:http="http://www.mulesoft.org/schema/mule/http" xmlns:kafka="http://www.mulesoft.org/schema/mule/kafka" xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/kafka http://www.mulesoft.org/schema/mule/kafka/current/mule-kafka.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.mulesoft.org/schema/mule/ee/core http://www.mulesoft.org/schema/mule/ee/core/current/mule-ee.xsd
http://www.mulesoft.org/schema/mule/tls http://www.mulesoft.org/schema/mule/tls/current/mule-tls.xsd
http://www.mulesoft.org/schema/mule/bigquery http://www.mulesoft.org/schema/mule/bigquery/current/mule-bigquery.xsd">
<flow name="account-consumer-npFlow" doc:id="0d996208-7cb6-42b9-93f2-77ead77d1883" >
<kafka:message-listener doc:name="Message listener" doc:id="c302fcb3-539c-42d9-8d21-347da8328ecd" config-ref="EDH_Consumer_np_configuration">
<repeatable-in-memory-stream />
</kafka:message-listener>
<ee:transform doc:name="Transform Message" doc:id="28ca3ce2-2c57-4c86-a845-f00d298a6590" >
<ee:message >
<ee:set-payload ><![CDATA[%dw 2.0
output application/json
---
{
out : payload.ChangeEventHeader.changeType
}]]></ee:set-payload>
</ee:message>
</ee:transform>
<logger level="INFO" doc:name="Logger" doc:id="c4bc21b6-98d6-466c-a086-1fd5bf48343b" message="#[%dw 2.0 output application/json --- payload]" />
</flow>
</mule>
错误日志:
INFO 2024-02-06 10:24:43,087 [[MuleRuntime].uber.11: [sf-setotcemailtype-procapi-poc].account-producer-np.CPU_LITE @37e0e7a1] [processor: account-producer-np/processors/0/route/1/processors/0; event: 3bc1f370-c50c-11ee-942a-acde48001122] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: {
"ChangeEventHeader": {
"commitNumber": 1707236682278494209,
"commitUser": "0057f000005zHkEAAU",
"sequenceNumber": 1,
"entityName": "Account",
"changeType": "DELETE",
"changedFields": [
],
"changeOrigin": "",
"transactionKey": "0000295d-4636-8277-714a-0fbcba7f9c9b",
"commitTimestamp": 1707236682000,
"recordIds": [
"001O1000007mPzNIAU"
]
}
}
ERROR 2024-02-06 10:24:43,907 [[MuleRuntime].uber.11: [sf-setotcemailtype-procapi-poc].account-consumer-npFlow.CPU_INTENSIVE @3e250b1c] [processor: ; event: 3c5c37a1-c50c-11ee-942a-acde48001122] org.mule.runtime.core.privileged.exception.DefaultExceptionListener:
********************************************************************************
Message : "You called the function 'Value Selector' with these arguments:
1: Binary ("ewogICJpZCI6IFsKICAgICIwMDFPMTAwMDAwN21Qek5JQVUiCiAgXSwKICAiYWN0aXZlX19jIjog...)
2: Name ("ChangeEventHeader")
But it expects one of these combinations:
(Array, Name)
(Array, String)
(Date, Name)
(DateTime, Name)
(LocalDateTime, Name)
(LocalTime, Name)
(Object, Name)
(Object, String)
(Period, Name)
(Time, Name)
5| out : payload.ChangeEventHeader.changeType
^^^^^^^^^^^^^^^^^^^^^^^^^
Trace:
at anonymous::main (line: 5, column: 8)" evaluating expression: "%dw 2.0
output application/json
---
{
out : payload.ChangeEventHeader.changeType
}".
Element : account-consumer-npFlow/processors/0 @ sf-setotcemailtype-procapi-poc:account-consumer-np.xml:15 (Transform Message)
Element DSL : <ee:transform doc:name="Transform Message" doc:id="28ca3ce2-2c57-4c86-a845-f00d298a6590">
<ee:message>
<ee:set-payload><![CDATA[
%dw 2.0
output application/json
---
{
out : payload.ChangeEventHeader.changeType
}
]]></ee:set-payload>
</ee:message>
</ee:transform>
Error type : MULE:EXPRESSION
FlowStack : at account-consumer-npFlow(account-consumer-npFlow/processors/0 @ sf-setotcemailtype-procapi-poc:account-consumer-np.xml:15 (Transform Message))
(set debug level logging or '-Dmule.verbose.exceptions=true' for everything)
********************************************************************************
我能够在 dataWeave 游乐场中很好地解析有效负载消息。
{
"ChangeEventHeader": {
"commitNumber": 1707236682278494209,
"commitUser": "0057f000005zHkEAAU",
"sequenceNumber": 1,
"entityName": "Account",
"changeType": "DELETE",
"changedFields": [
],
"changeOrigin": "",
"transactionKey": "0000295d-4636-8277-714a-0fbcba7f9c9b",
"commitTimestamp": 1707236682000,
"recordIds": [
"001O1000007mPzNIAU"
]
}
}
带脚本
%dw 2.0
output application/json
---
{
out : payload.ChangeEventHeader.changeType
}
按预期返回。
{
"out": "DELETE"
}
不知道为什么转换消息无法解析它。
该错误告诉您 DataWeave 脚本正在接收二进制文件作为输入,而不是 JSON 值。
Message : "You called the function 'Value Selector' with these arguments:
1: Binary ("ewogICJpZCI6IFsKICAgICIwMDFPMTAwMDAwN21Qek5JQVUiCiAgXSwKICAiYWN0aXZlX19jIjog...)
2: Name ("ChangeEventHeader")
由于它是二进制文件,因此没有密钥。表达式
payload.ChangeEventHeader
失败,因为尝试使用 单值选择器 从二进制文件中获取键的值是无效的。
你必须检查为什么你的 Kafka 消息是二进制的。