Skip to main content

Documentation Index

Fetch the complete documentation index at: https://cubed3-docs-cub-2416-update-semantic-snowflake-semantic-vie.mintlify.app/llms.txt

Use this file to discover all available pages before exploring further.

ksqlDB is a purpose-built database for stream processing applications, ingesting data from Apache Kafka.
Available on the Enterprise plan. Contact us for details.
See how you can use ksqlDB and Cube Cloud to power real-time analytics in Power BI:
In this video, the SQL API is used to connect to Power BI. Currently, it’s recommended to use the DAX API.

Prerequisites

  • Hostname for the ksqlDB server
  • Username and password (or an API key) to connect to ksqlDB server

Confluent Cloud

If you are using Confluent Cloud, you need to generate an API key and use the API key name as your username and the API key secret as your password. You can generate an API key by installing confluent-cli and running the following commands in the command line:
brew install --cask confluent-cli
confluent login
confluent environment use <YOUR-ENVIRONMENT-ID>
confluent ksql cluster list
confluent api-key create --resource <YOUR-KSQL-CLUSTER-ID>

Setup

Manual

Add the following to a .env file in your Cube project:
CUBEJS_DB_TYPE=ksql
CUBEJS_DB_URL=https://xxxxxx-xxxxx.us-west4.gcp.confluent.cloud:443
CUBEJS_DB_USER=username
CUBEJS_DB_PASS=password

Environment Variables

Environment VariableDescriptionPossible ValuesRequired
CUBEJS_DB_URLThe host URL for ksqlDB with portA valid database host URL
CUBEJS_DB_USERThe username used to connect to the ksqlDB. API key for Confluent Cloud.A valid database username
CUBEJS_DB_PASSThe password used to connect to the ksqlDB. API secret for Confluent Cloud.A valid database password
CUBEJS_DB_KAFKA_HOSTKafka broker host(s) for Kafka streams mode. Multiple brokers can be comma-separated.A valid Kafka broker URL
CUBEJS_DB_KAFKA_USERUsername for Kafka broker authentication (SASL PLAIN)A valid Kafka username
CUBEJS_DB_KAFKA_PASSPassword for Kafka broker authentication (SASL PLAIN)A valid Kafka password
CUBEJS_DB_KAFKA_USE_SSLIf true, enables SASL_SSL for the Kafka connectiontrue, false
CUBEJS_CONCURRENCYThe number of concurrent queries to the data sourceA valid number

Pre-Aggregations Support

ksqlDB supports only streaming pre-aggregations.

Kafka streams mode

By default, Cube connects to ksqlDB via its REST API. ksqlDB uses its REST API both for metadata (discovering tables and streams) and for streaming data into Cube Store during pre-aggregation builds. In this default mode, Cube may create tables and streams in ksqlDB as part of the pre-aggregation build process (e.g., CREATE TABLE ... AS SELECT statements for non-read-only pre-aggregations). When Kafka streams mode is enabled, Cube reads data directly from the underlying Kafka topics instead of going through the ksqlDB REST API for data streaming. ksqlDB is still used for metadata operations such as discovering tables, streams, and their schemas, but Cube Store subscribes to the backing Kafka topic directly. In this mode, Cube does not create any tables or streams in ksqlDB. All pre-aggregations use the read-only refresh path: Cube discovers the existing ksqlDB objects and their backing Kafka topics, then streams data directly from Kafka into Cube Store.

When to use Kafka streams mode

Kafka streams mode is useful when:
  • You want to prevent Cube from creating any objects in ksqlDB
  • You need higher throughput for data ingestion by reading Kafka directly
  • Your ksqlDB environment has restricted permissions that don’t allow creating tables or streams
  • You prefer Cube Store to consume from Kafka topics without an intermediary

Enabling Kafka streams mode

