Browse Source

Migrate to Netty 4.1 (#4397)

* Migrate to Netty 4.1

* Revert changes to vendor-module-ids.json and yarn.lock

Ceterum censeo JavaScript esse delendam.

* Add support for native SSL (OpenSSL) bindings

* Wait for transport being started completely in UdpTransportTest

* Refactor getBaseChannelHandlers/getFinalChannelHandlers

* Upgrade to Netty 4.1.18.Final

* Fix throughput and connection counters for Netty-based transports

* Use read bytes for CodecAggregator.Result in GelfChunkAggregator

* Refactor NettyTransport#getLocalAddress()

* Upgrade to Netty 4.1.19.Final

* Allow configuring the number of worker threads per Netty transport

* Remove support for Netty OIO (old I/O) transports

* Remove outdated TODOs

* Use locale metric registry for transport-specific metrics
tags/3.0.0-alpha.0
Jochen Schalanda 1 year ago
parent
commit
6762727721
54 changed files with 2954 additions and 1887 deletions
  1. 1
    0
      config/forbidden-apis/netty3.txt
  2. 21
    6
      graylog-project-parent/pom.xml
  3. 37
    1
      graylog2-server/pom.xml
  4. 2
    2
      graylog2-server/src/main/java/org/graylog2/bootstrap/CmdLineTool.java
  5. 0
    2
      graylog2-server/src/main/java/org/graylog2/bootstrap/ServerBootstrap.java
  6. 6
    1
      graylog2-server/src/main/java/org/graylog2/commands/Server.java
  7. 8
    8
      graylog2-server/src/main/java/org/graylog2/inputs/codecs/GelfChunkAggregator.java
  8. 12
    23
      graylog2-server/src/main/java/org/graylog2/inputs/codecs/gelf/GELFMessageChunk.java
  9. 22
    27
      graylog2-server/src/main/java/org/graylog2/inputs/syslog/tcp/SyslogOctetCountFrameDecoder.java
  10. 27
    27
      graylog2-server/src/main/java/org/graylog2/inputs/syslog/tcp/SyslogTCPFramingRouterHandler.java
  11. 29
    140
      graylog2-server/src/main/java/org/graylog2/inputs/transports/HttpTransport.java
  12. 108
    0
      graylog2-server/src/main/java/org/graylog2/inputs/transports/NettyTransportConfiguration.java
  13. 12
    36
      graylog2-server/src/main/java/org/graylog2/inputs/transports/SyslogTcpTransport.java
  14. 21
    57
      graylog2-server/src/main/java/org/graylog2/inputs/transports/TcpTransport.java
  15. 5
    11
      graylog2-server/src/main/java/org/graylog2/inputs/transports/TransportsModule.java
  16. 125
    27
      graylog2-server/src/main/java/org/graylog2/inputs/transports/UdpTransport.java
  17. 41
    0
      graylog2-server/src/main/java/org/graylog2/inputs/transports/netty/ChannelRegistrationHandler.java
  18. 45
    0
      graylog2-server/src/main/java/org/graylog2/inputs/transports/netty/DatagramChannelFactory.java
  19. 37
    0
      graylog2-server/src/main/java/org/graylog2/inputs/transports/netty/DatagramPacketHandler.java
  20. 85
    0
      graylog2-server/src/main/java/org/graylog2/inputs/transports/netty/EventLoopGroupFactory.java
  21. 46
    0
      graylog2-server/src/main/java/org/graylog2/inputs/transports/netty/EventLoopGroupProvider.java
  22. 51
    0
      graylog2-server/src/main/java/org/graylog2/inputs/transports/netty/ExceptionLoggingChannelHandler.java
  23. 94
    0
      graylog2-server/src/main/java/org/graylog2/inputs/transports/netty/HttpHandler.java
  24. 126
    117
      graylog2-server/src/main/java/org/graylog2/inputs/transports/netty/LenientDelimiterBasedFrameDecoder.java
  25. 86
    62
      graylog2-server/src/main/java/org/graylog2/inputs/transports/netty/LenientLineBasedFrameDecoder.java
  26. 60
    0
      graylog2-server/src/main/java/org/graylog2/inputs/transports/netty/MessageAggregationHandler.java
  27. 21
    0
      graylog2-server/src/main/java/org/graylog2/inputs/transports/netty/NettyTransportType.java
  28. 54
    0
      graylog2-server/src/main/java/org/graylog2/inputs/transports/netty/PromiseFailureHandler.java
  29. 53
    0
      graylog2-server/src/main/java/org/graylog2/inputs/transports/netty/RawMessageHandler.java
  30. 45
    0
      graylog2-server/src/main/java/org/graylog2/inputs/transports/netty/ServerSocketChannelFactory.java
  31. 3
    3
      graylog2-server/src/main/java/org/graylog2/plugin/inputs/MessageInput.java
  32. 5
    5
      graylog2-server/src/main/java/org/graylog2/plugin/inputs/codecs/CodecAggregator.java
  33. 215
    86
      graylog2-server/src/main/java/org/graylog2/plugin/inputs/transports/AbstractTcpTransport.java
  34. 88
    246
      graylog2-server/src/main/java/org/graylog2/plugin/inputs/transports/NettyTransport.java
  35. 39
    28
      graylog2-server/src/main/java/org/graylog2/plugin/inputs/transports/util/KeyUtil.java
  36. 15
    41
      graylog2-server/src/main/java/org/graylog2/plugin/inputs/util/ConnectionCounter.java
  37. 9
    15
      graylog2-server/src/main/java/org/graylog2/plugin/inputs/util/PacketInformationDumper.java
  38. 23
    22
      graylog2-server/src/main/java/org/graylog2/plugin/inputs/util/ThroughputCounter.java
  39. 2
    2
      graylog2-server/src/main/java/org/graylog2/rest/resources/cluster/ClusterSystemShutdownResource.java
  40. 0
    2
      graylog2-server/src/main/java/org/graylog2/shared/bindings/GenericBindings.java
  41. 23
    23
      graylog2-server/src/test/java/org/graylog2/inputs/codecs/GelfChunkAggregatorTest.java
  42. 62
    52
      graylog2-server/src/test/java/org/graylog2/inputs/syslog/tcp/SyslogOctetCountFrameDecoderTest.java
  43. 74
    69
      graylog2-server/src/test/java/org/graylog2/inputs/syslog/tcp/SyslogTCPFramingRouterHandlerTest.java
  44. 0
    260
      graylog2-server/src/test/java/org/graylog2/inputs/transports/GELFHttpHandlerTest.java
  45. 0
    123
      graylog2-server/src/test/java/org/graylog2/inputs/transports/HttpTransportHandlerTest.java
  46. 104
    88
      graylog2-server/src/test/java/org/graylog2/inputs/transports/UdpTransportTest.java
  47. 203
    0
      graylog2-server/src/test/java/org/graylog2/inputs/transports/netty/HttpHandlerTest.java
  48. 192
    86
      graylog2-server/src/test/java/org/graylog2/inputs/transports/netty/LenientDelimiterBasedFrameDecoderTest.java
  49. 286
    120
      graylog2-server/src/test/java/org/graylog2/inputs/transports/netty/LenientLineBasedFrameDecoderTest.java
  50. 117
    66
      graylog2-server/src/test/java/org/graylog2/plugin/inputs/transports/AbstractTcpTransportTest.java
  51. 13
    2
      graylog2-server/src/test/java/org/graylog2/plugin/inputs/transports/util/KeyUtilTest.java
  52. 100
    0
      graylog2-server/src/test/java/org/graylog2/plugin/inputs/util/ConnectionCounterTest.java
  53. 99
    0
      graylog2-server/src/test/java/org/graylog2/plugin/inputs/util/ThroughputCounterTest.java
  54. 2
    1
      pom.xml

+ 1
- 0
config/forbidden-apis/netty3.txt View File

@@ -0,0 +1 @@
org.jboss.netty.** @ Migrate to Netty 4.x

+ 21
- 6
graylog-project-parent/pom.xml View File

@@ -67,6 +67,25 @@
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-bom</artifactId>
<version>${netty.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-tcnative-boringssl-static</artifactId>
<version>${netty-tcnative-boringssl-static.version}</version>
<classifier>osx-x86_64</classifier>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-tcnative-boringssl-static</artifactId>
<version>${netty-tcnative-boringssl-static.version}</version>
<classifier>linux-x86_64</classifier>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
@@ -174,11 +193,6 @@
<artifactId>reflections</artifactId>
<version>${reflections.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId>
@@ -626,7 +640,7 @@
<!-- Use ALL the cores! -->
<forkCount>1C</forkCount>
<reuseForks>false</reuseForks>
<argLine>-Djava.library.path=${project.basedir}/../lib/sigar-${sigar.version}</argLine>
<argLine>-Djava.library.path=${project.basedir}/../lib/sigar-${sigar.version} -Dio.netty.leakDetectionLevel=paranoid</argLine>
<excludes>
<exclude>**/*IntegrationTest.java</exclude>
<exclude>**/*IT.java</exclude>
@@ -659,6 +673,7 @@
<!--bundledSignature>commons-io-unsafe-${commons-io.version}</bundledSignature-->
</bundledSignatures>
<signaturesFiles>
<signaturesFile>${project.basedir}/../config/forbidden-apis/netty3.txt</signaturesFile>
<signaturesFile>${project.basedir}/../config/forbidden-apis/signatures.txt</signaturesFile>
</signaturesFiles>
</configuration>

+ 37
- 1
graylog2-server/pom.xml View File

@@ -404,7 +404,43 @@

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
<artifactId>netty-common</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-handler</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-http</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-epoll</artifactId>
<classifier>linux-x86_64</classifier>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-kqueue</artifactId>
<classifier>osx-x86_64</classifier>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-tcnative-boringssl-static</artifactId>
<classifier>osx-x86_64</classifier>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-tcnative-boringssl-static</artifactId>
<classifier>linux-x86_64</classifier>
</dependency>

<dependency>

+ 2
- 2
graylog2-server/src/main/java/org/graylog2/bootstrap/CmdLineTool.java View File

@@ -42,6 +42,8 @@ import com.google.inject.Injector;
import com.google.inject.Module;
import com.google.inject.name.Names;
import com.google.inject.spi.Message;
import io.netty.util.internal.logging.InternalLoggerFactory;
import io.netty.util.internal.logging.Slf4JLoggerFactory;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.core.LoggerContext;
@@ -61,8 +63,6 @@ import org.graylog2.shared.bindings.PluginBindings;
import org.graylog2.shared.plugins.ChainingClassLoader;
import org.graylog2.shared.plugins.PluginLoader;
import org.graylog2.shared.utilities.ExceptionUtils;
import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.logging.Slf4JLoggerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


+ 0
- 2
graylog2-server/src/main/java/org/graylog2/bootstrap/ServerBootstrap.java View File

@@ -31,7 +31,6 @@ import org.graylog2.plugin.inputs.MessageInput;
import org.graylog2.plugin.system.NodeId;
import org.graylog2.shared.bindings.GenericBindings;
import org.graylog2.shared.bindings.GenericInitializerBindings;
import org.graylog2.shared.bindings.MessageInputBindings;
import org.graylog2.shared.bindings.SchedulerBindings;
import org.graylog2.shared.bindings.ServerStatusBindings;
import org.graylog2.shared.bindings.SharedPeriodicalBindings;
@@ -199,7 +198,6 @@ public abstract class ServerBootstrap extends CmdLineTool {
result.add(new SharedPeriodicalBindings());
result.add(new SchedulerBindings());
result.add(new GenericInitializerBindings());
result.add(new MessageInputBindings());
result.add(new SystemStatsModule(configuration.isDisableSigar()));

return result;

+ 6
- 1
graylog2-server/src/main/java/org/graylog2/commands/Server.java View File

@@ -54,6 +54,7 @@ import org.graylog2.decorators.DecoratorBindings;
import org.graylog2.indexer.IndexerBindings;
import org.graylog2.indexer.retention.RetentionStrategyBindings;
import org.graylog2.indexer.rotation.RotationStrategyBindings;
import org.graylog2.inputs.transports.NettyTransportConfiguration;
import org.graylog2.messageprocessors.MessageProcessorModule;
import org.graylog2.migrations.MigrationsModule;
import org.graylog2.notifications.Notification;
@@ -63,6 +64,7 @@ import org.graylog2.plugin.ServerStatus;
import org.graylog2.plugin.Tools;
import org.graylog2.plugin.system.NodeId;
import org.graylog2.shared.UI;
import org.graylog2.shared.bindings.MessageInputBindings;
import org.graylog2.shared.bindings.ObjectMapperModule;
import org.graylog2.shared.bindings.RestApiBindings;
import org.graylog2.shared.system.activities.Activity;
@@ -93,6 +95,7 @@ public class Server extends ServerBootstrap {
private final MongoDbConfiguration mongoDbConfiguration = new MongoDbConfiguration();
private final VersionCheckConfiguration versionCheckConfiguration = new VersionCheckConfiguration();
private final KafkaJournalConfiguration kafkaJournalConfiguration = new KafkaJournalConfiguration();
private final NettyTransportConfiguration nettyTransportConfiguration = new NettyTransportConfiguration();

public Server() {
super("server", configuration);
@@ -117,6 +120,7 @@ public class Server extends ServerBootstrap {
new MessageProcessorModule(),
new AlarmCallbackBindings(),
new InitializerBindings(),
new MessageInputBindings(),
new MessageOutputBindings(configuration, chainingClassLoader),
new RotationStrategyBindings(),
new RetentionStrategyBindings(),
@@ -145,7 +149,8 @@ public class Server extends ServerBootstrap {
emailConfiguration,
mongoDbConfiguration,
versionCheckConfiguration,
kafkaJournalConfiguration);
kafkaJournalConfiguration,
nettyTransportConfiguration);
}

@Override

+ 8
- 8
graylog2-server/src/main/java/org/graylog2/inputs/codecs/GelfChunkAggregator.java View File

@@ -21,12 +21,12 @@ import com.codahale.metrics.MetricRegistry;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.collect.Maps;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.graylog2.inputs.codecs.gelf.GELFMessage;
import org.graylog2.inputs.codecs.gelf.GELFMessageChunk;
import org.graylog2.plugin.Tools;
import org.graylog2.plugin.inputs.codecs.CodecAggregator;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -85,13 +85,13 @@ public class GelfChunkAggregator implements CodecAggregator {

@Nonnull
@Override
public Result addChunk(ChannelBuffer buffer) {
public Result addChunk(ByteBuf buffer) {
final byte[] readable = new byte[buffer.readableBytes()];
buffer.toByteBuffer().get(readable, buffer.readerIndex(), buffer.readableBytes());
buffer.readBytes(readable, buffer.readerIndex(), buffer.readableBytes());

final GELFMessage msg = new GELFMessage(readable);

final ChannelBuffer aggregatedBuffer;
final ByteBuf aggregatedBuffer;
switch (msg.getGELFType()) {
case CHUNKED:
try {
@@ -108,7 +108,7 @@ public class GelfChunkAggregator implements CodecAggregator {
case ZLIB:
case GZIP:
case UNCOMPRESSED:
aggregatedBuffer = buffer;
aggregatedBuffer = Unpooled.wrappedBuffer(readable);
break;
case UNSUPPORTED:
return INVALID_RESULT;
@@ -127,7 +127,7 @@ public class GelfChunkAggregator implements CodecAggregator {
* @return null or a {@link org.graylog2.plugin.journal.RawMessage raw message} object
*/
@Nullable
private ChannelBuffer checkForCompletion(GELFMessage gelfMessage) {
private ByteBuf checkForCompletion(GELFMessage gelfMessage) {
if (!chunks.isEmpty() && log.isDebugEnabled()) {
log.debug("Dumping GELF chunk map [chunks for {} messages]:\n{}", chunks.size(), humanReadableChunkMap());
}
@@ -178,7 +178,7 @@ public class GelfChunkAggregator implements CodecAggregator {
}
}
completeMessages.inc();
return ChannelBuffers.wrappedBuffer(allChunks);
return Unpooled.wrappedBuffer(allChunks);
}

// message isn't complete yet, check if we should remove the other parts as well

+ 12
- 23
graylog2-server/src/main/java/org/graylog2/inputs/codecs/gelf/GELFMessageChunk.java View File

@@ -16,10 +16,11 @@
*/
package org.graylog2.inputs.codecs.gelf;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import org.graylog2.plugin.Tools;
import org.graylog2.plugin.inputs.MessageInput;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;

public final class GELFMessageChunk {

@@ -54,14 +55,14 @@ public final class GELFMessageChunk {
private int sequenceCount = -1;
private long arrival = -1L;

private final ChannelBuffer payload;
private final ByteBuf payload;
private final MessageInput sourceInput;

public GELFMessageChunk(final byte[] payload, MessageInput sourceInput) {
if (payload.length < HEADER_TOTAL_LENGTH) {
throw new IllegalArgumentException("This GELF message chunk is too short. Cannot even contain the required header.");
}
this.payload = ChannelBuffers.wrappedBuffer(payload);
this.payload = Unpooled.wrappedBuffer(payload);
this.sourceInput = sourceInput;
read();
}
@@ -108,12 +109,10 @@ public final class GELFMessageChunk {
this.arrival = Tools.nowUTC().getMillis();
}

private String extractId() {
private void extractId() {
if (this.id == null) {
this.id = ChannelBuffers.hexDump(payload, HEADER_PART_HASH_START, HEADER_PART_HASH_LENGTH);
this.id = ByteBufUtil.hexDump(payload, HEADER_PART_HASH_START, HEADER_PART_HASH_LENGTH);
}

return this.id;
}

// lol duplication
@@ -152,20 +151,10 @@ public final class GELFMessageChunk {

@Override
public String toString() {
final StringBuilder sb = new StringBuilder();

sb.append("ID: ");
sb.append(this.id);
sb.append("\tSequence: ");
sb.append(this.sequenceNumber + 1); // +1 for readability: 1/2 not 0/2
sb.append("/");
sb.append(this.sequenceCount);
sb.append("\tArrival: ");
sb.append(this.arrival);
sb.append("\tData size: ");
sb.append(this.payload.readableBytes());

return sb.toString();
return "ID: " + this.id +
"\tSequence: " + (this.sequenceNumber + 1) + // +1 for readability: 1/2 not 0/2
"/" + this.sequenceCount +
"\tArrival: " + this.arrival +
"\tData size: " + this.payload.readableBytes();
}

}

+ 22
- 27
graylog2-server/src/main/java/org/graylog2/inputs/syslog/tcp/SyslogOctetCountFrameDecoder.java View File

@@ -16,28 +16,30 @@
*/
package org.graylog2.inputs.syslog.tcp;

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.frame.FrameDecoder;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.util.ByteProcessor;

import java.nio.charset.StandardCharsets;
import java.util.List;

/**
* Implements a Netty {@link FrameDecoder} for the Syslog octet counting framing. (RFC6587)
* Implements a Netty {@link ByteToMessageDecoder} for the Syslog octet counting framing. (RFC6587)
*
* @see <a href="http://tools.ietf.org/html/rfc6587#section-3.4.1">RFC6587 Octet Counting</a>
*/
public class SyslogOctetCountFrameDecoder extends FrameDecoder {
public class SyslogOctetCountFrameDecoder extends ByteToMessageDecoder {
private static final ByteProcessor BYTE_PROCESSOR = value -> value != ' ';

@Override
protected Object decode(final ChannelHandlerContext ctx,
final Channel channel,
final ChannelBuffer buffer) throws Exception {
protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
final int frameSizeValueLength = findFrameSizeValueLength(buffer);

// We have not found the frame length value byte size yet.
if (frameSizeValueLength <= 0) {
return null;
return;
}

// Convert the frame length value bytes into an integer without mutating the buffer reader index.
@@ -49,18 +51,14 @@ public class SyslogOctetCountFrameDecoder extends FrameDecoder {
// the buffer has enough data to read the complete message.
if (buffer.readableBytes() - skipLength < length) {
// We cannot read the complete frame yet.
return null;
return;
} else {
// Skip the frame length value bytes and the whitespace that follows it.
buffer.skipBytes(skipLength);
}

final ChannelBuffer frame = extractFrame(buffer, buffer.readerIndex(), length);

// Advance the reader index because extractFrame() does not do that.
buffer.skipBytes(length);

return frame;
final ByteBuf frame = buffer.readRetainedSlice(length);
out.add(frame);
}

/**
@@ -69,17 +67,14 @@ public class SyslogOctetCountFrameDecoder extends FrameDecoder {
* @param buffer The channel buffer
* @return The length of the frame length value
*/
private int findFrameSizeValueLength(final ChannelBuffer buffer) {
final int n = buffer.writerIndex();
private int findFrameSizeValueLength(final ByteBuf buffer) {
final int readerIndex = buffer.readerIndex();
int index = buffer.forEachByte(BYTE_PROCESSOR);

for (int i = buffer.readerIndex(); i < n; i ++) {
final byte b = buffer.getByte(i);

if (b == ' ') {
return i - buffer.readerIndex();
}
if (index >= 0) {
return index - readerIndex;
} else {
return -1;
}

return -1; // Not found.
}
}

+ 27
- 27
graylog2-server/src/main/java/org/graylog2/inputs/syslog/tcp/SyslogTCPFramingRouterHandler.java View File

@@ -16,46 +16,46 @@
*/
package org.graylog2.inputs.syslog.tcp;

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder;

public class SyslogTCPFramingRouterHandler extends SimpleChannelUpstreamHandler {
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.util.ReferenceCountUtil;

public class SyslogTCPFramingRouterHandler extends SimpleChannelInboundHandler<ByteBuf> {
private final int maxFrameLength;
private final ChannelBuffer[] delimiter;
private boolean routed = false;
private final ByteBuf[] delimiter;
private ChannelInboundHandler handler = null;

public SyslogTCPFramingRouterHandler(int maxFrameLength, ChannelBuffer[] delimiter) {
public SyslogTCPFramingRouterHandler(int maxFrameLength, ByteBuf[] delimiter) {
this.maxFrameLength = maxFrameLength;
this.delimiter = delimiter;
}

@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
final ChannelBuffer message = (ChannelBuffer) e.getMessage();

if (! message.readable()) {
return;
}

if (! routed) {
if (usesOctetCountFraming(message)) {
ctx.getPipeline().addAfter(ctx.getName(), "framer-octet", new SyslogOctetCountFrameDecoder());
} else {
ctx.getPipeline().addAfter(ctx.getName(), "framer-delimiter", new DelimiterBasedFrameDecoder(maxFrameLength, delimiter));
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
if (msg.isReadable()) {
// "Dynamically manipulating a pipeline is relatively an expensive operation."
// https://stackoverflow.com/a/28846565
if (handler == null) {
if (usesOctetCountFraming(msg)) {
handler = new SyslogOctetCountFrameDecoder();
} else {
handler = new DelimiterBasedFrameDecoder(maxFrameLength, delimiter);
}
}

routed = true;
handler.channelRead(ctx, ReferenceCountUtil.retain(msg));
} else {
ctx.fireChannelRead(msg);
}

ctx.sendUpstream(e);
}

private boolean usesOctetCountFraming(ChannelBuffer message) {
private boolean usesOctetCountFraming(ByteBuf message) {
// Octet counting framing needs to start with a non-zero digit.
// See: http://tools.ietf.org/html/rfc6587#section-3.4.1
return '0' < message.getByte(0) && message.getByte(0) <= '9';
final byte firstByte = message.getByte(0);
return '0' < firstByte && firstByte <= '9';
}
}

+ 29
- 140
graylog2-server/src/main/java/org/graylog2/inputs/transports/HttpTransport.java View File

@@ -16,13 +16,18 @@
*/
package org.graylog2.inputs.transports;

import com.codahale.metrics.InstrumentedExecutorService;
import com.codahale.metrics.MetricRegistry;
import com.github.joschi.jadconfig.util.Size;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.assistedinject.Assisted;
import com.google.inject.assistedinject.AssistedInject;
import io.netty.channel.ChannelHandler;
import io.netty.channel.EventLoopGroup;
import io.netty.handler.codec.http.HttpContentDecompressor;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.timeout.ReadTimeoutHandler;
import org.graylog2.inputs.transports.netty.EventLoopGroupFactory;
import org.graylog2.inputs.transports.netty.HttpHandler;
import org.graylog2.plugin.LocalMetricRegistry;
import org.graylog2.plugin.configuration.Configuration;
import org.graylog2.plugin.configuration.ConfigurationRequest;
@@ -34,120 +39,65 @@ import org.graylog2.plugin.inputs.annotations.ConfigClass;
import org.graylog2.plugin.inputs.annotations.FactoryClass;
import org.graylog2.plugin.inputs.transports.AbstractTcpTransport;
import org.graylog2.plugin.inputs.transports.Transport;
import org.graylog2.plugin.inputs.util.ConnectionCounter;
import org.graylog2.plugin.inputs.util.ThroughputCounter;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
import org.jboss.netty.handler.codec.http.HttpChunkAggregator;
import org.jboss.netty.handler.codec.http.HttpContentDecompressor;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpRequest;
import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
import org.jboss.netty.handler.codec.http.HttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.jboss.netty.handler.codec.http.HttpVersion;
import org.jboss.netty.handler.timeout.ReadTimeoutHandler;
import org.jboss.netty.util.HashedWheelTimer;

import javax.inject.Named;

import java.util.LinkedHashMap;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import static com.codahale.metrics.MetricRegistry.name;
import static org.jboss.netty.channel.Channels.fireMessageReceived;
import static org.jboss.netty.handler.codec.http.HttpHeaders.Names;
import static org.jboss.netty.handler.codec.http.HttpHeaders.Values;
import static org.jboss.netty.handler.codec.http.HttpHeaders.isKeepAlive;
import static org.jboss.netty.handler.codec.http.HttpResponseStatus.ACCEPTED;
import static org.jboss.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED;
import static org.jboss.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
import static org.jboss.netty.handler.codec.http.HttpResponseStatus.OK;

public class HttpTransport extends AbstractTcpTransport {
static final int DEFAULT_MAX_INITIAL_LINE_LENGTH = 4096;
static final int DEFAULT_MAX_HEADER_SIZE = 8192;
static final int DEFAULT_MAX_CHUNK_SIZE = (int) Size.kilobytes(64L).toBytes();
static final int DEFAULT_IDLE_WRITER_TIMEOUT = 60;
private static final int DEFAULT_MAX_INITIAL_LINE_LENGTH = 4096;
private static final int DEFAULT_MAX_HEADER_SIZE = 8192;
private static final int DEFAULT_MAX_CHUNK_SIZE = (int) Size.kilobytes(64L).toBytes();
private static final int DEFAULT_IDLE_WRITER_TIMEOUT = 60;

static final String CK_ENABLE_CORS = "enable_cors";
static final String CK_MAX_CHUNK_SIZE = "max_chunk_size";
static final String CK_IDLE_WRITER_TIMEOUT = "idle_writer_timeout";

private final boolean enableCors;
private final HashedWheelTimer timer;
private final int maxChunkSize;
private final int idleWriterTimeout;

@AssistedInject
public HttpTransport(@Assisted Configuration configuration,
@Named("bossPool") Executor bossPool,
EventLoopGroup eventLoopGroup,
EventLoopGroupFactory eventLoopGroupFactory,
NettyTransportConfiguration nettyTransportConfiguration,
ThroughputCounter throughputCounter,
ConnectionCounter connectionCounter,
HashedWheelTimer timer,
LocalMetricRegistry localRegistry) {
super(configuration,
throughputCounter,
localRegistry,
bossPool,
executorService("worker", "http-transport-worker-%d", localRegistry),
connectionCounter);
eventLoopGroup,
eventLoopGroupFactory,
nettyTransportConfiguration);

enableCors = configuration.getBoolean(CK_ENABLE_CORS);

this.timer = timer;
int maxChunkSize = configuration.intIsSet(CK_MAX_CHUNK_SIZE) ? configuration.getInt(CK_MAX_CHUNK_SIZE) : DEFAULT_MAX_CHUNK_SIZE;
this.maxChunkSize = maxChunkSize <= 0 ? DEFAULT_MAX_CHUNK_SIZE : maxChunkSize;
this.idleWriterTimeout = configuration.intIsSet(CK_IDLE_WRITER_TIMEOUT) ? configuration.getInt(CK_IDLE_WRITER_TIMEOUT, DEFAULT_IDLE_WRITER_TIMEOUT) : DEFAULT_IDLE_WRITER_TIMEOUT;
}

private static Executor executorService(final String executorName, final String threadNameFormat, final MetricRegistry metricRegistry) {
final ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(threadNameFormat).build();
return new InstrumentedExecutorService(
Executors.newCachedThreadPool(threadFactory),
metricRegistry,
name(HttpTransport.class, executorName, "executor-service"));
}

@Override
protected LinkedHashMap<String, Callable<? extends ChannelHandler>> getBaseChannelHandlers(MessageInput input) {
final LinkedHashMap<String, Callable<? extends ChannelHandler>> baseChannelHandlers =
super.getBaseChannelHandlers(input);
protected LinkedHashMap<String, Callable<? extends ChannelHandler>> getCustomChildChannelHandlers(MessageInput input) {
final LinkedHashMap<String, Callable<? extends ChannelHandler>> handlers = new LinkedHashMap<>();

if (idleWriterTimeout > 0) {
// Install read timeout handler to close idle connections after a timeout.
// This avoids dangling HTTP connections when the HTTP client does not close the connection properly.
// For details see: https://github.com/Graylog2/graylog2-server/issues/3223#issuecomment-270350500

baseChannelHandlers.put("read-timeout-handler", () -> new ReadTimeoutHandler(timer, idleWriterTimeout, TimeUnit.SECONDS));
handlers.put("read-timeout-handler", () -> new ReadTimeoutHandler(idleWriterTimeout, TimeUnit.SECONDS));
}

baseChannelHandlers.put("decoder", () -> new HttpRequestDecoder(DEFAULT_MAX_INITIAL_LINE_LENGTH, DEFAULT_MAX_HEADER_SIZE, maxChunkSize));
baseChannelHandlers.put("aggregator", () -> new HttpChunkAggregator(maxChunkSize));
baseChannelHandlers.put("encoder", HttpResponseEncoder::new);
baseChannelHandlers.put("decompressor", HttpContentDecompressor::new);

return baseChannelHandlers;
}

@Override
protected LinkedHashMap<String, Callable<? extends ChannelHandler>> getFinalChannelHandlers(MessageInput input) {
final LinkedHashMap<String, Callable<? extends ChannelHandler>> handlers = Maps.newLinkedHashMap();

handlers.put("http-handler", () -> new Handler(enableCors));
handlers.put("decoder", () -> new HttpRequestDecoder(DEFAULT_MAX_INITIAL_LINE_LENGTH, DEFAULT_MAX_HEADER_SIZE, maxChunkSize));
handlers.put("aggregator", () -> new HttpObjectAggregator(maxChunkSize));
handlers.put("encoder", HttpResponseEncoder::new);
handlers.put("decompressor", HttpContentDecompressor::new);
handlers.put("http-handler", () -> new HttpHandler(enableCors));
handlers.putAll(super.getCustomChildChannelHandlers(input));

handlers.putAll(super.getFinalChannelHandlers(input));
return handlers;
}

@@ -183,66 +133,5 @@ public class HttpTransport extends AbstractTcpTransport {
return r;
}
}
public static class Handler extends SimpleChannelHandler {

private final boolean enableCors;

public Handler(boolean enableCors) {
this.enableCors = enableCors;
}

@Override
public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent e) throws Exception {
final Channel channel = e.getChannel();
final HttpRequest request = (HttpRequest) e.getMessage();
final boolean keepAlive = isKeepAlive(request);
final HttpVersion httpRequestVersion = request.getProtocolVersion();
final String origin = request.headers().get(Names.ORIGIN);

// to allow for future changes, let's be at least a little strict in what we accept here.
if (HttpMethod.OPTIONS.equals(request.getMethod())) {
writeResponse(channel, keepAlive, httpRequestVersion, OK, origin);
return;
} else if (!HttpMethod.POST.equals(request.getMethod())) {
writeResponse(channel, keepAlive, httpRequestVersion, METHOD_NOT_ALLOWED, origin);
return;
}

final ChannelBuffer buffer = request.getContent();

final boolean correctPath = "/gelf".equals(request.getUri());

if (!correctPath) {
writeResponse(channel, keepAlive, httpRequestVersion, NOT_FOUND, origin);
} else {
// send on to raw message handler
writeResponse(channel, keepAlive, httpRequestVersion, ACCEPTED, origin);
fireMessageReceived(ctx, buffer);
}
}

private void writeResponse(Channel channel,
boolean keepAlive,
HttpVersion httpRequestVersion,
HttpResponseStatus status,
String origin) {
final HttpResponse response =
new DefaultHttpResponse(httpRequestVersion, status);

response.headers().set(Names.CONTENT_LENGTH, 0);
response.headers().set(Names.CONNECTION,
keepAlive ? Values.KEEP_ALIVE : Values.CLOSE);

if (enableCors && origin != null && !origin.isEmpty()) {
response.headers().set(Names.ACCESS_CONTROL_ALLOW_ORIGIN, origin);
response.headers().set(Names.ACCESS_CONTROL_ALLOW_CREDENTIALS, true);
response.headers().set(Names.ACCESS_CONTROL_ALLOW_HEADERS, "Authorization, Content-Type");
}

final ChannelFuture channelFuture = channel.write(response);
if (!keepAlive) {
channelFuture.addListener(ChannelFutureListener.CLOSE);
}
}
}
}

+ 108
- 0
graylog2-server/src/main/java/org/graylog2/inputs/transports/NettyTransportConfiguration.java View File

@@ -0,0 +1,108 @@
/**
* This file is part of Graylog.
*
* Graylog is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Graylog is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Graylog. If not, see <http://www.gnu.org/licenses/>.
*/
package org.graylog2.inputs.transports;

import com.github.joschi.jadconfig.Parameter;
import com.github.joschi.jadconfig.validators.PositiveIntegerValidator;
import com.github.joschi.jadconfig.validators.StringNotBlankValidator;
import com.google.common.annotations.VisibleForTesting;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.kqueue.KQueue;
import io.netty.handler.ssl.OpenSsl;
import io.netty.handler.ssl.SslProvider;
import org.graylog2.inputs.transports.netty.NettyTransportType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Locale;

public class NettyTransportConfiguration {
private static final Logger LOG = LoggerFactory.getLogger(NettyTransportConfiguration.class);
private static final String PREFIX = "transport_netty_";

@Parameter(value = PREFIX + "type", required = true, validators = StringNotBlankValidator.class)
private String type = "auto";

@Parameter(value = PREFIX + "tls_provider", required = true, validators = StringNotBlankValidator.class)
private String tlsProvider = "auto";

@Parameter(value = PREFIX + "num_threads", required = true, validators = PositiveIntegerValidator.class)
private int numThreads = Runtime.getRuntime().availableProcessors() * 2;

public NettyTransportConfiguration() {
}

@VisibleForTesting
public NettyTransportConfiguration(String type, String tlsProvider, int numThreads) {
this.type = type;
this.tlsProvider = tlsProvider;
this.numThreads = numThreads;
}

public NettyTransportType getType() {
switch (type.toLowerCase(Locale.ROOT)) {
case "epoll":
return NettyTransportType.EPOLL;
case "kqueue":
return NettyTransportType.KQUEUE;
case "nio":
return NettyTransportType.NIO;
case "auto":
default:
return detectPlatform();
}
}

private NettyTransportType detectPlatform() {
if (Epoll.isAvailable()) {
LOG.debug("Using epoll for Netty transport.");
return NettyTransportType.EPOLL;
} else if (KQueue.isAvailable()) {
LOG.debug("Using kqueue for Netty transport.");
return NettyTransportType.KQUEUE;
} else {
LOG.debug("Using NIO for Netty transport.");
return NettyTransportType.NIO;
}
}

public SslProvider getTlsProvider() {
switch (tlsProvider.toLowerCase(Locale.ROOT)) {
case "openssl":
return SslProvider.OPENSSL;
case "jdk":
return SslProvider.JDK;
case "auto":
default:
return detectTlsProvider();
}
}

private SslProvider detectTlsProvider() {
if (OpenSsl.isAvailable()) {
LOG.debug("Using OpenSSL for Netty transports.");
return SslProvider.OPENSSL;
} else {
LOG.debug("Using default Java TLS provider for Netty transports.");
return SslProvider.JDK;
}
}

public int getNumThreads() {
return numThreads;
}
}

+ 12
- 36
graylog2-server/src/main/java/org/graylog2/inputs/transports/SyslogTcpTransport.java View File

@@ -16,69 +16,45 @@
*/
package org.graylog2.inputs.transports;

import com.codahale.metrics.InstrumentedExecutorService;
import com.codahale.metrics.MetricRegistry;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.assistedinject.Assisted;
import com.google.inject.assistedinject.AssistedInject;
import io.netty.channel.ChannelHandler;
import io.netty.channel.EventLoopGroup;
import org.graylog2.inputs.syslog.tcp.SyslogTCPFramingRouterHandler;
import org.graylog2.inputs.transports.netty.EventLoopGroupFactory;
import org.graylog2.plugin.LocalMetricRegistry;
import org.graylog2.plugin.configuration.Configuration;
import org.graylog2.plugin.inputs.MessageInput;
import org.graylog2.plugin.inputs.annotations.ConfigClass;
import org.graylog2.plugin.inputs.annotations.FactoryClass;
import org.graylog2.plugin.inputs.transports.Transport;
import org.graylog2.plugin.inputs.util.ConnectionCounter;
import org.graylog2.plugin.inputs.util.ThroughputCounter;
import org.jboss.netty.channel.ChannelHandler;

import javax.inject.Named;
import java.util.LinkedHashMap;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

import static com.codahale.metrics.MetricRegistry.name;

public class SyslogTcpTransport extends TcpTransport {
@AssistedInject
public SyslogTcpTransport(@Assisted Configuration configuration,
@Named("bossPool") Executor bossPool,
EventLoopGroup eventLoopGroup,
EventLoopGroupFactory eventLoopGroupFactory,
NettyTransportConfiguration nettyTransportConfiguration,
ThroughputCounter throughputCounter,
ConnectionCounter connectionCounter,
LocalMetricRegistry localRegistry) {
super(configuration,
bossPool,
executorService("worker", "syslog-tcp-transport-worker-%d", localRegistry),
eventLoopGroup,
eventLoopGroupFactory,
nettyTransportConfiguration,
throughputCounter,
connectionCounter,
localRegistry);
}

private static Executor executorService(final String executorName, final String threadNameFormat, final MetricRegistry metricRegistry) {
final ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(threadNameFormat).build();
return new InstrumentedExecutorService(
Executors.newCachedThreadPool(threadFactory),
metricRegistry,
name(SyslogTcpTransport.class, executorName, "executor-service"));
}


@Override
protected LinkedHashMap<String, Callable<? extends ChannelHandler>> getFinalChannelHandlers(MessageInput input) {
final LinkedHashMap<String, Callable<? extends ChannelHandler>> finalChannelHandlers = Maps.newLinkedHashMap();

finalChannelHandlers.putAll(super.getFinalChannelHandlers(input));
protected LinkedHashMap<String, Callable<? extends ChannelHandler>> getCustomChildChannelHandlers(MessageInput input) {
final LinkedHashMap<String, Callable<? extends ChannelHandler>> finalChannelHandlers = new LinkedHashMap<>(super.getCustomChildChannelHandlers(input));

// Replace the "framer" channel handler inserted by the parent.
finalChannelHandlers.put("framer", new Callable<ChannelHandler>() {
@Override
public ChannelHandler call() throws Exception {
return new SyslogTCPFramingRouterHandler(maxFrameLength, delimiter);
}
});
finalChannelHandlers.replace("framer", () -> new SyslogTCPFramingRouterHandler(maxFrameLength, delimiter));

return finalChannelHandlers;
}

+ 21
- 57
graylog2-server/src/main/java/org/graylog2/inputs/transports/TcpTransport.java View File

@@ -16,12 +16,12 @@
*/
package org.graylog2.inputs.transports;

import com.codahale.metrics.InstrumentedExecutorService;
import com.codahale.metrics.MetricRegistry;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.assistedinject.Assisted;
import com.google.inject.assistedinject.AssistedInject;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.EventLoopGroup;
import org.graylog2.inputs.transports.netty.EventLoopGroupFactory;
import org.graylog2.inputs.transports.netty.LenientDelimiterBasedFrameDecoder;
import org.graylog2.plugin.LocalMetricRegistry;
import org.graylog2.plugin.configuration.Configuration;
@@ -34,78 +34,44 @@ import org.graylog2.plugin.inputs.annotations.ConfigClass;
import org.graylog2.plugin.inputs.annotations.FactoryClass;
import org.graylog2.plugin.inputs.transports.AbstractTcpTransport;
import org.graylog2.plugin.inputs.transports.Transport;
import org.graylog2.plugin.inputs.util.ConnectionCounter;
import org.graylog2.plugin.inputs.util.ThroughputCounter;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder;

import javax.inject.Named;
import java.util.LinkedHashMap;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

import static com.codahale.metrics.MetricRegistry.name;
import static org.jboss.netty.handler.codec.frame.Delimiters.lineDelimiter;
import static org.jboss.netty.handler.codec.frame.Delimiters.nulDelimiter;
import static io.netty.handler.codec.Delimiters.lineDelimiter;
import static io.netty.handler.codec.Delimiters.nulDelimiter;

public class TcpTransport extends AbstractTcpTransport {
public static final String CK_USE_NULL_DELIMITER = "use_null_delimiter";
private static final String CK_MAX_MESSAGE_SIZE = "max_message_size";
private static final int DEFAULT_MAX_FRAME_LENGTH = 2 * 1024 * 1024;

protected final ChannelBuffer[] delimiter;
protected final ByteBuf[] delimiter;
protected final int maxFrameLength;

@AssistedInject
public TcpTransport(@Assisted Configuration configuration,
@Named("bossPool") Executor bossPool,
EventLoopGroup eventLoopGroup,
EventLoopGroupFactory eventLoopGroupFactory,
NettyTransportConfiguration nettyTransportConfiguration,
ThroughputCounter throughputCounter,
ConnectionCounter connectionCounter,
LocalMetricRegistry localRegistry) {
this(configuration,
bossPool,
executorService("worker", "tcp-transport-worker-%d", localRegistry),
throughputCounter,
connectionCounter,
localRegistry);
}

protected TcpTransport(final Configuration configuration,
final Executor bossPool,
final Executor workerPool,
final ThroughputCounter throughputCounter,
final ConnectionCounter connectionCounter,
final LocalMetricRegistry localRegistry) {
super(configuration, throughputCounter, localRegistry, bossPool, workerPool, connectionCounter);
super(configuration, throughputCounter, localRegistry, eventLoopGroup, eventLoopGroupFactory, nettyTransportConfiguration);

final boolean nulDelimiter = configuration.getBoolean(CK_USE_NULL_DELIMITER);
this.delimiter = nulDelimiter ? nulDelimiter() : lineDelimiter();
this.maxFrameLength = configuration.getInt(CK_MAX_MESSAGE_SIZE, Config.DEFAULT_MAX_FRAME_LENGTH);
}

private static Executor executorService(final String executorName, final String threadNameFormat, final MetricRegistry metricRegistry) {
final ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(threadNameFormat).build();
return new InstrumentedExecutorService(
Executors.newCachedThreadPool(threadFactory),
metricRegistry,
name(TcpTransport.class, executorName, "executor-service"));
this.maxFrameLength = configuration.getInt(CK_MAX_MESSAGE_SIZE, DEFAULT_MAX_FRAME_LENGTH);
}

@Override
protected LinkedHashMap<String, Callable<? extends ChannelHandler>> getFinalChannelHandlers(MessageInput input) {
final LinkedHashMap<String, Callable<? extends ChannelHandler>> finalChannelHandlers = Maps.newLinkedHashMap();

finalChannelHandlers.put("framer", new Callable<ChannelHandler>() {
@Override
public ChannelHandler call() throws Exception {
return new LenientDelimiterBasedFrameDecoder(maxFrameLength, delimiter);
}
});
finalChannelHandlers.putAll(super.getFinalChannelHandlers(input));

return finalChannelHandlers;
protected LinkedHashMap<String, Callable<? extends ChannelHandler>> getCustomChildChannelHandlers(MessageInput input) {
final LinkedHashMap<String, Callable<? extends ChannelHandler>> childChannelHandlers = new LinkedHashMap<>();

childChannelHandlers.put("framer", () -> new LenientDelimiterBasedFrameDecoder(maxFrameLength, delimiter));
childChannelHandlers.putAll(super.getCustomChildChannelHandlers(input));

return childChannelHandlers;
}


@@ -120,8 +86,6 @@ public class TcpTransport extends AbstractTcpTransport {

@ConfigClass
public static class Config extends AbstractTcpTransport.Config {
public static final int DEFAULT_MAX_FRAME_LENGTH = 2 * 1024 * 1024;

@Override
public ConfigurationRequest getRequestedConfiguration() {
final ConfigurationRequest x = super.getRequestedConfiguration();
@@ -138,7 +102,7 @@ public class TcpTransport extends AbstractTcpTransport {
new NumberField(
CK_MAX_MESSAGE_SIZE,
"Maximum message size",
2 * 1024 * 1024,
DEFAULT_MAX_FRAME_LENGTH,
"The maximum length of a message.",
ConfigurationField.Optional.OPTIONAL,
NumberField.Attribute.ONLY_POSITIVE

+ 5
- 11
graylog2-server/src/main/java/org/graylog2/inputs/transports/TransportsModule.java View File

@@ -16,15 +16,13 @@
*/
package org.graylog2.inputs.transports;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.multibindings.MapBinder;
import com.google.inject.name.Names;
import io.netty.channel.EventLoopGroup;
import org.graylog2.inputs.transports.netty.EventLoopGroupFactory;
import org.graylog2.inputs.transports.netty.EventLoopGroupProvider;
import org.graylog2.plugin.inject.Graylog2Module;
import org.graylog2.plugin.inputs.transports.Transport;

import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class TransportsModule extends Graylog2Module {
@Override
protected void configure() {
@@ -39,11 +37,7 @@ public class TransportsModule extends Graylog2Module {
installTransport(mapBinder, "httppoll", HttpPollTransport.class);
installTransport(mapBinder, "syslog-tcp", SyslogTcpTransport.class);

// TODO Add instrumentation to ExecutorService and ThreadFactory
bind(Executor.class)
.annotatedWith(Names.named("bossPool"))
.toInstance(Executors.newCachedThreadPool(new ThreadFactoryBuilder()
.setNameFormat("transport-boss-%d")
.build()));
bind(EventLoopGroupFactory.class).asEagerSingleton();
bind(EventLoopGroup.class).toProvider(EventLoopGroupProvider.class).asEagerSingleton();
}
}

+ 125
- 27
graylog2-server/src/main/java/org/graylog2/inputs/transports/UdpTransport.java View File

@@ -16,66 +16,134 @@
*/
package org.graylog2.inputs.transports;

import com.codahale.metrics.InstrumentedExecutorService;
import com.codahale.metrics.MetricRegistry;
import com.github.joschi.jadconfig.util.Size;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.assistedinject.Assisted;
import com.google.inject.assistedinject.AssistedInject;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.FixedRecvByteBufAllocator;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.socket.DatagramChannelConfig;
import io.netty.channel.unix.UnixChannelOption;
import io.netty.util.concurrent.GlobalEventExecutor;
import org.graylog2.inputs.transports.netty.DatagramChannelFactory;
import org.graylog2.inputs.transports.netty.DatagramPacketHandler;
import org.graylog2.inputs.transports.netty.EventLoopGroupFactory;
import org.graylog2.inputs.transports.netty.NettyTransportType;
import org.graylog2.plugin.LocalMetricRegistry;
import org.graylog2.plugin.configuration.Configuration;
import org.graylog2.plugin.configuration.ConfigurationRequest;
import org.graylog2.plugin.inputs.MessageInput;
import org.graylog2.plugin.inputs.MisfireException;
import org.graylog2.plugin.inputs.annotations.ConfigClass;
import org.graylog2.plugin.inputs.annotations.FactoryClass;
import org.graylog2.plugin.inputs.transports.NettyTransport;
import org.graylog2.plugin.inputs.transports.Transport;
import org.graylog2.plugin.inputs.util.ThroughputCounter;
import org.jboss.netty.bootstrap.Bootstrap;
import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
import org.jboss.netty.channel.FixedReceiveBufferSizePredictorFactory;
import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

import static com.codahale.metrics.MetricRegistry.name;
import javax.annotation.Nullable;
import java.net.SocketAddress;
import java.util.LinkedHashMap;
import java.util.concurrent.Callable;

public class UdpTransport extends NettyTransport {
private static final Logger LOG = LoggerFactory.getLogger(UdpTransport.class);

private final Executor workerExecutor;
private final NettyTransportConfiguration nettyTransportConfiguration;
private final ChannelGroup channels;
private EventLoopGroup eventLoopGroup;
private Bootstrap bootstrap;

@AssistedInject
public UdpTransport(@Assisted Configuration configuration,
EventLoopGroupFactory eventLoopGroupFactory,
NettyTransportConfiguration nettyTransportConfiguration,
ThroughputCounter throughputCounter,
LocalMetricRegistry localRegistry) {
super(configuration, throughputCounter, localRegistry);
this.workerExecutor = executorService("worker", "udp-transport-worker-%d", localRegistry);
super(configuration, eventLoopGroupFactory, throughputCounter, localRegistry);
this.nettyTransportConfiguration = nettyTransportConfiguration;
this.channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
}

private static Executor executorService(final String executorName, final String threadNameFormat, final LocalMetricRegistry localRegistry) {
final ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(threadNameFormat).build();
return new InstrumentedExecutorService(
Executors.newCachedThreadPool(threadFactory),
localRegistry,
name(UdpTransport.class, executorName, "executor-service"));
@VisibleForTesting
Bootstrap getBootstrap(MessageInput input) {
LOG.debug("Setting UDP receive buffer size to {} bytes", getRecvBufferSize());
final NettyTransportType transportType = nettyTransportConfiguration.getType();

eventLoopGroup = eventLoopGroupFactory.create(workerThreads, localRegistry, "workers");

return new Bootstrap()
.group(eventLoopGroup)
.channelFactory(new DatagramChannelFactory(transportType))
.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(getRecvBufferSize()))
.option(ChannelOption.SO_RCVBUF, getRecvBufferSize())
.option(UnixChannelOption.SO_REUSEPORT, true)
.handler(getChannelInitializer(getChannelHandlers(input)))
.validate();
}

@Override
protected LinkedHashMap<String, Callable<? extends ChannelHandler>> getChannelHandlers(MessageInput input) {
final LinkedHashMap<String, Callable<? extends ChannelHandler>> handlers = new LinkedHashMap<>(super.getChannelHandlers(input));
handlers.put("traffic-counter", () -> throughputCounter);
handlers.put("udp-datagram", () -> DatagramPacketHandler.INSTANCE);
handlers.putAll(getChildChannelHandlers(input));

return handlers;
}

@Override
public Bootstrap getBootstrap() {
final ConnectionlessBootstrap bootstrap = new ConnectionlessBootstrap(new NioDatagramChannelFactory(workerExecutor));
public void launch(final MessageInput input) throws MisfireException {
try {
bootstrap = getBootstrap(input);

final int recvBufferSize = Ints.saturatedCast(getRecvBufferSize());
LOG.debug("Setting receive buffer size to {} bytes", recvBufferSize);
bootstrap.setOption("receiveBufferSizePredictorFactory", new FixedReceiveBufferSizePredictorFactory(recvBufferSize));
bootstrap.setOption("receiveBufferSize", recvBufferSize);
final NettyTransportType transportType = nettyTransportConfiguration.getType();
int numChannels = (transportType == NettyTransportType.EPOLL || transportType == NettyTransportType.KQUEUE) ? workerThreads : 1;
for (int i = 0; i < numChannels; i++) {
LOG.debug("Starting channel on {}", socketAddress);
bootstrap.bind(socketAddress)
.addListener(new InputLaunchListener(channels, input, getRecvBufferSize()))
.syncUninterruptibly();
}
} catch (Exception e) {
throw new MisfireException(e);
}
}

return bootstrap;

@Override
public void stop() {
if (channels != null) {
channels.close().syncUninterruptibly();
}
if (eventLoopGroup != null) {
eventLoopGroup.shutdownGracefully();
}
bootstrap = null;
}

@Nullable
@Override
public SocketAddress getLocalAddress() {
if (channels != null) {
return channels.stream().findFirst().map(Channel::localAddress).orElse(null);
}

return null;
}


@FactoryClass
public interface Factory extends Transport.Factory<UdpTransport> {
@Override
@@ -97,4 +165,34 @@ public class UdpTransport extends NettyTransport {
return r;
}
}

private static class InputLaunchListener implements ChannelFutureListener {
private final ChannelGroup channels;
private final MessageInput input;
private final int expectedRecvBufferSize;

public InputLaunchListener(ChannelGroup channels, MessageInput input, int expectedRecvBufferSize) {
this.channels = channels;
this.input = input;
this.expectedRecvBufferSize = expectedRecvBufferSize;
}

@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
final Channel channel = future.channel();
channels.add(channel);
LOG.debug("Started channel {}", channel);

final DatagramChannelConfig channelConfig = (DatagramChannelConfig) channel.config();
final int receiveBufferSize = channelConfig.getReceiveBufferSize();
if (receiveBufferSize != expectedRecvBufferSize) {
LOG.warn("receiveBufferSize (SO_RCVBUF) for input {} (channel {}) should be {} but is {}.",
input, channel, expectedRecvBufferSize, receiveBufferSize);
}
} else {
LOG.warn("Failed to start channel for input {}", input, future.cause());
}
}
}
}

+ 41
- 0
graylog2-server/src/main/java/org/graylog2/inputs/transports/netty/ChannelRegistrationHandler.java View File

@@ -0,0 +1,41 @@
/**
* This file is part of Graylog.
*
* Graylog is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Graylog is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Graylog. If not, see <http://www.gnu.org/licenses/>.
*/
package org.graylog2.inputs.transports.netty;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.group.ChannelGroup;

public class ChannelRegistrationHandler extends ChannelInboundHandlerAdapter {
private final ChannelGroup channels;

public ChannelRegistrationHandler(ChannelGroup channels) {
this.channels = channels;
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
channels.add(ctx.channel());
super.channelActive(ctx);
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
channels.remove(ctx.channel());
super.channelInactive(ctx);
}
}

+ 45
- 0
graylog2-server/src/main/java/org/graylog2/inputs/transports/netty/DatagramChannelFactory.java View File

@@ -0,0 +1,45 @@
/**
* This file is part of Graylog.
*
* Graylog is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Graylog is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Graylog. If not, see <http://www.gnu.org/licenses/>.
*/
package org.graylog2.inputs.transports.netty;

import io.netty.channel.ChannelFactory;
import io.netty.channel.epoll.EpollDatagramChannel;
import io.netty.channel.kqueue.KQueueDatagramChannel;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.nio.NioDatagramChannel;

public class DatagramChannelFactory implements ChannelFactory<DatagramChannel> {
private final NettyTransportType transportType;

public DatagramChannelFactory(NettyTransportType transportType) {
this.transportType = transportType;
}

@Override
public DatagramChannel newChannel() {
switch (transportType) {
case EPOLL:
return new EpollDatagramChannel();
case KQUEUE:
return new KQueueDatagramChannel();
case NIO:
return new NioDatagramChannel();
default:
throw new IllegalArgumentException("Invalid or unknown Netty transport type " + transportType);
}
}
}

+ 37
- 0
graylog2-server/src/main/java/org/graylog2/inputs/transports/netty/DatagramPacketHandler.java View File

@@ -0,0 +1,37 @@
/**
* This file is part of Graylog.
*
* Graylog is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Graylog is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Graylog. If not, see <http://www.gnu.org/licenses/>.
*/
package org.graylog2.inputs.transports.netty;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.socket.DatagramPacket;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.util.ReferenceCountUtil;

import java.util.List;

@ChannelHandler.Sharable
public class DatagramPacketHandler extends MessageToMessageDecoder<DatagramPacket> {
public static final DatagramPacketHandler INSTANCE = new DatagramPacketHandler();

@Override
protected void decode(ChannelHandlerContext ctx, DatagramPacket msg, List<Object> out) throws Exception {
final ByteBuf content = msg.content();
out.add(ReferenceCountUtil.retain(content));
}
}

+ 85
- 0
graylog2-server/src/main/java/org/graylog2/inputs/transports/netty/EventLoopGroupFactory.java View File

@@ -0,0 +1,85 @@
/**
* This file is part of Graylog.
*
* Graylog is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Graylog is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Graylog. If not, see <http://www.gnu.org/licenses/>.
*/
package org.graylog2.inputs.transports.netty;

import com.codahale.metrics.InstrumentedExecutorService;
import com.codahale.metrics.InstrumentedThreadFactory;
import com.codahale.metrics.MetricRegistry;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.kqueue.KQueueEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import org.graylog2.inputs.transports.NettyTransportConfiguration;
import org.graylog2.plugin.LocalMetricRegistry;

import javax.inject.Inject;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

public class EventLoopGroupFactory {
private final NettyTransportConfiguration configuration;

@Inject
public EventLoopGroupFactory(NettyTransportConfiguration configuration) {
this.configuration = configuration;
}

public EventLoopGroup create(int numThreads, MetricRegistry metricRegistry, String metricPrefix) {
final ThreadFactory threadFactory = threadFactory(metricPrefix, metricRegistry);
final Executor executor = executor(metricPrefix, numThreads, threadFactory, metricRegistry);

switch (configuration.getType()) {
case EPOLL:
return epollEventLoopGroup(numThreads, executor);
case KQUEUE:
return kqueueEventLoopGroup(numThreads, executor);
case NIO:
return nioEventLoopGroup(numThreads, executor);
default:
throw new RuntimeException("Invalid or unknown netty transport type " + configuration.getType());
}
}

private ThreadFactory threadFactory(String name, MetricRegistry metricRegistry) {
final String threadFactoryMetricName = MetricRegistry.name(name, "thread-factory");
final ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("netty-transport-%d").build();
return new InstrumentedThreadFactory(threadFactory, metricRegistry, threadFactoryMetricName);

}

private Executor executor(final String name, int numThreads, final ThreadFactory threadFactory, final MetricRegistry metricRegistry) {
final String executorMetricName = LocalMetricRegistry.name(name, "executor-service");
final ExecutorService cachedThreadPool = Executors.newFixedThreadPool(numThreads, threadFactory);
return new InstrumentedExecutorService(cachedThreadPool, metricRegistry, executorMetricName);
}

private EventLoopGroup nioEventLoopGroup(int numThreads, Executor executor) {
return new NioEventLoopGroup(numThreads, executor);
}


private EventLoopGroup epollEventLoopGroup(int numThreads, Executor executor) {
return new EpollEventLoopGroup(numThreads, executor);
}

private EventLoopGroup kqueueEventLoopGroup(int numThreads, Executor executor) {
return new KQueueEventLoopGroup(numThreads, executor);
}
}

+ 46
- 0
graylog2-server/src/main/java/org/graylog2/inputs/transports/netty/EventLoopGroupProvider.java View File

@@ -0,0 +1,46 @@
/**
* This file is part of Graylog.
*
* Graylog is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Graylog is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Graylog. If not, see <http://www.gnu.org/licenses/>.
*/
package org.graylog2.inputs.transports.netty;

import com.codahale.metrics.MetricRegistry;
import io.netty.channel.EventLoopGroup;
import org.graylog2.inputs.transports.NettyTransportConfiguration;

import javax.inject.Inject;
import javax.inject.Provider;

public class EventLoopGroupProvider implements Provider<EventLoopGroup> {
private final EventLoopGroupFactory eventLoopGroupFactory;
private final NettyTransportConfiguration configuration;
private final MetricRegistry metricRegistry;

@Inject
public EventLoopGroupProvider(EventLoopGroupFactory eventLoopGroupFactory,
NettyTransportConfiguration configuration,
MetricRegistry metricRegistry) {
this.eventLoopGroupFactory = eventLoopGroupFactory;
this.configuration = configuration;
this.metricRegistry = metricRegistry;
}

@Override
public EventLoopGroup get() {
final String name = "netty-transport";
final int numThreads = configuration.getNumThreads();
return eventLoopGroupFactory.create(numThreads, metricRegistry, name);
}
}

+ 51
- 0
graylog2-server/src/main/java/org/graylog2/inputs/transports/netty/ExceptionLoggingChannelHandler.java View File

@@ -0,0 +1,51 @@
/**
* This file is part of Graylog.
*
* Graylog is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Graylog is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Graylog. If not, see <http://www.gnu.org/licenses/>.
*/
package org.graylog2.inputs.transports.netty;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.graylog2.plugin.inputs.MessageInput;
import org.slf4j.Logger;

public class ExceptionLoggingChannelHandler extends ChannelInboundHandlerAdapter {
private final MessageInput input;
private final Logger logger;

public ExceptionLoggingChannelHandler(MessageInput input, Logger logger) {
this.input = input;
this.logger = logger;
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (logger.isTraceEnabled() && "Connection reset by peer".equals(cause.getMessage())) {
logger.trace("{} in Input [{}/{}] (channel {})",
cause.getMessage(),
input.getName(),
input.getId(),
ctx.channel());
} else {
logger.error("Error in Input [{}/{}] (channel {})",
input.getName(),
input.getId(),
ctx.channel(),
cause);
}

super.exceptionCaught(ctx, cause);
}
}

+ 94
- 0
graylog2-server/src/main/java/org/graylog2/inputs/transports/netty/HttpHandler.java View File

@@ -0,0 +1,94 @@
/**
* This file is part of Graylog.
*
* Graylog is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Graylog is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Graylog. If not, see <http://www.gnu.org/licenses/>.
*/
package org.graylog2.inputs.transports.netty;

import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.HttpVersion;

public class HttpHandler extends SimpleChannelInboundHandler<HttpRequest> {
private final boolean enableCors;

public HttpHandler(boolean enableCors) {
this.enableCors = enableCors;
}

@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpRequest request) throws Exception {
final Channel channel = ctx.channel();
final boolean keepAlive = HttpUtil.isKeepAlive(request);
final HttpVersion httpRequestVersion = request.protocolVersion();
final String origin = request.headers().get(HttpHeaderNames.ORIGIN);

// to allow for future changes, let's be at least a little strict in what we accept here.
if (HttpMethod.OPTIONS.equals(request.method())) {
writeResponse(channel, keepAlive, httpRequestVersion, HttpResponseStatus.OK, origin);
return;
} else if (!HttpMethod.POST.equals(request.method())) {
writeResponse(channel, keepAlive, httpRequestVersion, HttpResponseStatus.METHOD_NOT_ALLOWED, origin);
return;
}

final boolean correctPath = "/gelf".equals(request.uri());
if (correctPath && request instanceof FullHttpRequest) {
final FullHttpRequest fullHttpRequest = (FullHttpRequest) request;
final ByteBuf buffer = fullHttpRequest.content();

// send on to raw message handler
writeResponse(channel, keepAlive, httpRequestVersion, HttpResponseStatus.ACCEPTED, origin);
ctx.fireChannelRead(buffer);
} else {
writeResponse(channel, keepAlive, httpRequestVersion, HttpResponseStatus.NOT_FOUND, origin);
}
}

private void writeResponse(Channel channel,
boolean keepAlive,
HttpVersion httpRequestVersion,
HttpResponseStatus status,
String origin) {
final HttpResponse response = new DefaultHttpResponse(httpRequestVersion, status);

response.headers().set(HttpHeaderNames.CONTENT_LENGTH, 0);
response.headers().set(HttpHeaderNames.CONNECTION, keepAlive ? HttpHeaderValues.KEEP_ALIVE : HttpHeaderValues.CLOSE);

if (enableCors && origin != null && !origin.isEmpty()) {
response.headers().set(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN, origin);
response.headers().set(HttpHeaderNames.ACCESS_CONTROL_ALLOW_CREDENTIALS, true);
response.headers().set(HttpHeaderNames.ACCESS_CONTROL_ALLOW_HEADERS, "Authorization, Content-Type");
}

final ChannelFuture channelFuture = channel.writeAndFlush(response);

if (!keepAlive) {
channelFuture.addListener(ChannelFutureListener.CLOSE);
}
}
}

+ 126
- 117
graylog2-server/src/main/java/org/graylog2/inputs/transports/netty/LenientDelimiterBasedFrameDecoder.java View File

@@ -31,17 +31,16 @@
*/
package org.graylog2.inputs.transports.netty;

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.handler.codec.frame.Delimiters;
import org.jboss.netty.handler.codec.frame.FrameDecoder;
import org.jboss.netty.handler.codec.frame.LineBasedFrameDecoder;
import org.jboss.netty.handler.codec.frame.TooLongFrameException;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.TooLongFrameException;

import java.util.List;

/**
* A decoder that splits the received {@link ChannelBuffer}s by one or more
* A decoder that splits the received {@link ByteBuf}s by one or more
* delimiters. It is particularly useful for decoding the frames which ends
* with a delimiter such as {@link Delimiters#nulDelimiter() NUL} or
* {@linkplain Delimiters#lineDelimiter() newline characters}.
@@ -61,7 +60,7 @@ import org.jboss.netty.handler.codec.frame.TooLongFrameException;
* | ABC\nDEF\r\n |
* +--------------+
* </pre>
* a {@link LenientDelimiterBasedFrameDecoder}{@code (}{@link Delimiters#lineDelimiter() Delimiters.lineDelimiter()}{@code )}
* a {@link LenientDelimiterBasedFrameDecoder}({@link Delimiters#lineDelimiter() Delimiters.lineDelimiter()})
* will choose {@code '\n'} as the first delimiter and produce two frames:
* <pre>
* +-----+-----+
@@ -74,120 +73,119 @@ import org.jboss.netty.handler.codec.frame.TooLongFrameException;
* | ABC\nDEF |
* +----------+
* </pre>
*
*
* @apiviz.uses org.jboss.netty.handler.codec.frame.Delimiters - - useful
*/
public class LenientDelimiterBasedFrameDecoder extends FrameDecoder {
public class LenientDelimiterBasedFrameDecoder extends ByteToMessageDecoder {

private final ChannelBuffer[] delimiters;
private final ByteBuf[] delimiters;
private final int maxFrameLength;
private final boolean stripDelimiter;
private final boolean failFast;
private final boolean emitLastLineWithoutDelimiter;
private boolean discardingTooLongFrame;
private int tooLongFrameLength;
/** Set only when decoding with "\n" and "\r\n" as the delimiter. */
/**
* Set only when decoding with "\n" and "\r\n" as the delimiter.
*/
private final LenientLineBasedFrameDecoder lineBasedDecoder;

/**
* Creates a new instance.
*
* @param maxFrameLength the maximum length of the decoded frame.
* A {@link TooLongFrameException} is thrown if
* the length of the frame exceeds this value.
* @param delimiter the delimiter
* @param maxFrameLength the maximum length of the decoded frame.
* A {@link TooLongFrameException} is thrown if
* the length of the frame exceeds this value.
* @param delimiter the delimiter
*/
public LenientDelimiterBasedFrameDecoder(int maxFrameLength, ChannelBuffer delimiter) {
public LenientDelimiterBasedFrameDecoder(int maxFrameLength, ByteBuf delimiter) {
this(maxFrameLength, true, delimiter);
}

/**
* Creates a new instance.
*
* @param maxFrameLength the maximum length of the decoded frame.
* A {@link TooLongFrameException} is thrown if
* the length of the frame exceeds this value.
* @param stripDelimiter whether the decoded frame should strip out the
* delimiter or not
* @param delimiter the delimiter
* @param maxFrameLength the maximum length of the decoded frame.
* A {@link TooLongFrameException} is thrown if
* the length of the frame exceeds this value.
* @param stripDelimiter whether the decoded frame should strip out the
* delimiter or not
* @param delimiter the delimiter
*/
public LenientDelimiterBasedFrameDecoder(
int maxFrameLength, boolean stripDelimiter, ChannelBuffer delimiter) {
this(maxFrameLength, stripDelimiter, false, true, delimiter);
int maxFrameLength, boolean stripDelimiter, ByteBuf delimiter) {
this(maxFrameLength, stripDelimiter, true, delimiter);
}

/**
* Creates a new instance.
*
* @param maxFrameLength the maximum length of the decoded frame.
* A {@link TooLongFrameException} is thrown if
* the length of the frame exceeds this value.
* @param stripDelimiter whether the decoded frame should strip out the
* delimiter or not
* @param failFast If <tt>true</tt>, a {@link TooLongFrameException} is
* thrown as soon as the decoder notices the length of the
* frame will exceed <tt>maxFrameLength</tt> regardless of
* whether the entire frame has been read.
* If <tt>false</tt>, a {@link TooLongFrameException} is
* thrown after the entire frame that exceeds
* <tt>maxFrameLength</tt> has been read.
* @param delimiter the delimiter
* @param maxFrameLength the maximum length of the decoded frame.
* A {@link TooLongFrameException} is thrown if
* the length of the frame exceeds this value.
* @param stripDelimiter whether the decoded frame should strip out the
* delimiter or not
* @param failFast If <tt>true</tt>, a {@link TooLongFrameException} is
* thrown as soon as the decoder notices the length of the
* frame will exceed <tt>maxFrameLength</tt> regardless of
* whether the entire frame has been read.
* If <tt>false</tt>, a {@link TooLongFrameException} is
* thrown after the entire frame that exceeds
* <tt>maxFrameLength</tt> has been read.
* @param delimiter the delimiter
*/
public LenientDelimiterBasedFrameDecoder(
int maxFrameLength, boolean stripDelimiter, boolean failFast,
ChannelBuffer delimiter) {
ByteBuf delimiter) {
this(maxFrameLength, stripDelimiter, failFast, true, delimiter.slice(delimiter.readerIndex(), delimiter.readableBytes()));
}

/**
* Creates a new instance.
*
* @param maxFrameLength the maximum length of the decoded frame.
* A {@link TooLongFrameException} is thrown if
* the length of the frame exceeds this value.
* @param delimiters the delimiters
* @param maxFrameLength the maximum length of the decoded frame.
* A {@link TooLongFrameException} is thrown if
* the length of the frame exceeds this value.
* @param delimiters the delimiters
*/
public LenientDelimiterBasedFrameDecoder(int maxFrameLength, ChannelBuffer... delimiters) {
public LenientDelimiterBasedFrameDecoder(int maxFrameLength, ByteBuf... delimiters) {
this(maxFrameLength, true, delimiters);
}

/**
* Creates a new instance.
*
* @param maxFrameLength the maximum length of the decoded frame.
* A {@link TooLongFrameException} is thrown if
* the length of the frame exceeds this value.
* @param stripDelimiter whether the decoded frame should strip out the
* delimiter or not
* @param delimiters the delimiters
* @param maxFrameLength the maximum length of the decoded frame.
* A {@link TooLongFrameException} is thrown if
* the length of the frame exceeds this value.
* @param stripDelimiter whether the decoded frame should strip out the
* delimiter or not
* @param delimiters the delimiters
*/
public LenientDelimiterBasedFrameDecoder(
int maxFrameLength, boolean stripDelimiter, ChannelBuffer... delimiters) {
this(maxFrameLength, stripDelimiter, false, true, delimiters);
int maxFrameLength, boolean stripDelimiter, ByteBuf... delimiters) {
this(maxFrameLength, stripDelimiter, true, true, delimiters);
}

/**
* Creates a new instance.
*
* @param maxFrameLength the maximum length of the decoded frame.
* A {@link TooLongFrameException} is thrown if
* the length of the frame exceeds this value.
* @param stripDelimiter whether the decoded frame should strip out the
* delimiter or not
* @param failFast If <tt>true</tt>, a {@link TooLongFrameException} is
* thrown as soon as the decoder notices the length of the
* frame will exceed <tt>maxFrameLength</tt> regardless of
* whether the entire frame has been read.
* If <tt>false</tt>, a {@link TooLongFrameException} is
* thrown after the entire frame that exceeds
* <tt>maxFrameLength</tt> has been read.
* @param maxFrameLength the maximum length of the decoded frame.
* A {@link TooLongFrameException} is thrown if
* the length of the frame exceeds this value.
* @param stripDelimiter whether the decoded frame should strip out the
* delimiter or not
* @param failFast If <tt>true</tt>, a {@link TooLongFrameException} is
* thrown as soon as the decoder notices the length of the
* frame will exceed <tt>maxFrameLength</tt> regardless of
* whether the entire frame has been read.
* If <tt>false</tt>, a {@link TooLongFrameException} is
* thrown after the entire frame that exceeds
* <tt>maxFrameLength</tt> has been read.
* @param emitLastLineWithoutDelimiter emit the last line even if it doesn't
* end with the delimiter
* @param delimiters the delimiters
* @param delimiters the delimiters
*/
public LenientDelimiterBasedFrameDecoder(
int maxFrameLength, boolean stripDelimiter, boolean failFast, boolean emitLastLineWithoutDelimiter, ChannelBuffer... delimiters) {
int maxFrameLength, boolean stripDelimiter, boolean failFast, boolean emitLastLineWithoutDelimiter, ByteBuf... delimiters) {
validateMaxFrameLength(maxFrameLength);
if (delimiters == null) {
throw new NullPointerException("delimiters");
@@ -200,9 +198,9 @@ public class LenientDelimiterBasedFrameDecoder extends FrameDecoder {
lineBasedDecoder = new LenientLineBasedFrameDecoder(maxFrameLength, stripDelimiter, failFast, emitLastLineWithoutDelimiter);
this.delimiters = null;
} else {
this.delimiters = new ChannelBuffer[delimiters.length];
for (int i = 0; i < delimiters.length; i ++) {
ChannelBuffer d = delimiters[i];
this.delimiters = new ByteBuf[delimiters.length];
for (int i = 0; i < delimiters.length; i++) {
ByteBuf d = delimiters[i];
validateDelimiter(d);
this.delimiters[i] = d.slice(d.readerIndex(), d.readableBytes());
}
@@ -214,20 +212,22 @@ public class LenientDelimiterBasedFrameDecoder extends FrameDecoder {
this.emitLastLineWithoutDelimiter = emitLastLineWithoutDelimiter;
}

/** Returns true if the delimiters are "\n" and "\r\n". */
private static boolean isLineBased(final ChannelBuffer[] delimiters) {
/**
* Returns true if the delimiters are "\n" and "\r\n".
*/
private static boolean isLineBased(final ByteBuf[] delimiters) {
if (delimiters.length != 2) {
return false;
}
ChannelBuffer a = delimiters[0];
ChannelBuffer b = delimiters[1];
ByteBuf a = delimiters[0];
ByteBuf b = delimiters[1];
if (a.capacity() < b.capacity()) {
a = delimiters[1];
b = delimiters[0];
}
return a.capacity() == 2 && b.capacity() == 1
&& a.getByte(0) == '\r' && a.getByte(1) == '\n'
&& b.getByte(0) == '\n';
&& a.getByte(0) == '\r' && a.getByte(1) == '\n'
&& b.getByte(0) == '\n';
}

/**
@@ -238,15 +238,29 @@ public class LenientDelimiterBasedFrameDecoder extends FrameDecoder {
}

@Override
protected Object decode(
ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
Object decoded = decode(ctx, in);
if (decoded != null) {
out.add(decoded);
}
}

/**
* Create a frame out of the {@link ByteBuf} and return it.
*
* @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
* @param buffer the {@link ByteBuf} from which to read data
* @return frame the {@link ByteBuf} which represent the frame or {@code null} if no frame could
* be created.
*/
protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
if (lineBasedDecoder != null) {
return lineBasedDecoder.decode(ctx, channel, buffer);
return lineBasedDecoder.decode(ctx, buffer);
}
// Try all delimiters and choose the delimiter which yields the shortest frame.
int minFrameLength = Integer.MAX_VALUE;
ChannelBuffer minDelim = null;
for (ChannelBuffer delim: delimiters) {
ByteBuf minDelim = null;
for (ByteBuf delim : delimiters) {
int frameLength = indexOf(buffer, delim);
if (frameLength >= 0 && frameLength < minFrameLength) {
minFrameLength = frameLength;
@@ -256,7 +270,7 @@ public class LenientDelimiterBasedFrameDecoder extends FrameDecoder {

if (minDelim != null) {
int minDelimLength = minDelim.capacity();
ChannelBuffer frame;
ByteBuf frame;

if (discardingTooLongFrame) {
// We've just finished discarding a very large frame.
@@ -267,7 +281,7 @@ public class LenientDelimiterBasedFrameDecoder extends FrameDecoder {
int tooLongFrameLength = this.tooLongFrameLength;
this.tooLongFrameLength = 0;
if (!failFast) {
fail(ctx, tooLongFrameLength);
fail(tooLongFrameLength);
}
return null;
}
@@ -275,21 +289,21 @@ public class LenientDelimiterBasedFrameDecoder extends FrameDecoder {
if (minFrameLength > maxFrameLength) {