How does the pyspark mapPartitions function work? How does the pyspark mapPartitions function work? python python

How does the pyspark mapPartitions function work?


mapPartition should be thought of as a map operation over partitions and not over the elements of the partition. It's input is the set of current partitions its output will be another set of partitions.

The function you pass to map operation must take an individual element of your RDD

The function you pass to mapPartition must take an iterable of your RDD type and return an iterable of some other or the same type.

In your case you probably just want to do something like:

def filter_out_2(line):    return [x for x in line if x != 2]filtered_lists = data.map(filterOut2)

If you wanted to use mapPartition it would be:

def filter_out_2_from_partition(list_of_lists):  final_iterator = []  for sub_list in list_of_lists:    final_iterator.append( [x for x in sub_list if x != 2])  return iter(final_iterator)filtered_lists = data.mapPartition(filterOut2FromPartion)


It's easier to use mapPartitions with a generator function using the yield syntax:

def filter_out_2(partition):    for element in partition:        if element != 2:            yield elementfiltered_lists = data.mapPartitions(filter_out_2)


Need a final Iter

def filter_out_2(partition):for element in partition:    sec_iterator = []    for i in element:        if i!= 2:            sec_iterator.append(i)    yield sec_iteratorfiltered_lists = data.mapPartitions(filter_out_2)for i in filtered_lists.collect(): print(i)