Set the CUBEJS_DB_KAFKA_HOST environment variable to the address of your Kafka broker(s). This activates Kafka streams mode automatically:
CUBEJS_DB_TYPE=ksql
CUBEJS_DB_URL=https://xxxxxx-xxxxx.us-west4.gcp.confluent.cloud:443
CUBEJS_DB_USER=ksql_username
CUBEJS_DB_PASS=ksql_password
CUBEJS_DB_KAFKA_HOST=pkc-xxxxx.us-west4.gcp.confluent.cloud:9092
CUBEJS_DB_KAFKA_USER=kafka_api_key
CUBEJS_DB_KAFKA_PASS=kafka_api_secret
CUBEJS_DB_KAFKA_USE_SSL=true
Multiple Kafka brokers can be specified as a comma-separated list:
CUBEJS_DB_KAFKA_HOST=broker1:9092,broker2:9092,broker3:9092
When using Confluent Cloud, the Kafka credentials are separate from the ksqlDB credentials. Generate an API key for the Kafka cluster (not the ksqlDB cluster) and use it as CUBEJS_DB_KAFKA_USER and CUBEJS_DB_KAFKA_PASS.

How it works

With Kafka streams mode enabled:
  1. Cube uses the ksqlDB REST API to discover available tables and streams and to retrieve their schemas via DESCRIBE.
  2. For each table or stream, Cube resolves the backing Kafka topic name from the ksqlDB metadata.
  3. Instead of streaming data through ksqlDB, Cube Store connects directly to the Kafka broker(s) and consumes from the resolved topic.
  4. Pre-aggregation builds use the read-only refresh strategy. Cube does not issue any CREATE TABLE or CREATE STREAM statements to ksqlDB.

Data modeling

ksqlDB is typically used as an additional data source alongside a primary data warehouse. To use Kafka streams mode, configure ksqlDB as a named data source using decorated environment variables and point your cubes to it with the data_source property. First, declare the data sources and configure the ksqlDB connection with Kafka credentials:
CUBEJS_DATASOURCES=default,ksql
CUBEJS_DB_TYPE=postgres
CUBEJS_DB_HOST=my.postgres.host
CUBEJS_DB_NAME=my_database
CUBEJS_DB_USER=postgres_user
CUBEJS_DB_PASS=postgres_password
CUBEJS_DS_KSQL_DB_TYPE=ksql
CUBEJS_DS_KSQL_DB_URL=https://xxxxxx-xxxxx.us-west4.gcp.confluent.cloud:443
CUBEJS_DS_KSQL_DB_USER=ksql_api_key
CUBEJS_DS_KSQL_DB_PASS=ksql_api_secret
CUBEJS_DS_KSQL_DB_KAFKA_HOST=pkc-xxxxx.us-west4.gcp.confluent.cloud:9092
CUBEJS_DS_KSQL_DB_KAFKA_USER=kafka_api_key
CUBEJS_DS_KSQL_DB_KAFKA_PASS=kafka_api_secret
CUBEJS_DS_KSQL_DB_KAFKA_USE_SSL=true
Then, create cubes that reference your data. A common pattern is to combine a batch cube (reading historical data from your warehouse) with a streaming cube (reading real-time data from ksqlDB via Kafka) using a lambda pre-aggregation. The batch cube queries the warehouse and builds daily partitions incrementally. The streaming cube points at an existing ksqlDB stream with data_source: ksql and uses a read-only streaming pre-aggregation that consumes from the backing Kafka topic directly. The lambda pre-aggregation in the batch cube merges both, serving historical data from the warehouse rollup and real-time data from the streaming rollup:
cubes:
  - name: order_events
    data_source: default
    sql: >
      SELECT
        order_id,
        user_id,
        status,
        amount,
        created_at
      FROM ecommerce.order_events
      WHERE {FILTER_PARAMS.order_events.created_at.filter(
        (from, to) =>
        `created_at >= ${from} AND created_at < ${to}`
      )}

    measures:
      - name: count
        type: count

      - name: total_amount
        sql: amount
        type: sum

      - name: failed_count
        sql: "CASE WHEN status = 'failed' THEN 1 ELSE 0 END"
        type: sum

    dimensions:
      - name: order_id
        sql: order_id
        type: string
        primary_key: true

      - name: user_id
        sql: user_id
        type: string

      - name: status
        sql: status
        type: string

      - name: created_at
        sql: created_at
        type: time

    pre_aggregations:
      - name: lambda
        type: rollup_lambda
        rollups:
          - order_events.batch
          - order_events_stream.stream

      - name: batch
        type: rollup
        measures:
          - CUBE.count
          - CUBE.total_amount
          - CUBE.failed_count
        dimensions:
          - CUBE.order_id
          - CUBE.user_id
          - CUBE.status
        time_dimension: CUBE.created_at
        granularity: second
        partition_granularity: day
        build_range_start:
          sql: SELECT NOW() - INTERVAL '90 days'
        build_range_end:
          sql: SELECT NOW()
        refresh_key:
          every: 8 hour
          update_window: 1 day
          incremental: true
        indexes:
          - name: user_status
            columns:
              - CUBE.user_id
              - CUBE.status

  - name: order_events_stream
    data_source: ksql
    sql: "SELECT * FROM ORDER_EVENTS_STREAM"

    measures:
      - name: count
        type: count

      - name: total_amount
        sql: AMOUNT
        type: sum

      - name: failed_count
        sql: "CASE WHEN STATUS = 'failed' THEN 1 ELSE 0 END"
        type: sum

    dimensions:
      - name: order_id
        sql: ORDER_ID
        type: string
        primary_key: true

      - name: user_id
        sql: USER_ID
        type: string

      - name: status
        sql: STATUS
        type: string

      - name: created_at
        sql: CREATED_AT
        type: time

    pre_aggregations:
      - name: stream
        type: rollup
        read_only: true
        measures:
          - CUBE.count
          - CUBE.total_amount
          - CUBE.failed_count
        dimensions:
          - CUBE.order_id
          - CUBE.user_id
          - CUBE.status
        unique_key_columns:
          - order_id
        time_dimension: CUBE.created_at
        granularity: second
        partition_granularity: day
        build_range_start:
          sql: "SELECT date_trunc('day', DATE_SUB(NOW(), INTERVAL '5 hour'))"
        build_range_end:
          sql: "SELECT DATE_ADD(NOW(), INTERVAL '15 minute')"
        refresh_key:
          every: 1 minute
          update_window: 1 hour
          incremental: true
        indexes:
          - name: user_status
            columns:
              - CUBE.user_id
              - CUBE.status
        stream_offset: latest
