Skip to content

Using POE to hook syslog-ng

Being able to do analysis, sorting, or database storage of syslog messages is incredibly useful. There are tons of solutions on the market to do just that. If you’re working on a system developed in house that you’d like to incorporate syslog messages into, then it may be easier to hook directly into the syslog stream than to introduce another piece of software into the environment which needs to be glued.

Syslog-ng facilitates easy integration with Perl binaries as the Perl program is spawned once during the daemon start up and a handle to that program’s STDIN is maintained for dispatching of messages. Using POE, we can turn this into an event driven model, making additional complexity simple.

In this example, we’ll create a POE Master session that receives all of the syslog-ng input from STDIN. Using off the shelf components, we’ll run a TCP Server on port 9514 that will allow clients to connect and subscribe to feeds based on the “program” name of the message being dispatched.

Anytime I’m using Regular Expressions over and over, I like to “precook” them. This compiles the regular expression, and lets the engine skip that step each time they’re used. Doing so is simply a matter of declaring the regex with the qr// operator:

my %cooked = (
	program => qr/\s+\d+:\d+:\d+\s+\S+\s+([^:\s]+)(:|\s)/,
);

Initialization

Next we’ll create the administrative session in charge of dispatching the messages to the proper channels:

# Dispatcher Master Session
POE::Session->create(
	inline_states => {
		_start					=> \&dispatcher_start,
		_stop					=> sub { print "SESSION ", $_[SESSION]->ID, " stopped.\n"; },
		register_client			=> \&register_client,
		subscribe_client		=> \&subscribe_client,
		hangup_client			=> \&hangup_client,

		dispatch_message		=> \&dispatch_message,
	},
);

We’ll define those subroutines shortly, but we need to setup the rest of our sessions. Next, we’ll need a TCP Server to handle the client connections, we can get that using POE::Component::Server::TCP:

# TCP Session Master
POE::Component::Server::TCP->new(
		Alias		=> 'server',
		Address		=> '127.0.0.1',
		Port		=> 9514,

		ClientConnected		=> \&client_connect,
		ClientInput			=> \&client_input,

		ClientDisconnected	=> \&client_term,
		ClientError			=> \&client_term,

		InlineStates		=> {
			client_print		=> \&client_print,
		},
);

The final session will handle the Input on STDIN from syslog-ng:

# Syslog-ng Stream Master
POE::Session->create(
		inline_states => {
			_start		=> \&stream_start,
			_stop		=> sub { print "SESSION ", $_[SESSION]->ID, " stopped.\n"; },
			stream_line		=> \&stream_line,
		},
);

Now we have to define the subroutines that we’ll be dispatching events to. The heavy lifting is done by POE, and we’re left to handle simple things.

Session Routines: dispatcher

This session is going to managing which clients receive which messages. The actual input is handled by the stream session, and the sending of the messages to the client by the server session. As we have a raw POE::Session, our first subroutine dispatcher_start is just going to do some basic preparation:

sub dispatcher_start {
	my ($kernel, $heap) = @_[KERNEL, HEAP];

	$kernel->alias_set( 'dispatcher' );  # allow named dispatch to this session.

	$heap->{subscribers} = {};
        $heap->{clients} = {};

}

Next event to be handled is the register_client event which is fired anytime a connection is established to the server session. All the dispatcher does is register it’s session_id into an internal heap. Nothing happens with it, but if we needed to send a message to all clients, we could loop over this hash and broadcast message.

sub register_client {
    # ARG0 => TCP Client Session ID
    my ($kernel,$heap,$sid) = @_[KERNEL,HEAP,ARG0];

    $heap->{clients}{$sid} = 1;
}

Clients can subscribe to a program name, which they do by entering “sub dhcpd, dnsmasq” into the TCP Server. It’s not fancy, but man is it convenient for debugging and development purposes. The server session determines that the subscription is occurring and passes it’s argument string to the dispatcher session via the subscribe_client event. This subroutine is called:

