Loading... # logstash ### jdbc.conf 启动命令 `./logstash -f /newdata/es/logstash-8.11.1/jdbc.conf ` ```input jdbc { # 数据库连接字符串 jdbc_connection_string => "jdbc:mysql://sh-cynosdbmysql-grp-owmxo6u6.sql.tencentcdb.com:226/douyinec?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false&serverTimezone=GMT%2B8" # 数据库用户名和密码 jdbc_user => "doao" jdbc_password => "zh3@" # 数据库java驱动包目录 jdbc_driver_library => "/newdata/es/logstash-8.11.1/mysql-connector-j-8.0.33.jar" # the name of the driver class for mysql jdbc_driver_class => "com.mysql.cj.jdbc.Driver" jdbc_paging_enabled => "true" jdbc_page_size => "10000" # 关联第二个文件 statement_filepath => "msg.sql" # 开启用户字段追踪判断,缓存最后记录的对应字段,这里缓存的是主键id字段 use_column_value => true tracking_column => "time" tracking_column_type => "timestamp" record_last_run => true # 缓存文件目录,会把最后操作的记录的id缓存到这个文件中,可以先建好放在合适的位置 last_run_metadata_path => "msg-id.last" # 设定同步周期,当前设置是每分钟一次,更多设置请自己百度 schedule => "*/5 * * * *" # 索引的分类 type => "msg" } } filter { json { source => "message" remove_field => ["message"] } } output { elasticsearch { # 地址 hosts => "localhost:9200" # 索引的名称,可以随便起,我这边跟数据库起了一样的名字 index => "dyhot" # 索引下面的分类,直接用了上面配置中的值 document_type => "%{type}" document_id => "%{seq}" } stdout { codec => json_lines } } ``` ### msg.sql ``` select seq, product_id, title, sales, price, count, good_ratio, pingjiashu, comment_times, comment_time, xiadanshu, yunfei, jingxuan, yongjin, darenshu, url, shoplogo, shoptype, shop_id, shopidnum, shop_name, score, product_num, sell_num, followerCount, shop_comment, author_name, ZeroCName, FirstCName, SecondCName, ThirdCName, FourthCName, laiyuan, time from dy_product_id_hot where time > :sql_last_value ``` ### msg-id.last ``` --- 2024-03-21 03:16:23.000000000 Z ``` 最后修改:2024 年 03 月 21 日 © 允许规范转载 打赏 赞赏作者 赞 如果觉得我的文章对你有用,请随意赞赏