一旦我们的数据被处理,如何刷新kafka队列?

问题描述 投票:3回答:1

我有一个有4个主题的kafka制作人。当我根据CSV的类型上传CSV时,我会根据其主题将其发送给消费者。如果我第一次上传它会很有效但是一旦我上传了另一个CSV,它也会上传以前的数据,然后它会放入新的CSV数据。

我是kafka的新手所以我无法找到关于这个问题的正确解决方案。我尝试搜索偏移但无法实现它。我尝试将数组重置为null,但在新文件出现后重置为null后,它们会包含上一个数据和新数据。

生产者代码.js

 var kafka = require('kafka-node'),
        HighLevelProducer = kafka.HighLevelProducer,
        HighLevelConsumer = kafka.HighLevelConsumer,
        client = new kafka.Client(),
        producer = new HighLevelProducer(client),
        fs = require('fs'),
        consumer = new HighLevelConsumer(client, [{ topic: 'csvDealData', partition: 0 }, { topic: 'csvAssetData', partition: 0 }, { topic: 'csvPricingData', partition: 0 }, { topic: 'csvRedeemData', partition: 0 }], { autoCommit: false });
    var payloads;

var async = require('async');
console.log("STARTING PRODUCER");
var config = require("./config.json")
var http = require('http');
var express = require('express');
var app = express();
var port = '9094';
let tempCSVArray = [];
var server = http.createServer(app).listen(port);
server.timeout = 24000;
var totalDataLength = 0;
var tempIndex;

// var offset = new kafka.Offset(client)
// offset.fetchLatestOffsets([topic], (err, offsets) => {
//     if (err) {
//         console.log(`error fetching latest offsets ${err}`)
//         return
//     }
//     var latest = 1
//     Object.keys(offsets[topic]).forEach( o => {
//         latest = offsets[topic][o] > latest ? offsets[topic][o] : latest
//     })
//     console.log(" topic :::  "+topic);
//     console.log(" offsets :::  "+offsets);
//     console.log(" latest :::  "+latest);
//     consumer.setOffset(topic, 0, latest-1)
// });

var io = require('socket.io').listen(server, function () {
    console.log("Connected To Invoke Server.... ");
});
io.on('connection', function (socket) {
    socket.on('csvDataFromUI', function (data) {
        producer.on('error', function (err) { });
        tempCSVArray.push(data.dataArr);

        // here we are getting all the rows from CSV and we wait for the end line to come once we recived it we create and array and then send it tp and async function
        if (data.isEnd) {
            totalDataLength = tempCSVArray.length
            console.log(" \n length of data send to invoke function is ::::: " + totalDataLength + " \n  dataArray value :::: " + JSON.stringify(tempCSVArray));
            csvInvoke(tempCSVArray);
        }
    });
})

