Multiple JDBC inputs in Logstash
You can definitely have a single config with multiple jdbc inputs and then parameterize the index and document_type in your elasticsearch output depending on which table the event came from.
input {
  jdbc {
    jdbc_driver_library => "/Users/logstash/mysql-connector-java-5.1.39-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/database_name"
    jdbc_user => "root"
    jdbc_password => "password"
    schedule => "* * * * *"
    statement => "select * from table1"
    type => "table1"
  }
  jdbc {
    jdbc_driver_library => "/Users/logstash/mysql-connector-java-5.1.39-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/database_name"
    jdbc_user => "root"
    jdbc_password => "password"
    schedule => "* * * * *"
    statement => "select * from table2"
    type => "table2"
  }
  # add more jdbc inputs to suit your needs
}
output {
  elasticsearch {
    index => "testdb"
    document_type => "%{type}"   # <- use the type from each input
    hosts => "localhost:9200"
  }
}
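If you also want each table in its own index rather than just its own document_type, the index option accepts the same sprintf field references, so (as a sketch, with a hypothetical index naming scheme) you could reuse the type field there too:

output {
  elasticsearch {
    hosts => "localhost:9200"
    index => "testdb_%{type}"     # hypothetical naming: events land in testdb_table1 / testdb_table2
    document_type => "%{type}"
  }
}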
This will not create duplicate data, and it is compatible with Logstash 6.x.
# YOUR_DATABASE_NAME : test
# FIRST_TABLE : place
# SECOND_TABLE : things
# SET_DATA_INDEX : test_index_1, test_index_2

input {
  jdbc {
    # The path to our downloaded jdbc driver
    jdbc_driver_library => "/mysql-connector-java-5.1.44-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    # MySQL jdbc connection string to our database, YOUR_DATABASE_NAME
    jdbc_connection_string => "jdbc:mysql://localhost:3306/test"
    # The user we wish to execute our statement as
    jdbc_user => "root"
    jdbc_password => ""
    schedule => "* * * * *"
    statement => "SELECT @slno:=@slno+1 aut_es_1, es_qry_tbl.* FROM (SELECT * FROM `place`) es_qry_tbl, (SELECT @slno:=0) es_tbl"
    type => "place"
    add_field => { "queryFunctionName" => "getAllDataFromFirstTable" }
    use_column_value => true
    tracking_column => "aut_es_1"
  }
  jdbc {
    # The path to our downloaded jdbc driver
    jdbc_driver_library => "/mysql-connector-java-5.1.44-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    # MySQL jdbc connection string to our database, YOUR_DATABASE_NAME
    jdbc_connection_string => "jdbc:mysql://localhost:3306/test"
    # The user we wish to execute our statement as
    jdbc_user => "root"
    jdbc_password => ""
    schedule => "* * * * *"
    statement => "SELECT @slno:=@slno+1 aut_es_2, es_qry_tbl.* FROM (SELECT * FROM `things`) es_qry_tbl, (SELECT @slno:=0) es_tbl"
    type => "things"
    add_field => { "queryFunctionName" => "getAllDataFromSecondTable" }
    use_column_value => true
    tracking_column => "aut_es_2"
  }
}

# install the uuid plugin: 'bin/logstash-plugin install logstash-filter-uuid'
# The uuid filter allows you to generate a UUID and add it as a field to each processed event.
filter {
  mutate {
    add_field => { "[@metadata][document_id]" => "%{aut_es_1}%{aut_es_2}" }
  }
  uuid {
    target => "uuid"
    overwrite => true
  }
}

output {
  stdout { codec => rubydebug }
  if [type] == "place" {
    elasticsearch {
      hosts => "localhost:9200"
      index => "test_index_1_12"
      # document_id => "%{aut_es_1}"
      document_id => "%{[@metadata][document_id]}"
    }
  }
  if [type] == "things" {
    elasticsearch {
      hosts => "localhost:9200"
      index => "test_index_2_13"
      document_id => "%{[@metadata][document_id]}"
      # document_id => "%{aut_es_2}"
      # you can set document_id; otherwise ES will generate a unique id.
    }
  }
}
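Note that as written, both statements re-select every row on each scheduled run; tracking_column only records the last value seen. If your tables have an auto-increment key, the usual incremental pattern (a sketch, assuming a hypothetical column named id) references :sql_last_value so each run only fetches new rows:

  jdbc {
    # ... driver and connection settings as above ...
    statement => "SELECT * FROM `place` WHERE id > :sql_last_value"
    use_column_value => true
    tracking_column => "id"   # Logstash persists the last seen id between runs
  }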
If you need to run more than one pipeline in the same process, Logstash provides a way to do this through a configuration file called pipelines.yml, which lets you define multiple pipelines.
Using multiple pipelines is especially useful if your current configuration has event flows that don’t share the same inputs/filters and outputs and are being separated from each other using tags and conditionals.
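A minimal pipelines.yml sketch (the pipeline ids and config paths here are placeholders, assuming one .conf file per table):

# pipelines.yml
- pipeline.id: place-pipeline
  path.config: "/usr/share/logstash/pipeline/place.conf"
- pipeline.id: things-pipeline
  path.config: "/usr/share/logstash/pipeline/things.conf"

Each pipeline then has its own inputs, filters, and outputs, so you no longer need type tags and conditionals to keep the flows apart.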