Logistic Regression Model in PySpark

PROBLEM STATEMENT:

To predict the species of an iris flower from its sepal and petal measurements, using logistic regression in PySpark.

File to use: iris.csv

RESPONSE VARIABLE:

Class

FACTOR VARIABLES:

Sepal length

Sepal width

Petal length

Petal width

The PySpark code is as follows:

p1 = sc.textFile("../iris.csv")
p1 = p1.map(lambda line: line.split(","))
p1.take(5)


[[u'sepallength', u'sepalwidth', u'petallength', u'petalwidth', u'class'], [u'5.1', u'3.5', u'1.4', u'0.2', u'Iris-setosa'], [u'4.9', u'3', u'1.4', u'0.2', u'Iris-setosa'], [u'4.7', u'3.2', u'1.3', u'0.2', u'Iris-setosa'], [u'4.6', u'3.1', u'1.5', u'0.2', u'Iris-setosa']]
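
As an aside, on Spark 2.x the same file can be read straight into a DataFrame with the built-in CSV reader, which handles the header and type inference for you. A minimal sketch, assuming an active SparkSession named spark and the same file path:

# Spark 2.x alternative: read the CSV directly into a DataFrame
df_alt = spark.read.csv("../iris.csv", header = True, inferSchema = True)
df_alt.printSchema()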


header = p1.first()
p1 = p1.filter(lambda x: x != header)
p1.take(5)


[[u'5.1', u'3.5', u'1.4', u'0.2', u'Iris-setosa'], [u'4.9', u'3', u'1.4', u'0.2', u'Iris-setosa'], [u'4.7', u'3.2', u'1.3', u'0.2', u'Iris-setosa'], [u'4.6', u'3.1', u'1.5', u'0.2', u'Iris-setosa'], [u'5', u'3.6', u'1.4', u'0.2', u'Iris-setosa']]


from pyspark.sql import Row
p1 = p1.map(lambda line: Row(sl = line[0], sw = line[1], pl = line[2], pw = line[3], target = line[4]))
p1.take(5)


[Row(pl=u'1.4', pw=u'0.2', sl=u'5.1', sw=u'3.5', target=u'Iris-setosa'), Row(pl=u'1.4', pw=u'0.2', sl=u'4.9', sw=u'3', target=u'Iris-setosa'), Row(pl=u'1.3', pw=u'0.2', sl=u'4.7', sw=u'3.2', target=u'Iris-setosa'), Row(pl=u'1.5', pw=u'0.2', sl=u'4.6', sw=u'3.1', target=u'Iris-setosa'), Row(pl=u'1.4', pw=u'0.2', sl=u'5', sw=u'3.6', target=u'Iris-setosa')]


df1 = p1.toDF()
df1.show(5)


+---+---+---+---+-----------+
| pl| pw| sl| sw|     target|
+---+---+---+---+-----------+
|1.4|0.2|5.1|3.5|Iris-setosa|
|1.4|0.2|4.9|  3|Iris-setosa|
|1.3|0.2|4.7|3.2|Iris-setosa|
|1.5|0.2|4.6|3.1|Iris-setosa|
|1.4|0.2|  5|3.6|Iris-setosa|
+---+---+---+---+-----------+
only showing top 5 rows
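
Note that because the DataFrame was built from manually split text, every column, including the four measurements, is still a string at this point. If numeric columns are wanted on the DataFrame itself, one option is to cast them; the sketch below uses a new name, df1_numeric, so the flow above stays unchanged:

from pyspark.sql.types import DoubleType

df1.printSchema()   # sl, sw, pl, pw and target all show up as string

# cast the four measurement columns from string to double
df1_numeric = df1
for c in ['sl', 'sw', 'pl', 'pw']:
    df1_numeric = df1_numeric.withColumn(c, df1_numeric[c].cast(DoubleType()))
df1_numeric.printSchema()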

from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCol = "target", outputCol = "targetIndex")

indexed = indexer.fit(df1).transform(df1)
indexed.show(5)


+---+---+---+---+-----------+-----------+
| pl| pw| sl| sw|     target|targetIndex|
+---+---+---+---+-----------+-----------+
|1.4|0.2|5.1|3.5|Iris-setosa|        2.0|
|1.4|0.2|4.9|  3|Iris-setosa|        2.0|
|1.3|0.2|4.7|3.2|Iris-setosa|        2.0|
|1.5|0.2|4.6|3.1|Iris-setosa|        2.0|
|1.4|0.2|  5|3.6|Iris-setosa|        2.0|
+---+---+---+---+-----------+-----------+
only showing top 5 rows
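
StringIndexer assigns indices by descending label frequency, and since all three iris species appear 50 times each, the particular assignment (Iris-setosa mapped to 2.0 here) is essentially arbitrary, so it is worth holding on to the mapping back to species names. A small sketch, where indexerModel is a new variable for the fitted indexer:

from pyspark.ml.feature import IndexToString

indexerModel = indexer.fit(df1)   # keep the fitted model around
indexerModel.labels               # position i is the species assigned index i

converter = IndexToString(inputCol = "targetIndex", outputCol = "targetLabel", labels = indexerModel.labels)
converter.transform(indexed).select("targetIndex", "targetLabel").show(5)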

df2 = indexed.select('targetIndex', 'sl', 'sw', 'pl', 'pw')

df2.show(5)


+-----------+---+---+---+---+
|targetIndex| sl| sw| pl| pw|
+-----------+---+---+---+---+
|        2.0|5.1|3.5|1.4|0.2|
|        2.0|4.9|  3|1.4|0.2|
|        2.0|4.7|3.2|1.3|0.2|
|        2.0|4.6|3.1|1.5|0.2|
|        2.0|  5|3.6|1.4|0.2|
+-----------+---+---+---+---+
only showing top 5 rows

from pyspark.mllib.regression import LabeledPoint

# label first, then the four measurements cast to float
df3 = df2.rdd.map(lambda line: LabeledPoint(line[0], [float(x) for x in line[1:]]))

df3.take(5)

[LabeledPoint(2.0, [5.1,3.5,1.4,0.2]), LabeledPoint(2.0, [4.9,3.0,1.4,0.2]), LabeledPoint(2.0, [4.7,3.2,1.3,0.2]), LabeledPoint(2.0, [4.6,3.1,1.5,0.2]), LabeledPoint(2.0, [5.0,3.6,1.4,0.2])]
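
As a quick sanity check that all three classes survived the conversion, the class counts in the RDD can be tallied; for the full iris file one would expect 50 points per class:

# number of LabeledPoints per class index
df3.map(lambda lp: lp.label).countByValue()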

trainingData, testingData = df3.randomSplit([.8,.2], seed = 1234)
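
LBFGS is an iterative optimizer, so it pays to cache the training split rather than re-reading and re-parsing the text on every pass; counting both splits also confirms a roughly 80/20 break (exact counts depend on the seed):

trainingData.cache()   # reused across LBFGS iterations
trainingData.count(), testingData.count()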

from pyspark.mllib.classification import LogisticRegressionWithLBFGS
from pyspark.mllib.evaluation import MulticlassMetrics

model = LogisticRegressionWithLBFGS.train(trainingData, numClasses = 3)

model.weights
DenseVector([-5.8082, -6.1716, 7.7382, 9.6625, -4.0563, 22.406, -9.694, -22.0841])
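
The weight vector has 8 entries because the RDD-based multinomial model stores (numClasses - 1) * numFeatures coefficients, i.e. (3 - 1) * 4 = 8 here. The model exposes both counts if you want to confirm:

model.numFeatures, model.numClasses   # 4 features, 3 classes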

testingData.take(20)

[LabeledPoint(2.0, [5.4,3.7,1.5,0.2]), LabeledPoint(2.0, [5.1,3.3,1.7,0.5]), LabeledPoint(2.0, [4.9,3.1,1.5,0.1]), LabeledPoint(2.0, [5.1,3.4,1.5,0.2]), LabeledPoint(2.0, [5.1,3.8,1.9,0.4]), LabeledPoint(2.0, [4.6,3.2,1.4,0.2]), LabeledPoint(2.0, [5.0,3.3,1.4,0.2]), LabeledPoint(0.0, [6.5,2.8,4.6,1.5]), LabeledPoint(0.0, [6.6,2.9,4.6,1.3]), LabeledPoint(0.0, [5.2,2.7,3.9,1.4]), LabeledPoint(0.0, [6.0,2.2,4.0,1.0]), LabeledPoint(0.0, [5.6,2.5,3.9,1.1]), LabeledPoint(0.0, [6.1,2.8,4.0,1.3]), LabeledPoint(0.0, [6.8,2.8,4.8,1.4]), LabeledPoint(0.0, [6.7,3.0,5.0,1.7]), LabeledPoint(0.0, [5.8,2.7,3.9,1.2]), LabeledPoint(0.0, [6.3,2.3,4.4,1.3]), LabeledPoint(0.0, [5.5,2.6,4.4,1.2]), LabeledPoint(0.0, [5.0,2.3,3.3,1.0]), LabeledPoint(1.0, [5.8,2.7,5.1,1.9])]

predictions = model.predict(testingData.map(lambda r: r.features))

predictions.collect()

[2, 2, 2, 2, 2, 2, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1]

predictionAndLabels = testingData.map(lambda x: (float(model.predict(x.features)), x.label))
metrics = MulticlassMetrics(predictionAndLabels)
metrics.precision()

warnings.warn("Deprecated in 2.0.0. Use accuracy.")
1.0

metrics.recall()

warnings.warn("Deprecated in 2.0.0. Use accuracy.")
1.0

metrics.fMeasure()

warnings.warn("Deprecated in 2.0.0. Use accuracy.")
1.0
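
Since precision(), recall() and fMeasure() called with no arguments effectively report the overall accuracy (hence the deprecation warnings), the same figure is available directly in Spark 2.x, and the confusion matrix gives a per-class breakdown; on this split everything sits on the diagonal:

metrics.accuracy                       # overall accuracy, 1.0 on this split
metrics.confusionMatrix().toArray()    # rows = actual class, columns = predicted class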

 
