Browse Source

Merge branch 'master' into cef-issue-21

tags/3.0.0-alpha.0
Jochen Schalanda 1 year ago
parent
commit
121aac0e1f
No account linked to committer's email address
75 changed files with 6168 additions and 17 deletions
  1. 86
    0
      docs/netflow/README.md
  2. BIN
      docs/netflow/images/netflow-dashboard.png
  3. BIN
      docs/netflow/images/netflow-example.png
  4. BIN
      docs/netflow/images/netflow-udp-input-1.png
  5. 6
    0
      graylog-project-parent/pom.xml
  6. 8
    0
      graylog2-server/pom.xml
  7. 40
    0
      graylog2-server/src/main/java/org/graylog/plugins/netflow/NetFlowPluginModule.java
  8. 216
    0
      graylog2-server/src/main/java/org/graylog/plugins/netflow/codecs/NetFlowCodec.java
  9. 288
    0
      graylog2-server/src/main/java/org/graylog/plugins/netflow/codecs/NetflowV9CodecAggregator.java
  10. 36
    0
      graylog2-server/src/main/java/org/graylog/plugins/netflow/codecs/RemoteAddressCodecAggregator.java
  11. 85
    0
      graylog2-server/src/main/java/org/graylog/plugins/netflow/codecs/TemplateKey.java
  12. 27
    0
      graylog2-server/src/main/java/org/graylog/plugins/netflow/flows/CorruptFlowPacketException.java
  13. 23
    0
      graylog2-server/src/main/java/org/graylog/plugins/netflow/flows/EmptyTemplateException.java
  14. 27
    0
      graylog2-server/src/main/java/org/graylog/plugins/netflow/flows/FlowException.java
  15. 23
    0
      graylog2-server/src/main/java/org/graylog/plugins/netflow/flows/InvalidFlowVersionException.java
  16. 194
    0
      graylog2-server/src/main/java/org/graylog/plugins/netflow/flows/NetFlowFormatter.java
  17. 74
    0
      graylog2-server/src/main/java/org/graylog/plugins/netflow/inputs/NetFlowUdpInput.java
  18. 80
    0
      graylog2-server/src/main/java/org/graylog/plugins/netflow/transport/NetFlowUdpTransport.java
  19. 65
    0
      graylog2-server/src/main/java/org/graylog/plugins/netflow/transport/NetflowMessageAggregationHandler.java
  20. 69
    0
      graylog2-server/src/main/java/org/graylog/plugins/netflow/utils/ByteBufUtils.java
  21. 116
    0
      graylog2-server/src/main/java/org/graylog/plugins/netflow/utils/Protocol.java
  22. 76
    0
      graylog2-server/src/main/java/org/graylog/plugins/netflow/v5/NetFlowV5Header.java
  23. 35
    0
      graylog2-server/src/main/java/org/graylog/plugins/netflow/v5/NetFlowV5Packet.java
  24. 144
    0
      graylog2-server/src/main/java/org/graylog/plugins/netflow/v5/NetFlowV5Parser.java
  25. 101
    0
      graylog2-server/src/main/java/org/graylog/plugins/netflow/v5/NetFlowV5Record.java
  26. 23
    0
      graylog2-server/src/main/java/org/graylog/plugins/netflow/v9/NetFlowV9BaseRecord.java
  27. 113
    0
      graylog2-server/src/main/java/org/graylog/plugins/netflow/v9/NetFlowV9FieldDef.java
  28. 71
    0
      graylog2-server/src/main/java/org/graylog/plugins/netflow/v9/NetFlowV9FieldType.java
  29. 202
    0
      graylog2-server/src/main/java/org/graylog/plugins/netflow/v9/NetFlowV9FieldTypeRegistry.java
  30. 66
    0
      graylog2-server/src/main/java/org/graylog/plugins/netflow/v9/NetFlowV9Header.java
  31. 1416
    0
      graylog2-server/src/main/java/org/graylog/plugins/netflow/v9/NetFlowV9Journal.java
  32. 34
    0
      graylog2-server/src/main/java/org/graylog/plugins/netflow/v9/NetFlowV9OptionRecord.java
  33. 37
    0
      graylog2-server/src/main/java/org/graylog/plugins/netflow/v9/NetFlowV9OptionTemplate.java
  34. 50
    0
      graylog2-server/src/main/java/org/graylog/plugins/netflow/v9/NetFlowV9Packet.java
  35. 389
    0
      graylog2-server/src/main/java/org/graylog/plugins/netflow/v9/NetFlowV9Parser.java
  36. 32
    0
      graylog2-server/src/main/java/org/graylog/plugins/netflow/v9/NetFlowV9Record.java
  37. 36
    0
      graylog2-server/src/main/java/org/graylog/plugins/netflow/v9/NetFlowV9ScopeDef.java
  38. 46
    0
      graylog2-server/src/main/java/org/graylog/plugins/netflow/v9/NetFlowV9Template.java
  39. 62
    0
      graylog2-server/src/main/java/org/graylog/plugins/netflow/v9/RawNetFlowV9Packet.java
  40. 2
    0
      graylog2-server/src/main/java/org/graylog2/commands/Server.java
  41. 7
    1
      graylog2-server/src/main/java/org/graylog2/rest/models/system/ldap/requests/LdapSettingsRequest.java
  42. 4
    4
      graylog2-server/src/main/java/org/graylog2/rest/models/system/ldap/responses/LdapSettingsResponse.java
  43. 15
    3
      graylog2-server/src/main/java/org/graylog2/rest/resources/system/ldap/LdapResource.java
  44. 9
    0
      graylog2-server/src/main/java/org/graylog2/security/ldap/LdapSettingsImpl.java
  45. 2
    0
      graylog2-server/src/main/java/org/graylog2/shared/security/ldap/LdapSettings.java
  46. 474
    0
      graylog2-server/src/main/resources/netflow9.yml
  47. 21
    0
      graylog2-server/src/main/resources/netflow_v9.proto
  48. 145
    0
      graylog2-server/src/test/java/org/graylog/plugins/netflow/codecs/NetFlowCodecTest.java
  49. 541
    0
      graylog2-server/src/test/java/org/graylog/plugins/netflow/codecs/NetflowV9CodecAggregatorTest.java
  50. 73
    0
      graylog2-server/src/test/java/org/graylog/plugins/netflow/transport/NetFlowUdpTransportTest.java
  51. 44
    0
      graylog2-server/src/test/java/org/graylog/plugins/netflow/utils/ProtocolTest.java
  52. 190
    0
      graylog2-server/src/test/java/org/graylog/plugins/netflow/v5/NetFlowV5ParserTest.java
  53. 51
    0
      graylog2-server/src/test/java/org/graylog/plugins/netflow/v9/NetFlowV9FieldTypeRegistryTest.java
  54. 29
    0
      graylog2-server/src/test/java/org/graylog/plugins/netflow/v9/NetFlowV9HeaderTest.java
  55. 122
    0
      graylog2-server/src/test/java/org/graylog/plugins/netflow/v9/NetFlowV9ParserTest.java
  56. BIN
      graylog2-server/src/test/resources/netflow-data/netflow-v5-1.dat
  57. BIN
      graylog2-server/src/test/resources/netflow-data/netflow-v5-2.dat
  58. BIN
      graylog2-server/src/test/resources/netflow-data/netflow-v9-1.dat
  59. BIN
      graylog2-server/src/test/resources/netflow-data/netflow-v9-2-1.dat
  60. BIN
      graylog2-server/src/test/resources/netflow-data/netflow-v9-2-2.dat
  61. BIN
      graylog2-server/src/test/resources/netflow-data/netflow-v9-2-3.dat
  62. BIN
      graylog2-server/src/test/resources/netflow-data/netflow-v9-3_incomplete.dat
  63. BIN
      graylog2-server/src/test/resources/netflow-data/netflow5.pcap
  64. BIN
      graylog2-server/src/test/resources/netflow-data/netflow9.pcap
  65. BIN
      graylog2-server/src/test/resources/netflow-data/netgraph-netflow5.pcap
  66. BIN
      graylog2-server/src/test/resources/netflow-data/nprobe-netflow9-2.pcap
  67. BIN
      graylog2-server/src/test/resources/netflow-data/nprobe-netflow9-3.pcap
  68. BIN
      graylog2-server/src/test/resources/netflow-data/nprobe-netflow9-4.pcap
  69. BIN
      graylog2-server/src/test/resources/netflow-data/nprobe-netflow9.pcap
  70. BIN
      graylog2-server/src/test/resources/netflow-data/pmacctd-netflow5.pcap
  71. BIN
      graylog2-server/src/test/resources/netflow-data/pmacctd-netflow9.pcap
  72. 4
    0
      graylog2-web-interface/src/components/ldap/LdapComponent.css
  73. 30
    6
      graylog2-web-interface/src/components/ldap/LdapComponent.jsx
  74. 8
    3
      graylog2-web-interface/webpack.combined.config.js
  75. 1
    0
      pom.xml

+ 86
- 0
docs/netflow/README.md View File

@@ -0,0 +1,86 @@
NetFlow Plugin for Graylog
==========================

