Browse Source

threadpool2 finished

threadPool
Andrea Bellandi 9 years ago
parent
commit
dc38648378
13 changed files with 325 additions and 165 deletions
  1. +2
    -2
      icfp2015.cabal
  2. +7
    -1
      src/Datatypes/Game.hs
  3. +0
    -1
      src/JSONDeser.hs
  4. +20
    -48
      src/Main.hs
  5. +56
    -0
      src/MonadParParallelization.hs
  6. +10
    -7
      src/Opt.hs
  7. +51
    -0
      src/ParMapParallelization.hs
  8. +39
    -0
      src/ParallelBenchmark.hs
  9. +7
    -2
      src/Strategy0.hs
  10. +12
    -21
      src/StrategyManager.hs
  11. +7
    -0
      src/TestUtils.hs
  12. +60
    -83
      src/ThreadPoolTimed.hs
  13. +54
    -0
      src/ThreadPoolTimed2.hs

+ 2
- 2
icfp2015.cabal View File

@ -60,7 +60,7 @@ executable icfp2015
other-extensions: OverloadedStrings, DeriveGeneric, DeriveDataTypeable other-extensions: OverloadedStrings, DeriveGeneric, DeriveDataTypeable
-- Other library packages from which modules are imported. -- Other library packages from which modules are imported.
build-depends: base >=4.6 && <4.9, hashable >=1.2 && <1.3, containers >=0.5 && <0.6, QuickCheck >=2.7 && <2.9, bytestring >=0.10 && <0.11, aeson, pqueue >=1.3 && <1.4, clock, random, deepseq >= 1.3 && <1.4
build-depends: base >=4.6 && <4.9, hashable >=1.2 && <1.3, containers >=0.5 && <0.6, QuickCheck >=2.7 && <2.9, bytestring >=0.10 && <0.11, aeson, pqueue >=1.3 && <1.4, random, deepseq >= 1.3 && <1.4, stm
-- Directories containing source files. -- Directories containing source files.
hs-source-dirs: src hs-source-dirs: src
@ -68,5 +68,5 @@ executable icfp2015
-- Base language which the package is written in. -- Base language which the package is written in.
default-language: Haskell2010 default-language: Haskell2010
ghc-options: -O3 -threaded -rtsopts
ghc-options: -O2 -threaded -rtsopts

+ 7
- 1
src/Datatypes/Game.hs View File

