I/O Guide
Ibex keeps file I/O in plugins, not in core syntax. That means CSV and
Parquet read/write functions, plus SQLite reads through ADBC and Kafka
streams through the Kafka plugin, are loaded with an import statement
and then called like any other function in
scripts, the REPL, notebooks, and transpiled binaries.
Overview
Read RFC 4180 CSV into a DataFrame, infer types, accept
custom null tokens, custom delimiters, headerless files, and optional
schema hints when you already know the column types.
Read and write Apache Parquet for typed batch storage. This is the right default when you want a compact columnar format, better type preservation, and faster reloads than CSV.
Read from SQLite and other ADBC-backed systems as Arrow streams. This is the bridge from database result sets into Ibex chunked execution without writing a bespoke reader per backend.
Consume JSON messages from Kafka-compatible brokers such as Redpanda,
turn them into one-row typed tables, and plug them directly into
Stream { ... } pipelines or WebSocket dashboards.
Import pattern
The common pattern is to import the bundled plugin modules you need and then call their functions directly.
import "csv";
import "parquet";
import "adbc";
import "kafka";
let trades = read_csv("data/trades.csv");
let prices = read_parquet("data/prices.parquet");
let db_prices = read_adbc("/path/to/libadbc_driver_sqlite.so",
"data/prices.sqlite",
"select * from prices");
let tick = kafka_recv("localhost:19092", "ticks",
"ibex-demo",
"ts:timestamp,symbol:str,price:f64,size:i64");
CSV Input
With just a path, Ibex reads the header row, infers
Int64, Float64, or string/categorical
columns, and returns a DataFrame.
import "csv";
let trades = read_csv("data/trades.csv");
Pass a comma-separated list of null tokens as the second argument. The
special token <empty> means empty fields should be treated as nulls too.
import "csv";
let train = read_csv("train.csv", "<empty>,NA");
Pass the delimiter as the third argument. This is useful for semicolon-separated files or other non-comma dialects. Quoted commas are still preserved correctly.
import "csv";
let raw = read_csv("quotes.txt", "", ";");
When the fourth argument, has_header, is false, Ibex synthesizes
column names col1, col2, and so on. This is what the
1BRC benchmark file uses.
import "csv";
let raw = read_csv("measurements.txt", "", ";", false)
[select { station = col1, temp = col2 }];
If you already know the CSV schema, pass it explicitly to skip most of the inference work. For large repeated loads, this is the right shape.
import "csv";
// positional schema
let fast = read_csv("measurements.txt", "", ";", false, "cat,f64");
// named schema
let typed = read_csv("trades.csv", "", ",", true,
"symbol:cat,price:f64,size:i64");
Kafka via Redpanda
The bundled Kafka demo uses Redpanda as a local Kafka-compatible
broker. A synthetic tick producer writes JSON ticks into the
ticks topic, and two Ibex stream jobs consume that same
topic:
One job groups by symbol and venue and sends live summaries to ws://127.0.0.1:8765; the other computes OHLC bars and writes to ws://127.0.0.1:8766.
The dashboard at demo/kafka/ws_dashboard.html opens both
WebSocket feeds and shows them as two tabs.
# shell
scripts/demo-kafka.sh
scripts/run-kafka-dashboard.sh
# optional terminal viewers
python3 demo/kafka/ws_client.py
python3 demo/kafka/ohlc_ws_client.py
The Docker stack starts Redpanda plus the random tick producer. The launcher script starts two Ibex REPL processes in the background and keeps them running until you stop it.
import "kafka";
import "websocket";
ws_listen(8765);
let summary = Stream {
source = kafka_recv(
"localhost:19092",
"ticks",
"ibex-demo-summary",
"ts:timestamp,symbol:str,venue:str,price:f64,size:i64",
"poll_timeout_ms=100;consumer.auto.offset.reset=latest;consumer.session.timeout.ms=6000"
),
transform = [by { symbol, venue }, select {
trades = count(),
last_px = last(price),
total_size = sum(size)
}],
sink = ws_send(8765)
};
The OHLC job in examples/kafka_ohlc.ibex uses the same
source but switches the transform to
[resample 5s, by { symbol }, select { ... }] and writes
to ws://127.0.0.1:8766.
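As a hedged sketch of what that OHLC transform might look like (not the actual contents of examples/kafka_ohlc.ibex): the first() aggregate, the field names, and the group id "ibex-demo-ohlc" are assumptions; last(), min(), max(), and sum() all appear elsewhere in this guide.
import "kafka";
import "websocket";
ws_listen(8766);
let ohlc = Stream {
    source = kafka_recv(
        "localhost:19092",
        "ticks",
        "ibex-demo-ohlc",
        "ts:timestamp,symbol:str,venue:str,price:f64,size:i64",
        "poll_timeout_ms=100;consumer.auto.offset.reset=latest"
    ),
    // hypothetical 5-second OHLC bars per symbol
    transform = [resample 5s, by { symbol }, select {
        open = first(price),
        high = max(price),
        low = min(price),
        close = last(price),
        volume = sum(size)
    }],
    sink = ws_send(8766)
};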
The schema argument is a comma-separated list of name:type entries like
"ts:timestamp,symbol:str,venue:str,price:f64,size:i64".
Set consumer.auto.offset.reset=latest so the stream starts at current
traffic instead of replaying old messages.
poll_timeout_ms is only the wait between polls; it is not a lifetime
timeout for the stream.
str is a better demo key type than cat, because it avoids per-message
categorical dictionary churn.
CSV Output
write_csv writes a header row plus all data rows and
returns the number of data rows written. Nulls become empty fields and
strings are quoted according to RFC 4180.
import "csv";
let summary = trades[
select { avg_px = mean(price), rows = count() },
by symbol,
order symbol
];
let n = write_csv(summary, "out/summary.csv");
Parquet
Parquet is the cleaner choice when you control the data pipeline and want strong type preservation, compact files, and faster reloads than CSV.
import "parquet";
let prices = read_parquet("data/prices.parquet");
write_parquet returns the number of rows written. String
and categorical columns are stored as UTF-8. Dates and timestamps are
preserved as typed Parquet columns.
import "parquet";
let n = write_parquet(prices, "out/prices.parquet");
SQLite via ADBC
read_adbc takes a driver, a database URI or path, and a
SQL query. Passing the SQLite driver shared library path is the most
portable shape when you are using a conda environment.
When using import "adbc" in the REPL, start Ibex with
--plugin-path ./build-release/tools so it can find both
adbc.ibex and adbc.so.
import "adbc";
let trades = read_adbc(
"/path/to/libadbc_driver_sqlite.so",
"/tmp/trades.sqlite",
"select symbol, qty, px from trades order by qty desc"
);
The helper script below loads the first 1,000,000 rows of the 1BRC benchmark file into SQLite. After that, the Ibex query shape is the same aggregation you would run directly over CSV.
# shell
python3 scripts/import_measurements_sqlite.py \
--input examples/measurements.txt \
--limit 1000000 \
--output /tmp/measurements_1m.sqlite
LD_LIBRARY_PATH=~/envs/ibex/lib:$LD_LIBRARY_PATH \
./build-release/tools/ibex --plugin-path ./build-release/tools
# ibex
import "adbc";
let measurements = read_adbc(
"~/envs/ibex/lib/libadbc_driver_sqlite.so",
"/tmp/measurements_1m.sqlite",
"select station, temp from measurements"
);
measurements[select {
min_temp = min(temp),
avg_temp = mean(temp),
max_temp = max(temp)
}, by station, order station];
The fourth argument to read_adbc is an optional key=value string
separated by semicolons or newlines. Prefix keys with db.,
conn., or stmt. to target database,
connection, or statement options.
let remote = read_adbc(
"adbc_driver_postgresql",
"postgresql://user:pass@host/db",
"select * from trades where qty > 0",
"conn.autocommit=true;stmt.query_timeout_ms=5000"
);
Round Trip
import "csv";
import "parquet";
let raw = read_csv("measurements.txt", "", ";", false, "cat,f64")
[select { station = col1, temp = col2 }];
let summary = raw[select {
min_temp = min(temp),
avg_temp = mean(temp),
max_temp = max(temp)
}, by station, order station];
write_parquet(summary, "out/summary.parquet");
Practical Notes
Choose the Kafka plugin when you want to feed Stream { ... } pipelines from a broker rather than UDP or WebSocket input.
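The Kafka source's options string passes consumer.* keys through to the underlying consumer. As a hedged sketch, setting the standard Kafka option consumer.auto.offset.reset=earliest replays a topic from its earliest retained message instead of tailing live traffic; the group id "ibex-replay" here is hypothetical.
import "kafka";
// A fresh group id ensures no previously committed offsets
// override the earliest setting.
let tick = kafka_recv("localhost:19092", "ticks",
                      "ibex-replay",
                      "ts:timestamp,symbol:str,price:f64,size:i64",
                      "poll_timeout_ms=100;consumer.auto.offset.reset=earliest");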