User Tools

Site Tools


elk

This is an old revision of the document!


Netflow for Mikrotik Routers

Instructions for feeding netflow data into ELK, assuming the stack is already up and running. Netflow data will be placed into separate daily indexes, named using the pattern logstash-netflow-YYYY.MM.dd (for example, logstash-netflow-2016.01.27).

  • Define a template mapping for the netflow data, mapping the fields to the correct datatypes:
    # Install an index template named "logstash-netflow". It applies to every
    # index whose name matches logstash-netflow-* and gives the netflow fields
    # explicit datatypes instead of leaving them to dynamic mapping.
    #
    # Numeric, date and ip fields all use "index": "not_analyzed": analysis only
    # applies to string fields, so only @message is marked "analyzed".
    #
    # NOTE(review): the two entries under "settings" look like legacy options;
    # confirm they are still honoured by the Elasticsearch version in use.
    curl -XPUT http://localhost:9200/_template/logstash-netflow -d '{
        "template": "logstash-netflow-*",
        "order": 10,
        "settings": {
            "index.cache.field.type": "soft",
            "index.store.compress.stored": true
        },
        "mappings": {
            "_default_": {
                "_all": { "enabled": false },
                "properties": {
                    "@message":     { "index": "analyzed",     "type": "string" },
                    "@source":      { "index": "not_analyzed", "type": "string" },
                    "@source_host": { "index": "not_analyzed", "type": "string" },
                    "@source_path": { "index": "not_analyzed", "type": "string" },
                    "@tags":        { "index": "not_analyzed", "type": "string" },
                    "@timestamp":   { "index": "not_analyzed", "type": "date" },
                    "@type":        { "index": "not_analyzed", "type": "string" },
                    "netflow": {
                        "type": "object",
                        "dynamic": true,
                        "properties": {
                            "version":         { "index": "not_analyzed", "type": "integer" },
                            "first_switched":  { "index": "not_analyzed", "type": "date" },
                            "last_switched":   { "index": "not_analyzed", "type": "date" },
                            "direction":       { "index": "not_analyzed", "type": "integer" },
                            "flowset_id":      { "index": "not_analyzed", "type": "integer" },
                            "flow_sampler_id": { "index": "not_analyzed", "type": "integer" },
                            "flow_seq_num":    { "index": "not_analyzed", "type": "long" },
                            "src_tos":         { "index": "not_analyzed", "type": "integer" },
                            "tcp_flags":       { "index": "not_analyzed", "type": "integer" },
                            "protocol":        { "index": "not_analyzed", "type": "integer" },
                            "ipv4_next_hop":   { "index": "not_analyzed", "type": "ip" },
                            "in_bytes":        { "index": "not_analyzed", "type": "long" },
                            "in_pkts":         { "index": "not_analyzed", "type": "long" },
                            "out_bytes":       { "index": "not_analyzed", "type": "long" },
                            "out_pkts":        { "index": "not_analyzed", "type": "long" },
                            "input_snmp":      { "index": "not_analyzed", "type": "long" },
                            "output_snmp":     { "index": "not_analyzed", "type": "long" },
                            "ipv4_dst_addr":   { "index": "not_analyzed", "type": "ip" },
                            "ipv4_src_addr":   { "index": "not_analyzed", "type": "ip" },
                            "dst_mask":        { "index": "not_analyzed", "type": "integer" },
                            "src_mask":        { "index": "not_analyzed", "type": "integer" },
                            "dst_as":          { "index": "not_analyzed", "type": "integer" },
                            "src_as":          { "index": "not_analyzed", "type": "integer" },
                            "l4_dst_port":     { "index": "not_analyzed", "type": "long" },
                            "l4_src_port":     { "index": "not_analyzed", "type": "long" }
                        }
                    }
                }
            }
        }
    }'
  • Set up a listening UDP port to receive the netflow data, and feed it into the netflow indexes in elasticsearch:
    /etc/logstash/conf.d/50-netflow.conf
    # Receive NetFlow v5 datagrams on UDP port 9995 and tag each decoded
    # event with type "netflow".
    input {
        udp {
            port  => 9995
            codec => netflow {
                versions => [5]
            }
            type  => "netflow"
        }
    }
    # Logstash merges every file in conf.d into a single pipeline, so the
    # output is guarded by the event type: without the conditional, events
    # from all other inputs would also be written through this output.
    output {
        if [type] == "netflow" {
            elasticsearch {
                hosts => ["localhost:9200"]
                codec => "json"
                # Resolves to one index per day, e.g. logstash-netflow-2016.01.27.
                index => "logstash-%{type}-%{+YYYY.MM.dd}"
            }
        }
    }
elk.1453926174.txt.gz · Last modified: 2016/01/27 20:22 by ben