
Haskell - Actor based mutability


Here is a quick and dirty example using stm and pipes-network. This will set up a simple server that allows clients to connect and increment or decrement a counter. It will display a very simple status bar showing the current tallies of all connected clients and will remove client tallies from the bar when they disconnect.

I'll begin with the server. I've generously commented the code to explain how it works:

```haskell
import Control.Concurrent.STM (STM, atomically)
import Control.Concurrent.STM.TVar
import qualified Data.HashMap.Strict as H
import Control.Concurrent (forkIO, threadDelay)
import Control.Monad (forever, unless)
import Control.Monad.Trans.State.Strict
import qualified Data.ByteString.Char8 as B
import Control.Proxy
import Control.Proxy.TCP
import System.IO

main = do
    hSetBuffering stdout NoBuffering

    {- These are the internal data structures.  They should be an implementation
       detail and you should never expose these references to the
       "business logic" part of the application. -}
    -- I use nRef to keep track of creating fresh Ints (which identify users)
    nRef <- newTVarIO 0       :: IO (TVar Int)

    {- hMap associates every user (i.e. Int) with a counter

       Notice how I've "striped" the hash map by storing STM references to the
       values instead of storing the values directly.  This means that I only
       actually write the hashmap when adding or removing users, which reduces
       contention for the hash map.

       Since each user gets their own unique STM reference for their counter,
       modifying counters does not cause contention with other counters or
       contention with the hash map. -}
    hMap <- newTVarIO H.empty :: IO (TVar (H.HashMap Int (TVar Int)))

    {- The following code makes heavy use of Haskell's pure closures.  Each
       'let' binding closes over its current environment, which is safe since
       Haskell is pure. -}

    let {- 'getCounters' is the only server-facing command in our STM API.  The
           only permitted operation is retrieving the current set of user
           counters.

           'getCounters' closes over the 'hMap' reference currently in scope so
           that the server never needs to be aware of our internal
           implementation. -}
        getCounters :: STM [Int]
        getCounters = do
            refs <- fmap H.elems (readTVar hMap)
            mapM readTVar refs

        {- 'init' is the only client-facing command in our STM API.  It
            initializes the client's entry in the hash map and returns two
            commands: the first command is what the client calls to 'increment'
            their counter and the second command is what the client calls to
            log off and 'delete' their entry.

            Notice that those two returned commands each close over the client's
            unique key and STM reference, so the client never needs to be aware
            of how exactly 'init' is implemented under the hood. -}
        init :: STM (STM (), STM ())
        init = do
            n   <- readTVar nRef
            writeTVar nRef $! n + 1

            ref <- newTVar 0
            modifyTVar' hMap (H.insert n ref)

            -- Modify the client's own TVar directly, so incrementing never
            -- touches 'hMap' and cannot contend with users joining or leaving.
            let incrementRef :: STM ()
                incrementRef = modifyTVar' ref (+ 1)

                deleteRef :: STM ()
                deleteRef = modifyTVar' hMap (H.delete n)

            return (incrementRef, deleteRef)

    {- Now for the actual program logic.  Everything past this point only uses
       the approved STM API (i.e. 'getCounters' and 'init').  If I wanted I
       could factor the above approved STM API into a separate module to enforce
       the encapsulation boundary, but I am lazy. -}

    {- Fork a thread which polls the current state of the counters and displays
       it to the console.  There is a way to implement this without polling but
       this gets the job done for now.

       Most of what it is doing is just some simple tricks to reuse the same
       console line instead of outputting a stream of lines.  Otherwise it
       would be just:

       forkIO $ forever $ do
           ns <- atomically getCounters
           print ns
    -}
    forkIO $ (`evalStateT` 0) $ forever $ do
        del <- get
        lift $ do
            putStr (replicate del '\b')
            putStr (replicate del ' ' )
            putStr (replicate del '\b')
        ns <- lift $ atomically getCounters
        let str = show ns
        lift $ putStr str
        put $! length str
        lift $ threadDelay 10000

    {- Fork a thread for each incoming connection, which listens to the client's
       commands and translates them into 'STM' actions -}
    serve HostAny "8080" $ \(socket, _) -> do
        (increment, delete) <- atomically init

        {- Right now, just do the dumb thing and convert all keypresses into
           increment commands, with the exception of the 'q' key, which will
           quit -}
        let handler :: (Proxy p) => () -> Consumer p Char IO ()
            handler () = runIdentityP loop
              where
                loop = do
                    c <- request ()
                    unless (c == 'q') $ do
                        lift $ atomically increment
                        loop

        {- This uses my 'pipes' library.  It basically is a high-level way to
           say:

           * Read binary packets from the socket no bigger than 4096 bytes

           * Get the first character from each packet and discard the rest

           * Handle the character using the above 'handler' function -}
        runProxy $ socketReadS 4096 socket >-> mapD B.head >-> handler

        {- The above pipeline finishes either when the socket closes or
           'handler' stops looping because it received a 'q'.  Either case means
           that the client is done so we log them out using 'delete'. -}
        atomically delete
```
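Since the Proxy-based pipes API used above predates pipes-4.0.0, it may be easier to experiment with the STM API at the heart of the server on its own. The following is a minimal sketch, assuming only the stm and containers packages: it swaps Data.HashMap.Strict for Data.Map.Strict (so counters come back in key order), and the Registry record, newRegistry and register are hypothetical names standing in for the let-bound getCounters and init above. Three simulated clients type 1, 6 and 4 keys, and then the first one logs off.

```haskell
import Control.Concurrent.STM
  (STM, TVar, atomically, modifyTVar', newTVar, newTVarIO, readTVar, writeTVar)
import qualified Data.Map.Strict as M

-- Hypothetical record bundling the two commands of the approved STM API.
data Registry = Registry
  { getCounters :: STM [Int]                -- server-facing
  , register    :: STM (STM (), STM ())     -- client-facing: (increment, delete)
  }

newRegistry :: IO Registry
newRegistry = do
  -- Source of fresh client ids.
  nRef <- newTVarIO (0 :: Int)
  -- "Striped" map: one TVar per client, so increments never write the map.
  hMap <- newTVarIO (M.empty :: M.Map Int (TVar Int))
  let getCounters' = do
        refs <- fmap M.elems (readTVar hMap)
        mapM readTVar refs
      register' = do
        n <- readTVar nRef
        writeTVar nRef $! n + 1
        ref <- newTVar 0
        modifyTVar' hMap (M.insert n ref)
        -- increment touches only this client's TVar; delete edits the map.
        let increment = modifyTVar' ref (+ 1)
            delete    = modifyTVar' hMap (M.delete n)
        return (increment, delete)
  return (Registry getCounters' register')

main :: IO ()
main = do
  reg <- newRegistry
  (incA, delA) <- atomically (register reg)
  (incB, _)    <- atomically (register reg)
  (incC, _)    <- atomically (register reg)
  atomically incA
  mapM_ atomically (replicate 6 incB)
  mapM_ atomically (replicate 4 incC)
  atomically (getCounters reg) >>= print   -- prints [1,6,4]
  atomically delA                          -- client A logs off
  atomically (getCounters reg) >>= print   -- prints [6,4]
```

Bundling the commands in a record plays the same role as the closures in the server: callers receive only the operations, never nRef or hMap.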

Next up is the client, which simply opens a connection and forwards each key press to the server as a single packet:

```haskell
import Control.Monad
import Control.Proxy
import Control.Proxy.Safe
import Control.Proxy.TCP.Safe
import Data.ByteString.Char8 (pack)
import System.IO

main = do
    hSetBuffering stdin NoBuffering
    hSetEcho      stdin False

    {- Again, this uses my 'pipes' library.  It basically says:

        * Read characters from the console using 'commands'

        * Pack them into a binary format

        * Send them to a server running at 127.0.0.1:8080

        This finishes looping when the user types a 'q' or the connection is
        closed for whatever reason.
    -}
    runSafeIO $ runProxy $ runEitherK $
             try . commands
         >-> mapD (\c -> pack [c])
         >-> connectWriteD Nothing "127.0.0.1" "8080"

commands :: (Proxy p) => () -> Producer p Char IO ()
commands () = runIdentityP loop
  where
    loop = do
        c <- lift getChar
        respond c
        unless (c == 'q') loop
```

It's pretty simple: commands generates a stream of Chars, which are converted to ByteStrings and sent to the server as packets.

If you run the server and a few clients and have them each type in a few keys, your server display will output a list showing how many keys each client typed:

[1,6,4]

... and if some of the clients disconnect they will be removed from the list:

[1,4]
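The display thread in the server polls every 10 ms, and its comment mentions that there is a way to avoid polling. One possible approach (not necessarily the one the author had in mind) is STM's retry: a transaction that calls retry is suspended until one of the TVars it read is written. The sketch below assumes a single counter; awaitChange is a hypothetical helper, and in the server getCounters could be passed as its first argument, since [Int] has an Eq instance.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Concurrent.STM
import Control.Monad (unless)

-- Hypothetical helper: return the current value of 'current' once it
-- differs from 'lastSeen'.  Until then 'retry' suspends the transaction,
-- and the runtime re-runs it only after a TVar it read has been written.
awaitChange :: Eq a => STM a -> a -> STM a
awaitChange current lastSeen = do
  now <- current
  if now == lastSeen then retry else return now

main :: IO ()
main = do
  counter <- newTVarIO (0 :: Int)
  done    <- newEmptyMVar
  -- Display thread: wakes up only when the counter changes (no threadDelay).
  _ <- forkIO $ do
    let loop lastSeen = do
          v <- atomically (awaitChange (readTVar counter) lastSeen)
          print v
          unless (v >= 3) (loop v)
    loop 0
    putMVar done ()
  -- Writer: three separate increments.  The display may coalesce some of
  -- them into one wake-up, but its last line is always 3.
  mapM_ (\_ -> atomically (modifyTVar' counter (+ 1))) [1 .. 3 :: Int]
  takeMVar done
```

Because wake-ups happen only on writes, an idle server would do no work at all between key presses, at the cost of possibly skipping intermediate values.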

Note that the pipes component of these examples will simplify greatly in the upcoming pipes-4.0.0 release, but the current pipes ecosystem still gets the job done as is.


First, I'd definitely recommend using your own specific data type for representing commands. When using (String, Maybe (Chan [a]), Maybe a), a buggy client can crash your actor simply by sending an unknown command, or by sending ("add", Nothing, Nothing), etc. I'd suggest something like

```haskell
data Command a = Add a | Remove a | Get (Chan [a])
```

Then you can pattern match on commands in storage in a safe way.
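A minimal sketch of such an actor, assuming a Chan-based mailbox: the storage loop and the getAll helper below are hypothetical, since the question's own code isn't shown here. Each constructor carries exactly the data it needs, so a malformed request like ("add", Nothing, Nothing) can't even be constructed, and the case expression covers every command.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan
import Data.List (delete)

data Command a = Add a | Remove a | Get (Chan [a])

-- Hypothetical actor loop: it owns the list and handles one command at a
-- time from its mailbox, so all access to the list is serialized.
storage :: Eq a => Chan (Command a) -> [a] -> IO ()
storage mailbox xs = do
  cmd <- readChan mailbox
  case cmd of
    Add x     -> storage mailbox (x : xs)
    Remove x  -> storage mailbox (delete x xs)
    Get reply -> writeChan reply xs >> storage mailbox xs

-- Hypothetical client helper: send a Get with a fresh reply channel and
-- block until the actor answers.
getAll :: Chan (Command a) -> IO [a]
getAll mailbox = do
  reply <- newChan
  writeChan mailbox (Get reply)
  readChan reply

main :: IO ()
main = do
  mailbox <- newChan
  _ <- forkIO (storage mailbox [])
  writeChan mailbox (Add "a")
  writeChan mailbox (Add "b")
  writeChan mailbox (Remove "a")
  getAll mailbox >>= print   -- prints ["b"]
```

Note that getAll has to do a full round trip through the mailbox just to read the list, which is the request/reply overhead discussed next.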

Actors have their advantages, but I feel they also have some drawbacks. For example, getting an answer from an actor requires sending it a command and then waiting for a reply. And the client can't be completely sure that it gets a reply at all, or that the reply will be of some specific type: you can't say "I want only answers of this type (and only this many of them) for this particular command".

So as an alternative, I'll give a simple STM solution. It would be better to use a hash table or a (balanced tree) set, but since Handle implements neither Ord nor Hashable, we can't use these data structures, so I'll keep using lists.

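```haskell
module ThreadSet (
    TSet, newTSet, add, remove, get
) where

import Control.Concurrent.STM.TVar
import Control.Monad.STM
import Data.List (delete)

newtype TSet a = TSet (TVar [a])

-- | Create an empty set.  (Hypothetical addition: the 'TSet' constructor is
-- not exported, so clients otherwise have no way to construct a set.)
newTSet :: STM (TSet a)
newTSet = fmap TSet (newTVar [])

-- | Add an element unless it is already present, so the list stays a set.
add :: (Eq a) => a -> TSet a -> STM ()
add x (TSet v) = modifyTVar' v (\xs -> if x `elem` xs then xs else x : xs)

remove :: (Eq a) => a -> TSet a -> STM ()
remove x (TSet v) = modifyTVar' v (delete x)

get :: TSet a -> STM [a]
get (TSet v) = readTVar v
```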

This module implements an STM-based set of arbitrary elements. You can have multiple such sets and use them together in a single STM transaction that either succeeds or fails as a whole. For example:

```haskell
-- | Ensures that there is exactly one element `x` in the set.
add1 :: (Eq a) => a -> TSet a -> STM ()
add1 x v = remove x v >> add x v
```

This would be difficult with actors: you'd have to add it as another command for the actor, because you can't compose it from existing actions and still have atomicity.
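Composition also works across several sets. As a sketch, here is an element moved from one set to another in a single transaction. It redefines a minimal TSet so that it compiles on its own; newTSet, member and move are hypothetical helpers defined here for illustration. With two separate actors, there would be no way to commit the removal and the insertion together.

```haskell
import Control.Concurrent.STM
import Data.List (delete)

-- Minimal stand-in for the ThreadSet module, redefined so this compiles alone.
newtype TSet a = TSet (TVar [a])

-- Hypothetical: create an empty set.
newTSet :: STM (TSet a)
newTSet = TSet <$> newTVar []

-- Add x unless it is already present (set semantics).
add :: Eq a => a -> TSet a -> STM ()
add x (TSet v) = modifyTVar' v (\xs -> if x `elem` xs then xs else x : xs)

remove :: Eq a => a -> TSet a -> STM ()
remove x (TSet v) = modifyTVar' v (delete x)

get :: TSet a -> STM [a]
get (TSet v) = readTVar v

-- Hypothetical: membership test.
member :: Eq a => a -> TSet a -> STM Bool
member x (TSet v) = elem x <$> readTVar v

-- Hypothetical: move x between two sets atomically.  No other thread can
-- observe the state where x has left 'from' but not yet reached 'to'.
-- 'check' retries (blocks) until x is present in 'from'.
move :: Eq a => a -> TSet a -> TSet a -> STM ()
move x from to = do
  present <- member x from
  check present
  remove x from
  add x to

main :: IO ()
main = do
  (idle, busy) <- atomically ((,) <$> newTSet <*> newTSet)
  atomically (mapM_ (`add` idle) ["h1", "h2"])
  atomically (move "h1" idle busy)
  sets <- atomically ((,) <$> get idle <*> get busy)
  print sets   -- prints (["h2"],["h1"])
```

Because move blocks when the element is absent, wrapping it with orElse gives a non-blocking variant, again without touching the set module.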

Update: There is an interesting article explaining why the Clojure designers chose not to use actors. For example, with actors, even if you have many reads and very few writes to a mutable structure, all of them are serialized through the actor, which can greatly hurt performance.