|
@ -0,0 +1,110 @@ |
|
|
|
|
|
module ThreadPoolTimed where |
|
|
|
|
|
|
|
|
|
|
|
import Control.Concurrent.STM.TChan |
|
|
|
|
|
import Control.Concurrent.STM.TVar |
|
|
|
|
|
import Control.Monad.STM |
|
|
|
|
|
import Control.Concurrent |
|
|
|
|
|
import Control.Monad |
|
|
|
|
|
|
|
|
|
|
|
import Data.Maybe |
|
|
|
|
|
|
|
|
|
|
|
type NWorkers = Int |
|
|
|
|
|
type Sec = Int |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
threadPoolTimed :: NWorkers -> Sec -> (a -> Either a b) -> [a] -> IO ([a],[b]) |
|
|
|
|
|
threadPoolTimed nworkers sec f toCompute = do let initialLength = length toCompute :: Int |
|
|
|
|
|
toComputeSize <- newTVarIO initialLength |
|
|
|
|
|
toComputeQ <- newTChanIO :: IO (TChan a) |
|
|
|
|
|
completedQ <- newTChanIO :: IO (TChan b) |
|
|
|
|
|
unCompletedQ <- newTChanIO :: IO (TChan a) |
|
|
|
|
|
isTPoolAboutToFinish <- newTVarIO False |
|
|
|
|
|
|
|
|
|
|
|
workers <- forkNTimes nworkers (workerFunction f (toComputeSize, |
|
|
|
|
|
toComputeQ, |
|
|
|
|
|
completedQ, |
|
|
|
|
|
unCompletedQ, |
|
|
|
|
|
isTPoolAboutToFinish)) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
secWait sec |
|
|
|
|
|
|
|
|
|
|
|
pushDataToWorkers toCompute toComputeQ |
|
|
|
|
|
|
|
|
|
|
|
sendStopMessage isTPoolAboutToFinish |
|
|
|
|
|
waitWorkerCompletition workers |
|
|
|
|
|
computedSize <- atomically $ readTVar toComputeSize |
|
|
|
|
|
retrieveData initialLength computedSize unCompletedQ completedQ |
|
|
|
|
|
|
|
|
|
|
|
where |
|
|
|
|
|
secWait = threadDelay . (1000000 * ) |
|
|
|
|
|
sendStopMessage isTPoolAboutToFinish = atomically $ writeTVar isTPoolAboutToFinish True |
|
|
|
|
|
|
|
|
|
|
|
forkNTimes :: NWorkers -> (MVar Bool -> IO ()) -> IO [MVar Bool] |
|
|
|
|
|
forkNTimes nworkers f = do workers <- replicateM nworkers (newMVar False) |
|
|
|
|
|
mapM_ (forkIO . f) workers |
|
|
|
|
|
return workers |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
workerFunction :: (a -> Either a b) -> (TVar Int, TChan a, TChan b, TChan a, TVar Bool) -> MVar Bool -> IO () |
|
|
|
|
|
workerFunction f workerData completedVar = do extractedData <- dataToCompute |
|
|
|
|
|
if (isNothing extractedData) |
|
|
|
|
|
then putMVar completedVar True |
|
|
|
|
|
else do let newData = (f . fromJust) extractedData |
|
|
|
|
|
pushNewData newData |
|
|
|
|
|
workerFunction f workerData completedVar |
|
|
|
|
|
where |
|
|
|
|
|
(computeSize, toComputeQ, finishedDataQ, unfinishedDataQ, isCompAboutToFinish) = workerData |
|
|
|
|
|
dataToCompute = atomically $ orElse nullData getData |
|
|
|
|
|
|
|
|
|
|
|
nullData = do r <- checkFinished |
|
|
|
|
|
if r |
|
|
|
|
|
then return Nothing |
|
|
|
|
|
else retry |
|
|
|
|
|
|
|
|
|
|
|
checkFinished = do computesz <- readTVar computeSize |
|
|
|
|
|
finished <- readTVar isCompAboutToFinish |
|
|
|
|
|
return (finished || (computesz == 0)) |
|
|
|
|
|
|
|
|
|
|
|
getData = do d <- tryReadTChan toComputeQ |
|
|
|
|
|
if isNothing d |
|
|
|
|
|
then retry |
|
|
|
|
|
else return d |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
pushNewData nData = atomically $ do finished <- checkFinished |
|
|
|
|
|
if finished |
|
|
|
|
|
then pushLastData nData |
|
|
|
|
|
else pushPartialData nData |
|
|
|
|
|
|
|
|
|
|
|
pushPartialData (Left toBeCompleted) = writeTChan toComputeQ toBeCompleted |
|
|
|
|
|
pushPartialData (Right completed) = do writeTChan finishedDataQ completed |
|
|
|
|
|
modifyTVar computeSize (\x -> x - 1) |
|
|
|
|
|
|
|
|
|
|
|
pushLastData (Left toBeCompleted) = writeTChan unfinishedDataQ toBeCompleted |
|
|
|
|
|
pushLastData (Right completed) = writeTChan finishedDataQ completed |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
pushDataToWorkers :: [a] -> TChan a -> IO () |
|
|
|
|
|
pushDataToWorkers toCompute toComputeQ = atomically $ mapM_ (\el -> writeTChan toComputeQ el) toCompute |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
--retrieveData :: TChan a -> TChan b -> [(a,b)] |
|
|
|
|
|
--retrieveData unCompletedQ completedQ |
|
|
|
|
|
|
|
|
|
|
|
waitWorkerCompletition :: [MVar Bool] -> IO () |
|
|
|
|
|
waitWorkerCompletition workers = do workersStatus <- mapM readMVar workers |
|
|
|
|
|
if and workersStatus |
|
|
|
|
|
then return () |
|
|
|
|
|
else do threadDelay 1000 |
|
|
|
|
|
waitWorkerCompletition workers |
|
|
|
|
|
|
|
|
|
|
|
retrieveData :: Int -> Int -> TChan a -> TChan b -> IO ([a],[b]) |
|
|
|
|
|
retrieveData initialSize notEvaluated completedQ unCompletedQ = atomically $ do listCompleted <- flushChan evaluated completedQ |
|
|
|
|
|
listUnCompleted <- flushChan notEvaluated unCompletedQ |
|
|
|
|
|
return (listCompleted,listUnCompleted) |
|
|
|
|
|
where |
|
|
|
|
|
evaluated = initialSize - notEvaluated |
|
|
|
|
|
flushChan n c = replicateM n (readTChan c) |