Snowflake Snowpipe API in Python to ingest data from AWS S3 to Snowflake stage tables
Snowflake Snowpipe
Snowflake is a cloud data warehouse like Redshift, but it features a separation of compute resources and storage. Snowflake offers a solution called Snowpipe to help copy files from AWS S3 into Snowflake. There are many ways to set it up, e.g. with AWS SQS notifications, but I would like to discuss a more stable approach: using its snowflake-ingest Python package.
Python code setup
I have created a class called SnowpipeAPI, shown below:
import datetime
import re
import time

from snowflake.ingest import SimpleIngestManager
from snowflake.ingest import StagedFile
from snowflake.ingest.error import IngestResponseError


class SnowpipeAPI(object):
    # ...

    def uploadFileList(self, file_list):
        self.logger.info("We are going to load: \n" + str(file_list))
        staged_file_list = [StagedFile(file_name, None)
                            for file_name in file_list]
        self.logger.info("loading pipe: " + self.table['pipe'])
        try:
            # ask Snowpipe to ingest the staged files
            resp = self.ingest_manager.ingest_files(staged_file_list)
        except IngestResponseError as e:
            self.logger.info(e)
            self.slackbot.warn(e)
            exit(1)
        uploaded_files = None
        if resp['responseCode'] == 'SUCCESS':
            self.logger.info(
                "Snowflake has received files and will start loading")
            self.logger.info("Request id is : " + resp['requestId'])
            try:
                # poll the load history until all files are accounted for
                uploaded_files = self.checkPipeStatus(
                    datetime.datetime.now(), file_list)
            except RecursionError as e:
                self.logger.info(
                    "Fail to continue fetching upload status as {} ".format(str(e)))
                self.slackbot.warn(
                    "Fail to continue fetching upload status as {} ".format(str(e)))
            self.getHistoryReport()
        else:
            self.logger.info("Request from snowpipe api has failed")
            self.slackbot.warn("Request from snowpipe api has failed")
        return uploaded_files

    def checkPipeStatus(self, startTime, file_list, uploaded_file_list=None):
        # avoid a mutable default argument; start from an empty list
        if uploaded_file_list is None:
            uploaded_file_list = []
        # stop polling once the configured timeout is exceeded
        if (datetime.datetime.now() - startTime).total_seconds() > self.config['timeout']:
            return uploaded_file_list
        history_resp = self.ingest_manager.get_history()
        uploaded_file_count = len(uploaded_file_list)
        if len(history_resp['files']) > 0:
            uploaded_file_list_per_request = [
                f for f in history_resp['files']
                if f['status'] == 'LOADED' and f['complete'] == True]
            uploaded_file_count += len(uploaded_file_list_per_request)
            self.logger.info('Totally processed {} files'.format(
                str(uploaded_file_count)))
            uploaded_file_list += uploaded_file_list_per_request
        # file paths with an hour component (YYYY/MM/DD/HH) are counted differently
        filepathWithHour = False
        if re.search(r"\d\d\d\d/\d\d/\d\d/\d\d", file_list[0]):
            filepathWithHour = True
        if (((uploaded_file_count + 1) == len(file_list) and filepathWithHour == False)
                or ((uploaded_file_count + 24) == len(file_list) and filepathWithHour == True)
                or (uploaded_file_count * 2 == len(file_list))
                or (uploaded_file_count == len(file_list))):
            self.logger.info('Upload finished')
            self.logger.info('Ingest Report:\n')
            self.logger.info(history_resp)
            return uploaded_file_list
        else:
            # wait 5 seconds before polling the load history again
            self.logger.info("waiting for 5 sec")
            time.sleep(5)
            return self.checkPipeStatus(startTime, file_list, uploaded_file_list)

    def getHistoryReport(self):
        # report the load history for the last hour
        hour = datetime.timedelta(hours=1)
        date = datetime.datetime.utcnow() - hour
        history_range_resp = self.ingest_manager.get_history_range(
            date.isoformat() + 'Z')
        self.logger.info('\nHistory scan report: \n')
        self.logger.info(history_range_resp)

    # ...
There are three important methods in the class: uploadFileList, checkPipeStatus and getHistoryReport. I collect the files in S3 and pass them into the uploadFileList method. Then I use tail recursion in checkPipeStatus to keep polling the status of the upload process. Once everything is finished, getHistoryReport prints out a report of the processed files with their status.
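The constructor is elided in the snippet above. Following the snowflake-ingest documentation, the ingest_manager these methods rely on is created with key-pair authentication, roughly as sketched below (the account, user, key path and pipe name are placeholders; in the class this object would be assigned to self.ingest_manager inside __init__):

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from snowflake.ingest import SimpleIngestManager

# load the RSA private key registered for the Snowpipe user (PEM / PKCS#8)
with open('/path/to/rsa_key.p8', 'rb') as pem_in:
    pkey = serialization.load_pem_private_key(
        pem_in.read(), password=None, backend=default_backend())

private_key_text = pkey.private_bytes(
    encoding=serialization.Encoding.PEM,
    format=serialization.PrivateFormat.PKCS8,
    encryption_algorithm=serialization.NoEncryption()).decode('utf-8')

ingest_manager = SimpleIngestManager(
    account='myaccount',
    host='myaccount.snowflakecomputing.com',
    user='snowpipe_user',
    pipe='DATA_LAKE.RPT.SNOWPIPE_PIPE_FOR_X',
    private_key=private_key_text)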
How to use this class
Below is a function that passes the files to upload into the API method uploadFileList. It takes a function get_file_list as an argument, which collects the list of S3 files for a Snowflake stage table. For example, you can put a set of files with the same column layout in one S3 bucket; this function will then load all the files in the bucket into the stage table through a 'pipe', which I will talk about in the next section.
# ...
def process_snowpipe_api(get_file_list, table):
    logger.info("Creating snowpipe API client")
    apiClient = SnowpipeAPI(config.data, table)
    file_list = get_file_list(table)
    logger.info("Loading file list " + str(file_list))
    if len(file_list) > 0:
        uploaded_files = apiClient.uploadFileList(file_list)
    else:
        logger.info("No new file to load")
        uploaded_files = None
    return uploaded_files
# ...
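The get_file_list argument is not shown here. As an illustration only, a hypothetical implementation could list the keys under the stage's S3 prefix with boto3 and return them as paths relative to the stage URL (the bucket name matches the stage definition in the next section, and table['s3_prefix'] is an assumed configuration field):

import boto3

def get_file_list(table):
    # hypothetical helper: collect the S3 keys for one stage table and
    # return paths relative to the stage url ('s3://datalake-bucket/folder/')
    s3 = boto3.client('s3')
    paginator = s3.get_paginator('list_objects_v2')
    file_list = []
    for page in paginator.paginate(Bucket='datalake-bucket',
                                   Prefix=table['s3_prefix']):
        for obj in page.get('Contents', []):
            file_list.append(obj['Key'].replace(table['s3_prefix'], '', 1))
    return file_list

# table is the same per-table config dict used above
uploaded_files = process_snowpipe_api(get_file_list, table)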
Snowflake
For example, I have created a database role in Snowflake called snowpipe1 and followed the steps below:
- Grant the required privileges on the database objects
grant usage on database data_lake to role snowpipe1;
grant usage on schema data_lake.xxx to role snowpipe1;
grant insert, select on DATA_LAKE.xxx.table_name to role snowpipe1;
- Create a file format and grant usage to role SNOWPIPE1, for example as sketched below
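The post does not include the file format DDL, so here is a minimal sketch assuming a comma-delimited CSV with one header row (the name SNOWPIPE_CSV_FOR_X matches the stage definition in the next step; adjust the options to your files):

create file format SNOWPIPE_CSV_FOR_X
type = 'CSV'
field_delimiter = ','
skip_header = 1;
grant usage on file format SNOWPIPE_CSV_FOR_X to role snowpipe1;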
- Create the stage in SQL and then grant usage to role SNOWPIPE1
create stage SNOWPIPE_STAGE_FOR_X
file_format = SNOWPIPE_CSV_FOR_X
url = 's3://datalake-bucket/folder/'
credentials = (aws_key_id='xxx' aws_secret_key='xxx');
grant usage on stage SNOWPIPE_STAGE_FOR_X to role snowpipe1;
- Create the pipe and grant ownership. Please note that once you grant ownership, your account can’t see the pipe anymore; you need to switch to the snowpipe account to manage it.
create pipe SNOWPIPE_PIPE_FOR_X as copy into DATA_LAKE.RPT.X from @SNOWPIPE_STAGE_FOR_X;
// pause the pipe first in order to switch ownership
ALTER PIPE SNOWPIPE_PIPE_FOR_X SET PIPE_EXECUTION_PAUSED=true;
grant ownership on pipe data_lake.rpt.SNOWPIPE_PIPE_FOR_X to role snowpipe1;
// change back the pipe to running status
select SYSTEM$PIPE_STATUS( 'SNOWPIPE_PIPE_FOR_X' );
ALTER PIPE SNOWPIPE_PIPE_FOR_X SET PIPE_EXECUTION_PAUSED=false;
select SYSTEM$PIPE_STATUS( 'SNOWPIPE_PIPE_FOR_X' );
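Because ownership has moved, the pipe is no longer visible to the creating role; a quick sanity check (assuming your user has also been granted the snowpipe1 role) is to switch role and list it:

use role snowpipe1;
show pipes like 'SNOWPIPE_PIPE_FOR_X' in schema data_lake.rpt;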