Snowpipe Integration

Once you start ingesting data into your S3 bucket, you need to set up Snowpipe so that Snowflake can fetch the files from the bucket, process them, and make them available for querying.

For each project you have in Rakam, you need to set up a separate PIPE in Snowflake because we keep one table for each project. We use the VARIANT type for the custom schema since it's the preferred approach in Snowflake: VARIANT columns use Snowflake's columnar storage engine, so they are as efficient as dedicated tables for each event type.

-- switch to your account user

CREATE SCHEMA ${YOUR_DATABASE}.rakam_events;

USE ${YOUR_DATABASE}.rakam_events;

CREATE STAGE rakam_events_${PROJECT_NAME}_stage
    url='s3://${TERRAFORM_S3_OUTPUT}/${PROJECT_NAME}/'
    file_format = (type='CSV', COMPRESSION='AUTO', FIELD_OPTIONALLY_ENCLOSED_BY = '\'', ESCAPE = NONE, ESCAPE_UNENCLOSED_FIELD = NONE, field_delimiter = none, record_delimiter = '\\n')
    credentials = (AWS_KEY_ID = '${TERRAFORM_SNOWFLAKE_KEY_ID}' AWS_SECRET_KEY = '${TERRAFORM_SNOWFLAKE_SECRET}' );
    
CREATE TABLE events (_time TIMESTAMP_TZ, project TEXT, event_type TEXT, _user TEXT, server_time TIMESTAMP_TZ, _id TEXT, properties variant) cluster by LINEAR(EVENT_TYPE, CAST(_TIME AS DATE));

CREATE SEQUENCE rakam_event_id start = 1 increment = 1;

CREATE PIPE rakam_events_pipe_${PROJECT_NAME} auto_ingest=true as
COPY INTO events(_time, project, event_type, _user, server_time, properties, _id) from
     (select concat(to_timestamp(cast(get(parse_json($1), '_time') as bigint), 3)::text, ' +0000')::timestamptz,
             cast(get(parse_json($1), '$schema') as text),
             cast(get(parse_json($1), '$table') as text),
             cast(get(parse_json($1), '_user') as text),
             CURRENT_TIMESTAMP,
             get(parse_json($1), 'properties'),
             rakam_event_id.nextval
      from @rakam_events_${PROJECT_NAME}_stage);
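
Since every custom event property lands in the properties VARIANT column, you can query nested fields directly instead of maintaining a dedicated table per event type. A minimal sketch, assuming your events carry a plan_name property and a signup event type (both names are only examples):

-- count signups per plan over the last 7 days (plan_name and 'signup' are hypothetical)
select properties:plan_name::text as plan_name, count(*) as signups
from events
where event_type = 'signup'
  and _time >= dateadd(day, -7, current_timestamp())
group by 1
order by 2 desc;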

If you want to import the historical events as well, just run the following snippet:

alter pipe rakam_events_pipe_${PROJECT_NAME} refresh
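
After the refresh completes, you can sanity-check that the historical files actually landed in the table, for example by looking at the row count and the time range covered (a quick check, not part of the required setup):

-- confirm the backfill: overall row count and the covered time range
select count(*) as total_events,
       min(_time) as earliest_event,
       max(_time) as latest_event
from events;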

In order to automate the ingestion, you need to add a notification to your S3 bucket:

  1. Run the following query in Snowflake:
SHOW PIPES LIKE 'RAKAM_EVENTS_PIPE_${PROJECT_NAME}';

  2. Copy the value of the notification_channel column (the ARN of the SQS queue that Snowflake listens on); a query sketch for selecting it with SQL follows this list.


  3. Go to the S3 bucket and, under the Properties section, add a new event notification that sends all object create events to the SQS queue ARN you copied (direct link: https://s3.console.aws.amazon.com/s3/buckets/S3_BUCKET/?region=eu-west-2&tab=properties).

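If you'd rather grab the notification_channel value with SQL than copy it from the query output, something along these lines should work (RESULT_SCAN reads the result of the previous statement):

-- run the SHOW statement first, then select the channel ARN from its result
show pipes like 'RAKAM_EVENTS_PIPE_${PROJECT_NAME}';
select "notification_channel"
from table(result_scan(last_query_id()));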

You're good to go!

Now check the PIPE status in Snowflake:

select SYSTEM$PIPE_STATUS( 'rakam_events_pipe_${PROJECT_NAME}' )
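
SYSTEM$PIPE_STATUS returns a JSON string, so you can pull out individual fields such as the execution state and the number of pending files; a small sketch:

-- extract the execution state and pending file count from the JSON status
with s as (
  select parse_json(system$pipe_status('rakam_events_pipe_${PROJECT_NAME}')) as status
)
select status:executionState::text  as execution_state,
       status:pendingFileCount::int as pending_file_count
from s;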

The relation between server_time and _time

The difference between server_time and _time may vary depending on late-arriving events, the buffer duration of the Kinesis Worker, and the performance of Snowpipe.

Late-arriving events

If you're using our mobile SDKs to collect the data, the clients may not be able to send the event data to the Rakam API as soon as you call rakam.logEvent because of network issues. The network is unreliable, especially on low-end mobile devices, so the SDKs retry sending the data periodically as long as the app is open. In that case, the SDKs may send the event data even a few days later, so server_time can be greater than _time by a few days.
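
If you want to see how pronounced this is in your own data, you could measure the lag between the two timestamps, for example:

-- distribution of ingestion lag: how much later server_time is than _time, in days
select datediff(day, _time, server_time) as lag_in_days,
       count(*) as event_count
from events
group by 1
order by 1;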

Buffer Duration of Kinesis Worker

The default buffer duration in our Kinesis Worker is 5 minutes, which means we batch all the events within a 5-minute window and send them to your S3 bucket for Snowflake to ingest. If your data volume is large, it might be more efficient to increase the buffer duration so that you end up with larger files and a better compression ratio.
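
To judge whether a larger buffer would help, you can check the size of the files Snowpipe has loaded recently via the COPY_HISTORY table function (a monitoring query, not part of the setup; it only covers loads from the last 14 days):

-- number, average size, and row count of files loaded into the events table over the last day
select count(*)       as files_loaded,
       avg(file_size) as avg_file_size_bytes,
       sum(row_count) as rows_loaded
from table(information_schema.copy_history(
       table_name => 'EVENTS',
       start_time => dateadd(hour, -24, current_timestamp())));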

Snowpipe

We make use of Snowpipe to ingest the data into Snowflake. In most cases, Snowpipe makes the data available within 5 minutes, but you can keep track of the pipe's status with the SYSTEM$PIPE_STATUS query above in case you see any delay in collecting the data.