Snowpipe Integration
Once you start ingesting data into your S3 bucket, you need to set up Snowpipe so that Snowflake can fetch the files from your S3 bucket, process them, and make them available for querying.
For each project that you have in Rakam, you need to set up a different PIPE in Snowflake because we have one table per project. We make use of the VARIANT type for the custom schema since it's the preferred approach in Snowflake. The VARIANT type uses the columnar storage engine, so it's as efficient as using a dedicated table for each event type.
-- switch to the account user (a role that can create schemas, stages, and pipes)
CREATE SCHEMA ${YOUR_DATABASE}.rakam_events;
USE ${YOUR_DATABASE}.rakam_events;

CREATE STAGE rakam_events_${PROJECT_NAME}_stage
  url = 's3://${TERRAFORM_S3_OUTPUT}/${PROJECT_NAME}/'
  file_format = (type='CSV', COMPRESSION='AUTO', FIELD_OPTIONALLY_ENCLOSED_BY = '\'', ESCAPE = NONE, ESCAPE_UNENCLOSED_FIELD = NONE, field_delimiter = none, record_delimiter = '\\n')
  credentials = (AWS_KEY_ID = '${TERRAFORM_SNOWFLAKE_KEY_ID}' AWS_SECRET_KEY = '${TERRAFORM_SNOWFLAKE_SECRET}');

CREATE TABLE events (_time TIMESTAMP_TZ, project TEXT, event_type TEXT, _user TEXT, server_time TIMESTAMP_TZ, _id TEXT, properties VARIANT)
  cluster by LINEAR(EVENT_TYPE, CAST(_TIME AS DATE));

CREATE SEQUENCE rakam_event_id start = 1 increment = 1;

CREATE PIPE rakam_events_pipe_${PROJECT_NAME} auto_ingest = true as
COPY INTO events (_time, project, event_type, _user, server_time, properties, _id) from (
  select
    concat(to_timestamp(cast(get(parse_json($1), '_time') as bigint), 3)::text, ' +0000')::timestamptz,
    cast(get(parse_json($1), '$schema') as text),
    cast(get(parse_json($1), '$table') as text),
    cast(get(parse_json($1), '_user') as text),
    CURRENT_TIMESTAMP,
    get(parse_json($1), 'properties'),
    rakam_event_id.nextval
  from @rakam_events_${PROJECT_NAME}_stage
);
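Once the pipe starts loading data, you can query the events table directly; nested fields inside the VARIANT properties column are accessed with the colon notation and cast to a concrete type. A minimal sketch (the campaign property is hypothetical, use whatever custom properties your events carry):

-- event counts per type for the last day, reading a hypothetical 'campaign' property from the VARIANT column
select event_type,
       properties:campaign::string as campaign,
       count(*) as total
from events
where project = '${PROJECT_NAME}'
  and _time > dateadd(day, -1, current_timestamp())
group by 1, 2
order by total desc;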
If you want to import the historical events as well, just run the following snippet:
alter pipe rakam_events_pipe_${PROJECT_NAME} refresh
In order to automate the ingestion, you need to add a notification to your S3 bucket:
- Run the following query in Snowflake:
SHOW PIPES LIKE 'RAKAM_EVENTS_PIPE_${PROJECT_NAME}'
- Copy the value in the notification_channel column (the SQS queue ARN), as shown in the picture:
- Go to the S3 bucket and, under the Properties section, add a new event notification pointing at that SQS queue (direct link: https://s3.console.aws.amazon.com/s3/buckets/S3_BUCKET/?region=eu-west-2&tab=properties).
You're good to go!
Now check the PIPE status in Snowflake:
select SYSTEM$PIPE_STATUS( 'rakam_events_pipe_${PROJECT_NAME}' )
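SYSTEM$PIPE_STATUS returns a JSON string, so if you only need a couple of fields you can parse it directly. A small sketch (executionState and pendingFileCount are standard fields of the returned JSON):

select parse_json(system$pipe_status('rakam_events_pipe_${PROJECT_NAME}')):executionState::string as execution_state,
       parse_json(system$pipe_status('rakam_events_pipe_${PROJECT_NAME}')):pendingFileCount::int as pending_file_count;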
The relation between server_time and _time
The value of server_time compared to _time may vary depending on late-arriving events, the buffer duration of the Kinesis Worker, and the performance of Snowpipe.
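To see how far server_time drifts from _time in your own data, you can compute the lag directly in Snowflake; a minimal sketch over the last day:

-- average and maximum ingestion lag in minutes, per event type
select event_type,
       avg(datediff('minute', _time, server_time)) as avg_lag_minutes,
       max(datediff('minute', _time, server_time)) as max_lag_minutes
from events
where server_time > dateadd(day, -1, current_timestamp())
group by 1
order by avg_lag_minutes desc;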
Late-arriving events
If you're using our mobile SDKs to collect the data, the clients may not be able to send the event data to the Rakam API as soon as you call rakam.logEvent because of network issues. The network is not reliable, especially on low-end mobile devices, so the SDKs retry sending the data periodically as long as the app is open. In that case, the SDKs may send the event data even a few days later, so you may see the value of server_time greater than _time by a few days.
Buffer Duration of Kinesis Worker
The default buffer duration of our Kinesis Worker is 5 minutes, which means that we batch all the events within a 5-minute window and send them to your S3 bucket for Snowflake to ingest. If your data volume is large, it might be more efficient to increase the buffer duration so that you end up with larger files and a better compression ratio.
Snowpipe
We make use of Snowpipe to ingest the data into Snowflake. In most cases, Snowpipe makes the data available within 5 minutes, but you can keep track of the live pipe status (for example with the SYSTEM$PIPE_STATUS query above) in case you see any delay in collecting the data.
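If you do see a delay, Snowflake's COPY_HISTORY table function shows which files Snowpipe has loaded into the table and whether any of them failed; a sketch for the last hour, assuming your current schema is rakam_events:

select file_name, last_load_time, row_count, status, first_error_message
from table(information_schema.copy_history(
  table_name => 'EVENTS',
  start_time => dateadd(hour, -1, current_timestamp())
))
order by last_load_time desc;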