This is an old revision of the document!
Instructions for feeding netflow data into ELK
, assuming the stack is already up and running. Netflow data will be placed into separate daily indexes, using the pattern logstash-netflow.${YYYY.MM.DD}
.
curl -XPUT http://localhost:9200/_template/logstash-netflow -d '{ "template" : "logstash-netflow-*", "order": 10, "settings": { "index.cache.field.type": "soft", "index.store.compress.stored": true }, "mappings" : { "_default_" : { "_all" : {"enabled" : false}, "properties" : { "@message": { "index": "analyzed", "type": "string" }, "@source": { "index": "not_analyzed", "type": "string" }, "@source_host": { "index": "not_analyzed", "type": "string" }, "@source_path": { "index": "not_analyzed", "type": "string" }, "@tags": { "index": "not_analyzed", "type": "string" }, "@timestamp": { "index": "not_analyzed", "type": "date" }, "@type": { "index": "not_analyzed", "type": "string" }, "netflow": { "dynamic": true, "properties": { "version": { "index": "analyzed", "type": "integer" }, "first_switched": { "index": "not_analyzed", "type": "date" }, "last_switched": { "index": "not_analyzed", "type": "date" }, "direction": { "index": "not_analyzed", "type": "integer" }, "flowset_id": { "index": "not_analyzed", "type": "integer" }, "flow_sampler_id": { "index": "not_analyzed", "type": "integer" }, "flow_seq_num": { "index": "not_analyzed", "type": "long" }, "src_tos": { "index": "not_analyzed", "type": "integer" }, "tcp_flags": { "index": "not_analyzed", "type": "integer" }, "protocol": { "index": "not_analyzed", "type": "integer" }, "ipv4_next_hop": { "index": "analyzed", "type": "ip" }, "in_bytes": { "index": "not_analyzed", "type": "long" }, "in_pkts": { "index": "not_analyzed", "type": "long" }, "out_bytes": { "index": "not_analyzed", "type": "long" }, "out_pkts": { "index": "not_analyzed", "type": "long" }, "input_snmp": { "index": "not_analyzed", "type": "long" }, "output_snmp": { "index": "not_analyzed", "type": "long" }, "ipv4_dst_addr": { "index": "analyzed", "type": "ip" }, "ipv4_src_addr": { "index": "analyzed", "type": "ip" }, "dst_mask": { "index": "analyzed", "type": "integer" }, "src_mask": { "index": "analyzed", "type": "integer" }, "dst_as": { "index": "analyzed", "type": "integer" }, "src_as": { "index": "analyzed", "type": "integer" }, "l4_dst_port": { "index": "not_analyzed", "type": "long" }, "l4_src_port": { "index": "not_analyzed", "type": "long" } }, "type": "object" } } } } }'
input { udp { port => 9995 codec => netflow { # Logstash doesn't support importing netflow v9 templates from the netflow device # and lacks built-in templates for id=256,257 leading to errors and no data versions => [5] } type => "netflow" } } output { elasticsearch { hosts => ["localhost:9200"] codec => "json" index => "logstash-%{type}-%{+YYYY.MM.dd}" } }
logstash-netflow-*
(if this pattern already exists and the mapping has been changed since, remember to hit the refresh
button to ensure the changed datatypes are picked up. Verify all fields have the right type.IP→Traffic Flow
on the desired interfaces, and add the logstash host as a netflow v5 target*
against the logstash-netflow-*
index pattern