-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathWeatherWithSparkSQL.java
More file actions
80 lines (62 loc) · 2.44 KB
/
WeatherWithSparkSQL.java
File metadata and controls
80 lines (62 loc) · 2.44 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package com;
import java.io.Serializable;
import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
/**
 * Small Spark SQL example: reads a comma-separated weather file, turns every
 * line into a {@link Weather} bean, registers the beans as the temporary table
 * {@code weather} and prints the first ten rows of two SQL queries against it.
 */
public class WeatherWithSparkSQL {

    /** Standalone master this job connects to. */
    private static final String MASTER_URL = "spark://Ankits-MacBook-Air.local:7077";

    /** Input file used when no path is given on the command line. */
    private static final String DEFAULT_INPUT_PATH =
            "/Users/ankitjindal/Documents/Apache_Spark/Tutorials_and_Certification/Spark_offline_current/labfiles/nycweather/nycweather.csv";

    /** Number of rows printed for each query. */
    private static final int ROWS_TO_PRINT = 10;

    /**
     * Runs both queries and prints their first rows to standard output.
     *
     * @param args optional; {@code args[0]} overrides the input file path,
     *             otherwise {@link #DEFAULT_INPUT_PATH} is read
     */
    public static void main(String[] args) {
        String inputPath = (args != null && args.length > 0) ? args[0] : DEFAULT_INPUT_PATH;

        SparkConf sConf = new SparkConf().setAppName("WeatherWithSparkSQL")
                .setMaster(MASTER_URL)
                .set("spark.cores.max", "4")
                .set("spark.deploy.defaultCores", "4");

        // try-with-resources: the context is closed even when parsing or a
        // Spark action throws, instead of only on the success path.
        try (JavaSparkContext sc = new JavaSparkContext(sConf)) {
            SQLContext sqlContext = new SQLContext(sc);

            JavaRDD<String> file = sc.textFile(inputPath);
            JavaRDD<Weather> weather = file.map(WeatherWithSparkSQL::parseLine);

            // The schema is derived from the Weather bean's getters.
            DataFrame schemaWeather = sqlContext.createDataFrame(weather, Weather.class);
            schemaWeather.registerTempTable("weather");

            DataFrame precip = sqlContext.sql("Select date, temp, precip FROM weather where precip >= 0.0");
            precip.toJavaRDD()
                    .map(f -> "Date:" + f.get(0) + " Temp:" + f.get(1) + " Precip:" + f.get(2))
                    .take(ROWS_TO_PRINT)
                    .forEach(a -> System.out.println(a));

            DataFrame temp = sqlContext.sql("Select date, temp, precip FROM weather where temp <= 0");
            temp.toJavaRDD()
                    .map(f -> "Date:" + f.getString(0) + " Temp:" + f.getInt(1) + " Precip:" + f.getDouble(2))
                    .take(ROWS_TO_PRINT)
                    .forEach(a -> System.out.println(a));
        }
    }

    /**
     * Converts one input line into a {@link Weather} record.
     *
     * <p>The line is split on commas; field 0 is kept verbatim as the date,
     * field 1 is parsed as an integer and field 2 as a double (both trimmed
     * first).
     *
     * @param line one line of the input file
     * @return the parsed record
     * @throws NumberFormatException if field 1 or field 2 is not numeric
     * @throws ArrayIndexOutOfBoundsException if the line has fewer than three fields
     */
    private static Weather parseLine(String line) {
        String[] fields = line.split(",");
        return new Weather(fields[0],
                Integer.parseInt(fields[1].trim()),
                Double.parseDouble(fields[2].trim()));
    }

    /**
     * Serializable bean holding one parsed input line; its getters are what
     * {@code createDataFrame} uses to build the {@code weather} table.
     */
    public static class Weather implements Serializable {
        private static final long serialVersionUID = 1L;

        /** First CSV field, stored as raw text. */
        private String date;
        /** Second CSV field. */
        private Integer temp;
        /** Third CSV field. */
        private Double precip;

        /**
         * Creates a record from already-parsed values.
         *
         * @param date   first CSV field, unmodified
         * @param temp   second CSV field as an integer
         * @param precip third CSV field as a double
         */
        Weather(String date, Integer temp, Double precip) {
            this.date = date;
            this.temp = temp;
            this.precip = precip;
        }

        /** @return the date text */
        public String getDate() {
            return date;
        }

        /** @param date the date text to store */
        public void setDate(String date) {
            this.date = date;
        }

        /** @return the temperature value */
        public Integer getTemp() {
            return temp;
        }

        /** @param temp the temperature value to store */
        public void setTemp(Integer temp) {
            this.temp = temp;
        }

        // The lower-case accessor names below are non-standard but are kept
        // unchanged so existing callers keep compiling.

        /** @return the precipitation value */
        public Double getprecip() {
            return precip;
        }

        /** @param precip the precipitation value to store */
        public void setprecip(Double precip) {
            this.precip = precip;
        }
    }
}