具有行限制的 API 数据检索优化策略

问题描述 投票:0回答:1

我目前正在从事一个数据抓取项目,需要从多个统计 API 中检索大量数据。这些 API,尤其是较旧的 API 和政府 API,通常不支持整个数据集的批量下载。相反,它们需要为每个请求指定您感兴趣的变量和相应的值。此外,许多方法对您可以在单个请求中检索的值(或“行”)的数量施加限制。这促使我寻求一种在这些约束下构建请求的最佳算法。

考虑一个标题为“按年份、性别和国家划分的人口”的数据集,其变量及其可能值如下:

数据集变量示例

{
   "sex": ["total", "women", "men"],
   "country of birth": ["Norway", "Finland", "Sweden", "Denmark"],
   "year": ["2019", "2020", "2021", "2022", "2023"]
}

每个请求必须至少包含每个变量的一个选择,返回的单元格数量是每个变量的选择的乘积。

请求参数示例

{
  "sex": ["total", "women", "men"],
  "country of birth": ["sweden", "denmark"],
  "year": ["2023"]
}

此请求将返回 3 x 2 x 1 = 6 行,涵盖指定值的每个组合:

sex, country of birth, year, value
total, sweden, 2023, X
total, denmark, 2023, X
women, sweden, 2023, X
women, denmark, 2023, X
men, denmark, 2023, X
men, sweden, 2023, X

对于此示例,API 将每个请求可检索的值/数据行数限制为 13 个。 行限制= 13

很明显,如果我们想要通过尽可能少的请求获取该表中的所有值,则给定的示例不是最佳示例,因为当限制为 13 时,它仅检索 6 行。 不清楚的是我们如何构建 API 请求(字典列表),以便我们不必执行更多必要的 API 调用?

数据集中的组合/值的总数(代表我们想要检索的所有可能的行)可以通过将字典中列出的每个变量的可用选项数相乘来计算。对于我们的示例变量:

总组合 = 3 x 4 x 5 = 60

我们可以注意到的一件事是,考虑到每个请求可以获取的行数的限制,从 API 检索所有数据组合所需的最小请求数的下限等于总行数 (total_combinations ) 除以 API 行请求限制 (row_limit)。在我们的例子中:

min_requests = Total_combinations / row_limit = 60 / 13 = 4.615.. ≈ 5

这只是一个下限,而实际的最小请求数可能会更大。

目标:开发一个函数,为请求配置创建最佳字典列表。每个字典的结果不应超过允许的最大行数(例如 13)。字典列表应涵盖所有可能的组合而不重叠,并最小化字典总数(因此,请求)。

限制:

  1. 每个字典的结果不超过给定的行/组合限制。
  2. 字典列表共同涵盖了所有可能的行/组合。
  3. 字典中没有重复的行/组合。
  4. 在遵守上述约束的同时,字典总数被最小化。

我正在寻找有关可以有效解决此问题的算法或策略的建议,同时考虑到需要最大限度地减少 API 请求数量,同时遵守指定的限制。类似实现的示例或相关算法的指针将不胜感激。

不成功的Python实现 我尝试了一些方法,其中之一是一种递归的、暴力的方法(我能想到的尽可能多的修剪),但它对于较大的输入不起作用(超时)。另外,我认为它甚至不能保证最佳(或其中一个)解决方案。

from collections import Counter
from itertools import combinations, product

def generate_optimal_combinations_recursive_optimized(
    variables: dict, limit: int
) -> list[dict]:
    """
    Generates a list of dictionaries mapping each variable to a subset of its possible values.
    Each dictionary's Cartesian product of subsets must not exceed the predefined limit.
    The goal is to cover the entire Cartesian product of variables while minimizing dictionaries and avoiding duplications.

    Args:
    variables (dict): A dictionary of variables with their possible values.
    limit (int): The maximum allowed product count for each dictionary.

    Returns:
    list[dict]: A list of dictionaries meeting the criteria.
    """
    #get the product count for a subset dictionary
    def product_count(subset: dict) -> int:
        count = 1
        for values in subset.values():
            count *= len(values)
        return count

    #generate a count-based profile
    def generate_profile(subset: dict) -> tuple:
        length_counts = Counter(len(values) for values in subset.values())
        return tuple(sorted((length, count) for length, count in length_counts.items()))

    #generate valid subsets for a variable that fit within the limit
    def valid_subsets(values: list[str], other_counts: int):
        for i in range(1, len(values) + 1):
            for subset in combinations(values, i):
                if len(subset) * other_counts <= limit:
                    yield subset

    #recursive function to explore different combinations of subsets
    def explore_subsets(
        remaining_vars: dict,
        current_solution: list[dict],
        covered_combinations: set,
    ) -> list[dict]:
        if (
            covered_combinations == all_combinations
        ):  # Base case: all combinations are covered
            return current_solution

        best_subsets = []
        best_new_combinations_count = 0
        used_profiles = set()

        # Iterate through all possible combinations of subsets for the remaining variables
        for combination in product(
            *[
                [
                    (var, subset)
                    for subset in valid_subsets(
                        values, int(limit / max(len(values), 1))
                    )
                ]
                for var, values in remaining_vars.items()
            ]
        ):
            subset_dict = {var: subset for var, subset in combination}
            if product_count(subset_dict) <= limit:
                new_combinations = (
                    set(product(*subset_dict.values())) - covered_combinations
                )
                new_combinations_count = len(new_combinations)
                profile = generate_profile(subset_dict)

                if new_combinations_count > best_new_combinations_count:
                    best_subsets = [subset_dict]
                    best_new_combinations_count = new_combinations_count
                    used_profiles = {profile}
                elif (
                    new_combinations_count == best_new_combinations_count
                    and profile not in used_profiles
                ):
                    best_subsets.append(subset_dict)
                    used_profiles.add(profile)

        #explore each of the best subsets recursively (all same size)
        best_solution = None
        for subset_dict in best_subsets:
            new_covered_combinations = covered_combinations.union(
                set(product(*subset_dict.values()))
            )
            new_current_solution = current_solution + [subset_dict]

            #recursive call
            solution = explore_subsets(
                remaining_vars, new_current_solution, new_covered_combinations
            )

            #choose the best solution (with fewer dictionaries)
            if solution and (not best_solution or len(solution) < len(best_solution)):
                best_solution = solution

        return best_solution

    all_combinations = set(product(*variables.values()))
    initial_covered_combinations = set()
    initial_solution = []

    optimal_solution = explore_subsets(
        variables.copy(), initial_solution, initial_covered_combinations
    )

    return optimal_solution if optimal_solution else []
