If you need a scalable solution that you can deploy to your own cluster, you may consider using this deployment type. It combines multiple open-source tools in order to process the event dataset and store it wherever you want.
Recently, we switched from Kafka to Kinesis: it's a cheaper, reliable and fully managed AWS service that can be used as a distributed commit log similar to Kafka. The concept is exactly the same, but you don't have to deal with master election, Zookeeper and multiple nodes for reliability. Kafka is great in itself, but in order to use it efficiently you need at least 3 Zookeeper servers and 3 Kafka instances. If you're dealing with billions of events that are too large to fit into a few hundred bytes each, Kafka is cheaper than Kinesis; otherwise Kinesis can be used as an alternative to Kafka.
First, the events are converted to a compact data structure by Rakam using Apache Avro, and then they are sent to the EventStore implementation, which serializes that data structure and sends the byte array to Kafka partitions based on their event collections. Kafka acts as a distributed commit log: we push all data to Kafka and periodically process and store it in a columnar format in a distributed file system such as Hadoop using PrestoDB. Since inserting rows one by one into columnar databases is usually not efficient, we use a micro-batching technique on top of Kafka. Since Kafka is not actually a queue, we can also replay micro-batches if nodes fail during the data-processing phase. The process is basically as follows:
- Serialize the event to a byte sequence using Apache Avro.
- Send the byte sequence to Kafka based on the event collection name. (We use collection names as Kafka topics.)
- Periodically execute a query that fetches data from Kafka starting at the last offset value, de-serializes it and appends it to the columnar storage file.
- Periodically execute continuous queries that pull data from Kafka, process it and update the in-memory state machine by merging the result with the existing data using the presto-streaming connector in Presto. (Stream processing.)
- Save the last offset of the processed rows in Kafka to the metadata server.
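The steps above can be sketched end to end. This is a minimal in-memory simulation, not the real pipeline: the dict-based "topics", "metadata server" and "columnar store" are hypothetical stand-ins for Kafka, the MySQL/PostgreSQL metadata server and Hadoop, and JSON stands in for Avro serialization.

```python
import json

topics = {}            # topic name -> list of serialized events (the "commit log")
committed_offsets = {} # topic name -> last processed offset ("metadata server")
columnar_store = {}    # topic name -> rows appended by the periodic job

def send_event(collection, event):
    """Steps 1-2: serialize the event and append it to the topic for its collection."""
    topics.setdefault(collection, []).append(json.dumps(event).encode())

def process_micro_batch(collection):
    """Steps 3-5: fetch rows after the last committed offset, append them to
    columnar storage, then commit the new offset to the metadata server."""
    log = topics.get(collection, [])
    offset = committed_offsets.get(collection, 0)
    batch = [json.loads(raw) for raw in log[offset:]]
    columnar_store.setdefault(collection, []).extend(batch)
    committed_offsets[collection] = len(log)  # commit only after a successful append
    return len(batch)

send_event("pageview", {"url": "/home"})
send_event("pageview", {"url": "/pricing"})
process_micro_batch("pageview")   # processes 2 events
send_event("pageview", {"url": "/docs"})
process_micro_batch("pageview")   # processes only the 1 new event
```

Because the offset is committed only after the batch lands in storage, a crash mid-batch simply replays the same micro-batch on the next run.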
There are multiple components in this system: metadata server (for event collection schemas and Kafka offsets), Kafka, Zookeeper, PrestoDB and Hadoop.
The Collection API handles serialization and sending events to Kafka. When events are sent to Kafka, the API sends an acknowledgment message to the client. Rakam uses Apache Avro for serialization and a MySQL or PostgreSQL metadata server as the event schema registry. When a new field is sent to Rakam, the schema registry alters the schema and updates the Apache Avro schema. Currently, the system uses only one Apache Avro schema per event collection. New fields are appended to the schema, and the updated schema is used by the system for serializing and deserializing all event data. If serialized data does not have the new fields (i.e. it was serialized using a previous schema that doesn't include them), the value of the new fields will be NULL.
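The schema-registry behaviour can be sketched with a toy model. This is a simplification using plain dicts: the real registry stores Avro schemas in MySQL/PostgreSQL, and the function names here are illustrative, not Rakam's API.

```python
registry = {}  # collection name -> ordered list of field names (one schema per collection)

def register_event(collection, event):
    """Append any previously unseen fields to the collection's schema."""
    schema = registry.setdefault(collection, [])
    for field in event:
        if field not in schema:
            schema.append(field)

def read_with_schema(collection, record):
    """Deserialize a record against the latest schema; records written with an
    older schema get None (NULL) for fields that didn't exist yet."""
    return {field: record.get(field) for field in registry[collection]}

register_event("pageview", {"url": "/home"})
old_record = {"url": "/home"}                                       # written with the old schema
register_event("pageview", {"url": "/docs", "referrer": "google"})  # new field arrives
print(read_with_schema("pageview", old_record))                     # referrer is None (NULL)
```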
The master node in Rakam schedules a periodic task that executes a PrestoDB query which pulls data from Kafka starting at the last offset, updates continuous query tables using the presto-streaming connector in Presto, and saves the event dataset in columnar format in a distributed file system such as Hadoop or Amazon EFS. PrestoDB also supports Amazon S3 as backend storage.
We re-write queries that are sent to the Analysis API before executing them on the PrestoDB query engine, in order to prevent users from executing queries on projects that they're not authorized to access. We use the PrestoDB parser written in ANTLR, and rewrite the query using the AST generated by ANTLR. Similar to the other deployment types, continuous query tables use the continuous prefix and materialized query tables use the materialized prefix.
When the query is re-written, Rakam sends it directly to PrestoDB, which executes the query on the event dataset by fetching data from the distributed file system to the worker nodes.
Fortunately, PrestoDB supports the CREATE TABLE AS syntax, which materializes query results into an external table. We use that feature internally for materialized query tables. The materialized query tables use a special prefix and are stored in the same schema as the event collections. The query parser re-writes the table names to the internal names of the materialized query tables. When a materialized query table is refreshed, we DELETE the whole table and perform the same CREATE TABLE AS query to materialize the latest snapshot.
We use an in-house PrestoDB connector called presto-streaming internally for continuous processing. It uses the internal aggregators that PrestoDB uses for aggregation queries but, unlike plain PrestoDB, presto-streaming persists the aggregators in memory and exposes them as PrestoDB tables. We process the micro-batches periodically by fetching data from Kafka and feed the table with pre-aggregated data. The table can be thought of as a state machine: every time we feed it, it updates its state, so the computation is incremental. As a result, the PrestoDB connector works like a stream processing engine. You can find the technical details on the presto-streaming page.
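The state-machine idea behind presto-streaming can be sketched with a single aggregator. This is a hypothetical API, not the connector's real one; the real implementation reuses PrestoDB's internal aggregator objects rather than a plain dict.

```python
class StreamingCountAggregator:
    """An aggregator whose state survives between micro-batches, so each batch
    only merges a partial result instead of recomputing from scratch."""

    def __init__(self):
        self.state = {}  # group key -> running count

    def feed(self, batch):
        """Merge a pre-aggregated micro-batch into the persistent state."""
        for key, partial_count in batch.items():
            self.state[key] = self.state.get(key, 0) + partial_count

    def table(self):
        """Expose the current state as if it were a PrestoDB table."""
        return dict(self.state)

agg = StreamingCountAggregator()
agg.feed({"/home": 2, "/docs": 1})   # first micro-batch pulled from Kafka
agg.feed({"/home": 1})               # next batch: incremental update only
print(agg.table())                   # {'/home': 3, '/docs': 1}
```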
You can plug in your own user database or set up a new PostgreSQL database as the user database. In order to be able to JOIN the event dataset with user data, you also need to add your database to PrestoDB as a connector.
This deployment type does not offer any solution for the CRM module. We suggest PostgreSQL for this purpose: you can use PrestoDB as the event database and PostgreSQL for the CRM module. You just need to configure the settings in the config.properties file.
There's nothing fancy here; it uses continuous query tables under the hood.
When a user subscribes to an event stream, we automatically record the last offset in Kafka for the subscribed event collections.
Since Apache Kafka already uses Zookeeper for coordination, we take advantage of it: Zookeeper makes it easy to elect a master node using its client libraries. We use the Curator framework's leader election feature, which is based on Zookeeper distributed locks.
The master node takes care of Kafka offsets. It executes queries that pull data from Kafka starting at the last committed offset and processes the data on PrestoDB. If the query succeeds, it updates the last offset.
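The commit-on-success rule is the key detail, and can be sketched as follows. Names are illustrative; the real master node commits offsets to the metadata server after the PrestoDB query completes.

```python
def run_batch(log, last_offset, query, metadata):
    """Process rows after last_offset; commit the new offset only on success."""
    batch = log[last_offset:]
    try:
        query(batch)                      # e.g. append the batch to columnar storage
    except Exception:
        return last_offset                # failure: offset unchanged, batch will be replayed
    metadata["offset"] = len(log)         # success: commit the new offset
    return metadata["offset"]

metadata = {"offset": 0}
log = ["e1", "e2", "e3"]

def failing_query(batch):
    raise RuntimeError("node failed mid-processing")

# A failed run leaves the offset untouched, so the next run replays the same batch.
offset = run_batch(log, metadata["offset"], failing_query, metadata)
assert offset == 0
offset = run_batch(log, metadata["offset"], lambda batch: None, metadata)
assert offset == 3
```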
We extensively use Guice for dependency injection and the Java ServiceLoader for registering modules. Most of the core features are also implemented as modules, but Rakam acts as a single box: you install the modules you need using the configuration file and optionally deploy to multiple nodes for high availability and for distributing the load across computing resources. You can plug in your own databases. You can also easily extend Rakam by developing modules: all you need to do is implement the Java interfaces for what you need, register your module class with Java's ServiceLoader, package your module and add the jar file to the classpath.
We use an in-house RESTful server based on Netty called netty-rest. It allows us to deploy API services easily and automatically generates the API documentation. Netty-rest also supports protocols like WebSockets and Server-Sent Events, which Rakam uses heavily. For example, when we need to deploy a RESTful service, we implement the HttpService interface and add the class to the Guice MultiBinder for HttpService. Rakam automatically registers the service and generates API documentation for it.
Developing Modules
We use Apache Avro, a portable serialization library available in many programming languages, in order to serialize events in a compact format. The serialized data doesn't include the schema definition, so the schema must be stored explicitly. Therefore we developed a schema registry service embedded in Rakam. Currently, the supported types are:
STRING, ARRAY, LONG, DOUBLE, BOOLEAN, DATE, TIME, TIMESTAMP
Rakam generates Avro schemas and backend database schemas using these intermediate data types.
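One plausible mapping from the intermediate types to Avro types is sketched below. This is an assumption for illustration, not Rakam's actual table: the logical types follow the Avro specification (DATE/TIME/TIMESTAMP as logical types over int/long), and fields are made nullable so records written with older schemas still deserialize.

```python
# Hypothetical mapping; Rakam's real type mapping may differ.
AVRO_TYPES = {
    "STRING": "string",
    "LONG": "long",
    "DOUBLE": "double",
    "BOOLEAN": "boolean",
    "DATE": {"type": "int", "logicalType": "date"},
    "TIME": {"type": "int", "logicalType": "time-millis"},
    "TIMESTAMP": {"type": "long", "logicalType": "timestamp-millis"},
}

def to_avro_field(name, rakam_type):
    """Build a nullable Avro field so missing values default to NULL."""
    if rakam_type.startswith("ARRAY<"):
        inner = rakam_type[len("ARRAY<"):-1]
        avro_type = {"type": "array", "items": AVRO_TYPES[inner]}
    else:
        avro_type = AVRO_TYPES[rakam_type]
    return {"name": name, "type": ["null", avro_type], "default": None}

print(to_avro_field("_time", "TIMESTAMP"))
```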
In order to control the queries that are performed via the Analysis API, we needed a query parser. Unfortunately, databases rarely use exactly the same SQL syntax, so each deployment type has its own query-parsing implementation. Fortunately, PrestoDB exposes its query parser module; since we already use PrestoDB for query processing and its syntax is quite similar to PostgreSQL's, we use it for now.
Rakam parses the query and re-writes it for the backend storage. For example, Rakam has materialized, continuous and collection schemas, and the default schema is collection, but the backend databases usually use schemas for project abstraction: each project has its own schema, and the materialized query tables, continuous query tables and event collection tables are all stored in the schema of that project. Therefore, the table references are re-written and the resulting query is sent to the backend database.
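A toy version of that rewrite is shown below. The real implementation walks the AST produced by PrestoDB's ANTLR parser; the regexes here are only a sketch, and the continuous_table_ internal name is an assumption (the materialized_table_ name appears in the materialized-view examples later in this document).

```python
import re

def rewrite_query(sql, project):
    """Rewrite virtual-schema table references to the project's real schema."""
    # materialized.X -> <project>.materialized_table_X
    sql = re.sub(r"\bmaterialized\.(\w+)", rf"{project}.materialized_table_\1", sql)
    # continuous.X -> <project>.continuous_table_X (assumed internal prefix)
    sql = re.sub(r"\bcontinuous\.(\w+)", rf"{project}.continuous_table_\1", sql)
    return sql

print(rewrite_query("SELECT * from materialized.weekly_pageviews", "myproject"))
# SELECT * from myproject.materialized_table_weekly_pageviews
```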
Event Mapping is a feature that modifies event fields before storing them in databases. Cleaning and enrichment are done in this step. Rakam provides a set of enrichment mappers, such as IP-to-geolocation and referrer extraction, that attach extra values to events based on existing values. For example, the IP-to-geolocation event mapper looks up the ip field and, if it exists, searches for its value in an IP-to-geolocation database called MaxMind GeoIP2 and attaches geolocation data to the event. Event mappers must declare the fields they can attach in their module specification; Rakam automatically registers those fields at runtime and prevents data type conflicts between event mappers and collection fields. A tutorial can be found in the Developing module for Rakam section.
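The mapper contract can be sketched like this. The class and field names are illustrative, not Rakam's actual Java interface, and the GeoIP database is stubbed with a dict in place of MaxMind GeoIP2.

```python
class IpToGeolocationMapper:
    # Fields this mapper may attach, declared up front so the schema registry
    # can register them and detect type conflicts before any event is processed.
    attached_fields = {"_country": "STRING", "_city": "STRING"}

    GEO_DB = {"8.8.8.8": ("US", "Mountain View")}  # stand-in for MaxMind GeoIP2

    def map(self, event):
        """Enrich the event in place based on its existing ip field."""
        ip = event.get("ip")
        if ip in self.GEO_DB:
            event["_country"], event["_city"] = self.GEO_DB[ip]
        return event

event = {"collection": "pageview", "ip": "8.8.8.8"}
print(IpToGeolocationMapper().map(event))
```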
Rakam uses the Airlift configuration library in order to parse the configuration from the config.properties file and VM options. It gives modules a clean way to declare and load the configuration they require.
We developed a web application that uses the Rakam Analysis APIs in order to create custom analytics services. Rakam is not shipped with the web application, but it can download the static files of the web application and turn the API server into a web application. The web application includes the web interfaces of the Rakam modules and a report analyzer that basically allows you to execute SQL queries, draw charts from the query results and save reports interactively. The report analyzer module has features similar to Chart.io, Periscope.io and ModeAnalytics, but as a self-hosted alternative. You can customize the web application, enable the modules you need and create interactive reports, and you will end up with an analytics service that is customized just for you.
The materialized views allow you to create summary tables from your raw data so that you can efficiently pre-compute your data and power your dashboards.
The feature is also based on SQL: you can basically process your raw data with SQL and materialize the result as a table.
Basically, we have three different materialized view strategies:
When you set the cache strategy along with a table update interval, your table will be updated whenever the cache expires.
Let's say that you used the following query and saved a materialized view with the name weekly_pageviews and a 24-hour table update interval:
SELECT date_trunc('week', _time), count(*) from pageview
Then you can run the query SELECT * from materialized.weekly_pageviews. The first time, we will create a physical table and insert the query result into it with CREATE TABLE materialized_table_weekly_pageviews AS SELECT date_trunc('week', _time), count(*) from pageview, and re-write your query as SELECT * from materialized_table_weekly_pageviews.
When you run the same query again, we will directly execute SELECT * from materialized_table_weekly_pageviews, so you won't process the raw data again for the next 24 hours. When you run the same query the next day, we will truncate the table and INSERT the refreshed data again.
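The cache strategy's decision logic amounts to a freshness check. This is a minimal simulation under stated assumptions: times are plain numbers and run_query stands in for re-running the CREATE TABLE AS query.

```python
CACHE_TTL = 24  # table update interval, in hours

def query_materialized(view, now, run_query, state):
    """Serve the physical table while fresh; truncate and re-materialize once
    the update interval has passed."""
    cached = state.get(view)
    if cached is None or now - cached["refreshed_at"] >= CACHE_TTL:
        state[view] = {"rows": run_query(), "refreshed_at": now}  # truncate + insert
    return state[view]["rows"]

state = {}
rows = query_materialized("weekly_pageviews", now=0, run_query=lambda: [("w1", 10)], state=state)
# within 24 hours the raw data is not reprocessed:
rows = query_materialized("weekly_pageviews", now=5, run_query=lambda: [("w1", 99)], state=state)
print(rows)   # still [('w1', 10)]
rows = query_materialized("weekly_pageviews", now=25, run_query=lambda: [("w1", 99)], state=state)
print(rows)   # refreshed: [('w1', 99)]
```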
Incremental materialized views are one of our core technologies. We incrementally process your raw data and materialize the result so that you can create OLAP cubes, pre-process your data and power your dashboards.
Let's say that you saved a materialized view with the name weekly_pageviews and a 24-hour table update interval:
SELECT date_trunc('week', _time), count(*) from pageview
When you reference the weekly_pageviews materialized view for the first time, similarly to the cache strategy, we process the historical data and materialize it with the following query: CREATE TABLE materialized_table_weekly_pageviews AS SELECT date_trunc('week', _time), count(*) from pageview. The main difference of the incremental strategy is that instead of truncating the data when the cache expires, we insert the recent data into the same table using the following query:
INSERT INTO materialized_table_weekly_pageviews SELECT date_trunc('week', _time), count(*) from pageview WHERE shard_time BETWEEN *materialized_view_last_updated_time* AND *current_timestamp*
*materialized_view_last_updated_time* represents the time when the data was last processed by our servers, so it is always increasing. We compare it against shard_time rather than the _time column, since clients can send their own _time along with the event, and they may re-send events that were not delivered when they occurred because of network issues or batching.
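The incremental refresh can be sketched in memory as follows. This stands in for the INSERT INTO ... WHERE shard_time BETWEEN ... query above; the dict-based view and plain-number timestamps are simplifications.

```python
from collections import Counter

def refresh_incremental(raw_events, view, now):
    """Aggregate only events processed since the last refresh and merge them
    into the existing materialized rows, instead of recomputing everything."""
    new_events = [e for e in raw_events
                  if view["last_updated"] < e["shard_time"] <= now]
    batch = Counter(e["week"] for e in new_events)   # count(*) grouped by week
    for week, count in batch.items():
        view["rows"][week] = view["rows"].get(week, 0) + count
    view["last_updated"] = now

view = {"rows": {}, "last_updated": 0}
raw = [{"week": "w1", "shard_time": 1}, {"week": "w1", "shard_time": 2}]
refresh_incremental(raw, view, now=10)
raw.append({"week": "w1", "shard_time": 12})   # event processed after the first refresh
refresh_incremental(raw, view, now=20)         # only the new row is aggregated
print(view["rows"])   # {'w1': 3}
```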
The real-time incremental strategy is quite similar to the incremental strategy, but it's real-time. Under the hood, it uses the same method as the incremental strategy, but also UNION ALLs the recent data in order to make your reports real-time. For the same query that we used before,
when you reference the materialized view it runs the following query:
SELECT * from materialized_table_weekly_pageviews UNION ALL SELECT date_trunc('week', _time), count(*) from pageview WHERE shard_time > *materialized_view_last_updated_time*
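The same behaviour in miniature: the answer is the materialized rows plus an on-the-fly aggregation of events newer than the last refresh. This in-memory sketch stands in for the UNION ALL query above.

```python
from collections import Counter

def query_realtime(view, raw_events):
    """Materialized part + live aggregation over rows newer than the last refresh."""
    recent = [e for e in raw_events if e["shard_time"] > view["last_updated"]]
    live = Counter(e["week"] for e in recent)
    # UNION ALL: both parts are returned without merging duplicate group keys
    return list(view["rows"].items()) + list(live.items())

view = {"rows": {"w1": 2}, "last_updated": 10}
raw = [{"week": "w1", "shard_time": 12}, {"week": "w2", "shard_time": 15}]
print(query_realtime(view, raw))   # [('w1', 2), ('w1', 1), ('w2', 1)]
```

Note that, as with SQL's UNION ALL, a group key can appear in both halves; the reader (or an outer aggregation) is expected to merge them.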