Can't pickle _thread.lock objects: PySpark send request to Elasticsearch

Connection objects, in general, are not serializable, so they cannot be passed by closure. You have to use the foreachPartition pattern instead:

def sendPut(docs):
    es = ...  # Initialize the es object here, inside the partition
    for doc in docs:
        es.index(index="tweetrepository", doc_type='tweet', body=doc)

myJson = (dataStream
    .map(decodeJson)
    .map(addSentiment)
    # Here you need an action.
    # `map` is lazy, and `pprint` doesn't guarantee complete execution.
    .foreachPartition(sendPut))
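As a rough sketch of what that per-partition initialization could look like, assuming the official elasticsearch-py client, a placeholder localhost URL, and a client version that still accepts doc_type (none of which appears in the question):

from elasticsearch import Elasticsearch

def sendPut(docs):
    # Create the client inside the function so it is built on the executor
    # for each partition and never has to be pickled by the driver.
    es = Elasticsearch(["http://localhost:9200"])  # placeholder host
    for doc in docs:
        es.index(index="tweetrepository", doc_type='tweet', body=doc)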

If you want to return something, use mapPartitions instead:

def sendPut(docs):
    es = ...  # Initialize the es object here, inside the partition
    for doc in docs:
        yield es.index(index="tweetrepository", doc_type='tweet', body=doc)

myJson = (dataStream
    .map(decodeJson)
    .map(addSentiment)
    .mapPartitions(sendPut))

but then you'll need an additional action to force execution, since mapPartitions is just as lazy as map.
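For example, if dataStream is a Spark Streaming DStream (the mention of pprint suggests it is), one way to force full evaluation of each micro-batch is to count it inside foreachRDD; this is only an illustration, and any output operation that consumes every element will do:

# pprint only materializes the first few elements of each batch, so use an
# output operation that touches everything, e.g. counting each batch RDD.
myJson.foreachRDD(lambda rdd: rdd.count())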