我有一些非常大的数据集,分析管道的一部分是确定,如标题所示,每个点是否由
n
维度中的单纯形约束。如果可能的话,我正在尝试找到一种无需并行即可快速计算的方法。障碍之一是数据集的维度各不相同,因此解决方案需要适用于任何维度,而不是固定在 2D 或 3D 等。
但是,为了简单起见,我使用了 2D 示例,因为它们很容易表示,但从理论上讲,数学应该成立。
我最初的想法是使用重心坐标,从笛卡尔坐标系转换而来,正如在这里完成的那样,但事实证明我的这种方法的实现至少可以说是不值得信赖的:
import numpy as np
import matplotlib.pyplot as plt
def is_in_simplex(point, T_inv, simplex):
first_n = np.matmul(
T_inv, (point - simplex[-1])
)
last = 1 - np.sum(first_n)
bary = np.concatenate((first_n, [last]))
return np.all((bary <= 1) & (bary >= 0))
# setup
simplex = np.array([[0, 0,], [8, 8,], [10, 3]])
rng = np.random.default_rng()
test_points = rng.random((10, 2))*10
# Maths starts here
T = np.array(simplex[:-1] - simplex[-1]).T
T_inv = np.linalg.inv(T)
within = np.vectorize(is_in_simplex, excluded={1, 2})(test_points, T_inv, simplex)
# drawing
polygon = np.concatenate([simplex, np.array([simplex[0]])])
print()
plt.plot(*polygon.T)
plt.scatter(*test_points.T)
for i, p in enumerate(test_points, 0):
print(f"{i}\t{p}\t{test_points[i]}\t{within[i]}")
plt.annotate(i, p)
其输出是:
0 [4.15391239 4.85852344] [4.15391239 4.85852344] [ True True]
1 [5.24829898 9.22879891] [5.24829898 9.22879891] [ True False]
2 [3.31255765 0.75891285] [3.31255765 0.75891285] [ True True]
3 [3.67468612 1.30045647] [3.67468612 1.30045647] [ True True]
4 [9.95049042 5.932782 ] [9.95049042 5.932782 ] [False True]
5 [8.42621723 6.35824573] [8.42621723 6.35824573] [False True]
6 [4.19569122 3.41275362] [4.19569122 3.41275362] [ True True]
7 [1.57324033 8.00273677] [1.57324033 8.00273677] [False False]
8 [1.9183791 0.54945207] [1.9183791 0.54945207] [ True True]
9 [0.52448473 7.77920839] [0.52448473 7.77920839] [False True]
第一列是索引,第二列是笛卡尔坐标,第三列应该是前两个重心坐标(应该假设它们加到1),第四列应该显示该点是否位于单纯形内.
您可能已经注意到,有一些问题。点 3、5 和 6 应标记为在单纯形内,但它们的重心坐标完全错误。由于它们受单纯形约束,因此重心坐标应大于 0 但总和为 1。
is_in_simplex()
的输出是一个数组,而每个点应该是单个布尔值。
不包括 RNG、打印和绘图,十个点需要 0.0383 秒,100 个点需要 0.0487,1,000 个点需要 0.0994,10,000 个点需要 0.523。
另一种方法是使用线性编程,但是有些事情发生了,因为我的时间远远大于此处报告的时间(第二个答案,我将其用作此问题的起点)。
import numpy as np
from scipy.optimize import linprog
import time
def vectorizable(point, simplexT, coeffs):
b = np.r_[point, np.ones(1)]
lp = linprog(coeffs, A_eq = simplexT, b_eq = b)
return lp.success
dims = 2
rng = np.random.default_rng()
test_points = rng.random((10, dims))*10
simplex = np.array([[0, 0,], [8, 8,], [10, 3]])
coeffs = np.zeros(len(simplex))
simplex_T = np.r_[simplex.T,np.ones((1,len(simplex)))]
start_time = time.time()
in_simplex = np.vectorize(vectorizable,
excluded={1, 2},
signature="(n) -> ()")(test_points, simplex_T, coeffs)
print(f"----- {time.time() - start_time} seconds -----")
polygon = np.concatenate([simplex, np.array([simplex[0]])])
print()
plt.plot(*polygon.T)
plt.scatter(*test_points.T)
for i, p in enumerate(test_points, 0):
print(f"{i}\t{p}\t{in_simplex[i]}")
plt.annotate(i, p)
这次,我得到了想要的结果:
----- 0.019016504287719727 seconds -----
0 [5.90479358 5.75174668] True
1 [0.51156474 0.86088186] False
2 [9.22371526 4.025967 ] True
3 [9.35307399 5.38630723] False
4 [2.83575442 5.66318545] False
5 [7.89786072 6.06068206] True
6 [0.09838826 1.38358132] False
7 [3.19776368 9.73562359] False
8 [9.9122709 0.76862067] False
9 [4.52352281 6.2259428 ] False
对于 10、100 和 1,000 点,时间或多或少处于同一数量级。然而,当我跳到 10,000 点时,我突然看到 4 到 8 秒之间的任何地方,这太慢了,当我增加维度时,只会增加到几十秒和几分钟。
如前所述,我希望尽可能避免并行化。任何有关重心部分的帮助/建议将不胜感激,特别是如果它可以工作,并且比线性编程方法更快。那么有什么办法可以加速LP方法呢?
谢谢
线性代数方法(我认为这里不需要LP)。使用一个矩阵乘法进行超平面半空间测试,然后进行一些max()和sign()后处理。
您可以通过在任何半空间测试之前执行直线修剪,并在任何一个半空间测试失败时对矩阵乘法进行分区和停止来变得更加聪明。当点的某些重要部分在单纯形之外进行测试时,它的帮助最大。在最极端的情况下,如果单纯形中不包含任何点(尝试半径=1.1),则非分区算法大约需要 0.6 秒,而 50 个分区则需要大约 0.01 秒。
from time import monotonic
import numpy as np
from numpy.random import default_rng
from scipy.spatial import ConvexHull
def make_homogeneous(test_points: np.ndarray) -> np.ndarray:
"""
Pre-process an array of (p, ndim) test points into a homogeneous
transformation matrix of size (ndim+1, p). This only needs to be
done once for a given point cloud.
"""
return np.vstack((
test_points.T,
np.ones(shape=test_points.shape[0], dtype=test_points.dtype),
))
def test_hull(
hull: ConvexHull, test_homogeneous: np.ndarray,
n_partitions: int = 1, trim: bool = True,
) -> np.ndarray:
"""
Vectorized test of whether each test point falls within the simplex.
:param hull: Hull defining the simplex. Number of dimensions (i.e. hull.equations.shape[1]-1)
must be equal to number of dimensions in the test point cloud.
:param test_homogeneous:
Test point cloud, in homogeneous transformation matrix format (from
make_homoegeneous()).
:param n_partitions:
Number of inner product partitions. If the number of points falling inside of the
simplex is high, partitioning will not help and this should be left as 1 (non-
partitioned). If the number of points falling inside of the simplex is low, set
this on the order of ~ 1% of the number of hull faces.
:param trim: Whether to perform a rectilinear trim before any dot products.
:return: A boolean array with length as the number of test points: true for "inside", false for
"outside". Values exactly on the simplex boundary are treated as "true" (inside the
simplex) due to `<= 0` below.
"""
# m: number of hull faces (to be partitioned)
# n: 1 + ndim
# p: number of test points
m, n = hull.equations.shape
n, p = test_homogeneous.shape
partition_size = m // n_partitions
if trim:
extents0 = hull.points.min(axis=0)[:, np.newaxis]
extents1 = hull.points.max(axis=0)[:, np.newaxis]
inside = (
(test_homogeneous[:3, :] >= extents0).all(axis=0) &
(test_homogeneous[:3, :] <= extents1).all(axis=0)
)
test_subset = test_homogeneous[:, inside]
# print(f'Trimmed to {np.count_nonzero(inside)}/{p} points')
else:
inside = np.ones(shape=p, dtype=bool)
test_subset = test_homogeneous
for i in range(0, m, partition_size):
hull_subset = hull.equations[i: i + partition_size, :]
product = hull_subset @ test_subset
inside_subset = product.max(axis=0) <= 0
# inside_subset = (product < 0).all(axis=0) # Equivalent, marginally slower?
inside[inside] = inside_subset
if not inside_subset.any():
break
test_subset = test_subset[:, inside_subset]
return inside
def cube_test() -> None:
"""
Unit test for a cube-shaped hull (2 hull facets per cube side, for 12 facets total)
"""
hull = ConvexHull([
[-1, -1, -1],
[ 1, -1, -1],
[-1, 1, -1],
[ 1, 1, -1],
[-1, -1, 1],
[ 1, -1, 1],
[-1, 1, 1],
[ 1, 1, 1],
])
in_points = np.array([[ 0. , 0. , 0. ],
[ 0.9, 0. , 0.2],
[-0.5, -0.2, -0.3],
[-0.9, 0.4, 0.6],
[ 0.1, 0.1, 0.1]])
bound_points = np.array([[ 1. , 1. , 1. ],
[-1. , -1. , -1. ],
[ 0.5, 0. , 1. ],
[ 1. , 1. , 0. ],
[ 0. , 0. , 1. ]])
out_points = np.array([[ 2. , 0. , 0. ],
[ 1. , 1. , 1.5],
[ 0. , 0. , -2. ],
[ 0. , 1.1, 0. ],
[-1.1, 0. , 1.2]])
assert np.all(test_hull(hull, make_homogeneous(in_points)))
assert np.all(test_hull(hull, make_homogeneous(bound_points)))
assert not np.any(test_hull(hull, make_homogeneous(out_points)))
assert np.all(test_hull(hull, make_homogeneous(in_points), n_partitions=4))
assert np.all(test_hull(hull, make_homogeneous(bound_points), n_partitions=4))
assert not np.any(test_hull(hull, make_homogeneous(out_points), n_partitions=4))
def random_hemisphere(
rand, n: int, radius: float = 1,
centre: tuple[float, float, float] = (0, 0, 0),
theta_limit=np.pi/2,
) -> np.ndarray:
"""
Generate a 3D hemisphere with randomly-distributed vertices. The "cut" face of the hemisphere is
not guaranteed to be exact when processed as a convex hull.
"""
phi = rand.uniform(low=0, high=2*np.pi, size=n)
theta = rand.uniform(low=0, high=theta_limit, size=n)
cx = np.sin(theta)*np.cos(phi)
cy = np.sin(theta)*np.sin(phi)
cz = np.cos(theta)
return radius*np.stack((cx, cy, cz), axis=1) + centre
def hemisphere_test() -> None:
"""
Unit test for a hemisphere-shaped ("bowl") convex hull.
"""
rand = default_rng(seed=0)
centre = 5, 10, 12 # not the barycentre: only the centre of the "cut" face
hull = ConvexHull(random_hemisphere(rand, n=100, radius=2, centre=centre))
test_in = make_homogeneous(random_hemisphere(rand, n=150, radius=1.5, centre=centre, theta_limit=np.pi*0.4))
indicators = test_hull(hull, test_in)
assert np.all(indicators)
test_close = make_homogeneous(random_hemisphere(rand, n=150, radius=1.975, centre=centre, theta_limit=np.pi*0.45))
indicators = test_hull(hull, test_close)
mean = indicators.mean()
assert 0.48 <= mean <= 0.52 # 0.5067 for this seed
test_out = make_homogeneous(random_hemisphere(rand, n=150, radius=2.1, centre=centre))
indicators = test_hull(hull, test_out)
assert not np.any(indicators)
def time_test() -> None:
"""
Output timings for non-partitioned and partitioned configurations of a hemisphere hull test
"""
rand = default_rng(seed=0)
n = 10_000
hull_pts = random_hemisphere(rand, n=n)
test_pts = random_hemisphere(rand, n=n, radius=0.9999)
t0 = monotonic()
homogeneous = make_homogeneous(test_pts)
t1 = monotonic()
hull = ConvexHull(hull_pts)
t2 = monotonic()
print('n =', n)
print(f'make_homogeneous: {t1-t0:.4f} s')
print(f'ConvexHull: {t2-t1:.4f} s')
t3 = monotonic()
indicators = test_hull(hull, homogeneous)
t4 = monotonic()
print(f'test_hull(part= 1): {t4-t3:.4f} s, {np.count_nonzero(indicators)} inside')
for part in (5, 10, 20, 50, 100, 200, 500, 1_000):
t5 = monotonic()
indicators = test_hull(hull, homogeneous, n_partitions=part)
t6 = monotonic()
print(f'test_hull(part={part:4}): {t6-t5:.4f} s, {np.count_nonzero(indicators)} inside')
if __name__ == '__main__':
cube_test()
hemisphere_test()
time_test()
n = 10000
make_homogeneous: 0.0000 s
ConvexHull: 0.0310 s
test_hull(part= 1): 0.8910 s, 3632 inside
test_hull(part= 5): 0.5780 s, 3632 inside
test_hull(part= 10): 0.4380 s, 3632 inside
test_hull(part= 20): 0.4060 s, 3632 inside
test_hull(part= 50): 0.3750 s, 3632 inside
test_hull(part= 100): 0.3910 s, 3632 inside
test_hull(part= 200): 0.3750 s, 3632 inside
test_hull(part= 500): 0.4060 s, 3632 inside
test_hull(part=1000): 0.4690 s, 3632 inside
如果将单纯形定义为三行系数 a、b、c 的数组(其中 ax+by+c=0),则单纯形示例将变为:
>>> simplex = np.array([(1, -1, 0), (-5, -2, 56), (-3, 10, 0)])
如果用齐次坐标表示测试点:
>>> test_points = np.array([[5.90479358, 5.75174668, 1],
[0.51156474, 0.86088186, 1],
[9.22371526, 4.025967 , 1],
[9.35307399, 5.38630723, 1],
[2.83575442, 5.66318545, 1],
[7.89786072, 6.06068206, 1],
[0.09838826, 1.38358132, 1],
[3.19776368, 9.73562359, 1],
[9.9122709, 0.76862067, 1],
[4.52352281, 6.2259428 , 1]])
然后以下表达式确定哪些点位于内部:
>>> (test_points @ simplex.T > 0).all(axis=1)
array([ True, False, True, False, False, True, False, False, False,
False])
请注意,您的单纯形示例是按顺时针方向定义的,因此测试为
> 0
。对于逆时针单纯形,测试应为 < 0
。
还需要考虑边界上的点是否在内部,考虑
>=
或<=
。