Spark SQL Configuration and Usage Tutorial
What is Spark SQL?
Spark SQL is the Apache Spark module for structured data processing. It lets you query structured data with SQL statements or the DataFrame API, and it integrates with the other Spark modules.
Configuring Spark SQL
1. Maven dependencies
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.4.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.4.0</version>
</dependency>
Note: change the version element to match the Spark version of your cluster. The `_2.11` suffix in the artifact IDs is the Scala version and must match the Scala build of that cluster; also, spark-sql already depends on spark-core transitively, so the first dependency is optional.
2. SparkConf configuration
// SparkConf + JavaSparkContext: the entry point for the RDD API
SparkConf sparkConf = new SparkConf()
        .setAppName("appName")
        .setMaster("local");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
// SparkSession: the entry point for Spark SQL
SparkSession spark = SparkSession
        .builder()
        .appName("appName")
        .config("spark.some.config.option", "some-value")
        .getOrCreate();
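The two entry points above can also be combined: SparkSession's builder accepts an existing SparkConf, so one conf object can drive both APIs. A minimal sketch, assuming `local[*]` mode and an invented class name so it runs as a standalone demo:

```java
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;

public class SessionFromConf {
    public static void main(String[] args) {
        // Settings placed in the SparkConf become the session's configuration.
        SparkConf conf = new SparkConf()
                .setAppName("appName")
                .setMaster("local[*]"); // assumed local mode, for the demo only

        SparkSession spark = SparkSession.builder()
                .config(conf)
                .getOrCreate();

        // The session already wraps a SparkContext, so a separate
        // JavaSparkContext is unnecessary for pure Spark SQL work.
        System.out.println(spark.sparkContext().appName());

        spark.stop();
    }
}
```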
3. SparkSession with Hive support
SparkSession spark = SparkSession.builder()
        .appName("appName")
        .config("spark.sql.warehouse.dir", "/user/hive/warehouse")
        .config("hive.metastore.uris", "thrift://localhost:9083")
        .enableHiveSupport()
        .getOrCreate();
Using Spark SQL
1. Creating a DataFrame
// createDataFrame(JavaRDD, StructType) expects an RDD of Row,
// so build Row objects directly instead of Tuple2
List<Row> data = Arrays.asList(
        RowFactory.create(1, "Alice"),
        RowFactory.create(2, "Bob"),
        RowFactory.create(3, "Charlie"));
JavaRDD<Row> rdd = jsc.parallelize(data);
StructType schema = new StructType(new StructField[]{
        new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
        new StructField("name", DataTypes.StringType, false, Metadata.empty())
});
Dataset<Row> df = spark.createDataFrame(rdd, schema);
2. DataFrame API operations
// requires: import static org.apache.spark.sql.functions.col;
df.filter(col("id").gt(1)).show();
df.select(col("name")).show();
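The same fluent style extends to aggregation, sorting, and derived columns. A self-contained sketch with made-up in-memory data (the city values and class name are invented for illustration):

```java
import java.util.Arrays;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.count;

public class DataFrameOps {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("DataFrameOps")
                .master("local[*]")
                .getOrCreate();

        StructType schema = new StructType(new StructField[]{
                new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
                new StructField("city", DataTypes.StringType, false, Metadata.empty())
        });
        List<Row> rows = Arrays.asList(
                RowFactory.create(1, "Tokyo"),
                RowFactory.create(2, "Tokyo"),
                RowFactory.create(3, "Paris"));
        Dataset<Row> df = spark.createDataFrame(rows, schema);

        // group, aggregate, and sort in one fluent chain
        Dataset<Row> perCity = df.groupBy(col("city"))
                .agg(count(col("id")).alias("n"))
                .orderBy(col("n").desc());
        perCity.show();

        // derive a new column from an existing one
        df.withColumn("id_plus_one", col("id").plus(1)).show();

        spark.stop();
    }
}
```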
3. SQL queries
df.createOrReplaceTempView("people");
Dataset<Row> sqlDF = spark.sql("SELECT name FROM people WHERE id > 1");
sqlDF.show();
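A temp view supports the full SQL surface, not just simple filters; aggregates, for example, run the same way. A sketch that rebuilds the people view so the block is self-contained (class name and master setting are assumptions):

```java
import java.util.Arrays;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class SqlAggregate {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("SqlAggregate")
                .master("local[*]")
                .getOrCreate();

        StructType schema = new StructType(new StructField[]{
                new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
                new StructField("name", DataTypes.StringType, false, Metadata.empty())
        });
        Dataset<Row> df = spark.createDataFrame(Arrays.asList(
                RowFactory.create(1, "Alice"),
                RowFactory.create(2, "Bob"),
                RowFactory.create(3, "Charlie")), schema);
        df.createOrReplaceTempView("people");

        // any SQL the parser accepts works against the view
        Dataset<Row> agg = spark.sql(
                "SELECT COUNT(*) AS cnt, MAX(id) AS max_id FROM people WHERE id > 1");
        agg.show(); // cnt = 2, max_id = 3

        spark.stop();
    }
}
```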
Examples
Example 1
Running a Spark SQL program locally:
package com.spark.example;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class App {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("SparkSQLExample")
                .master("local[*]") // needed to run locally without spark-submit
                .config("spark.some.config.option", "some-value")
                .getOrCreate();
        // read a CSV file, using the first line as the header
        // and inferring column types from the data
        String csvPath = "/Users/xxx/Downloads/test.csv";
        Dataset<Row> df = spark.read().format("csv")
                .option("header", "true")
                .option("inferSchema", "true")
                .load(csvPath);
        df.show();
        spark.stop();
    }
}
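The reader used above has a matching writer API. A round-trip sketch that writes a small DataFrame out as CSV to a temp directory and reads it back with the same header/inferSchema options (the data, paths, and class name are invented for the demo):

```java
import java.nio.file.Files;
import java.util.Arrays;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class CsvRoundTrip {
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder()
                .appName("CsvRoundTrip")
                .master("local[*]")
                .getOrCreate();

        StructType schema = new StructType(new StructField[]{
                new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
                new StructField("name", DataTypes.StringType, false, Metadata.empty())
        });
        Dataset<Row> df = spark.createDataFrame(Arrays.asList(
                RowFactory.create(1, "Alice"),
                RowFactory.create(2, "Bob")), schema);

        // write with a header so the read options below can recover the schema
        String out = Files.createTempDirectory("csv-demo").toString() + "/out";
        df.write().mode("overwrite").option("header", "true").csv(out);

        Dataset<Row> back = spark.read().format("csv")
                .option("header", "true")
                .option("inferSchema", "true")
                .load(out);
        back.show();

        spark.stop();
    }
}
```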
Example 2
Creating a table in Hive and querying it with Spark SQL:
package com.spark.example;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class App {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("SparkSQLExample")
                .config("spark.sql.warehouse.dir", "/user/hive/warehouse")
                .config("hive.metastore.uris", "thrift://localhost:9083")
                .enableHiveSupport()
                .getOrCreate();
        // create the Hive table if it does not already exist
        spark.sql("CREATE TABLE IF NOT EXISTS persons (id int, name string) "
                + "row format delimited fields terminated by ',' stored as textfile "
                + "location '/user/hive/warehouse/persons'");
        // query the table contents
        Dataset<Row> result = spark.sql("SELECT * FROM persons");
        result.show();
        spark.stop();
    }
}