Skip to content

データをRedshiftに転送する

Amazon Redshift は、Amazon S3 と並んでAmazon Web Services 上で動作する、評判の良いデータウェアハウスです。Currents の Braze データは、Redshift への直接転送用に構造化されています。

以下では、ETL(Extract, Transform, Load)プロセスを通じて Amazon S3 から Redshift へデータを転送する方法を説明します。完全なソースコードについては、Currents のサンプル GitHub リポジトリを参照してください。

S3 から Redshift へのローダーの概要

s3loader.py スクリプトは、同じ Redshift データベース内の別のマニフェストテーブルを使用して、既にコピーされたファイルのトラッキングを行います。全体的な構造は以下の通りです:

  1. S3 内の全ファイルをリストアップし、前回 s3loader.py を実行した時点から追加された新規ファイルを、リストとマニフェストテーブルの内容を比較して識別します。
  2. 新しいファイルを含むマニフェストファイルを作成します。
  3. マニフェストファイルを使用して、S3 から Redshift へ新しいファイルをコピーする COPY クエリを実行します。
  4. コピーされたファイルの名前を Redshift の別のマニフェストテーブルに挿入します。
  5. コミットします。

依存関係

ローダーを実行するには、AWS Python SDK と Psycopg をインストールする必要があります。

1
2
pip install boto3
pip install psycopg2

権限

S3 への読み取りアクセス権を持つ Redshift ロール

まだ行っていない場合は、AWS のドキュメントに従って、S3 上のファイルに対して COPY コマンドを実行できるロールを作成してください。

Redshift VPC の受信ルール

Redshift クラスターが VPC 内にある場合、S3 ローダーを実行しているサーバーからの接続を許可するように VPC を設定する必要があります。Redshift クラスターに移動し、ローダーが接続する VPC セキュリティグループのエントリを選択します。次に、新しい受信ルールを追加します:Type = Redshift、Protocol = TCP、Port = クラスターのポート、Source = ローダーを実行しているサーバーの IP(テスト用には「Anywhere」)。

S3 フルアクセス権を持つ Identity and Access Management(IAM)ユーザー

S3 ローダーには、Currents データを含むファイルへの読み取りアクセスと、Redshift の COPY コマンド用に生成するマニフェストファイルの保存場所へのフルアクセスが必要です。IAM コンソールから AmazonS3FullAccess 権限を持つ新しい Identity and Access Management(IAM)ユーザーを作成してください。認証情報はローダーに渡す必要があるため、保存しておいてください。

アクセス認証情報は、環境変数、共有認証情報ファイル(~/.aws/credentials)、または AWS 設定ファイルを通じてローダーに渡すことができます。あるいは、S3LoadJob オブジェクト内の aws_access_key_id および aws_secret_access_key フィールドに直接割り当てることでローダーに含めることもできますが、ソースコード内に認証情報をハードコーディングすることは推奨しません。

使用方法

使用例

以下のサンプルプログラムは、users.messages.contentcard.Impression イベントのデータを S3 から Redshift の content_card_impression テーブルに読み込みます。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
if __name__ == '__main__':
    host = '{YOUR_CLUSTER}.redshift.amazonaws.com'
    port = 5439
    database = '{YOUR_DATABASE}'
    user = '{YOUR_USER}'
    password = '{YOUR_PASSWORD}'
    role = '{YOUR_REDSHIFT_ROLE_ARN}'

    # Do not hard code these credentials.
    aws_access_key_id = None
    aws_secret_access_key = None

    # Content Card Impression Avro fields:
    #   id            - string
    #   user_id       - string
    #   external_user_id - string (nullable)
    #   app_id        - string
    #   content_card_id  - string
    #   campaign_id   - string (nullable)
    #   send_id       - string (nullable)
    #   time          - int
    #   platform      - string (nullable)
    #   device_model  - string (nullable)

    print('Loading Content Card Impression...')
    cc_impression_s3_bucket = '{YOUR_CURRENTS_BUCKET}'
    cc_impression_s3_prefix = '{YOUR_CURRENTS_PREFIX}'
    cc_impression_redshift_table = 'content_card_impression'
    cc_impression_redshift_column_def = [
        ('id', 'text'),
        ('user_id', 'text'),
        ('external_user_id', 'text'),
        ('app_id', 'text'),
        ('content_card_id', 'text'),
        ('campaign_id', 'text'),
        ('send_id', 'text'),
        ('time', 'integer'),
        ('platform', 'text'),
        ('device_model', 'text')
    ]

    cc_impression_redshift = RedshiftEndpoint(host, port, database, user, password,
        cc_impression_redshift_table, cc_impression_redshift_column_def)
    cc_impression_s3 = S3Endpoint(cc_impression_s3_bucket, cc_impression_s3_prefix)

    cc_impression_job = S3LoadJob(cc_impression_redshift, cc_impression_s3, role,
        aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key)
    cc_impression_job.perform()

認証情報

ローダーを実行するには、まず Redshift クラスターの hostportdatabase、および COPY クエリを実行できる Redshift ユーザーの userpassword を指定する必要があります。さらに、前のセクションで作成した S3 読み取りアクセス権を持つ Redshift ロールの ARN を指定する必要があります。

1
2
3
4
5
6
host = '{YOUR_CLUSTER}.redshift.amazonaws.com'
port = 5439
database = '{YOUR_DATABASE}'
user = '{YOUR_USER}'
password = '{YOUR_PASSWORD}'
role = '{YOUR_REDSHIFT_ROLE_ARN}'

ジョブの設定

イベントファイルの S3 バケットとプレフィックス、および COPY 先の Redshift テーブル名を指定する必要があります。

また、ローダーが必要とする「auto」オプションで Avro ファイルを COPY するには、Redshift テーブルのカラム定義が、サンプルプログラムに示されているように Avro スキーマのフィールド名と一致し、適切な型マッピング(例:stringtextintinteger)が行われている必要があります。

すべてのファイルを一度にコピーするのに時間がかかりすぎる場合は、ローダーに batch_size オプションを渡すこともできます。batch_size を渡すと、ローダーはすべてを同時にコピーする必要なく、一度に1バッチずつインクリメンタルにコピーしてコミットできます。1バッチの読み込みにかかる時間は、batch_size、ファイルのサイズ、および Redshift クラスターのサイズによって異なります。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# Content Card Impression Avro fields:
#   id            - string
#   user_id       - string
#   external_user_id - string (nullable)
#   app_id        - string
#   content_card_id  - string
#   campaign_id   - string (nullable)
#   send_id       - string (nullable)
#   time          - int
#   platform      - string (nullable)
#   device_model  - string (nullable)
cc_impression_s3_bucket = '{YOUR_CURRENTS_BUCKET}'
cc_impression_s3_prefix = '{YOUR_CURRENTS_PREFIX}'
cc_impression_redshift_table = 'content_card_impression'
cc_impression_redshift_column_def = [
    ('id', 'text'),
    ('user_id', 'text'),
    ('external_user_id', 'text'),
    ('app_id', 'text'),
    ('content_card_id', 'text'),
    ('campaign_id', 'text'),
    ('send_id', 'text'),
    ('time', 'integer'),
    ('platform', 'text'),
    ('device_model', 'text')
]
cc_impression_batch_size = 1000
New Stuff!