Key properties for the streaming pre-aggregation:
  • read_only: true — Cube will not create any objects in ksqlDB. The data is consumed directly from the backing Kafka topic.
  • stream_offset — controls where Cube Store starts consuming from in the Kafka topic. Set to "latest" to only consume new messages arriving after the pre-aggregation is created. Set to "earliest" to replay the topic from the beginning. Defaults to "latest" if not specified. On subsequent refreshes, Cube Store automatically resumes from the last processed offset regardless of this setting.
  • unique_key_columns — columns that uniquely identify a record, used for deduplication (see below).

Primary key and ungrouped queries

For the streaming pre-aggregation to work in read-only mode, the generated SQL must not contain a GROUP BY clause — Cube Store’s stream post-processing engine does not support aggregation. Cube automatically omits the GROUP BY clause when the dimensions included in the pre-aggregation contain a primary key. In that case, the generated query becomes a simple SELECT ... FROM ... without grouping, and measures are passed through as raw expressions rather than aggregated. This is what makes the pre-aggregation eligible for the read-only streaming path. You must include all primary key columns of the cube in the streaming pre-aggregation’s dimensions list. If any primary key dimension is missing, the query may not be recognized as ungrouped and will fail to use the streaming path. The sql_table or sql value should reference an existing ksqlDB stream or table. Cube discovers its schema automatically. With Kafka streams mode enabled, the streaming pre-aggregation reads the backing Kafka topic directly — no objects are created in ksqlDB.

Unique key columns and deduplication

When unique_key_columns is set, Cube Store appends an internal sequence column (__seq) to the table, populated from the Kafka partition offset. The unique key columns together with __seq form the sort key for all indexes on this table. Deduplication is not applied at ingestion time — all incoming records are appended as they arrive. Instead, Cube Store deduplicates during reads and compaction: rows are sorted by the unique key columns and then by __seq, and only the last row per unique key (the one with the highest sequence number) is kept. This means that if the same key appears multiple times in the stream, the most recent version is always the one returned by queries. For Kafka messages, unique key column values can come from either the message payload (the JSON value) or the message key. If a column listed in unique_key_columns is missing from the payload, Cube Store falls back to the Kafka message key: for a single unique key column, the raw key value is used; for composite keys, the key is expected to be a JSON object with matching field names.

Stream format

