700字范文,内容丰富有趣,生活中的好帮手!
700字范文 > 大数据Spark(二十):Spark Core外部数据源引入

大数据Spark(二十):Spark Core外部数据源引入

时间:2023-08-27 09:16:22

相关推荐

大数据Spark(二十):Spark Core外部数据源引入

目录

外部数据源

MySQL 数据源

演示代码

HBase 数据源

HBase Sink

​​​​​​​HBase Source

外部数据源

Spark可以从外部存储系统读取数据,比如RDBMs表中或者HBase表中读写数据,这也是企业中常常使用,如:

1)、要分析的数据存储在HBase表中,需要从其中读取数据数据分析

日志数据:电商网站的商家操作日志

订单数据:保险行业订单数据

2)、使用Spark进行离线分析以后,往往将报表结果保存到MySQL表中

网站基本分析(pv、uv。。。。。)

注意:实际开发中会封装为工具类直接使用

/teeyog/blog/issues/22

/u011817217/article/details/81667115

MySQL 数据源

实际开发中常常将分析结果RDD保存至MySQL表中,使用foreachPartition函数;此外Spark中提供JdbcRDD用于从MySQL表中读取数据。

调用RDD#foreachPartition函数将每个分区数据保存至MySQL表中,保存时考虑降低RDD分区数目和批量插入,提升程序性能。

演示代码

