Friday, May 01, 2015

Logstash and Oracle Database

    For anyone that doesn't know what Logstash is head over to http://logstash.net/ and watch the video on the home page.  Robin Moffatt has done a great blog post on the full ELK stack with OBIEE.

    This blog post is a first cut at an input filter for Logstash to gather metrics, logs, or anything that can be expressed in sql.  A huge caution that this is a 0.01 attempt and will get better as I need it to.  This is a basic config of doing a select sessions every 5 seconds and report that in this case to just STDOUT.  There are many choices of where to send this information like to elastic search for visualization into Kibana as Mark showed in the OBIEE example.

    If anyone uses this, changes it, likes it, doesn't like it,  let me know.

input { 
  orasql { sql      => "SELECT count(1) sessions from v$session "
           dbuser   => "klrice"
           dbpasswd => "klrice"
           dburl    => "localhost/orcl"
           interval =>  5
      } 
}
output {
 stdout { codec => rubydebug }
}

The running of this is just needs to add in the plugin path for be able to find the orasql input filter.

     The filter itself is where everything oracle-wise is happening.  Since Logstash is written in JRuby, I'm able to use the normal Oracle JDBC Driver to access the database.  For this case I have a ./lib folder where I have a local copy.  This could just as easily be $ORACLE_HOME/jdbc/lib/ojdbc6.jar to ensure using the same as the database version when on the same server.

     Since the connect and establishing a session is expensive when repeatedly issuing sql, I have kept a connection open.  The connection could be closed for any number of reasons so there's an auto-reconnect in here also by calling the getConnection before any sql exec.   Before someone asks: no, sqldev will not get an auto-reconnect like this.




# encoding: utf-8
require "logstash/inputs/base"
require "logstash/namespace"
require "socket" # for Socket.gethostname
require 'java'
# SETUP THE JDBC DRIVER TO YOUR LOCATION
$CLASSPATH << "lib/ojdbc6.jar"

java_import 'oracle.jdbc.OracleDriver'
java_import 'java.sql.DriverManager'


# Run sql line tools and capture the whole output as an event.
#
# Notes:
#
# * The '@source' of this event will be the sql run.
# * The '@message' of this event will be the entire stdout of the sql
#   as one event.
#
class LogStash::Inputs::OraSQL < LogStash::Inputs::Base

  config_name "orasql"
  milestone 2
   
  $currConnection = nil

  default :codec, "plain"

  # Set this to true to enable debugging on an input.
  config :debug, :validate => :boolean, :default => false, :deprecated => "This setting was never used by this plugin. It will be removed soon."

  # SQL to run. For example, "select * from emp"
  config :sql, :validate => :string, :required => true

  # dbuser to run. For example, "select * from emp"
  config :dbuser, :validate => :string, :required => true ,  :default => "/"

  # dbpass to run. For example, "select * from emp"
  config :dbpasswd, :validate => :string, :required => false , :default => ""

  # dburl to run. For example, "select * from emp"
  config :dburl, :validate => :string, :required => true , :default => "//localhost/orcl"

  # Interval to run the sql. Value is in seconds.
  config :interval, :validate => :number, :required => true , :default => 120

  public
  def register
    @logger.info("Registering SQL Input", :type => @type,
                 :sql => @sql, :interval => @interval)
  end # def register
  
  public 
  def getConnection
      if $currConnection == nil  or  ! $currConnection.isValid(100)  
           oradriver = OracleDriver.new
           DriverManager.registerDriver oradriver

           con_props = java.util.Properties.new
           con_props.setProperty("user", @dbuser)
           con_props.setProperty("password", @dbpasswd )
 
           conn =  Java::oracle.jdbc.OracleDriver.new.connect('jdbc:oracle:thin:@' + @dburl, con_props)

           conn.auto_commit = false

           $currConnection = conn
      end 
      
      return $currConnection  
       
  end # end getConnection

  public
  def run(queue)
    hostname = Socket.gethostname
   
    loop do
      start = Time.now
      @logger.info? && @logger.info("Running SQL", :sql => @sql)

      conn = getConnection

      stmt = conn.prepare_statement @sql
      rset = stmt.execute_query
      while ( rset.next )
         i=1
         event =  event = LogStash::Event.new
         decorate(event)
         cols = rset.getMetaData.getColumnCount
         msg = ""
         r=0
         while ( i <= cols ) 
             val = rset.getString(i)
             if ( val != nil ) 
                if ( r > 0 )
                   msg = msg + ","
                end
                event[ rset.getMetaData.getColumnName(i).downcase ] =  val
                msg = msg +  "\"" +rset.getMetaData.getColumnName(i).downcase +  "\" : \"" + val + "\"" 
                r=r+1
             end
             i = i + 1
         end
         event['message'] = "{" + msg + "}"
        queue << event
      end
      conn.close

      duration = Time.now - start
      @logger.info? && @logger.info("Command completed", :sql => @sql,
                                    :duration => duration)

      # Sleep for the remainder of the interval, or 0 if the duration ran
      # longer than the interval.
      sleeptime = [0, @interval - duration].max
      if sleeptime == 0
        @logger.warn("Execution ran longer than the interval. Skipping sleep.",
                     :sql => @sql, :duration => duration,
                     :interval => @interval)
      else
        sleep(sleeptime)
      end
    end # loop
  end # def run
end # class LogStash::Inputs::OraSQL