set up pyspark 2.1 and explode trick, 17 Jan 2017

First, I have to jot down how to set up PySpark 2.1 before I forget it as usual.

import os
import findspark
findspark.init()  # locate the Spark installation before importing pyspark
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf

conf = SparkConf().\
    set('spark.yarn.appMasterEnv.PYSPARK_PYTHON', '/home/deacuna/anaconda3/bin/python').\
    set('spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON', '/home/deacuna/anaconda3/bin/python').\
    set('spark.executor.memory', '8g').\
    set('spark.yarn.executor.memoryOverhead', '16g').\
    set('spark.sql.codegen', 'true').\
    set('spark.yarn.executor.memory', '16g').\
    set('yarn.scheduler.minimum-allocation-mb', '500m').\
    set('spark.dynamicAllocation.maxExecutors', '3').\
    set('spark.driver.maxResultSize', '0')

spark = SparkSession.builder.\
    config(conf=conf).\
    getOrCreate()  # attach the conf above and create the session

This is a bit different from earlier versions of Spark: parallelize can now be accessed through spark.sparkContext.parallelize, and we can create a DataFrame directly from the session, i.e. spark.createDataFrame.
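
For example, a quick sketch with toy data (using the spark session created above):

# create an RDD from a local list via the session's SparkContext
rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5])
print(rdd.sum())

# create a DataFrame directly from the session
df = spark.createDataFrame([(1, 'a'), (2, 'b')], ['id', 'label'])
df.show()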

The trick that I found today is that I cannot download a big CSV file into a pandas dataframe and then simply use df_spark = spark.createDataFrame(df) … this thing crashes for me. Instead, I put the CSV file on HDFS (Hadoop) first and then read it with spark.read.csv. I will put the code snippet that I have over here.

# read csv file from hdfs with spark
pmid_citation_links = spark.read.csv('citation_links.csv', header=True)
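
A quick sanity check that the file loaded (optional):

pmid_citation_links.printSchema()
pmid_citation_links.show(5)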

# rename the columns
for new, old in zip(['pmid', 'rcr', 'year', 'list_cited_pmid', 'citations'], pmid_citation_links.columns):
    pmid_citation_links = pmid_citation_links.withColumnRenamed(old, new)
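
Side note: if we already know the column order, the same renaming can be done in one call with toDF (small sketch, same names as above):

# rename all columns at once
pmid_citation_links = pmid_citation_links.toDF('pmid', 'rcr', 'year', 'list_cited_pmid', 'citations')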

Here each row has a pmid column (just one number) and a list_cited_pmid column, which holds numbers separated by ;. We want to pair each pmid with every cited pmid in list_cited_pmid, i.e. split the string on ; and explode it into one row per cited pmid. We can do the following:

from pyspark.sql.functions import explode, split
citation_df = pmid_citation_links.select('pmid', explode(split('list_cited_pmid', ';')).alias('cited_pmid'))
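
To see what explode does, here is the same split-and-explode pattern on a tiny made-up dataframe (values are just for illustration):

toy = spark.createDataFrame([('1', '10;11;12'), ('2', '13')],
                            ['pmid', 'list_cited_pmid'])
toy.select('pmid', explode(split('list_cited_pmid', ';')).alias('cited_pmid')).show()
# roughly:
# +----+----------+
# |pmid|cited_pmid|
# +----+----------+
# |   1|        10|
# |   1|        11|
# |   1|        12|
# |   2|        13|
# +----+----------+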

Moreover, when we write the dataframe to file now, we can pass a mode to the save operation (see the DataFrameWriter documentation for more).

mode: specifies the behavior of the save operation when data already exists.
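
The valid modes are append, overwrite, ignore, and error. For example, to overwrite any previous output (small sketch; the output path here is made up):

# write the exploded table back out, overwriting any previous run
citation_df.write.csv('citation_links_exploded', mode='overwrite', header=True)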