Python: reading 12-bit binary files
I have a slightly different implementation from the one proposed by @max9111 that doesn't require a call to `unpackbits`.
It creates two `uint12` values from three consecutive `uint8` values directly, by cutting the middle byte in half and using numpy's binary operations. In the following, `data_chunk` is assumed to be a binary string containing the information for an arbitrary number of 12-bit integers (hence its length must be a multiple of 3).
def read_uint12(data_chunk):
    """Unpack consecutive 12-bit unsigned integers from a packed byte buffer.

    Every three input bytes ``b0 b1 b2`` produce two values:
    ``(b0 << 4) + (b1 >> 4)`` and ``((b1 % 16) << 8) + b2``, i.e. the middle
    byte is split into two nibbles shared between the neighbouring values.

    Parameters
    ----------
    data_chunk : bytes-like
        Packed data. Its length must be a multiple of 3; otherwise the
        reshape into triplets raises ``ValueError``.

    Returns
    -------
    numpy.ndarray
        1-D ``uint16`` array with two values per input triplet, in order.
    """
    raw_bytes = np.frombuffer(data_chunk, dtype=np.uint8)
    triplet_count = raw_bytes.shape[0] // 3

    # Widen to uint16 before shifting so bits moved past bit 7 are kept.
    triplets = raw_bytes.reshape((triplet_count, 3)).astype(np.uint16)
    head = triplets[:, 0]
    middle = triplets[:, 1]
    tail = triplets[:, 2]

    first_values = (head << 4) + (middle >> 4)
    second_values = ((middle % 16) << 8) + tail

    # Put the two values of each triplet side by side, then flatten row-wise
    # so the output alternates first/second value.
    paired = np.concatenate(
        (first_values[:, np.newaxis], second_values[:, np.newaxis]), axis=1
    )
    return paired.reshape(2 * triplet_count)
I benchmarked against the other implementation, and this approach proved to be ~4x faster on a ~5 MB input:
`read_uint12_unpackbits`: 65.5 ms ± 1.11 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
`read_uint12`: 14 ms ± 513 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
One way to speed up the numpy-vectorized methods is to avoid costly memory allocations for temporary data, use the cache more efficiently, and make use of parallelization. This can quite easily be done using Numba, Cython or C. Please note that parallelization is not always beneficial: if the array you want to convert is too small, use the single-threaded version (`parallel=False`).
Numba version of Cyril Gaudefroy answer with temporary memory allocation
import numba as nb
import numpy as np


@nb.njit(
    [
        nb.uint16[::1](nb.uint8[::1]),
        # np.frombuffer() on an immutable buffer (e.g. bytes) returns a
        # read-only array, which Numba types differently from a writable one;
        # without this second signature such inputs are rejected.
        nb.uint16[::1](nb.types.Array(nb.uint8, 1, "C", readonly=True)),
    ],
    fastmath=True,
    parallel=True,
)
def nb_read_uint12(data_chunk):
    """Unpack 12-bit unsigned integers from packed bytes into a new array.

    Every three input bytes ``b0 b1 b2`` produce two values:
    ``(b0 << 4) + (b1 >> 4)`` and ``((b1 % 16) << 8) + b2``.

    Parameters
    ----------
    data_chunk : 1-D C-contiguous uint8 array (writable or read-only)
        Packed data, e.g. ``np.frombuffer(raw_bytes, dtype=np.uint8)``.
        Its length must be a multiple of 3.

    Returns
    -------
    1-D uint16 array with ``len(data_chunk) // 3 * 2`` elements, allocated
    on every call.

    Raises
    ------
    AssertionError
        If the input length is not a multiple of 3.
    """
    # ensure that the data_chunk has the right length
    assert data_chunk.shape[0] % 3 == 0

    n_triplets = data_chunk.shape[0] // 3
    out = np.empty(n_triplets * 2, dtype=np.uint16)

    # Each iteration reads its own triplet and writes its own pair of output
    # slots, so iterations are independent of each other.
    for i in nb.prange(n_triplets):
        # Widen before shifting so the high bits are not truncated.
        fst_uint8 = np.uint16(data_chunk[i * 3])
        mid_uint8 = np.uint16(data_chunk[i * 3 + 1])
        lst_uint8 = np.uint16(data_chunk[i * 3 + 2])

        out[i * 2] = (fst_uint8 << 4) + (mid_uint8 >> 4)
        out[i * 2 + 1] = ((mid_uint8 % 16) << 8) + lst_uint8

    return out
