Error parsing JSON messages with the Kafka listener in MuleSoft


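I am trying to parse messages from a Kafka topic with the Kafka Message listener. I can see the message both in the producer and on the Kafka topic. In the consumer flow, however, Transform Message fails to parse the JSON.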

Consumer flow:

<?xml version="1.0" encoding="UTF-8"?>

<mule xmlns:bigquery="http://www.mulesoft.org/schema/mule/bigquery" xmlns:tls="http://www.mulesoft.org/schema/mule/tls"
    xmlns:ee="http://www.mulesoft.org/schema/mule/ee/core"
    xmlns:http="http://www.mulesoft.org/schema/mule/http" xmlns:kafka="http://www.mulesoft.org/schema/mule/kafka" xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/kafka http://www.mulesoft.org/schema/mule/kafka/current/mule-kafka.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.mulesoft.org/schema/mule/ee/core http://www.mulesoft.org/schema/mule/ee/core/current/mule-ee.xsd
http://www.mulesoft.org/schema/mule/tls http://www.mulesoft.org/schema/mule/tls/current/mule-tls.xsd
http://www.mulesoft.org/schema/mule/bigquery http://www.mulesoft.org/schema/mule/bigquery/current/mule-bigquery.xsd">
    <flow name="account-consumer-npFlow" doc:id="0d996208-7cb6-42b9-93f2-77ead77d1883" >
        <kafka:message-listener doc:name="Message listener" doc:id="c302fcb3-539c-42d9-8d21-347da8328ecd" config-ref="EDH_Consumer_np_configuration">
            <repeatable-in-memory-stream />
        </kafka:message-listener>
        <ee:transform doc:name="Transform Message" doc:id="28ca3ce2-2c57-4c86-a845-f00d298a6590" >
            <ee:message >
                <ee:set-payload ><![CDATA[%dw 2.0
output application/json
---
{
    out : payload.ChangeEventHeader.changeType
}]]></ee:set-payload>
            </ee:message>
        </ee:transform>
        <logger level="INFO" doc:name="Logger" doc:id="c4bc21b6-98d6-466c-a086-1fd5bf48343b" message="#[%dw 2.0&#10;output application/json&#10;---&#10;payload]" />
    </flow>
</mule>

Error log:

INFO  2024-02-06 10:24:43,087 [[MuleRuntime].uber.11: [sf-setotcemailtype-procapi-poc].account-producer-np.CPU_LITE @37e0e7a1] [processor: account-producer-np/processors/0/route/1/processors/0; event: 3bc1f370-c50c-11ee-942a-acde48001122] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: {
  "ChangeEventHeader": {
    "commitNumber": 1707236682278494209,
    "commitUser": "0057f000005zHkEAAU",
    "sequenceNumber": 1,
    "entityName": "Account",
    "changeType": "DELETE",
    "changedFields": [
      
    ],
    "changeOrigin": "",
    "transactionKey": "0000295d-4636-8277-714a-0fbcba7f9c9b",
    "commitTimestamp": 1707236682000,
    "recordIds": [
      "001O1000007mPzNIAU"
    ]
  }
}
ERROR 2024-02-06 10:24:43,907 [[MuleRuntime].uber.11: [sf-setotcemailtype-procapi-poc].account-consumer-npFlow.CPU_INTENSIVE @3e250b1c] [processor: ; event: 3c5c37a1-c50c-11ee-942a-acde48001122] org.mule.runtime.core.privileged.exception.DefaultExceptionListener: 
********************************************************************************
Message               : "You called the function 'Value Selector' with these arguments: 
  1: Binary ("ewogICJpZCI6IFsKICAgICIwMDFPMTAwMDAwN21Qek5JQVUiCiAgXSwKICAiYWN0aXZlX19jIjog...)
  2: Name ("ChangeEventHeader")

But it expects one of these combinations:
  (Array, Name)
  (Array, String)
  (Date, Name)
  (DateTime, Name)
  (LocalDateTime, Name)
  (LocalTime, Name)
  (Object, Name)
  (Object, String)
  (Period, Name)
  (Time, Name)

5|  out : payload.ChangeEventHeader.changeType
          ^^^^^^^^^^^^^^^^^^^^^^^^^
Trace:
  at anonymous::main (line: 5, column: 8)" evaluating expression: "%dw 2.0
output application/json
---
{
    out : payload.ChangeEventHeader.changeType
}".
Element               : account-consumer-npFlow/processors/0 @ sf-setotcemailtype-procapi-poc:account-consumer-np.xml:15 (Transform Message)
Element DSL           : <ee:transform doc:name="Transform Message" doc:id="28ca3ce2-2c57-4c86-a845-f00d298a6590">
<ee:message>
<ee:set-payload><![CDATA[
%dw 2.0
output application/json
---
{
    out : payload.ChangeEventHeader.changeType
}
]]></ee:set-payload>
</ee:message>
</ee:transform>
Error type            : MULE:EXPRESSION
FlowStack             : at account-consumer-npFlow(account-consumer-npFlow/processors/0 @ sf-setotcemailtype-procapi-poc:account-consumer-np.xml:15 (Transform Message))

  (set debug level logging or '-Dmule.verbose.exceptions=true' for everything)
********************************************************************************

I can parse the payload without problems in the DataWeave Playground. With this input:

{
  "ChangeEventHeader": {
    "commitNumber": 1707236682278494209,
    "commitUser": "0057f000005zHkEAAU",
    "sequenceNumber": 1,
    "entityName": "Account",
    "changeType": "DELETE",
    "changedFields": [
      
    ],
    "changeOrigin": "",
    "transactionKey": "0000295d-4636-8277-714a-0fbcba7f9c9b",
    "commitTimestamp": 1707236682000,
    "recordIds": [
      "001O1000007mPzNIAU"
    ]
  }
}

and this script:

%dw 2.0
output application/json
---
{
    out : payload.ChangeEventHeader.changeType
}

it returns the expected result:

{
  "out": "DELETE"
}

I don't know why Transform Message cannot parse it in the flow.

confluent-platform dataweave mulesoft mule4 mule-connector
1 Answer

The error tells you that the DataWeave script is receiving a Binary value as input, not a JSON value:

Message               : "You called the function 'Value Selector' with these arguments: 
  1: Binary ("ewogICJpZCI6IFsKICAgICIwMDFPMTAwMDAwN21Qek5JQVUiCiAgXSwKICAiYWN0aXZlX19jIjog...)
  2: Name ("ChangeEventHeader")

Because the value is a Binary, it has no keys. The expression

payload.ChangeEventHeader

fails because the single-value selector cannot retrieve the value of a key from a Binary.
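
One way to confirm what Transform Message actually receives is to replace the script temporarily with a diagnostic one that reports the payload's DataWeave type and MIME type. This is only a sketch: the output keys `payloadType` and `mimeType` are illustrative names, not part of the original flow.

```dataweave
%dw 2.0
output application/json
---
{
    // typeOf returns the DataWeave type of the value (for example Binary or Object)
    payloadType: typeOf(payload),
    // ^mimeType is the metadata selector for the MIME type attached to the payload
    mimeType: payload.^mimeType default "unknown"
}
```

If the reported type is Binary, the listener is handing the record over as raw bytes with no JSON MIME type attached, which matches the error above.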

You need to find out why your Kafka message arrives as a Binary.
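
As a possible workaround while investigating, and assuming the bytes really are JSON text, the script can parse them explicitly with the DataWeave `read` function before applying the selector. This is a minimal sketch, not a confirmed fix for this flow:

```dataweave
%dw 2.0
output application/json
// Assumption: the Kafka record value is UTF-8 JSON text delivered as a Binary
var parsed = read(payload, "application/json")
---
{
    // The selector now operates on an Object instead of a Binary
    out: parsed.ChangeEventHeader.changeType
}
```

Note that the truncated Base64 preview in the error message decodes to JSON text that begins with an `id` array followed by an `active__c` key, not with `ChangeEventHeader`. The message consumed by this flow may therefore have a different shape from the one in the producer log, in which case the selector would return null even after parsing. It is worth logging the parsed payload to check which keys are actually present.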