[![Build Status](https://travis-ci.org/Graylog2/graylog-plugin-netflow.svg?branch=master)](https://travis-ci.org/Graylog2/graylog-plugin-netflow)

This plugin provides a NetFlow UDP input to act as a Flow collector that receives data from Flow exporters. Each received Flow will be converted to a Graylog message.

**Required Graylog version:** 2.3.0 and later

## Supported NetFlow Versions

The version of the plugin now supports NetFlow V9. It can support IPv6 addresses without
conversion and handles all of the fields from the fixed V5 format. In addition this plugin supports
events from a CISCO ASA 5500, including firewall and routing events. Beware, there is significant
duplication of typical syslog reporting in the v9 reporting.

## Installation
> Since Graylog Version 2.4.0 this plugin is already included in the Graylog server installation package as default plugin.

[Download the plugin](https://github.com/Graylog2/graylog-plugin-netflow/releases)
and place the `.jar` file in your Graylog plugin directory. The plugin directory
is the `plugins/` folder relative from your `graylog-server` directory by default
and can be configured in your `graylog.conf` file.

Restart `graylog-server` and you are done.

## Setup

In the Graylog web interface, go to System/Inputs and create a new NetFlow input like this:

![NetFlow input creation dialog](https://github.com/Graylog2/graylog-plugin-netflow/blob/master/images/netflow-udp-input-1.png)

## Example Message

This is an example NetFlow message in Graylog:

![NetFlow example fields screenshot](https://github.com/Graylog2/graylog-plugin-netflow/blob/master/images/netflow-example.png)

## Example Dashboard

This is an example of a dashboard with NetFlow data:

![NetFlow example dashboard screenshot](https://github.com/Graylog2/graylog-plugin-netflow/blob/master/images/netflow-dashboard.png)

## Credits

The NetFlow parsing code is based on the https://github.com/wasted/netflow project and has been ported from Scala to Java.

## Plugin Development

### Testing

To generate some NetFlow data for debugging and testing you can use softflowd.

Example command and output:

```
# softflowd -D -i eth0 -v 5 -t maxlife=1 -n 10.0.2.2:2055

Using eth0 (idx: 0)
softflowd v0.9.9 starting data collection
Exporting flows to [10.0.2.2]:2055
ADD FLOW seq:1 [10.0.2.2]:48164 <> [10.0.2.15]:22 proto:6
ADD FLOW seq:2 [10.0.2.2]:51428 <> [10.0.2.15]:22 proto:6
Starting expiry scan: mode 0
Queuing flow seq:1 (0x7fef0318bc70) for expiry reason 6
Finished scan 1 flow(s) to be evicted
Sending v5 flow packet len = 120
sent 1 netflow packets
EXPIRED: seq:1 [10.0.2.2]:48164 <> [10.0.2.15]:22 proto:6 octets>:322 packets>:7 octets<:596 packets<:7 start:2015-07-21T13:18:01.236 finish:2015-07-21T13:18:27.718 tcp>:10 tcp<:18 flowlabel>:00000000 flo
wlabel<:00000000 (0x7fef0318bc70)
ADD FLOW seq:3 [10.0.2.2]:2055 <> [10.0.2.15]:48363 proto:17
ADD FLOW seq:4 [10.0.2.2]:48164 <> [10.0.2.15]:22 proto:6
```

## Plugin Release

We are using the Maven release plugin:

```
$ mvn release:prepare
[...]
$ mvn release:perform
```

This sets the version numbers, creates a tag and pushes to GitHub. Travis CI will build the release artifacts and upload to GitHub automatically.

BIN
docs/netflow/images/netflow-dashboard.png View File


BIN
docs/netflow/images/netflow-example.png View File


BIN
docs/netflow/images/netflow-udp-input-1.png View File


+ 6
- 0
graylog-project-parent/pom.xml View File

@@ -639,6 +639,12 @@
<version>${okhttp.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.pkts</groupId>
<artifactId>pkts-core</artifactId>
<version>${pkts.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</dependencyManagement>


+ 8
- 0
graylog2-server/pom.xml View File

@@ -177,6 +177,10 @@
<groupId>com.fasterxml.jackson.jaxrs</groupId>
<artifactId>jackson-jaxrs-json-provider</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-yaml</artifactId>
</dependency>

<dependency>
<groupId>io.dropwizard.metrics</groupId>
@@ -556,6 +560,10 @@
<groupId>com.jayway.awaitility</groupId>
<artifactId>awaitility</artifactId>
</dependency>
<dependency>
<groupId>io.pkts</groupId>
<artifactId>pkts-core</artifactId>
</dependency>
</dependencies>

<build>

+ 40
- 0
graylog2-server/src/main/java/org/graylog/plugins/netflow/NetFlowPluginModule.java View File

@@ -0,0 +1,40 @@
/**
* This file is part of Graylog.
*
* Graylog is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Graylog is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Graylog. If not, see <http://www.gnu.org/licenses/>.
*/
package org.graylog.plugins.netflow;

import org.graylog.plugins.netflow.codecs.NetFlowCodec;
import org.graylog.plugins.netflow.inputs.NetFlowUdpInput;
import org.graylog.plugins.netflow.transport.NetFlowUdpTransport;
import org.graylog2.plugin.PluginConfigBean;
import org.graylog2.plugin.PluginModule;

import java.util.Collections;
import java.util.Set;

public class NetFlowPluginModule extends PluginModule {
@Override
public Set<? extends PluginConfigBean> getConfigBeans() {
return Collections.emptySet();
}

@Override
protected void configure() {
addMessageInput(NetFlowUdpInput.class);
addCodec("netflow", NetFlowCodec.class);
addTransport("netflow-udp", NetFlowUdpTransport.class);
}
}

+ 216
- 0
graylog2-server/src/main/java/org/graylog/plugins/netflow/codecs/NetFlowCodec.java View File

@@ -0,0 +1,216 @@
/**
* This file is part of Graylog.
*
* Graylog is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Graylog is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Graylog. If not, see <http://www.gnu.org/licenses/>.
*/
package org.graylog.plugins.netflow.codecs;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import com.google.inject.assistedinject.Assisted;
import com.google.protobuf.InvalidProtocolBufferException;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import org.graylog.plugins.netflow.flows.FlowException;
import org.graylog.plugins.netflow.flows.NetFlowFormatter;
import org.graylog.plugins.netflow.v5.NetFlowV5Packet;
import org.graylog.plugins.netflow.v5.NetFlowV5Parser;
import org.graylog.plugins.netflow.v9.NetFlowV9FieldTypeRegistry;
import org.graylog.plugins.netflow.v9.NetFlowV9Journal;
import org.graylog.plugins.netflow.v9.NetFlowV9OptionTemplate;
import org.graylog.plugins.netflow.v9.NetFlowV9Packet;
import org.graylog.plugins.netflow.v9.NetFlowV9Parser;
import org.graylog.plugins.netflow.v9.NetFlowV9Record;
import org.graylog.plugins.netflow.v9.NetFlowV9Template;
import org.graylog2.plugin.Message;
import org.graylog2.plugin.ResolvableInetSocketAddress;
import org.graylog2.plugin.configuration.Configuration;
import org.graylog2.plugin.configuration.ConfigurationRequest;
import org.graylog2.plugin.configuration.fields.ConfigurationField;
import org.graylog2.plugin.configuration.fields.TextField;
import org.graylog2.plugin.inputs.annotations.Codec;
import org.graylog2.plugin.inputs.annotations.ConfigClass;
import org.graylog2.plugin.inputs.annotations.FactoryClass;
import org.graylog2.plugin.inputs.codecs.AbstractCodec;
import org.graylog2.plugin.inputs.codecs.CodecAggregator;
import org.graylog2.plugin.inputs.codecs.MultiMessageCodec;
import org.graylog2.plugin.inputs.transports.NettyTransport;
import org.graylog2.plugin.journal.RawMessage;
import org.graylog2.shared.utilities.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.inject.Inject;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

@Codec(name = "netflow", displayName = "NetFlow")
public class NetFlowCodec extends AbstractCodec implements MultiMessageCodec {
/**
* Marker byte which signals that the contained netflow packet should be parsed as is.
*/
public static final byte PASSTHROUGH_MARKER = 0x00;
/**
* Marker byte which signals that the contained netflow v9 packet is non-RFC:
* It contains all necessary template flows before any data flows and can be completely parsed without a template cache.
*/
public static final byte ORDERED_V9_MARKER = 0x01;
@VisibleForTesting
static final String CK_NETFLOW9_DEFINITION_PATH = "netflow9_definitions_Path";
private static final Logger LOG = LoggerFactory.getLogger(NetFlowCodec.class);
private final NetFlowV9FieldTypeRegistry typeRegistry;
private final NetflowV9CodecAggregator netflowV9CodecAggregator;

@Inject
protected NetFlowCodec(@Assisted Configuration configuration, NetflowV9CodecAggregator netflowV9CodecAggregator) throws IOException {
super(configuration);
this.netflowV9CodecAggregator = netflowV9CodecAggregator;

final String netFlow9DefinitionsPath = configuration.getString(CK_NETFLOW9_DEFINITION_PATH);
if (netFlow9DefinitionsPath == null || netFlow9DefinitionsPath.trim().isEmpty()) {
this.typeRegistry = NetFlowV9FieldTypeRegistry.create();
} else {
try (InputStream inputStream = new FileInputStream(netFlow9DefinitionsPath)) {
this.typeRegistry = NetFlowV9FieldTypeRegistry.create(inputStream);
}
}
}

@Nullable
@Override
public CodecAggregator getAggregator() {
return netflowV9CodecAggregator;
}

@Nullable
@Override
public Message decode(@Nonnull RawMessage rawMessage) {
throw new UnsupportedOperationException("MultiMessageCodec " + getClass() + " does not support decode()");
}

@Nullable
@Override
public Collection<Message> decodeMessages(@Nonnull RawMessage rawMessage) {
try {
final ResolvableInetSocketAddress remoteAddress = rawMessage.getRemoteAddress();
final InetSocketAddress sender = remoteAddress != null ? remoteAddress.getInetSocketAddress() : null;

final byte[] payload = rawMessage.getPayload();
if (payload.length < 3) {
LOG.debug("NetFlow message (source: {}) doesn't even fit the NetFlow version (size: {} bytes)",
sender, payload.length);
return null;
}

final ByteBuf buffer = Unpooled.wrappedBuffer(payload);
switch (buffer.readByte()) {
case PASSTHROUGH_MARKER:
final NetFlowV5Packet netFlowV5Packet = NetFlowV5Parser.parsePacket(buffer);

return netFlowV5Packet.records().stream()
.map(record -> NetFlowFormatter.toMessage(netFlowV5Packet.header(), record, sender))
.collect(Collectors.toList());
case ORDERED_V9_MARKER:
// our "custom" netflow v9 that has all the templates in the same packet
return decodeV9(sender, buffer);
default:
final List<RawMessage.SourceNode> sourceNodes = rawMessage.getSourceNodes();
final RawMessage.SourceNode sourceNode = sourceNodes.isEmpty() ? null : sourceNodes.get(sourceNodes.size() - 1);
final String inputId = sourceNode == null ? "<unknown>" : sourceNode.inputId;
LOG.warn("Unsupported NetFlow packet on input {} (source: {})", inputId, sender);
return null;
}
} catch (FlowException e) {
LOG.error("Error parsing NetFlow packet <{}> received from <{}>", rawMessage.getId(), rawMessage.getRemoteAddress(), e);
if (LOG.isDebugEnabled()) {
LOG.debug("NetFlow packet hexdump:\n{}", ByteBufUtil.prettyHexDump(Unpooled.wrappedBuffer(rawMessage.getPayload())));
}
return null;
} catch (InvalidProtocolBufferException e) {
LOG.error("Invalid NetFlowV9 entry found, cannot parse the messages", ExceptionUtils.getRootCause(e));
return null;
}
}

@VisibleForTesting
Collection<Message> decodeV9(InetSocketAddress sender, ByteBuf buffer) throws InvalidProtocolBufferException {
final List<NetFlowV9Packet> netFlowV9Packets = decodeV9Packets(buffer);

return netFlowV9Packets.stream().map(netFlowV9Packet -> netFlowV9Packet.records().stream()
.filter(record -> record instanceof NetFlowV9Record)
.map(record -> NetFlowFormatter.toMessage(netFlowV9Packet.header(), record, sender))
.collect(Collectors.toList())
).flatMap(Collection::stream)
.collect(Collectors.toList());
}

@VisibleForTesting
List<NetFlowV9Packet> decodeV9Packets(ByteBuf buffer) throws InvalidProtocolBufferException {
byte[] v9JournalEntry = new byte[buffer.readableBytes()];
buffer.readBytes(v9JournalEntry);
final NetFlowV9Journal.RawNetflowV9 rawNetflowV9 = NetFlowV9Journal.RawNetflowV9.parseFrom(v9JournalEntry);

// parse all templates used in the packet
final Map<Integer, NetFlowV9Template> templateMap = Maps.newHashMap();
rawNetflowV9.getTemplatesMap().forEach((templateId, byteString) -> {
final NetFlowV9Template netFlowV9Template = NetFlowV9Parser.parseTemplate(
Unpooled.wrappedBuffer(byteString.toByteArray()), typeRegistry);
templateMap.put(templateId, netFlowV9Template);
});
final NetFlowV9OptionTemplate[] optionTemplate = {null};
rawNetflowV9.getOptionTemplateMap().forEach((templateId, byteString) -> {
optionTemplate[0] = NetFlowV9Parser.parseOptionTemplate(Unpooled.wrappedBuffer(byteString.toByteArray()), typeRegistry);
});

return rawNetflowV9.getPacketsList().stream()
.map(bytes -> Unpooled.wrappedBuffer(bytes.toByteArray()))
.map(buf -> NetFlowV9Parser.parsePacket(buf, typeRegistry, templateMap, optionTemplate[0]))
.collect(Collectors.toList());
}

@FactoryClass
public interface Factory extends AbstractCodec.Factory<NetFlowCodec> {
@Override
NetFlowCodec create(Configuration configuration);

@Override
Config getConfig();
}

@ConfigClass
public static class Config extends AbstractCodec.Config {
@Override
public void overrideDefaultValues(@Nonnull ConfigurationRequest cr) {
if (cr.containsField(NettyTransport.CK_PORT)) {
cr.getField(NettyTransport.CK_PORT).setDefaultValue(2055);
}
}

@Override
public ConfigurationRequest getRequestedConfiguration() {
final ConfigurationRequest configuration = super.getRequestedConfiguration();
configuration.addField(new TextField(CK_NETFLOW9_DEFINITION_PATH, "Netflow 9 field definitions", "", "Path to the YAML file containing Netflow 9 field definitions", ConfigurationField.Optional.OPTIONAL));
return configuration;
}
}
}

+ 288
- 0
graylog2-server/src/main/java/org/graylog/plugins/netflow/codecs/NetflowV9CodecAggregator.java View File

@@ -0,0 +1,288 @@
/**
* This file is part of Graylog.
*
* Graylog is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Graylog is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Graylog. If not, see <http://www.gnu.org/licenses/>.
*/
package org.graylog.plugins.netflow.codecs;

import com.github.joschi.jadconfig.util.Size;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.protobuf.ByteString;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import org.graylog.plugins.netflow.v9.NetFlowV9Journal;
import org.graylog.plugins.netflow.v9.NetFlowV9Parser;
import org.graylog.plugins.netflow.v9.RawNetFlowV9Packet;
import org.graylog2.shared.utilities.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;
import javax.inject.Inject;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

/**
* For Netflow v9 packets we want to prepend the corresponding flow template.
* If we don't have that template yet, we consider the flow packet to be incomplete and continue to wait for the template.
* TODO consider sharing seen templates between nodes in the cluster to minimize wait time
*/
public class NetflowV9CodecAggregator implements RemoteAddressCodecAggregator {
private static final Logger LOG = LoggerFactory.getLogger(NetflowV9CodecAggregator.class);

private static final ByteBuf PASSTHROUGH_MARKER = Unpooled.wrappedBuffer(new byte[]{NetFlowCodec.PASSTHROUGH_MARKER});

private final Cache<TemplateKey, TemplateBytes> templateCache;
private final Cache<TemplateKey, Queue<PacketBytes>> packetCache;

@Inject
public NetflowV9CodecAggregator() {
// TODO customize
this.templateCache = CacheBuilder.newBuilder()
.maximumSize(5000)
.removalListener(notification -> LOG.debug("Removed {} from template cache for reason {}", notification.getKey(), notification.getCause()))
.recordStats()
.build();
this.packetCache = CacheBuilder.newBuilder()
.expireAfterWrite(1, TimeUnit.MINUTES)
.maximumWeight(Size.megabytes(1).toBytes())
.removalListener((RemovalListener<TemplateKey, Queue<PacketBytes>>) notification -> LOG.debug("Removed {} from packet cache for reason {}", notification.getKey(), notification.getCause()))
.weigher((key, value) -> value.stream().map(PacketBytes::readableBytes).reduce(0, Integer::sum))
.recordStats()
.build();
}

@Nonnull
@Override
public Result addChunk(ByteBuf buf, SocketAddress remoteAddress) {
if (buf.readableBytes() < 2) {
// the buffer doesn't contain enough bytes to be a netflow packet, discard the packet
return new Result(null, false);
}

// This thing is using *WAY* too many copies. :'(
try {
final int netFlowVersion = buf.getShort(0);

// only netflow v9 needs special treatment, everything else we just pass on
if (netFlowVersion != 9) {
return new Result(Unpooled.copiedBuffer(PASSTHROUGH_MARKER, buf), true);
}

// for NetFlow V9 we check that we have previously received template flows for each data flow.
// if we do not have them yet, buffer the data flows until we receive a matching template
// since we do not want to do that again in the codec, we will violate the RFC when putting together
// the packets again:
// the codec can, contrary to https://tools.ietf.org/html/rfc3954#section-9, assume that for each packet/RawMessage
// the packet contains all necessary templates. This greatly simplifies parsing at the expense of larger RawMessages.

// The rest of the code works as follows:
// We shallowly parse the incoming packet, extracting all flows into ByteBufs.
// We then cache the raw bytes for template flows, keyed by remote ip and source id. These are used to reassemble the packet for the journal later.
// For each netflow v9 packet that we do not have a matching template for yet, we put it into a queue.
// Once the template flow arrives we go back through the queue and remove now matching packets for further processing.
if (LOG.isTraceEnabled()) {
LOG.trace("Received V9 packet:\n{}", ByteBufUtil.prettyHexDump(buf));
}
final RawNetFlowV9Packet rawNetFlowV9Packet = NetFlowV9Parser.parsePacketShallow(buf);
final long sourceId = rawNetFlowV9Packet.header().sourceId();

LOG.trace("Incoming NetFlow V9 packet contains: {}", rawNetFlowV9Packet);

// register templates and check for buffered flows
for (Map.Entry<Integer, byte[]> template : rawNetFlowV9Packet.templates().entrySet()) {
final int templateId = template.getKey();
final byte[] bytes = template.getValue();

final TemplateKey templateKey = new TemplateKey(remoteAddress, sourceId, templateId);
final TemplateBytes templateBytes = new TemplateBytes(bytes, false);
templateCache.put(templateKey, templateBytes);
}

final Map.Entry<Integer, byte[]> optionTemplate = rawNetFlowV9Packet.optionTemplate();
if (optionTemplate != null) {
final int templateId = optionTemplate.getKey();
final byte[] bytes = optionTemplate.getValue();

final TemplateKey templateKey = new TemplateKey(remoteAddress, sourceId, templateId);
final TemplateBytes templateBytes = new TemplateBytes(bytes, true);

templateCache.put(templateKey, templateBytes);
}

// this list of flows to return in the result
// Using ByteBuf here to enable de-duplication with the hash set.
final Set<ByteBuf> packetsToSend = new HashSet<>();

// if we have new templates, figure out which buffered packets template requirements are now satisfied
if (!rawNetFlowV9Packet.templates().isEmpty() || rawNetFlowV9Packet.optionTemplate() != null) {
final Set<Integer> knownTemplateIds = new HashSet<>();
for (TemplateKey templateKey : templateCache.asMap().keySet()) {
if (templateKey.getRemoteAddress() == remoteAddress && templateKey.getSourceId() == sourceId) {
final Integer templateId = templateKey.getTemplateId();
knownTemplateIds.add(templateId);
}
}

final Queue<PacketBytes> bufferedPackets = packetCache.getIfPresent(TemplateKey.idForExporter(remoteAddress, sourceId));
if (bufferedPackets != null) {
final List<PacketBytes> tempQueue = new ArrayList<>(bufferedPackets.size());
PacketBytes previousPacket;
int addedPackets = 0;
while (null != (previousPacket = bufferedPackets.poll())) {
// are all templates the packet references there?
if (knownTemplateIds.containsAll(previousPacket.getUsedTemplates())) {
packetsToSend.add(Unpooled.wrappedBuffer(previousPacket.getBytes()));
addedPackets++;
} else {
tempQueue.add(previousPacket);
}
}
LOG.debug("Processing {} previously buffered packets, {} packets require more templates.", addedPackets, tempQueue.size());
// if we couldn't process some of the buffered packets, add them back to the queue to wait for more templates to come in
if (!tempQueue.isEmpty()) {
bufferedPackets.addAll(tempQueue);
}
}
}

boolean packetBuffered = false;

// the list of template keys to return in the result
final Set<TemplateKey> templates = new HashSet<>();

// find out which templates we need to include for the current packet
for (int templateId : rawNetFlowV9Packet.usedTemplates()) {
final TemplateKey templateKey = new TemplateKey(remoteAddress, sourceId, templateId);
final TemplateBytes template = templateCache.getIfPresent(templateKey);

if (template == null) {
// we don't have the template, this packet needs to be buffered until we receive the templates
try {
final TemplateKey newTemplateKey = TemplateKey.idForExporter(remoteAddress, sourceId);
final Queue<PacketBytes> bufferedPackets = packetCache.get(newTemplateKey, ConcurrentLinkedQueue::new);
final byte[] bytes = ByteBufUtil.getBytes(buf);
bufferedPackets.add(new PacketBytes(bytes, rawNetFlowV9Packet.usedTemplates()));
packetBuffered = true;
} catch (ExecutionException ignored) {
// the loader cannot fail, it only creates a new queue
}
} else {
// include the template in our result
templates.add(templateKey);

// .slice is enough here, because we convert it into a byte array when creating the result below
// no need to copy or retain anything, the buffer only lives as long as this method's scope
final ByteBuf packet = buf.slice();
packetsToSend.add(packet);
}
}

// if we have buffered this packet, don't try to process it now. we still need all the templates for it
if (packetBuffered) {
return new Result(null, true);
}

// if we didn't buffer anything but also didn't have anything queued that can be processed, don't proceed.
if (packetsToSend.isEmpty()) {
return new Result(null, true);
}

// add the used templates and option template to the journal message builder
final NetFlowV9Journal.RawNetflowV9.Builder builder = NetFlowV9Journal.RawNetflowV9.newBuilder();
for (TemplateKey templateKey : templates) {
final TemplateBytes templateBytes = templateCache.getIfPresent(templateKey);
if (templateBytes == null) {
LOG.warn("Template {} expired while processing, discarding netflow packet", templateKey);
} else if (templateBytes.isOptionTemplate()) {
LOG.debug("Writing options template flow {}", templateKey);
final byte[] bytes = templateBytes.getBytes();
builder.putOptionTemplate(1, ByteString.copyFrom(bytes));
} else {
LOG.debug("Writing template {}", templateKey);
final byte[] bytes = templateBytes.getBytes();
builder.putTemplates(templateKey.getTemplateId(), ByteString.copyFrom(bytes));
}
}

// finally write out all the packets we had buffered as well as the current one
for (ByteBuf packetBuffer : packetsToSend) {
final byte[] bytes = ByteBufUtil.getBytes(packetBuffer);
final ByteString value = ByteString.copyFrom(bytes);
builder.addPackets(value);
}

final byte[] bytes = builder.build().toByteArray();
final ByteBuf resultBuffer = Unpooled.buffer(bytes.length + 1)
.writeByte(NetFlowCodec.ORDERED_V9_MARKER)
.writeBytes(bytes);
return new Result(resultBuffer, true);

} catch (Exception e) {
LOG.error("Unexpected failure while aggregating NetFlowV9 packet, discarding packet.", ExceptionUtils.getRootCause(e));
return new Result(null, false);
}
}

private class TemplateBytes {
private final byte[] bytes;
private final boolean optionTemplate;

public TemplateBytes(byte[] bytes, boolean optionTemplate) {
this.bytes = bytes;
this.optionTemplate = optionTemplate;
}

public byte[] getBytes() {
return bytes;
}

public boolean isOptionTemplate() {
return optionTemplate;
}
}

public static class PacketBytes {
private final byte[] bytes;
private final Set<Integer> usedTemplates;

public PacketBytes(byte[] bytes, Set<Integer> usedTemplates) {
this.bytes = bytes;
this.usedTemplates = usedTemplates;
}

public byte[] getBytes() {
return bytes;
}

public Set<Integer> getUsedTemplates() {
return usedTemplates;
}

public int readableBytes() {
return bytes.length;
}
}
}

+ 36
- 0
graylog2-server/src/main/java/org/graylog/plugins/netflow/codecs/RemoteAddressCodecAggregator.java View File

@@ -0,0 +1,36 @@
/**
* This file is part of Graylog.
*
* Graylog is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Graylog is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Graylog. If not, see <http://www.gnu.org/licenses/>.
*/
package org.graylog.plugins.netflow.codecs;

import io.netty.buffer.ByteBuf;
import org.graylog2.plugin.inputs.codecs.CodecAggregator;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.net.SocketAddress;

public interface RemoteAddressCodecAggregator extends CodecAggregator {

@Nonnull
@Override
default Result addChunk(ByteBuf buf) {
return addChunk(buf, null);
}

@Nonnull
Result addChunk(ByteBuf buf, @Nullable SocketAddress remoteAddress);
}

+ 85
- 0
graylog2-server/src/main/java/org/graylog/plugins/netflow/codecs/TemplateKey.java View File

@@ -0,0 +1,85 @@
/**
* This file is part of Graylog.
*
* Graylog is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Graylog is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Graylog. If not, see <http://www.gnu.org/licenses/>.
*/
package org.graylog.plugins.netflow.codecs;

import com.google.common.base.MoreObjects;

import java.net.SocketAddress;
import java.util.Objects;

/**
* The unique key for template flow ids, which is exporter source address and its obversation id (source ID)
*/
public class TemplateKey {
private final SocketAddress remoteAddress;
private final long sourceId;
private final int templateId;

/**
* A key usable for identifying netflow exporters, when the template id is irrelevant.
* This is used for grouping buffered packets by their exporter, because template ids are only unique across remote address and source id.
*
* @param remoteAddress the exporters address
* @param sourceId the observation id
* @return object for use as cache key
*/
public static TemplateKey idForExporter(SocketAddress remoteAddress, long sourceId) {
return new TemplateKey(remoteAddress, sourceId, -1);
}

public TemplateKey(SocketAddress remoteAddress, long sourceId, int templateId) {
this.remoteAddress = remoteAddress;
this.sourceId = sourceId;
this.templateId = templateId;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
TemplateKey that = (TemplateKey) o;
return sourceId == that.sourceId &&
templateId == that.templateId &&
Objects.equals(remoteAddress, that.remoteAddress);
}

public SocketAddress getRemoteAddress() {
return remoteAddress;
}

public long getSourceId() {
return sourceId;
}

public int getTemplateId() {
return templateId;
}

@Override
public int hashCode() {
return Objects.hash(remoteAddress, sourceId, templateId);
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("remoteAddress", remoteAddress)
.add("sourceId", sourceId)
.add("templateId", templateId)
.toString();
}
}

+ 27
- 0
graylog2-server/src/main/java/org/graylog/plugins/netflow/flows/CorruptFlowPacketException.java View File

@@ -0,0 +1,27 @@
/**
* This file is part of Graylog.
*
* Graylog is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Graylog is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Graylog. If not, see <http://www.gnu.org/licenses/>.
*/
package org.graylog.plugins.netflow.flows;

public class CorruptFlowPacketException extends FlowException {
public CorruptFlowPacketException() {
super();
}

public CorruptFlowPacketException(String message) {
super(message);
}
}

+ 23
- 0
graylog2-server/src/main/java/org/graylog/plugins/netflow/flows/EmptyTemplateException.java View File

@@ -0,0 +1,23 @@
/**
* This file is part of Graylog.
*
* Graylog is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Graylog is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Graylog. If not, see <http://www.gnu.org/licenses/>.
*/
package org.graylog.plugins.netflow.flows;

public class EmptyTemplateException extends FlowException {
public EmptyTemplateException(String message) {
super(message);
}
}

+ 27
- 0
graylog2-server/src/main/java/org/graylog/plugins/netflow/flows/FlowException.java View File

@@ -0,0 +1,27 @@
/**
* This file is part of Graylog.
*
* Graylog is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Graylog is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Graylog. If not, see <http://www.gnu.org/licenses/>.
*/
package org.graylog.plugins.netflow.flows;

public class FlowException extends RuntimeException {
public FlowException() {
super();
}

public FlowException(String message) {
super(message);
}
}

+ 23
- 0
graylog2-server/src/main/java/org/graylog/plugins/netflow/flows/InvalidFlowVersionException.java View File

@@ -0,0 +1,23 @@
/**
* This file is part of Graylog.
*
* Graylog is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Graylog is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Graylog. If not, see <http://www.gnu.org/licenses/>.
*/
package org.graylog.plugins.netflow.flows;

public class InvalidFlowVersionException extends FlowException {
public InvalidFlowVersionException(int version) {
super("Invalid NetFlow version " + version);
}
}

+ 194
- 0
graylog2-server/src/main/java/org/graylog/plugins/netflow/flows/NetFlowFormatter.java View File

@@ -0,0 +1,194 @@
/**
* This file is part of Graylog.
*
* Graylog is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Graylog is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Graylog. If not, see <http://www.gnu.org/licenses/>.
*/
package org.graylog.plugins.netflow.flows;

import com.google.common.collect.ImmutableMap;
import org.graylog.plugins.netflow.utils.ByteBufUtils;
import org.graylog.plugins.netflow.utils.Protocol;
import org.graylog.plugins.netflow.v5.NetFlowV5Header;
import org.graylog.plugins.netflow.v5.NetFlowV5Record;
import org.graylog.plugins.netflow.v9.NetFlowV9BaseRecord;
import org.graylog.plugins.netflow.v9.NetFlowV9Header;
import org.graylog2.plugin.Message;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;

import javax.annotation.Nullable;
import java.net.InetSocketAddress;
import java.util.Locale;
import java.util.Map;

public class NetFlowFormatter {
private static final String MF_VERSION = "nf_version";
private static final String MF_FLOW_PACKET_ID = "nf_flow_packet_id";
private static final String MF_TOS = "nf_tos";
private static final String MF_SRC = "nf_src";
private static final String MF_SRC_ADDRESS = "nf_src_address";
private static final String MF_SRC_TOS = "nf_src_tos";
private static final String MF_DST = "nf_dst";
private static final String MF_DST_ADDRESS = "nf_dst_address";
private static final String MF_DST_TOS = "nf_dst_tos";
private static final String MF_NEXT_HOP = "nf_next_hop";
private static final String MF_SRC_PORT = "nf_src_port";
private static final String MF_DST_PORT = "nf_dst_port";
private static final String MF_SRC_MASK = "nf_src_mask";
private static final String MF_DST_MASK = "nf_dst_mask";
private static final String MF_SRC_AS = "nf_src_as";
private static final String MF_DST_AS = "nf_dst_as";
private static final String MF_PROTO = "nf_proto";
private static final String MF_PROTO_NAME = "nf_proto_name";
private static final String MF_TCP_FLAGS = "nf_tcp_flags";
private static final String MF_START = "nf_start";
private static final String MF_STOP = "nf_stop";
private static final String MF_BYTES = "nf_bytes";
private static final String MF_PKTS = "nf_pkts";
private static final String MF_SNMP_INPUT = "nf_snmp_input";
private static final String MF_SNMP_OUTPUT = "nf_snmp_output";

private static String toMessageString(NetFlowV5Record record) {
return String.format(Locale.ROOT, "NetFlowV5 [%s]:%d <> [%s]:%d proto:%d pkts:%d bytes:%d",
record.srcAddr().getHostAddress(), record.srcPort(),
record.dstAddr().getHostAddress(), record.dstPort(),
record.protocol(), record.packetCount(), record.octetCount());
}

private static String toMessageString(NetFlowV9BaseRecord record) {
final ImmutableMap<String, Object> fields = record.fields();
final long packetCount = (long) fields.get("in_pkts");
final long octetCount = (long) fields.get("in_bytes");
final String srcAddr = (String) fields.get("ipv4_src_addr");
final String dstAddr = (String) fields.get("ipv4_dst_addr");
final Integer srcPort = (Integer) fields.get("l4_src_port");
final Integer dstPort = (Integer) fields.get("l4_dst_port");
final Short protocol = (Short) fields.get("protocol");

return String.format(Locale.ROOT, "NetFlowV9 [%s]:%d <> [%s]:%d proto:%d pkts:%d bytes:%d",
srcAddr, srcPort,
dstAddr, dstPort,
protocol, packetCount, octetCount);
}

public static Message toMessage(NetFlowV5Header header,
NetFlowV5Record record,
@Nullable InetSocketAddress sender) {
final String source = sender == null ? null : sender.getAddress().getHostAddress();
final long timestamp = header.unixSecs() * 1000L + (header.unixNsecs() / 1000000L);
final Message message = new Message(toMessageString(record), source, new DateTime(timestamp, DateTimeZone.UTC));

message.addField(MF_VERSION, 5);
message.addField(MF_FLOW_PACKET_ID, header.flowSequence());
message.addField(MF_TOS, record.tos());
message.addField(MF_SRC, record.srcAddr().getHostAddress() + ":" + record.srcPort());
message.addField(MF_SRC_ADDRESS, record.srcAddr().getHostAddress());
message.addField(MF_DST, record.dstAddr().getHostAddress() + ":" + record.dstPort());
message.addField(MF_DST_ADDRESS, record.dstAddr().getHostAddress());
if (!ByteBufUtils.DEFAULT_INET_ADDRESS.equals(record.nextHop())) {
message.addField(MF_NEXT_HOP, record.nextHop().getHostAddress());
}
message.addField(MF_SRC_PORT, record.srcPort());
message.addField(MF_DST_PORT, record.dstPort());
message.addField(MF_SRC_MASK, record.srcMask());
message.addField(MF_DST_MASK, record.dstMask());
message.addField(MF_SRC_AS, record.srcAs());
message.addField(MF_DST_AS, record.dstAs());

message.addField(MF_PROTO, record.protocol());
final Protocol protocol = Protocol.getByNumber(record.protocol());
if (protocol != null) {
message.addField(MF_PROTO_NAME, protocol.getAlias());
}
message.addField(MF_TCP_FLAGS, record.tcpFlags());
if (record.first() > 0) {
long start = timestamp - (header.sysUptime() - record.first());
message.addField(MF_START, new DateTime(start, DateTimeZone.UTC));
}
if (record.last() > 0) {
long stop = timestamp - (header.sysUptime() - record.last());
message.addField(MF_STOP, new DateTime(stop, DateTimeZone.UTC));
}
message.addField(MF_BYTES, record.octetCount());
message.addField(MF_PKTS, record.packetCount());
message.addField(MF_SNMP_INPUT, record.inputIface());
message.addField(MF_SNMP_OUTPUT, record.outputIface());

return message;
}

public static Message toMessage(NetFlowV9Header header,
NetFlowV9BaseRecord record,
@Nullable InetSocketAddress sender) {
final String source = sender == null ? null : sender.getAddress().getHostAddress();
final long timestamp = header.unixSecs() * 1000L;
final Message message = new Message(toMessageString(record), source, new DateTime(timestamp, DateTimeZone.UTC));

final Map<String, Object> fields = record.fields();

message.addField(MF_VERSION, 9);
fields.forEach((key, value) -> message.addField("nf_" + key, value));

final String srcAddr = (String) fields.get("ipv4_src_addr");
final String dstAddr = (String) fields.get("ipv4_dst_addr");
final Object srcPort = fields.get("l4_src_port");
final Object dstPort = fields.get("l4_dst_port");
final String ipv4NextHop = (String) fields.get("ipv4_next_hop");
final Long first = (Long) fields.get("first_switched");
final Long last = (Long) fields.get("last_switched");

message.addField(MF_FLOW_PACKET_ID, header.sequence());
message.addField(MF_TOS, fields.get("ip_tos"));
message.addField(MF_SRC_TOS, fields.get("ip_src_tos"));
message.addField(MF_DST_TOS, fields.get("ip_dst_tos"));
message.addField(MF_SRC, srcAddr + ":" + srcPort);
message.addField(MF_SRC_ADDRESS, srcAddr);
message.addField(MF_DST, dstAddr + ":" + dstPort);
message.addField(MF_DST_ADDRESS, dstAddr);
if (!ByteBufUtils.DEFAULT_INET_ADDRESS.getHostAddress().equals(ipv4NextHop)) {
message.addField(MF_NEXT_HOP, ipv4NextHop);
}
message.addField(MF_SRC_PORT, srcPort);
message.addField(MF_DST_PORT, dstPort);
message.addField(MF_SRC_MASK, fields.get("src_mask"));
message.addField(MF_DST_MASK, fields.get("dst_mask"));
message.addField(MF_SRC_AS, fields.get("src_as"));
message.addField(MF_DST_AS, fields.get("dst_as"));
final Object protocol = fields.get("protocol");
if (protocol != null) {
message.addField(MF_PROTO, protocol);
short protocolNumber = ((Number) protocol).shortValue();
final Protocol protocolInfo = Protocol.getByNumber(protocolNumber);
if (protocolInfo != null) {
message.addField(MF_PROTO_NAME, protocolInfo.getAlias());
}
}
message.addField(MF_TCP_FLAGS, fields.get("tcp_flags"));

if (first != null && first > 0) {
long start = timestamp - (header.sysUptime() - first);
message.addField(MF_START, new DateTime(start, DateTimeZone.UTC));
}
if (last != null && last > 0) {
long stop = timestamp - (header.sysUptime() - last);
message.addField(MF_STOP, new DateTime(stop, DateTimeZone.UTC));
}
message.addField(MF_BYTES, fields.get("in_bytes"));
message.addField(MF_PKTS, fields.get("in_pkts"));
message.addField(MF_SNMP_INPUT, fields.get("input_snmp"));
message.addField(MF_SNMP_OUTPUT, fields.get("output_snmp"));

return message;
}
}

+ 74
- 0
graylog2-server/src/main/java/org/graylog/plugins/netflow/inputs/NetFlowUdpInput.java View File

@@ -0,0 +1,74 @@
/**
* This file is part of Graylog.
*
* Graylog is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Graylog is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Graylog. If not, see <http://www.gnu.org/licenses/>.
*/
package org.graylog.plugins.netflow.inputs;

import com.codahale.metrics.MetricRegistry;
import com.google.inject.assistedinject.Assisted;
import org.graylog.plugins.netflow.codecs.NetFlowCodec;
import org.graylog.plugins.netflow.transport.NetFlowUdpTransport;
import org.graylog2.plugin.LocalMetricRegistry;
import org.graylog2.plugin.ServerStatus;
import org.graylog2.plugin.configuration.Configuration;
import org.graylog2.plugin.inputs.MessageInput;
import org.graylog2.plugin.inputs.annotations.ConfigClass;
import org.graylog2.plugin.inputs.annotations.FactoryClass;

import javax.inject.Inject;

public class NetFlowUdpInput extends MessageInput {
private static final String NAME = "NetFlow UDP";

@Inject
public NetFlowUdpInput(MetricRegistry metricRegistry,
@Assisted Configuration configuration,
NetFlowUdpTransport.Factory transportFactory,
NetFlowCodec.Factory codecFactory,
LocalMetricRegistry localMetricRegistry,
Config config,
Descriptor descriptor,
ServerStatus serverStatus) {
super(metricRegistry, configuration, transportFactory.create(configuration), localMetricRegistry,
codecFactory.create(configuration), config, descriptor, serverStatus);
}

@FactoryClass
public interface Factory extends MessageInput.Factory<NetFlowUdpInput> {
@Override
NetFlowUdpInput create(Configuration configuration);

@Override
Config getConfig();

@Override
Descriptor getDescriptor();
}

public static class Descriptor extends MessageInput.Descriptor {
@Inject
public Descriptor() {
super(NAME, false, "https://github.com/Graylog2/graylog-plugin-netflow");
}
}

@ConfigClass
public static class Config extends MessageInput.Config {
@Inject
public Config(NetFlowUdpTransport.Factory transport, NetFlowCodec.Factory codec) {
super(transport.getConfig(), codec.getConfig());
}
}
}

+ 80
- 0
graylog2-server/src/main/java/org/graylog/plugins/netflow/transport/NetFlowUdpTransport.java View File

@@ -0,0 +1,80 @@
/**
* This file is part of Graylog.
*
* Graylog is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Graylog is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Graylog. If not, see <http://www.gnu.org/licenses/>.
*/
package org.graylog.plugins.netflow.transport;

import com.google.inject.assistedinject.Assisted;
import io.netty.channel.ChannelHandler;
import org.graylog.plugins.netflow.codecs.RemoteAddressCodecAggregator;
import org.graylog2.inputs.transports.NettyTransportConfiguration;
import org.graylog2.inputs.transports.UdpTransport;
import org.graylog2.inputs.transports.netty.EventLoopGroupFactory;
import org.graylog2.plugin.LocalMetricRegistry;
import org.graylog2.plugin.configuration.Configuration;
import org.graylog2.plugin.inputs.MessageInput;
import org.graylog2.plugin.inputs.annotations.ConfigClass;
import org.graylog2.plugin.inputs.annotations.FactoryClass;
import org.graylog2.plugin.inputs.transports.Transport;
import org.graylog2.plugin.inputs.util.ThroughputCounter;

import javax.inject.Inject;
import java.util.LinkedHashMap;
import java.util.concurrent.Callable;

/**
* This UDP transport is largely identical to its superclass, but replaces the codec aggregator and its handler with custom
* implementations that are able to pass the remote address.
*
* Without the remote address the NetFlow V9 code cannot distinguish between flows from different exporters and thus might
* handle template flows incorrectly should they differ between exporters.
*
* @see <a href="https://tools.ietf.org/html/rfc3954#section-5.1">RFC 3953 - Source ID</a>
*/
public class NetFlowUdpTransport extends UdpTransport {
@Inject
public NetFlowUdpTransport(@Assisted Configuration configuration,
EventLoopGroupFactory eventLoopGroupFactory,
NettyTransportConfiguration nettyTransportConfiguration,
ThroughputCounter throughputCounter,
LocalMetricRegistry localRegistry) {
super(configuration, eventLoopGroupFactory, nettyTransportConfiguration, throughputCounter, localRegistry);
}

@Override
protected LinkedHashMap<String, Callable<? extends ChannelHandler>> getChannelHandlers(MessageInput input) {
final LinkedHashMap<String, Callable<? extends ChannelHandler>> handlers = new LinkedHashMap<>(super.getChannelHandlers(input));

// Replace the default "codec-aggregator" handler with one that passes the remote address
final RemoteAddressCodecAggregator aggregator = (RemoteAddressCodecAggregator) getAggregator();
handlers.replace("codec-aggregator", () -> new NetflowMessageAggregationHandler(aggregator, localRegistry));
handlers.remove("udp-datagram");

return handlers;
}

@FactoryClass
public interface Factory extends Transport.Factory<NetFlowUdpTransport> {
@Override
NetFlowUdpTransport create(Configuration configuration);

@Override
NetFlowUdpTransport.Config getConfig();
}

@ConfigClass
public static class Config extends UdpTransport.Config {
}
}

+ 65
- 0
graylog2-server/src/main/java/org/graylog/plugins/netflow/transport/NetflowMessageAggregationHandler.java View File

@@ -0,0 +1,65 @@
/**
* This file is part of Graylog.
*
* Graylog is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Graylog is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Graylog. If not, see <http://www.gnu.org/licenses/>.
*/
package org.graylog.plugins.netflow.transport;

import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.DatagramPacket;
import org.graylog.plugins.netflow.codecs.RemoteAddressCodecAggregator;
import org.graylog2.plugin.inputs.codecs.CodecAggregator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;
import java.net.SocketAddress;

public class NetflowMessageAggregationHandler extends SimpleChannelInboundHandler<DatagramPacket> {
private static final Logger LOG = LoggerFactory.getLogger(NetflowMessageAggregationHandler.class);

private final RemoteAddressCodecAggregator aggregator;
private final Timer aggregationTimer;
private final Meter invalidChunksMeter;

public NetflowMessageAggregationHandler(RemoteAddressCodecAggregator aggregator, MetricRegistry metricRegistry) {
this.aggregator = aggregator;
aggregationTimer = metricRegistry.timer("aggregationTime");
invalidChunksMeter = metricRegistry.meter("invalidMessages");
}

@Override
protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception {
final SocketAddress remoteAddress = msg.sender();
final CodecAggregator.Result result;
try (Timer.Context ignored = aggregationTimer.time()) {
result = aggregator.addChunk(msg.content(), remoteAddress);
}
final ByteBuf completeMessage = result.getMessage();
if (completeMessage != null) {
LOG.debug("Message aggregation completion, forwarding {}", completeMessage);
ctx.fireChannelRead(completeMessage);
} else if (result.isValid()) {
LOG.debug("More chunks necessary to complete this message");
} else {
invalidChunksMeter.mark();
LOG.debug("Message chunk was not valid and discarded.");
}
}
}

+ 69
- 0
graylog2-server/src/main/java/org/graylog/plugins/netflow/utils/ByteBufUtils.java View File

@@ -0,0 +1,69 @@
/**
* This file is part of Graylog.
*
* Graylog is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Graylog is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Graylog. If not, see <http://www.gnu.org/licenses/>.
*/
package org.graylog.plugins.netflow.utils;

import com.google.common.net.InetAddresses;
import io.netty.buffer.ByteBuf;

import java.net.InetAddress;
import java.net.UnknownHostException;

public class ByteBufUtils {
public static final InetAddress DEFAULT_INET_ADDRESS = InetAddresses.forString("0.0.0.0");

public static long getUnsignedInteger(final ByteBuf buf, final int offset, final int length) {
switch (length) {
case 1:
return buf.getUnsignedByte(offset);
case 2:
return buf.getUnsignedShort(offset);
case 3:
return buf.getUnsignedMedium(offset);
case 4:
return buf.getUnsignedInt(offset);
case 8:
return buf.getLong(offset) & 0x00000000ffffffffL;
default:
return 0L;
}
}

public static InetAddress getInetAddress(final ByteBuf buf, final int offset, final int length) {
final byte[] data = new byte[length];
buf.getBytes(offset, data, 0, length);

return getInetAddress(data);
}

public static InetAddress readInetAddress(final ByteBuf buf) {
final byte[] data = new byte[4];
buf.readBytes(data);

return getInetAddress(data);
}

private static InetAddress getInetAddress(byte[] data) {
InetAddress address;
try {
address = InetAddress.getByAddress(data);
} catch (UnknownHostException e) {
address = DEFAULT_INET_ADDRESS;
}

return address;
}
}

+ 116
- 0
graylog2-server/src/main/java/org/graylog/plugins/netflow/utils/Protocol.java View File

@@ -0,0 +1,116 @@
/**
* This file is part of Graylog.
*
* Graylog is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Graylog is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Graylog. If not, see <http://www.gnu.org/licenses/>.
*/
package org.graylog.plugins.netflow.utils;

import com.google.common.collect.ImmutableMap;

/**
* Can be used to lookup protocol numbers. Generated from a /etc/protocols file on Ubuntu 14.04 LTS.
*/
public enum Protocol {
IP("ip", 0, "IP"),
// HOPOPT("hopopt", 0, "HOPOPT"),
ICMP("icmp", 1, "ICMP"),
IGMP("igmp", 2, "IGMP"),
GGP("ggp", 3, "GGP"),
IPENCAP("ipencap", 4, "IP-ENCAP"),
ST("st", 5, "ST"),
TCP("tcp", 6, "TCP"),
EGP("egp", 8, "EGP"),
IGP("igp", 9, "IGP"),
PUP("pup", 12, "PUP"),
UDP("udp", 17, "UDP"),
HMP("hmp", 20, "HMP"),
XNS_IDP("xns-idp", 22, "XNS-IDP"),
RDP("rdp", 27, "RDP"),
ISO_TP4("iso-tp4", 29, "ISO-TP4"),
DCCP("dccp", 33, "DCCP"),
XTP("xtp", 36, "XTP"),
DDP("ddp", 37, "DDP"),
IDPR_CMTP("idpr-cmtp", 38, "IDPR-CMTP"),
IPV6("ipv6", 41, "IPv6"),
IPV6_ROUTE("ipv6-route", 43, "IPv6-Route"),
IPV6_FRAG("ipv6-frag", 44, "IPv6-Frag"),
IDRP("idrp", 45, "IDRP"),
RSVP("rsvp", 46, "RSVP"),
GRE("gre", 47, "GRE"),
ESP("esp", 50, "IPSEC-ESP"),
AH("ah", 51, "IPSEC-AH"),
SKIP("skip", 57, "SKIP"),
IPV6_ICMP("ipv6-icmp", 58, "IPv6-ICMP"),
IPV6_NONXT("ipv6-nonxt", 59, "IPv6-NoNxt"),
IPV6_OPTS("ipv6-opts", 60, "IPv6-Opts"),
RSPF("rspf", 73, "RSPF"),
VMTP("vmtp", 81, "VMTP"),
EIGRP("eigrp", 88, "EIGRP"),
OSPF("ospf", 89, "OSPFIGP"),
AX_25("ax.25", 93, "AX.25"),
IPIP("ipip", 94, "IPIP"),
ETHERIP("etherip", 97, "ETHERIP"),
ENCAP("encap", 98, "ENCAP"),
PIM("pim", 103, "PIM"),
IPCOMP("ipcomp", 108, "IPCOMP"),
VRRP("vrrp", 112, "VRRP"),
L2TP("l2tp", 115, "L2TP"),
ISIS("isis", 124, "ISIS"),
SCTP("sctp", 132, "SCTP"),
FC("fc", 133, "FC"),
MOBILITY_HEADER("mobility-header", 135, "Mobility-Header"),
UDPLITE("udplite", 136, "UDPLite"),
MPLS_IN_IP("mpls-in-ip", 137, "MPLS-in-IP"),
MANET("manet", 138, "#"),
HIP("hip", 139, "HIP"),
SHIM6("shim6", 140, "Shim6"),
WESP("wesp", 141, "WESP"),
ROHC("rohc", 142, "ROHC");

private final String name;
private final int number;
private final String alias;

private static final ImmutableMap<Integer, Protocol> ID_MAP;

static {
final ImmutableMap.Builder<Integer, Protocol> idMapBuilder = ImmutableMap.builder();
for (final Protocol protocol : values()) {
idMapBuilder.put(protocol.getNumber(), protocol);
}
ID_MAP = idMapBuilder.build();
}

Protocol(final String name, final int number, final String alias) {
this.name = name;
this.number = number;
this.alias = alias;
}

public String getAlias() {
return alias;
}

public String getName() {
return name;
}

public int getNumber() {
return number;
}

public static Protocol getByNumber(final int number) {
return ID_MAP.get(number);
}
}

+ 76
- 0
graylog2-server/src/main/java/org/graylog/plugins/netflow/v5/NetFlowV5Header.java View File

@@ -0,0 +1,76 @@
/**
* This file is part of Graylog.
*
* Graylog is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Graylog is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Graylog. If not, see <http://www.gnu.org/licenses/>.
*/
package org.graylog.plugins.netflow.v5;

import com.google.auto.value.AutoValue;

@AutoValue
public abstract class NetFlowV5Header {
// bytes 0-1
public abstract int version();

// bytes 2-3
public abstract int count();

// bytes 4-7, milliseconds since device boot
public abstract long sysUptime();

// bytes 8-11, seconds since UTC 1970
public abstract long unixSecs();

// bytes 12-15, nanoseconds since UTC 1970
public abstract long unixNsecs();

// bytes 16-19, sequence counter of total flow seen
public abstract long flowSequence();

// bytes 20, type of flow switching engine
public abstract int engineType();

// bytes 21, slot number of the flow-switching engine
public abstract int engineId();

// bytes 22-23, first two bits hold the sampling mode, remaining 14 bits
// hold value of sampling interval
public abstract int samplingMode();

public abstract int samplingInterval();

static NetFlowV5Header create(int version,
int count,
long sysUptime,
long unixSecs,
long unixNsecs,
long flowSequence,
int engineType,
int engineId,
int samplingMode,
int samplingInterval
) {
return new AutoValue_NetFlowV5Header(
version,
count,
sysUptime,
unixSecs,
unixNsecs,
flowSequence,
engineType,
engineId,
samplingMode,
samplingInterval);
}
}

+ 35
- 0
graylog2-server/src/main/java/org/graylog/plugins/netflow/v5/NetFlowV5Packet.java View File

@@ -0,0 +1,35 @@
/**
* This file is part of Graylog.
*
* Graylog is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Graylog is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Graylog. If not, see <http://www.gnu.org/licenses/>.
*/
package org.graylog.plugins.netflow.v5;

import com.google.auto.value.AutoValue;
import com.google.common.collect.ImmutableList;

import java.util.List;

@AutoValue
public abstract class NetFlowV5Packet {
public abstract NetFlowV5Header header();

public abstract ImmutableList<NetFlowV5Record> records();

public abstract long dataLength();

public static NetFlowV5Packet create(NetFlowV5Header header, List<NetFlowV5Record> records, long dataLength) {
return new AutoValue_NetFlowV5Packet(header, ImmutableList.copyOf(records), dataLength);
}
}

+ 144
- 0
graylog2-server/src/main/java/org/graylog/plugins/netflow/v5/NetFlowV5Parser.java View File

@@ -0,0 +1,144 @@
/**
* This file is part of Graylog.
*
* Graylog is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Graylog is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Graylog. If not, see <http://www.gnu.org/licenses/>.
*/
package org.graylog.plugins.netflow.v5;

import com.google.common.collect.ImmutableList;
import io.netty.buffer.ByteBuf;
import org.graylog.plugins.netflow.flows.CorruptFlowPacketException;
import org.graylog.plugins.netflow.flows.InvalidFlowVersionException;
import org.graylog.plugins.netflow.utils.ByteBufUtils;

import java.net.InetAddress;

public class NetFlowV5Parser {
private static final int HEADER_LENGTH = 24;
private static final int RECORD_LENGTH = 48;

public static NetFlowV5Packet parsePacket(ByteBuf bb) {
final int readableBytes = bb.readableBytes();

final NetFlowV5Header header = parseHeader(bb.slice(bb.readerIndex(), HEADER_LENGTH));
final int packetLength = HEADER_LENGTH + header.count() * RECORD_LENGTH;
if (header.count() <= 0 || readableBytes < packetLength) {
throw new CorruptFlowPacketException("Insufficient data (expected: " + packetLength + " bytes, actual: " + readableBytes + " bytes)");
}

final ImmutableList.Builder<NetFlowV5Record> records = ImmutableList.builder();
int offset = HEADER_LENGTH;
for (int i = 0; i < header.count(); i++) {
records.add(parseRecord(bb.slice(offset + bb.readerIndex(), RECORD_LENGTH)));
offset += RECORD_LENGTH;
}

return NetFlowV5Packet.create(header, records.build(), offset);
}

/**
* <pre>
* | BYTES | CONTENTS | DESCRIPTION |
* |-------|-------------------|------------------------------------------------------------------------------------------|
* | 0-1 | version | NetFlow export format version number |
* | 2-3 | count | Number of flows exported in this packet (1-30) |
* | 4-7 | sys_uptime | Current time in milliseconds since the export device booted |
* | 8-11 | unix_secs | Current count of seconds since 0000 UTC 1970 |
* | 12-15 | unix_nsecs | Residual nanoseconds since 0000 UTC 1970 |
* | 16-19 | flow_sequence | Sequence counter of total flows seen |
* | 20 | engine_type | Type of flow-switching engine |
* | 21 | engine_id | Slot number of the flow-switching engine |
* | 22-23 | sampling_interval | First two bits hold the sampling mode; remaining 14 bits hold value of sampling interval |
* </pre>
*/
private static NetFlowV5Header parseHeader(ByteBuf bb) {
final int version = bb.readUnsignedShort();
if (version != 5) {
throw new InvalidFlowVersionException(version);
}

final int count = bb.readUnsignedShort();
final long sysUptime = bb.readUnsignedInt();
final long unixSecs = bb.readUnsignedInt();
final long unixNsecs = bb.readUnsignedInt();
final long flowSequence = bb.readUnsignedInt();
final short engineType = bb.readUnsignedByte();
final short engineId = bb.readUnsignedByte();
final short sampling = bb.readShort();
final int samplingMode = (sampling >> 14) & 3;
final int samplingInterval = sampling & 0x3fff;

return NetFlowV5Header.create(
version,
count,
sysUptime,
unixSecs,
unixNsecs,
flowSequence,
engineType,
engineId,
samplingMode,
samplingInterval);
}

/**
* <pre>
* | BYTES | CONTENTS | DESCRIPTION |
* |-------|-----------|--------------------------------------------------------------------|
* | 0-3 | srcaddr | Source IP address |
* | 4-7 | dstaddr | Destination IP address |
* | 8-11 | nexthop | IP address of next hop router |
* | 12-13 | input | SNMP index of input interface |
* | 14-15 | output | SNMP index of output interface |
* | 16-19 | dPkts | Packets in the flow |
* | 20-23 | dOctets | Total number of Layer 3 bytes in the packets of the flow |
* | 24-27 | first | SysUptime at start of flow |
* | 28-31 | last | SysUptime at the time the last packet of the flow was received |
* | 32-33 | srcport | TCP/UDP source port number or equivalent |
* | 34-35 | dstport | TCP/UDP destination port number or equivalent |
* | 36 | pad1 | Unused (zero) bytes |
* | 37 | tcp_flags | Cumulative OR of TCP flags |
* | 38 | prot | IP protocol type (for example, TCP = 6; UDP = 17) |
* | 39 | tos | IP type of service (ToS) |
* | 40-41 | src_as | Autonomous system number of the source, either origin or peer |
* | 42-43 | dst_as | Autonomous system number of the destination, either origin or peer |
* | 44 | src_mask | Source address prefix mask bits |
* | 45 | dst_mask | Destination address prefix mask bits |
* | 46-47 | pad2 | Unused (zero) bytes |
* </pre>
*/
private static NetFlowV5Record parseRecord(ByteBuf bb) {
final InetAddress srcAddr = ByteBufUtils.readInetAddress(bb);
final InetAddress dstAddr = ByteBufUtils.readInetAddress(bb);
final InetAddress nextHop = ByteBufUtils.readInetAddress(bb);
final int inputIface = bb.readUnsignedShort();
final int outputIface = bb.readUnsignedShort();
final long packetCount = bb.readUnsignedInt();
final long octetCount = bb.readUnsignedInt();
final long first = bb.readUnsignedInt();
final long last = bb.readUnsignedInt();
final int srcPort = bb.readUnsignedShort();
final int dstPort = bb.readUnsignedShort();
bb.readByte(); // unused pad1
final short tcpFlags = bb.readUnsignedByte();
final short protocol = bb.readUnsignedByte();
final short tos = bb.readUnsignedByte();
final int srcAs = bb.readUnsignedShort();
final int dstAs = bb.readUnsignedShort();
final short srcMask = bb.readUnsignedByte();
final short dstMask = bb.readUnsignedByte();

return NetFlowV5Record.create(srcAddr, dstAddr, nextHop, inputIface, outputIface, packetCount, octetCount, first, last, srcPort, dstPort, tcpFlags, protocol, tos, srcAs, dstAs, srcMask, dstMask);
}
}

+ 101
- 0
graylog2-server/src/main/java/org/graylog/plugins/netflow/v5/NetFlowV5Record.java View File

@@ -0,0 +1,101 @@
/**
* This file is part of Graylog.
*
* Graylog is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Graylog is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Graylog. If not, see <http://www.gnu.org/licenses/>.
*/
package org.graylog.plugins.netflow.v5;

import com.google.auto.value.AutoValue;

import java.net.InetAddress;

@AutoValue
public abstract class NetFlowV5Record {
// bytes 0-3
public abstract InetAddress srcAddr();

// bytes 4-7
public abstract InetAddress dstAddr();

// bytes 8-11
public abstract InetAddress nextHop();

// bytes 12-13, snmp index of input interface
public abstract int inputIface();

// bytes 14-15, snmp index of output interface
public abstract int outputIface();

// bytes 16-19, packets in flow
public abstract long packetCount();

// bytes 20-23, total number of L3 bytes in the packets of the flow
public abstract long octetCount();

// bytes 24-27, sysuptime at start of flow
public abstract long first();

// bytes 28-31, sysuptime at the time the last packet of the flow was
// received
public abstract long last();

// bytes 32-33
public abstract int srcPort();

// bytes 34-35
public abstract int dstPort();

// bytes 37
public abstract short tcpFlags();

// bytes 38, ip protocol type (e.g. tcp = 6, udp = 17)
public abstract short protocol();

// bytes 39, type of service
public abstract short tos();

// bytes 40-41, source AS number
public abstract int srcAs();

// bytes 42-43, destination AS number
public abstract int dstAs();

// bytes 44
public abstract short srcMask();

// bytes 45
public abstract short dstMask();

static NetFlowV5Record create(InetAddress srcAddr,
InetAddress dstAddr,
InetAddress nextHop,
int inputIface,
int outputIface,
long packetCount,
long octetCount,
long first,
long last,
int srcPort,
int dstPort,
short tcpFlags,
short protocol,
short tos,
int srcAs,
int dstAs,
short srcMask,
short dstMask) {
return new AutoValue_NetFlowV5Record(srcAddr, dstAddr, nextHop, inputIface, outputIface, packetCount, octetCount, first, last, srcPort, dstPort, tcpFlags, protocol, tos, srcAs, dstAs, srcMask, dstMask
);
}
}

+ 23
- 0
graylog2-server/src/main/java/org/graylog/plugins/netflow/v9/NetFlowV9BaseRecord.java View File

@@ -0,0 +1,23 @@
/**
* This file is part of Graylog.
*
* Graylog is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Graylog is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Graylog. If not, see <http://www.gnu.org/licenses/>.
*/
package org.graylog.plugins.netflow.v9;

import com.google.common.collect.ImmutableMap;

public interface NetFlowV9BaseRecord {
ImmutableMap<String, Object> fields();
}

+ 113
- 0
graylog2-server/src/main/java/org/graylog/plugins/netflow/v9/NetFlowV9FieldDef.java View File

@@ -0,0 +1,113 @@
/**
* This file is part of Graylog.
*
* Graylog is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Graylog is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Graylog. If not, see <http://www.gnu.org/licenses/>.
*/
package org.graylog.plugins.netflow.v9;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.auto.value.AutoValue;
import io.netty.buffer.ByteBuf;

import java.math.BigInteger;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.util.Locale;
import java.util.Optional;

@JsonAutoDetect
@AutoValue
public abstract class NetFlowV9FieldDef {
@JsonProperty("type")
public abstract NetFlowV9FieldType type();

@JsonProperty("length")
public abstract int length();

@JsonCreator
public static NetFlowV9FieldDef create(@JsonProperty("type") NetFlowV9FieldType type, @JsonProperty("length") int length) {
return new AutoValue_NetFlowV9FieldDef(type, length);
}

public Optional<Object> parse(ByteBuf bb) {
int len = length() != 0 ? length() : type().valueType().getDefaultLength();
switch (type().valueType()) {
case UINT8:
case UINT16:
case UINT24:
case UINT32:
case UINT64:
return parseUnsignedNumber(bb, len);
case INT8:
return Optional.of(bb.readByte());
case INT16:
return Optional.of(bb.readShort());
case INT24:
return Optional.of(bb.readMedium());
case INT32:
return Optional.of(bb.readInt());
case INT64:
return Optional.of(bb.readLong());
case IPV4:
byte[] b = new byte[4];
bb.readBytes(b);
try {
return Optional.of(InetAddress.getByAddress(b).getHostAddress());
} catch (UnknownHostException e) {
return Optional.empty();
}
case IPV6:
byte[] b2 = new byte[16];
bb.readBytes(b2);
try {
return Optional.of(InetAddress.getByAddress(b2).getHostAddress());
} catch (UnknownHostException e) {
return Optional.empty();
}
case MAC:
byte[] b3 = new byte[6];
bb.readBytes(b3);
return Optional.of(String.format(Locale.ROOT, "%02x:%02x:%02x:%02x:%02x:%02x", b3[0], b3[1], b3[2], b3[3], b3[4], b3[5]));
case STRING:
byte[] b4 = new byte[len];
bb.readBytes(b4);
return Optional.of(new String(b4, StandardCharsets.UTF_8));
default:
return Optional.empty();
}
}

private Optional<Object> parseUnsignedNumber(ByteBuf bb, int length) {
switch (length) {
case 1:
return Optional.of(bb.readUnsignedByte());
case 2:
return Optional.of(bb.readUnsignedShort());
case 3:
return Optional.of(bb.readUnsignedMedium());
case 4:
return Optional.of(bb.readUnsignedInt());
case 8:
return Optional.of(bb.readLong());
default:
byte[] uint64Bytes = new byte[length];
bb.readBytes(uint64Bytes);
return Optional.of(new BigInteger(uint64Bytes));
}

}
}

+ 71
- 0
graylog2-server/src/main/java/org/graylog/plugins/netflow/v9/NetFlowV9FieldType.java View File

@@ -0,0 +1,71 @@
/**
* This file is part of Graylog.
*
* Graylog is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Graylog is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Graylog. If not, see <http://www.gnu.org/licenses/>.
*/
package org.graylog.plugins.netflow.v9;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;