Continuous Data Ingestion Using Snowpipe in Snowflake for Amazon S3
USE WAREHOUSE LRN;
USE DATABASE LRN_DB;
USE SCHEMA LEARNING;
---Create a table in Snowflake matching the structure of the source data
CREATE OR REPLACE TABLE PEOPLE_TB (
"Index" INT,
"User Id" VARCHAR(100),
"First Name" VARCHAR(100),
"Last Name" VARCHAR(100),
"Sex" VARCHAR(100),
"Email" VARCHAR(100),
"Phone" VARCHAR(100),
"Date of birth" DATE,
"Job Title" VARCHAR(100)
--The file format CSVTYPE_DL was already created; here we just alter its properties
--Set DATE_FORMAT = 'YYYY-MM-DD' so the "Date of birth" column is parsed correctly
ALTER FILE FORMAT CSVTYPE_DL
SET FIELD_OPTIONALLY_ENCLOSED_BY = '"',
DATE_FORMAT = 'YYYY-MM-DD';
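The script assumes CSVTYPE_DL already exists. If you need to create it first, a minimal sketch (assuming a comma-delimited file with a single header row) might look like this:
--Hypothetical creation of the CSVTYPE_DL file format; adjust the delimiter/header settings to match your file
CREATE OR REPLACE FILE FORMAT CSVTYPE_DL
TYPE = 'CSV'
FIELD_DELIMITER = ','
SKIP_HEADER = 1
FIELD_OPTIONALLY_ENCLOSED_BY = '"'
DATE_FORMAT = 'YYYY-MM-DD';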
---CREATE THE STAGE WITH THE AWS S3 BUCKET URL
CREATE OR REPLACE STAGE EXT_STAGE_01
FILE_FORMAT = CSVTYPE_DL
URL = 's3://learningsnowflakedemo/sourcefolder/';
LIST @EXT_STAGE_01;
--Failure using stage area. Cause: [Access Denied (Status Code: 403; Error Code: AccessDenied)] -- because no connection has been established yet between the S3 bucket and Snowflake
---Copy the data from the external stage into the table by hardcoding the AWS key and secret
COPY INTO PEOPLE_TB
FROM @EXT_STAGE_01
CREDENTIALS = (AWS_KEY_ID = 'AYYYYY', AWS_SECRET_KEY = 'XXXXX');
---Create an IAM user in AWS with S3 access and download its access key credentials
--With the hardcoded aws_key_id & aws_secret_key, the data was loaded from S3 into Snowflake
SELECT count(*) FROM PEOPLE_TB;
-------------------------------------------STORAGE_INTEGRATION---------------------
------The STORAGE_INTEGRATION parameter is required when creating an external stage because it provides the credentials and configuration needed to access external storage services like Amazon S3, Google Cloud Storage, or Azure Blob Storage.-----
---When you set up a storage integration, you associate it with an external stage so that Snowflake knows how to securely access your S3 bucket (or other cloud storage system) and use the appropriate credentials for that service.------
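The stage created below references a storage integration named S3_INGST, whose creation is not shown in the original script. A minimal sketch of how it could be created follows; the role ARN is a placeholder, so substitute the ARN of the AWS role described in the next step, and adjust the allowed location if your bucket path differs.
--Hypothetical creation of the S3_INGST storage integration (the role ARN below is a placeholder)
CREATE OR REPLACE STORAGE INTEGRATION S3_INGST
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = 'S3'
ENABLED = TRUE
STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::<your_aws_account_id>:role/<your_snowflake_s3_role>'
STORAGE_ALLOWED_LOCATIONS = ('s3://learningsnowflakedemo/sourcefolder/');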
--------------------AWS ROLE CREATION------------------------------------
--Create an AWS IAM role with S3 full access and provide the external ID = 99999 (any value works as a placeholder)
--Make a note of the ARN of that role
------Update the trust Policy in the Role-----------------
--Go to the role you created, edit its trust policy, set "sts:ExternalId" to the integration's STORAGE_AWS_EXTERNAL_ID value and the "AWS" entry under "Principal" to its STORAGE_AWS_IAM_USER_ARN value, and finally update the policy
-----CREATE A NEW STAGE TO LOAD THE DATA-----------
CREATE OR REPLACE STAGE BLR_STAGE
FILE_FORMAT = CSVTYPE_DL
STORAGE_INTEGRATION = S3_INGST
URL = 's3://learningsnowflakedemo/sourcefolder/';
LIST @BLR_STAGE;
---Remove the files under the stage
REMOVE @BLR_STAGE;
--Delete all the rows from the table
TRUNCATE TABLE PEOPLE_TB;
--Now do the copy activity without hardcoding the AWS key & secret, as the STORAGE INTEGRATION takes care of the authentication
COPY INTO PEOPLE_TB
FROM @BLR_STAGE;
---Check whether the records were loaded
SELECT count(*) FROM PEOPLE_TB;
------------------------SNOWPIPE---------------------
---Snowpipe is a feature in Snowflake that allows you to automate the process of loading data into your Snowflake tables as soon as it is available in a specified external stage (such as an S3 bucket). The primary benefit of Snowpipe is that it provides continuous data ingestion with minimal manual intervention, allowing you to keep your data up-to-date without having to worry about batch processes.
--------------------------SNOWPIPE CREATION------------------
CREATE OR REPLACE PIPE AWSPIPE
AUTO_INGEST = TRUE
AS
COPY INTO PEOPLE_TB
FROM @BLR_STAGE;
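Auto-ingest only picks up files that arrive after the S3 event notification is in place. For files already sitting in the stage, you can trigger a one-time catch-up load; a minimal sketch:
--Optional: queue files already present in the stage for loading by the pipe
ALTER PIPE AWSPIPE REFRESH;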
The output of SELECT SYSTEM$PIPE_STATUS('AWSPIPE') describes the state of Snowpipe, Snowflake's continuous data ingestion service that automatically loads data as soon as files arrive in a stage (such as an S3 bucket).
1. executionState: RUNNING
- Significance: This indicates that Snowpipe is actively running and processing the data. The process of continuously checking the stage (the S3 bucket, in this case) for new files is ongoing.
2. pendingFileCount: 0
- Significance: This shows that there are no pending files waiting to be processed by Snowpipe. All files that have been staged (or queued) for loading are being processed, or there are no new files that need to be ingested.
3. notificationChannelName
- Significance: This is the Amazon Simple Queue Service (SQS) notification channel used by Snowpipe to receive notifications when new files are added to the stage. Snowpipe is set to listen to this SQS queue, and it will pull notifications to trigger the loading process when a new file is detected.
- The ARN (arn:aws:sqs:us-east-1:724772052480:sf-snowpipe-AIDA2RP6H3YAKSUYT7OYS-H7ejuFDAkI1QY-syzs-J7w) represents the unique identifier for the SQS queue associated with Snowpipe.
4. numOutstandingMessagesOnChannel: 0
- Significance: This indicates that there are no outstanding messages in the SQS queue at the moment. In other words, Snowpipe has already processed all notifications (or there are no new notifications pending). This is generally a positive sign, indicating that the process is not currently backed up.
5. lastPulledFromChannelTimestamp: 2025-02-23T04:04:45.626Z
- Significance: This timestamp shows the last time Snowpipe pulled a notification from the SQS channel. The time is in UTC (Coordinated Universal Time) format, and it tells you when Snowpipe last checked the SQS queue for any new file notification or new data to ingest.
In Summary:
- Snowpipe is running and actively processing files.
- There are no pending files waiting to be ingested.
- No messages are currently queued in the SQS notification channel, meaning Snowpipe has either already processed any new files or there are no new files awaiting processing.
- The last time Snowpipe checked for new data was the timestamp provided (2025-02-23T04:04:45.626Z).
Create an event notification for the S3 bucket so that whenever the bucket is updated, a notification is sent to the destination SQS queue. For the destination, enter the SQS queue ARN, copied from the pipe's notification channel.
SHOW PIPES;
Copy the notification_channel value from the output.
--Upload a file with the same schema to the S3 bucket and check the table
SELECT COUNT(*) FROM PEOPLE_TB; --COUNT WILL BE 200
SELECT SYSTEM$PIPE_STATUS('AWSPIPE');--CHECK THE STATUS
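To see exactly which files Snowpipe loaded into the table (and to spot any load errors), you can also query the table's copy history; a sketch covering the last hour:
--Files loaded into PEOPLE_TB (by Snowpipe or COPY) in the last hour
SELECT FILE_NAME, LAST_LOAD_TIME, ROW_COUNT, STATUS
FROM TABLE(INFORMATION_SCHEMA.COPY_HISTORY(
    TABLE_NAME => 'PEOPLE_TB',
    START_TIME => DATEADD(HOUR, -1, CURRENT_TIMESTAMP())
));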
SHOW PIPES;