分布式搜索和分析引擎Elasticsearch实战指南
ES 介绍与安装
Elasticsearch, 简称 ES,它是个开源分布式搜索引擎,它的特点有:分布式,零配置,自动发现,索引自动分片,索引副本机制,restful 风格接口,多数据源,自动搜索负载等。它可以近乎实时的存储、检索数据;本身扩展性很好,可以扩展到上百台服务器,处理 PB 级别的数据。es 也使用 Java 开发并使用 Lucene 作为其核心来实现所有索引和搜索的功能,但是它的目的是通过简单的 RESTful API 来隐藏 Lucene 的复杂性,从而让全文搜索变得简单。
Elasticsearch 是面向文档(document oriented)的,这意味着它可以存储整个对象或文档(document)。然而它不仅仅是存储,还会索引(index)每个文档的内容使之可以被搜索。在 Elasticsearch 中,你可以对文档(而非成行成列的数据)进行索引、搜索、排序、过滤。
ES 安装
# 添加仓库秘钥
wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo apt-key add -
# 上边的添加方式会导致一个 apt-key 的警告,如果不想报警告使用下边这个
curl -s https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo gpg --no-default-keyring --keyring gnupg-ring:/etc/apt/trusted.gpg.d/icsearch.gpg --import
# 添加镜像源仓库
echo "deb https://artifacts.elastic.co/packages/7.x/apt stable main" | sudo tee /etc/apt/sources.list.d/elasticsearch.list
# 更新软件包列表
sudo apt update
# 安装 es
sudo apt-get install elasticsearch=7.17.21
# 启动 es
sudo systemctl start elasticsearch
# 安装 ik 分词器插件
sudo /usr/share/elasticsearch/bin/elasticsearch-plugin install https://get.infini.cloud/elasticsearch/analysis-ik/7.17.21
若 apt update 更新源报错:
安装 kibana
使用 apt 命令安装 Kibana。
sudo apt install kibana
sudo apt install kibana配置 Kibana(可选):
根据需要配置 Kibana。配置文件通常位于 /etc/kibana/kibana.yml。可能需要
设置如服务器地址、端口、Elasticsearch URL 等。
sudo vim /etc/kibana/kibana.yml
例如,你可能需要设置 Elasticsearch 服务的 URL: 大概 32 行左右
elasticsearch.host: “http://localhost:9200”
启动 Kibana 服务:
安装完成后,启动 Kibana 服务。
sudo systemctl start kibana
设置开机自启(可选):
如果你希望 Kibana 在系统启动时自动启动,可以使用以下命令来启用自启动。
sudo systemctl enable kibana
验证安装:
使用以下命令检查 Kibana 服务的状态。
sudo systemctl status kibana
访问 Kibana:
在浏览器中访问 Kibana,通常是 http://:5601
ES 客户端的安装
代码:https://github.com/seznam/elasticlient
官网:https://seznam.github.io/elasticlient/index.html
ES C++的客户端选择并不多, 我们这里使用 elasticlient 库, 下面进行安装。
# 克隆代码
git clone https://github.com/seznam/elasticlient
# 切换目录
cd elasticlient
# 更新子模块
git submodule update --init --recursive
# 编译代码
mkdir build
cd build
cmake ..
make
# 安装
make install
cmake 生成 makefile 的过程会遇到一个问题
解决:需要安装 MicroHTTPD 库
sudo apt-get install libmicrohttpd-dev
make 的时候编译出错:这是子模块 googletest 没有编译安装
collect2: error: ld returned 1 exit status
make[2]: *** [external/httpmockserver/test/CMakeFiles/testserver.dir/build.make:105: bin/test-server] Error 1
make[1]: *** [CMakeFiles/Makefile2:675:
external/httpmockserver/test/CMakeFiles/test-server.dir/all] Error
2
make: *** [Makefile:146: all] Error 2
解决:手动安装子模块
cd ../external/googletest/
mkdir cmake && cd cmake/
cmake -DCMAKE_INSTALL_PREFIX=/usr ..
make && sudo make install
安装好重新 cmake 即可。
ES 核心概念
索引(Index)
一个索引就是一个拥有几分相似特征的文档的集合。比如说,你可以有一个客户数据的索引,一个产品目录的索引,还有一个订单数据的索引。一个索引由一个名字来标识(必须全部是小写字母的),并且当我们要对应于这个索引中的文档进行索引、搜索、更新和删除的时候,都要使用到这个名字。在一个集群中,可以定义任意多的索引。
字段(Field)
字段相当于是数据表的字段,对文档数据根据不同属性进行的分类标识。
名称 | 数值 | 备注 |
---|---|---|
enabled | true(默认) | false | 是否仅作存储,不做搜索和分析 |
index | true(默认) | false | 是否构建倒排索引(决定了是否分词,是否被索引 |
index_option | ||
dynamic | true(缺省)| false | 控制 mapping 的自动更新 |
doc_value | true(默认) | false | 是否开启 doc_value,用户聚合和排序分析,分词字段不能使用 |
fielddata | fielddata": {“format”:“disabled”} | 是否为 text 类型启动 fielddata,实现排序和聚合分析针对分词字段,参与排序或聚合时能提高性能,不分词字段统一建议使用 doc_value |
store | true | false(默认) | 是否单独设置此字段的是否存储而从_source 字段中分离,只能搜索,不能获取值 |
coerce | true(默认) | false | 是否开启自动数据类型转换功能,比如:字符串转数字,浮点转整型 |
analyzer | “analyzer”: “ik” | 指定分词器,默认分词器为 standard analyzer |
boost | “boost”: 1.23 | 字段级别的分数加权,默认值是 1.0 |
fields | “fields”: {“raw”: {“type”:“text”,“index”:“not_analyzed”}} | 对一个字段提供多种索引模式,同一个字段的值,一个分词,一个不分词 |
data_detection | true(默认) | false | 是否自动识别日期类型 |
文档 (document)
一个文档是一个可被索引的基础信息单元。比如,你可以拥有某一个客户的文档,某一个产品的一个文档或者某个订单的一个文档。文档以 JSON(Javascript Object Notation)格式来表示,而 JSON 是一个到处存在的互联网数据交互格式。在一个index/type 里面,你可以存储任意多的文档。一个文档必须被索引或者赋予一个索引的 type。
Elasticsearch与传统关系型数据库相比如下:
DB | Database | Table | Row | Column |
---|---|---|---|---|
ES | Index | Type | Document | Field |
Kibana 访问 es 进行测试
通过网页访问 kibana:
创建索引库
POST /user/_doc
{"settings": {"analysis": {"analyzer": {"ik": {"tokenizer": "ik_max_word"}}}},"mappings": {"dynamic": true,"properties": {"nickname": {"type": "text","analyzer": "ik_max_word"},"user_id": {"type": "keyword","analyzer": "standard"},"phone": {"type": "keyword","analyzer": "standard"},"description": {"type": "text","enabled": false},"avatar_id": {"type": "keyword","enabled": false}}}
}
新增数据:
POST /user/_doc/_bulk
{"index":{"_id":"1"}}
{"user_id" : "USER4b862aaa-2df8654a-7eb4bb65-e3507f66","nickname" : "昵称 1","phone" : "手机号 1","description" : "签名 1","avatar_id" : "头像 1"}
{"index":{"_id":"2"}}
{"user_id" : "USER14eeeaa5-442771b9-0262e455-e4663d1d","nickname" : "昵称 2","phone" : "手机号 2","description" : "签名 2","avatar_id" : "头像 2"}
{"index":{"_id":"3"}}
{"user_id" : "USER484a6734-03a124f0-996c169d-d05c1869","nickname" : "昵称 3","phone" : "手机号 3","description" : "签名 3","avatar_id" : "头像 3"}
{"index":{"_id":"4"}}
{"user_id" : "USER186ade83-4460d4a6-8c08068f-83127b5d","nickname" : "昵称 4","phone" : "手机号 4","description" : "签名 4","avatar_id" : "头像 4"}
{"index":{"_id":"5"}}
{"user_id" : "USER6f19d074-c33891cf-23bf5a83-57189a19","nickname" : "昵称 5","phone" : "手机号 5","description" : "签名 5","avatar_id" : "头像 5"}
{"index":{"_id":"6"}}
{"user_id" : "USER97605c64-9833ebb7-d0455353-35a59195","nickname" : "昵称 6","phone" : "手机号 6","description" : "签名 6","avatar_id" : "头像 6"}
查看并搜索数据
GET /user/_doc/_search?pretty
{"query": {"bool": {"must_not": [{"terms": {"user_id.keyword": ["USER4b862aaa-2df8654a-7eb4bb65-e3507f66","USER14eeeaa5-442771b9-0262e455-e4663d1d","USER484a6734-03a124f0-996c169d-d05c1869"]}}],"should": [{"match": {"user_id": "昵称"}},{"match": {"nickname": "昵称"}},{"match": {"phone": "昵称"}}]}}
}
删除索引:
DELETE /user
检索全部数据:
GET /user/_search
{"query": {"match_all": {}}
}
ES 客户端接口介绍
// 创建客户端对象
explicit Client(const std::vector < std::string >> &hostUrlList,std::int32_t timeout = 6000);// 应用于索引创建,以及新增数据
cpr::Response index(const std::string &indexName,const std::string &docType,const std::string &id,const std::string &body,const std::string &routing = std::string());// 检索数据
cpr::Response search(const std::string &indexName,const std::string &docType,const std::string &body,const std::string &routing = std::string());// 删除数据
cpr::Response remove(const std::string &indexName,const std::string &docType,const std::string &id,const std::string &routing = std::string());
使用案例,数据为上面的数据:
#include <elasticlient/client.h>
#include <cpr/cpr.h>
#include <iostream>int main()
{// 构造ES客户端elasticlient::Client client({"http://127.0.0.1:9200/"});// 发起搜索请求try{auto rsp = client.search("user", "_doc", "{\"query\":{\"match_all\":{} }}");std::cout << rsp.status_code << std::endl;std::cout << rsp.text << std::endl;}catch (std::exception &e){std::cout << "请求失败: " << e.what() << std::endl;return -1;}return 0;
}
测试结果:
二次封装
使用jsoncpp库实现数据的序列化和反序列化
Json::Value:用于进行中间数据存储
将多个字段数据进行序列化,需要先将数据存储到Value对象中
若要对一个json格式字符串进行解析,解析结果也是存放在Value中
常用接口:
#include <json/json.h>Value &operator=(Value &other);
Value &operator[](const char *key); // Value["name"] = "张三";
Value &append(const Value &value); // 数组数据的新增,Value["score"].append(100);
std::string asString() const; // Value["name"].asString();
ArrayIndex size() const; // 获取数组元素的个数
Value &operator[](ArrayIndex index); // 通过下标获取数组元素,Value["socre"][0].asFloat();// Write类
class Json_API StreamWriter
{virtual write(Value const &root, JSONCPP_OSTREAM *sout) = 0; // 序列化接口
};
class Json_API StreamWriterBuilder
{StreamWriter *newStreamWriter(); // StreamWriter 对象生产接口
};// Reader类
class JSON_API CharReader
{virtual bool parse(char const *beginDoc, char const *endDoc, Value *root, JSONCPP_STRING *errs) = 0;
};class JSON_API CharReaderBuilder
{CharReader *newCharReader(); // 创建CharReader对象接口
};
Jsoncpp使用案例:
#include <json/json.h>
#include <iostream>
#include <string>
#include <sstream>
#include <memory>bool serialize(const Json::Value &root, std::string &str)
{Json::StreamWriterBuilder swb;std::unique_ptr<Json::StreamWriter> sw(swb.newStreamWriter());std::stringstream ss;int ret = sw->write(root, &ss);if (ret < 0){std::cout << "json serialize failed" << std::endl;return false;}str = ss.str();return true;
}bool deserialize(const std::string &body, Json::Value &val)
{Json::CharReaderBuilder crb;std::unique_ptr<Json::CharReader> cr(crb.newCharReader());std::string err;bool ret = cr->parse(body.c_str(), body.c_str() + body.size(), &val, &err);if (ret == false){std::cout << "json deserialize failed " << err << std::endl;return false;}return true;
}int main()
{std::string name = "小明";int age = 18;float score[3] = {91, 99, 100};Json::Value stu;stu["name"] = name;stu["age"] = age;stu["score"].append(score[0]);stu["score"].append(score[1]);stu["score"].append(score[2]);std::string json_str;bool ret = serialize(stu, json_str);if (ret == false){std::cout << "json serialize failed" << std::endl;return -1;}std::cout << json_str << std::endl;Json::Value root;ret = deserialize(json_str, root);if (ret == false){std::cout << "json deserialize failed" << std::endl;return -1;}std::cout << "姓名:" << root["name"].asString() << std::endl;std::cout << "年龄:" << root["age"].asInt() << std::endl;std::cout << "成绩分别是: ";int sz = root["score"].size();for (int i = 0; i < sz; i++){std::cout << root["score"][i].asFloat() << " ";}std::cout << std::endl;return 0;
}
ES客户端API二次封装
封装四个操作:索引创建,数据新增,数据查询,数据删除
封装最主要完成的是请求正文的构造过程,Json::Value对象数据新增过程
索引创建:
1.能够动态设定索引名称,索引类型
2.能够动态的添加字段,并设置字段类型,设置分词器类型,是否构造索引
构造思想:根据固定的Json格式构造Value对象即可
数据新增:
1.提供用户一个新增字段及数据的接口即可
2.提供一个发起请求的接口
封装代码实现:
#pragma once
#include <json/json.h>
#include <elasticlient/client.h>
#include <cpr/cpr.h>
#include <iostream>
#include <string>
#include <sstream>
#include <memory>
#include "logger.hpp"namespace hdp
{bool serialize(const Json::Value &root, std::string &str){Json::StreamWriterBuilder swb;std::unique_ptr<Json::StreamWriter> sw(swb.newStreamWriter());std::stringstream ss;int ret = sw->write(root, &ss);if (ret < 0){LOG_ERROR("json serialize failed");return false;}str = ss.str();return true;}bool deserialize(const std::string &body, Json::Value &val){Json::CharReaderBuilder crb;std::unique_ptr<Json::CharReader> cr(crb.newCharReader());std::string err;bool ret = cr->parse(body.c_str(), body.c_str() + body.size(), &val, &err);if (ret == false){LOG_ERROR("json deserialize failed: {} ", err);return false;}return true;}class ESIndex{public:ESIndex(const std::shared_ptr<elasticlient::Client> &client,const std::string &name,const std::string &type = "_doc"): _name(name), _type(type), _client(client){Json::Value analysis;Json::Value analyzer;Json::Value ik;Json::Value tokenizer;tokenizer["tokenizer"] = "ik_max_word";ik["ik"] = tokenizer;analyzer["analyzer"] = ik;analysis["analysis"] = analyzer;_index["settings"] = analysis;}ESIndex &append(const std::string &key, const std::string &type = "text",const std::string &analyzer = "ik_max_word",bool enabled = true){Json::Value field;field["type"] = type;field["analyzer"] = analyzer;if (enabled == false)field["enabled"] = enabled;_properties[key] = field;return *this;}bool create(const std::string &index_id = "default_index_id"){Json::Value mappings;mappings["dynamic"] = true;mappings["properties"] = _properties;_index["mappings"] = mappings;std::string body;bool ret = serialize(_index, body);if (ret == false){LOG_ERROR("索引序列化失败");return false;}try{cpr::Response rsp = _client->index(_name, _type, index_id, body);if (rsp.status_code < 200 || rsp.status_code >= 300){LOG_ERROR("创建ES索引 {} 失败,响应状态码异常: {}", _name, rsp.status_code);return false;}}catch (std::exception &e){LOG_ERROR("创建ES索引 {} 失败: {}", _name, e.what());return false;}return true;}private:std::string _name;std::string _type;Json::Value _index;Json::Value _properties;std::shared_ptr<elasticlient::Client> _client;};class ESInsert{public:ESInsert(const std::shared_ptr<elasticlient::Client> client,const std::string &name,const std::string &type = "_doc"): _name(name),_type(type), _client(client) {}template <class T>ESInsert &append(const std::string &key, const T &val){_item[key] = val;return *this;}bool insert(const std::string &id = ""){std::string body;bool ret = serialize(_item, body);if (ret == false){LOG_ERROR("索引序列化失败");return false;}try{cpr::Response rsp = _client->index(_name, _type, id, body);if (rsp.status_code < 200 || rsp.status_code >= 300){LOG_ERROR("新增数据 {} 失败,响应状态码为: {}", body, rsp.status_code);return false;}}catch (std::exception &e){LOG_ERROR("新增数据 {} 失败: {}", body, e.what());return false;}return true;}private:std::string _name;std::string _type;Json::Value _item;std::shared_ptr<elasticlient::Client> _client;};class ESRemove{public:ESRemove(const std::shared_ptr<elasticlient::Client> &client,const std::string &name, const std::string &type = "_doc"): _name(name), _type(type), _client(client) {}bool remove(const std::string &id){try{cpr::Response rsp = _client->remove(_name, _type, id);if (rsp.status_code < 200 || rsp.status_code >= 300){LOG_ERROR("删除数据{}失败:响应状态码异常: {}", rsp.status_code);return false;}}catch (std::exception &e){LOG_ERROR("删除数据 {} 异常: {}", id, e.what());return false;}return true;}private:std::string _name;std::string _type;std::shared_ptr<elasticlient::Client> _client;};class ESSearch{public:ESSearch(const std::shared_ptr<elasticlient::Client> &client,const std::string &name, const std::string &type = "_doc"): _name(name), _type(type), _client(client) {}ESSearch &append_must_not_terms(const std::string &key, const std::vector<std::string> &vals){Json::Value fields;for (const auto& val : vals){fields[key].append(val);}Json::Value terms;terms["terms"] = fields;_must_not.append(terms);return *this;}ESSearch &append_should_match(const std::string &key, const std::string &val){Json::Value field;field[key] = val;Json::Value match;match["match"] = field;_should.append(match);return *this;}ESSearch &append_must_term(const std::string &key, const std::string &val){Json::Value field;field[key] = val;Json::Value term;term["term"] = field;_must.append(term);return *this;}ESSearch &append_must_match(const std::string &key, const std::string &val){Json::Value field;field[key] = val;Json::Value match;match["match"] = field;_must.append(match);return *this;}Json::Value search(){Json::Value cond;if(_must_not.empty() == false)cond["must_not"] = _must_not;if(_should.empty() == false)cond["should"] = _should;if(_must.empty() == false)cond["must"] = _must;Json::Value query;query["bool"] = cond;Json::Value root;root["query"] = query;std::string body;bool ret = serialize(root, body);if (ret == false){LOG_ERROR("索引序列化失败");return Json::Value();}cpr::Response rsp;try{rsp = _client->search(_name, _type, body);if (rsp.status_code < 200 || rsp.status_code >= 300){LOG_ERROR("检索数据 {} 失败,响应状态码异常: {}", body, rsp.status_code);return Json::Value();}}catch (std::exception &e){LOG_ERROR("检索数据 {} 失败: {}", body, e.what());return Json::Value();}// 需要对响应正文进行反序列化Json::Value json_res;ret = deserialize(rsp.text, json_res);if (ret == false){LOG_ERROR("检索数据 {} 结果反序列化失败", rsp.text);return Json::Value();}serialize(json_res, body);LOG_DEBUG("检索响应正文: [{}]", body);return json_res["hits"]["hits"];}private:std::string _name;std::string _type;Json::Value _must_not;Json::Value _should;Json::Value _must;std::shared_ptr<elasticlient::Client> _client;};
}
二次封装测试代码:
#include "../../../common/icsearch.hpp"
#include <gflags/gflags.h>DEFINE_int32(run_mode, 0, "程序的运行模式,0-调试,1-发布");
DEFINE_string(log_file, "", "发布模式下,用于指定日志的输出文件");
DEFINE_int32(log_level, 0, "发布模式下,用于指定日志的输出等级");int main(int argc, char *argv[])
{google::ParseCommandLineFlags(&argc, &argv, true);hdp::init_logger(FLAGS_run_mode, FLAGS_log_file, FLAGS_log_level);std::vector<std::string> host_list = {"http://127.0.0.1:9200/"};std::shared_ptr<elasticlient::Client> client = std::make_shared<elasticlient::Client>(host_list);// 创建索引bool ret = hdp::ESIndex(client, "test_user").append("nickname").append("phone", "keyword", "standard", true).create();if (ret == false){LOG_ERROR("创建索引失败");return -1;}else{LOG_DEBUG("创建索引成功");}// 新增数据ret = hdp::ESInsert(client, "test_user").append("nickname", "张三").append("phone", "123456").insert("0001");if (ret == false){LOG_ERROR("新增数据失败");return -1;}else{LOG_DEBUG("新增数据成功");}ret = hdp::ESInsert(client, "test_user").append("nickname", "李四").append("phone", "112233").insert("0002");if (ret == false){LOG_ERROR("新增数据失败");return -1;}else{LOG_DEBUG("新增数据成功");}std::this_thread::sleep_for(std::chrono::seconds(1));// 检索数据Json::Value user = hdp::ESSearch(client, "test_user").append_should_match("nickname", "李四").search();if (user.empty() || user.isArray() == false){LOG_ERROR("检索结果为空,或者结果不是数组类型");return -1;}else{LOG_DEBUG("数据检索成功");}int size = user.size();for (int i = 0; i < size; ++i){LOG_INFO("nickname: {}", user[i]["_source"]["nickname"].asString());LOG_INFO("phone: {}", user[i]["_source"]["phone"].asString());}// 更新数据ret = hdp::ESInsert(client, "test_user").append("nickname", "李四").append("phone", "123456789").insert("0002");if (ret == false){LOG_ERROR("更新数据失败");return -1;}else{LOG_DEBUG("更新数据成功");}std::this_thread::sleep_for(std::chrono::seconds(1));user = hdp::ESSearch(client, "test_user").append_should_match("phone.keyword", "123456789").search();if (user.empty() || user.isArray() == false){LOG_ERROR("检索结果为空,或者结果不是数组类型");return -1;}else{LOG_DEBUG("数据检索成功");}size = user.size();for (int i = 0; i < size; ++i){LOG_INFO("nickname: {}", user[i]["_source"]["nickname"].asString());LOG_INFO("phone: {}", user[i]["_source"]["phone"].asString());}// 删除数据ret = hdp::ESRemove(client, "test_user").remove("0002");if (ret == false){LOG_ERROR("删除数据失败");return -1;}else{LOG_DEBUG("删除数据成功");}std::this_thread::sleep_for(std::chrono::seconds(1));user = hdp::ESSearch(client, "test_user").append_should_match("phone.keyword", "123456789").search();if (user.empty() || user.isArray() == false){LOG_ERROR("检索结果为空,或者结果不是数组类型");return -1;}else{LOG_DEBUG("数据检索成功");}size = user.size();for (int i = 0; i < size; ++i){LOG_INFO("nickname: {}", user[i]["_source"]["nickname"].asString());LOG_INFO("phone: {}", user[i]["_source"]["phone"].asString());}return 0;
}
测试结果:
ES客户端API使用注意事项:
1.地址后面不要忘了相对根目录: http://127.0.0.1:9200/
tring());
}
// 删除数据
ret = hdp::ESRemove(client, "test_user").remove("0002");
if (ret == false)
{LOG_ERROR("删除数据失败");return -1;
}
else
{LOG_DEBUG("删除数据成功");
}std::this_thread::sleep_for(std::chrono::seconds(1));
user = hdp::ESSearch(client, "test_user").append_should_match("phone.keyword", "123456789").search();
if (user.empty() || user.isArray() == false)
{LOG_ERROR("检索结果为空,或者结果不是数组类型");return -1;
}
else
{LOG_DEBUG("数据检索成功");
}
size = user.size();
for (int i = 0; i < size; ++i)
{LOG_INFO("nickname: {}", user[i]["_source"]["nickname"].asString());LOG_INFO("phone: {}", user[i]["_source"]["phone"].asString());
}
return 0;
}
测试结果:[外链图片转存中...(img-v3LOnflV-1753877627079)]ES客户端API使用注意事项:1.地址后面不要忘了相对根目录: http://127.0.0.1:9200/2.ES客户端API使用,要进行异常捕捉,否则操作失败会导致程序异常退出