如何根据字段的值在Logstash节流插件中设置动态限制?

问题描述 投票:0回答:2

我正在使用 Logstash 节流插件来限制事件的发生率。如果我手动设置

after_count
,一切都可以。不过我正在尝试从 Redis 获取这个值。然后将其设置为新字段,然后在throttle插件中使用它。所以我的配置文件是这样的:

filter {
        ruby {
                init => 'require "redis"; $redis = Redis.new(host: "127.0.0.1", port: 6379, db: 5)'
                code => 'event.set("limit", $redis.get("limit"))'
        }

        mutate {
                convert => {
                        "limit" => "integer"
                }
        }

        throttle {
                after_count => %{limit}
                period => 1
                max_age => 2
                key => "[message]"
                add_tag => "throttled"
        }

        if "throttled" in [tags] {
                drop { }
        }
}

虽然检查日志时没有错误,但运行此配置时索引事件会停止。有人可以帮助我吗?有没有办法根据redis中的值来限制速率?

elasticsearch plugins logstash elk rate-limiting
2个回答
1
投票

不可以,您不能在节流过滤器的 before_count 或 after_count 选项中使用 sprintf 引用。 代码直接使用选项值,而不是sprintf它们。

“%{limit}”将转换为零,因此“count > 0”始终评估为 true,并且每个事件都会被标记并删除。


0
投票

将此Java插件添加到logstash-core中,然后打包到logstash-core.jar中



@LogstashPlugin(name = "java_rate_limit")
public class RateLimitFilter implements Filter {

    private static final Logger log = LogManager.getLogger(RateLimitFilter.class);

    public static final PluginConfigSpec<String> RATE_PATH = PluginConfigSpec.stringSetting("rate_path");

    public static final ScheduledExecutorService SCHEDULER = new ScheduledThreadPoolExecutor(1);

    private String id;

    private String ratePath;

    private RateLimiter rateLimiter;

    private double lastRate;

    /**
     * Required constructor.
     *
     * @param id            Plugin id
     * @param configuration Logstash Configuration
     * @param context       Logstash Context
     */
    public RateLimitFilter(final String id, final Configuration configuration, final Context context) {
        this.id = id;
        this.ratePath = configuration.get(RATE_PATH);
        this.rateLimiterEnabled = ratePath != null && !ratePath.trim().isEmpty();
        if (rateLimiterEnabled) {
            SCHEDULER.scheduleWithFixedDelay(() -> updateRateLimiterIfRateChanged(), 0, 1, TimeUnit.SECONDS);
        }

    private void updateRateLimiterIfRateChanged() {
        double rate = -1D;
        try (BufferedReader bufferedReader = new BufferedReader(new FileReader(ratePath))) {
            String firstLine = bufferedReader.readLine();
            if (firstLine != null) {
                rate = Double.valueOf(firstLine);
            }
        } catch (Throwable e) {
            log.error("Get rate value failed!", e);
        }

        if (rate != lastRate) {
            if (rate <= 0D) {
                rateLimiter = null;
                log.warn("# Rate is not positive, set RateLimiter to null! lastRate:[{}] rate:[{}] ratePath:[{}].", lastRate, rate, ratePath);
            } else if (rate > 0D) {
                rateLimiter = RateLimiter.create(rate);
                log.warn("# Rate changed, set new RateLimiter! lastRate:[{}] rate:[{}] ratePath:[{}].", lastRate, rate, ratePath);
            }
            lastRate = rate;
        }

    }

    @Override
    public Collection<Event> filter(Collection<Event> events, final FilterMatchListener filterMatchListener) {
        if (events == null || events.isEmpty()) {
            return events;
        }
        if (rateLimiter != null) {
            rateLimiter.acquire(events.size());
        }
        for (Event event : events) {
            filterMatchListener.filterMatched(event);
        }
        return events;
    }

    @Override
    public Collection<PluginConfigSpec<?>> configSchema() {
        return PluginHelper.commonFilterSettings(Arrays.asList(RATE_PATH));
    }

    @Override
    public String getId() {
        return id;
    }

}

然后在conf中添加过滤器

filter {
    # plugin name
    java_rate_limit {
        # rate in a text file  
        rate_path    => "/path/rate.txt"
    }

}

您可以在文件中更改速率,1秒内生效。

© www.soinside.com 2019 - 2024. All rights reserved.