
How can I take advantage of multiprocessing and multithreading in deep learning using Keras?


It's a good thing that training one model doesn't use 100% of your CPU! That leaves headroom to train multiple models in parallel and speed up your overall training time.

NB: If you just want to speed up this single model, look into GPUs or changing hyperparameters like batch size and the number of neurons (layer size).
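As a quick illustration of that kind of tweak (the layer size, batch size and input shape below are made-up values for the sketch, not recommendations):

# Sketch only: speeding up a single model by adjusting hyperparameters.
# layer_size, batch_size and the input shape are illustrative assumptions.
from keras.models import Sequential
from keras.layers import Dense

layer_size = 480   # fewer neurons -> fewer parameters -> faster epochs
batch_size = 64    # larger batches usually mean fewer weight updates per epoch

model = Sequential()
model.add(Dense(layer_size, activation='tanh', input_shape=(100,)))
model.add(Dense(1))
model.compile(loss='mean_squared_error', optimizer='adam')
# model.fit(X, y, epochs=50, batch_size=batch_size)  # X, y are your training data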

Here's how you can use multiprocessing to train multiple models at the same time (using processes running in parallel on each separate CPU core of your machine).

multiprocessing.Pool essentially creates a pool of worker processes and a queue of jobs that need doing. Each process picks up a job and runs it; when that job is finished, the process picks up another job from the queue.

import time
import signal
import multiprocessing

def init_worker():
    ''' Add KeyboardInterrupt exception to multiprocessing workers '''
    signal.signal(signal.SIGINT, signal.SIG_IGN)

def train_model(layer_size):
    '''
    This code is parallelised and runs on each process
    It trains a model with different layer sizes (hyperparameters)
    It saves the model and returns the score (error)
    '''
    import keras
    from keras.models import Sequential
    from keras.layers import Dense

    print(f'Training a model with layer size {layer_size}')

    # build your model here
    model_RNN = Sequential()
    model_RNN.add(Dense(layer_size))

    # fit the model (the bit that takes time!)
    model_RNN.fit(...)

    # let's demonstrate with a sleep timer
    time.sleep(5)

    # save the trained model to a file
    model_RNN.save(...)

    # you can also return values, e.g. the eval score
    return model_RNN.evaluate(...)

num_workers = 4
hyperparams = [800, 960, 1100]

pool = multiprocessing.Pool(num_workers, init_worker)
scores = pool.map(train_model, hyperparams)
print(scores)

Output:

Training a model with layer size 800
Training a model with layer size 960
Training a model with layer size 1100
[{'size': 960, 'score': 1.0}, {'size': 800, 'score': 1.2}, {'size': 1100, 'score': 0.7}]

This is easily demonstrated with the time.sleep in the code. You'll see that all 3 processes start the training job, and then they all finish at about the same time. If this were run in a single process, you'd have to wait for each model to finish before starting the next (yawn!).
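If you want to see the scheduling behaviour in isolation, here's a stripped-down, self-contained sketch (no Keras, just time.sleep as a stand-in for fit) with 2 workers and 4 jobs, so you can watch a worker pick up the next job as soon as it finishes its current one:

# Self-contained sketch: 2 worker processes, 4 jobs.
# Each job "trains" for 3 seconds; total wall time is ~2 rounds (about 6s), not 4 (12s).
import time
import multiprocessing

def fake_train(job_id):
    print(f'Job {job_id} started')
    time.sleep(3)          # stand-in for model.fit(...)
    print(f'Job {job_id} finished')
    return job_id

if __name__ == '__main__':
    start = time.time()
    pool = multiprocessing.Pool(2)
    results = pool.map(fake_train, [1, 2, 3, 4])
    pool.close()
    pool.join()
    print(f'Results: {results}, took {time.time() - start:.1f}s')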

EDIT: OP also wanted full code. This is difficult on Stack Overflow because I can't test in your environment or with your code. I've taken the liberty of copying and pasting your code into my template above. You may need to add some imports, but this is as close as you'll get to "runnable" and "full" code.

import time
import signal
import numpy as np
import pandas as pd
import multiprocessing
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
from sklearn.metrics import accuracy_score

def init_worker():
    ''' Add KeyboardInterrupt exception to multiprocessing workers '''
    signal.signal(signal.SIGINT, signal.SIG_IGN)

def train_model(model_type):
    '''
    This code is parallelised and runs on each process
    It trains a model with different layer sizes (hyperparameters)
    It saves the model and returns the score (error)
    '''
    from keras.layers import LSTM, SimpleRNN, Dense, Activation
    from keras.models import Sequential
    from keras.callbacks import EarlyStopping, ReduceLROnPlateau
    from keras.layers.normalization import BatchNormalization

    print(f'Training a model: {model_type}')

    callbacks = [
        EarlyStopping(patience=10, verbose=1),
        ReduceLROnPlateau(factor=0.1, patience=3, min_lr=0.00001, verbose=1),
    ]

    model = Sequential()

    if model_type == 'rnn':
        model.add(SimpleRNN(units=1440, input_shape=(trainX.shape[1], trainX.shape[2])))
    elif model_type == 'lstm':
        model.add(LSTM(units=1440, input_shape=(trainX.shape[1], trainX.shape[2])))

    model.add(Dense(480))
    model.add(BatchNormalization())
    model.add(Activation('tanh'))
    model.compile(loss='mean_squared_error', optimizer='adam')

    model.fit(
        trainX,
        trainY,
        epochs=50,
        batch_size=20,
        validation_data=(testX, testY),
        verbose=1,
        callbacks=callbacks,
    )

    # predict
    Y_Train_pred = model.predict(trainX)
    Y_Test_pred = model.predict(testX)

    train_MSE = mean_squared_error(trainY, Y_Train_pred)
    test_MSE = mean_squared_error(testY, Y_Test_pred)

    # you can also return values, e.g. the eval score
    return {'type': model_type, 'train_MSE': train_MSE, 'test_MSE': test_MSE}

# Your code
# ---------
df = pd.read_csv("D:\Train.csv", header=None)

index = [i for i in list(range(1440)) if i % 3 == 2]
Y_train = df[index]
df = df.values

# making history by using look-back to predict the next step
def create_dataset(dataset, data_train, look_back=1):
    dataX, dataY = [], []
    print("Len:", len(dataset) - look_back - 1)
    for i in range(len(dataset) - look_back - 1):
        a = dataset[i : (i + look_back), :]
        dataX.append(a)
        dataY.append(data_train[i + look_back, :])
    return np.array(dataX), np.array(dataY)

Y_train = np.array(Y_train)
df = np.array(df)

look_back = 10
trainX, trainY = create_dataset(df, Y_train, look_back=look_back)

# Split data into train & test
trainX, testX, trainY, testY = train_test_split(
    trainX, trainY, test_size=0.2, shuffle=False
)

# My Code
# -------
num_workers = 2
model_types = ['rnn', 'lstm']

pool = multiprocessing.Pool(num_workers, init_worker)
scores = pool.map(train_model, model_types)
print(scores)

Output of the program:

[{'type': 'rnn', 'train_MSE': 0.06648435491248038, 'test_MSE': 0.062323388902691866},
 {'type': 'lstm', 'train_MSE': 0.10114341514420684, 'test_MSE': 0.09998065769499974}]
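With scores in that shape you can pick the winner directly, for example by test MSE (a small sketch assuming the dictionary keys shown above):

# Sketch: select the model with the lowest test error from the returned scores.
# Assumes each entry has 'type' and 'test_MSE' keys, as in the output above.
best = min(scores, key=lambda s: s['test_MSE'])
print(f"Best model: {best['type']} (test MSE {best['test_MSE']:.4f})")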