es下载、安装、部署以及集成和mysql数据同步
es相关
es的下载和部署(windows版)
不说了,直接下载安装解压就可以…
es的下载和部署(docker版)
- 拉取 es 镜像
docker pull elasticsearch:7.14.0 # 推荐使用稳定版本
- 启动容器
docker run -d \--name elasticsearch \-p 9200:9200 \-p 9300:9300 \-e "discovery.type=single-node" \-e "ES_JAVA_OPTS=-Xms512m -Xmx512m" \-v es_data:/usr/share/elasticsearch/data \elasticsearch:7.14.0
参数说明:
-d:后台运行容器
–name elasticsearch:指定容器名称
-p:映射端口(9200 用于 HTTP 访问,9300 用于节点间通信)
-e “discovery.type=single-node”:单节点模式
-e “ES_JAVA_OPTS=…”:设置 JVM 内存大小(根据实际情况调整)
-v es_data:/usr/share/elasticsearch/data:创建数据卷持久化数据
- 验证部署成功
访问: http://localhost:9200
es的可视化部署
docker run -d \--name kibana \-p 5601:5601 \-e "ELASTICSEARCH_HOSTS=http://elasticsearch:9200" \--link elasticsearch:elasticsearch \kibana:7.14.0
访问 http://localhost:5601 即可打开 Kibana 界面。
-e “ELASTICSEARCH_HOSTS=http://elasticsearch:9200” 访问指定 es 实例
–link 是一个用于在容器之间建立网络连接的选项,主要作用是让一个容器能够通过别名访问另一个容器,实现容器间的通信。
关于–link的替代
注意容器间通信推荐自定义网络
已被官方逐步淘汰:
Docker 官方在新版本中推荐使用自定义网络(docker network) 替代 --link,因为自定义网络更灵活、安全,且支持容器重命名、IP 变化自动同步等功能。
替代方案:
用自定义网络实现容器通信(更推荐):
bash
# 创建网络
docker network create es-network# 启动ES并加入网络
docker run -d --name elasticsearch --network es-network ...# 启动Kibana并加入同一网络(自动可通过容器名访问)
docker run -d --name kibana --network es-network ...加入同一网络后,容器之间可以直接通过容器名(如elasticsearch)相互访问,无需--link。
单向性:
--link 是单向的(A 链接 B,不代表 B 能访问 A),而自定义网络是双向互通的。
注意 docker-compose 进行多容器部署时
默认开启一个项目级别的默认网络,因此不需要自定义网络了。命名为:项目_default
这里的意思是直接通过docker-compose可以部署多容器,并且利用默认的自定义网络。
docker-compose的直接部署
version: '3.8'# 定义服务
services:# Elasticsearch 服务elasticsearch:image: elasticsearch:7.14.0container_name: elasticsearch-composerestart: always # 容器退出时自动重启environment:- discovery.type=single-node # 单节点模式- ES_JAVA_OPTS=-Xms512m -Xmx512m # JVM 内存设置- TZ=Asia/Shanghai # 设置时区为上海ports:- "9200:9200" # 暴露 HTTP 端口供外部访问volumes:- es_data:/usr/share/elasticsearch/data # 数据持久化- es_plugins:/usr/share/elasticsearch/plugins # 插件目录- es_config:/usr/share/elasticsearch/config # 配置文件目录healthcheck: # 健康检查test: ["CMD", "curl", "-f", "http://localhost:9200/_cluster/health"]interval: 10stimeout: 5sretries: 5# Kibana 服务kibana:image: kibana:7.14.0container_name: kibana-composerestart: alwaysenvironment:- ELASTICSEARCH_HOSTS=http://elasticsearch:9200 # 通过服务名访问 ES(默认网络支持)- TZ=Asia/Shanghaiports:- "5601:5601" # Kibana 访问端口depends_on:elasticsearch:condition: service_healthy # 等待 ES 健康后再启动healthcheck:test: ["CMD", "curl", "-f", "http://localhost:5601/api/status"]interval: 10stimeout: 5sretries: 5# 定义数据卷
volumes:es_data: # 存储 ES 数据es_plugins: # 存储 ES 插件es_config: # 存储 ES 配置
相关启动、停止、彻底删除的bat文件:
start.bat
@echo off
echo 正在启动 Elasticsearch 和 Kibana...
docker-compose start
rem docker-compose up -d
echo 启动完成!
echo Elasticsearch 地址: http://localhost:9200
echo Kibana 地址: http://localhost:5601
rem echo 正在启动程序...
rem go run main.go
rem echo 程序启动成功,正在运行...
pause # 暂停显示结果,按任意键关闭窗口
stop.bat
@echo off
echo 正在停止 Elasticsearch 和 Kibana...
docker-compose stop
echo 已停止所有服务
pause
fix.bat
rem 停止并删除所有相关容器和数据卷
docker-compose down -v
rem 重新构建并启动
docker-compose up -d
es的相关操作demo
go创建连接
esCli, err = elastic.NewClient(elastic.SetURL("http://localhost:9200"),elastic.SetSniff(false), // 关键:禁用节点嗅探(容器环境必需))if err == nil {break}
// SetSniff(false):如果开启,当节点第一次连接后,随后会尝试连接嗅探到的内部 IP 节点(比如:172....),结果因网络不可达而失败,最终报出 no active connection found 错误
go操作索引
判断索引是否存在
exist, err := esCli.IndexExists(name).Do(context.Background())
if err != nil {panic(err)
}
创建索引
// 创建index
func CreateIndex(name string) {// CreateIndex:创建索引操作对象// BodyString:进行索引映射配置// 索引映射:exist, err := esCli.IndexExists(name).Do(context.Background())if err != nil {panic(err)}if !exist {mapping :=`{"mappings": { // 映射的根节点"properties": { // 定义字段属性"title": { // 字段名:title(标题)"type": "text", // 字段类型:text(可分词的文本,支持全文检索)"analyzer": "ik_max_word", // 索引时分词器:ik_max_word(细粒度分词,适合索引)"search_analyzer": "ik_smart" // 搜索时分词器:ik_smart(粗粒度分词,适合查询)},"content": { // 字段名:content(内容)"type": "text","analyzer": "ik_max_word","search_analyzer": "ik_smart"}}}}`// 创建索引并应用映射_, err := esCli.CreateIndex(name).BodyString(mapping).Do(context.Background())if err != nil {panic(err)}fmt.Printf("索引 %s 已创建(带 IK 分词器)\n", name)} else {fmt.Printf("索引 %s 已存在\n", name)}
}
删除索引
// 主要:_, err = esCli.DeleteIndex(name).Do(context.Background())
func deleteIndex(name string) {exist, err := esCli.IndexExists(name).Do(context.Background())if err != nil {panic(err)}if !exist {return}_, err = esCli.DeleteIndex(name).Do(context.Background())
}
go操作文档
根据id添加文档
这里可以直接插入的对象:map类型、结构体类型、数组/切片等可json序列化的对象
// 增加文档
func AddJson(jsonMap map[string]string) {// index:创建索引操作对象// index:指定索引 test-es// id:可以随机生成uuid,但是一般是同步mysql数据,id一定是插入的,mysql保证id唯一性_, err := esCli.Index().Index("test-es").Id("1").BodyJson(jsonMap).Do(context.Background())if err != nil {log.Fatal("添加JSON文档失败")return}log.Println("添加JSON文档成功")
}
func AddString(JSONStr string) {_, err := esCli.Index().Id("1").BodyString(JSONStr).Do(context.Background())if err != nil {log.Fatal("添加JSON文档失败")return}log.Println("添加JSON文档成功")
}
根据id批量添加文档
bulk := client.Bulk().Index("user").Refresh("true")
for _, user := range list {doc := elastic.NewBulkCreateRequest().Doc(&user)bulk.Add(doc)
}do, err1 := bulk.Do(context.Background())
if err1 != nil {panic(err1)
}
根据id删除文档
// Refresh("true"):允许删除后索引更新
do, err1 := client.Delete().Index("user").Id(deleteId).Refresh("true").Do(context.Background())
if err1 != nil {panic(err1)
}
根据id批量删除文档
// client.Bulk():创建一个批量操作实例,用于添加多个操作(删除、新增、更新等)。
// elastic.NewBulkDeleteRequest().Id(s):为每个 ID 创建一个批量删除请求,指定要删除的文档 ID 为 s。
// bulk.Add(req):将单个删除请求添加到批量操作实例中,等待统一执行。
bulk := client.Bulk().Index("user").Refresh("true")
for _, s := range list {req := elastic.NewBulkDeleteRequest().Id(s)bulk.Add(req)
}do, err1 := bulk.Do(context.Background())
if err1 != nil {panic(err1)
}
go文档查询
- 全文检索(
MatchQuery
等):适合文本字段的模糊匹配,关注相关性得分。 - 精确查询(
TermQuery
、RangeQuery
等):适合结构化字段的精确匹配或范围过滤。 - 复合查询(
BoolQuery
):通过must/should/must_not/filter
组合上述查询,实现复杂业务逻辑。
分页查询
// 分页查询
func SelectByPage(index string, offset, size int) []User {var result []User// 查询query := elastic.NewBoolQuery()res, err := esCli.Search().Index(index).Query(query).From(offset).Size(size).Do(context.Background())if err != nil {log.Fatal(err)}// 序列化for _, hit := range res.Hits.Hits {// 第一个Hits:结构体:记录hits数组、hits数量、最大Score// 第二个Hits:hits数组// hit:hit记录了每一条文档的 元信息 + 数据(SOURCE)var user Usererr := json.Unmarshal(hit.Source, &user)if err != nil {log.Fatal(err)}result = append(result, user)}return result
}
精确查询
query := elastic.NewTermQuery("nickname", "晓智科技")
res, err1 := client.Search("user").Query(query).From(0).Size(10).Do(context.Background())
if err1 != nil {panic(err1)
}
模糊查询
// 在 desc 中查询 "It从业人员" 的相关文档
query := elastic.NewMatchQuery("desc", "It从业人员")
res, err1 := client.Search("user").Query(query).From(0).Size(10).Do(context.Background())
if err1 != nil {panic(err1)
}
go更新文档
do, err1 := client.Update().Index("user").Id(updateId).Doc(map[string]any{"username": "晓晓智"}).Do(context.Background())
if err1 != nil {panic(err1)
}
使用异步请求
为了提高响应速度,可以使用异步请求处理搜索和索引操作。异步请求不会阻塞主线程,可以提高吞吐量。
// DoAsync = Do + func回调
client.Index().Index("products").BodyJson(product).DoAsync(context.Background(), func(response *elastic.IndexResponse, err error) {if err != nil {log.Printf("Error indexing document asynchronously: %s", err)} else {fmt.Printf("Asynchronous indexing completed for document %s\n", response.Id)}})
大量数据导入或者日志收集等对实时性要求不高的情况。
es高级查询
分词查询
// 分词搜索
// elastic.NewMatchQuery:短语分词后只需要匹配一个就ok
func searchByKeyword(index string, keyword string) []int {// 创建一个匹配查询(会对关键词进行分词)// 这里假设搜索 "content" 字段,你可以根据实际字段名修改query := elastic.NewMatchQuery("content", keyword)// 执行搜索searchResult, err := esCli.Search().Index(index). // 指定索引Query(query). // 设置查询条件Pretty(true). // 格式化输出结果Do(context.Background())if err != nil {log.Fatalf("搜索失败: %v", err)}// 处理搜索结果log.Printf("找到 %d 条匹配结果\n", searchResult.TotalHits())resIDs := []int{}for _, hit := range searchResult.Hits.Hits {// hit.Id 是文档的 ID// hit.Source 是文档的原始内容(JSON 字符串)id, _ := strconv.Atoi(hit.Id)resIDs = append(resIDs, id)}return resIDs
}
根据短语搜索
// elastic.NewMatchPhraseQuery:短语完整出现
func (es *EsService) SearchByMatchPhrase(field, keyword string) (users []User) {client, _ := es.CreateClient()ctx := context.Background() result, err := client.Search(IndexName).Query(elastic.NewMatchPhraseQuery(field, keyword)).Do(ctx)if err != nil {panic(err)}if result.TotalHits() > 0 {// 查询结果不为空,遍历结果var u User// 通过Each方法,将es结果的json结构转换成struct对象for _, item := range result.Each(reflect.TypeOf(u)) {// 转换成User对象if t, ok := item.(User); ok {users = append(users, t)}}}return
}
前缀搜索
// elastic.NewPrefixQuery
func (es *EsService) SearchByPrefixQuery(field, keyword string) (users []User) {client, _ := es.CreateClient()ctx := context.Background()result, err := client.Search(IndexName).Query(elastic.NewPrefixQuery(field, keyword)).Do(ctx)if err != nil {panic(err)}if result.TotalHits() > 0 {// 查询结果不为空,遍历结果var u User// 通过Each方法,将es结果的json结构转换成struct对象for _, item := range result.Each(reflect.TypeOf(u)) {// 转换成User对象if t, ok := item.(User); ok {users = append(users, t)}}}return
}
复杂查询
boolQuery:可以结合下面等进行复杂查询
query := elastic.NewBoolQuery()res, err := esCli.Search().Index(index).Query(query).From(offset).Size(size).Do(context.Background())if err != nil {log.Fatal(err)}
- must 条件
- 类似SQL的 and,代表必须匹配的条件
- must_not 条件
- 与 must 作用相反,用法相似
- should 条件
- 类似SQL的 or,只需匹配其中一个条件即可
es的中文分词器(ik)下载
下载地址:https://github.com/medcl/elasticsearch-analysis-ik
官网下载之后,可以从宿主机到 docker 容器中
docker cp "C:\Users\11067\Downloads\elasticsearch-analysis-ik-7.14.0.zip" elasticsearch-compose:/usr/share/elasticsearch/plugins/
# 进入容器
docker exec -it elasticsearch-compose /bin/bash
cd /usr/share/elasticsearch/plugins/
mkdir analysis-ik
unzip elasticsearch-analysis-ik-7.14.0.zip -d analysis-ik/
exit # 退出容器
docker restart elasticsearch-compose
es的集群部署
可以参考:初识Elasticsearch——GO集成ES_go es-CSDN博客
Mysql和es的同步
go-mysql-transfer
这是一个用于mysql同步的工具,相比较之前见过的canal使用更加简单。
主要特点:
- 集成多种接收端,如:Redis、MongoDB、Elasticsearch、RocketMQ、Kafka、RabbitMQ、HTTP API等,无需编写客户端,开箱即用
- 支持Lua脚本扩展,可处理复杂逻辑,如:数据的转换、清洗、打宽
- 集成Prometheus客户端,支持监控、告警
- 集成Web Admin监控页面
- 支持高可用集群部署
- 数据同步失败重试
- 支持全量数据初始化
实现原理:
1、go-mysql-transfer将自己伪装成MySQL的Slave,
2、向Master发送dump协议获取binlog,解析binlog并生成消息
3、将生成的消息实时、批量发送给接收端
go-mysql-transfer的使用
下载go-mysql-transfer并解压
https://gitee.com/wj596/go-mysql-transfer/releases
我这里是1.0.4版本
修改配置
解压后打开到exe文件目录,修改app.yml
大概修改的有:
1. 源mysql的相关连接信息
2. 目标接受源的连接配置(选择对应的种类:es、redis、kafka、mysql等)
3. 目标接受源的规则配置(对应的url等)
查看es-demo,看基于go-mysql-transfer的go-mysql-es的集成
es的字段类型
一、核心数据类型
1. 字符串类型
text
- 用于全文检索的长文本(如文章内容、描述)。
- 会被分词器处理为词项(terms),支持模糊匹配、全文搜索。
- 不支持聚合(aggregation)和排序(需通过
fielddata
开启,不推荐)。 - 示例:
"type": "text", "analyzer": "ik_max_word"
(结合 IK 分词器)。
keyword
- 用于精确匹配的字符串(如 ID、标签、状态码)。
- 不会被分词,按原字符串完整存储。
- 支持聚合、排序、精确查询(如
term
查询)。 - 示例:
"type": "keyword"
(适合存储手机号、枚举值)。
2. 数值类型
long
:64 位整数(范围:-2⁶³ ~ 2⁶³-1),适合存储大整数(如 ID、时间戳)。integer
:32 位整数(范围:-2³¹ ~ 2³¹-1),适合普通整数(如数量、年龄)。short
:16 位整数,byte
:8 位整数(节省空间,适合小范围数值)。double
:64 位浮点数,float
:32 位浮点数(适合价格、分数等小数)。half_float
:16 位浮点数,scaled_float
:缩放浮点数(如scaling_factor: 100
存储两位小数)。
3. 日期类型(date
)
-
用于存储日期或时间,支持多种格式:
- 字符串:
"2023-10-01"
、"2023/10/01 12:30:00"
- 时间戳:
1696108800
(秒级)、1696108800000
(毫秒级)
- 字符串:
-
需指定
format
(默认自动识别),示例:
{ "type": "date", "format": "yyyy-MM-dd HH:mm:ss||epoch_millis" }
4. 布尔类型(boolean
)
- 存储
true
或false
,也可接受字符串"true"
/"false"
或数字1
/0
(自动转换)。
二、复杂类型
1. 数组类型(array
)
- ES 没有专门的数组类型,任何字段都可存储数组,只需保证数组内元素类型一致。
- 示例:
"tags": ["技术", "ES"]
(tags
字段类型为keyword
)、"scores": [90, 85, 95]
(scores
为integer
)。
2. 对象类型(object
)
-
用于存储嵌套的 JSON 对象,适合结构化的子数据。
-
示例:
{"user": {"type": "object","properties": {"name": { "type": "text" },"age": { "type": "integer" }}} }
3. 嵌套类型(nested
)
- 是
object
类型的特殊形式,用于存储对象数组,解决普通对象数组查询时的匹配歧义。 - 示例:存储多篇文章的评论(每个评论是独立对象,需精确匹配评论的作者和内容)。
三、特殊类型
1. ip
- 存储 IPv4 或 IPv6 地址(如
"192.168.1.1"
),支持 IP 范围查询(如192.168.0.0/16
)。
2. range
- 存储数值或日期范围,支持
integer_range
、long_range
、date_range
等。 - 示例:
"price_range": { "gte": 100, "lte": 500 }
(价格在 100-500 之间)。
3. geo_point
与 geo_shape
geo_point
:存储经纬度坐标(如{ "lat": 39.9, "lon": 116.3 }
),支持距离查询、地理位置过滤。geo_shape
:存储复杂地理形状(如多边形区域),支持空间关系查询(如包含、相交)。
四、类型选择原则
- 字符串:需全文检索用
text
,需精确匹配 / 聚合用keyword
(可同时定义两者:"fields": { "keyword": { "type": "keyword" } }
)。 - 数值:选择最小合适范围(如
age
用integer
而非long
),小数优先用scaled_float
(避免精度问题)。 - 日期:明确
format
避免解析歧义,优先用时间戳存储(查询效率高)。 - 关联数据:简单嵌套用
object
,对象数组需精确匹配用nested
。
合理的字段类型设计是 ES 性能优化的基础,需结合业务查询场景(全文检索 / 精确匹配 / 聚合分析等)选择。