当前位置: 首页 > news >正文

数据采集平台(二)

5. 安装Kafka

5.1 基础架构

  1. 为方便扩展,并提高吞吐量,一个topic分为多个partition
  2. 配合分区的设计,提出消费者组的概念,组内每个消费者并行消费
  3. 为提高可用性,为每个partition增加若干副本,类似NameNode HA

5.2 安装步骤

  1. 解压kafka文件,改为缩略名
  2. 进入kafka/config文件,修改配置文件:
    • myid和brokerID对应
    • connect
    • kafka日志存放地址
#broker的全局唯一编号,不能重复,只能是数字。
broker.id=0#broker对外暴露的IP和端口 (每个节点单独配置)
advertised.listeners=PLAINTEXT://hadoop102:9092
#kafka运行日志(数据)存放的路径,路径不需要提前创建,kafka自动帮你创建,可以配置多个磁盘路径,路径与路径之间可以用","分隔
log.dirs=/opt/module/kafka/datas
#配置连接Zookeeper集群地址(在zk根目录下创建/kafka,方便管理)
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka
  1. 配置kafka环境变量
#KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin
  1. 向不存在的主题发送数据时,kafka会自己创建该主题,是单副本和单分区的主题。
  2. 测试kafka能否正常运行:
    • 发送消息:bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first
    • 消费消息:bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
  3. 编写一键启停脚本
#! /bin/bashcase $1 in
"start"){for i in hadoop102 hadoop103 hadoop104doecho " --------启动 $i Kafka-------"ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties"done
};;
"stop"){for i in hadoop102 hadoop103 hadoop104doecho " --------停止 $i Kafka-------"ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh "done
};;
esac

6. 安装flume

6.1 概述

使用flume很方便,架构方面source, channel,sink不是分布式的,没有高可用。但是如果结合kafka channel之后还是不错的。

6.2 安装步骤

  1. 解压flume安装包并改名
  2. 修改conf/log4j文件,修改LOG_DIR路径,改为/opt/module/flume/log
  3. 让日志同时打印在控制台,添加参数
# 引入控制台输出,方便学习查看日志<Root level="INFO"><AppenderRef ref="LogFile" /><AppenderRef ref="Console" /></Root>

6.3 选择source、channel、sink

官网地址:flume.apache.org, 可以通过官网查找对应的配置参数。

  1. source: taildir source
    • 日志文件地址:filegroups.f1
    • 偏移量文件地址:positionFile
  2. channel:Kafka Channel
    • kafka.topic主题名字:默认为flume-channel,建议修改为topic_log
    • parseAsFlumeEvent : 修改为false, 不转换成flume事件格式
  3. sink:kafka sink, 由于kafka channel是channel和sink一体的,实际上不需要配置sink。
#定义组件
a1.sources = r1
a1.channels = c1#配置source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.*
a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json#配置channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092
a1.channels.c1.kafka.topic = topic_log
a1.channels.c1.parseAsFlumeEvent = false#组装 
a1.sources.r1.channels = c1
  1. 编写Flume启停脚本
    • nohup ... &: 表示进程不会随着窗口的关闭而消失,即后台运行或者挂起了。
    • >/dev/null 将控制台打印信息放到黑洞中,日志可以在flume/log目录下查看即可。
    • 2>&1: 将正确输出流和错误输出流合并
    • ps -ef : 打印进程状态,ef表示父子关系
#!/bin/bashcase $1 in
"start"){echo " --------启动 hadoop102 采集flume-------"ssh hadoop102 "nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf/ -f /opt/module/flume/job/file_to_kafka.conf >/dev/null 2>&1 &"
};; 
"stop"){echo " --------停止 hadoop102 采集flume-------"ssh hadoop102 "ps -ef | grep file_to_kafka | grep -v grep |awk  '{print \$2}' | xargs -n1 kill -9 "
};;
esac

7. 电商业务流程

7.1 用户购物流程

  • 首页
    • 三级分类索引
    • 全文检索
    • 营销活动
  • 商品详情
    • 购物车
    • SSO单点登录:浏览器token保存
    • 下单结算
    • 第三方支付
  • 售后管理
    • 评价
    • 退货退款
    • 物流服务
    • 客户服务
    • 库存服务

7.2 电商常识

  1. sku:商品库存量基本单位,产品统一编号。
  2. spu: 商品名称聚合最小单位,易于复用、检索。
  3. UV:人次,不管是否重复
  4. PV:人数,同一个人只算一次