sub subscribe_client {
    # ARG0 => SID of Client
    # ARG1 => Argument String of the subscribe
	my ($kernel,$heap,$sid,$argstr) = @_[KERNEL,HEAP,ARG0,ARG1];

    # Split the input at commas or spaces into words:
	my @progs = map { lc } split /[\s,]+/, $argstr;
    # Add the SID to the list of Subscribed Clients for that program
	foreach my $prog (@progs) {
		$heap->{subscribers}{$prog}{$sid} = 1;
	}

    # Inform the client they've subscribed via client_print
	$kernel->post( $sid => 'client_print' => 'Subscribed to : ' . join(', ', @progs ) );
}

If a client disconnects, we remove it from the message dispatching hash:

sub hangup_client {
    # ARG0 => SID of Client Disconnecting
	my ($kernel,$heap,$sid) = @_[KERNEL,HEAP,ARG0];

	delete $heap->{clients}{$sid};

	foreach my $p ( keys %{ $heap->{subscribers} } ) {
		delete $heap->{subscribers}{$p}{$sid}
			if exists $heap->{subscribers}{$p}{$sid};
	}
}

Now comes the most important event the dispatcher handles, dispatch_message. In this event, we have a message from syslog-ng that needs to go to interested parties. This event determines the “program” and it’s subscribers and sends that message along appropriately:

