====== Netflow for Mikrotik Routers ======
Instructions for feeding NetFlow v9 data from MikroTik routers into ''ELK'', assuming the stack is already up and running. The flow data is placed into separate daily indexes named ''logstash-netflow9-YYYY.MM.dd''.
* Define an index template for the netflow data that maps the fields to the correct datatypes:
curl -XPUT http://localhost:9200/_template/logstash-netflow9 -d '{
  "template" : "logstash-netflow9-*",
  "order": 10,
  "settings": {
    "index.cache.field.type": "soft",
    "index.store.compress.stored": true
  },
  "mappings" : {
    "_default_" : {
      "_all" : {"enabled" : false},
      "properties" : {
        "@message": { "index": "analyzed", "type": "string" },
        "@source": { "index": "not_analyzed", "type": "string" },
        "@source_host": { "index": "not_analyzed", "type": "string" },
        "@source_path": { "index": "not_analyzed", "type": "string" },
        "@tags": { "index": "not_analyzed", "type": "string" },
        "@timestamp": { "index": "not_analyzed", "type": "date" },
        "@type": { "index": "not_analyzed", "type": "string" },
        "netflow": {
          "dynamic": true,
          "properties": {
            "version": { "index": "analyzed", "type": "integer" },
            "first_switched": { "index": "not_analyzed", "type": "date" },
            "last_switched": { "index": "not_analyzed", "type": "date" },
            "direction": { "index": "not_analyzed", "type": "integer" },
            "flowset_id": { "index": "not_analyzed", "type": "integer" },
            "flow_sampler_id": { "index": "not_analyzed", "type": "integer" },
            "flow_seq_num": { "index": "not_analyzed", "type": "long" },
            "src_tos": { "index": "not_analyzed", "type": "integer" },
            "tcp_flags": { "index": "not_analyzed", "type": "integer" },
            "protocol": { "index": "not_analyzed", "type": "integer" },
            "ipv4_next_hop": { "index": "analyzed", "type": "ip" },
            "in_bytes": { "index": "not_analyzed", "type": "long" },
            "in_pkts": { "index": "not_analyzed", "type": "long" },
            "out_bytes": { "index": "not_analyzed", "type": "long" },
            "out_pkts": { "index": "not_analyzed", "type": "long" },
            "input_snmp": { "index": "not_analyzed", "type": "long" },
            "output_snmp": { "index": "not_analyzed", "type": "long" },
            "ipv4_dst_addr": { "index": "analyzed", "type": "ip" },
            "ipv4_src_addr": { "index": "analyzed", "type": "ip" },
            "dst_mask": { "index": "analyzed", "type": "integer" },
            "src_mask": { "index": "analyzed", "type": "integer" },
            "dst_as": { "index": "analyzed", "type": "integer" },
            "src_as": { "index": "analyzed", "type": "integer" },
            "l4_dst_port": { "index": "not_analyzed", "type": "long" },
            "l4_src_port": { "index": "not_analyzed", "type": "long" },
            "ipv4_dst_addr_postnat": { "index": "analyzed", "type": "ip" },
            "ipv4_src_addr_postnat": { "index": "analyzed", "type": "ip" },
            "l4_dst_port_postnat": { "index": "not_analyzed", "type": "long" },
            "l4_src_port_postnat": { "index": "not_analyzed", "type": "long" }
          },
          "type": "object"
        }
      }
    }
  }
}'
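To confirm the template was stored as intended, the mapping can be read back and compared with the expected types. The following is a minimal Python sketch, assuming Elasticsearch answers on ''localhost:9200'' as in the curl command above and that ''GET /_template/logstash-netflow9'' returns the template body nested under its name. The helper names and the ''EXPECTED'' subset are illustrative only and not part of any library.

```python
import json
from urllib.request import urlopen

# A few of the field types this page expects (illustrative subset).
EXPECTED = {
    "ipv4_src_addr": "ip",
    "ipv4_dst_addr": "ip",
    "in_bytes": "long",
    "first_switched": "date",
}

def netflow_field_types(response, name="logstash-netflow9"):
    """Return {field: type} for the netflow object in a GET /_template response."""
    props = response[name]["mappings"]["_default_"]["properties"]
    return {f: spec.get("type") for f, spec in props["netflow"]["properties"].items()}

def mismatches(actual, expected=EXPECTED):
    """Return {field: (wanted, got)} for every field whose stored type differs."""
    return {f: (t, actual.get(f)) for f, t in expected.items() if actual.get(f) != t}

def fetch_template(base="http://localhost:9200", name="logstash-netflow9"):
    """Read the stored template back from Elasticsearch (needs a running node)."""
    with urlopen(f"{base}/_template/{name}") as resp:
        return json.load(resp)
```

Calling ''mismatches(netflow_field_types(fetch_template()))'' on the ELK host should return an empty dict when the stored types match the expected subset.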
* Copy the ''netflow.yaml'' definitions shipped with the codec (''/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-codec-netflow-2.0.2/lib/logstash/codecs/netflow/netflow.yaml'') to ''/etc/logstash/mikrotik.netflow9.yaml'' and patch the following entries:
# Changed from 2->4
10:
- 4
- :input_snmp
14:
- 4
- :output_snmp
# Changed from uint24->uint32
31:
- :uint32
- :ipv6_flow_label
# Add these entries:
225:
- :ip4_addr
- :ipv4_src_addr_postnat
226:
- :ip4_addr
- :ipv4_dst_addr_postnat
227:
- :uint16
- :l4_src_port_postnat
228:
- :uint16
- :l4_dst_port_postnat
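Each definition maps a NetFlow v9 field type ID to a width (or fixed type) and a field name. A data record is simply the fields' bytes laid end to end in template order, so a decoder that assumes the wrong width for one field misreads every field after it. The Python sketch below illustrates that point with a hypothetical ''decode_record'' helper and a small subset of the definitions. It is a simplification that handles unsigned big-endian integers only and is not how the codec itself is implemented.

```python
# Illustrative subset of the definitions: field type ID -> (width in bytes, name).
PATCHED = {
    10: (4, "input_snmp"),            # patched from 2 to 4
    14: (4, "output_snmp"),           # patched from 2 to 4
    227: (2, "l4_src_port_postnat"),  # added entry
}
# The same subset with the original 2-byte SNMP interface indexes.
STOCK = {**PATCHED, 10: (2, "input_snmp"), 14: (2, "output_snmp")}

def decode_record(data, field_ids, definitions):
    """Split one data record into named unsigned big-endian integers."""
    record, offset = {}, 0
    for field_id in field_ids:
        width, name = definitions[field_id]
        record[name] = int.from_bytes(data[offset:offset + width], "big")
        offset += width
    return record

# A record carrying input_snmp=3, output_snmp=7, l4_src_port_postnat=443.
raw = (3).to_bytes(4, "big") + (7).to_bytes(4, "big") + (443).to_bytes(2, "big")

print(decode_record(raw, [10, 14, 227], PATCHED))
# {'input_snmp': 3, 'output_snmp': 7, 'l4_src_port_postnat': 443}
print(decode_record(raw, [10, 14, 227], STOCK))
# {'input_snmp': 0, 'output_snmp': 3, 'l4_src_port_postnat': 0}
```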
* Set up a UDP input in Logstash to receive the flow data, and feed it into the netflow indexes in Elasticsearch:
input {
  udp {
    port => 9996
    codec => netflow {
      versions => [9]
      definitions => "/etc/logstash/mikrotik.netflow9.yaml"
    }
    type => "netflow9"
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    codec => "json"
    index => "logstash-%{type}-%{+YYYY.MM.dd}"
  }
}
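The ''index'' setting expands ''%{type}'' from the event and formats the event's ''@timestamp'' (in UTC) with the pattern ''YYYY.MM.dd'', so each UTC day gets its own index. A rough Python equivalent makes it easy to predict which index an event lands in and to check that the name matches the template pattern. The function name ''daily_index'' is made up for this illustration.

```python
from datetime import datetime, timezone
from fnmatch import fnmatch

def daily_index(event_type, timestamp):
    """Mimic index => "logstash-%{type}-%{+YYYY.MM.dd}" for one event."""
    utc = timestamp.astimezone(timezone.utc)  # the date part is taken in UTC
    return f"logstash-{event_type}-{utc:%Y.%m.%d}"

name = daily_index("netflow9", datetime(2016, 1, 28, 22, 40, 8, tzinfo=timezone.utc))
print(name)                                  # logstash-netflow9-2016.01.28
print(fnmatch(name, "logstash-netflow9-*"))  # True
```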
* In Kibana Settings, add a new index pattern for ''logstash-netflow9-*''. If this pattern already exists and the mapping has changed since, hit the ''refresh'' button so the changed datatypes are picked up. Verify that all fields have the right type.
* On the router, enable ''IP->Traffic Flow'' on the desired interfaces, and add the Logstash host (UDP port 9996, as configured above) as a NetFlow v9 target
* Verify data is being indexed by doing a search on ''*'' against the ''logstash-netflow9-*'' index pattern
The following warnings show up briefly when Logstash first starts. The templates needed to decode the NetFlow messages are published in-band at regular intervals, so a freshly started Logstash may receive flow data before it has seen the matching templates. Once the router resends the templates (by default every 20 packets on MikroTik boards), these messages cease:
Jan 28 22:40:08 silverhold logstash[9496]: {:timestamp=>"2016-01-28T22:40:08.238000+0000", :message=>"No matching template for flow id 257", :level=>:warn}
Jan 28 22:40:09 silverhold logstash[9496]: {:timestamp=>"2016-01-28T22:40:09.266000+0000", :message=>"No matching template for flow id 256", :level=>:warn}
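The behaviour can be pictured with a small model of a v9 collector. This is not the codec's actual code, only a sketch of the protocol rule in RFC 3954: flowset ID 0 carries templates, and flowsets with IDs of 256 and above carry data that can only be decoded once the template with that ID has been seen. The ''Collector'' class and the ''packet'' helper are invented for the example, which ignores options templates (flowset ID 1) and per-exporter source IDs.

```python
import struct

HEADER = struct.Struct("!HHIIII")  # version, count, sys_uptime, unix_secs, sequence, source_id
FLOWSET = struct.Struct("!HH")     # flowset_id, length (length includes this 4-byte header)

def flowsets(packet):
    """Yield (flowset_id, body) for each flowset in a NetFlow v9 packet."""
    offset = HEADER.size
    while offset + FLOWSET.size <= len(packet):
        flowset_id, length = FLOWSET.unpack_from(packet, offset)
        if length < FLOWSET.size:
            break  # malformed length; stop rather than loop forever
        yield flowset_id, packet[offset + FLOWSET.size:offset + length]
        offset += length

class Collector:
    """Remembers template IDs and warns about data that arrives too early."""

    def __init__(self):
        self.templates = set()
        self.warnings = []

    def receive(self, packet):
        for flowset_id, body in flowsets(packet):
            if flowset_id == 0:
                # Template flowset: records of (template_id, field_count, fields...).
                pos = 0
                while pos + 4 <= len(body):
                    template_id, field_count = struct.unpack_from("!HH", body, pos)
                    self.templates.add(template_id)
                    pos += 4 + 4 * field_count  # each field is (type, length), 2 bytes each
            elif flowset_id >= 256 and flowset_id not in self.templates:
                self.warnings.append(f"No matching template for flow id {flowset_id}")

def packet(*sets):
    """Build a test packet from (flowset_id, body) pairs; header values are dummies."""
    body = b"".join(FLOWSET.pack(fid, FLOWSET.size + len(b)) + b for fid, b in sets)
    return HEADER.pack(9, len(sets), 0, 0, 0, 0) + body

data = packet((256, b"\x00" * 8))
# Template 256 with a single field: type 1 (IN_BYTES), 4 bytes wide.
template = packet((0, struct.pack("!HHHH", 256, 1, 1, 4)))

collector = Collector()
collector.receive(data)      # arrives before the template: one warning
collector.receive(template)  # template is now cached
collector.receive(data)      # no further warning
print(collector.warnings)    # ['No matching template for flow id 256']
```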
====== Bugs ======
===== wrong number of arguments calling `to_s` (1 for 0) =====
* Bug report: [[https://github.com/logstash-plugins/logstash-codec-netflow/issues/19]]
* Workaround: [[https://github.com/repeatedly/fluent-plugin-netflow/commit/dfcd4f40e6bd57c1d08ddc6eae707fcdc52163f6]]