Flink Real-Time Traffic Statistics: Hourly PV Monitoring with Window Functions and a Redis Sink (Study Notes)
Task:
Use Flink to count website page views and write the results to Redis.
Use window functions and operators to compute hourly PV (page view) statistics, design the format of the result data, and store it in Redis (use a sink to write the processed results to the Redis database).
Steps:
1. Installing Redis on the virtual machine
(1) Download the Redis package
cd /opt/software # enter a directory for storing packages; any directory works
wget http://download.redis.io/releases/redis-6.2.6.tar.gz
# download Redis 6.2.6; change the version number as needed
(2) Extract the package with the tar command
tar -zxvf redis-6.2.6.tar.gz
Extraction produces a directory named redis-6.2.6.
(3) Install the gcc build tools
yum install -y gcc gcc-c++ make
(4) Build and install Redis
cd redis-6.2.6
make # build Redis; this may take a while depending on the VM's performance
make install # install Redis; binaries go to /usr/local/bin by default
(5) Configure Redis. Redis does not install a configuration file by default, so create the directory and copy the sample file manually:
mkdir /etc/redis # create the directory that will hold the config file
cp /opt/software/redis-6.2.6/redis.conf /etc/redis/ # copy the sample config into the new directory; adjust the path to wherever you extracted Redis
As shown in the figure, Redis is installed and can be accessed normally.
The firewall on the virtual machine also needs to be turned off (or port 6379 opened) so that Redis's port 6379 can be reached.
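To check connectivity from the development host before writing any Flink code, a minimal Jedis ping sketch can be used (it assumes the VM IP 192.168.100.20 used later in this write-up, and that redis.conf allows remote access, e.g. bind 0.0.0.0 and protected-mode no):
import redis.clients.jedis.Jedis;

public class RedisPingTest {
    public static void main(String[] args) {
        // Connect to the Redis instance running on the virtual machine
        try (Jedis jedis = new Jedis("192.168.100.20", 6379)) {
            // PING returns "PONG" when Redis is reachable
            System.out.println("Redis responded: " + jedis.ping());
        }
    }
}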
2. Code
Project setup
In IntelliJ IDEA, choose File -> New -> Project, select Maven, and follow the wizard to create a new Maven project. Add the following dependencies to pom.xml:
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.example</groupId>
    <artifactId>flink-pv-redis</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <flink.version>1.13.6</flink.version>
        <redis.clients.version>3.8.0</redis.clients.version>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <!-- Flink core dependencies -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Flink Redis connector -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.1.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Redis client -->
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>${redis.clients.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>

    <!-- Aliyun mirror -->
    <repositories>
        <repository>
            <id>aliyunmaven</id>
            <name>Aliyun public repository</name>
            <url>https://maven.aliyun.com/repository/public</url>
        </repository>
    </repositories>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>${maven.compiler.source}</source>
                    <target>${maven.compiler.target}</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
Defining the data type
Create a Java class to represent UserBehavior. Each line of UserBehavior.csv contains five comma-separated fields: userId, itemId, categoryId, behavior, and timestamp.
package Bean;

public class UserBehavior {
    private Long userId;
    private Long itemId;
    private Integer categoryId;
    private String behavior;   // behavior type: "pv" means page view
    private Long timestamp;    // timestamp in milliseconds

    public UserBehavior(Long userId, Long itemId, Integer categoryId, String behavior, Long timestamp) {
        this.userId = userId;
        this.itemId = itemId;
        this.categoryId = categoryId;
        this.behavior = behavior;
        this.timestamp = timestamp;
    }

    public Long getUserId() { return userId; }
    public void setUserId(Long userId) { this.userId = userId; }
    public Long getItemId() { return itemId; }
    public void setItemId(Long itemId) { this.itemId = itemId; }
    public Integer getCategoryId() { return categoryId; }
    public void setCategoryId(Integer categoryId) { this.categoryId = categoryId; }
    public String getBehavior() { return behavior; }
    public void setBehavior(String behavior) { this.behavior = behavior; }
    public Long getTimestamp() { return timestamp; }
    public void setTimestamp(Long timestamp) { this.timestamp = timestamp; }
}
Writing the Flink program
Create a main class, e.g. PvStatisticsToRedis.java, and write the Flink program that computes hourly PV and writes it to Redis.
import Bean.UserBehavior;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.apache.flink.util.Collector;

import java.util.HashMap;
import java.util.Map;

public class PvStatisticsToRedis {

    public static void main(String[] args) throws Exception {
        // Create the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Read the CSV file and parse the five comma-separated fields
        DataStream<UserBehavior> userBehaviorDataStream = env
                .readTextFile("/mnt/UserBehavior(1).csv")
                .filter(line -> !line.startsWith("userId"))   // drop the header row (assumed to start with "userId"), if present
                .map(new MapFunction<String, UserBehavior>() {
                    @Override
                    public UserBehavior map(String line) throws Exception {
                        String[] parts = line.split(",");
                        return new UserBehavior(
                                Long.parseLong(parts[0]),    // userId
                                Long.parseLong(parts[1]),    // itemId
                                Integer.parseInt(parts[2]),  // categoryId
                                parts[3],                    // behavior
                                Long.parseLong(parts[4]));   // timestamp
                    }
                });

        // Keep only "pv" events and map them to ("pv", userId)
        SingleOutputStreamOperator<Tuple2<String, Long>> pvStream = userBehaviorDataStream
                .filter(behavior -> "pv".equals(behavior.getBehavior()))
                .map(behavior -> Tuple2.of("pv", behavior.getUserId()))
                .returns(Types.TUPLE(Types.STRING, Types.LONG));

        // Key by the constant "pv" and count events in 1-hour tumbling processing-time windows
        KeyedStream<Tuple2<String, Long>, String> keyedStream = pvStream.keyBy(t -> t.f0);
        SingleOutputStreamOperator<Map<String, Object>> pvCountStream = keyedStream
                .window(TumblingProcessingTimeWindows.of(Time.hours(1)))
                .process(new ProcessWindowFunction<Tuple2<String, Long>, Map<String, Object>, String, TimeWindow>() {
                    @Override
                    public void process(String key, Context ctx,
                                        Iterable<Tuple2<String, Long>> elements,
                                        Collector<Map<String, Object>> out) {
                        long pvCount = 0;
                        for (Tuple2<String, Long> ignored : elements) {
                            pvCount++;
                        }
                        Map<String, Object> result = new HashMap<>();
                        result.put("window_end_time", ctx.window().getEnd());
                        result.put("pv_count", pvCount);
                        out.collect(result);
                    }
                });

        // Format the statistics as strings
        SingleOutputStreamOperator<String> resultStream = pvCountStream
                .map(new MapFunction<Map<String, Object>, String>() {
                    @Override
                    public String map(Map<String, Object> value) throws Exception {
                        return "window_end_time: " + value.get("window_end_time")
                                + ", pv_count: " + value.get("pv_count");
                    }
                });

        // Configure the Redis connection
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
                .setHost("localhost")
                .setPort(6379)
                .build();

        // Build the Redis sink: LPUSH each result string onto the list "pv_statistics"
        RedisSink<String> redisSink = new RedisSink<>(conf, new RedisMapper<String>() {
            @Override
            public RedisCommandDescription getCommandDescription() {
                return new RedisCommandDescription(RedisCommand.LPUSH);
            }

            @Override
            public String getKeyFromData(String data) {
                return "pv_statistics";   // list key in Redis
            }

            @Override
            public String getValueFromData(String data) {
                return data;
            }
        });

        // Write the results to Redis
        resultStream.addSink(redisSink);
        // Print the results to the console (optional)
        resultStream.print();

        // Execute the Flink job
        env.execute("PV Statistics to Redis");
    }
}
In the resources directory, add a log4j.properties file with the following content:
log4j.rootLogger=error, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
Running result:
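To confirm that the results actually landed in Redis, the pv_statistics list can be read back. A minimal Jedis read-back sketch, assuming the local Redis configured above:
import java.util.List;
import redis.clients.jedis.Jedis;

public class PvResultCheck {
    public static void main(String[] args) {
        try (Jedis jedis = new Jedis("localhost", 6379)) {
            // LRANGE 0 -1 returns every element the job pushed with LPUSH
            List<String> entries = jedis.lrange("pv_statistics", 0, -1);
            entries.forEach(System.out::println);
        }
    }
}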
3. Sink output to the Redis database
Core principle
Flink's RedisSink writes data to Redis through a custom RedisMapper, which mainly specifies:
The Redis command: e.g. SET (key-value pair), HSET (hash), LPUSH (list), etc.
The key: identifies the data uniquely (for hourly PV statistics, window_end_time can serve as the key).
The value: the concrete statistic to store (e.g. the PV count).
Implementation steps
Assuming the statistic is the hourly PV count, the storage format is designed as follows:
Redis key: pv_statistics:{window_end_time} (where window_end_time is the window end timestamp, at hour granularity).
Redis value: the total PV count for that hour (e.g. 12345).
Data structure: the String type (stored with the SET command), which keeps later queries and aggregation simple; a sketch of this variant follows right after this list.
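For reference, here is a minimal sketch of the SET-based mapper described above, using the pv_statistics:{window_end_time} key format (the code implementation further below stores the same data as a hash with HSET instead); PvResult is the result type defined later in this section:
// Sketch only: one string key per hourly window, e.g. SET pv_statistics:1700000000000 12345
RedisMapper<PvResult> setMapper = new RedisMapper<PvResult>() {
    @Override
    public RedisCommandDescription getCommandDescription() {
        return new RedisCommandDescription(RedisCommand.SET);
    }

    @Override
    public String getKeyFromData(PvResult data) {
        // Key: pv_statistics:{window_end_time}
        return "pv_statistics:" + data.getWindowEnd();
    }

    @Override
    public String getValueFromData(PvResult data) {
        // Value: the PV count for that hour
        return String.valueOf(data.getCount());
    }
};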
RedisMapper is the core interface between Flink and Redis; three methods must be implemented:
getCommandDescription(): specifies the Redis command (e.g. SET).
getKeyFromData(): extracts the Redis key from the statistic.
getValueFromData(): extracts the Redis value from the statistic.
Code implementation
RedisMapper<PvResult> redisMapper = new RedisMapper<PvResult>() {
    @Override
    public RedisCommandDescription getCommandDescription() {
        return new RedisCommandDescription(RedisCommand.HSET, "pv:hour");
    }

    @Override
    public String getKeyFromData(PvResult data) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHH");
        String key = sdf.format(new Date(data.getWindowStart()));
        System.out.println("Redis Key: " + key);
        return key;
    }

    @Override
    public String getValueFromData(PvResult data) {
        return String.valueOf(data.getCount());
    }
};
package Bean;
public class PvResult {
    private long windowStart;  // window start time (milliseconds)
    private long windowEnd;    // window end time (milliseconds)
    private long count;        // PV count for this window

    public PvResult() {}

    public PvResult(long windowStart, long windowEnd, long count) {
        this.windowStart = windowStart;
        this.windowEnd = windowEnd;
        this.count = count;
    }

    // Getters & setters
    public long getWindowStart() { return windowStart; }
    public void setWindowStart(long windowStart) { this.windowStart = windowStart; }
    public long getWindowEnd() { return windowEnd; }
    public void setWindowEnd(long windowEnd) { this.windowEnd = windowEnd; }
    public long getCount() { return count; }
    public void setCount(long count) { this.count = count; }
}
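PvResult records can be produced directly from the windowed stream. A minimal sketch under the same processing-time tumbling windows as the main program (it reuses pvStream and the imports shown there, plus org.apache.flink.api.common.functions.AggregateFunction), combining an AggregateFunction that counts events with a ProcessWindowFunction that attaches the window bounds:
// Sketch: turn the keyed "pv" stream from the main program into PvResult records
SingleOutputStreamOperator<PvResult> pvResultStream = pvStream
        .keyBy(t -> t.f0)
        .window(TumblingProcessingTimeWindows.of(Time.hours(1)))
        .aggregate(
                // incrementally count events inside the window
                new AggregateFunction<Tuple2<String, Long>, Long, Long>() {
                    @Override public Long createAccumulator() { return 0L; }
                    @Override public Long add(Tuple2<String, Long> value, Long acc) { return acc + 1; }
                    @Override public Long getResult(Long acc) { return acc; }
                    @Override public Long merge(Long a, Long b) { return a + b; }
                },
                // attach the window start/end to the final count
                new ProcessWindowFunction<Long, PvResult, String, TimeWindow>() {
                    @Override
                    public void process(String key, Context ctx, Iterable<Long> counts, Collector<PvResult> out) {
                        out.collect(new PvResult(ctx.window().getStart(), ctx.window().getEnd(), counts.iterator().next()));
                    }
                });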
Configuring the Redis connection
Use FlinkJedisPoolConfig to configure the Redis connection information (host, port, password, etc.):
FlinkJedisPoolConfig redisConfig = new FlinkJedisPoolConfig.Builder()
        .setHost("192.168.100.20")  // IP of the Redis host (the virtual machine)
        .setPort(6379)
        .build();
Note that this connects to the Redis instance on the virtual machine, so change the host to your own virtual machine's IP address.
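With the mapper and the connection config in place, wiring them into the job is one more addSink call; a minimal sketch, assuming pvResultStream is the PvResult stream sketched earlier:
// attach the HSET-based mapper and the pool config to the result stream
pvResultStream.addSink(new RedisSink<>(redisConfig, redisMapper));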
Result: