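I have a service (written in a Java stack) that encrypts and compresses string text and sends it to a cloud service (written in TypeScript). In the cloud, the message is then decompressed and decrypted for further processing.
The messages are sent over Event Hub and read by an Azure Function Event Hub trigger. The function uses the V4 programming model in TypeScript.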
Compression (in Java):
private static byte[] compress(String str) throws IOException {
    ByteArrayOutputStream obj = new ByteArrayOutputStream();
    GZIPOutputStream gzip = new GZIPOutputStream(obj);
    gzip.write(str.getBytes(StandardCharsets.UTF_8));
    gzip.close();
    return obj.toByteArray();
}
Decompression in the Azure Function Event Hub trigger (in TypeScript):
app.eventHub('EventTrigger', {
    connection: 'connectionString',
    eventHubName: 'eventHubName',
    consumerGroup: 'cg',
    cardinality: 'one',
    extraOutputs: [Output],
    extraInputs: [cosmosInput],
    handler: EventTrigger
});

export async function EventTrigger(message: any, context: InvocationContext): Promise<void> {
    const mh: Handler = new Handler();
    const deviceMessage = mh.decompressBuf(message, context);
    .....
    ....
}

export class Handler {
    decompressBuf(message: string, context?: InvocationContext): MessageObj {
        const decodedMessage = Buffer.from(message);
        return this.decom(new Uint8Array(decodedMessage), context);
    }

    decom(message: Uint8Array, context?: InvocationContext): MessageObj {
        let resultGZipArray = pako.ungzip(message);
        var dec: TextDecoderLite = new TextDecoderLite('utf-8');
        const text = dec.decode(resultGZipArray);
        return JSON.parse(text) as MessageObj;
    }
}
The error is as follows:
[2024-01-08T08:57:11.082Z] System.Private.CoreLib: Exception while executing function: Functions.EventTrigger. System.Private.CoreLib: Result: Failure
Exception: incorrect header check
Stack: Error: incorrect header check
at t.ensureErrorType (C:\Program Files\Microsoft\Azure Functions Core Tools\workers\node\dist\src\worker-bundle.js:2:29312)
at C:\Program Files\Microsoft\Azure Functions Core Tools\workers\node\dist\src\worker-bundle.js:2:45931
at Generator.throw (<anonymous>)
at a (C:\Program Files\Microsoft\Azure Functions Core Tools\workers\node\dist\src\worker-bundle.js:2:44362)
at process.processTicksAndRejections (node:internal/process/task_queues:95:5).
Any help would be appreciated.
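Make sure the encoding used when compressing matches the one used when decoding. The Java code compresses the string using StandardCharsets.UTF_8, so the TypeScript side must also decode the decompressed bytes as UTF-8.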
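To illustrate the encoding point, here is a minimal round-trip sketch. It uses Node's built-in zlib module in place of Java's GZIPOutputStream and pako, purely to keep the example self-contained. The helper names gzipUtf8 and gunzipUtf8 and the sample payload are hypothetical.

```typescript
import { gzipSync, gunzipSync } from "node:zlib";

// Compress a string the way the Java side does: UTF-8 bytes first, then gzip.
function gzipUtf8(text: string): Uint8Array {
  const utf8Bytes = new TextEncoder().encode(text); // TextEncoder always emits UTF-8
  return gzipSync(utf8Bytes);
}

// Reverse the steps: gunzip first, then decode the bytes as UTF-8.
function gunzipUtf8(compressed: Uint8Array): string {
  const utf8Bytes = gunzipSync(compressed);
  return new TextDecoder("utf-8").decode(utf8Bytes);
}

// Hypothetical payload with non-ASCII characters, which is where an
// encoding mismatch would become visible.
const original = JSON.stringify({ deviceId: "sensor-1", note: "温度 21°C" });
const roundTripped = gunzipUtf8(gzipUtf8(original));
console.log(roundTripped === original); // prints true
```

If both sides use UTF-8, the text survives unchanged. A mismatch here would garble characters but would not by itself produce "incorrect header check", so the raw bytes are worth inspecting as well.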
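Add more logging to the TypeScript function so that the raw compressed data and other relevant information are visible before decompression is attempted. Logging the output before decoding: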
import { AzureFunction, Context } from "@azure/functions"

const eventHubTrigger: AzureFunction = async function (context: Context, eventHubMessages: any[]): Promise<void> {
    context.log(`Eventhub trigger function called for message array ${eventHubMessages}`);
    eventHubMessages.forEach((message, index) => {
        context.log(`Processed message ${message}`);
    });
};

export default eventHubTrigger;
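pako typically reports "incorrect header check" when the bytes it receives do not start with a valid gzip (or zlib) header, so one useful thing to log is whether the payload begins with the gzip magic bytes 0x1f 0x8b. A minimal sketch follows. describePayload is a hypothetical helper, not part of pako or @azure/functions.

```typescript
// Summarize a payload for logging: its length, its first bytes in hex,
// and whether it starts with the gzip magic bytes 0x1f 0x8b.
function describePayload(bytes: Uint8Array): { length: number; headHex: string; looksLikeGzip: boolean } {
  const headHex = Array.from(bytes.slice(0, 4))
    .map((b) => b.toString(16).padStart(2, "0"))
    .join(" ");
  const looksLikeGzip = bytes.length >= 2 && bytes[0] === 0x1f && bytes[1] === 0x8b;
  return { length: bytes.length, headHex, looksLikeGzip };
}

// A gzip stream starts with 1f 8b, followed by 08 for the deflate method.
console.log(describePayload(new Uint8Array([0x1f, 0x8b, 0x08, 0x00, 0x00])));
// Plain JSON text does not start with the gzip magic bytes.
console.log(describePayload(new TextEncoder().encode('{"a":1}')));
```

If looksLikeGzip is false inside the handler, one possible cause (not confirmed by the question) is that the binary body has already been turned into a string somewhere along the way. Buffer.from(message) without an encoding argument encodes a string as UTF-8, which does not reproduce the original gzip bytes.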
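Output after decoding:
The sample code below applies to Event Hub messages that are Base64-encoded, compressed with Gzip, and expected to contain JSON. It assumes the sender Base64-encodes the gzip bytes, which the Java snippet in the question does not show. It is also written against the older AzureFunction/Context types rather than the V4 app.eventHub registration used in the question.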
import { AzureFunction, Context } from "@azure/functions";
import * as pako from 'pako';

const eventHubTrigger: AzureFunction = async function (context: Context, eventHubMessages: any[]): Promise<void> {
    context.log(`Eventhub trigger function called for message array ${eventHubMessages}`);
    eventHubMessages.forEach((message, index) => {
        context.log(`Processing message ${message}`);
        const mh: Handler = new Handler();
        const deviceMessage = mh.decompressBuf(message, context);
        // Your further processing logic here with 'deviceMessage'
        if (deviceMessage) {
            context.log(`Device message: ${JSON.stringify(deviceMessage)}`);
            // Continue with your processing logic, e.g., storing in a database, triggering other functions, etc.
        } else {
            context.log.warn('Skipping processing due to empty device message.');
        }
    });
};

class Handler {
    decompressBuf(message: string, context?: Context): any {
        try {
            const decodedMessage = Buffer.from(message, 'base64'); // Assuming the message is base64 encoded
            return this.decompress(new Uint8Array(decodedMessage), context);
        } catch (error) {
            // context is optional, so guard the access with optional chaining
            context?.log.error(`Error decoding message: ${error}`);
            throw error;
        }
    }

    private decompress(message: Uint8Array, context?: Context): any {
        try {
            if (message.length === 0) {
                context?.log.warn('Empty message received. Skipping decompression.');
                return null; // or handle accordingly
            }
            const resultGZipArray = pako.ungzip(message);
            const text = new TextDecoder('utf-8').decode(resultGZipArray);
            return JSON.parse(text);
        } catch (error) {
            context?.log.error(`Error during decompression: ${error}`);
            throw error;
        }
    }
}

export default eventHubTrigger;
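Whether Base64 decoding is needed depends on how the sender publishes the event and how the trigger hands the body to the function, neither of which is shown in the question. The sketch below accepts either raw bytes or a Base64 string and verifies the gzip header before decompressing. It uses Node's built-in zlib rather than pako, purely to stay self-contained. isGzip, toGzipBytes and decodeEvent are hypothetical names, and the assumption that the payload takes one of these two forms is mine.

```typescript
import { gzipSync, gunzipSync } from "node:zlib";

// True when the bytes start with the gzip magic number 0x1f 0x8b.
function isGzip(bytes: Uint8Array): boolean {
  return bytes.length >= 2 && bytes[0] === 0x1f && bytes[1] === 0x8b;
}

// Normalize an incoming payload to gzip bytes.
// Assumption: the payload is either raw bytes (Buffer/Uint8Array) or a Base64 string.
function toGzipBytes(payload: unknown): Uint8Array {
  let bytes: Uint8Array;
  if (payload instanceof Uint8Array) {
    bytes = payload; // Buffer is a Uint8Array subclass, so this covers both
  } else if (typeof payload === "string") {
    // Buffer.from does not throw on invalid Base64, so the header check below matters.
    bytes = Buffer.from(payload, "base64");
  } else {
    throw new Error(`Unsupported payload type: ${typeof payload}`);
  }
  if (!isGzip(bytes)) {
    throw new Error("Payload is not gzip data (missing 1f 8b header)");
  }
  return bytes;
}

// Decompress the payload, decode it as UTF-8 and parse the JSON body.
function decodeEvent(payload: unknown): unknown {
  const json = new TextDecoder("utf-8").decode(gunzipSync(toGzipBytes(payload)));
  return JSON.parse(json);
}

// Usage with a locally built sample standing in for a real event.
const sample = gzipSync(Buffer.from(JSON.stringify({ deviceId: "sensor-1" }), "utf8"));
console.log(decodeEvent(sample));                    // raw bytes
console.log(decodeEvent(sample.toString("base64"))); // Base64 text
```

Failing early with an explicit message makes it easier to tell a transport or encoding problem apart from a genuinely corrupt gzip stream.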