PySpark - Tutorial based on the Titanic Dataset

This tutorial is based on the Titanic dataset from the Kaggle website. It has two parts:

- The first uses the mllib package with RDDs, and the mllib random forest classifier.
- The second uses sql DataFrames and the ml package, with the ml random forest classifier (same principle as in mllib). It is solved once without a pipeline, and once with a pipeline.

The ml and sql DataFrame APIs simplify the preprocessing stages a lot, whereas with mllib and RDDs you often have to write your own functions to process the data.

ml also offers a wider range of metrics than mllib.

Spark documentation: https://spark.apache.org/docs/2.1.0/


# Import packages
import time
import pyspark
import os
import csv
from numpy import array
from pyspark.mllib.regression import LabeledPoint
from pyspark import SparkContext, SparkConf


# Creating Spark environment
os.environ["HADOOP_USER_NAME"] = "hdfs"
os.environ["PYTHON_VERSION"] = "3.5.2"
conf = pyspark.SparkConf()
sc = pyspark.SparkContext(conf=conf)
conf.getAll()
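
If you need to control the application name or master URL, SparkConf takes them before the context is created. A minimal sketch, kept commented out so it does not clash with the live context above (the app name and master URL are assumptions, adjust to your cluster):

# conf = pyspark.SparkConf().setAppName("titanic-tutorial").setMaster("yarn")
# sc = pyspark.SparkContext(conf=conf)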


Titanic using the mllib module


# Reading from the hdfs, removing the header
trainTitanic = sc.textFile("hdfs://cluster/user/hdfs/test/train_titanic.csv")
trainHeader = trainTitanic.first()
trainTitanic = trainTitanic.filter(lambda line: line != trainHeader) \
                           .mapPartitions(lambda x: csv.reader(x))
trainTitanic.first()
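
The mapPartitions(csv.reader) trick matters because passenger names contain commas inside quoted fields, which a plain split(",") would break. A quick in-memory check of the pattern (the sample row is made up):

# Sanity check: csv.reader handles the quoted comma in the name field
sample = sc.parallelize(['1,0,3,"Braund, Mr. Owen Harris",male,22'])
print(sample.mapPartitions(lambda x: csv.reader(x)).first())
# ['1', '0', '3', 'Braund, Mr. Owen Harris', 'male', '22']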

# Data preprocessing
def sexTransformMapper(elem):
    '''Map "male" to [0] and any other value to [1]
    - elem : string
    - return : list with a single numeric code
    '''
    if elem == 'male':
        return [0]
    else:
        return [1]
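
The same idea extends to other categorical columns. A hypothetical dict-based variant for the Embarked column ('C'/'Q'/'S'), not used below, where unseen values fall back to a default code:

# Hypothetical mapper for Embarked, shown for illustration only
def embarkedTransformMapper(elem):
    return [{'C': 0, 'Q': 1, 'S': 2}.get(elem, 3)]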

# Data transformations, then filter out lines with empty Age or SibSp strings
trainTitanic = trainTitanic.map(lambda line: line[1:3] + sexTransformMapper(line[4]) + line[5:11])
trainTitanic = trainTitanic.filter(lambda line: line[3] != '').filter(lambda line: line[4] != '')
trainTitanic.take(10)

# creating "labeled point" rdds specific to MLlib "(label (v1, v2...vp])"
trainTitanicLP=trainTitanic.map(lambda line: LabeledPoint(line[0],[line[1:5]]))
trainTitanicLP.first()
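
For reference, a LabeledPoint can also be built by hand; the feature values below are arbitrary:

# A LabeledPoint is just (label, feature vector)
lp = LabeledPoint(1.0, [3.0, 1.0, 22.0, 1.0])
print(lp.label, lp.features)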

# splitting dataset into train and test set
(trainData, testData) = trainTitanicLP.randomSplit([0.7, 0.3])
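
randomSplit is non-deterministic by default; for reproducible experiments you can pass a seed (the value 42 below is an arbitrary choice):

# (trainData, testData) = trainTitanicLP.randomSplit([0.7, 0.3], seed=42)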

# Random forest: the hyperparameters are close to scikit-learn's
from pyspark.mllib.tree import RandomForest

time_start=time.time()
model_rf = RandomForest.trainClassifier(trainData, numClasses = 2,
        categoricalFeaturesInfo = {}, numTrees = 100, 
        featureSubsetStrategy='auto', impurity='gini', maxDepth=12, 
        maxBins=32, seed=None)

 
print(model_rf.numTrees())
print(model_rf.totalNumNodes())
time_end = time.time()
time_rf = time_end - time_start
print("RF takes %d s" % time_rf)

# Predictions on the test set, zipped back with the true labels
predictions = model_rf.predict(testData.map(lambda x: x.features))
predictionsAndLabels = predictions.zip(testData.map(lambda lp: lp.label))
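
From these pairs, a raw test error can be computed directly, before turning to the metrics classes:

testErr = predictionsAndLabels.filter(lambda pl: pl[0] != pl[1]).count() / float(testData.count())
print("Test Error = %s" % testErr)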

# first metrics: BinaryClassificationMetrics expects (prediction, label) pairs
from pyspark.mllib.evaluation import BinaryClassificationMetrics
metrics = BinaryClassificationMetrics(predictionsAndLabels)

# Area under precision-recall curve
print("Area under PR = %s" % metrics.areaUnderPR)

# Area under ROC curve
print("Area under ROC = %s" % metrics.areaUnderROC)


Titanic using sql DataFrames and the ml module


# Import packages
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer, OneHotEncoder, VectorAssembler, IndexToString
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import col

# Creating Spark SQL environment
from pyspark.sql import SparkSession
SparkContext.setSystemProperty("hive.metastore.uris", "thrift://nn1:9083")
spark = SparkSession.builder.enableHiveSupport().getOrCreate()

# spark is an existing SparkSession
train = spark.read.csv("hdfs://cluster/user/hdfs/test/train_titanic.csv", header = True)
# Displays the content of the DataFrame to stdout
train.show(10)
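
Without inferSchema, spark.read.csv reads every column as a string, which is why the casts below are needed; printSchema makes this visible:

# All columns come in as strings by default
train.printSchema()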

# Cast some columns from string to float: creates a new DataFrame
train = train.select(col("Survived"), col("Sex"), col("Embarked"),
                     col("Pclass").cast("float"), col("Age").cast("float"),
                     col("SibSp").cast("float"), col("Fare").cast("float"))

# dropping null values
train = train.dropna()
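
To measure how many rows dropna() removes, count before and after; a sketch, kept commented out since the drop already happened above (each count() triggers a Spark job):

# nBefore = train.count()
# train = train.dropna()
# print("Rows dropped: %d" % (nBefore - train.count()))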

# Splitting into train and test sets. Beware: randomSplit sorts the dataset to make the split deterministic
(traindf, testdf) = train.randomSplit([0.7,0.3])


Without Pipeline


# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
train = StringIndexer(inputCol="Sex", outputCol="indexedSex").fit(train).transform(train)
train = StringIndexer(inputCol="Embarked", outputCol="indexedEmbarked").fit(train).transform(train)

train = StringIndexer(inputCol="Survived", outputCol="indexedSurvived").fit(train).transform(train)

# One Hot Encoder on indexed features
train = OneHotEncoder(inputCol="indexedSex", outputCol="sexVec").transform(train)
train = OneHotEncoder(inputCol="indexedEmbarked", outputCol="embarkedVec").transform(train)

# Feature assembler as a vector
train = VectorAssembler(inputCols=["Pclass","sexVec","embarkedVec", "Age","SibSp","Fare"],outputCol="features").transform(train)
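
It is worth looking at the assembled features column: a single vector per row, with values in the order given to inputCols:

train.select("features", "indexedSurvived").show(5, truncate=False)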

rf = RandomForestClassifier(labelCol="indexedSurvived", featuresCol="features")

model = rf.fit(train)

predictions = model.transform(train)

# Select example rows to display.
predictions.select(col("prediction"), col("probability")).show(5)
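
Note that these predictions are made on the same data the model was trained on, so any score computed from them is optimistic. A quick sanity check with the ml evaluator (same accuracy metric as used later in the pipeline section):

trainEval = MulticlassClassificationEvaluator(labelCol="indexedSurvived",
        predictionCol="prediction", metricName="accuracy")
print("Train accuracy = %g" % trainEval.evaluate(predictions))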


With Pipeline


# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
genderIndexer = StringIndexer(inputCol="Sex", outputCol="indexedSex")
embarkIndexer = StringIndexer(inputCol="Embarked", outputCol="indexedEmbarked")

surviveIndexer = StringIndexer(inputCol="Survived", outputCol="indexedSurvived")

# One Hot Encoder on indexed features
genderEncoder = OneHotEncoder(inputCol="indexedSex", outputCol="sexVec")
embarkEncoder = OneHotEncoder(inputCol="indexedEmbarked", outputCol="embarkedVec")

# Create the vector structured data (label,features(vector))
assembler = VectorAssembler(inputCols=["Pclass","sexVec","Age","SibSp","Fare","embarkedVec"],outputCol="features") 

# Train a RandomForest model.
rf = RandomForestClassifier(labelCol="indexedSurvived", featuresCol="features")

# Chain indexers and forest in a Pipeline
pipeline = Pipeline(stages=[surviveIndexer, genderIndexer, embarkIndexer, genderEncoder, embarkEncoder, assembler, rf])

# Train model.  This also runs the indexers.
model = pipeline.fit(traindf)
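
The fitted PipelineModel can be persisted and reloaded later; a sketch, where the HDFS path is an assumption:

# model.save("hdfs://cluster/user/hdfs/test/titanic_rf_model")
# from pyspark.ml import PipelineModel
# model = PipelineModel.load("hdfs://cluster/user/hdfs/test/titanic_rf_model")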

# Predictions
predictions = model.transform(testdf)

# List the columns available in the predictions DataFrame
predictions.columns

# Select example rows to display.
predictions.select("prediction", "Survived", "features").show(5)

# Select (prediction, true label) and compute test error
predictions = predictions.select(col("Survived").cast("float"), col("prediction"))
evaluator = MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

rfModel = model.stages[6]  # the fitted random forest, last stage of the pipeline
print(rfModel)  # summary only
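
The fitted forest also exposes impurity-based feature importances, as a vector aligned with the assembler's inputCols order:

print(rfModel.featureImportances)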

# Accuracy, reusing the evaluator result computed above
print("Accuracy = %g" % accuracy)

evaluatorf1 = MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="prediction", metricName="f1")
f1 = evaluatorf1.evaluate(predictions)
print("f1 = %g" % f1)

evaluatorwp = MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="prediction", metricName="weightedPrecision")
wp = evaluatorwp.evaluate(predictions)
print("weightedPrecision = %g" % wp)

evaluatorwr = MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="prediction", metricName="weightedRecall")
wr = evaluatorwr.evaluate(predictions)
print("weightedRecall = %g" % wr)

# Stop the SparkContext
sc.stop()