Continuous Data Ingestion Using Snowpipe in Snowflake for Amazon S3

Source: https://www.linkedin.com/pulse/continuous-data-ingestion-using-snowpipe-snowflake-amazon-mohapatra-omeuc?trackingId=ux5WBiGsSi2CFJi4kUVf3A%3D%3D





USE WAREHOUSE LRN;
USE DATABASE LRN_DB;
USE SCHEMA LEARNING;

---Create a table in Snowflake matching the source data
CREATE OR REPLACE TABLE PEOPLE_TB (
    "Index" INT,
    "User Id" VARCHAR(100),
    "First Name" VARCHAR(100),
    "Last Name" VARCHAR(100),
    "Sex" VARCHAR(100),
    "Email" VARCHAR(100),
    "Phone" VARCHAR(100),
    "Date of birth" DATE,
    "Job Title" VARCHAR(100)
);
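The quoted, mixed-case column names mirror the header row of the source CSV. Assuming a header that matches those names (an illustration, not the actual file), it would look like:

--Assumed header row of the source CSV file
--Index,User Id,First Name,Last Name,Sex,Email,Phone,Date of birth,Job Title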

--ALREADY created the file format CSVTYPE_DL; now just altering its properties
--Set FIELD_OPTIONALLY_ENCLOSED_BY and the DATE_FORMAT = 'YYYY-MM-DD' used by the "Date of birth" column
ALTER FILE FORMAT CSVTYPE_DL
    SET FIELD_OPTIONALLY_ENCLOSED_BY = '"',
        DATE_FORMAT = 'YYYY-MM-DD';
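For reference, the original CREATE FILE FORMAT statement for CSVTYPE_DL is not shown in the article; it would have been created earlier with something along these lines (the delimiter and header options are assumptions), before the ALTER above added the quoting and date options:

--Illustrative only: the actual CSVTYPE_DL definition is not shown in the article
CREATE OR REPLACE FILE FORMAT CSVTYPE_DL
    TYPE = 'CSV'
    FIELD_DELIMITER = ','
    SKIP_HEADER = 1;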



---CREATE THE STAGE WITH THE AWS S3 BUCKET URL
CREATE OR REPLACE STAGE EXT_STAGE_01
    FILE_FORMAT = CSVTYPE_DL
    URL = 's3://learningsnowflakedemo/sourcefolder/';





LIST @EXT_STAGE_01;

--Fails with "Failure using stage area. Cause: [Access Denied (Status Code: 403; Error Code: AccessDenied)]"
--because no connection has been established between the S3 bucket and Snowflake yet



---Copy the data from external storage to the table by hardcoding the AWS key and secret
COPY INTO PEOPLE_TB
    FROM @EXT_STAGE_01
    CREDENTIALS = (AWS_KEY_ID = 'AYYYYY', AWS_SECRET_KEY = 'XXXXX');



---Create an IAM user with AWS S3 access and download the credentials
--With the aws_key_id & aws_secret_key hardcoded, the data got moved from S3 to Snowflake
SELECT COUNT(*) FROM PEOPLE_TB;




-------------------------------------------STORAGE_INTEGRATION---------------------

------The STORAGE_INTEGRATION parameter is used when creating an external stage because it provides the credentials and configuration needed to access external storage services such as Amazon S3, Google Cloud Storage, or Azure Blob Storage without hardcoding keys.-----

---When you set up a storage integration, you associate it with an external stage so that Snowflake knows how to securely access your S3 bucket (or other cloud storage system) and use the appropriate credentials for that service.------

--------------------AWS ROLE CREATION------------------------------------

--Create an AWS ROLE with S3 FULL ACCESS and provide an EXTERNAL ID (any value, e.g. 99999, as a placeholder)
--Make a note of the ARN of that role




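The storage integration itself (referenced as S3_INGST when creating the stage further below) is not spelled out in the text; a minimal sketch, using a placeholder for the role ARN noted above, would be:

--Sketch only: replace the placeholder ARN with the role ARN noted above
CREATE OR REPLACE STORAGE INTEGRATION S3_INGST
    TYPE = EXTERNAL_STAGE
    STORAGE_PROVIDER = 'S3'
    ENABLED = TRUE
    STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::<aws_account_id>:role/<role_name>'
    STORAGE_ALLOWED_LOCATIONS = ('s3://learningsnowflakedemo/sourcefolder/');

--DESC INTEGRATION returns the STORAGE_AWS_IAM_USER_ARN and STORAGE_AWS_EXTERNAL_ID values needed for the trust policy update below
DESC INTEGRATION S3_INGST;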


------Update the trust policy in the role-----------------
--Go to the role you created, edit its trust policy, set the "sts:ExternalId" value to the integration's STORAGE_AWS_EXTERNAL_ID
--and the "AWS" value under "Principal" to the STORAGE_AWS_IAM_USER_ARN, then save the policy



-----CREATE A NEW STAGE TO LOAD THE DATA-----------
CREATE OR REPLACE STAGE BLR_STAGE
    FILE_FORMAT = CSVTYPE_DL
    STORAGE_INTEGRATION = S3_INGST
    URL = 's3://learningsnowflakedemo/sourcefolder/';


LIST @BLR_STAGE;

---Remove the files under the stage
REMOVE @BLR_STAGE;

--Delete all the rows from the table
TRUNCATE TABLE PEOPLE_TB;

--Now do the copy activity without hardcoding the AWS key & secret; the storage integration takes care of the authentication
COPY INTO PEOPLE_TB
    FROM @BLR_STAGE;


---Check whether the records were loaded
SELECT COUNT(*) FROM PEOPLE_TB;

------------------------SNOWPIPE---------------------

---Snowpipe is a feature in Snowflake that allows you to automate the process of loading data into your Snowflake tables as soon as it is available in a specified external stage (such as an S3 bucket). The primary benefit of Snowpipe is that it provides continuous data ingestion with minimal manual intervention, allowing you to keep your data up-to-date without having to worry about batch processes.

--------------------------SNOWPIPE CREATION------------------

CREATE OR REPLACE PIPE AWSPIPE
    AUTO_INGEST = TRUE
    AS
    COPY INTO PEOPLE_TB
    FROM @BLR_STAGE;


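The status output discussed below comes from checking the pipe's state, which can be done at any time (the same query is used again at the end of the article):

--Returns a JSON document describing the current state of the pipe
SELECT SYSTEM$PIPE_STATUS('AWSPIPE');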

The output above relates to Snowpipe, Snowflake's continuous data ingestion service that automatically loads data as soon as files arrive in a stage (such as an S3 bucket).


1. executionState: RUNNING

  • Significance: This indicates that Snowpipe is actively running and processing the data. The process of continuously checking the stage (S3 bucket, in your case) for new files is ongoing.

2. pendingFileCount: 0

  • Significance: This shows that there are no pending files waiting to be processed by Snowpipe. All files that have been staged (or queued) for loading are being processed, or there are no new files that need to be ingested.

3. notificationChannelName

  • Significance: This is the Amazon Simple Queue Service (SQS) notification channel used by Snowpipe to receive notifications when new files are added to the stage. Snowpipe is set to listen to this SQS queue, and it will pull notifications to trigger the loading process when a new file is detected.
  • The ARN (arn:aws:sqs:us-east-1:724772052480:sf-snowpipe-AIDA2RP6H3YAKSUYT7OYS-H7ejuFDAkI1QY-syzs-J7w) represents the unique identifier for the SQS queue associated with Snowpipe.

4. numOutstandingMessagesOnChannel: 0

  • Significance: This indicates that there are no outstanding messages in the SQS queue at the moment. In other words, Snowpipe has already processed all notifications (or there are no new notifications pending). This is generally a positive sign, indicating that the process is not currently backed up.

5. lastPulledFromChannelTimestamp: 2025-02-23T04:04:45.626Z

  • Significance: This timestamp shows the last time Snowpipe pulled a notification from the SQS channel. The time is in UTC (Coordinated Universal Time) format, and it tells you when Snowpipe last checked the SQS queue for any new file notification or new data to ingest.

In Summary:

  • Snowpipe is running and actively processing files.
  • There are no pending files waiting to be ingested.
  • No messages are currently queued in the SQS notification channel, meaning Snowpipe has either already processed any new files or there are no new files awaiting processing.
  • The last time Snowpipe checked for new data was the timestamp provided (2025-02-23T04:04:45.626Z).
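Putting those fields together, the status document described above would have looked roughly like this (reconstructed from the values quoted in the text; a real result contains additional timestamp fields):

-- {
--   "executionState": "RUNNING",
--   "pendingFileCount": 0,
--   "notificationChannelName": "arn:aws:sqs:us-east-1:724772052480:sf-snowpipe-AIDA2RP6H3YAKSUYT7OYS-H7ejuFDAkI1QY-syzs-J7w",
--   "numOutstandingMessagesOnChannel": 0,
--   "lastPulledFromChannelTimestamp": "2025-02-23T04:04:45.626Z"
-- }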


Create an event notification for the S3 bucket - whenever the bucket is updated, a notification will be sent to the destination SQS queue. For the destination, choose SQS queue and enter the SQS queue ARN, copied from the pipe's notification channel (see SHOW PIPES below).

SHOW PIPES;

--Copy the notification_channel value from the SHOW PIPES output
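If you prefer SQL over copying from the console output, one convenience (a sketch that assumes the pipe is named AWSPIPE, as above) is to read the SHOW PIPES result directly:

--Run immediately after SHOW PIPES; pulls the SQS ARN (notification channel) for the AWSPIPE pipe
SELECT "notification_channel"
FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()))
WHERE "name" = 'AWSPIPE';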

--Upload data with the same schema format to the S3 bucket and check the table
SELECT COUNT(*) FROM PEOPLE_TB; --COUNT WILL BE 200

SELECT SYSTEM$PIPE_STATUS('AWSPIPE'); --CHECK THE STATUS

SHOW PIPES;
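As an extra verification step (not part of the original walkthrough), the copy history for the table shows which files Snowpipe has loaded:

--Lists files loaded into PEOPLE_TB over the last hour, including those ingested by AWSPIPE
SELECT *
FROM TABLE(INFORMATION_SCHEMA.COPY_HISTORY(
    TABLE_NAME => 'PEOPLE_TB',
    START_TIME => DATEADD(HOUR, -1, CURRENT_TIMESTAMP())));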