@ -1,5 +1,8 @@
{-# LANGUAGE DeriveGeneric #-}
module Datatypes.Game (Game(..), Command(..), isCompleted, new, notifyCommand, powerCounterToScore, powerPhrasesAsCommands, commandsToString,stringToCommands) where -- FIXME exports module Datatypes.Game (Game(..), Command(..), isCompleted, new, notifyCommand, powerCounterToScore, powerPhrasesAsCommands, commandsToString,stringToCommands) where -- FIXME exports
import GHC.Generics
import Control.DeepSeq
import Data.Hashable (hash) import Data.Hashable (hash)
import qualified Data.List as List import qualified Data.List as List
import Data.Map.Strict (Map) import Data.Map.Strict (Map)
@ -17,7 +20,10 @@ data Command = MoveW
| MoveSW | MoveSW
| RotateClockwise | RotateClockwise
| RotateCounterclockwise | RotateCounterclockwise
deriving (Show,Eq,Ord)
deriving (Show,Eq,Ord,Generic)
instance NFData Command where
rnf a = seq a ()
type UnitHash = Int type UnitHash = Int


+ 0
- 1
src/JSONDeser.hs View File

@ -10,7 +10,6 @@ import qualified Datatypes as DT
import qualified Datatypes.Game as DT.Game import qualified Datatypes.Game as DT.Game
import LCG import LCG
data Cell = Cell { x :: Int, y :: Int} data Cell = Cell { x :: Int, y :: Int}
deriving (Show, Generic) deriving (Show, Generic)


+ 20
- 48
src/Main.hs View File

@ -5,12 +5,9 @@
module Main where module Main where
import Control.DeepSeq (deepseq, NFData(..))
import Data.Int
import qualified Data.ByteString.Lazy.Char8 as BS import qualified Data.ByteString.Lazy.Char8 as BS
import System.Environment import System.Environment
import System.Random import System.Random
import System.Clock
import GHC.Generics import GHC.Generics
import Data.Aeson import Data.Aeson
@ -20,7 +17,7 @@ import Datatypes.Game(Game,Command,commandsToString,stringToCommands)
--import VM --import VM
import Opt import Opt
import JSONDeser(readInput) import JSONDeser(readInput)
import ThreadPoolTimed
strategyTag :: String strategyTag :: String
strategyTag = "lilik0" strategyTag = "lilik0"
@ -35,12 +32,11 @@ memLimitRatio :: Double
memLimitRatio = 1.0 memLimitRatio = 1.0
computationsPerStep :: Int computationsPerStep :: Int
computationsPerStep = 10
computationsPerStep = 100
data JSONSer = JSONSer { problemId :: Int,
problemSeed :: Int,
problemTag :: String,
data JSONSer = JSONSer { id :: Int,
seed :: Int,
tag :: String,
problemSolution :: String problemSolution :: String
} deriving (Show, Generic) } deriving (Show, Generic)
@ -63,30 +59,30 @@ strategies g sgen cmd = [MkStrategyWrapper (initst g sgen cmd :: Strategy0)]
-- MkStrategyWrapper (init g sgen cmd :: Strat2)] -- MkStrategyWrapper (init g sgen cmd :: Strat2)]
main :: IO () main :: IO ()
main = do initTime <- secTime
args <- getArgs
main = do args <- getArgs
opt <- parseArgs args opt <- parseArgs args
let files = optFile opt let files = optFile opt
let maxTime = optTime opt let maxTime = optTime opt
let maxMem = optMem opt
let powerPhrases = optPowerPhrase opt
-- let maxMem = optMem opt
let powerPhrases = optPowerPhrase opt
let logFile = optLog opt let logFile = optLog opt
let cores = optCores opt
rng <- getStdGen rng <- getStdGen
initialData <- createComputationsFromFiles files rng powerPhrases initialData <- createComputationsFromFiles files rng powerPhrases
let (_, _,gameComputations) = unzip3 initialData let (_, _,gameComputations) = unzip3 initialData
commandResults <- iterateGame gameComputations (timeStruct maxTime initTime) maxMem
commandResults <- iterateGame gameComputations cores maxTime
let stringResults = map (\(cmds,score,algoIdx) -> (commandsToString cmds,score,algoIdx)) commandResults let stringResults = map (\(cmds,score,algoIdx) -> (commandsToString cmds,score,algoIdx)) commandResults
let outJSONstructs = zipWith jsonBuilder initialData stringResults let outJSONstructs = zipWith jsonBuilder initialData stringResults
BS.putStrLn $ encode outJSONstructs BS.putStrLn $ encode outJSONstructs
writeLogFile logFile (zipWith logFileBuilder initialData stringResults) writeLogFile logFile (zipWith logFileBuilder initialData stringResults)
where where
timeStruct Nothing _ = Nothing
timeStruct (Just stopTime) initialTime = Just (fromIntegral stopTime,fromIntegral initialTime)
-- timeStruct Nothing _ = Nothing
-- timeStruct (Just stopTime) initialTime = Just (fromIntegral stopTime,fromIntegral initialTime)
jsonBuilder (idx, seed, _) (strCmds, _, _) = (JSONSer idx seed strategyTag strCmds)
logFileBuilder (idx, seed, _) (_ ,score , algoIdx) = (idx, seed, score, algoIdx)
jsonBuilder (idx, seedd, _) (strCmds, _, _) = (JSONSer idx seedd strategyTag strCmds)
logFileBuilder (idx, seedd, _) (_ ,score , algoIdx) = (idx, seedd, score, algoIdx)
createComputationsFromFiles :: [String] -> StdGen -> [String] -> IO [(Id,Seed,GameComputation)] createComputationsFromFiles :: [String] -> StdGen -> [String] -> IO [(Id,Seed,GameComputation)]
createComputationsFromFiles fileNames randomGen powerPhrases = do inputs <- readFiles fileNames createComputationsFromFiles fileNames randomGen powerPhrases = do inputs <- readFiles fileNames
@ -106,36 +102,12 @@ readFiles (x:xs) = do f <- BS.readFile x
fs <- readFiles xs fs <- readFiles xs
return (f:fs) return (f:fs)
instance NFData Command where rnf x = seq x ()
iterateGame :: [GameComputation] -> Maybe (Double,Double) -> Maybe Int -> IO [FinishedGame]
iterateGame gameComputations timeLimitData memLimitData = do alive <- checkComputationAlive
if alive
then nextPass
else return bestGames
where
nextPass = (bestGames `deepseq` (iterateGame nextGameComputations timeLimitData memLimitData))
nextGameComputations = (applyNtimes computationsPerStep advanceGameComputations gameComputations)
checkComputationAlive = do timeLimitFlag <- timeLimit timeLimitData
memLimitFlag <- memLimit memLimitData
let finishedComputation = (and $ map finishedGameComputation gameComputations)
return $ not (timeLimitFlag || memLimitFlag || finishedComputation)
advanceGameComputations computations = map advanceGameComputation computations
bestGames = map getBestGameComputation gameComputations
timeLimit :: Maybe (Double, Double) -> IO Bool
timeLimit Nothing = return False
timeLimit (Just (initialTime,stopTime)) = do actualTime <- secTime
let actualTimeD = fromIntegral actualTime
let timeDifference = (actualTimeD - initialTime)
return (stopTime <= timeDifference)
memLimit :: Maybe Int -> IO Bool
memLimit _ = return False
secTime :: IO Int64
secTime = do (TimeSpec s _) <- getTime Monotonic
return s
iterateGame :: [GameComputation] -> Int -> Int -> IO [FinishedGame]
iterateGame gameComputations cores timeLimit = do results <- threadPoolTimed cores timeLimit advanceGameComputation gameComputations
return (map getBestGameComputation results)
where
writeLogFile :: Bool -> [(Int,Int,Int,Int)] -> IO () writeLogFile :: Bool -> [(Int,Int,Int,Int)] -> IO ()
writeLogFile False _ = return () writeLogFile False _ = return ()