sub dispatch_message {
    # ARG0 => The raw message from syslog-ng
	my ($kernel,$heap,$msg) = @_[KERNEL,HEAP,ARG0];

    # Determine the program name
	if( my ($program) = map { lc } ($msg =~ /$cooked{program}/) ) {
		# remove the sub process and PID from the program
		$program =~ s/\(.*//g;
		$program =~ s/\[.*//g;

        # If we have subscribers, send them the message.
		if( exists $heap->{subscribers}{$program} ) {
			foreach my $sid (keys %{ $heap->{subscribers}{$program} }) {
				$kernel->post( $sid => 'client_print' => $msg );
			}
		}
}

You'll notice on line 14 above, the post( $sid => client_print => $msg ) sends the event to the appropriate client and calls the client_print event on itself. This is all the dispatcher needs to do. The rest is handled by other other sessions.

Session Routines: server

This session accepts new tcp clients and handles writing to the sockets. We'll take a look at a few subroutines here. Fist we'll look at the ClientConnect event.

sub client_connect {
    # SESSION is the client's session object
	my ($kernel,$heap,$ses) = @_[KERNEL,HEAP,SESSION];

	my $SID = $ses->ID;

    # Register the Client with the Dispatcher
	$kernel->post( 'dispatcher' => 'register_client' => $SID );

    # Store the current entry for 'client' in the heap so we can communicate later
	$heap->{clients}{ $SID } = $heap->{client};

	# Say hello to the client.
	$heap->{client}->put( "Hello Client: $SID" );
}

We also need a disconnect event:

sub client_term {
	my ($kernel,$heap,$ses) = @_[KERNEL,HEAP,SESSION];
	my $sid = $ses->ID;

    # Delete the Client's Dispatch Table
	delete $heap->{dispatch}{$sid};
    # Tell the dispatcher session we're through
	$kernel->post( 'dispatcher' => 'hangup_client' =>  $sid );
}

Next we’ll handle sending message to the client, which is incredibly easy:

sub client_print {
    # ARG0 => Message to Send to the Client
	my ($kernel,$heap,$ses,$mesg) = @_[KERNEL,HEAP,SESSION,ARG0];

	$heap->{clients}{$ses->ID}->put($mesg);
}

Now we a routine to handle the ClientInput event. This event will take commands from the clients and do something with them. We’ll use an internal dispatch table in the form of a hash to handle translating commands. This will allow us to expand our API if we need to.

sub client_input {
    # SESSION is the Client Session Object with input
    # ARG0 => Input waiting from that client
	my ($kernel,$heap,$ses,$msg) = @_[KERNEL,HEAP,SESSION,ARG0];
	my $sid = $ses->ID;

    # Build a Dispatch Table if one does not exists in the heap for this entry.
	if( !exists $heap->{dispatch}{$sid} ) {
		$heap->{dispatch}{$sid} = {

			subscribe		=> {
				re			=> qr/^sub(?:scribe)? (.*)/,
				callback	=> sub {
					$kernel->post( 'dispatcher' => 'subscribe_client' => $sid, shift );
				},
			},
            # FUTURE API for Clients receiving every message!
			#fullfeed		=> {
			#	re			=> qr/^(fullfeed)/,
			#	callback	=> sub {
			#		$kernel->post( 'dispatcher' => 'fullfeed_client' => $sid );
			#	},
			#},
		};
	}

	#
	# Check for messages:
	my $handled = 0;
    # Get Our Dispatch Table
	my $dispatch = $heap->{dispatch}{$sid};
    # Look up and take action according to our dispatch table
	foreach my $evt ( keys %{ $dispatch } ) {
		if( my($args) = ($msg =~ /$dispatch->{$evt}{re}/)) {
			$handled = 1;
			$dispatch->{$evt}{callback}->($args);
			last;
		}
	}

    # Inform the client that their command was not understood.
	if( !$handled ) {
		$kernel->post( $sid => 'client_print' => 'UNKNOWN COMMAND, Ignored.' );
	}
}

That’s the most complicated routine in the program, but it does allow us to morph the dispatch tables for individual clients. Lines 12-15 build a dispatch table entry with the regular expression to match the command, followed by a callback subroutine reference which handles the command. Lines 34 and 36 are where these rules are applied to the input from the client.

Session Routines: stream

The last session is very simple. This session maintains the connection to STDIN from syslog-ng and dispatches those lines as events to the dispatcher session. There is a startup routine:

sub stream_start {
	my ($kernel, $heap) = @_[KERNEL, HEAP];

	$kernel->alias_set( 'stream' );

	#
	# Initialize the connection to STDIN as a POE::Wheel
	my $stdin = IO::Handle->new_from_fd( \*STDIN, 'r' );
	my $stderr = IO::Handle->new_from_fd( \*STDERR, 'w' );

	$heap->{stream} = POE::Wheel::ReadWrite->new(
		InputHandle		=> $stdin,
		OutputHandle	=> $stderr,
		InputEvent		=> 'stream_line',
	);
}

And the stream_line event which sends the incoming syslog messages to the dispatcher session for processing:

#--------------------------------------------------------------------------#
sub stream_line {
    # ARG0 => Line from STDIN, New line delimited.
	my ($kernel,$msg) = @_[KERNEL,ARG0];

	return unless length $msg;

	$kernel->post( 'dispatcher' => 'dispatch_message' => $msg );

}

Setting it up with syslog-ng

If we store our POE program in /usr/local/bin/poe-syslog-ng.pl, in the syslog-ng.conf we need to specify it as a program:

#
# Subscriber Feeds
destination d_subscribers {
	program("/usr/local/bin/poe-syslog-ng.pl");
};

Then you can feed it based on filters, just like the rest of the destination macros in syslog-ng:

#
# SUBSCRIPTION SERVICE:
log { source(s_ext); source(s_udp); filter(f_database); destination(d_subscribers); };

The whole #!

For those interested, I’ve written a program that expands this example with enhanced functionality. The full source is available here:

#!/usr/bin/perl
#
# This is the POE Master Server.
#  1) Take all the syslog input
#  2) Listen for parsers
#  3) Filter streams to parsers
#  TODO: 4) Maintain Parser State, restarting on crash

use strict;
use warnings;

use Socket;
use Regexp::Common qw(net);

sub POE::Kernel::ASSERT_DEFAULT (){ 1 }
#sub POE::Kernel::TRACE_DEFAULT (){ 1 }
use POE qw(
	Wheel::ReadWrite
	Component::Server::TCP
);

my %cooked = (
	program => qr/\s+\d+:\d+:\d+\s+\S+\s+([^:\s]+)(:|\s)/,
);

#--------------------------------------------------------------------------#
# POE Session Initialization

# Dispatcher Master Session
POE::Session->create(
	inline_states => {
		_start					=> \&dispatcher_start,
		_stop					=> sub { print "SESSION ", $_[SESSION]->ID, " stopped.\n"; },
		register_client			=> \&register_client,
		subscribe_client		=> \&subscribe_client,
		unsubscribe_client		=> \&unsubscribe_client,
		fullfeed_client			=> \&fullfeed_client,
		dispatch_message		=> \&dispatch_message,
		broadcast				=> \&broadcast,
		hangup_client			=> \&hangup_client,
		server_shutdown			=> \&server_shutdown,
		debug_client			=> \&debug_client,
		nobug_client			=> \&nobug_client,
		debug_message			=> \&debug_message,
	},
);

# TCP Session Master
POE::Component::Server::TCP->new(
		Alias		=> 'server',
		Address		=> '127.0.0.1',
		Port		=> 9514,

		ClientConnected		=> \&client_connect,
		ClientInput			=> \&client_input,

		ClientDisconnected	=> \&client_term,
		ClientError			=> \&client_term,

		InlineStates		=> {
			client_print		=> \&client_print,
		},
);

# Syslog-ng Stream Master
POE::Session->create(
		inline_states => {
			_start		=> \&stream_start,
			_stop		=> sub { print "SESSION ", $_[SESSION]->ID, " stopped.\n"; },

			stream_line		=> \&stream_line,
			stream_error	=> \&stream_error,
		},
);

#--------------------------------------------------------------------------#

#--------------------------------------------------------------------------#
# POE Main Loop
POE::Kernel->run();
exit 0;
#--------------------------------------------------------------------------#

#--------------------------------------------------------------------------#
# POE Event Functions
#--------------------------------------------------------------------------#

#--------------------------------------------------------------------------#
sub debug {
	my $msg = shift;
	chomp($msg);
	$poe_kernel->post( 'dispatcher' => 'debug_message' => $msg );
	print "[debug] $msg\n";
}
#--------------------------------------------------------------------------#
sub dispatcher_start {
	my ($kernel, $heap) = @_[KERNEL, HEAP];

	$kernel->alias_set( 'dispatcher' );

	$heap->{subscribers} = { };
	$heap->{full} = { };
	$heap->{debug} = { };
}

#--------------------------------------------------------------------------#
sub register_client {
	my ($kernel,$heap,$sid) = @_[KERNEL,HEAP,ARG0];

	$heap->{clients}{$sid} = 1;
}

#--------------------------------------------------------------------------#
sub debug_client {
	my ($kernel,$heap,$sid) = @_[KERNEL,HEAP,ARG0];

	if( exists $heap->{full}{$sid} ) {  return;  }

	$heap->{debug}{$sid} = 1;
	$kernel->post( $sid => 'client_print' => 'Debugging enabled.' );
}

#--------------------------------------------------------------------------#
sub nobug_client {
	my ($kernel,$heap,$sid) = @_[KERNEL,HEAP,ARG0];

	delete $heap->{debug}{$sid}
		if exists $heap->{debug}{$sid};
	$kernel->post( $sid => 'client_print' => 'Debugging disabled.' );
}

#--------------------------------------------------------------------------#
sub fullfeed_client {
	my ($kernel,$heap,$sid) = @_[KERNEL,HEAP,ARG0];

	#
	# Remove from normal subscribers.
	foreach my $prog (keys %{ $heap->{subscribers} }) {
		delete $heap->{subscribers}{$prog}{$sid}
			if exists $heap->{subscribers}{$prog}{$sid};
	}

	#
	# Turn off DEBUG
	if( exists $heap->{debug}{$sid} ) {
		delete $heap->{debug}{$sid};
	}

	#
	# Add to fullfeed:
	$heap->{full}{$sid} = 1;

	$kernel->post( $sid => 'client_print' => 'Full feed enabled, all other functions disabled.');
}

#--------------------------------------------------------------------------#
sub subscribe_client {
	my ($kernel,$heap,$sid,$argstr) = @_[KERNEL,HEAP,ARG0,ARG1];

	if( exists $heap->{full}{$sid} ) {  return;  }

	my @progs = map { lc } split /[\s,]+/, $argstr;
	foreach my $prog (@progs) {
		$heap->{subscribers}{$prog}{$sid} = 1;
	}

	$kernel->post( $sid => 'client_print' => 'Subscribed to : ' . join(', ', @progs ) );
}
#--------------------------------------------------------------------------#
sub unsubscribe_client {
	my ($kernel,$heap,$sid,$argstr) = @_[KERNEL,HEAP,ARG0,ARG1];

	my @progs = map { lc } split /[\s,]+/, $argstr;
	foreach my $prog (@progs) {
		delete $heap->{subscribers}{$prog}{$sid};
	}

	$kernel->post( $sid => 'client_print' => 'Subscription removed for : ' . join(', ', @progs ) );
}

#--------------------------------------------------------------------------#
sub hangup_client {
	my ($kernel,$heap,$sid) = @_[KERNEL,HEAP,ARG0];

	delete $heap->{clients}{$sid};

	foreach my $p ( keys %{ $heap->{subscribers} } ) {
		delete $heap->{subscribers}{$p}{$sid}
			if exists $heap->{subscribers}{$p}{$sid};
	}

	if( exists $heap->{debug}{$sid} ) {
		delete $heap->{debug}{$sid};
	}

	if( exists $heap->{full}{$sid} ) {
		delete $heap->{full}{$sid};
	}

	debug("Client Termination Posted: $sid\n");

}

#--------------------------------------------------------------------------#
sub stream_start {
	my ($kernel, $heap) = @_[KERNEL, HEAP];

	$kernel->alias_set( 'stream' );

	#
	# Initialize the connection to STDIN as a POE::Wheel
	my $stdin = IO::Handle->new_from_fd( \*STDIN, 'r' );
	my $stderr = IO::Handle->new_from_fd( \*STDERR, 'w' );

	$heap->{stream} = POE::Wheel::ReadWrite->new(
		InputHandle		=> $stdin,
		OutputHandle	=> $stderr,
		InputEvent		=> 'stream_line',
		ErrorEvent		=> 'stream_error',
	);
}

#--------------------------------------------------------------------------#
sub stream_line {
	my ($kernel,$msg) = @_[KERNEL,ARG0];

	return unless length $msg;

	$kernel->post( 'dispatcher' => 'dispatch_message' => $msg );

}

#--------------------------------------------------------------------------#
sub stream_error {
	my ($kernel) = $_[KERNEL];

	debug("STREAM ERROR!!!!!!!!!!\n");
	$kernel->call( 'dispatcher' => 'server_shutdown' => 'Stream lost' );
}

#--------------------------------------------------------------------------#
sub server_shutdown {
	my ($kernel,$heap,$msg) = @_[KERNEL,HEAP,ARG0];

	$kernel->call( dispatcher => 'broadcast' => 'SERVER DISCONNECTING: ' . $msg );
	$kernel->call( 'server' => 'shutdown' );
	exit;
}

#--------------------------------------------------------------------------#
sub client_connect {
	my ($kernel,$heap,$ses) = @_[KERNEL,HEAP,SESSION];

	my $KID = $kernel->ID();
	my $CID = $heap->{client}->ID;
	my $SID = $ses->ID;

	$kernel->post( 'dispatcher' => 'register_client' => $SID );

	$heap->{clients}{ $SID } = $heap->{client};
	#
	# Say hello to the client.
	$heap->{client}->put( "EHLO Streamer (KERNEL: $KID:$SID)" );
}

#--------------------------------------------------------------------------#
sub client_print {
	my ($kernel,$heap,$ses,$mesg) = @_[KERNEL,HEAP,SESSION,ARG0];

	$heap->{clients}{$ses->ID}->put($mesg);
}

#--------------------------------------------------------------------------#
sub broadcast {
	my ($kernel,$heap,$msg) = @_[KERNEL,HEAP,ARG0];

	foreach my $sid (keys %{ $heap->{clients} }) {
		$kernel->post( $sid => 'client_print' => $msg );
	}
}
#--------------------------------------------------------------------------#
sub dispatch_message {
	my ($kernel,$heap,$msg) = @_[KERNEL,HEAP,ARG0];

	foreach my $sid ( keys %{ $heap->{full} } ) {
		$kernel->post( $sid => 'client_print' => $msg );
	}

	if( my ($program) = map { lc } ($msg =~ /$cooked{program}/) ) {
		# remove the sub process and PID from the program
		$program =~ s/\(.*//g;
		$program =~ s/\[.*//g;

		debug("DISPATCHING MESSAGE [$program]");

		if( exists $heap->{subscribers}{$program} ) {
			foreach my $sid (keys %{ $heap->{subscribers}{$program} }) {
				$kernel->post( $sid => 'client_print' => $msg );
			}
		}
		else {
			debug("Message discarded, no listeners.");
		}
	}
	else {
			debug("Message discarded, format not understood.");
	}
}

#--------------------------------------------------------------------------#
sub debug_message {
	my ($kernel,$heap,$msg) = @_[KERNEL,HEAP,ARG0];

	foreach my $sid (keys %{ $heap->{debug} }) {
		$kernel->post( $sid => 'client_print' => '[debug] ' . $msg );
	}
}

#--------------------------------------------------------------------------#
sub client_input {
	my ($kernel,$heap,$ses,$msg) = @_[KERNEL,HEAP,SESSION,ARG0];
	my $sid = $ses->ID;

	if( !exists $heap->{dispatch}{$sid} ) {
		$heap->{dispatch}{$sid} = {
			fullfeed		=> {
				re			=> qr/^(fullfeed)/,
				callback	=> sub {
					$kernel->post( 'dispatcher' => 'fullfeed_client' => $sid );
				},
			},
			subscribe		=> {
				re			=> qr/^sub(?:scribe)? (.*)/,
				callback	=> sub {
					$kernel->post( 'dispatcher' => 'subscribe_client' => $sid, shift );
				},
			},
			unsubscribe 	=> {
				re			=> qr/^unsub(?:scribe)? (.*)/,
				callback	=> sub {
					$kernel->post( 'dispatcher' => 'unsubscribe_client' => $sid, shift );
				},
			},
			debug 	=> {
				re			=> qr/^(debug)/i,
				callback	=> sub {
					$kernel->post( 'dispatcher' => 'debug_client' => $sid, shift );
				},
			},
			nobug 	=> {
				re			=> qr/^(no(de)?bug)/i,
				callback	=> sub {
					$kernel->post( 'dispatcher' => 'nobug_client' => $sid, shift );
				},
			},
			#quit			=> {
			#	re			=> qr/(exit)|q(uit)?/,
			#	callback	=> sub {
			#			$kernel->post( $sid => 'client_print' => 'Terminating connection on your request.');
			#			$kernel->post( $sid => 'shutdown' );
			#	},
			#},
			#status			=> {
			#	re			=> qr/^status/,
			#	callback	=> sub {
			#		my $cnt = scalar( keys %{ $heap->{clients} } );
			#		my $subcnt = scalar( keys %{ $heap->{subscribers} });
			#		my $msg = "Currently $cnt connections, $subcnt subscribed.";
			#		$kernel->post( $sid, 'client_print', $msg );
			#	},
			#},
		};
	}

	#
	# Check for messages:
	my $handled = 0;
	my $dispatch = $heap->{dispatch}{$sid};
	foreach my $evt ( keys %{ $dispatch } ) {
		if( my($args) = ($msg =~ /$dispatch->{$evt}{re}/)) {
			$handled = 1;
			$dispatch->{$evt}{callback}->($args);
			last;
		}
	}

	if( !$handled ) {
		$kernel->post( $sid => 'client_print' => 'UNKNOWN COMMAND, Ignored.' );
	}
}

#--------------------------------------------------------------------------#
sub client_term {
	my ($kernel,$heap,$ses) = @_[KERNEL,HEAP,SESSION];
	my $sid = $ses->ID;

	delete $heap->{dispatch}{$sid};
	$kernel->post( 'dispatcher' => 'hangup_client' =>  $sid );

	debug("SERVER, client $sid disconnected.\n");
}

#--------------------------------------------------------------------------#
Share and Enjoy:
  • Reddit
  • Digg
  • del.icio.us
  • Slashdot
  • StumbleUpon
  • Google
  • Ma.gnolia
  • Facebook

{ 1 } Comments

  1. Ty | July 3, 2009 at 11:08 pm | Permalink

    Great post. I use POE to serve as a application request server and it has not failed me yet.