Python: reading 12-bit binary files Python: reading 12-bit binary files python-3.x python-3.x

Python: reading 12-bit binary files


I have a slightly different implementation from the one proposed by @max9111 that doesn't require a call to unpackbits.

It creates two uint12 values directly from three consecutive uint8 values by splitting the middle byte in half and using NumPy's bitwise operations. In the following, data_chunk is assumed to be a binary string containing the packed data for an arbitrary number of 12-bit integers (hence its length must be a multiple of 3).

def read_uint12(data_chunk):
    """Unpack packed 12-bit unsigned integers into a 1-D uint16 array.

    Every three input bytes hold two 12-bit values. The first value is
    the first byte (high 8 bits) followed by the upper nibble of the
    middle byte; the second value is the lower nibble of the middle byte
    (high 4 bits) followed by the last byte.

    data_chunk is a bytes-like object whose length is a multiple of 3;
    the reshape raises otherwise.
    """
    raw = np.frombuffer(data_chunk, dtype=np.uint8)
    n_triplets = raw.shape[0] // 3

    # Widen to uint16 before shifting so the upper bits are not dropped.
    triplets = np.reshape(raw, (n_triplets, 3)).astype(np.uint16)
    first = triplets[:, 0]
    middle = triplets[:, 1]
    last = triplets[:, 2]

    first_value = (first << 4) | (middle >> 4)
    second_value = ((middle & 0x0F) << 8) | last

    # Put the two values of each triplet side by side, then flatten so
    # they come out interleaved in their original order.
    pairs = np.concatenate(
        (first_value[:, None], second_value[:, None]), axis=1
    )
    return np.reshape(pairs, 2 * n_triplets)

I benchmarked this against the other implementation, and this approach proved to be ~4x faster on a ~5 MB input:
read_uint12_unpackbits: 65.5 ms ± 1.11 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
read_uint12: 14 ms ± 513 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)


One way to speed up the NumPy-vectorized methods is to avoid costly memory allocations for temporary data, use the cache more efficiently, and make use of parallelization. This can quite easily be done using Numba, Cython or C. Please note that parallelization is not always beneficial: if the array you want to convert is too small, use the single-threaded version (parallel=False).

Numba version of Cyril Gaudefroy answer with temporary memory allocation

import numba as nb
import numpy as np


@nb.njit(nb.uint16[::1](nb.uint8[::1]), fastmath=True, parallel=True)
def nb_read_uint12(data_chunk):
    """Unpack packed 12-bit values from a uint8 array into a new uint16 array.

    data_chunk is a contiguous 1-D array of uint8 data, e.g.
    data_chunk = np.frombuffer(data_chunk, dtype=np.uint8).
    Its length must be a multiple of 3; every three bytes yield two values.

    NOTE(review): np.frombuffer over immutable bytes gives a read-only
    array; confirm the explicit signature above accepts it.
    """
    n_bytes = data_chunk.shape[0]
    # Ensure that data_chunk has the right length.
    assert np.mod(n_bytes, 3) == 0

    n_triplets = n_bytes // 3
    out = np.empty(n_triplets * 2, dtype=np.uint16)

    for k in nb.prange(n_triplets):
        src = k * 3
        dst = k * 2
        first = np.uint16(data_chunk[src])
        middle = np.uint16(data_chunk[src + 1])
        last = np.uint16(data_chunk[src + 2])

        # First value: first byte plus the upper nibble of the middle byte.
        out[dst] = (first << 4) + (middle >> 4)
        # Second value: lower nibble of the middle byte plus the last byte.
        out[dst + 1] = ((middle % 16) << 8) + last

    return out

Numba version of Cyril Gaudefroy answer with memory preallocation

If you apply this function multiple times to data chunks of similar size, you can preallocate the output array just once and reuse it.

@nb.njit(nb.uint16[::1](nb.uint8[::1], nb.uint16[::1]), fastmath=True, parallel=True, cache=True)
def nb_read_uint12_prealloc(data_chunk, out):
    """Unpack packed 12-bit values from a uint8 array into a caller-supplied array.

    data_chunk is a contiguous 1-D array of uint8 data, e.g.
    data_chunk = np.frombuffer(data_chunk, dtype=np.uint8).
    Its length must be a multiple of 3.

    out is a contiguous 1-D uint16 array with two elements per input
    triplet. It is filled in place and also returned.

    NOTE(review): np.frombuffer over immutable bytes gives a read-only
    array; confirm the explicit signature above accepts it.
    """
    n_bytes = data_chunk.shape[0]
    n_triplets = n_bytes // 3

    # Ensure that data_chunk and out have matching, valid lengths.
    assert np.mod(n_bytes, 3) == 0
    assert out.shape[0] == n_triplets * 2

    for k in nb.prange(n_triplets):
        src = k * 3
        dst = k * 2
        first = np.uint16(data_chunk[src])
        middle = np.uint16(data_chunk[src + 1])
        last = np.uint16(data_chunk[src + 2])

        # First value: first byte plus the upper nibble of the middle byte.
        out[dst] = (first << 4) + (middle >> 4)
        # Second value: lower nibble of the middle byte plus the last byte.
        out[dst + 1] = ((middle % 16) << 8) + last

    return out

Numba version of DGrifffith answer with temporary memory allocation

