Using Tweepy to listen to stream and search for tweets. How to stop previous search and only listen for new stream?


Below is some code that will cancel old streams when a new stream is created. It works by adding new streams to a global list, and then calling stream.disconnect() on all streams in the list whenever a new stream is created.

diff --git a/app.py b/app.py
index 1e3ed10..f416ddc 100755
--- a/app.py
+++ b/app.py
@@ -23,6 +23,8 @@ auth.set_access_token(access_token, access_token_secret)
 app = Flask(__name__)
 red = redis.StrictRedis()
+# Add a place to keep track of current streams
+streams = []
 
 
 @app.route('/')
 def index():
@@ -32,12 +34,18 @@ def index():
 @app.route('/search', methods=['POST'])
 # gets search-keyword and starts stream
 def streamTweets():
+        # cancel old streams
+        for stream in streams:
+            stream.disconnect()
+
         search_term = request.form['tweet']
         search_term_hashtag = '#' + search_term
         # instantiate listener
         listener = StdOutListener()
         # stream object uses listener we instantiated above to listen for data
         stream = tweepy.Stream(auth, listener)
+        # add this stream to the global list
+        streams.append(stream)
         stream.filter(track=[search_term or search_term_hashtag],
                 async=True) # make sure stream is non-blocking
         redirect('/stream') # execute '/stream' sse

What this does not solve is the problem of session management. With your current setup a search by one user will affect the searches of all users. This can be avoided by giving your users some identifier and storing their streams along with their identifier. The easiest way to do this is likely to use Flask's session support. You could also do this with a requestId as Pierre suggested. In either case you will also need code to notice when a user has closed the page and close their stream.
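As a rough illustration of storing streams per user, here is a minimal sketch. `StreamRegistry`, `replace`, and `close` are hypothetical names, not part of Tweepy or Flask; the only assumption about the stream object is that it has a `disconnect()` method, as `tweepy.Stream` does in the code above.

```python
import threading


class StreamRegistry:
    """Tracks at most one active stream per user (hypothetical helper)."""

    def __init__(self):
        self._streams = {}             # user_id -> stream object
        self._lock = threading.Lock()  # requests may be handled on several threads

    def replace(self, user_id, stream):
        """Register `stream` for `user_id` and disconnect that user's previous stream."""
        with self._lock:
            old = self._streams.get(user_id)
            self._streams[user_id] = stream
        if old is not None:
            old.disconnect()

    def close(self, user_id):
        """Disconnect and forget the user's stream. Returns True if one existed."""
        with self._lock:
            old = self._streams.pop(user_id, None)
        if old is not None:
            old.disconnect()
        return old is not None
```

In the `/search` view, the user ID could be a value kept in Flask's `session`; the view would call `replace(user_id, stream)` just before `stream.filter(...)`, and `close(user_id)` once the page is known to be closed. Unlike the global list, a new search then only affects that user's own stream.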


Disclaimer: I know nothing about Tweepy, but this appears to be a design issue.

Are you trying to add state to a RESTful API? You may have a design problem. As JRichardSnape answered, your API shouldn't be the one taking care of canceling a request; it should be done in the front end. What I mean is that the JavaScript / AJAX / etc. that calls this function should also make another call, to a new function:

@app.route('/cancelSearch', methods=['POST'])

with the POST carrying the search terms. As long as you don't have state, you can't really do this safely in an async call: imagine someone else makes the same search at the same time; canceling one will then cancel both (remember, you don't have state, so you don't know whose search you're canceling). Perhaps you do need state with your design.

If you must keep using this and don't mind breaking the "stateless" rule, then add a "state" to your request. In this case it's not so bad, because you could launch a thread, name it with the userId, and then kill that thread on every new search:

def streamTweets():
    search_term = request.form['tweet']
    # If your limit is one request per user at a time. If multiple windows can be
    # opened and you want to follow this limit, store userId in a cookie.
    userId = request.form['userId']
    # Look for any request currently running with this ID, and cancel them

Alternatively, you could return a requestId, which you would then keep in the front end and use to call cancelSearch?requestId=$requestId. In cancelSearch, you would have to find the pending request (it sounds like that lives in Tweepy, since you're not using your own threads) and disconnect it.
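The requestId idea could look roughly like the sketch below. `register_search`, `cancel_search`, and the `pending` dict are hypothetical names used for illustration only; the stream is assumed to expose `disconnect()`, and the dict assumes a single server process.

```python
import uuid

# request_id -> stream object; a plain dict is enough for a single-process sketch
pending = {}


def register_search(stream):
    """Store the stream and return an opaque ID the front end can send back later."""
    request_id = uuid.uuid4().hex
    pending[request_id] = stream
    return request_id


def cancel_search(request_id):
    """Disconnect the stream for request_id.

    Returns False when the ID is unknown or was already cancelled.
    """
    stream = pending.pop(request_id, None)
    if stream is None:
        return False
    stream.disconnect()
    return True
```

The search view would return the value from `register_search` to the page, and a `/cancelSearch` view would pass the ID it receives to `cancel_search`. Because each ID maps to exactly one stream, two users running the same search at the same time cannot cancel each other.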

Out of curiosity, I just watched what happens when you search on Google, and it uses a GET request. Have a look (debug tools -> Network; then enter some text and watch the autofill requests). Google sends a token with every request (every time you type something). That doesn't mean the token is used for this purpose, but it's basically what I described. If you don't want a session, then use a unique identifier.


Well, I solved it by using a timer, but I'm still looking for a more Pythonic way.

from streamer import StreamListener

def stream():
    hashtag = input
    #assign each user an ID ( for pubsub )
    StreamListener.userid = random_user_id

    def handler(signum, frame):
        print("Forever is over")
        raise Exception("end of time")

    def main_stream():
        stream = tweepy.Stream(auth, StreamListener())
        stream.filter(track=track,async=True)
        redirect(url_for('map_stream'))

    def close_stream():
        # this is for closing client list in redis but don't know it's working
        obj = redis.client_list(tweet_stream)
        redis_client_list = obj[0]['addr']
        redis.client_kill(redis_client_list)
        stream = tweepy.Stream(auth, StreamListener())
        stream.disconnect()

    import signal
    signal.signal(signal.SIGALRM, handler)
    signal.alarm(300)
    try:
        main_stream()
    except Exception:
        close_stream()
        print("function terminate")
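One possible alternative to the `SIGALRM` timer is `threading.Timer` from the standard library. This swaps in a different technique from the one posted: `signal.signal` can only be called from the main thread and `SIGALRM` is Unix-only, whereas a timer thread has neither restriction. The helper name `disconnect_after` is hypothetical, and the sketch assumes only that the stream has a `disconnect()` method. Note that it disconnects the same stream object that was started, while `close_stream()` above appears to create a fresh `tweepy.Stream` and disconnect that one instead.

```python
import threading


def disconnect_after(stream, seconds):
    """Schedule stream.disconnect() to run once after `seconds`.

    Returns the timer so the caller can cancel it, for example when the
    user starts a new search before the timeout expires.
    """
    timer = threading.Timer(seconds, stream.disconnect)
    timer.daemon = True  # do not keep the process alive just for this timer
    timer.start()
    return timer
```

A call such as `disconnect_after(stream, 300)` right after `stream.filter(...)` would mirror the 300-second limit used above, and calling `cancel()` on the returned timer stops the pending disconnect.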