在 Python 和 NumPy 中量化正态分布浮点数

问题描述 投票:0回答:2

让数组

A
中的值从高斯采样 分配。我想将
A
中的每个值替换为
n_R
之一
R
中的“代表”,因此总量化误差为 最小化。

这是进行线性量化的 NumPy 代码:

n_A, n_R = 1_000_000, 256
mu, sig = 500, 250
A = np.random.normal(mu, sig, size = n_A)
lo, hi = np.min(A), np.max(A)
R = np.linspace(lo, hi, n_R)
I = np.round((A - lo) * (n_R - 1) / (hi - lo)).astype(np.uint32)

L = np.mean(np.abs(A - R[I]))
print('Linear loss:', L)
-> Linspace loss: 2.3303939600700603

虽然这有效,但量化误差很大。有没有更聪明的 怎么办?我认为人们可以利用

A
的优势 正态分布或可能使用迭代过程 最小化“损失”函数。

更新在研究这个问题时,我发现了一个关于“加权”量化的相关问题。调整他们的方法有时会给出更好的量化结果:

from scipy.stats import norm

dist = norm(loc = mu, scale = sig)
bounds = dist.cdf([mu - 3*sig, mu + 3*sig])
pp = np.linspace(*bounds, n_R)
R = dist.ppf(pp)

# Find closest matches
lhits = np.clip(np.searchsorted(R, A, 'left'), 0, n_R - 1)
rhits = np.clip(np.searchsorted(R, A, 'right') - 1, 0, n_R - 1)

ldiff = R[lhits] - A
rdiff = A - R[rhits]
I = lhits
idx = np.where(rdiff < ldiff)[0]
I[idx] = rhits[idx]

L = np.mean(np.abs(A - R[I]))
print('Gaussian loss:', L)
-> Gaussian loss: 1.6521974945326285

K-均值聚类可能更好,但似乎太慢了 适用于大型阵列。

python numpy floating-point k-means quantization
2个回答
3
投票

K 均值聚类可能更好,但似乎太慢,无法在大型数组上实用。

对于一维聚类情况,有比 K-means 更快的算法。请参阅https://stats.stackexchange.com/questions/40454/define- Different-clusters-of-1d-data-from-database

我选择了其中一种算法,Jenks Natural Breaks,并在数据集的随机子样本上运行它:

A_samp = np.random.choice(A, size=10000)
breaks = np.array(jenkspy.jenks_breaks(A_samp, n_classes=n_R))
R = (breaks[:-1] + breaks[1:]) / 2

这相当快,整个数据集的量化损失约为 1.28。

为了直观地了解每种方法的作用,我绘制了每种方法提出的中断的 cdf 与中断 R 内的索引的关系。

根据定义,高斯是一条直线。这意味着它在分布的每个百分位数上都有相同数量的中断。线性方法在分布的中间花费很少的中断,而在尾部使用大部分中断。 Jenks 在他们两人之间找到了妥协。

自动寻找更低的损失

看着上面的图表,我有了一个想法:所有这些选择中断的方法在分位数域中绘制时都是各种 S 型曲线。 (如果你认为高斯函数是一个真正延伸的 sigmoid 函数,那么它就有点适合。)

我编写了一个函数,该函数使用单个变量“强度”对每条曲线进行参数化,即 sigmoid 弯曲的速度。一旦我有了这个,我就使用

scipy.optimize.minimize
自动搜索一条使损失最小化的曲线。

事实证明,如果让 Scipy 对此进行优化,它会选择一条非常接近 Jenks 的曲线强度,并且它找到的曲线比 Jenks 稍差,损失约为 1.33。

您可以在这里看到采用这种失败方法的笔记本。

使用 2^16 浮点数进行量化

在需要创建 2^16 个不同代表的情况下,使用 Jenks 在计算上是不可行的。但是,您可以做一些非常接近的事情:具有少量类的 Jenks 加上线性插值。

这是代码:

import itertools


def pairwise(iterable):
    "s -> (s0, s1), (s1, s2), (s2, s3), ..."
    a, b = itertools.tee(iterable)
    next(b, None)
    return zip(a, b)


def linspace_jenks(A, n_R, jenks_classes, dist_lo, dist_hi):
    assert n_R % jenks_classes == 0, "jenks_classes must be divisor of n_R"
    simplify_factor = n_R // jenks_classes
    assert jenks_classes ** 2 <= len(A), "Need more data to estimate"
    breaks = jenkspy.jenks_breaks(A, n_classes=jenks_classes)
    # Adjust lowest and highest break to match highest/lowest observed value
    breaks[0] = dist_lo
    breaks[-1] = dist_hi
    linspace_classes = []
    for lo, hi in pairwise(breaks):
        linspace_classes.append(np.linspace(lo, hi, simplify_factor, endpoint=False))
    linspace_classes = np.hstack(linspace_classes)
    assert len(linspace_classes) == n_R
    return linspace_classes

调用示例:

A_samp = np.random.choice(A, size = 2**16)
jenks_R = linspace_jenks(A_samp, n_R, 128, np.min(A), np.max(A))

与线性方法相比,性能如何?在我的系统上,n_R=2^16 的线性损失为 0.009421。下图显示了

linspace_jenks
方法对 jenks_classes 的每个值得到的损失。

