Flink 使用 Kafka 消息队列中的消息作为 SQL 参数

问题描述 投票:0回答:0

我想在 Flink 中把 Kafka 消息队列里的消息作为 SQL 语句的参数,但我觉得用不用 Flink 没有区别:在 Spring Boot 中写一个消费者同样可以做到。所以我认为 Flink 中一定有更好的方法。请问应该怎样优化?

 // Entry point of the streaming job: obtain the execution environment that the
 // source is attached to below.
 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka client settings handed to the consumer: broker address and consumer group id.
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id","kafka");
        // Consumer for TOPIC that yields each record as a String.
        // NOTE(review): TOPIC is not defined in this snippet — presumably a constant declared elsewhere.
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
                TOPIC, new SimpleStringSchema(), props);
       
        // Request that reading starts from the latest offsets, i.e. records already in the
        // topic before the job starts are not replayed (per the connector's setStartFromLatest).
        consumer.setStartFromLatest();
        // Register the consumer as the job's source; every element is one raw message string.
        DataStream<String> stream = env.addSource(consumer);

        stream.map(new RichMapFunction<String, List<RecommendHouse>>() {

                    private PreparedStatement pstmt=null;
                    private Connection connection=null;

                    List<HouseInfo> houseInfoList = new ArrayList<>();
                    /**
                     * Loads the candidate houses once when this function instance is opened.
                     *
                     * <p>Obtains a connection through {@code getConnection()}, prepares and runs the
                     * {@code room_state=1} query, and appends one {@code HouseInfo} per row to
                     * {@code houseInfoList}. The connection and the prepared statement remain
                     * assigned to their fields; only the result set is scoped to this method.
                     *
                     * <p>NOTE(review): nothing visible in this snippet closes {@code pstmt} or
                     * {@code connection} — confirm they are released elsewhere (e.g. in a
                     * {@code close()} override).
                     *
                     * @param parameters configuration forwarded to {@code super.open}
                     * @throws Exception if obtaining the connection, running the query, or reading a row fails
                     */
                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        connection = getConnection();
                        pstmt = connection.prepareStatement("select * from house_info where room_state=1");
                        // try-with-resources releases the cursor even when reading a row throws.
                        try (ResultSet resultSet = pstmt.executeQuery()) {
                            while (resultSet.next()) {
                                houseInfoList.add(new HouseInfo(
                                        resultSet.getString("house_id"),
                                        resultSet.getInt("city_id"),
                                        resultSet.getInt("district_id"),
                                        resultSet.getString("building_name"),
                                        resultSet.getBigDecimal("room_area"),
                                        resultSet.getBigDecimal("rent_value")));
                            }
                        }
                    }

                    /**
                     * Scores the cached houses of the requested city and returns the best 50.
                     *
                     * <p>The message is parsed as a JSON {@code Requirement}. Every entry of
                     * {@code houseInfoList} whose city id equals the requirement's city id is scored:
                     * 10 points when its district id equals 100100, plus 10 points when its room
                     * area lies strictly between the requirement's minimum and maximum area.
                     * When either bound or the house's room area is missing, the area bonus is
                     * simply not awarded. The result is ordered by score, highest first, and
                     * truncated to 50 entries.
                     *
                     * @param s JSON text of a single requirement message
                     * @return at most 50 recommendations, highest score first
                     * @throws Exception propagated from parsing or scoring
                     */
                    @Override
                    public List<RecommendHouse> map(String s) throws Exception {
                        Requirement requirement = JSON.parseObject(s, Requirement.class);
                        Integer cityId = requirement.getCityId();
                        Integer areaMax = requirement.getAreaMax();
                        Integer areaMin = requirement.getAreaMin();

                        // Convert the bounds once per message instead of once per house; a bound
                        // absent from the message stays null rather than failing on unboxing.
                        BigDecimal upperArea = areaMax == null ? null : BigDecimal.valueOf(areaMax.longValue());
                        BigDecimal lowerArea = areaMin == null ? null : BigDecimal.valueOf(areaMin.longValue());

                        List<RecommendHouse> recommendHouseList = Lists.newArrayList();
                        for (HouseInfo houseInfo : houseInfoList) {
                            if (!houseInfo.getCityId().equals(cityId)) {
                                continue;
                            }
                            int score = 0;
                            // NOTE(review): 100100 is a hard-coded district id — confirm whether it
                            // should come from the requirement instead.
                            if (houseInfo.getDistrictId().equals(100100)) {
                                score += 10;
                            }
                            BigDecimal roomArea = houseInfo.getRoomArea();
                            // Test the sign of compareTo rather than the literal values -1 / 1.
                            if (roomArea != null && upperArea != null && lowerArea != null
                                    && roomArea.compareTo(upperArea) < 0
                                    && roomArea.compareTo(lowerArea) > 0) {
                                score += 10;
                            }
                            recommendHouseList.add(
                                    new RecommendHouse(requirement.getRequirementId(), houseInfo.getHouseId(), score));
                        }

                        // The sort is stable, so houses with equal scores keep their cache order.
                        return recommendHouseList.stream()
                                .sorted(Comparator.comparing(RecommendHouse::getScore).reversed())
                                .limit(50)
                                .collect(Collectors.toList());
                    }


                    public Connection getConnection() {
                       
                }).print();
        env.execute("consumer");
apache-flink flink-sql
© www.soinside.com 2019 - 2024. All rights reserved.