ElasticSearch-Logstash6.0.0安装以及mysql数据导入ES
发布日期:2021-05-07 14:49:33 浏览次数:19 分类:技术文章

本文共 4583 字,大约阅读时间需要 15 分钟。

ElasticSearch-Logstash6.0.0安装以及mysql数据导入ES

Logstash安装(6.0.0)

  1. 下载logstash到linux
    我直接给你们百度云
    链接:https://pan.baidu.com/s/13M3OdoG3wqcpdebUUSwoHQ
    提取码:5aaq
  2. 解压即可(unzip)
unzip logstash-6.0.0.zip
  1. 下载后续要用的mysql jar包(也可以去官网下载)
    链接:https://pan.baidu.com/s/1TpFEWE9M81tNNn_uXkW-3Q
    提取码:cg2d

mysql数据导入ES

1.添加自定义配置文件(放到logstash/config下面)

vi logstash-mysql.conf

添加以下内容:

# Sample Logstash configuration for creating a simple# Beats -> Logstash -> Elasticsearch pipeline.input {
jdbc{
jdbc_connection_string => "jdbc:mysql://192.168.xxx.xxx:3306/test?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghai&tinyInt1isBit=false" jdbc_user => "xxx" jdbc_password => "xxx" jdbc_driver_library => "/opt/local_software/logstash-6.0.0/config/mysql-connector-java-5.1.39.jar" jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_paging_enabled => "true" jdbc_page_size => "50000" jdbc_default_timezone =>"Asia/Shanghai" statement_filepath => "/opt/local_software/logstash-6.0.0/config/sql/face.sql" schedule => "* * * * *" # type => "mycat" # 是否记录上次执行结果, 如果为真,将会把上次执行到的 tracking_column 字段的值记录下来,保存到 last_run_metadata_path 指定的文件中 record_last_run => true # 是否需要记录某个column 的值,如果record_last_run为真,可以自定义我们需要 track 的 column 名称,此时该参数就要为 true. 否则默认 track 的是 timestamp 的值. use_column_value => true # 如果 use_column_value 为真,需配置此参数. track 的数据库 column 名,该 column 必须是递增的. 一般是mysql主键 tracking_column => "id" tracking_column_type => "numeric" last_run_metadata_path => "./face_last_id" lowercase_column_names => false }}filter {
if [sex] == 1 {
mutate {
add_field => {
"tags" => "男"} } } if [sex] == 2 {
mutate {
add_field => {
"tags" => "女"} } } if [sex] == 0 {
mutate {
add_field => {
"tags" => "未知"} } }}output {
elasticsearch {
hosts => ["http://192.168.135.237:9201"] #index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}" index => "face_card" document_id => "%{id}" #user => "elastic" #password => "changeme" } stdout {
codec => json_lines }

说明:

你主要需要改动的地方有以下几点:

  1. jdbc:mysql://192.168.xxx.xxx:3306/test?
    更改ip,端口以及端口后面数据库的名称(我这里是test)
  2. jdbc_user => “数据库用户名”
  3. jdbc_password => “数据库密码”
  4. jdbc_driver_library :上面下载的mysql jar包放的位置
  5. statement_filepath => “/opt/local_software/logstash-6.0.0/config/sql/face.sql”
    这个是需要执行的sql脚本,logstash根据sql去mysql中导入数据的(sql还是要写的,可以跟我保持一致)
  6. output中:修改 hosts => [“http://192.168.xxx.xxx:9201”] 修改为自己的ES ip+端口

2.在config下创建目录

mkdir sql

3.创建sql文件(在sql目录下)

vi face.sql

添加如下内容:

select * from face_card where id > :sql_last_value order by id limit 1000

4.去对应数据库中创建一张表(对应sql文件中的表名)

CREATE TABLE `face_card` (  `id` int(11) NOT NULL AUTO_INCREMENT,  `name` varchar(255) DEFAULT NULL,  `sex` int(11) DEFAULT NULL,  PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

添加如下数据(简单点即可)

在这里插入图片描述
5.运行脚本:
在logstash-6.0.0目录下执行命令:

./bin/logstash -f ./config/face.conf

6.等待较长时间后,发现sql导入成功,如图:

在这里插入图片描述

验证mysql导入的数据

可以用kibana(如果配置了kibana情况下),也可以用postman

用kibana验证:
在这里插入图片描述
输入命令:

GET /face_card/_search

得到结果:

{  "took": 3,  "timed_out": false,  "_shards": {    "total": 5,    "successful": 5,    "skipped": 0,    "failed": 0  },  "hits": {    "total": 4,    "max_score": 1,    "hits": [      {        "_index": "face_card",        "_type": "doc",        "_id": "2",        "_score": 1,        "_source": {          "name": "yy",          "@version": "1",          "id": 2,          "@timestamp": "2020-07-29T08:39:01.606Z",          "sex": 1,          "tags": [            "男"          ]        }      },      {        "_index": "face_card",        "_type": "doc",        "_id": "4",        "_score": 1,        "_source": {          "name": "ee",          "@version": "1",          "id": 4,          "@timestamp": "2020-07-29T08:39:01.614Z",          "sex": 2,          "tags": [            "女"          ]        }      },      {        "_index": "face_card",        "_type": "doc",        "_id": "1",        "_score": 1,        "_source": {          "name": "ljj",          "@version": "1",          "id": 1,          "@timestamp": "2020-07-29T08:39:01.604Z",          "sex": 0,          "tags": [            "未知"          ]        }      },      {        "_index": "face_card",        "_type": "doc",        "_id": "3",        "_score": 1,        "_source": {          "name": "dd",          "@version": "1",          "id": 3,          "@timestamp": "2020-07-29T08:39:01.614Z",          "sex": 0,          "tags": [            "未知"          ]        }      }    ]  }}

可以看出mysql中的4条数据都写进去了,也就是利用Logstash导入数据成功

上一篇:ElasticSearch-Routing路由简介-脑裂问题以及解决方案
下一篇:ElasticSearch配置Head

发表评论

最新留言

路过按个爪印,很不错,赞一个!
[***.219.124.196]2025年03月31日 21时59分02秒