700字范文,内容丰富有趣,生活中的好帮手!
700字范文 > Flink-电商用户行为分析(网站独立访客数(UV)的统计)

Flink-电商用户行为分析(网站独立访客数(UV)的统计)

时间:2024-03-30 21:35:28

相关推荐

Flink-电商用户行为分析(网站独立访客数(UV)的统计)

数据

链接:/s/1InfWoNYUeV1KYyvFS1aXuA

提取码:z3p4

统计流量的重要指标是网站的独立访客数(Unique Visitor,UV)。UV指的是一段时间(比如一小时)内访问网站的总人数,1 天内同一访客的多次访问只记录为一个访客。通过 IP 和 cookie 一般是判断 UV 值的两种方式。当客户端第一次访问某个网站服务器的时候,网站服务器会给这个客户端的电脑发出一个 Cookie,通常放在这个客户端电脑的C盘当中。在这个 Cookie中会分配一个独一无二的编号,这其中会记录一些访问服务器的信息,如访问时间,访问了哪些页面等等。当你下次再访问这个服务器的时候,服务器就可以直接从你的电脑中找到上一次放进去的Cookie 文件,并且对其进行一些更新,但那个独一无二的编号是不会变的。 当然,对于 UserBehavior 数据源来说,我们直接可以根据 userId 来区分不同的用户。

代码实现

import org.apache.flink.api.scala._import org.apache.flink.streaming.api.TimeCharacteristicimport org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimport org.apache.flink.streaming.api.scala.function.AllWindowFunctionimport org.apache.flink.streaming.api.windowing.time.Timeimport org.apache.flink.streaming.api.windowing.windows.TimeWindowimport org.apache.flink.util.Collector//定义一个输入数据的样列类case class UserBehavior(UserId:Long,itemId:Long,categoryId:Int,behavior:String,timestamp:Long)//定义一个输入数据的样列类case class UvCount(windowEnd:Long,uvCount:Long)object UniqueVisitor {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)//使用相对路径val resource = getClass.getResource("/UserBehavior.csv")val dataStream = env.readTextFile(resource.getPath).map(data => {val dataArray = data.split(",")UserBehavior(dataArray(0).trim.toLong, dataArray(1).trim.toLong, dataArray(2).trim.toInt, dataArray(3).trim,dataArray(4).trim.toLong)}).assignAscendingTimestamps(_.timestamp * 1000L).filter(_.behavior == "pv") //只统计pv操作//不做keyby直接timeWindowAll.timeWindowAll(Time.hours(1)).apply(new UvCountByWindow())dataStream.print("pv count")env.execute("page view job")}}class UvCountByWindow() extends AllWindowFunction[UserBehavior,UvCount,TimeWindow]{override def apply(window: TimeWindow, input: Iterable[UserBehavior],out: Collector[UvCount]): Unit = {//定义一个scala set 用于保存所有的数据UserId并去重var idSet = Set[Long]()//把当前窗口所有数据的Id收集到set中,最后输出set的大小for (userBehavior <- input){idSet += userBehavior.UserId}out.collect(UvCount(window.getEnd,idSet.size))}}

我们需要进一步改进,UV增量去重,因为我们想要来一条数据处理一条,

代码实现

