How to define thread safe array?



My naive attempt was to do something like this:

    import std.typecons : Proxy;

    synchronized class Array(T)
    {
        static import std.array;
        private std.array.Array!T data;
        mixin Proxy!data;
    }

Sadly, it doesn't work because of https://issues.dlang.org/show_bug.cgi?id=14509

Can't say I am very surprised, though: automagical handling of multi-threading via hidden mutexes is very unidiomatic in modern D, and the very concept of synchronized classes is mostly a relic from D1 times.

You can implement the same solution manually, of course, by defining your own SharedArray class with all the necessary methods and taking a lock inside each method before calling the methods of the internal private plain Array. But I presume you want something that works more out of the box.
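For what it's worth, the manual approach could look roughly like the sketch below. It is only an illustration under a few assumptions: the names `SharedArray`, `put` and `fillConcurrently` are made up here, a plain `T[]` stands in for the internal array, and only three methods are wrapped. Each method takes a `core.sync.mutex.Mutex` before touching the data.

```d
import core.sync.mutex : Mutex;
import core.thread : Thread;

/// Hypothetical wrapper (names are illustrative): every method takes the
/// mutex before touching the plain array underneath.
final class SharedArray(T)
{
    private T[] data;   // only accessed while holding mtx
    private Mutex mtx;

    this() { mtx = new Mutex; }

    void put(T value)
    {
        synchronized (mtx) { data ~= value; }
    }

    size_t length()
    {
        synchronized (mtx) { return data.length; }
    }

    T opIndex(size_t i)
    {
        synchronized (mtx) { return data[i]; }
    }
}

/// Appends from several threads at once and returns the final length.
size_t fillConcurrently(size_t threadCount, size_t perThread)
{
    auto arr = new SharedArray!size_t;

    void worker()
    {
        foreach (j; 0 .. perThread)
            arr.put(j);
    }

    Thread[] threads;
    foreach (i; 0 .. threadCount)
        threads ~= new Thread(&worker);
    foreach (t; threads) t.start();
    foreach (t; threads) t.join();
    return arr.length;
}

void main()
{
    // No append is lost, because each one happens under the lock.
    assert(fillConcurrently(8, 1000) == 8000);
}
```

Note that this only makes each individual call safe. A sequence such as "check length, then index" is still racy unless the caller holds a lock around the whole sequence.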

I can't invent anything better right here and now (will think about it more), but it is worth noting that in general D encourages creating data structures designed explicitly for shared access instead of just protecting normal data structures with mutexes. And, of course, the most encouraged approach is not to share data at all and to use message passing instead.
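To illustrate the message passing route, here is a minimal sketch using `std.concurrency`. The function names `producer` and `collect` are made up for this example. The array lives in one thread only; the other threads never see it and just send their values to the owner.

```d
import std.concurrency : spawn, send, receiveOnly, thisTid, Tid;

/// Worker: never sees the array, only sends its value to the owner.
void producer(Tid owner, size_t value)
{
    send(owner, value);
}

/// The calling thread is the only one that ever touches `arr`.
size_t[] collect(size_t n)
{
    size_t[] arr;
    foreach (size_t i; 0 .. n)
        spawn(&producer, thisTid, i);
    foreach (_; 0 .. n)
        arr ~= receiveOnly!size_t();   // blocks until a value arrives
    return arr;
}

void main()
{
    import std.algorithm.sorting : sort;

    auto got = collect(16);
    sort(got);                         // arrival order is not deterministic
    assert(got.length == 16);
    foreach (i, v; got)
        assert(v == i);
}
```

No lock appears anywhere in user code, since the only synchronization is the mailbox inside `std.concurrency`.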

I will update the answer if anything better comes to my mind.


It is fairly easy to make a wrapper around an array that will make it thread-safe. However, it is extremely difficult to make a thread-safe array that is not a concurrency bottleneck.

The closest thing that comes to mind is Java's CopyOnWriteArrayList class, but even that is not ideal...


You can wrap the array inside a struct that locks access to the array from the moment a thread acquires a token until it releases it.

The wrapper/locker:

  • acquire(): is called in a loop by a thread. Since it returns a pointer, the thread knows that it holds the token when the method returns a non-null value.
  • release(): is called by a thread after it has finished processing the data it previously acquired access to.


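    shared struct Locker(T)
    {
    private:
        T t;
        size_t token; // 0 = free, 1 = held by some thread

    public:
        // Returns a pointer to the data if the token was taken, null otherwise.
        shared(T)* acquire()
        {
            import core.atomic : cas;
            // Test and set in one atomic step; a separate check followed by
            // an increment would let two threads take the token at once.
            if (cas(&token, size_t(0), size_t(1)))
                return &t;
            return null;
        }

        // Gives the token back so that a waiting thread can take it.
        void release()
        {
            import core.atomic : atomicStore;
            atomicStore(token, size_t(0));
        }
    }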

and a quick test:

    alias LockedIntArray = Locker!(size_t[]);
    shared LockedIntArray intArr;

    void arrayTask(size_t cnt)
    {
        import core.thread, std.random;
        // ensure the desynchronization of this job.
        Thread.sleep(dur!"msecs"(uniform(4, 20)));
        shared(size_t[])* arr = null;
        // wait for the token
        while (arr is null) { arr = intArr.acquire; }
        *arr ~= cnt;
        import std.stdio;
        writeln(*arr);
        // release the token for the waiting threads
        intArr.release;
    }

    void main(string[] args)
    {
        import std.parallelism;
        foreach (immutable i; 0 .. 16)
        {
            auto job = task(&arrayTask, i);
            job.executeInNewThread();
        }
    }

The downside is that each block of operations on the array must be surrounded by an acquire/release pair.
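One possible way to soften that downside is to hide the acquire/release pair in a helper that takes the block as a delegate and releases the token with `scope (exit)`. The sketch below is only an illustration: `withToken`, `makeWorker` and `appendFromThreads` are made-up names, and to stay self-contained it uses a module-level token and a `__gshared` array rather than the struct above.

```d
import core.atomic : cas, atomicStore;
import core.thread : Thread;

shared size_t token;        // 0 = free, 1 = held
__gshared size_t[] data;    // only touched while holding the token

/// Hypothetical helper: runs `dg` with the token held and always releases it.
void withToken(scope void delegate() dg)
{
    while (!cas(&token, size_t(0), size_t(1))) {}   // spin until acquired
    scope (exit) atomicStore(token, size_t(0));     // runs even if dg throws
    dg();
}

/// Builds a thread that appends its id to the array under the token.
Thread makeWorker(size_t id)
{
    void run()
    {
        withToken({ data ~= id; });
    }
    return new Thread(&run);
}

/// Starts `threadCount` workers and returns the final array length.
size_t appendFromThreads(size_t threadCount)
{
    data = null;
    Thread[] threads;
    foreach (i; 0 .. threadCount)
        threads ~= makeWorker(i);
    foreach (t; threads) t.start();
    foreach (t; threads) t.join();
    return data.length;
}

void main()
{
    assert(appendFromThreads(16) == 16);
}
```

The caller can no longer forget the release, although the waiting threads still busy-wait, so this is not something to use under heavy contention.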