Read a distributed tab-delimited CSV
Let's try a simple example. For convenience I'll be using the handy toolz
library, but it is not really required here.
import sys
import base64

if sys.version_info < (3, ):
    import cPickle as pickle
else:
    import pickle

from toolz.functoolz import compose

rdd = sc.parallelize([(1, {"foo": "bar"}), (2, {"bar": "foo"})])
Now, your code is not exactly portable. In Python 2 base64.b64encode
returns str, while in Python 3 it returns bytes. Let's illustrate that:
Python 2
type(base64.b64encode(pickle.dumps({"foo": "bar"})))
## str
Python 3
type(base64.b64encode(pickle.dumps({"foo": "bar"})))
## bytes
So let's add decoding to the pipeline:
# Equivalent to
# def pickle_and_b64(x):
#     return base64.b64encode(pickle.dumps(x)).decode("ascii")
pickle_and_b64 = compose(
    lambda x: x.decode("ascii"),
    base64.b64encode,
    pickle.dumps)
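To sanity check the helper outside Spark, here is a local sketch of the same pipeline. It uses a minimal stand-in for toolz.functoolz.compose so it runs without toolz installed; the stand-in and the print statements are illustrative additions, not part of the original answer:

```python
import base64
import pickle
from functools import reduce


# Local stand-in for toolz.functoolz.compose: applies functions
# right-to-left, so compose(f, g, h)(x) == f(g(h(x))).
def compose(*funcs):
    return lambda x: reduce(lambda acc, f: f(acc), reversed(funcs), x)


pickle_and_b64 = compose(
    lambda x: x.decode("ascii"),
    base64.b64encode,
    pickle.dumps)

encoded = pickle_and_b64({"foo": "bar"})
print(type(encoded).__name__)  # str on Python 3, thanks to .decode("ascii")

# The value round-trips back through base64 and pickle unchanged.
print(pickle.loads(base64.b64decode(encoded)))
```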
Please note that this doesn't assume any particular shape of the data. Because of that, we'll use mapValues
to serialize only the values, leaving the keys untouched:
serialized = rdd.mapValues(pickle_and_b64)
serialized.first()
## (1, u'KGRwMApTJ2ZvbycKcDEKUydiYXInCnAyCnMu')
Now we can follow it with a simple format and save:
from tempfile import mkdtemp
import os

outdir = os.path.join(mkdtemp(), "foo")
serialized.map(lambda x: "{0}\t{1}".format(*x)).saveAsTextFile(outdir)
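For intuition, each saved line is the key, a tab, then the base64 payload. A local sketch of the same formatting, no Spark required (the pairs list mirrors the example RDD above):

```python
import base64
import pickle

pairs = [(1, {"foo": "bar"}), (2, {"bar": "foo"})]

# Same "{0}\t{1}".format(*x) formatting as the saveAsTextFile step:
# "<key>\t<base64-encoded pickle>". base64.b64encode never emits tabs
# or newlines, so the tab separator is safe.
lines = [
    "{0}\t{1}".format(k, base64.b64encode(pickle.dumps(v)).decode("ascii"))
    for k, v in pairs
]
for line in lines:
    print(line)
```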
To read the file we reverse the process:
# Equivalent to
# def b64_and_unpickle(x):
#     return pickle.loads(base64.b64decode(x))
b64_and_unpickle = compose(pickle.loads, base64.b64decode)

decoded = (sc.textFile(outdir)
    .map(lambda x: x.split("\t"))  # In Python 3 we could simply use str.split
    .mapValues(b64_and_unpickle))

decoded.first()
## (u'1', {'foo': 'bar'})
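Note that the keys come back as strings (u'1' rather than 1), because textFile reads plain text. If you need the original key types you have to cast them yourself; a minimal local sketch, assuming integer keys (the simulated line stands in for one line read back from the text file):

```python
import base64
import pickle


def b64_and_unpickle(x):
    return pickle.loads(base64.b64decode(x))


# Simulate one line as it would be read back by sc.textFile.
line = "1\t" + base64.b64encode(pickle.dumps({"foo": "bar"})).decode("ascii")

key, payload = line.split("\t")
record = (int(key), b64_and_unpickle(payload))  # cast the key back to int
print(record)
```

In the Spark pipeline the same cast would go into the map step, e.g. mapping each split line to (int(key), value) before mapValues.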