遵循此代码,如何在发送记录和获取记录之间实现多线程。
public class Test_ProducerConsumer {
// Main method
public static void main(String[] args) {
// Send records
for(int i=0; i<10; i++) {
kafka_io.send_records(sendtopic, "test_key", "test_value" + String.valueOf(i));
}
// Get records
try {
kafka_io.get_records(gettopic, 100000);
} catch (JSONException e) {
e.printStackTrace();
}
}
}
您可以使用BlockingQueue。
[在接收项目的一侧,您可以阻止线程,直到新的项目到来,而与其他线程一起,您可以添加该项目并通知该线程谁正在接收该项目。
// Inside a class
public static void main(String[] args) {
Runnable r = Consumer()
Thread t = Thread(r)
t.start()
// Send records
for(int i=0; i<10; i++) {
r.queue.add(DataWrapper(...));
}
}
class Consumer implements Runnable {
BlockingQueue<DataWrapper> queue = BlockingQueue();
@Override
public void run() {
while(true) {
DataWrapper data = queue.take();
// use data here
}
}
}
class DataWrapper {
String dataString; // these are your variables change as you want
int dataInt;
}