我想在flink中使用kafaka消息队列作为sql语句参数,但是我觉得使用和不使用没有区别。如果我在springboot中写一个消费者就可以了。所以我认为flink中一定有更好的方法。怎么优化,想想
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id","kafka");
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
TOPIC, new SimpleStringSchema(), props);
consumer.setStartFromLatest();
DataStream<String> stream = env.addSource(consumer);
stream.map(new RichMapFunction<String, List<RecommendHouse>>() {
private PreparedStatement pstmt=null;
private Connection connection=null;
List<HouseInfo> houseInfoList = new ArrayList<>();
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
connection = getConnection();
pstmt = connection.prepareStatement("select * from house_info where room_state=1");
ResultSet resultSet = pstmt.executeQuery();
while (resultSet.next()){
HouseInfo houseInfo = new HouseInfo(resultSet.getString("house_id"),resultSet.getInt("city_id"),resultSet.getInt("district_id"),resultSet.getString("building_name"),resultSet.getBigDecimal("room_area"),resultSet.getBigDecimal("rent_value"));
houseInfoList.add(houseInfo);
}
}
@Override
public List<RecommendHouse> map(String s) throws Exception {
Requirement requirement = JSON.parseObject(s, Requirement.class);
Integer cityId = requirement.getCityId();
Integer areaMax = requirement.getAreaMax();
Integer areaMin = requirement.getAreaMin();
List<RecommendHouse> recommendHouseList = Lists.newArrayList();
List<HouseInfo> list = houseInfoList.stream().filter(r -> r.getCityId().equals(cityId)).collect(Collectors.toList());
for (HouseInfo houseInfo : list) {
//score
Integer score = 0;
if(houseInfo.getDistrictId().equals(100100)){
score += 10;
}
if(houseInfo.getRoomArea().compareTo(BigDecimal.valueOf(areaMax))== -1 && houseInfo.getRoomArea().compareTo(BigDecimal.valueOf(areaMin)) == 1){
score += 10;
}
RecommendHouse recommendHouse = new RecommendHouse(requirement.getRequirementId(), houseInfo.getHouseId(), score);
recommendHouseList.add(recommendHouse);
}
List<RecommendHouse> limit = recommendHouseList.stream().sorted(Comparator.comparing(RecommendHouse::getScore).reversed()).limit(50).collect(Collectors.toList());
return limit;
}
public Connection getConnection() {
}).print();
env.execute("consumer");