为什么我在并行化 K-means 期间没有获得任何收益?

问题描述 投票:0回答:0

我正在使用 Simon Marlow 的书学习 Haskell 中的并行编程。我在第 3 章。我不明白为什么在带有 -N6 标志的 6 核机器上启动这段代码不会改变性能。我遇到了similar problem然后才意识到向进程添加更多数据会使并行执行和单线程执行之间的差异更加显着。但在那种情况下,即使添加更多数据也不会造成执行时间差异。在我的练习中,我试图找到世界城市的质心。为了给它更多的数据,我创建了一些“假”城市。

这是城市文件的前五行:

世界城市.csv

“城市”,“city_ascii”,“lat”,“lng”,“国家”,“iso2”,“iso3”,“admin_name”,“首都”,“人口”,“id” "Tokyo","Tokyo","35.6839","139.7744","Japan","JP","JPN","Tōkyō","primary","39105000","1392685764" "Jakarta","Jakarta" "-6.2146","106.8451","印度尼西亚","ID","IDN","雅加达","主要","35362000","1360771077" “德里”,“德里”,“28.6667”,“77.2167”,“印度”,“IN”,“IND”,“德里”,“admin”,“31870000”,“1356872604” “马尼拉”,“马尼拉”,“14.6000”,“120.9833”,“菲律宾”,“PH”,“PHL”,“马尼拉”,“小学”,“23971000”,“1608618140”

这是我用来添加更多“城市”的代码:

main :: IO ()
main = do
    [c] <- getArgs
    cities <- getCities c
    let target = "wc_extended.csv"
    BS.writeFile target (encode [("city" :: BS.ByteString, "lat" :: BS.ByteString, "lng" :: BS.ByteString)])
    forM_ cities $ \(City name (Point lat lng)) -> BS.appendFile target $ encode [
            (name,lat,lng)
        ,   ("A" <> name,lat-10,lng+10)
        ,   ("B" <> name,lat-20,lng+20)
        ,   ("C" <> name,lat-30,lng+30)
        ,   ("D" <> name,lat-40,lng+40)
        ,   ("E" <> name,lat-50,lng+50)
        ]

这是我的 K-means 聚类程序的源文件。

类型.hs

module Types where

import Data.ByteString (ByteString)
import System.Random
import System.Random.Stateful

data Point = Point {
        lat :: !Double
    ,   lng :: !Double
} deriving (Eq,Show)

instance Uniform Point where
    uniformM g = do
                    lat <- uniformRM (-180, 180) g
                    lng <- uniformRM (-180, 180) g
                    return $ Point lat lng

instance Semigroup Point where
    (Point lat lng) <> (Point lat' lng') = Point (lat + lat') (lng + lng')

instance Monoid Point where
    mempty = Point 0 0

sqDistance :: Point -> Point -> Double
sqDistance (Point lat lng) (Point lat' lng') = (lat-lat')^2 + (lng-lng')^2 

data City = City {
        name :: ByteString
    ,   location :: Point
} deriving Show

data Cluster = Cluster {
        cId :: Int
    ,   center :: Point
} deriving (Eq, Show)

data PointSum = PointSum !Int !Point

instance Semigroup PointSum where
    (PointSum c p) <> (PointSum c' p') = PointSum (c+c') (p <> p')

instance Monoid PointSum where
    mempty = PointSum 0 mempty

addToPointSum :: Point -> PointSum -> PointSum
addToPointSum point' (PointSum count point) = PointSum (count+1) $ point <> point'

pointSumToCluster :: Int -> PointSum -> Cluster
pointSumToCluster i (PointSum count (Point lat lng)) = Cluster {
      cId = i
    , center = Point (lat / fromIntegral count) (lng / fromIntegral count)
}

CitiesLoader.hs

