Atomically increment two integers with CAS
This makes me think of a sequence lock. The following is not very accurate (I'm writing it from memory), but it is something along the lines of:
let x,y and s be 64 bit integers.
To increment:
atomic s++
(I mean atomic increment using 64 bit CAS op)
memory barrier; atomic x++; atomic y++; atomic s++; memory barrier
To read:
do { S1 = load s; X = load x; Y = load y; memory barrier; S2 = load s; } while (S1 != S2)
If sse2 is available, you can use paddq to add 2 64 bit integers to two other 64 bit integers in one instruction.
#include <emmintrin.h>

// initialize your values somewhere:
//   const __m128i ones = _mm_set1_epi64x(1);
//   volatile register __m128i vars =
//       _mm_set_epi64x(24,7);

// inc_both -- add two packed 64 bit integers, lane by lane
// vars -- the pair of 64 bit values to be incremented
// ones -- the pair of 64 bit addends (both 1 for a plain increment)
// RETURNS: a vector whose two 64 bit lanes are vars + ones
// NOTE: this only produces the sum in a register; nothing here makes the
// update of a shared memory location atomic
static inline __m128i
inc_both(__m128i vars, __m128i ones)
{
    return _mm_add_epi64(vars, ones);
}
This should compile to
paddq %xmm0, %xmm1
Since it is static inline, it may use other xmm registers though. If there is significant register pressure, the ones operand may become ones(%rip)
Note: this can be used for adding values other than 1 and there are similar operations for most other math, bitwise and compare instructions, should you need them.
So you can use the lock prefix and make it into an inline asm macro
// inc64x2 -- add both 64 bit lanes of "ones" into "vars" with a single paddq
// vars -- __m128i lvalue, updated in place (read/write "+x" operand)
// NOTE: a variable named "ones" (__m128i) must be in scope where the macro
// is expanded
// NOTE: in AT&T syntax the destination is the LAST operand, so the
// read/write operand %0 (vars) must come last; the input-only operand %1
// (ones) is the source and is left unmodified
// NOTE: no lock prefix is emitted here -- both operands are xmm registers
// NOTE: the expansion already ends with ';', so it is a complete statement
#define inc64x2(vars) \
    __asm__ __volatile__( \
        "paddq %1, %0\n" \
        : "+x"(vars) \
        : "x"(ones) \
    );
The arm neon equivalent is something like: vaddq_s64(...), but there is a great article about arm/x86 equivalents here.
I've got a solution I've tested. Contained herein is a soup to nuts proof of concept program.
The algorithm uses a CAS-guarded "thread id gate" as the 3rd integer. I watched the video talk twice, and I believe this qualifies. It may not be the algorithm that the presenter was thinking of, but it does work.
The X and Y values can be anywhere in memory and the program places them far enough away from each other that they are on different cache lines. It doesn't really matter.
A quick description of the algorithm:
Each thread has a unique id number
or tid
(non-zero), taken from one's favorite source: pthread_t
, getpid
, gettid
, make one up by whatever means you want
. In the program, it just assigns them sequentially starting from 1.
Each thread will call the increment function with this number.
The increment function will spin on a global gate
variable using CAS with an old value of 0 and a new value of tid
.
When the CAS succeeds, the thread now "owns" things. In other words, if the gate
is zero, it's up for grabs. A non-zero value is the tid
of the owner and the gate
is locked.
Now, the owner is free to increment the X
and Y
values with simple x += 1
and y += 1
.
After that, the increment function releases by doing a store of 0 into the gate
.
Here is the diagnostic/proof-of-concept program with everything. The algorithm itself has no restrictions, but I coded it for my machine.
Some caveats:
- It assumes
gcc
/clang
- It assumes a 64 bit
x86_64
arch. - This was coded using nothing but inline asm and needs no [nor uses any] compiler
atomic
support for clarity, simplicity, and transparency. - This was built under linux, but should work on any "reasonable" x86 machine/OS (e.g. BSD, OSX should be fine, cygwin probably, and mingw maybe)
- Other arches are fine if they support CAS, I just didn't code for them (e.g.
arm
might work if you code the CAS with ldrex/strex
pairs) - There are enough abstract primitives that this would/should be easy.
- No attempt at Windows compatibility [if you want it, do your own port but send me no tears--or comments :-)].
- The makefile and program have been defaulted to best values
- Some x86 CPUs may need to use different defaults (e.g. need fence instructions). See the makefile.
Anyway, here it is:
// caslock -- prove cas lock algorithm
//
// Every thread acquires a global gate by doing a CAS of 0 -> its own
// (non-zero) id, increments X and Y with plain adds while it owns the gate,
// and then releases the gate by storing 0 into it.
//
// Build-time switches (all default to 0 when undefined):
//   _USE_BARRIER_    -- 0=no fence insts, 1=mfence, 2=lfence/sfence/mfence
//   _USE_CASINLINE_  -- 1=force inline of the CAS functions
//   _USE_CASOFF_     -- 1=replace the acquire CAS with a plain store (proves
//                       that the diagnostic detects a broken lock)
//   _USE_CASPOST_    -- 1=release the gate with a CAS instead of a store
//   _USE_INCBARRIER_ -- 1=extra barriers around the X/Y increments
//   _USE_TID64_      -- 1=use 64 bit thread ids
//   _USE_VPTR_       -- 1=use volatile pointers in the access functions

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <time.h>
#include <pthread.h>

#define systls __thread

// repeat the madness only once
#ifdef __clang__
#define inline_common inline
#else
#define inline_common static inline
#endif
#define inline_always inline_common __attribute__((__always_inline__))
#define inline_never __attribute__((__noinline__))

// NOTE: inline CAS used to fail for gcc (but not clang) because the asm did
// not declare that cmpxchg overwrites the accumulator -- see casu32. The
// constraints are fixed below; the default stays "not inline" so that the
// makefile defaults keep their meaning.
#if _USE_CASINLINE_
#define inline_cas inline_always
#else
#define inline_cas inline_never
#endif

typedef unsigned int u32;
typedef unsigned long long u64;

#ifndef LOOPMAX
#define LOOPMAX 1000000
#endif

#ifndef TIDMAX
#define TIDMAX 20
#endif

#if _USE_VPTR_
typedef volatile u32 *xptr32_p;
typedef volatile u64 *xptr64_p;
#else
typedef u32 *xptr32_p;
typedef u64 *xptr64_p;
#endif

#if _USE_TID64_
typedef u64 tid_t;
#define tidload(_xptr) loadu64(_xptr)
#define tidcas(_xptr,_oval,_nval) casu64(_xptr,_oval,_nval)
#define tidstore(_xptr,_nval) storeu64(_xptr,_nval)
#else
typedef u32 tid_t;
#define tidload(_xptr) loadu32(_xptr)
#define tidcas(_xptr,_oval,_nval) casu32(_xptr,_oval,_nval)
#define tidstore(_xptr,_nval) storeu32(_xptr,_nval)
#endif

tid_t tidgate;                          // gate control
tid_t readycnt;                         // number of threads ready
tid_t donecnt;                          // number of threads complete

// ensure that the variables are nowhere near each other
u64 ary[100];
#define kickoff ary[32]                 // sync to fire threads
#define xval ary[31]                    // the X value
#define yval ary[87]                    // the Y value

int inctype;                            // increment algorithm to use
tid_t tidmax;                           // maximum number of tasks
u64 loopmax;                            // loop maximum for each task

// task control
struct tsk {
    tid_t tsk_tid;                      // task id
    u32 tsk_casmiss;                    // cas miss count
};
typedef struct tsk tsk_t;

tsk_t *tsklist;                         // task list
systls tsk_t *tskcur;                   // current task block

// show progress
#define PGR(_pgr) \
    do { \
        fputs(_pgr,stdout); \
        fflush(stdout); \
    } while (0)

// NOTE: some x86 arches need fence instructions
// 0 -- no fence instructions
// 1 -- use mfence
// 2 -- use lfence/sfence
#if _USE_BARRIER_ == 0
#define BARRIER_RELEASE ""
#define BARRIER_ACQUIRE ""
#define BARRIER_ALL ""
#elif _USE_BARRIER_ == 1
#define BARRIER_ACQUIRE "\tmfence\n"
#define BARRIER_RELEASE "\tmfence\n"
#define BARRIER_ALL "\tmfence\n"
#elif _USE_BARRIER_ == 2
#define BARRIER_ACQUIRE "\tlfence\n"
#define BARRIER_RELEASE "\tsfence\n"
#define BARRIER_ALL "\tmfence\n"
#else
#error caslock: unknown barrier type
#endif

// barrier_compiler -- compiler-only barrier (never emits an instruction)
// The "memory" clobber stops the compiler from moving or caching memory
// accesses across this point.
inline_always void
barrier_compiler(void)
{

    __asm__ __volatile__ (
        ""
        :
        :
        : "memory");
}

// barrier_acquire -- acquire barrier
inline_always void
barrier_acquire(void)
{

    __asm__ __volatile__ (
        BARRIER_ACQUIRE
        :
        :
        : "memory");
}

// barrier_release -- release barrier
inline_always void
barrier_release(void)
{

    __asm__ __volatile__ (
        BARRIER_RELEASE
        :
        :
        : "memory");
}

// barrier -- barrier
inline_always void
barrier(void)
{

    __asm__ __volatile__ (
        BARRIER_ALL
        :
        :
        : "memory");
}

// casu32 -- compare and exchange four bytes
// xptr -- location to update
// oldval -- value the location is expected to hold
// newval -- value to store if the location holds oldval
// RETURNS: 1=ok, 0=fail
//
// NOTE: cmpxchg reads AND writes the memory operand, and on failure it loads
// the current memory value into the accumulator. Both facts must be told to
// the compiler ("+m" and "+a"). With an input-only "a" operand the compiler
// may assume that the accumulator still holds oldval after a failed attempt
// and skip reloading it when the function is inlined into a retry loop.
inline_cas int
casu32(xptr32_p xptr,u32 oldval,u32 newval)
{
    char ok;

    __asm__ __volatile__ (
        "\tlock cmpxchg\t%[newval],%[xptr]\n"
        "\tsete\t%[ok]\n"
        : [ok] "=q" (ok),
          [xptr] "+m" (*xptr),
          [oldval] "+a" (oldval)
        : [newval] "r" (newval)
        : "memory", "cc");

    return ok;
}

// casu64 -- compare and exchange eight bytes
// xptr -- location to update
// oldval -- value the location is expected to hold
// newval -- value to store if the location holds oldval
// RETURNS: 1=ok, 0=fail
//
// NOTE: see casu32 for why the memory and accumulator operands are "+"
inline_cas int
casu64(xptr64_p xptr,u64 oldval,u64 newval)
{
    char ok;

    __asm__ __volatile__ (
        "\tlock cmpxchg\t%[newval],%[xptr]\n"
        "\tsete\t%[ok]\n"
        : [ok] "=q" (ok),
          [xptr] "+m" (*xptr),
          [oldval] "+a" (oldval)
        : [newval] "r" (newval)
        : "memory", "cc");

    return ok;
}

// loadu32 -- load value with barrier
// RETURNS: loaded value
// NOTE: the leading barrier forces a fresh read from memory; the trailing
// compiler barrier keeps later accesses from being hoisted above the load
inline_always u32
loadu32(const xptr32_p xptr)
{
    u32 val;

    barrier_acquire();
    val = *xptr;
    barrier_compiler();

    return val;
}

// loadu64 -- load value with barrier
// RETURNS: loaded value
// NOTE: see loadu32 for the barrier placement
inline_always u64
loadu64(const xptr64_p xptr)
{
    u64 val;

    barrier_acquire();
    val = *xptr;
    barrier_compiler();

    return val;
}

// storeu32 -- store value with barrier
// NOTE: the leading compiler barrier keeps earlier accesses (e.g. the X/Y
// increments done while owning the gate) from being sunk below the store
inline_always void
storeu32(xptr32_p xptr,u32 val)
{

    barrier_compiler();
    *xptr = val;
    barrier_release();
}

// storeu64 -- store value with barrier
// NOTE: see storeu32 for the barrier placement
inline_always void
storeu64(xptr64_p xptr,u64 val)
{

    barrier_compiler();
    *xptr = val;
    barrier_release();
}

// qsleep -- do a quick sleep
// bigflg -- 1=sleep for one second, 0=sleep for 1000 nanoseconds
inline_always void
qsleep(int bigflg)
{
    struct timespec ts;

    if (bigflg) {
        ts.tv_sec = 1;
        ts.tv_nsec = 0;
    }
    else {
        ts.tv_sec = 0;
        ts.tv_nsec = 1000;
    }

    nanosleep(&ts,NULL);
}

// incby_tidgate -- increment by using thread id gate
void
incby_tidgate(tid_t tid)
// tid -- unique id for accessing entity (e.g. thread id)
{
    tid_t *gptr;
    tid_t oval;

    gptr = &tidgate;

    // acquire the gate
    while (1) {
        oval = 0;

        // test mode -- just do a nop instead of CAS to prove diagnostic
#if _USE_CASOFF_
        *gptr = oval;
        break;
#else
        if (tidcas(gptr,oval,tid))
            break;
#endif

        ++tskcur->tsk_casmiss;
    }

#if _USE_INCBARRIER_
    barrier_acquire();
#endif

    // increment the values
    xval += 1;
    yval += 1;

#if _USE_INCBARRIER_
    barrier_release();
#endif

    // release the gate
    // NOTE: CAS will always provide a barrier
#if _USE_CASPOST_ && (_USE_CASOFF_ == 0)
    oval = tidcas(gptr,tid,0);
#else
    tidstore(gptr,0);
#endif
}

// tskcld -- child task
// arg -- pointer to this thread's tsk_t
// RETURNS: always a null pointer
void *
tskcld(void *arg)
{
    tid_t tid;
    tid_t oval;
    u64 loopcur;

    tskcur = arg;
    tid = tskcur->tsk_tid;

    // tell master thread that we're fully ready
    while (1) {
        oval = tidload(&readycnt);
        if (tidcas(&readycnt,oval,oval + 1))
            break;
    }

    // wait until we're given the starting gun
    while (1) {
        if (loadu64(&kickoff))
            break;
        qsleep(0);
    }

    // do the increments
    for (loopcur = loopmax;  loopcur > 0;  --loopcur)
        incby_tidgate(tid);

    barrier();

    // tell master thread that we're fully complete
    while (1) {
        oval = tidload(&donecnt);
        if (tidcas(&donecnt,oval,oval + 1))
            break;
    }

    return (void *) 0;
}

// tskstart -- start a child task
// tid -- id (and index into tsklist) of the task to start
void
tskstart(tid_t tid)
{
    pthread_attr_t attr;
    pthread_t thr;
    int err;
    tsk_t *tsk;

    tsk = tsklist + tid;
    tsk->tsk_tid = tid;

    pthread_attr_init(&attr);
    pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_DETACHED);
    err = pthread_create(&thr,&attr,tskcld,tsk);
    pthread_attr_destroy(&attr);

    // a thread that never starts never bumps readycnt, so the master would
    // wait forever -- give up instead
    if (err) {
        printf("tskstart: error -- err=%d\n",err);
        exit(1);
    }
}

// tskall -- run a single test
void
tskall(void)
{
    tid_t tidcur;
    tsk_t *tsk;
    u64 incmax;
    u64 val;
    int err;

    xval = 0;
    yval = 0;

    kickoff = 0;
    readycnt = 0;
    donecnt = 0;
    tidgate = 0;

    // prealloc the task blocks
    // NOTE: slot 0 is unused because task ids start at 1
    tsklist = calloc(tidmax + 1,sizeof(tsk_t));
    if (tsklist == NULL) {
        perror("tskall: calloc");
        exit(1);
    }

    // start all tasks
    PGR(" St");
    for (tidcur = 1;  tidcur <= tidmax;  ++tidcur)
        tskstart(tidcur);

    // wait for all tasks to be fully ready
    PGR(" Sw");
    while (1) {
        if (tidload(&readycnt) == tidmax)
            break;
        qsleep(1);
    }

    // the starting gun -- all tasks are waiting for this
    PGR(" Ko");
    storeu64(&kickoff,1);

    // wait for all tasks to be fully done
    PGR(" Wd");
    while (1) {
        if (tidload(&donecnt) == tidmax)
            break;
        qsleep(1);
    }

    PGR(" Done\n");

    // check the final count
    incmax = loopmax * tidmax;

    // show per-task statistics
    // NOTE: the miss ratio is scaled by 100 so that it matches the '%' label
    for (tidcur = 1;  tidcur <= tidmax;  ++tidcur) {
        tsk = tsklist + tidcur;
        printf("tskall: tsk=%llu tsk_casmiss=%u (%.3f%%)\n",
            (u64) tidcur,tsk->tsk_casmiss,
            100.0 * (double) tsk->tsk_casmiss / (double) loopmax);
    }

    err = 0;

    // check for failure
    val = loadu64(&xval);
    if (val != incmax) {
        printf("tskall: xval fault -- xval=%llu incmax=%llu\n",val,incmax);
        err = 1;
    }

    // check for failure
    val = loadu64(&yval);
    if (val != incmax) {
        printf("tskall: yval fault -- yval=%llu incmax=%llu\n",val,incmax);
        err = 1;
    }

    if (! err)
        printf("tskall: SUCCESS\n");

    free(tsklist);
    tsklist = NULL;
}

// main -- master control
int
main(void)
{

    loopmax = LOOPMAX;
    tidmax = TIDMAX;

    inctype = 0;
    tskall();

    return 0;
}
Here is the Makefile. Sorry for the extra boilerplate:
# caslock/Makefile -- make file for caslock
#
# options:
#   LOOPMAX -- maximum loops / thread
#
#   TIDMAX -- maximum number of threads
#
#   BARRIER -- generate fence/barrier instructions
#     0 -- none
#     1 -- use mfence everywhere
#     2 -- use lfence for acquire, sfence for release
#
#   CASOFF -- disable CAS to prove diagnostic works
#     0 -- normal mode
#     1 -- inhibit CAS during X/Y increment
#
#   CASINLINE -- inline the CAS functions
#     0 -- do _not_ inline
#     1 -- inline them (WARNING: this fails for gcc but works for clang!)
#
#   CASPOST -- increment gate release mode
#     0 -- use fenced store
#     1 -- use CAS store (NOTE: not really required)
#
#   INCBARRIER -- use extra barriers around increments
#     0 -- rely on CAS for barrier
#     1 -- add extra safety barriers immediately before increment of X/Y
#
#   TID64 -- use 64 bit thread "id"s
#     0 -- use 32 bit
#     1 -- use 64 bit
#
#   VPTR -- use volatile pointers in function definitions
#     0 -- use ordinary pointers
#     1 -- use volatile pointers (NOTE: not really required)

ifndef _CASLOCK_MK_
_CASLOCK_MK_ = 1

  OLIST += caslock.o

  ifndef LOOPMAX
    LOOPMAX = 1000000
  endif

  ifndef TIDMAX
    TIDMAX = 20
  endif

  ifndef BARRIER
    BARRIER = 0
  endif

  ifndef CASINLINE
    CASINLINE = 0
  endif

  ifndef CASOFF
    CASOFF = 0
  endif

  ifndef CASPOST
    CASPOST = 0
  endif

  ifndef INCBARRIER
    INCBARRIER = 0
  endif

  ifndef TID64
    TID64 = 0
  endif

  ifndef VPTR
    VPTR = 0
  endif

  CFLAGS += -DLOOPMAX=$(LOOPMAX)
  CFLAGS += -DTIDMAX=$(TIDMAX)

  CFLAGS += -D_USE_BARRIER_=$(BARRIER)
  CFLAGS += -D_USE_CASINLINE_=$(CASINLINE)
  CFLAGS += -D_USE_CASOFF_=$(CASOFF)
  CFLAGS += -D_USE_CASPOST_=$(CASPOST)
  CFLAGS += -D_USE_INCBARRIER_=$(INCBARRIER)
  CFLAGS += -D_USE_TID64_=$(TID64)
  CFLAGS += -D_USE_VPTR_=$(VPTR)

  STDLIB += -lpthread

  ALL += caslock
  CLEAN += caslock

  OVRPUB := 1

  ifndef OVRTOP
    OVRTOP := $(shell pwd)
    OVRTOP := $(dir $(OVRTOP))
  endif
endif

# ovrlib/rules.mk -- rules control
#
# options:
#   GDB -- enable debug symbols
#     0 -- normal
#     1 -- use -O0 and define _USE_GDB_=1
#
#   CLANG -- use clang instead of gcc
#     0 -- use gcc
#     1 -- use clang
#
#   BNC -- enable benchmarks
#     0 -- normal mode
#     1 -- enable benchmarks for function enter/exit pairs

ifdef OVRPUB
  ifndef SDIR
    SDIR := $(shell pwd)
    STAIL := $(notdir $(SDIR))
  endif

  ifndef GENTOP
    GENTOP := $(dir $(SDIR))
  endif

  ifndef GENDIR
    GENDIR := $(GENTOP)/$(STAIL)
  endif

  ifndef ODIR
    ODIR := $(GENDIR)
  endif

  PROTOLST := true
  PROTOGEN := @true
endif

ifndef SDIR
  $(error rules: SDIR not defined)
endif
ifndef ODIR
  $(error rules: ODIR not defined)
endif
ifndef GENDIR
  $(error rules: GENDIR not defined)
endif
ifndef GENTOP
  $(error rules: GENTOP not defined)
endif

ifndef _RULES_MK_
_RULES_MK_ = 1

  CLEAN += *.proto
  CLEAN += *.a
  CLEAN += *.o
  CLEAN += *.i
  CLEAN += *.dis
  CLEAN += *.TMP

  QPROTO := $(shell $(PROTOLST) -i -l -O$(GENTOP) $(SDIR)/*.c $(CPROTO))
  HDEP += $(QPROTO)

###VPATH += $(GENDIR)
###VPATH += $(SDIR)

  ifdef INCLUDE_MK
    -include $(INCLUDE_MK)
  endif

  ifdef GSYM
    CFLAGS += -gdwarf-2
  endif

  ifdef GDB
    CFLAGS += -gdwarf-2
    DFLAGS += -D_USE_GDB_
  else
    CFLAGS += -O2
  endif

  ifndef ZPRT
    DFLAGS += -D_USE_ZPRT_=0
  endif

  ifdef BNC
    DFLAGS += -D_USE_BNC_=1
  endif

  ifdef CLANG
    CC := clang
  endif

  DFLAGS += -I$(GENTOP)
  DFLAGS += -I$(OVRTOP)

  CFLAGS += -Wall -Werror
  CFLAGS += -Wno-unknown-pragmas
  CFLAGS += -Wempty-body
  CFLAGS += -fno-diagnostics-color

  # NOTE: we now need this to prevent inlining (enabled at -O2)
  ifndef CLANG
    CFLAGS += -fno-inline-small-functions
  endif

  # NOTE: we now need this to prevent inlining (enabled at -O3)
  CFLAGS += -fno-inline-functions

  CFLAGS += $(DFLAGS)
endif

all: $(PREP) proto $(ALL)

%.o: %.c $(HDEP)
	$(CC) $(CFLAGS) -c -o $*.o $<

%.i: %.c
	cpp $(DFLAGS) -P $*.c > $*.i

%.s: %.c
	$(CC) $(CFLAGS) -S -o $*.s $<

# build a library (type (2) build)
$(LIBNAME):: $(OLIST)
	ar rv $@ $(OLIST)

.PHONY: proto
proto::
	$(PROTOGEN) -i -v -O$(GENTOP) $(SDIR)/*.c $(CPROTO)

.PHONY: clean
clean::
	rm -f $(CLEAN)

.PHONY: help
help::
	grep -E '^#' Makefile

caslock:: $(OLIST) $(LIBLIST) $(STDLIB)
	$(CC) $(CFLAGS) -o caslock $(OLIST) $(LIBLIST) $(STDLIB)
NOTE: I may have blown some of the asm constraints because when doing the CAS function as an inline, compiling with gcc
produces incorrect results. However, clang
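works fine with inline. So, the default is that the CAS function is not inline. For consistency, I didn't use a different default for gcc/clang, even though I could. (A likely cause, visible in the listings below: cmpxchg overwrites %eax when the compare fails, but the asm declares oldval as an input-only "a" operand. gcc therefore zeroes %eax only once, before the retry loop, whereas clang happens to re-zero it before every retry.)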
Here's the disassembly of the relevant function with inline as built by gcc
(this fails):
00000000004009c0 <incby_tidgate>: 4009c0: 31 c0 xor %eax,%eax 4009c2: f0 0f b1 3d 3a 1a 20 lock cmpxchg %edi,0x201a3a(%rip) # 602404 <tidgate> 4009c9: 00 4009ca: 0f 94 c2 sete %dl 4009cd: 84 d2 test %dl,%dl 4009cf: 75 23 jne 4009f4 <L01> 4009d1: 0f 1f 80 00 00 00 00 nopl 0x0(%rax) 4009d8:L00 64 48 8b 14 25 f8 ff mov %fs:0xfffffffffffffff8,%rdx 4009df: ff ff 4009e1: 83 42 04 01 addl $0x1,0x4(%rdx) 4009e5: f0 0f b1 3d 17 1a 20 lock cmpxchg %edi,0x201a17(%rip) # 602404 <tidgate> 4009ec: 00 4009ed: 0f 94 c2 sete %dl 4009f0: 84 d2 test %dl,%dl 4009f2: 74 e4 je 4009d8 <L00> 4009f4:L01 48 83 05 dc 17 20 00 addq $0x1,0x2017dc(%rip) # 6021d8 <ary+0xf8> 4009fb: 01 4009fc: 48 83 05 94 19 20 00 addq $0x1,0x201994(%rip) # 602398 <ary+0x2b8> 400a03: 01 400a04: c7 05 f6 19 20 00 00 movl $0x0,0x2019f6(%rip) # 602404 <tidgate> 400a0b: 00 00 00 400a0e: c3 retq
Here's the disassembly of the relevant function with inline as built by clang
(this succeeds):
0000000000400990 <incby_tidgate>: 400990: 31 c0 xor %eax,%eax 400992: f0 0f b1 3d 3a 1a 20 lock cmpxchg %edi,0x201a3a(%rip) # 6023d4 <tidgate> 400999: 00 40099a: 0f 94 c0 sete %al 40099d: eb 1a jmp 4009b9 <L01> 40099f: 90 nop 4009a0:L00 64 48 8b 04 25 f8 ff mov %fs:0xfffffffffffffff8,%rax 4009a7: ff ff 4009a9: ff 40 04 incl 0x4(%rax) 4009ac: 31 c0 xor %eax,%eax 4009ae: f0 0f b1 3d 1e 1a 20 lock cmpxchg %edi,0x201a1e(%rip) # 6023d4 <tidgate> 4009b5: 00 4009b6: 0f 94 c0 sete %al 4009b9:L01 84 c0 test %al,%al 4009bb: 74 e3 je 4009a0 <L00> 4009bd: 48 ff 05 e4 17 20 00 incq 0x2017e4(%rip) # 6021a8 <ary+0xf8> 4009c4: 48 ff 05 9d 19 20 00 incq 0x20199d(%rip) # 602368 <ary+0x2b8> 4009cb: c7 05 ff 19 20 00 00 movl $0x0,0x2019ff(%rip) # 6023d4 <tidgate> 4009d2: 00 00 00 4009d5: c3 retq 4009d6: 66 2e 0f 1f 84 00 00 nopw %cs:0x0(%rax,%rax,1) 4009dd: 00 00 00