User Tools

Site Tools


elk

Netflow for Mikrotik Routers

Instructions for feeding netflow data into ELK, assuming the stack is already up and running. Netflow data will be placed into separate daily indexes, using the pattern logstash-netflow.${YYYY.MM.DD}.

  • Define a template mapping for the netflow data, mapping the fields to the correct datatypes:
    curl -XPUT http://localhost:9200/_template/logstash-netflow9 -d '{
        "template" : "logstash-netflow9-*",
        "order": 10,
        "settings": {
          "index.cache.field.type": "soft",
          "index.store.compress.stored": true
        },
        "mappings" : {
            "_default_" : {
               "_all" : {"enabled" : false},
               "properties" : {
                  "@message":     { "index": "analyzed", "type": "string"  },
                  "@source":      { "index": "not_analyzed", "type": "string"  },
                  "@source_host": { "index": "not_analyzed", "type": "string" },
                  "@source_path": { "index": "not_analyzed", "type": "string" },
                  "@tags":        { "index": "not_analyzed", "type": "string" },
                  "@timestamp":   { "index": "not_analyzed", "type": "date" },
                  "@type":        { "index": "not_analyzed", "type": "string" },
                  "netflow": {
                       "dynamic": true,
                       "properties": {
                           "version": { "index": "analyzed", "type": "integer" },
                           "first_switched": { "index": "not_analyzed", "type": "date" },
                           "last_switched": { "index": "not_analyzed", "type": "date" },
                           "direction": { "index": "not_analyzed", "type": "integer" },
                           "flowset_id": { "index": "not_analyzed", "type": "integer" },
                           "flow_sampler_id": { "index": "not_analyzed", "type": "integer" },
                           "flow_seq_num": { "index": "not_analyzed", "type": "long" },
                           "src_tos": { "index": "not_analyzed", "type": "integer" },
                           "tcp_flags": { "index": "not_analyzed", "type": "integer" },
                           "protocol": { "index": "not_analyzed", "type": "integer" },
                           "ipv4_next_hop": { "index": "analyzed", "type": "ip" },
                           "in_bytes": { "index": "not_analyzed", "type": "long" },
                           "in_pkts": { "index": "not_analyzed", "type": "long" },
                           "out_bytes": { "index": "not_analyzed", "type": "long" },
                           "out_pkts": { "index": "not_analyzed", "type": "long" },
                           "input_snmp": { "index": "not_analyzed", "type": "long" },
                           "output_snmp": { "index": "not_analyzed", "type": "long" },
                           "ipv4_dst_addr": { "index": "analyzed", "type": "ip" },
                           "ipv4_src_addr": { "index": "analyzed", "type": "ip" },
                           "dst_mask": { "index": "analyzed", "type": "integer" },
                           "src_mask": { "index": "analyzed", "type": "integer" },
                           "dst_as": { "index": "analyzed", "type": "integer" },
                           "src_as": { "index": "analyzed", "type": "integer" },
                           "l4_dst_port": { "index": "not_analyzed", "type": "long" },
                           "l4_src_port": { "index": "not_analyzed", "type": "long" },
                           "ipv4_dst_addr_postnat": { "index": "analyzed", "type": "ip" },
                           "ipv4_src_addr_postnat": { "index": "analyzed", "type": "ip" },
                           "l4_dst_port_postnat": { "index": "not_analyzed", "type": "long" },
                           "l4_src_port_postnat": { "index": "not_analyzed", "type": "long" }
                       },
                       "type": "object"
                   }
                }
            }
       }
    }'
  • Copy the netflow.yaml definitions from /opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-codec-netflow-2.0.2/lib/logstash/codecs/netflow.rb to /etc/logstash/mikrotik.netflow9.yaml and patch the following:
    # Changed from 2->4
    10:
    - 4
    - :input_snmp
    14:
    - 4
    - :output_snmp
    
    # Changed from uint24->uint32
    31:
    - :uint32
    - :ipv6_flow_label
    
    # Add these entries:
    225:
    - :ip4_addr
    - :ipv4_src_addr_postnat
    226:
    - :ip4_addr
    - :ipv4_dst_addr_postnat
    227:
    - :uint16
    - :l4_src_port_postnat
    228:
    - :uint16
    - :l4_dst_port_postnat
  • Setup a listening UDP port to receive the UDP data, and feed it into the netflow indexes in elasticsearch:
    /etc/logstash/conf.d/50-netflow.conf
    input {
        udp {
            port  => 9996
            codec => netflow {
                versions => [9]
                definitions => "/etc/logstash/mikrotik.netflow9.yaml"
            }
            type  => "netflow9"
        }
    }
    output {
        elasticsearch {
            hosts => ["localhost:9200"]
            codec => "json"
            index => "logstash-%{type}-%{+YYYY.MM.dd}"
        }
    }
  • In Kibana Settings, add a new index pattern for logstash-netflow9-* (if this pattern already exists and the mapping has been changed since, remember to hit the refresh button to ensure the changed datatypes are picked up. Verify all fields have the right type.
  • Enable IP→Traffic Flow on the desired interfaces, and add the logstash host as a netflow v9 target
  • Verify data is being indexed by doing a search on * against the logstash-netflow9-* index pattern

The following warnings will show up briefly when logstash first starts. This is because the templates needed to understand the netflow messages are published in-band on a regular basis, and when logstash first starts up it might not have seen a copy of the templates before flow data is received. Once the template message is received (defaulting to 20 packets on the mikrtoik boards), these messages will cease:

Jan 28 22:40:08 silverhold logstash[9496]: {:timestamp=>"2016-01-28T22:40:08.238000+0000", :message=>"No matching template for flow id 257", :level=>:warn}
Jan 28 22:40:09 silverhold logstash[9496]: {:timestamp=>"2016-01-28T22:40:09.266000+0000", :message=>"No matching template for flow id 256", :level=>:warn}

Bugs

wrong number of arguments calling `to_s` (1 for 0)

elk.txt · Last modified: 2016/01/28 22:56 by ben