CSVファイルをgzip圧縮してS3に送ってRedshiftにコピー

昨年夏にお祭りでもらったドジョウ、飼育してたら予想以上の成長を見せ、現在水槽サイズが足りなく困っています。でおなじみの片山です。


本題に入ると、タイトル通りRedshitへのデータ取り込みなんですが、まとまったサンプルコードがなかったので作成してみました。言語はJavaです。


処理内容としては、まず指定のCSVファイルを読み込んで、gzip圧縮します。次に圧縮したgzipファイルをS3に入れます。S3に置くときは、日付をプレフィックスに付けて被らないようになっています。最後にRedshiftにJDBCで接続して、copyコマンドでデータをロードします。gzipで圧縮してあるので、copyコマンドのオプションとしてgzipをつけています。

実行に必要なライブラリはAWS SDK for JavaPostgreSQLJDBCドライバです。余談ですが、AWS SDK for Java 1.7からjoda-timeがライブラリに加わっていました。どこで使ってるんでしょうね?JDKは7を使っています。


一日一回バッチで回せば、毎日差分データをRedshiftに取り込めます。ちなみにRedshiftは最近SSDベースの安価なインスタンスタイプ(dw2.large)が出ました。ディスク容量は160Gですが、7ECUでメモリ15GBですので、ちょっとしたデータをテストするには十分だと思います。1時間あたり$0.33ですので、皆様お持ちのログデータや売上げデータなどを使って、是非トライしてみて頂きたいです。


水槽も、インスタンスタイプ変更が出来たらいいんですけどね。。。

/**
 * ローカルからS3経由でRedshiftにデータを入れるサンプル
 * @author c9katayama 
 */
public class CopyFromLocalToRedshift {
  //環境に合わせて設定
  private static final String CSV_FILE = "data.csv";
  private static final String BUCKET_NAME = "BUCKER";
  private static final String REDSHIFT_JDBC_URL = "jdbc:postgresql://XXXX.XXXX.ap-northeast-1.redshift.amazonaws.com:5439/demodb?tcpKeepAlive=true";
  private static final String REDSHIFT_USER = "USER";
  private static final String REDSHIFT_PASSWORD = "PASSWORD";
  private static final String TABLE_NAME = "TABLE";

  private AWSCredentials credentials;

  public static void main(String[] args) throws Exception {
    new CopyFromLocalToRedshift().execute();
  }

  private void execute() throws Exception {

    initAWSCredentials();

    File csvFile = new File(CopyFromLocalToRedshift.class.getResource(
        CSV_FILE).toURI());
    
    byte[] zippedCSV = gzipFile(csvFile);
    
    String s3ObjectKey = putCSVtoS3(zippedCSV);
    
    copyCSVfromS3toRedshift(s3ObjectKey);
  }

  private void initAWSCredentials() throws IOException {
    credentials = new PropertiesCredentials(
        CopyFromLocalToRedshift.class
            .getResourceAsStream("/AwsCredentials.properties"));
  }

  private byte[] gzipFile(File file) throws IOException {
    byte[] bytes = Files.readAllBytes(Paths.get(file.getPath()));
    try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
        GZIPOutputStream out = new GZIPOutputStream(baos)) {
      out.write(bytes, 0, bytes.length);
      out.finish();
      return baos.toByteArray();
    }
  }

  private String putCSVtoS3(byte[] zippedBytes) {
    AmazonS3 s3 = new AmazonS3Client(credentials);
    ObjectMetadata metadata = new ObjectMetadata();
    metadata.setContentEncoding("gzip");
    metadata.setContentType("text/plane");
    metadata.setContentLength(zippedBytes.length);
    final String date = new SimpleDateFormat("yyyyMMddHHmmss")
        .format(new Date());
    final String key = date + "-csv.zip";
    PutObjectRequest req = new PutObjectRequest(BUCKET_NAME, key,
        new ByteArrayInputStream(zippedBytes), metadata);
    s3.putObject(req);
    return key;
  }

  private void copyCSVfromS3toRedshift(String s3ObjectKey)
      throws SQLException {
    final String csvLocation = "s3://" + BUCKET_NAME + "/" + s3ObjectKey;
    final String accessKeyId = credentials.getAWSAccessKeyId();
    final String secretKey = credentials.getAWSSecretKey();
    final String copyCommand = "copy " + TABLE_NAME + " from '"
        + csvLocation + "' " + "credentials 'aws_access_key_id="
        + accessKeyId + ";aws_secret_access_key=" + secretKey
        + "' delimiter ',' gzip;";
    try (Connection con = DriverManager.getConnection(REDSHIFT_JDBC_URL,
        REDSHIFT_USER, REDSHIFT_PASSWORD);
        Statement stmt = con.createStatement()) {
      stmt.execute(copyCommand);
    }
  }
}