7.3 电商系统表结构

  1. 活动相关表,主键为aciivity_id
    • 活动信息表
    • 活动规则表
    • 活动商品关联表
  2. 平台相关表,就是搜索商品后出现的属性
    • 平台属性表,一级索引
    • 平台属性值表,二级索引
    • 营销坑位表
    • 营销渠道表
    • 字典表:用于替换解释某些字段
  3. 分类相关表:
    • 一级分类表
    • 二级分类表
    • 三级分类表
  4. 用户表
    • 用户信息表
    • 用户地址表
  5. 订单相关表
    • 订单表
    • 订单明细表
    • 退单表
    • 订单明细活动关联表
    • 订单明细优惠券关联表
    • 订单状态流水表
    • 省份表/地区表
    • 品牌表
    • 购物车表
    • 评价表
  6. 优惠券相关表
    • 优惠券信息表
    • 优惠券范围表
    • 优惠券领用表
  7. 支付相关表
    • 支付表
    • 退款表

MySQL安装

  1. 下载MySQL安装包
  2. 去除CentOS系统中自带的mariadb依赖
  3. 安装libaio
  4. 切换为root用户,su root
  5. 修改/etc/my.cnf文件,降低密码级别
  6. 获取临时密码,进入MySQL命令行
  7. 设置MySQL密码
  8. 开放权限, 让外部主机也能访问,mysql默认只能本机访问,并刷新flush privileges
  9. 修改外界访问密码也为000000
#!/bin/bash
set -x
[ "$(whoami)" = "root" ] || exit 1
[ "$(ls *.rpm | wc -l)" = "7" ] || exit 1
test -f mysql-community-client-8.0.31-1.el7.x86_64.rpm && \
test -f mysql-community-client-plugins-8.0.31-1.el7.x86_64.rpm && \
test -f mysql-community-common-8.0.31-1.el7.x86_64.rpm && \
test -f mysql-community-icu-data-files-8.0.31-1.el7.x86_64.rpm && \
test -f mysql-community-libs-8.0.31-1.el7.x86_64.rpm && \
test -f mysql-community-libs-compat-8.0.31-1.el7.x86_64.rpm && \
test -f mysql-community-server-8.0.31-1.el7.x86_64.rpm || exit 1# 卸载MySQL
systemctl stop mysql mysqld 2>/dev/null
rpm -qa | grep -i 'mysql\|mariadb' | xargs -n1 rpm -e --nodeps 2>/dev/null
rm -rf /var/lib/mysql /var/log/mysqld.log /usr/lib64/mysql /etc/my.cnf /usr/my.cnfset -e
# 安装并启动MySQL
yum install -y *.rpm >/dev/null 2>&1
systemctl start mysqld#更改密码级别并重启MySQL
sed -i '/\[mysqld\]/avalidate_password.length=4\nvalidate_password.policy=0' /etc/my.cnf
systemctl restart mysqld# 更改MySQL配置
tpass=$(cat /var/log/mysqld.log | grep "temporary password" | awk '{print $NF}') # NF为awk切分时的列的个数,$NF为最后一列
cat << EOF | mysql -uroot -p"${tpass}" --connect-expired-password >/dev/null 2>&1
set password='000000';
update mysql.user set host='%' where user='root';
alter user 'root'@'%' identified with mysql_native_password by '000000';
flush privileges;
EOF
http://www.lryc.cn/news/187039.html

相关文章:

  • Nginx + PHP 异常排查,open_basedir 异常处理
  • Linux免密登录
  • 迷宫 蓝桥杯
  • 25 mysql like 是否使用索引
  • Android---Class 对象在执行引擎中的初始化过程
  • Altium Designer实用系列(二)----PCB绘图小技巧
  • threejs-开发入门与调试设置
  • win11安装双系统Ubuntu的坎坷记录
  • 关于docker的xuexi
  • Python接口自动化测试实战详解,你想要的全都有
  • SparkSQL 外部数据源
  • leetcode做题笔记167. 两数之和 II - 输入有序数组
  • [ZJCTF 2019]NiZhuanSiWei - 伪协议+文件包含+反序列化
  • 如何提升和扩展 PostgreSQL — 从共享缓冲区到内存数据网格
  • Elasticsearch:使用 huggingface 模型的 NLP 文本搜索
  • 论文解析——异构多芯粒神经网络加速器
  • MyBatisPlus(十六)逻辑删除
  • 基于黏菌优化的BP神经网络(分类应用) - 附代码
  • C语言基础语法复习08-位域bit-fields
  • 3.2.OpenCV技能树--二值图像处理--图像腐蚀与膨胀
  • 基于FPGA的数字时钟系统设计
  • linux centos Python + Selenium+Chrome自动化测试环境搭建?
  • mysql面试题20:有哪些合适的分布式主键方案
  • git的基础操作
  • lua 中文字符的判断简介
  • SSM-XML整合
  • 线性代数小例子
  • ASP.NET Core 开发 Web API
  • QImage函数setAlphaChannel
  • 区块链、隐私计算、联邦学习、人工智能的关联