import org.apache.mon.functions.AggregateFunctionimport org.apache.flink.api.scala._import org.apache.flink.streaming.api.TimeCharacteristicimport org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimport org.apache.flink.streaming.api.scala.function.AllWindowFunctionimport org.apache.flink.streaming.api.windowing.time.Timeimport org.apache.flink.streaming.api.windowing.windows.TimeWindowimport org.apache.flink.util.Collector//定义一个输入数据的样列类case class UserBehavior(UserId:Long,itemId:Long,categoryId:Int,behavior:String,timestamp:Long)//定义一个输入数据的样列类case class UvCount(windowEnd:Long,uvCount:Long)object UniqueVisitor {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)//使用相对路径val resource = getClass.getResource("/UserBehavior.csv")val dataStream = env.readTextFile(resource.getPath).map(data => {val dataArray = data.split(",")UserBehavior(dataArray(0).trim.toLong, dataArray(1).trim.toLong, dataArray(2).trim.toInt, dataArray(3).trim,dataArray(4).trim.toLong)}).assignAscendingTimestamps(_.timestamp * 1000L).filter(_.behavior == "pv") //只统计pv操作//不做keyby直接timeWindowAll.timeWindowAll(Time.hours(1)).aggregate(new UvCountAgg(),new UvCountResult())dataStream.print("pv count")env.execute("page view job")}}//自定义增量聚合函数,需要定义一个Set作为累加状态class UvCountAgg() extends AggregateFunction[UserBehavior,Set[Long],Long]{override def createAccumulator(): Set[Long] = Set[Long]()override def add(value: UserBehavior, accumulator: Set[Long]): Set[Long] = accumulator + value.UserIdoverride def getResult(accumulator: Set[Long]): Long = accumulator.sizeoverride def merge(a: Set[Long], b: Set[Long]): Set[Long] = a ++ b}//自定义窗口函数,添加window信息包装成样例类class UvCountResult() extends AllWindowFunction[Long,UvCount,TimeWindow]{override def apply(window: TimeWindow, input: Iterable[Long], out: Collector[UvCount]): Unit = {out.collect(UvCount(window.getEnd,input.head))}}

这样可能会报错

/qq_46548855/article/details/109345958

但是这样还是有bug!

如果有1亿个用户 一个用户100B,那么10^10B,相当于10GB

如果使用布隆过滤器的话,一个用户的100B会转化位1bit,那么就是10^8bit

10^8bit/1024B/1024KB=100bitM/8bit约等于10MB

为了防止hash碰撞,我们要扩充10倍,大于要100MB

那么我们要使用布隆过滤器

我们把所有数据的userId都存在了窗口计算的状态里,在窗口收集数据的过程中,状态会不断增大。一般情况下,只要不超出内存的承受范围,这种做法也没什么问题;但如果我们遇到的数据量很大呢?

把所有数据暂存放到内存里,显然不是一个好注意。我们会想到,可以利用redis这种内存级k-v数据库,为我们做一个缓存。但如果我们遇到的情况非常极端,数据大到惊人呢?比如上亿级的用户,要去重计算UV。

如果放到redis中,亿级的用户id(每个20字节左右的话)可能需要几G甚至几十G的空间来存储。当然放到redis中,用集群进行扩展也不是不可以,但明显代价太大了。

一个更好的想法是,其实我们不需要完整地存储用户ID的信息,只要知道他在不在就行了。所以其实我们可以进行压缩处理,用一位(bit)就可以表示一个用户的状态。这个思想的具体实现就是布隆过滤器(Bloom Filter)。

本质上布隆过滤器是一种数据结构,比较巧妙的概率型数据结构(probabilistic data structure),特点是高效地插入和查询,可以用来告诉你 “某样东西一定不存在或者可能存在”。

它本身是一个很长的二进制向量,既然是二进制的向量,那么显而易见的,存放的不是0,就是1。相比于传统的 List、Set、Map 等数据结构,它更高效、占用空间更少,但是缺点是其返回的结果是概率性的,而不是确切的。

我们的目标就是,利用某种方法(一般是Hash函数)把每个数据,对应到一个位图的某一位上去;如果数据存在,那一位就是1,不存在则为0。

接下来我们就来具体实现一下。

注意这里我们用到了redis连接存取数据,所以需要加入redis客户端的依赖:

<dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>2.8.1</version></dependency>

代码实现

