np.repeat inplace 解决方案以避免内存重新分配

问题描述 投票:0回答:1

如何将

np.repeat
(或等效函数)输出分配给现有内存? (许多 numpy 函数都有一个“out=”参数来执行此操作)。这种重新分配将非常有用,因为我有一个(非常大)先前创建的与新数组大小相同的数组,我在程序的其余部分中不再使用该数组。重用它在内存中的位置可以让我避免大量的内存重新分配。

我已经尝试过这个,但它并没有加速分配。

diff_da1[:,:,:] = np.repeat(np.linalg.norm(diffs, axis=2)[:, :, None], repeats=d, axis=2)
dist_point_vertex = diff_da1

这个广播技巧稍微加速了:

diff_da1[:,:,:] = np.linalg.norm(diffs, axis=2)[:,:,None]
dist_point_vertex = diff_da1

距离快还很远。


更新:可重现的代码示例

import numpy as np


class Map:

    def __init__(self, polygon):

        self._polygon = polygon

        # auxiliar prealloc
        self._extended_polygon = None
        self._polygon_angles = None


    def preallocate_memory(self, n: int, d: int) -> None:

        """
        Preallocates memory to accelerate directed_meassurements computation

        Args:
            n: int
                number of points
            d: int
                number of directions for each point

        Returns:
            None
        """

        sides  = np.roll(polygon, -1, axis=0) - polygon
        sigmas = ((np.arctan2(sides[:, 0], - sides[:, 1]) - (np.pi/2)) % (2*np.pi))[None,:,None]
        sigmas = np.repeat(sigmas, n, axis=0)
        sigmas = np.repeat(sigmas, d, axis=2)

        self._extended_polygon = np.repeat(self._polygon[None, :, :], n, axis=0)
        self._polygon_angles = sigmas

        return None


    def directed_meassurements(
            self, points: np.array, directions: np.array
        ) -> np.array:

        """
        Calculates distances from interior points to a polygon in specified directions.

        Args:
            points: np.array (n,2)
                array of interior points
            directions: np.array (n,d) 
                array of angles (bounded between 0 and 2pi)

        Returns:
            array of distances for each point and direction (shape: [n, d])
        """

        TWO_PI = 2 * np.pi
        ONE_PI = np.pi
        MID_PI = np.pi / 2
        INFNTY = np.inf

        def fast_module(arg: np.array, mod: np.array) -> np.array:
            aux = np.divide(arg, mod)
            aux = np.floor(aux, out=aux)
            aux = aux.__imul__(mod)
            arg = arg.__isub__(aux)
            return arg

        # Calculate vectors from interest points to polygon vertices
        diffs = self._extended_polygon.__sub__(points[:, None, :])
        vertix_angles = np.arctan2(diffs[:, :, 0], -diffs[:, :, 1])
        vertix_angles = vertix_angles.__isub__(MID_PI)

        # Calculate angles between differences and directions
        alphas = directions[:, None, :]
        deltas = vertix_angles[:, :, None]

        # Determine whether each delta direction of each point looks at each side
        shape = (*deltas.shape[:2], alphas.shape[2])
        deltas_ = np.broadcast_to(deltas, shape=shape)
        alphas_ = np.broadcast_to(alphas, shape=shape)
        diff_da1 = deltas_.__sub__(alphas_)
        diff_da2 = np.roll(diff_da1, -1, axis=1)
        diff_da1 = fast_module(diff_da1, TWO_PI)
        diff_da2 = fast_module(diff_da2, TWO_PI)
        mask = np.absolute(diff_da1.__isub__(diff_da2), out=diff_da1).__ge__(ONE_PI)

        # Calculate distances where intersections occur
        diff_da1[:,:,:] = np.linalg.norm(diffs, axis=2)[:,:,None]
        dist_point_vertex = diff_da1

        # Translate the angles to set as angle zero one perpendicular to the polygon side
        sigmas_mid_pi = np.full_like(self._polygon_angles, MID_PI)
        sigmas_mid_pi[mask] = sigmas_mid_pi[mask].__isub__(self._polygon_angles[mask])
        deltas_orth_ref = deltas.__add__(sigmas_mid_pi)
        alphas_orth_ref = sigmas_mid_pi.__iadd__(alphas)

        # Adjust distances
        np.copyto(dist_point_vertex, INFNTY, where=(~mask))
        np.cos(deltas_orth_ref, out=deltas_orth_ref, where=mask)
        np.cos(alphas_orth_ref, out=alphas_orth_ref, where=mask)
        np.multiply(dist_point_vertex, deltas_orth_ref, out=dist_point_vertex, where=mask)
        np.divide(dist_point_vertex, alphas_orth_ref, out=dist_point_vertex, where=mask)

        return np.min(dist_point_vertex, axis=1, initial=INFNTY, where=mask)


if __name__ == '__main__':

    # create points
    points = np.random.rand(2000,2)

    # create polygon
    polygon = np.array([[0, 0], [1, 0], [1, 1], [0, 1], [0, 0]])
    side_points = 5000
    result = np.empty((0, 2), float)
    for i in range(len(polygon) - 1):
        x_vals = np.linspace(polygon[i][0], polygon[i+1][0], side_points)
        y_vals = np.linspace(polygon[i][1], polygon[i+1][1], side_points)
        result = np.append(result, np.stack((x_vals, y_vals), axis=-1), axis=0)
    polygon = result

    # create directions
    directions = (np.repeat(np.array([0, np.pi/2, np.pi, 3*np.pi/2])[None,:], points.shape[0], axis=0) + \
        np.random.rand(points.shape[0])[:,None] * 2*np.pi) % 2*np.pi

    # Run the meassurement method
    my_map = Map(polygon)
    my_map.preallocate_memory(points.shape[0], 4)
    my_map.directed_meassurements(points, directions)
python python-3.x numpy performance memory-reallocation
1个回答
0
投票

当您执行广播作业时,会发生两件事:

  1. 标量被包装在具有正确尺寸但没有步幅的数组视图中,这很便宜
  2. 视图被复制到输出缓冲区的每个元素中,这可能会很昂贵

您可以尝试通过手动创建广播包装器并将其直接分配给您的输出来绕过第二部分。你最终会承担成本,但不是预先的:

dist_point_vertex = np.broadcast_to(np.linalg.norm(diffs, axis=2)[:, :, None], dist_point_vertex.shape)

在这种情况下,完全摆脱中间缓冲区

diff_da1
,并记住形状可能必须来自
dist_point_vertex.shape
以外的其他地方。

np.broadcast_to
的另一个选项是
np.lib.stride_tricks.as_strided

© www.soinside.com 2019 - 2024. All rights reserved.