User Tools

Site Tools


elk

This is an old revision of the document!


Netflow for Mikrotik Routers

Instructions for feeding netflow data into ELK, assuming the stack is already up and running. Netflow data will be placed into separate daily indexes, using the pattern logstash-netflow.${YYYY.MM.DD}.

  • Define a template mapping for the netflow data, mapping the fields to the correct datatypes:
    curl -XPUT http://localhost:9200/_template/logstash-netflow -d '{
        "template" : "logstash-netflow-*",
        "order": 10,
        "settings": {
          "index.cache.field.type": "soft",
          "index.store.compress.stored": true
        },
        "mappings" : {
            "_default_" : {
               "_all" : {"enabled" : false},
               "properties" : {
                  "@message":     { "index": "analyzed", "type": "string"  },
                  "@source":      { "index": "not_analyzed", "type": "string"  },
                  "@source_host": { "index": "not_analyzed", "type": "string" },
                  "@source_path": { "index": "not_analyzed", "type": "string" },
                  "@tags":        { "index": "not_analyzed", "type": "string" },
                  "@timestamp":   { "index": "not_analyzed", "type": "date" },
                  "@type":        { "index": "not_analyzed", "type": "string" },
                  "netflow": {
                       "dynamic": true,
                       "properties": {
                           "version": { "index": "analyzed", "type": "integer" },
                           "first_switched": { "index": "not_analyzed", "type": "date" },
                           "last_switched": { "index": "not_analyzed", "type": "date" },
                           "direction": { "index": "not_analyzed", "type": "integer" },
                           "flowset_id": { "index": "not_analyzed", "type": "integer" },
                           "flow_sampler_id": { "index": "not_analyzed", "type": "integer" },
                           "flow_seq_num": { "index": "not_analyzed", "type": "long" },
                           "src_tos": { "index": "not_analyzed", "type": "integer" },
                           "tcp_flags": { "index": "not_analyzed", "type": "integer" },
                           "protocol": { "index": "not_analyzed", "type": "integer" },
                           "ipv4_next_hop": { "index": "analyzed", "type": "ip" },
                           "in_bytes": { "index": "not_analyzed", "type": "long" },
                           "in_pkts": { "index": "not_analyzed", "type": "long" },
                           "out_bytes": { "index": "not_analyzed", "type": "long" },
                           "out_pkts": { "index": "not_analyzed", "type": "long" },
                           "input_snmp": { "index": "not_analyzed", "type": "long" },
                           "output_snmp": { "index": "not_analyzed", "type": "long" },
                           "ipv4_dst_addr": { "index": "analyzed", "type": "ip" },
                           "ipv4_src_addr": { "index": "analyzed", "type": "ip" },
                           "dst_mask": { "index": "analyzed", "type": "integer" },
                           "src_mask": { "index": "analyzed", "type": "integer" },
                           "dst_as": { "index": "analyzed", "type": "integer" },
                           "src_as": { "index": "analyzed", "type": "integer" },
                           "l4_dst_port": { "index": "not_analyzed", "type": "long" },
                           "l4_src_port": { "index": "not_analyzed", "type": "long" }
                       },
                       "type": "object"
                   }
                }
            }
       }
    }'
  • Setup a listening UDP port to receive the UDP data, and feed it into the netflow indexes in elasticsearch:
    /etc/logstash/conf.d/50-netflow.conf
    input {
        udp {
            port  => 9995
            codec => netflow {
                # Logstash doesn't support importing netflow v9 templates from the netflow device
                # and lacks built-in templates for id=256,257 leading to errors and no data
                versions => [5]
            }
            type  => "netflow"
        }
    }
    output {
        elasticsearch {
            hosts => ["localhost:9200"]
            codec => "json"
            index => "logstash-%{type}-%{+YYYY.MM.dd}"
        }
    }
  • In Kibana Settings, add a new index pattern for logstash-netflow-* (if this pattern already exists and the mapping has been changed since, remember to hit the refresh button to ensure the changed datatypes are picked up. Verify all fields have the right type.
  • Enable IP→Traffic Flow on the desired interfaces, and add the logstash host as a netflow v5 target
  • Verify data is being indexed by doing a search on * against the logstash-netflow-* index pattern

Bugs

wrong number of arguments calling `to_s` (1 for 0)

elk.1454007918.txt.gz · Last modified: 2016/01/28 19:05 by ben