Spark 2.0 Installation and Deployment
This post walks through a simple Spark 2.0 deployment and shows how to access data through the Spark RDD, SQL, and shell interfaces.
Downloading the Installation Package
The installation package for the desired version can be downloaded here; this post uses spark-2.0.0-bin-hadoop2.6.
Configuring Spark
Upload the archive to every node, then extract it:
tar -zxvf spark-2.0.0-bin-hadoop2.6.tgz
Set SPARK_HOME:
export SPARK_HOME=/home/utf7/spark-2.0.0-bin-hadoop2.6
export PATH=$SPARK_HOME/bin:$PATH
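To make these exports permanent, they can be appended to the shell profile (a minimal sketch, assuming bash and the paths above):
echo 'export SPARK_HOME=/home/utf7/spark-2.0.0-bin-hadoop2.6' >> ~/.bashrc
echo 'export PATH=$SPARK_HOME/bin:$PATH' >> ~/.bashrc
source ~/.bashrc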
Edit the configuration files in the Spark installation directory:
- spark-env.sh
cd conf
cp spark-env.sh.template spark-env.sh
vi spark-env.sh
export SPARK_HOME=/home/utf7/spark-2.0.0-bin-hadoop2.6
export HADOOP_CONF_DIR=/home/utf7/hadoop/etc/hadoop
export SPARK_MASTER_IP=master
export SPARK_MASTER_PORT=7077
export SPARK_MASTER_WEBUI_PORT=8090
export SPARK_WORKER_CORES=2
export SPARK_WORKER_MEMORY=1g
export SPARK_WORKER_PORT=8091
export SPARK_LOG_DIR=$SPARK_HOME/logs/spark
export SPARK_PID_DIR=$SPARK_HOME/tmp/spark
- slaves
cat slaves
yc3
- log4j.properties
cp log4j.properties.template log4j.properties
Modify as needed; this post keeps the defaults.
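For example, to quiet console output from INFO to WARN (a common tweak; the template sets log4j.rootCategory=INFO, console):
sed -i 's/^log4j.rootCategory=INFO, console/log4j.rootCategory=WARN, console/' log4j.properties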
- Pull in the Hadoop configuration
Here core-site.xml and hdfs-site.xml were copied into Spark's conf directory (other approaches should work as well). Finally, sync the modified files to every node, e.g. with the sketch below.
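A minimal sync sketch, assuming passwordless SSH, identical paths on every node, the utf7 user, and yc3 as the only worker (as listed in slaves):
for host in yc3; do
  scp -r $SPARK_HOME/conf utf7@$host:$SPARK_HOME/
done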
Starting the Cluster
cd $SPARK_HOME/sbin
./start-all.sh
Check the startup logs
Logs are written under $SPARK_HOME/logs/spark (the SPARK_LOG_DIR configured above).
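To verify the daemons and tail the logs (jps ships with the JDK; the exact log file names include the user and host):
jps    # expect Master on this node and Worker on yc3
tail -n 50 $SPARK_HOME/logs/spark/*Master*.out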
Testing
- Open the Spark master web UI: http://master:8090 (the SPARK_MASTER_WEBUI_PORT set above)
Accessing Spark
- Using spark-shell
First, upload Spark's README.md to HDFS; it is used in the examples below:
hdfs dfs -put /home/utf7/spark-2.0.0-bin-hadoop2.6/README.md /user/utf7/README.md
cd $SPARK_HOME/bin
./spark-shell
scala> var textFile = sc.textFile("/user/utf7/README.md")
textFile: org.apache.spark.rdd.RDD[String] = /user/utf7/README.md MapPartitionsRDD[7] at textFile at <console>:24

scala> textFile.count()
res2: Long = 99

scala> textFile.first()
res3: String = # Apache Spark

scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[8] at filter at <console>:26

scala> textFile.filter(line => line.contains("Spark")).count()
res4: Long = 19
For more, see the quick-start guide.
Writing Spark Programs
Dependencies are managed with Maven:
- pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.utf7</groupId>
<artifactId>sparkLearn</artifactId>
<version>1.0</version>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.30</version>
</dependency>
</dependencies>
</project>
- Using the Java RDD API
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
/**
* Shows how to use the Java RDD API.
*/
public class HelloSparkJavaRDD {
public final static String MASTER = "spark://master:7077";
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("helloSparkJava").setMaster(MASTER);
conf.setJars(JavaSparkContext.jarOfClass(HelloSparkJavaRDD.class)); // ship this application jar to the executors
JavaSparkContext sc = new JavaSparkContext(conf);
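// "README.md" is a relative path, resolved against the submitting user's
// HDFS home directory (the /user/utf7/README.md uploaded earlier).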
JavaRDD<String> lines = sc.textFile("README.md");
// Java 7 style anonymous classes, matching the pom's 1.7 target; on Java 8 this could be lines.map(String::length)
JavaRDD<Integer> lineLengths = lines.map(new Function<String, Integer>() {
@Override
public Integer call(String v1) throws Exception {
return v1.length();
}
});
int totalLength = lineLengths.reduce(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});
System.out.println("total length is :" + totalLength);
/* final String spark = "Spark";
long numSpark= lines.filter(new Function<String, Boolean>() {
public Boolean call(String v1) throws Exception {
return v1.contains(spark);
}
}).count();
System.out.println("find word '" + spark + "':" + numSpark);*/
}
}
For more, see resilient-distributed-datasets-rdds.
- Using the Java Spark SQL API
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import java.io.Serializable;
import java.sql.Timestamp;
import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;
/**
* Created by utf7 on 2016/9/19.
* Shows how to use the Spark SQL API.
*/
public class HelloSparkJavaSQL {
public static void main(String[] args) {
SparkSession spark = SparkSession.builder().appName("Java Spark SQL Example " + new Timestamp(System.currentTimeMillis())).config("spark.some.config.option", "some-value").getOrCreate();
Dataset<Row> df = spark.read().json("file:/home/utf7/spark-2.0.0-bin-hadoop2.6/examples/src/main/resources/people.json");
System.out.println("show df:");
df.show();
System.out.println("show printSchema:");
df.printSchema();
System.out.println("select name:");
df.select("name").show();
System.out.println("select name,age+1 :");
df.select(new Column("name"), new Column("age").plus(1)).show();
System.out.println("select age >21:");
df.filter(new Column("age").gt(21)).show();
System.out.println("select age,count(*) group by age :");
df.groupBy("age").count().show();
System.out.println("show how create view and select from the view:");
df.createOrReplaceTempView("people");
Dataset<Row> sqlDF = spark.sql("select * from people");
sqlDF.show();
System.out.println("show how to use dataset:");
testDatasets(spark);
testReadFromMySQL(spark);
}
/**
* Shows how to read from an RDBMS over JDBC (MySQL here).
* @param spark
*/
public static void testReadFromMySQL(SparkSession spark) {
String MYSQL_CONNECTION_URL = "jdbc:mysql://yc1:3306/test";
String MYSQL_USERNAME = "utf7";
String MYSQL_PWD = "utf7";
String TABLE_NAME = "user";
// Encoders are created for Java beans
Encoder<Person> personEncoder = Encoders.bean(Person.class);
final Properties connectionProperties = new Properties();
connectionProperties.put("user", MYSQL_USERNAME);
connectionProperties.put("password", MYSQL_PWD);
System.out.println("show how read from mysql:");
Dataset<Person> peopleDS = spark.read().jdbc(MYSQL_CONNECTION_URL, TABLE_NAME, connectionProperties).as(personEncoder);
peopleDS.show();
}
/**
* Shows how to create Datasets, including from a local JSON file ("file://...").
* @param spark
*/
public static void testDatasets(SparkSession spark) {
Person person = new Person();
person.setName("utf7");
person.setAge(28);
// Encoders are created for Java beans
Encoder<Person> personEncoder = Encoders.bean(Person.class);
Dataset<Person> javaBeanDS = spark.createDataset(Collections.singletonList(person), personEncoder);
javaBeanDS.show();
// +---+----+
// |age|name|
// +---+----+
// | 28|utf7|
// +---+----+
// Encoders for most common types are provided in class Encoders
Encoder<Integer> integerEncoder = Encoders.INT();
Dataset<Integer> primitiveDS = spark.createDataset(Arrays.asList(1, 2, 3), integerEncoder);
Dataset<Integer> transformedDS = primitiveDS.map(new MapFunction<Integer, Integer>() {
@Override
public Integer call(Integer value) throws Exception {
return value + 1;
}
}, integerEncoder);
transformedDS.collect();// Returns [2,3,4]
// DataFrames can be converted to a Dataset by providing a class. Mapping based on name
String path = "file:/home/utf7/spark-2.0.0-bin-hadoop2.6/examples/src/main/resources/people.json";
Dataset<Person> peopleDS = spark.read().json(path).as(personEncoder);
peopleDS.show();
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
}
public static class Person implements Serializable {
private String name;
private int age;
public String getName() {
return this.name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
}
}
For more, see the sql-programming-guide.
- Submitting the Spark jar
Package the program above, copy the jar to the cluster, and submit it there.
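A minimal packaging sketch (standard Maven layout, so the jar name follows the artifactId and version in the pom; adjust the user/host in the scp to your environment):
mvn clean package
scp target/sparkLearn-1.0.jar utf7@master:/home/utf7/
The submit script: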
#!/usr/bin/env bash
# you can use this script to run spark app:
# ./runSpark.sh runRDD : run RDD example
# ./runSpark.sh runSQL : run SQL example
export JAR_HOME=/home/utf7/
function runRDD(){
$SPARK_HOME/bin/spark-submit \
--class "HelloSparkJavaRDD" \
--executor-memory 128m \
--total-executor-cores 1 \
$JAR_HOME/sparkLearn-1.0.jar
}
function runSQL(){
$SPARK_HOME/bin/spark-submit \
--class "HelloSparkJavaSQL" \
--executor-memory 128m \
--total-executor-cores 1 \
$JAR_HOME/sparkLearn-1.0.jar
}
if [ $# -gt 0 ]
then
$@
exit 0
fi
echo "GUIDE:"
echo "USE runRDD/runSQL"
$ ./runSpark.sh runRDD
Running the command above, the Spark RDD example prints:
16/09/26 17:44:41 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
total length is :3729
$ ./runSpark.sh runSQL
Running the command above, the Spark SQL example prints:
16/09/26 17:39:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/09/26 17:39:32 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
16/09/26 17:39:32 WARN SparkContext: Use an existing SparkContext, some configuration may not take effect.
show df:
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
show printSchema:
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
select name:
+-------+
| name|
+-------+
|Michael|
| Andy|
| Justin|
+-------+
select name,age+1 :
+-------+---------+
| name|(age + 1)|
+-------+---------+
|Michael| null|
| Andy| 31|
| Justin| 20|
+-------+---------+
select age >21:
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+
select age,count(*) group by age :
+----+-----+
| age|count|
+----+-----+
| 19| 1|
|null| 1|
| 30| 1|
+----+-----+
show how create view and select from the view:
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
show how to use dataset:
+---+----+
|age|name|
+---+----+
| 28|utf7|
+---+----+
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
show how read from mysql:
+-------+---+
| name|age|
+-------+---+
| seven| 1|
| utf7| 28|
+-------+---+