- On AWS: create an IAM User Group and a User dedicated to Snowflake
- Create a User Role, granting it the AmazonS3FullAccess policy, with a trusted entity of “Another AWS account”. Use the root account ID in “Specify accounts that can use this role”. Select “Require external ID” and temporarily set the External ID to an arbitrary value (it will be replaced later).
- On Snowflake - create an integration object
- https://docs.snowflake.com/en/sql-reference/sql/create-storage-integration.html
create or replace storage integration s3_integr
type = external_stage
storage_provider = s3
enabled = true
storage_aws_role_arn = '<ARN of the AWS role created above>'
storage_allowed_locations = ('s3://<Path to the data file folder on S3>');
- Run the command to describe the integration
DESC INTEGRATION s3_integr;
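- Optionally, the two values needed in the next step can be filtered straight out of the DESC output with RESULT_SCAN (a minimal sketch; it assumes the DESC INTEGRATION statement was the most recent one run in the session):
select "property", "property_value"
from table(result_scan(last_query_id()))
where "property" in ('STORAGE_AWS_IAM_USER_ARN', 'STORAGE_AWS_EXTERNAL_ID');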
- Extract STORAGE_AWS_IAM_USER_ARN and STORAGE_AWS_EXTERNAL_ID values from the output and include these in the trust relationship JSON of the Snowflake user role (on AWS IAM):
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "<STORAGE_AWS_IAM_USER_ARN here>"
      },
      "Action": "sts:AssumeRole",
      "Condition": {
        "StringEquals": {
          "sts:ExternalId": "<STORAGE_AWS_EXTERNAL_ID here>"
        }
      }
    }
  ]
}
- Create a new database
CREATE or replace DATABASE CUSTOM_DB;
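- The statements below assume the PUBLIC schema of this database; when working from a worksheet it can help to set the session context explicitly first (the warehouse name is a placeholder):
use database CUSTOM_DB;
use schema PUBLIC;
use warehouse <Name of an existing warehouse>;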
- To manually load the data, follow the steps immediately below. For continuous data loading with Snowpipe, scroll further down.
Loading CSV data
- Create a file format for CSV data import
- https://docs.snowflake.com/en/sql-reference/sql/create-file-format.html
create or replace file format custom_db.public.csv_format
type = csv
skip_header = 1
null_if = ('NULL', 'null')
empty_field_as_null = true;
- Create a stage object for loading data. A stage is an intermediate location that a data file is uploaded to before its contents are copied into a table.
- https://docs.snowflake.com/en/sql-reference/sql/create-stage.html
create or replace stage custom_db.public.ext_csv_stage
URL = 's3://<Path to the csv file/folder on S3>'
STORAGE_INTEGRATION = s3_integr
file_format = custom_db.public.csv_format;
- Once created, the stage and file format are visible under Databases -> CUSTOM_DB -> Stages / File Formats. Use Grant Privileges to share them.
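- A quick way to confirm that the integration and stage can actually reach the files on S3 (a minimal check, assuming the stage above was created successfully):
list @custom_db.public.ext_csv_stage;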
- Create a new custom table for csv data
- https://docs.snowflake.com/en/sql-reference/sql/create-table.html
CREATE or replace TABLE CUSTOM_DATA_CSV(
CUSTOM_FIELD_1 NUMBER(38,6)
, CUSTOM_FIELD_2 NUMBER(38,6)
, CUSTOM_FIELD_3 VARCHAR(256)
, CUSTOM_FIELD_4 NUMBER(38,1)
, CUSTOM_FIELD_5 VARCHAR(256)
, CUSTOM_FIELD_6 VARCHAR(128)
, CUSTOM_FIELD_7 NUMBER(38,8)
);
- Ingest data from S3
- https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html
copy into CUSTOM_DATA_CSV
from @custom_db.public.ext_csv_stage
on_error = CONTINUE;
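- Because on_error = CONTINUE silently skips rows that fail to parse, it is worth checking what was rejected after the load; a sketch using the VALIDATE table function against the most recent COPY job:
select * from table(validate(CUSTOM_DATA_CSV, job_id => '_last'));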
Loading PARQUET data
- Create a file format for PARQUET data import
create or replace file format custom_db.public.parquet_format
type = 'parquet';
- Create stage object for loading data
- https://docs.snowflake.com/en/sql-reference/sql/create-stage.html
create or replace stage custom_db.public.ext_parquet_stage
URL = 's3://<Path to the parquet file/folder on S3>'
STORAGE_INTEGRATION = s3_integr
file_format = custom_db.public.parquet_format;
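- The staged parquet files can be queried directly before loading, which is an easy way to see the exact field spellings referenced in the COPY statement further down (a minimal sketch, assuming the stage above):
select $1 from @custom_db.public.ext_parquet_stage limit 10;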
- Create a new custom table for parquet data
CREATE or replace TABLE CUSTOM_DATA_PARQUET(
CUSTOM_FIELD_1 VARCHAR(150)
, CUSTOM_FIELD_2 VARCHAR(150)
, CUSTOM_FIELD_3 VARCHAR(150)
, CUSTOM_FIELD_4 VARCHAR(150)
, CUSTOM_FIELD_5 VARCHAR(150)
, CUSTOM_FIELD_6 VARCHAR(150)
, CUSTOM_FIELD_7 VARCHAR(150)
, filename VARCHAR
, file_row_number VARCHAR
, load_timestamp timestamp default TO_TIMESTAMP_NTZ(current_timestamp)
);
- Ingest data from S3
copy into custom_db.public.custom_data_parquet
from (select
$1:parquet_custom_field_1::varchar,
$1:parquet_custom_field_2::varchar,
$1:parquet_custom_field_3::varchar,
$1:parquet_custom_field_4::varchar,
$1:parquet_custom_field_5::varchar,
$1:parquet_custom_field_6::varchar,
$1:"Optionally_exact_spelling_of_fieldname_from_parquet_in_double_quotes"::varchar,
METADATA$FILENAME,
METADATA$FILE_ROW_NUMBER,
TO_TIMESTAMP_NTZ(current_timestamp)
from @custom_db.public.ext_parquet_stage);
- Note the use of the METADATA$FILENAME and METADATA$FILE_ROW_NUMBER metadata columns to capture the source file name and row number directly
- https://docs.snowflake.com/en/user-guide/querying-metadata.html#metadata-columns
- A SELECT is used to extract and transform data from the staged file. This is a useful technique when only specific data, or data from multiple files (use a join), needs to be extracted.
- https://docs.snowflake.com/en/user-guide/data-unload-overview.html
Loading JSON data
- Create a file format for JSON data import
create or replace file format custom_db.public.json_format
type = 'json';
- Create stage object for loading data
- https://docs.snowflake.com/en/sql-reference/sql/create-stage.html
create or replace stage custom_db.public.ext_json_stage
URL = 's3://<Path to the JSON file/folder on S3>'
STORAGE_INTEGRATION = s3_integr
file_format = custom_db.public.json_format;
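- Since the COPY further down relies on exact, case-sensitive key spellings (including leading/trailing spaces), it can help to inspect the keys in the staged files first; a minimal sketch using OBJECT_KEYS against the stage above:
select object_keys($1) as json_keys from @custom_db.public.ext_json_stage limit 5;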
- Create a new custom table for JSON data
CREATE or replace TABLE CUSTOM_DATA_JSON(
id VARCHAR(50)
, CUSTOM_FIELD_1 VARCHAR(150)
, CUSTOM_FIELD_2 VARCHAR(150)
, CUSTOM_FIELD_3 INTEGER
, CUSTOM_FIELD_4 FLOAT
, CUSTOM_FIELD_5 VARCHAR(150)
, CUSTOM_FIELD_6 VARCHAR(128)
, CUSTOM_FIELD_7 VARCHAR(128)
, filename VARCHAR
, file_row_number VARCHAR
, load_timestamp timestamp default TO_TIMESTAMP_NTZ(current_timestamp)
);
- Ingest data from S3, using case-sensitive field names that match the exact JSON keys, including leading/trailing spaces, etc.
copy into custom_db.public.custom_data_json
from (select
$1:"_id"::varchar,
$1:" Custom Field 1
"::varchar,
$1:" Custom Field 2 "::varchar,
$1:" Custom Field 3 "::integer,
$1:" Custom Field 4 "::float,
$1:" Custom Field 5 "::varchar,
$1:" Custom Field 6 "::varchar,
$1:" Custom Field 7 "::varchar,
METADATA$FILENAME,
METADATA$FILE_ROW_NUMBER,
TO_TIMESTAMP_NTZ(current_timestamp)
from @custom_db.public.ext_json_stage);
Snowpipe
- https://docs.snowflake.com/en/user-guide/data-load-snowpipe-intro.html#data-duplication
- Serverless data load mechanism
- Enables loading data from files as soon as they’re available in a stage.
- Leverages event notifications (e.g., AWS SQS) from cloud storage to inform Snowpipe of the arrival of new data files to load. A dedicated COPY statement identifies the source location of the data files (i.e., a stage) and a target table. The user is billed for the compute each load consumes
- Similar to the manual load, the steps to create the storage integration, table, csv file format and stage are required
- The same commands as above can be re-used, making sure that the URL parameter in the stage definition points to the S3 folder where csv files will be dropped. Re-running the integration command can re-generate STORAGE_AWS_IAM_USER_ARN and STORAGE_AWS_EXTERNAL_ID, so make sure the User Role trust relationship is updated accordingly
- Define the actual pipe. Note that this contains the exact same copy command as used for manual loads
create or replace pipe custom_db.public.myawesomepipe auto_ingest=true as
copy into custom_data_csv
from @custom_db.public.ext_csv_stage
on_error = CONTINUE;
- Run the show command to display the list of created pipes. Locate the newly created pipe and extract the value of the "notification_channel" column. This is the ARN of the SQS queue that the pipe will use.
show pipes;
- Copy this value into S3 -> Bucket that the data files will be uploaded into -> Properties -> Event notifications. Create a new event notification with a destination of SQS Queue and supply the ARN copied from the Snowflake show pipes output.
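- Once the event notification is in place, the pipe state (including the notification channel it is bound to) can be verified with SYSTEM$PIPE_STATUS (a minimal check):
select system$pipe_status('custom_db.public.myawesomepipe');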
- The steps are very similar to the CSV pipeline and manual PARQUET load examples above
- Ensure that the stage defined during the manual PARQUET load above contains the URL of the folder and not an individual file
- The same commands as above can be re-used, making sure that the URL parameter in the stage definition points to the folder within the bucket where parquet files will be uploaded
- Create a Snowpipe definition, or update an existing one, using create or replace pipe followed by the same copy command used to manually load the parquet files
create or replace pipe custom_db.public.myawesomepipe auto_ingest=true as
copy into custom_db.public.custom_data_parquet
from (select
$1:parquet_custom_field_1::varchar,
$1:parquet_custom_field_2::varchar,
$1:parquet_custom_field_3::varchar,
$1:parquet_custom_field_4::varchar,
$1:parquet_custom_field_5::varchar,
$1:parquet_custom_field_6::varchar,
$1:"Optionally_exact_spelling_of_fieldname_from_parquet_in_double_quotes"::varchar,
METADATA$FILENAME,
METADATA$FILE_ROW_NUMBER,
TO_TIMESTAMP_NTZ(current_timestamp)
from @custom_db.public.ext_parquet_stage);
- The SQS queue ARN will remain unchanged from what was generated in the CSV pipe run above. If this is the first time a Snowpipe is defined, see the steps above on copying the ARN and updating the Event notifications on the S3 bucket.
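- Snowpipe only reacts to new event notifications, so files already sitting in the bucket before the pipe and notification existed are not picked up automatically; if needed, they can be queued manually with a refresh (it loads files staged within the last 7 days):
alter pipe custom_db.public.myawesomepipe refresh;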
- Similar to the parquet and CSV steps, JSON can be loaded using the combination of create or replace pipe and the copy into command from the manual JSON load above.
- Make sure the table, file format and stage are defined correctly
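- Since Snowpipe bills compute per load, per-pipe credit usage can be reviewed with the PIPE_USAGE_HISTORY table function (a sketch covering the last 7 days):
select *
from table(information_schema.pipe_usage_history(
date_range_start => dateadd('day', -7, current_date()),
pipe_name => 'custom_db.public.myawesomepipe'));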