Transferir datos a Redshift
Amazon Redshift es un popular almacén de datos que se ejecuta en Amazon Web Services junto con Amazon S3. Los datos de Braze de Currents están estructurados para su transferencia directa a Redshift.
A continuación se describe cómo transferir datos de Amazon S3 a Redshift mediante un proceso de extracción, transformación y carga (ETL). Para obtener el código fuente completo, consulta el repositorio GitHub de ejemplos de Currents.
Ésta es sólo una de las muchas opciones que puedes elegir a la hora de transferir tus datos a los lugares que te resulten más ventajosos.
Resumen del cargador de S3 a Redshift
Els3loader.pyscript utiliza una tabla de manifiesto independiente en la misma base de datos Redshift para realizar el seguimiento de los archivos que ya se han copiado. La estructura general es la siguiente:
- Enumera todos los archivos en S3 y, a continuación, identifica los archivos nuevos desde la última vez que ejecutaste
s3loader.pycomparando la lista con el contenido de la tabla del manifiesto. - Crea un archivo manifiesto que contenga los nuevos archivos.
- Ejecuta una
COPYconsulta para copiar los nuevos archivos de S3 a Redshift utilizando el archivo de manifiesto. - Inserta los nombres de los archivos que se copian en la tabla de manifiesto independiente en Redshift.
- Comprométete.
Dependencias
Debes instalar AWS Python SDK y Psycopg para poder ejecutar el cargador:
1
2
pip install boto3
pip install psycopg2
Permisos
Función Redshift con acceso de lectura a S3
Si aún no lo has hecho, sigue la documentación de AWS para crear una función que pueda ejecutarCOPY comandos en tus archivos en AWS S3.
Reglas de entrada de Redshift VPC
Si tu clúster Redshift se encuentra en una VPC, debes configurar la VPC para permitir las conexiones desde el servidor en el que se ejecuta S3 Loader. Entra en tu clúster de Redshift y selecciona la entrada de grupos de seguridad de VPC a la que deseas que se conecte el cargador. A continuación, añade una nueva regla de entrada: Tipo = Redshift, Protocolo = TCP, Puerto = el puerto de tu clúster, Origen = la IP del servidor que ejecuta el cargador (o «Cualquiera» para realizar pruebas).
Usuario de gestión de identidades y accesos (IAM) con acceso completo a S3
El cargador S3 requiere acceso de lectura a los archivos que contienen tus datos de Currents y acceso completo a la ubicación de los archivos de manifiesto que genera para los comandos COPYRedshift. Crea un nuevo usuario de Gestión de identidades y accesos (IAM) con elAmazonS3FullAccesspermiso de la consola de IAM. Guarda las credenciales, ya que tendrás que pasárselas al cargador.
Puedes pasar las credenciales de acceso al cargador a través de variables de entorno, el archivo de credenciales compartidas (~/.aws/credentials) o el archivo de configuración de AWS. Como alternativa, puedes incluirlas directamente en el cargador asignándolas a losaws_secret_access_keycamposaws_access_key_id y dentro de unS3LoadJobobjeto , pero no recomendamos codificar las credenciales en tu código fuente.
Uso
Ejemplo de uso
El siguiente programa de ejemplo carga datos para elusers.messages.contentcard.Impressionevento desde S3 a lacontent_card_impressiontabla en Redshift.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
if __name__ == '__main__':
host = '{YOUR_CLUSTER}.redshift.amazonaws.com'
port = 5439
database = '{YOUR_DATABASE}'
user = '{YOUR_USER}'
password = '{YOUR_PASSWORD}'
role = '{YOUR_REDSHIFT_ROLE_ARN}'
# Do not hard code these credentials.
aws_access_key_id = None
aws_secret_access_key = None
# Content Card Impression Avro fields:
# id - string
# user_id - string
# external_user_id - string (nullable)
# app_id - string
# content_card_id - string
# campaign_id - string (nullable)
# send_id - string (nullable)
# time - int
# platform - string (nullable)
# device_model - string (nullable)
print('Loading Content Card Impression...')
cc_impression_s3_bucket = '{YOUR_CURRENTS_BUCKET}'
cc_impression_s3_prefix = '{YOUR_CURRENTS_PREFIX}'
cc_impression_redshift_table = 'content_card_impression'
cc_impression_redshift_column_def = [
('id', 'text'),
('user_id', 'text'),
('external_user_id', 'text'),
('app_id', 'text'),
('content_card_id', 'text'),
('campaign_id', 'text'),
('send_id', 'text'),
('time', 'integer'),
('platform', 'text'),
('device_model', 'text')
]
cc_impression_redshift = RedshiftEndpoint(host, port, database, user, password,
cc_impression_redshift_table, cc_impression_redshift_column_def)
cc_impression_s3 = S3Endpoint(cc_impression_s3_bucket, cc_impression_s3_prefix)
cc_impression_job = S3LoadJob(cc_impression_redshift, cc_impression_s3, role,
aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key)
cc_impression_job.perform()
Credenciales
Para ejecutar el cargador, primero debes proporcionar el host, port, ydatabase de tu clúster Redshift, así como eluser ypassword de un usuario Redshift que pueda ejecutarCOPYconsultas . Además, debes proporcionar el ARN de la función de Redshift con acceso de lectura a S3 que creaste en una sección anterior.
1
2
3
4
5
6
host = '{YOUR_CLUSTER}.redshift.amazonaws.com'
port = 5439
database = '{YOUR_DATABASE}'
user = '{YOUR_USER}'
password = '{YOUR_PASSWORD}'
role = '{YOUR_REDSHIFT_ROLE_ARN}'
Configuración del trabajo
Debes proporcionar el contenedor de S3 y el prefijo de tus archivos de eventos, así como el nombre de la tabla Redshift en la que deseasCOPY los datos.
Además, para los archivosCOPY Avro con la opción «auto» requerida por el cargador, la definición de columna en tu tabla Redshift debe coincidir con los nombres de campo del esquema Avro, tal y como se muestra en el programa de ejemplo, con los tipos mapeados (por ejemplo,string a text,int a integer).
También puedes pasar unabatch_sizeopción al cargador si consideras que tarda demasiado en copiar todos los archivos a la vez. Al pasar unbatch_size , el cargador puede copiar y confirmar de forma incremental un lote cada vez sin tener que copiar todo al mismo tiempo. El tiempo que tarda en cargarse un lote depende delbatch_size , así como del tamaño de los archivos y del tamaño del clúster de Redshift.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# Content Card Impression Avro fields:
# id - string
# user_id - string
# external_user_id - string (nullable)
# app_id - string
# content_card_id - string
# campaign_id - string (nullable)
# send_id - string (nullable)
# time - int
# platform - string (nullable)
# device_model - string (nullable)
cc_impression_s3_bucket = '{YOUR_CURRENTS_BUCKET}'
cc_impression_s3_prefix = '{YOUR_CURRENTS_PREFIX}'
cc_impression_redshift_table = 'content_card_impression'
cc_impression_redshift_column_def = [
('id', 'text'),
('user_id', 'text'),
('external_user_id', 'text'),
('app_id', 'text'),
('content_card_id', 'text'),
('campaign_id', 'text'),
('send_id', 'text'),
('time', 'integer'),
('platform', 'text'),
('device_model', 'text')
]
cc_impression_batch_size = 1000
Editar esta página en GitHub