function csvInvoke(tempCSVArray) {
    async.eachOfSeries(tempCSVArray, (a, index, asyncCallback) => {
        a = a[0];
        tempIndex = index;
        let csvType = a.CsvType;
        if (csvType === "DealCaptureUpload") {
            var message = a;
            var originator = a.RepoData[0].Party[0].ParticipantID.trim();
            var collection = a.Collection;
            console.log(" originator  :::: " + originator);
            console.log("\nCollection: " + collection);
            if (a.RepoData[0].Trade[0].TransactionStatus == "NEW") {
                var newdata = { "originator": originator, "peers": ["0.0.0.0:" + config.peerPort[0][originator]], "channelName": "globalchannel", "chaincodeName": "DealPrivateCC", "fcn": "invoke", "Invokeargs": ["Capture", collection, message, "NEW", "DealPrivateCC", "globalchannel"], "username": "adminY", "orgName": config.orgList[0][originator] };
                payloads = [{ topic: 'csvDealData', messages: JSON.stringify(newdata), partition: 0, originator: originator }];
            } else if (a.RepoData[0].Trade[0].TransactionStatus == "CANCEL") {
                var newdata = { "originator": originator, "peers": ["0.0.0.0:" + config.peerPort[0][originator]], "channelName": "globalchannel", "chaincodeName": "DealPrivateCC", "fcn": "invoke", "Invokeargs": ["Capture", collection, message, "CANCEL", "PartyPrivateCC", "globalchannel"], "username": "adminY", "orgName": config.orgList[0][originator] };
                payloads = [{ topic: 'csvDealData', messages: JSON.stringify(newdata), partition: 0, originator: originator }];
            }
            setTimeout(() => {
                producer.send(payloads, function (err, data) {
                    if (err != null)
                        console.log("Error sending payload to consumer - " + err)
                    else
                        console.log("\n index ::: " + tempIndex + "\n Payloads  ::::::::  " + JSON.stringify(payloads));
                });
                asyncCallback();
            }, 6000);
        } else if (csvType === "AssetIssuanceUpload") {
            var message = a.Record[0];
            var originator = a.Party.trim();
            var collection = a.Collection;
            console.log(" \n originator  :::: " + originator);
            console.log("\n Collection: " + collection);
            var newdata = { "originator": originator, "peers": ["0.0.0.0:" + config.peerPort[0][originator]], "channelName": "globalchannel", "chaincodeName": "OwnershipPrivateCC", "fcn": "invokeInternal", "Invokeargs": ["Creation", collection, message], "username": "adminY", "orgName": config.orgList[0][originator] };
            payloads = [{ topic: 'csvAssetData', messages: JSON.stringify(newdata), partition: 0, originator: originator }];
            setTimeout(() => {
                producer.send(payloads, function (err, data) {
                    if (err != null)
                        console.log("Error sending payload to consumer - " + err)
                    else
                        console.log("\n index ::: " + tempIndex + "\n Payloads  ::::::::  " + JSON.stringify(payloads));
                });
                asyncCallback();
            }, 500);
        } else if (csvType === "PricingDataUpload") {
            var message = a.Record[0];
            var originator = a.Party.trim();
            console.log(" \n originator  :::: " + originator);
            var newdata = { "originator": originator, "peers": ["0.0.0.0:" + config.peerPort[0][originator]], "channelName": "globalchannel", "chaincodeName": "DataCC", "fcn": "invoke", "Invokeargs": ["DataSetup", message], "username": "adminY", "orgName": config.orgList[0][originator] };
            payloads = [{ topic: 'csvPricingData', messages: JSON.stringify(newdata), partition: 0, originator: originator }];
            setTimeout(() => {
                producer.send(payloads, function (err, data) {
                    if (err != null)
                        console.log("Error sending payload to consumer - " + err)
                    else
                        console.log("\n index ::: " + index + "\n Payloads  ::::::::  " + JSON.stringify(payloads));
                });
                asyncCallback();
            }, 500);
        } else if (csvType === "RedeemDataUpload") {
            var message = a.Record[0];
            var originator = a.Party.trim();
            var collection = a.Collection;
            console.log(" \n originator  :::: " + originator);
            console.log("\n Collection: " + collection);
            var newdata = { "originator": originator, "peers": ["0.0.0.0:" + config.peerPort[0][originator]], "channelName": "globalchannel", "chaincodeName": "PrivateCC", "fcn": "invoke", "Invokeargs": ["invokeWithdrawal", collection, message, "OwnershipPrivateCC"], "username": "adminY", "orgName": config.orgList[0][originator] };
            payloads = [{ topic: 'csvRedeemData', messages: JSON.stringify(newdata), partition: 0, originator: originator }];
            setTimeout(() => {
                producer.send(payloads, function (err, data) {
                    if (err != null)
                        console.log("Error sending payload to consumer - " + err)
                    else
                        console.log("\n index ::: " + index + "\n Payloads  ::::::::  " + JSON.stringify(payloads));
                });
                asyncCallback();
            }, 2000);
        }

        if (index === totalDataLength - 1) {
            tempCSVArray = [];
            //a = [];
            payloads = [];
            console.log(" We are flushing the tempCSVArray  ::::");
            console.log("\n final tempCSVArray  ::: " + JSON.stringify(tempCSVArray) + "    final a :::::: final  payloads ::::: " + JSON.stringify(payloads))

        }
        client.refreshMetadata(['csvDealData', 'csvAssetData', 'csvPricingData', 'csvRedeemData'], (err) => {
            if (err) {
                console.warn('Error refreshing kafka metadata', err);
            }
        });

    }, function (err) {
        if (err) console.error(err.message);
        console.warn('Error refreshing kafka metadata', err);
    });
}

消费者代码

var kafka = require('kafka-node');
var HighLevelConsumer = kafka.HighLevelConsumer;
var Client = kafka.Client;
var argv = require('optimist').argv;
var client = new Client('localhost:2181');
var topics = [{ topic: 'csvDealData' }, { topic: 'csvAssetData' }, { topic: 'csvPricingData' }, { topic: 'csvRedeemData' }];
var options = { autoCommit: true, fetchMaxWaitMs: 1000, fetchMaxBytes: 1024 * 1024 };
var consumer = new HighLevelConsumer(client, topics, options);

