Haskell - Actor based mutability
Here is a quick and dirty example using `stm` and `pipes-network`. This will set up a simple server that allows clients to connect and increment a counter. It will display a very simple status bar showing the current tallies of all connected clients, and it will remove client tallies from the bar when they disconnect.
I'll begin with the server. I've commented the code generously to explain how it works:
```haskell
import Control.Concurrent.STM (STM, atomically)
import Control.Concurrent.STM.TVar
import qualified Data.HashMap.Strict as H
import Control.Concurrent (forkIO, threadDelay)
import Control.Monad (forever, unless)
import Control.Monad.Trans.State.Strict
import qualified Data.ByteString.Char8 as B
import Control.Proxy
import Control.Proxy.TCP
import System.IO

main = do
    hSetBuffering stdout NoBuffering

    {- These are the internal data structures.  They should be an
       implementation detail and you should never expose these references to
       the "business logic" part of the application. -}
    -- I use nRef to keep track of creating fresh Ints (which identify users)
    nRef <- newTVarIO 0 :: IO (TVar Int)

    {- hMap associates every user (i.e. Int) with a counter.

       Notice how I've "striped" the hash map by storing STM references to the
       values instead of storing the values directly.  This means that I only
       actually write the hash map when adding or removing users, which
       reduces contention for the hash map.

       Since each user gets their own unique STM reference for their counter,
       modifying counters does not cause contention with other counters or
       contention with the hash map. -}
    hMap <- newTVarIO H.empty :: IO (TVar (H.HashMap Int (TVar Int)))

    {- The following code makes heavy use of Haskell's pure closures.  Each
       'let' binding closes over its current environment, which is safe since
       Haskell is pure. -}
    let
        {- 'getCounters' is the only server-facing command in our STM API.
           The only permitted operation is retrieving the current set of user
           counters.  It closes over the 'hMap' reference currently in scope
           so that the server never needs to be aware of our internal
           implementation. -}
        getCounters :: STM [Int]
        getCounters = do
            refs <- fmap H.elems (readTVar hMap)
            mapM readTVar refs

        {- 'init' is the only client-facing command in our STM API.  It
           initializes the client's entry in the hash map and returns two
           commands: the first is what the client calls to 'increment' their
           counter, and the second is the 'delete' command the client calls to
           log off.  Both returned commands close over the client's unique STM
           state, so the client never needs to be aware of how exactly 'init'
           is implemented under the hood. -}
        init :: STM (STM (), STM ())
        init = do
            n <- readTVar nRef
            writeTVar nRef $! n + 1

            ref <- newTVar 0
            modifyTVar' hMap (H.insert n ref)

            let -- Modify the client's own counter directly; this never reads
                -- 'hMap', so increments do not contend with the hash map.
                incrementRef :: STM ()
                incrementRef = modifyTVar' ref (+ 1)

                deleteRef :: STM ()
                deleteRef = modifyTVar' hMap (H.delete n)

            return (incrementRef, deleteRef)

    {- Now for the actual program logic.  Everything past this point only uses
       the approved STM API (i.e. 'getCounters' and 'init').  If I wanted I
       could factor the above approved STM API into a separate module to
       enforce the encapsulation boundary, but I am lazy. -}

    {- Fork a thread which polls the current state of the counters and displays
       it to the console.  There is a way to implement this without polling,
       but this gets the job done for now.

       Most of what it is doing is just some simple tricks to reuse the same
       console line instead of outputting a stream of lines.  Otherwise it
       would be just:

       forkIO $ forever $ do
           ns <- atomically getCounters
           print ns
    -}
    forkIO $ (`evalStateT` 0) $ forever $ do
        del <- get
        lift $ do
            putStr (replicate del '\b')
            putStr (replicate del ' ' )
            putStr (replicate del '\b')
        ns <- lift $ atomically getCounters
        let str = show ns
        lift $ putStr str
        put $! length str
        lift $ threadDelay 10000

    {- Fork a thread for each incoming connection, which listens to the
       client's commands and translates them into 'STM' actions -}
    serve HostAny "8080" $ \(socket, _) -> do
        (increment, delete) <- atomically init

        {- Right now, just do the dumb thing and convert all keypresses into
           increment commands, with the exception of the 'q' key, which will
           quit -}
        let handler :: (Proxy p) => () -> Consumer p Char IO ()
            handler () = runIdentityP loop where
                loop = do
                    c <- request ()
                    unless (c == 'q') $ do
                        lift $ atomically increment
                        loop

        {- This uses my 'pipes' library.  It basically is a high-level way to
           say:

           * Read binary packets from the socket no bigger than 4096 bytes
           * Get the first character from each packet and discard the rest
           * Handle the character using the above 'handler' function -}
        runProxy $ socketReadS 4096 socket >-> mapD B.head >-> handler

        {- The above pipeline finishes either when the socket closes or when
           'handler' stops looping because it received a 'q'.  Either case
           means that the client is done, so we log them out using 'delete'. -}
        atomically delete
```
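The core of the server's STM API has nothing to do with networking, so it can be exercised on its own. Below is a minimal sketch of that idea under a few assumptions, not the original code: it swaps `Data.HashMap.Strict` for `Data.Map.Strict` (so only the `stm` and `containers` packages are needed), bundles the two `TVar`s into a hypothetical `Registry` record, and renames `init` to a hypothetical `register` to avoid shadowing the Prelude function.

```haskell
import Control.Concurrent.STM
import qualified Data.Map.Strict as M

-- | Internal state (hypothetical record): a source of fresh client ids and a
-- map from each id to that client's own counter, i.e. the "striped" layout.
data Registry = Registry
  { nextId   :: TVar Int
  , counters :: TVar (M.Map Int (TVar Int))
  }

newRegistry :: IO Registry
newRegistry = Registry <$> newTVarIO 0 <*> newTVarIO M.empty

-- | Server-facing read: a snapshot of every live counter, ordered by id.
getCounters :: Registry -> STM [Int]
getCounters r = readTVar (counters r) >>= mapM readTVar . M.elems

-- | Client-facing registration: returns (increment, delete) closures that
-- capture this client's id and counter reference.
register :: Registry -> STM (STM (), STM ())
register r = do
  n <- readTVar (nextId r)
  writeTVar (nextId r) $! n + 1
  ref <- newTVar 0
  modifyTVar' (counters r) (M.insert n ref)
  let increment = modifyTVar' ref (+ 1)                -- touches only this client's TVar
      delete    = modifyTVar' (counters r) (M.delete n) -- writes the shared map
  return (increment, delete)

main :: IO ()
main = do
  reg <- newRegistry
  (incA, delA) <- atomically (register reg)
  (incB, _)    <- atomically (register reg)
  atomically (incA >> incA >> incB)
  atomically (getCounters reg) >>= print  -- [2,1]
  atomically delA
  atomically (getCounters reg) >>= print  -- [1]
```

As in the server, only `register` and `delete` write the shared map; `increment` writes nothing but the client's own `TVar`, so concurrent clients incrementing their counters don't invalidate each other's transactions.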
Next up is the client, which simply opens a connection and forwards all key presses as single packets:
```haskell
import Control.Monad
import Control.Proxy
import Control.Proxy.Safe
import Control.Proxy.TCP.Safe
import Data.ByteString.Char8 (pack)
import System.IO

main = do
    hSetBuffering stdin NoBuffering
    hSetEcho stdin False

    {- Again, this uses my 'pipes' library.  It basically says:

       * Read characters from the console using 'commands'
       * Pack them into a binary format
       * Send them to a server running at 127.0.0.1:8080

       This finishes looping when the user types a 'q' or the connection is
       closed for whatever reason. -}
    runSafeIO $ runProxy $ runEitherK $
            try . commands
        >-> mapD (\c -> pack [c])
        >-> connectWriteD Nothing "127.0.0.1" "8080"

commands :: (Proxy p) => () -> Producer p Char IO ()
commands () = runIdentityP loop
  where
    loop = do
        c <- lift getChar
        respond c
        unless (c == 'q') loop
```
It's pretty simple: `commands` generates a stream of `Char`s, which then get converted to `ByteString`s and sent as packets to the server.
If you run the server and a few clients and have them each type in a few keys, your server display will output a list showing how many keys each client typed:
[1,6,4]
... and if some of the clients disconnect they will be removed from the list:
[1,4]
Note that the `pipes` component of these examples will simplify greatly in the upcoming `pipes-4.0.0` release, but the current `pipes` ecosystem still gets the job done as is.
First, I'd definitely recommend using your own specific data type for representing commands. When using `(String, Maybe (Chan [a]), Maybe a)`, a buggy client can crash your actor simply by sending an unknown command or by sending `("add", Nothing, Nothing)`, etc. I'd suggest something like
```haskell
data Command a = Add a | Remove a | Get (Chan [a])
```
Then you can pattern match on commands in `storage` in a safe way.
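To illustrate, here is a minimal sketch of an actor built around that type, using only `base`. The `storage` loop, the `getAll` helper and the list-based state are assumptions made for the example (the question's actual `storage` function isn't reproduced here); the point is that the `case` is exhaustive, so an ill-formed request simply cannot be constructed.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Data.List (delete)

-- | Every request the actor understands; a malformed request such as
-- ("add", Nothing, Nothing) is no longer representable.
data Command a = Add a | Remove a | Get (Chan [a])

-- | Hypothetical actor loop. The list is the actor's private state and is
-- only ever touched by this thread.
storage :: Eq a => Chan (Command a) -> [a] -> IO ()
storage inbox xs = do
  cmd <- readChan inbox
  case cmd of
    Add x     -> storage inbox (x : xs)
    Remove x  -> storage inbox (delete x xs)
    Get reply -> writeChan reply xs >> storage inbox xs

-- | Hypothetical request/reply helper: allocate a private reply channel,
-- send it along with the request, and block until the actor answers.
getAll :: Chan (Command a) -> IO [a]
getAll inbox = do
  reply <- newChan
  writeChan inbox (Get reply)
  readChan reply

main :: IO ()
main = do
  inbox <- newChan
  _ <- forkIO (storage inbox ([] :: [Int]))
  mapM_ (writeChan inbox) [Add 1, Add 2, Remove 1]
  getAll inbox >>= print  -- [2]
```

Because a `Chan` is FIFO and a single thread processes it, the `Get` is answered only after the three preceding commands have been applied.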
Actors have their advantages, but I feel they also have some drawbacks. For example, getting an answer from an actor requires sending it a command and then waiting for a reply. The client can't be completely sure that it will get a reply at all, or that the reply will be of some specific type: you can't say "I want only answers of this type (and exactly this many of them) for this particular command."
So as an example I'll give a simple STM solution. It'd be better to use a hash table or a (balanced tree) set, but since `Handle` implements neither `Ord` nor `Hashable`, we can't use these data structures, so I'll keep using lists.
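```haskell
module ThreadSet (
    TSet, newTSet, add, remove, get
    ) where

import Control.Concurrent.STM.TVar
import Control.Monad.STM

-- | A set of elements stored in a single STM variable.
newtype TSet a = TSet (TVar [a])

-- | Create an empty set.  Without it, the abstract 'TSet' type could not be
-- constructed outside this module.
newTSet :: STM (TSet a)
newTSet = fmap TSet (newTVar [])

-- | Insert an element.  This does not check for duplicates.
add :: a -> TSet a -> STM ()
add x (TSet v) = readTVar v >>= writeTVar v . (x :)

-- | Remove every occurrence of an element.
remove :: (Eq a) => a -> TSet a -> STM ()
remove x (TSet v) = readTVar v >>= writeTVar v . filter (/= x)

-- | Read the current contents.
get :: TSet a -> STM [a]
get (TSet v) = readTVar v
```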
This module implements an `STM`-based set of arbitrary elements. You can have multiple such sets and use them together in a single `STM` transaction that succeeds or fails as a whole. For example:
```haskell
-- | Ensures that there is exactly one element `x` in the set.
add1 :: (Eq a) => a -> TSet a -> STM ()
add1 x v = remove x v >> add x v
```
This would be difficult with actors: you'd have to add it as another command for the actor, because you can't compose it from existing actions and still have atomicity.
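Composition works across sets too. The sketch below moves an element from one set to another in a single transaction. So that it compiles on its own, it repeats a variant of the `TSet` operations above (with a hypothetical `newTSetIO` constructor and a `remove` that drops every occurrence); `move` is likewise a name made up for this illustration.

```haskell
import Control.Concurrent.STM

-- A standalone variant of the TVar-list set from the module above.
newtype TSet a = TSet (TVar [a])

newTSetIO :: IO (TSet a)
newTSetIO = TSet <$> newTVarIO []

add :: a -> TSet a -> STM ()
add x (TSet v) = modifyTVar' v (x :)

remove :: Eq a => a -> TSet a -> STM ()
remove x (TSet v) = modifyTVar' v (filter (/= x))

get :: TSet a -> STM [a]
get (TSet v) = readTVar v

-- | Move x from one set to another. Run under 'atomically', both updates
-- commit together, so no other thread observes x in both sets or in neither.
move :: Eq a => a -> TSet a -> TSet a -> STM ()
move x from to = remove x from >> add x to

main :: IO ()
main = do
  waiting <- newTSetIO
  active  <- newTSetIO
  atomically (add "alice" waiting >> add "bob" waiting)
  atomically (move "alice" waiting active)
  (w, a) <- atomically ((,) <$> get waiting <*> get active)
  print (w, a)  -- (["bob"],["alice"])
```

With one actor per set, the same operation could not be assembled from the two actors' existing `Add`/`Remove` commands without losing atomicity.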
Update: There is an interesting article explaining why the Clojure designers chose not to use actors. For example, with actors, even if you have many reads and very few writes to a mutable structure, they're all serialized through the actor, which can greatly impact performance.