
Python - Working around memory leaks


You can use something like this to help track down memory leaks:

>>> from collections import defaultdict
>>> from gc import get_objects
>>> before = defaultdict(int)
>>> after = defaultdict(int)
>>> for i in get_objects():
...     before[type(i)] += 1
...

Now suppose the test leaks some memory:

>>> leaked_things = [[x] for x in range(10)]
>>> for i in get_objects():
...     after[type(i)] += 1
...
>>> print [(k, after[k] - before[k]) for k in after if after[k] - before[k]]
[(<type 'list'>, 11)]

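The count is 11 because we have leaked one outer list containing 10 more lists. (The session above is Python 2; in Python 3, print is a function and the type displays as <class 'list'>.)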
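The same before/after comparison can be wrapped in two small helpers. This is only a sketch for Python 3: the names snapshot_counts and count_growth are made up for illustration, and unlike the session above it reports only types whose count went up. Keep in mind that gc.get_objects() only returns objects tracked by the garbage collector (containers such as lists, dicts and class instances), so leaked ints or strings will not show up here.

```python
import gc
from collections import Counter


def snapshot_counts():
    """Count live, GC-tracked objects by type."""
    # Collect first so that ordinary collectable garbage is not reported as a leak.
    gc.collect()
    return Counter(type(obj) for obj in gc.get_objects())


def count_growth(before, after):
    """Return {type: increase} for every type whose count grew between snapshots."""
    return {kind: after[kind] - before[kind]
            for kind in after
            if after[kind] > before[kind]}
```

Take one snapshot before the suspect code runs and one after, then pass both to count_growth(); types that keep growing run after run are the ones worth investigating.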


Threads would not help. If you must give up on finding the leak, then the only way to contain its effect is to start a new process once in a while (e.g., when a test has left overall memory consumption too high for your liking; on Linux you can determine the VM size easily by reading /proc/self/status, and similar approaches exist on other OSes).
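For the Linux case, reading the VM size could look roughly like this. It is a sketch that assumes the usual "VmSize:   123456 kB" line in /proc/self/status; the function names are mine, and the parser is kept separate from the file read so it can be tried on any text.

```python
def parse_vm_size_kb(status_text):
    """Return the VmSize value (in kB) from /proc/<pid>/status text, or None if absent."""
    for line in status_text.splitlines():
        if line.startswith("VmSize:"):
            # The line looks like "VmSize:\t  123456 kB"; the second field is the number.
            return int(line.split()[1])
    return None


def current_vm_size_kb(status_path="/proc/self/status"):
    """Read this process's virtual memory size in kB (Linux only)."""
    with open(status_path) as status_file:
        return parse_vm_size_kb(status_file.read())
```

A test driver could call current_vm_size_kb() after each test and compare the result with whatever threshold suits the machine.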

Make sure the overall script takes an optional parameter to tell it what test number (or other test identification) to start from, so that when one instance of the script decides it's taking up too much memory, it can tell its successor where to restart from.
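One possible shape for such a script is sketched below. Everything here is an assumption made for illustration: the --start-from option name, tests being a list of callables, and memory_kb being any callable that reports current memory use. When the limit is exceeded, the script hands over to its successor with os.execv, which replaces the current process with a fresh interpreter.

```python
import argparse
import os
import sys


def run_tests(tests, start, memory_kb, limit_kb):
    """Run tests[start:] in order.

    Returns the index the successor should resume from, or None when all tests ran.
    """
    for index in range(start, len(tests)):
        tests[index]()
        next_index = index + 1
        # Only hand over if there is still work left and memory use is too high.
        if next_index < len(tests) and memory_kb() > limit_kb:
            return next_index
    return None


def main(tests, memory_kb, limit_kb):
    parser = argparse.ArgumentParser()
    parser.add_argument("--start-from", type=int, default=0)
    args = parser.parse_args()
    resume_at = run_tests(tests, args.start_from, memory_kb, limit_kb)
    if resume_at is not None:
        # Replace this process with a fresh one that continues where we stopped.
        os.execv(sys.executable,
                 [sys.executable, sys.argv[0], "--start-from", str(resume_at)])
```

Keeping the loop in run_tests() separate from the re-exec makes it possible to exercise the hand-over logic with fake tests and a fake memory reading.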

Or, more solidly, make sure that as each test is completed its identification is appended to some file with a well-known name. When the program starts it begins by reading that file and thus knows what tests have already been run. This architecture is more solid because it also covers the case where the program crashes during a test. Of course, to fully automate recovery from such crashes, you'll want a separate watchdog program and process to be in charge of starting a fresh instance of the test program when it determines the previous one has crashed. The watchdog could use subprocess for the purpose. It also needs a way to tell when the sequence is finished: e.g., a normal exit from the test program could mean that, while any crash or exit with a status != 0 signifies the need to start a fresh instance.
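A minimal sketch of the progress file and the watchdog described above might look like this. The file format (one test id per line), the function names and the max_restarts cap are all assumptions of mine, not something the approach dictates.

```python
import subprocess


def load_completed(progress_path):
    """Return the set of test ids already recorded as finished."""
    try:
        with open(progress_path) as progress_file:
            return {line.strip() for line in progress_file if line.strip()}
    except FileNotFoundError:
        # First run: nothing has been completed yet.
        return set()


def mark_completed(progress_path, test_id):
    """Append one finished test id so that a restarted run can skip it."""
    with open(progress_path, "a") as progress_file:
        progress_file.write(test_id + "\n")


def watchdog(command, max_restarts=10):
    """Rerun command until it exits with status 0.

    Returns the number of launches it took, or None if every launch failed.
    """
    for attempt in range(1, max_restarts + 1):
        # A crash (negative status) or a nonzero exit both mean "start a fresh instance".
        if subprocess.call(command) == 0:
            return attempt
    return None
```

The test program would call load_completed() at startup, skip those ids, and call mark_completed() after each test; the watchdog is then started with the test program's command line, e.g. [sys.executable, "run_tests.py"], where run_tests.py is a placeholder name.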

If these architectures appeal but you need further help implementing them, just comment to this answer and I'll be happy to supply example code -- I don't want to do it "preemptively" in case there are as-yet-unexpressed issues that make the architectures unsuitable for you. (It might also help to know what platforms you need to run on).


I had the same problem with a third-party C library which was leaking. The cleanest workaround I could think of was to fork and wait. The advantage is that you don't even have to create a separate process for each run: you can define the size of your batch.

Here's a general solution (if you ever find the leak, the only change you need to make is to change run() to call run_single_process() instead of run_forked() and you'll be done):

import os
import sys

batchSize = 20

class Runner(object):
    def __init__(self, dataFeedGenerator, dataProcessor):
        self._dataFeed = dataFeedGenerator
        self._caller = dataProcessor

    def run(self):
        self.run_forked()

    def run_forked(self):
        dataFeed = self._dataFeed
        dataSubFeed = []
        for dataMorsel in dataFeed:
            dataSubFeed.append(dataMorsel)
            if len(dataSubFeed) == batchSize:
                self.run_batch(dataSubFeed)
                dataSubFeed = []
        if dataSubFeed:
            # Process the final, partial batch.
            self.run_batch(dataSubFeed)

    def run_batch(self, dataSubFeed):
        # The child processes one batch and exits; the parent waits for it.
        self._dataFeed = dataSubFeed
        self.fork()
        if self._child_pid == 0:
            self.run_single_process()
        self.endBatch()

    def run_single_process(self):
        for dataMorsel in self._dataFeed:
            self._caller(dataMorsel)

    def fork(self):
        self._child_pid = os.fork()

    def endBatch(self):
        if self._child_pid != 0:
            os.waitpid(self._child_pid, 0)
        else:
            sys.exit()  # exit from the child when done

This isolates the memory leak to the child process. Each child exits after its batch and the OS reclaims its memory, so the leak never accumulates over more than batchSize items.
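If you would rather not manage fork and waitpid by hand, the standard library can do similar recycling for you. The sketch below is a different technique from the Runner class above, not a rewrite of it: it uses a multiprocessing pool with maxtasksperchild, which replaces the worker process after it has handled that many tasks. process_item is a hypothetical stand-in for the leaky call, and the "fork" start method is chosen here on purpose, so this is Unix-only just like os.fork().

```python
import multiprocessing
import os


def process_item(item):
    """Hypothetical stand-in for the leaky call; also reports which process ran it."""
    return item * 2, os.getpid()


def run_in_recycled_workers(func, items, batch_size=20):
    """Run func over items in a single worker that is replaced every batch_size tasks."""
    context = multiprocessing.get_context("fork")  # Unix-only start method
    with context.Pool(processes=1, maxtasksperchild=batch_size) as pool:
        # chunksize=1 makes each item count as one task toward maxtasksperchild.
        return pool.map(func, items, chunksize=1)
```

The results come back in input order, and whatever the worker leaked is released each time it is replaced. Note that func and its arguments must be picklable, which the fork-and-wait version does not require.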