
How do I convert an array (i.e. list) column to Vector


Personally, I would go with a Python UDF and wouldn't bother with anything else:
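A minimal sketch of what that looks like (assuming a df with an array column temperatures, as in the examples below; the UDF name is just illustrative):

from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf

# wrap the list -> DenseVector conversion in a UDF that returns a VectorUDT column
list_to_vector_udf = udf(lambda xs: Vectors.dense(xs), VectorUDT())

df_with_vectors = df.withColumn("vector", list_to_vector_udf("temperatures"))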

But if you really want other options, here they are:

  • Scala UDF with Python wrapper:

    Install sbt following the instructions on the project site.

    Create a Scala package with the following structure:

    .
    ├── build.sbt
    └── udfs.scala

    Edit build.sbt (adjust to reflect your Scala and Spark versions):

    scalaVersion := "2.11.8"

    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-sql" % "2.4.4",
      "org.apache.spark" %% "spark-mllib" % "2.4.4"
    )

    Edit udfs.scala:

    package com.example.spark.udfs

    import org.apache.spark.sql.functions.udf
    import org.apache.spark.ml.linalg.DenseVector

    object udfs {
      val as_vector = udf((xs: Seq[Double]) => new DenseVector(xs.toArray))
    }

    Package:

    sbt package

    and include the resulting jar (or its equivalent for your Scala version):

    $PROJECT_ROOT/target/scala-2.11/udfs_2.11-0.1-SNAPSHOT.jar

    as an argument to --driver-class-path when starting the shell / submitting the application.
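    For example, an illustrative way to pass the jar when starting a PySpark shell (spark-submit accepts the same option):

    pyspark --driver-class-path $PROJECT_ROOT/target/scala-2.11/udfs_2.11-0.1-SNAPSHOT.jar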

    In PySpark define a wrapper:

    from pyspark.sql.column import _to_java_column, _to_seq, Column
    from pyspark import SparkContext

    def as_vector(col):
        sc = SparkContext.getOrCreate()
        f = sc._jvm.com.example.spark.udfs.udfs.as_vector()
        return Column(f.apply(_to_seq(sc, [col], _to_java_column)))

    Test:

    with_vec = df.withColumn("vector", as_vector("temperatures"))
    with_vec.show()

    +--------+------------------+----------------+
    |    city|      temperatures|          vector|
    +--------+------------------+----------------+
    | Chicago|[-1.0, -2.0, -3.0]|[-1.0,-2.0,-3.0]|
    |New York|[-7.0, -7.0, -5.0]|[-7.0,-7.0,-5.0]|
    +--------+------------------+----------------+

    with_vec.printSchema()

    root
     |-- city: string (nullable = true)
     |-- temperatures: array (nullable = true)
     |    |-- element: double (containsNull = true)
     |-- vector: vector (nullable = true)
  • Dump data to a JSON format reflecting the DenseVector schema and read it back:

    from pyspark.sql.functions import to_json, from_json, col, struct, lit
    from pyspark.sql.types import StructType, StructField
    from pyspark.ml.linalg import VectorUDT

    json_vec = to_json(struct(struct(
        lit(1).alias("type"),  # type 1 is dense, type 0 is sparse
        col("temperatures").alias("values")
    ).alias("v")))

    schema = StructType([StructField("v", VectorUDT())])

    with_parsed_vector = df.withColumn(
        "parsed_vector", from_json(json_vec, schema).getItem("v")
    )
    with_parsed_vector.show()
    +--------+------------------+----------------+
    |    city|      temperatures|   parsed_vector|
    +--------+------------------+----------------+
    | Chicago|[-1.0, -2.0, -3.0]|[-1.0,-2.0,-3.0]|
    |New York|[-7.0, -7.0, -5.0]|[-7.0,-7.0,-5.0]|
    +--------+------------------+----------------+
    with_parsed_vector.printSchema()
    root
     |-- city: string (nullable = true)
     |-- temperatures: array (nullable = true)
     |    |-- element: double (containsNull = true)
     |-- parsed_vector: vector (nullable = true)


I had the same problem and solved it this way. It goes through an RDD transformation, so it is not ideal where performance is critical, but it works.

from pyspark.sql import Row
from pyspark.ml.linalg import Vectors

source_data = [
    Row(city="Chicago", temperatures=[-1.0, -2.0, -3.0]),
    Row(city="New York", temperatures=[-7.0, -7.0, -5.0]),
]
df = spark.createDataFrame(source_data)

city_rdd = df.rdd.map(lambda row: row[0])
temp_rdd = df.rdd.map(lambda row: row[1])

new_df = city_rdd.zip(temp_rdd.map(lambda x: Vectors.dense(x))).toDF(
    schema=['city', 'temperatures'])
new_df

The result is:

DataFrame[city: string, temperatures: vector]