set up PySpark 2.1 and the explode trick
17 Jan 2017
First, I have to jot down how to set up PySpark 2.1 before I forget it, as usual.
import findspark
findspark.init(spark_home="/opt/spark-2.1.0-bin-cdh5.9.0/")

from pyspark.sql import SparkSession
from pyspark.conf import SparkConf

conf = (SparkConf()
        .setAppName('map')
        .setMaster('local[5]')
        # make the YARN containers use the Anaconda Python
        .set('spark.yarn.appMasterEnv.PYSPARK_PYTHON', '/home/deacuna/anaconda3/bin/python')
        .set('spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON', '/home/deacuna/anaconda3/bin/python')
        # note: the key is spark.executor.memory, not executor.memory
        .set('spark.executor.memory', '8g')
        .set('spark.yarn.executor.memoryOverhead', '16g')
        .set('spark.sql.codegen', 'true')
        .set('yarn.scheduler.minimum-allocation-mb', '500m')
        .set('spark.dynamicAllocation.maxExecutors', '3')
        # 0 = no limit on results collected back to the driver
        .set('spark.driver.maxResultSize', '0'))

spark = (SparkSession.builder
         .appName("testing")
         .config(conf=conf)
         .getOrCreate())
This is a bit different from Spark 2.0 because parallelize is now accessed through spark.sparkContext.parallelize. Also, we can create a dataframe straight from the session, i.e. spark.createDataFrame.
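For example, both entry points in one short sketch (the data here is made up):
# an RDD through the SparkContext attached to the session
rdd = spark.sparkContext.parallelize([1, 2, 3, 4])
print(rdd.sum())
# a dataframe directly from the session
df = spark.createDataFrame([(1, 'a'), (2, 'b')], ['id', 'letter'])
df.show()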
The trick that I found today is that I cannot load a big CSV file into a pandas dataframe and then simply use df_spark = spark.createDataFrame(df); this crashes for me. Instead, I put the CSV file on HDFS first (e.g. hdfs dfs -put citation_links.csv) and then read it with spark.read.csv. Here is the code snippet that I have:
# read the csv file (now on hdfs) with spark
pmid_citation_links = spark.read.csv('citation_links.csv', header=True)
# rename the columns
for new, old in zip(['pmid', 'rcr', 'year', 'list_cited_pmid', 'citations'], pmid_citation_links.columns):
    pmid_citation_links = pmid_citation_links.withColumnRenamed(old, new)
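As a side note, the same renaming can be done in one call with toDF, assuming the columns are in exactly that order:
# rename all columns at once
pmid_citation_links = pmid_citation_links.toDF('pmid', 'rcr', 'year', 'list_cited_pmid', 'citations')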
Here each row has a pmid column (just one number) and a list_cited_pmid column, which holds numbers separated by ;. We want to match the pmid with each of the list_cited_pmid entries after splitting on ;. We can do the following:
from pyspark.sql.functions import explode, split
# split the ;-separated string into an array, then emit one row per element
citation_df = pmid_citation_links.select('pmid', explode(split('list_cited_pmid', ';')).alias('cited_pmid'))
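To see what this does, here is a toy version with made-up values:
toy = spark.createDataFrame([('1', '2;3;4'), ('5', '6')], ['pmid', 'list_cited_pmid'])
toy.select('pmid', explode(split('list_cited_pmid', ';')).alias('cited_pmid')).show()
# +----+----------+
# |pmid|cited_pmid|
# +----+----------+
# |   1|         2|
# |   1|         3|
# |   1|         4|
# |   5|         6|
# +----+----------+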
Moreover, when we write the dataframe to a file, we can now give a mode to it (see the DataFrameWriter documentation for more). From the docs:
mode: specifies the behavior of the save operation when data already exists.
append: Append contents of this DataFrame to existing data.
overwrite: Overwrite existing data.
ignore: Silently ignore this operation if data already exists.
error (default case): Throw an exception if data already exists.
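For example, to save the exploded pairs and replace the output of a previous run (the output path here is made up):
# overwrite whatever is already at this path
citation_df.write.mode('overwrite').csv('citation_pairs', header=True)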