flask application with watchdog observer
I ended up using multithreading.
from flask import Flask, render_template, jsonifyfrom watchdog.observers import Observerfrom watchdog.events import FileSystemEventHandlerfrom queue import Queueimport timeimport threadingclass MyHandler(FileSystemEventHandler): def __init__(self, pattern=None): self.pattern = pattern or (".xml", ".tiff", ".jpg") self.event_q = Queue() self.dummyThread = None def on_any_event(self, event): if not event.is_directory and event.src_path.endswith(self.pattern): self.event_q.put((event, time.time())) def start(self): self.dummyThread = threading.Thread(target=self._process) self.dummyThread.daemon = True self.dummyThread.start() def _process(self): while True: time.sleep(1)app = Flask(__name__)handler = MyHandler()handler.start()eventlist_flag = 0eventlist = []def run_watcher(): global eventlist_flag, eventlist observer = Observer() observer.schedule(handler, '.') observer.start() try: while True: if eventlist_flag == 0: eventlist_flag = 1 while not handler.event_q.empty(): event, ts = handler.event_q.get() eventlist.append(event) eventlist_flag = 0 time.sleep(1) except KeyboardInterrupt: observer.stop() observer.join()@app.route('/watcher/status', methods=['POST'])def watchernow(): global eventlist_flag, eventlist if eventlist_flag == 0 and len(eventlist) > 0: eventlist_flag = 2 for e in eventlist: print(e) eventlist = [] eventlist_flag = 0 return jsonify({})@app.route('/')def index(): return render_template('index.html')if __name__ == '__main__': watcher_thread = threading.Thread(target=run_watcher) watcher_thread.start() app.run(debug=True) watcher_thread.join()
Flask (Werkzeug) already integrate watchdog for development server when enable debug mode. The only thing you need to do is install watchdog:
$ pip install watchdog
More info: