diff --git a/icfp2015.cabal b/icfp2015.cabal index 3ec7f3e..3c9b1a6 100644 --- a/icfp2015.cabal +++ b/icfp2015.cabal @@ -60,7 +60,7 @@ executable icfp2015 other-extensions: OverloadedStrings, DeriveGeneric, DeriveDataTypeable -- Other library packages from which modules are imported. - build-depends: base >=4.6 && <4.9, hashable >=1.2 && <1.3, containers >=0.5 && <0.6, QuickCheck >=2.7 && <2.9, bytestring >=0.10 && <0.11, aeson, pqueue >=1.3 && <1.4, clock, random, deepseq >= 1.3 && <1.4 + build-depends: base >=4.6 && <4.9, hashable >=1.2 && <1.3, containers >=0.5 && <0.6, QuickCheck >=2.7 && <2.9, bytestring >=0.10 && <0.11, aeson, pqueue >=1.3 && <1.4, random, deepseq >= 1.3 && <1.4, stm -- Directories containing source files. hs-source-dirs: src @@ -68,5 +68,5 @@ executable icfp2015 -- Base language which the package is written in. default-language: Haskell2010 - ghc-options: -O3 -threaded -rtsopts + ghc-options: -O2 -threaded -rtsopts diff --git a/src/Datatypes/Game.hs b/src/Datatypes/Game.hs index 681ac63..c24ea0b 100644 --- a/src/Datatypes/Game.hs +++ b/src/Datatypes/Game.hs @@ -1,5 +1,8 @@ +{-# LANGUAGE DeriveGeneric #-} module Datatypes.Game (Game(..), Command(..), isCompleted, new, notifyCommand, powerCounterToScore, powerPhrasesAsCommands, commandsToString,stringToCommands) where -- FIXME exports +import GHC.Generics +import Control.DeepSeq import Data.Hashable (hash) import qualified Data.List as List import Data.Map.Strict (Map) @@ -17,7 +20,10 @@ data Command = MoveW | MoveSW | RotateClockwise | RotateCounterclockwise - deriving (Show,Eq,Ord) + deriving (Show,Eq,Ord,Generic) + +instance NFData Command where + rnf a = seq a () type UnitHash = Int diff --git a/src/JSONDeser.hs b/src/JSONDeser.hs index 8ffff2b..9d14786 100644 --- a/src/JSONDeser.hs +++ b/src/JSONDeser.hs @@ -10,7 +10,6 @@ import qualified Datatypes as DT import qualified Datatypes.Game as DT.Game import LCG - data Cell = Cell { x :: Int, y :: Int} deriving (Show, Generic) diff --git a/src/Main.hs b/src/Main.hs index a209ea1..b5df9c7 100644 --- a/src/Main.hs +++ b/src/Main.hs @@ -5,12 +5,9 @@ module Main where -import Control.DeepSeq (deepseq, NFData(..)) -import Data.Int import qualified Data.ByteString.Lazy.Char8 as BS import System.Environment import System.Random -import System.Clock import GHC.Generics import Data.Aeson @@ -20,7 +17,7 @@ import Datatypes.Game(Game,Command,commandsToString,stringToCommands) --import VM import Opt import JSONDeser(readInput) - +import ThreadPoolTimed strategyTag :: String strategyTag = "lilik0" @@ -35,12 +32,11 @@ memLimitRatio :: Double memLimitRatio = 1.0 computationsPerStep :: Int -computationsPerStep = 10 - +computationsPerStep = 100 -data JSONSer = JSONSer { problemId :: Int, - problemSeed :: Int, - problemTag :: String, +data JSONSer = JSONSer { id :: Int, + seed :: Int, + tag :: String, problemSolution :: String } deriving (Show, Generic) @@ -63,30 +59,30 @@ strategies g sgen cmd = [MkStrategyWrapper (initst g sgen cmd :: Strategy0)] -- MkStrategyWrapper (init g sgen cmd :: Strat2)] main :: IO () -main = do initTime <- secTime - args <- getArgs +main = do args <- getArgs opt <- parseArgs args let files = optFile opt let maxTime = optTime opt - let maxMem = optMem opt - let powerPhrases = optPowerPhrase opt +-- let maxMem = optMem opt + let powerPhrases = optPowerPhrase opt let logFile = optLog opt + let cores = optCores opt rng <- getStdGen initialData <- createComputationsFromFiles files rng powerPhrases let (_, _,gameComputations) = unzip3 initialData - commandResults <- iterateGame gameComputations (timeStruct maxTime initTime) maxMem + commandResults <- iterateGame gameComputations cores maxTime let stringResults = map (\(cmds,score,algoIdx) -> (commandsToString cmds,score,algoIdx)) commandResults let outJSONstructs = zipWith jsonBuilder initialData stringResults BS.putStrLn $ encode outJSONstructs writeLogFile logFile (zipWith logFileBuilder initialData stringResults) where - timeStruct Nothing _ = Nothing - timeStruct (Just stopTime) initialTime = Just (fromIntegral stopTime,fromIntegral initialTime) +-- timeStruct Nothing _ = Nothing +-- timeStruct (Just stopTime) initialTime = Just (fromIntegral stopTime,fromIntegral initialTime) - jsonBuilder (idx, seed, _) (strCmds, _, _) = (JSONSer idx seed strategyTag strCmds) - logFileBuilder (idx, seed, _) (_ ,score , algoIdx) = (idx, seed, score, algoIdx) + jsonBuilder (idx, seedd, _) (strCmds, _, _) = (JSONSer idx seedd strategyTag strCmds) + logFileBuilder (idx, seedd, _) (_ ,score , algoIdx) = (idx, seedd, score, algoIdx) createComputationsFromFiles :: [String] -> StdGen -> [String] -> IO [(Id,Seed,GameComputation)] createComputationsFromFiles fileNames randomGen powerPhrases = do inputs <- readFiles fileNames @@ -106,36 +102,12 @@ readFiles (x:xs) = do f <- BS.readFile x fs <- readFiles xs return (f:fs) -instance NFData Command where rnf x = seq x () - -iterateGame :: [GameComputation] -> Maybe (Double,Double) -> Maybe Int -> IO [FinishedGame] -iterateGame gameComputations timeLimitData memLimitData = do alive <- checkComputationAlive - if alive - then nextPass - else return bestGames - where - nextPass = (bestGames `deepseq` (iterateGame nextGameComputations timeLimitData memLimitData)) - nextGameComputations = (applyNtimes computationsPerStep advanceGameComputations gameComputations) - checkComputationAlive = do timeLimitFlag <- timeLimit timeLimitData - memLimitFlag <- memLimit memLimitData - let finishedComputation = (and $ map finishedGameComputation gameComputations) - return $ not (timeLimitFlag || memLimitFlag || finishedComputation) - advanceGameComputations computations = map advanceGameComputation computations - bestGames = map getBestGameComputation gameComputations - -timeLimit :: Maybe (Double, Double) -> IO Bool -timeLimit Nothing = return False -timeLimit (Just (initialTime,stopTime)) = do actualTime <- secTime - let actualTimeD = fromIntegral actualTime - let timeDifference = (actualTimeD - initialTime) - return (stopTime <= timeDifference) - -memLimit :: Maybe Int -> IO Bool -memLimit _ = return False - -secTime :: IO Int64 -secTime = do (TimeSpec s _) <- getTime Monotonic - return s + +iterateGame :: [GameComputation] -> Int -> Int -> IO [FinishedGame] +iterateGame gameComputations cores timeLimit = do results <- threadPoolTimed cores timeLimit advanceGameComputation gameComputations + return (map getBestGameComputation results) + where + writeLogFile :: Bool -> [(Int,Int,Int,Int)] -> IO () writeLogFile False _ = return () diff --git a/src/MonadParParallelization.hs b/src/MonadParParallelization.hs new file mode 100644 index 0000000..24ddf0b --- /dev/null +++ b/src/MonadParParallelization.hs @@ -0,0 +1,56 @@ +{-# OPTIONS -Wall #-} +module MonadParParallelization(parallelizedTimed) where + +import Control.DeepSeq (deepseq) +import Control.Concurrent.STM +import Control.Concurrent +import Control.Monad.Par + + +writeTVarIO :: TVar a -> a -> IO () +writeTVarIO tvar el = atomically $ writeTVar tvar el + +parallelizedTimed :: (NFData a) => Int -> (a -> a) -> [a] -> IO [a] +parallelizedTimed sec f toCompute = do stopVar <- newTVarIO False + putStrLn "WORKERS STARTED" + worker <- monadParFork (workerFunction f stopVar) toCompute + putStrLn "CLOCK STARTED" + secWait sec + putStrLn "STOP MESSAGE SENT" + sendStopMessage stopVar + putStrLn "WAITING FOR COMPLETITION" + waitWorkerCompletition worker + putStrLn "RETRIEVE DATA" + retrieveAllData worker + where + secWait = threadDelay . (1000000 * ) + sendStopMessage stopMessageC = writeTVarIO stopMessageC True + +monadParFork :: ((TVar Bool,TVar [a], [a]) -> IO ()) -> [a] -> IO (TVar Bool,TVar [a]) +monadParFork f toCompute = do stopThreadVar <- newTVarIO False + dataTVar <- newTVarIO toCompute + _ <- forkIO (f (stopThreadVar, dataTVar, toCompute)) + return (stopThreadVar, dataTVar) + +workerFunction :: (NFData a) => (a -> a) -> TVar Bool -> (TVar Bool, TVar [a], [a]) -> IO () +workerFunction f stopVar (threadEndedVar, dataVar, dataEls) = do stopped <- readTVarIO stopVar + if stopped + then do writeTVarIO dataVar dataEls + writeTVarIO threadEndedVar True + else do let newData = runPar evalf + newData `deepseq` workerFunction f stopVar (threadEndedVar, dataVar, newData) + where + evalf = do iVars <- mapM (\_ -> new) dataEls + let iVarsPEls = zip iVars dataEls + mapM_ (\(ivar, datael) -> fork (put ivar (f datael))) iVarsPEls + gotDatas <- mapM (\(ivar, _) -> get ivar) iVarsPEls + return gotDatas + +waitWorkerCompletition :: (TVar Bool, TVar [a]) -> IO () +waitWorkerCompletition workers = do finished <- readTVarIO (fst workers) + if finished + then return () + else waitWorkerCompletition workers + +retrieveAllData :: (TVar Bool, TVar [a]) -> IO [a] +retrieveAllData workers = readTVarIO (snd workers) diff --git a/src/Opt.hs b/src/Opt.hs index f6ca4de..ed090ba 100644 --- a/src/Opt.hs +++ b/src/Opt.hs @@ -3,6 +3,9 @@ module Opt(parseArgs,Options(..)) where import System.Console.GetOpt import Data.Maybe +absoluteMaxTime :: Int +absoluteMaxTime = 3600*24*365 + -- import Vm data Flag = File String @@ -12,23 +15,23 @@ data Flag = File String deriving Show data Options = Options { optFile :: [String], - optTime :: Maybe Int, + optTime :: Int, optMem :: Maybe Int, optPowerPhrase :: [String], optSeedNumber :: Int, optCores :: Int, - optLog :: Bool - } + optLog :: Bool} + deriving Show startOptions = Options { optFile = [], - optTime = Nothing, + optTime = absoluteMaxTime, optMem = Nothing, optPowerPhrase = [], optSeedNumber = 0, optCores = 1, - optLog = True - } + optLog = False } + options :: [ OptDescr (Options -> IO Options) ] options = [ Option "f" ["filename"] @@ -38,7 +41,7 @@ options = [ Option "f" ["filename"] "Input Filename" , Option "t" ["timelimit"] (ReqArg - (\arg opt -> return opt { optTime = Just (read arg) }) + (\arg opt -> return opt { optTime = (read arg) }) "TIMELIMIT") "Time Limit in seconds" , Option "m" ["memlimit"] diff --git a/src/ParMapParallelization.hs b/src/ParMapParallelization.hs new file mode 100644 index 0000000..0c0c3c0 --- /dev/null +++ b/src/ParMapParallelization.hs @@ -0,0 +1,51 @@ +{-# OPTIONS -Wall #-} +module ParMapParallelization(parallelizedTimed2) where + +import Control.DeepSeq (deepseq) +import Control.Concurrent.STM +import Control.Concurrent +import Control.Parallel.Strategies + +writeTVarIO :: TVar a -> a -> IO () +writeTVarIO tvar el = atomically $ writeTVar tvar el + +parallelizedTimed2 :: (NFData a) => Int -> (a -> a) -> [a] -> IO [a] +parallelizedTimed2 sec f toCompute = do stopVar <- newTVarIO False + putStrLn "WORKERS STARTED" + worker <- monadParFork (workerFunction f stopVar) toCompute + putStrLn "CLOCK STARTED" + secWait sec + putStrLn "STOP MESSAGE SENT" + sendStopMessage stopVar + putStrLn "WAITING FOR COMPLETITION" + waitWorkerCompletition worker + putStrLn "RETRIEVE DATA" + retrieveAllData worker + where + secWait = threadDelay . (1000000 * ) + sendStopMessage stopMessageC = writeTVarIO stopMessageC True + +monadParFork :: ((TVar Bool,TVar [a], [a]) -> IO ()) -> [a] -> IO (TVar Bool,TVar [a]) +monadParFork f toCompute = do stopThreadVar <- newTVarIO False + dataTVar <- newTVarIO toCompute + _ <- forkIO (f (stopThreadVar, dataTVar, toCompute)) + return (stopThreadVar, dataTVar) + +workerFunction :: (NFData a) => (a -> a) -> TVar Bool -> (TVar Bool, TVar [a], [a]) -> IO () +workerFunction f stopVar (threadEndedVar, dataVar, dataEls) = do stopped <- readTVarIO stopVar + if stopped + then do writeTVarIO dataVar dataEls + writeTVarIO threadEndedVar True + else do let newData = evalf + newData `deepseq` workerFunction f stopVar (threadEndedVar, dataVar, newData) + where + evalf = dataEls `deepseq` parMap rpar f dataEls + +waitWorkerCompletition :: (TVar Bool, TVar [a]) -> IO () +waitWorkerCompletition workers = do finished <- readTVarIO (fst workers) + if finished + then return () + else waitWorkerCompletition workers + +retrieveAllData :: (TVar Bool, TVar [a]) -> IO [a] +retrieveAllData workers = readTVarIO (snd workers) diff --git a/src/ParallelBenchmark.hs b/src/ParallelBenchmark.hs new file mode 100644 index 0000000..2f9e919 --- /dev/null +++ b/src/ParallelBenchmark.hs @@ -0,0 +1,39 @@ +module Main where + +import System.Environment + + +import ThreadPoolTimed +import ThreadPoolTimed2 + +initialVector :: [Int] +initialVector = [1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1] + +main :: IO () +main = do args <- getArgs + let sec = read (args !! 0) :: Int + let rep = read (args !! 1) :: Int + let cores = read (args !! 2) :: Int + let evalf = (applyNtimes rep applyFun) + putStrLn "\n\n" + + res2 <- threadPoolTimed cores sec evalf initialVector + putStrLn $ "ThreadPool result: " ++ (show $ sum res2) ++ " \n" + + res1 <- threadPoolTimed2 sec evalf initialVector + putStrLn $ "ThreadPool2 result: " ++ (show $ sum res1) ++ " \n" + + + --res3 <- parallelizedTimed2 sec evalf initialVector + --putStrLn $ "ParMap result: " ++ (show $ sum res3) ++ " \n" + + + + + +applyNtimes :: Int -> (a -> a) -> a -> a +applyNtimes 0 _ accum = accum +applyNtimes n f accum = applyNtimes (n - 1) f (f accum) + +applyFun :: Int -> Int +applyFun x = x + 1 diff --git a/src/Strategy0.hs b/src/Strategy0.hs index a81c02d..38e3df0 100644 --- a/src/Strategy0.hs +++ b/src/Strategy0.hs @@ -1,7 +1,10 @@ +{-# LANGUAGE DeriveGeneric #-} module Strategy0(Strategy0) where import qualified Data.PQueue.Prio.Max as PQ +import GHC.Generics (Generic) +import Control.DeepSeq import System.Random(StdGen) import Data.Maybe (isJust) import Datatypes @@ -16,14 +19,16 @@ commandsList :: [Command] commandsList = [MoveSE, MoveSW, MoveW, MoveE, RotateClockwise, RotateCounterclockwise] type Queue = PQ.MaxPQueue (Int, Int, Int) Game -data Strategy0 = Strategy0 (Queue, [Game]) - +data Strategy0 = Strategy0 (Queue, [Game]) + deriving Generic instance Strategy Strategy0 where initst = strategy0initst advance = strategy0advance getbest = strategy0getbest +instance NFData Strategy0 + strategy0initst :: Game -> StdGen -> [[Command]] -> Strategy0 strategy0initst game _ _ = (Strategy0 (firstQueue, firstList)) where firstQueue = PQ.singleton (fullScore game, -(length $ Game.units game), snd . Unit.pivot . head . Game.units $ game) game diff --git a/src/StrategyManager.hs b/src/StrategyManager.hs index c214bad..a17f89e 100644 --- a/src/StrategyManager.hs +++ b/src/StrategyManager.hs @@ -1,29 +1,32 @@ {-# LANGUAGE ExistentialQuantification #-} +{-# LANGUAGE TemplateHaskell #-} {-# OPTIONS -Wall #-} module StrategyManager where +import Control.DeepSeq import System.Random(StdGen) import Datatypes.Game(Game,Command) +import Control.DeepSeq (NFData(..)) + type Score = Int type StrategyIdx = Int type FinishedGame = ([Command], Score, StrategyIdx) type GameComputation = [StrategyWrapper] -data StrategyWrapper = forall a . Strategy a => MkStrategyWrapper a - | FinishedGame ([Command], Int) - - -data NullStrategy1 = NullS1 - +data StrategyWrapper = forall a. (Strategy a) => MkStrategyWrapper a + | FinishedGame ([Command], Int) + +instance NFData StrategyWrapper where + rnf (MkStrategyWrapper a) = seq a () + rnf (FinishedGame b) = seq b () initWrapper :: Strategy a => a -> StrategyWrapper -initWrapper = MkStrategyWrapper - +initWrapper a = getbest a `deepseq` (MkStrategyWrapper $! a) -class Strategy a where +class (NFData a) => Strategy a where initst :: Game -> StdGen -> [[Command]] -> a advance :: a -> Either a ([Command], Int) getbest :: a -> ([Command], Int) @@ -63,16 +66,4 @@ getBestGameComputation gameComputation = bestGame advanceGameComputation :: GameComputation -> GameComputation advanceGameComputation gc = map advanceWrapper gc -instance Strategy NullStrategy1 where - initst _ _ _ = NullS1 - advance _ = Left NullS1 - getbest _ = ([],0) - - -data NullStrategy2 = NullS2 - -instance Strategy NullStrategy2 where - initst _ _ _ = NullS2 - advance _ = Left NullS2 - getbest _ = ([],0) diff --git a/src/TestUtils.hs b/src/TestUtils.hs new file mode 100644 index 0000000..7c5cb08 --- /dev/null +++ b/src/TestUtils.hs @@ -0,0 +1,7 @@ +module TestTree where + + +applyNtimes :: Int -> (a -> a) -> a -> a +applyNtimes 0 _ accum = accum +applyNtimes n f accum = applyNtimes (n - 1) f (f accum) + diff --git a/src/ThreadPoolTimed.hs b/src/ThreadPoolTimed.hs index aa59274..5526328 100644 --- a/src/ThreadPoolTimed.hs +++ b/src/ThreadPoolTimed.hs @@ -1,91 +1,59 @@ -module ThreadPoolTimed where +{-# OPTIONS -Wall #-} +module ThreadPoolTimed(threadPoolTimed) where +import Control.DeepSeq (NFData, deepseq) import Control.Concurrent.STM.TChan import Control.Concurrent.STM.TVar import Control.Monad.STM import Control.Concurrent import Control.Monad - import Data.Maybe -type NWorkers = Int -type Sec = Int - - - -threadPoolTimed :: NWorkers -> Sec -> (a -> Either a b) -> [a] -> IO ([a],[b]) -threadPoolTimed nworkers sec f toCompute = do let initialLength = length toCompute :: Int - toComputeSize <- newTVarIO initialLength - toComputeQ <- newTChanIO :: IO (TChan a) - completedQ <- newTChanIO :: IO (TChan b) - unCompletedQ <- newTChanIO :: IO (TChan a) - isTPoolAboutToFinish <- newTVarIO False - - workers <- forkNTimes nworkers (workerFunction f (toComputeSize, - toComputeQ, - completedQ, - unCompletedQ, - isTPoolAboutToFinish)) - - - secWait sec - pushDataToWorkers toCompute toComputeQ - - sendStopMessage isTPoolAboutToFinish +threadPoolTimed :: (NFData a) => Int -> Int -> (a -> a) -> [a] -> IO [a] +threadPoolTimed nworkers sec f toCompute = do toComputeQ <- newTChanIO :: IO (TChan a) + stopVar <- newTVarIO False + putStrLn "DATA TO WORKERS" + pushDataToWorkers toCompute toComputeQ + putStrLn "WORKERS STARTED" + workers <- forkNTimes nworkers (workerFunction f (toComputeQ, + stopVar)) + putStrLn "CLOCK STARTED" + secWait sec + putStrLn "STOP MESSAGE SENT" + sendStopMessage stopVar + putStrLn "WAITING FOR COMPLETITION" waitWorkerCompletition workers - computedSize <- atomically $ readTVar toComputeSize - retrieveData initialLength computedSize unCompletedQ completedQ + putStrLn "RETRIEVE DATA" + retrieveAllData toComputeQ where secWait = threadDelay . (1000000 * ) - sendStopMessage isTPoolAboutToFinish = atomically $ writeTVar isTPoolAboutToFinish True + sendStopMessage stopMessageC = atomically $ writeTVar stopMessageC True -forkNTimes :: NWorkers -> (MVar Bool -> IO ()) -> IO [MVar Bool] -forkNTimes nworkers f = do workers <- replicateM nworkers (newMVar False) +forkNTimes :: Int -> (TVar Bool -> IO ()) -> IO [TVar Bool] +forkNTimes nworkers f = do workers <- replicateM nworkers (newTVarIO False) mapM_ (forkIO . f) workers return workers -workerFunction :: (a -> Either a b) -> (TVar Int, TChan a, TChan b, TChan a, TVar Bool) -> MVar Bool -> IO () -workerFunction f workerData completedVar = do extractedData <- dataToCompute - if (isNothing extractedData) - then putMVar completedVar True - else do let newData = (f . fromJust) extractedData - pushNewData newData - workerFunction f workerData completedVar +workerFunction :: (NFData a) => (a -> a) -> (TChan a, TVar Bool) -> TVar Bool -> IO () +workerFunction f (toCompute, stopVar) threadEndedVar = do (stopMessageArrived, computationData) <- retrieveData + if stopMessageArrived + then atomically $ do writeTVar threadEndedVar True + else do executeStep computationData + workerFunction f (toCompute, stopVar) threadEndedVar where - (computeSize, toComputeQ, finishedDataQ, unfinishedDataQ, isCompAboutToFinish) = workerData - dataToCompute = atomically $ orElse nullData getData - - nullData = do r <- checkFinished - if r - then return Nothing - else retry - - checkFinished = do computesz <- readTVar computeSize - finished <- readTVar isCompAboutToFinish - return (finished || (computesz == 0)) - - getData = do d <- tryReadTChan toComputeQ - if isNothing d - then retry - else return d - - - pushNewData nData = atomically $ do finished <- checkFinished - if finished - then pushLastData nData - else pushPartialData nData - - pushPartialData (Left toBeCompleted) = writeTChan toComputeQ toBeCompleted - pushPartialData (Right completed) = do writeTChan finishedDataQ completed - modifyTVar computeSize (\x -> x - 1) - - pushLastData (Left toBeCompleted) = writeTChan unfinishedDataQ toBeCompleted - pushLastData (Right completed) = writeTChan finishedDataQ completed - - + retrieveData = atomically $ do stopMessage <- readTVar stopVar + if stopMessage + then return (True, Nothing) + else do compValue <- readTChan toCompute + return (False, Just compValue) + + executeStep computationData = do let newData = ((f . fromJust) computationData) + newData `deepseq` atomically $ writeTChan toCompute newData + + pushDataToWorkers :: [a] -> TChan a -> IO () pushDataToWorkers toCompute toComputeQ = atomically $ mapM_ (\el -> writeTChan toComputeQ el) toCompute @@ -94,17 +62,26 @@ pushDataToWorkers toCompute toComputeQ = atomically $ mapM_ (\el -> writeTChan t --retrieveData :: TChan a -> TChan b -> [(a,b)] --retrieveData unCompletedQ completedQ -waitWorkerCompletition :: [MVar Bool] -> IO () -waitWorkerCompletition workers = do workersStatus <- mapM readMVar workers - if and workersStatus - then return () - else do threadDelay 1000 - waitWorkerCompletition workers - -retrieveData :: Int -> Int -> TChan a -> TChan b -> IO ([a],[b]) -retrieveData initialSize notEvaluated completedQ unCompletedQ = atomically $ do listCompleted <- flushChan evaluated completedQ - listUnCompleted <- flushChan notEvaluated unCompletedQ - return (listCompleted,listUnCompleted) - where - evaluated = initialSize - notEvaluated - flushChan n c = replicateM n (readTChan c) +waitWorkerCompletition :: [TVar Bool] -> IO () +waitWorkerCompletition workers = atomically $ do workersStatus <- mapM readTVar workers + if and workersStatus + then return () + else retry + + +retrieveAllData :: TChan a -> IO [a] + +retrieveAllData queue = atomically $ retrieveAllDataSTM [] + where retrieveAllDataSTM acc = do isEmpty <- isEmptyTChan queue + if isEmpty + then return acc + else do elemToAcc <- readTChan queue + retrieveAllDataSTM (elemToAcc:acc) + +-- retrieveAllData queue acc = do voidQueue <- atomically $ isEmptyTChan queue +-- putStrLn $ (show voidQueue) +-- if voidQueue +-- then return acc +-- else do elemToAcc <- atomically $ readTChan queue +-- retrieveAllData queue (elemToAcc:acc) + diff --git a/src/ThreadPoolTimed2.hs b/src/ThreadPoolTimed2.hs new file mode 100644 index 0000000..d7ff388 --- /dev/null +++ b/src/ThreadPoolTimed2.hs @@ -0,0 +1,54 @@ +{-# OPTIONS -Wall #-} +module ThreadPoolTimed2(threadPoolTimed2) where + +import Control.DeepSeq (NFData, deepseq) +import Control.Concurrent.STM +import Control.Concurrent + +writeTVarIO :: TVar a -> a -> IO () +writeTVarIO tvar el = atomically $ writeTVar tvar el + +threadPoolTimed2 :: (NFData a) => Int -> (a -> a) -> [a] -> IO [a] +threadPoolTimed2 sec f toCompute = do stopVar <- newTVarIO False + putStrLn "WORKERS STARTED" + workers <- mapFork (workerFunction f stopVar) toCompute + putStrLn "CLOCK STARTED" + secWait sec + putStrLn "STOP MESSAGE SENT" + sendStopMessage stopVar + putStrLn "WAITING FOR COMPLETITION" + waitWorkerCompletition workers + putStrLn "RETRIEVE DATA" + retrieveAllData workers + where + secWait = threadDelay . (1000000 * ) + sendStopMessage stopMessageC = writeTVarIO stopMessageC True + +mapFork :: ((TVar Bool, TVar a, a) -> IO ()) -> [a] -> IO [(TVar Bool,TVar a, a)] +mapFork f toCompute = do workers <- mapM tupleM toCompute + mapM_ (forkIO . f) workers + return workers + where + tupleM x = do falseTVar <- newTVarIO False + dataTvar <- newTVarIO x + return (falseTVar, dataTvar, x) + +workerFunction :: (NFData a) => (a -> a) -> TVar Bool -> (TVar Bool, TVar a, a) -> IO () +workerFunction f stopVar (threadEndedVar, dataVar, dataEl) = do stopped <- readTVarIO stopVar + if stopped + then do writeTVarIO dataVar dataEl + writeTVarIO threadEndedVar True + else do let newData = dataEl `deepseq` f(dataEl) + newData `deepseq` workerFunction f stopVar (threadEndedVar, dataVar, newData) + + + +waitWorkerCompletition :: [(TVar Bool, TVar a, a)] -> IO () +waitWorkerCompletition workers = do workerFinishedList <- mapM (\(x, _, _) -> readTVarIO x) workers + let workersAllFinished = and workerFinishedList + if workersAllFinished + then return () + else waitWorkerCompletition workers + +retrieveAllData :: [(TVar Bool, TVar a, a)] -> IO [a] +retrieveAllData workers = mapM (\(_, x, _) -> readTVarIO x) workers