Snowflake 데이터 보존
Braze는 Snowflake에 저장된 2년 이상 된 대부분의 이벤트 데이터에서 개인 식별 정보(PII)를 익명화(제거)합니다. 이 페이지 뒷부분에 설명된 대로 특정 이벤트는 사용자가 삭제될 때까지 유지됩니다. Snowflake 데이터 공유를 사용하는 경우, 보존 정책이 적용되기 전에 Snowflake 계정에 사본을 저장하여 환경에서 전체 이벤트 데이터를 보존할 수 있습니다.
이 페이지에서는 익명화되지 않은 데이터를 보존하는 두 가지 방법을 소개합니다:
- 다른 Snowflake 데이터베이스로 데이터 복사
- 스테이지로 데이터 언로드
Braze는 데이터 보호 기술 지원에 설명된 대로 Braze에서 삭제된 사용자의 이벤트 데이터를 자동으로 익명화합니다. 공유 데이터베이스 외부로 복사된 데이터는 Braze가 더 이상 관리하지 않으므로 이 프로세스에 포함되지 않습니다.
2년 보존 정책에서 면제되는 이벤트
Braze는 사용자가 삭제될 때까지 사용자 라이프사이클, 구독 상태 및 인바운드 메시징과 관련된 이벤트를 보존합니다. 다음 이벤트는 표준 2년 보존 정책에서 제외됩니다:
users.UserOrphanusers.UserDeleteRequestusers.behaviors.subscription.GlobalStateChangeusers.behaviors.subscriptiongroup.StateChangeusers.messages.sms.InboundReceiveusers.messages.whatsapp.InboundReceive
다른 Snowflake 데이터베이스로 모든 데이터 복사
공유된 BRAZE_RAW_EVENTS 스키마에서 Snowflake의 다른 데이터베이스 및 스키마로 데이터를 복사하여 익명화되지 않은 데이터를 보존할 수 있습니다. 다음 단계를 따르세요:
- Snowflake 계정에서
COPY_BRAZE_SHARE프로시저를 생성합니다. 이 프로시저는 Braze가 공유한 모든 데이터를 Snowflake 내의 다른 데이터베이스 및 스키마로 복사하는 데 사용됩니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
CREATE PROCEDURE COPY_BRAZE_SHARE(
SOURCE_DATABASE STRING, -- Database name of the braze data share
SOURCE_SCHEMA STRING, -- Schema name of the braze data share
DESTINATION_DATABASE STRING, -- Name of the database to which you want to copy shared the data
DESTINATION_SCHEMA STRING, -- Name of the schema to which you want to copy shared the data
MAX_DATE DATE default DATEADD(year, -2, CURRENT_DATE()), -- Copy data on or before the maximum date default DATEADD(year, -2, CURRENT_DATE())
TABLE_NAME_FILTER STRING default 'USERS_%' -- Filter to select table that will be unloaded, default to 'USER_%'
)
RETURNS TABLE (TABLE_NAME STRING, SUCCESS BOOLEAN, INFO STRING)
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'run'
AS
$$
import snowflake.snowpark as snowpark
from snowflake.snowpark.exceptions import SnowparkSQLException
def run(session: snowpark.Session, SOURCE_DATABASE: str, SOURCE_SCHEMA: str, DESTINATION_DATABASE: str, DESTINATION_SCHEMA: str, MAX_DATE: str, TABLE_NAME_FILTER: str):
result = []
-- Get the list of filtered table names
table_query = f"""
SELECT table_name
FROM {SOURCE_DATABASE}.INFORMATION_SCHEMA.TABLES
WHERE TABLE_SCHEMA = '{SOURCE_SCHEMA}' AND table_name LIKE '{TABLE_NAME_FILTER}'
"""
tables = session.sql(table_query).collect()
-- Iterate through each table and copy data
for row in tables:
table_name = row['TABLE_NAME']
-- Skip archive tables
if table_name.endswith('_ARCHIVED'):
continue
-- Check if the destination table exists
check_table_query = f"""
SELECT COUNT(*) as count
FROM {DESTINATION_DATABASE}.INFORMATION_SCHEMA.TABLES
WHERE TABLE_SCHEMA = '{DESTINATION_SCHEMA}' AND TABLE_NAME = '{table_name}'
"""
table_exists = session.sql(check_table_query).collect()[0]['COUNT'] > 0
if table_exists:
-- Find the current, most recent `SF_CREATED_AT` in the existing table
cur_max_date = None
date_query = f"""
SELECT MAX(SF_CREATED_AT) as CUR_MAX_DATE
FROM {DESTINATION_DATABASE}.{DESTINATION_SCHEMA}.{table_name}
"""
date_result = session.sql(date_query).collect()
if date_result:
cur_max_date = date_result[0]['CUR_MAX_DATE']
if cur_max_date:
-- If the destination table is not empty, only add data that is newer than `cur_max_date` and older than`MAX_DATE`
copy_query = f"""
INSERT INTO {DESTINATION_DATABASE}.{DESTINATION_SCHEMA}.{table_name}
SELECT * FROM {SOURCE_DATABASE}.{SOURCE_SCHEMA}.{table_name}
WHERE SF_CREATED_AT <= '{MAX_DATE}'
AND SF_CREATED_AT > '{cur_max_date}'
"""
else:
-- If the destination table is empty, copy all data before `MAX_DATE`
copy_query = f"""
INSERT INTO {DESTINATION_DATABASE}.{DESTINATION_SCHEMA}.{table_name}
SELECT * FROM {SOURCE_DATABASE}.{SOURCE_SCHEMA}.{table_name}
WHERE SF_CREATED_AT <= '{MAX_DATE}'
"""
else:
-- If the table doesn't exist, create it and copy data
copy_query = f"""
CREATE TABLE {DESTINATION_DATABASE}.{DESTINATION_SCHEMA}.{table_name} AS
SELECT * FROM {SOURCE_DATABASE}.{SOURCE_SCHEMA}.{table_name}
WHERE SF_CREATED_AT <= '{MAX_DATE}'
"""
try:
session.sql(copy_query).collect()
result.append([table_name, True, ""])
except SnowparkSQLException as e:
result.append([table_name, False, str(e)])
-- Return the results
return session.create_dataframe(result, schema=['TABLE_NAME', 'SUCCESS', 'INFO'])
$$;
- Snowflake 계정에서 아래 명령 중 하나를 실행하여 프로시저를 실행합니다.
기본적으로 이 프로시저는 모든 USERS_* 이벤트 유형에 대해 2년 이상 된 데이터를 백업합니다.
1
2
3
4
-- Copy all the rows that are two years or older in all the 'USERS_*' tables
-- from 'SOURCE_DB'.'SOURCE_SCHEMA' to 'DEST_DB'.'DEST_SCHEMA'
CALL COPY_BRAZE_SHARE('SOURCE_DB', 'SOURCE_SCHEMA', 'DEST_DB', 'DEST_SCHEMA')
필터를 지정하여 백업할 데이터의 기간을 선택하고, 테이블 이름 필터를 지정하여 선택한 이벤트 테이블만 백업할 수 있습니다.
1
2
3
4
-- Copy all the rows that are one year or older in all the 'USERS_BEHAVIORS_*' tables
-- from 'SOURCE_DB'.'SOURCE_SCHEMA' to 'DEST_DB'.'DEST_SCHEMA'
CALL COPY_BRAZE_SHARE('SOURCE_DB', 'SOURCE_SCHEMA', 'DEST_DB', 'DEST_SCHEMA', DATEADD(year, -1, CURRENT_DATE()), 'USERS_BEHAVIORS_%')
프로시저를 반복 실행하면 테이블에 이미 있는 최대 SF_CREATED_AT 값보다 큰 행만 백업하므로, 이미 백업한 행을 다시 복사하지 않습니다.
스테이지로 데이터 언로드
공유된 BRAZE_RAW_EVENTS 스키마에서 스테이지로 데이터를 언로드하여 익명화되지 않은 데이터를 보존할 수 있습니다. 다음 단계를 따르세요:
UNLOAD_BRAZE_SHARE프로시저를 생성합니다. 이 프로시저는 Braze가 공유한 모든 데이터를 지정된 스테이지로 복사하는 데 사용됩니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
CREATE PROCEDURE UNLOAD_BRAZE_SHARE(
SOURCE_DATABASE STRING, -- Database name of the braze data share
SOURCE_SCHEMA STRING, -- Schema name of the braze data share
STAGE_NAME STRING, -- Snowflake stage where the data will be unloaded
MIN_DATE DATE, -- Copy data from this date (inclusive)
MAX_DATE DATE, -- Copy data till this date (exclusive)
TABLE_NAME_FILTER STRING default 'USERS_%' -- Filter to select table that will be unloaded, default to 'USER_%'
)
RETURNS TABLE (TABLE_NAME STRING, SUCCESS BOOLEAN, INFO STRING)
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'run'
AS
$$
import snowflake.snowpark as snowpark
from snowflake.snowpark.exceptions import SnowparkSQLException
def run(session: snowpark.Session, DATABASE_NAME: str, SCHEMA_NAME: str, STAGE_NAME: str, MIN_DATE: str, MAX_DATE: str, TABLE_NAME_FILTER: str):
result = []
if MIN_DATE >= MAX_DATE:
result.append(["MIN_DATE cannot be more recent than MAX_DATE", False, ""])
return session.create_dataframe(result, schema=['TABLE_NAME', 'SUCCESS', 'INFO'])
-- Get list of tables
table_query = f"""
SELECT TABLE_NAME
FROM {DATABASE_NAME}.INFORMATION_SCHEMA.TABLES
WHERE TABLE_SCHEMA = '{SCHEMA_NAME}' AND TABLE_NAME LIKE '{TABLE_NAME_FILTER}'
"""
tables = session.sql(table_query).collect()
for table in tables:
table_name = table['TABLE_NAME']
-- Skip archive tables
if table_name.endswith('_ARCHIVED'):
continue
-- Create CSV file name
csv_file_name = f"{table_name}_{MIN_DATE}_{MAX_DATE}.csv"
-- Construct `COPY INTO` command with date filter
copy_cmd = f"""
COPY INTO @{STAGE_NAME}/{csv_file_name}
FROM (
SELECT *
FROM {DATABASE_NAME}.{SCHEMA_NAME}.{table_name}
WHERE SF_CREATED_AT >= TO_DATE('{MIN_DATE}') and SF_CREATED_AT < TO_DATE('{MAX_DATE}')
)
FILE_FORMAT = (TYPE = CSV FIELD_OPTIONALLY_ENCLOSED_BY = '"')
HEADER = TRUE
OVERWRITE = FALSE
"""
-- Execute COPY INTO command
try:
session.sql(copy_cmd).collect()
result.append([table_name, True, csv_file_name])
except SnowparkSQLException as e:
result.append([table_name, False, str(e)])
return session.create_dataframe(result, schema=['TABLE_NAME', 'SUCCESS', 'INFO'])
$$;
- 아래 명령 중 하나를 실행하여 프로시저를 실행합니다.
기본적으로 이 프로시저는 USERS_ 접두사가 있는 모든 테이블을 복사합니다.
1
2
3
4
5
6
7
8
9
10
-- Create a Snowflake stage to store the file
create stage MY_EXPORT_STAGE;
-- Call the procedure
-- to unload date between '2020-01-01' and '2021-01-01'
-- from tables with 'USERS_' prefix in 'DATABASE_NAME'.'SCHEMA'
CALL UNLOAD_BRAZE_SHARE('DATABASE_NAME', 'SCHEMA', 'MY_EXPORT_STAGE', '2020-01-01', 2021-01-01');
-- List the files that are unloaded
LIST @MY_EXPORT_STAGE;
프로시저에 필터를 지정하여 특정 테이블만 언로드할 수 있습니다.
1
2
3
4
5
6
7
8
9
-- Create a Snowflake stage to store the file
create stage MY_EXPORT_STAGE;
-- Unload date between '2020-01-01' and '2021-01-01'
-- from tables with 'USERS_BEHAVIORS_' prefix in 'DATABASE_NAME'.'SCHEMA'
CALL EXPORT_BRAZE_SHARE_TO_STAGE('DATABASE_NAME', 'SCHEMA', 'MY_EXPORT_STAGE', '2020-01-01', 2021-01-01', 'USERS_BEHAVIORS_%');
-- List the files that are unloaded
LIST @MY_EXPORT_STAGE;
GitHub 에서 이 페이지를 편집합니다.