Lambdaを使って、CloudTrailログをCloudSearchに入れて検索する

来週JAWS-UG名古屋でAWS LambdaのLTをするためにいくつかデモ案を作っていますが、実用的なものも必要だろうということで、表題の通りLambdaを使って、CloudTrailログをCloudSearchに入れて検索するサンプルを作成しました。

CloudTrailのログはS3に上がってきますが、ログが出来た通知をLambdaで受けて、そのログを整形してCloudSearchに入れます。従来ではEC2などから定期的にログ確認をしないといけなかったので、非常に楽ですね。

事前準備(入力側)

はじめにLambdaコンソールから、新しいFunctionを登録します。適当な名前をつけます。

コードの中身は後で変えるのでサンプルのままにしておき、つぎにIAMロールを設定します。これはLambdaが実行されるときのロールなので、アクセスするサービス(今回はS3とCloudSearch)への権限をつけたロールを作り、設定します。新規に作ると、arnが自動的に設定されます。


次にS3バケットを設定します。バケットにはIAMロールの設定をいけないですが、簡単な方法としては、先にバケットを作っておき、Lambda側からアクセス権をつける方法です。
実行するLambda関数と同じリージョンに、S3バケットを作成します(別リージョンはNG)。


次に、先ほど作成したLambda関数をコンソールから選択します。画面の下の方に「Configure Event Source」というのがありますのでこれをクリックします。


そうすると、バケット選択画面が出てきますので、先ほど作ったバケットを設定します。またここでもIAMロール設定が出てきます。このIAMロールは、S3バケットからLambdaを呼び出すためのロール設定なので、Lambda呼び出しだけの権限を設定します。

最後に、CloudTrailの画面から、ログ出力先として先ほどのS3バケットを設定します(画面省略)

事前準備(出力側)

次に出力側の設定のため、CloudSearchのドメインを作ります。今回CloudSearchはLambdaやS3バケットと同じリージョンに作りましたが、これは別でも大丈夫です。

ドメインに適当な名前をつけて、マニュアルでIndexをつけます。

出力される主要な部分の項目をIndex対象にします。

ドメインが出来たら、データを挿入するためのDocument Endpointを控えておきます(コードに埋め込みます)

Lambda Functionのコード

今回作成したのは、以下のようなコードになります。

var aws = require('aws-sdk');
var zlib = require('zlib');
exports.aws = aws;// for local test purpose
var CLOUDSEARCH_ENDPOINT = "CloudSearchのDocument Endpoint";

exports.handler = function(event, context) {
  console.log('Received event:');
  var bucket = event.Records[0].s3.bucket.name;
  var key = event.Records[0].s3.object.key;
  var region = event.Records[0].awsRegion;
  var s3 = new aws.S3({
    apiVersion : '2006-03-01',
    region : region
  });

  s3.getObject({
    Bucket : bucket,
    Key : key
  }, function(err, data) {
    if (err) {
      context.done('error', 'error getting file' + err);
    } else {
      var contentType = data.ContentType;
      var contentEncoding = data.ContentEncoding;
      if (contentType === "application/json"
          && contentEncoding === "gzip") {
        var logFileName = key.substr(key.lastIndexOf("/") + 1);
        var buf = data.Body;
        zlib.gunzip(buf, function(_, dezipped) {
          var json = JSON.parse(dezipped.toString('utf-8'));
          sendToCloudSearch(context,region,logFileName, json);
        });
      }
    }
  });
};

function sendToCloudSearch(context,region,logFileName, json) {
  var records = json.Records;
  var searchRecords = [];
  for (var i = 0; i < records.length; i++) {
    var record = records[i];
    var searchRecord = {
      "type" : "add",
      "id" : record.eventTime + "-" + record.requestID,
      "fields" : {
        "usertype" : record.userIdentity.type,
        "arn" : record.userIdentity.arn,
        "accesskeyid" : record.userIdentity.accessKeyId,
        "username" : record.userIdentity.userName,
        "eventtime" : record.eventTime,
        "eventsource" : record.eventSource,
        "eventname" : record.eventName,
        "awsregion" : record.awsRegion,
        "sourceipaddress" : record.sourceIPAddress,
        "useragent" : record.userAgent,
        "requestid" : record.requestID,
        "eventid" : record.eventID,
        "logfilename" : logFileName
      }
    };
    searchRecords[i] = JSON.stringify(searchRecord);
  }
  var cloudsearchDomain = new aws.CloudSearchDomain({
    endpoint : CLOUDSEARCH_ENDPOINT,
    region:region
  });
  var params = {
    contentType : 'application/json',
    documents : "[" + searchRecords.join() + "]"
  };
  cloudsearchDomain.uploadDocuments(params, function(err, data) {
    if (err) {
      context.done("error", err);
    } else {
      context.done(null, 'success');
    }
  });
}

ポイントとしては、CloudTrailのログがgzip圧縮されているため解凍している点と、CloudSearchへデータを入れるため、データを整形している点です。CloudTrailのJSONはそのままCloudSearchに入れられないため、Lmabda内で整形しています。またこの際、ログファイル名やリージョンもCloudSearchの項目に入れていますので、後で元のログを取りやすくなっています。

検索してみる

CloudSearchのTest Searchを使って検索してみます。

今回はじめてCloudSearchを使いましたが、ソースIPやイベントで検索できるので、結構便利ですね。

開発について

今回開発はEclipse(+nodeclipse)で行いました。テストやモック、npmのインストールなど簡単に行えるので便利でした。