import org.apache.flink.configuration.Configurationimport org.apache.flink.streaming.api.TimeCharacteristicimport org.apache.flink.streaming.api.scala._import org.apache.flink.streaming.api.scala.function.ProcessWindowFunctionimport org.apache.flink.streaming.api.windowing.time.Timeimport org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}import org.apache.flink.streaming.api.windowing.windows.TimeWindowimport org.apache.flink.util.Collectorimport redis.clients.jedis.Jedisobject UvWithBoomFilter {def main(args: Array[String]): Unit = {// 创建一个流处理执行环境val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)// 从文件读取数据val inputStream: DataStream[String] = env.readTextFile("D:\\idea\\UserBehaviorAnalysis\\NetworkFlowAnalysis\\src\\main\\resources\\UserBehavior.csv")// 将数据转换成样例类类型,并且提取timestamp定义watermarkval dataStream: DataStream[UserBehavior] = inputStream.map( data => {val dataArray = data.split(",")UserBehavior( dataArray(0).toLong, dataArray(1).toLong, dataArray(2).toInt, dataArray(3), dataArray(4).toLong )} ).assignAscendingTimestamps(_.timestamp * 1000L)// 分配key,包装成二元组开创聚合val uvStream: DataStream[UvCount] = dataStream.filter(_.behavior == "pv").map( data => ("uv", data.UserId) ).keyBy(_._1).timeWindow(Time.hours(1)).trigger(new MyTrigger()) // 自定义Trigger.process( new UvCountResultWithBloomFilter() )uvStream.print()env.execute("uv job")}}// 自定义一个触发器,每来一条数据就触发一次窗口计算操作class MyTrigger() extends Trigger[(String, Long), TimeWindow]{override def onEventTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = TriggerResult.CONTINUEoverride def onProcessingTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = TriggerResult.CONTINUEoverride def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = {}// 数据来了之后,触发计算并清空状态,不保存数据override def onElement(element: (String, Long), timestamp: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = TriggerResult.FIRE_AND_PURGE}// 自定义ProcessWindowFunction,把当前数据进行处理,位图保存在redis中class UvCountResultWithBloomFilter() extends ProcessWindowFunction[(String, Long), UvCount, String, TimeWindow]{var jedis: Jedis = _var bloom: Bloom = _override def open(parameters: Configuration): Unit = {jedis = new Jedis("hadoop12", 6379)// 位图大小10亿个位,也就是2^30,占用128MBbloom = new Bloom(1<<30)}// 每来一个数据,主要是要用布隆过滤器判断redis位图中对应位置是否为1override def process(key: String, context: Context, elements: Iterable[(String, Long)], out: Collector[UvCount]): Unit = {// bitmap用当前窗口的end作为key,保存到redis里,(windowEnd,bitmap)val storedKey = context.window.getEnd.toString// 我们把每个窗口的uv count值,作为状态也存入redis中,存成一张叫做countMap的表val countMap = "countMap"// 先获取当前的count值var count = 0Lif( jedis.hget(countMap, storedKey) != null )count = jedis.hget(countMap, storedKey).toLong// 取userId,计算hash值,判断是否在位图中val userId = elements.last._2.toStringval offset = bloom.hash(userId, 61)val isExist = jedis.getbit( storedKey, offset )// 如果不存在,那么就将对应位置置1,count加1;如果存在,不做操作if( !isExist ){jedis.setbit( storedKey, offset, true )jedis.hset( countMap, storedKey, (count + 1).toString )}}}// 自定义一个布隆过滤器class Bloom(size: Long) extends Serializable{// 定义位图的大小,应该是2的整次幂private val cap = size// 实现一个hash函数def hash(str: String, seed: Int): Long = {var result = 0for( i <- 0 until str.length ){result = result * seed + str.charAt(i)}// 返回一个在cap范围内的一个值(cap - 1) & result}}

解释计算Trigger里面TriggerResult的一些参数

fire处不触发计算purge清不清空状态CONTINUE(false, false)FIRE_AND_PURGE(true, true)FIRE(true, false)PURGE(false, true)

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。