+ 56
- 0
src/MonadParParallelization.hs View File

@ -0,0 +1,56 @@
{-# OPTIONS -Wall #-}
module MonadParParallelization(parallelizedTimed) where
import Control.DeepSeq (deepseq)
import Control.Concurrent.STM
import Control.Concurrent
import Control.Monad.Par
writeTVarIO :: TVar a -> a -> IO ()
writeTVarIO tvar el = atomically $ writeTVar tvar el
parallelizedTimed :: (NFData a) => Int -> (a -> a) -> [a] -> IO [a]
parallelizedTimed sec f toCompute = do stopVar <- newTVarIO False
putStrLn "WORKERS STARTED"
worker <- monadParFork (workerFunction f stopVar) toCompute
putStrLn "CLOCK STARTED"
secWait sec
putStrLn "STOP MESSAGE SENT"
sendStopMessage stopVar
putStrLn "WAITING FOR COMPLETITION"
waitWorkerCompletition worker
putStrLn "RETRIEVE DATA"
retrieveAllData worker
where
secWait = threadDelay . (1000000 * )
sendStopMessage stopMessageC = writeTVarIO stopMessageC True
monadParFork :: ((TVar Bool,TVar [a], [a]) -> IO ()) -> [a] -> IO (TVar Bool,TVar [a])
monadParFork f toCompute = do stopThreadVar <- newTVarIO False
dataTVar <- newTVarIO toCompute
_ <- forkIO (f (stopThreadVar, dataTVar, toCompute))
return (stopThreadVar, dataTVar)
workerFunction :: (NFData a) => (a -> a) -> TVar Bool -> (TVar Bool, TVar [a], [a]) -> IO ()
workerFunction f stopVar (threadEndedVar, dataVar, dataEls) = do stopped <- readTVarIO stopVar
if stopped
then do writeTVarIO dataVar dataEls
writeTVarIO threadEndedVar True
else do let newData = runPar evalf
newData `deepseq` workerFunction f stopVar (threadEndedVar, dataVar, newData)
where
evalf = do iVars <- mapM (\_ -> new) dataEls
let iVarsPEls = zip iVars dataEls
mapM_ (\(ivar, datael) -> fork (put ivar (f datael))) iVarsPEls
gotDatas <- mapM (\(ivar, _) -> get ivar) iVarsPEls
return gotDatas
waitWorkerCompletition :: (TVar Bool, TVar [a]) -> IO ()
waitWorkerCompletition workers = do finished <- readTVarIO (fst workers)
if finished
then return ()
else waitWorkerCompletition workers
retrieveAllData :: (TVar Bool, TVar [a]) -> IO [a]
retrieveAllData workers = readTVarIO (snd workers)

+ 10
- 7
src/Opt.hs View File

@ -3,6 +3,9 @@ module Opt(parseArgs,Options(..)) where
import System.Console.GetOpt import System.Console.GetOpt
import Data.Maybe import Data.Maybe
absoluteMaxTime :: Int
absoluteMaxTime = 3600*24*365
-- import Vm -- import Vm
data Flag = File String data Flag = File String
@ -12,23 +15,23 @@ data Flag = File String
deriving Show deriving Show
data Options = Options { optFile :: [String], data Options = Options { optFile :: [String],
optTime :: Maybe Int,
optTime :: Int,
optMem :: Maybe Int, optMem :: Maybe Int,
optPowerPhrase :: [String], optPowerPhrase :: [String],
optSeedNumber :: Int, optSeedNumber :: Int,
optCores :: Int, optCores :: Int,
optLog :: Bool
}
optLog :: Bool}
deriving Show deriving Show
startOptions = Options { optFile = [], startOptions = Options { optFile = [],
optTime = Nothing,
optTime = absoluteMaxTime,
optMem = Nothing, optMem = Nothing,
optPowerPhrase = [], optPowerPhrase = [],
optSeedNumber = 0, optSeedNumber = 0,
optCores = 1, optCores = 1,
optLog = True
}
optLog = False }
options :: [ OptDescr (Options -> IO Options) ] options :: [ OptDescr (Options -> IO Options) ]
options = [ Option "f" ["filename"] options = [ Option "f" ["filename"]
@ -38,7 +41,7 @@ options = [ Option "f" ["filename"]
"Input Filename" "Input Filename"
, Option "t" ["timelimit"] , Option "t" ["timelimit"]
(ReqArg (ReqArg
(\arg opt -> return opt { optTime = Just (read arg) })
(\arg opt -> return opt { optTime = (read arg) })
"TIMELIMIT") "TIMELIMIT")
"Time Limit in seconds" "Time Limit in seconds"
, Option "m" ["memlimit"] , Option "m" ["memlimit"]


+ 51
- 0
src/ParMapParallelization.hs View File

@ -0,0 +1,51 @@
{-# OPTIONS -Wall #-}
module ParMapParallelization(parallelizedTimed2) where
import Control.DeepSeq (deepseq)
import Control.Concurrent.STM
import Control.Concurrent
import Control.Parallel.Strategies
writeTVarIO :: TVar a -> a -> IO ()
writeTVarIO tvar el = atomically $ writeTVar tvar el
parallelizedTimed2 :: (NFData a) => Int -> (a -> a) -> [a] -> IO [a]
parallelizedTimed2 sec f toCompute = do stopVar <- newTVarIO False
putStrLn "WORKERS STARTED"
worker <- monadParFork (workerFunction f stopVar) toCompute
putStrLn "CLOCK STARTED"
secWait sec
putStrLn "STOP MESSAGE SENT"
sendStopMessage stopVar
putStrLn "WAITING FOR COMPLETITION"
waitWorkerCompletition worker
putStrLn "RETRIEVE DATA"
retrieveAllData worker
where
secWait = threadDelay . (1000000 * )
sendStopMessage stopMessageC = writeTVarIO stopMessageC True
monadParFork :: ((TVar Bool,TVar [a], [a]) -> IO ()) -> [a] -> IO (TVar Bool,TVar [a])
monadParFork f toCompute = do stopThreadVar <- newTVarIO False
dataTVar <- newTVarIO toCompute
_ <- forkIO (f (stopThreadVar, dataTVar, toCompute))
return (stopThreadVar, dataTVar)
workerFunction :: (NFData a) => (a -> a) -> TVar Bool -> (TVar Bool, TVar [a], [a]) -> IO ()
workerFunction f stopVar (threadEndedVar, dataVar, dataEls) = do stopped <- readTVarIO stopVar
if stopped
then do writeTVarIO dataVar dataEls
writeTVarIO threadEndedVar True
else do let newData = evalf
newData `deepseq` workerFunction f stopVar (threadEndedVar, dataVar, newData)
where
evalf = dataEls `deepseq` parMap rpar f dataEls
waitWorkerCompletition :: (TVar Bool, TVar [a]) -> IO ()
waitWorkerCompletition workers = do finished <- readTVarIO (fst workers)
if finished
then return ()
else waitWorkerCompletition workers
retrieveAllData :: (TVar Bool, TVar [a]) -> IO [a]
retrieveAllData workers = readTVarIO (snd workers)

+ 39
- 0
src/ParallelBenchmark.hs View File

@ -0,0 +1,39 @@
module Main where
import System.Environment
import ThreadPoolTimed
import ThreadPoolTimed2
initialVector :: [Int]
initialVector = [1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1]
main :: IO ()
main = do args <- getArgs
let sec = read (args !! 0) :: Int
let rep = read (args !! 1) :: Int
let cores = read (args !! 2) :: Int
let evalf = (applyNtimes rep applyFun)
putStrLn "\n\n"
res2 <- threadPoolTimed cores sec evalf initialVector
putStrLn $ "ThreadPool result: " ++ (show $ sum res2) ++ " \n"
res1 <- threadPoolTimed2 sec evalf initialVector
putStrLn $ "ThreadPool2 result: " ++ (show $ sum res1) ++ " \n"
--res3 <- parallelizedTimed2 sec evalf initialVector
--putStrLn $ "ParMap result: " ++ (show $ sum res3) ++ " \n"
applyNtimes :: Int -> (a -> a) -> a -> a
applyNtimes 0 _ accum = accum
applyNtimes n f accum = applyNtimes (n - 1) f (f accum)
applyFun :: Int -> Int
applyFun x = x + 1

+ 7
- 2
src/Strategy0.hs View File

@ -1,7 +1,10 @@
{-# LANGUAGE DeriveGeneric #-}
module Strategy0(Strategy0) where module Strategy0(Strategy0) where
import qualified Data.PQueue.Prio.Max as PQ import qualified Data.PQueue.Prio.Max as PQ
import GHC.Generics (Generic)
import Control.DeepSeq
import System.Random(StdGen) import System.Random(StdGen)
import Data.Maybe (isJust) import Data.Maybe (isJust)
import Datatypes import Datatypes
@ -16,14 +19,16 @@ commandsList :: [Command]
commandsList = [MoveSE, MoveSW, MoveW, MoveE, RotateClockwise, RotateCounterclockwise] commandsList = [MoveSE, MoveSW, MoveW, MoveE, RotateClockwise, RotateCounterclockwise]
type Queue = PQ.MaxPQueue (Int, Int, Int) Game type Queue = PQ.MaxPQueue (Int, Int, Int) Game
data Strategy0 = Strategy0 (Queue, [Game])
data Strategy0 = Strategy0 (Queue, [Game])
deriving Generic
instance Strategy Strategy0 where instance Strategy Strategy0 where
initst = strategy0initst initst = strategy0initst
advance = strategy0advance advance = strategy0advance
getbest = strategy0getbest getbest = strategy0getbest
instance NFData Strategy0
strategy0initst :: Game -> StdGen -> [[Command]] -> Strategy0 strategy0initst :: Game -> StdGen -> [[Command]] -> Strategy0
strategy0initst game _ _ = (Strategy0 (firstQueue, firstList)) where strategy0initst game _ _ = (Strategy0 (firstQueue, firstList)) where
firstQueue = PQ.singleton (fullScore game, -(length $ Game.units game), snd . Unit.pivot . head . Game.units $ game) game firstQueue = PQ.singleton (fullScore game, -(length $ Game.units game), snd . Unit.pivot . head . Game.units $ game) game


+ 12
- 21
src/StrategyManager.hs View File

@ -1,29 +1,32 @@
{-# LANGUAGE ExistentialQuantification #-} {-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE TemplateHaskell #-}
{-# OPTIONS -Wall #-} {-# OPTIONS -Wall #-}
module StrategyManager where module StrategyManager where
import Control.DeepSeq
import System.Random(StdGen) import System.Random(StdGen)
import Datatypes.Game(Game,Command) import Datatypes.Game(Game,Command)
import Control.DeepSeq (NFData(..))
type Score = Int type Score = Int
type StrategyIdx = Int type StrategyIdx = Int
type FinishedGame = ([Command], Score, StrategyIdx) type FinishedGame = ([Command], Score, StrategyIdx)
type GameComputation = [StrategyWrapper] type GameComputation = [StrategyWrapper]
data StrategyWrapper = forall a . Strategy a => MkStrategyWrapper a
| FinishedGame ([Command], Int)
data NullStrategy1 = NullS1
data StrategyWrapper = forall a. (Strategy a) => MkStrategyWrapper a
| FinishedGame ([Command], Int)
instance NFData StrategyWrapper where
rnf (MkStrategyWrapper a) = seq a ()
rnf (FinishedGame b) = seq b ()
initWrapper :: Strategy a => a -> StrategyWrapper initWrapper :: Strategy a => a -> StrategyWrapper
initWrapper = MkStrategyWrapper
initWrapper a = getbest a `deepseq` (MkStrategyWrapper $! a)
class Strategy a where
class (NFData a) => Strategy a where
initst :: Game -> StdGen -> [[Command]] -> a initst :: Game -> StdGen -> [[Command]] -> a
advance :: a -> Either a ([Command], Int) advance :: a -> Either a ([Command], Int)
getbest :: a -> ([Command], Int) getbest :: a -> ([Command], Int)
@ -63,16 +66,4 @@ getBestGameComputation gameComputation = bestGame
advanceGameComputation :: GameComputation -> GameComputation advanceGameComputation :: GameComputation -> GameComputation
advanceGameComputation gc = map advanceWrapper gc advanceGameComputation gc = map advanceWrapper gc
instance Strategy NullStrategy1 where
initst _ _ _ = NullS1
advance _ = Left NullS1
getbest _ = ([],0)
data NullStrategy2 = NullS2
instance Strategy NullStrategy2 where
initst _ _ _ = NullS2
advance _ = Left NullS2
getbest _ = ([],0)

+ 7
- 0
src/TestUtils.hs View File

@ -0,0 +1,7 @@
module TestTree where
applyNtimes :: Int -> (a -> a) -> a -> a
applyNtimes 0 _ accum = accum
applyNtimes n f accum = applyNtimes (n - 1) f (f accum)

+ 60
- 83
src/ThreadPoolTimed.hs View File

@ -1,91 +1,59 @@
module ThreadPoolTimed where
{-# OPTIONS -Wall #-}
module ThreadPoolTimed(threadPoolTimed) where
import Control.DeepSeq (NFData, deepseq)
import Control.Concurrent.STM.TChan import Control.Concurrent.STM.TChan
import Control.Concurrent.STM.TVar import Control.Concurrent.STM.TVar
import Control.Monad.STM import Control.Monad.STM
import Control.Concurrent import Control.Concurrent
import Control.Monad import Control.Monad
import Data.Maybe import Data.Maybe
type NWorkers = Int
type Sec = Int
threadPoolTimed :: NWorkers -> Sec -> (a -> Either a b) -> [a] -> IO ([a],[b])
threadPoolTimed nworkers sec f toCompute = do let initialLength = length toCompute :: Int
toComputeSize <- newTVarIO initialLength
toComputeQ <- newTChanIO :: IO (TChan a)
completedQ <- newTChanIO :: IO (TChan b)
unCompletedQ <- newTChanIO :: IO (TChan a)
isTPoolAboutToFinish <- newTVarIO False
workers <- forkNTimes nworkers (workerFunction f (toComputeSize,
toComputeQ,
completedQ,
unCompletedQ,
isTPoolAboutToFinish))
secWait sec
pushDataToWorkers toCompute toComputeQ
sendStopMessage isTPoolAboutToFinish
threadPoolTimed :: (NFData a) => Int -> Int -> (a -> a) -> [a] -> IO [a]
threadPoolTimed nworkers sec f toCompute = do toComputeQ <- newTChanIO :: IO (TChan a)
stopVar <- newTVarIO False
putStrLn "DATA TO WORKERS"
pushDataToWorkers toCompute toComputeQ
putStrLn "WORKERS STARTED"
workers <- forkNTimes nworkers (workerFunction f (toComputeQ,
stopVar))
putStrLn "CLOCK STARTED"
secWait sec
putStrLn "STOP MESSAGE SENT"
sendStopMessage stopVar
putStrLn "WAITING FOR COMPLETITION"
waitWorkerCompletition workers waitWorkerCompletition workers
computedSize <- atomically $ readTVar toComputeSize
retrieveData initialLength computedSize unCompletedQ completedQ
putStrLn "RETRIEVE DATA"
retrieveAllData toComputeQ
where where
secWait = threadDelay . (1000000 * ) secWait = threadDelay . (1000000 * )
sendStopMessage isTPoolAboutToFinish = atomically $ writeTVar isTPoolAboutToFinish True
sendStopMessage stopMessageC = atomically $ writeTVar stopMessageC True
forkNTimes :: NWorkers -> (MVar Bool -> IO ()) -> IO [MVar Bool]
forkNTimes nworkers f = do workers <- replicateM nworkers (newMVar False)
forkNTimes :: Int -> (TVar Bool -> IO ()) -> IO [TVar Bool]
forkNTimes nworkers f = do workers <- replicateM nworkers (newTVarIO False)
mapM_ (forkIO . f) workers mapM_ (forkIO . f) workers
return workers return workers
workerFunction :: (a -> Either a b) -> (TVar Int, TChan a, TChan b, TChan a, TVar Bool) -> MVar Bool -> IO ()
workerFunction f workerData completedVar = do extractedData <- dataToCompute
if (isNothing extractedData)
then putMVar completedVar True
else do let newData = (f . fromJust) extractedData
pushNewData newData
workerFunction f workerData completedVar
workerFunction :: (NFData a) => (a -> a) -> (TChan a, TVar Bool) -> TVar Bool -> IO ()
workerFunction f (toCompute, stopVar) threadEndedVar = do (stopMessageArrived, computationData) <- retrieveData
if stopMessageArrived
then atomically $ do writeTVar threadEndedVar True
else do executeStep computationData
workerFunction f (toCompute, stopVar) threadEndedVar
where where
(computeSize, toComputeQ, finishedDataQ, unfinishedDataQ, isCompAboutToFinish) = workerData
dataToCompute = atomically $ orElse nullData getData
nullData = do r <- checkFinished
if r
then return Nothing
else retry
checkFinished = do computesz <- readTVar computeSize
finished <- readTVar isCompAboutToFinish
return (finished || (computesz == 0))
getData = do d <- tryReadTChan toComputeQ
if isNothing d
then retry
else return d
pushNewData nData = atomically $ do finished <- checkFinished
if finished
then pushLastData nData
else pushPartialData nData
pushPartialData (Left toBeCompleted) = writeTChan toComputeQ toBeCompleted
pushPartialData (Right completed) = do writeTChan finishedDataQ completed
modifyTVar computeSize (\x -> x - 1)
pushLastData (Left toBeCompleted) = writeTChan unfinishedDataQ toBeCompleted
pushLastData (Right completed) = writeTChan finishedDataQ completed
retrieveData = atomically $ do stopMessage <- readTVar stopVar
if stopMessage
then return (True, Nothing)
else do compValue <- readTChan toCompute
return (False, Just compValue)
executeStep computationData = do let newData = ((f . fromJust) computationData)
newData `deepseq` atomically $ writeTChan toCompute newData
pushDataToWorkers :: [a] -> TChan a -> IO () pushDataToWorkers :: [a] -> TChan a -> IO ()
pushDataToWorkers toCompute toComputeQ = atomically $ mapM_ (\el -> writeTChan toComputeQ el) toCompute pushDataToWorkers toCompute toComputeQ = atomically $ mapM_ (\el -> writeTChan toComputeQ el) toCompute
@ -94,17 +62,26 @@ pushDataToWorkers toCompute toComputeQ = atomically $ mapM_ (\el -> writeTChan t
--retrieveData :: TChan a -> TChan b -> [(a,b)] --retrieveData :: TChan a -> TChan b -> [(a,b)]
--retrieveData unCompletedQ completedQ --retrieveData unCompletedQ completedQ
waitWorkerCompletition :: [MVar Bool] -> IO ()
waitWorkerCompletition workers = do workersStatus <- mapM readMVar workers
if and workersStatus
then return ()
else do threadDelay 1000
waitWorkerCompletition workers
retrieveData :: Int -> Int -> TChan a -> TChan b -> IO ([a],[b])
retrieveData initialSize notEvaluated completedQ unCompletedQ = atomically $ do listCompleted <- flushChan evaluated completedQ
listUnCompleted <- flushChan notEvaluated unCompletedQ
return (listCompleted,listUnCompleted)
where
evaluated = initialSize - notEvaluated
flushChan n c = replicateM n (readTChan c)
waitWorkerCompletition :: [TVar Bool] -> IO ()
waitWorkerCompletition workers = atomically $ do workersStatus <- mapM readTVar workers
if and workersStatus
then return ()
else retry
retrieveAllData :: TChan a -> IO [a]
retrieveAllData queue = atomically $ retrieveAllDataSTM []
where retrieveAllDataSTM acc = do isEmpty <- isEmptyTChan queue
if isEmpty
then return acc
else do elemToAcc <- readTChan queue
retrieveAllDataSTM (elemToAcc:acc)
-- retrieveAllData queue acc = do voidQueue <- atomically $ isEmptyTChan queue
-- putStrLn $ (show voidQueue)
-- if voidQueue
-- then return acc
-- else do elemToAcc <- atomically $ readTChan queue
-- retrieveAllData queue (elemToAcc:acc)

+ 54
- 0
src/ThreadPoolTimed2.hs View File

@ -0,0 +1,54 @@
{-# OPTIONS -Wall #-}
module ThreadPoolTimed2(threadPoolTimed2) where
import Control.DeepSeq (NFData, deepseq)
import Control.Concurrent.STM
import Control.Concurrent
writeTVarIO :: TVar a -> a -> IO ()
writeTVarIO tvar el = atomically $ writeTVar tvar el
threadPoolTimed2 :: (NFData a) => Int -> (a -> a) -> [a] -> IO [a]
threadPoolTimed2 sec f toCompute = do stopVar <- newTVarIO False
putStrLn "WORKERS STARTED"
workers <- mapFork (workerFunction f stopVar) toCompute
putStrLn "CLOCK STARTED"
secWait sec
putStrLn "STOP MESSAGE SENT"
sendStopMessage stopVar
putStrLn "WAITING FOR COMPLETITION"
waitWorkerCompletition workers
putStrLn "RETRIEVE DATA"
retrieveAllData workers
where
secWait = threadDelay . (1000000 * )
sendStopMessage stopMessageC = writeTVarIO stopMessageC True
mapFork :: ((TVar Bool, TVar a, a) -> IO ()) -> [a] -> IO [(TVar Bool,TVar a, a)]
mapFork f toCompute = do workers <- mapM tupleM toCompute
mapM_ (forkIO . f) workers
return workers
where
tupleM x = do falseTVar <- newTVarIO False
dataTvar <- newTVarIO x
return (falseTVar, dataTvar, x)
workerFunction :: (NFData a) => (a -> a) -> TVar Bool -> (TVar Bool, TVar a, a) -> IO ()
workerFunction f stopVar (threadEndedVar, dataVar, dataEl) = do stopped <- readTVarIO stopVar
if stopped
then do writeTVarIO dataVar dataEl
writeTVarIO threadEndedVar True
else do let newData = dataEl `deepseq` f(dataEl)
newData `deepseq` workerFunction f stopVar (threadEndedVar, dataVar, newData)
waitWorkerCompletition :: [(TVar Bool, TVar a, a)] -> IO ()
waitWorkerCompletition workers = do workerFinishedList <- mapM (\(x, _, _) -> readTVarIO x) workers
let workersAllFinished = and workerFinishedList
if workersAllFinished
then return ()
else waitWorkerCompletition workers
retrieveAllData :: [(TVar Bool, TVar a, a)] -> IO [a]
retrieveAllData workers = mapM (\(_, x, _) -> readTVarIO x) workers

Loading…
Cancel
Save