Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

spark documentation : https://spark.apache.org/docs/2.1.0/


Code Block
languagepy
# Import packages
import time
import pyspark
import os
import csv
from numpy import array
from pyspark.mllib.regression import LabeledPoint
from pyspark import SparkContext, SparkConf


Code Block
languagepy
# Creating Spark environment
os.environ["HADOOP_USER_NAME"] = "hdfs"
os.environ["PYTHON_VERSION"] = "3.5.2"
conf = pyspark.SparkConf()
sc = pyspark.SparkContext(conf=conf)
conf.getAll()


Titanic using mllib module


Code Block
languagepy
# Reading from the hdfs, removing the header
trainTitanic = sc.textFile("hdfs://cluster/user/hdfs/test/train_titanic.csv")
trainHeader = trainTitanic.first()
trainTitanic = trainTitanic.filter(lambda line: line != trainHeader).mapPartitions(lambda x: csv.reader(x))
trainTitanic.first()

# Data preprocessing
def sexTransformMapper(elem):
    '''Function which transform "male" into 1 and else things into 0
    - elem : string 
    - return : vector
    '''
    
    if elem == 'male' :
        return [0]
    else :
        return [1]

# Data Transformations and filter lines with empty strings
trainTitanic=trainTitanic.map(lambda line: line[1:3]+sexTransformMapper(line[4])+line[5:11])
trainTitanic=trainTitanic.filter(lambda line: line[3] != '' ).filter(lambda line: line[4] != '' )
trainTitanic.take(10)

# creating "labeled point" rdds specific to MLlib "(label (v1, v2...vp])"
trainTitanicLP=trainTitanic.map(lambda line: LabeledPoint(line[0],[line[1:5]]))
trainTitanicLP.first()

# splitting dataset into train and test set
(trainData, testData) = trainTitanicLP.randomSplit([0.7, 0.3])

# Random forest : same parameters as sklearn (?)
from pyspark.mllib.tree import RandomForest

time_start=time.time()
model_rf = RandomForest.trainClassifier(trainData, numClasses = 2,
        categoricalFeaturesInfo = {}, numTrees = 100, 
        featureSubsetStrategy='auto', impurity='gini', maxDepth=12, 
        maxBins=32, seed=None)

 
model_rf.numTrees()
model_rf.totalNumNodes()
time_end=time.time()
time_rf=(time_end - time_start)
print("RF takes %d s" %(time_rf))

# Predictions on test set
predictions = model_rf.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)

# first metrics
from pyspark.mllib.evaluation import BinaryClassificationMetrics
metrics = BinaryClassificationMetrics(labelsAndPredictions)

# Area under precision-recall curve
print("Area under PR = %s" % metrics.areaUnderPR)

# Area under ROC curve
print("Area under ROC = %s" % metrics.areaUnderROC)


Titanic using sql dataframe and ml module


Code Block
languagepy
# Import packages
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer, OneHotEncoder, VectorAssembler, IndexToString
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import *

# Creatingt Spark SQL environment
from pyspark.sql import SparkSession, HiveContext
SparkContext.setSystemProperty("hive.metastore.uris", "thrift://nn1:9083")
spark = SparkSession.builder.enableHiveSupport().getOrCreate()

# spark is an existing SparkSession
train = spark.read.csv("hdfs://cluster/user/hdfs/test/train_titanic.csv", header = True)
# Displays the content of the DataFrame to stdout
train.show(10)

# String to float on some columns of the dataset : creates a new dataset
train = train.select(col("Survived"),col("Sex"),col("Embarked"),col("Pclass").cast("float"),col("Age").cast("float"),col("SibSp").cast("float"),col("Fare").cast("float"))

# dropping null values
train = train.dropna()

# Spliting in train and test set. Beware : It sorts the dataset
(traindf, testdf) = train.randomSplit([0.7,0.3])


Without Pipeline


Code Block
languagepy
# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
train = StringIndexer(inputCol="Sex", outputCol="indexedSex").fit(train).transform(train)
train = StringIndexer(inputCol="Embarked", outputCol="indexedEmbarked").fit(train).transform(train)

train = StringIndexer(inputCol="Survived", outputCol="indexedSurvived").fit(train).transform(train)

# One Hot Encoder on indexed features
train = OneHotEncoder(inputCol="indexedSex", outputCol="sexVec").transform(train)
train = OneHotEncoder(inputCol="indexedEmbarked", outputCol="embarkedVec").transform(train)

# Feature assembler as a vector
train = VectorAssembler(inputCols=["Pclass","sexVec","embarkedVec", "Age","SibSp","Fare"],outputCol="features").transform(train)

rf = RandomForestClassifier(labelCol="indexedSurvived", featuresCol="features")

model = rf.fit(train)

predictions = model.transform(train)

# Select example rows to display.
predictions.select(col("prediction"),col("probability"),).show(5)


With Pipeline


Code Block
languagepy
# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
genderIndexer = StringIndexer(inputCol="Sex", outputCol="indexedSex")
embarkIndexer = StringIndexer(inputCol="Embarked", outputCol="indexedEmbarked")

surviveIndexer = StringIndexer(inputCol="Survived", outputCol="indexedSurvived")

# One Hot Encoder on indexed features
genderEncoder = OneHotEncoder(inputCol="indexedSex", outputCol="sexVec")
embarkEncoder = OneHotEncoder(inputCol="indexedEmbarked", outputCol="embarkedVec")

# Create the vector structured data (label,features(vector))
assembler = VectorAssembler(inputCols=["Pclass","sexVec","Age","SibSp","Fare","embarkedVec"],outputCol="features") 

# Train a RandomForest model.
rf = RandomForestClassifier(labelCol="indexedSurvived", featuresCol="features")

# Chain indexers and forest in a Pipeline
pipeline = Pipeline(stages=[surviveIndexer, genderIndexer, embarkIndexer, genderEncoder,embarkEncoder, assembler, rf]) # genderIndexer,embarkIndexer,genderEncoder,embarkEncoder,

# Train model.  This also runs the indexers.
model = pipeline.fit(traindf)

# Predictions
predictions = model.transform(testdf)

# Select example rows to display.
predictions.columns  

# Select example rows to display.
predictions.select("prediction", "Survived", "features").show(5)

# Select (prediction, true label) and compute test error
predictions = predictions.select(col("Survived").cast("Float"),col("prediction"))
evaluator = MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

rfModel = model.stages[6]
print(rfModel)  # summary only

# close sparkcontext
sc.stop()