-
Notifications
You must be signed in to change notification settings - Fork 0
/
Spark_ML_train_movement_modes
74 lines (50 loc) · 11.4 KB
/
Spark_ML_train_movement_modes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
%spark
// Random Forest - create model
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.Vectors
sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")
sqlContext.setConf("spark.sql.parquet.mergeSchema", "true")
sqlContext.setConf("spark.sql.parquet.binaryAsString", "true")
val inRestDF = sqlContext.
read.parquet("/user/sensortag/rec_round/")
inRestDF.createOrReplaceTempView("inRestDF")
val restDF = sql("select 0 as label, sequenceNumber, gyroX, gyroY, gyroZ, accX, accY, accZ, lag(gyroX,1) over(order by sequenceNumber) as gyroX1, lag(gyroY,1) over(order by sequenceNumber) as gyroY1, lag(gyroZ,1) over(order by sequenceNumber) as gyroZ1, lag(accX,1) over(order by sequenceNumber) as accX1, lag(accY,1) over(order by sequenceNumber) as accY1, lag(accZ,1) over(order by sequenceNumber) as accZ1, lag(gyroX,2) over(order by sequenceNumber) as gyroX2, lag(gyroY,2) over(order by sequenceNumber) as gyroY2, lag(gyroZ,2) over(order by sequenceNumber) as gyroZ2, lag(accX,2) over(order by sequenceNumber) as accX2, lag(accY,2) over(order by sequenceNumber) as accY2, lag(accZ,2) over(order by sequenceNumber) as accZ2, lag(gyroX,3) over(order by sequenceNumber) as gyroX3, lag(gyroY,3) over(order by sequenceNumber) as gyroY3, lag(gyroZ,3) over(order by sequenceNumber) as gyroZ3, lag(accX,3) over(order by sequenceNumber) as accX3, lag(accY,3) over(order by sequenceNumber) as accY3, lag(accZ,3) over(order by sequenceNumber) as accZ3, lag(gyroX,4) over(order by sequenceNumber) as gyroX4, lag(gyroY,4) over(order by sequenceNumber) as gyroY4, lag(gyroZ,4) over(order by sequenceNumber) as gyroZ4, lag(accX,4) over(order by sequenceNumber) as accX4, lag(accY,4) over(order by sequenceNumber) as accY4, lag(accZ,4) over(order by sequenceNumber) as accZ4, lag(gyroX,5) over(order by sequenceNumber) as gyroX5, lag(gyroY,5) over(order by sequenceNumber) as gyroY5, lag(gyroZ,5) over(order by sequenceNumber) as gyroZ5, lag(accX,5) over(order by sequenceNumber) as accX5, lag(accY,5) over(order by sequenceNumber) as accY5, lag(accZ,5) over(order by sequenceNumber) as accZ5, lag(gyroX,6) over(order by sequenceNumber) as gyroX6, lag(gyroY,6) over(order by sequenceNumber) as gyroY6, lag(gyroZ,6) over(order by sequenceNumber) as gyroZ6, lag(accX,6) over(order by sequenceNumber) as accX6, lag(accY,6) over(order by sequenceNumber) as accY6, lag(accZ,6) over(order by sequenceNumber) as accZ6, lag(gyroX,7) over(order by sequenceNumber) as gyroX7, lag(gyroY,7) over(order by sequenceNumber) as gyroY7, lag(gyroZ,7) over(order by sequenceNumber) as gyroZ7, lag(accX,7) over(order by sequenceNumber) as accX7, lag(accY,7) over(order by sequenceNumber) as accY7, lag(accZ,7) over(order by sequenceNumber) as accZ7, lag(gyroX,8) over(order by sequenceNumber) as gyroX8, lag(gyroY,8) over(order by sequenceNumber) as gyroY8, lag(gyroZ,8) over(order by sequenceNumber) as gyroZ8, lag(accX,8) over(order by sequenceNumber) as accX8, lag(accY,8) over(order by sequenceNumber) as accY8, lag(accZ,8) over(order by sequenceNumber) as accZ8, lag(gyroX,9) over(order by sequenceNumber) as gyroX9, lag(gyroY,9) over(order by sequenceNumber) as gyroY9, lag(gyroZ,9) over(order by sequenceNumber) as gyroZ9, lag(accX,9) over(order by sequenceNumber) as accX9, lag(accY,9) over(order by sequenceNumber) as accY9, lag(accZ,9) over(order by sequenceNumber) as accZ9 from inRestDF")
println("Rest DF count -- " + restDF.count())
val inWalk1DF = sqlContext.
read.parquet("/user/sensortag/rec_hex/")
inWalk1DF.createOrReplaceTempView("inWalk1DF")
val walk1DF = sql("select 1 as label, sequenceNumber, gyroX, gyroY, gyroZ, accX, accY, accZ, lag(gyroX,1) over(order by sequenceNumber) as gyroX1, lag(gyroY,1) over(order by sequenceNumber) as gyroY1, lag(gyroZ,1) over(order by sequenceNumber) as gyroZ1, lag(accX,1) over(order by sequenceNumber) as accX1, lag(accY,1) over(order by sequenceNumber) as accY1, lag(accZ,1) over(order by sequenceNumber) as accZ1, lag(gyroX,2) over(order by sequenceNumber) as gyroX2, lag(gyroY,2) over(order by sequenceNumber) as gyroY2, lag(gyroZ,2) over(order by sequenceNumber) as gyroZ2, lag(accX,2) over(order by sequenceNumber) as accX2, lag(accY,2) over(order by sequenceNumber) as accY2, lag(accZ,2) over(order by sequenceNumber) as accZ2, lag(gyroX,3) over(order by sequenceNumber) as gyroX3, lag(gyroY,3) over(order by sequenceNumber) as gyroY3, lag(gyroZ,3) over(order by sequenceNumber) as gyroZ3, lag(accX,3) over(order by sequenceNumber) as accX3, lag(accY,3) over(order by sequenceNumber) as accY3, lag(accZ,3) over(order by sequenceNumber) as accZ3, lag(gyroX,4) over(order by sequenceNumber) as gyroX4, lag(gyroY,4) over(order by sequenceNumber) as gyroY4, lag(gyroZ,4) over(order by sequenceNumber) as gyroZ4, lag(accX,4) over(order by sequenceNumber) as accX4, lag(accY,4) over(order by sequenceNumber) as accY4, lag(accZ,4) over(order by sequenceNumber) as accZ4, lag(gyroX,5) over(order by sequenceNumber) as gyroX5, lag(gyroY,5) over(order by sequenceNumber) as gyroY5, lag(gyroZ,5) over(order by sequenceNumber) as gyroZ5, lag(accX,5) over(order by sequenceNumber) as accX5, lag(accY,5) over(order by sequenceNumber) as accY5, lag(accZ,5) over(order by sequenceNumber) as accZ5, lag(gyroX,6) over(order by sequenceNumber) as gyroX6, lag(gyroY,6) over(order by sequenceNumber) as gyroY6, lag(gyroZ,6) over(order by sequenceNumber) as gyroZ6, lag(accX,6) over(order by sequenceNumber) as accX6, lag(accY,6) over(order by sequenceNumber) as accY6, lag(accZ,6) over(order by sequenceNumber) as accZ6, lag(gyroX,7) over(order by sequenceNumber) as gyroX7, lag(gyroY,7) over(order by sequenceNumber) as gyroY7, lag(gyroZ,7) over(order by sequenceNumber) as gyroZ7, lag(accX,7) over(order by sequenceNumber) as accX7, lag(accY,7) over(order by sequenceNumber) as accY7, lag(accZ,7) over(order by sequenceNumber) as accZ7, lag(gyroX,8) over(order by sequenceNumber) as gyroX8, lag(gyroY,8) over(order by sequenceNumber) as gyroY8, lag(gyroZ,8) over(order by sequenceNumber) as gyroZ8, lag(accX,8) over(order by sequenceNumber) as accX8, lag(accY,8) over(order by sequenceNumber) as accY8, lag(accZ,8) over(order by sequenceNumber) as accZ8, lag(gyroX,9) over(order by sequenceNumber) as gyroX9, lag(gyroY,9) over(order by sequenceNumber) as gyroY9, lag(gyroZ,9) over(order by sequenceNumber) as gyroZ9, lag(accX,9) over(order by sequenceNumber) as accX9, lag(accY,9) over(order by sequenceNumber) as accY9, lag(accZ,9) over(order by sequenceNumber) as accZ9 from inWalk1DF")
println("Walk1 DF count -- " + walk1DF.count())
val inJumpDF = sqlContext.
read.parquet("/user/sensortag/rec_big1/")
inJumpDF.createOrReplaceTempView("inJumpDF")
val jumpDF = sql("select 2 as label, sequenceNumber, gyroX, gyroY, gyroZ, accX, accY, accZ, lag(gyroX,1) over(order by sequenceNumber) as gyroX1, lag(gyroY,1) over(order by sequenceNumber) as gyroY1, lag(gyroZ,1) over(order by sequenceNumber) as gyroZ1, lag(accX,1) over(order by sequenceNumber) as accX1, lag(accY,1) over(order by sequenceNumber) as accY1, lag(accZ,1) over(order by sequenceNumber) as accZ1, lag(gyroX,2) over(order by sequenceNumber) as gyroX2, lag(gyroY,2) over(order by sequenceNumber) as gyroY2, lag(gyroZ,2) over(order by sequenceNumber) as gyroZ2, lag(accX,2) over(order by sequenceNumber) as accX2, lag(accY,2) over(order by sequenceNumber) as accY2, lag(accZ,2) over(order by sequenceNumber) as accZ2, lag(gyroX,3) over(order by sequenceNumber) as gyroX3, lag(gyroY,3) over(order by sequenceNumber) as gyroY3, lag(gyroZ,3) over(order by sequenceNumber) as gyroZ3, lag(accX,3) over(order by sequenceNumber) as accX3, lag(accY,3) over(order by sequenceNumber) as accY3, lag(accZ,3) over(order by sequenceNumber) as accZ3, lag(gyroX,4) over(order by sequenceNumber) as gyroX4, lag(gyroY,4) over(order by sequenceNumber) as gyroY4, lag(gyroZ,4) over(order by sequenceNumber) as gyroZ4, lag(accX,4) over(order by sequenceNumber) as accX4, lag(accY,4) over(order by sequenceNumber) as accY4, lag(accZ,4) over(order by sequenceNumber) as accZ4, lag(gyroX,5) over(order by sequenceNumber) as gyroX5, lag(gyroY,5) over(order by sequenceNumber) as gyroY5, lag(gyroZ,5) over(order by sequenceNumber) as gyroZ5, lag(accX,5) over(order by sequenceNumber) as accX5, lag(accY,5) over(order by sequenceNumber) as accY5, lag(accZ,5) over(order by sequenceNumber) as accZ5, lag(gyroX,6) over(order by sequenceNumber) as gyroX6, lag(gyroY,6) over(order by sequenceNumber) as gyroY6, lag(gyroZ,6) over(order by sequenceNumber) as gyroZ6, lag(accX,6) over(order by sequenceNumber) as accX6, lag(accY,6) over(order by sequenceNumber) as accY6, lag(accZ,6) over(order by sequenceNumber) as accZ6, lag(gyroX,7) over(order by sequenceNumber) as gyroX7, lag(gyroY,7) over(order by sequenceNumber) as gyroY7, lag(gyroZ,7) over(order by sequenceNumber) as gyroZ7, lag(accX,7) over(order by sequenceNumber) as accX7, lag(accY,7) over(order by sequenceNumber) as accY7, lag(accZ,7) over(order by sequenceNumber) as accZ7, lag(gyroX,8) over(order by sequenceNumber) as gyroX8, lag(gyroY,8) over(order by sequenceNumber) as gyroY8, lag(gyroZ,8) over(order by sequenceNumber) as gyroZ8, lag(accX,8) over(order by sequenceNumber) as accX8, lag(accY,8) over(order by sequenceNumber) as accY8, lag(accZ,8) over(order by sequenceNumber) as accZ8, lag(gyroX,9) over(order by sequenceNumber) as gyroX9, lag(gyroY,9) over(order by sequenceNumber) as gyroY9, lag(gyroZ,9) over(order by sequenceNumber) as gyroZ9, lag(accX,9) over(order by sequenceNumber) as accX9, lag(accY,9) over(order by sequenceNumber) as accY9, lag(accZ,9) over(order by sequenceNumber) as accZ9 from inJumpDF")
println("Jump DF count -- " + jumpDF.count())
val newDF = restDF.union(walk1DF.union(jumpDF))
newDF.createOrReplaceTempView("newDF")
val cleanDF = sql("select * from newDF where not accZ9 is null")
val assembler = new VectorAssembler()
.setInputCols(Array("gyroX", "gyroY", "gyroZ", "accX", "accY", "accZ", "gyroX1", "gyroY1", "gyroZ1", "accX1", "accY1", "accZ1", "gyroX2", "gyroY2", "gyroZ2", "accX2", "accY2", "accZ2", "gyroX3", "gyroY3", "gyroZ3", "accX3", "accY3", "accZ3", "gyroX4", "gyroY4", "gyroZ4", "accX4", "accY4", "accZ4", "gyroX5", "gyroY5", "gyroZ5", "accX5", "accY5", "accZ5", "gyroX6", "gyroY6", "gyroZ6", "accX6", "accY6", "accZ6", "gyroX7", "gyroY7", "gyroZ7", "accX7", "accY7", "accZ7", "gyroX8", "gyroY8", "gyroZ8", "accX8", "accY8", "accZ8", "gyroX9", "gyroY9", "gyroZ9", "accX9", "accY9", "accZ9"))
.setOutputCol("features")
val output = assembler.transform(cleanDF)
val dataDF = output.select("label", "features")
val Array(trainingData, testData) = dataDF.randomSplit(Array(0.7, 0.3))
val rf = new RandomForestClassifier()
.setLabelCol("label")
.setFeaturesCol("features")
.setNumTrees(30)
val model = new Pipeline().setStages(Array(rf)).fit(trainingData)
model.write.save("/user/sensortag/models/model_save")
// Make predictions.
val predictions = model.transform(testData)
//predictions.filter(col("label").notEqual(0)).select("prediction", "label", "features").show()
val evaluator = new MulticlassClassificationEvaluator()
.setLabelCol("label")
.setPredictionCol("prediction")
.setMetricName("accuracy")
val accuracy = evaluator.evaluate(predictions)
println("Test Error = " + (1.0 - accuracy))
predictions.select("prediction", "label").collect.foreach(println)