elk
Netflow for Mikrotik Routers
Instructions for feeding netflow data into ELK, assuming the stack is already up and running. Netflow data will be placed into separate daily indexes, using the pattern logstash-netflow9-${YYYY.MM.dd}.
- Define a template mapping for the netflow data, mapping the fields to the correct datatypes:
curl -XPUT http://localhost:9200/_template/logstash-netflow9 -d '{
  "template": "logstash-netflow9-*",
  "order": 10,
  "settings": {
    "index.cache.field.type": "soft",
    "index.store.compress.stored": true
  },
  "mappings": {
    "_default_": {
      "_all": { "enabled": false },
      "properties": {
        "@message":     { "index": "analyzed",     "type": "string" },
        "@source":      { "index": "not_analyzed", "type": "string" },
        "@source_host": { "index": "not_analyzed", "type": "string" },
        "@source_path": { "index": "not_analyzed", "type": "string" },
        "@tags":        { "index": "not_analyzed", "type": "string" },
        "@timestamp":   { "index": "not_analyzed", "type": "date" },
        "@type":        { "index": "not_analyzed", "type": "string" },
        "netflow": {
          "dynamic": true,
          "type": "object",
          "properties": {
            "version":         { "index": "analyzed",     "type": "integer" },
            "first_switched":  { "index": "not_analyzed", "type": "date" },
            "last_switched":   { "index": "not_analyzed", "type": "date" },
            "direction":       { "index": "not_analyzed", "type": "integer" },
            "flowset_id":      { "index": "not_analyzed", "type": "integer" },
            "flow_sampler_id": { "index": "not_analyzed", "type": "integer" },
            "flow_seq_num":    { "index": "not_analyzed", "type": "long" },
            "src_tos":         { "index": "not_analyzed", "type": "integer" },
            "tcp_flags":       { "index": "not_analyzed", "type": "integer" },
            "protocol":        { "index": "not_analyzed", "type": "integer" },
            "ipv4_next_hop":   { "index": "analyzed",     "type": "ip" },
            "in_bytes":        { "index": "not_analyzed", "type": "long" },
            "in_pkts":         { "index": "not_analyzed", "type": "long" },
            "out_bytes":       { "index": "not_analyzed", "type": "long" },
            "out_pkts":        { "index": "not_analyzed", "type": "long" },
            "input_snmp":      { "index": "not_analyzed", "type": "long" },
            "output_snmp":     { "index": "not_analyzed", "type": "long" },
            "ipv4_dst_addr":   { "index": "analyzed",     "type": "ip" },
            "ipv4_src_addr":   { "index": "analyzed",     "type": "ip" },
            "dst_mask":        { "index": "analyzed",     "type": "integer" },
            "src_mask":        { "index": "analyzed",     "type": "integer" },
            "dst_as":          { "index": "analyzed",     "type": "integer" },
            "src_as":          { "index": "analyzed",     "type": "integer" },
            "l4_dst_port":     { "index": "not_analyzed", "type": "long" },
            "l4_src_port":     { "index": "not_analyzed", "type": "long" },
            "ipv4_dst_addr_postnat": { "index": "analyzed",     "type": "ip" },
            "ipv4_src_addr_postnat": { "index": "analyzed",     "type": "ip" },
            "l4_dst_port_postnat":   { "index": "not_analyzed", "type": "long" },
            "l4_src_port_postnat":   { "index": "not_analyzed", "type": "long" }
          }
        }
      }
    }
  }
}'
- Copy the netflow.yaml definitions from /opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-codec-netflow-2.0.2/lib/logstash/codecs/netflow.rb to /etc/logstash/mikrotik.netflow9.yaml and patch the following:

# Changed from 2 -> 4
10:
- 4
- :input_snmp
14:
- 4
- :output_snmp
# Changed from uint24 -> uint32
31:
- :uint32
- :ipv6_flow_label
# Add these entries:
225:
- :ip4_addr
- :ipv4_src_addr_postnat
226:
- :ip4_addr
- :ipv4_dst_addr_postnat
227:
- :uint16
- :l4_src_port_postnat
228:
- :uint16
- :l4_dst_port_postnat
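Each key in the YAML is a NetFlow v9 field ID, mapped to a length (or type name) plus a field name. A minimal sketch of that lookup, using only the patched entries above (the dict and function are illustrative, not the logstash codec's internals):

```python
# Patched definitions: NetFlow v9 field ID -> (length-or-type, field name)
DEFINITIONS = {
    10:  (4, "input_snmp"),              # changed from 2 -> 4 bytes
    14:  (4, "output_snmp"),             # changed from 2 -> 4 bytes
    31:  ("uint32", "ipv6_flow_label"),  # changed from uint24 -> uint32
    225: ("ip4_addr", "ipv4_src_addr_postnat"),
    226: ("ip4_addr", "ipv4_dst_addr_postnat"),
    227: ("uint16", "l4_src_port_postnat"),
    228: ("uint16", "l4_dst_port_postnat"),
}

def field_name(field_id):
    """Return the mapped field name for a field ID, or None if undefined."""
    entry = DEFINITIONS.get(field_id)
    return entry[1] if entry else None

print(field_name(225))  # ipv4_src_addr_postnat
```

Field IDs 225-228 are the post-NAT address/port fields the Mikrotik exports, which is why they must be added before the data shows up in elasticsearch.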
- Set up a listening UDP port to receive the flow data, and feed it into the netflow indexes in elasticsearch:
- /etc/logstash/conf.d/50-netflow.conf
input {
  udp {
    port => 9996
    codec => netflow {
      versions => [9]
      definitions => "/etc/logstash/mikrotik.netflow9.yaml"
    }
    type => "netflow9"
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    codec => "json"
    index => "logstash-%{type}-%{+YYYY.MM.dd}"
  }
}
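The index => "logstash-%{type}-%{+YYYY.MM.dd}" sprintf in the output means each event is routed to a daily index named from its type and its @timestamp (logstash evaluates %{+...} in UTC). A sketch of the resulting index name, assuming the netflow9 type set by the input:

```python
from datetime import datetime, timezone

def daily_index(ts, event_type="netflow9"):
    """Mirror logstash's "logstash-%{type}-%{+YYYY.MM.dd}" index naming (UTC)."""
    return "logstash-%s-%s" % (event_type, ts.strftime("%Y.%m.%d"))

print(daily_index(datetime(2016, 1, 28, 22, 40, tzinfo=timezone.utc)))
# logstash-netflow9-2016.01.28
```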
- In Kibana Settings, add a new index pattern for logstash-netflow9-* (if this pattern already exists and the mapping has been changed since, remember to hit the refresh button to ensure the changed datatypes are picked up). Verify all fields have the right type.
- Enable IP→Traffic Flow on the desired interfaces, and add the logstash host as a netflow v9 target.
- Verify data is being indexed by doing a search on * against the logstash-netflow9-* index pattern.
The following warnings will show up briefly when logstash first starts. This is because the templates needed to decode the netflow messages are published in-band on a regular basis, and when logstash first starts up it might not have seen a copy of the templates before flow data is received. Once the template message is received (sent every 20 packets by default on the Mikrotik boards), these messages will cease:
Jan 28 22:40:08 silverhold logstash[9496]: {:timestamp=>"2016-01-28T22:40:08.238000+0000", :message=>"No matching template for flow id 257", :level=>:warn}
Jan 28 22:40:09 silverhold logstash[9496]: {:timestamp=>"2016-01-28T22:40:09.266000+0000", :message=>"No matching template for flow id 256", :level=>:warn}
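The mechanism behind these warnings can be sketched as a small decoder that caches in-band templates by template ID: data flowsets (IDs 256 and up) cannot be decoded until the matching template flowset has arrived, so early flows are dropped with a warning. The class and method names below are illustrative, not the logstash codec's actual internals:

```python
class Netflow9Decoder:
    """Sketch of in-band NetFlow v9 template handling."""

    def __init__(self):
        self.templates = {}  # template ID -> ordered list of field names

    def handle_template(self, template_id, fields):
        # Template flowsets announce the layout of later data flowsets
        self.templates[template_id] = fields

    def handle_data(self, flowset_id, values):
        # Data flowsets reference a template by ID; without it we can
        # only warn and drop, exactly as in the log lines above
        template = self.templates.get(flowset_id)
        if template is None:
            return "No matching template for flow id %d" % flowset_id
        return dict(zip(template, values))

decoder = Netflow9Decoder()
print(decoder.handle_data(257, [6, 443]))   # warning: template not yet seen
decoder.handle_template(257, ["protocol", "l4_dst_port"])
print(decoder.handle_data(257, [6, 443]))   # decodes once the template arrives
```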
Bugs
wrong number of arguments calling `to_s` (1 for 0)
elk.txt · Last modified: by ben