console.log("STARTING CONSUMER");

var hfcSocket = require("socket.io-client");
var invoke1 = hfcSocket.connect('http://10.21.134.17:5001');
var invoke2 = hfcSocket.connect('http://10.21.134.17:5004');
var invoke3 = hfcSocket.connect('http://10.21.134.17:5005');
var invoke4 = hfcSocket.connect('http://10.21.134.17:5002');
var invoke5 = hfcSocket.connect('http://10.21.134.17:5006');
var invoke6 = hfcSocket.connect('http://10.21.134.17:5003');
var invoke7 = hfcSocket.connect('http://10.21.134.17:5007');
var invoke8 = hfcSocket.connect('http://10.21.134.17:5008');

consumer.on('message', function (message) {
   console.log(" message in consumer  :::: " + JSON.stringify(message));

   if (message.topic == "csvDealData") {
      console.log(" Message ::: " + JSON.stringify(message));
      var originator = JSON.parse(message.value).originator;
      if (originator == "Org1") 
            invoke1.emit('csvDealData', message)
      else if (originator == "Org2") 
            invoke2.emit('csvDealData', message);
      else if (originator == "Org3")
            invoke6.emit('csvDealData', message);
      else if (originator == "Org4") 
            invoke5.emit('csvDealData', message);
      else if (originator == "Org5") 
            invoke3.emit('csvDealData', message);
      else if (originator == "Org6") 
            invoke4.emit('csvDealData', message);
      else if (originator == "Org7") 
            invoke7.emit('csvDealData', message);
      else if (originator == "Org8") 
            invoke8.emit('csvDealData', message);
   } else if (message.topic == "csvAssetData") {
      var originator = JSON.parse(message.value).originator;
      if (originator == "Org1") 
            invoke1.emit('csvAssetData', message)
      else if (originator == "Org2") 
            invoke2.emit('csvAssetData', message);
      else if (originator == "Org3") 
            invoke6.emit('csvAssetData', message);
      else if (originator == "Org4") 
            invoke5.emit('csvAssetData', message);
      else if (originator == "Org5") 
            invoke3.emit('csvAssetData', message);
      else if (originator == "Org6") 
            invoke4.emit('csvAssetData', message);
      else if (originator == "Org7") 
            invoke7.emit('csvAssetData', message);
      else if (originator == "Org8")
            invoke8.emit('csvAssetData', message);
   } else if (message.topic == "csvPricingData") {
      var originator = JSON.parse(message.value).originator;
      if (originator == "Org1") 
            invoke1.emit('csvPricingData', message)
      else if (originator == "Org2")
            invoke2.emit('csvPricingData', message);
      else if (originator == "Org3") 
            invoke6.emit('csvPricingData', message);
      else if (originator == "Org4") 
            invoke5.emit('csvPricingData', message);
      else if (originator == "Org5") 
            invoke3.emit('csvPricingData', message);
      else if (originator == "Org6") 
            invoke4.emit('csvPricingData', message);
      else if (originator == "Org7") 
            invoke7.emit('csvPricingData', message);
      else if (originator == "Org8")
            invoke8.emit('csvPricingData', message);
   } else if (message.topic == "csvRedeemData") {
      var originator = JSON.parse(message.value).originator;
      if (originator == "Org1") 
            invoke1.emit('csvRedeemData', message)
      else if (originator == "Org2") 
            invoke2.emit('csvRedeemData', message);
      else if (originator == "Org3")
            invoke6.emit('csvRedeemData', message);
      else if (originator == "Org4") 
            invoke5.emit('csvRedeemData', message);
      else if (originator == "Org5") 
            invoke3.emit('csvRedeemData', message);
      else if (originator == "Org6") 
            invoke4.emit('csvRedeemData', message);
      else if (originator == "Org7") 
            invoke7.emit('csvRedeemData', message);
      else if (originator == "Org8") 
            invoke8.emit('csvRedeemData', message);
   }
});

consumer.on('error', function (err) {
   console.log('error', err);
});

我期待用户是否已经上传了CSV并上传了其他类型的CSV,它不应该妨碍以前的CSV数据,然后它也应该正确发送新的CSV而不调用以前的数据。

my fiddle link for producer.js

link for consumer

javascript arrays apache-kafka kafka-producer-api
1个回答
0
投票

如果您需要将CSV文件中的数据导入Kafka,请使用Kafka Connect。有a connector将摄取CSV文件。 Kafka Connect是Apache Kafka的一部分,只需要使用简单的JSON配置。

© www.soinside.com 2019 - 2024. All rights reserved.