package cn.itcast.coreimport java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}import org.apache.spark.SparkConfimport org.apache.spark.SparkContextimport org.apache.spark.rdd.{JdbcRDD, RDD}/*** Author itcast* Desc 演示使用Spark将数据写入到MySQL,再从MySQL读取出来*/object SparkJdbcDataSource {def main(args: Array[String]): Unit = {//1.创建SparkContextval sparkConf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[*]")val sc: SparkContext = new SparkContext(sparkConf)sc.setLogLevel("WARN")//2.准备数据val data: RDD[(String, Int)] = sc.parallelize(List(("jack", 18), ("tom", 19), ("rose", 20)))//3.将RDD中的数据保存到MySQL中去//将每一个分区中的数据保存到MySQL中去,有几个分区,就会开启关闭连接几次//data.foreachPartition(itar=>dataToMySQL(itar))data.foreachPartition(dataToMySQL) //方法即函数,函数即对象//4.从MySQL读取数据/*class JdbcRDD[T: ClassTag](sc: SparkContext,getConnection: () => Connection,sql: String,lowerBound: Long,upperBound: Long,numPartitions: Int,mapRow: (ResultSet) => T = JdbcRDD.resultSetToObjectArray _)*/val getConnection = ()=> DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8","root","root")val sql:String = "select id,name,age from t_student where id >= ? and id <= ?"val mapRow = (rs:ResultSet) => {val id: Int = rs.getInt(1)val name: String = rs.getString(2)val age: Int = rs.getInt("age")(id,name,age)}val studentRDD: JdbcRDD[(Int, String, Int)] = new JdbcRDD(sc,getConnection,sql,4,5,2,mapRow)println(studentRDD.collect().toBuffer)}/*** 将分区中的数据保存到MySQL* @param itar 传过来的每个分区有多条数据*/def dataToMySQL(itar: Iterator[(String, Int)]): Unit = {//0.加载驱动//Class.forName("") //源码中已经加载了//1.获取连接val connection: Connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8","root","root")//2.编写sqlval sql:String = "INSERT INTO `t_student` (`name`, `age`) VALUES (?, ?);"//3.获取psval ps: PreparedStatement = connection.prepareStatement(sql)itar.foreach(data=>{//4.设置参数ps.setString(1,data._1)ps.setInt(2,data._2)//5.执行sqlps.addBatch()})ps.executeBatch()ps.close()connection.close()}}

​​​​​​​HBase 数据源

Spark可以从HBase表中读写(Read/Write)数据,底层采用TableInputFormatTableOutputFormat方式,与MapReduce与HBase集成完全一样,使用输入格式InputFormat和输出格式OutputFoamt。

​​​​​​​HBase Sink

回顾MapReduce向HBase表中写入数据,使用TableReducer,其中OutputFormat为TableOutputFormat,读取数据Key:ImmutableBytesWritable(Rowkey),Value:Put(Put对象)

写入数据时,需要将RDD转换为RDD[(ImmutableBytesWritable, Put)]类型,调用saveAsNewAPIHadoopFile方法数据保存至HBase表中。

HBase Client连接时,需要设置依赖Zookeeper地址相关信息及表的名称,通过Configuration设置属性值进行传递。

范例演示:将词频统计结果保存HBase表,表的设计

代码如下:

package cn.itcast.coreimport org.apache.hadoop.conf.Configurationimport org.apache.hadoop.hbase.HBaseConfigurationimport org.apache.hadoop.hbase.client.Putimport org.apache.hadoop.hbase.io.ImmutableBytesWritableimport org.apache.hadoop.hbase.mapreduce.TableOutputFormatimport org.apache.hadoop.hbase.util.Bytesimport org.apache.spark.rdd.RDDimport org.apache.spark.{SparkConf, SparkContext}/*** 将RDD数据保存至HBase表中*/object SparkWriteHBase {def main(args: Array[String]): Unit = {val sparkConf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[*]")val sc: SparkContext = new SparkContext(sparkConf)sc.setLogLevel("WARN")// 构建RDDval list = List(("hadoop", 234), ("spark", 3454), ("hive", 343434), ("ml", 8765))val outputRDD: RDD[(String, Int)] = sc.parallelize(list, numSlices = 2)// 将数据写入到HBase表中, 使用saveAsNewAPIHadoopFile函数,要求RDD是(key, Value)// 组装RDD[(ImmutableBytesWritable, Put)]/*** HBase表的设计:* 表的名称:htb_wordcount* Rowkey: word* 列簇: info* 字段名称: count*/val putsRDD: RDD[(ImmutableBytesWritable, Put)] = outputRDD.mapPartitions { iter =>iter.map { case (word, count) =>// 创建Put实例对象val put = new Put(Bytes.toBytes(word))// 添加列put.addColumn(// 实际项目中使用HBase时,插入数据,先将所有字段的值转为String,再使用Bytes转换为字节数组Bytes.toBytes("info"), Bytes.toBytes("cout"), Bytes.toBytes(count.toString))// 返回二元组(new ImmutableBytesWritable(put.getRow), put)}}// 构建HBase Client配置信息val conf: Configuration = HBaseConfiguration.create()// 设置连接Zookeeper属性conf.set("hbase.zookeeper.quorum", "node1")conf.set("hbase.zookeeper.property.clientPort", "2181")conf.set("zookeeper.znode.parent", "/hbase")// 设置将数据保存的HBase表的名称conf.set(TableOutputFormat.OUTPUT_TABLE, "htb_wordcount")/*def saveAsNewAPIHadoopFile(path: String,// 保存的路径keyClass: Class[_], // Key类型valueClass: Class[_], // Value类型outputFormatClass: Class[_ <: NewOutputFormat[_, _]], // 输出格式OutputFormat实现conf: Configuration = self.context.hadoopConfiguration // 配置信息): Unit*/putsRDD.saveAsNewAPIHadoopFile("datas/spark/htb-output-" + System.nanoTime(), //classOf[ImmutableBytesWritable], //classOf[Put], //classOf[TableOutputFormat[ImmutableBytesWritable]], //conf)// 应用程序运行结束,关闭资源sc.stop()}}

运行完成以后,使用hbase shell查看数据:

​​​​​​​HBase Source

回顾MapReduce从读HBase表中的数据,使用TableMapper,其中InputFormat为TableInputFormat,读取数据Key:ImmutableBytesWritable,Value:Result

从HBase表读取数据时,同样需要设置依赖Zookeeper地址信息和表的名称,使用Configuration设置属性,形式如下:

此外,读取的数据封装到RDD中,Key和Value类型分别为:ImmutableBytesWritable和Result,不支持Java Serializable导致处理数据时报序列化异常。设置Spark Application使用Kryo序列化,性能要比Java 序列化要好,创建SparkConf对象设置相关属性,如下所示:

范例演示:从HBase表读取词频统计结果,代码如下

package cn.itcast.coreimport org.apache.hadoop.conf.Configurationimport org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration}import org.apache.hadoop.hbase.client.Resultimport org.apache.hadoop.hbase.io.ImmutableBytesWritableimport org.apache.hadoop.hbase.mapreduce.TableInputFormatimport org.apache.hadoop.hbase.util.Bytesimport org.apache.spark.rdd.RDDimport org.apache.spark.{SparkConf, SparkContext}/*** 从HBase 表中读取数据,封装到RDD数据集*/object SparkReadHBase {def main(args: Array[String]): Unit = {val sparkConf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[*]")val sc: SparkContext = new SparkContext(sparkConf)sc.setLogLevel("WARN")// 读取HBase Client 配置信息val conf: Configuration = HBaseConfiguration.create()conf.set("hbase.zookeeper.quorum", "node1")conf.set("hbase.zookeeper.property.clientPort", "2181")conf.set("zookeeper.znode.parent", "/hbase")// 设置读取的表的名称conf.set(TableInputFormat.INPUT_TABLE, "htb_wordcount")/*def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](conf: Configuration = hadoopConfiguration,fClass: Class[F],kClass: Class[K],vClass: Class[V]): RDD[(K, V)]*/val resultRDD: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(conf,classOf[TableInputFormat],classOf[ImmutableBytesWritable],classOf[Result])println(s"Count = ${resultRDD.count()}")resultRDD.take(5).foreach { case (rowKey, result) =>println(s"RowKey = ${Bytes.toString(rowKey.get())}")// HBase表中的每条数据封装在result对象中,解析获取每列的值result.rawCells().foreach { cell =>val cf = Bytes.toString(CellUtil.cloneFamily(cell))val column = Bytes.toString(CellUtil.cloneQualifier(cell))val value = Bytes.toString(CellUtil.cloneValue(cell))val version = cell.getTimestampprintln(s"\t $cf:$column= $value, version = $version")}}// 应用程序运行结束,关闭资源sc.stop()}}

运行结果:

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。