Compute *rolling* maximum drawdown of pandas Series
Here's a numpy version of the rolling maximum drawdown function. windowed_view
is a wrapper of a one-line function that uses numpy.lib.stride_tricks.as_strided
to make a memory efficient 2d windowed view of the 1d array (full code below). Once we have this windowed view, the calculation is basically the same as your max_dd
, but written for a numpy array, and applied along the second axis (i.e. axis=1
).
def rolling_max_dd(x, window_size, min_periods=1): """Compute the rolling maximum drawdown of `x`. `x` must be a 1d numpy array. `min_periods` should satisfy `1 <= min_periods <= window_size`. Returns an 1d array with length `len(x) - min_periods + 1`. """ if min_periods < window_size: pad = np.empty(window_size - min_periods) pad.fill(x[0]) x = np.concatenate((pad, x)) y = windowed_view(x, window_size) running_max_y = np.maximum.accumulate(y, axis=1) dd = y - running_max_y return dd.min(axis=1)
Here's a complete script that demonstrates the function:
import numpy as npfrom numpy.lib.stride_tricks import as_stridedimport pandas as pdimport matplotlib.pyplot as pltdef windowed_view(x, window_size): """Creat a 2d windowed view of a 1d array. `x` must be a 1d numpy array. `numpy.lib.stride_tricks.as_strided` is used to create the view. The data is not copied. Example: >>> x = np.array([1, 2, 3, 4, 5, 6]) >>> windowed_view(x, 3) array([[1, 2, 3], [2, 3, 4], [3, 4, 5], [4, 5, 6]]) """ y = as_strided(x, shape=(x.size - window_size + 1, window_size), strides=(x.strides[0], x.strides[0])) return ydef rolling_max_dd(x, window_size, min_periods=1): """Compute the rolling maximum drawdown of `x`. `x` must be a 1d numpy array. `min_periods` should satisfy `1 <= min_periods <= window_size`. Returns an 1d array with length `len(x) - min_periods + 1`. """ if min_periods < window_size: pad = np.empty(window_size - min_periods) pad.fill(x[0]) x = np.concatenate((pad, x)) y = windowed_view(x, window_size) running_max_y = np.maximum.accumulate(y, axis=1) dd = y - running_max_y return dd.min(axis=1)def max_dd(ser): max2here = pd.expanding_max(ser) dd2here = ser - max2here return dd2here.min()if __name__ == "__main__": np.random.seed(0) n = 100 s = pd.Series(np.random.randn(n).cumsum()) window_length = 10 rolling_dd = pd.rolling_apply(s, window_length, max_dd, min_periods=0) df = pd.concat([s, rolling_dd], axis=1) df.columns = ['s', 'rol_dd_%d' % window_length] df.plot(linewidth=3, alpha=0.4) my_rmdd = rolling_max_dd(s.values, window_length, min_periods=1) plt.plot(my_rmdd, 'g.') plt.show()
The plot shows the curves generated by your code. The green dots are computed by rolling_max_dd
.
Timing comparison, with n = 10000
and window_length = 500
:
In [2]: %timeit rolling_dd = pd.rolling_apply(s, window_length, max_dd, min_periods=0)1 loops, best of 3: 247 ms per loopIn [3]: %timeit my_rmdd = rolling_max_dd(s.values, window_length, min_periods=1)10 loops, best of 3: 38.2 ms per loop
rolling_max_dd
is about 6.5 times faster. The speedup is better for smaller window lengths. For example, with window_length = 200
, it is almost 13 times faster.
To handle NA's, you could preprocess the Series
using the fillna
method before passing the array to rolling_max_dd
.
For the sake of posterity and for completeness, here's what I wound up with in Cython. MemoryViews materially sped things up. There was a bit of work to do to make sure I'd properly typed everything (sorry, new to c-type languages). But in the end I think it works nicely. For typical use cases, the speedup vs regular python was ~100x or ~150x. The function to call is cy_rolling_dd_custom_mv
where the first argument (ser
) should be a 1-d numpy array and the second argument (window
) should be a positive integer. The function returns a numpy memoryview, which works well enough in most cases. You can explicitly call np.array(result)
if you need to to get a nice array of the output:
import numpy as npcimport numpy as npcimport cythonDTYPE = np.float64ctypedef np.float64_t DTYPE_t@cython.boundscheck(False)@cython.wraparound(False)@cython.nonecheck(False)cpdef tuple cy_dd_custom_mv(double[:] ser): cdef double running_global_peak = ser[0] cdef double min_since_global_peak = ser[0] cdef double running_max_dd = 0 cdef long running_global_peak_id = 0 cdef long running_max_dd_peak_id = 0 cdef long running_max_dd_trough_id = 0 cdef long i cdef double val for i in xrange(ser.shape[0]): val = ser[i] if val >= running_global_peak: running_global_peak = val running_global_peak_id = i min_since_global_peak = val if val < min_since_global_peak: min_since_global_peak = val if val - running_global_peak <= running_max_dd: running_max_dd = val - running_global_peak running_max_dd_peak_id = running_global_peak_id running_max_dd_trough_id = i return (running_max_dd, running_max_dd_peak_id, running_max_dd_trough_id, running_global_peak_id)@cython.boundscheck(False)@cython.wraparound(False)@cython.nonecheck(False)def cy_rolling_dd_custom_mv(double[:] ser, long window): cdef double[:, :] result result = np.zeros((ser.shape[0], 4)) cdef double running_global_peak = ser[0] cdef double min_since_global_peak = ser[0] cdef double running_max_dd = 0 cdef long running_global_peak_id = 0 cdef long running_max_dd_peak_id = 0 cdef long running_max_dd_trough_id = 0 cdef long i cdef double val cdef int prob_1 cdef int prob_2 cdef tuple intermed cdef long newthing for i in xrange(ser.shape[0]): val = ser[i] if i < window: if val >= running_global_peak: running_global_peak = val running_global_peak_id = i min_since_global_peak = val if val < min_since_global_peak: min_since_global_peak = val if val - running_global_peak <= running_max_dd: running_max_dd = val - running_global_peak running_max_dd_peak_id = running_global_peak_id running_max_dd_trough_id = i result[i, 0] = <double>running_max_dd result[i, 1] = <double>running_max_dd_peak_id result[i, 2] = <double>running_max_dd_trough_id result[i, 3] = <double>running_global_peak_id else: prob_1 = 1 if result[i-1, 3] <= float(i - window) else 0 prob_2 = 1 if result[i-1, 1] <= float(i - window) else 0 if prob_1 or prob_2: intermed = cy_dd_custom_mv(ser[i-window+1:i+1]) result[i, 0] = <double>intermed[0] result[i, 1] = <double>(intermed[1] + i - window + 1) result[i, 2] = <double>(intermed[2] + i - window + 1) result[i, 3] = <double>(intermed[3] + i - window + 1) else: newthing = <long>(int(result[i-1, 3])) result[i, 3] = i if ser[i] >= ser[newthing] else result[i-1, 3] if val - ser[newthing] <= result[i-1, 0]: result[i, 0] = <double>(val - ser[newthing]) result[i, 1] = <double>result[i-1, 3] result[i, 2] = <double>i else: result[i, 0] = <double>result[i-1, 0] result[i, 1] = <double>result[i-1, 1] result[i, 2] = <double>result[i-1, 2] cdef double[:] finalresult = result[:, 0] return finalresult
Here is a Numba-accelerated solution:
import pandas as pdimport numpy as npimport numbafrom time import timen = 10000returns = pd.Series(np.random.normal(1.001, 0.01, n), pd.date_range("2020-01-01", periods=n, freq="1min"))@numba.njitdef max_drawdown(cum_returns): max_drawdown = 0.0 current_max_ret = cum_returns[0] for ret in cum_returns: if ret > current_max_ret: current_max_ret = ret max_drawdown = max(max_drawdown, 1 - ret / current_max_ret) return max_drawdownt = time()rolling_1h_max_dd = returns.cumprod().rolling("1h").apply(max_drawdown, raw=True)print("Fast:", time() - t);def max_drawdown_slow(x): return (1 - x / x.cummax()).max()t = time()rolling_1h_max_dd_slow = returns.cumprod().rolling("1h").apply(max_drawdown_slow, raw=False)print("Slow:", time() - t);assert rolling_1h_max_dd.equals(rolling_1h_max_dd_slow)
Output:
Fast: 0.05633878707885742Slow: 4.540301084518433
=> 80x speedup