{-# LANGUAGE OverloadedStrings #-}
module CitiesLoader where

import Data.Attoparsec.ByteString
import Data.Csv
import Data.Vector (Vector)
import qualified Data.Vector as V 
import qualified Data.ByteString as BS
import qualified Data.ByteString.UTF8 as UTF8
import Data.Csv.Parser (csvWithHeader)
import Data.HashMap.Strict ( (!) )

import Types

getCSV :: FilePath -> IO (Vector NamedRecord)
getCSV path = do
    raw <- BS.readFile path
    case parseOnly (csvWithHeader defaultDecodeOptions) raw of
        Left error -> do
            putStrLn $ "Error during parsing: " <> error <> ", returned empty result"
            return mempty
        Right (_, values) -> return values

extractCities :: Vector NamedRecord -> Vector City
extractCities = fmap f
                where f vmap = City (vmap ! "city") $ Point ((read . UTF8.toString) $ vmap ! "lat") ((read . UTF8.toString) $ vmap ! "lng")

getCities :: FilePath -> IO (Vector City)
getCities = (fmap . fmap) extractCities getCSV

聚类.hs

module Clustering where

import Types
import Data.Vector (Vector)
import qualified Data.Vector as V
import qualified Data.Vector.Mutable as M
import Data.Function (on)
import Data.List (minimumBy)
import Control.Monad.Trans.Except
import Control.Parallel.Strategies

assign :: Int -> [Cluster] -> Vector City -> Vector PointSum
assign n clusters points = V.create $ do
    vec <- M.replicate n mempty
    let addpoint (City _ p) = M.modify vec (addToPointSum p) (cId $ nearest p)
    V.mapM_ addpoint points
    return vec
    where nearest p = fst $ minimumBy (compare `on` snd) [(c, sqDistance p (center c)) | c <- clusters]

makeNewClusters :: Vector PointSum -> [Cluster]
makeNewClusters vec = [pointSumToCluster i ps | (i,ps@(PointSum count _)) <- zip [0..] (V.toList vec), count > 0]

step :: Int -> Vector City -> [Cluster] -> [Cluster]
step n cities clusters = makeNewClusters $ assign n clusters cities

kmeansSeq :: Int -> Vector City -> [Cluster] -> Except String [Cluster]
kmeansSeq limit cities clusters = loop 0 clusters
                            where loop n c | n > limit = throwE "reached loop limit"
                                  loop n c = let c' = step nClusters cities c
                                                     in if c' == c
                                                            then return c'
                                                            else loop (n+1) c'
                                  nClusters = length clusters

split :: Int -> Vector a -> [Vector a]
split numChunks xs = chunk (V.length xs `quot` numChunks) xs

chunk :: Int -> Vector a -> [Vector a]
chunk n xs | V.null xs = []
chunk n xs = as : chunk n bs
    where (as, bs) = V.splitAt n xs

combine :: Vector PointSum -> Vector PointSum -> Vector PointSum
combine = V.zipWith (<>)

parStepsStrat :: Int -> [Vector City] -> [Cluster] -> [Cluster]
parStepsStrat n pointss clusters = makeNewClusters $ foldr1 combine (map (assign n clusters) pointss `using` parList rseq)

kMeansStrat :: Int -> Int -> Vector City -> [Cluster] -> Except String [Cluster]
kMeansStrat limit numChunks points clusters = loop 0 clusters
                                        where loop n clusters | n > limit = throwE "reached loop limit"
                                              loop n clusters = let c' = parStepsStrat nClusters chunks clusters
                                                                         in if c' == clusters
                                                                            then return c'
                                                                            else loop (n+1) c'
                                              chunks = split numChunks points
                                              nClusters = length clusters

Main.hs

{-# LANGUAGE OverloadedStrings#-}

module Main where
import System.Environment (getArgs)
import CitiesLoader (getCities)
import System.Random
import System.Random.Stateful (newIOGenM, uniformListM)
import Types
import Clustering
import Control.Monad.Trans.Except (runExcept)
import Data.Vector(forM_)
import Data.Csv(encode)
import qualified Data.ByteString.Lazy as BS

main :: IO ()
main = do
    [c] <- getArgs
    cities <- getCities c
    print (length cities)
    let seed = mkStdGen $ length cities
    g <- newIOGenM seed
    centroids <- uniformListM 1000 g
    let clusters = zipWith Cluster [0..] centroids
    case runExcept (kMeansStrat 10000 6 cities clusters) of
        Left err -> putStrLn err
        Right c -> print c

当我在包含 257431 条记录的文件上运行时,我得到了以下执行时间:

cabal exec kcities -- wc_extended.csv +RTS -N1 -s -l

总时间 266.631s(经过 266.351s)

和线程范围配置文件

cabal exec kcities -- wc_extended.csv +RTS -N6 -s -l

总时间 1737.342s(经过 340.016s)(执行时间甚至增加)

和线程范围配置文件

performance haskell parallel-processing k-means
© www.soinside.com 2019 - 2024. All rights reserved.