昨年夏にお祭りでもらったドジョウ、飼育してたら予想以上の成長を見せ、現在水槽サイズが足りなく困っています。でおなじみの片山です。
本題に入ると、タイトル通りRedshitへのデータ取り込みなんですが、まとまったサンプルコードがなかったので作成してみました。言語はJavaです。
処理内容としては、まず指定のCSVファイルを読み込んで、gzip圧縮します。次に圧縮したgzipファイルをS3に入れます。S3に置くときは、日付をプレフィックスに付けて被らないようになっています。最後にRedshiftにJDBCで接続して、copyコマンドでデータをロードします。gzipで圧縮してあるので、copyコマンドのオプションとしてgzipをつけています。
実行に必要なライブラリはAWS SDK for JavaとPostgreSQLのJDBCドライバです。余談ですが、AWS SDK for Java 1.7からjoda-timeがライブラリに加わっていました。どこで使ってるんでしょうね?JDKは7を使っています。
一日一回バッチで回せば、毎日差分データをRedshiftに取り込めます。ちなみにRedshiftは最近SSDベースの安価なインスタンスタイプ(dw2.large)が出ました。ディスク容量は160Gですが、7ECUでメモリ15GBですので、ちょっとしたデータをテストするには十分だと思います。1時間あたり$0.33ですので、皆様お持ちのログデータや売上げデータなどを使って、是非トライしてみて頂きたいです。
水槽も、インスタンスタイプ変更が出来たらいいんですけどね。。。
/** * ローカルからS3経由でRedshiftにデータを入れるサンプル * @author c9katayama */ public class CopyFromLocalToRedshift { //環境に合わせて設定 private static final String CSV_FILE = "data.csv"; private static final String BUCKET_NAME = "BUCKER"; private static final String REDSHIFT_JDBC_URL = "jdbc:postgresql://XXXX.XXXX.ap-northeast-1.redshift.amazonaws.com:5439/demodb?tcpKeepAlive=true"; private static final String REDSHIFT_USER = "USER"; private static final String REDSHIFT_PASSWORD = "PASSWORD"; private static final String TABLE_NAME = "TABLE"; private AWSCredentials credentials; public static void main(String[] args) throws Exception { new CopyFromLocalToRedshift().execute(); } private void execute() throws Exception { initAWSCredentials(); File csvFile = new File(CopyFromLocalToRedshift.class.getResource( CSV_FILE).toURI()); byte[] zippedCSV = gzipFile(csvFile); String s3ObjectKey = putCSVtoS3(zippedCSV); copyCSVfromS3toRedshift(s3ObjectKey); } private void initAWSCredentials() throws IOException { credentials = new PropertiesCredentials( CopyFromLocalToRedshift.class .getResourceAsStream("/AwsCredentials.properties")); } private byte[] gzipFile(File file) throws IOException { byte[] bytes = Files.readAllBytes(Paths.get(file.getPath())); try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); GZIPOutputStream out = new GZIPOutputStream(baos)) { out.write(bytes, 0, bytes.length); out.finish(); return baos.toByteArray(); } } private String putCSVtoS3(byte[] zippedBytes) { AmazonS3 s3 = new AmazonS3Client(credentials); ObjectMetadata metadata = new ObjectMetadata(); metadata.setContentEncoding("gzip"); metadata.setContentType("text/plane"); metadata.setContentLength(zippedBytes.length); final String date = new SimpleDateFormat("yyyyMMddHHmmss") .format(new Date()); final String key = date + "-csv.zip"; PutObjectRequest req = new PutObjectRequest(BUCKET_NAME, key, new ByteArrayInputStream(zippedBytes), metadata); s3.putObject(req); return key; } private void copyCSVfromS3toRedshift(String s3ObjectKey) throws SQLException { final String csvLocation = "s3://" + BUCKET_NAME + "/" + s3ObjectKey; final String accessKeyId = credentials.getAWSAccessKeyId(); final String secretKey = credentials.getAWSSecretKey(); final String copyCommand = "copy " + TABLE_NAME + " from '" + csvLocation + "' " + "credentials 'aws_access_key_id=" + accessKeyId + ";aws_secret_access_key=" + secretKey + "' delimiter ',' gzip;"; try (Connection con = DriverManager.getConnection(REDSHIFT_JDBC_URL, REDSHIFT_USER, REDSHIFT_PASSWORD); Statement stmt = con.createStatement()) { stmt.execute(copyCommand); } } }