Numba version of Cyril Gaudefroy answer with memory preallocation
If you apply this function multiple times to data chunks of similar size, you can preallocate the output array once and reuse it.
@nb.njit(
    [
        nb.uint16[::1](nb.uint8[::1], nb.uint16[::1]),
        # np.frombuffer() on an immutable buffer (e.g. bytes) returns a
        # read-only array, which Numba types differently from a writable one;
        # without this second signature such inputs are rejected.
        nb.uint16[::1](
            nb.types.Array(nb.uint8, 1, "C", readonly=True), nb.uint16[::1]
        ),
    ],
    fastmath=True,
    parallel=True,
    cache=True,
)
def nb_read_uint12_prealloc(data_chunk, out):
    """Unpack 12-bit unsigned integers from packed bytes into ``out``.

    Every three input bytes ``b0 b1 b2`` produce two values:
    ``(b0 << 4) + (b1 >> 4)`` and ``((b1 % 16) << 8) + b2``.

    Parameters
    ----------
    data_chunk : 1-D C-contiguous uint8 array (writable or read-only)
        Packed data, e.g. ``np.frombuffer(raw_bytes, dtype=np.uint8)``.
        Its length must be a multiple of 3.
    out : 1-D C-contiguous writable uint16 array
        Destination buffer with exactly ``len(data_chunk) // 3 * 2``
        elements; it is overwritten in place.

    Returns
    -------
    The same ``out`` array that was passed in.

    Raises
    ------
    AssertionError
        If the input length is not a multiple of 3 or ``out`` has the wrong
        length.
    """
    # ensure that the data_chunk has the right length
    assert data_chunk.shape[0] % 3 == 0

    n_triplets = data_chunk.shape[0] // 3
    assert out.shape[0] == n_triplets * 2

    # Each iteration reads its own triplet and writes its own pair of output
    # slots, so iterations are independent of each other.
    for i in nb.prange(n_triplets):
        # Widen before shifting so the high bits are not truncated.
        fst_uint8 = np.uint16(data_chunk[i * 3])
        mid_uint8 = np.uint16(data_chunk[i * 3 + 1])
        lst_uint8 = np.uint16(data_chunk[i * 3 + 2])

        out[i * 2] = (fst_uint8 << 4) + (mid_uint8 >> 4)
        out[i * 2 + 1] = ((mid_uint8 % 16) << 8) + lst_uint8

    return out
Numba version of DGrifffith answer with temporary memory allocation
@nb.njit(
    [
        nb.uint16[::1](nb.uint8[::1]),
        # np.frombuffer() on an immutable buffer (e.g. bytes) returns a
        # read-only array, which Numba types differently from a writable one;
        # without this second signature such inputs are rejected.
        nb.uint16[::1](nb.types.Array(nb.uint8, 1, "C", readonly=True)),
    ],
    fastmath=True,
    parallel=True,
    cache=True,
)
def read_uint12_var_2(data_chunk):
    """Unpack 12-bit values whose low nibbles share the middle byte.

    Every three input bytes ``b0 b1 b2`` produce two values:
    ``(b0 << 4) + (b1 >> 4)`` and ``(b2 << 4) + (b1 & 15)``. Bytes 0 and 2
    carry the upper 8 bits of the first and second value; the middle byte
    carries both lower nibbles.

    Parameters
    ----------
    data_chunk : 1-D C-contiguous uint8 array (writable or read-only)
        Packed data, e.g. ``np.frombuffer(raw_bytes, dtype=np.uint8)``.
        Its length must be a multiple of 3.

    Returns
    -------
    1-D uint16 array with ``len(data_chunk) // 3 * 2`` elements, allocated
    on every call.

    Raises
    ------
    AssertionError
        If the input length is not a multiple of 3.
    """
    # ensure that the data_chunk has the right length
    assert data_chunk.shape[0] % 3 == 0

    n_triplets = data_chunk.shape[0] // 3
    out = np.empty(n_triplets * 2, dtype=np.uint16)

    # Each iteration reads its own triplet and writes its own pair of output
    # slots, so iterations are independent of each other.
    for i in nb.prange(n_triplets):
        # Widen before shifting so the high bits are not truncated.
        fst_uint8 = np.uint16(data_chunk[i * 3])
        mid_uint8 = np.uint16(data_chunk[i * 3 + 1])
        lst_uint8 = np.uint16(data_chunk[i * 3 + 2])

        out[i * 2] = (fst_uint8 << 4) + (mid_uint8 >> 4)
        out[i * 2 + 1] = (lst_uint8 << 4) + (15 & mid_uint8)

    return out