Cube Store expects Kafka messages to have a JSON object as their value payload, with field names matching the column names defined in the cube. For example, given the streaming cube above, each Kafka message value should look like:
{
  "ORDER_ID": "ord_12345",
  "USER_ID": "usr_789",
  "STATUS": "completed",
  "AMOUNT": 49.99,
  "CREATED_AT": "2025-01-15T10:30:00.000"
}
Field names are case-sensitive and must match the column names used in the sql property of each dimension and measure definition. Missing fields default to null. The message key is optional. When present and the value starts with {, it is parsed as a JSON object and used as a fallback source for unique key column values (see above).

Timestamp handling

For dimensions with type: time, Cube Store accepts timestamp values in two formats:
  • String — parsed using ISO 8601 / RFC 3339 formats. Supported patterns include:
    • 2025-01-15T10:30:00.000Z
    • 2025-01-15T10:30:00Z
    • 2025-01-15 10:30:00.000 UTC
    • 2025-01-15T10:30:00
    • 2025-01-15 10:30:00
    • 2025-01-15
  • Number — interpreted as epoch milliseconds (not seconds, not microseconds). For example, 1736939400000 represents 2025-01-15T10:30:00.000Z.
If your Kafka topic produces timestamps as strings in a non-standard format, you can use PARSE_TIMESTAMP in the cube’s sql property to convert them. In that case, define the source column as type: string in a source_table and use the select_statement to transform it:
sql: `SELECT PARSE_TIMESTAMP(TIMESTAMP_STR,
  'yyyy-MM-dd''T''HH:mm:ss.SSSX', 'UTC') AS created_at,
  ORDER_ID, USER_ID, STATUS, AMOUNT
  FROM ORDER_EVENTS_STREAM`,
Time dimension truncation (controlled by the granularity property of the pre-aggregation) is handled automatically. Cube generates the appropriate PARSE_TIMESTAMP(FORMAT_TIMESTAMP(CONVERT_TZ(...))) expression chain to truncate timestamps to the configured granularity (e.g., day, hour, minute). Cube Store evaluates these expressions natively on each micro-batch during ingestion. Standard SQL functions like date_trunc are also available in the select_statement.

Filtering on the stream

When the streaming cube defines a sql property with a SELECT statement (rather than sql_table), Cube Store applies the projection and any WHERE filters from that statement directly on each micro-batch of incoming Kafka messages. This filtering happens inside Cube Store using its query engine — it does not require ksqlDB to process the filter. Only rows that pass the filter are ingested into the pre-aggregation table. This allows you to define a streaming cube that only ingests a subset of the data from the underlying Kafka topic without creating any server-side filter objects in ksqlDB.

Supported SQL syntax

The SELECT statement must follow a strict shape. Cube Store only accepts plans that resolve to Projection > Filter > TableScan (where the filter is optional). Any other query plan shape is rejected. Supported:
  • SELECT with column references (e.g., SELECT col1, col2 FROM topic)
  • SELECT * wildcard
  • Column aliases (SELECT col1 AS my_alias)
  • WHERE clause with comparison operators (=, !=, <, >, <=, >=)
  • Boolean logic in WHERE (AND, OR, NOT)
  • IS NULL and IS NOT NULL
  • IN lists (col IN (1, 2, 3))
  • BETWEEN expressions
  • CASE ... WHEN ... THEN ... ELSE ... END expressions
  • CAST(expr AS type) type conversions
  • EXTRACT(field FROM expr) for date/time parts
  • SUBSTRING(expr FROM start FOR length)
  • Scalar functions (e.g., COALESCE, CONCAT, arithmetic)
  • CONVERT_TZ for timezone conversion (internally rewritten for compatibility)
  • PARSE_TIMESTAMP and FORMAT_TIMESTAMP for timestamp parsing and formatting using ksql-style format strings (e.g., yyyy-MM-dd'T'HH:mm:ss.SSS)
  • Nested expressions with parentheses
  • date_trunc for timestamp truncation
Not supported:
  • JOIN clauses — only a single FROM table is allowed
  • Subqueries in SELECT or WHERE
  • GROUP BY, HAVING, or aggregate functions (SUM, COUNT, AVG, etc.)
  • ORDER BY (rows are consumed in stream order)
  • LIMIT and OFFSET
  • UNION, INTERSECT, EXCEPT
  • Window functions (OVER, PARTITION BY)
  • Multiple FROM or multiple WHERE clauses
  • Common Table Expressions (WITH ... AS)
All column expressions in the SELECT list that are not simple column references must have explicit aliases. Unique key columns may reference the source column through a scalar function (e.g., CAST(id AS VARCHAR) AS id), but not through arbitrary expressions.