python algorithm statistics combinations combinatorics
1个回答
0
投票

就像我上面用糟糕的语言表述的那样,这都是关于分区的。这个问题让人想起经典的背包问题,但要复杂得多,因为权重和值不是静态的。由于我们需要所有数据/行组合,这意味着必须包含每个输入列表中的每个元素。所以问题归结为,有多少种方法来划分 len(variable_list) 值?相当多,但如果我们考虑唯一需要考虑的是输入变量的实际长度,因为无论如何我们都会尝试将它们分开。因此,我们不考虑所有可能的配置,而只考虑晚上(最后一批可能有例外)划分长度为 X 的列表的方法数量?结果最多有 X 种不同的方式。然后我们看看,如果我们将列表分为 Y (Y

from itertools import product as iter_product
import math

def get_partition_data(variables, limit):
    """Get the number of batches and batch sizes for each variable."""
    batch_size_sets = {}
    number_of_batches_lists = []
    for var, values in variables.items():
        size_to_batches = {}

        for size in range(1, min(len(values) + 1, limit + 1)):
            nbr_of_batches = math.ceil(len(values) / size)
            # This ensures we only keep the largest size for each unique number of batches
            size_to_batches[nbr_of_batches] = size
        batch_size_sets[var] = size_to_batches
        number_of_batches_lists.append(list(size_to_batches.keys()))

    return batch_size_sets, number_of_batches_lists


def find_optimal_combination(variables, limit):
    """Optimized function to find the optimal batch sizes for each variable."""
    total_rows = math.prod([len(values) for values in variables.values()])
    lower_request_bound = math.ceil(total_rows / limit)
    if lower_request_bound == 1:
        return {var: len(values) for var, values in variables.items()}

    batch_size_sets, number_of_batches_lists = get_partition_data(variables, limit)

    best_combination = None
    min_request_count = float("inf")

    for combo in iter_product(*number_of_batches_lists):
        request_count = math.prod(combo)
        if request_count >= lower_request_bound and request_count < min_request_count:
            batch_sizes_product = math.prod(
                [batch_size_sets[var][nbr] for var, nbr in zip(variables.keys(), combo)]
            )
            if batch_sizes_product <= limit:
                min_request_count = request_count
                best_combination = combo
                if min_request_count == lower_request_bound:
                    break
    return {
        var: batch_size_sets[var][nbr]
        for var, nbr in zip(variables.keys(), best_combination)
    }



def generate_all_combinations(variables, optimal_batch_sizes):
    """Generate all combinations of batches, one batch from each variable's batched lists of values."""
    # Split each variable's values into batches according to the optimal batch size
    all_batches = {
        var: split_into_batches(values, optimal_batch_sizes[var])
        for var, values in variables.items()
    }
    # Generate the Cartesian product of all batches to form the configurations
    configurations = list(iter_product(*all_batches.values()))

    # Convert tuple configurations to dictionary format
    configurations_dicts = []
    for configuration in configurations:
        config_dict = {
            var: batch for var, batch in zip(variables.keys(), configuration)
        }
        configurations_dicts.append(config_dict)

    return configurations_dicts


def get_request_configs(variables, limit):
    """Get the optimal batch sizes and all possible combinations of batches."""
    optimal_batch_sizes = find_optimal_combination(variables, limit)
    request_configs = generate_all_combinations(variables, optimal_batch_sizes)
    return optimal_batch_sizes, request_configs

打印出来

Input variable lengths:
{
    "commodity group according to CN": 25163,
    "trading partner": 263,
    "observations": 3,
    "month": 347
}
Limit: 100000

Optimal batch sizes: 
{
    "commodity group according to CN": 95,
    "trading partner": 263,
    "observations": 1,
    "month": 4
}

Elapsed time: 0.150539 seconds
Number of request configurations: 69165
Lower request bound: 68893
Difference: 272
Total number of rows/requests: 6889201629
© www.soinside.com 2019 - 2024. All rights reserved.