让数组
A
中的值从高斯采样
分配。我想将 A
中的每个值替换为 n_R
之一
R
中的“代表”,因此总量化误差为
最小化。
这是进行线性量化的 NumPy 代码:
n_A, n_R = 1_000_000, 256
mu, sig = 500, 250
A = np.random.normal(mu, sig, size = n_A)
lo, hi = np.min(A), np.max(A)
R = np.linspace(lo, hi, n_R)
I = np.round((A - lo) * (n_R - 1) / (hi - lo)).astype(np.uint32)
L = np.mean(np.abs(A - R[I]))
print('Linear loss:', L)
-> Linspace loss: 2.3303939600700603
虽然这有效,但量化误差很大。有没有更聪明的 怎么办?我认为人们可以利用
A
的优势
正态分布或可能使用迭代过程
最小化“损失”函数。
更新在研究这个问题时,我发现了一个关于“加权”量化的相关问题。调整他们的方法有时会给出更好的量化结果:
from scipy.stats import norm
dist = norm(loc = mu, scale = sig)
bounds = dist.cdf([mu - 3*sig, mu + 3*sig])
pp = np.linspace(*bounds, n_R)
R = dist.ppf(pp)
# Find closest matches
lhits = np.clip(np.searchsorted(R, A, 'left'), 0, n_R - 1)
rhits = np.clip(np.searchsorted(R, A, 'right') - 1, 0, n_R - 1)
ldiff = R[lhits] - A
rdiff = A - R[rhits]
I = lhits
idx = np.where(rdiff < ldiff)[0]
I[idx] = rhits[idx]
L = np.mean(np.abs(A - R[I]))
print('Gaussian loss:', L)
-> Gaussian loss: 1.6521974945326285
K-均值聚类可能更好,但似乎太慢了 适用于大型阵列。
K 均值聚类可能更好,但似乎太慢,无法在大型数组上实用。
对于一维聚类情况,有比 K-means 更快的算法。请参阅https://stats.stackexchange.com/questions/40454/define- Different-clusters-of-1d-data-from-database
我选择了其中一种算法,Jenks Natural Breaks,并在数据集的随机子样本上运行它:
A_samp = np.random.choice(A, size=10000)
breaks = np.array(jenkspy.jenks_breaks(A_samp, n_classes=n_R))
R = (breaks[:-1] + breaks[1:]) / 2
这相当快,整个数据集的量化损失约为 1.28。
为了直观地了解每种方法的作用,我绘制了每种方法提出的中断的 cdf 与中断 R 内的索引的关系。
根据定义,高斯是一条直线。这意味着它在分布的每个百分位数上都有相同数量的中断。线性方法在分布的中间花费很少的中断,而在尾部使用大部分中断。 Jenks 在他们两人之间找到了妥协。
看着上面的图表,我有了一个想法:所有这些选择中断的方法在分位数域中绘制时都是各种 S 型曲线。 (如果你认为高斯函数是一个真正延伸的 sigmoid 函数,那么它就有点适合。)
我编写了一个函数,该函数使用单个变量“强度”对每条曲线进行参数化,即 sigmoid 弯曲的速度。一旦我有了这个,我就使用
scipy.optimize.minimize
自动搜索一条使损失最小化的曲线。
事实证明,如果让 Scipy 对此进行优化,它会选择一条非常接近 Jenks 的曲线强度,并且它找到的曲线比 Jenks 稍差,损失约为 1.33。
您可以在这里看到采用这种失败方法的笔记本。
在需要创建 2^16 个不同代表的情况下,使用 Jenks 在计算上是不可行的。但是,您可以做一些非常接近的事情:具有少量类的 Jenks 加上线性插值。
这是代码:
import itertools
def pairwise(iterable):
"s -> (s0, s1), (s1, s2), (s2, s3), ..."
a, b = itertools.tee(iterable)
next(b, None)
return zip(a, b)
def linspace_jenks(A, n_R, jenks_classes, dist_lo, dist_hi):
assert n_R % jenks_classes == 0, "jenks_classes must be divisor of n_R"
simplify_factor = n_R // jenks_classes
assert jenks_classes ** 2 <= len(A), "Need more data to estimate"
breaks = jenkspy.jenks_breaks(A, n_classes=jenks_classes)
# Adjust lowest and highest break to match highest/lowest observed value
breaks[0] = dist_lo
breaks[-1] = dist_hi
linspace_classes = []
for lo, hi in pairwise(breaks):
linspace_classes.append(np.linspace(lo, hi, simplify_factor, endpoint=False))
linspace_classes = np.hstack(linspace_classes)
assert len(linspace_classes) == n_R
return linspace_classes
调用示例:
A_samp = np.random.choice(A, size = 2**16)
jenks_R = linspace_jenks(A_samp, n_R, 128, np.min(A), np.max(A))
与线性方法相比,性能如何?在我的系统上,n_R=2^16 的线性损失为 0.009421。下图显示了
linspace_jenks
方法对 jenks_classes 的每个值得到的损失。
仅使用 32 个 Jenks 类,损失就降至 0.005031,与线性误差相比减少了近 50%。
部分是为了新颖性,主要是为了完整性,我演示了@Homer512 正确地表明是可能的 - MILP 实现。我希望它的准确性非常好,性能介于“差”和“可怕”之间。
我用一个非常小的问题进行演示,这样当你调试和查看约束矩阵时它们就清晰可见,这样我的 RAM 就不会爆炸。
import time
import numpy as np
import scipy.sparse as sp
from numpy.random import default_rng
from scipy.optimize import milp, Bounds, LinearConstraint
n_A, n_R = 10, 4
rand = default_rng(seed=0)
A = rand.normal(loc=100, scale=50, size=n_A)
lo, hi = A.min(), A.max()
A_order = A.argsort() # for use if you want to restore the original order later
# A = A[A_order] # if you want to modify the algorithm to assume ordered input
'''
decision variables:
nA*nR binary assignments (sparse)
nA discretized values (dense)
nR discretization levels (dense) aka. R
nA errors (dense)
'''
c = np.zeros(n_A*n_R + n_A + n_R + n_A)
c[-n_A:] = 1 # minimize errors
integrality = np.ones(n_A*n_R + n_A + n_R + n_A, dtype=np.uint8)
integrality[n_A*n_R:] = 0 # only assignments are integral
lb = np.full_like(c, -np.inf)
ub = np.full_like(c, +np.inf)
lb[:n_A*n_R] = 0 # assignments are binary
ub[:n_A*n_R] = 1
# Big-M magnitude based on range of input data, without assuming signs
M = 2*max(abs(lo), abs(hi))
# I- Each input value must be assigned exactly one discretized value (Kronecker)
exclusive_assignment = LinearConstraint(
A=sp.hstack((
sp.kron(sp.eye(n_A), np.ones(n_R)),
sp.csc_array((n_A, n_A)),
sp.csc_array((n_A, n_R)),
sp.csc_array((n_A, n_A)),
)),
lb=np.ones(n_A), ub=np.ones(n_A),
)
# II- If assigned, discretized output must be <= level (Kronecker)
# discretized_output <= level + (1-assigned)*M
# assigned*M + discretized_output - level <= M
# III- If assigned, discretized output must be >= level
# discretized_output >= level - (1-assigned)*M
# assigned*-M + discretized_output - level >= -M
discrete_sparse_to_level = LinearConstraint(
A=sp.bmat((
(
sp.eye(n_A*n_R) * M,
sp.kron(sp.eye(n_A), np.ones((n_R, 1))),
sp.csc_array(
np.tile(-np.eye(n_R), (n_A, 1))
),
sp.csc_array((n_A*n_R, n_A)),
),
(
sp.eye(n_A*n_R) * -M,
sp.kron(sp.eye(n_A), np.ones((n_R, 1))),
sp.csc_array(
np.tile(-np.eye(n_R), (n_A, 1))
),
sp.csc_array((n_A*n_R, n_A)),
),
), format='csc'),
lb=np.concatenate((np.full(n_A*n_R, -np.inf), np.full(n_A*n_R, -M))),
ub=np.concatenate((np.full(n_A*n_R, M), np.full(n_A*n_R, np.inf))),
)
# IV- error >= output - input (Kronecker)
# A.assign - output + error >= 0
# V- error >= input - output
# -A.assign + output + error >= 0
abs_error = LinearConstraint(
A=sp.bmat((
(
sp.kron(sp.diags(A), np.ones(n_R)),
-sp.eye(n_A),
sp.csc_array((n_A, n_R)),
np.eye(n_A),
),
(
sp.kron(sp.diags(-A), np.ones(n_R)),
sp.eye(n_A),
sp.csc_array((n_A, n_R)),
np.eye(n_A),
),
), format='csc'),
lb=np.concatenate((np.zeros(n_A), np.zeros(n_A))),
ub=np.concatenate((np.full(n_A, np.inf), np.full(n_A, np.inf))),
)
level_order = LinearConstraint(
A=sp.hstack((
sp.csc_array((n_R-1, n_A*n_R)),
sp.csc_array((n_R-1, n_A)),
sp.eye(n_R-1, n_R, k=1) - sp.eye(n_R-1, n_R),
sp.csc_array((n_R-1, n_A)),
)),
lb=np.zeros(n_R-1),
ub=np.full(n_R-1, np.inf),
)
start = time.perf_counter()
res = milp(
c=c, integrality=integrality, bounds=Bounds(lb=lb, ub=ub),
constraints=(
exclusive_assignment,
abs_error,
discrete_sparse_to_level,
# level_order,
),
)
stop = time.perf_counter()
print(f'Completed in {stop - start:.4f} s')
assert res.success
assign, discretized, levels, errors = np.split(
res.x, (n_A*n_R, n_A*n_R + n_A, -n_A)
)
assign = assign.reshape((n_A, n_R))
print(levels)
print(np.stack((A, discretized, errors), axis=1))
print(assign)