Compute *rolling* maximum drawdown of pandas Series Compute *rolling* maximum drawdown of pandas Series numpy numpy

Compute *rolling* maximum drawdown of pandas Series


Here's a numpy version of the rolling maximum drawdown function. windowed_view is a wrapper of a one-line function that uses numpy.lib.stride_tricks.as_strided to make a memory efficient 2d windowed view of the 1d array (full code below). Once we have this windowed view, the calculation is basically the same as your max_dd, but written for a numpy array, and applied along the second axis (i.e. axis=1).

def rolling_max_dd(x, window_size, min_periods=1):    """Compute the rolling maximum drawdown of `x`.    `x` must be a 1d numpy array.    `min_periods` should satisfy `1 <= min_periods <= window_size`.    Returns an 1d array with length `len(x) - min_periods + 1`.    """    if min_periods < window_size:        pad = np.empty(window_size - min_periods)        pad.fill(x[0])        x = np.concatenate((pad, x))    y = windowed_view(x, window_size)    running_max_y = np.maximum.accumulate(y, axis=1)    dd = y - running_max_y    return dd.min(axis=1)

Here's a complete script that demonstrates the function:

import numpy as npfrom numpy.lib.stride_tricks import as_stridedimport pandas as pdimport matplotlib.pyplot as pltdef windowed_view(x, window_size):    """Creat a 2d windowed view of a 1d array.    `x` must be a 1d numpy array.    `numpy.lib.stride_tricks.as_strided` is used to create the view.    The data is not copied.    Example:    >>> x = np.array([1, 2, 3, 4, 5, 6])    >>> windowed_view(x, 3)    array([[1, 2, 3],           [2, 3, 4],           [3, 4, 5],           [4, 5, 6]])    """    y = as_strided(x, shape=(x.size - window_size + 1, window_size),                   strides=(x.strides[0], x.strides[0]))    return ydef rolling_max_dd(x, window_size, min_periods=1):    """Compute the rolling maximum drawdown of `x`.    `x` must be a 1d numpy array.    `min_periods` should satisfy `1 <= min_periods <= window_size`.    Returns an 1d array with length `len(x) - min_periods + 1`.    """    if min_periods < window_size:        pad = np.empty(window_size - min_periods)        pad.fill(x[0])        x = np.concatenate((pad, x))    y = windowed_view(x, window_size)    running_max_y = np.maximum.accumulate(y, axis=1)    dd = y - running_max_y    return dd.min(axis=1)def max_dd(ser):    max2here = pd.expanding_max(ser)    dd2here = ser - max2here    return dd2here.min()if __name__ == "__main__":    np.random.seed(0)    n = 100    s = pd.Series(np.random.randn(n).cumsum())    window_length = 10    rolling_dd = pd.rolling_apply(s, window_length, max_dd, min_periods=0)    df = pd.concat([s, rolling_dd], axis=1)    df.columns = ['s', 'rol_dd_%d' % window_length]    df.plot(linewidth=3, alpha=0.4)    my_rmdd = rolling_max_dd(s.values, window_length, min_periods=1)    plt.plot(my_rmdd, 'g.')    plt.show()

The plot shows the curves generated by your code. The green dots are computed by rolling_max_dd.

rolling drawdown plot

Timing comparison, with n = 10000 and window_length = 500:

In [2]: %timeit rolling_dd = pd.rolling_apply(s, window_length, max_dd, min_periods=0)1 loops, best of 3: 247 ms per loopIn [3]: %timeit my_rmdd = rolling_max_dd(s.values, window_length, min_periods=1)10 loops, best of 3: 38.2 ms per loop

rolling_max_dd is about 6.5 times faster. The speedup is better for smaller window lengths. For example, with window_length = 200, it is almost 13 times faster.

To handle NA's, you could preprocess the Series using the fillna method before passing the array to rolling_max_dd.


For the sake of posterity and for completeness, here's what I wound up with in Cython. MemoryViews materially sped things up. There was a bit of work to do to make sure I'd properly typed everything (sorry, new to c-type languages). But in the end I think it works nicely. For typical use cases, the speedup vs regular python was ~100x or ~150x. The function to call is cy_rolling_dd_custom_mv where the first argument (ser) should be a 1-d numpy array and the second argument (window) should be a positive integer. The function returns a numpy memoryview, which works well enough in most cases. You can explicitly call np.array(result) if you need to to get a nice array of the output:

import numpy as npcimport numpy as npcimport cythonDTYPE = np.float64ctypedef np.float64_t DTYPE_t@cython.boundscheck(False)@cython.wraparound(False)@cython.nonecheck(False)cpdef tuple cy_dd_custom_mv(double[:] ser):    cdef double running_global_peak = ser[0]    cdef double min_since_global_peak = ser[0]    cdef double running_max_dd = 0    cdef long running_global_peak_id = 0    cdef long running_max_dd_peak_id = 0    cdef long running_max_dd_trough_id = 0    cdef long i    cdef double val    for i in xrange(ser.shape[0]):        val = ser[i]        if val >= running_global_peak:            running_global_peak = val            running_global_peak_id = i            min_since_global_peak = val        if val < min_since_global_peak:            min_since_global_peak = val            if val - running_global_peak <= running_max_dd:                running_max_dd = val - running_global_peak                running_max_dd_peak_id = running_global_peak_id                running_max_dd_trough_id = i    return (running_max_dd, running_max_dd_peak_id, running_max_dd_trough_id, running_global_peak_id)@cython.boundscheck(False)@cython.wraparound(False)@cython.nonecheck(False)def cy_rolling_dd_custom_mv(double[:] ser, long window):    cdef double[:, :] result    result = np.zeros((ser.shape[0], 4))    cdef double running_global_peak = ser[0]    cdef double min_since_global_peak = ser[0]    cdef double running_max_dd = 0    cdef long running_global_peak_id = 0    cdef long running_max_dd_peak_id = 0    cdef long running_max_dd_trough_id = 0    cdef long i    cdef double val    cdef int prob_1    cdef int prob_2    cdef tuple intermed    cdef long newthing    for i in xrange(ser.shape[0]):        val = ser[i]        if i < window:            if val >= running_global_peak:                running_global_peak = val                running_global_peak_id = i                min_since_global_peak = val            if val < min_since_global_peak:                min_since_global_peak = val                if val - running_global_peak <= running_max_dd:                    running_max_dd = val - running_global_peak                    running_max_dd_peak_id = running_global_peak_id                    running_max_dd_trough_id = i            result[i, 0] = <double>running_max_dd            result[i, 1] = <double>running_max_dd_peak_id            result[i, 2] = <double>running_max_dd_trough_id            result[i, 3] = <double>running_global_peak_id        else:            prob_1 = 1 if result[i-1, 3] <= float(i - window) else 0            prob_2 = 1 if result[i-1, 1] <= float(i - window) else 0            if prob_1 or prob_2:                intermed = cy_dd_custom_mv(ser[i-window+1:i+1])                result[i, 0] = <double>intermed[0]                result[i, 1] = <double>(intermed[1] + i - window + 1)                result[i, 2] = <double>(intermed[2] + i - window + 1)                result[i, 3] = <double>(intermed[3] + i - window + 1)            else:                newthing = <long>(int(result[i-1, 3]))                result[i, 3] = i if ser[i] >= ser[newthing] else result[i-1, 3]                if val - ser[newthing] <= result[i-1, 0]:                    result[i, 0] = <double>(val - ser[newthing])                    result[i, 1] = <double>result[i-1, 3]                    result[i, 2] = <double>i                else:                    result[i, 0] = <double>result[i-1, 0]                    result[i, 1] = <double>result[i-1, 1]                    result[i, 2] = <double>result[i-1, 2]    cdef double[:] finalresult = result[:, 0]    return finalresult


Here is a Numba-accelerated solution:

import pandas as pdimport numpy as npimport numbafrom time import timen = 10000returns = pd.Series(np.random.normal(1.001, 0.01, n), pd.date_range("2020-01-01", periods=n, freq="1min"))@numba.njitdef max_drawdown(cum_returns):    max_drawdown = 0.0    current_max_ret = cum_returns[0]    for ret in cum_returns:        if ret > current_max_ret:            current_max_ret = ret        max_drawdown = max(max_drawdown, 1 - ret / current_max_ret)    return max_drawdownt = time()rolling_1h_max_dd = returns.cumprod().rolling("1h").apply(max_drawdown, raw=True)print("Fast:", time() - t);def max_drawdown_slow(x):    return (1 - x / x.cummax()).max()t = time()rolling_1h_max_dd_slow = returns.cumprod().rolling("1h").apply(max_drawdown_slow, raw=False)print("Slow:", time() - t);assert rolling_1h_max_dd.equals(rolling_1h_max_dd_slow)

Output:

Fast: 0.05633878707885742Slow: 4.540301084518433

=> 80x speedup