当前位置: 首页 > news >正文

大数据课程L9——网站流量项目的实时业务处理代码

文章作者邮箱:yugongshiye@sina.cn              地址:广东惠州

 ▲ 本章节目的

⚪ 掌握网站流量项目的SparkStreaming代码;

⚪ 掌握网站流量项目的HBaseUtil代码;

⚪ 掌握网站流量项目的MysqlUtil代码;

⚪ 掌握网站流量项目的LogBean代码;

⚪ 掌握网站流量项目的TongjiBean代码;

一、SparkStreaming代码

package cn.tedu.kafkasource

import org.apache.kafka.clients.consumer.ConsumerRecord

import org.apache.kafka.common.TopicPartition

import org.apache.kafka.common.serialization.StringDeserializer

import org.apache.spark.SparkConf

import org.apache.spark.streaming.dstream.InputDStream

import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

import org.apache.spark.streaming.kafka010._

import org.apache.spark.streaming.{Seconds, StreamingContext}

import org.apache.spark.SparkContext

import cn.tedu.pojo.LogBean

import java.util.Calendar

import cn.tedu.dao.HBaseUtil

import cn.tedu.pojo.TongjiBean

import cn.tedu.dao.MysqlUtil

object SparkStreaming {

  def main(args: Array[String]): Unit = {

     val conf= new SparkConf().setMaster("local[3]").setAppName("test01")

            .set("spark.serializer","org.apache.spark.serializer.KryoSerializer") 

    val sc=new SparkContext(conf)   

    val ssc=new StreamingContext(sc, Seconds(5))   

    val kafkaParams: Map[String, Object] = Map[String, Object](

            "bootstrap.servers" -> "hadoop01:9092,hadoop02:9092,hadoop03:9092",

            "key.deserializer" -> classOf[StringDeserializer],

            "value.deserializer" -> classOf[StringDeserializer],

            "group.id" -> "gp2"

        )

    val topics = Array("logdata")

    val kafkaSource=KafkaUtils.createDirectStream[String, String](

            ssc,

            PreferConsistent,

            Subscribe[String, String](topics, kafkaParams)

        ).map(x=>x.value())

    kafkaSource.foreachRDD{rdd=>

     //lines里存储了当前批次内的所有数据 

      val lines=rdd.toLocalIterator

      //遍历迭代器,对每条数据进行处理

      while(lines.hasNext){

        val line=lines.next()

        //第一步:清洗出所需要的业务字段。url,urlname,uvid,ssid,sscount,sstime,cip

        val info=line.split("\\|")

        val url=info(0)

        val urlname=info(1)

        val uvid=info(13)

        val ssid=info(14).split("_")(0)

        val sscount=info(14).split("_")(1)

        val sstime=info(14).split("_")(2)

http://www.lryc.cn/news/165031.html

相关文章:

  • 【2023最新B站评论爬虫】用python爬取上千条哔哩哔哩评论
  • mysql设置max_sp_recursion_depth,sql_mode
  • 论文阅读:SERE: Exploring Feature Self-relation for Self-supervised Transformer
  • 遥感数据与作物模型同化应用:PROSAIL模型、DSSAT模型、参数敏感性分析、数据同化算法、模型耦合、精度验证等主要环节
  • Navicat15工具连接PostgreSQL15失败
  • 开源AI家庭自动化助手-手机控制家庭智能家居服务
  • 解决CSS定位错乱/疑难杂症的终极绝招==》从样式污染开始排查
  • 【笔记】《C++性能优化指南》Ch3 测量性能
  • 2023大数据面试总结
  • udev自动创建设备节点的机制
  • 访问局域网内共享文件时报错0x80070043,找不到网络名
  • Java定时器
  • 科普js加密时出现的错误
  • MYSQL优化——B+树讲解
  • Rokid Jungle--Station pro
  • 如何实现微服务
  • MySQL如何进行增量备份与恢复?
  • 微服务框架
  • (matplotlib)如何让各个子图ax大小(宽度和高度)相等
  • python http 上传文件
  • IPO解读:Instacart曲折上市,业务模式如何持续“绚烂”?
  • 使用sql profile 稳定执行计划的案例
  • 海南大学金秋悦读《乡村振兴战略下传统村落文化旅游设计》2023新学年许少辉八一新书​
  • [N0wayback 2023春节红包题] happyGame python反编译
  • Redis 初识与入门
  • 【STM32】片上ADC的初步使用
  • esxi下实现ikuai相同的两个网卡,单独路由配置
  • Windows环境下Elasticsearch相关软件安装
  • 配置Jedis连接池
  • Windows 12 开源网页版