
Python: rewrite a looping numpy math function to run on GPU


Tweak #1

It's usually advised to vectorize things when working with NumPy arrays, but with very large arrays I think you are out of options there. So, to boost performance, a minor tweak is possible to optimize the final summing step.

We could replace the step that makes an array of 1s and 0s and sums it up:

np.where(((abcd <= data2a) & (abcd >= data2b)), 1, 0).sum()

with np.count_nonzero, which efficiently counts True values in a boolean array instead of converting them to 1s and 0s -

np.count_nonzero((abcd <= data2a) & (abcd >= data2b))

Runtime test -

In [45]: abcd = np.random.randint(11,99,(10000))

In [46]: data2a = np.random.randint(11,99,(10000))

In [47]: data2b = np.random.randint(11,99,(10000))

In [48]: %timeit np.where(((abcd <= data2a) & (abcd >= data2b)), 1, 0).sum()
10000 loops, best of 3: 81.8 µs per loop

In [49]: %timeit np.count_nonzero((abcd <= data2a) & (abcd >= data2b))
10000 loops, best of 3: 28.8 µs per loop

Tweak #2

Use a pre-computed reciprocal when dealing with cases that undergo implicit broadcasting. Some more info here. Thus, store the reciprocal of dif and use that instead at the step:

((((A  - Cmin) / dif) + ((B  - Cmin) / dif) + ...

Sample test -

In [52]: A = np.random.rand(10000)

In [53]: dif = 0.5

In [54]: %timeit A/dif
10000 loops, best of 3: 25.8 µs per loop

In [55]: %timeit A*(1.0/dif)
100000 loops, best of 3: 7.94 µs per loop

You have four places dividing by dif, so hopefully this would bring a noticeable boost there too!
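Putting both tweaks together, here is a minimal sketch of how doTheMath could look (assuming the column layout A, B, C, D and the Bmax/Cmin definitions from the question; treat it as illustrative, not a tested drop-in):

import numpy as np

def doTheMath_tweaked(tmp_df, data2a, data2b):
    # Assumed layout from the question: columns A, B, C, D
    A, B, C, D = tmp_df[:, 0], tmp_df[:, 1], tmp_df[:, 2], tmp_df[:, 3]
    Bmax = B.max()
    Cmin = C.min()
    r = 1.0 / (Bmax - Cmin)   # Tweak #2: pre-computed reciprocal of dif
    abcd = ((A - Cmin) * r + (B - Cmin) * r + (C - Cmin) * r + (D - Cmin) * r) / 4
    # Tweak #1: count True values directly instead of summing 1s and 0s
    return np.count_nonzero((abcd <= data2a) & (abcd >= data2b))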


Introduction and solution code

Well, you asked for it! So, listed in this post is an implementation with PyCUDA, which provides lightweight wrappers exposing most of CUDA's capabilities within the Python environment. We will use its SourceModule functionality, which lets us write and compile CUDA kernels while staying in Python.

Getting to the problem at hand, among the computations involved we have sliding maximum and minimum, a few differences, divisions and comparisons. For the maximum and minimum parts, which involve block max finding (for each sliding window), we will use the reduction technique as discussed in some detail here. This would be done at block level. For the upper-level iteration across sliding windows, we would use grid-level indexing into CUDA resources. For more info on this block and grid format, please refer to page-18. PyCUDA also supports builtins for computing reductions like max and min, but we would lose control; specifically, we intend to use specialized memory like shared and constant memory to leverage the GPU near its optimum level. A standalone sketch of the reduction pattern follows.
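To illustrate that reduction pattern in isolation, here is a simplified block-level max reduction in the same SourceModule style (a minimal sketch, separate from the actual solution kernel listed below):

from pycuda.compiler import SourceModule

reduction_demo = SourceModule("""
#define TBP 1024
// Each block reduces TBP elements of `in` down to one maximum in
// out[blockIdx.x], using a shared-memory tree reduction.
__global__ void block_max(float *out, const float *in)
{
    __shared__ float s[TBP];
    int tid = threadIdx.x;
    s[tid] = in[blockIdx.x * TBP + tid];
    __syncthreads();
    // Halve the number of active threads each step; survivors keep the max
    for (int inv = TBP / 2; inv > 0; inv /= 2)
    {
        if (tid < inv)
            s[tid] = fmaxf(s[tid], s[tid + inv]);
        __syncthreads();
    }
    if (tid == 0)
        out[blockIdx.x] = s[0];
}
""")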

Listing out the PyCUDA-NumPy solution code -

1] PyCUDA part -

import pycuda.autoinit
import pycuda.driver as drv
import numpy as np
from pycuda.compiler import SourceModule

mod = SourceModule("""
#define TBP 1024 // THREADS_PER_BLOCK

// Compute Bmax and Cmin for one sliding window with a shared-memory tree reduction
__device__ void get_Bmax_Cmin(float* out, float *d1, float *d2, int L, int offset)
{
    int tid = threadIdx.x;
    int inv = TBP;
    __shared__ float dS[TBP][2];

    dS[tid][0] = d1[tid+offset];
    dS[tid][1] = d2[tid+offset];
    __syncthreads();

    if(tid<L-TBP)
    {
        dS[tid][0] = fmaxf(dS[tid][0] , d1[tid+inv+offset]);
        dS[tid][1] = fminf(dS[tid][1] , d2[tid+inv+offset]);
    }
    __syncthreads();
    inv = inv/2;

    while(inv!=0)
    {
        if(tid<inv)
        {
            dS[tid][0] = fmaxf(dS[tid][0] , dS[tid+inv][0]);
            dS[tid][1] = fminf(dS[tid][1] , dS[tid+inv][1]);
        }
        __syncthreads();
        inv = inv/2;
    }
    __syncthreads();

    if(tid==0)
    {
        out[0] = dS[0][0];
        out[1] = dS[0][1];
    }
    __syncthreads();
}

// One block per sliding window: normalize, compare against the limits, sum the matches
__global__ void main1(float* out, float *d0, float *d1, float *d2, float *d3, float *lowL, float *highL, int *BLOCKLEN)
{
    int L = BLOCKLEN[0];
    int tid = threadIdx.x;
    int iterID = blockIdx.x;
    float Bmax_Cmin[2];
    int inv;
    float Cmin, dif;
    __shared__ float dS[TBP*2];

    get_Bmax_Cmin(Bmax_Cmin, d1, d2, L, iterID);
    Cmin = Bmax_Cmin[1];
    dif = (Bmax_Cmin[0] - Cmin);

    inv = TBP;

    dS[tid] = (d0[tid+iterID] + d1[tid+iterID] + d2[tid+iterID] + d3[tid+iterID] - 4.0*Cmin) / (4.0*dif);
    __syncthreads();

    if(tid<L-TBP)
        dS[tid+inv] = (d0[tid+inv+iterID] + d1[tid+inv+iterID] + d2[tid+inv+iterID] + d3[tid+inv+iterID] - 4.0*Cmin) / (4.0*dif);

    dS[tid] = ((dS[tid] >= lowL[tid]) & (dS[tid] <= highL[tid])) ? 1 : 0;
    __syncthreads();

    if(tid<L-TBP)
        dS[tid] += ((dS[tid+inv] >= lowL[tid+inv]) & (dS[tid+inv] <= highL[tid+inv])) ? 1 : 0;
    __syncthreads();

    inv = inv/2;
    while(inv!=0)
    {
        if(tid<inv)
            dS[tid] += dS[tid+inv];
        __syncthreads();
        inv = inv/2;
    }

    if(tid==0)
        out[iterID] = dS[0];
    __syncthreads();
}
""")

Please note that THREADS_PER_BLOCK, TBP, is to be set based on batchSize. The rule of thumb here is to assign TBP the largest power-of-2 value that is just less than batchSize. Thus, for batchSize = 2000 we need TBP = 1024.
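If you want to compute that automatically, a tiny (hypothetical) helper would be:

# Largest power of 2 strictly less than batchSize, e.g. 2000 -> 1024
TBP = 1 << ((batchSize - 1).bit_length() - 1)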

2] NumPy part -

def gpu_app_v1(A, B, C, D, batchSize, minimumLimit):
    func1 = mod.get_function("main1")
    outlen = len(A)-batchSize+1

    # Set block and grid sizes
    BSZ = (1024,1,1)
    GSZ = (outlen,1)

    dest = np.zeros(outlen).astype(np.float32)
    N = np.int32(batchSize)
    func1(drv.Out(dest), drv.In(A), drv.In(B), drv.In(C), drv.In(D), \
                    drv.In(data2b), drv.In(data2a),\
                    drv.In(N), block=BSZ, grid=GSZ)
    idx = np.flatnonzero(dest >= minimumLimit)
    return idx, dest[idx]

Benchmarking

I have tested on a GTX 960M. Please note that PyCUDA expects arrays to be contiguous, so we need to slice out the columns and make copies. I am expecting/assuming that the data could be read from the files such that it is spread along rows instead of columns, thus keeping those copies out of the benchmarking function for now.
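As a quick illustration of why the copies are needed (column slices of a C-ordered 2D array are strided views, not contiguous buffers):

A = data1[:, 0]
print(A.flags['C_CONTIGUOUS'])    # False - a strided view into data1
A = data1[:, 0].copy()            # or np.ascontiguousarray(data1[:, 0])
print(A.flags['C_CONTIGUOUS'])    # True - safe to hand to PyCUDA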

Original approach -

def org_app(data1, batchSize, minimumLimit):
    resultArray = []
    for rowNr in range(data1.shape[0]-batchSize+1):
        tmp_df = data1[rowNr:rowNr + batchSize] #rolling window
        result = doTheMath(tmp_df, data2a, data2b)
        if (result >= minimumLimit):
            resultArray.append([rowNr , result])
    return resultArray

Timings and verification -

In [2]: #Declare variables
   ...: batchSize = 2000
   ...: sampleSize = 50000
   ...: resultArray = []
   ...: minimumLimit = 490 #use 400 on the real sample data
   ...: 
   ...: #Create Random Sample Data
   ...: data1 = np.random.uniform(1, 100000, (sampleSize + batchSize, 4)).astype(np.float32)
   ...: data2b = np.random.uniform(0, 1, (batchSize)).astype(np.float32)
   ...: data2a = data2b + np.random.uniform(0, 1, (batchSize)).astype(np.float32)
   ...: 
   ...: # Make column copies
   ...: A = data1[:,0].copy()
   ...: B = data1[:,1].copy()
   ...: C = data1[:,2].copy()
   ...: D = data1[:,3].copy()
   ...: 
   ...: gpu_out1,gpu_out2 = gpu_app_v1(A, B, C, D, batchSize, minimumLimit)
   ...: cpu_out1,cpu_out2 = np.array(org_app(data1, batchSize, minimumLimit)).T
   ...: print(np.allclose(gpu_out1, cpu_out1))
   ...: print(np.allclose(gpu_out2, cpu_out2))
   ...: 
True
False

So, there are some differences between the CPU and GPU counts. Let's investigate them -

In [7]: idx = np.flatnonzero(~np.isclose(gpu_out2, cpu_out2))

In [8]: idx
Out[8]: array([12776, 15208, 17620, 18326])

In [9]: gpu_out2[idx] - cpu_out2[idx]
Out[9]: array([-1., -1.,  1.,  1.])

There are four instances of non-matching counts, each off by at most 1. Upon research, I came across some information on this. Basically, since we are using math intrinsics for the max and min computations, I think those cause the last binary bit of the floating-point representation to differ from the CPU counterpart. This is termed ULP error and has been discussed in detail here and here. A small illustration of the effect follows.
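To see how a single-ULP difference can flip a borderline comparison and shift a count by 1 (hypothetical values, purely for illustration):

import numpy as np

x = np.float32(0.5)
x_up = np.nextafter(x, np.float32(1))  # x plus one ULP, still float32
print(x <= x)      # True  - the value lands exactly on the limit
print(x_up <= x)   # False - one ULP higher and the comparison (and count) flips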

Finally, putting the issue aside, let's get to the most important bit, the performance -

In [10]: %timeit org_app(data1, batchSize, minimumLimit)
1 loops, best of 3: 2.18 s per loop

In [11]: %timeit gpu_app_v1(A, B, C, D, batchSize, minimumLimit)
10 loops, best of 3: 82.5 ms per loop

In [12]: 2180.0/82.5
Out[12]: 26.424242424242426

Let's try with bigger datasets. With sampleSize = 500000, we get -

In [14]: %timeit org_app(data1, batchSize, minimumLimit)
1 loops, best of 3: 23.2 s per loop

In [15]: %timeit gpu_app_v1(A, B, C, D, batchSize, minimumLimit)
1 loops, best of 3: 821 ms per loop

In [16]: 23200.0/821
Out[16]: 28.25822168087698

So, the speedup stays roughly constant at around 27x.

Limitations:

1) We are using float32 numbers, as GPUs work best with those. Double precision, especially on non-server GPUs, isn't popular when it comes to performance, and since you are working with such a GPU, I tested with float32.

Further improvement:

1) We could use the faster constant memory to feed in data2a and data2b, rather than using global memory.
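A minimal sketch of what that could look like (hypothetical; the kernel body and signature would need adjusting to read from these symbols instead of the lowL/highL pointers):

import pycuda.autoinit
import pycuda.driver as drv
from pycuda.compiler import SourceModule

mod_c = SourceModule("""
#define TBP 1024
// Read-only limit arrays in cached constant memory (2*TBP >= batchSize here)
__constant__ float lowL[2*TBP];
__constant__ float highL[2*TBP];
// ... kernels would read lowL[i]/highL[i] instead of global-memory pointers ...
""")

# Copy the limits to the constant symbols once, before any kernel launches
low_sym, _ = mod_c.get_global("lowL")
high_sym, _ = mod_c.get_global("highL")
drv.memcpy_htod(low_sym, data2b)    # data2b/data2a as in the benchmark above
drv.memcpy_htod(high_sym, data2a)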


Before you start tweaking the target (GPU) or using anything else (e.g. parallel execution), you might want to consider how to improve the already existing code. You used the numba tag, so I'll use numba to improve the code. First, we operate on arrays, not on matrices:

data1 = np.array(np.random.uniform(1, 100, (sampleSize + batchSize, 4)))
data2a = np.array(np.random.uniform(0, 1, batchSize)) #upper limit
data2b = np.array(np.random.uniform(0, 1, batchSize)) #lower limit

Each time you call doTheMath you expect an integer back, but you use a lot of arrays and create a lot of intermediate arrays:

abcd = ((((A  - Cmin) / dif) + ((B  - Cmin) / dif) + ((C   - Cmin) / dif) + ((D - Cmin) / dif)) / 4)
return np.where(((abcd <= data2a) & (abcd >= data2b)), 1, 0).sum()

This creates an intermediate array each step:

  • tmp1 = A-Cmin,
  • tmp2 = tmp1 / dif,
  • tmp3 = B - Cmin,
  • tmp4 = tmp3 / dif
  • ... you get the gist.

However, this is a reduce function (array -> integer), so having a lot of intermediate arrays is unnecessary weight; just calculate the value "on the fly".

import numba as nb

@nb.njit
def doTheMathNumba(tmpData, data2a, data2b):
    Bmax = np.max(tmpData[:, 1])
    Cmin = np.min(tmpData[:, 2])
    diff = (Bmax - Cmin)
    idiff = 1 / diff
    sum_ = 0
    for i in range(tmpData.shape[0]):
        val = (tmpData[i, 0] + tmpData[i, 1] + tmpData[i, 2] + tmpData[i, 3]) / 4 * idiff - Cmin * idiff
        if val <= data2a[i] and val >= data2b[i]:
            sum_ += 1
    return sum_

I did something else here to avoid multiple operations:

(((A - Cmin) / dif) + ((B - Cmin) / dif) + ((C - Cmin) / dif) + ((D - Cmin) / dif)) / 4
= ((A - Cmin + B - Cmin + C - Cmin + D - Cmin) / dif) / 4
= (A + B + C + D - 4 * Cmin) / (4 * dif)
= (A + B + C + D) / (4 * dif) - (Cmin / dif)

This actually cuts down the execution time by almost a factor of 10 on my computer:

%timeit doTheMath(tmp_df, data2a, data2b)       # 1000 loops, best of 3: 446 µs per loop
%timeit doTheMathNumba(tmp_df, data2a, data2b)  # 10000 loops, best of 3: 59 µs per loop

There are certainly other improvements as well, for example using a rolling min/max to calculate Bmax and Cmin. That would make at least part of the calculation run in O(sampleSize) instead of O(sampleSize * batchSize). It would also make it possible to reuse some of the (A + B + C + D) / (4 * dif) - (Cmin / dif) calculations, because if Cmin and Bmax don't change for the next sample these values don't differ. It's a bit complicated to do because the comparisons differ, but definitely possible! See here:

import time
import numpy as np
import numba as nb

@nb.njit
def doTheMathNumba(abcd, data2a, data2b, Bmax, Cmin):
    diff = (Bmax - Cmin)
    idiff = 1 / diff
    quarter_idiff = 0.25 * idiff
    sum_ = 0
    for i in range(abcd.shape[0]):
        val = abcd[i] * quarter_idiff - Cmin * idiff
        if val <= data2a[i] and val >= data2b[i]:
            sum_ += 1
    return sum_

@nb.njit
def doloop(data1, data2a, data2b, abcd, Bmaxs, Cmins, batchSize, sampleSize, minimumLimit, resultArray):
    found = 0
    for rowNr in range(data1.shape[0]):
        if(abcd[rowNr:rowNr + batchSize].shape[0] == batchSize):
            result = doTheMathNumba(abcd[rowNr:rowNr + batchSize],
                                    data2a, data2b, Bmaxs[rowNr], Cmins[rowNr])
            if (result >= minimumLimit):
                resultArray[found, 0] = rowNr
                resultArray[found, 1] = result
                found += 1
    return resultArray[:found]

#Declare variables
batchSize = 2000
sampleSize = 50000
resultArray = []
minimumLimit = 490 #use 400 on the real sample data

data1 = np.array(np.random.uniform(1, 100, (sampleSize + batchSize, 4)))
data2a = np.array(np.random.uniform(0, 1, batchSize)) #upper limit
data2b = np.array(np.random.uniform(0, 1, batchSize)) #lower limit

from scipy import ndimage

t0 = time.time()
abcd = np.sum(data1, axis=1)
Bmaxs = ndimage.maximum_filter1d(data1[:, 1],
                                 size=batchSize,
                                 origin=-((batchSize-1)//2-1))  # correction for even shapes
Cmins = ndimage.minimum_filter1d(data1[:, 2],
                                 size=batchSize,
                                 origin=-((batchSize-1)//2-1))

result = np.zeros((sampleSize, 2), dtype=np.int64)
doloop(data1, data2a, data2b, abcd, Bmaxs, Cmins, batchSize, sampleSize, minimumLimit, result)
print('Runtime:', time.time() - t0)

This gives me a Runtime: 0.759593152999878 (after numba compiled the functions!), while your original had Runtime: 24.68975639343262. Now we're 30 times faster!

With your sample size it still takes Runtime: 60.187848806381226 but that's not too bad, right?

And even though I haven't done this myself, the numba documentation says that it's possible to write "Numba for CUDA GPUs" and it doesn't seem too complicated.
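For instance, here is a minimal sketch of how the inner check might translate to a numba.cuda kernel (untested, with hypothetical names; the host would still allocate out and sum it afterwards):

from numba import cuda

@cuda.jit
def check_kernel(abcd, data2a, data2b, cmin_idiff, quarter_idiff, out):
    # One thread per element: write 1 if the value lies within the limits
    i = cuda.grid(1)
    if i < abcd.shape[0]:
        val = abcd[i] * quarter_idiff - cmin_idiff
        out[i] = 1 if (val <= data2a[i] and val >= data2b[i]) else 0

It would be launched as check_kernel[blocks, threads_per_block](...), with the count obtained by summing out on the host (or with a device-side reduction).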