I/O Guide

CSV, Parquet, SQLite, and Kafka I/O

Ibex keeps file I/O in plugins rather than in core syntax. CSV and Parquet read/write functions, SQLite reads through ADBC, and Kafka streams through the Kafka plugin are all loaded with import and then called like any other function in scripts, the REPL, notebooks, and transpiled binaries.

What ships today

CSV plugin

Read RFC 4180 CSV into a DataFrame with type inference, custom null tokens, custom delimiters, headerless-file support, and optional schema hints when you already know the column types.

Parquet plugin

Read and write Apache Parquet for typed batch storage. This is the right default when you want a compact columnar format, better type preservation, and faster reloads than CSV.

ADBC plugin

Read from SQLite and other ADBC-backed systems as Arrow streams. This is the bridge from database result sets into Ibex chunked execution without writing a bespoke reader per backend.

Kafka plugin

Consume JSON messages from Kafka-compatible brokers such as Redpanda, turn them into one-row typed tables, and plug them directly into Stream { ... } pipelines or WebSocket dashboards.

Import bundled I/O plugins

The common pattern is to import the bundled plugin modules you need and then call their functions directly.

import "csv";
import "parquet";
import "adbc";
import "kafka";

let trades = read_csv("data/trades.csv");
let prices = read_parquet("data/prices.parquet");
let db_prices = read_adbc("/path/to/libadbc_driver_sqlite.so",
                               "data/prices.sqlite",
                               "select * from prices");
let tick = kafka_recv("localhost:19092", "ticks",
                          "ibex-demo",
                          "ts:timestamp,symbol:str,price:f64,size:i64");

Read CSV with as much structure as you know

Default CSV read

With just a path, Ibex reads the header row, infers Int64, Float64, or string/categorical columns, and returns a DataFrame.

import "csv";

let trades = read_csv("data/trades.csv");

Null tokens

Pass a comma-separated null specification. <empty> means empty fields should be treated as nulls too.

import "csv";

let train = read_csv("train.csv", "<empty>,NA");

Custom delimiter

This is useful for semicolon-separated files or other non-comma dialects. Quoted commas are still preserved correctly.

import "csv";

let raw = read_csv("quotes.txt", "", ";");

Headerless files

When has_header is false, Ibex synthesizes column names col1, col2, and so on. This is the shape the 1BRC benchmark measurements file uses.

import "csv";

let raw = read_csv("measurements.txt", "", ";", false)
    [select { station = col1, temp = col2 }];

Schema hints

If you already know the CSV schema, pass it explicitly to skip most of the inference work. For large repeated loads, this is the right shape.

import "csv";

// positional schema
let fast = read_csv("measurements.txt", "", ";", false, "cat,f64");

// named schema
let typed = read_csv("trades.csv", "", ",", true,
                          "symbol:cat,price:f64,size:i64");

Run a visible live-stream demo with Kafka input and WebSocket output

What the demo does

The bundled Kafka demo uses Redpanda as a local Kafka-compatible broker. A synthetic tick producer writes JSON ticks into the ticks topic, and two Ibex stream jobs consume that same topic:

  • one job groups by symbol and venue and sends live summaries to ws://127.0.0.1:8765
  • one job resamples ticks into 5-second OHLC bars and sends them to ws://127.0.0.1:8766

The dashboard at demo/kafka/ws_dashboard.html opens both WebSocket feeds and shows them as two tabs.

Start the demo

# shell
scripts/demo-kafka.sh
scripts/run-kafka-dashboard.sh

# optional terminal viewers
python3 demo/kafka/ws_client.py
python3 demo/kafka/ohlc_ws_client.py

The Docker stack starts Redpanda plus the random tick producer. The launcher script starts two Ibex REPL processes in the background and keeps them running until you stop it.

The Ibex side

import "kafka";
import "websocket";

ws_listen(8765);

let summary = Stream {
    source = kafka_recv(
        "localhost:19092",
        "ticks",
        "ibex-demo-summary",
        "ts:timestamp,symbol:str,venue:str,price:f64,size:i64",
        "poll_timeout_ms=100;consumer.auto.offset.reset=latest;consumer.session.timeout.ms=6000"
    ),
    transform = [by { symbol, venue }, select {
        trades = count(),
        last_px = last(price),
        total_size = sum(size)
    }],
    sink = ws_send(8765)
};

The OHLC job in examples/kafka_ohlc.ibex uses the same source but switches the transform to [resample 5s, by { symbol }, select { ... }] and writes to ws://127.0.0.1:8766.
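A minimal sketch of that OHLC job, stitched together from pieces shown elsewhere in this guide. The group id ibex-demo-ohlc and the first() aggregate are assumptions; only last, min, max, mean, sum, and count appear elsewhere in this guide, so check examples/kafka_ohlc.ibex for the exact spelling.

```
import "kafka";
import "websocket";

ws_listen(8766);

let bars = Stream {
    source = kafka_recv(
        "localhost:19092",
        "ticks",
        "ibex-demo-ohlc",
        "ts:timestamp,symbol:str,venue:str,price:f64,size:i64",
        "poll_timeout_ms=100;consumer.auto.offset.reset=latest"
    ),
    // resample into 5-second bars, one row per symbol per bar
    transform = [resample 5s, by { symbol }, select {
        open = first(price),   // first() is assumed, not shown elsewhere in this guide
        high = max(price),
        low = min(price),
        close = last(price),
        volume = sum(size)
    }],
    sink = ws_send(8766)
};
```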

Kafka-specific notes

  • The schema string is explicit and required: use name:type entries like "ts:timestamp,symbol:str,venue:str,price:f64,size:i64".
  • For live dashboards, prefer consumer.auto.offset.reset=latest so the stream starts at current traffic instead of replaying old messages.
  • poll_timeout_ms is only the wait between polls. It is not a lifetime timeout for the stream.
  • For per-message live streams, str is a better demo key type than cat; it avoids per-message categorical dictionary churn.

Write CSV back out

write_csv writes a header row plus all data rows and returns the number of data rows written. Nulls become empty fields and strings are quoted according to RFC 4180.

import "csv";

let summary = trades[
    select { avg_px = mean(price), rows = count() },
    by symbol,
    order symbol
];

let n = write_csv(summary, "out/summary.csv");
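To sanity-check the file just written, one option is to read it back with <empty> as the null token, so the empty fields that write_csv emits for nulls round-trip back to nulls:

```
import "csv";

// re-read the file written above; "<empty>" maps empty fields back to nulls
let back = read_csv("out/summary.csv", "<empty>");
```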

Use Parquet for typed, columnar storage

Read Parquet

Parquet is the cleaner choice when you control the data pipeline and want strong type preservation, compact files, and faster reloads than CSV.

import "parquet";

let prices = read_parquet("data/prices.parquet");

Write Parquet

write_parquet returns the number of rows written. String and categorical columns are stored as UTF-8. Dates and timestamps are preserved as typed Parquet columns.

import "parquet";

let n = write_parquet(prices, "out/prices.parquet");
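As a sketch of the round trip: reading the file back preserves the column types directly, so unlike a CSV reload there is no inference step and no schema hint to pass.

```
import "parquet";

// reload the file written above; column types (timestamps, categoricals,
// floats) come back as written, so no schema hints are needed
let back = read_parquet("out/prices.parquet");
```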

Read database tables and query results through the chunked path

Minimal SQLite query

read_adbc takes a driver, a database URI or path, and a SQL query. Passing the SQLite driver shared library path is the most portable shape when you are using a conda environment.

When using import "adbc" in the REPL, start Ibex with --plugin-path ./build-release/tools so it can find both adbc.ibex and adbc.so.

import "adbc";

let trades = read_adbc(
    "/path/to/libadbc_driver_sqlite.so",
    "/tmp/trades.sqlite",
    "select symbol, qty, px from trades order by qty desc"
);

1M-row measurements showcase

The helper script below loads the first 1,000,000 rows of the 1BRC benchmark file into SQLite. After that, the Ibex query shape is the same aggregation you would run directly over CSV.

# shell
python3 scripts/import_measurements_sqlite.py \
    --input examples/measurements.txt \
    --limit 1000000 \
    --output /tmp/measurements_1m.sqlite

LD_LIBRARY_PATH=~/envs/ibex/lib:$LD_LIBRARY_PATH \
./build-release/tools/ibex --plugin-path ./build-release/tools

# ibex
import "adbc";

let measurements = read_adbc(
    "~/envs/ibex/lib/libadbc_driver_sqlite.so",
    "/tmp/measurements_1m.sqlite",
    "select station, temp from measurements"
);

measurements[select {
    min_temp = min(temp),
    avg_temp = mean(temp),
    max_temp = max(temp)
}, by station, order station];

Optional ADBC options string

The fourth argument is an optional key=value string separated by semicolons or newlines. Prefix keys with db., conn., or stmt. to target database, connection, or statement options.

let remote = read_adbc(
    "adbc_driver_postgresql",
    "postgresql://user:pass@host/db",
    "select * from trades where qty > 0",
    "conn.autocommit=true;stmt.query_timeout_ms=5000"
);

Typical batch pipeline

import "csv";
import "parquet";

let raw = read_csv("measurements.txt", "", ";", false, "cat,f64")
    [select { station = col1, temp = col2 }];

let summary = raw[select {
    min_temp = min(temp),
    avg_temp = mean(temp),
    max_temp = max(temp)
}, by station, order station];

write_parquet(summary, "out/summary.parquet");

When to choose which format

Choose CSV when

  • You are ingesting raw files from outside systems.
  • You need human-readable interchange.
  • You need delimiter or headerless-file flexibility.

Choose Parquet when

  • You control both ends of the pipeline.
  • You want better typed persistence and faster reloads.
  • You are caching a cleaned dataset for repeated analysis.

Choose SQLite via ADBC when

  • You want to stage a database table or query result into Ibex.
  • You want SQL pushdown for filtering, joins, or pre-aggregation.
  • You want to bridge into other ADBC-backed systems later.
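The pushdown point above can be sketched by moving the aggregation into the SQL string, so SQLite computes the group-by and only summary rows cross into Ibex. The driver and database paths reuse the showcase above and are placeholders:

```
import "adbc";

// SQLite does the per-station aggregation; Ibex receives only summary rows
let pre_agg = read_adbc(
    "~/envs/ibex/lib/libadbc_driver_sqlite.so",
    "/tmp/measurements_1m.sqlite",
    "select station, min(temp) as min_temp, avg(temp) as avg_temp, max(temp) as max_temp from measurements group by station order by station"
);
```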

Choose Kafka when

  • You want live event ingestion instead of batch file loads.
  • You want to feed Stream { ... } pipelines from a broker rather than UDP or WebSocket input.
  • You want a local Redpanda-backed demo that ends in a browser dashboard.