
Counting a Website's Unique Visitors (UV): Deduplicating Massive Data




Problem: count the website's unique visitors (UV, Unique Visitor) for each one-hour window.

Background: this is an exercise from Shangguigu's (尚硅谷) big data course on e-commerce user behavior analysis. The instructor, Wu Shengran (武晟然), solves it with a hand-written Bloom filter. Building on that lesson and on several articles found online, this post presents two other ways to deduplicate and count UV: ① the Bloom filter bundled with Flink, and ② a Bloom filter backed by Redis.

Dataset: UserBehavior.csv
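Each row of UserBehavior.csv is a comma-separated record of userId, itemId, categoryId, behavior and a timestamp in seconds, which both programs below parse into the same case class. A minimal parsing sketch (the sample row is illustrative only, not taken from the real file):

// Schema used throughout this post; field order follows the parsing code below.
case class UserBehavior(userId: Long, itemId: Long, categoryId: Int, behavior: String, timestamp: Long)

object ParseExample {
  def main(args: Array[String]): Unit = {
    // illustrative line: userId,itemId,categoryId,behavior,timestamp(seconds)
    val line = "543462,1715,1464116,pv,1511658000"
    val arr = line.split(",")
    val record = UserBehavior(arr(0).toLong, arr(1).toLong, arr(2).toInt, arr(3), arr(4).toLong)
    println(record)
  }
}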

1. UV counting with Flink's bundled Bloom filter

package workflowanalysis

import java.lang
import java.text.SimpleDateFormat

import workflowanalysis.UserBehavior
import org.apache.flink.api.common.functions.AggregateFunction
// Flink's bundled (shaded Guava) Bloom filter; the "guava18" segment matches Flink 1.10,
// adjust it to the shading used by your Flink version
import org.apache.flink.shaded.guava18.com.google.common.hash.{BloomFilter, Funnels}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

/**
 * @Author liu
 * @Date -03-20
 */
object UvWithFlinkBloom {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    // use event time, otherwise the hourly windows would be driven by processing time
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

    val inputStream: DataStream[String] = env.readTextFile("D:\\ideaIU-.1.2\\IdeaProject\\UserBehaviorAnalysis\\NetworkFlowAnalysis\\src\\main\\resources\\UserBehavior.csv")

    val resultStream: DataStream[String] = inputStream
      .map(line => {
        val dataArr: Array[String] = line.split(",")
        UserBehavior(dataArr(0).toLong, dataArr(1).toLong, dataArr(2).toInt, dataArr(3), dataArr(4).toLong)
      })
      .filter(_.behavior == "pv")
      .assignAscendingTimestamps(_.timestamp * 1000L)
      .map(data => ("key", data.userId))
      .keyBy(_._1)                    // one constant key, so every userId lands in the same window
      .timeWindow(Time.hours(1))
      .aggregate(
        new AggregateFunction[(String, Long), (Long, BloomFilter[lang.Long]), Long] {
          // accumulator = (running UV count, Bloom filter holding the userIds seen so far)
          override def createAccumulator() = (0L, BloomFilter.create(Funnels.longFunnel(), 100000000, 0.01))

          override def add(value: (String, Long), accumulator: (Long, BloomFilter[lang.Long])) = {
            var uvCount: Long = accumulator._1
            val bloom: BloomFilter[lang.Long] = accumulator._2
            if (!bloom.mightContain(value._2)) {   // "definitely not seen yet" -> a new visitor
              bloom.put(value._2)
              uvCount += 1
            }
            (uvCount, bloom)
          }

          override def getResult(accumulator: (Long, BloomFilter[lang.Long])) = accumulator._1

          // not needed here: hourly tumbling windows never merge
          override def merge(a: (Long, BloomFilter[lang.Long]), b: (Long, BloomFilter[lang.Long])) = ???
        },
        new ProcessWindowFunction[Long, String, String, TimeWindow] {
          override def process(key: String, context: Context, elements: Iterable[Long], out: Collector[String]): Unit = {
            out.collect("window=" + sdf.format(context.window.getStart) + "~" + sdf.format(context.window.getEnd) + "\tUvCount=" + elements.iterator.next())
          }
        }
      )

    resultStream.print()
    env.execute()
  }
}
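The heart of the aggregate function above is the put/mightContain pattern of the shaded Guava BloomFilter. The standalone sketch below isolates that pattern; it assumes the same guava18 shading and the same 100,000,000 / 0.01 sizing as the job above, and the list of userIds is purely illustrative:

import java.lang
import org.apache.flink.shaded.guava18.com.google.common.hash.{BloomFilter, Funnels}

object BloomDedupSketch {
  def main(args: Array[String]): Unit = {
    // sized like the accumulator above: up to 100 million userIds, ~1% false-positive rate
    val bloom: BloomFilter[lang.Long] = BloomFilter.create(Funnels.longFunnel(), 100000000, 0.01)
    var uvCount = 0L
    val userIds = Seq(101L, 202L, 101L, 303L, 202L)   // illustrative userIds only
    for (id <- userIds) {
      if (!bloom.mightContain(id)) {  // mightContain == false means definitely new
        bloom.put(id)
        uvCount += 1
      }
    }
    println(s"UV = $uvCount")   // prints 3 (a false positive could slightly undercount)
  }
}

Because a Bloom filter stores only a bit array instead of every userId, the per-window state stays small even with hundreds of millions of visitors, at the cost of a tunable false-positive rate.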

2. UV counting with a Redis-backed Bloom filter

Prerequisite: a Redis 5.0.8 cluster with the Bloom filter module (RedisBloom) installed.
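If your Redis build does not already provide the BF.* commands, the RedisBloom module has to be loaded on every node of the cluster. A minimal sketch, assuming the module has been compiled to /opt/module/redisbloom/redisbloom.so (the path is an example, not from the original post):

# redis.conf on every cluster node (example path)
loadmodule /opt/module/redisbloom/redisbloom.so

# or, equivalently, when starting a node by hand
redis-server /path/to/redis-7001.conf --loadmodule /opt/module/redisbloom/redisbloom.so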

Add the dependencies:

<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>3.0.0-m1</version>
</dependency>
<dependency>
    <groupId>com.redislabs</groupId>
    <artifactId>jrebloom</artifactId>
    <version>2.2.2</version>
</dependency>

Start the Redis cluster:

[xiaokang@hadoop01 ~]$ redis-cluster-start.sh
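Before running the job, you can check from redis-cli that the Bloom filter commands actually respond on the cluster (the filter name and value below are throwaway examples; replies may be preceded by a cluster redirect line):

[xiaokang@hadoop01 ~]$ redis-cli -c -p 7001
127.0.0.1:7001> BF.ADD smoke-test 1001
(integer) 1
127.0.0.1:7001> BF.EXISTS smoke-test 1001
(integer) 1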

The code:

package workflowanalysis

import java.text.SimpleDateFormat
import java.util

import io.rebloom.client.ClusterClient
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import redis.clients.jedis.{HostAndPort, JedisPoolConfig}

/**
 * @Author liu
 * @Date -03-20
 */
case class UserBehavior(userId: Long, itemId: Long, categoryId: Int, behavior: String, timestamp: Long)

object UvWithRedisBloomCluster {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // use event time so the hourly windows follow the timestamps in the data
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val inputStream: DataStream[String] = env.readTextFile("D:\\ideaIU-.1.2\\IdeaProject\\UserBehaviorAnalysis\\NetworkFlowAnalysis\\src\\main\\resources\\UserBehavior.csv")

    val dataStream: DataStream[UserBehavior] = inputStream
      .map(line => {
        val dataArr: Array[String] = line.split(",")
        UserBehavior(dataArr(0).toLong, dataArr(1).toLong, dataArr(2).toInt, dataArr(3), dataArr(4).toLong)
      })
      .filter(_.behavior == "pv")
      .assignAscendingTimestamps(_.timestamp * 1000L)

    dataStream
      .map(data => ("key", data.userId))
      .keyBy(_._1)                 // route every record to the same group
      .timeWindow(Time.hours(1))
      .trigger(new MyTrigger())    // fire and purge per element: all dedup state lives in Redis, not in the window
      .process(new ProcessWindowFunction[(String, Long), String, String, TimeWindow] {
        var hostAndPortSet: util.HashSet[HostAndPort] = _
        var config: JedisPoolConfig = _
        var clusterClientBf: ClusterClient = _
        var count: String = _
        var sdf: SimpleDateFormat = _

        override def open(parameters: Configuration) = {
          hostAndPortSet = new util.HashSet[HostAndPort]()
          hostAndPortSet.add(new HostAndPort("192.168.50.181", 7001))
          hostAndPortSet.add(new HostAndPort("192.168.50.181", 7002))
          hostAndPortSet.add(new HostAndPort("192.168.50.182", 7001))
          hostAndPortSet.add(new HostAndPort("192.168.50.182", 7002))
          hostAndPortSet.add(new HostAndPort("192.168.50.183", 7001))
          hostAndPortSet.add(new HostAndPort("192.168.50.183", 7002))
          config = new JedisPoolConfig()
          config.setMaxTotal(50)
          config.setMaxIdle(10)
          config.setMinIdle(5)
          config.setMaxWaitMillis(10000)
          clusterClientBf = new ClusterClient(hostAndPortSet, config)
          sdf = new SimpleDateFormat("yyyy/MM/dd-HH:mm:ss")
        }

        override def process(key: String, context: Context, elements: Iterable[(String, Long)], out: Collector[String]): Unit = {
          val windowStart: String = sdf.format(context.window.getStart)
          val windowEnd: String = sdf.format(context.window.getEnd)
          val windowRange = windowStart + "~~" + windowEnd
          // one Bloom filter per window, created lazily on the first element of that window
          if (!clusterClientBf.exists(windowRange)) {
            clusterClientBf.createFilter(windowRange, 100000000, 0.1)
          }
          // the trigger purges after every element, so there is exactly one element here
          val curValue: String = elements.iterator.next()._2.toString
          if (!clusterClientBf.exists(windowRange, curValue)) {
            // userId not seen in this window yet: add it and bump the counter in the "uv" hash
            clusterClientBf.add(windowRange, curValue)
            count = clusterClientBf.hget("uv", windowRange)
            if (count != null) {
              clusterClientBf.hset("uv", windowRange, (count.toLong + 1).toString)
            } else {
              clusterClientBf.hset("uv", windowRange, 1L.toString)
            }
          }
        }
      })

    env.execute()
  }
}

class MyTrigger() extends Trigger[(String, Long), TimeWindow] {
  // what to do each time an element arrives in the window
  override def onElement(element: (String, Long), timestamp: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {
    TriggerResult.FIRE_AND_PURGE // evaluate the window function and clear the window state
    // FIRE     // evaluate only
    // PURGE    // clear only
    // CONTINUE // do neither
  }

  // what to do when processing time advances
  override def onProcessingTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.CONTINUE

  // what to do when the watermark (event time) advances
  override def onEventTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.CONTINUE

  override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = {}
}

Check the results:

[xiaokang@hadoop01 ~]$ redis-cli -c -p 7001
127.0.0.1:7001> hgetall uv
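The same counts can also be read back programmatically. A minimal sketch, assuming one of the cluster nodes used by the job above is reachable; the "uv" hash holds one field per window range, with the UV count as its value, exactly as written by hset:

import java.util
import scala.collection.JavaConverters._
import redis.clients.jedis.{HostAndPort, JedisCluster}

object ReadUvCounts {
  def main(args: Array[String]): Unit = {
    // any reachable cluster node is enough to bootstrap the client
    val nodes = new util.HashSet[HostAndPort]()
    nodes.add(new HostAndPort("192.168.50.181", 7001))
    val cluster = new JedisCluster(nodes)
    // field = window range, value = UV count for that window
    val uv: util.Map[String, String] = cluster.hgetAll("uv")
    for ((window, count) <- uv.asScala) {
      println(s"$window -> $count")
    }
    cluster.close()
  }
}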
