
PySpark : Setting Executors/Cores and Memory Local Machine


Although the answer to your question lies only in one of the following issues, let me rewrite your example to explain what is happening.

Setting your configuration

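First, you don't need to start and stop a context to set your config. Since Spark 2.0 you can pass the config options to the SparkSession builder when you create the session. Executor memory and cores are fixed when the application starts, so they have to be supplied before getOrCreate() rather than changed on a running session.

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("yourAwesomeApp")
    # Resource settings must be in place before the session is created.
    .config("spark.executor.memory", "40g")
    .config("spark.executor.cores", "2")
    .getOrCreate()
)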

Reading your data

Spark evaluates the DAG lazily. The time you are measuring in your snippet is not the time to load the data into the data frame, but only the schema inference for the JSON file. Schema inference is expensive, so you should avoid it by supplying the schema of your data. You will see a big difference in performance between:

df = spark.read.json("../data/a_very_large_json.json.gz")

and

from pyspark.sql.types import (
    LongType,
    StringType,
    StructField,
    StructType,
)

# Explicit schema, so Spark does not have to scan the file to infer it.
json_schema = StructType([
    StructField("data", StructType([
        StructField("field1", StringType(), nullable=False),
        StructField("field2", StringType(), nullable=False),
        StructField("field3", StringType(), nullable=True),
        StructField("field4", StringType(), nullable=True),
        StructField("field5", LongType(), nullable=False),
    ])),
])

df = spark.read.json("../data/a_very_large_json.json.gz", schema=json_schema)

If you supply the schema, this instruction should be almost instant. As another user has already mentioned, to execute the task you need an action, such as show, head, collect, or count. Note that persist only marks the data frame for caching; it does not trigger the computation by itself.

df.show()

You can set the number of executor instances and cores in the configuration, but the actual use of those instances also depends on your input data and the transformations/actions you perform. From your description, I assume you are working in standalone mode, so having one executor instance (using all the cores) will be the default, and you should set the executor memory to the amount you have available. As far as I remember, in standalone mode spark.executor.instances is ignored and the actual number of executors is based on the number of cores available and spark.executor.cores.

Comparison with pandas

If you are working with only one node and just loading the data into a data frame, the comparison between Spark and pandas is unfair. Spark will always have a higher overhead. Spark shines when you have datasets that don't fit in one machine's memory and you have multiple nodes to perform the computation. If you are comfortable with pandas, you may be interested in Koalas from Databricks (since Spark 3.2 it is included in PySpark as the pandas API on Spark).

Recommendation

I prefer to set the execution details outside the application (e.g. using the spark-submit parameters). On rare occasions you will need to set some of them in the code to improve performance, but with every new version of Spark this is less frequent. If you can keep them outside the code, your application will be more future-proof and easier to scale.
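One way to keep the resource settings out of the application is a small launcher that assembles the spark-submit command line. The sketch below is only an illustration: `build_submit_command`, the script name, and the master URL `spark://host:7077` are hypothetical, while the flags themselves (`--master`, `--executor-memory`, `--executor-cores`, `--conf`) are standard spark-submit options.

```python
import shlex


def build_submit_command(app_path, master, executor_memory, executor_cores, extra_conf=None):
    """Return the spark-submit argument list for the given resource settings."""
    cmd = [
        "spark-submit",
        "--master", master,
        "--executor-memory", executor_memory,
        "--executor-cores", str(executor_cores),
    ]
    # Any further settings are passed as --conf key=value pairs.
    for key, value in sorted((extra_conf or {}).items()):
        cmd += ["--conf", f"{key}={value}"]
    cmd.append(app_path)
    return cmd


# Hypothetical application and cluster address, for illustration only.
command = build_submit_command(
    "your_awesome_app.py",
    master="spark://host:7077",
    executor_memory="40g",
    executor_cores=2,
    extra_conf={"spark.sql.shuffle.partitions": "64"},
)
print(shlex.join(command))
```

The application itself then only calls `SparkSession.builder.getOrCreate()` and picks up whatever it was launched with, so the same script can move from a laptop to a cluster without code changes.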


spark.sparkContext.stop()
spark = SparkSession.builder.config(conf=conf).getOrCreate()
df = spark.read.json("../Data/inasnelylargefile.json.gz")

Add this:

df.show()
# or
df.count()

The comparison you are making is not apples to apples. Spark performs lazy evaluation: if you don't call an action on your data frame, it does nothing but build the DAG and keep it ready for you.

In Spark, there are two kinds of operations:

  1. Transformations: evaluated lazily.
  2. Actions (like collect(), take(), show(), count()): evaluated immediately. persist() only marks the data for caching and needs an action to take effect.

In your case, read() is just a transformation; adding an action should trigger the computation.

More about actions vs transformation: https://training.databricks.com/visualapi.pdf


Your Spark read is slower than pandas because the gz file is not splittable, so Spark has to read the whole file with a single task. When reading an uncompressed file, or a file compressed with a splittable format like bzip2, Spark will run multiple tasks in parallel (up to the number of cores available in your cluster) to read the file. Try unpacking the file before you pass it to Spark.
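Unpacking can be done with the Python standard library before Spark is involved. The sketch below is one possible way to do it; `gunzip_file` and the sample file names are hypothetical, and the file is streamed in chunks so it never has to fit in memory.

```python
import gzip
import os
import shutil
import tempfile


def gunzip_file(src_path, dst_path, chunk_size=1024 * 1024):
    """Stream-decompress a .gz file to dst_path and return dst_path."""
    with gzip.open(src_path, "rb") as src, open(dst_path, "wb") as dst:
        shutil.copyfileobj(src, dst, chunk_size)
    return dst_path


# Small generated file standing in for the large input.
workdir = tempfile.mkdtemp()
gz_path = os.path.join(workdir, "sample.json.gz")
with gzip.open(gz_path, "wt", encoding="utf-8") as fh:
    fh.write('{"id": 1}\n{"id": 2}\n')

json_path = gunzip_file(gz_path, os.path.join(workdir, "sample.json"))

with open(json_path, encoding="utf-8") as fh:
    lines = fh.read().splitlines()
print(lines)
```

Passing the uncompressed path to spark.read.json should then let Spark split the input across tasks, provided the file holds one JSON record per line.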