Numba version of DGrifffith answer with memory preallocation
@nb.njit(
    [
        nb.uint16[::1](nb.uint8[::1], nb.uint16[::1]),
        # np.frombuffer() on an immutable buffer (e.g. bytes) returns a
        # read-only array, which Numba types differently from a writable one;
        # without this second signature such inputs are rejected.
        nb.uint16[::1](
            nb.types.Array(nb.uint8, 1, "C", readonly=True), nb.uint16[::1]
        ),
    ],
    fastmath=True,
    parallel=True,
    cache=True,
)
def read_uint12_var_2_prealloc(data_chunk, out):
    """Unpack 12-bit values (low nibbles in the middle byte) into ``out``.

    Every three input bytes ``b0 b1 b2`` produce two values:
    ``(b0 << 4) + (b1 >> 4)`` and ``(b2 << 4) + (b1 & 15)``. Bytes 0 and 2
    carry the upper 8 bits of the first and second value; the middle byte
    carries both lower nibbles.

    Parameters
    ----------
    data_chunk : 1-D C-contiguous uint8 array (writable or read-only)
        Packed data, e.g. ``np.frombuffer(raw_bytes, dtype=np.uint8)``.
        Its length must be a multiple of 3.
    out : 1-D C-contiguous writable uint16 array
        Destination buffer with exactly ``len(data_chunk) // 3 * 2``
        elements; it is overwritten in place.

    Returns
    -------
    The same ``out`` array that was passed in.

    Raises
    ------
    AssertionError
        If the input length is not a multiple of 3 or ``out`` has the wrong
        length.
    """
    # ensure that the data_chunk has the right length
    assert data_chunk.shape[0] % 3 == 0

    n_triplets = data_chunk.shape[0] // 3
    assert out.shape[0] == n_triplets * 2

    # Each iteration reads its own triplet and writes its own pair of output
    # slots, so iterations are independent of each other.
    for i in nb.prange(n_triplets):
        # Widen before shifting so the high bits are not truncated.
        fst_uint8 = np.uint16(data_chunk[i * 3])
        mid_uint8 = np.uint16(data_chunk[i * 3 + 1])
        lst_uint8 = np.uint16(data_chunk[i * 3 + 2])

        out[i * 2] = (fst_uint8 << 4) + (mid_uint8 >> 4)
        out[i * 2 + 1] = (lst_uint8 << 4) + (15 & mid_uint8)

    return out
Timings
num_Frames=10data_chunk=np.random.randint(low=0,high=255,size=np.int(640*256*1.5*num_Frames),dtype=np.uint8)%timeit read_uint12_gaud(data_chunk)#11.3 ms ± 53.4 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)#435 MB/s%timeit nb_read_uint12(data_chunk)#939 µs ± 24.3 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)#5235 MB/sout=np.empty(data_chunk.shape[0]//3*2,dtype=np.uint16)%timeit nb_read_uint12_prealloc(data_chunk,out)#407 µs ± 5.4 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)#11759 MB/s%timeit read_uint12_griff(data_chunk)#10.2 ms ± 55.9 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)#491 MB/s%timeit read_uint12_var_2(data_chunk)#928 µs ± 16.5 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)#5297 MB/s%timeit read_uint12_var_2_prealloc(data_chunk,out)#403 µs ± 13.4 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)#12227 MB/s
I found @cyrilgaudefroy's answer useful. However, it initially did not work on my 12-bit packed binary image data, because the packing is a bit different in this particular case: the "middle" byte contains the least significant nibbles of both values, while bytes 1 and 3 of each triplet hold the most significant 8 bits of the first and second 12-bit value, respectively. Hence I modified @cyrilgaudefroy's answer to:
def read_uint12(data_chunk):
    """Unpack 12-bit unsigned integers whose low nibbles share the middle byte.

    Every three input bytes ``b0 b1 b2`` produce two values:
    ``(b0 << 4) + (b1 >> 4)`` and ``(b2 << 4) + (b1 & 15)``. Bytes 0 and 2
    carry the upper 8 bits of the first and second value; the middle byte
    carries both lower nibbles.

    Parameters
    ----------
    data_chunk : bytes-like
        Packed data. Its length must be a multiple of 3; otherwise the
        reshape into triplets raises ``ValueError``.

    Returns
    -------
    numpy.ndarray
        1-D ``uint16`` array with two values per input triplet, in order.
    """
    packed = np.frombuffer(data_chunk, dtype=np.uint8)
    n_groups = packed.shape[0] // 3

    # Widen to uint16 before shifting so bits moved past bit 7 are kept.
    groups = packed.reshape((n_groups, 3)).astype(np.uint16)
    byte_a = groups[:, 0]
    byte_mid = groups[:, 1]
    byte_b = groups[:, 2]

    value_a = (byte_a << 4) + (byte_mid >> 4)
    value_b = (byte_b << 4) + np.bitwise_and(15, byte_mid)

    # Put the two values of each triplet side by side, then flatten row-wise
    # so the output alternates first/second value.
    side_by_side = np.concatenate(
        (value_a[:, np.newaxis], value_b[:, np.newaxis]), axis=1
    )
    return side_by_side.reshape(2 * n_groups)