Streamling
Streamling is a columnar streaming runtime easily extendable with your own operators. Everything in a pipeline is the same thing underneath: a DataFusion operator passing Apache Arrow RecordBatches. A Kafka source, a SQL filter, sandboxed TypeScript, an HTTP enrichment, and a plugin you wrote yourself all run on one data plane, so they compose freely and inherit the same guarantees.
You can write plugins for what's specific to your domain: polling a partner API, enriching from Postgres, pushing to your warehouse. Reuse them across pipelines by id. A plugin runs at native speed, and the runtime gives it the same guarantees the built-in connectors get: backpressure, checkpoint-coordinated at-least-once delivery, schema validation, and upsert (_gs_op) propagation.
Built-in connectors for Kafka, Postgres, ClickHouse, and webhooks cover the common movement patterns, so you only write code for the parts that are yours.
Built with Rust, Apache Arrow, and Apache DataFusion. Install with one command (see Quick start) or read more at streamling.dev.
Plugins vs. the runtime
The division of labor: you own the logic, the runtime owns correctness.
| You implement in plugins | The runtime enforces |
|---|---|
| Custom sources, transforms, sinks | Checkpoint protocol across the full topology |
| Arbitrary connection and processing logic | At-least-once delivery (sources commit only after sink flush) |
| Domain-specific schemas and options | Schema and primary_key validation before startup |
| External API calls, decoding, enrichment | Backpressure, graceful shutdown, retriable error classification |
| State via the plugin state backend | Upsert semantics (_gs_op) through to sinks |
When to use Streamling
Streamling is a good fit when you need to:
- Write your own operators and reuse them: implement a source, transform, or sink once in Rust (or a transform in WASM/TypeScript), then drop it into any pipeline by id. The runtime enforces checkpointing, schema validation, and delivery guarantees around your code
- Run ongoing data processes over continuous ordered inputs: event streams, database changelogs, polled APIs, or any plugin source that emits data over time
- Build multi-stage flows on one columnar data plane: plugins, SQL, WASM, HTTP enrichment, and dynamic tables chained in a single topology, all exchanging Arrow RecordBatches
- Get at-least-once delivery with checkpoint-coordinated commit ordering: sources don't advance until sinks have durably flushed
- Use built-in connectors as conveniences: Kafka, Postgres, ClickHouse, and webhooks for common data movement, no plugin required
- Run bounded batch jobs and handle upserts (INSERT/UPDATE/DELETE via _gs_op) into Postgres or ClickHouse
Use Streamling when you need an engine that processes continuously arriving data, in order, through a defined pipeline. It is not a distributed shuffle or windowed-aggregation engine.
Streamling is probably not the right fit when you need:
- Distributed stateful processing: cross-partition joins, windowed aggregations, and coordinated checkpointing across nodes aren't supported today
- A library to embed: it's a standalone runtime you deploy and configure, not a crate you wire into your codebase
Streamling runs as a single-node engine. It can scale horizontally via Kafka consumer groups and multiple independent instances; each instance checkpoints and progresses on its own.
How it works
You define a pipeline in YAML with three sections: sources (where data comes from), transforms (optional processing), and sinks (where results go). The runtime loads the pipeline, wires operators together, and runs until stopped, or until a bounded source finishes.
Data moves between operators as Arrow RecordBatches. Checkpoints coordinate flush and commit across the whole topology so sources only advance after sinks have durably written their data. See Checkpointing for the full protocol.
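To make the commit ordering concrete, here is a minimal Rust model of the idea, not Streamling's implementation. CheckpointCoordinator, begin_epoch, and sink_ack are hypothetical names, and a source position is simplified to a single u64. It shows that a position recorded at a checkpoint marker is released for commit only after every sink has acked that epoch, and that epochs complete strictly in order.

```rust
use std::collections::{BTreeMap, HashSet};

/// Hypothetical model (not Streamling's actual types) of checkpoint-coordinated
/// commit ordering: a source position recorded at a checkpoint marker may be
/// committed only after every sink has acknowledged a durable flush of that epoch.
pub struct CheckpointCoordinator {
    sinks: HashSet<String>,
    /// epoch -> (source position at the marker, sinks that have acked it)
    pending: BTreeMap<u64, (u64, HashSet<String>)>,
    committed: Option<u64>,
}

impl CheckpointCoordinator {
    pub fn new(sinks: &[&str]) -> Self {
        Self {
            sinks: sinks.iter().map(|s| s.to_string()).collect(),
            pending: BTreeMap::new(),
            committed: None,
        }
    }

    /// The source injects a checkpoint marker and records its current position.
    pub fn begin_epoch(&mut self, epoch: u64, source_position: u64) {
        self.pending.insert(epoch, (source_position, HashSet::new()));
    }

    /// A sink reports that everything up to `epoch` is durably written.
    /// Returns the newest position the source may now commit, if this ack completed any epochs.
    pub fn sink_ack(&mut self, epoch: u64, sink: &str) -> Option<u64> {
        if let Some((_, acks)) = self.pending.get_mut(&epoch) {
            acks.insert(sink.to_string());
        }
        let mut advanced = None;
        // Only the oldest pending epoch may complete, so a later epoch never commits past an earlier one.
        loop {
            let ready = match self.pending.first_key_value() {
                Some((&e, (pos, acks))) if self.sinks.is_subset(acks) => Some((e, *pos)),
                _ => None,
            };
            match ready {
                Some((e, pos)) => {
                    self.pending.remove(&e);
                    self.committed = Some(pos);
                    advanced = Some(pos);
                }
                None => break,
            }
        }
        advanced
    }

    pub fn committed(&self) -> Option<u64> {
        self.committed
    }
}
```

With two sinks, an ack for epoch 2 alone releases nothing while epoch 1 is still waiting on a sink; the final ack for epoch 1 releases both epochs at once.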
Streaming pipelines use unbounded sources like Kafka that run indefinitely. Batch pipelines use bounded sources that read a finite dataset and terminate, for example a ClickHouse source or a hybrid source with STREAMLING__JOB_MODE=true to stop after the bounded phase completes.
Building data flows
Streamling is more than point-to-point data movement. The runtime orchestrates complex, ongoing data processes:
- Continuous inputs: event streams, database changelogs, polled APIs, or any plugin source that emits ordered data over time
- Processing stages: decode, enrich, filter, and transform via plugins, SQL, WASM, and HTTP handlers chained in a single topology
- Live lookup state: dynamic tables back SQL transforms with externally updatable lookup data, without restarting the pipeline
- Runtime contract: checkpoint markers propagate through every operator (including plugins); sources don't commit until sinks ack; schemas and primary_key are validated at startup
In the example below, the built-in Postgres sink is the only convenience connector; the processing logic lives in the plugin source, SQL filter, HTTP handler, and WASM stages:
```yaml
sources:
  api_orders:
    type: acme_api.orders_source # plugin: polls partner API
    options:
      poll_interval_ms: "5000"
transforms:
  recent_orders:
    type: sql
    primary_key: id
    sql: SELECT * FROM api_orders WHERE created_at > now() - interval '1 hour'
  enriched:
    type: handler
    from: recent_orders
    url: http://localhost:8087/enrich
    primary_key: id
  scored:
    type: script
    from: enriched
    language: typescript
    script: |
      function process(input) {
        input.risk_score = input.amount > 1000 ? 'high' : 'low';
        return input;
      }
    primary_key: id
sinks:
  pg_orders:
    type: postgres
    from: scored
    schema: app
    table: orders
    primary_key: id
```
Your plugins own the I/O and business logic; the runtime owns execution, backpressure, checkpointing, and recovery.
Quick start
This is the simplest path to a running pipeline. Most production flows add plugin stages and multiple transforms. See Building data flows.
```bash
# 1. Install the runtime (macOS/Linux)
curl -fsSL https://www.streamling.dev/install.sh | bash

# 2. Write a pipeline
cat > pipeline.yaml <<'EOF'
sources:
  raw_transactions:
    type: kafka
    topic: raw.event.transaction
    primary_key: id
transforms:
  large_transactions:
    type: sql
    primary_key: id
    sql: |
      SELECT *
      FROM raw_transactions
      WHERE amount > 1000
sinks:
  print_large:
    type: print
    from: large_transactions
EOF

# 3. Run it
export STREAMLING__KAFKA_SOURCE__BROKERS=localhost:9092
export STREAMLING__KAFKA_SOURCE__SCHEMA_REGISTRY_URL=http://localhost:8081
export STREAMLING__PIPELINE_DEFINITION_LOCATION=pipeline.yaml
export RUST_LOG=info
streamling
```
Records flow from source through transforms to sinks. Checkpoints fire periodically; when a sink acks, the source commits its position. Swap the print sink for Postgres or webhook in production.
To build from source or run against local Kafka/Postgres/ClickHouse, see Development setup.
Common patterns
Built-in connectors are shortcuts for common data movement. Custom data flows mix plugins, SQL, WASM, and HTTP handlers; the runtime orchestrates them with the same delivery guarantees.
Data flows
| Flow | Stages | What the runtime provides |
|---|---|---|
| Plugin → SQL → handler → WASM → sink | plugin source + sql + HTTP handler + WASM + sink | Ordered processing and at-least-once delivery through all stages. See the example in Building data flows. |
| Plugin → plugin → sink | custom source + custom transform + custom sink | Same guarantees on fully custom I/O (e.g. poll an API, apply domain logic, push to a partner system) |
| Kafka → plugin transform → sink | built-in source + plugin transform + built-in sink | Built-in source convenience + custom compute (decoding, external lookups, multi-record logic) |
| Multi-source via hybrid | bounded backfill + live stream | Phase-ordered processing with checkpoint continuity. See Hybrid Source. |
Connector shortcuts
| Shortcut | Shape | When to use |
|---|---|---|
| Kafka → SQL → Postgres | source + sql + postgres sink | Simple movement and filter with no custom logic. See Kafka Source, SQL Transform, Postgres Sink. |
| ClickHouse → Postgres (batch) | clickhouse source + postgres sink | One-time or job-mode backfill. See ClickHouse Source. |
| Kafka → WASM → webhook | source + script + webhook sink | Lightweight scripted transform, push to an API. See WebAssembly Script Transform, Webhook (HTTP) Sink. |
| Backfill then stream | hybrid source + transforms + sink | Historical backfill then live streaming. See Hybrid Source. |
Plugin examples
Plugin ids appear as the type in pipeline YAML (namespace.operator_name). The examples below are building blocks of custom data flows. Each plugin stage is where custom logic lives, while the runtime wires them into the topology and applies the same checkpointing and delivery rules as built-in operators. They use the bundled basic_plugin; swap in your own plugin ids for production pipelines.
Custom source: generate or poll data the runtime doesn't have a built-in connector for:
```yaml
sources:
  orders:
    type: basic_plugin.random_source # e.g. acme_api.orders_source in production
    options:
      max_rows: "10000"
      record_batch_size: "1000"
transforms:
  filtered:
    type: sql
    primary_key: alphanumeric_field
    sql: SELECT * FROM orders WHERE num_field > 100
sinks:
  pg_orders:
    type: postgres
    from: filtered
    schema: public
    table: orders
    primary_key: alphanumeric_field
```
Custom transform: enrich or reshape records between built-in operators:
```yaml
sources:
  raw_events:
    type: kafka
    topic: raw.events
    primary_key: id
transforms:
  enriched:
    type: basic_plugin.filter_transform # e.g. enrichment.normalize_events in production
    from: raw_events
    options:
      _gs_op: i # keep inserts only
sinks:
  pg_events:
    type: postgres
    from: enriched
    schema: public
    table: events
    primary_key: id
```
Custom sink: write to a destination without a built-in connector:
```yaml
sources:
  raw_events:
    type: kafka
    topic: app.events
    primary_key: id
transforms: {}
sinks:
  partner_out:
    type: print_sink # e.g. partner_api.webhook_sink in production
    from: raw_events
    options:
      mode: batch
```
A single pipeline can mix all three (plugin source, built-in SQL transform, plugin sink) as long as schemas and primary_key line up between stages. Write plugins for the parts that are unique to your domain; let the runtime enforce the parts that must be correct in production (checkpointing, offset commit ordering, upsert propagation, backpressure). See Plugin Pipeline Configuration for loading plugins and the full options reference.
Community plugins are available in the streamling-community-plugins repository.
Overview
A pipeline has three sections: sources, transforms (optional), and sinks. Every node exchanges Arrow RecordBatches. Built-in connectors cover Kafka, Postgres, ClickHouse, webhooks, and WASM scripts; anything else can be a plugin.
```yaml
sources:
  raw.transactions:
    type: kafka
    topic: raw.event.transaction
transforms:
  large_transactions:
    type: sql
    primary_key: id
    sql: |
      SELECT *
      FROM raw.transactions
      WHERE amount > 1000
sinks:
  pg.large_transactions:
    from: large_transactions
    type: postgres
    schema: public
    table: large_transactions
    primary_key: id
```
Development setup
This section is for contributing to or building Streamling. To run a pipeline, see Quick start.
- Get Rust and Cargo
- Install Kubernetes and OpenSSL
- Recommended IDE: RustRover
This project uses just as a command runner. To see all available commands:
```bash
just
```
Common commands:
```bash
just build # Build the project
just test  # Run unit tests
just lint  # Format and lint the project
just fix   # Auto-fix cargo issues
```
Using k3s for Local Development
For a production-like environment, you can use k3d (k3s in Docker):
- Run the setup script. This creates a k3d cluster with PostgreSQL, ClickHouse, Redpanda (Kafka), and Prometheus:
```bash
./scripts/k3s-setup.sh
```
- Source the environment variables:
```bash
source infra/local-k8s/env.sh
```
- Verify services are running:
```bash
kubectl get pods -n streamling-e2e
```
To tear down the cluster:
```bash
k3d cluster delete streamling-e2e
```
Default configuration is compiled into the binary (from
crates/streamling-config/default_config.yaml), so streamling runs as a standalone binary with no external files.
You can override the defaults by placing a config.yaml in the working directory, or using environment variables.
The repo root ships a config.yaml with local-development overrides (docker-compose hosts, throwaway credentials),
so running from the repo root behaves like local development always has.
The format of environment variables is the following: STREAMLING__<property_name>. If a property is nested,
use an additional double underscore (__) to separate the levels, for example
STREAMLING__EXTERNAL_HTTP_HANDLER__TRIGGER_MAX_COUNT. Precedence is: embedded defaults < config.yaml < environment
variables.
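The naming rule and precedence order above can be expressed as two small functions. This is an illustrative sketch: env_var_for and resolve are hypothetical helpers, and config.yaml is flattened into a map of dotted paths, which simplifies real nested YAML.

```rust
use std::collections::HashMap;

/// Maps a dotted config path such as `external_http_handler.trigger_max_count`
/// to the environment variable that overrides it: STREAMLING__ prefix,
/// upper-cased segments, and `__` between nesting levels.
pub fn env_var_for(path: &str) -> String {
    let parts: Vec<String> = path.split('.').map(|p| p.to_uppercase()).collect();
    format!("STREAMLING__{}", parts.join("__"))
}

/// Resolves one property with the documented precedence:
/// embedded defaults < config.yaml < environment variables.
pub fn resolve(
    path: &str,
    defaults: &HashMap<String, String>,
    config_yaml: &HashMap<String, String>,
    env: &HashMap<String, String>,
) -> Option<String> {
    env.get(&env_var_for(path))
        .or_else(|| config_yaml.get(path))
        .or_else(|| defaults.get(path))
        .cloned()
}
```

For example, kafka_source.brokers maps to STREAMLING__KAFKA_SOURCE__BROKERS, the variable used in Quick start.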
To run an e2e test (requires k3s cluster running):
```bash
cargo test -p streamling-e2e --test print_sink
```
Recommended environment variables when running locally:
- RUST_LOG=debug: seeing debug logs is essential.
- STREAMLING__PIPELINE_DEFINITION_LOCATION=pipeline.yaml: you can switch between existing sample pipelines (or create your own).
Reference
The sections below are the full connector and runtime reference. New here? Start with Quick start.
Topology
As you'll see below, all sources, transforms and sinks can be abstracted as nodes exchanging Arrow's RecordBatches
using SendableRecordBatchStream. Most sources and transforms have a concept of an internal buffer, which is used to
accumulate RecordBatches before passing them to the next operator. This is done to improve performance. The buffer
size is configurable via the internal_buffer_size parameter.
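The following is a sketch of what such a buffer does, under stated assumptions: InternalBuffer is a hypothetical type, plain rows stand in for RecordBatch contents, and internal_buffer_size is treated as a row count, although the real unit may differ. The flush method reflects an assumption that buffered data must be drained before a checkpoint marker is forwarded.

```rust
/// Hypothetical operator buffer: incoming batches accumulate until
/// `internal_buffer_size` rows are held, then go downstream as one larger unit.
pub struct InternalBuffer<T> {
    capacity: usize,
    rows: Vec<T>,
}

impl<T> InternalBuffer<T> {
    pub fn new(internal_buffer_size: usize) -> Self {
        // A zero capacity would never fill, so treat it as 1.
        Self { capacity: internal_buffer_size.max(1), rows: Vec::new() }
    }

    /// Appends a batch and returns the accumulated rows once the threshold is reached.
    pub fn push(&mut self, batch: Vec<T>) -> Option<Vec<T>> {
        self.rows.extend(batch);
        if self.rows.len() >= self.capacity {
            Some(std::mem::take(&mut self.rows))
        } else {
            None
        }
    }

    /// Drains whatever is buffered, e.g. before forwarding a checkpoint marker (assumed).
    pub fn flush(&mut self) -> Vec<T> {
        std::mem::take(&mut self.rows)
    }
}
```

Fewer, larger hand-offs amortize per-batch overhead; the trade-off is added latency while the buffer fills.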
Sources
Sources return a SendableRecordBatchStream as a result.
Kafka Source
The Kafka source allows consuming data from Kafka topics using Avro serialization with Schema Registry integration. It's
implemented as a custom DataFusion Table Provider (TableProvider) with the following key features:
- Schema Management: Automatically fetches Avro schemas from Schema Registry and converts them to Arrow schemas.
- Message Processing:
  - Converts Avro-encoded messages to Arrow RecordBatches. The size of the batches is controlled by the record_batch_size and record_batch_interval_ms parameters.
  - Adds an operation type column (_gs_op) to track INSERT/UPDATE/DELETE operations (see the Upsert Semantics section below). The operation type is determined by the dbz.op header value.
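For illustration, the header-to-column mapping might look like the sketch below. The Debezium op codes c (create), r (snapshot read), u (update), and d (delete) are standard; the target codes are partly assumed. The i and d codes appear elsewhere in this README, while u for updates, treating snapshot reads as inserts, and defaulting a missing header to an insert are assumptions. GsOp and gs_op_from_dbz are hypothetical names.

```rust
/// Row operation carried in the `_gs_op` column.
#[derive(Debug, PartialEq, Clone, Copy)]
pub enum GsOp {
    Insert,
    Update,
    Delete,
}

impl GsOp {
    /// Single-letter `_gs_op` code ('i' and 'd' appear in this README; 'u' is assumed).
    pub fn code(self) -> &'static str {
        match self {
            GsOp::Insert => "i",
            GsOp::Update => "u",
            GsOp::Delete => "d",
        }
    }
}

/// Hypothetical mapping from a decoded `dbz.op` header to a row operation.
/// A missing header is treated as an insert (assumption).
pub fn gs_op_from_dbz(header: Option<&str>) -> Result<GsOp, String> {
    match header {
        None | Some("c") | Some("r") => Ok(GsOp::Insert),
        Some("u") => Ok(GsOp::Update),
        Some("d") => Ok(GsOp::Delete),
        Some(other) => Err(format!("unsupported dbz.op value: {other}")),
    }
}
```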
Kafka Source uses the high-level StreamConsumer, which handles partition assignment, rebalancing, and related concerns. As a result, it doesn't support parallelism (it appears as a single partition from DataFusion's perspective). To scale, increase the number of pods in the Kubernetes deployment; the pods coordinate through the same consumer group name.
Reliability:
- Integrates with Streamling's checkpointing system (see the Checkpointing section below).
- Maintains offset tracking per checkpoint epoch.
- Commits offsets after successful processing of all records in a checkpoint epoch.
- Skips committing offsets if the values haven't changed since the last epoch.
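The offset bookkeeping in the list above can be sketched as follows. OffsetTracker is a hypothetical type, and offsets_to_commit is meant to be called only after all sinks have acked the epoch. It relies on one real Kafka convention: the committed offset is the next offset to read, hence the + 1.

```rust
use std::collections::HashMap;

/// Hypothetical per-epoch offset tracker keyed by (topic, partition).
#[derive(Default)]
pub struct OffsetTracker {
    current: HashMap<(String, i32), i64>,
    last_committed: HashMap<(String, i32), i64>,
}

impl OffsetTracker {
    /// Records a processed message, keeping the highest offset per partition.
    pub fn record(&mut self, topic: &str, partition: i32, offset: i64) {
        let e = self.current.entry((topic.to_string(), partition)).or_insert(offset);
        *e = (*e).max(offset);
    }

    /// Returns (topic, partition, next offset) for partitions whose offset changed
    /// since the last commit; an empty result means the commit can be skipped.
    pub fn offsets_to_commit(&mut self) -> Vec<(String, i32, i64)> {
        let mut out: Vec<(String, i32, i64)> = self
            .current
            .iter()
            .filter(|(k, v)| self.last_committed.get(*k) != Some(*v))
            .map(|((t, p), o)| (t.clone(), *p, o + 1))
            .collect();
        out.sort();
        for (k, v) in &self.current {
            self.last_committed.insert(k.clone(), *v);
        }
        out
    }
}
```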
Sample configuration:
```yaml
sources:
  raw_events:
    type: kafka
    topic: app.events
```
Connection settings — override the embedded defaults (local Kafka and Schema Registry) with these environment variables:
| Environment variable | Default | Description |
|---|---|---|
| STREAMLING__KAFKA_SOURCE__BROKERS | localhost:9092 | Comma-separated bootstrap broker list |
| STREAMLING__KAFKA_SOURCE__SECURITY_PROTOCOL | plaintext | Kafka security protocol (e.g. plaintext, ssl, sasl_plaintext, sasl_ssl) |
| STREAMLING__KAFKA_SOURCE__SASL_MECHANISM | (unset) | SASL mechanism (applied when the protocol contains sasl) |
| STREAMLING__KAFKA_SOURCE__SASL_USERNAME | (unset) | SASL username |
| STREAMLING__KAFKA_SOURCE__SASL_PASSWORD | (unset) | SASL password |
| STREAMLING__KAFKA_SOURCE__SCHEMA_REGISTRY_URL | http://localhost:18081 | Schema Registry URL (required for Avro) |
| STREAMLING__KAFKA_SOURCE__SCHEMA_REGISTRY_USERNAME | (unset) | Schema Registry basic-auth username |
| STREAMLING__KAFKA_SOURCE__SCHEMA_REGISTRY_PASSWORD | (unset) | Schema Registry basic-auth password |
| STREAMLING__KAFKA_SOURCE__CONSUMER_GROUP_ID | (unset) | Consumer group id (coordinates scaling across instances) |
| STREAMLING__KAFKA_SOURCE__CLIENT_ID | (unset) | Kafka client id |
Hybrid Source
A hybrid source runs one or more bounded phases followed by an unbounded phase. Bounded phases read a finite dataset (e.g. a ClickHouse table for backfill) and finish; the unbounded phase (e.g. Kafka) then takes over for live streaming.
With STREAMLING__JOB_MODE=true (or job_mode: true in config.yaml), the pipeline terminates after all bounded phases complete instead of transitioning to the unbounded phase. All sources in the pipeline must be hybrid sources when job mode is enabled.
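The phase progression, including the job-mode exit, can be sketched as a small state machine. Phase, first_phase, and next_phase are hypothetical names used only for illustration.

```rust
/// Hypothetical hybrid-source phases: bounded phases run in order, then the
/// source either switches to the unbounded phase or, in job mode, finishes.
#[derive(Debug, PartialEq)]
pub enum Phase {
    Bounded(usize),
    Unbounded,
    Finished,
}

pub fn first_phase(bounded_count: usize, job_mode: bool) -> Phase {
    if bounded_count > 0 {
        Phase::Bounded(0)
    } else if job_mode {
        Phase::Finished
    } else {
        Phase::Unbounded
    }
}

/// Phase to enter once `current` completes.
pub fn next_phase(current: &Phase, bounded_count: usize, job_mode: bool) -> Phase {
    match current {
        Phase::Bounded(i) if i + 1 < bounded_count => Phase::Bounded(i + 1),
        // Last bounded phase done: job mode terminates instead of going live.
        Phase::Bounded(_) if job_mode => Phase::Finished,
        Phase::Bounded(_) => Phase::Unbounded,
        // The unbounded phase runs until the pipeline is stopped.
        Phase::Unbounded => Phase::Unbounded,
        Phase::Finished => Phase::Finished,
    }
}
```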
Sample configuration:
```yaml
sources:
  orders:
    type: hybrid
    bounded_sources:
      - source_type: clickhouse
        table_name: orders_archive
    unbounded_source:
      source_type: kafka
      topic: orders_live
      start_at: earliest
    primary_key: id
```
Connection settings for each phase come from the same environment variables as the standalone ClickHouse Source (bounded phases) and Kafka Source (unbounded phase).
ClickHouse Source
A standalone ClickHouse source reads a table in paginated chunks and terminates when the table is fully consumed — a bounded source suitable for batch pipelines.
```yaml
sources:
  ch_orders:
    type: clickhouse
    table_name: orders
    primary_key: id
```
Connection settings — override the embedded defaults with these environment variables:
| Environment variable | Default | Description |
|---|---|---|
| STREAMLING__CLICKHOUSE_SOURCE__URL | http://localhost:8123 | ClickHouse HTTP endpoint |
| STREAMLING__CLICKHOUSE_SOURCE__USER | default | Username |
| STREAMLING__CLICKHOUSE_SOURCE__PASSWORD | (empty) | Password |
| STREAMLING__CLICKHOUSE_SOURCE__DATABASE | default | Database name |
| STREAMLING__CLICKHOUSE_SOURCE__PAGE_SIZE | 10000000 | Rows fetched per pagination chunk |
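Here is a sketch of paginated consumption driven by the page size. The query shape is an assumption: this README does not say how pages are addressed, so ORDER BY on the primary key with LIMIT/OFFSET stands in for it. page_query and read_all are hypothetical, and fetch is a caller-supplied stand-in for the HTTP round trip that returns the number of rows received.

```rust
/// Builds the query for one page (assumed LIMIT/OFFSET paging).
pub fn page_query(table: &str, primary_key: &str, page_size: u64, page: u64) -> String {
    format!(
        "SELECT * FROM {table} ORDER BY {primary_key} LIMIT {page_size} OFFSET {}",
        page * page_size
    )
}

/// Reads pages until a short (or empty) page signals the table is fully consumed,
/// which is what makes this a bounded source. Returns the total row count.
pub fn read_all(table: &str, pk: &str, page_size: u64, mut fetch: impl FnMut(&str) -> u64) -> u64 {
    let mut total = 0;
    let mut page = 0;
    loop {
        let rows = fetch(&page_query(table, pk, page_size, page));
        total += rows;
        if rows < page_size {
            return total;
        }
        page += 1;
    }
}
```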
Transforms
All transforms except the SQL one are implemented as custom DataFusion operators (UserDefinedLogicalNode +
ExtensionPlanner + ExecutionPlan). They accept ExecutionPlan (which is converted to a SendableRecordBatchStream)
as input and construct a new SendableRecordBatchStream as an output.
SQL Transform
The SQL transform allows executing SQL queries on input streams.
- DataFusion Integration:
  - Uses DataFusion's built-in SQL parser and analyzer.
  - Leverages DataFusion's query optimization capabilities.
  - SQL queries are converted to logical plans.
- Query Support:
  - Supports standard SQL SELECT statements.
  - Input streams can be referenced by their reference names.
  - Allows filtering, projections, and column transformations.

NOTE: stateful operations (e.g. joins and aggregations) have been explicitly disabled. They can be enabled once proper distributed shuffle is supported.
The transformation preserves the streaming nature of the data while allowing SQL-based modifications to the record contents. Each input batch is processed through the SQL query and produces a corresponding output batch.
Sample configuration:
```yaml
transforms:
  filtered_events:
    type: sql
    sql: SELECT id, event_type, amount, _gs_op FROM raw_events WHERE amount > 0
    primary_key: id
```
HTTP Handler Transform
The HTTP Handler transform allows sending records to an external HTTP endpoint for processing. The transformed data is
then returned to the pipeline. It's implemented as a custom DataFusion operator (ExternalHandlerExec) with the
following key features:
- Request Processing:
  - Can send records individually or in batches.
  - Supports configurable batching with the external_http_handler.trigger_max_count parameter.
  - Converts Arrow RecordBatches to JSON before sending.
  - Handles retries for transient failures using exponential backoff.
  - Handles request timeouts automatically.
  - Supports custom HTTP headers.
- Schema Management:
  - Allows dynamic schema modification through overrides.
  - Can add new fields, modify existing field types, or remove fields.
  - Preserves nullability constraints from the original schema.
- Response Handling:
  - Converts JSON responses back to Arrow RecordBatches.
  - Supports two payload envelope versions (0 and 1) for different formatting needs.
  - Can operate in output-optional mode where responses are ignored (it's used for the Webhook sink).
- Batching and Performance:
  - Uses non-blocking I/O.
  - Supports concurrent request execution with a configurable buffer size (via the external_http_handler.buffer_size parameter).
  - Integrates with Streamling's checkpointing system.
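Two of the request-processing behaviors above, batching by trigger_max_count and exponential backoff between retries, are sketched below. chunk_requests and backoff_delay are hypothetical helpers, and the base delay, doubling factor, and cap are illustrative values rather than Streamling defaults.

```rust
use std::time::Duration;

/// Splits records into request payloads of at most `trigger_max_count` records.
pub fn chunk_requests<T: Clone>(records: &[T], trigger_max_count: usize) -> Vec<Vec<T>> {
    records.chunks(trigger_max_count.max(1)).map(|c| c.to_vec()).collect()
}

/// Delay before retry `attempt` (0-based): base * 2^attempt, capped at `max`.
/// Saturating arithmetic keeps large attempt numbers from overflowing.
pub fn backoff_delay(attempt: u32, base: Duration, max: Duration) -> Duration {
    let factor = 2u32.saturating_pow(attempt);
    base.saturating_mul(factor).min(max)
}
```

The cap keeps a long outage from producing unbounded waits; production retry loops often add random jitter as well, which this sketch omits.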
Sample configuration:
```yaml
transforms:
  enriched_accounts:
    type: handler
    from: raw_accounts
    url: http://localhost:8087/enrich
    one_row_per_request: false
    primary_key: id
```
WebAssembly Script Transform
The WebAssembly script transformation allows executing custom JavaScript code using WebAssembly for each record in the
input stream. It's implemented as a custom DataFusion operator (WasmRunnerExec) with the following key features:
- Runtime Environment: Uses Extism's js-pdk to execute JavaScript code in a sandboxed WebAssembly environment.
- Record Processing:
  - Converts Arrow RecordBatches to JSON strings.
  - Executes the provided JavaScript code for each record.
  - Uses JavaScript's eval function within the WebAssembly sandbox.
  - Converts the results back to Arrow RecordBatches.
- Batching:
  - Processes multiple records in a batch for better performance.
  - Uses a newline character as the separator between records.
  - Maintains checkpointing for reliable processing.
- Language Support:
  - Supports any valid browser JavaScript and TypeScript (no Node APIs).
  - TypeScript is transpiled to JavaScript at runtime using swc.
The transformation preserves the schema of the input stream while allowing modifications to the record contents. Each record is passed to the JavaScript code as a parsed JSON object, and the code must return a new object that will be converted back to the Arrow format.
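The newline-separated batching described above can be illustrated with a pair of helpers. encode_batch and decode_batch are hypothetical; the sketch assumes each record is already serialized as compact JSON, which never contains a raw newline because newlines inside strings are escaped as \n.

```rust
/// Joins one compact-JSON record per line into a single payload for the sandbox.
pub fn encode_batch(json_records: &[String]) -> String {
    json_records.join("\n")
}

/// Splits the sandbox output back into one JSON string per record,
/// ignoring blank lines (e.g. a trailing newline).
pub fn decode_batch(output: &str) -> Vec<&str> {
    output.lines().filter(|l| !l.trim().is_empty()).collect()
}
```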
Sample configuration:
```yaml
transforms:
  updated_account:
    type: script
    from: raw_accounts
    language: javascript
    script: >
      function process(input) {
        input.tier = input.amount > 1000 ? 'premium' : 'standard';
        input.status = input.status + '_processed';
        return input;
      }
    primary_key: id
```
Note: script field accepts any valid browser JavaScript or TypeScript snippet inside a process function with a
single input argument. The input record is passed as input argument, and the transformed record must be returned
from the function.
Sinks
All sinks are implemented as custom DataFusion Table Providers (TableProvider) returning a DataSinkExec. Sinks
accept SendableRecordBatchStream as input. Streamling supports several sink types for outputting processed data.
All sinks support checkpointing to ensure reliable data processing.
Print Sink
Prints records to stdout in JSON format. Each record is prefixed with its row kind (INSERT/UPDATE/DELETE). Useful for debugging and development.
Sample configuration:
```yaml
sinks:
  sink:
    type: print
    from: filtered_events
```
The sampling rate is configurable via STREAMLING__PRINT_SINK__SAMPLE_EVERY (default: 1, prints every record; set to N to print only every Nth record).
Blackhole Sink
Discards all records without performing any operation. Useful for testing or when you want to measure throughput without actual output.
Sample configuration:
```yaml
sinks:
  blackhole_events:
    type: blackhole
    from: raw_events
```
Webhook (HTTP) Sink
Sends records to an HTTP endpoint as JSON payloads. Supports batching and customization options.
See the HTTP Handler Transform section for more info.
Sample configuration:
```yaml
sinks:
  webhook_accounts:
    type: webhook
    from: updated_account
    url: http://localhost:8087/notify
```
Postgres Sink
The PostgreSQL sink writes data to PostgreSQL tables. It creates the sink table automatically.
Sample configuration:
```yaml
sinks:
  postgres_blocks:
    type: postgres
    primary_key: id
    schema: eth
    table: blocks
```
Connection settings — override the embedded defaults with these environment variables:
| Environment variable | Default | Description |
|---|---|---|
| STREAMLING__POSTGRES_SINK__HOST | 127.0.0.1 | Database host |
| STREAMLING__POSTGRES_SINK__PORT | 5432 | Database port |
| STREAMLING__POSTGRES_SINK__USER | graph-node | Username for authentication |
| STREAMLING__POSTGRES_SINK__PASS | (empty) | Password for authentication |
| STREAMLING__POSTGRES_SINK__DB | postgres | Database name |
| STREAMLING__POSTGRES_SINK__SSLMODE | disable | SSL mode (disable, allow, prefer, require, verify-ca, verify-full) |
Behavior for 256-bit integers (U256/I256):
- Columns annotated as U256/I256 in the Arrow schema (FixedSizeBinary(32) with Streamling metadata) are created in Postgres as NUMERIC(78,0).
- During writes, these columns are stringified in flight (u256_to_string/i256_to_string), so the sink sends textual decimal values that Postgres parses into NUMERIC(78,0).
- If an existing destination table has incompatible types (e.g., insufficient precision or non-numeric), the sink errors instead of coercing or dropping data.
Postgres Aggregation Sink
The PostgreSQL aggregation sink enables real-time aggregations directly in PostgreSQL using database triggers. Data flows into a landing table, and a trigger function automatically maintains aggregated values in a separate aggregation table.
- Landing Table: Stores raw records in append-only mode with a composite primary key (primary_key + _gs_op).
- Aggregation Table: Stores aggregated results, updated incrementally by the trigger.
- Operation Handling: For sum, count, and avg, delete operations negate values. count also correctly handles updates (contributes 0 to the count). min and max do not support deletes or updates (use only for insert-only streams).
Sample configuration:
```yaml
sinks:
  balances:
    type: postgres_aggregate
    from: transfers
    schema: finance
    landing_table: transfer_log
    agg_table: account_balances
    primary_key: transfer_id
    batch_flush_interval: 1s
    batch_size: 100
    group_by:
      account:
        type: text
    aggregate:
      balance:
        from: amount
        fn: sum
```
Column configuration:
- group_by: Defines grouping columns (composite key for the aggregation table).
- aggregate: Defines aggregation columns. At least one is required. Supported functions: sum, count, avg, min, max.
- from: Source column name (defaults to the key name if omitted).
- type: Optional PostgreSQL type override (e.g., numeric(30,5)).
When no grouping columns are defined (global aggregation), a sentinel key column is added automatically.
For avg, the sink stores two columns (<name>_sum and <name>_count) to support incremental updates.
The sink uses the same PostgreSQL connection parameters as the regular Postgres sink.
Deduplication and Data Management:
The landing table uses append-only mode with a composite primary key (primary_key + _gs_op). This enables:
- Deduplication within checkpoint window: Duplicate records (same primary key and operation type) arriving within the same checkpoint epoch are deduplicated via upsert (ON CONFLICT DO UPDATE). Records with the same key but different operations (e.g., insert and delete) are stored separately.
- Checkpoint-based truncation: The landing table includes a _gs_checkpoint_epoch column. When checkpoint epoch N is finalized, all data from epoch N-1 is deleted, keeping the landing table small. Deduplication only works within this window.
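An in-memory model of the landing table clarifies how the composite key and epoch truncation interact. LandingTable is hypothetical. Treating only genuinely new rows as aggregate contributions is an assumption that mirrors how a statement-level AFTER INSERT trigger's transition table sees inserted rows rather than rows updated by ON CONFLICT DO UPDATE; whether a conflicting upsert also moves the row's _gs_checkpoint_epoch forward is likewise assumed.

```rust
use std::collections::HashMap;

/// Hypothetical model of the landing table: keyed by (primary_key, _gs_op),
/// each row carries its value and the checkpoint epoch it was written in.
#[derive(Default)]
pub struct LandingTable {
    rows: HashMap<(String, char), (f64, u64)>,
}

impl LandingTable {
    /// Upsert on the composite key. Returns true only for a new row,
    /// i.e. one that would reach the aggregation trigger (assumption).
    pub fn upsert(&mut self, pk: &str, op: char, value: f64, epoch: u64) -> bool {
        self.rows.insert((pk.to_string(), op), (value, epoch)).is_none()
    }

    /// Finalizing epoch `n` deletes every row written in epoch `n - 1`.
    pub fn finalize_epoch(&mut self, n: u64) {
        self.rows.retain(|_, row| n == 0 || row.1 != n - 1);
    }

    pub fn len(&self) -> usize {
        self.rows.len()
    }
}
```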
Limitations:
- SUM/AVG do not support updates: These functions cannot correctly handle updates because they would need the old value to compute the delta. Updates are treated as new values, causing incorrect results. Use sum and avg only with insert/delete streams.
- COUNT supports all operations: Inserts add +1, updates add 0 (no change), deletes add -1.
- MIN/MAX do not support deletes or updates: These functions cannot retract values without rescanning the entire landing table. Use only with insert-only streams.
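The per-row contributions can be written down directly. This sketch uses hypothetical names (Op, sum_delta, count_delta, apply, avg); its sum rule matches the generated trigger shown below (subtract on 'd', add otherwise), which is exactly why an update is added as a new value and sum/avg drift on update streams.

```rust
/// Operation from `_gs_op`.
#[derive(Clone, Copy, PartialEq, Debug)]
pub enum Op {
    Insert,
    Update,
    Delete,
}

/// sum contribution: deletes subtract; updates have no old value, so they add like inserts.
pub fn sum_delta(op: Op, value: f64) -> f64 {
    match op {
        Op::Delete => -value,
        Op::Insert | Op::Update => value,
    }
}

/// count contribution: +1 insert, 0 update, -1 delete.
pub fn count_delta(op: Op) -> i64 {
    match op {
        Op::Insert => 1,
        Op::Update => 0,
        Op::Delete => -1,
    }
}

/// Folds a batch into (sum, count), the two columns kept for avg (<name>_sum, <name>_count).
pub fn apply(rows: &[(Op, f64)]) -> (f64, i64) {
    rows.iter()
        .fold((0.0, 0), |(s, c), &(op, v)| (s + sum_delta(op, v), c + count_delta(op)))
}

/// avg derived from its sum and count columns.
pub fn avg(sum: f64, count: i64) -> Option<f64> {
    if count == 0 { None } else { Some(sum / count as f64) }
}
```

Updating one key from 100 to 150 leaves the count at 1 but the sum at 250 instead of 150, which is the failure mode the limitations above describe.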
Examples
Multiple aggregations with type override:
```yaml
sinks:
  market_stats:
    type: postgres_aggregate
    from: trades
    schema: trading
    landing_table: trade_log
    agg_table: market_stats
    primary_key: trade_id
    group_by:
      market:
        type: varchar(100)
    aggregate:
      trade_count:
        fn: count
      total_volume:
        from: volume
        fn: sum
        type: numeric(30,5)
      avg_price:
        from: price
        fn: avg
```
Global aggregation (no GROUP BY):
```yaml
sinks:
  totals:
    type: postgres_aggregate
    from: events
    schema: analytics
    landing_table: event_log
    agg_table: global_totals
    primary_key: event_id
    aggregate:
      total_count:
        fn: count
```
Generated Trigger Example:
The sink automatically creates the trigger function and trigger. Here's an example of the generated SQL:
```sql
CREATE OR REPLACE FUNCTION "finance"."transfer_log_to_account_balances_fn"()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $$
BEGIN
  INSERT INTO "finance"."account_balances" ("account", "balance")
  SELECT new_table."account" AS "account",
         SUM(CASE WHEN new_table."_gs_op" = 'd' THEN -new_table."amount"
                  ELSE new_table."amount" END) AS "balance"
  FROM new_table
  GROUP BY new_table."account"
  ON CONFLICT ("account")
  DO UPDATE SET "balance" = "finance"."account_balances"."balance" + EXCLUDED."balance";
  RETURN NULL;
END;
$$;

DROP TRIGGER IF EXISTS "transfer_log_to_account_balances_trigger" ON "finance"."transfer_log";
CREATE TRIGGER "transfer_log_to_account_balances_trigger"
AFTER INSERT ON "finance"."transfer_log"
REFERENCING NEW TABLE AS new_table
FOR EACH STATEMENT
EXECUTE FUNCTION "finance"."transfer_log_to_account_balances_fn"();
```
Kafka Sink
The Kafka sink allows emitting data to Kafka topics using Avro serialization with Schema Registry integration. It's
implemented as a custom DataFusion Table Provider (TableProvider) with the following key features:
- Schema Management: Automatically converts Arrow schemas into Avro schemas and registers them with the Schema Registry.
- Message Processing:
  - Converts Arrow RecordBatches to Avro-encoded messages.
  - Writes the operation type (_gs_op), which tracks INSERT/UPDATE/DELETE operations (see the Upsert Semantics section below), as a message header with the dbz.op key.

Kafka Sink uses the high-level FutureProducer to create and write records. The producer is flushed at the end of every batch.
Sample configuration:
```yaml
sinks:
  custom_kafka_sink:
    type: kafka
    topic: custom_topic
    topic_partitions: 10
    from: updated_account
    data_format: avro
```
Connection settings — override the embedded defaults with these environment variables:
| Environment variable | Default | Description |
|---|---|---|
| STREAMLING__KAFKA_SINK__BROKERS | localhost:9092 | Comma-separated bootstrap broker list |
| STREAMLING__KAFKA_SINK__SECURITY_PROTOCOL | plaintext | Kafka security protocol (e.g. plaintext, ssl, sasl_plaintext, sasl_ssl) |
| STREAMLING__KAFKA_SINK__SASL_MECHANISM | (unset) | SASL mechanism (applied when the protocol contains sasl) |
| STREAMLING__KAFKA_SINK__SASL_USERNAME | (unset) | SASL username |
| STREAMLING__KAFKA_SINK__SASL_PASSWORD | (unset) | SASL password |
| STREAMLING__KAFKA_SINK__SCHEMA_REGISTRY_URL | http://localhost:18081 | Schema Registry URL (required for Avro) |
| STREAMLING__KAFKA_SINK__SCHEMA_REGISTRY_USERNAME | (unset) | Schema Registry basic-auth username |
| STREAMLING__KAFKA_SINK__SCHEMA_REGISTRY_PASSWORD | (unset) | Schema Registry basic-auth password |
| STREAMLING__KAFKA_SINK__CLIENT_ID | (unset) | Kafka client id |
ClickHouse Sink
The ClickHouse sink writes records to a ClickHouse table over the HTTP interface. The destination table must already exist; the sink reads its schema and sorting keys at startup rather than creating the table.
- Upsert Semantics: INSERT/UPDATE/DELETE operations are derived from the _gs_op column (see the Upsert Semantics section below). With append_only_mode: true (the default), the sink targets a ReplacingMergeTree(insert_time, is_deleted) table and derives the is_deleted/insert_time columns automatically. With append_only_mode: false, it uses INSERT for upserts and ALTER TABLE ... DELETE for deletes.
- Compression: INSERT request bodies can be gzip-compressed. The global default comes from clickhouse_sink.compression/clickhouse_sink.compression_level and can be overridden per sink with the compression and compression_level fields.
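In append-only mode, the derived columns follow from how ReplacingMergeTree(insert_time, is_deleted) works: the row with the highest insert_time wins for a sorting key, and is_deleted = 1 turns that winner into a tombstone. The sketch below shows one way to derive both columns per row. It rests on assumptions: only the i code appears in this README, so the u and d codes are assumed, GsOp/ReplacingColumns/derive_columns are hypothetical names, and one insert_time per batch is a simplification.

```rust
/// Upsert operation carried in `_gs_op`. "i" appears in this README;
/// "u" and "d" are ASSUMED codes for this sketch.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum GsOp {
    Insert,
    Update,
    Delete,
}

impl GsOp {
    pub fn parse(code: &str) -> Option<GsOp> {
        match code {
            "i" => Some(GsOp::Insert),
            "u" => Some(GsOp::Update),
            "d" => Some(GsOp::Delete),
            _ => None,
        }
    }
}

/// Extra columns for a ReplacingMergeTree(insert_time, is_deleted) table.
#[derive(Debug, PartialEq)]
pub struct ReplacingColumns {
    /// Version column: the newest value wins when rows are merged.
    pub insert_time: u64,
    /// 1 marks the row as a tombstone for its sorting key.
    pub is_deleted: u8,
}

/// Derives the version/tombstone columns for every row of a batch.
/// Unknown op codes are rejected instead of guessed.
pub fn derive_columns(ops: &[&str], insert_time: u64) -> Result<Vec<ReplacingColumns>, String> {
    ops.iter()
        .map(|code| -> Result<ReplacingColumns, String> {
            let op = GsOp::parse(code)
                .ok_or_else(|| format!("unknown _gs_op value: {code:?}"))?;
            Ok(ReplacingColumns {
                insert_time,
                // Inserts and updates are live rows; only deletes become tombstones.
                is_deleted: u8::from(op == GsOp::Delete),
            })
        })
        .collect()
}
```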
Sample configuration:
sinks:
  ch_output:
    type: clickhouse
    from: filtered_events
    table: test_output
    primary_key: id
    compression: gzip # optional, overrides clickhouse_sink.compression
Connection settings — override the embedded defaults with these environment variables:
| Environment variable | Default | Description |
|---|---|---|
| STREAMLING__CLICKHOUSE_SINK__URL | http://localhost:8123 | ClickHouse HTTP endpoint |
| STREAMLING__CLICKHOUSE_SINK__USER | default | Username |
| STREAMLING__CLICKHOUSE_SINK__PASSWORD | (empty) | Password |
| STREAMLING__CLICKHOUSE_SINK__DATABASE | default | Database name |
| STREAMLING__CLICKHOUSE_SINK__COMPRESSION | none | Wire compression for INSERTs (none or gzip); the per-sink compression field overrides it |
| STREAMLING__CLICKHOUSE_SINK__COMPRESSION_LEVEL | 6 | gzip compression level (0–9); ignored unless compression resolves to gzip |
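The precedence rules described for compression (a per-sink value beats the global one, and the level only matters once gzip is selected) can be sketched as follows. This is an illustrative reading of the table, not the sink's actual code; Compression and effective_gzip_level are hypothetical names, and rejecting levels above 9 mirrors the documented 0–9 range.

```rust
/// Hypothetical wire-compression setting for INSERT bodies.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Compression {
    None,
    Gzip,
}

impl Compression {
    pub fn parse(s: &str) -> Result<Self, String> {
        match s.to_ascii_lowercase().as_str() {
            "none" => Ok(Compression::None),
            "gzip" => Ok(Compression::Gzip),
            other => Err(format!("unsupported compression: {other}")),
        }
    }
}

/// Returns Some(level) when INSERT bodies should be gzip-compressed, None otherwise.
pub fn effective_gzip_level(
    global: Compression,
    global_level: u32,
    sink: Option<Compression>,
    sink_level: Option<u32>,
) -> Result<Option<u32>, String> {
    // The per-sink `compression` field overrides clickhouse_sink.compression.
    match sink.unwrap_or(global) {
        // The level is ignored unless compression resolves to gzip.
        Compression::None => Ok(None),
        Compression::Gzip => {
            let level = sink_level.unwrap_or(global_level);
            if level > 9 {
                return Err(format!("gzip level must be 0-9, got {level}"));
            }
            Ok(Some(level))
        }
    }
}
```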
Dynamic Tables
Dynamic tables provide a mechanism for maintaining stateful data structures that can be queried during stream processing. They act as persistent, queryable data stores that can be populated from streaming data and used for lookups, deduplication, and other operations within SQL transforms. Most importantly, these tables can be updated externally by simply changing the underlying datastore (e.g. Postgres) without needing to restart the pipeline.
Optionally, Streamling can also populate a dynamic table at runtime from a SQL query. At the moment, this SQL query supports only projections and filters, and it is not required to return the _gs_op column. SQL queries defined for dynamic tables are executed as side outputs (see the Side Outputs section below) and are guaranteed to stay in sync with the main data flow.
Overview
- Persistent Storage: With the PostgreSQL backend, data persists across pipeline restarts and checkpoints
- Fast Lookups: Optimized for checking if values exist in the table
- Multiple Backends: Support for in-memory and PostgreSQL.
- SQL Integration: Seamless integration with SQL transforms through the dynamic_table_check UDF
Configuration
Dynamic tables are configured in the transforms section of the pipeline topology, even though they function as both a data store and a sink. The backend storage is configured at the application level.
Example configuration:
transforms:
  # Dynamic table without SQL - acts as a simple key-value store
  user_seen_table:
    type: dynamic_table
    backend_type: Postgres # Options: InMemory, Postgres
    backend_entity_name: user_tracking_table
  # Dynamic table with SQL - automatically populated from a source
  vip_customers_table:
    type: dynamic_table
    backend_type: Postgres
    backend_entity_name: vip_customers
    sql: |
      SELECT customer_id
      FROM crm.accounts
      WHERE tier = 'vip'
Backend Types
In-Memory Backend
- Use Case: Development, testing, and low-latency scenarios
- Persistence: Data is lost on restart
- Performance: Fastest option with lowest latency
- Configuration: No additional setup required
transforms:
  temp_table:
    type: dynamic_table
    backend_type: InMemory
    backend_entity_name: temp_data
PostgreSQL Backend
- Use Case: Production deployments requiring persistence
- Persistence: Data survives restarts and failures
- Performance: Good performance with network overhead
- Configuration: Requires PostgreSQL connection settings
transforms:
  persistent_table:
    type: dynamic_table
    backend_type: Postgres
    backend_entity_name: persistent_data
Usage with SQL Transforms
Dynamic tables are accessed within SQL transforms using the dynamic_table_check UDF (User Defined Function). This function checks if a value exists in the specified dynamic table.
Example: Deduplication
transforms:
  # First, define a dynamic table
  seen_users:
    type: dynamic_table
    backend_type: Postgres
    backend_entity_name: user_dedup
    sql: |
      SELECT user_id
      FROM user.events
  # Then use it in a SQL transform
  filtered_events:
    type: sql
    sql: |
      SELECT *
      FROM user.events
      WHERE NOT dynamic_table_check('seen_users', user_id)
    primary_key: event_id
Example: Lookup filter
transforms:
  vip_customers:
    type: dynamic_table
    backend_type: Postgres
    backend_entity_name: vip_customers
    sql: |
      SELECT customer_id
      FROM crm.accounts
      WHERE tier = 'vip'
  vip_orders:
    type: sql
    sql: |
      SELECT id, customer_id, amount, created_at
      FROM orders.events
      WHERE dynamic_table_check('vip_customers', customer_id)
    primary_key: id
UDF Function Signature
The dynamic_table_check function has the following signature:
- Parameters:
  - table_name (string): Name of the dynamic table to query
  - value (string): Value to check for existence
- Returns:
  - boolean: true if the value exists in the table, false otherwise
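Because the UDF works on batches of values, its core can be pictured as a set lookup that returns one boolean per input row. The sketch below uses an in-memory HashSet per table as a stand-in for the real backends; DynamicTables, create, insert and check_batch are hypothetical names, and treating a null input as "not present" is an assumption.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical in-memory stand-in for the dynamic table backends.
#[derive(Default)]
pub struct DynamicTables {
    tables: HashMap<String, HashSet<String>>,
}

impl DynamicTables {
    /// Registers an (initially empty) table.
    pub fn create(&mut self, table: &str) {
        self.tables.entry(table.to_string()).or_default();
    }

    /// Adds a value, e.g. from the side output that populates the table.
    pub fn insert(&mut self, table: &str, value: &str) {
        self.tables
            .entry(table.to_string())
            .or_default()
            .insert(value.to_string());
    }

    /// Batch form of dynamic_table_check: one boolean per input value.
    /// A null input (None) is treated as "not present" (assumption).
    pub fn check_batch(&self, table: &str, values: &[Option<&str>]) -> Result<Vec<bool>, String> {
        let set = self
            .tables
            .get(table)
            .ok_or_else(|| format!("unknown dynamic table: {table}"))?;
        Ok(values
            .iter()
            .copied()
            .map(|v| v.map_or(false, |s| set.contains(s)))
            .collect())
    }
}
```

Resolving the table once per batch, rather than once per value, is what makes the batched form cheap: the per-row work is a single hash lookup.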
Source Validation
Dynamic tables include built-in validation to ensure data consistency. When a dynamic table with SQL is used in a SQL transform, both must reference the same input. This prevents issues where the dynamic table and the transform are out of sync. This input can be a source or another transform.
Performance Considerations
- Batch Operations: The UDF processes batches of values
- Backend Choice: Choose backends based on your latency and persistence requirements
- Indexing: PostgreSQL backend automatically creates an index for optimal lookup performance
Side Outputs
Side outputs allow emitting data to a "side" processor without modifying the main data flow. They're not part of the topology definition (they are not visible in the LogicalPlan). The implementation can differ slightly between operators, but the general idea is that side outputs are executed right before the data is emitted, receiving the same RecordBatch.
Currently, the only use case for side outputs is populating dynamic tables (when a SQL query is defined).
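The "run right before emitting" idea can be illustrated with an operator wrapper that hands each batch to its side outputs by shared reference and only then forwards it. This is a dependency-free sketch, not Streamling's operator code: Batch stands in for an Arrow RecordBatch, and SideOutput, CollectingSideOutput and EmittingOperator are hypothetical. Propagating a side-output error (and therefore not emitting) is an assumption chosen to keep the side output in sync with the main flow.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Stand-in for an Arrow RecordBatch (hypothetical, to stay dependency-free).
#[derive(Debug, Clone, PartialEq)]
pub struct Batch {
    pub rows: Vec<String>,
}

/// A side output observes batches; it only gets `&Batch`, so it cannot alter them.
pub trait SideOutput {
    fn process_batch(&mut self, batch: &Batch) -> Result<(), String>;
}

/// Example side output that collects rows, like one populating a dynamic table.
pub struct CollectingSideOutput {
    pub seen: Rc<RefCell<Vec<String>>>,
}

impl SideOutput for CollectingSideOutput {
    fn process_batch(&mut self, batch: &Batch) -> Result<(), String> {
        self.seen.borrow_mut().extend(batch.rows.iter().cloned());
        Ok(())
    }
}

/// Operator output path: side outputs run first, then the batch is emitted.
pub struct EmittingOperator {
    side_outputs: Vec<Box<dyn SideOutput>>,
    emitted: Vec<Batch>,
}

impl EmittingOperator {
    pub fn new(side_outputs: Vec<Box<dyn SideOutput>>) -> Self {
        Self { side_outputs, emitted: Vec::new() }
    }

    pub fn emit(&mut self, batch: Batch) -> Result<(), String> {
        for side in self.side_outputs.iter_mut() {
            side.process_batch(&batch)?; // a failing side output blocks emission
        }
        self.emitted.push(batch); // stand-in for sending downstream
        Ok(())
    }

    pub fn emitted(&self) -> &[Batch] {
        &self.emitted
    }
}
```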
Checkpointing
Streamling supports lightweight, local checkpointing. Local checkpointing means that each instance of Streamling performs checkpointing independently, so no coordination is needed. Checkpoint markers with monotonically increasing epochs are emitted by the checkpoint coordinator, then they're passed between the operators (sources to transforms to sinks) and finally delivered to the checkpoint coordinator. Once the coordinator receives the expected number of marker acknowledgements, it advances the epoch and sends the finalization message.
Here's the end-to-end flow with expectations from each operator:
Marker phase:
1. Coordinator broadcasts Marker → Sources receive them
2. Sources record offsets (DON'T commit yet), propagate Marker downstream in batches
3. Sinks receive batches with Marker, FLUSH/COMMIT their data, then send Ack
4. Coordinator receives all Acks → knows all sinks have flushed
Finalizer phase:
5. Coordinator sends Finalizer → Sources receive them
6. Sources COMMIT their state (e.g., Kafka offsets)
7. Sources propagate Finalizer downstream
8. Sinks ignore Finalizer
This guarantees at-least-once because sources only commit after all sinks have confirmed they flushed.
For example, the Kafka source uses the finalizer message to commit the offsets it recorded during the marker phase. Here are the logs for the Kafka -> SQL Transform -> Print topology:
2025-12-03T18:10:20.399911Z INFO tokio-runtime-worker ThreadId(03) streamling_core::checkpoints::checkpoint_management: Sending checkpoint with epoch: 1
2025-12-03T18:10:20.400262Z DEBUG tokio-runtime-worker ThreadId(07) streamling_connectors::table_providers::kafka: Received epoch marker: CheckpointEpoch(1)
2025-12-03T18:10:20.400465Z DEBUG tokio-runtime-worker ThreadId(07) streamling_connectors::table_providers::kafka: Current position: TPL {app.events.account/0: offset=Offset(87) metadata="", error=Ok(()); app.events.account/1: offset=Offset(19) metadata="", error=Ok(()); app.events.account/2: offset=Offset(241) metadata="", error=Ok(()); app.events.account/3: offset=Offset(2) metadata="", error=Ok(())}
2025-12-03T18:10:20.508865Z DEBUG tokio-runtime-worker ThreadId(11) streamling_core::checkpoints::checkpoint_management: [CheckpointCoordinator] Received checkpoint ACK for epoch: 1 from sink
2025-12-03T18:10:20.508998Z INFO tokio-runtime-worker ThreadId(11) streamling_core::checkpoints::checkpoint_management: Epoch finalized: 1
2025-12-03T18:10:20.613465Z DEBUG tokio-runtime-worker ThreadId(11) streamling_connectors::table_providers::kafka: Received epoch finalizer: CheckpointEpoch(1)
2025-12-03T18:10:20.613503Z DEBUG tokio-runtime-worker ThreadId(11) streamling_connectors::table_providers::kafka: Committing position: TPL {app.events.account/0: offset=Offset(87) metadata="", error=Ok(()); app.events.account/1: offset=Offset(19) metadata="", error=Ok(()); app.events.account/2: offset=Offset(241) metadata="", error=Ok(()); app.events.account/3: offset=Offset(2) metadata="", error=Ok(())}
2025-12-03T18:10:20.620278Z DEBUG tokio-runtime-worker ThreadId(11) streamling_connectors::table_providers::kafka: Saving position to state backend: TopicPartition { topic: "app.events.account", partition: 0, offset: TopicPartitionOffset { offset: 87, updated_at: 1764785420620 } }
2025-12-03T18:10:20.620308Z DEBUG tokio-runtime-worker ThreadId(11) streamling_connectors::table_providers::kafka: Saving position to state backend: TopicPartition { topic: "app.events.account", partition: 1, offset: TopicPartitionOffset { offset: 19, updated_at: 1764785420620 } }
2025-12-03T18:10:20.620317Z DEBUG tokio-runtime-worker ThreadId(11) streamling_connectors::table_providers::kafka: Saving position to state backend: TopicPartition { topic: "app.events.account", partition: 2, offset: TopicPartitionOffset { offset: 241, updated_at: 1764785420620 } }
2025-12-03T18:10:20.620326Z DEBUG tokio-runtime-worker ThreadId(11) streamling_connectors::table_providers::kafka: Saving position to state backend: TopicPartition { topic: "app.events.account", partition: 3, offset: TopicPartitionOffset { offset: 2, updated_at: 1764785420620 } }
2025-12-03T18:10:21.466247Z INFO tokio-runtime-worker ThreadId(03) streamling_core::checkpoints::checkpoint_management: Sending checkpoint with epoch: 2
2025-12-03T18:10:21.573524Z DEBUG tokio-runtime-worker ThreadId(05) streamling_connectors::table_providers::kafka: Received epoch marker: CheckpointEpoch(2)
2025-12-03T18:10:21.573603Z DEBUG tokio-runtime-worker ThreadId(05) streamling_connectors::table_providers::kafka: Current position: TPL {app.events.account/0: offset=Offset(87) metadata="", error=Ok(()); app.events.account/1: offset=Offset(19) metadata="", error=Ok(()); app.events.account/2: offset=Offset(241) metadata="", error=Ok(()); app.events.account/3: offset=Offset(2) metadata="", error=Ok(())}
2025-12-03T18:10:21.681840Z DEBUG tokio-runtime-worker ThreadId(03) streamling_core::checkpoints::checkpoint_management: [CheckpointCoordinator] Received checkpoint ACK for epoch: 2 from sink
2025-12-03T18:10:21.681873Z INFO tokio-runtime-worker ThreadId(03) streamling_core::checkpoints::checkpoint_management: Epoch finalized: 2
2025-12-03T18:10:21.787167Z DEBUG tokio-runtime-worker ThreadId(03) streamling_connectors::table_providers::kafka: Received epoch finalizer: CheckpointEpoch(2)
2025-12-03T18:10:21.787226Z DEBUG tokio-runtime-worker ThreadId(03) streamling_connectors::table_providers::kafka: Current position already committed, so skipping commit
You can see that once epoch 1 is created, the Kafka source immediately receives the epoch marker and records its current offsets. It then propagates the marker to the SQL transform, which propagates it to the Print sink. Finally, the Print sink sends the checkpoint ack back to the coordinator. Once the coordinator receives the expected number of acks (just 1 in this case, since there is only one sink), it finalizes the epoch and sends the finalizer message to the Kafka source. The source then commits the offsets and saves them to the state backend. For epoch 2, the offsets have not moved, so the source skips the commit.
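The marker/finalizer protocol, including the "already committed, so skipping commit" case from epoch 2, can be modeled in a single process. This is an illustrative sketch only: Coordinator, OffsetSource and their methods are hypothetical, channels and real Kafka/state-backend commits are replaced by plain method calls, and partitions and offsets are plain integers.

```rust
use std::collections::HashMap;

/// Checkpoint epochs increase monotonically, starting at 1.
pub type Epoch = u64;

/// Coordinator side: starts epochs and counts sink acks.
pub struct Coordinator {
    expected_acks: usize,
    current_epoch: Epoch,
    acks: usize,
}

impl Coordinator {
    pub fn new(expected_acks: usize) -> Self {
        Self { expected_acks, current_epoch: 0, acks: 0 }
    }

    /// Begins a new epoch; its marker is then broadcast to every source.
    pub fn start_epoch(&mut self) -> Epoch {
        self.current_epoch += 1;
        self.acks = 0;
        self.current_epoch
    }

    /// Records one sink ack. Returns the epoch once every sink has flushed,
    /// which is the signal to send the finalizer to the sources.
    pub fn on_ack(&mut self, epoch: Epoch) -> Option<Epoch> {
        if epoch != self.current_epoch {
            return None; // ignore stale acks from an older epoch
        }
        self.acks += 1;
        (self.acks == self.expected_acks).then_some(epoch)
    }
}

/// Source side: record offsets on the marker, commit only on the finalizer.
#[derive(Default)]
pub struct OffsetSource {
    /// partition -> current read position
    pub position: HashMap<i32, i64>,
    /// offsets recorded per epoch, waiting for that epoch's finalizer
    recorded: HashMap<Epoch, HashMap<i32, i64>>,
    /// last committed offsets (Kafka + state backend in the real source)
    pub committed: HashMap<i32, i64>,
    /// number of commits actually performed
    pub commits: usize,
}

impl OffsetSource {
    /// Marker phase: remember the position, but DON'T commit yet.
    pub fn on_marker(&mut self, epoch: Epoch) {
        self.recorded.insert(epoch, self.position.clone());
    }

    /// Finalizer phase: all sinks have flushed, so committing is now safe.
    pub fn on_finalizer(&mut self, epoch: Epoch) {
        let Some(snapshot) = self.recorded.remove(&epoch) else {
            return;
        };
        if snapshot == self.committed {
            return; // position already committed, so skip the commit
        }
        self.committed = snapshot;
        self.commits += 1;
    }
}
```

Committing only in on_finalizer is what yields at-least-once delivery: a crash between the marker and the finalizer leaves the previous offsets in place, so those records are read again rather than lost.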
State Backends
Streamling supports multiple state backend implementations for persisting operator state.
NOTE: application_id (STREAMLING__APPLICATION_ID) is used as a namespace for the state backend. It must be unique across all applications using the same state backend. By default, it is set to the app_name from the infra config.
In-Memory State Backend
- Simple HashMap-based implementation
- Suited for testing and local development
- No persistence between restarts
- Lowest latency but no durability guarantees
SQLite State Backend
- Uses SQLite for state storage
- Good for single-node deployments or local development
- Persists data to a local file
- Schema:
  CREATE TABLE state (
    namespace TEXT,
    key TEXT,
    data TEXT NOT NULL,
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY(namespace, key)
  );
- The SQLite database file is created automatically on startup if it doesn't exist.
- The created_at field is automatically set to the current timestamp when new state records are created and preserved on updates.
PostgreSQL State Backend
- Uses PostgreSQL for state storage
- Recommended for production deployments
- Supports high availability
- Schema:
  CREATE TABLE streamling.state (
    namespace TEXT,
    key TEXT,
    data JSONB NOT NULL,
    created_at TIMESTAMPTZ DEFAULT NOW(),
    PRIMARY KEY(namespace, key)
  );
- The Postgres schema and table are created automatically on startup if they don't exist.
- The created_at field is automatically set to the current timestamp when new state records are created and preserved on updates.
You can choose a state backend by setting the STREAMLING__STATE_BACKEND__BACKEND_TYPE environment variable to one of the following values:
- InMemory (default, a no-op)
- Postgres
- Sqlite
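Selecting the backend amounts to parsing one optional environment value into an enum with InMemory as the fallback. The sketch below is a hypothetical illustration (StateBackendType and backend_type_from are not Streamling's names), and it assumes the values are matched case-sensitively, exactly as spelled in the list above.

```rust
/// Hypothetical enum mirroring the accepted backend_type values.
#[derive(Debug, Clone, Copy, PartialEq, Default)]
pub enum StateBackendType {
    #[default]
    InMemory,
    Postgres,
    Sqlite,
}

impl std::str::FromStr for StateBackendType {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        // Case-sensitive match on the documented spellings (assumption).
        match s {
            "InMemory" => Ok(Self::InMemory),
            "Postgres" => Ok(Self::Postgres),
            "Sqlite" => Ok(Self::Sqlite),
            other => Err(format!("unknown state backend type: {other}")),
        }
    }
}

/// Interprets STREAMLING__STATE_BACKEND__BACKEND_TYPE; unset means InMemory.
pub fn backend_type_from(value: Option<&str>) -> Result<StateBackendType, String> {
    value.map_or(Ok(StateBackendType::default()), |v| v.parse::<StateBackendType>())
}
```

A caller would pass std::env::var("STREAMLING__STATE_BACKEND__BACKEND_TYPE").ok().as_deref(); rejecting unknown values loudly avoids silently falling back to the non-durable InMemory backend.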
Usage with Kafka Source Checkpoints
The state backend plays a crucial role in Kafka source reliability:
- Offset Management:
  - Stores Kafka partition offsets using the format {reference_name}:{topic}:{partition} as keys
  - Each offset entry contains the offset position and the last update timestamp
  - Used to resume processing from the last committed position after restarts
  - NOTE: offsets stored in the state backend always take precedence over the Kafka consumer group offsets
- Checkpoint Process:
  - When a checkpoint epoch is finalized, the Kafka source commits offsets to both Kafka and the state backend
  - The state backend persists the offsets durably
  - On startup, the Kafka source:
    - Queries the state backend for stored offsets
    - Seeks to stored positions if found
    - Starts from the configured position (earliest/latest) for new partitions
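The key format and the startup decision can be sketched as two small functions. This is an illustration under assumptions: the state is modeled as a HashMap keyed by the documented {reference_name}:{topic}:{partition} string, the value fields mirror the TopicPartitionOffset seen in the logs, consumer-group offsets are simply not consulted, and all type and function names are hypothetical.

```rust
use std::collections::HashMap;

/// Where a partition with no stored offset starts.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum StartPosition {
    Earliest,
    Latest,
}

/// Stored entry: offset position plus last update timestamp (ms).
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct StoredOffset {
    pub offset: i64,
    pub updated_at: u64,
}

/// Resolved starting point for one partition.
#[derive(Debug, PartialEq)]
pub enum Start {
    /// Seek to the offset saved in the state backend.
    Stored(i64),
    /// New partition: use the configured position.
    Configured(StartPosition),
}

/// State backend key: {reference_name}:{topic}:{partition}.
pub fn offset_key(reference_name: &str, topic: &str, partition: i32) -> String {
    format!("{reference_name}:{topic}:{partition}")
}

/// Stored offsets always win; consumer-group offsets are not consulted here.
pub fn resolve_start(
    state: &HashMap<String, StoredOffset>,
    reference_name: &str,
    topic: &str,
    partition: i32,
    configured: StartPosition,
) -> Start {
    match state.get(&offset_key(reference_name, topic, partition)) {
        Some(stored) => Start::Stored(stored.offset),
        None => Start::Configured(configured),
    }
}
```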
Error Handling
Streamling uses StreamlingError, a custom error type that wraps anyhow::Error and integrates with DataFusion while providing rich error context and semantic flags.
Why Context Matters
Errors are messages meant for humans (debugging) or machines (automated recovery). Without context, errors become useless for both. Consider a common antipattern where we catch errors and throw them up the stack as fast as possible:
// Bad: error forwarding strips away meaning
fn process_batch(&self) -> Result<RecordBatch> {
    let data = self.fetch_data()?; // "connection refused"
    let parsed = parse_avro(data)?; // just forwards the error
    Ok(transform(parsed)?)
}
When this fails in production, you get "connection refused". But connection to what? During which operation? In which pipeline? The error shows where it originated but not the logical path through the application—the path you actually need to debug the issue.
Backtraces don't solve this either. In async Rust, they're dominated by noise (dozens of GenFuture::poll() frames) and still only show origin, not intent.
Context captures intent. Each .context() call answers: "What was I trying to do when this failed?"
// Good: context captures the logical path
fn process_batch(&self) -> Result<RecordBatch> {
    let data = self.fetch_data()
        .streamling_context("failed to fetch data from schema registry")?;
    let parsed = parse_avro(data)
        .streamling_context("failed to parse Avro payload")?;
    transform(parsed)
        .streamling_context("failed to transform batch")
}
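Now a parsing failure reads "failed to parse Avro payload: invalid schema version" instead of a bare "invalid schema version", and every caller further up the stack that adds its own context contributes another layer. You know exactly what the code was trying to do at each level.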
For more on this philosophy, see Stop Forwarding Errors, Start Designing Them.
StreamlingError
StreamlingError is the standard error type with two semantic flags:
- internal: Internal errors are not likely to be caused by user input and may contain implementation details. User-facing errors (e.g., invalid configuration) should have this flag unset.
- retriable: Indicates the operation can be retried (e.g., transient network errors like connection refused or timeout).
These flags enable automated error handling (retry logic) and safe error display to end users.
Creating Errors
Macros (preferred for simple cases)
use streamling_core::{streamling_err, streamling_bail, streamling_user_err, streamling_user_bail, streamling_retriable_err};
// Internal, non-retriable error
let err = streamling_err!("failed to process item {}", item_id);
// Early return with internal, non-retriable error
if value < 0 {
    streamling_bail!("value must be non-negative, got {}", value);
}
// User-facing, non-retriable error (safe to show to users)
let err = streamling_user_err!("invalid configuration: {}", reason);
// Early return with user-facing error
if input.is_empty() {
    streamling_user_bail!("input cannot be empty");
}
// Internal, retriable error (for transient failures)
let err = streamling_retriable_err!("connection to {} timed out", host);
Constructors (for more control)
use streamling_core::error::StreamlingError;
// Basic constructors
let err = StreamlingError::new("internal error"); // internal, non-retriable
let err = StreamlingError::user("user-facing error"); // user-facing, non-retriable
let err = StreamlingError::retriable("transient failure"); // internal, retriable
// With cause (wraps another error)
let err = StreamlingError::with_cause("write failed", io_error); // internal, non-retriable
let err = StreamlingError::user_with_cause("config error", parse_error); // user-facing, non-retriable
let err = StreamlingError::retriable_with_cause("connection failed", e); // internal, retriable
Chainable Modifiers
Errors can be modified after creation:
let err = StreamlingError::new("error")
    .mark_retriable()               // mark as retriable
    .mark_user_facing()             // mark as user-facing (not internal)
    .context("additional context"); // add context, preserves flags
Adding Context
Use the ResultExt trait to add context to any Result:
use streamling_core::error::ResultExt;
fn process() -> Result<()> {
    // Eager context (always evaluated)
    some_operation()
        .streamling_context("failed to perform operation")?;
    // Lazy context (only evaluated on error)
    expensive_operation()
        .streamling_with_context(|| format!("failed for item {}", compute_id()))?;
    Ok(())
}
When applied to Result<T, StreamlingError>, the internal and retriable flags are preserved. For other error types, it creates an internal, non-retriable error.
Automatic Conversions
StreamlingError provides From implementations for common error types:
| Source type | internal | retriable |
|---|---|---|
| anyhow::Error | true | false |
| arrow_schema::ArrowError | true | false |
| DataFusionError | true | false |
| std::io::Error | true | Depends on error kind* |
*I/O errors are marked retriable for: ConnectionRefused, ConnectionReset, ConnectionAborted, NotConnected, TimedOut, Interrupted.
StreamlingError also converts to DataFusionError automatically, enabling seamless use with the ? operator at trait boundaries.
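The I/O classification in the footnote can be expressed with nothing but the standard library. This is a sketch of the documented rule, not the crate's From implementation; io_error_is_retriable is a hypothetical name, and every kind not in the list is treated as non-retriable.

```rust
use std::io::{Error, ErrorKind};

/// Returns true for the I/O error kinds the footnote marks as retriable.
pub fn io_error_is_retriable(err: &Error) -> bool {
    matches!(
        err.kind(),
        ErrorKind::ConnectionRefused
            | ErrorKind::ConnectionReset
            | ErrorKind::ConnectionAborted
            | ErrorKind::NotConnected
            | ErrorKind::TimedOut
            | ErrorKind::Interrupted
    )
}
```

All of these kinds describe a transient transport condition; errors such as NotFound or PermissionDenied would fail again on retry, which is why they fall through to false.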
Checking Error Properties
if err.is_retriable() {
    // Implement retry logic
}
if !err.is_internal() {
    // Safe to display to user
    show_to_user(&err.to_string());
}
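The "implement retry logic" branch could look like the helper below: retry only while the error reports itself as retriable, with exponential backoff and a bounded number of attempts. To stay self-contained, the sketch uses a hypothetical Retriable trait and DemoError type instead of StreamlingError; with the real type, is_retriable() would come from StreamlingError itself.

```rust
use std::thread;
use std::time::Duration;

/// Hypothetical stand-in for StreamlingError::is_retriable().
pub trait Retriable {
    fn is_retriable(&self) -> bool;
}

/// Runs `op` at least once, retrying retriable failures with a doubling delay.
/// Non-retriable errors, and the last error after `max_attempts`, are returned as-is.
pub fn retry<T, E: Retriable>(
    max_attempts: u32,
    base_delay: Duration,
    mut op: impl FnMut() -> Result<T, E>,
) -> Result<T, E> {
    let mut delay = base_delay;
    let mut attempt = 1;
    loop {
        match op() {
            Ok(value) => return Ok(value),
            Err(e) if e.is_retriable() && attempt < max_attempts => {
                thread::sleep(delay);
                delay = delay.saturating_mul(2);
                attempt += 1;
            }
            Err(e) => return Err(e),
        }
    }
}

/// Minimal error type for the example.
#[derive(Debug, PartialEq)]
pub struct DemoError {
    pub retriable: bool,
}

impl Retriable for DemoError {
    fn is_retriable(&self) -> bool {
        self.retriable
    }
}
```

Failing fast on non-retriable errors matters as much as retrying transient ones: a user-facing configuration error will not fix itself, so retrying it only delays the report.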
DataFusion Integration
DataFusion traits require datafusion::error::Result<T>. Use StreamlingError or the IntoStreamlingResult trait:
use std::sync::Arc;
use async_trait::async_trait;
use streamling_core::error::{Result, ResultExt, IntoStreamlingResult};
#[async_trait]
impl TableProvider for MyProvider {
    async fn scan(&self, ...) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
        // StreamlingError converts to DataFusionError automatically
        let schema = fetch_schema(&self.url)
            .streamling_context("failed to fetch schema")?;
        // Or convert anyhow::Result explicitly
        let data = some_anyhow_function()
            .into_streamling()?;
        Ok(Arc::new(MyExec::new(schema)))
    }
    // other required TableProvider methods omitted
}
Error Output Format
When errors are displayed, the full chain is preserved:
Error: kafka source 'raw_transactions': failed to create Kafka source
Caused by:
Execution error: Error: failed to fetch Avro schema from Schema Registry for topic raw.event.transaction
Caused by:
Error: Could not get id from response had no other cause, it's retriable: false, it's cached: false
This replaces the previous nested "Execution error: Execution error:" pattern with clear, hierarchical error messages.
Upsert Semantics
By default, DataFusion passes data in a columnar format using RecordBatch without any additional metadata. To support upsert semantics, Streamling therefore adds an extra _gs_op column to the schema. Unfortunately, this comes with some challenges, described in https://github.com/goldsky-io/streamling/pull/2.
Plugin System
Overview
Streamling's plugin system provides a flexible way to extend the platform with custom functionality. The plugin system is built using FFI (Foreign Function Interface) with the abi_stable crate, ensuring that plugins built against one version of Streamling continue to work with future versions. While theoretically any language that can interface with Rust can be used, the primary focus is on Rust-based plugins.
The plugin architecture consists of these key components:
- Host Environment: The Streamling core that loads and manages plugins
- Plugin Interface: A stable ABI for communication between host and plugins
- Message Channels: Bidirectional communication channels for data exchange (input, output, and metrics)
Plugins can implement any of the following types:
- Source plugins: Generate data to be processed by the pipeline
- Transform plugins: Modify data flowing through the pipeline
- Sink plugins: Consume data from the pipeline and write it to external systems
- Preprocessor plugins: Transform topology configuration before parsing
- UDF plugins: Custom scalar functions usable in SQL transforms
- Side output plugins: Observe data from sources without modifying the pipeline
Plugin Pipeline Configuration
The plugin location is configured via the application config (STREAMLING__PLUGIN__PATH env var). It can point either to an individual plugin file or to a directory containing multiple plugin files. These plugins are loaded and initialized on application startup.
Plugins are configured in pipeline definitions through JSON or YAML. NOTE: plugin ids (explained below) are used as the operator type. This makes plugins first-class citizens in the topology definition.
Here's an example showing a complete plugin-based pipeline with source, transform and sink:
sources:
  plugin_data:
    type: basic_plugin.random_source
    options:
      max_rows: "50"
transforms:
  updated_plugin_data:
    from: plugin_data
    type: basic_plugin.filter_transform
    options:
      _gs_op: i
sinks:
  print.updated_plugin_data:
    from: updated_plugin_data
    type: print_sink
    options:
      mode: batch
Each plugin configuration requires:
- type: Unique plugin id, consisting of the plugin namespace (optional) and an operator name, separated by a dot (e.g., basic_plugin.random_source)
- options: Key-value pairs for plugin-specific configuration
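Splitting a type value into its optional namespace and operator name can be sketched as below. The README does not say how ids containing several dots are handled, so splitting on the first dot is an assumption, as is rejecting empty segments; PluginId and parse_plugin_id are hypothetical names.

```rust
/// A plugin id split into an optional namespace and an operator name.
#[derive(Debug, PartialEq)]
pub struct PluginId<'a> {
    pub namespace: Option<&'a str>,
    pub name: &'a str,
}

/// Splits on the FIRST dot (assumption); ids without a dot have no namespace.
pub fn parse_plugin_id(id: &str) -> Result<PluginId<'_>, String> {
    let (namespace, name) = match id.split_once('.') {
        Some((ns, name)) => (Some(ns), name),
        None => (None, id),
    };
    // Reject ids like ".x" or "ns." that have an empty segment.
    if name.is_empty() || namespace == Some("") {
        return Err(format!("invalid plugin id: {id:?}"));
    }
    Ok(PluginId { namespace, name })
}
```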
Message-based Interface
Communication between Streamling and plugins happens through a message-based interface built on crossbeam channels. Each plugin receives three channels:
- Input Channel: Receives messages from the host
- Output Channel: Sends messages back to the host
- Metrics Channel: Sends metrics from the plugin to the host
The message protocol is defined by the PluginMsg enum:
pub enum PluginMsg {
    Init,
    NextBatch { data: SafeArrowArray },
    CheckpointMarker { epoch: PluginCheckpointEpoch },
    CheckpointAck { epoch: PluginCheckpointEpoch },
    CheckpointFinalizer { epoch: PluginCheckpointEpoch },
    Terminate,
    Topology { config: RString },
    Error { message: RString },
}
Key messages include:
- Init: Sent to initialize the plugin
- NextBatch: Contains an Arrow RecordBatch for processing
- CheckpointMarker/CheckpointAck/CheckpointFinalizer: Support Streamling's checkpointing system
- Terminate: Graceful shutdown signal
- Topology: Sends topology configuration to preprocessor plugins
- Error: Error response from a plugin
This design allows plugins to process data incrementally and supports the full range of Streamling's reliability features.
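A transform plugin's side of this protocol boils down to a receive loop: transform each batch, forward checkpoint messages in order so they stay behind the batches that preceded them, and stop on Terminate. The sketch below is a simplified model, not the real dispatcher: std::sync::mpsc stands in for crossbeam channels, Msg is a trimmed hypothetical mirror of PluginMsg, Vec<i64> stands in for SafeArrowArray, and u64 for PluginCheckpointEpoch.

```rust
use std::sync::mpsc::{Receiver, Sender};

/// Trimmed, hypothetical mirror of PluginMsg for a transform.
#[derive(Debug, PartialEq)]
pub enum Msg {
    Init,
    NextBatch { data: Vec<i64> },
    CheckpointMarker { epoch: u64 },
    CheckpointFinalizer { epoch: u64 },
    Terminate,
}

/// Processes messages until Terminate or until the host drops its sender.
pub fn run_transform(input: Receiver<Msg>, output: Sender<Msg>, f: impl Fn(i64) -> i64) {
    for msg in input {
        let reply = match msg {
            Msg::Init => continue, // one-time setup would happen here
            Msg::NextBatch { data } => Msg::NextBatch {
                data: data.into_iter().map(&f).collect(),
            },
            // Checkpoint messages travel in-band, after every earlier batch.
            m @ (Msg::CheckpointMarker { .. } | Msg::CheckpointFinalizer { .. }) => m,
            Msg::Terminate => break,
        };
        if output.send(reply).is_err() {
            break; // the host is gone; nothing left to do
        }
    }
}
```

Keeping markers in the same channel as data, rather than on a side channel, is what guarantees that a marker never overtakes a batch it should follow.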
FFI
The core plugin interface is defined by the PluginModule struct:
pub struct PluginModule {
    /// Function to initialize the plugin, called only once per plugin module.
    pub init: extern "C" fn(
        logging: PluginLogging,
    ) -> RResult<PluginRuntimeConfiguration, PluginInitializationError>,
    /// Function to create a plugin instance, e.g. a source, transform, or sink.
    pub create: extern "C" fn(
        plugin_id: RString,
        input_schema: ROption<SafeArrowSchema>,
        options: PluginOptions,
        runtime: PluginAsyncRuntimeObj,
        state_backend_config: PluginStateBackendConfig,
        message_channels: PluginChannels,
    ) -> RResult<PluginResult, PluginInitializationError>,
    /// Returns UDF descriptors provided by this plugin. Can return an empty vector.
    pub udf_descriptors:
        extern "C" fn() -> RResult<RVec<PluginUdfDescriptor>, PluginInitializationError>,
    /// Returns side output descriptors provided by this plugin. Can return an empty vector.
    pub side_output_descriptors:
        extern "C" fn() -> RResult<RVec<PluginSideOutputDescriptor>, PluginInitializationError>,
}
Plugins must implement these functions:
- init: Called once when the plugin is loaded. It returns a PluginRuntimeConfiguration, which contains the available plugin ids and optional per-plugin channel capacity hints.
- create: Called to create a new plugin instance with a specific plugin id.
- udf_descriptors: Returns descriptors for any UDFs the plugin provides.
- side_output_descriptors: Returns descriptors for any side outputs the plugin provides.
Async Runtime Support
Tokio's runtime can't be passed over FFI, so it can't be shared with plugins directly. Plugins may choose to either:
- Use the provided PluginAsyncRuntime trait (described below), along with the corresponding PluginAsyncRuntimeObj trait object, to perform asynchronous operations. This runtime is created on the host side and shared by all plugins. It uses resources efficiently, but it also limits async operations to the provided interface.
- Initialize and use their own Tokio async runtime.
The PluginAsyncRuntime trait, built on the async-ffi crate, looks like this:
pub trait PluginAsyncRuntime: Send + Sync + Clone {
    fn spawn(&self, fut: FfiFuture<()>);
    fn sleep(&self, dur: RDuration) -> FfiFuture<()>;
    fn timeout(&self, dur: RDuration, fut: FfiFuture<()>) -> FfiFuture<RResult<(), ()>>;
    fn block_on(&self, fut: FfiFuture<()>);
    fn yield_now(&self) -> FfiFuture<()>;
}
It's also possible to use a plugin-specific Tokio runtime through the PluginAsyncRuntime trait. The high-level API described below provides the init_plugin_with_async_runtime!() macro for exactly that.
Data Handling with Arrow
Data flows between Streamling and plugins as Arrow RecordBatches, providing efficient columnar data processing.
The SafeArrowArray type enables safe passage of Arrow data across the FFI boundary:
pub struct SafeArrowArray {
    pub array: FFI_ArrowArray,
    pub schema: SafeArrowSchema,
}
Conversion functions handle the transformation between Arrow RecordBatches and the FFI-safe representation.
State Backend Support
A PluginStateBackendConfig, containing the application namespace and the serialized state backend configuration, is passed to each plugin. From there, PluginStateBackendFactory can be used to create a state backend factory for the plugin. After that, the factory can be used just like a regular state backend factory (refer to the State Backends section above).
NOTE: state backends typically require a Tokio runtime to be initialized, so plugins should enable async support before using them.
Plugin Metrics
Plugins receive a PluginMetricsRecorder that allows emitting metrics back to the host via a dedicated metrics channel. The host processes these metrics and integrates them with the Streamling telemetry system.
Available methods:
- record_count(name, value) / record_count_w_tags(name, value, tags): Record counter metrics.
- record_latency(name, duration) / record_latency_w_tags(name, duration, tags): Record latency/histogram metrics.
- record_gauge(name, value) / record_gauge_w_tags(name, value, tags): Record gauge metrics.
Tags are provided as Vec<(&str, &str)> key-value pairs.
High-Level API
While plugins can work directly with the message-based interface, Streamling provides convenient higher-level traits that simplify plugin development.
Core Operator Traits
#[async_trait]
pub trait SourcePlugin: SupportsGracefulShutdown + Send + Sync {
    async fn initialize(&self) -> Result<(), PluginError>;
    fn output_schema(&self) -> Result<SchemaRef, PluginError>;
    /// Return an empty batch to indicate missing data if needed.
    async fn generate_batch(&self) -> Result<RecordBatch, PluginError>;
    /// Returning a successful result indicates that the checkpoint marker was processed
    /// successfully, and the marker should be propagated downstream.
    async fn process_checkpoint_marker(&self, epoch: CheckpointEpoch) -> Result<(), PluginError>;
    async fn process_checkpoint_finalizer(&self, epoch: CheckpointEpoch) -> Result<(), PluginError>;
}
#[async_trait]
pub trait TransformPlugin: SupportsGracefulShutdown + Send + Sync {
    async fn initialize(&self) -> Result<(), PluginError>;
    fn output_schema(&self) -> Result<SchemaRef, PluginError>;
    /// Return an empty batch to indicate missing data if needed.
    async fn process_batch(&self, data: RecordBatch) -> Result<RecordBatch, PluginError>;
    /// Returning a successful result indicates that the checkpoint marker was processed
    /// successfully, and the marker should be propagated downstream.
    async fn process_checkpoint_marker(&self, epoch: CheckpointEpoch) -> Result<(), PluginError>;
    async fn process_checkpoint_finalizer(&self, epoch: CheckpointEpoch) -> Result<(), PluginError>;
}
#[async_trait]
pub trait SinkPlugin: SupportsGracefulShutdown + Send + Sync {
    async fn initialize(&self) -> Result<(), PluginError>;
    async fn process_batch(&self, data: RecordBatch) -> Result<(), PluginError>;
    /// Returning a successful result indicates that the checkpoint marker was processed
    /// successfully, and an acknowledgment should be sent back to the checkpoint coordinator.
    async fn process_checkpoint_marker(&self, epoch: CheckpointEpoch) -> Result<(), PluginError>;
    async fn process_checkpoint_finalizer(&self, epoch: CheckpointEpoch) -> Result<(), PluginError>;
}
SupportsGracefulShutdown is a small trait that allows plugins to handle graceful shutdown by implementing these methods:
#[async_trait]
pub trait SupportsGracefulShutdown {
    /// Returns true if the plugin is still running.
    fn is_running(&self) -> bool;
    /// Attempts to gracefully shut down the plugin.
    async fn terminate(&self) -> Result<(), PluginError>;
}
A typical implementation relies on an AtomicBool to track the running state.
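The AtomicBool pattern can be sketched synchronously as follows. The real SupportsGracefulShutdown trait is async (via async_trait) and returns PluginError, so GracefulShutdown here is a simplified hypothetical mirror, and CountingSource is a made-up example plugin. Using swap makes terminate() idempotent: only the first call observes true and would release resources.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Synchronous, simplified mirror of SupportsGracefulShutdown (hypothetical).
pub trait GracefulShutdown {
    fn is_running(&self) -> bool;
    fn terminate(&self) -> Result<(), String>;
}

/// Hypothetical source that stops producing once terminated.
pub struct CountingSource {
    running: AtomicBool,
}

impl CountingSource {
    pub fn new() -> Self {
        Self { running: AtomicBool::new(true) }
    }

    /// Produces a value while running, or None after shutdown was requested.
    pub fn generate(&self, i: u64) -> Option<u64> {
        self.is_running().then_some(i)
    }
}

impl GracefulShutdown for CountingSource {
    fn is_running(&self) -> bool {
        // Acquire pairs with the release half of the swap in terminate().
        self.running.load(Ordering::Acquire)
    }

    fn terminate(&self) -> Result<(), String> {
        // Only the first caller sees `true`; later calls are no-ops.
        if self.running.swap(false, Ordering::AcqRel) {
            // flush buffers / close connections here
        }
        Ok(())
    }
}
```

Because the flag is atomic, terminate() can be called from the host's shutdown path on another thread while the processing loop keeps polling is_running(), with no lock needed.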
Preprocessor Trait
Preprocessors transform the raw topology configuration string before it is parsed. This enables dynamic configuration generation.
#[async_trait]
pub trait PreprocessorPlugin: Send + Sync {
    async fn preprocess_topology(&self, config: String) -> Result<String, PluginError>;
}
Preprocessors are configured via the application config:
plugin:
  preprocessor_ids:
    - my_preprocessor
  preprocessor_options:
    my_preprocessor:
      key: value
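As an example of "dynamic configuration generation", a preprocessor could expand ${KEY} placeholders in the raw topology using values from its preprocessor_options. The behavior is entirely hypothetical (Streamling does not ship this preprocessor as far as this README says), and the method is a synchronous stand-in for PreprocessorPlugin::preprocess_topology. Unknown keys fail loudly so that a typo never reaches the topology parser.

```rust
use std::collections::HashMap;

/// Hypothetical preprocessor that substitutes ${KEY} placeholders.
pub struct PlaceholderPreprocessor {
    pub vars: HashMap<String, String>,
}

impl PlaceholderPreprocessor {
    /// Synchronous stand-in for PreprocessorPlugin::preprocess_topology.
    pub fn preprocess_topology(&self, config: String) -> Result<String, String> {
        let mut out = String::with_capacity(config.len());
        let mut rest = config.as_str();
        while let Some(start) = rest.find("${") {
            // Copy everything before the placeholder unchanged.
            out.push_str(&rest[..start]);
            let after = &rest[start + 2..];
            let end = after
                .find('}')
                .ok_or_else(|| "unterminated ${ placeholder".to_string())?;
            let key = &after[..end];
            let value = self
                .vars
                .get(key)
                .ok_or_else(|| format!("no value for placeholder {key:?}"))?;
            out.push_str(value);
            rest = &after[end + 1..];
        }
        out.push_str(rest);
        Ok(out)
    }
}
```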
Side Output Trait
Side outputs observe data from sources without modifying the pipeline. Unlike other plugin types, side outputs use direct FFI invocation (no channels). One instance is created per source.
pub trait SideOutputPlugin: Send + Sync {
    fn process_batch(&self, batch: &RecordBatch) -> Result<(), String>;
    fn shutdown(&self);
}
Side outputs are configured via the application config:
plugin:
  side_output_ids:
    - my_side_output
  side_output_options:
    my_side_output:
      key: value
UDF Plugins
UDF (User Defined Function) plugins provide custom scalar functions that can be used in SQL transforms. They implement DataFusion's ScalarUDFImpl trait and are registered globally when the plugin is loaded.
Dispatchers
Dispatcher types automatically handle the message loop and channel communication, allowing plugin developers to focus on business logic:
- SourcePluginDispatcher: manages message flow for source plugins
- TransformPluginDispatcher: manages message flow for transform plugins
- SinkPluginDispatcher: manages message flow for sink plugins
- PreprocessorPluginDispatcher: manages message flow for preprocessor plugins
Registration Macros
Helper macros make it easy to implement the plugin interface:
- register_plugin_source!("<plugin_id>", <SourcePlugin>); to register a source plugin.
- register_plugin_transform!("<plugin_id>", <TransformPlugin>); to register a transform plugin.
- register_plugin_sink!("<plugin_id>", <SinkPlugin>); to register a sink plugin.
- register_plugin_preprocessor!("<plugin_id>", <PreprocessorPlugin>); to register a preprocessor plugin.
- register_plugin_udf!(<ScalarUDFImpl>); to register a UDF from a type implementing ScalarUDFImpl + Default.
- register_plugin_udf_fn!(<factory_fn>); to register a UDF from a factory function returning ScalarUDF.
- register_plugin_side_output!("<id>", <SideOutputPlugin>); to register a side output plugin.
- init_plugin!(); or init_plugin_with_async_runtime!(); to finalize the plugin definition. The latter creates a plugin-specific Tokio runtime.
A plugin id consists of an optional namespace and a name, separated by a dot (e.g., basic_plugin.random_source). Namespaces group related plugins together, which helps avoid naming conflicts and keeps plugins organized.
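The id format is small enough to state as code. A minimal sketch, assuming the namespace itself contains no dot (so the id is split at the first one); split_plugin_id is a hypothetical helper, not part of streamling_plugin:

```rust
/// Hypothetical helper: splits "basic_plugin.random_source" into
/// (Some("basic_plugin"), "random_source"), and "print_sink" into (None, "print_sink").
/// Assumes the namespace contains no '.', so the first dot is the separator.
fn split_plugin_id(id: &str) -> (Option<&str>, &str) {
    match id.split_once('.') {
        Some((namespace, name)) => (Some(namespace), name),
        None => (None, id),
    }
}
```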
Any number of register_plugin_* macros can be called, e.g. to register multiple transforms.
When registered via the macros, a SourcePlugin constructor (new) receives the async runtime (see Async Runtime Support section),
the state backend factory (see State Backend Support section), the metrics recorder (see Plugin Metrics section), and the
options map as parameters. TransformPlugin and SinkPlugin constructors additionally receive the input schema as their
first parameter.
// Source constructor signature
fn new(
    rt: PluginAsyncRuntimeObj,
    state_backend_factory: PluginStateBackendFactory,
    metrics_recorder: PluginMetricsRecorder,
    options: HashMap<String, String>,
) -> Self
// Transform and Sink constructor signature (adds the input schema as the first parameter)
fn new(
    schema: SchemaRef,
    rt: PluginAsyncRuntimeObj,
    state_backend_factory: PluginStateBackendFactory,
    metrics_recorder: PluginMetricsRecorder,
    options: HashMap<String, String>,
) -> Self
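Since every constructor receives its settings as a HashMap<String, String>, typed options have to be parsed out of strings. A hedged sketch of a hypothetical parse_option helper (not part of streamling_plugin) that a constructor could use, for example to read a max_rows option for RandomSource; the option name is an assumption:

```rust
use std::collections::HashMap;
use std::str::FromStr;

/// Hypothetical helper: reads `key` from a plugin's options map and parses it as `T`.
/// A missing key falls back to `default`; a present but malformed value is an error,
/// so a typo such as max_rows: "1o0" is not silently replaced by the default.
fn parse_option<T: FromStr>(
    options: &HashMap<String, String>,
    key: &str,
    default: T,
) -> Result<T, String> {
    match options.get(key) {
        None => Ok(default),
        Some(raw) => raw
            .trim()
            .parse::<T>()
            .map_err(|_| format!("option `{key}`: cannot parse {raw:?}")),
    }
}
```

How a constructor that returns Self should surface such an error (for instance by storing it and reporting it from initialize()) is left open here.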
Examples
The project includes example plugins that demonstrate different implementation approaches:
Basic Example
A complete plugin implementation using the high-level API:
// From plugin_examples/basic/src/lib.rs
mod sink;
mod source;
mod transform;
use crate::sink::PrintSink;
use crate::source::RandomSource;
use crate::transform::FilterTransform;
use streamling_plugin::{
    init_plugin_with_async_runtime, register_plugin_sink, register_plugin_source,
    register_plugin_transform,
};
// using namespace and name; this would be "basic_plugin.random_source" in the topology
register_plugin_source!("basic_plugin", "random_source", RandomSource);
register_plugin_transform!("basic_plugin", "filter_transform", FilterTransform);
// using name only
register_plugin_sink!("print_sink", PrintSink);
init_plugin_with_async_runtime!();
Source Plugin Example
The RandomSource example shows how to implement a data generator:
// From plugin_examples/basic/src/source.rs
pub struct RandomSource {
    rt: PluginAsyncRuntimeObj,
    schema: SchemaRef,
    options: HashMap<String, String>,
    max_rows: usize,
    row_count: AtomicUsize,
}
#[async_trait]
impl SourcePlugin for RandomSource {
    async fn initialize(&self) -> Result<(), PluginError> {
        Ok(())
    }
    async fn generate_batch(&self) -> Result<RecordBatch, PluginError> {
        self.rt.sleep(RDuration::from_secs(1)).await;
        // Generate random data as a RecordBatch
        // ...
        Ok(batch)
    }
    // Checkpoint handling methods
    // ...
}
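RandomSource keeps max_rows next to an AtomicUsize row_count, presumably because generate_batch takes &self. One way such a counter can cap the total output, sketched under that assumption (reserve_rows is a hypothetical helper, not code from the example crate):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical helper: reserves up to `want` rows from a shared budget of `max_rows`
/// and returns how many rows the caller may generate in this batch (0 once exhausted).
/// fetch_update makes the check-and-add atomic even if batches are generated concurrently;
/// the closure may rerun on contention, and `granted` keeps the value from the successful run.
fn reserve_rows(row_count: &AtomicUsize, max_rows: usize, want: usize) -> usize {
    let mut granted = 0;
    let _ = row_count.fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
        granted = want.min(max_rows.saturating_sub(current));
        Some(current + granted)
    });
    granted
}
```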
Transform Plugin Example
The FilterTransform example demonstrates data transformation:
// From plugin_examples/basic/src/transform.rs
pub struct FilterTransform {
    schema: SchemaRef,
    options: HashMap<String, String>,
}
#[async_trait]
impl TransformPlugin for FilterTransform {
    async fn process_batch(&self, batch: RecordBatch) -> Result<RecordBatch, PluginError> {
        if batch.num_rows() == 0 {
            return Ok(batch);
        }
        // Apply filtering logic based on options
        // ...
        Ok(filtered_batch)
    }
    // Other required methods
    // ...
}
Low-Level Example
For developers who need more control, direct interaction with the message channels is possible:
// From plugin_examples/low_level/src/sink.rs
pub struct PrintSink {
    schema: SchemaRef,
    options: HashMap<String, String>,
    channels: PluginChannels,
}
impl PrintSink {
    pub async fn start(&self, runtime: PluginAsyncRuntimeObj) -> Result<(), DataFusionError> {
        loop {
            match self.channels.input.receiver.recv().map(|m| m.into_enum()) {
                Ok(Ok(PluginMsg::Init)) => {
                    // Handle initialization
                }
                Ok(Ok(PluginMsg::NextBatch { data })) => {
                    let batch: RecordBatch = data.into();
                    // Process the batch
                }
                // Handle other message types
                // ...
            }
        }
    }
}
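Because Rust match expressions must be exhaustive, a real loop has to handle every variant that the elided arms above cover. The overall shape can be sketched with std::sync::mpsc and a simplified, hypothetical Msg enum standing in for PluginMsg. The real message type carries FFI-safe Arrow data, and its exact variants (including whether a shutdown message exists) are not reproduced here.

```rust
use std::sync::mpsc::{channel, Receiver};

// Hypothetical, simplified stand-in for PluginMsg.
enum Msg {
    Init,
    NextBatch { rows: usize },
    CheckpointMarker { epoch: u64 },
    Shutdown,
}

/// Runs a sink-style message loop until Shutdown arrives or every sender is
/// dropped. Returns (rows processed, epochs acknowledged) so the effect is visible.
fn run_sink_loop(rx: Receiver<Msg>) -> (usize, Vec<u64>) {
    let mut rows = 0;
    let mut acked = Vec::new();
    loop {
        // recv() blocks until a message arrives; Err means the channel closed.
        match rx.recv() {
            Ok(Msg::Init) => { /* open connections, validate options, ... */ }
            Ok(Msg::NextBatch { rows: n }) => rows += n,
            // A real sink would flush pending output before acknowledging.
            Ok(Msg::CheckpointMarker { epoch }) => acked.push(epoch),
            Ok(Msg::Shutdown) | Err(_) => break,
        }
    }
    (rows, acked)
}
```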
Building Plugins
To build a plugin (using the high-level API):
- Create a new Rust crate with crate-type = ["cdylib"] in Cargo.toml.
- Implement one or more plugin traits: SourcePlugin, TransformPlugin, SinkPlugin, PreprocessorPlugin, SideOutputPlugin, or ScalarUDFImpl (for UDFs).
- Use registration macros to register plugin components.
- Call init_plugin!() or init_plugin_with_async_runtime!() to finalize.
- Build with cargo build.
The resulting shared library (.so, .dylib, .dll) can be loaded by Streamling via the STREAMLING__PLUGIN__PATH configuration.
Checkpointing Support
Plugins integrate fully with Streamling's checkpointing system, which provides at-least-once delivery (sources commit only after sinks have flushed). Checkpoint messages flow through the same channels as data:
- CheckpointMarker: indicates that a checkpoint should be created.
- CheckpointAck: acknowledges receipt of a checkpoint marker (sent only by sinks).
- CheckpointFinalizer: indicates that a checkpoint has been completed across the whole pipeline.
By properly handling these messages, plugins can participate in Streamling's reliability guarantees.
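The ordering that makes this work can be sketched for a sink: rows received before a CheckpointMarker are written out before the marker is acknowledged, and per the trait documentation above, acknowledging means returning Ok from process_checkpoint_marker. BufferingSink is hypothetical and, for brevity, uses &mut self and in-memory Vecs; a real SinkPlugin receives &self and would keep this state behind a Mutex or similar.

```rust
/// Hypothetical buffering sink illustrating flush-before-ack.
#[derive(Default)]
struct BufferingSink {
    buffer: Vec<String>,        // rows received since the last flush
    written: Vec<String>,       // stand-in for the external system (e.g. a warehouse)
    acked_epochs: Vec<u64>,     // epochs acknowledged back toward the source
    finalized_epochs: Vec<u64>, // epochs confirmed complete across the pipeline
}

impl BufferingSink {
    fn process_batch(&mut self, rows: Vec<String>) {
        // Buffer only; nothing is durable yet.
        self.buffer.extend(rows);
    }

    /// Returning Ok acknowledges the marker. The flush happens first, so an ack
    /// never covers rows that exist only in memory. In this sketch a failed flush
    /// would return Err and withhold the ack for the epoch.
    fn process_checkpoint_marker(&mut self, epoch: u64) -> Result<(), String> {
        self.written.append(&mut self.buffer);
        self.acked_epochs.push(epoch);
        Ok(())
    }

    fn process_checkpoint_finalizer(&mut self, epoch: u64) {
        // The checkpoint completed everywhere; per-epoch bookkeeping can be released.
        self.finalized_epochs.push(epoch);
    }
}
```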
Channel Capacity Tuning
Plugins can declare default channel capacities using macros:
- set_plugin_input_buffer!("<plugin_id>", <capacity>); sets the input channel capacity.
- set_plugin_output_buffer!("<plugin_id>", <capacity>); sets the output channel capacity.
The global default can also be set via the STREAMLING__PLUGIN__CHANNEL_CAPACITY environment variable.
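A channel capacity bounds how many messages can be queued between two operators. The standard library's sync_channel is used below purely as an analogy; whether Streamling's plugin channels are built on it is not stated here. Once cap messages are queued, a blocking send would wait (and try_send reports Full) until the consumer drains one, and that waiting is what propagates backpressure upstream. queued_before_full is a hypothetical demo function.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Hypothetical demo: fills a bounded channel of capacity `cap` without draining
/// it and returns how many messages were accepted before it reported Full.
fn queued_before_full(cap: usize) -> usize {
    let (tx, rx) = sync_channel::<usize>(cap);
    let mut sent = 0;
    loop {
        match tx.try_send(sent) {
            Ok(()) => sent += 1,
            // A blocking send() would wait here instead: that wait is backpressure.
            Err(TrySendError::Full(_)) => break,
            Err(TrySendError::Disconnected(_)) => unreachable!("receiver is still alive"),
        }
    }
    drop(rx); // the receiver is kept alive until here so the channel stays connected
    sent
}
```

A larger capacity absorbs bursts at the cost of more buffered batches in memory; a smaller one makes backpressure kick in sooner.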
Performance Considerations
- Batch Size: Adjust the batch size for optimal performance. Larger batches reduce overhead but increase latency.
- Message Handling: Keep message handlers fast and avoid blocking calls in them; a slow plugin stalls the pipeline, and backpressure propagates the slowdown upstream.
- Data Conversion: Minimize conversions between Arrow and native formats.
- Resource Usage: Be mindful of memory and CPU usage in plugin implementations.
Profiling
Profiling locally
You can profile application runs, as well as unit and integration tests, locally with flamegraph.
Install it first (cargo install flamegraph), then profile the scenario you are interested in, e.g. an integration test:
cargo flamegraph --test pipeline
Profiling in Kubernetes
The project can be configured to be profiled with perf:
- If running in Kubernetes, you may need to update securityContext:
  - capabilities.add: SYS_ADMIN to allow using perf inside the container
  - privileged: true to allow access to kernel symbols (required for call graphs)
- Uncomment the following section in the Cargo.toml file and wait until the build is ready:
  [profile.release]
  debug = true
  Ideally, this should be automated (e.g. with a separate GitHub Action).
- (Recommended) Deploy without memory or ephemeral storage limits.
- Connect to a pod.
- Run echo -1 | tee /proc/sys/kernel/perf_event_paranoid and echo 0 | tee /proc/sys/kernel/kptr_restrict.
- Run perf: perf record -p 1 --call-graph dwarf -F 99 -- sleep 15 (where 1 is the PID of the process). The duration and sampling frequency can be adjusted.
- Run perf script --no-inline > out.perf.
- Copy the out.perf file locally with kubectl cp.
- Check out the https://github.com/brendangregg/FlameGraph repo.
- Run $PATH_TO_REPO/FlameGraph/stackcollapse-perf.pl out.perf > out.folded.
- Run $PATH_TO_REPO/FlameGraph/flamegraph.pl out.folded > out.svg.
- Open out.svg in a browser.
- Enjoy the flamegraphs!
perf binary
Note: the perf binary may need to be compiled from scratch for the version of the Linux kernel used in the Kubernetes cluster.
Instructions for compiling can be found here.
Telemetry and Metrics
Streamling records metrics with the DataFusion metrics system and exports them to a metrics backend through its OpenTelemetry integration.
Configuration
Telemetry is configured through the application configuration:
open_telemetry_metrics:
  ingestion_endpoint: "http://localhost:4318/v1/metrics" # OTLP endpoint
  endpoint_protocol: "http/protobuf" # Protocol: http/json, http/protobuf, or grpc
  batch_interval_secs: 30 # How often to export metrics
  global_tags: "environment:production,image_tag:v1.0.0" # Global tags for all metrics
By default, the service name is set to streamling and service.instance.id is set to the application_id.
Environment variables can also be used:
- OTEL_EXPORTER_OTLP_METRICS_ENDPOINT: overrides the ingestion endpoint
- OTEL_EXPORTER_OTLP_PROTOCOL: overrides the protocol
- STREAMLING__OPEN_TELEMETRY_METRICS__GLOBAL_TAGS: sets the global tags
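The global_tags value packs several tags into one comma-separated string of key:value pairs. A hypothetical parser for that format, useful for sanity-checking a value before deploying it; splitting each entry at its first colon is an assumption about how values containing colons are treated, not documented behavior.

```rust
/// Hypothetical parser for a global_tags string such as
/// "environment:production,image_tag:v1.0.0". Empty entries (e.g. a trailing
/// comma) are skipped; an entry without ':' is reported as an error.
fn parse_global_tags(raw: &str) -> Result<Vec<(String, String)>, String> {
    raw.split(',')
        .map(str::trim)
        .filter(|entry| !entry.is_empty())
        .map(|entry| -> Result<(String, String), String> {
            // Split at the first ':' only (an assumption), so values may contain ':'.
            let (key, value) = entry
                .split_once(':')
                .ok_or_else(|| format!("tag `{entry}` has no ':' separator"))?;
            Ok((key.trim().to_string(), value.trim().to_string()))
        })
        .collect()
}
```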
Supported Components and Metrics
The following table shows all supported components and the metrics they emit:
| Component | Tags/Labels | Metric Name | Metric Type | Description |
|---|---|---|---|---|
| Kafka Source | topic | output_rows | Counter | Total number of rows consumed from Kafka topics |
| Kafka Source | topic | output_rows_rate | Gauge | Current rate of rows being consumed per time window |
| Kafka Source | topic | output_rows_latency | Histogram | Time taken to process Kafka message batches (ms) |
| HTTP Handler Transform | | output_rows | Counter | Total number of rows processed by HTTP handlers |
| HTTP Handler Transform | | output_rows_rate | Gauge | Current rate of rows being processed per time window |
| HTTP Handler Transform | | output_rows_latency | Histogram | Time taken for HTTP request/response cycles (ms) |
| Blackhole Sink | | output_rows | Counter | Total number of rows discarded |
| Blackhole Sink | | output_rows_rate | Gauge | Current rate of rows being discarded per time window |
| Blackhole Sink | | output_rows_latency | Histogram | Time taken to process and discard batches (ms) |
Note
All metrics include the following standard tags for filtering and aggregation:
- id: the key of the node under sources, transforms, or sinks in the pipeline config
- topology_node_type: the topology type ("source", "transform", or "sink")
- operator_type: the specific implementation (e.g., "kafka", "sql", "webhook")
- Component-specific tags (e.g., topic for Kafka sources)
Per-node labels (telemetry.labels)
Every source, transform, and sink accepts an optional telemetry.labels map. Each key/value pair is attached as an additional Prometheus label on every metric the node emits:
sources:
  raw_events:
    type: kafka
    topic: app.events
    telemetry:
      labels:
        tier: critical
        dataset: app.events
        team: platform
Query: streamling_output_rows_total{dataset="app.events",tier="critical"}.
Constraints (validated at pipeline load):
- Up to 20 labels per node.
- Keys must match the Prometheus label-name grammar (^[a-zA-Z_][a-zA-Z0-9_]*$) and cannot start with __.
- Keys cannot shadow built-in tags (id, topology_node_type, operator_type, service_instance_id) or the per-type identity tag for the node's kind (topic on Kafka, table on ClickHouse/Postgres, url on webhooks, type on plugins, language on script transforms; hybrid sources reserve both table and topic).
- Values are limited to 256 bytes; control characters (newline, null, etc.) are rejected to protect Prometheus text-format output. Tab is allowed.
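The constraints above translate directly into a validation routine. A sketch of those rules, not Streamling's actual validator: validate_labels and is_label_name are hypothetical names, and the per-kind identity tags are passed in by the caller rather than looked up.

```rust
use std::collections::HashMap;

const MAX_LABELS: usize = 20;
const MAX_VALUE_BYTES: usize = 256;
// Built-in tags that a YAML label may never shadow.
const BUILT_IN_TAGS: [&str; 4] = ["id", "topology_node_type", "operator_type", "service_instance_id"];

/// Equivalent to the regex ^[a-zA-Z_][a-zA-Z0-9_]*$ without a regex dependency.
fn is_label_name(key: &str) -> bool {
    let mut chars = key.chars();
    match chars.next() {
        Some(c) if c.is_ascii_alphabetic() || c == '_' => {}
        _ => return false,
    }
    chars.all(|c| c.is_ascii_alphanumeric() || c == '_')
}

/// Hypothetical check for one node's telemetry.labels. `identity_tags` holds the
/// per-kind reserved keys, e.g. &["topic"] for Kafka or &["table", "topic"] for hybrid sources.
fn validate_labels(labels: &HashMap<String, String>, identity_tags: &[&str]) -> Result<(), String> {
    if labels.len() > MAX_LABELS {
        return Err(format!("{} labels given, at most {} allowed", labels.len(), MAX_LABELS));
    }
    for (key, value) in labels {
        if !is_label_name(key) || key.starts_with("__") {
            return Err(format!("invalid label key `{key}`"));
        }
        if BUILT_IN_TAGS.contains(&key.as_str()) || identity_tags.contains(&key.as_str()) {
            return Err(format!("label key `{key}` shadows a reserved tag"));
        }
        if value.len() > MAX_VALUE_BYTES {
            return Err(format!("value of `{}` exceeds {} bytes", key, MAX_VALUE_BYTES));
        }
        // Tab is the one control character allowed.
        if value.chars().any(|c| c.is_control() && c != '\t') {
            return Err(format!("value of `{key}` contains a control character"));
        }
    }
    Ok(())
}
```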
Precedence with plugin-declared labels: plugins can return their own identity labels via PluginResult::labels. When a plugin-declared key collides with a YAML-declared key on the same node, the plugin value wins and a WARN log names the colliding key and both values (plugins are authoritative about their own identity, but the WARN surfaces misconfigurations rather than hiding them).
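The precedence rule amounts to a map merge in which plugin entries overwrite YAML entries, with every collision reported. A minimal sketch under that reading; merge_labels is hypothetical, and it collects warning strings where the runtime would emit a WARN log.

```rust
use std::collections::BTreeMap;

/// Hypothetical merge of YAML-declared and plugin-declared labels for one node.
/// On a key collision the plugin value wins, and a warning naming the key and
/// both values is recorded (the runtime would log it at WARN instead).
fn merge_labels(
    yaml: &BTreeMap<String, String>,
    plugin: &BTreeMap<String, String>,
) -> (BTreeMap<String, String>, Vec<String>) {
    let mut merged = yaml.clone();
    let mut warnings = Vec::new();
    for (key, plugin_value) in plugin {
        // insert() returns the previous value, i.e. the YAML value on a collision.
        if let Some(yaml_value) = merged.insert(key.clone(), plugin_value.clone()) {
            warnings.push(format!(
                "label `{key}`: plugin value {plugin_value:?} overrides YAML value {yaml_value:?}"
            ));
        }
    }
    (merged, warnings)
}
```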
Hybrid sources: telemetry.labels on a hybrid source automatically propagate to both the bounded and unbounded phase-child metric series.
Metric Temporality
Streamling exports two variants of the output_rows metric with different temporality semantics:
- output_rows_total (Cumulative): reports the cumulative total of rows since process start. Use it for rate calculations in Prometheus/Grafana with functions like rate() and increase().
- output_rows_delta (Delta): reports the rows added since the last export interval. Use it for billing and aggregation in time-series databases like ClickHouse, where you can simply SUM() the values without worrying about counter resets or lag compensation.
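The relationship between the two series can be illustrated in code. A sketch, assuming one cumulative reading per export interval and treating a drop in the cumulative value as a counter reset (for example, a process restart); cumulative_to_deltas is hypothetical, not Streamling code.

```rust
/// Hypothetical conversion of successive cumulative readings (output_rows_total style)
/// into per-interval deltas (output_rows_delta style). A reading lower than the
/// previous one is treated as a reset, so its delta is the new reading itself.
fn cumulative_to_deltas(readings: &[u64]) -> Vec<u64> {
    let mut prev = 0;
    readings
        .iter()
        .map(|&cur| {
            let delta = if cur >= prev { cur - prev } else { cur };
            prev = cur;
            delta
        })
        .collect()
}
```

For the readings 10, 25, 5, 12 (a restart after 25 rows), the deltas are 10, 15, 5, 7, which sum to the 37 rows actually produced. That is why the delta series can simply be SUM()'d, while the cumulative series needs reset-aware functions such as increase().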
Adding Telemetry to Custom Operators
For Execution Plans (Sources and Transforms)
Wrap your execution plan with TelemetryExec. See kafka.rs, external_handlers.rs for inspiration.
For Sinks
Wrap your sink with TelemetryDataSink. See blackhole.rs for inspiration.
Troubleshooting
If metrics are not appearing:
- Check Configuration: Ensure the OTLP endpoint is reachable and configured correctly
- Verify Protocol: Make sure the endpoint supports the configured protocol
- Review Logs: Look for telemetry initialization errors in the application logs
- Test Connectivity: Verify network connectivity to the metrics endpoint
The telemetry system will log warnings if initialization fails, but the application will continue running without metrics export.