@nb.njit(nb.uint16[::1](nb.uint8[::1]), fastmath=True, parallel=True, cache=True)
def read_uint12_var_2(data_chunk):
    """Unpack 12-bit values whose low nibbles both live in the middle byte.

    data_chunk is a contiguous 1-D array of uint8 data, e.g.
    data_chunk = np.frombuffer(data_chunk, dtype=np.uint8).
    Its length must be a multiple of 3; every three bytes yield two
    values, returned in a newly allocated uint16 array.

    NOTE(review): np.frombuffer over immutable bytes gives a read-only
    array; confirm the explicit signature above accepts it.
    """
    n_bytes = data_chunk.shape[0]
    # Ensure that data_chunk has the right length.
    assert np.mod(n_bytes, 3) == 0

    n_triplets = n_bytes // 3
    out = np.empty(n_triplets * 2, dtype=np.uint16)

    for k in nb.prange(n_triplets):
        src = k * 3
        dst = k * 2
        first = np.uint16(data_chunk[src])
        middle = np.uint16(data_chunk[src + 1])
        last = np.uint16(data_chunk[src + 2])

        # First value: first byte is the high 8 bits, the upper nibble of
        # the middle byte is the low 4 bits.
        out[dst] = (first << 4) + (middle >> 4)
        # Second value: last byte is the high 8 bits, the lower nibble of
        # the middle byte is the low 4 bits.
        out[dst + 1] = (last << 4) + (15 & middle)

    return out

Numba version of DGrifffith answer with memory preallocation

@nb.njit(nb.uint16[::1](nb.uint8[::1], nb.uint16[::1]), fastmath=True, parallel=True, cache=True)
def read_uint12_var_2_prealloc(data_chunk, out):
    """Unpack 12-bit values (low nibbles in the middle byte) into a given array.

    data_chunk is a contiguous 1-D array of uint8 data, e.g.
    data_chunk = np.frombuffer(data_chunk, dtype=np.uint8).
    Its length must be a multiple of 3.

    out is a contiguous 1-D uint16 array with two elements per input
    triplet. It is filled in place and also returned.

    NOTE(review): np.frombuffer over immutable bytes gives a read-only
    array; confirm the explicit signature above accepts it.
    """
    n_bytes = data_chunk.shape[0]
    n_triplets = n_bytes // 3

    # Ensure that data_chunk and out have matching, valid lengths.
    assert np.mod(n_bytes, 3) == 0
    assert out.shape[0] == n_triplets * 2

    for k in nb.prange(n_triplets):
        src = k * 3
        dst = k * 2
        first = np.uint16(data_chunk[src])
        middle = np.uint16(data_chunk[src + 1])
        last = np.uint16(data_chunk[src + 2])

        # First value: first byte is the high 8 bits, the upper nibble of
        # the middle byte is the low 4 bits.
        out[dst] = (first << 4) + (middle >> 4)
        # Second value: last byte is the high 8 bits, the lower nibble of
        # the middle byte is the low 4 bits.
        out[dst + 1] = (last << 4) + (15 & middle)

    return out

Timings

num_Frames=10data_chunk=np.random.randint(low=0,high=255,size=np.int(640*256*1.5*num_Frames),dtype=np.uint8)%timeit read_uint12_gaud(data_chunk)#11.3 ms ± 53.4 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)#435 MB/s%timeit nb_read_uint12(data_chunk)#939 µs ± 24.3 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)#5235 MB/sout=np.empty(data_chunk.shape[0]//3*2,dtype=np.uint16)%timeit nb_read_uint12_prealloc(data_chunk,out)#407 µs ± 5.4 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)#11759 MB/s%timeit read_uint12_griff(data_chunk)#10.2 ms ± 55.9 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)#491 MB/s%timeit read_uint12_var_2(data_chunk)#928 µs ± 16.5 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)#5297 MB/s%timeit read_uint12_var_2_prealloc(data_chunk,out)#403 µs ± 13.4 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)#12227 MB/s


I found @cyrilgaudefroy's answer useful. However, it initially did not work on my 12-bit packed binary image data, because the packing is a bit different in this particular case: the "middle" byte contains the least significant nibbles of both values, while bytes 1 and 3 of each triplet hold the most significant 8 bits of the two 12-bit values. Hence I modified @cyrilgaudefroy's answer to:

def read_uint12(data_chunk):
    """Unpack packed 12-bit unsigned integers into a 1-D uint16 array.

    Every three input bytes hold two 12-bit values, with both low
    nibbles stored in the middle byte. The first value is the first byte
    (high 8 bits) followed by the upper nibble of the middle byte; the
    second value is the last byte (high 8 bits) followed by the lower
    nibble of the middle byte.

    data_chunk is a bytes-like object whose length is a multiple of 3;
    the reshape raises otherwise.
    """
    raw = np.frombuffer(data_chunk, dtype=np.uint8)
    n_triplets = raw.shape[0] // 3

    # Widen to uint16 before shifting so the upper bits are not dropped.
    triplets = np.reshape(raw, (n_triplets, 3)).astype(np.uint16)
    first = triplets[:, 0]
    middle = triplets[:, 1]
    last = triplets[:, 2]

    first_value = (first << 4) | (middle >> 4)
    second_value = (last << 4) | (middle & 0x0F)

    # Put the two values of each triplet side by side, then flatten so
    # they come out interleaved in their original order.
    pairs = np.concatenate(
        (first_value[:, None], second_value[:, None]), axis=1
    )
    return np.reshape(pairs, 2 * n_triplets)