ksqlDB is a purpose-built database for stream processing applications, ingesting data from Apache Kafka.
Available on the Enterprise plan.
Contact us for details.
Prerequisites
- Hostname for the ksqlDB server
- Username and password (or an API key) to connect to ksqlDB server
Confluent Cloud
If you are using Confluent Cloud, you need to generate an API key and use the API key as your username and the API key secret as your password. You can generate an API key by installing confluent-cli and running the
following commands in the command line:
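For example (the resource ID below is a placeholder; use the cluster ID reported for your ksqlDB cluster):

```shell
# Log in to Confluent Cloud
confluent login

# Find the ID of your ksqlDB cluster
confluent ksql cluster list

# Create an API key scoped to that cluster (placeholder resource ID)
confluent api-key create --resource lksqlc-xxxxx
```

The command prints the API key and secret; the key becomes your username and the secret your password.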
Setup
Manual
Add the following to a .env file in your Cube project:
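A minimal example with placeholder host and credentials:

```dotenv
CUBEJS_DB_TYPE=ksql
CUBEJS_DB_URL=https://xxxxx.us-east-1.aws.confluent.cloud:443
CUBEJS_DB_USER=username
CUBEJS_DB_PASS=password
```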
Environment Variables
| Environment Variable | Description | Possible Values | Required |
|---|---|---|---|
| `CUBEJS_DB_URL` | The host URL for ksqlDB with port | A valid database host URL | ✅ |
| `CUBEJS_DB_USER` | The username used to connect to the ksqlDB server. API key for Confluent Cloud. | A valid database username | ✅ |
| `CUBEJS_DB_PASS` | The password used to connect to the ksqlDB server. API secret for Confluent Cloud. | A valid database password | ✅ |
| `CUBEJS_DB_KAFKA_HOST` | Kafka broker host(s) for Kafka streams mode. Multiple brokers can be comma-separated. | A valid Kafka broker URL | ❌ |
| `CUBEJS_DB_KAFKA_USER` | The username for Kafka broker authentication (SASL PLAIN) | A valid Kafka username | ❌ |
| `CUBEJS_DB_KAFKA_PASS` | The password for Kafka broker authentication (SASL PLAIN) | A valid Kafka password | ❌ |
| `CUBEJS_DB_KAFKA_USE_SSL` | If true, enables SASL_SSL for the Kafka connection | `true`, `false` | ❌ |
| `CUBEJS_CONCURRENCY` | The number of concurrent queries to the data source | A valid number | ❌ |
Pre-Aggregations Support
ksqlDB supports only streaming pre-aggregations.
Kafka streams mode
By default, Cube connects to ksqlDB via its REST API, which is used both for metadata (discovering tables and streams) and for streaming data into Cube Store during pre-aggregation builds. In this default mode, Cube may create tables and streams in ksqlDB as part of the pre-aggregation build process (e.g., `CREATE TABLE ... AS SELECT` statements for non-read-only pre-aggregations).
When Kafka streams mode is enabled, Cube reads data directly from the
underlying Kafka topics instead of going through the ksqlDB REST API for
data streaming. ksqlDB is still used for metadata operations such as
discovering tables, streams, and their schemas, but Cube Store subscribes
to the backing Kafka topic directly.
In this mode, Cube does not create any tables or streams in ksqlDB. All
pre-aggregations use the read-only refresh path: Cube discovers the
existing ksqlDB objects and their backing Kafka topics, then streams data
directly from Kafka into Cube Store.
When to use Kafka streams mode
Kafka streams mode is useful when:

- You want to prevent Cube from creating any objects in ksqlDB
- You need higher throughput for data ingestion by reading Kafka directly
- Your ksqlDB environment has restricted permissions that don’t allow creating tables or streams
- You prefer Cube Store to consume from Kafka topics without an intermediary
Enabling Kafka streams mode
Set the `CUBEJS_DB_KAFKA_HOST` environment variable to the address of your
Kafka broker(s). This activates Kafka streams mode automatically:
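For example (placeholder hosts and credentials):

```dotenv
CUBEJS_DB_TYPE=ksql
CUBEJS_DB_URL=https://xxxxx.us-east-1.aws.confluent.cloud:443
CUBEJS_DB_USER=ksql-api-key
CUBEJS_DB_PASS=ksql-api-secret

# Setting the broker host(s) enables Kafka streams mode
CUBEJS_DB_KAFKA_HOST=broker-1:9092,broker-2:9092
CUBEJS_DB_KAFKA_USER=kafka-api-key
CUBEJS_DB_KAFKA_PASS=kafka-api-secret
CUBEJS_DB_KAFKA_USE_SSL=true
```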
When using Confluent Cloud,
the Kafka credentials are separate from the ksqlDB credentials. Generate
an API key for the Kafka cluster (not the ksqlDB cluster) and use it as
`CUBEJS_DB_KAFKA_USER` and `CUBEJS_DB_KAFKA_PASS`.
How it works
With Kafka streams mode enabled:

- Cube uses the ksqlDB REST API to discover available tables and streams and to retrieve their schemas via `DESCRIBE`.
- For each table or stream, Cube resolves the backing Kafka topic name from the ksqlDB metadata.
- Instead of streaming data through ksqlDB, Cube Store connects directly to the Kafka broker(s) and consumes from the resolved topic.
- Pre-aggregation builds use the read-only refresh strategy. Cube does not issue any `CREATE TABLE` or `CREATE STREAM` statements to ksqlDB.
Data modeling
ksqlDB is typically used as an additional data source alongside a primary data warehouse. To use Kafka streams mode, configure ksqlDB as a named data source using decorated environment variables and point your cubes to it with the `data_source` property.
First, declare the data sources and configure the ksqlDB connection with
Kafka credentials:
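A sketch using Cube's decorated data source variables; the `ksql` data source name, warehouse type, hosts, and credentials are placeholders to adjust for your deployment:

```dotenv
# Declare the available data sources
CUBEJS_DATASOURCES=default,ksql

# Primary data warehouse (example: Postgres)
CUBEJS_DS_DEFAULT_DB_TYPE=postgres
CUBEJS_DS_DEFAULT_DB_HOST=warehouse.example.com
CUBEJS_DS_DEFAULT_DB_NAME=analytics
CUBEJS_DS_DEFAULT_DB_USER=cube
CUBEJS_DS_DEFAULT_DB_PASS=secret

# ksqlDB data source with Kafka streams mode enabled
CUBEJS_DS_KSQL_DB_TYPE=ksql
CUBEJS_DS_KSQL_DB_URL=https://xxxxx.us-east-1.aws.confluent.cloud:443
CUBEJS_DS_KSQL_DB_USER=ksql-api-key
CUBEJS_DS_KSQL_DB_PASS=ksql-api-secret
CUBEJS_DS_KSQL_DB_KAFKA_HOST=broker-1:9092
CUBEJS_DS_KSQL_DB_KAFKA_USER=kafka-api-key
CUBEJS_DS_KSQL_DB_KAFKA_PASS=kafka-api-secret
CUBEJS_DS_KSQL_DB_KAFKA_USE_SSL=true
```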
Then, define a streaming cube that points at the ksqlDB data source via
`data_source: ksql` and uses a read-only streaming pre-aggregation
that consumes from the backing Kafka topic directly. The lambda
pre-aggregation in the batch cube merges both, serving historical data
from the warehouse rollup and real-time data from the streaming rollup:
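A minimal sketch of this setup, assuming a warehouse table `public.events` and an existing ksqlDB stream `EVENTS_STREAM`; all cube, column, and rollup names are placeholders:

```yaml
cubes:
  # Streaming cube reading from an existing ksqlDB stream
  - name: streaming_events
    data_source: ksql
    sql_table: EVENTS_STREAM

    dimensions:
      - name: id
        sql: id
        type: string
        primary_key: true

      - name: timestamp
        sql: timestamp
        type: time

    pre_aggregations:
      - name: streaming_rollup
        type: rollup
        read_only: true
        stream_offset: latest
        unique_key_columns:
          - id
        dimensions:
          - CUBE.id
          - CUBE.timestamp

  # Batch cube backed by the primary data warehouse
  - name: events
    data_source: default
    sql_table: public.events

    dimensions:
      - name: id
        sql: id
        type: string
        primary_key: true

      - name: timestamp
        sql: timestamp
        type: time

    pre_aggregations:
      - name: batch_rollup
        type: rollup
        dimensions:
          - CUBE.id
          - CUBE.timestamp

      # Lambda pre-aggregation merging batch and streaming rollups
      - name: combined_rollup
        type: rollup_lambda
        rollups:
          - CUBE.batch_rollup
          - streaming_events.streaming_rollup
```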
- `read_only: true` — Cube will not create any objects in ksqlDB. The data is consumed directly from the backing Kafka topic.
- `stream_offset` — controls where Cube Store starts consuming from in the Kafka topic. Set to `"latest"` to only consume new messages arriving after the pre-aggregation is created. Set to `"earliest"` to replay the topic from the beginning. Defaults to `"latest"` if not specified. On subsequent refreshes, Cube Store automatically resumes from the last processed offset regardless of this setting.
- `unique_key_columns` — columns that uniquely identify a record, used for deduplication (see below).
Primary key and ungrouped queries
For the streaming pre-aggregation to work in read-only mode, the generated SQL must not contain a `GROUP BY` clause — Cube Store’s stream
post-processing engine does not support aggregation.
Cube automatically omits the GROUP BY clause when the dimensions
included in the pre-aggregation contain a primary key. In that case, the
generated query becomes a simple SELECT ... FROM ... without grouping,
and measures are passed through as raw expressions rather than
aggregated. This is what makes the pre-aggregation eligible for the
read-only streaming path.
You must include all primary key columns of the cube in the
streaming pre-aggregation’s dimensions list. If any primary key
dimension is missing, the query may not be recognized as ungrouped
and will fail to use the streaming path.
The sql_table or sql value should reference an existing ksqlDB stream
or table. Cube discovers its schema automatically. With Kafka streams
mode enabled, the streaming pre-aggregation reads the backing Kafka topic
directly — no objects are created in ksqlDB.
Unique key columns and deduplication
When `unique_key_columns` is set, Cube Store appends an internal
sequence column (__seq) to the table, populated from the Kafka
partition offset. The unique key columns together with __seq form the
sort key for all indexes on this table.
Deduplication is not applied at ingestion time — all incoming records are
appended as they arrive. Instead, Cube Store deduplicates during
reads and compaction: rows are sorted by the unique key columns
and then by __seq, and only the last row per unique key (the one
with the highest sequence number) is kept. This means that if the same
key appears multiple times in the stream, the most recent version is
always the one returned by queries.
For Kafka messages, unique key column values can come from either the
message payload (the JSON value) or the message key. If a column
listed in unique_key_columns is missing from the payload, Cube Store
falls back to the Kafka message key: for a single unique key column, the
raw key value is used; for composite keys, the key is expected to be a
JSON object with matching field names.
Stream format
Cube Store expects Kafka messages to have a JSON object as their value payload, with field names matching the column names defined in the cube via the `sql` property of each dimension and measure definition. Missing
fields default to null.
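For example, assuming a streaming cube with `id`, `status`, and `timestamp` columns (placeholder fields), each Kafka message value might look like:

```json
{
  "id": "order-1001",
  "status": "shipped",
  "timestamp": "2025-01-15T10:30:00.000Z"
}
```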
The message key is optional. When present and the value starts with {,
it is parsed as a JSON object and used as a fallback source for unique
key column values (see above).
Timestamp handling
For dimensions with `type: time`, Cube Store accepts timestamp values in
two formats:
- String — parsed using ISO 8601 / RFC 3339 formats. Supported
  patterns include:
  - `2025-01-15T10:30:00.000Z`
  - `2025-01-15T10:30:00Z`
  - `2025-01-15 10:30:00.000 UTC`
  - `2025-01-15T10:30:00`
  - `2025-01-15 10:30:00`
  - `2025-01-15`
- Number — interpreted as epoch milliseconds (not seconds, not
  microseconds). For example, `1736939400000` represents
  `2025-01-15T10:30:00.000Z`.
If your timestamps arrive in another format, use `PARSE_TIMESTAMP` in the cube’s `sql` property to
convert them. In that case, define the source column as `type: string`
in a `source_table` and use the `select_statement` to transform it.
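One possible shape of this, using the cube’s `sql` property as mentioned above; the stream name, column names, and the `dd-MM-yyyy HH:mm:ss` source format are placeholder assumptions:

```yaml
cubes:
  - name: streaming_events
    data_source: ksql
    # Parse the custom string format into a timestamp during ingestion
    sql: >
      SELECT id,
             PARSE_TIMESTAMP(created_at, 'dd-MM-yyyy HH:mm:ss') AS created_at
      FROM EVENTS_STREAM
```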
Granularity truncation (the granularity property of
the pre-aggregation) is handled automatically. Cube generates the
appropriate PARSE_TIMESTAMP(FORMAT_TIMESTAMP(CONVERT_TZ(...)))
expression chain to truncate timestamps to the configured granularity
(e.g., day, hour, minute). Cube Store evaluates these expressions
natively on each micro-batch during ingestion. Standard SQL functions
like date_trunc are also available in the select_statement.
Filtering on the stream
When the streaming cube defines a `sql` property with a `SELECT`
statement (rather than sql_table), Cube Store applies the projection
and any WHERE filters from that statement directly on each micro-batch
of incoming Kafka messages. This filtering happens inside Cube Store
using its query engine — it does not require ksqlDB to process the
filter. Only rows that pass the filter are ingested into the
pre-aggregation table.
This allows you to define a streaming cube that only ingests a subset of
the data from the underlying Kafka topic without creating any
server-side filter objects in ksqlDB.
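For instance (stream and column names are placeholders), a cube like this would ingest only the rows that pass the filter:

```yaml
cubes:
  - name: high_value_orders
    data_source: ksql
    # Cube Store applies the projection and the WHERE filter
    # to each micro-batch of incoming Kafka messages
    sql: >
      SELECT id, amount, timestamp
      FROM ORDERS_STREAM
      WHERE amount > 100
```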
Supported SQL syntax
The `SELECT` statement must follow a strict shape. Cube Store only
accepts plans that resolve to Projection > Filter > TableScan (where
the filter is optional). Any other query plan shape is rejected.
Supported:

- `SELECT` with column references (e.g., `SELECT col1, col2 FROM topic`)
- `SELECT *` wildcard
- Column aliases (`SELECT col1 AS my_alias`)
- `WHERE` clause with comparison operators (`=`, `!=`, `<`, `>`, `<=`, `>=`)
- Boolean logic in `WHERE` (`AND`, `OR`, `NOT`)
- `IS NULL` and `IS NOT NULL`
- `IN` lists (`col IN (1, 2, 3)`)
- `BETWEEN` expressions
- `CASE ... WHEN ... THEN ... ELSE ... END` expressions
- `CAST(expr AS type)` type conversions
- `EXTRACT(field FROM expr)` for date/time parts
- `SUBSTRING(expr FROM start FOR length)`
- Scalar functions (e.g., `COALESCE`, `CONCAT`, arithmetic)
- `CONVERT_TZ` for timezone conversion (internally rewritten for compatibility)
- `PARSE_TIMESTAMP` and `FORMAT_TIMESTAMP` for timestamp parsing and formatting using ksql-style format strings (e.g., `yyyy-MM-dd'T'HH:mm:ss.SSS`)
- Nested expressions with parentheses
- `date_trunc` for timestamp truncation
Not supported:

- `JOIN` clauses — only a single `FROM` table is allowed
- Subqueries in `SELECT` or `WHERE`
- `GROUP BY`, `HAVING`, or aggregate functions (`SUM`, `COUNT`, `AVG`, etc.)
- `ORDER BY` (rows are consumed in stream order)
- `LIMIT` and `OFFSET`
- `UNION`, `INTERSECT`, `EXCEPT`
- Window functions (`OVER`, `PARTITION BY`)
- Multiple `FROM` or multiple `WHERE` clauses
- Common Table Expressions (`WITH ... AS`)
Expressions in the `SELECT` list that are not simple column
references must have explicit aliases. Unique key columns may reference
the source column through a scalar function (e.g.,
`CAST(id AS VARCHAR) AS id`), but not through arbitrary expressions.
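For example, a statement matching the accepted Projection > Filter > TableScan shape; table and column names are placeholders:

```sql
SELECT
  CAST(id AS VARCHAR) AS id,
  status,
  amount * 100 AS amount_cents,
  PARSE_TIMESTAMP(created_at, 'yyyy-MM-dd''T''HH:mm:ss.SSS') AS created_at
FROM ORDERS_STREAM
WHERE status IN ('shipped', 'delivered')
  AND amount BETWEEN 10 AND 1000
```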