仅使用 32 个 Jenks 类,损失就降至 0.005031,与线性误差相比减少了近 50%。


2
投票

部分是为了新颖性,主要是为了完整性,我演示了@Homer512 正确地表明是可能的 - MILP 实现。我希望它的准确性非常好,性能介于“差”和“可怕”之间。

我用一个非常小的问题进行演示,这样当你调试和查看约束矩阵时它们就清晰可见,这样我的 RAM 就不会爆炸。

import time

import numpy as np
import scipy.sparse as sp
from numpy.random import default_rng
from scipy.optimize import milp, Bounds, LinearConstraint

n_A, n_R = 10, 4
rand = default_rng(seed=0)
A = rand.normal(loc=100, scale=50, size=n_A)
lo, hi = A.min(), A.max()

A_order = A.argsort()  # for use if you want to restore the original order later
# A = A[A_order]       # if you want to modify the algorithm to assume ordered input

'''
decision variables:
    nA*nR binary assignments (sparse)
    nA discretized values (dense)
    nR discretization levels (dense) aka. R
    nA errors (dense)
'''
c = np.zeros(n_A*n_R + n_A + n_R + n_A)
c[-n_A:] = 1  # minimize errors

integrality = np.ones(n_A*n_R + n_A + n_R + n_A, dtype=np.uint8)
integrality[n_A*n_R:] = 0  # only assignments are integral

lb = np.full_like(c, -np.inf)
ub = np.full_like(c, +np.inf)
lb[:n_A*n_R] = 0  # assignments are binary
ub[:n_A*n_R] = 1

# Big-M magnitude based on range of input data, without assuming signs
M = 2*max(abs(lo), abs(hi))

# I- Each input value must be assigned exactly one discretized value (Kronecker)
exclusive_assignment = LinearConstraint(
    A=sp.hstack((
        sp.kron(sp.eye(n_A), np.ones(n_R)),
        sp.csc_array((n_A, n_A)),
        sp.csc_array((n_A, n_R)),
        sp.csc_array((n_A, n_A)),
    )),
    lb=np.ones(n_A), ub=np.ones(n_A),
)

# II- If assigned, discretized output must be <= level (Kronecker)
# discretized_output <= level + (1-assigned)*M
# assigned*M + discretized_output - level <= M
# III- If assigned, discretized output must be >= level
# discretized_output >= level - (1-assigned)*M
# assigned*-M + discretized_output - level >= -M
discrete_sparse_to_level = LinearConstraint(
    A=sp.bmat((
        (
            sp.eye(n_A*n_R) * M,
            sp.kron(sp.eye(n_A), np.ones((n_R, 1))),
            sp.csc_array(
                np.tile(-np.eye(n_R), (n_A, 1))
            ),
            sp.csc_array((n_A*n_R, n_A)),
        ),
        (
            sp.eye(n_A*n_R) * -M,
            sp.kron(sp.eye(n_A), np.ones((n_R, 1))),
            sp.csc_array(
                np.tile(-np.eye(n_R), (n_A, 1))
            ),
            sp.csc_array((n_A*n_R, n_A)),
        ),
    ), format='csc'),
    lb=np.concatenate((np.full(n_A*n_R, -np.inf), np.full(n_A*n_R, -M))),
    ub=np.concatenate((np.full(n_A*n_R, M),       np.full(n_A*n_R, np.inf))),
)

# IV- error >= output - input (Kronecker)
# A.assign - output + error >= 0
# V- error >= input - output
# -A.assign + output + error >= 0
abs_error = LinearConstraint(
    A=sp.bmat((
        (
            sp.kron(sp.diags(A), np.ones(n_R)),
            -sp.eye(n_A),
            sp.csc_array((n_A, n_R)),
            np.eye(n_A),
        ),
        (
            sp.kron(sp.diags(-A), np.ones(n_R)),
            sp.eye(n_A),
            sp.csc_array((n_A, n_R)),
            np.eye(n_A),
        ),
    ), format='csc'),
    lb=np.concatenate((np.zeros(n_A),        np.zeros(n_A))),
    ub=np.concatenate((np.full(n_A, np.inf), np.full(n_A, np.inf))),
)

level_order = LinearConstraint(
    A=sp.hstack((
        sp.csc_array((n_R-1, n_A*n_R)),
        sp.csc_array((n_R-1, n_A)),
        sp.eye(n_R-1, n_R, k=1) - sp.eye(n_R-1, n_R),
        sp.csc_array((n_R-1, n_A)),
    )),
    lb=np.zeros(n_R-1),
    ub=np.full(n_R-1, np.inf),
)

start = time.perf_counter()
res = milp(
    c=c, integrality=integrality, bounds=Bounds(lb=lb, ub=ub),
    constraints=(
        exclusive_assignment,
        abs_error,
        discrete_sparse_to_level,
        # level_order,
    ),
)
stop = time.perf_counter()
print(f'Completed in {stop - start:.4f} s')
assert res.success

assign, discretized, levels, errors = np.split(
    res.x, (n_A*n_R, n_A*n_R + n_A, -n_A)
)
assign = assign.reshape((n_A, n_R))
print(levels)
print(np.stack((A, discretized, errors), axis=1))
print(assign)
© www.soinside.com 2019 - 2024. All rights reserved.