set up pyspark 2.1 and explode trick, 17 Jan 2017

First, I have to jot down how to set up PySpark 2.1 before I forget it as usual.

import os
import findspark
findspark.init()  # locate the Spark installation before importing pyspark
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf

conf = SparkConf().\
    set('spark.yarn.appMasterEnv.PYSPARK_PYTHON', '/home/deacuna/anaconda3/bin/python').\
    set('spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON', '/home/deacuna/anaconda3/bin/python').\
    set('spark.executor.memory', '8g').\
    set('spark.yarn.executor.memoryOverhead', '16g').\
    set('spark.sql.codegen', 'true').\
    set('spark.yarn.executor.memory', '16g').\
    set('yarn.scheduler.minimum-allocation-mb', '500m').\
    set('spark.dynamicAllocation.maxExecutors', '3').\
    set('spark.driver.maxResultSize', '0')

spark = SparkSession.builder.\
    config(conf=conf).\
    getOrCreate()  # attach the conf above and create the session

This is a bit different from earlier versions of Spark: parallelize can now be accessed through spark.sparkContext.parallelize, and we can create a DataFrame directly from the session, i.e. spark.createDataFrame.
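
For example, a quick sketch with toy data (using the spark session created above):

# create an RDD from a local list via the session's SparkContext
rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5])
print(rdd.sum())

# create a DataFrame directly from the session
df = spark.createDataFrame([(1, 'a'), (2, 'b')], ['id', 'label'])
df.show()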

The trick that I found today is that I cannot download a big CSV file into a pandas dataframe and then simply use df_spark = spark.createDataFrame(df) … this thing crashes for me. Instead, I put the CSV file on HDFS (Hadoop) first and then read it with spark.read.csv. I will put the code snippet that I have over here.

# read csv file from hdfs with spark
pmid_citation_links = spark.read.csv('citation_links.csv', header=True)
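
A quick sanity check that the file loaded (optional):

pmid_citation_links.printSchema()
pmid_citation_links.show(5)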

# rename the columns
for new, old in zip(['pmid', 'rcr', 'year', 'list_cited_pmid', 'citations'], pmid_citation_links.columns):
    pmid_citation_links = pmid_citation_links.withColumnRenamed(old, new)
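
Side note: if we already know the column order, the same renaming can be done in one call with toDF (small sketch, same names as above):

# rename all columns at once
pmid_citation_links = pmid_citation_links.toDF('pmid', 'rcr', 'year', 'list_cited_pmid', 'citations')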

Here each row has a pmid column (just one number) and a list_cited_pmid column, which holds numbers separated by ;. We want to pair each pmid with every cited pmid in list_cited_pmid, i.e. split the string on ; and explode it into one row per cited pmid. We can do the following:

from pyspark.sql.functions import explode, split
citation_df = pmid_citation_links.select('pmid', explode(split('list_cited_pmid', ';')).alias('cited_pmid'))
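
To see what explode does, here is the same split-and-explode pattern on a tiny made-up dataframe (values are just for illustration):

toy = spark.createDataFrame([('1', '10;11;12'), ('2', '13')],
                            ['pmid', 'list_cited_pmid'])
toy.select('pmid', explode(split('list_cited_pmid', ';')).alias('cited_pmid')).show()
# roughly:
# +----+----------+
# |pmid|cited_pmid|
# +----+----------+
# |   1|        10|
# |   1|        11|
# |   1|        12|
# |   2|        13|
# +----+----------+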

Moreover, when we write the dataframe to file now, we can pass a mode to the save operation (see the DataFrameWriter documentation for more).

mode: specifies the behavior of the save operation when data already exists.
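
The valid modes are append, overwrite, ignore, and error. For example, to overwrite any previous output (small sketch; the output path here is made up):

# write the exploded table back out, overwriting any previous run
citation_df.write.csv('citation_links_exploded', mode='overwrite', header=True)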