Python particles simulator: out-of-core processing
Dask.array can perform chunked operations like max, cumsum, etc. on an on-disk array like PyTables or h5py.
import h5py
d = h5py.File('myfile.hdf5')['/data']

import dask.array as da
x = da.from_array(d, chunks=(1000, 1000))
x looks and feels like a numpy array and copies much of its API. Operations on x create a DAG of in-memory operations that execute efficiently, using multiple cores and streaming from disk as necessary:
da.exp(x).mean(axis=0).compute()
http://dask.pydata.org/en/latest/
conda install dask
or
pip install dask
See here for how to store your parameters in the HDF5 file (it pickles, so you can store them as you have them; there is a 64 KB limit on the size of the pickle).
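As a minimal sketch of that idea (assuming the 'df' node created by the code below already exists; the params dict is hypothetical), you can attach the parameters as a pickled attribute of a stored node:

import pandas as pd

params = {'n_particles': 10, 'seed': 42}  # hypothetical parameters

with pd.HDFStore('emission.hdf') as store:
    # attribute values are pickled; note the ~64 KB pickle size limit
    store.get_storer('df').attrs.params = params
    read_back = store.get_storer('df').attrs.params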
import pandas as pd
import numpy as np

n_particles = 10
chunk_size = 1000

# 1) create a new emission file, compressing as we go
emission = pd.HDFStore('emission.hdf', mode='w', complib='blosc')

# generate simulated data
for i in range(10):
    df = pd.DataFrame(np.abs(np.random.randn(chunk_size, n_particles)), dtype='float32')

    # create a globally unique index (time)
    # http://stackoverflow.com/questions/16997048/how-does-one-append-large-amounts-of-data-to-a-pandas-hdfstore-and-get-a-natural/16999397#16999397
    try:
        nrows = emission.get_storer('df').nrows
    except (AttributeError, KeyError):
        nrows = 0

    df.index = pd.Series(df.index) + nrows
    emission.append('df', df)

emission.close()

# 2) create counts
cs = pd.HDFStore('counts.hdf', mode='w', complib='blosc')

# this is an iterator, can be any size
for df in pd.read_hdf('emission.hdf', 'df', chunksize=200):
    counts = pd.DataFrame(np.random.poisson(lam=df).astype(np.uint8))

    # set the index as the same
    counts.index = df.index

    # store sums across groups of particles (as most are zero this will be a
    # nice sub-selector); better maybe to have multiple of these sums that
    # divide the particle space; you don't have to do this, but it is probably
    # more efficient; you can do this in another file if you want/need
    counts['particles_0_4'] = counts.iloc[:, 0:4].sum(1)
    counts['particles_5_9'] = counts.iloc[:, 5:9].sum(1)

    # make the sum columns indexable
    cs.append('df', counts, data_columns=['particles_0_4', 'particles_5_9'])

cs.close()

# 3) find interesting counts
print(pd.read_hdf('counts.hdf', 'df', where='particles_0_4 > 0'))
print(pd.read_hdf('counts.hdf', 'df', where='particles_5_9 > 0'))
Alternatively, you can make each particle a data_column and select on them individually (see the sketch after the sample output below).
and some output (pretty active emission in this case :)
    0  1  2  3  4  5  6  7  8  9  particles_0_4  particles_5_9
0   2  2  2  3  2  1  0  2  1  0              9              4
1   1  0  0  0  1  0  1  0  3  0              1              4
2   0  2  0  0  2  0  0  1  2  0              2              3
3   0  0  0  1  1  0  0  2  0  3              1              2
4   3  1  0  2  1  0  0  1  0  0              6              1
5   1  0  0  1  0  0  0  3  0  0              2              3
6   0  0  0  1  1  0  1  0  0  0              1              1
7   0  2  0  2  0  0  0  0  2  0              4              2
8   0  0  0  1  3  0  0  0  0  1              1              0
.. .. .. .. .. .. .. .. .. ..            ...            ...

[9269 rows x 12 columns]
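A minimal sketch of the per-particle alternative mentioned above (the string column names 'p0', 'p1', ... are hypothetical; the where syntax works best with valid identifiers):

import pandas as pd
import numpy as np

cs = pd.HDFStore('counts_per_particle.hdf', mode='w', complib='blosc')

for df in pd.read_hdf('emission.hdf', 'df', chunksize=200):
    counts = pd.DataFrame(np.random.poisson(lam=df).astype(np.uint8))
    counts.index = df.index
    # string names so each particle is a valid query identifier
    counts.columns = ['p%d' % i for i in counts.columns]
    cs.append('df', counts, data_columns=True)  # every column indexable

cs.close()

# select rows where particle 3 emitted at least one photon
print(pd.read_hdf('counts_per_particle.hdf', 'df', where='p3 > 0'))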
PyTables Solution
Since the functionality provided by pandas is not needed here, and pandas-based processing is much slower (see the notebook below), the best approach is to use PyTables or h5py directly. So far I've only tried the PyTables approach.
All tests were performed in this notebook:
Introduction to pytables data-structures
Reference: Official PyTables Docs
PyTables allows storing data in HDF5 files in 2 types of formats: arrays and tables.
Arrays
There are 3 types of arrays: Array, CArray and EArray. They all allow storing and retrieving (multidimensional) slices with a notation similar to numpy slicing.
# Write data to store (broadcasting works)
array1[:] = 3

# Read data from store
in_ram_array = array1[:]
For optimization in some use cases, a CArray is saved in "chunks", whose size can be chosen with chunkshape at creation time.
Array and CArray size is fixed at creation time; you can fill/write the array chunk-by-chunk after creation, though. Conversely, an EArray can be extended with the .append() method.
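A minimal sketch (hypothetical file and node names) of creating both kinds of arrays:

import numpy as np
import tables

data_file = tables.open_file('arrays_demo.h5', mode='w')

# CArray: fixed shape, stored on disk in chunks of the given chunkshape
array1 = data_file.create_carray('/', 'carray1', atom=tables.Float32Atom(),
                                 shape=(1000, 1000), chunkshape=(100, 1000))
array1[:100] = np.random.randn(100, 1000).astype('float32')

# EArray: the dimension with shape 0 is extendable via .append()
earray1 = data_file.create_earray('/', 'earray1', atom=tables.Float32Atom(),
                                  shape=(0, 1000))
earray1.append(np.zeros((100, 1000), dtype='float32'))
data_file.close()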
Tables
The Table is a quite different beast. It's basically a "table" with a 1D index, where each element is a row. Inside each row there are "columns", and each column can have a different data type. If you are familiar with numpy record-arrays, a table is basically a 1D record-array, with each element having as many fields as the columns.
1D or 2D numpy arrays can be stored in tables, but it's a bit more tricky: we need to create a row data type. For example, to store a 1D uint8 numpy array we need to do:
table_uint8 = np.dtype([('field1', 'u1')])
table_1d = data_file.create_table('/', 'array_1d', description=table_uint8)
So why use tables? Because, differently from arrays, tables can be efficiently queried. For example, if we want to search for elements > 3 in a huge disk-based table, we can do:
index = table_1d.get_where_list('field1 > 3')
Not only is this simple (compared with arrays, where we need to scan the whole file in chunks and build the index in a loop), it is also extremely fast.
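Putting the pieces together, a minimal end-to-end sketch (hypothetical file name and simulated data):

import numpy as np
import tables

data_file = tables.open_file('table_demo.h5', mode='w')
table_uint8 = np.dtype([('field1', 'u1')])
table_1d = data_file.create_table('/', 'array_1d', description=table_uint8)

# append a plain 1D uint8 array by viewing it with the record dtype
a = np.random.poisson(lam=0.3, size=100000).astype('u1')
table_1d.append(a.view(table_uint8))
table_1d.flush()

# query on disk, without scanning the file manually
index = table_1d.get_where_list('field1 > 3')
data_file.close()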
How to store simulation parameters
The best way to store simulation parameters is to use a group (i.e. /parameters), convert each scalar to a numpy array and store it as a CArray.
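A minimal sketch of this (the parameter names and file name are hypothetical):

import numpy as np
import tables

data_file = tables.open_file('params_demo.h5', mode='w')
group = data_file.create_group('/', 'parameters')

# hypothetical scalar simulation parameters
params = {'n_particles': 10, 'd_coeff': 0.3, 't_step': 1e-6}
for name, value in params.items():
    # convert each scalar to a 1-element numpy array and store it as a CArray
    data_file.create_carray(group, name, obj=np.atleast_1d(value))
data_file.close()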
Array for "emission
"
emission is the biggest array, and it is generated and read sequentially. For this usage pattern, a good data structure is EArray. On "simulated" data with ~50% zero elements, blosc compression (level=5) achieves a 2.2x compression ratio. I found that a chunk-size of 2^18 (256k) gives the minimum processing time.
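A minimal sketch under those settings (the node name and simulated data are hypothetical):

import numpy as np
import tables

data_file = tables.open_file('emission_demo.h5', mode='w')
filters = tables.Filters(complib='blosc', complevel=5)
emission = data_file.create_earray('/', 'emission',
                                   atom=tables.Float32Atom(),
                                   shape=(0,), chunkshape=(2**18,),
                                   filters=filters)

# append simulated chunks as they are generated
for i in range(10):
    emission.append(np.random.randn(2**18).astype('float32'))
data_file.close()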
Storing "counts
"
Storing also "counts
" will increase the file size by 10% and will take 40% more time to compute timestamps. Having counts
stored is not an advantage per-se because only the timestamps are needed in the end.
The advantage is that reconstructing the index (timestamps) is simpler, because we query the full time axis in a single command (.get_where_list('counts >= 1')). Conversely, with chunked processing we need to perform some index arithmetic that is a bit tricky, and maybe a burden to maintain.
However, the code complexity may be small compared to all the other operations (sorting and merging) that are needed in both cases.
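For reference, a minimal sketch of the counts-as-table approach (names and simulated chunks are hypothetical):

import numpy as np
import tables

data_file = tables.open_file('counts_demo.h5', mode='w')
counts_dtype = np.dtype([('counts', 'u1')])
counts_table = data_file.create_table('/', 'counts', description=counts_dtype)

# append per-chunk counts as they are computed (simulated here)
for i in range(10):
    chunk = np.random.poisson(lam=0.3, size=2**18).astype('u1')
    counts_table.append(chunk.view(counts_dtype))
counts_table.flush()

# reconstruct the timestamps index with a single query over the full time axis
timestamps = counts_table.get_where_list('counts >= 1')
data_file.close()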
Storing "timestamps
"
Timestamps can be accumulated in RAM. However, we don't know the array's size before starting, and a final hstack() call is needed to "merge" the different chunks stored in a list. This doubles the memory requirements, so the RAM may be insufficient.
Instead, we can store timestamps as-we-go to a table using .append(). At the end we can load the table into memory with .read(). This is only 10% slower than the all-in-memory computation, but it avoids the "double-RAM" requirement. Moreover, we can skip the final full-load and have minimal RAM usage.
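A minimal sketch of this pattern (the file name and simulated timestamp chunks are hypothetical):

import numpy as np
import tables

data_file = tables.open_file('timestamps_demo.h5', mode='w')
ts_dtype = np.dtype([('timestamp', 'i8')])
ts_table = data_file.create_table('/', 'timestamps', description=ts_dtype)

# accumulate timestamps chunk-by-chunk instead of in an in-RAM list
for i in range(10):
    chunk = np.sort(np.random.randint(0, 2**40, size=1000, dtype='i8'))
    ts_table.append(chunk.view(ts_dtype))
ts_table.flush()

# optional final full-load; skip it to keep RAM usage minimal
timestamps = ts_table.read()['timestamp']
data_file.close()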
H5Py
H5py is a much simpler library than PyTables. For this use case of (mainly) sequential processing, it seems a better fit than PyTables. The only missing feature is 'blosc' compression. Whether this results in a big performance penalty remains to be tested.
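For comparison, a minimal h5py sketch of the same append-as-you-go pattern (hypothetical file name; using the built-in lzf compression in place of blosc):

import h5py
import numpy as np

with h5py.File('emission_h5py.hdf5', 'w') as f:
    # gzip/lzf are built in; blosc is not available in stock h5py
    dset = f.create_dataset('emission', shape=(0,), maxshape=(None,),
                            dtype='float32', chunks=(2**18,),
                            compression='lzf')
    for i in range(10):
        chunk = np.random.randn(2**18).astype('float32')
        dset.resize(dset.shape[0] + chunk.size, axis=0)
        dset[-chunk.size:] = chunk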