Flask application with watchdog observer

flask application with watchdog observer


I ended up using multithreading.

# Flask application that watches the current directory with watchdog.
#
# A background thread runs a watchdog Observer and moves matching file events
# from the handler's queue into the module-level ``eventlist``.  A POST to
# ``/watcher/status`` prints and clears whatever has accumulated.

import threading
import time
from queue import Queue

from flask import Flask, render_template, jsonify
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer


class MyHandler(FileSystemEventHandler):
    """Queue non-directory events whose path ends with a watched suffix."""

    def __init__(self, pattern=None):
        """Create the handler.

        Args:
            pattern: A suffix string or an iterable of suffix strings to
                match against ``event.src_path``.  Defaults to
                ``(".xml", ".tiff", ".jpg")``.
        """
        super().__init__()
        pattern = pattern or (".xml", ".tiff", ".jpg")
        # str.endswith only accepts a str or a tuple of str, so a list (or
        # any other iterable) is converted here instead of failing later.
        self.pattern = pattern if isinstance(pattern, str) else tuple(pattern)
        self.event_q = Queue()
        self.dummyThread = None

    def on_any_event(self, event):
        """Put ``(event, timestamp)`` on the queue for matching file events."""
        if not event.is_directory and event.src_path.endswith(self.pattern):
            self.event_q.put((event, time.time()))

    def start(self):
        """Start the daemon thread that runs ``_process``."""
        self.dummyThread = threading.Thread(target=self._process)
        self.dummyThread.daemon = True
        self.dummyThread.start()

    def _process(self):
        """Idle loop; it sleeps forever and does not touch the queue."""
        while True:
            time.sleep(1)


app = Flask(__name__)
handler = MyHandler()
handler.start()

# ``eventlist`` is shared between the watcher thread and Flask request threads.
# ``eventlist_flag`` is kept for anything that inspects it (0 = idle,
# 1 = watcher is filling the list, 2 = request is consuming it), but mutual
# exclusion is provided by the lock: a plain integer check-then-set is not
# atomic and cannot act as a lock on its own.
eventlist_flag = 0
eventlist = []
_eventlist_lock = threading.Lock()

# Set to ask ``run_watcher`` to leave its loop and shut the observer down.
_stop_watcher = threading.Event()


def run_watcher():
    """Watch '.' and move queued events into ``eventlist`` about once a second.

    Runs until ``_stop_watcher`` is set, or until KeyboardInterrupt when it is
    called on the main thread.  The observer is always stopped and joined on
    the way out.
    """
    global eventlist_flag, eventlist
    observer = Observer()
    observer.schedule(handler, '.')
    observer.start()
    try:
        while not _stop_watcher.is_set():
            with _eventlist_lock:
                eventlist_flag = 1
                # This function is the queue's only consumer, so the
                # empty()/get() pair cannot block.
                while not handler.event_q.empty():
                    event, _ts = handler.event_q.get()
                    eventlist.append(event)
                eventlist_flag = 0
            # Same one-second cadence as a sleep, but wakes early on shutdown.
            _stop_watcher.wait(1)
    except KeyboardInterrupt:
        # Only delivered when this runs on the main thread; worker threads
        # rely on ``_stop_watcher`` instead.
        pass
    finally:
        observer.stop()
        observer.join()


@app.route('/watcher/status', methods=['POST'])
def watchernow():
    """Print every collected event, clear the list, and return an empty JSON object."""
    global eventlist_flag, eventlist
    with _eventlist_lock:
        eventlist_flag = 2
        # Take the whole batch in one step so printing happens outside the lock.
        pending, eventlist = eventlist, []
        eventlist_flag = 0
    for e in pending:
        print(e)
    return jsonify({})


@app.route('/')
def index():
    """Render the ``index.html`` template."""
    return render_template('index.html')


if __name__ == '__main__':
    watcher_thread = threading.Thread(target=run_watcher)
    watcher_thread.start()
    try:
        # NOTE(review): debug=True also enables the auto-reloader, which
        # presumably re-runs this script in a child process and so starts a
        # second watcher thread there - confirm, or pass use_reloader=False.
        app.run(debug=True)
    finally:
        # Without this signal the watcher loop never ends and join() hangs.
        _stop_watcher.set()
        watcher_thread.join()


Flask (via Werkzeug) already integrates watchdog into its development server when debug mode is enabled. The only thing you need to do is install watchdog:

$ pip install watchdog

More info: