logstash
jdbc.conf
启动命令 (startup command):
./logstash -f /newdata/es/logstash-8.11.1/jdbc.conf
input {
  jdbc {
    # MySQL connection for the jeecg-boot database.
    # FIX: original port was 336 — a typo for the MySQL default 3306.
    jdbc_connection_string => "jdbc:mysql://10.1.2.5:3306/jeecg-boot?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false&serverTimezone=GMT%2B8"
    jdbc_user => "root"
    # SECURITY: credentials are hard-coded in the config; prefer the Logstash keystore.
    jdbc_password => "sh3433333"
    # NOTE(review): this is a Windows path, but the startup command above runs from
    # /newdata/es/logstash-8.11.1 (Linux) — confirm the driver jar path for the target host.
    jdbc_driver_library => "D:\\devsoft\\logstash-8.13.4\\mysql-connector-j-8.0.33.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    # Page through large result sets 10 000 rows at a time.
    jdbc_paging_enabled => "true"
    jdbc_page_size => "10000"
    # The SQL statement lives in an external file (msg.sql).
    statement_filepath => "msg.sql"
    # Incremental sync: persist the last seen create_time between runs.
    use_column_value => true
    tracking_column => "create_time"
    tracking_column_type => "timestamp"
    record_last_run => true
    last_run_metadata_path => "msg-id.last"
    # Run every 5 minutes.
    schedule => "*/5 * * * *"
    type => "msg"
  }
}
filter {
# (disabled) JSON parsing of the raw message — kept for reference.
#json {
#source => "message"
#remove_field => ["message"]
#}
ruby {
code => "
if event.get('create_time')
# Convert create_time to a string first (Logstash timestamps render as ISO8601).
create_time_str = event.get('create_time').to_s
# Take the first 7 characters ('YYYY-MM') as the monthly index suffix.
event.set('index_suffix', create_time_str[0,7])
end
"
}
}
output {
elasticsearch {
hosts => "localhost:9200"
# Monthly index: the 'YYYY-MM' suffix is computed in the ruby filter.
index => "alarm_indicator_recording-%{index_suffix}"
# FIX: removed `document_type => "%{type}"` — mapping types were removed in
# Elasticsearch 8.x, and this config targets logstash/ES 8.11; the option is
# no longer supported by the elasticsearch output plugin against ES 8.
}
# Echo events to the console as JSON lines for debugging.
stdout {
codec => json_lines
}
}
msg.sql
select
id, sn_num, num, indicator_name, indicator_type, indicator_factor, indicator_num, quality, alarm_level, create_by, create_time, update_by, update_time, del_flag, alarm_location, description
from
alarm_indicator_recording
where
-- :sql_last_value is injected by the Logstash jdbc input (last tracked create_time).
-- The +8 hour shift presumably compensates for the GMT+8 offset between the
-- stored local timestamps and the UTC value Logstash records in msg-id.last —
-- TODO confirm this does not skip rows written inside the offset window.
create_time > date_add(:sql_last_value,INTERVAL 8 HOUR)
msg-id.last
--- 2024-03-21 03:16:23.000000000 Z
ce ---
# logstash-mysql-es.conf
input {
jdbc {
# 1. JDBC driver and connection settings (replace with your own).
jdbc_driver_library => "/www/es_logstash/logstash-8.13.4/mysql-connector-j-8.0.33.jar"
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://sh-cynosdbmysql-grp-oasdasd.sql.tencentcdb.com:20326/douyinec?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false&serverTimezone=Asia/Shanghai&useLegacyDatetimeCode=false&useTimezone=true"
jdbc_user => "root"
# SECURITY: hard-coded password — prefer the Logstash keystore.
jdbc_password => "Zhasddasd@"
# NOTE(review): 910000 looks like a possible typo for 10000 — confirm the intended page size.
jdbc_page_size => "910000"
jdbc_paging_enabled => true
# 2. Schedule: run every 5 minutes.
schedule => "*/5 * * * *"
# 3. Core SQL query.
# !!!IMPORTANT!!!
# For the FIRST run, use the statement below to sync from a fixed point in time:
#statement => "SELECT * FROM dy_product_id_hotx WHERE time > '2025-07-10 03:10:00' ORDER BY time ASC"
#
# After the first successful run, stop Logstash immediately and switch to the
# statement below to enable automatic incremental sync:
statement => "SELECT * FROM dy_product_id_hotx WHERE time > :sql_last_value ORDER BY time ASC"
# 4. Incremental sync configuration: track the last seen `time` value across runs.
use_column_value => true
tracking_column => "time"
tracking_column_type => "timestamp"
last_run_metadata_path => "/www/es_logstash/logstash-8.13.4/lasttime.txt"
jdbc_default_timezone => "Asia/Shanghai"
}
}
filter {
# Drop Logstash metadata fields that should not be indexed into Elasticsearch.
mutate {
remove_field => ["@version", "@timestamp"]
}
}
output {
elasticsearch {
# 1. Elasticsearch connection (replace with your own).
hosts => ["http://127.0.0.1:9200"]
# 2. Target index name (replace with the index you want to use).
index => "product_info"
# 3. Key setting: use the row's `seq` value as the document id so re-synced
# rows update in place instead of duplicating. Assumes `seq` is unique per
# row — TODO confirm against the table schema.
document_id => "%{seq}"
}
# (optional) Console output for debugging.
stdout { codec => rubydebug }
}