Wednesday 18 December 2013

Logstash, ElasticSearch and Kibana Integration for Clickstream Weblog Ingestion

In this post I will showcase how to build a quick and easy demo application for clickstream weblog ingestion, search and visualization. We will use Logstash to ingest the logs, ElasticSearch to store them and Kibana to build a dashboard on top. For the clickstream weblogs I am using log data from the ECML/PKDD 2005 Discovery Challenge. You can download the complete weblogs after registering there. These weblogs are delimited by semicolons (;) and contain the following fields, in order:
  • shop_id
  • unixtime
  • client ip
  • session
  • visited page
  • referrer
Here are some sample log lines:
To create this demo we need a Logstash configuration file (let's name it clickstream.conf) that specifies the inputs, filters and outputs. The clickstream.conf file looks like this:
input {
  file {
    # path for clickstream log
    path => "/home/rishav.rohit/Desktop/clickstream/_2004_02_01_19_click_stream.log"
    # define a type for all events handled by this input
    type => "weblog"
    start_position => "beginning"
    # the clickstream log is in character set ISO-8859-1
    codec => plain { charset => "ISO-8859-1" }
  }
}

filter {
  csv {
    # define columns present in weblog
    columns => ["shop_id", "unixtime", "client_ip", "session", "page", "referrer"]
    separator => ";"
  }
  grok {
    # get visited page and page parameters
    match => ["page", "%{URIPATH:page_visited}(?:%{URIPARAM:page_params})?"]
    remove_field => ["page"]
  }
  date {
    # the unixtime field is in epoch seconds, so convert it to a normal timestamp
    match => [ "unixtime", "UNIX" ]
  }
  geoip {
    # this will convert ip to longitude-latitude using GeoLiteCity database from Maxmind
    source => "client_ip"
    fields => ["latitude", "longitude"]
    target => "geoip"
    # adding the same field twice builds a two-element array: [longitude, latitude]
    add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
    add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}"  ]
  }
  mutate {
    # this will convert geoip.coordinates to float values
    convert => [ "[geoip][coordinates]", "float" ]
  }
}

output {
  # store output in local elasticsearch cluster
  elasticsearch {
    host => ""
  }
}
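To make the filter chain more concrete, here is a rough Python sketch of what the csv, grok and date filters do to a single line. It only illustrates the idea and is not how Logstash implements these filters. The input line is hypothetical: it is assembled from the field values of the sample record shown further down, with a placeholder client IP (192.0.2.1) and an empty referrer. The function name parse_click is made up for this example, and splitting on the first "?" is a simplification of the URIPATH/URIPARAM grok patterns.

```python
import csv
from datetime import datetime, timezone

# Same column order as in the csv filter above
COLUMNS = ["shop_id", "unixtime", "client_ip", "session", "page", "referrer"]


def parse_click(line):
    """Roughly mimic the csv, grok and date filters for one clickstream line."""
    # csv filter: split on ';' and name the columns
    values = next(csv.reader([line.rstrip("\n")], delimiter=";"))
    event = dict(zip(COLUMNS, values))

    # grok filter (simplified): split the page into its path and the
    # optional "?..." parameter string, then drop the original field
    page = event.pop("page")
    path, sep, query = page.partition("?")
    event["page_visited"] = path
    if sep:
        event["page_params"] = sep + query

    # date filter: epoch seconds -> UTC timestamp
    ts = datetime.fromtimestamp(int(event["unixtime"]), tz=timezone.utc)
    event["@timestamp"] = ts.strftime("%Y-%m-%dT%H:%M:%S.000Z")
    return event


# Hypothetical line: placeholder IP and empty referrer (assumptions)
line = ("14;1075658407;192.0.2.1;f07f39ec63abf67f965684f3fa5729c4;"
        "/findp/?&id=63&view=1,2,3,14,20,15&p_14=nerez;")
event = parse_click(line)
print(event["page_visited"], event["@timestamp"])
```

The geoip step is left out on purpose, since it needs the MaxMind database to look up the IP.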
To start the Logstash agent, run the following command:
java -jar logstash-1.2.2-flatjar.jar agent -f clickstream.conf
A sample record in ElasticSearch looks like this:

    _index: logstash-2004.02.01
    _type: logs
    _id: I1N0MboUR0O1O3RZ-qXqnw
    _version: 1
    _score: 1
    _source: {
        message: [
        @timestamp: 2004-02-01T18:00:07.000Z
        @version: 1
        type: weblog
        path: /home/rishav.rohit/Desktop/clickstream/_2004_02_01_19_click_stream.log
        shop_id: 14
        unixtime: 1075658407
        session: f07f39ec63abf67f965684f3fa5729c4
        page_visited: /findp/
        page_params: ?&id=63&view=1,2,3,14,20,15&p_14=nerez
        geoip: {
            latitude: 50.08330000000001
            longitude: 14.466700000000003
            coordinates: [
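
Two details of this record follow directly from the configuration. The _index name comes from the default daily index pattern of the Logstash elasticsearch output (logstash-YYYY.MM.dd, based on the event's @timestamp in UTC), and geoip.coordinates is the array built by the two add_field lines, longitude first, then converted to floats by mutate. Below is a small Python sketch of both steps using the values from the record above. The function names daily_index and coordinates are hypothetical and only serve this illustration.

```python
from datetime import datetime, timezone


def daily_index(timestamp, prefix="logstash"):
    """Build the daily index name from an ISO-style @timestamp string."""
    ts = datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S.%fZ")
    ts = ts.replace(tzinfo=timezone.utc)
    return f"{prefix}-{ts:%Y.%m.%d}"


def coordinates(geoip):
    """Mimic the two add_field calls plus mutate/convert.

    add_field first yields strings in [longitude, latitude] order;
    the mutate filter then converts each element to a float.
    """
    as_strings = [str(geoip["longitude"]), str(geoip["latitude"])]
    return [float(value) for value in as_strings]


print(daily_index("2004-02-01T18:00:07.000Z"))  # logstash-2004.02.01
print(coordinates({"latitude": 50.08330000000001,
                   "longitude": 14.466700000000003}))
```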

So we have parsed a complex log message into simpler components, converted fields such as unixtime to a datetime and the client IP to latitude-longitude, and extracted the page visited by the client. Using Kibana, we can now quickly build a dashboard with panels based on these fields.

