# logstash ### jdbc.conf 启动命令 `./logstash -f /newdata/es/logstash-8.11.1/jdbc.conf` ``` input { jdbc { jdbc_connection_string => "jdbc:mysql://10.1.2.5:336/jeecg-boot?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false&serverTimezone=GMT%2B8" jdbc_user => "root" jdbc_password => "sh3433333" jdbc_driver_library => "D:\\devsoft\\logstash-8.13.4\\mysql-connector-j-8.0.33.jar" jdbc_driver_class => "com.mysql.cj.jdbc.Driver" jdbc_paging_enabled => "true" jdbc_page_size => "10000" statement_filepath => "msg.sql" use_column_value => true tracking_column => "create_time" tracking_column_type => "timestamp" record_last_run => true last_run_metadata_path => "msg-id.last" schedule => "*/5 * * * *" type => "msg" } } filter { #json { #source => "message" #remove_field => ["message"] #} ruby { code => " if event.get('create_time') # 先将 create_time 转换为字符串 create_time_str = event.get('create_time').to_s # 然后截取字符串的前7位 event.set('index_suffix', create_time_str[0,7]) end " } } output { elasticsearch { hosts => "localhost:9200" index => "alarm_indicator_recording-%{index_suffix}" document_type => "%{type}" } stdout { codec => json_lines } } ``` ### msg.sql ``` select id, sn_num, num, indicator_name, indicator_type, indicator_factor, indicator_num, quality, alarm_level, create_by, create_time, update_by, update_time, del_flag, alarm_location, description from alarm_indicator_recording where create_time > date_add(:sql_last_value,INTERVAL 8 HOUR) ``` ### msg-id.last ``` --- 2024-03-21 03:16:23.000000000 Z ``` --- ce --- ``` # logstash-mysql-es.conf input { jdbc { # 1. 
JDBC 驱动和连接信息 (请替换为您的实际信息) jdbc_driver_library => "/www/es_logstash/logstash-8.13.4/mysql-connector-j-8.0.33.jar" jdbc_driver_class => "com.mysql.cj.jdbc.Driver" jdbc_connection_string => "jdbc:mysql://sh-cynosdbmysql-grp-oasdasd.sql.tencentcdb.com:20326/douyinec?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false&serverTimezone=Asia/Shanghai&useLegacyDatetimeCode=false&useTimezone=true" jdbc_user => "root" jdbc_password => "Zhasddasd@" jdbc_page_size => "910000" jdbc_paging_enabled => true # 2. 定时执行 (例如:每 5 分钟检查一次) schedule => "*/5 * * * *" # 3. 核心 SQL 查询 # !!!重要提示!!! # 首次运行请使用下面这句,以从您指定的时间点开始同步: #statement => "SELECT * FROM dy_product_id_hotx WHERE time > '2025-07-10 03:10:00' ORDER BY time ASC" # # 首次运行成功后,请立即停止 Logstash,并换成下面这句,以实现自动增量同步: statement => "SELECT * FROM dy_product_id_hotx WHERE time > :sql_last_value ORDER BY time ASC" # 4. 增量同步配置 use_column_value => true tracking_column => "time" tracking_column_type => "timestamp" last_run_metadata_path => "/www/es_logstash/logstash-8.13.4/lasttime.txt" jdbc_default_timezone => "Asia/Shanghai" } } filter { # 移除不需要的 Logstash 元数据字段 mutate { remove_field => ["@version", "@timestamp"] } } output { elasticsearch { # 1. Elasticsearch 连接信息 (请替换为您的实际信息) hosts => ["http://127.0.0.1:9200"] # 2. 索引名称 (请替换为您希望使用的索引名) index => "product_info" # 3. 关键配置:使用 seq ID,实现新增和更新 document_id => "%{seq}" } # (可选) 在控制台输出,方便调试 stdout { codec => rubydebug } } ``` --- Loading... 
# logstash ### jdbc.conf 启动命令 `./logstash -f /newdata/es/logstash-8.11.1/jdbc.conf` ``` input { jdbc { jdbc_connection_string => "jdbc:mysql://10.1.2.5:336/jeecg-boot?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false&serverTimezone=GMT%2B8" jdbc_user => "root" jdbc_password => "sh3433333" jdbc_driver_library => "D:\\devsoft\\logstash-8.13.4\\mysql-connector-j-8.0.33.jar" jdbc_driver_class => "com.mysql.cj.jdbc.Driver" jdbc_paging_enabled => "true" jdbc_page_size => "10000" statement_filepath => "msg.sql" use_column_value => true tracking_column => "create_time" tracking_column_type => "timestamp" record_last_run => true last_run_metadata_path => "msg-id.last" schedule => "*/5 * * * *" type => "msg" } } filter { #json { #source => "message" #remove_field => ["message"] #} ruby { code => " if event.get('create_time') # 先将 create_time 转换为字符串 create_time_str = event.get('create_time').to_s # 然后截取字符串的前7位 event.set('index_suffix', create_time_str[0,7]) end " } } output { elasticsearch { hosts => "localhost:9200" index => "alarm_indicator_recording-%{index_suffix}" document_type => "%{type}" } stdout { codec => json_lines } } ``` ### msg.sql ``` select id, sn_num, num, indicator_name, indicator_type, indicator_factor, indicator_num, quality, alarm_level, create_by, create_time, update_by, update_time, del_flag, alarm_location, description from alarm_indicator_recording where create_time > date_add(:sql_last_value,INTERVAL 8 HOUR) ``` ### msg-id.last ``` --- 2024-03-21 03:16:23.000000000 Z ``` --- ce --- ``` # logstash-mysql-es.conf input { jdbc { # 1. 
JDBC 驱动和连接信息 (请替换为您的实际信息) jdbc_driver_library => "/www/es_logstash/logstash-8.13.4/mysql-connector-j-8.0.33.jar" jdbc_driver_class => "com.mysql.cj.jdbc.Driver" jdbc_connection_string => "jdbc:mysql://sh-cynosdbmysql-grp-oasdasd.sql.tencentcdb.com:20326/douyinec?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false&serverTimezone=Asia/Shanghai&useLegacyDatetimeCode=false&useTimezone=true" jdbc_user => "root" jdbc_password => "Zhasddasd@" jdbc_page_size => "910000" jdbc_paging_enabled => true # 2. 定时执行 (例如:每 5 分钟检查一次) schedule => "*/5 * * * *" # 3. 核心 SQL 查询 # !!!重要提示!!! # 首次运行请使用下面这句,以从您指定的时间点开始同步: #statement => "SELECT * FROM dy_product_id_hotx WHERE time > '2025-07-10 03:10:00' ORDER BY time ASC" # # 首次运行成功后,请立即停止 Logstash,并换成下面这句,以实现自动增量同步: statement => "SELECT * FROM dy_product_id_hotx WHERE time > :sql_last_value ORDER BY time ASC" # 4. 增量同步配置 use_column_value => true tracking_column => "time" tracking_column_type => "timestamp" last_run_metadata_path => "/www/es_logstash/logstash-8.13.4/lasttime.txt" jdbc_default_timezone => "Asia/Shanghai" } } filter { # 移除不需要的 Logstash 元数据字段 mutate { remove_field => ["@version", "@timestamp"] } } output { elasticsearch { # 1. Elasticsearch 连接信息 (请替换为您的实际信息) hosts => ["http://127.0.0.1:9200"] # 2. 索引名称 (请替换为您希望使用的索引名) index => "product_info" # 3. 关键配置:使用 seq ID,实现新增和更新 document_id => "%{seq}" } # (可选) 在控制台输出,方便调试 stdout { codec => rubydebug } } ``` --- 最后修改:2025 年 08 月 01 日 © 允许规范转载 打赏 赞赏作者 赞 如果觉得我的文章对你有用,请随意赞赏