当前位置: 首页 > news >正文

Logstash:在 Logstash 管道中的定制的 Elasticsearch update by query

我们知道 Elasticsearch output plugin 为我们在 Logstash 的 pipeline 中向 Elasticsearch 的写入提供了可能。我们可以使用如下的格式向 Elasticsearch 写入数据:

  elasticsearch {hosts => ["https://localhost:9200"]index => "data-%{+YYYY.MM.dd}"user => "elastic"password => "NtC7cM-GKQWOxqamHd1R"ssl => trueca_trusted_fingerprint => "d464eed5d00a20908318b6a1de38f88daf3a867177123def4c34aa2272571aaf"}

在向 Elasticsearch 写入数据的时候,目前它有四种操作:

  • index:索引文档(来自 Logstash 的事件)。
  • delete:通过 id 删除文档(此操作需要一个id)
  • create:索引文档,如果索引中已存在该 id 的文档,则失败。
  • update:通过 id 更新文档。 Update 有一个特殊情况,你可以 upsert — 更新文档(如果不存在)。 请参阅 doc_as_upsert 选项。 注意:这在 Elasticsearch 1.x 中不起作用且不受支持。 请升级到 ES 2.x 或更高版本以将此功能与 Logstash 一起使用!

一个 sprintf 样式的字符串,用于根据事件的内容更改操作。 值 %{[foo]} 将使用 foo 字段进行操作。 如果 resolved action 不在 [index, delete, create, update] 中,事件将不会发送到 Elasticsearch。 相反,事件将被发送到管道的死信队列 (DLQ)(如果启用),或者将被记录并删除。

在实际的使用中,假如我们的操作不是 index,delete create 或 update 其中的一种,那么我们该怎么办呢?比如我们想根据一定的条件来更新文档,就像 update by query 那样?我们该怎么办呢?

幸运的是,Logstash 提供了一个叫做 HTTP output plugin。它可以帮我解决这个问题。

准备数据

首先,我们来创建如下的一个索引:

PUT customer/_doc/2
{"id": 2,"timestamp": "2019-08-11T17:55:56Z","paymentType": "Visa","name": "Darby Dacks","gender": "Female","ip_address": "77.72.239.47","purpose": "Shoes","country": "Poland","age": 55,"offer": false 
}

我们在 Kibana 中输入上面的命令来创建一个叫做 customer 的索引。它的 id 为 2。

更新数据

接下来,我们需要按照一定的条件来更新我们的数据。比如,我们想把 paymentType 为 Visa,并且年龄大于或等于 55 岁的人的 offer 设置为 true。在 Kibana 中正常的命令是这样的:

POST customer/_update_by_query
{"query": {"bool": {"must": [{"match": {"paymentType.keyword": "Visa"}},{"range": {"age": {"gte": 50}}}]}},"script": {"source": "ctx._source.offer = params.offer","lang": "painless","params": {"offer": true}}
}

我们可以对 Logsthash 做如下的 pipeline 设计:

logstash.conf

input {generator {message => '{"id":2,"timestamp":"2019-08-11T17:55:56Z","paymentType":"Visa","name":"Darby Dacks","gender":"Female","ip_address":"77.72.239.47","purpose":"Shoes","country":"Poland","age":55}'count => 1}
}filter {json {source => "message"}if [paymentType] == "Mastercard" {drop {}}mutate {remove_field => ["message", "@timestamp", "path", "host", "@version", "log", "event"]}}output {stdout {codec => rubydebug}http {url => "https://localhost:9200/customer/_update_by_query"user => "elastic"password => "Y+6tv9jejPl=W4IGrTD="http_method => "post"format => "message"content_type => "application/json"message => '{"query":{"bool":{"must":[{"match":{"paymentType.keyword":"%{paymentType}"}},{"range":{"age":{"gte":"%{age}"}}}]}},"script":{"source":"ctx._source.offer = params.offer","lang":"painless","params":{"offer":true}}}' cacert => "/Users/liuxg/elastic/elasticsearch-8.6.1/config/certs/http_ca.crt"}
}

在上面,我们在 message 中通过一个查询,匹配到 paymentType.keyword 为 Visa,并且 age 为大于等于 55 的文档,我们设置该用户为促销对象。把他的 offer 值设置为 true。这个在实际的使用中,依据自己的条件来进行配置。在上面,cacert 为我们的 Elasticsearch 的证书文件位置。具体使用,请参考文档。

我们接下来运行 Logstash 的 pipeline:

./bin/logstash -f logstash.conf

 

在上面我们可以看出来信息的输出。我们在 Kibana 中使用如下的命令来检查更新后的文档:

GET customer/_search
{"took": 0,"timed_out": false,"_shards": {"total": 1,"successful": 1,"skipped": 0,"failed": 0},"hits": {"total": {"value": 1,"relation": "eq"},"max_score": 1,"hits": [{"_index": "customer","_id": "2","_score": 1,"_source": {"offer": true,"country": "Poland","gender": "Female","purpose": "Shoes","name": "Darby Dacks","id": 2,"ip_address": "77.72.239.47","age": 55,"timestamp": "2019-08-11T17:55:56Z","paymentType": "Visa"}}]}
}

很显然,我们上面的 offer 值现在变为 true,而不是之前的 false。

http://www.lryc.cn/news/16002.html

相关文章:

  • Spring Cloud Kubernetes环境下使用Jasypt
  • Kotlin-面向对象
  • 循环、函数、对象——js基础练习
  • 精确控制 AI 图像生成的破冰方案,ControlNet 和 T2I-Adapter
  • 让师生“不跑腿”,教育数据治理究竟有何魔力
  • 力扣-寻找用户推荐人
  • mmdetection测试阶段
  • 【无标题】10.货币系统
  • 【c++】类和对象6—运算符重载
  • 【SPSS】基础图形的绘制(条形图、折线图、饼图、箱图)详细操作过程
  • 6、Fatfs系统移植
  • 【数据结构与算法】数据结构的基本概念,时间复杂度
  • 【Python】变量类型,赋值+多个变量赋值
  • Qt基础之二十九:图形视图框架(Graphics View Framework)及其应用
  • 电商平台销量查询:2023年1月牛奶乳品热门排行榜
  • 应用层协议
  • Golang调用FFmpeg转换视频流
  • 外卖点餐小程序开发
  • 华为OD机试真题Python实现【猴子爬山】真题+解题思路+代码(20222023)
  • wordpress 网站备份
  • 如何尽早解决需求变更隐患,降低项目延期风险?
  • [机缘参悟-96] :软件中到处充满了人类社会的气息!
  • 知识点滴 - 自行车分类
  • 【建议收藏】Jenkins+postman+newman之API全自动化测试
  • MySQL数据库————MVCC
  • 为啥Python多线程爬虫跑的慢?
  • 万字长文解析!复现和使用GPT-3/ChatGPT,你所应该知道的
  • Kaldi语音识别技术(八) ----- 整合HCLG
  • day17_异常
  • vue中把node-sass换成dart-sass方式(解决办法)