我有一些代码试图将一棵树分成两部分。这些部分需要在某个 ε 范围内通过人口来平衡,我一直在使用以下代码:
def find_balanced_edge_cuts_memoization(
h: PopulatedGraph, choice: Callable = random.choice
) -> List[Cut]:
"""
Find balanced edge cuts using memoization.
This function takes a PopulatedGraph object and a choice function as input and returns a list
of balanced edge cuts. A balanced edge cut is defined as a cut that divides the graph into
two subsets, such that the population of each subset is close to the ideal population
defined by the PopulatedGraph object.
:param h: The PopulatedGraph object representing the graph.
:type h: PopulatedGraph
:param choice: The choice function used to select the root node.
:type choice: Callable, optional
:returns: A list of balanced edge cuts.
:rtype: List[Cut]
"""
root = choice([x for x in h if h.degree(x) > 1])
pred = predecessors(h.graph, root)
succ = successors(h.graph, root)
total_pop = h.tot_pop
subtree_pops: Dict[Any, Union[int, float]] = {}
stack = deque(n for n in succ[root])
while stack:
next_node = stack.pop()
if next_node not in subtree_pops:
if next_node in succ:
children = succ[next_node]
if all(c in subtree_pops for c in children):
subtree_pops[next_node] = sum(subtree_pops[c] for c in children)
subtree_pops[next_node] += h.population[next_node]
else:
stack.append(next_node)
for c in children:
if c not in subtree_pops:
stack.append(c)
else:
subtree_pops[next_node] = h.population[next_node]
def part_nodes(start):
nodes = set()
queue = deque([start])
while queue:
next_node = queue.pop()
if next_node not in nodes:
nodes.add(next_node)
if next_node in succ:
for c in succ[next_node]:
if c not in nodes:
queue.append(c)
return nodes
cuts = []
for node, tree_pop in subtree_pops.items():
if abs(tree_pop - h.ideal_pop) <= h.ideal_pop * h.epsilon:
e = (node, pred[node])
wt = random.random()
cuts.append(
Cut(
edge=e,
weight=h.graph.edges[e].get("random_weight", wt),
subset=part_nodes(node)
)
)
elif abs((total_pop - tree_pop) - h.ideal_pop) <= h.ideal_pop * h.epsilon:
e = (node, pred[node])
wt = random.random()
cuts.append(
Cut(
edge=e,
weight=h.graph.edges[e].get("random_weight", wt),
subset=set(h.graph.nodes) - part_nodes(node),
)
)
return cuts
当我去分析这段代码时,我注意到我的一个测试运行了很长时间,而这个函数是罪魁祸首。于是我就废话了一番,发现把最后一个
for
循环改成
for node, tree_pop in subtree_pops.items():
wt = random.random() # <- CHANGE HERE
if abs(tree_pop - h.ideal_pop) <= h.ideal_pop * h.epsilon:
e = (node, pred[node])
cuts.append(
Cut(
edge=e,
weight=h.graph.edges[e].get("random_weight", wt),
subset=part_nodes(node)
)
)
elif abs((total_pop - tree_pop) - h.ideal_pop) <= h.ideal_pop * h.epsilon:
e = (node, pred[node])
cuts.append(
Cut(
edge=e,
weight=h.graph.edges[e].get("random_weight", wt),
subset=set(h.graph.nodes) - part_nodes(node),
)
)
return cuts
将代码的执行速度提高约 250-400 倍,具体取决于我正在使用的图表,我想了解的是为什么。对我来说,这种情况毫无意义,因为在第一种情况下,
random.random()
调用的次数应该严格少于第二种情况,但这并没有反映在性能中。
以下是本例中使用的相关类:
Cut = namedtuple("Cut", "edge weight subset")
和
class PopulatedGraph:
"""
A class representing a graph with population information.
:ivar graph: The underlying graph structure.
:type graph: nx.Graph
:ivar subsets: A dictionary mapping nodes to their subsets.
:type subsets: Dict
:ivar population: A dictionary mapping nodes to their populations.
:type population: Dict
:ivar tot_pop: The total population of the graph.
:type tot_pop: Union[int, float]
:ivar ideal_pop: The ideal population for each district.
:type ideal_pop: float
:ivar epsilon: The tolerance for population deviation from the ideal population within each
district.
:type epsilon: float
"""
def __init__(
self,
graph: nx.Graph,
populations: Dict,
ideal_pop: Union[float, int],
epsilon: float,
) -> None:
"""
:param graph: The underlying graph structure.
:type graph: nx.Graph
:param populations: A dictionary mapping nodes to their populations.
:type populations: Dict
:param ideal_pop: The ideal population for each district.
:type ideal_pop: Union[float, int]
:param epsilon: The tolerance for population deviation as a percentage of
the ideal population within each district.
:type epsilon: float
"""
self.graph = graph
self.subsets = {node: {node} for node in graph.nodes}
self.population = populations.copy()
self.tot_pop = sum(self.population.values())
self.ideal_pop = ideal_pop
self.epsilon = epsilon
self._degrees = {node: graph.degree(node) for node in graph.nodes}
def __iter__(self):
return iter(self.graph)
def degree(self, node) -> int:
return self._degrees[node]
def contract_node(self, node, parent) -> None:
self.population[parent] += self.population[node]
self.subsets[parent] |= self.subsets[node]
self._degrees[parent] -= 1
def has_ideal_population(self, node) -> bool:
return (
abs(self.population[node] - self.ideal_pop) < self.epsilon * self.ideal_pop
)
def __repr__(self) -> str:
graph_info = (
f"Graph(nodes={len(self.graph.nodes)}, edges={len(self.graph.edges)})"
)
return (
f"{self.__class__.__name__}("
f"graph={graph_info}, "
f"total_population={self.tot_pop}, "
f"ideal_population={self.ideal_pop}, "
f"epsilon={self.epsilon})"
)
如果有人能够对这里发生的事情提供一些见解,我们将不胜感激。另外,如果您需要更多信息,请告诉我:这是我第一次在这里发帖,所以我可能不知道一些标准,感谢您的耐心配合。谢谢!
我查看了有关
random.random()
如何工作的 python 文档,并尝试研究有关解释器如何处理这种情况的任何信息,但没有走得太远。我还确保这个问题可以在多个 python 版本和多台机器上复制,而且情况似乎确实如此,所以我真的不知道下一步该怎么做。
问题已解决。在这里标记,这样其他人就不需要阅读整个帖子了