Thursday, November 25, 2021

Snowflake on AWS - load data from S3


  • On AWS: create an IAM user group and a user dedicated to Snowflake
  • Create an IAM role, granting it the AmazonS3FullAccess policy and a trusted entity of “Another AWS account”. Use the root account ID in “Specify accounts that can use this role”. Select “Require external ID” and temporarily set the External ID to an arbitrary value (it will be replaced later).

create or replace storage integration s3_integr
  type = external_stage
  storage_provider = s3
  enabled = true
  storage_aws_role_arn = '<ARN of the AWS role created above>'
  storage_allowed_locations = ('s3://<Path to the data file folder on S3>');
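Storage integrations are typically created as ACCOUNTADMIN, so any other role that will create stages against this integration needs usage on it first. A minimal sketch, assuming a hypothetical ETL_ROLE will own the stages:

-- allow the role that creates the external stages to reference the integration
grant usage on integration s3_integr to role etl_role;  -- etl_role is hypothetical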

  • Run the command to describe the integration

DESC INTEGRATION s3_integr;

  • Extract STORAGE_AWS_IAM_USER_ARN and STORAGE_AWS_EXTERNAL_ID values from the output and include these in the trust relationship JSON of the Snowflake user role (on AWS IAM):

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "<STORAGE_AWS_IAM_USER_ARN here>"
      },
      "Action": "sts:AssumeRole",
      "Condition": {
        "StringEquals": {
          "sts:ExternalId": "<STORAGE_AWS_EXTERNAL_ID here>"
        }
      }
    }
  ]
}

  • Create a new database

CREATE or replace DATABASE CUSTOM_DB;
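The examples below assume the session context is set to this database and its default PUBLIC schema; a small sketch (the warehouse name is an assumption, substitute one that exists in the account):

use database custom_db;
use schema public;
use warehouse compute_wh;  -- hypothetical warehouse name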

  • To manually load the data, follow the steps immediately below. For continuous data loading with Snowpipe, scroll further down.

Loading CSV data

create or replace file format custom_db.public.csv_format
  type = csv
  skip_header = 1
  null_if = ('NULL', 'null')
  empty_field_as_null = true;

create or replace stage custom_db.public.ext_csv_stage
  URL = 's3://<Path to the csv file/folder on S3>'
  STORAGE_INTEGRATION = s3_integr
  file_format = custom_db.public.csv_format;

  • Once created, the stage and file format are visible under Databases->CUSTOM_DB->Stages/File Formats. Grant Privileges can be used to share them with other roles.
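A minimal sketch of such grants, assuming a hypothetical ANALYST role:

-- let another role read from the stage using the shared file format
grant usage on file format custom_db.public.csv_format to role analyst;
grant usage on stage custom_db.public.ext_csv_stage to role analyst;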

CREATE or replace TABLE CUSTOM_DATA_CSV(
     CUSTOM_FIELD_1    NUMBER(38,6)
   , CUSTOM_FIELD_2    NUMBER(38,6)
   , CUSTOM_FIELD_3    VARCHAR(256)
   , CUSTOM_FIELD_4    NUMBER(38,1)
   , CUSTOM_FIELD_5    VARCHAR(256)
   , CUSTOM_FIELD_6    VARCHAR(128)
   , CUSTOM_FIELD_7    NUMBER(38,8)
);

copy into CUSTOM_DATA_CSV
from @custom_db.public.ext_csv_stage
on_error = CONTINUE;
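To verify the load, check the row count and inspect any records rejected by the most recent COPY via the VALIDATE table function; a quick sanity check:

select count(*) from CUSTOM_DATA_CSV;

-- rows rejected by the last COPY into this table (on_error = CONTINUE skips them silently)
select * from table(validate(CUSTOM_DATA_CSV, job_id => '_last'));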



Loading PARQUET data

  • Create a file format for PARQUET data import

create or replace file format custom_db.public.parquet_format
  type = 'parquet';

create or replace stage custom_db.public.ext_parquet_stage
  URL = 's3://<Path to the parquet file/folder on S3>'
  STORAGE_INTEGRATION = s3_integr
  file_format = custom_db.public.parquet_format;

  • Create a new custom table for parquet data

CREATE or replace TABLE CUSTOM_DATA_PARQUET(
     CUSTOM_FIELD_1    VARCHAR(150)
   , CUSTOM_FIELD_2    VARCHAR(150)
   , CUSTOM_FIELD_3    VARCHAR(150)
   , CUSTOM_FIELD_4    VARCHAR(150)
   , CUSTOM_FIELD_5    VARCHAR(150)
   , CUSTOM_FIELD_6    VARCHAR(150)
   , CUSTOM_FIELD_7    VARCHAR(150)
   , filename          VARCHAR
   , file_row_number   VARCHAR
   , load_timestamp    timestamp default TO_TIMESTAMP_NTZ(current_timestamp)
);
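Before writing the COPY, the exact field names inside the parquet files can be confirmed by querying the stage directly; a small sketch:

-- each row is returned as a single variant column ($1) keyed by the parquet field names
select $1 from @custom_db.public.ext_parquet_stage limit 10;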

  • Ingest data from S3

copy into custom_db.public.custom_data_parquet
from (select
  $1:parquet_custom_field_1::varchar,
  $1:parquet_custom_field_2::varchar,
  $1:parquet_custom_field_3::varchar,
  $1:parquet_custom_field_4::varchar,
  $1:parquet_custom_field_5::varchar,
  $1:parquet_custom_field_6::varchar,
  $1:"Optionally_exact_spelling_of_fieldname_from_parquet_in_double_quotes"::varchar,
  METADATA$FILENAME,
  METADATA$FILE_ROW_NUMBER,
  TO_TIMESTAMP_NTZ(current_timestamp)
from @custom_db.public.ext_parquet_stage);

Loading JSON data

  • Create a file format for JSON data import

create or replace file format custom_db.public.json_format
  type = 'json';

create or replace stage custom_db.public.ext_json_stage
  URL = 's3://<Path to the JSON file/folder on S3>'
  STORAGE_INTEGRATION = s3_integr
  file_format = custom_db.public.json_format;

  • Create a new custom table for JSON data

CREATE or replace TABLE CUSTOM_DATA_JSON(
     id                VARCHAR(50)
   , CUSTOM_FIELD_1    VARCHAR(150)
   , CUSTOM_FIELD_2    VARCHAR(150)
   , CUSTOM_FIELD_3    INTEGER
   , CUSTOM_FIELD_4    FLOAT
   , CUSTOM_FIELD_5    VARCHAR(150)
   , CUSTOM_FIELD_6    VARCHAR(128)
   , CUSTOM_FIELD_7    VARCHAR(128)
   , filename          VARCHAR
   , file_row_number   VARCHAR
   , load_timestamp    timestamp default TO_TIMESTAMP_NTZ(current_timestamp)
);
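The exact, case-sensitive key names can be checked up front by querying the raw JSON from the stage; a small sketch:

-- each row is returned as a variant ($1); its keys must be matched exactly in the COPY below
select $1 from @custom_db.public.ext_json_stage limit 5;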

  • Ingest data from S3, using case-sensitive field names that match the JSON keys exactly, including any leading/trailing spaces.

copy into custom_db.public.custom_data_json
from (select
  $1:"_id"::varchar,
  $1:" Custom Field 1 "::varchar,
  $1:" Custom Field 2 "::varchar,
  $1:" Custom Field 3 "::integer,
  $1:" Custom Field 4 "::float,
  $1:" Custom Field 5 "::varchar,
  $1:" Custom Field 6 "::varchar,
  $1:" Custom Field 7 "::varchar,
  METADATA$FILENAME,
  METADATA$FILE_ROW_NUMBER,
  TO_TIMESTAMP_NTZ(current_timestamp)
from @custom_db.public.ext_json_stage);


Snowpipe

  • https://docs.snowflake.com/en/user-guide/data-load-snowpipe-intro.html#data-duplication
  • Serverless data load mechanism
  • Enables loading data from files as soon as they’re available in a stage. 
  • Leverages event notifications (e.g., AWS SQS) from cloud storage to inform Snowpipe of the arrival of new data files to load. A dedicated COPY statement identifies the source location of the data files (i.e., a stage) and a target table. The user is billed for the compute each load consumes.
Loading CSV file(s) data
  • Similar to the manual load, the steps to create the storage integration, table, CSV file format and stage are required
  • The same commands as above can be re-used, making sure that the URL parameter in the stage definition points to the S3 folder where the CSV files will be dropped. Re-running the storage integration command can re-generate STORAGE_AWS_IAM_USER_ARN and STORAGE_AWS_EXTERNAL_ID; make sure the IAM role's trust relationship is updated accordingly
  • Define the actual pipe. Note that this contains the exact same copy command as used for manual loads

create or replace pipe custom_db.public.myawesomepipe auto_ingest=true as
copy into custom_data_csv
from @custom_db.public.ext_csv_stage
on_error = CONTINUE;

  • Run the show command to display the list of created pipes. Locate the newly created pipe and extract the value of the "notification_channel" column. This is the ARN of the SQS queue that the pipe will use.

show pipes;

  • Copy this value into S3 -> the bucket that the data files will be uploaded into -> Properties -> Event notifications. Create a new event notification with a destination of SQS Queue and supply the ARN copied from the Snowflake show pipes output.
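Once the notification is in place, the pipe can be checked, and any files already sitting in the stage location can be backfilled; a small sketch:

-- confirm the pipe is running and listening on the SQS queue
select system$pipe_status('custom_db.public.myawesomepipe');

-- queue up any files that landed before the event notification existed
alter pipe custom_db.public.myawesomepipe refresh;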
Loading PARQUET file(s) data
  • The steps are very similar to the CSV pipeline above and the manual PARQUET load example
  • Ensure that the stage defined during the manual PARQUET load above contains the URL of the folder and not an individual file
  • The same commands as above can be re-used, making sure that the URL parameter in the stage definition points to the folder within the bucket where the parquet files will be uploaded
  • Create a Snowpipe definition, or update an existing one, using create or replace pipe followed by the same copy command used to manually load the parquet files

create or replace pipe custom_db.public.myawesomepipe auto_ingest=true as
copy into custom_db.public.custom_data_parquet
from (select
  $1:parquet_custom_field_1::varchar,
  $1:parquet_custom_field_2::varchar,
  $1:parquet_custom_field_3::varchar,
  $1:parquet_custom_field_4::varchar,
  $1:parquet_custom_field_5::varchar,
  $1:parquet_custom_field_6::varchar,
  $1:"Optionally_exact_spelling_of_fieldname_from_parquet_in_double_quotes"::varchar,
  METADATA$FILENAME,
  METADATA$FILE_ROW_NUMBER,
  TO_TIMESTAMP_NTZ(current_timestamp)
from @custom_db.public.ext_parquet_stage);

  • The SQS queue ARN will remain unchanged from what was generated in the CSV pipe run above. If this is the first time a Snowpipe is defined, see the steps above on copying the ARN into the Event notifications of the S3 bucket.
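Pipe load activity can be monitored with the COPY_HISTORY table function; a quick sketch covering the last hour of loads into the parquet table:

select file_name, status, row_count, first_error_message
from table(information_schema.copy_history(
  table_name => 'CUSTOM_DATA_PARQUET',
  start_time => dateadd(hours, -1, current_timestamp())));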
Loading JSON file(s) data
  • Similar to the parquet and CSV steps, JSON can be loaded using the combination of create or replace pipe and the copy into from the manual JSON load above; see the sketch below
  • Make sure the table, file format and stage are defined correctly
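A minimal sketch of such a pipe, combining the earlier JSON COPY with a new pipe definition (the pipe name here is arbitrary):

create or replace pipe custom_db.public.myjsonpipe auto_ingest=true as
copy into custom_db.public.custom_data_json
from (select
  $1:"_id"::varchar,
  $1:" Custom Field 1 "::varchar,
  $1:" Custom Field 2 "::varchar,
  $1:" Custom Field 3 "::integer,
  $1:" Custom Field 4 "::float,
  $1:" Custom Field 5 "::varchar,
  $1:" Custom Field 6 "::varchar,
  $1:" Custom Field 7 "::varchar,
  METADATA$FILENAME,
  METADATA$FILE_ROW_NUMBER,
  TO_TIMESTAMP_NTZ(current_timestamp)
from @custom_db.public.ext_json_stage);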

