Welcome to proc
The Problem
JavaScript streams are push-based. Producers push data whether consumers are ready or not, which forces you to manage backpressure: complex coordination between producers and consumers to prevent memory exhaustion. And when something goes wrong? You need error handlers on every stream in the chain.
// Traditional streams: backpressure + error handling at every step
stream1.on("error", handleError);
stream2.on("error", handleError);
stream3.on("error", handleError);
// Plus drain events, pause/resume, pipe coordination...
The Solution
proc uses async iterators instead of streams. Consumers pull data when ready. No backpressure. No coordination. And errors flow through pipelines naturally—one try-catch handles everything.
// proc: no backpressure, errors just work
try {
await run("cat", "data.txt")
.run("grep", "error")
.run("wc", "-l")
.lines
.forEach(console.log);
} catch (error) {
// All errors caught here
}
Who This Book Is For
This documentation is for developers who:
- Run child processes and want better error handling than Deno.Command provides
- Process streaming data (logs, CSV files, API responses) without loading everything into memory
- Want Array-like methods (map, filter, reduce) for async data
- Are replacing shell scripts with type-safe, testable code
You should be comfortable with TypeScript basics and async/await. No prior experience with Deno streams or child processes required.
What You’ll Learn
Running Processes — Execute commands, chain them like shell pipes, capture output, handle errors gracefully.
Async Iterables — Use map, filter, reduce, and more on any async data
source. Process gigabyte files with constant memory.
Bridge Push and Pull — Convert callbacks, events, and WebSockets into async iterables with WritableIterable. Automatic backpressure, natural error propagation.
Data Transforms — Convert between CSV, TSV, JSON, and Record formats with streaming support. Or use the WASM-powered flatdata CLI for maximum throughput.
A Taste of proc
Count lines in a compressed file—streaming, constant memory:
import { read } from "jsr:@j50n/proc@0.24.6";
const count = await read("logs.txt.gz")
.transform(new DecompressionStream("gzip"))
.lines
.count();
Chain processes like shell pipes:
import { run } from "jsr:@j50n/proc@0.24.6";
const errors = await run("cat", "app.log")
.run("grep", "ERROR")
.run("wc", "-l")
.lines.first;
Transform async data with familiar methods:
import { enumerate } from "jsr:@j50n/proc@0.24.6";
const results = await enumerate(urls)
.concurrentMap(fetch, { concurrency: 5 })
.filter((r) => r.ok)
.map((r) => r.json())
.collect();
Bridge event-driven code to async iteration:
import { WritableIterable } from "jsr:@j50n/proc@0.24.6";
const messages = new WritableIterable<string>();
ws.onmessage = async (e) => await messages.write(e.data);
ws.onclose = () => messages.close();
for await (const msg of messages) {
console.log("Received:", msg);
}
Quick Decision Guide
Need to run shell commands? → Running Processes
Processing files line by line? → File I/O
Converting CSV/TSV/JSON? → Data Transforms
Have callback/event-based data? → WritableIterable
Need maximum throughput? → flatdata CLI
Working with any async data? → Understanding Enumerable
Getting Started
- Installation — Add proc to your project
- Quick Start — Your first proc script in 5 minutes
- Key Concepts — Essential patterns to understand
Version: 0.24.6 | License: MIT | Status: Production-ready
Installation
Getting started with proc is simple—it’s just a Deno import away.
Import from JSR
Add proc to your Deno project:
import * as proc from "jsr:@j50n/proc@0.24.6";
Or import just what you need:
import { enumerate, read, run } from "jsr:@j50n/proc@0.24.6";
That’s it! No installation step, no package.json, no node_modules.
Data Transforms (Optional)
The data transforms module is separate from the core library to keep the main import lightweight. Import it when you need CSV, TSV, JSON, or Record format conversions:
// Core library - process management and async iterables
import { enumerate, read, run } from "jsr:@j50n/proc@0.24.6";
// Data transforms - CSV, TSV, JSON, Record conversions
import { fromCsvToRows, toTsv } from "jsr:@j50n/proc@0.24.6/transforms";
Why separate?
- Smaller bundles: Users who only need process management don’t import transform code
- Clear separation: Core library vs specialized data format conversions
- Optional feature: Transforms are powerful but not required for basic usage
See the Data Transforms guide for details.
Permissions
proc needs permissions to run child processes and access files. When you run your script, Deno will prompt you, or you can grant them upfront:
deno run --allow-run --allow-read your-script.ts
Common permissions:
- --allow-run - Required to run child processes
- --allow-read - Needed to read files
- --allow-write - Needed to write files
- --allow-env - If your processes need environment variables
You can be more specific:
# Only allow running specific commands
deno run --allow-run=ls,grep,wc your-script.ts
# Only allow reading specific directories
deno run --allow-read=/var/log your-script.ts
Version Pinning
For production, pin to a specific version:
import { run } from "jsr:@j50n/proc@0.24.6";
For development, use the latest:
import { run } from "jsr:@j50n/proc";
Next Steps
Ready to write your first proc script? Head to the Quick Start guide.
Quick Start
Let’s get you running code in 5 minutes.
Your First Process
Create a file called hello.ts:
import { run } from "jsr:@j50n/proc@0.24.6";
// Run a command and capture output
const lines = await run("echo", "Hello, proc!").lines.collect();
console.log(lines); // ["Hello, proc!"]
Run it:
deno run --allow-run hello.ts
What just happened?
- run() started the echo command
- .lines converted the output to text lines
- .collect() gathered all lines into an array
Chaining Processes
Let’s chain commands together, like shell pipes:
import { run } from "jsr:@j50n/proc@0.24.6";
const result = await run("echo", "HELLO WORLD")
.run("tr", "A-Z", "a-z") // Convert to lowercase
.lines.first;
console.log(result); // "hello world"
Each .run() pipes the previous output to the next command’s input.
Working with Files
Process a file line by line:
import { read } from "jsr:@j50n/proc@0.24.6";
const errorCount = await read("app.log")
.lines
.filter((line) => line.includes("ERROR"))
.count();
console.log(`Found ${errorCount} errors`);
Handling Errors
Errors propagate naturally—catch them once at the end:
import { run } from "jsr:@j50n/proc@0.24.6";
try {
await run("false") // This command exits with code 1
.lines
.collect();
} catch (error) {
console.error(`Command failed: ${error.code}`);
}
No need to check errors at each step. They flow through the pipeline and you catch them once. For details, see Error Handling.
Using Array Methods
Work with async data using familiar Array methods:
import { enumerate } from "jsr:@j50n/proc@0.24.6";
const data = ["apple", "banana", "cherry"];
const numbered = await enumerate(data)
.enum() // Add indices
.map(([fruit, i]) => `${i + 1}. ${fruit}`)
.collect();
console.log(numbered);
// ["1. apple", "2. banana", "3. cherry"]
A Real Example
Let’s find the 5 most recent commits that mention “fix”:
import { run } from "jsr:@j50n/proc@0.24.6";
const commits = await run("git", "log", "--oneline")
.lines
.filter((line) => line.includes("fix"))
.take(5)
.collect();
commits.forEach((commit) => console.log(commit));
This chains multiple operations, all streaming, using minimal memory. For more complex examples, see Recipes.
Data Transforms (Optional)
Need to work with CSV, TSV, JSON, or Record formats? Import the transforms module:
import { read } from "jsr:@j50n/proc@0.24.6";
import { fromCsvToRows, toTsv } from "jsr:@j50n/proc@0.24.6/transforms";
// Convert CSV to TSV
await read("data.csv")
.transform(fromCsvToRows())
.transform(toTsv())
.writeTo("data.tsv");
The transforms module is separate from the core library to keep your bundle lightweight. See Data Transforms for details.
See Also
- Key Concepts — Properties vs methods, resource management
- Running Processes — All the ways to run commands
- Error Handling — How errors propagate through pipelines
- Recipes — Copy-paste solutions for common tasks
Tutorial: Building a Git Analyzer
In this tutorial, you’ll build a command-line tool that analyzes a git repository’s commit history. Along the way, you’ll learn proc’s core features by using them to solve a real problem.
By the end, you’ll have a working script that:
- Lists recent commits
- Finds bug fixes
- Counts commits by author
- Identifies the most active day of the week
- Handles errors gracefully
The final script is well under a hundred lines of code.
Prerequisites
- Deno installed
- A git repository to analyze (this tutorial works on any repo)
- Basic TypeScript knowledge
What We’re Building
Here’s what the finished analyzer outputs:
=== Git Repository Analysis ===
Recent commits: 25
Bug fixes found: 7
Top contributors:
alice: 12 commits
bob: 8 commits
carol: 5 commits
Most active day: Tuesday (8 commits)
Let’s build it step by step.
Step 1: Getting Commits
Create a file called git-analyzer.ts. We’ll start by fetching the last 100
commits:
import { run } from "jsr:@j50n/proc@0.24.6";
// Get the last 100 commits (one line each)
const commits = await run("git", "log", "--oneline", "-100")
.lines
.collect();
console.log(`Found ${commits.length} commits`);
console.log("First commit:", commits[0]);
Run it:
deno run --allow-run git-analyzer.ts
What’s happening:
- run() executes the git command
- .lines converts the byte output to text lines
- .collect() gathers all lines into an array
Try changing -100 to -10 to see fewer commits.
Step 2: Filtering for Bug Fixes
Now let’s find commits that mention “fix” — likely bug fixes:
import { run } from "jsr:@j50n/proc@0.24.6";
const fixes = await run("git", "log", "--oneline", "-100")
.lines
.filter((line) => line.toLowerCase().includes("fix"))
.collect();
console.log(`Found ${fixes.length} bug fixes:`);
for (const fix of fixes.slice(0, 5)) {
console.log(` ${fix}`);
}
What’s happening:
- .filter() keeps only lines containing “fix”
- We use .slice(0, 5) to show just the first 5 matches
The filtering happens as data streams through — we never load all 100 commits into memory just to filter them.
Step 3: Counting Commits by Author
Let’s see who’s contributing the most. We’ll parse the commit log to extract author names:
import { run } from "jsr:@j50n/proc@0.24.6";
// Get author of each commit
const authorCounts = await run("git", "log", "--format=%an", "-100")
.lines
.filter((name) => name.trim() !== "")
.reduce((counts, name) => {
counts[name] = (counts[name] || 0) + 1;
return counts;
}, {} as Record<string, number>);
// Sort by count and take top 5
const topAuthors = Object.entries(authorCounts)
.sort((a, b) => b[1] - a[1])
.slice(0, 5);
console.log("Top contributors:");
for (const [name, count] of topAuthors) {
console.log(` ${name}: ${count} commits`);
}
What’s happening:
- --format=%an outputs just the author name for each commit
- .reduce() accumulates counts into an object
- We sort and slice to get the top 5
Step 4: Finding the Most Active Day
Which day of the week sees the most commits? We’ll parse commit dates and count them:
import { run } from "jsr:@j50n/proc@0.24.6";
const days = [
"Sunday",
"Monday",
"Tuesday",
"Wednesday",
"Thursday",
"Friday",
"Saturday",
];
// Get commit dates
const dayCounts = await run(
"git",
"log",
"--format=%ad",
"--date=format:%u",
"-100",
)
.lines
.filter((line) => line.trim() !== "")
.reduce((counts, dayNum) => {
const day = days[parseInt(dayNum) % 7];
counts[day] = (counts[day] || 0) + 1;
return counts;
}, {} as Record<string, number>);
// Find the busiest day
const busiest = Object.entries(dayCounts)
.sort((a, b) => b[1] - a[1])[0];
console.log(`Most active day: ${busiest[0]} (${busiest[1]} commits)`);
What’s happening:
- --format=%ad --date=format:%u outputs just the day of week (1-7)
- .reduce() accumulates counts into an object
- We sort to find the maximum
Step 5: Handling Errors
What happens if someone runs this outside a git repository? Let’s handle that gracefully:
import { ExitCodeError, run } from "jsr:@j50n/proc@0.24.6";
try {
const commits = await run("git", "rev-parse", "--git-dir")
.lines
.collect();
console.log("This is a git repository");
} catch (error) {
if (error instanceof ExitCodeError) {
console.error("Error: Not a git repository");
console.error("Please run this command from within a git repository.");
Deno.exit(1);
}
throw error;
}
What’s happening:
- git rev-parse --git-dir fails with exit code 128 if not in a repo
- proc throws ExitCodeError for non-zero exit codes
- We catch it and show a friendly message
Step 6: The Complete Script
Now let’s put it all together into a polished tool:
import { ExitCodeError, run } from "jsr:@j50n/proc@0.24.6";
const days = [
"Sunday",
"Monday",
"Tuesday",
"Wednesday",
"Thursday",
"Friday",
"Saturday",
];
async function analyzeRepo(commitCount = 100) {
// Verify we're in a git repo
try {
await run("git", "rev-parse", "--git-dir").lines.collect();
} catch (error) {
if (error instanceof ExitCodeError) {
throw new Error("Not a git repository");
}
throw error;
}
// Get commits
const commits = await run("git", "log", "--oneline", `-${commitCount}`)
.lines
.collect();
// Count bug fixes
const fixes = commits.filter((c) => c.toLowerCase().includes("fix"));
// Get top authors
const authorCounts = await run(
"git",
"log",
"--format=%an",
`-${commitCount}`,
)
.lines
.filter((name) => name.trim() !== "")
.reduce((counts, name) => {
counts[name] = (counts[name] || 0) + 1;
return counts;
}, {} as Record<string, number>);
const authors = Object.entries(authorCounts)
.sort((a, b) => b[1] - a[1])
.slice(0, 5)
.map(([name, count]) => ({ name, count }));
// Count by day of week
const dayCounts = await run(
"git",
"log",
"--format=%ad",
"--date=format:%u",
`-${commitCount}`,
)
.lines
.filter((line) => line.trim() !== "")
.reduce((counts, dayNum) => {
const day = days[parseInt(dayNum) % 7];
counts[day] = (counts[day] || 0) + 1;
return counts;
}, {} as Record<string, number>);
const busiestDay = Object.entries(dayCounts).sort((a, b) => b[1] - a[1])[0];
return { commits, fixes, authors, busiestDay };
}
// Run the analysis
try {
const result = await analyzeRepo(100);
console.log("=== Git Repository Analysis ===\n");
console.log(`Recent commits: ${result.commits.length}`);
console.log(`Bug fixes found: ${result.fixes.length}`);
console.log("\nTop contributors:");
for (const author of result.authors) {
console.log(` ${author.name}: ${author.count} commits`);
}
console.log(
`\nMost active day: ${result.busiestDay[0]} (${
result.busiestDay[1]
} commits)`,
);
} catch (error) {
console.error(`Error: ${error.message}`);
Deno.exit(1);
}
Run the complete analyzer:
deno run --allow-run git-analyzer.ts
What You’ve Learned
In building this tool, you’ve used proc’s core features:
| Feature | How We Used It |
|---|---|
run() | Execute git commands |
.lines | Convert output to text |
.collect() | Gather results into arrays |
.filter() | Find bug fixes, remove empty lines |
.reduce() | Count commits by author and day |
ExitCodeError | Handle “not a repo” error |
All of this with streaming — even if you analyzed 10,000 commits, memory usage stays constant.
Exercises
Try extending the analyzer:
- Add a date range filter — Only analyze commits from the last month
- Find the longest commit message — Use .reduce() to track the max
- Export to JSON — Output results as JSON for other tools
- Add file change stats — Use git log --stat to count files changed
See Also
- Running Processes — More ways to run commands
- Error Handling — Deep dive into error propagation
- Array-Like Methods — All the methods you can use
- Recipes — More practical examples
Key Concepts
A few patterns that will make everything click.
Properties vs Methods
Some APIs are properties (no parentheses), some are methods (with parentheses):
// Properties - no parentheses
.lines
.status
.first
.last
// Methods - with parentheses
.collect()
.map()
.filter()
.count()
Properties return new objects or promises. Methods are functions you call. Your IDE will guide you.
Error Propagation
Errors flow through pipelines like data. One try-catch handles everything:
try {
await run("command1")
.run("command2")
.run("command3")
.lines
.forEach(process);
} catch (error) {
// Process errors, transform errors, your errors—all caught here
}
Important: Errors occur in sync with the data stream. If a process fails on line 100, you’ll successfully process lines 1-99 first. This makes error handling predictable and eliminates race conditions common in traditional streams.
See Error Handling for the full story.
Resource Management
Always consume process output. Unconsumed output keeps the process handle open.
// ✅ Good - output consumed
await run("ls").lines.collect();
await run("ls").lines.forEach(console.log);
// ❌ Bad - resource leak
const p = run("ls"); // Output never consumed
The enumerate() Pattern
enumerate() wraps an iterable to add Array-like methods. Call .enum() to add
indices:
import { enumerate } from "jsr:@j50n/proc@0.24.6";
// Without indices
const doubled = await enumerate([1, 2, 3])
.map((n) => n * 2)
.collect();
// [2, 4, 6]
// With indices
const numbered = await enumerate(["a", "b", "c"])
.enum()
.map(([item, i]) => `${i}: ${item}`)
.collect();
// ["0: a", "1: b", "2: c"]
Lazy Evaluation
Pipelines are lazy. Nothing executes until you consume the output:
// This doesn't run anything yet
const pipeline = run("cat", "huge-file.txt")
.run("grep", "error")
.lines
.map((line) => line.toUpperCase());
// Now it runs, one line at a time
for await (const line of pipeline) {
console.log(line);
}
This enables processing files larger than memory—data flows through without loading everything at once.
Type Safety
proc is fully typed. TypeScript knows what you’re working with:
const lines: string[] = await run("ls").lines.collect();
const count: number = await run("ls").lines.count();
Type errors usually mean you’re using the API incorrectly.
See Also
- Running Processes — Execute commands and capture output
- Error Handling — How errors propagate through pipelines
- Understanding Enumerable — Deep dive into async iterables
Running Processes
Running child processes with proc is designed to be as simple and intuitive as possible, eliminating the boilerplate typically required for process management.
Getting Started
The most basic usage requires just importing run and calling it with your
command:
import { run } from "jsr:@j50n/proc@0.24.6";
// Run a command
await run("ls", "-la").lines.collect();
That’s all you need. No configuration objects, no callback functions, no complex setup—just run the command and get the results.
Understanding Command Arguments
The first parameter to run() is the command name, and all subsequent
parameters are individual arguments. This approach prevents shell injection
vulnerabilities and makes argument handling explicit:
run("command", "arg1", "arg2", "arg3");
It’s important to pass arguments as separate parameters rather than as a single string:
// ✅ Correct
run("ls", "-la", "/home");
// ❌ Wrong - this won't work
run("ls -la /home");
Capturing Output
As an Array
const lines = await run("ls", "-la").lines.collect();
// lines is string[]
Line by Line
for await (const line of run("ls", "-la").lines) {
console.log(line);
}
First or Last Line
const first = await run("ls").lines.first;
const last = await run("ls").lines.last;
As Raw Bytes
const bytes = await run("cat", "file.bin").collect();
// bytes is Uint8Array[]
Printing to Console
Send output directly to stdout:
await run("ls", "-la").toStdout();
This is perfect for commands where you just want to see the output.
Building Commands Dynamically
Sometimes you need to build a command from variables:
import { type Cmd, run } from "jsr:@j50n/proc@0.24.6";
const cmd: Cmd = ["ls"];
if (showAll) {
cmd.push("-la");
}
if (directory) {
cmd.push(directory);
}
await run(...cmd).toStdout();
The Cmd type is an array where the first element is the command (string or
URL) and the rest are string arguments. Using the Cmd type ensures type safety
when building commands dynamically.
Process Options
Customize process behavior with options:
await run(
{
cwd: "/tmp", // Working directory
env: { FOO: "bar" }, // Environment variables
},
"command",
"arg1",
).lines.collect();
Working Directory
await run(
{ cwd: "/var/log" },
"ls",
).toStdout();
Environment Variables
await run(
{ env: { PATH: "/custom/path" } },
"command",
).lines.collect();
Checking Exit Status
Get the exit status without throwing:
const p = run("command");
await p.lines.collect(); // Consume output first
const status = await p.status;
console.log(`Exit code: ${status.code}`);
console.log(`Success: ${status.success}`);
Remember: Always consume output before checking status, or you’ll leak resources.
Process ID
Get the process ID:
const p = run("sleep", "10");
console.log(`PID: ${p.pid}`);
await p.lines.collect();
Running with URLs
You can use URLs for the command:
const scriptUrl = new URL("./script.sh", import.meta.url);
await run(scriptUrl).toStdout();
Common Patterns
Silent Execution
Run a command and ignore output:
await run("command").lines.forEach(() => {});
Capture and Print
Capture output while also printing it:
const lines: string[] = [];
await run("command").lines.forEach((line) => {
console.log(line);
lines.push(line);
});
Conditional Execution
if (needsProcessing) {
await run("process-data").toStdout();
}
Error Handling
By default, non-zero exit codes throw ExitCodeError:
try {
await run("false").lines.collect();
} catch (error) {
console.error(`Command failed: ${error.code}`);
}
See Error Handling for complete details.
Performance Tips
Stream Instead of Collect
Process data as it arrives rather than loading everything into memory:
// ❌ Loads everything into memory
const lines = await run("cat", "huge-file.txt").lines.collect();
for (const line of lines) {
process(line);
}
// ✅ Processes one line at a time
for await (const line of run("cat", "huge-file.txt").lines) {
process(line);
}
Pipe Instead of Collect Intermediate Results
Chain processes instead of collecting intermediate results:
// ❌ Collects intermediate results, then re-feeds them
const lines1 = await run("cat", "file.txt").lines.collect();
const lines2 = await enumerate(lines1).run("grep", "pattern").lines.collect();
// ✅ Streams through
await run("cat", "file.txt")
.run("grep", "pattern")
.toStdout();
Use take() to Stop Early
Stop processing once you have what you need:
// Stops after finding 10 matches
const first10 = await run("grep", "ERROR", "huge.log")
.lines
.take(10)
.collect();
Filter Before Expensive Operations
Reduce the amount of data flowing through expensive operations:
// ✅ Filter first (fast), then transform (expensive)
const result = await run("cat", "data.txt")
.lines
.filter((line) => line.length > 0) // Fast filter
.map(expensiveTransform) // Only runs on filtered data
.collect();
For more performance optimization strategies, see Concurrent Processing and Streaming Large Files.
See Also
- Working with Output — Transform and process command output
- Process Pipelines — Chain multiple commands together
- Error Handling — Handle failures gracefully
- Shell Script Replacement — Replace bash scripts with proc
Working with Output
Capturing, transforming, and processing command output is central to building effective data processing pipelines. proc provides multiple approaches depending on your data size and processing needs.
Choosing the Right Output Method
When you need all output as an array and you’re confident the output is small
enough to fit in memory, use .lines.collect():
const lines = await run("ls").lines.collect(); // All lines in memory
For large outputs that you want to process line-by-line without loading
everything into memory, use .lines with for-await loops:
for await (const line of run("cat", "huge.log").lines) {
process(line); // Constant memory usage
}
When you just want to see the output in your console, .toStdout() prints
directly without capturing:
await run("ls", "-la").toStdout(); // Prints directly to console
For single-line results, .first or .last properties give you exactly what
you need:
const result = await run("git", "rev-parse", "HEAD").lines.first;
Getting Output
As Lines
import { run } from "jsr:@j50n/proc@0.24.6";
const lines = await run("ls", "-la").lines.collect();
// string[]
As Bytes
const bytes = await run("cat", "file.bin").collect();
// Uint8Array[]
First Line
const result = await run("git", "rev-parse", "HEAD").lines.first;
// Single string
Print to Console
await run("ls", "-la").toStdout();
Transforming Output
Map Lines
const uppercase = await run("cat", "file.txt")
.lines
.map((line) => line.toUpperCase())
.collect();
Filter Lines
const errors = await run("cat", "app.log")
.lines
.filter((line) => line.includes("ERROR"))
.collect();
Parse Output
const commits = await run("git", "log", "--oneline")
.lines
.map((line) => {
const [hash, ...message] = line.split(" ");
return { hash, message: message.join(" ") };
})
.collect();
Streaming Output
Process output as it arrives:
for await (const line of run("tail", "-f", "app.log").lines) {
if (line.includes("ERROR")) {
console.error(line);
}
}
Counting Output
const lineCount = await run("ls", "-la").lines.count();
Finding in Output
const match = await run("ps", "aux")
.lines
.find((line) => line.includes("node"));
Real-World Examples
Parse JSON Output
const data = await run("curl", "https://api.example.com/data")
.lines
.map((line) => JSON.parse(line))
.collect();
Extract Fields
const pids = await run("ps", "aux")
.lines
.drop(1) // Skip header
.map((line) => line.split(/\s+/)[1])
.collect();
Aggregate Data
const total = await run("du", "-sh", "*")
.lines
.map((line) => {
const size = line.split("\t")[0];
return parseInt(size);
})
.reduce((sum, size) => sum + size, 0);
Next Steps
- Process Pipelines - Chain commands
- Running Processes - More ways to run
- Array-Like Methods - Transform output
Working with Input
Sending data to process stdin is fundamental to building effective data processing pipelines. proc provides several approaches depending on your data source and use case.
Choosing the Right Input Method
The most common approach is using .run() to pipe output from one process
directly to another, creating efficient process-to-process pipelines:
await run("cat", "file.txt").run("grep", "pattern").toStdout();
When you have in-memory data that you want to send to a process, enumerate()
wraps your data and makes it pipeable:
await enumerate(["line1", "line2"]).run("grep", "1").toStdout();
For file input, read() creates a stream directly from the file system:
await read("input.txt").run("grep", "pattern").toStdout();
When you need generated sequences, range() creates numeric streams that you
can transform and pipe:
await range({ to: 100 }).map((n) => n.toString()).run("shuf").toStdout();
Piping Between Processes
The most common way to provide input is piping output from one process directly to another. This creates efficient data flows without intermediate storage:
import { run } from "jsr:@j50n/proc@0.24.6";
await run("echo", "hello")
.run("tr", "a-z", "A-Z") // Receives "hello" as stdin
.toStdout();
// HELLO
Working with In-Memory Data
When you have data in memory that you want to send to a process, enumerate()
makes any iterable pipeable to processes:
import { enumerate } from "jsr:@j50n/proc@0.24.6";
const data = ["line 1", "line 2", "line 3"];
await enumerate(data)
.run("grep", "2")
.toStdout();
// line 2
Reading from Files
For file input, read() creates a stream directly from the file system,
allowing you to process files of any size efficiently:
import { read } from "jsr:@j50n/proc@0.24.6";
await read("input.txt")
.run("grep", "pattern")
.toStdout();
Real-World Examples
Filter Data
await read("data.txt")
.run("grep", "ERROR")
.run("sort")
.run("uniq")
.toStdout();
Transform and Process
await read("input.txt")
.lines
.map((line) => line.toUpperCase())
.run("sort")
.toStdout();
Generate and Process
import { range } from "jsr:@j50n/proc@0.24.6";
await range({ to: 100 })
.map((n) => n.toString())
.run("shuf") // Shuffle
.run("head", "-10")
.toStdout();
Next Steps
- Process Pipelines - Chain commands
- Working with Output - Capture results
Process Pipelines
Chaining processes together creates powerful data processing workflows that combine the efficiency of Unix tools with the expressiveness of JavaScript.
Understanding Pipeline Basics
In a shell, you’d write:
cat file.txt | grep error | wc -l
In proc, you write the same logic with method chaining:
const count = await run("cat", "file.txt")
.run("grep", "error")
.run("wc", "-l")
.lines.first;
Each .run() pipes the previous output to the next command’s stdin, creating a
seamless data flow where each process receives exactly what the previous one
produces.
How It Works
run("command1") // Produces output
.run("command2") // Receives command1's output as stdin
.run("command3"); // Receives command2's output as stdin
The data flows through, one buffer at a time. Nothing is collected in memory unless you ask for it.
Real Examples
Count Lines
const lines = await run("cat", "file.txt")
.run("wc", "-l")
.lines.first;
console.log(`${lines} lines`);
Find and Count
const errorCount = await run("cat", "app.log")
.run("grep", "ERROR")
.run("wc", "-l")
.lines.first;
Sort and Unique
const unique = await run("cat", "words.txt")
.run("sort")
.run("uniq")
.lines.collect();
Case Conversion
const lowercase = await run("echo", "HELLO WORLD")
.run("tr", "A-Z", "a-z")
.lines.first;
// "hello world"
Mixing Processes and Transformations
You can mix process pipes with JavaScript transformations:
const result = await run("cat", "data.txt")
.run("grep", "pattern")
.lines
.map((line) => line.trim())
.filter((line) => line.length > 0)
.collect();
The .lines converts bytes to text, then JavaScript takes over.
Complex Pipelines
Build sophisticated data processing pipelines:
const stats = await run("cat", "access.log")
.run("grep", "ERROR")
.run("cut", "-d", " ", "-f", "1") // Extract IP addresses
.run("sort")
.run("uniq", "-c") // Count occurrences
.run("sort", "-rn") // Sort by count
.run("head", "-10") // Top 10
.lines
.collect();
console.log("Top 10 error sources:");
stats.forEach((line) => console.log(line));
Branching Pipelines
Sometimes you need to process the same data in multiple ways. Use .tee() to
split a pipeline into multiple branches that can be processed independently:
const [branch1, branch2] = run("cat", "data.txt")
.lines
.tee();
// Process both branches concurrently
const [result1, result2] = await Promise.all([
branch1.filter((line) => line.includes("A")).collect(),
branch2.filter((line) => line.includes("B")).collect(),
]);
The .tee() method creates two independent iterables from one source, allowing
each branch to be processed differently while both run concurrently. This is
perfect for collecting different subsets of data in one pass, calculating
multiple statistics simultaneously, or processing data while also logging it.
Remember that both branches must be consumed to avoid resource leaks.
Error Handling in Pipelines
Errors propagate through the entire pipeline:
try {
await run("cat", "missing.txt") // This fails
.run("grep", "pattern") // Never runs
.run("wc", "-l") // Never runs
.lines.collect();
} catch (error) {
// Catches the error from cat
console.error(`Pipeline failed: ${error.message}`);
}
See Error Handling for details.
Performance and Efficiency
Pipelines are designed for optimal performance and resource usage. They stream data through the pipeline one buffer at a time, meaning nothing is collected in memory unless you explicitly request it. All processes in the pipeline run concurrently, creating efficient parallel processing:
// This processes a 10GB file using ~constant memory
await run("cat", "huge-file.txt")
.run("grep", "pattern")
.run("wc", "-l")
.lines.first;
The lazy evaluation means nothing actually runs until you consume the output, and the streaming nature ensures minimal memory usage even for huge files.
Debugging Pipelines
Print intermediate results:
await run("cat", "file.txt")
.run("grep", "pattern")
.lines
.map((line) => {
console.log(`Processing: ${line}`);
return line;
})
.forEach(process);
Or split it up:
const step1 = run("cat", "file.txt");
const step2 = step1.run("grep", "pattern");
const step3 = step2.lines;
// Now you can inspect each step
for await (const line of step3) {
console.log(line);
}
Common Patterns
Extract and Count
const count = await run("cat", "file.txt")
.run("grep", "-o", "pattern")
.lines.count();
Filter and Transform
const results = await run("cat", "data.csv")
.run("grep", "-v", "^#") // Remove comments
.run("cut", "-d", ",", "-f", "1,3") // Extract columns
.lines
.map((line) => line.split(","))
.collect();
Aggregate Data
const sum = await run("cat", "numbers.txt")
.lines
.map((line) => parseInt(line))
.reduce((acc, n) => acc + n, 0);
Choosing Between Pipelines and JavaScript
Understanding when to use each approach helps you build efficient and maintainable data processing workflows.
Use pipelines when you’re processing large files, want to chain Unix tools together, need streaming performance, or you’re replacing shell scripts with more robust TypeScript code.
Use JavaScript transformations when you need complex logic that’s difficult to express with Unix tools, you’re working with structured data like JSON, you need type safety and IDE support, or the operation is CPU-bound rather than I/O-bound.
The most powerful approach is mixing both techniques, using Unix tools for efficient data filtering and JavaScript for complex transformations and business logic.
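To make that concrete, here is a small sketch of the mixed style: a Unix tool does the cheap streaming filter, and TypeScript handles the structured logic. The file name and its JSON-lines format are assumptions for illustration.
import { run } from "jsr:@j50n/proc@0.24.6";

// grep does the fast, streaming filter; TypeScript does the structured parsing.
// "events.log" and its JSON-per-line format are hypothetical.
const slowRequests = await run("cat", "events.log")
  .run("grep", "request")
  .lines
  .map((line) => JSON.parse(line))
  .filter((event) => event.durationMs > 500)
  .collect();

console.log(`Found ${slowRequests.length} slow requests`);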
Next Steps
- Working with Output - Transform and process output
- Concurrent Processing - Parallel pipelines
- Streaming Large Files - Handle huge files efficiently
Error Handling
Error handling is proc’s primary design goal. Rather than requiring complex coordination between producers and consumers, proc makes errors flow through pipelines naturally, just like data.
Solving the Backpressure and Error Problem
Traditional streams create two problems: complex backpressure coordination AND complex error handling. proc solves both:
Traditional Streams - Complex Backpressure + Error Handling
// Node.js streams - backpressure AND error handling complexity
const stream1 = createReadStream("input.txt");
const transform1 = new Transform({/* options */});
const transform2 = new Transform({/* options */});
const output = createWriteStream("output.txt");
// Backpressure handling
stream1.pipe(transform1, { end: false });
transform1.pipe(transform2, { end: false });
transform2.pipe(output);
// Error handling at each stage
stream1.on("error", handleError);
transform1.on("error", handleError);
transform2.on("error", handleError);
output.on("error", handleError);
// Plus drain events, pause/resume, etc.
proc - No Backpressure, Simple Errors
// proc - pull-based flow eliminates both problems
try {
await read("input.txt")
.lines
.map(transform1)
.map(transform2)
.writeTo("output.txt");
} catch (error) {
// All errors caught here - no backpressure coordination needed
console.error(`Pipeline failed: ${error.message}`);
}
Why this works:
- Pull-based flow: Consumer controls pace, no backpressure needed
- Error propagation: Errors flow with data through the same path
- One catch block: Handle all errors in one place
The Traditional Problem
Traditional stream error handling requires managing errors at multiple points:
// With Deno.Command - manual error handling at each step
const cmd1 = new Deno.Command("cat", { args: ["file.txt"] });
const proc1 = cmd1.spawn();
const output1 = await proc1.output();
if (!output1.success) {
throw new Error(`cat failed: ${output1.code}`);
}
const cmd2 = new Deno.Command("grep", {
args: ["pattern"],
stdin: "piped",
});
const proc2 = cmd2.spawn();
// ... manually pipe output1 to proc2 stdin ...
const output2 = await proc2.output();
if (!output2.success) {
throw new Error(`grep failed: ${output2.code}`);
}
With Node.js streams, you need error handlers on each stream:
stream1.on("error", handleError);
stream2.on("error", handleError);
stream3.on("error", handleError);
How proc Changes Everything
proc treats errors as first-class data that flows through your pipeline alongside the actual results. When you build a pipeline with multiple operations—running processes, transforming data, filtering results—any error that occurs anywhere in the chain automatically propagates to your final catch block:
try {
await run("cat", "file.txt")
.run("grep", "pattern")
.run("wc", "-l")
.lines
.map(transform)
.filter(predicate)
.forEach(process);
} catch (error) {
// All errors caught here:
// - Process exit codes
// - Transform errors
// - Filter errors
// - Your own errors
console.error(`Pipeline failed: ${error.message}`);
}
This approach eliminates the need for error handling at each step. Whether a process exits with a non-zero code, a transformation throws an exception, or a filter encounters invalid data, the error flows downstream and gets caught in one place.
How Error Propagation Works
When something goes wrong anywhere in the pipeline:
- The error is captured
- Downstream operations are skipped
- The error propagates to your catch block
It’s functional programming—errors are just another type of data flowing through.
Errors Are Synchronous With Data
Here’s a critical design principle that makes proc fundamentally different from the Streams API: errors occur in sync with the data stream.
When a process fails, you don’t see the error immediately. You see it when you iterate to it:
try {
// If this command fails on line 100...
await run("command")
.lines
.forEach((line) => {
console.log(line);
});
} catch (error) {
// ...you'll successfully process lines 1-99 first
console.error(`Error after processing data: ${error.message}`);
}
This is how streaming should work:
- Partial consumption is safe: If you only take 50 lines (.take(50)), you never encounter an error that happens on line 100 (see the sketch below)
- Data comes first: You always process all available data before seeing the error
- Predictable flow: Errors arrive exactly when you iterate to them, not asynchronously
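To see the first point in action, here is a small sketch based on the behavior described above. The shell command, which prints 100 lines and then exits non-zero, is purely illustrative:
import { run } from "jsr:@j50n/proc@0.24.6";

// The command prints 100 lines and then fails with exit code 1.
// Because we stop after 50 lines, the failure point is never reached.
const first50 = await run("sh", "-c", "seq 1 100; exit 1")
  .lines
  .take(50)
  .collect();

console.log(first50.length); // 50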
Why This Matters
The Streams API has a fundamental problem: errors occur out of sync with the data. A process might fail, but the error arrives as a separate event, disconnected from the data flow. This creates subtle bugs and edge cases:
// Streams API - error arrives separately from data
const stream = createReadStream("input.txt");
stream.on("data", (chunk) => {
// Process chunk
});
stream.on("error", (error) => {
// Error arrives here - but how much data did we process?
// Did we miss some data? Did we process partial data?
// Hard to reason about!
});
With proc, errors are part of the iteration. They’re not separate events—they’re items in the stream that you encounter when you reach them:
// proc - error is part of the data flow
try {
for await (const line of run("command").lines) {
// Process line
// If an error occurs, you've already processed all previous lines
// No race conditions, no missing data, no ambiguity
}
} catch (error) {
// You know exactly where you are in the stream
}
What This Means for You
- No race conditions: Data and errors flow in a single, predictable sequence
- Easier debugging: You know exactly how much data was processed before the error
- Simpler code: No need to coordinate between data handlers and error handlers
- Correct by default: Code that looks right actually is right
This synchronous error propagation is a core design goal of proc. It takes careful engineering to ensure errors from child processes are thrown in sync with the data stream, but it eliminates entire categories of bugs that plague traditional stream-based code.
Understanding Error Types
proc throws specific error types that help you handle different failure scenarios appropriately. Each error type carries relevant context about what went wrong, making debugging and error recovery more straightforward.
ExitCodeError occurs when a process exits with a non-zero code:
import { ExitCodeError } from "jsr:@j50n/proc@0.24.6";
try {
await run("false").lines.collect();
} catch (error) {
if (error instanceof ExitCodeError) {
console.error(`Process failed with code ${error.code}`);
console.error(`Command: ${error.command.join(" ")}`);
}
}
SignalError happens when a process is terminated by a signal, such as when you interrupt it with Ctrl+C:
import { SignalError } from "jsr:@j50n/proc@0.24.6";
try {
await run("sleep", "1000").lines.collect();
// Kill it with Ctrl+C
} catch (error) {
if (error instanceof SignalError) {
console.error(`Process killed by signal: ${error.signal}`);
}
}
UpstreamError wraps errors that come from earlier stages in a pipeline:
import { UpstreamError } from "jsr:@j50n/proc@0.24.6";
try {
await run("cat", "missing.txt") // This fails
.run("grep", "pattern") // This gets UpstreamError
.lines.collect();
} catch (error) {
if (error instanceof UpstreamError) {
console.error(`Upstream failure: ${error.cause}`);
}
}
Checking Exit Status Without Throwing
Sometimes you want to inspect a process’s exit status without triggering an exception. proc supports this pattern by letting you consume the process output first, then check the status afterward. This approach is useful when non-zero exit codes are expected or when you want to implement custom error handling logic.
Remember to always consume the output before checking the status—otherwise
you’ll create resource leaks. The pattern is straightforward: run your process,
consume its output with methods like .lines.collect(), then access the
.status property to inspect the exit code and make decisions based on the
result.
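As a minimal sketch of that pattern (the command here is arbitrary):
import { run } from "jsr:@j50n/proc@0.24.6";

const p = run("ls", "-la");
const lines = await p.lines.collect(); // consume the output first
const status = await p.status; // then inspect the result

if (!status.success) {
  console.error(`Command failed with exit code ${status.code}`);
} else {
  console.log(`Got ${lines.length} lines`);
}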
Handling Specific Exit Codes
try {
await run("grep", "pattern", "file.txt").lines.collect();
} catch (error) {
if (error instanceof ExitCodeError) {
if (error.code === 1) {
// grep returns 1 when no matches found
console.log("No matches found");
} else {
// Other errors
throw error;
}
}
}
Errors in Transformations
Errors in your own code propagate the same way:
try {
await run("cat", "numbers.txt")
.lines
.map((line) => {
const num = parseInt(line);
if (isNaN(num)) {
throw new Error(`Invalid number: ${line}`);
}
return num;
})
.forEach(console.log);
} catch (error) {
// Catches both process errors AND your parsing errors
console.error(`Pipeline failed: ${error.message}`);
}
Custom Error Handling
While proc’s default error handling works well for most cases, you can customize
how errors are handled using the fnError option. This function receives the
error and any stderr data, giving you the opportunity to suppress specific
errors, transform them, or add additional context.
For example, some commands like grep return exit code 1 when no matches are
found, which isn’t really an error in many contexts. You can use a custom error
handler to treat this as normal behavior while still catching genuine failures.
Similarly, you might want to add context to errors to make debugging easier, or
suppress errors entirely for commands where failure is acceptable.
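As a rough, unverified sketch only: the exact fnError signature is not shown in this guide, so the shape below (a handler that receives the error) is an assumption based on the description above. Check the API reference before relying on it.
import { ExitCodeError, run } from "jsr:@j50n/proc@0.24.6";

await run(
  {
    // Assumed shape: a handler that receives the error, if any.
    // Returning normally would suppress it; rethrowing would propagate it.
    fnError: (error?: Error) => {
      if (error instanceof ExitCodeError && error.code === 1) {
        return; // grep exit code 1 means "no matches" - not a failure here
      }
      if (error) {
        throw error;
      }
    },
  },
  "grep",
  "pattern",
  "file.txt",
).lines.collect();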
Working with Stderr
By default, proc passes stderr through to Deno.stderr, which means error
messages from child processes appear in your terminal as expected. However, you
can capture and process stderr using the fnStderr option, which gives you an
async iterable of stderr lines.
This capability is useful when you need to analyze error output, combine stdout and stderr streams, or implement custom logging. You can collect stderr lines into an array for later analysis, process them in real-time, or merge them with stdout to create a unified output stream. The stderr handler runs concurrently with your main pipeline, so it doesn’t block the processing of stdout.
import { enumerate, run, toLines } from "jsr:@j50n/proc";
const stderrLines: string[] = [];
await run(
{
fnStderr: async (stderr) => {
for await (const line of enumerate(stderr).transform(toLines)) {
stderrLines.push(line);
}
},
},
"sh",
"-c",
"echo 'normal output'; echo 'error message' >&2",
).lines.collect();
console.log("Captured stderr:", stderrLines);
Best Practices for Error Handling
1. Catch at the End
Don’t catch errors in the middle of a pipeline unless you’re handling them specifically:
// ❌ Don't do this
try {
const lines = await run("command").lines.collect();
} catch (e) {
// Handle here
}
try {
const filtered = lines.filter(predicate);
} catch (e) {
// And here
}
// ✅ Do this
try {
await run("command")
.lines
.filter(predicate)
.forEach(process);
} catch (error) {
// Handle once
}
2. Always Consume Output
Even if you don’t care about the output, consume it:
// ❌ Resource leak
const p = run("command");
// Never consumed!
// ✅ Consume it
await run("command").lines.collect();
// Or
await run("command").lines.forEach(() => {});
3. Use Specific Error Types
Handle different errors differently:
try {
await pipeline();
} catch (error) {
if (error instanceof ExitCodeError) {
// Process failed
} else if (error instanceof SignalError) {
// Process killed
} else {
// Something else
}
}
4. Use Custom Handlers Sparingly
Only customize error handling when you have a specific need. The default behavior works well for most cases.
Why This Approach Matters
Error handling is the primary reason proc exists. If you’ve struggled with stream error events, debugged edge cases in error propagation, or written the same error handling code repeatedly, proc’s approach will feel like a relief.
Errors propagate naturally. One catch block handles everything. The complexity disappears.
See Also
- Running Processes — All the ways to run commands
- Process Pipelines — Chain commands together
- Troubleshooting — Common issues and solutions
Resource Management
Proper resource management ensures your applications don’t leak memory or file handles when working with processes and streams.
The Fundamental Rule
The most important principle in proc is simple: always consume process output.
When you start a process, you must consume its output through methods like
.collect(), .forEach(), or by iterating through the results:
import { run } from "jsr:@j50n/proc@0.24.6";
// ❌ Resource leak
const p = run("ls");
// Output never consumed!
// ✅ Output consumed
await run("ls").lines.collect();
Understanding Resource Leaks
Unconsumed output keeps the process handle open, preventing proper cleanup. This happens because the process continues running and holding resources until its output stream is fully consumed. Even if you don’t care about the actual output data, you still need to consume it to signal that the process can be cleaned up.
Methods for Consuming Output
proc provides several ways to consume process output, each suited to different
use cases. Use .collect() when you need all output as an array:
const lines = await run("ls").lines.collect();
Use .forEach() when you want to process each item without collecting
everything in memory:
await run("ls").lines.forEach((line) => {
console.log(line);
});
Use for-await loops when you need more control over the iteration process:
for await (const line of run("ls").lines) {
console.log(line);
}
Use .toStdout() when you just want to display the output:
await run("ls").toStdout();
Aggregation methods like .count() and property access like .first also
consume output:
const count = await run("ls").lines.count();
const first = await run("ls").lines.first;
Checking Status
Consume output before checking status:
const p = run("command");
await p.lines.collect(); // Consume first
const status = await p.status; // Then check
Error Handling
Errors automatically clean up resources:
try {
await run("false").lines.collect();
} catch (error) {
// Resources cleaned up automatically
}
Long-Running Processes
For processes that run indefinitely:
// This is fine - consuming output as it arrives
for await (const line of run("tail", "-f", "log").lines) {
process(line);
}
Best Practices for Resource Management
Following these principles will help you avoid resource leaks and build reliable applications:
Always consume output using methods like collect(), forEach(), or iteration.
This is the most important rule for preventing resource leaks.
When you need to check process status, consume the output first, then check the status. The process must complete its output before status information is reliable.
Let errors propagate naturally through your pipelines. proc’s error handling automatically cleans up resources when errors occur, so you don’t need to manually manage cleanup in error cases.
For custom cleanup scenarios, use try-finally blocks, but remember that proc handles most cleanup automatically through its error propagation system.
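For instance, here is a small sketch of try-finally cleanup around a pipeline; the temp file is a hypothetical resource that proc does not manage for you:
import { run } from "jsr:@j50n/proc@0.24.6";

// A hypothetical resource that must be released even if the pipeline throws.
const tempFile = await Deno.makeTempFile();

try {
  const lines = await run("ls", "-la").lines.collect();
  await Deno.writeTextFile(tempFile, lines.join("\n"));
} finally {
  await Deno.remove(tempFile); // cleanup runs whether or not an error occurred
}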
Next Steps
- Error Handling - Handle failures
- Running Processes - Process basics
Understanding Enumerable
Enumerable is the core of proc’s AsyncIterable support. It wraps any iterable and provides Array-like methods for working with async data streams.
What is Enumerable?
Think of Enumerable as an Array, but for async data. It gives you familiar
methods like map, filter, and reduce—but for data that arrives over time
rather than all at once:
import { enumerate } from "jsr:@j50n/proc@0.24.6";
// Wrap any iterable
const nums = enumerate([1, 2, 3, 4, 5]);
// Use Array methods
const doubled = await nums
.map((n) => n * 2)
.filter((n) => n > 5)
.collect();
console.log(doubled); // [6, 8, 10]
The Problem with Traditional Streams
JavaScript has Arrays for sync data and Streams for async data, but Streams are awkward to work with. They require verbose transformation chains and complex error handling:
// Streams are verbose
const stream = readableStream
.pipeThrough(
new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk * 2);
},
}),
)
.pipeThrough(
new TransformStream({
transform(chunk, controller) {
if (chunk > 5) controller.enqueue(chunk);
},
}),
);
Enumerable makes the same operations clean and readable:
// Enumerable is clean
const result = await enumerate(asyncIterable)
.map((n) => n * 2)
.filter((n) => n > 5)
.collect();
Creating Enumerables
From Arrays
const nums = enumerate([1, 2, 3]);
From Async Generators
async function* generate() {
yield 1;
yield 2;
yield 3;
}
const nums = enumerate(generate());
From Process Output
import { run } from "jsr:@j50n/proc@0.24.6";
const lines = run("ls", "-la").lines;
// lines is already an Enumerable<string>
From Files
import { read } from "jsr:@j50n/proc@0.24.6";
const bytes = read("file.txt");
// bytes is Enumerable<Uint8Array>
const lines = read("file.txt").lines;
// lines is Enumerable<string>
Consuming Enumerables
Collect to Array
const array = await enumerate([1, 2, 3]).collect();
// [1, 2, 3]
Iterate with for-await
for await (const item of enumerate([1, 2, 3])) {
console.log(item);
}
Process Each Item
await enumerate([1, 2, 3]).forEach((item) => {
console.log(item);
});
Get First or Last
const first = await enumerate([1, 2, 3]).first;
const last = await enumerate([1, 2, 3]).last;
Lazy Evaluation
Enumerables are lazy—nothing happens until you consume them:
// This doesn't run anything yet
const pipeline = enumerate([1, 2, 3])
.map((n) => {
console.log(`Processing ${n}`);
return n * 2;
});
// Now it runs
const result = await pipeline.collect();
// Logs: Processing 1, Processing 2, Processing 3
This is powerful for large datasets:
// Processes one line at a time, never loads entire file
for await (const line of read("huge-file.txt").lines) {
process(line);
}
Chaining Operations
Chain as many operations as you want:
const result = await enumerate([1, 2, 3, 4, 5])
.map((n) => n * 2) // [2, 4, 6, 8, 10]
.filter((n) => n > 5) // [6, 8, 10]
.map((n) => n.toString()) // ["6", "8", "10"]
.collect();
Each operation returns a new Enumerable, so you can keep chaining.
Type Safety
Enumerable is fully typed:
const nums: Enumerable<number> = enumerate([1, 2, 3]);
const strings: Enumerable<string> = nums.map((n) => n.toString());
// ^-- TypeScript knows this is Enumerable<string>
const result: string[] = await strings.collect();
// ^-- TypeScript knows this is string[]
Your IDE will guide you with autocomplete and type errors.
Common Patterns
Transform and Collect
const result = await enumerate(data)
.map(transform)
.collect();
Filter and Count
const count = await enumerate(data)
.filter(predicate)
.count();
Find First Match
const match = await enumerate(data)
.find(predicate);
Check if Any/All
const hasMatch = await enumerate(data).some(predicate);
const allMatch = await enumerate(data).every(predicate);
Performance Characteristics
Enumerable is designed for efficiency and scalability. It processes data in a streaming fashion, handling one item at a time rather than loading everything into memory. This lazy evaluation means operations only run when you actually consume the results, making it possible to work with datasets larger than available RAM:
// This processes a 10GB file using constant memory
await read("huge-file.txt")
.lines
.filter((line) => line.includes("ERROR"))
.forEach(console.log);
Enumerable vs Array Comparison
Understanding when to use each approach helps you choose the right tool:
| Feature | Array | Enumerable |
|---|---|---|
| Data | Sync | Async |
| Memory | All in memory | Streaming |
| Size | Limited by RAM | Unlimited |
| Methods | map, filter, etc. | map, filter, etc. |
| Lazy | No | Yes |
Use Arrays for small, synchronous data that fits comfortably in memory. Use Enumerable for large datasets, async data sources, or when you need streaming processing capabilities.
Caching Iterables
Sometimes you need to reuse an iterable’s results multiple times. Use cache()
to store results for replay, which is particularly useful for expensive
computations or when you need to iterate over the same data multiple times:
import { cache, enumerate } from "jsr:@j50n/proc@0.24.6";
const expensive = enumerate(data)
.map(expensiveOperation);
const cached = cache(expensive);
// First time - runs the operations
const result1 = await cached.collect();
// Second time - uses cached results, doesn't re-run
const result2 = await cached.collect();
Caching is ideal for reusing expensive computations, replaying iterables multiple times, or sharing results across operations. However, be mindful that caching stores all results in memory, so only use it when the dataset is small enough to fit in memory, you need to iterate multiple times, and the computation is expensive enough to justify the memory usage.
Writable Iterables
Need to convert callbacks or events into async iterables? See the dedicated WritableIterable page for complete documentation.
Create async iterables you can write to programmatically, which bridges the gap between push-based and pull-based data models:
import { WritableIterable } from "jsr:@j50n/proc@0.24.6";
const writable = new WritableIterable<string>();
// Write to it
await writable.write("item1");
await writable.write("item2");
await writable.write("item3");
await writable.close();
// Read from it
const items = await writable.collect();
// ["item1", "item2", "item3"]
WritableIterable is perfect for generating data programmatically, bridging between push and pull models, creating custom data sources, or implementing producer-consumer patterns. Here’s an example of event-driven data processing:
const events = new WritableIterable<Event>();
// Producer: write events as they occur
eventEmitter.on("data", async (event) => {
await events.write(event);
});
eventEmitter.on("end", async () => {
await events.close();
});
// Consumer: process events as they arrive
for await (const event of events) {
processEvent(event);
}
Next Steps
- Array-Like Methods - All the methods available
- Transformations - map, flatMap, transform
- Aggregations - reduce, count, sum
Array-Like Methods
Enumerable gives you the Array methods you know and love, but for async data.
Transformations
map()
Transform each item:
const doubled = await enumerate([1, 2, 3])
.map((n) => n * 2)
.collect();
// [2, 4, 6]
Works with async functions:
const results = await enumerate(urls)
.map(async (url) => {
const response = await fetch(url);
return response.json();
})
.collect();
filter()
Keep only items that match:
const evens = await enumerate([1, 2, 3, 4])
.filter((n) => n % 2 === 0)
.collect();
// [2, 4]
flatMap()
Map and flatten in one step:
const words = await enumerate(["hello world", "foo bar"])
.flatMap((line) => line.split(" "))
.collect();
// ["hello", "world", "foo", "bar"]
Aggregations
reduce()
Combine items into a single value:
const sum = await enumerate([1, 2, 3, 4])
.reduce((acc, n) => acc + n, 0);
// 10
Build complex objects:
const grouped = await enumerate(items)
.reduce((acc, item) => {
acc[item.category] = acc[item.category] || [];
acc[item.category].push(item);
return acc;
}, {});
count()
Count items:
const total = await enumerate([1, 2, 3]).count();
// 3
some()
Check if any item matches:
const hasError = await enumerate(lines)
.some((line) => line.includes("ERROR"));
every()
Check if all items match:
const allPositive = await enumerate([1, 2, 3])
.every((n) => n > 0);
Finding Items
find()
Find first match:
const match = await enumerate([1, 2, 3, 4])
.find((n) => n > 2);
// 3
first
Get first item:
const first = await enumerate([1, 2, 3]).first;
// 1
last
Get last item:
const last = await enumerate([1, 2, 3]).last;
// 3
nth()
Get item at index:
const third = await enumerate([1, 2, 3, 4]).nth(2);
// 3 (zero-indexed)
Slicing
take()
Take first N items:
const first3 = await enumerate([1, 2, 3, 4, 5])
.take(3)
.collect();
// [1, 2, 3]
drop()
Skip first N items:
const rest = await enumerate([1, 2, 3, 4, 5])
.drop(2)
.collect();
// [3, 4, 5]
slice()
Get a range:
const middle = await enumerate([1, 2, 3, 4, 5])
.slice(1, 4)
.collect();
// [2, 3, 4]
Iteration
forEach()
Process each item:
await enumerate([1, 2, 3]).forEach((n) => {
console.log(n);
});
for-await
Use standard JavaScript iteration:
for await (const item of enumerate([1, 2, 3])) {
console.log(item);
}
Collecting
collect()
Gather all items into an array:
const array = await enumerate([1, 2, 3]).collect();
// [1, 2, 3]
toArray()
Alias for collect():
const array = await enumerate([1, 2, 3]).toArray();
Utilities
enum()
Add indices to items:
const indexed = await enumerate(["a", "b", "c"])
.enum()
.collect();
// [["a", 0], ["b", 1], ["c", 2]]
Use with map:
const numbered = await enumerate(["a", "b", "c"])
.enum()
.map(([item, i]) => `${i + 1}. ${item}`)
.collect();
// ["1. a", "2. b", "3. c"]
tee()
Split into multiple streams:
const [stream1, stream2] = enumerate([1, 2, 3]).tee();
const [sum, product] = await Promise.all([
stream1.reduce((a, b) => a + b, 0),
stream2.reduce((a, b) => a * b, 1),
]);
flatten()
Flatten nested iterables:
const flat = await enumerate([[1, 2], [3, 4]])
.flatten()
.collect();
// [1, 2, 3, 4]
Concurrent Operations
concurrentMap()
Map with controlled concurrency:
const results = await enumerate(urls)
.concurrentMap(async (url) => {
return await fetch(url);
}, { concurrency: 5 })
.collect();
Results are returned in order.
concurrentUnorderedMap()
Map with maximum concurrency:
const results = await enumerate(urls)
.concurrentUnorderedMap(async (url) => {
return await fetch(url);
}, { concurrency: 5 })
.collect();
Results are returned as they complete (faster).
Chaining Examples
Complex Pipeline
const result = await enumerate(data)
.filter((item) => item.active)
.map((item) => item.value)
.filter((value) => value > 0)
.map((value) => value * 2)
.take(10)
.collect();
Real-World Example
const topErrors = await read("app.log")
.lines
.filter((line) => line.includes("ERROR"))
.map((line) => {
const match = line.match(/ERROR: (.+)/);
return match ? match[1] : line;
})
.reduce((acc, error) => {
acc[error] = (acc[error] || 0) + 1;
return acc;
}, {});
Performance Tips
Use Streaming
Don’t collect if you don’t need to:
// ❌ Loads everything
const items = await enumerate(huge).collect();
for (const item of items) process(item);
// ✅ Streams
for await (const item of enumerate(huge)) {
process(item);
}
Use take() for Limits
// Get first 10 matches
const matches = await enumerate(data)
.filter(predicate)
.take(10)
.collect();
Use concurrentMap() for I/O
// Process 5 URLs at a time
const results = await enumerate(urls)
.concurrentMap(fetch, { concurrency: 5 })
.collect();
Next Steps
- Transformations - Deep dive into map, flatMap, transform
- Aggregations - Deep dive into reduce, count, sum
- Slicing and Sampling - Deep dive into take, drop, slice
Transformations
Transform data as it flows through your pipeline using familiar Array-like methods that work seamlessly with async data streams.
Understanding map()
The map() method transforms each item in your stream, applying a function to
every element and returning a new stream with the transformed values:
import { enumerate } from "jsr:@j50n/proc@0.24.6";
const doubled = await enumerate([1, 2, 3])
.map((n) => n * 2)
.collect();
// [2, 4, 6]
Map works seamlessly with async functions, making it perfect for I/O operations like API calls:
const results = await enumerate(urls)
.map(async (url) => {
const response = await fetch(url);
return response.json();
})
.collect();
You can transform data types, converting numbers to strings or restructuring objects:
const strings = await enumerate([1, 2, 3])
.map((n) => n.toString())
.collect();
// ["1", "2", "3"]
For complex transformations, map can restructure entire objects:
const processed = await enumerate(rawData)
.map((item) => ({
id: item.id,
name: item.name.toUpperCase(),
value: parseFloat(item.value),
timestamp: new Date(item.timestamp),
}))
.collect();
Working with flatMap()
The flatMap() method combines mapping and flattening in a single operation, which is perfect when your transformation function returns arrays that you want to merge into a single stream:
const words = await enumerate(["hello world", "foo bar"])
.flatMap((line) => line.split(" "))
.collect();
// ["hello", "world", "foo", "bar"]
You can use flatMap to expand items, creating multiple output items from each input:
const expanded = await enumerate([1, 2, 3])
.flatMap((n) => [n, n * 10])
.collect();
// [1, 10, 2, 20, 3, 30]
FlatMap is also useful for filtering while mapping—return an empty array to skip items:
const valid = await enumerate(data)
.flatMap((item) => {
if (item.valid) {
return [item.value];
}
return []; // Skip invalid items
})
.collect();
Filtering with filter()
The filter() method keeps only items that match your criteria, discarding
everything else:
const evens = await enumerate([1, 2, 3, 4, 5])
.filter((n) => n % 2 === 0)
.collect();
// [2, 4]
You can use complex predicates that check multiple conditions:
const active = await enumerate(users)
.filter((user) =>
user.active &&
user.lastLogin > cutoffDate &&
user.role !== "guest"
)
.collect();
Filter works well with TypeScript type guards to narrow types:
const numbers = await enumerate(mixed)
.filter((item): item is number => typeof item === "number")
.collect();
Using transform() with Streams
The transform() method lets you apply any TransformStream to your data, which
is particularly useful for built-in transformations like compression:
import { read } from "jsr:@j50n/proc@0.24.6";
const decompressed = await read("file.gz")
.transform(new DecompressionStream("gzip"))
.lines
.collect();
You can also create custom TransformStreams for specialized processing:
const transformed = await enumerate(data)
.transform(
new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase());
},
}),
)
.collect();
Chaining Transformations
Combine multiple transformations:
const result = await enumerate(data)
.map((item) => item.trim())
.filter((item) => item.length > 0)
.map((item) => item.toUpperCase())
.filter((item) => item.startsWith("A"))
.collect();
Real-World Examples
Parse CSV
const data = await read("data.csv")
.lines
.drop(1) // Skip header
.map((line) => line.split(","))
.map(([name, age, city]) => ({
name,
age: parseInt(age),
city,
}))
.filter((row) => row.age >= 18)
.collect();
Extract URLs
const urls = await read("page.html")
.lines
.flatMap((line) => {
const matches = line.match(/https?:\/\/[^\s"']+/g);
return matches || [];
})
.collect();
Clean Data
const cleaned = await enumerate(rawData)
.map((item) => item.trim())
.filter((item) => item.length > 0)
.map((item) => item.toLowerCase())
.filter((item) => !item.startsWith("#"))
.collect();
Transform JSON Lines
const objects = await read("data.jsonl")
.lines
.map((line) => JSON.parse(line))
.filter((obj) => obj.status === "active")
.map((obj) => ({
id: obj.id,
name: obj.name,
value: obj.value * 1.1, // Apply 10% increase
}))
.collect();
Performance Optimization
Understanding how transformations work can help you build more efficient pipelines. Transformations use lazy evaluation, meaning nothing actually runs until you consume the results:
// Nothing happens yet
const pipeline = enumerate(data)
.map(expensive)
.filter(predicate);
// Now it runs
const result = await pipeline.collect();
For better performance, filter before expensive operations to reduce the amount of data that needs processing:
// ✅ Filter first
const result = await enumerate(data)
.filter(cheap) // Fast filter
.map(expensive) // Expensive operation
.collect();
// ❌ Map first
const result = await enumerate(data)
.map(expensive) // Runs on everything
.filter(cheap) // Then filters
.collect();
Use take() to limit processing when you only need a subset of results:
// Stop after 10 matches
const first10 = await enumerate(huge)
.filter(predicate)
.take(10)
.collect();
Common Patterns
Normalize Data
const normalized = await enumerate(data)
.map((item) => ({
...item,
name: item.name.trim().toLowerCase(),
email: item.email.toLowerCase(),
phone: item.phone.replace(/\D/g, ""),
}))
.collect();
Extract Fields
const names = await enumerate(users)
.map((user) => user.name)
.collect();
Conditional Transform
const processed = await enumerate(items)
.map((item) => {
if (item.type === "A") {
return processTypeA(item);
} else {
return processTypeB(item);
}
})
.collect();
Batch Transform
const batched = await enumerate(items)
.map((item, i) => ({
...item,
batch: Math.floor(i / 100),
}))
.collect();
Error Handling
Errors in transformations propagate:
try {
await enumerate(data)
.map((item) => {
if (!item.valid) {
throw new Error(`Invalid item: ${item.id}`);
}
return item.value;
})
.collect();
} catch (error) {
console.error(`Transform failed: ${error.message}`);
}
Next Steps
- Aggregations - Combine items into single values
- Array-Like Methods - All available methods
- Concurrent Processing - Transform in parallel
Aggregations
Combine many items into one result.
reduce()
The Swiss Army knife of aggregations:
import { enumerate } from "jsr:@j50n/proc@0.24.6";
const sum = await enumerate([1, 2, 3, 4])
.reduce((acc, n) => acc + n, 0);
// 10
How It Works
// Start with initial value: 0
// Step 1: 0 + 1 = 1
// Step 2: 1 + 2 = 3
// Step 3: 3 + 3 = 6
// Step 4: 6 + 4 = 10
Building Objects
const grouped = await enumerate(items)
.reduce((acc, item) => {
const key = item.category;
acc[key] = acc[key] || [];
acc[key].push(item);
return acc;
}, {});
Calculating Statistics
const stats = await enumerate(numbers)
.reduce((acc, n) => ({
sum: acc.sum + n,
count: acc.count + 1,
min: Math.min(acc.min, n),
max: Math.max(acc.max, n),
}), { sum: 0, count: 0, min: Infinity, max: -Infinity });
const average = stats.sum / stats.count;
count()
Count items:
const total = await enumerate([1, 2, 3, 4, 5]).count();
// 5
Count Matches
const errorCount = await read("app.log")
.lines
.filter((line) => line.includes("ERROR"))
.count();
some()
Check if any item matches:
const hasError = await enumerate(lines)
.some((line) => line.includes("ERROR"));
// true or false
Early Exit
Stops as soon as it finds a match:
// Stops reading after first match
const hasLargeFile = await enumerate(files)
.some((file) => file.size > 1_000_000_000);
every()
Check if all items match:
const allPositive = await enumerate([1, 2, 3, 4])
.every((n) => n > 0);
// true
Validation
const allValid = await enumerate(records)
.every((record) =>
record.name &&
record.email &&
record.age >= 0
);
find()
Find first matching item:
const firstError = await enumerate(lines)
.find((line) => line.includes("ERROR"));
// First line with ERROR, or undefined
With Complex Predicate
const admin = await enumerate(users)
.find((user) =>
user.role === "admin" &&
user.active
);
Real-World Examples
Sum Values
const total = await enumerate(orders)
.map((order) => order.amount)
.reduce((sum, amount) => sum + amount, 0);
Count by Category
const counts = await enumerate(items)
.reduce((acc, item) => {
acc[item.category] = (acc[item.category] || 0) + 1;
return acc;
}, {});
Find Maximum
const max = await enumerate(numbers)
.reduce((max, n) => Math.max(max, n), -Infinity);
Build Index
const index = await enumerate(items)
.reduce((acc, item) => {
acc[item.id] = item;
return acc;
}, {});
Concatenate Strings
const combined = await enumerate(words)
.reduce((acc, word) => acc + " " + word, "");
Collect Unique Values
const unique = await enumerate(items)
.reduce((acc, item) => {
acc.add(item);
return acc;
}, new Set());
Advanced Patterns
Running Average
const runningAvg = await enumerate(numbers)
.reduce((acc, n) => {
acc.sum += n;
acc.count += 1;
acc.average = acc.sum / acc.count;
return acc;
}, { sum: 0, count: 0, average: 0 });
Nested Grouping
const grouped = await enumerate(items)
.reduce((acc, item) => {
const cat = item.category;
const type = item.type;
acc[cat] = acc[cat] || {};
acc[cat][type] = acc[cat][type] || [];
acc[cat][type].push(item);
return acc;
}, {});
Frequency Map
const frequency = await enumerate(words)
.reduce((acc, word) => {
acc[word] = (acc[word] || 0) + 1;
return acc;
}, {});
// Find most common
const mostCommon = Object.entries(frequency)
.sort((a, b) => b[1] - a[1])[0];
Accumulate with Transform
const processed = await enumerate(data)
.reduce((acc, item) => {
const transformed = transform(item);
if (transformed.valid) {
acc.push(transformed);
}
return acc;
}, []);
Performance Tips
Use Specific Methods
// ❌ Slower
const count = await enumerate(items)
.reduce((acc) => acc + 1, 0);
// ✅ Faster
const count = await enumerate(items).count();
Early Exit with some/every
// Stops at first match
const hasMatch = await enumerate(huge)
.some(predicate);
// Better than
const matches = await enumerate(huge)
.filter(predicate)
.count();
Combine Operations
// ✅ One pass
const stats = await enumerate(numbers)
.reduce((acc, n) => ({
sum: acc.sum + n,
count: acc.count + 1,
}), { sum: 0, count: 0 });
// ❌ Two passes
const sum = await enumerate(numbers).reduce((a, b) => a + b, 0);
const count = await enumerate(numbers).count();
Common Mistakes
Forgetting Initial Value
// ❌ Error with empty array
const sum = await enumerate([]).reduce((a, b) => a + b);
// ✅ Works with empty array
const sum = await enumerate([]).reduce((a, b) => a + b, 0);
Not Returning Accumulator
// ❌ Returns undefined
const result = await enumerate(items)
.reduce((acc, item) => {
acc.push(item);
// Missing return!
}, []);
// ✅ Returns accumulator
const result = await enumerate(items)
.reduce((acc, item) => {
acc.push(item);
return acc;
}, []);
Next Steps
- Transformations - Transform items
- Array-Like Methods - All available methods
- Streaming Large Files - Aggregate huge datasets
Slicing and Sampling
Take portions of your data stream.
take()
Take first N items:
import { enumerate } from "jsr:@j50n/proc@0.24.6";
const first3 = await enumerate([1, 2, 3, 4, 5])
.take(3)
.collect();
// [1, 2, 3]
Early Exit
Stops reading after N items:
// Only reads first 10 lines
const preview = await read("huge-file.txt")
.lines
.take(10)
.collect();
With Filter
// First 5 errors
const errors = await read("app.log")
.lines
.filter((line) => line.includes("ERROR"))
.take(5)
.collect();
drop()
Skip first N items:
const rest = await enumerate([1, 2, 3, 4, 5])
.drop(2)
.collect();
// [3, 4, 5]
Skip Header
const data = await read("data.csv")
.lines
.drop(1) // Skip header row
.collect();
Combining drop() and take()
Get a range of items by combining drop and take:
const middle = await enumerate([1, 2, 3, 4, 5])
.drop(1)
.take(3)
.collect();
// [2, 3, 4]
Pagination
const page = 2;
const pageSize = 10;
const items = await enumerate(allItems)
.drop(page * pageSize)
.take(pageSize)
.collect();
first
Get first item:
const first = await enumerate([1, 2, 3]).first;
// 1
With Pipeline
const result = await run("ls", "-la")
.lines
.first;
last
Get last item:
const last = await enumerate([1, 2, 3]).last;
// 3
Note: Reads entire stream to find last item.
nth()
Get item at index:
const third = await enumerate([1, 2, 3, 4, 5]).nth(2);
// 3 (zero-indexed)
Real-World Examples
Preview File
console.log("First 10 lines:");
await read("file.txt")
.lines
.take(10)
.forEach((line) => console.log(line));
Skip and Take
// Lines 11-20
const batch = await read("file.txt")
.lines
.drop(10)
.take(10)
.collect();
Sample Data
// Every 10th item
const sample = await enumerate(data)
.filter((_, i) => i % 10 === 0)
.collect();
Find Nth Match
// 5th error
const fifthError = await read("app.log")
.lines
.filter((line) => line.includes("ERROR"))
.nth(4); // Zero-indexed
Performance Tips
Use take() for Limits
// ✅ Stops early
const first100 = await enumerate(huge)
.take(100)
.collect();
// ❌ Reads everything
const all = await enumerate(huge).collect();
const first100 = all.slice(0, 100); // Array slice, not Enumerable
Combine with Filter
// Efficient: stops after 10 matches
const matches = await enumerate(data)
.filter(predicate)
.take(10)
.collect();
Next Steps
- Array-Like Methods - All available methods
- Transformations - Transform items
- Streaming Large Files - Work with huge files
WritableIterable
WritableIterable is a fascinating utility that inverts the normal data flow:
instead of pulling data from an iterable, you push data into it. It bridges the
gap between push-based (callbacks, events) and pull-based (async iteration)
programming models.
The Problem It Solves
Imagine you have a callback-based API (like event emitters, WebSocket messages, or sensor data) and you want to process it with proc’s pipeline operations. You can’t easily convert callbacks to an AsyncIterable… until now.
How It Works
WritableIterable is both:
- Writable: You can .write() items to it
- AsyncIterable: You can iterate over it with for await
It uses an internal queue to buffer items between the writer and reader, allowing them to operate at different speeds.
Basic Usage
import { sleep, WritableIterable } from "jsr:@j50n/proc@0.24.6";
const writable = new WritableIterable<number>();
// Write in background (simulating slow producer)
(async () => {
await writable.write(1);
await sleep(100);
await writable.write(2);
await sleep(100);
await writable.write(3);
await writable.close();
})();
// Read (items arrive as they're written)
const results: number[] = [];
for await (const item of writable) {
console.log("Received:", item);
results.push(item);
}
console.log(results); // [1, 2, 3]
This demonstrates the streaming nature: the reader receives items as they’re written, not all at once.
⚠️ Important: You MUST call .close() when done writing, or iteration will hang forever waiting for more data.
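One defensive pattern (a sketch, not something the API requires) is to close in a finally block so the consumer always terminates, even if a write path throws:
import { WritableIterable } from "jsr:@j50n/proc@0.24.6";
const lines = new WritableIterable<string>();
// Producer: close() is guaranteed to run, even if a write throws
(async () => {
  try {
    await lines.write("first");
    await lines.write("second");
  } finally {
    await lines.close();
  }
})();
// Consumer: terminates once close() is called
for await (const line of lines) {
  console.log(line);
}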
Key Concepts
Push vs Pull
Traditional AsyncIterable (pull-based):
// Consumer pulls data
for await (const item of iterable) {
// Process item
}
WritableIterable (push-based):
// Producer pushes data
await writable.write(item);
Backpressure
WritableIterable implements automatic backpressure. If the writer is faster
than the reader, .write() will pause until the reader catches up. This
prevents unbounded memory growth.
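A minimal sketch of that behavior (timings are illustrative):
import { WritableIterable } from "jsr:@j50n/proc@0.24.6";
const items = new WritableIterable<number>();
// Fast producer: per the note above, write() pauses until the reader
// catches up, so this loop runs at the consumer's pace.
(async () => {
  for (let i = 0; i < 5; i++) {
    await items.write(i);
    console.log("wrote", i);
  }
  await items.close();
})();
// Slow consumer: ~200ms per item
for await (const item of items) {
  await new Promise((resolve) => setTimeout(resolve, 200));
  console.log("read", item);
}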
Real-World Examples
Example 1: Event Stream to Pipeline
Convert DOM events into a processable stream:
import { enumerate, WritableIterable } from "jsr:@j50n/proc@0.24.6";
const clicks = new WritableIterable<MouseEvent>();
// Producer: capture clicks
document.addEventListener("click", async (event) => {
await clicks.write(event);
});
// Consumer: process clicks
enumerate(clicks)
.map((event) => ({ x: event.clientX, y: event.clientY }))
.filter((pos) => pos.x > 100)
.forEach((pos) => console.log("Click at:", pos));
// Close when done (e.g., on page unload)
window.addEventListener("unload", () => clicks.close());
Example 2: WebSocket to Process
Feed WebSocket messages to a process:
import { WritableIterable } from "jsr:@j50n/proc@0.24.6";
const messages = new WritableIterable<string>();
// Producer: WebSocket messages
const ws = new WebSocket("wss://example.com");
ws.onmessage = async (event) => {
await messages.write(event.data);
};
ws.onclose = () => messages.close();
// Consumer: pipe to process
await enumerate(messages)
.run("jq", ".") // Pretty-print JSON
.toStdout();
Example 3: Sensor Data Stream
Process sensor readings as they arrive:
import { enumerate, WritableIterable } from "jsr:@j50n/proc@0.24.6";
interface SensorReading {
temperature: number;
timestamp: number;
}
const readings = new WritableIterable<SensorReading>();
// Producer: sensor callback
sensor.onReading(async (reading) => {
await readings.write(reading);
});
// Consumer: calculate moving average
const averages = await enumerate(readings)
.map((r) => r.temperature)
.take(100) // First 100 readings
.reduce((acc, temp) => acc + temp, 0)
.then((sum) => sum / 100);
console.log(`Average: ${averages}°C`);
await readings.close();
Example 4: Manual Process stdin
Feed data to a process programmatically:
import { enumerate, WritableIterable } from "jsr:@j50n/proc@0.24.6";
const input = new WritableIterable<string>();
// Producer: generate data
(async () => {
for (let i = 0; i < 10; i++) {
await input.write(`line ${i}`);
}
await input.close();
})();
// Consumer: pipe to process
await enumerate(input)
.run("grep", "5")
.toStdout();
// Output: line 5
Error Handling
Errors propagate through the iteration:
import { WritableIterable } from "jsr:@j50n/proc@0.24.6";
const writable = new WritableIterable<number>();
// Write and close with error
(async () => {
await writable.write(1);
await writable.write(2);
await writable.close(new Error("something failed"));
})();
try {
for await (const item of writable) {
console.log(item);
}
} catch (error) {
console.error("Error:", error.message);
}
// Output:
// 1
// 2
// Error: something failed
Cleanup with onclose
You can provide a cleanup callback:
import { WritableIterable } from "jsr:@j50n/proc@0.24.6";
const writable = new WritableIterable<string>({
onclose: async () => {
console.log("Cleaning up resources...");
// Close connections, files, etc.
},
});
await writable.write("data");
await writable.close();
// Output: Cleaning up resources...
API Reference
Constructor
new WritableIterable<T>(options?: { onclose?: () => void | Promise<void> })
options.onclose: Optional callback invoked when .close() is called
Methods
.write(item: T): Promise<void>
- Write an item to the stream
- Throws if already closed
- Implements backpressure (pauses if reader is slow)
.close(error?: Error): Promise<void>
- Close the stream
- Must be called to end iteration
- Safe to call multiple times
- Optional error propagates to reader
Properties
.isClosed: boolean
- Returns true if .close() has been called
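Putting the pieces together, a minimal lifecycle sketch (following the write-then-close pattern from Basic Usage above; the onclose message is illustrative):
import { WritableIterable } from "jsr:@j50n/proc@0.24.6";
const stream = new WritableIterable<string>({
  onclose: () => console.log("cleanup"), // runs when close() is called
});
console.log(stream.isClosed); // false
await stream.write("hello");
await stream.close();
console.log(stream.isClosed); // true
// Items written before close() are still delivered to the reader
for await (const item of stream) {
  console.log(item); // "hello"
}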
Common Patterns
Pattern: Timed Data Generation
const timed = new WritableIterable<number>();
(async () => {
for (let i = 0; i < 5; i++) {
await timed.write(i);
await new Promise((resolve) => setTimeout(resolve, 1000));
}
await timed.close();
})();
for await (const item of timed) {
console.log(item); // Prints 0, 1, 2, 3, 4 (one per second)
}
Pattern: Conditional Close
const conditional = new WritableIterable<number>();
(async () => {
for (let i = 0; i < 100; i++) {
await conditional.write(i);
if (i === 10) {
await conditional.close(); // Stop early
break;
}
}
})();
const items = await enumerate(conditional).collect();
console.log(items.length); // 11 (0 through 10)
When to Use WritableIterable
Use it when:
- Converting callback-based APIs to AsyncIterable
- Feeding data to process stdin programmatically
- Bridging event-driven and stream-based code
- You need backpressure between producer and consumer
Don’t use it when:
- You already have an AsyncIterable (use enumerate() instead)
- You're working with synchronous data (use arrays)
- You need multi-consumer support (WritableIterable is single-consumer)
Performance Notes
- Internal queue grows if writer is faster than reader
- Backpressure prevents unbounded growth
- Each .write() creates a Promise (small overhead)
- Best for moderate data rates (not millions of items/second); see the batching sketch below
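If per-write overhead matters, one application-level workaround (a sketch, not a built-in feature) is to write batches and flatten() them on the consumer side:
import { enumerate, WritableIterable } from "jsr:@j50n/proc@0.24.6";
const batches = new WritableIterable<number[]>();
// Producer: one write() per 1,000 items instead of one per item
(async () => {
  let batch: number[] = [];
  for (let i = 0; i < 10_000; i++) {
    batch.push(i);
    if (batch.length >= 1000) {
      await batches.write(batch);
      batch = [];
    }
  }
  if (batch.length > 0) {
    await batches.write(batch);
  }
  await batches.close();
})();
// Consumer: flatten batches back into individual items
const total = await enumerate(batches)
  .flatten()
  .count();
console.log(total); // 10000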
Comparison with Other Approaches
vs. Array
// Array: all data in memory
const data = [1, 2, 3];
for (const item of data) {}
// WritableIterable: streaming, backpressure
const writable = new WritableIterable<number>();
for await (const item of writable) {}
vs. TransformStream
// TransformStream: byte-oriented, Web Streams API
const { readable, writable } = new TransformStream();
// WritableIterable: value-oriented, AsyncIterable
const writable = new WritableIterable<T>();
vs. Channel (from other languages)
If you’re familiar with Go channels or Rust channels, WritableIterable is
similar but:
- Single-consumer (not multi-consumer)
- Unbuffered by default (backpressure on every write)
- Integrates with AsyncIterable ecosystem
The “Interesting Little Beast”
What makes WritableIterable interesting:
- Inverted Control: Most iterables pull data; this one receives pushes
- Backpressure: Automatically slows down fast producers
- Bridge Pattern: Connects imperative (callbacks) to declarative (iteration)
- Error Propagation: Errors flow naturally through the iteration
- Simple API: Just .write(), .close(), and iterate
It’s a small utility that solves a specific problem elegantly: turning push-based data sources into pull-based async iterables that work seamlessly with proc’s pipeline operations.
Data Transforms
Transform structured data between formats with streaming support and high performance.
⚠️ Experimental API (v0.24.0+): Data transforms are new and under active development. While tests pass and performance is reasonable, expect API changes and edge cases as we improve correctness and streaming performance. Production use should include thorough testing of your specific data patterns.
Choosing Your Approach
proc offers several ways to process data. Here’s how to choose:
| Approach | Best For | Performance |
|---|---|---|
| flatdata CLI | Large files (100MB+), batch processing | Highest |
| Data Transforms | In-process conversion, filtering, enrichment | Good to High |
| Process Pipelines | Shell-like operations, text processing | Varies |
| Async Iterables | Custom logic, API data, any async source | Varies |
Decision guide:
- Converting CSV/TSV/JSON files? → Data Transforms (this chapter)
- Processing 100MB+ files for maximum speed? → flatdata CLI
- Running shell commands and piping output? → Process Pipelines
- Working with API responses or custom data? → Async Iterables
Overview
The data transforms module converts between CSV, TSV, JSON, and Record formats. All transforms stream data without loading everything into memory.
Import
Data transforms are a separate import to keep the core library lightweight:
// Core library
import { enumerate, read, run } from "jsr:@j50n/proc";
// Data transforms (separate import)
import {
fromCsvToRows,
fromJsonToRows,
fromTsvToRows,
toJson,
toRecord,
toTsv,
} from "jsr:@j50n/proc/transforms";
Quick Start
import { read } from "jsr:@j50n/proc";
import { fromCsvToRows, toTsv } from "jsr:@j50n/proc/transforms";
// Convert CSV to TSV
await read("data.csv")
.transform(fromCsvToRows())
.transform(toTsv())
.writeTo("data.tsv");
Key Benefits
🚀 Streaming & Performance
- Streaming design: Constant memory usage regardless of file size
- LazyRow optimization: Faster parsing for CSV/TSV when accessing selective fields
- flatdata CLI: WASM-powered tool for very large files
📊 Format Support
- CSV: Universal compatibility with proper RFC 4180 compliance
- TSV: Fast, simple tab-separated format
- JSON Lines: Full object structure preservation
- Record: High-performance binary-safe format
🔄 Flexible Data Types
- Row arrays: string[][] for simple tabular data
- LazyRow: Optimized read-only access with lazy conversion
- Objects: Full JSON object support with optional validation
When to Use Each Format
CSV - Universal Compatibility
// Best for: Compatibility, Excel integration, legacy systems
await read("legacy-data.csv")
.transform(fromCsvToRows())
.transform(toRecord()) // Convert to faster format
.writeTo("optimized.record");
TSV - Speed + Readability
// Best for: Fast processing, human-readable data
await read("logs.tsv")
.transform(fromTsvToRows())
.filter((row) => row[2] === "ERROR")
.transform(toTsv())
.writeTo("errors.tsv");
JSON Lines - Rich Objects
// Best for: Complex nested data, APIs, configuration
await read("events.jsonl")
.transform(fromJsonToRows())
.filter((event) => event.severity === "high")
.transform(toJson())
.writeTo("alerts.jsonl");
Record - Maximum Performance
// Best for: High-throughput processing, internal formats
await read("big-data.record")
.transform(fromRecordToRows())
.map((row) => [row[0], processValue(row[1]), row[2]])
.transform(toRecord())
.writeTo("processed.record");
LazyRow: Optimized Data Access
LazyRow provides a read-only interface optimized for field access without upfront parsing costs:
import { fromCsvToLazyRows } from "jsr:@j50n/proc@0.24.6/transforms";
// Parse CSV into LazyRow format
const lazyRows = await read("data.csv")
.transform(fromCsvToLazyRows())
.collect();
// Efficient field access
for (const row of lazyRows) {
const name = row.getField(0); // Fast field access
const age = row.getField(1); // No parsing until needed
if (parseInt(age) > 18) {
console.log(`Adult: ${name}`);
}
}
LazyRow Benefits
- Zero conversion cost: Choose optimal backing based on source
- Lazy evaluation: Parse fields only when accessed
- Caching: Repeated access uses cached results
Real-World Examples
Data Pipeline
// Process sales data: CSV → filter → enrich → JSON
await read("sales.csv")
.transform(fromCsvToLazyRows())
.filter((row) => parseFloat(row.getField(3)) > 1000) // Amount > $1000
.map((row) => ({
id: row.getField(0),
customer: row.getField(1),
amount: parseFloat(row.getField(3)),
processed: new Date().toISOString(),
}))
.transform(toJson())
.writeTo("high-value-sales.jsonl");
Format Conversion
// Convert legacy CSV to Record format for efficient processing
await read("legacy.csv")
.transform(fromCsvToRows())
.transform(toRecord())
.writeTo("optimized.record");
Log Processing
// Parse structured logs and extract errors
await read("app.log.tsv")
.transform(fromTsvToRows())
.filter((row) => row[2] === "ERROR")
.map((row) => ({
timestamp: row[0],
service: row[1],
level: row[2],
message: row[3],
}))
.transform(toJson())
.writeTo("errors.jsonl");
Memory Efficiency
All transforms use streaming processing:
// ✅ Processes 10GB file with constant ~128KB memory usage
await read("huge-dataset.csv")
.transform(fromCsvToRows())
.filter((row) => row[0].startsWith("2024"))
.transform(toTsv())
.writeTo("filtered.tsv");
// ❌ Don't do this - loads everything into memory
const allRows = await read("huge-dataset.csv")
.transform(fromCsvToRows())
.collect(); // Memory explosion!
Error Handling
Transforms use strict error handling:
try {
await read("data.csv")
.transform(fromCsvToRows())
.transform(toJson())
.writeTo("output.jsonl");
} catch (error) {
if (error.message.includes("Invalid UTF-8")) {
console.error("File encoding issue");
} else if (error.message.includes("CSV")) {
console.error("Malformed CSV data");
}
}
See Also
- CSV Transforms — Detailed CSV parsing and generation
- TSV Transforms — Tab-separated value processing
- JSON Transforms — JSON Lines with validation
- Record Format — High-performance binary format
- LazyRow Optimization — Optimized data access patterns
- Performance Guide — Benchmarks and optimization tips
- flatdata CLI — WASM-powered processing at 330 MB/s
CSV Transforms
Parse and generate CSV (Comma-Separated Values) files with RFC 4180 compliance and LazyRow optimization.
⚠️ Experimental (v0.24.0+): CSV transforms are under active development. API may change as we improve correctness and streaming performance. Test thoroughly with your data patterns.
⚡ WASM-powered: CSV parsing uses WebAssembly for high performance. It uses the same WASM parser as flatdata CLI. See Fast CSV Parsing below.
Overview
CSV transforms provide robust parsing and generation of CSV files with proper handling of quoted fields, escaping, and edge cases.
Tip: Use LazyRow (fromCsvToLazyRows()) for better performance, especially when you only need to access a few fields from each row.
Basic Usage
Parsing CSV to Rows
import { read } from "jsr:@j50n/proc@0.24.6";
import { fromCsvToRows } from "jsr:@j50n/proc@0.24.6/transforms";
// Parse CSV into string arrays
const rows = await read("data.csv")
.transform(fromCsvToRows())
.collect();
// rows[0] = ["Name", "Age", "City"] // Header
// rows[1] = ["Alice", "30", "New York"] // Data row
// rows[2] = ["Bob", "25", "London"] // Data row
Parsing CSV to LazyRow (Recommended)
import { fromCsvToLazyRows } from "jsr:@j50n/proc@0.24.6/transforms";
// Parse CSV into optimized LazyRow format
const lazyRows = await read("data.csv")
.transform(fromCsvToLazyRows())
.collect();
// Efficient field access
for (const row of lazyRows) {
const name = row.getField(0);
const age = parseInt(row.getField(1));
const city = row.getField(2);
if (age >= 18) {
console.log(`${name} from ${city} is an adult`);
}
}
Generating CSV
import { toCsv } from "jsr:@j50n/proc@0.24.6/transforms";
// From string arrays
const data = [
["Name", "Age", "City"],
["Alice", "30", "New York"],
["Bob", "25", "London"],
];
await enumerate(data)
.transform(toCsv())
.writeTo("output.csv");
Advanced Parsing Options
Custom Separators
// Parse semicolon-separated values
const rows = await read("european.csv")
.transform(fromCsvToRows({ separator: ";" }))
.collect();
Handling Comments
// Skip lines starting with #
const rows = await read("data-with-comments.csv")
.transform(fromCsvToRows({ comment: "#" }))
.collect();
Flexible Field Counts
// Allow variable number of fields per row
const rows = await read("irregular.csv")
.transform(fromCsvToRows({ fieldsPerRecord: -1 }))
.collect();
Complete Options
interface CsvParseOptions {
separator?: string; // Field separator (default: ",")
comment?: string; // Comment character to ignore lines
trimLeadingSpace?: boolean; // Trim leading whitespace
lazyQuotes?: boolean; // Allow lazy quotes
fieldsPerRecord?: number; // Expected fields per record (-1 for variable)
}
const rows = await read("complex.csv")
.transform(fromCsvToRows({
separator: ",",
comment: "#",
trimLeadingSpace: true,
lazyQuotes: false,
fieldsPerRecord: 5,
}))
.collect();
Advanced Generation Options
Custom Output Format
interface CsvStringifyOptions {
separator?: string; // Field separator (default: ",")
crlf?: boolean; // Use CRLF line endings (default: true)
quote?: string; // Quote character (default: '"')
quotedFields?: boolean; // Quote all fields (default: false)
}
await enumerate(data)
.transform(toCsv({
separator: ";",
crlf: false, // Use LF only
quotedFields: true, // Quote all fields
}))
.writeTo("european.csv");
Handling Special Characters
// Data with commas, quotes, and newlines
const complexData = [
["Product", "Description", "Price"],
["Widget A", 'A "premium" widget, very nice', "$19.99"],
["Widget B", "Contains commas, and\nnewlines", "$29.99"],
];
// Automatically handles quoting and escaping
await enumerate(complexData)
.transform(toCsv())
.writeTo("products.csv");
// Output:
// Product,Description,Price
// Widget A,"A ""premium"" widget, very nice",$19.99
// "Widget B","Contains commas, and
// newlines",$29.99
Real-World Examples
Data Cleaning Pipeline
// Clean and validate CSV data
await read("messy-data.csv")
.transform(fromCsvToLazyRows())
.drop(1) // Skip header
.filter((row) => row.columnCount >= 3) // Ensure minimum columns
.map((row) => [
row.getField(0).trim(), // Clean name
row.getField(1).replace(/[^\d]/g, ""), // Extract digits only
row.getField(2).toLowerCase(), // Normalize city
])
.filter((row) => row[1].length > 0) // Remove invalid ages
.transform(toCsv())
.writeTo("cleaned-data.csv");
CSV to JSON Conversion
import { toJson } from "jsr:@j50n/proc@0.24.6/transforms";
// Convert CSV to JSON with headers
const csvData = await read("employees.csv")
.transform(fromCsvToLazyRows())
.collect();
const headers = csvData[0].toStringArray();
const dataRows = csvData.slice(1);
await enumerate(dataRows)
.map((row) => {
const obj: Record<string, string> = {};
for (let i = 0; i < headers.length; i++) {
obj[headers[i]] = row.getField(i);
}
return obj;
})
.transform(toJson())
.writeTo("employees.jsonl");
Large File Processing
// Process 10GB CSV file with constant memory usage
let processedCount = 0;
await read("huge-dataset.csv")
.transform(fromCsvToLazyRows())
.drop(1) // Skip header
.filter((row) => {
const status = row.getField(3);
return status === "active";
})
.map((row) => {
processedCount++;
if (processedCount % 100000 === 0) {
console.log(`Processed ${processedCount} rows`);
}
return [
row.getField(0), // ID
row.getField(1), // Name
new Date().toISOString(), // Processing timestamp
];
})
.transform(toCsv())
.writeTo("active-users.csv");
Excel-Compatible Output
// Generate CSV that opens correctly in Excel
const salesData = [
["Date", "Product", "Amount", "Currency"],
["2024-01-15", "Widget A", "1,234.56", "USD"],
["2024-01-16", "Widget B", "2,345.67", "EUR"],
];
await enumerate(salesData)
.transform(toCsv({
crlf: true, // Windows line endings
quotedFields: true, // Quote all fields for safety
}))
.writeTo("sales-report.csv");
Error Handling
Common CSV Errors
try {
await read("problematic.csv")
.transform(fromCsvToRows())
.collect();
} catch (error) {
if (error.message.includes("quote")) {
console.error("Malformed quotes in CSV");
} else if (error.message.includes("field")) {
console.error("Inconsistent field count");
} else if (error.message.includes("UTF-8")) {
console.error("Invalid character encoding");
}
}
Validation During Processing
// Validate data during parsing
await read("data.csv")
.transform(fromCsvToLazyRows())
.drop(1) // Skip header
.map((row, index) => {
if (row.columnCount !== 3) {
throw new Error(
`Row ${index + 2} has ${row.columnCount} fields, expected 3`,
);
}
const age = parseInt(row.getField(1));
if (isNaN(age) || age < 0 || age > 150) {
throw new Error(`Row ${index + 2} has invalid age: ${row.getField(1)}`);
}
return row.toStringArray();
})
.transform(toCsv())
.writeTo("validated.csv");
Performance Tips
Use LazyRow for Large Files
// ✅ Efficient - only parse fields you need
await read("large.csv")
.transform(fromCsvToLazyRows())
.filter((row) => row.getField(0).startsWith("A")) // Only parse field 0
.collect();
// ❌ Less efficient - parses all fields upfront
await read("large.csv")
.transform(fromCsvToRows())
.filter((row) => row[0].startsWith("A"))
.collect();
Batch Processing
// Process in batches for memory efficiency
const batchSize = 1000;
let batch: string[][] = [];
await read("huge.csv")
.transform(fromCsvToRows())
.forEach(async (row) => {
batch.push(row);
if (batch.length >= batchSize) {
await processBatch(batch);
batch = [];
}
});
// Process remaining rows
if (batch.length > 0) {
await processBatch(batch);
}
Convert to Other Formats
// Convert CSV to Record format for efficient processing
await read("data.csv")
.transform(fromCsvToRows())
.transform(toRecord())
.writeTo("data.record");
// Later processing uses the optimized format
await read("data.record")
.transform(fromRecordToRows())
.filter((row) => row[1] === "target")
.collect();
Integration with Other Formats
CSV → TSV
import { toTsv } from "jsr:@j50n/proc@0.24.6/transforms";
await read("data.csv")
.transform(fromCsvToRows())
.transform(toTsv())
.writeTo("data.tsv");
CSV → Record
import { toRecord } from "jsr:@j50n/proc@0.24.6/transforms";
await read("data.csv")
.transform(fromCsvToRows())
.transform(toRecord())
.writeTo("data.record");
Best Practices
- Use LazyRow for CSV processing when you don’t need all fields
- Validate field counts if your data requires consistent structure
- Use streaming processing for large files to maintain constant memory usage
- Convert to other formats for repeated processing of the same data
WASM-Powered Parsing
CSV parsing uses WebAssembly for high performance:
import { read } from "jsr:@j50n/proc@0.24.6";
import {
fromCsvToLazyRows,
fromCsvToRows,
} from "jsr:@j50n/proc@0.24.6/transforms";
// WASM-powered parsing (batched output)
const rows = await read("large-file.csv")
.transform(fromCsvToRows())
.flatMap((batch) => batch) // Flatten batches
.collect();
// Or with LazyRow
const lazyRows = await read("large-file.csv")
.transform(fromCsvToLazyRows())
.flatMap((batch) => batch)
.collect();
Key differences:
- Returns batches of rows (string[][]) instead of individual rows
- Uses WASM engine for parsing
When to use which:
| Parser | Use Case |
|---|---|
| fromCsvToRows() | In-process parsing with WASM |
| flatdata CLI | Maximum throughput, batch pipelines |
See Also
- TSV Transforms — Tab-separated processing
- LazyRow Guide — Detailed LazyRow usage patterns
- Performance Guide — Optimization strategies
- flatdata CLI — Multi-process streaming
TSV Transforms
Fast, simple tab-separated value processing.
⚠️ Experimental (v0.24.0+): TSV transforms are under active development. API may change as we improve correctness and streaming performance. Test thoroughly with your data patterns.
Overview
TSV (Tab-Separated Values) strikes a good balance between human readability and processing speed. Unlike CSV, TSV has no quoting or escaping rules, so parsing is simpler and faster.
Basic Usage
Parsing TSV to Rows
import { read } from "jsr:@j50n/proc@0.24.6";
import { fromTsvToRows } from "jsr:@j50n/proc@0.24.6/transforms";
// Parse TSV into string arrays
const rows = await read("data.tsv")
.transform(fromTsvToRows())
.collect();
// rows[0] = ["Name", "Age", "City"] // Header
// rows[1] = ["Alice", "30", "New York"] // Data row
// rows[2] = ["Bob", "25", "London"] // Data row
Parsing TSV to LazyRow
import { fromTsvToLazyRows } from "jsr:@j50n/proc@0.24.6/transforms";
// Parse TSV into optimized LazyRow format
const lazyRows = await read("data.tsv")
.transform(fromTsvToLazyRows())
.collect();
// Efficient field access
for (const row of lazyRows) {
const name = row.getField(0);
const age = parseInt(row.getField(1));
const city = row.getField(2);
console.log(`${name} (${age}) lives in ${city}`);
}
Generating TSV
import { toTsv } from "jsr:@j50n/proc@0.24.6/transforms";
// From string arrays
const data = [
["Name", "Age", "City"],
["Alice", "30", "New York"],
["Bob", "25", "London"],
];
await enumerate(data)
.transform(toTsv())
.writeTo("output.tsv");
Format Characteristics
Advantages
- Fast parsing: No complex quoting rules
- Human readable: Easy to view and edit
- Simple format: Minimal edge cases
Limitations
- No tabs in data: Fields cannot contain tab characters
- No newlines in data: Fields cannot contain line breaks
- Limited escaping: No standard way to include tabs/newlines (one sanitizing approach is sketched below)
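If your data might contain tabs or newlines, sanitize fields before generating TSV. A sketch, assuming "replace with a space" is an acceptable policy for your data:
import { enumerate } from "jsr:@j50n/proc@0.24.6";
import { toTsv } from "jsr:@j50n/proc@0.24.6/transforms";
// Replace tabs and newlines with spaces so each field stays on one line
const sanitize = (field: string) => field.replace(/[\t\r\n]+/g, " ");
const rows = [
  ["Name", "Notes"],
  ["Alice", "line one\nline two\twith a tab"],
];
await enumerate(rows)
  .map((row) => row.map(sanitize))
  .transform(toTsv())
  .writeTo("sanitized.tsv");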
When to Use TSV
- Data doesn’t contain tabs or newlines
- Performance is important but readability matters
- Processing log files or structured data
- Need faster alternative to CSV
Real-World Examples
Log File Processing
// Process web server access logs
await read("access.log")
.transform(fromTsvToLazyRows())
.filter((row) => {
const statusCode = row.getField(6);
return statusCode.startsWith("4") || statusCode.startsWith("5");
})
.map((row) => ({
timestamp: row.getField(0),
method: row.getField(3),
path: row.getField(4),
status: row.getField(6),
userAgent: row.getField(8),
}))
.transform(toJson())
.writeTo("errors.jsonl");
Data Pipeline
// ETL pipeline: TSV → filter → transform → TSV
await read("raw-data.tsv")
.transform(fromTsvToLazyRows())
.drop(1) // Skip header
.filter((row) => {
const score = parseFloat(row.getField(3));
return score >= 0.8; // High-quality records only
})
.map((row) => [
row.getField(0), // ID
row.getField(1).toUpperCase(), // Name (normalized)
row.getField(2).toLowerCase(), // Email (normalized)
(parseFloat(row.getField(3)) * 100).toFixed(1), // Score as percentage
])
.transform(toTsv())
.writeTo("processed-data.tsv");
Format Conversion
// Convert CSV to TSV (faster processing)
await read("data.csv")
.transform(fromCsvToRows())
.transform(toTsv())
.writeTo("data.tsv");
// Later processing is faster
await read("data.tsv")
.transform(fromTsvToRows())
.filter((row) => row[0].startsWith("A"))
.collect();
Streaming Analytics
// Real-time log analysis
let requestCount = 0;
let errorCount = 0;
const statusCodes = new Map<string, number>();
await read("live-access.log")
.transform(fromTsvToLazyRows())
.forEach((row) => {
requestCount++;
const statusCode = row.getField(6);
statusCodes.set(statusCode, (statusCodes.get(statusCode) || 0) + 1);
if (statusCode.startsWith("4") || statusCode.startsWith("5")) {
errorCount++;
}
if (requestCount % 1000 === 0) {
const errorRate = (errorCount / requestCount * 100).toFixed(2);
console.log(
`Processed ${requestCount} requests, error rate: ${errorRate}%`,
);
}
});
Performance Optimization
Choose LazyRow Based on Access Pattern
// ✅ Use LazyRow for selective field access
await read("wide-data.tsv")
.transform(fromTsvToLazyRows())
.filter((row) => {
// Only parse fields 0 and 5
const id = row.getField(0);
const status = row.getField(5);
return id.startsWith("USER_") && status === "active";
})
.collect();
// ✅ Use regular parsing for full field access
await read("data.tsv")
.transform(fromTsvToRows())
.map((row) => {
// Process all fields
return processAllFields(row);
})
.collect();
Batch Processing for Large Files
// Process large TSV files in batches
const batchSize = 5000;
let batch: string[][] = [];
await read("huge-data.tsv")
.transform(fromTsvToRows())
.forEach(async (row) => {
batch.push(row);
if (batch.length >= batchSize) {
await processBatch(batch);
batch = [];
}
});
// Process remaining rows
if (batch.length > 0) {
await processBatch(batch);
}
Data Validation
Field Count Validation
// Ensure consistent field counts
const expectedFields = 5;
const errors: string[] = [];
await read("data.tsv")
.transform(fromTsvToRows())
.forEach((row, index) => {
if (row.length !== expectedFields) {
errors.push(
`Row ${
index + 1
}: Expected ${expectedFields} fields, got ${row.length}`,
);
}
});
if (errors.length > 0) {
console.error(`Validation failed:\n${errors.join("\n")}`);
}
Data Type Validation
// Validate data types during processing
await read("metrics.tsv")
.transform(fromTsvToLazyRows())
.drop(1) // Skip header
.map((row, index) => {
const rowNum = index + 2;
// Validate timestamp
const timestamp = row.getField(0);
if (!/^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}/.test(timestamp)) {
throw new Error(`Row ${rowNum}: Invalid timestamp format: ${timestamp}`);
}
// Validate numeric value
const value = parseFloat(row.getField(2));
if (isNaN(value)) {
throw new Error(
`Row ${rowNum}: Invalid numeric value: ${row.getField(2)}`,
);
}
return {
timestamp: new Date(timestamp),
metric: row.getField(1),
value: value,
};
})
.transform(toJson())
.writeTo("validated-metrics.jsonl");
Integration Examples
TSV to Database
// Load TSV data into database
const insertBatch = async (rows: string[][]) => {
const values = rows.map((row) => `('${row[0]}', '${row[1]}', ${row[2]})`)
.join(",");
await db.execute(`INSERT INTO users (name, email, age) VALUES ${values}`);
};
let batch: string[][] = [];
const batchSize = 1000;
await read("users.tsv")
.transform(fromTsvToRows())
.drop(1) // Skip header
.forEach(async (row) => {
batch.push(row);
if (batch.length >= batchSize) {
await insertBatch(batch);
batch = [];
}
});
if (batch.length > 0) {
await insertBatch(batch);
}
TSV to API
// Send TSV data to REST API
await read("events.tsv")
.transform(fromTsvToLazyRows())
.drop(1) // Skip header
.map((row) => ({
eventId: row.getField(0),
timestamp: row.getField(1),
userId: row.getField(2),
action: row.getField(3),
metadata: JSON.parse(row.getField(4) || "{}"),
}))
.concurrentMap(async (event) => {
const response = await fetch("/api/events", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify(event),
});
if (!response.ok) {
throw new Error(
`Failed to send event ${event.eventId}: ${response.statusText}`,
);
}
return response.json();
}, { concurrency: 10 })
.forEach((result) => console.log("Sent:", result.id));
Error Handling
Malformed Data
try {
await read("data.tsv")
.transform(fromTsvToRows())
.collect();
} catch (error) {
if (error.message.includes("UTF-8")) {
console.error("Invalid character encoding in TSV file");
} else {
console.error(`TSV parsing failed: ${error.message}`);
}
}
Graceful Error Recovery
// Continue processing despite individual row errors
const errors: Array<{ row: number; error: string }> = [];
let successCount = 0;
await read("data.tsv")
.transform(fromTsvToLazyRows())
.drop(1)
.forEach((row, index) => {
try {
const processed = processRow(row);
successCount++;
} catch (error) {
errors.push({
row: index + 2, // Account for header and 0-based index
error: error.message,
});
}
});
console.log(`Successfully processed ${successCount} rows`);
if (errors.length > 0) {
console.error(`${errors.length} rows had errors:`);
errors.forEach(({ row, error }) => {
console.error(` Row ${row}: ${error}`);
});
}
Best Practices
- Use LazyRow for selective field access - it skips parsing the fields you never read
- Validate field counts if your data requires consistent structure
- Avoid tabs and newlines in your data fields
- Use TSV for logs and structured data without complex formatting needs
- Convert from CSV to TSV for faster repeated processing
- Handle encoding properly - ensure UTF-8 compatibility
- Batch large datasets to control memory usage
- Validate data types during processing for early error detection
Comparison with Other Formats
TSV vs CSV
- Simplicity: No complex quoting/escaping rules
- Limitations: Cannot handle tabs or newlines in data
- Compatibility: Less universal than CSV
TSV vs Record
- Readability: TSV is human-readable, Record is binary
- Speed: Record is faster for large datasets
- Portability: TSV works with any text editor
- Safety: Record handles any UTF-8 content safely
TSV vs JSON
- Structure: JSON supports nested objects, TSV is flat
- Speed: TSV is faster for tabular data
- Size: TSV is more compact for simple data
- Flexibility: JSON is more flexible for complex structures
Next Steps
- CSV Transforms - When you need CSV compatibility
- Record Format - For maximum performance
- LazyRow Guide - Advanced optimization patterns
- Performance Guide - Detailed benchmarks and optimization
JSON Transforms
Process JSON Lines (JSONL) format with full object structure support and optional schema validation.
⚠️ Experimental (v0.24.0+): JSON transforms are under active development. API may change as we improve correctness and streaming performance. Test thoroughly with your data patterns.
Overview
JSON transforms handle JSON Lines format - one complete JSON value per line. Unlike other formats, JSON preserves full object structure including nested objects, arrays, and all JSON data types. This makes it ideal for APIs, configuration data, and complex structured information.
Basic Usage
Parsing JSON Lines
import { read } from "jsr:@j50n/proc@0.24.6";
import { fromJsonToRows } from "jsr:@j50n/proc@0.24.6/transforms";
// Parse JSONL into objects
const objects = await read("events.jsonl")
.transform(fromJsonToRows())
.collect();
// Each object preserves full JSON structure
// objects[0] = { id: "evt_123", type: "click", user: { name: "Alice" } }
// objects[1] = { id: "evt_124", type: "view", metadata: [1, 2, 3] }
Generating JSON Lines
import { toJson } from "jsr:@j50n/proc@0.24.6/transforms";
// From objects
const events = [
{ id: "evt_123", type: "click", timestamp: "2024-01-15T10:30:00Z" },
{ id: "evt_124", type: "view", user: { id: 456, name: "Bob" } },
];
await enumerate(events)
.transform(toJson())
.writeTo("events.jsonl");
Schema Validation
Using Zod Schemas
import { z } from "zod";
// Define schema
const EventSchema = z.object({
id: z.string(),
type: z.enum(["click", "view", "purchase"]),
timestamp: z.string().datetime(),
user: z.object({
id: z.number(),
name: z.string(),
}).optional(),
metadata: z.record(z.unknown()).optional(),
});
type Event = z.infer<typeof EventSchema>;
// Parse with validation
const validEvents = await read("events.jsonl")
.transform(fromJsonToRows<Event>({
schema: EventSchema,
}))
.collect();
Partial Validation for Performance
// Validate only first 1000 rows for performance
const events = await read("large-events.jsonl")
.transform(fromJsonToRows<Event>({
schema: EventSchema,
sampleSize: 1000, // Only validate first 1000 rows
}))
.collect();
Custom Validation
// Custom validation logic
const validateEvent = (obj: unknown): obj is Event => {
return typeof obj === "object" &&
obj !== null &&
"id" in obj &&
"type" in obj;
};
const events = await read("events.jsonl")
.transform(fromJsonToRows())
.filter((obj): obj is Event => {
if (!validateEvent(obj)) {
console.warn(`Invalid event: ${JSON.stringify(obj)}`);
return false;
}
return true;
})
.collect();
Real-World Examples
API Event Processing
// Process webhook events
interface WebhookEvent {
id: string;
type: string;
timestamp: string;
data: Record<string, unknown>;
}
await read("webhook-events.jsonl")
.transform(fromJsonToRows<WebhookEvent>())
.filter((event) => event.type === "user.created")
.map((event) => ({
userId: event.data.id as string,
email: event.data.email as string,
createdAt: new Date(event.timestamp),
source: "webhook",
}))
.transform(toJson())
.writeTo("new-users.jsonl");
Configuration Processing
// Process application configurations
interface AppConfig {
name: string;
version: string;
features: string[];
database: {
host: string;
port: number;
ssl: boolean;
};
cache?: {
ttl: number;
maxSize: number;
};
}
const configs = await read("app-configs.jsonl")
.transform(fromJsonToRows<AppConfig>())
.filter((config) => config.version.startsWith("2.")) // Version 2.x only
.map((config) => ({
...config,
features: config.features.filter((f) => f !== "deprecated-feature"),
database: {
...config.database,
ssl: true, // Force SSL for all configs
},
}))
.collect();
// Write updated configurations
await enumerate(configs)
.transform(toJson())
.writeTo("updated-configs.jsonl");
Log Analysis
// Analyze structured application logs
interface LogEntry {
timestamp: string;
level: "debug" | "info" | "warn" | "error";
service: string;
message: string;
context?: Record<string, unknown>;
error?: {
name: string;
message: string;
stack?: string;
};
}
// Extract error patterns
const errorPatterns = new Map<string, number>();
await read("app-logs.jsonl")
.transform(fromJsonToRows<LogEntry>())
.filter((log) => log.level === "error" && log.error)
.forEach((log) => {
const errorType = log.error!.name;
errorPatterns.set(errorType, (errorPatterns.get(errorType) || 0) + 1);
});
// Output error summary
const sortedErrors = Array.from(errorPatterns.entries())
.sort(([, a], [, b]) => b - a);
console.log("Top error types:");
sortedErrors.slice(0, 10).forEach(([type, count]) => {
console.log(` ${type}: ${count} occurrences`);
});
Data Transformation Pipeline
// Transform nested data structures
interface RawOrder {
orderId: string;
customer: {
id: string;
name: string;
email: string;
};
items: Array<{
productId: string;
quantity: number;
price: number;
}>;
shipping: {
address: string;
method: string;
cost: number;
};
}
interface ProcessedOrder {
id: string;
customerId: string;
customerEmail: string;
totalAmount: number;
itemCount: number;
shippingCost: number;
processedAt: string;
}
await read("raw-orders.jsonl")
.transform(fromJsonToRows<RawOrder>())
.map((order): ProcessedOrder => ({
id: order.orderId,
customerId: order.customer.id,
customerEmail: order.customer.email,
totalAmount: order.items.reduce(
(sum, item) => sum + (item.quantity * item.price),
0,
),
itemCount: order.items.reduce((sum, item) => sum + item.quantity, 0),
shippingCost: order.shipping.cost,
processedAt: new Date().toISOString(),
}))
.filter((order) => order.totalAmount > 100) // High-value orders only
.transform(toJson())
.writeTo("processed-orders.jsonl");
Advanced Usage
Streaming Large JSON Files
// Process large JSONL files with memory control
let processedCount = 0;
const batchSize = 1000;
let batch: LogEntry[] = [];
await read("huge-logs.jsonl")
.transform(fromJsonToRows<LogEntry>())
.forEach(async (log) => {
batch.push(log);
processedCount++;
if (batch.length >= batchSize) {
await processBatch(batch);
batch = [];
if (processedCount % 10000 === 0) {
console.log(`Processed ${processedCount} log entries`);
}
}
});
// Process remaining entries
if (batch.length > 0) {
await processBatch(batch);
}
Nested Object Manipulation
// Deep object transformation
interface NestedData {
user: {
profile: {
personal: {
name: string;
age: number;
};
preferences: {
theme: string;
notifications: boolean;
};
};
};
metadata: Record<string, unknown>;
}
await read("nested-data.jsonl")
.transform(fromJsonToRows<NestedData>())
.map((data) => ({
// Flatten nested structure
userName: data.user.profile.personal.name,
userAge: data.user.profile.personal.age,
theme: data.user.profile.preferences.theme,
notifications: data.user.profile.preferences.notifications,
// Preserve metadata as-is
metadata: data.metadata,
}))
.transform(toJson())
.writeTo("flattened-data.jsonl");
Array Processing
// Handle arrays within JSON objects
interface EventBatch {
batchId: string;
timestamp: string;
events: Array<{
type: string;
data: Record<string, unknown>;
}>;
}
// Flatten event batches into individual events
await read("event-batches.jsonl")
.transform(fromJsonToRows<EventBatch>())
.flatMap((batch) =>
batch.events.map((event) => ({
batchId: batch.batchId,
batchTimestamp: batch.timestamp,
eventType: event.type,
eventData: event.data,
}))
)
.transform(toJson())
.writeTo("individual-events.jsonl");
Error Handling
JSON Parsing Errors
try {
await read("malformed.jsonl")
.transform(fromJsonToRows())
.collect();
} catch (error) {
if (error.message.includes("JSON")) {
console.error("Invalid JSON format in file");
} else if (error.message.includes("UTF-8")) {
console.error("Invalid character encoding");
}
}
Schema Validation Errors
try {
await read("events.jsonl")
.transform(fromJsonToRows({ schema: EventSchema }))
.collect();
} catch (error) {
if (error.name === "ZodError") {
console.error("Schema validation failed:");
error.errors.forEach((err: any) => {
console.error(` ${err.path.join(".")}: ${err.message}`);
});
}
}
Graceful Error Recovery
// Continue processing despite individual JSON errors
const errors: Array<{ line: number; error: string }> = [];
let successCount = 0;
await read("mixed-quality.jsonl")
.lines
.forEach((line, index) => {
try {
const obj = JSON.parse(line);
// Process valid JSON
successCount++;
} catch (error) {
errors.push({
line: index + 1,
error: error.message,
});
}
});
console.log(`Successfully parsed ${successCount} objects`);
if (errors.length > 0) {
console.error(`${errors.length} lines had JSON errors`);
}
Performance Optimization
Selective Processing
// Only parse objects that match criteria
await read("events.jsonl")
.lines
.filter((line) => line.includes('"type":"error"')) // Quick string check
.map((line) => JSON.parse(line)) // Parse only matching lines
.transform(toJson())
.writeTo("error-events.jsonl");
Streaming vs Batch Processing
// ✅ Streaming - constant memory usage
await read("large-data.jsonl")
.transform(fromJsonToRows())
.filter((obj) => obj.status === "active")
.transform(toJson())
.writeTo("active-data.jsonl");
// ❌ Batch - loads everything into memory
const allData = await read("large-data.jsonl")
.transform(fromJsonToRows())
.collect(); // Memory explosion!
Integration Examples
JSON to Database
// Load JSON data into database
interface User {
id: string;
name: string;
email: string;
metadata: Record<string, unknown>;
}
await read("users.jsonl")
.transform(fromJsonToRows<User>())
.concurrentMap(async (user) => {
await db.users.create({
data: {
id: user.id,
name: user.name,
email: user.email,
metadata: JSON.stringify(user.metadata),
},
});
}, { concurrency: 10 })
.forEach(() => {}); // Consume the stream
JSON to Other Formats
// Convert JSON to CSV (flatten objects)
await read("users.jsonl")
.transform(fromJsonToRows<User>())
.map((user) => [
user.id,
user.name,
user.email,
JSON.stringify(user.metadata), // Serialize complex data
])
.transform(toCsv())
.writeTo("users.csv");
Best Practices
- Use for rich object structures - JSON excels with nested data
- Validate with schemas for production data processing
- Sample validation for large files to balance safety and performance
- Handle parsing errors gracefully - not all lines may be valid JSON
- Stream large files - avoid loading everything into memory
- Consider flattening complex nested structures for simpler processing
- Use selective parsing when you only need specific object types
- Preserve object structure when possible rather than flattening unnecessarily
Comparison with Other Formats
JSON vs CSV/TSV
- Structure: JSON supports nested objects, CSV/TSV are flat
- Types: JSON preserves data types, CSV/TSV are all strings
- Size: JSON is larger due to field names and structure
- Speed: CSV/TSV are faster for simple tabular data
JSON vs Record
- Readability: JSON is human-readable, Record is binary
- Flexibility: JSON supports any structure, Record is tabular
- Performance: Record is faster for large datasets
- Compatibility: JSON works everywhere, Record is specialized
Next Steps
- Record Format - For maximum performance with structured data
- CSV Transforms - When you need tabular compatibility
- LazyRow Guide - Not applicable to JSON (preserves object structure)
- Performance Guide - Optimization strategies for all formats
Record Format
High-performance binary-safe format using ASCII control characters.
⚠️ Experimental (v0.24.0+): Record format transforms are under active development. API may change as we improve correctness and streaming performance. Test thoroughly with your data patterns.
Overview
Record format is designed for high-throughput data processing pipelines. It uses ASCII control characters (Record Separator and Field Separator) to achieve reliable parsing while supporting any UTF-8 content in field values, including tabs and newlines.
Format Specification
Record format uses ASCII control characters for reliable field and record separation:
- Record Separator (RS): \x1E (ASCII 30) - separates records
- Field Separator (FS): \x1F (ASCII 31) - separates fields within records
These characters are defined in common.ts and should not appear in actual
data, allowing safe processing of tabs, newlines, and other special characters
within field values.
Format Example
field1\x1Ffield2\x1Ffield3\x1E
field1\x1Ffield2\x1Ffield3\x1E
For data: [["Alice", "30", "New\nYork"], ["Bob", "25", "London"]]
Alice\x1F30\x1FNew\nYork\x1E
Bob\x1F25\x1FLondon\x1E
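To make the rule concrete, here is a minimal sketch of the encoding described above. It is illustrative only; in real pipelines use toRecord(), which also handles streaming and byte encoding.
// Minimal sketch of the encoding rule (illustrative; use toRecord() in practice).
const RS = "\x1E"; // Record Separator (ASCII 30)
const FS = "\x1F"; // Field Separator (ASCII 31)
const rows = [["Alice", "30", "New\nYork"], ["Bob", "25", "London"]];
const encoded = rows.map((fields) => fields.join(FS) + RS).join("");
// encoded === "Alice\x1F30\x1FNew\nYork\x1EBob\x1F25\x1FLondon\x1E"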
Basic Usage
Parsing Record Format
import { read } from "jsr:@j50n/proc@0.24.6";
import { fromRecordToRows } from "jsr:@j50n/proc@0.24.6/transforms";
// Parse Record format into string arrays
const rows = await read("data.record")
.transform(fromRecordToRows())
.collect();
// rows[0] = ["Alice", "30", "New\nYork"]
// rows[1] = ["Bob", "25", "London"]
Parsing Record to LazyRow
import { fromRecordToLazyRows } from "jsr:@j50n/proc@0.24.6/transforms";
// Parse Record format into optimized LazyRow format
const lazyRows = await read("data.record")
.transform(fromRecordToLazyRows())
.collect();
// Efficient field access
for (const row of lazyRows) {
const name = row.getField(0);
const age = parseInt(row.getField(1));
const city = row.getField(2);
console.log(`${name} (${age}) from ${city}`);
}
Generating Record Format
import { toRecord } from "jsr:@j50n/proc@0.24.6/transforms";
// From string arrays
const data = [
["Alice", "30", "New\nYork"], // Newlines are safe
["Bob", "25", "Tab\there"], // Tabs are safe
["Carol", "35", 'Quote"here'], // Quotes are safe
];
await enumerate(data)
.transform(toRecord())
.writeTo("output.record");
Key Advantages
Binary Safety
Record format safely handles any UTF-8 content:
// All special characters are preserved safely
const complexData = [
["Product", "Description", "Notes"],
["Widget A", "Contains\ttabs\nand newlines", "Special chars: \"'`"],
["Widget B", "Unicode: café naïve 🚀 東京", "Control chars safe"],
["Widget C", "Commas, semicolons; all safe", "No escaping needed"],
];
await enumerate(complexData)
.transform(toRecord())
.writeTo("complex.record");
// Perfect round-trip preservation
const restored = await read("complex.record")
.transform(fromRecordToRows())
.collect();
// restored deep-equals complexData (exact round-trip)
No Escaping Required
Unlike CSV, Record format needs no complex escaping:
// CSV requires complex quoting and escaping
const csvProblematic = [
['Field with "quotes"', "Field with, commas", "Field with\nnewlines"],
];
// Record format handles it naturally
await enumerate(csvProblematic)
.transform(toRecord())
.writeTo("no-escaping-needed.record");
Maximum Performance
Record format is optimized for speed:
// Fastest format for high-throughput processing
const startTime = Date.now();
await read("large-dataset.record") // Fast parsing
.transform(fromRecordToRows())
.filter((row) => row[0].startsWith("A"))
.transform(toRecord()) // Fast stringify
.writeTo("filtered.record");
const duration = Date.now() - startTime;
console.log(`Processed in ${duration}ms`);
Real-World Examples
High-Throughput ETL Pipeline
// Process millions of records efficiently
await read("raw-data.record")
.transform(fromRecordToLazyRows())
.filter((row) => {
const status = row.getField(5);
return status === "active";
})
.map((row) => [
row.getField(0), // ID
row.getField(1).toUpperCase(), // Name (normalized)
row.getField(2).toLowerCase(), // Email (normalized)
new Date().toISOString(), // Processing timestamp
"processed", // Status
])
.transform(toRecord())
.writeTo("processed-data.record");
Format Conversion for Performance
// Convert slow formats to Record for repeated processing
console.log("Converting CSV to Record format...");
await read("slow-data.csv")
.transform(fromCsvToRows())
.transform(toRecord())
.writeTo("fast-data.record");
console.log("Conversion complete.");
// Later processing benefits from Record format
await read("fast-data.record")
.transform(fromRecordToRows())
.filter((row) => row[2] === "target")
.collect();
Streaming Data Processing
// Real-time data processing with Record format
interface ProcessingStats {
totalRecords: number;
validRecords: number;
errorRecords: number;
startTime: number;
}
const stats: ProcessingStats = {
totalRecords: 0,
validRecords: 0,
errorRecords: 0,
startTime: Date.now(),
};
await read("streaming-data.record")
.transform(fromRecordToLazyRows())
.forEach((row) => {
stats.totalRecords++;
try {
// Validate and process record
const id = row.getField(0);
const value = parseFloat(row.getField(3));
if (id && !isNaN(value)) {
stats.validRecords++;
// Process valid record
} else {
stats.errorRecords++;
}
// Report progress
if (stats.totalRecords % 100000 === 0) {
const elapsed = (Date.now() - stats.startTime) / 1000;
const rate = (stats.totalRecords / elapsed).toFixed(0);
console.log(
`Processed ${stats.totalRecords} records (${rate} records/sec)`,
);
}
} catch (error) {
stats.errorRecords++;
}
});
console.log(
`Final stats: ${stats.validRecords}/${stats.totalRecords} valid records`,
);
Data Archival and Compression
// Record format compresses well due to regular structure
import { gunzip, gzip } from "jsr:@j50n/proc@0.24.6/transforms";
// Archive data with compression
await read("large-dataset.record")
.transform(gzip)
.writeTo("archived-data.record.gz");
// Later retrieval with decompression
await read("archived-data.record.gz")
.transform(gunzip)
.transform(fromRecordToRows())
.take(1000) // Sample first 1000 records
.collect();
Performance Optimization
LazyRow Usage with Record
Record format shows mixed LazyRow performance:
// ✅ Use LazyRow for selective field access
await read("wide-data.record")
.transform(fromRecordToLazyRows())
.filter((row) => {
// Only parse fields 0 and 10
const id = row.getField(0);
const status = row.getField(10);
return id.startsWith("USER_") && status === "active";
})
.collect();
// ✅ Use regular parsing for full field processing
await read("data.record")
.transform(fromRecordToRows())
.map((row) => {
// Process all fields efficiently
return processAllFields(row);
})
.collect();
Batch Processing
// Process large Record files in memory-efficient batches
const batchSize = 10000;
let batch: string[][] = [];
await read("huge-data.record")
.transform(fromRecordToRows())
.forEach(async (row) => {
batch.push(row);
if (batch.length >= batchSize) {
await processBatch(batch);
batch = [];
}
});
// Process remaining rows
if (batch.length > 0) {
await processBatch(batch);
}
Parallel Processing
// Process Record data in parallel streams
const inputFiles = ["data1.record", "data2.record", "data3.record"];
const results = await Promise.all(
inputFiles.map(async (file, index) => {
return await read(file)
.transform(fromRecordToRows())
.filter((row) => row[0].startsWith(`BATCH_${index}`))
.collect();
}),
);
// Combine results
const allResults = results.flat();
Data Validation
Field Count Validation
// Ensure consistent record structure
const expectedFields = 7;
const errors: string[] = [];
await read("data.record")
.transform(fromRecordToRows())
.forEach((row, index) => {
if (row.length !== expectedFields) {
errors.push(
`Record ${
index + 1
}: Expected ${expectedFields} fields, got ${row.length}`,
);
}
});
if (errors.length > 0) {
console.error(`Validation failed:\n${errors.join("\n")}`);
}
Data Integrity Checks
// Validate data during processing
await read("transactions.record")
.transform(fromRecordToLazyRows())
.map((row, index) => {
const recordNum = index + 1;
// Validate transaction ID format
const txId = row.getField(0);
if (!/^TX_\d{8}$/.test(txId)) {
throw new Error(`Record ${recordNum}: Invalid transaction ID: ${txId}`);
}
// Validate amount
const amount = parseFloat(row.getField(3));
if (isNaN(amount) || amount <= 0) {
throw new Error(
`Record ${recordNum}: Invalid amount: ${row.getField(3)}`,
);
}
return {
id: txId,
amount: amount,
timestamp: row.getField(1),
description: row.getField(2),
};
})
.transform(toJson())
.writeTo("validated-transactions.jsonl");
Integration Examples
Record to Database
// Bulk load Record data into database
const insertBatch = async (rows: string[][]) => {
const values = rows.map((row) =>
`(${row.map((field) => `'${field.replace(/'/g, "''")}'`).join(", ")})`
).join(", ");
await db.execute(`
INSERT INTO users (id, name, email, created_at, status)
VALUES ${values}
`);
};
let batch: string[][] = [];
const batchSize = 5000;
await read("users.record")
.transform(fromRecordToRows())
.forEach(async (row) => {
batch.push(row);
if (batch.length >= batchSize) {
await insertBatch(batch);
batch = [];
}
});
if (batch.length > 0) {
await insertBatch(batch);
}
Record to API
// Stream Record data to REST API
await read("events.record")
.transform(fromRecordToLazyRows())
.map((row) => ({
eventId: row.getField(0),
timestamp: row.getField(1),
userId: row.getField(2),
action: row.getField(3),
metadata: JSON.parse(row.getField(4) || "{}"),
}))
.concurrentMap(async (event) => {
const response = await fetch("/api/events", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify(event),
});
if (!response.ok) {
throw new Error(
`API error for event ${event.eventId}: ${response.statusText}`,
);
}
return response.json();
}, { concurrency: 20 }) // Higher concurrency due to Record format speed
.forEach((result) => console.log("Processed:", result.id));
Error Handling
Malformed Records
try {
await read("data.record")
.transform(fromRecordToRows())
.collect();
} catch (error) {
if (error.message.includes("UTF-8")) {
console.error("Invalid character encoding in Record file");
} else if (error.message.includes("separator")) {
console.error("Malformed Record format - invalid separators");
}
}
Graceful Error Recovery
// Continue processing despite individual record errors
const errors: Array<{ record: number; error: string }> = [];
let successCount = 0;
await read("data.record")
.transform(fromRecordToLazyRows())
.forEach((row, index) => {
try {
const processed = processRecord(row);
successCount++;
} catch (error) {
errors.push({
record: index + 1,
error: error.message,
});
}
});
console.log(`Successfully processed ${successCount} records`);
if (errors.length > 0) {
console.error(`${errors.length} records had errors:`);
errors.slice(0, 10).forEach(({ record, error }) => {
console.error(` Record ${record}: ${error}`);
});
}
Best Practices
- Use for internal processing - Record format is not human-readable
- Leverage binary safety - no need to escape special characters
- Choose LazyRow based on access patterns - selective vs full field access
- Validate field counts if your data requires consistent structure
- Use for high-throughput pipelines - efficient format for large datasets
- Convert from other formats for repeated processing
- Handle UTF-8 properly - ensure proper encoding throughout pipeline
- Batch large datasets to control memory usage in processing
Comparison with Other Formats
Record vs CSV
- Safety: No escaping needed for special characters
- Readability: CSV is human-readable, Record is binary
- Compatibility: CSV is universal, Record is specialized
Record vs TSV
- Content: Record handles tabs/newlines safely, TSV cannot
- Simplicity: TSV is simpler and human-readable
- Performance: Record scales well with dataset size
Record vs JSON
- Structure: JSON supports nested objects, Record is flat tabular
- Speed: Record is faster for large tabular datasets
- Flexibility: JSON is more flexible for complex structures
- Size: Record is more compact for simple tabular data
Next Steps
- Performance Guide - Detailed benchmarks and optimization
- LazyRow Guide - Optimization patterns for Record format
- CSV Transforms - When you need human-readable compatibility
- JSON Transforms - When you need rich object structures
LazyRow Guide
Optimized read-only data access with lazy evaluation and caching.
⚠️ Experimental (v0.24.0+): LazyRow is under active development. API may change as we improve correctness and streaming performance. Test thoroughly with your data patterns.
Overview
LazyRow is a data structure designed for efficient field access in tabular data. It uses lazy evaluation and caching to minimize parsing overhead while providing a clean, simple API.
Key Benefits
- Zero conversion cost: Choose optimal backing based on source data
- Lazy evaluation: Parse fields only when accessed
- Automatic caching: Repeated access uses cached results
- Memory efficient: Minimal overhead for conversion caching
Basic Usage
Creating LazyRow
import { LazyRow } from "jsr:@j50n/proc@0.24.6/transforms";
// From string array (zero cost)
const lazyRow1 = LazyRow.fromStringArray(["Alice", "30", "Engineer"]);
// From binary data (zero cost)
const binaryData = new Uint8Array([...]); // LazyRow binary format
const lazyRow2 = LazyRow.fromBinary(binaryData);
Field Access
// Efficient field access
const name = lazyRow.getField(0); // "Alice"
const age = lazyRow.getField(1); // "30"
const job = lazyRow.getField(2); // "Engineer"
// Column count
console.log(lazyRow.columnCount); // 3
Conversions with Caching
// Convert to string array (cached after first call)
const fields1 = lazyRow.toStringArray(); // Converts and caches
const fields2 = lazyRow.toStringArray(); // Returns cached result
// Convert to binary (cached after first call)
const binary1 = lazyRow.toBinary(); // Converts and caches
const binary2 = lazyRow.toBinary(); // Returns cached result
Parsing with LazyRow
CSV to LazyRow
import { fromCsvToLazyRows } from "jsr:@j50n/proc@0.24.6/transforms";
const lazyRows = await read("data.csv")
.transform(fromCsvToLazyRows())
.collect();
// Process efficiently
for (const row of lazyRows) {
// Only parse the fields you actually use
const id = row.getField(0);
if (id.startsWith("USER_")) {
const name = row.getField(1); // Parse on demand
const email = row.getField(2); // Parse on demand
console.log(`User: ${name} <${email}>`);
}
// Fields 3+ are never parsed if not accessed
}
TSV to LazyRow
import { fromTsvToLazyRows } from "jsr:@j50n/proc@0.24.6/transforms";
const lazyRows = await read("logs.tsv")
.transform(fromTsvToLazyRows())
.collect();
// Efficient log processing
for (const row of lazyRows) {
const level = row.getField(2); // Parse log level
if (level === "ERROR") {
const timestamp = row.getField(0);
const message = row.getField(3);
console.error(`${timestamp}: ${message}`);
}
}
Record to LazyRow
import { fromRecordToLazyRows } from "jsr:@j50n/proc@0.24.6/transforms";
const lazyRows = await read("data.record")
.transform(fromRecordToLazyRows())
.collect();
Binary Format Specification
LazyRow uses an efficient binary format for storage and transmission:
LazyRow Binary Layout:
┌─────────────────┬──────────────────┬─────────────────┐
│ Field Count │ Field Lengths │ Field Data │
│ (4 bytes) │ (4 * N bytes) │ (UTF-8 bytes) │
└─────────────────┴──────────────────┴─────────────────┘
Field Count: int32 - Number of fields (N)
Field Lengths: int32[N] - Byte length of each field
Field Data: Concatenated UTF-8 encoded field values
Binary Format Example
// For data: ["Alice", "30", "Engineer"]
const binary = lazyRow.toBinary();
// Binary layout:
// [0-3]: 0x00000003 // 3 fields
// [4-7]: 0x00000005 // "Alice" = 5 bytes
// [8-11]: 0x00000002 // "30" = 2 bytes
// [12-15]: 0x00000008 // "Engineer" = 8 bytes
// [16-20]: "Alice" // UTF-8 data
// [21-22]: "30" // UTF-8 data
// [23-30]: "Engineer" // UTF-8 data
Implementation Details
Polymorphic Design
LazyRow uses an abstract base class with two concrete implementations:
abstract class LazyRow {
abstract readonly columnCount: number;
abstract getField(index: number): string;
abstract toStringArray(): string[];
abstract toBinary(): Uint8Array;
// Static factory methods
static fromStringArray(fields: string[]): LazyRow;
static fromBinary(data: Uint8Array): LazyRow;
}
StringArrayLazyRow
- Backing: string[] array
- Best for: Data parsed from text formats (CSV, TSV)
- Lazy conversion: Binary format generated on demand
- Caching: Binary result cached after first toBinary() call
BinaryLazyRow
- Backing: Uint8Array with field boundaries
- Best for: Data from binary sources or network transmission
- Lazy conversion: String parsing on demand
- Caching: String results cached after field access
Performance Patterns
Selective Field Access
// ✅ Efficient - only parse needed fields
await read("large.csv")
.transform(fromCsvToLazyRows())
.filter((row) => {
const status = row.getField(5); // Only parse field 5
return status === "active";
})
.map((row) => ({
id: row.getField(0), // Parse fields 0, 1, 2 on demand
name: row.getField(1),
email: row.getField(2),
// Fields 3, 4, 6+ never parsed
}))
.collect();
Avoid Unnecessary Conversions
// ✅ Efficient - work with LazyRow directly
const processRow = (row: LazyRow) => {
const name = row.getField(0);
const age = parseInt(row.getField(1));
return age >= 18 ? name : null;
};
// ❌ Less efficient - unnecessary conversion
const processRow = (row: LazyRow) => {
const fields = row.toStringArray(); // Converts all fields
const name = fields[0];
const age = parseInt(fields[1]);
return age >= 18 ? name : null;
};
Batch Conversions
// When you need all fields, convert once
const processAllFields = (row: LazyRow) => {
const fields = row.toStringArray(); // Convert once
return {
name: fields[0],
age: parseInt(fields[1]),
city: fields[2],
country: fields[3],
email: fields[4],
};
};
Real-World Examples
Log Analysis
// Analyze web server logs efficiently
let errorCount = 0;
let totalRequests = 0;
await read("access.log.tsv")
.transform(fromTsvToLazyRows())
.forEach((row) => {
totalRequests++;
const statusCode = row.getField(6); // Only parse status code
if (statusCode.startsWith("4") || statusCode.startsWith("5")) {
errorCount++;
// Only parse additional fields for errors
const timestamp = row.getField(0);
const path = row.getField(4);
const userAgent = row.getField(8);
console.error(`${timestamp}: ${statusCode} ${path} - ${userAgent}`);
}
});
console.log(`Error rate: ${(errorCount / totalRequests * 100).toFixed(2)}%`);
Data Validation
// Validate CSV data with detailed error reporting
const errors: string[] = [];
await read("users.csv")
.transform(fromCsvToLazyRows())
.drop(1) // Skip header
.forEach((row, index) => {
const rowNum = index + 2; // Account for header and 0-based index
// Validate required fields exist
if (row.columnCount < 4) {
errors.push(`Row ${rowNum}: Missing required fields`);
return;
}
// Validate email format (only parse if needed)
const email = row.getField(2);
if (!email.includes("@")) {
errors.push(`Row ${rowNum}: Invalid email format: ${email}`);
}
// Validate age (only parse if needed)
const ageStr = row.getField(1);
const age = parseInt(ageStr);
if (isNaN(age) || age < 0 || age > 150) {
errors.push(`Row ${rowNum}: Invalid age: ${ageStr}`);
}
});
if (errors.length > 0) {
console.error(`Validation failed with ${errors.length} errors:`);
errors.forEach((error) => console.error(` ${error}`));
}
Format Conversion with Filtering
// Convert CSV to JSON, filtering and transforming data
await read("products.csv")
.transform(fromCsvToLazyRows())
.drop(1) // Skip header
.filter((row) => {
const price = parseFloat(row.getField(3));
return price > 10.00; // Only expensive products
})
.map((row) => ({
id: row.getField(0),
name: row.getField(1),
category: row.getField(2),
price: parseFloat(row.getField(3)),
inStock: row.getField(4) === "true",
lastUpdated: new Date().toISOString(),
}))
.transform(toJson())
.writeTo("expensive-products.jsonl");
Streaming Aggregation
// Calculate statistics without loading all data
const stats = {
totalRows: 0,
totalSales: 0,
avgAge: 0,
ageSum: 0,
};
await read("sales-data.csv")
.transform(fromCsvToLazyRows())
.drop(1) // Skip header
.forEach((row) => {
stats.totalRows++;
// Only parse fields we need for calculations
const saleAmount = parseFloat(row.getField(4));
const customerAge = parseInt(row.getField(2));
stats.totalSales += saleAmount;
stats.ageSum += customerAge;
});
stats.avgAge = stats.ageSum / stats.totalRows;
console.log(`Processed ${stats.totalRows} sales`);
console.log(`Total revenue: $${stats.totalSales.toFixed(2)}`);
console.log(`Average customer age: ${stats.avgAge.toFixed(1)}`);
Error Handling
Field Access Errors
try {
const value = row.getField(10); // Index out of bounds
} catch (error) {
console.error(`Field access error: ${error.message}`);
}
// Safe field access
const safeGetField = (row: LazyRow, index: number): string | null => {
if (index >= 0 && index < row.columnCount) {
return row.getField(index);
}
return null;
};
Conversion Errors
try {
const binary = row.toBinary();
} catch (error) {
if (error.message.includes("UTF-8")) {
console.error("Invalid UTF-8 in field data");
} else {
console.error(`Binary conversion failed: ${error.message}`);
}
}
Best Practices
- Use selective field access - only parse fields you actually need
- Cache conversions - let LazyRow handle caching automatically
- Prefer LazyRow over string arrays for large datasets
- Validate field counts before accessing fields by index
- Handle encoding errors when working with binary data
- Use appropriate factory methods based on your data source
Integration Examples
With Other Transforms
// LazyRow → other formats
await read("data.csv")
.transform(fromCsvToLazyRows())
.map((row) => row.toStringArray()) // Convert when needed
.transform(toTsv())
.writeTo("data.tsv");
With Validation Libraries
import { z } from "zod";
const UserSchema = z.object({
name: z.string().min(1),
age: z.number().min(0).max(150),
email: z.string().email(),
});
await read("users.csv")
.transform(fromCsvToLazyRows())
.drop(1)
.map((row) => {
const user = {
name: row.getField(0),
age: parseInt(row.getField(1)),
email: row.getField(2),
};
return UserSchema.parse(user); // Validates and throws on error
})
.transform(toJson())
.writeTo("validated-users.jsonl");
Next Steps
- CSV Transforms - Using LazyRow with CSV data
- TSV Transforms - Using LazyRow with TSV data
- Performance Guide - Optimization strategies
- Record Format - Binary format for maximum speed
Format Selection Guide
Guidance for choosing the right data format for your use case.
⚠️ Experimental (v0.24.0+): Data transforms are under active development. API stability is not guaranteed as we improve correctness and streaming performance.
Quick Format Comparison
| Format | Best For | Notes |
|---|---|---|
| CSV | Universal compatibility | Use LazyRow for better speed |
| TSV | Balance of speed & readability | Simpler than CSV |
| JSON | Rich object structures | Best for small-medium datasets |
| Record | Maximum throughput | Internal processing only |
Choosing a Format
CSV - Universal Compatibility
Use when you need compatibility with Excel, legacy systems, or when human readability matters.
// Best practice: Use LazyRow with CSV
await read("data.csv")
.transform(fromCsvToLazyRows())
.filter((row) => row.getField(0).startsWith("A"))
.collect();
TSV - Simple and Fast
Use when you want a balance of speed and readability, and your data doesn’t contain tabs or newlines.
await read("data.tsv")
.transform(fromTsvToRows())
.filter((row) => row[0].startsWith("A"))
.collect();
JSON - Rich Structures
Use when you need full object structures, nested data, or arrays in fields.
await read("events.jsonl")
.transform(fromJsonToRows<EventData>())
.collect();
Record - Maximum Throughput
Use for internal processing when you need maximum throughput and don’t need human readability.
await read("data.record")
.transform(fromRecordToRows())
.map(processAllFields)
.collect();
Key Optimization Tips
1. Always Stream Large Files
// ✅ Good: Constant memory usage
await read("large-file.csv")
.transform(fromCsvToRows())
.filter((row) => row[0] === "target")
.writeTo("filtered.csv");
// ❌ Bad: Loads entire file into memory
const allData = await read("large-file.csv")
.transform(fromCsvToRows())
.collect();
2. Use LazyRow for Selective Field Access
Only parse the fields you actually need:
// Only parses fields 0 and 5
await read("wide-data.csv")
.transform(fromCsvToLazyRows())
.filter((row) => {
const id = row.getField(0);
const status = row.getField(5);
return id.startsWith("A") && status === "active";
})
.collect();
3. Filter Early in the Pipeline
// ✅ Good: Filter before expensive operations
await read("data.csv")
.transform(fromCsvToRows())
.filter((row) => row[0] === "target")
.map((row) => expensiveProcessing(row))
.collect();
4. Convert Formats for Repeated Processing
If you’re processing the same data multiple times, convert to a faster format first:
// One-time conversion
await read("data.csv")
.transform(fromCsvToRows())
.transform(toRecord())
.writeTo("data.record");
// Subsequent processing is faster
await read("data.record")
.transform(fromRecordToRows())
.filter((row) => row[1] === "target")
.collect();
See Also
- CSV Transforms - CSV parsing and generation
- TSV Transforms - TSV processing
- JSON Transforms - JSON Lines handling
- Record Format - High-performance format
- LazyRow Guide - Optimized field access
Concurrent Processing
Process multiple items in parallel with controlled concurrency. It’s easier than you think.
The Problem
You have a list of URLs to fetch. Sequential is slow:
// Takes 10 seconds for 10 URLs (1 second each)
for (const url of urls) {
await fetch(url);
}
Promise.all is fast but dangerous:
// Starts 1000 requests at once - might crash
await Promise.all(urls.map((url) => fetch(url)));
The Solution
proc gives you controlled concurrency:
import { enumerate } from "jsr:@j50n/proc@0.24.6";
// Defaults to CPU count (usually 4-8)
const results = await enumerate(urls)
.concurrentMap(async (url) => {
return await fetch(url);
})
.collect();
// Or specify a limit
const results = await enumerate(urls)
.concurrentMap(async (url) => {
return await fetch(url);
}, { concurrency: 5 })
.collect();
Fast, but won’t overwhelm your system.
Understanding JavaScript Concurrency
Important: JavaScript concurrency is not parallelism. You’re running on a single thread.
What This Means
When you use concurrentMap or concurrentUnorderedMap, you’re not creating
threads or workers. You’re managing multiple async operations on one thread.
The JavaScript event loop switches between them when they’re waiting.
This works great for:
- Network requests - While waiting for a response, other operations run
- File I/O - While waiting for disk reads/writes, other operations run
- Process execution - While a child process runs, other operations continue
- Database queries - While waiting for results, other operations run
This does NOT work for:
- CPU-intensive calculations - Pure JavaScript math, parsing, etc. blocks everything
- Synchronous operations - Anything that doesn’t await blocks the thread
- Heavy computation - You still only have one CPU core’s worth of processing power
Example: What Works
// ✅ Good: I/O-bound operations run concurrently
const results = await enumerate(urls)
.concurrentUnorderedMap(async (url) => {
// While waiting for fetch, other URLs are being fetched
const response = await fetch(url);
return response.json();
})
.collect();
// This is genuinely faster - 10 URLs in ~1 second instead of ~10 seconds
Example: What Doesn’t Work
// ❌ Bad: CPU-bound operations don't benefit
const results = await enumerate(numbers)
.concurrentUnorderedMap(async (n) => {
// This blocks the thread - no other operations can run
let result = 0;
for (let i = 0; i < 1000000; i++) {
result += Math.sqrt(n * i);
}
return result;
})
.collect();
// This is NOT faster - still uses one CPU core sequentially
Why It Still Matters
Even though it’s not true parallelism, concurrency is incredibly useful:
- I/O operations dominate - Most real-world tasks are waiting for network/disk
- Child processes run in parallel - When you run() a command, it uses a separate process
- Better resource utilization - Keep the CPU busy while waiting for I/O
- Simpler than threads - No race conditions, no locks, no shared memory issues
When You Need True Parallelism
If you need to parallelize CPU-intensive JavaScript code, use:
- Web Workers (in browsers)
- Worker Threads (in Node.js/Deno)
- Child processes with run() - each process gets its own CPU
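As a sketch of the worker option, here is a CPU-heavy loop moved into a worker so it runs off the main thread. The file name heavy_worker.ts and the math loop are placeholders for your own code.
// heavy_worker.ts (hypothetical file: the CPU-heavy code lives here)
/// <reference lib="deno.worker" />
self.onmessage = (e: MessageEvent<number>) => {
  let result = 0;
  for (let i = 0; i < 1_000_000; i++) {
    result += Math.sqrt(e.data * i);
  }
  self.postMessage(result);
};

// main script: offload the work and keep the event loop free
const worker = new Worker(new URL("./heavy_worker.ts", import.meta.url).href, {
  type: "module",
});
worker.onmessage = (e) => {
  console.log("Result:", e.data);
  worker.terminate();
};
worker.postMessage(42);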
But for most tasks (fetching URLs, processing files, running commands), JavaScript’s concurrency model is perfect.
When to Use Concurrent Processing
Use concurrentUnorderedMap() (recommended default) when:
- Order doesn’t matter - you want maximum speed
- Processing independent tasks where results can arrive in any order
- You’ll sort or aggregate results anyway
- This is usually what you want - it keeps all workers busy and delivers results as they complete
- Example: Downloading files, processing logs, fetching data you’ll aggregate
Use concurrentMap() when:
- You must have results in the same order as input
- Be aware: can bottleneck on the slowest item in each batch
- If work isn’t balanced, faster items wait for slower ones
- Example: Fetching user profiles where display order must match input order
Use sequential processing when:
- Tasks depend on each other
- You must respect strict rate limits
- Order is critical and tasks are fast
- Example: Database transactions that must happen in sequence
Choose concurrency level based on:
- I/O-bound tasks (network, disk): Start with 5-10, increase if you have bandwidth (see “Understanding JavaScript Concurrency” above)
- CPU-bound tasks: Won’t benefit from concurrency - use Worker Threads or child processes instead
- Rate-limited APIs: Match the rate limit (e.g., 10 requests/second = concurrency 1 with 100ms delays)
- Memory constraints: Lower concurrency if processing large data per task
concurrentUnorderedMap() - Recommended
Process items concurrently, return results as they complete (fastest):
// Defaults to CPU count
const results = await enumerate([1, 2, 3, 4, 5])
.concurrentUnorderedMap(async (n) => {
await sleep(Math.random() * 1000);
return n * 2;
})
.collect();
// [6, 2, 10, 4, 8] - order varies, but all workers stay busy
Why it’s faster: Results are delivered as soon as they’re ready. If item 3 finishes before item 1, you get item 3 immediately. No waiting for slower items.
Use when: You don’t care about order (most cases). Better performance under real-world conditions where work isn’t perfectly balanced.
Concurrency: Defaults to navigator.hardwareConcurrency (CPU count).
Override with { concurrency: n } if needed.
concurrentMap() - Order Preserved
Process items concurrently, return results in input order:
const results = await enumerate([1, 2, 3, 4, 5])
.concurrentMap(async (n) => {
await sleep(Math.random() * 1000);
return n * 2;
}, { concurrency: 3 })
.collect();
// [2, 4, 6, 8, 10] - always in order
Performance caveat: If item 1 takes 5 seconds and item 2 takes 1 second, item 2 waits for item 1 before being returned. This can create bottlenecks where fast items wait for slow ones.
Use when: You specifically need results in the same order as input. Only use if order truly matters for your use case.
Concurrency: Defaults to CPU count. Override with { concurrency: n } if
needed.
Real-World Examples
Fetch Multiple URLs
const urls = [
"https://api.example.com/1",
"https://api.example.com/2",
"https://api.example.com/3",
// ... 100 more
];
// Uses CPU count by default
const data = await enumerate(urls)
.concurrentUnorderedMap(async (url) => {
const response = await fetch(url);
return response.json();
})
.collect();
// Or limit for rate-limited APIs
const data = await enumerate(urls)
.concurrentUnorderedMap(async (url) => {
const response = await fetch(url);
return response.json();
}, { concurrency: 10 })
.collect();
Process Files in Parallel
import { read } from "jsr:@j50n/proc@0.24.6";
const files = ["log1.txt", "log2.txt", "log3.txt"];
const results = await enumerate(files)
.concurrentMap(async (file) => {
const errors = await read(file)
.lines
.filter((line) => line.includes("ERROR"))
.count();
return { file, errors };
})
.collect();
Download and Process
const downloads = await enumerate(imageUrls)
.concurrentUnorderedMap(async (url) => {
const response = await fetch(url);
const blob = await response.blob();
return processImage(blob);
})
.collect();
Choosing Concurrency
Default behavior: Both methods default to navigator.hardwareConcurrency
(CPU count, typically 4-8). This is usually a good starting point.
When to override:
For I/O-bound tasks (network, disk):
- Default is often fine
- Increase to 10-20 if you have bandwidth and no rate limits
- Decrease to 1-5 for rate-limited APIs
For CPU-bound tasks:
- Default (CPU count) is optimal
- Don’t increase - you’ll just add overhead
For rate-limited APIs:
- Set to match the rate limit
- Add delays if needed
// Respect rate limits with low concurrency
const results = await enumerate(apiCalls)
.concurrentUnorderedMap(async (call) => {
const result = await makeApiCall(call);
await sleep(100); // 10 requests per second
return result;
}, { concurrency: 1 })
.collect();
Error Handling
Errors propagate naturally:
try {
const results = await enumerate(urls)
.concurrentMap(async (url) => {
const response = await fetch(url);
if (!response.ok) {
throw new Error(`Failed: ${url}`);
}
return response.json();
}, { concurrency: 5 })
.collect();
} catch (error) {
// First error stops everything
console.error(`Failed: ${error.message}`);
}
Progress Tracking
Track progress as items complete:
let completed = 0;
const total = urls.length;
const results = await enumerate(urls)
.concurrentMap(async (url) => {
const result = await fetch(url);
completed++;
console.log(`Progress: ${completed}/${total}`);
return result;
}, { concurrency: 5 })
.collect();
Combining with Other Operations
Chain concurrent operations with other methods:
const results = await enumerate(urls)
.concurrentMap(fetch, { concurrency: 5 })
.filter((response) => response.ok)
.concurrentMap((response) => response.json(), { concurrency: 5 })
.filter((data) => data.active)
.collect();
Performance Comparison
// Sequential: 10 seconds (one at a time)
for (const url of urls) {
await fetch(url);
}
// concurrentMap (5): 2-4 seconds
// Can bottleneck if one item is slow - others wait
await enumerate(urls)
.concurrentMap(fetch, { concurrency: 5 })
.collect();
// concurrentUnorderedMap (5): 2 seconds
// Faster - no waiting, results delivered as ready
await enumerate(urls)
.concurrentUnorderedMap(fetch, { concurrency: 5 })
.collect();
Why unordered is faster: Imagine 5 tasks with times [1s, 1s, 1s, 1s, 5s].
With concurrentMap, the 5-second task blocks its batch. With
concurrentUnorderedMap, the four 1-second tasks complete and return
immediately while the 5-second task finishes in the background.
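You can watch this happen with a small timing sketch; the delays below mirror that example, and sleep is the helper proc exports.
import { enumerate, sleep } from "jsr:@j50n/proc@0.24.6";

const delays = [1000, 1000, 1000, 1000, 5000];
const start = Date.now();

await enumerate(delays)
  .concurrentUnorderedMap(async (ms) => {
    await sleep(ms);
    return ms;
  }, { concurrency: 5 })
  .forEach((ms) => {
    // Fast tasks print at ~1s; the slow one prints at ~5s.
    console.log(`${ms}ms task delivered at ${Date.now() - start}ms`);
  });
Swap in concurrentMap and results are delivered in input order instead, so a slow item near the front of the list holds back every faster result behind it.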
Advanced Patterns
Batch Processing
Process in batches:
const batchSize = 10;
for (let i = 0; i < items.length; i += batchSize) {
const batch = items.slice(i, i + batchSize);
const results = await enumerate(batch)
.concurrentMap(process, { concurrency: 5 })
.collect();
await saveBatch(results);
}
Retry Failed Items
const results = await enumerate(urls)
.concurrentMap(async (url) => {
let attempts = 0;
while (attempts < 3) {
try {
return await fetch(url);
} catch (error) {
attempts++;
if (attempts === 3) throw error;
await sleep(1000 * attempts);
}
}
}, { concurrency: 5 })
.collect();
Dynamic Concurrency
Adjust concurrency based on results:
let concurrency = 5;
for (const batch of batches) {
const start = Date.now();
const results = await enumerate(batch)
.concurrentMap(process, { concurrency })
.collect();
const duration = Date.now() - start;
// Adjust based on performance
if (duration < 1000) concurrency = Math.min(concurrency + 1, 20);
if (duration > 5000) concurrency = Math.max(concurrency - 1, 1);
}
Best Practices
- Prefer unordered - Use concurrentUnorderedMap unless you specifically need order
- Start conservative - Begin with low concurrency, increase if needed
- Monitor resources - Watch memory and network usage
- Respect rate limits - Don’t overwhelm external services
- Handle errors - One error stops everything, handle gracefully
- Understand the bottleneck - concurrentMap can wait on slow items; unordered doesn’t
Common Mistakes
Too Much Concurrency
// ❌ Might crash with 10,000 concurrent requests
await enumerate(hugeList)
.concurrentMap(fetch, { concurrency: 10000 })
.collect();
// ✅ Reasonable concurrency
await enumerate(hugeList)
.concurrentMap(fetch, { concurrency: 10 })
.collect();
Forgetting to Await
// ❌ Returns promises, not results
const promises = enumerate(urls)
.concurrentMap(fetch, { concurrency: 5 });
// ✅ Await the results
const results = await enumerate(urls)
.concurrentMap(fetch, { concurrency: 5 })
.collect();
Next Steps
- Streaming Large Files - Handle huge files efficiently
- Performance Optimization - Make it faster
- Parallel Downloads - Real-world example
Streaming Large Files
Process files bigger than your RAM. It’s easier than you think.
When to Stream vs Collect
Always stream when:
- File is larger than available RAM (or even close to it)
- You don’t need all data at once
- Processing can be done incrementally (line-by-line, record-by-record)
- You want to start processing immediately without waiting for full download/read
- Memory efficiency is important
Consider collecting when:
- File is small (< 100MB) and fits comfortably in memory
- You need random access to data
- You need to process data multiple times
- You need to sort or aggregate all data before processing
- Memory is not a concern
Memory/Speed Tradeoffs:
- Streaming: Constant memory (~64KB buffer), processes as data arrives, can’t random access
- Collecting: Memory = file size, all data available immediately, can random access, must wait for full load
Example decision:
// 10GB log file - MUST stream
for await (const line of read("huge.log").lines) {
if (line.includes("ERROR")) console.log(line);
}
// 1MB config file - can collect
const config = await read("config.json").lines.collect();
const parsed = JSON.parse(config.join("\n"));
// 500MB data file - stream if processing once
const sum = await read("numbers.txt")
.lines
.map((line) => parseFloat(line))
.reduce((a, b) => a + b, 0);
The Problem
You have a 10GB log file. Loading it into memory crashes your program:
// ❌ Crashes with large files
const content = await Deno.readTextFile("huge.log");
const lines = content.split("\n");
The Solution
Stream it, one line at a time:
import { read } from "jsr:@j50n/proc@0.24.6";
// ✅ Constant memory, any file size
for await (const line of read("huge.log").lines) {
if (line.includes("ERROR")) {
console.log(line);
}
}
How Streaming Works
Instead of loading everything:
- Read a chunk (buffer)
- Process it
- Discard it
- Repeat
Memory usage stays constant, no matter how big the file.
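In code, the manual version of that loop looks roughly like this (a sketch using Deno's own file API; read("huge.log").lines wraps this up for you and handles lines that span chunk boundaries).
// Rough sketch of what streaming does under the hood.
const file = await Deno.open("huge.log", { read: true });
const decoder = new TextDecoder();

for await (const chunk of file.readable) {
  // Each chunk is a small Uint8Array. Decode it, inspect it, then let it go.
  const text = decoder.decode(chunk, { stream: true });
  if (text.includes("ERROR")) {
    // Real line handling must carry partial lines across chunk boundaries,
    // which is exactly what read("huge.log").lines does for you.
  }
}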
Real Examples
Count Lines in Huge File
const count = await read("10gb-file.txt").lines.count();
console.log(`${count} lines`);
Uses ~constant memory, even for 10GB.
Find Pattern in Large File
const matches = await read("huge.log")
.lines
.filter((line) => line.includes("ERROR"))
.take(10) // Stop after 10 matches
.collect();
Stops reading once it finds 10 matches.
Process CSV File
const data = await read("huge-data.csv")
.lines
.drop(1) // Skip header
.map((line) => {
const [id, name, value] = line.split(",");
return { id, name, value: parseFloat(value) };
})
.filter((row) => row.value > 100)
.collect();
Aggregate Large Dataset
const sum = await read("numbers.txt")
.lines
.map((line) => parseFloat(line))
.reduce((acc, n) => acc + n, 0);
Compressed Files
Stream compressed files without extracting:
const lineCount = await read("huge.log.gz")
.transform(new DecompressionStream("gzip"))
.lines
.count();
Decompresses on-the-fly, never stores uncompressed data.
Multiple Files
Process multiple large files:
import { enumerate } from "jsr:@j50n/proc@0.24.6";
const files = ["log1.txt", "log2.txt", "log3.txt"];
for (const file of files) {
const errors = await read(file)
.lines
.filter((line) => line.includes("ERROR"))
.count();
console.log(`${file}: ${errors} errors`);
}
Streaming Transformations
Chain transformations, all streaming:
const result = await read("data.txt")
.lines
.map((line) => line.trim())
.filter((line) => line.length > 0)
.map((line) => line.toUpperCase())
.filter((line) => line.startsWith("ERROR"))
.collect();
Each line flows through all transformations before the next line is read.
Writing Large Files
Stream output to a file:
import { concat } from "jsr:@j50n/proc@0.24.6";
const processed = await read("input.txt")
.lines
.map((line) => line.toUpperCase())
.map((line) => new TextEncoder().encode(line + "\n"))
.collect();
await Deno.writeFile("output.txt", concat(processed));
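The example above collects the whole output before writing it. If the output is also large, you can stream it to disk as it is produced; here is a sketch using Deno's file API directly.
// Sketch: stream transformed lines straight to disk instead of collecting them.
const out = await Deno.open("output.txt", {
  write: true,
  create: true,
  truncate: true,
});
const writer = out.writable.getWriter();
const encoder = new TextEncoder();

for await (const line of read("input.txt").lines) {
  await writer.write(encoder.encode(line.toUpperCase() + "\n"));
}
await writer.close(); // Closing the writer also closes the file.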
Performance Tips
Use take() for Early Exit
// Stops reading after 100 matches
const first100 = await read("huge.txt")
.lines
.filter(predicate)
.take(100)
.collect();
Don’t Collect Unless Needed
// ❌ Loads everything into memory
const lines = await read("huge.txt").lines.collect();
for (const line of lines) process(line);
// ✅ Streams
for await (const line of read("huge.txt").lines) {
process(line);
}
Use Concurrent Processing
Process multiple files in parallel:
const results = await enumerate(files)
.concurrentMap(async (file) => {
return await read(file).lines.count();
}, { concurrency: 3 })
.collect();
Memory Usage
Streaming uses constant memory:
// File size: 10GB
// Memory used: ~64KB (buffer size)
await read("10gb-file.txt")
.lines
.forEach((line) => process(line));
Real-World Example
Analyze a year of logs:
const errorsByDay = await read("year-of-logs.txt")
.lines
.filter((line) => line.includes("ERROR"))
.map((line) => {
const date = line.match(/\d{4}-\d{2}-\d{2}/)?.[0];
return date;
})
.filter((date) => date !== undefined)
.reduce((acc, date) => {
acc[date] = (acc[date] || 0) + 1;
return acc;
}, {} as Record<string, number>);
// Show top 10 error days
Object.entries(errorsByDay)
.sort((a, b) => b[1] - a[1])
.slice(0, 10)
.forEach(([date, count]) => {
console.log(`${date}: ${count} errors`);
});
Processes gigabytes of logs with minimal memory.
When to Stream
Always stream when:
- File is larger than available RAM
- You don’t need all data at once
- Processing can be done incrementally
- You want to start processing immediately
Consider collecting when:
- File is small (< 100MB)
- You need random access
- You need to process data multiple times
- Memory is not a concern
Common Patterns
Filter and Count
const count = await read("file.txt")
.lines
.filter(predicate)
.count();
Transform and Save
const output = await read("input.txt")
.lines
.map(transform)
.map((line) => new TextEncoder().encode(line + "\n"))
.collect();
await Deno.writeFile("output.txt", concat(output));
Aggregate Data
const stats = await read("data.txt")
.lines
.reduce((acc, line) => {
// Update statistics
return acc;
}, initialStats);
Next Steps
- Concurrent Processing - Process multiple files in parallel
- Performance Optimization - Make it faster
- Decompressing Files - Work with compressed files
Custom Transformations
Build powerful data transformations with async generators—the readable, maintainable way.
Why Async Generators?
When you need custom transformations beyond map() and filter(), you have two
choices: async generators or TransformStream. Async generators are almost
always better.
Compare these approaches for parsing JSON lines:
Async Generator:
async function* parseJsonLines(lines: AsyncIterable<string>) {
for await (const line of lines) {
try {
const obj = JSON.parse(line.trim());
if (obj.id && obj.timestamp) {
yield obj;
}
} catch {
// Skip invalid JSON
}
}
}
TransformStream:
const parseJsonLines = new TransformStream({
transform(chunk, controller) {
try {
const obj = JSON.parse(chunk.trim());
if (obj.id && obj.timestamp) {
controller.enqueue(obj);
}
} catch {
// Error handling is more complex
}
},
});
The generator reads like the logic you’re implementing. The stream forces you into callbacks.
Batching Data
Group items into fixed-size chunks:
import { enumerate } from "jsr:@j50n/proc";
async function* batch<T>(items: AsyncIterable<T>, size: number) {
let batch: T[] = [];
for await (const item of items) {
batch.push(item);
if (batch.length === size) {
yield batch;
batch = [];
}
}
if (batch.length > 0) yield batch;
}
const batches = await enumerate([1, 2, 3, 4, 5, 6, 7])
.transform((items) => batch(items, 3))
.collect();
console.log(batches); // [[1, 2, 3], [4, 5, 6], [7]]
Stateful Processing
Keep running calculations as data flows:
async function* runningAverage(numbers: AsyncIterable<number>) {
let sum = 0;
let count = 0;
for await (const num of numbers) {
sum += num;
count++;
yield sum / count;
}
}
const averages = await enumerate([10, 20, 30, 40])
.transform(runningAverage)
.collect();
console.log(averages); // [10, 15, 20, 25]
State variables live naturally in the function scope—no external state management needed.
Parsing with Error Recovery
Handle complex parsing gracefully:
interface LogEntry {
id: string;
timestamp: string;
level: string;
message: string;
}
async function* parseJsonLines(lines: AsyncIterable<string>) {
for await (const line of lines) {
const trimmed = line.trim();
if (!trimmed) continue;
try {
const obj = JSON.parse(trimmed);
if (obj.id && obj.timestamp && obj.level && obj.message) {
yield obj as LogEntry;
}
} catch {
// Skip invalid JSON, could log errors here
}
}
}
const logs = await enumerate([
'{"id":"1","timestamp":"2024-01-01","level":"info","message":"Started"}',
"invalid json line",
'{"id":"2","timestamp":"2024-01-01","level":"error","message":"Failed"}',
"",
]).transform(parseJsonLines).collect();
console.log(logs.length); // 2 (invalid lines skipped)
Rate Limiting
Control timing between items:
import { enumerate, sleep } from "jsr:@j50n/proc";
async function* throttle<T>(items: AsyncIterable<T>, delayMs: number) {
let first = true;
for await (const item of items) {
if (!first) {
await sleep(delayMs);
}
first = false;
yield item;
}
}
// Rate-limit API calls
const results = await enumerate(["url1", "url2", "url3"])
.transform((urls) => throttle(urls, 1000))
.map(async (url) => {
const response = await fetch(url);
return response.status;
})
.collect();
Multi-Stage Processing
Combine filtering, enrichment, and transformation:
async function* processLogEntries(lines: AsyncIterable<string>) {
for await (const line of lines) {
try {
const entry = JSON.parse(line);
if (entry.level !== "error") continue;
const enriched = {
...entry,
processedAt: new Date().toISOString(),
severity: entry.message.toLowerCase().includes("critical")
? "high"
: "medium",
};
yield {
timestamp: enriched.timestamp,
severity: enriched.severity,
summary: enriched.message.substring(0, 100),
};
} catch {
// Skip invalid entries
}
}
}
const processed = await enumerate([
'{"level":"info","message":"System started","timestamp":"2024-01-01T10:00:00Z"}',
'{"level":"error","message":"Critical database failure","timestamp":"2024-01-01T10:01:00Z"}',
'{"level":"error","message":"Minor timeout","timestamp":"2024-01-01T10:02:00Z"}',
]).transform(processLogEntries).collect();
console.log(processed.length); // 2 (only error entries)
Generator vs TransformStream
The same batching logic, both ways:
Generator (8 lines):
async function* batch<T>(items: AsyncIterable<T>, size: number) {
let batch: T[] = [];
for await (const item of items) {
batch.push(item);
if (batch.length === size) {
yield batch;
batch = [];
}
}
if (batch.length > 0) yield batch;
}
TransformStream (15+ lines):
function createBatchTransform<T>(size: number) {
let batch: T[] = [];
return new TransformStream<T, T[]>({
transform(chunk, controller) {
batch.push(chunk);
if (batch.length === size) {
controller.enqueue([...batch]);
batch = [];
}
},
flush(controller) {
if (batch.length > 0) {
controller.enqueue(batch);
}
},
});
}
Generators are shorter, more readable, easier to debug, and handle errors naturally with try-catch.
When to Use Each
Use Async Generators for:
- Complex state management (faster + easier)
- Error handling and recovery
- Multi-stage processing
- Readable, maintainable code
- Most custom transformations
Use TransformStream for:
- Simple 1:1 transformations (much faster)
- Built-in streams (CompressionStream, DecompressionStream)
- Interfacing with existing stream APIs
In practice:
// Built-in streams - use directly
.transform(new CompressionStream("gzip"))
// Custom logic - use generators
.transform(items => batch(items, 100))
.transform(parseJsonLines)
Best Practices
// Good: Clear, focused, well-typed
async function* parseAndValidateUsers(
lines: AsyncIterable<string>,
): AsyncGenerator<User> {
for await (const line of lines) {
try {
const user = JSON.parse(line) as User;
if (isValidUser(user)) {
yield user;
}
} catch (error) {
console.warn(`Skipping invalid user data: ${error.message}`);
}
}
}
- Keep generators focused - One responsibility per function
- Handle errors gracefully - Use try-catch for parsing/validation
- Yield frequently - Don’t buffer unnecessarily
- Use meaningful names - parseJsonLines, not transform1
- Add type annotations - Help TypeScript help you
Performance
We ran comprehensive benchmarks comparing async generators vs TransformStream across different scenarios:
TransformStream excels at simple operations:
- Small datasets: Similar performance
- Large datasets: Up to 810x faster for simple transformations
- JSON parsing: Up to 150x faster
- Best for: Simple 1:1 transformations, especially with large data
Async generators excel at complex operations:
- Stateful processing: 4-6x faster (batching, running totals)
- Error handling: 3-4x faster with try-catch
- Multi-stage logic: 4x faster for complex processing
- Best for: State management, error recovery, complex logic
Recommendation:
- Use TransformStream for simple operations on large datasets
- Use async generators for complex logic, state management, or when readability matters
- For most real-world transformations (parsing, validation, multi-step processing), generators are both faster and easier to write
Start with these patterns and build more sophisticated processing pipelines as needed.
Performance Characteristics
Understanding async iteration performance and how proc optimizes for real-world usage.
The Async Iteration Overhead
Async generators create promises per iteration, which adds overhead for simple operations. This is built into the JavaScript language and V8 engine, not a limitation of this library:
// Benchmark: 10,000 items
async function* double(items: AsyncIterable<number>) {
for await (const item of items) {
yield item * 2;
}
}
// Results:
// Async generator: 2.7ms
// TransformStream: 40µs (67x faster)
// Raw array iteration: 39µs
The overhead grows with data size - at 100,000 items, the gap becomes 810x.
This is fundamental to how for await loops work in JavaScript, not an
implementation issue.
Note: JavaScript engines continue to optimize async iteration. These performance characteristics may improve in future V8 versions.
The V8 Optimization Cliff
TransformStream performance has a dramatic cliff - it looks extremely fast in microbenchmarks but can become orders of magnitude slower with minimal added complexity.
Simple operations get heavily optimized:
// V8 can inline this entire pipeline
const doubleStream = new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk * 2); // Becomes nearly native code
},
});
// Result: 750x faster than generators
Adding any complexity breaks optimization:
// Closure state prevents V8 inlining
function createRunningTotalStream() {
let total = 0; // This breaks optimization
return new TransformStream({
transform(chunk, controller) {
total += chunk; // Closure access overhead
controller.enqueue(total); // Can't optimize with state
},
});
}
// Result: 6x slower than generators
Why this happens:
- V8 aggressively optimizes trivial TransformStream operations into native code paths
- Any closure variables, state dependencies, or complex logic breaks these optimizations
- Once optimization fails, the callback overhead makes TransformStream much slower
- Generators have consistent overhead regardless of complexity
Practical implication: TransformStream microbenchmarks are misleading - they show best-case performance that disappears in real-world usage.
The bottom line: JavaScript performance is notoriously difficult to predict. V8’s aggressive optimizations can make simple code incredibly fast, but these optimizations are fragile and break easily. If performance truly matters for your use case, profile your actual workload rather than relying on synthetic benchmarks.
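For example, a minimal profiling sketch (the file name and transformation here are placeholders, not part of any benchmark): wrap the pipeline you actually run in performance.now() calls and compare variants on your own data.
import { read } from "jsr:@j50n/proc@0.24.6";

const start = performance.now();
// Time the real pipeline, end to end, on real data.
const lineCount = await read("data.txt")
  .lines
  .map((line) => line.toUpperCase())
  .count();
console.log(`${lineCount} lines in ${(performance.now() - start).toFixed(1)}ms`);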
The Chunking Solution
proc uses chunking to amortize async iteration costs by processing multiple items per iteration:
// Instead of: 1 promise per line (expensive)
async function* inefficientLines(bytes: AsyncIterable<Uint8Array>) {
for await (const chunk of bytes) {
for (const line of decode(chunk).split("\n")) {
yield line; // 1000 lines = 1000 promises
}
}
}
// proc does: 1 promise per chunk of lines (efficient)
export async function* toChunkedLines(bytes: AsyncIterable<Uint8Array>) {
for await (const chunk of bytes) {
const lines = decode(chunk).split("\n");
if (lines.length > 0) {
yield lines; // 1000 lines in 10 chunks = 10 promises
}
}
}
// Then flatten efficiently
async function* toLines(bytes: AsyncIterable<Uint8Array>) {
for await (const lines of toChunkedLines(bytes)) {
yield* lines; // Sync iteration within async iteration
}
}
Result: 10x performance improvement for line processing.
When Complexity Flips Performance
For complex operations, async generators become faster because TransformStream’s callback model creates overhead:
// Complex stateful processing
async function* processLogs(lines: AsyncIterable<string>) {
let errorCount = 0;
for await (const line of lines) {
try {
const entry = JSON.parse(line);
if (entry.level === "error") {
errorCount++;
yield {
...entry,
errorNumber: errorCount,
severity: entry.message.includes("critical") ? "high" : "medium",
};
}
} catch {
errorCount++;
}
}
}
// Benchmark results (10k items):
// Async generator: 3.1ms
// TransformStream: 13.8ms (4x slower)
Practical Implications
Async generators win for real-world use cases:
- Parsing and validation (where most errors occur)
- Multi-step transformations
- State management across items
- Error handling and recovery
The chunking strategy makes overhead negligible for typical data processing workloads.
TransformStream is available for the rare cases where simple, high-volume transformations need maximum performance, but the complexity trade-off usually isn’t worth it.
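If you do hit that case, here is a sketch of what it can look like (the byte-level uppercase transform and file name are illustrative, not from the library): a stateless 1:1 TransformStream plugged into a pipeline with .transform().
import { read } from "jsr:@j50n/proc@0.24.6";

// A trivial, stateless byte transform: uppercase ASCII letters.
// No closure state, so V8 can keep it on the fast path.
const upperAscii = new TransformStream<Uint8Array, Uint8Array>({
  transform(chunk, controller) {
    const out = new Uint8Array(chunk.length);
    for (let i = 0; i < chunk.length; i++) {
      const b = chunk[i];
      out[i] = b >= 0x61 && b <= 0x7a ? b - 0x20 : b; // 'a'..'z' -> 'A'..'Z'
    }
    controller.enqueue(out);
  },
});

await read("data.txt")
  .transform(upperAscii)
  .lines
  .forEach(console.log);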
File I/O
Read and write files with streaming efficiency.
Reading Files
Read as Bytes
import { read } from "jsr:@j50n/proc@0.24.6";
const bytes = await read("file.bin").collect();
// Uint8Array[]
Read as Lines
const lines = await read("file.txt").lines.collect();
// string[]
Stream Large Files
Process files larger than memory:
for await (const line of read("huge-file.txt").lines) {
process(line); // One line at a time
}
File Paths
Relative Paths
read("data.txt"); // Relative to current directory
Absolute Paths
read("/var/log/app.log");
URLs
const path = new URL("./data.txt", import.meta.url);
read(path);
Common Patterns
Count Lines
const lineCount = await read("file.txt").lines.count();
Find Pattern
const matches = await read("file.txt")
.lines
.filter((line) => line.includes("ERROR"))
.collect();
Transform Lines
const processed = await read("input.txt")
.lines
.map((line) => line.toUpperCase())
.collect();
Parse CSV
const data = await read("data.csv")
.lines
.drop(1) // Skip header
.map((line) => {
const [name, age, city] = line.split(",");
return { name, age: parseInt(age), city };
})
.collect();
Writing Files
Write Array to File
const lines = ["line 1", "line 2", "line 3"];
const content = lines.join("\n");
await Deno.writeTextFile("output.txt", content);
Stream to File
import { concat } from "jsr:@j50n/proc@0.24.6";
const bytes = await read("input.txt")
.lines
.map((line) => new TextEncoder().encode(line + "\n"))
.collect();
await Deno.writeFile("output.txt", concat(bytes));
Working with Binary Files
Read Binary
const bytes = await read("image.png").collect();
const data = concat(bytes);
Process Binary
const processed = await read("data.bin")
.map((chunk) => {
// Process each chunk
return transform(chunk);
})
.collect();
Compressed Files
Read Compressed
const lines = await read("file.gz")
.transform(new DecompressionStream("gzip"))
.lines
.collect();
See Decompressing Files for more.
Multiple Files
Process Multiple Files
const files = ["log1.txt", "log2.txt", "log3.txt"];
for (const file of files) {
const errors = await read(file)
.lines
.filter((line) => line.includes("ERROR"))
.count();
console.log(`${file}: ${errors} errors`);
}
Concurrent Processing
import { enumerate } from "jsr:@j50n/proc@0.24.6";
const results = await enumerate(files)
.concurrentMap(async (file) => {
const lines = await read(file).lines.count();
return { file, lines };
}, { concurrency: 3 })
.collect();
Error Handling
File Not Found
try {
const lines = await read("missing.txt").lines.collect();
} catch (error) {
console.error(`Failed to read file: ${error.message}`);
}
Permission Denied
try {
const lines = await read("/root/secret.txt").lines.collect();
} catch (error) {
if (error instanceof Deno.errors.PermissionDenied) {
console.error("Permission denied");
}
}
Performance Tips
Stream Don’t Collect
// ❌ Loads entire file into memory
const lines = await read("huge.txt").lines.collect();
// ✅ Processes one line at a time
for await (const line of read("huge.txt").lines) {
process(line);
}
Use Chunked Lines for Performance
For files with many small lines:
const chunks = await read("file.txt").chunkedLines.collect();
// Array of string arrays
Real-World Examples
Log Analysis
const errorsByType = await read("app.log")
.lines
.filter((line) => line.includes("ERROR"))
.reduce((acc, line) => {
const type = line.match(/ERROR: (\w+)/)?.[1] || "unknown";
acc[type] = (acc[type] || 0) + 1;
return acc;
}, {});
Data Extraction
const emails = await read("contacts.txt")
.lines
.map((line) => line.match(/[\w.-]+@[\w.-]+\.\w+/))
.filter((match) => match !== null)
.map((match) => match[0])
.collect();
File Conversion
const jsonLines = await read("data.csv")
.lines
.drop(1) // Skip header
.map((line) => {
const [name, age] = line.split(",");
return JSON.stringify({ name, age: parseInt(age) });
})
.collect();
await Deno.writeTextFile("data.jsonl", jsonLines.join("\n"));
Next Steps
- Decompressing Files - Work with compressed files
- Streaming Large Files - Handle huge files
- Log Processing - Analyze logs
flatdata CLI
flatdata is a high-performance command-line utility for converting between tabular data formats. It’s distributed as part of proc and uses WebAssembly for near-native parsing speed.
Installation
# Install globally with required permissions
deno install -g --allow-read --allow-write -n flatdata jsr:@j50n/proc@0.24.6/flatdata
This installs flatdata globally, making it available from any terminal.
For pipeline-only use (stdin/stdout), you can install without file permissions:
deno install -g -n flatdata jsr:@j50n/proc@0.24.6/flatdata
This restricts flatdata to streaming mode—no direct file reading or writing.
To verify the installation:
flatdata --help
flatdata --version
Why flatdata?
CSV parsing is CPU-intensive. When processing large files, the parsing step can become a bottleneck. flatdata solves this by:
- Offloading parsing to a separate process - Your main application stays responsive
- Using WASM for speed - ~7x faster than pure JavaScript, about half native speed
- Streaming design - Handles files of any size with constant memory
The key insight: by converting CSV to a simple binary format (record), downstream processing becomes trivial string splits instead of complex CSV parsing.
Formats
| Format | Description | Use Case |
|---|---|---|
| csv | RFC 4180 comma-separated values | Standard interchange |
| tsv | Tab-separated values | Simple data, no quoting |
| record | Binary: \x1F field, \x1E record | Fast processing |
| lazyrow | Binary with length-prefixed fields | Efficient random field access |
The record format is the key to performance. It uses ASCII control characters that never appear in text data:
- \x1F (Unit Separator) between fields
- \x1E (Record Separator) between rows
This makes parsing trivial: row.split('\x1F') gives you fields instantly.
Basic Usage
# Convert CSV to record format
cat data.csv | flatdata csv2record > data.rec
# Convert back to CSV
flatdata record2csv < data.rec > output.csv
# Full pipeline
cat huge.csv | flatdata csv2record | ./process | flatdata record2csv > results.csv
Commands
Direct Conversions
flatdata csv2tsv [options] # CSV → TSV
flatdata tsv2csv [options] # TSV → CSV
Options:
- -d, --separator <char> - CSV field separator (default: ,)
- -q, --quote-all - Quote all fields in output
- -i, --input <file> - Input file (default: stdin)
- -o, --output <file> - Output file (default: stdout)
CSV/TSV Input
flatdata csv2record [options] # CSV → record
flatdata csv2lazyrow [options] # CSV → lazyrow
flatdata tsv2record [options] # TSV → record
flatdata tsv2lazyrow [options] # TSV → lazyrow
Options:
- -d, --separator <char> - Field separator (CSV only, default: ,)
- -c, --columns <n> - Expected column count (fail if mismatch)
- -s, --strict - Fail on parse errors
- -i, --input <file> - Input file (default: stdin)
- -o, --output <file> - Output file (default: stdout)
Record/Lazyrow Output
flatdata record2csv [options] # record → CSV
flatdata record2tsv [options] # record → TSV
flatdata lazyrow2csv [options] # lazyrow → CSV
flatdata lazyrow2tsv [options] # lazyrow → TSV
Options:
- -d, --separator <char> - Field separator (CSV only, default: ,)
- -q, --quote-all - Quote all fields
- -i, --input <file> - Input file (default: stdin)
- -o, --output <file> - Output file (default: stdout)
Record ↔ Lazyrow Conversion
flatdata record2lazyrow [options] # record → lazyrow
flatdata lazyrow2record [options] # lazyrow → record
Options:
- -i, --input <file> - Input file (default: stdin)
- -o, --output <file> - Output file (default: stdout)
Using with proc
The real power comes from combining flatdata with proc’s pipeline capabilities.
Direct Format Conversion
# Convert CSV to TSV
cat data.csv | flatdata csv2tsv > data.tsv
# Convert TSV to CSV
flatdata tsv2csv -i data.tsv -o data.csv
# European CSV (semicolon) to TSV
flatdata csv2tsv -d ';' -i euro.csv -o data.tsv
Basic Pipeline
import { enumerate, run } from "jsr:@j50n/proc";
// Parse CSV in a subprocess, process records in JS
const results = await run("flatdata", "csv2record")
.writeToStdin(csvData)
.lines
.map((record) => record.split("\x1F")) // Split into fields
.filter((fields) => fields[2] === "active")
.map((fields) => ({ id: fields[0], name: fields[1] }))
.collect();
Processing Large Files
import { read, run } from "jsr:@j50n/proc";
// Stream a large CSV through flatdata
await read("huge.csv")
.run("flatdata", "csv2record")
.lines
.map((record) => {
const fields = record.split("\x1F");
return processRow(fields);
})
.forEach((result) => console.log(result));
With enumerate for Indexing
import { enumerate, run } from "jsr:@j50n/proc";
// Number each row
await run("cat", "data.csv")
.run("flatdata", "csv2record")
.lines
.enum()
.map(([record, index]) => {
const fields = record.split("\x1F");
return `${index + 1}: ${fields[0]}`;
})
.toStdout();
Bidirectional Pipeline
import { run } from "jsr:@j50n/proc";
// CSV → process → CSV
const output = await run("flatdata", "csv2record", "-i", "input.csv")
.lines
.map((record) => {
const fields = record.split("\x1F");
fields[1] = fields[1].toUpperCase(); // Transform field
return fields.join("\x1F");
})
.run("flatdata", "record2csv")
.lines
.collect();
Transforms for Record Format
proc provides transforms to convert between the binary record format and JavaScript objects.
fromRecordToRows
Convert record-delimited bytes to string arrays:
import { run } from "jsr:@j50n/proc";
import { fromRecordToRows } from "jsr:@j50n/proc/transforms";
await run("flatdata", "csv2record", "-i", "data.csv")
.transform(fromRecordToRows())
.flatten()
.filter((row) => row[2] === "active")
.forEach((row) => console.log(row[0], row[1]));
fromRecordToLazyRows
Convert record-delimited bytes to LazyRow objects (more efficient for wide rows):
import { run } from "jsr:@j50n/proc";
import { fromRecordToLazyRows } from "jsr:@j50n/proc/transforms";
await run("flatdata", "csv2record", "-i", "wide.csv")
.transform(fromRecordToLazyRows())
.flatten()
.filter((row) => row.getField(0) === "active")
.forEach((row) => console.log(row.getField(1), row.getField(5)));
fromLazyRowBinary
Convert binary lazyrow format (from csv2lazyrow) to LazyRow objects:
import { run } from "jsr:@j50n/proc";
import { fromLazyRowBinary } from "jsr:@j50n/proc/transforms";
await run("flatdata", "csv2lazyrow", "-i", "wide.csv")
.transform(fromLazyRowBinary())
.flatten()
.filter((row) => row.getField(2) === "error")
.forEach((row) => console.log(`${row.getField(0)}: ${row.getField(3)}`));
toRecord
Convert row data to record format for piping to flatdata:
import { run } from "jsr:@j50n/proc";
import { fromRecordToRows, toRecord } from "jsr:@j50n/proc/transforms";
// Transform CSV: uppercase the second field
await run("flatdata", "csv2record", "-i", "input.csv")
.transform(fromRecordToRows())
.flatten()
.map((row) => [row[0], row[1].toUpperCase(), row[2]])
.transform(toRecord())
.run("flatdata", "record2csv")
.toStdout();
LazyRow for Memory Efficiency
LazyRow defers field parsing until accessed - ideal when you only need a few fields from wide rows:
import { run } from "jsr:@j50n/proc";
import { fromLazyRowBinary } from "jsr:@j50n/proc/transforms";
await run("flatdata", "csv2lazyrow", "-i", "huge.csv")
.transform(fromLazyRowBinary())
.flatten()
.filter((row) => row.columnCount > 5) // O(1) column count
.map((row) => row.getField(0)) // Only parse field 0
.take(100)
.toStdout();
LazyRow methods:
- columnCount - Number of fields (O(1), no parsing)
- getField(n) - Get nth field as string (parses on demand)
- toArray() - Get all fields as string[]
European CSV (Semicolon-Delimited)
# Convert European CSV to US CSV
flatdata csv2record -d ';' -i euro.csv | flatdata record2csv -o us.csv
Validation
# Fail if any row doesn't have exactly 10 columns
flatdata csv2record --columns 10 --strict -i data.csv > /dev/null
Tips
- Pipe through flatdata to offload CPU work from your main process
- Use record format for intermediate processing - it’s trivial to parse
- LazyRow when you only need a few fields from wide rows
- Validate early with --columns and --strict to catch data issues
Architecture
flatdata uses a custom RFC 4180 CSV parser written in Odin and compiled to WebAssembly. We needed a push parser—one that accepts arbitrary chunks of input and tracks state across calls—because WASM modules can’t pull data from JavaScript. Odin’s standard library CSV parser is a pull parser that expects to read from a file or complete buffer.
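To make the distinction concrete, here is a simplified sketch of a push-parser interface (hypothetical TypeScript, not the actual Odin/WASM API): the caller feeds arbitrary chunks, and the parser carries any partial record across calls.
// Hypothetical push-parser shape, for illustration only.
interface PushParser {
  write(chunk: Uint8Array): void; // chunk may end mid-field; state is kept
  finish(): void;                 // flush any trailing partial record
  takeRows(): string[][];         // rows completed so far
}
A pull parser inverts this: it asks its input source for more bytes whenever it needs them, which a WASM module cannot do to JavaScript.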
Range and Iteration
Generate sequences of numbers lazily, providing a powerful foundation for data generation and iteration patterns.
Creating Basic Ranges
The simplest range generates numbers from 0 up to (but not including) a specified limit:
import { range } from "jsr:@j50n/proc@0.24.6";
const numbers = await range({ to: 5 }).collect();
// [0, 1, 2, 3, 4]
Understanding Exclusive vs Inclusive Bounds
Range provides two ways to specify the upper bound. The to parameter is
exclusive, stopping before the specified number:
const nums = await range({ to: 3 }).collect();
// [0, 1, 2]
The until parameter is inclusive, including the specified number in the
result:
const nums = await range({ until: 3 }).collect();
// [0, 1, 2, 3]
Customizing Start Points
You can specify where the range begins using the from parameter:
const nums = await range({ from: 5, to: 10 }).collect();
// [5, 6, 7, 8, 9]
Working with Step Values
The step parameter controls the increment between numbers, allowing you to
create sequences like even numbers, multiples, or any regular interval:
const evens = await range({ from: 0, to: 10, step: 2 }).collect();
// [0, 2, 4, 6, 8]
Counting Down
const countdown = await range({ from: 5, to: 0, step: -1 }).collect();
// [5, 4, 3, 2, 1]
Real-World Examples
Repeat N Times
await range({ to: 10 }).forEach((i) => {
console.log(`Iteration ${i}`);
});
Generate Test Data
const users = await range({ to: 100 })
.map((i) => ({
id: i,
name: `User ${i}`,
email: `user${i}@example.com`,
}))
.collect();
Batch Processing
const batchSize = 10;
const total = 100;
for await (const batch of range({ from: 0, to: total, step: batchSize })) {
const items = data.slice(batch, batch + batchSize);
await processBatch(items);
}
Pagination
const pages = Math.ceil(total / pageSize);
for await (const page of range({ to: pages })) {
const items = await fetchPage(page);
await processItems(items);
}
Retry Logic
for await (const attempt of range({ to: 3 })) {
try {
await operation();
break;
} catch (error) {
if (attempt === 2) throw error;
await sleep(1000 * (attempt + 1));
}
}
Infinite Ranges
Warning: Don’t collect infinite ranges!
// ❌ Never completes
const infinite = await range({ from: 0, to: Infinity }).collect();
// ✅ Use with take()
const first100 = await range({ from: 0, to: Infinity })
.take(100)
.collect();
Performance
Ranges are lazy—numbers generated on demand:
// Doesn't generate all numbers upfront
const huge = range({ to: 1_000_000_000 });
// Only generates what you use
const first10 = await huge.take(10).collect();
Next Steps
- Zip and Enumerate - Combine iterables
- Array-Like Methods - Transform ranges
Zip and Enumerate
Combine and index iterables.
enumerate()
Wrap any iterable for Array-like methods:
import { enumerate } from "jsr:@j50n/proc@0.24.6";
const result = await enumerate([1, 2, 3])
.map((n) => n * 2)
.collect();
// [2, 4, 6]
.enum()
Add indices to items:
const indexed = await enumerate(["a", "b", "c"])
.enum()
.collect();
// [["a", 0], ["b", 1], ["c", 2]]
Format with Indices
const numbered = await enumerate(["apple", "banana", "cherry"])
.enum()
.map(([fruit, i]) => `${i + 1}. ${fruit}`)
.collect();
// ["1. apple", "2. banana", "3. cherry"]
zip()
Combine two iterables:
import { zip } from "jsr:@j50n/proc@0.24.6";
const names = ["Alice", "Bob", "Charlie"];
const ages = [25, 30, 35];
const people = await zip(names, ages)
.map(([name, age]) => ({ name, age }))
.collect();
// [{ name: "Alice", age: 25 }, ...]
Multiple Iterables
const combined = await zip(iter1, iter2)
.map(([a, b]) => a + b)
.collect();
Real-World Examples
Number Lines
const numbered = await read("file.txt")
.lines
.enum()
.map(([line, i]) => `${i + 1}: ${line}`)
.forEach(console.log);
Combine Data Sources
const merged = await zip(
read("names.txt").lines,
read("emails.txt").lines,
)
.map(([name, email]) => ({ name, email }))
.collect();
Track Progress
const items = [...]; // Large array
await enumerate(items)
.enum()
.forEach(([item, i]) => {
console.log(`Processing ${i + 1}/${items.length}`);
process(item);
});
Next Steps
- Range and Iteration - Generate sequences
- Array-Like Methods - Transform data
Sleep
The sleep function pauses execution for a specified duration. While it might
seem like an outlier in a process management library, it’s surprisingly useful
when working with async pipelines, rate limiting, and testing.
Basic Usage
import { sleep } from "jsr:@j50n/proc@0.24.6";
console.log("Starting...");
await sleep(2000); // Pause for 2 seconds
console.log("Done!");
Why It’s Included
When working with processes and async iterables, you often need to:
- Rate limit operations
- Add delays between retries
- Simulate slow data sources for testing
- Throttle concurrent operations
- Add breathing room for external services
Having sleep built-in means you don’t need to import it from another library
or write the setTimeout wrapper yourself.
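For reference, the wrapper it saves you from writing is roughly this:
// The usual hand-rolled equivalent of sleep().
function mySleep(ms: number): Promise<void> {
  return new Promise((resolve) => setTimeout(resolve, ms));
}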
Common Use Cases
Rate Limiting API Calls
import { enumerate, sleep } from "jsr:@j50n/proc@0.24.6";
const urls = ["url1", "url2", "url3"];
await enumerate(urls)
.concurrentMap(async (url) => {
const response = await fetch(url);
await sleep(1000); // Wait 1 second between requests
return response.json();
}, { concurrency: 1 })
.forEach((data) => console.log(data));
Retry with Backoff
import { run, sleep } from "jsr:@j50n/proc@0.24.6";
async function runWithRetry(maxRetries = 3) {
for (let i = 0; i < maxRetries; i++) {
try {
return await run("flaky-command").lines.collect();
} catch (error) {
if (i === maxRetries - 1) throw error;
const delay = Math.pow(2, i) * 1000; // Exponential backoff
console.log(`Retry ${i + 1} after ${delay}ms...`);
await sleep(delay);
}
}
}
Simulating Slow Data Sources
import { sleep, WritableIterable } from "jsr:@j50n/proc@0.24.6";
const slowData = new WritableIterable<number>();
// Simulate data arriving slowly
(async () => {
for (let i = 0; i < 10; i++) {
await slowData.write(i);
await sleep(500); // 500ms between items
}
await slowData.close();
})();
for await (const item of slowData) {
console.log("Received:", item);
}
Throttling Process Output
import { run, sleep } from "jsr:@j50n/proc@0.24.6";
// Process lines slowly to avoid overwhelming downstream
await run("cat", "large-file.txt")
.lines
.map(async (line) => {
await sleep(10); // 10ms delay per line
return line;
})
.toStdout();
Testing Concurrent Operations
import { enumerate, sleep } from "jsr:@j50n/proc@0.24.6";
// Verify concurrency limit works correctly
const startTimes: number[] = [];
await enumerate([1, 2, 3, 4, 5])
.concurrentMap(async (n) => {
startTimes.push(Date.now());
await sleep(100); // Simulate work
return n;
}, { concurrency: 2 })
.collect();
// Analyze timing to verify only 2 ran concurrently
Time Constants
The library also provides time constants for readability:
import { MINUTES, SECONDS, sleep } from "jsr:@j50n/proc@0.24.6";
await sleep(5 * SECONDS); // 5 seconds
await sleep(2 * MINUTES); // 2 minutes
Available constants:
- SECONDS = 1000 milliseconds
- MINUTES = 60 seconds
- HOURS = 60 minutes
- DAYS = 24 hours
- WEEKS = 7 days
API
function sleep(delayms: number): Promise<void>;
Parameters:
delayms: Delay in milliseconds
Returns:
- Promise that resolves after the specified delay
Notes
- Uses setTimeout internally
- Non-blocking (other async operations can run)
- Minimum delay depends on JavaScript runtime (typically ~4ms)
- For precise timing, consider using performance.now() to measure actual elapsed time (see the sketch below)
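A small sketch of that measurement:
import { sleep } from "jsr:@j50n/proc@0.24.6";

const start = performance.now();
await sleep(50);
console.log(`Asked for 50ms, slept ${(performance.now() - start).toFixed(1)}ms`);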
Counting Words
Word counting demonstrates the elegance of process pipelines, showing how complex text analysis can be built from simple Unix tools chained together.
Basic Word Counting
The simplest approach uses the wc command to count total words in a file:
import { run } from "jsr:@j50n/proc@0.24.6";
const wordCount = await run("wc", "-w", "book.txt").lines.first;
console.log(`Total words: ${wordCount}`);
Finding Unique Words
To count unique words, you need to extract individual words, normalize their case, and eliminate duplicates. This pipeline breaks text into words, converts everything to lowercase, sorts the results, and removes duplicates:
const uniqueWords = await run("cat", "book.txt")
.run("tr", "-cs", "A-Za-z", "\n") // Extract words
.run("tr", "A-Z", "a-z") // Lowercase
.run("sort") // Sort
.run("uniq") // Unique
.lines
.count();
console.log(`Unique words: ${uniqueWords}`);
Analyzing Word Frequency
For more sophisticated analysis, you can find the most frequently used words by adding frequency counting and sorting by occurrence:
const topWords = await run("cat", "book.txt")
.run("tr", "-cs", "A-Za-z", "\n")
.run("tr", "A-Z", "a-z")
.run("sort")
.run("uniq", "-c")
.run("sort", "-rn")
.run("head", "-10")
.lines
.collect();
console.log("Top 10 words:");
topWords.forEach((line) => console.log(line));
Pure JavaScript Version
Do it all in JavaScript:
import { read } from "jsr:@j50n/proc@0.24.6";
const wordCounts = await read("book.txt")
.lines
.flatMap(line => line.toLowerCase().match(/\w+/g) || [])
.reduce((acc, word) => {
acc[word] = (acc[word] || 0) + 1;
return acc;
}, {});
const topWords = Object.entries(wordCounts)
.sort((a, b) => b[1] - a[1])
.slice(0, 10);
console.log("Top 10 words:");
topWords.forEach(([word, count]) => {
console.log(`${count} ${word}`);
});
Compressed Files
Count words in a compressed file:
const wordCount = await read("book.txt.gz")
.transform(new DecompressionStream("gzip"))
.lines
.flatMap((line) => line.match(/\w+/g) || [])
.count();
console.log(`Total words: ${wordCount}`);
Multiple Files
Count words across multiple files:
import { enumerate } from "jsr:@j50n/proc@0.24.6";
const files = ["book1.txt", "book2.txt", "book3.txt"];
const results = await enumerate(files)
.concurrentMap(async (file) => {
const words = await read(file)
.lines
.flatMap((line) => line.match(/\w+/g) || [])
.count();
return { file, words };
}, { concurrency: 3 })
.collect();
results.forEach(({ file, words }) => {
console.log(`${file}: ${words} words`);
});
Filter Stop Words
Exclude common words:
const stopWords = new Set([
"the",
"a",
"an",
"and",
"or",
"but",
"in",
"on",
"at",
"to",
"for",
]);
const meaningfulWords = await read("book.txt")
.lines
.flatMap((line) => line.toLowerCase().match(/\w+/g) || [])
.filter((word) => !stopWords.has(word))
.reduce((acc, word) => {
acc[word] = (acc[word] || 0) + 1;
return acc;
}, {});
Word Length Distribution
Analyze word lengths:
const lengthDist = await read("book.txt")
.lines
.flatMap((line) => line.match(/\w+/g) || [])
.reduce((acc, word) => {
const len = word.length;
acc[len] = (acc[len] || 0) + 1;
return acc;
}, {});
console.log("Word length distribution:");
Object.entries(lengthDist)
.sort((a, b) => parseInt(a[0]) - parseInt(b[0]))
.forEach(([len, count]) => {
console.log(`${len} letters: ${count} words`);
});
Real-World Example: War and Peace
Analyze Tolstoy’s War and Peace:
const [totalWords, uniqueWords] = await Promise.all([
// Total words
read("warandpeace.txt.gz")
.transform(new DecompressionStream("gzip"))
.lines
.flatMap((line) => line.match(/\w+/g) || [])
.count(),
// Unique words
read("warandpeace.txt.gz")
.transform(new DecompressionStream("gzip"))
.lines
.flatMap((line) => line.toLowerCase().match(/\w+/g) || [])
.reduce((acc, word) => {
acc.add(word);
return acc;
}, new Set())
.then((set) => set.size),
]);
console.log(`Total words: ${totalWords.toLocaleString()}`);
console.log(`Unique words: ${uniqueWords.toLocaleString()}`);
console.log(
`Vocabulary richness: ${(uniqueWords / totalWords * 100).toFixed(1)}%`,
);
Performance Comparison
Shell Pipeline (fast)
// Uses native Unix tools
const count = await run("cat", "book.txt")
.run("wc", "-w")
.lines.first;
JavaScript (flexible)
// More control, type-safe
const count = await read("book.txt")
.lines
.flatMap((line) => line.match(/\w+/g) || [])
.count();
Hybrid (best of both)
// Use Unix tools for heavy lifting, JavaScript for logic
const words = await run("cat", "book.txt")
.run("tr", "-cs", "A-Za-z", "\n")
.lines
.filter((word) => word.length > 5) // JavaScript filter
.count();
Next Steps
- Process Pipelines - Chain commands together
- Concurrent Processing - Process multiple files
- Streaming Large Files - Handle huge files
Processing Log Files
Log file analysis is a common task that benefits greatly from proc’s streaming capabilities, allowing you to analyze even huge log files efficiently without loading them entirely into memory.
Counting Errors
The simplest log analysis task is counting error occurrences. This approach streams through the file, filtering for error lines and counting them:
import { read } from "jsr:@j50n/proc@0.24.6";
const errorCount = await read("app.log")
.lines
.filter((line) => line.includes("ERROR"))
.count();
console.log(`${errorCount} errors found`);
Categorizing Errors by Type
For more detailed analysis, you can group errors by type to understand which kinds of errors are most common:
const errorTypes = await read("app.log")
.lines
.filter((line) => line.includes("ERROR"))
.reduce((acc, line) => {
const match = line.match(/ERROR: (\w+)/);
const type = match ? match[1] : "unknown";
acc[type] = (acc[type] || 0) + 1;
return acc;
}, {});
console.log("Errors by type:");
Object.entries(errorTypes)
.sort((a, b) => b[1] - a[1])
.forEach(([type, count]) => {
console.log(` ${type}: ${count}`);
});
Extracting Structured Data
When you need to extract specific information from log entries, you can parse timestamps, error messages, and other structured data:
const errors = await read("app.log")
.lines
.filter((line) => line.includes("ERROR"))
.map((line) => {
const timestamp = line.match(/\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}/)?.[0];
const message = line.split("ERROR:")[1]?.trim();
return { timestamp, message };
})
.filter((error) => error.timestamp && error.message)
.collect();
console.log(`Found ${errors.length} structured errors`);
Find Patterns
const suspiciousIPs = await read("access.log")
.lines
.map(line => {
const ip = line.match(/\d+\.\d+\.\d+\.\d+/)?.[0];
const status = line.match(/HTTP\/\d\.\d" (\d+)/)?.[1];
return { ip, status };
})
.filter(entry => entry.status === "404")
.reduce((acc, entry) => {
if (entry.ip) {
acc[entry.ip] = (acc[entry.ip] || 0) + 1;
}
return acc;
}, {});
// Show IPs with > 100 404s
Object.entries(suspiciousIPs)
.filter(([_, count]) => count > 100)
.forEach(([ip, count]) => {
console.log(`${ip}: ${count} 404s`);
});
Time-Based Analysis
const errorsByHour = await read("app.log")
.lines
.filter((line) => line.includes("ERROR"))
.reduce((acc, line) => {
const hour = line.match(/T(\d{2}):/)?.[1];
if (hour) {
acc[hour] = (acc[hour] || 0) + 1;
}
return acc;
}, {});
console.log("Errors by hour:");
Object.entries(errorsByHour)
.sort((a, b) => a[0].localeCompare(b[0]))
.forEach(([hour, count]) => {
console.log(`${hour}:00 - ${count} errors`);
});
Multiple Log Files
import { enumerate } from "jsr:@j50n/proc@0.24.6";
const files = ["app1.log", "app2.log", "app3.log"];
const results = await enumerate(files)
.concurrentMap(async (file) => {
const errors = await read(file)
.lines
.filter((line) => line.includes("ERROR"))
.count();
return { file, errors };
}, { concurrency: 3 })
.collect();
results.forEach(({ file, errors }) => {
console.log(`${file}: ${errors} errors`);
});
Compressed Logs
const errors = await read("app.log.gz")
.transform(new DecompressionStream("gzip"))
.lines
.filter((line) => line.includes("ERROR"))
.take(10)
.collect();
Real-Time Monitoring
// Process log as it grows
for await (const line of read("app.log").lines) {
if (line.includes("ERROR")) {
console.error(`🔴 ${line}`);
} else if (line.includes("WARN")) {
console.warn(`⚠️ ${line}`);
}
}
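If the log is still being written and you want to follow new lines as they arrive, one option (a sketch that assumes the tail command is available) is to stream tail -f through run():
import { run } from "jsr:@j50n/proc@0.24.6";

// tail -f never exits on its own; each new line is consumed as it arrives.
for await (const line of run("tail", "-f", "app.log").lines) {
  if (line.includes("ERROR")) {
    console.error(`🔴 ${line}`);
  }
}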
Next Steps
- Streaming Large Files - Handle huge logs
- Concurrent Processing - Process multiple files
- Decompressing Files - Work with compressed logs
Decompressing Files
Process compressed files without creating temporary files. Stream everything.
Decompress and Count Lines
import { read } from "jsr:@j50n/proc@0.24.6";
const lineCount = await read("war-and-peace.txt.gz")
.transform(new DecompressionStream("gzip"))
.lines
.count();
console.log(`${lineCount} lines`);
What’s happening:
- read() opens the file as a stream of bytes
- transform() pipes through the decompression stream
- lines converts bytes to text lines
- count() counts them
All streaming. No temp files. Constant memory usage.
Search in Compressed File
import { read } from "jsr:@j50n/proc@0.24.6";
const matches = await read("logs.gz")
.transform(new DecompressionStream("gzip"))
.lines
.filter((line) => line.includes("ERROR"))
.collect();
console.log(`Found ${matches.length} errors`);
Process Multiple Compressed Files
import { enumerate, read } from "jsr:@j50n/proc@0.24.6";
const files = ["log1.gz", "log2.gz", "log3.gz"];
for (const file of files) {
const errors = await read(file)
.transform(new DecompressionStream("gzip"))
.lines
.filter((line) => line.includes("ERROR"))
.count();
console.log(`${file}: ${errors} errors`);
}
Decompress and Transform
import { read } from "jsr:@j50n/proc@0.24.6";
const data = await read("data.json.gz")
.transform(new DecompressionStream("gzip"))
.lines
.map((line) => JSON.parse(line))
.filter((obj) => obj.status === "active")
.collect();
Supported Formats
The Web Streams API supports:
- gzip - .gz files
- deflate - .zip files (deflate compression)
- deflate-raw - Raw deflate
// Gzip
.transform(new DecompressionStream("gzip"))
// Deflate
.transform(new DecompressionStream("deflate"))
// Deflate-raw
.transform(new DecompressionStream("deflate-raw"))
Compress Output
You can also compress:
import { concat, read } from "jsr:@j50n/proc@0.24.6";
const compressed = await read("large-file.txt")
.transform(new CompressionStream("gzip"))
.collect();
await Deno.writeFile("large-file.txt.gz", concat(compressed));
Real-World Example: Log Analysis
Analyze compressed logs without extracting them:
import { read } from "jsr:@j50n/proc@0.24.6";
interface LogEntry {
timestamp: string;
level: string;
message: string;
}
const errors = await read("app.log.gz")
.transform(new DecompressionStream("gzip"))
.lines
.map((line) => {
const [timestamp, level, ...message] = line.split(" ");
return { timestamp, level, message: message.join(" ") };
})
.filter((entry) => entry.level === "ERROR")
.collect();
console.log(`Found ${errors.length} errors`);
errors.slice(0, 10).forEach((e) => {
console.log(`${e.timestamp}: ${e.message}`);
});
Performance Tips
Stream, Don’t Collect
// ❌ Loads entire file into memory
const lines = await read("huge.gz")
.transform(new DecompressionStream("gzip"))
.lines
.collect();
// ✅ Processes one line at a time
for await (
const line of read("huge.gz")
.transform(new DecompressionStream("gzip"))
.lines
) {
process(line);
}
Use Concurrent Processing
Process multiple files in parallel:
import { enumerate } from "jsr:@j50n/proc@0.24.6";
const files = ["log1.gz", "log2.gz", "log3.gz"];
const results = await enumerate(files)
.concurrentMap(async (file) => {
const errors = await read(file)
.transform(new DecompressionStream("gzip"))
.lines
.filter((line) => line.includes("ERROR"))
.count();
return { file, errors };
}, { concurrency: 3 })
.collect();
Why This Is Better
Traditional approach:
# Extract first
gunzip file.gz
# Then process
grep ERROR file
# Clean up
rm file
proc approach:
// One step, no temp files
await read("file.gz")
.transform(new DecompressionStream("gzip"))
.lines
.filter((line) => line.includes("ERROR"))
.forEach(console.log);
Faster, cleaner, more memory-efficient.
Next Steps
- Streaming Large Files - Handle huge files
- Concurrent Processing - Process multiple files in parallel
- File I/O - More file operations
Parallel Downloads
Download multiple files concurrently with controlled concurrency.
Basic Example
import { enumerate } from "jsr:@j50n/proc@0.24.6";
const urls = [
"https://example.com/file1.json",
"https://example.com/file2.json",
"https://example.com/file3.json",
// ... more URLs
];
const results = await enumerate(urls)
.concurrentMap(async (url) => {
const response = await fetch(url);
if (!response.ok) {
throw new Error(`Failed to fetch ${url}: ${response.status}`);
}
return {
url,
data: await response.json(),
size: response.headers.get("content-length"),
};
}, { concurrency: 5 })
.collect();
console.log(`Downloaded ${results.length} files`);
Download and Save Files
Download files and save them to disk:
import { enumerate } from "jsr:@j50n/proc@0.24.6";
const downloads = [
{ url: "https://example.com/image1.jpg", path: "./downloads/image1.jpg" },
{ url: "https://example.com/image2.jpg", path: "./downloads/image2.jpg" },
{ url: "https://example.com/image3.jpg", path: "./downloads/image3.jpg" },
];
await enumerate(downloads)
.concurrentMap(async ({ url, path }) => {
const response = await fetch(url);
const blob = await response.blob();
const buffer = await blob.arrayBuffer();
await Deno.writeFile(path, new Uint8Array(buffer));
console.log(`Downloaded: ${path}`);
return path;
}, { concurrency: 3 })
.collect();
console.log("All downloads complete");
With Progress Tracking
Track download progress:
import { enumerate } from "jsr:@j50n/proc@0.24.6";
let completed = 0;
const total = urls.length;
const results = await enumerate(urls)
.concurrentMap(async (url) => {
const response = await fetch(url);
const data = await response.json();
completed++;
console.log(
`Progress: ${completed}/${total} (${
Math.round(completed / total * 100)
}%)`,
);
return { url, data };
}, { concurrency: 5 })
.collect();
With Retry Logic
Retry failed downloads:
import { enumerate } from "jsr:@j50n/proc@0.24.6";
async function fetchWithRetry(url: string, maxRetries = 3): Promise<Response> {
for (let attempt = 1; attempt <= maxRetries; attempt++) {
try {
const response = await fetch(url);
if (response.ok) return response;
if (attempt === maxRetries) {
throw new Error(
`Failed after ${maxRetries} attempts: ${response.status}`,
);
}
} catch (error) {
if (attempt === maxRetries) throw error;
// Exponential backoff
const delay = Math.min(1000 * Math.pow(2, attempt - 1), 10000);
console.log(`Retry ${attempt}/${maxRetries} for ${url} after ${delay}ms`);
await new Promise((resolve) => setTimeout(resolve, delay));
}
}
throw new Error("Unreachable");
}
const results = await enumerate(urls)
.concurrentMap(async (url) => {
const response = await fetchWithRetry(url);
return await response.json();
}, { concurrency: 5 })
.collect();
Download Large Files
Stream large files to disk without loading into memory:
import { enumerate } from "jsr:@j50n/proc@0.24.6";
const largeFiles = [
{ url: "https://example.com/large1.zip", path: "./large1.zip" },
{ url: "https://example.com/large2.zip", path: "./large2.zip" },
];
await enumerate(largeFiles)
.concurrentMap(async ({ url, path }) => {
const response = await fetch(url);
if (!response.body) throw new Error("No response body");
const file = await Deno.open(path, { write: true, create: true });
await response.body.pipeTo(file.writable);
console.log(`Downloaded: ${path}`);
return path;
}, { concurrency: 2 })
.collect();
API Rate Limiting
Respect API rate limits:
import { enumerate } from "jsr:@j50n/proc@0.24.6";
const apiEndpoints = [
"/api/users/1",
"/api/users/2",
// ... 100 more
];
// Add delay between requests
await enumerate(apiEndpoints)
.concurrentMap(async (endpoint) => {
const response = await fetch(`https://api.example.com${endpoint}`);
const data = await response.json();
// Wait 100ms between requests (10 requests/second)
await new Promise((resolve) => setTimeout(resolve, 100));
return data;
}, { concurrency: 1 }) // Sequential to respect rate limit
.collect();
Filter Failed Downloads
Continue even if some downloads fail:
import { enumerate } from "jsr:@j50n/proc@0.24.6";
const results = await enumerate(urls)
.concurrentMap(async (url) => {
try {
const response = await fetch(url);
if (!response.ok) return null;
return { url, data: await response.json() };
} catch (error) {
console.error(`Failed to download ${url}:`, error.message);
return null;
}
}, { concurrency: 5 })
.filter((result) => result !== null)
.collect();
console.log(`Successfully downloaded ${results.length}/${urls.length} files`);
When to Use
Use parallel downloads when:
- You have multiple independent files to fetch
- Network latency is the bottleneck
- The server can handle concurrent requests
- You want to minimize total download time
Choose concurrency based on:
- Server rate limits (respect them!)
- Your network bandwidth
- Server capacity
- Start with 3-5, adjust based on results
Next Steps
- Concurrent Processing - Deep dive into concurrency
- Error Handling - Handle download failures
- Streaming Large Files - Work with large downloads
Shell Script Replacement
Replace Bash scripts with type-safe Deno.
Why Replace Shell Scripts?
Shell scripts are:
- Hard to debug
- No type safety
- Limited error handling
- Platform-specific
proc gives you:
- Full TypeScript
- IDE support
- Proper error handling
- Cross-platform
Common Patterns
File Operations
Bash:
#!/bin/bash
for file in *.txt; do
wc -l "$file"
done
proc:
#!/usr/bin/env -S deno run --allow-read --allow-run
import { run } from "jsr:@j50n/proc@0.24.6";
for await (const entry of Deno.readDir(".")) {
if (entry.name.endsWith(".txt")) {
const count = await run("wc", "-l", entry.name).lines.first;
console.log(count);
}
}
Process Logs
Bash:
#!/bin/bash
grep ERROR app.log | wc -l
proc:
#!/usr/bin/env -S deno run --allow-read --allow-run
import { read } from "jsr:@j50n/proc@0.24.6";
const errors = await read("app.log")
.lines
.filter((line) => line.includes("ERROR"))
.count();
console.log(`${errors} errors`);
Backup Script
Bash:
#!/bin/bash
tar -czf backup-$(date +%Y%m%d).tar.gz /data
proc:
#!/usr/bin/env -S deno run --allow-read --allow-run
import { run } from "jsr:@j50n/proc@0.24.6";
const date = new Date().toISOString().split("T")[0].replace(/-/g, "");
await run("tar", "-czf", `backup-${date}.tar.gz`, "/data").toStdout();
System Monitoring
Bash:
#!/bin/bash
while true; do
df -h | grep /dev/sda1
sleep 60
done
proc:
#!/usr/bin/env -S deno run --allow-run
import { run, sleep } from "jsr:@j50n/proc@0.24.6";
while (true) {
const usage = await run("df", "-h")
.lines
.find((line) => line.includes("/dev/sda1"));
console.log(usage);
await sleep(60000); // sleep() is exported from proc
}
Real Script Example
Complete deployment script:
#!/usr/bin/env -S deno run --allow-all
import { run } from "jsr:@j50n/proc@0.24.6";
console.log("🚀 Deploying application...");
try {
// Pull latest code
console.log("📥 Pulling latest code...");
await run("git", "pull").toStdout();
// Install dependencies
console.log("📦 Installing dependencies...");
await run("npm", "install").toStdout();
// Run tests
console.log("🧪 Running tests...");
await run("npm", "test").toStdout();
// Build
console.log("🔨 Building...");
await run("npm", "run", "build").toStdout();
// Restart service
console.log("🔄 Restarting service...");
await run("systemctl", "restart", "myapp").toStdout();
console.log("✅ Deployment complete!");
} catch (error) {
console.error("❌ Deployment failed:", error.message);
Deno.exit(1);
}
Benefits
- Type Safety - Catch errors before running
- IDE Support - Autocomplete and refactoring
- Error Handling - Proper try-catch
- Debugging - Use debugger, breakpoints
- Testing - Write unit tests
- Portability - Works on any platform with Deno
Making Scripts Executable
chmod +x script.ts
./script.ts
Next Steps
- Running Processes - Process basics
- Error Handling - Handle failures
- Process Pipelines - Chain commands
API Reference
Complete API documentation is auto-generated from the source code using Deno’s documentation tool.
📚 View Full API Documentation
The API documentation includes:
- All exported functions - Complete signatures and descriptions
- All classes and interfaces - Full type information
- All methods and properties - Detailed documentation
- Type definitions - Complete TypeScript types
- Examples - Code examples from JSDoc
Quick Links
Core Functions
- run() - Run a child process
- enumerate() - Wrap an iterable
- read() - Read a file
Classes
- Enumerable - Array-like methods for async iterables
- ProcessEnumerable - Process-specific enumerable
- Process - Process management
Error Types
- ExitCodeError - Non-zero exit code
- SignalError - Process killed by signal
- UpstreamError - Error from upstream process
Utilities
- range() - Generate number ranges
- zip() - Combine iterables
- concat() - Concatenate byte arrays
- cache() - Cache iterable results
Using the API Docs
The generated documentation includes:
Search
Use the search box to find any function, class, or type.
Type Information
Click on any type to see its definition and usage.
Examples
Most functions include working code examples.
Source Links
Click “Source” to view the implementation.
Integration with This Guide
This user guide provides:
- Conceptual explanations - Why and when to use features
- Tutorials - Step-by-step learning
- Recipes - Real-world solutions
- Best practices - How to use effectively
The API reference provides:
- Complete signatures - Exact function parameters
- Type definitions - TypeScript types
- Technical details - Implementation specifics
- All exports - Everything available
Use both together for complete understanding.
Keeping Docs Updated
The API documentation is regenerated every time the site is built, so it’s always in sync with the code.
To regenerate manually:
deno doc --html --name="proc" --output=./site/src/api-docs ./mod.ts
Next Steps
- Browse the full API documentation
- Getting Started - If you’re new
- Core Features - Learn the essentials
- Recipes - See real examples
Frequently Asked Questions
General
What is proc?
proc is a Deno library for running child processes and working with async iterables. It gives you Array-like methods (map, filter, reduce) for streaming data, with error handling that actually makes sense.
Why should I use proc instead of Deno.Command?
Deno.Command is low-level and requires manual stream handling. proc gives you:
- Automatic resource management
- Natural error propagation
- Array-like methods for data processing
- Process pipelines that feel like shell pipes
- Streaming by default
How do I import data transforms (CSV, TSV, JSON)?
Data transforms are in a separate module from the core library:
// Core library
import { enumerate, read, run } from "jsr:@j50n/proc@0.24.6";
// Data transforms (separate import)
import { fromCsvToRows, toTsv } from "jsr:@j50n/proc@0.24.6/transforms";
This keeps the core library lightweight. See Data Transforms for details.
Is proc production-ready?
Yes! proc is stable, actively maintained, and used in production. The API is mature and unlikely to have breaking changes.
Does proc work with Node.js?
No, proc is Deno-only. It uses Deno-specific APIs like Deno.Command and
requires Deno’s permission system.
Usage
Why do I get “resource leak” errors?
You must consume process output. Unconsumed output keeps the process handle open:
// ❌ Resource leak
const p = run("ls");
// ✅ Consume output
await run("ls").lines.collect();
Is .lines a method or property?
Property. Use .lines not .lines():
// ✅ Correct
run("ls").lines;
// ❌ Wrong
run("ls").lines();
Same for .status, .first, .last.
How do I check exit code without throwing?
Consume output first, then check .status:
const p = run("command");
await p.lines.collect(); // Consume first
const status = await p.status; // Then check
if (status.code !== 0) {
console.error("Failed");
}
Why doesn’t enumerate() add indices?
enumerate() wraps an iterable. Use .enum() to add indices:
const result = await enumerate(["a", "b", "c"])
.enum() // This adds indices
.map(([item, i]) => `${i}: ${item}`)
.collect();
How do I pipe processes together?
Use .run() method:
await run("cat", "file.txt")
.run("grep", "pattern")
.run("wc", "-l")
.lines.first;
Can I use shell syntax like ls -la?
No, arguments must be separate:
// ✅ Correct
run("ls", "-la");
// ❌ Wrong
run("ls -la");
Error Handling
Do I need try-catch at every step?
No! That’s the whole point. Errors propagate through the pipeline:
try {
await run("cmd1")
.run("cmd2")
.run("cmd3")
.lines.forEach(process);
} catch (error) {
// All errors caught here
}
What happens when a process fails?
By default, non-zero exit codes throw ExitCodeError. You can catch it:
try {
await run("false").lines.collect();
} catch (error) {
if (error instanceof ExitCodeError) {
console.error(`Exit code: ${error.code}`);
}
}
Can I customize error handling?
Yes, use fnError option. See
Custom Error Handling.
Performance
Is proc fast?
Yes! proc is streaming by default, which means:
- Constant memory usage, even for huge files
- Concurrent process execution
- Lazy evaluation (only runs when consumed)
How do I process large files?
Stream them:
// Processes 10GB file with constant memory
for await (const line of read("huge.txt").lines) {
process(line);
}
Can I process files in parallel?
Yes, use concurrentMap:
await enumerate(files)
.concurrentMap(async (file) => {
return await processFile(file);
}, { concurrency: 5 })
.collect();
Troubleshooting
My process hangs
You probably didn’t consume the output:
// ❌ Hangs
const p = run("command");
await p.status; // Waiting for output to be consumed
// ✅ Works
const p = run("command");
await p.lines.collect(); // Consume first
await p.status;
I get type errors
Check if you’re using properties as methods:
// ❌ Type error
run("ls").lines();
// ✅ Correct
run("ls").lines;
DecompressionStream type error
Add a type cast:
.transform(new DecompressionStream("gzip") as TransformStream<Uint8Array, Uint8Array>)
Or use --no-check flag.
Permission denied errors
Grant the necessary permissions:
deno run --allow-run --allow-read your-script.ts
Comparison
proc vs Deno.Command
| Feature | Deno.Command | proc |
|---|---|---|
| Boilerplate | High | Low |
| Error handling | Manual | Automatic |
| Streaming | Manual | Built-in |
| Pipelines | Manual | .run() |
| Array methods | No | Yes |
proc vs shell scripts
| Feature | Shell | proc |
|---|---|---|
| Type safety | No | Yes |
| Error handling | Manual | Automatic |
| IDE support | Limited | Full |
| Debugging | Hard | Easy |
| Portability | Limited | Cross-platform |
Getting Help
Where can I find examples?
See the Recipes in this guide and the code examples in the API documentation.
How do I report bugs?
File an issue on GitHub.
Is there a Discord/Slack?
Not currently. Use GitHub issues for questions and discussions.
Contributing
Can I contribute?
Yes! Contributions are welcome. See the repository for guidelines.
How can I help?
- Report bugs
- Improve documentation
- Add examples
- Fix issues
Miscellaneous
Why “proc”?
Short for “process”. Easy to type, easy to remember.
Who maintains proc?
proc is maintained by @j50n and contributors.
What’s the license?
MIT License. Use it freely.
Can I use proc in commercial projects?
Yes! MIT license allows commercial use.
Troubleshooting
Common issues and their solutions.
Process Issues
My process hangs
Cause: Output not consumed. proc waits for you to read the output before the process completes.
// ❌ Hangs - output never consumed
const p = run("ls");
await p.status;
// ✅ Works - consume output first
const p = run("ls");
await p.lines.collect();
await p.status;
“Resource leak” errors
Cause: Process output not consumed. Every process must have its output read.
// ❌ Resource leak
run("ls"); // Output ignored
// ✅ Consume the output
await run("ls").lines.collect();
// ✅ Or iterate through it
await run("ls").lines.forEach(console.log);
Process exits with unexpected code
Cause: The command failed. Check the error for details.
import { ExitCodeError } from "jsr:@j50n/proc@0.24.6";
try {
await run("grep", "pattern", "missing-file.txt").lines.collect();
} catch (error) {
if (error instanceof ExitCodeError) {
console.error(`Exit code: ${error.code}`);
console.error(`Command: ${error.command.join(" ")}`);
}
}
Permission denied
Cause: Missing Deno permissions.
# Grant necessary permissions
deno run --allow-run --allow-read --allow-write script.ts
Type Errors
“Property ‘lines’ does not exist”
Cause: Using .lines() instead of .lines (it’s a property, not a method).
// ❌ Wrong
run("ls").lines().collect();
// ✅ Correct
run("ls").lines.collect();
Same applies to .status, .first, .last.
DecompressionStream type error
Cause: TypeScript doesn’t recognize the stream type.
// Add type assertion
.transform(new DecompressionStream("gzip") as TransformStream<Uint8Array, Uint8Array>)
“Cannot find module” for transforms
Cause: Data transforms are a separate import.
// ❌ Wrong - transforms not in main module
import { fromCsvToRows } from "jsr:@j50n/proc@0.24.6";
// ✅ Correct - use /transforms subpath
import { fromCsvToRows } from "jsr:@j50n/proc/transforms";
Async Issues
Results are undefined or empty
Cause: Not awaiting async operations.
// ❌ Wrong - not awaited
const lines = run("ls").lines.collect();
console.log(lines); // Promise, not array
// ✅ Correct - await the result
const lines = await run("ls").lines.collect();
console.log(lines); // string[]
forEach doesn’t seem to run
Cause: forEach returns a Promise that must be awaited.
// ❌ Wrong - not awaited
run("ls").lines.forEach(console.log);
// ✅ Correct
await run("ls").lines.forEach(console.log);
Data Transform Issues
CSV parsing produces wrong columns
Cause: Delimiter mismatch or quoting issues.
// Check your delimiter
fromCsvToRows(); // Comma-delimited
fromTsvToRows(); // Tab-delimited
fromCsvToRows({ delimiter: ";" }); // Custom delimiter
Large file causes memory issues
Cause: Using .collect() on huge datasets loads everything into memory.
// ❌ Loads entire file into memory
const allRows = await read("huge.csv")
.transform(fromCsvToRows())
.collect();
// ✅ Stream and process one at a time
await read("huge.csv")
.transform(fromCsvToRows())
.forEach((row) => processRow(row));
flatdata command not found
Cause: flatdata CLI not installed globally.
deno install -g --allow-read --allow-write -n flatdata jsr:@j50n/proc@0.24.6/flatdata
Performance Issues
Processing is slower than expected
Possible causes:
- Using CSV when TSV would work — TSV is 3-5x faster than CSV
- Not using LazyRow — Enable with fromCsvToRows({ lazy: true })
- Sequential when parallel would help — Use concurrentMap for I/O-bound work
// Faster: use concurrentMap for network requests
await enumerate(urls)
.concurrentMap(fetch, { concurrency: 10 })
.collect();
Memory usage grows over time
Cause: Caching or collecting when streaming would work.
// ❌ Caches everything
const cached = enumerate(hugeDataset).cache();
// ✅ Stream through without caching
await enumerate(hugeDataset)
.filter(predicate)
.forEach(process);
Still Stuck?
- Check the FAQ for common questions
- Search existing issues
- Open a new issue with:
- proc version
- Deno version
- Minimal reproduction code
- Expected vs actual behavior
Glossary
Key terms used throughout this documentation.
A
Aggregation : An operation that consumes an entire iterable and produces a
single value. Examples: count(), reduce(), collect(). Aggregations are
terminal operations.
Async Iterator : A JavaScript object that produces values asynchronously, one at a time, when requested. proc uses async iterators instead of streams for pull-based data flow.
B
Backpressure : The problem of coordinating data flow when a producer generates data faster than a consumer can process it. Traditional streams require explicit backpressure handling. proc eliminates backpressure by using pull-based async iterators.
C
Collect : A terminal operation that gathers all items from an iterable into an array. Use with caution on large datasets as it loads everything into memory.
E
Enumerable : proc’s wrapper around async iterables that provides Array-like
methods (map, filter, reduce, etc.). Created with enumerate() or
returned by run().lines.
enumerate() : Function that wraps any iterable (sync or async) in an Enumerable, giving it Array-like methods.
F
flatdata : A WASM-powered CLI tool included with proc for high-performance CSV/TSV processing. Achieves excellent throughput by offloading parsing to a subprocess.
L
Lazy Evaluation : Computation that only happens when results are needed.
proc pipelines are lazy—nothing executes until you call a terminal operation
like collect() or forEach().
LazyRow : An optimization for CSV parsing that defers field extraction until accessed. Improves performance when you only need some columns from each row.
P
Pipeline : A chain of operations where each step’s output becomes the next
step’s input. In proc, pipelines are built with method chaining:
.map().filter().collect().
Pull-based : A data flow model where consumers request data when ready. Contrast with push-based streams where producers send data regardless of consumer readiness. proc uses pull-based async iterators.
Push-based : A data flow model where producers send data to consumers. Traditional JavaScript streams are push-based, requiring backpressure coordination.
R
Record Format : A binary format used by flatdata for efficient inter-process communication. Faster than text formats because it avoids repeated parsing.
S
Streaming : Processing data piece by piece rather than loading everything into memory. proc streams by default, enabling processing of files larger than available RAM.
T
Terminal Operation : An operation that consumes an iterable and produces a
final result, triggering execution of the pipeline. Examples: collect(),
forEach(), count(), first, reduce().
Transform : An operation that converts data from one form to another. In proc, transforms can be TransformStreams or async generator functions.
TransformStream : A Web Streams API object that transforms data passing
through it. proc’s .transform() method accepts TransformStreams.
U
UpstreamError : An error type that wraps errors from earlier stages in a pipeline. Helps identify where in a chain the original error occurred.
CSV Parser Specification
This appendix documents the RFC 4180 compliant CSV parser used by proc’s data transforms and the flatdata CLI. The parser is implemented in Odin and compiled to WebAssembly for high-performance parsing in JavaScript/TypeScript environments.
Standards Compliance
The parser implements RFC 4180 - Common Format and MIME Type for Comma-Separated Values (CSV) Files.
RFC 4180 Requirements
| Requirement | Status | Notes |
|---|---|---|
| Records separated by line breaks | ✓ | Supports LF and CRLF |
| Optional header line | ✓ | Parser treats all rows uniformly |
| Fields separated by commas | ✓ | Configurable separator |
| Fields may be quoted | ✓ | Double-quote character |
| Quotes escaped by doubling | ✓ | "" becomes " |
| Newlines in quoted fields | ✓ | Preserved in output |
| Commas in quoted fields | ✓ | Preserved in output |
Extensions Beyond RFC 4180
- Configurable separator: Supports any single-byte delimiter (comma, semicolon, tab, etc.)
- Lenient mode: Accepts bare quotes in unquoted fields (non-strict)
- Strict mode: Rejects malformed input with detailed error reporting
- Streaming: Processes input in chunks without loading entire file
Parser Modes
Strict Mode
In strict mode, the parser rejects malformed CSV and reports errors with row and column positions.
Error conditions:
- BareQuote: Unescaped quote in unquoted field
- InvalidCharAfterQuote: Non-separator/newline after closing quote
- UnclosedQuote: EOF reached inside quoted field
- BareCR: Carriage return not followed by line feed
- FieldCountMismatch: Row has different field count than expected
Lenient Mode (Default)
In lenient mode, the parser accepts common malformations:
- Bare quotes in unquoted fields are preserved literally
- Bare CR characters start a new record
- Field count mismatches are allowed
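For example, given the input a"b,c, lenient mode yields the two fields a"b and c (the bare quote is preserved literally), while strict mode rejects the row with a BareQuote error at the offending row and column.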
Output Formats
Record Format
The primary output format uses ASCII control characters:
- \x1F (Unit Separator) between fields
- \x1E (Record Separator) between rows
This format enables trivial downstream parsing: row.split('\x1F') yields
fields.
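A minimal TypeScript sketch of that downstream step; recordOutput is a hypothetical stand-in for decoded parser output:
// Two rows of two fields each, in record format.
const recordOutput = "alpha\x1Fbeta\x1Egamma\x1Fdelta\x1E";

const rows = recordOutput
  .split("\x1E")                    // Record Separator between rows
  .filter((row) => row.length > 0)  // drop the empty trailing segment, if any
  .map((row) => row.split("\x1F")); // Unit Separator between fields

// rows: [["alpha", "beta"], ["gamma", "delta"]]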
Span Format
For zero-copy parsing, the span format returns byte offsets into the original input rather than copying field data.
API Reference
Initialization
delimited_init(options: CsvOptions) -> DelimitedParser
Options:
- separator: Field delimiter (default: ,)
- strict: Enable strict mode (default: false)
- expected_fields: Expected field count per row, 0 to disable (default: 0)
Parsing
delimited_parse(parser, input: []u8) -> (rows: u32, ok: bool)
Parses a chunk of input. May be called multiple times for streaming. Returns the number of complete rows parsed and success status.
delimited_finish(parser) -> (rows: u32, ok: bool)
Finalizes parsing after all input has been provided. Handles any remaining partial record.
Output Retrieval
delimited_get_complete_output(parser) -> []u8
Returns output bytes for complete records only. Partial records (those without a trailing record separator) are retained for the next chunk.
delimited_reset_output(parser)
Clears the output buffer after reading.
Error Handling
parser.error.kind // CsvErrorKind enum
parser.error.row // 0-indexed row number
parser.error.col // 0-indexed column number
Stringifier API
The stringifier converts record format back to CSV/TSV.
Initialization
delimited_stringify_init(options: StringifyOptions) -> DelimitedStringifier
Options:
- separator: Output field delimiter (default: ,)
- line_ending: .LF or .CRLF (default: .LF)
- always_quote: Quote all fields, not just those requiring it (default: false)
- expected_fields: Expected field count, 0 to disable (default: 0)
Stringifying
delimited_stringify(stringifier, input: []u8) -> bool
Converts record-format input to CSV. Returns success status.
Quoting Rules
Fields are quoted when they contain:
- The separator character
- Double quotes (which are escaped by doubling)
- Newline characters (LF or CR)
With always_quote enabled, all fields are quoted regardless of content.
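For example, the field value She said "go", then left must be written as "She said ""go"", then left", while a plain value such as alpha is left unquoted unless always_quote is set.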
Performance Characteristics
| Metric | Value |
|---|---|
| Native throughput | Fastest |
| WASM throughput | Very Fast |
| Memory overhead | Low |
| Streaming chunk size | 64 KB recommended |
Performance characteristics based on typical CSV data with moderate field lengths.
WebAssembly Integration
The parser is compiled to WebAssembly with the following characteristics:
- Memory model: Uses imported memory for zero-copy buffer sharing
- Build flags:
--import-memory --strip-all - Target:
js_wasm32
WASM Exports
Buffer management:
- alloc_input_buffer(size) -> ptr
- alloc_output_buffer(size) -> ptr
Parser lifecycle:
- create_delimited_parser(separator, strict, expected_fields) -> id
- parse_delimited(id, input_len) -> result
- finish_delimited(id) -> result
- get_delimited_output(id) -> len
- clear_delimited_output(id)
- destroy_delimited_parser(id)
Stringifier lifecycle:
- create_delimited_stringifier(separator, crlf, always_quote, expected_fields) -> id
- stringify_delimited(id, input_len) -> ok
- get_stringify_output(id) -> len
- clear_stringify_output(id)
- destroy_delimited_stringifier(id)
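The exports above are the documented surface; how they are wired from JavaScript is not specified here. The following TypeScript sketch shows one plausible wiring under stated assumptions: the module location, the env.memory import namespace, numeric booleans for the flags, and the convention that chunks are copied into the buffer returned by alloc_input_buffer and results read from the buffer returned by alloc_output_buffer are all illustrative, not part of the documented contract.
// Assumed signatures for the documented exports.
type DelimitedWasm = {
  alloc_input_buffer(size: number): number;
  alloc_output_buffer(size: number): number;
  create_delimited_parser(separator: number, strict: number, expectedFields: number): number;
  parse_delimited(id: number, inputLen: number): number;
  finish_delimited(id: number): number;
  get_delimited_output(id: number): number;
  clear_delimited_output(id: number): void;
  destroy_delimited_parser(id: number): void;
};

const memory = new WebAssembly.Memory({ initial: 32 }); // imported memory per the build flags
const { instance } = await WebAssembly.instantiateStreaming(
  fetch(new URL("./delimited.wasm", import.meta.url)), // hypothetical module location
  { env: { memory } }, // import namespace is an assumption
);
const wasm = instance.exports as unknown as DelimitedWasm;

const CHUNK = 64 * 1024; // 64 KB chunks, as recommended above
const inPtr = wasm.alloc_input_buffer(CHUNK);
const outPtr = wasm.alloc_output_buffer(CHUNK * 2);
const parser = wasm.create_delimited_parser(",".charCodeAt(0), 0, 0); // comma, lenient, no field-count check

const decoder = new TextDecoder();
const file = await Deno.open("data.csv");
for await (const chunk of file.readable) {
  // A real implementation must also split chunks larger than the input buffer.
  new Uint8Array(memory.buffer, inPtr, chunk.length).set(chunk);
  wasm.parse_delimited(parser, chunk.length);
  const len = wasm.get_delimited_output(parser);
  const rows = decoder.decode(new Uint8Array(memory.buffer, outPtr, len));
  // `rows` holds complete records in record format: \x1F between fields, \x1E between rows.
  wasm.clear_delimited_output(parser);
}
wasm.finish_delimited(parser); // flush any partial trailing record
const tail = decoder.decode(
  new Uint8Array(memory.buffer, outPtr, wasm.get_delimited_output(parser)),
);
wasm.destroy_delimited_parser(parser);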
Implementation Notes
State Machine
The parser uses a 5-state machine:
- FieldStart: Beginning of a field
- Unquoted: Inside an unquoted field
- Quoted: Inside a quoted field
- QuoteInQuoted: After a quote inside a quoted field (escape or end)
- RecordEnd: After CR, expecting LF
Memory Management
- Dynamic arrays use Odin’s built-in allocator
- Output buffer grows as needed with 10% overhead reservation
- Streaming maintains partial record state between chunks
UTF-8 Handling
The parser operates on raw bytes and is UTF-8 transparent. Multi-byte UTF-8 sequences pass through unchanged. The separator and control characters are all single-byte ASCII, ensuring correct handling of UTF-8 text.
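For example, the row naïve,café parses into the fields naïve and café byte-for-byte; the multi-byte characters never collide with the single-byte separator or quote characters.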
Contributing to proc
This section contains documentation for maintainers and contributors to the proc library.
Overview
The proc library provides a fluent API for running child processes and working with async iterables in Deno. It emphasizes composable operations, automatic resource cleanup, and proper error propagation.
Key Goals:
- Composable operations via method chaining
- Automatic resource cleanup (no leaked processes)
- Proper error propagation from stderr
- AsyncIterable streams (easier than manual stream handling)
- Type-safe transformations with minimal boilerplate
Quick Navigation
- Project Architecture - Core modules and design concepts
- Coding Standards - TypeScript standards, testing, and workflows
- API Design - Patterns and conventions for the public API
- Documentation Guidelines - How to write and maintain docs
- Testing Strategy - Test coverage and verification approach
- Build Process - Building, releasing, and maintaining the project
Project Architecture
Core Modules
- src/run.ts - Main entry point, run() function
- src/process.ts - Process wrapper class
- src/enumerable.ts - Enumerable class with AsyncIterable operations
- src/transformers.ts - Data transformation functions (toBytes, JSON, gzip)
- src/utility.ts - Helper functions (range, concat, read, etc.)
- src/concurrent.ts - Concurrent mapping operations
- src/writable-iterable.ts - WritableIterable for push-based iteration
- src/cache.ts - KV-based caching utilities
- src/helpers.ts - Internal helper functions
Key Concepts
Enumerable: Wrapper around AsyncIterable providing composable operations
- Created via the enumerate() factory function
- Supports map, filter, reduce, take, drop, concat, etc.
- Integrates with process I/O via .run() and .lines
ProcessEnumerable: Extends Enumerable for process output
- Returned by the run() function
- Provides access to process PID and status
- Automatically handles resource cleanup
Transformers: Functions that convert between data types
- toBytes() - Convert strings/arrays to Uint8Array
- toLines() - Convert bytes to text lines
- jsonStringify() / jsonParse() - JSON serialization
- gzip() / gunzip() - Compression
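A small sketch of a transformer in a pipeline (it assumes toBytes is exported alongside enumerate, consistent with the .transform(toBytes) usage shown later in this guide):
// Convert strings to byte chunks; collect() gathers the resulting Uint8Arrays.
const chunks = await enumerate(["alpha\n", "beta\n"])
  .transform(toBytes)
  .collect();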
Design Principles
Properties vs Methods
- Properties (no parentheses): Return new objects or promises
  - Examples: .lines, .status, .first, .last
- Methods (with parentheses): Functions that take parameters or perform actions
  - Examples: .collect(), .map(), .filter(), .count()
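A quick sketch of the distinction:
// Property access (no parentheses) returns a new object...
const lines = run("ls").lines;

// ...while methods (with parentheses) take parameters or perform actions.
const upper = await lines.map((line) => line.toUpperCase()).collect();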
Resource Management
- Always consume process output to avoid resource leaks
- Terminal operations: .collect(), .forEach(), .count(), etc.
- Document resource management requirements clearly
Error Propagation
- Errors flow through pipelines naturally
- No need for error handling at each step
- One try-catch at the end handles everything
- This is a key differentiator of the library
Type Safety
- Full TypeScript support required
- Generic types where appropriate: Enumerable<T>, ProcessEnumerable<S>
- Type inference should work naturally
Coding Standards
TypeScript Standards
- Use explicit types, avoid any (lint rule: no-explicit-any)
- Follow Deno’s formatting standards (enforced by deno fmt)
- All code must pass deno lint and deno check
- Use the shebang pattern ":" //#; on line 2 for executable scripts (see the sketch below)
Testing Requirements
- All tests must pass before committing (172 tests currently)
- Tests should be added for new features following existing patterns
- Test files use descriptive names: feature.test.ts
- When adding new features, consider if tests are needed
Error Handling
- Errors should propagate naturally through pipelines
- Use try-catch at the end of pipelines, not at each step
- Process errors throw ExitCodeError with a .code property
- Always provide clear error messages
File Structure
- src/: Core library code
- tests/: All test files
- tests/mdbook_examples.test.ts: Tests for documentation examples
- tests/readme_examples.test.ts: Tests for README examples
- tests/docs/: Tests for API documentation examples
- site/src/: mdbook documentation source
- site/theme/: Custom CSS and JS for documentation
- docs/: Generated documentation site (committed to git for GitHub Pages)
- tools/: Build tools and preprocessors
Naming Conventions
- Use kebab-case for file names: my-feature.ts
- Use PascalCase for classes: ProcessEnumerable
- Use camelCase for functions and variables: enumerate, lineCount
- Use SCREAMING_SNAKE_CASE for constants: DAYS, HOURS
Git Workflow
- Contributors handle their own git commits
- Use descriptive commit messages
- All tests must pass before committing
- Follow the standard build process before releases
Security Best Practices
- Substitute PII in examples with generic placeholders
- No secret keys in code unless specifically required
- Follow secure coding practices for process execution
API Design Patterns
toStdout() Behavior
.toStdout() is a convenience method that handles multiple data types:
- Strings: Automatically have \n appended (like println)
- String arrays: Each string gets \n appended
- Uint8Array: Written as-is, no automatic newlines
- Uint8Array arrays: Written as-is, no automatic newlines
Internally uses toBytes() transformer to convert strings to bytes.
Examples
// Strings - newlines added automatically
await enumerate(["line1", "line2"]).toStdout();
// Output: line1\nline2\n
// Process lines - newlines added automatically
await run("ls")
.lines
.map((line) => line.toUpperCase())
.toStdout();
// Bytes - no automatic newlines
const encoder = new TextEncoder();
await enumerate([
encoder.encode("line1\n"),
encoder.encode("line2\n"),
]).toStdout();
// Output: line1\nline2\n (newlines from source data)
Key Points
- DO NOT manually add \n to strings - it will double the line feeds
- DO NOT use .transform(toBytes) before .toStdout() - it’s redundant
- .toStdout() is the idiomatic way to write output
- .forEach(line => console.log(line)) works but is not idiomatic
Common Patterns
Output to stdout
// Preferred: Use .toStdout() for writing to stdout (idiomatic)
// toStdout() handles strings, string arrays, Uint8Arrays, and arrays of those
// Strings automatically have newlines appended
await run("ls")
.lines
.map((line) => line.toUpperCase())
.toStdout();
// Alternative: forEach with console.log
await run("ls").lines.forEach((line) => console.log(line));
Process Execution
// Good: Output consumed
await run("ls").lines.collect();
// Bad: Output not consumed (resource leak)
const p = run("ls");
Error Handling
// Good: Single try-catch at end
try {
await run("cmd1").run("cmd2").lines.forEach(process);
} catch (error) {
handle(error);
}
// Bad: Try-catch at each step
try {
const p1 = run("cmd1");
try {
const p2 = p1.run("cmd2");
// ...
} catch (e2) {}
} catch (e1) {}
Enumeration
// Good: enumerate() wraps, .enum() adds indices
await enumerate(data).enum().map(([item, i]) => ...)
// Bad: Expecting enumerate() to add indices automatically
await enumerate(data).map((item, i) => ...) // i is undefined
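For completeness, a runnable sketch of the good pattern:
// Prints "0: a", "1: b", "2: c".
await enumerate(["a", "b", "c"])
  .enum()
  .map(([item, i]) => `${i}: ${item}`)
  .forEach((line) => console.log(line));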
Documentation Guidelines
Core Principles
- Every public API must have JSDoc with a working example
- Every example must have a corresponding test
- Examples must be minimal and focused on one concept
- Documentation must explain WHY, not just WHAT
- Error handling is the PRIMARY selling point
Consistency Across Sources
- mod.ts JSDoc, README.md, and site docs must be consistent
- Same terminology, same examples, same explanations across all three
- Documentation link (https://j50n.github.io/deno-proc/) should be prominent
- “Why proc?” benefits should be consistent
JSDoc Requirements
- Module-level JSDoc in mod.ts appears on JSR overview page
- Include: tagline, documentation link, “Why proc?”, Key Concepts, examples
- Function-level JSDoc should include:
- Clear description
- @param for all parameters with types
- @returns with return type
- @example with working code examples
- Mark technical details as “for advanced users” when appropriate
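A sketch of what that looks like on a function (lineCount here is illustrative, not part of the API):
/**
 * Counts the lines a command writes to stdout.
 *
 * @param cmd The command to run.
 * @returns The number of lines produced.
 *
 * @example
 * const n = await lineCount("ls");
 * console.log(`${n} entries`);
 */
export async function lineCount(cmd: string): Promise<number> {
  return await run(cmd).lines.count();
}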
mdbook Documentation
- All code examples must have test markers:
  - <!-- TESTED: tests/mdbook_examples.test.ts - "test-name" --> for verified examples
  - <!-- NOT TESTED: Illustrative example --> for conceptual examples
- Use visual callouts with emojis in blockquotes for emphasis
- Organize into clear sections: Getting Started, Core Features, Advanced Topics, etc.
- Include “Key points” explanations after code examples
Documentation Structure
Key Documentation Files
- mod.ts: Module-level JSDoc that appears on JSR overview page
- README.md: GitHub landing page, consistent with mod.ts JSDoc
- site/src/: mdbook documentation with comprehensive guides
Visual Enhancements (mdbook)
- site/theme/custom.css: Professional styling with shadows, rounded corners, hover effects
- site/theme/custom.js: Interactive features like copy buttons and smooth scrolling
- site/book.toml: Navy theme, GitHub integration, enhanced search
Example Standards
- Examples should be copy-paste ready (for TESTED examples)
- Use realistic scenarios, not toy examples
- Include comments explaining non-obvious parts
- Show complete working code, not fragments
- TESTED examples must match documentation exactly
Test Markers
All code examples in mdbook have HTML comments:
- <!-- TESTED: tests/mdbook_examples.test.ts - "test-name" --> for verified examples
- <!-- NOT TESTED: Illustrative example --> for conceptual/comparison examples
Maintenance Guidelines
When Adding New Features
- Add JSDoc to the function/class
- Add example to mod.ts if it’s a common use case
- Update README if it’s a key feature
- Add pattern to patterns.md if it’s a common usage
- Consider if tests should be added
- Mark examples as TESTED or NOT TESTED
When Updating Documentation
- Keep README, mod.ts JSDoc, and site docs consistent
- Use same terminology across all docs
- Add test markers to all code examples
- Run both build scripts to verify
- Check that JSR overview page looks good
Testing Strategy
Test Coverage
- 172 total tests (all passing)
- 36 mdbook example tests covering key documentation examples
- 9 README example tests (100% coverage)
- 127 existing feature tests
Test Files
- tests/readme_examples.test.ts - All README examples
- tests/mdbook_examples.test.ts - 36 key mdbook examples covering:
  - Getting Started (6 tests)
  - Key Concepts (2 tests)
  - Core Features (6 tests)
  - Iterables (11 tests)
  - Advanced (2 tests)
  - Utilities (4 tests)
  - Recipes (2 tests)
- tests/docs/ - Tests for API documentation examples
Documentation Test Markers
All code examples in the mdbook documentation have HTML comment markers:
Tested Examples
<!-- TESTED: tests/mdbook_examples.test.ts - "test-name" -->
These examples have corresponding tests and are verified to work correctly.
Untested Examples
<!-- NOT TESTED: Illustrative example -->
These examples are for illustration and may show:
- Comparison code (e.g., Deno.Command vs proc)
- Bad patterns for educational purposes
- Conceptual examples that don’t need testing
Statistics
- Tested examples: 31
- Untested examples: 331
- Total examples: 362
Running Tests
All tests:
deno test --no-check --allow-all
Just mdbook tests:
deno test --no-check --allow-all tests/mdbook_examples.test.ts
Adding New Tests
When adding a test for a documentation example:
- Add the test to appropriate test file
- Update the marker in documentation from:
  <!-- NOT TESTED: Illustrative example -->
  to:
  <!-- TESTED: tests/mdbook_examples.test.ts - "your-test-name" -->
Test Requirements
- All tests must pass before committing
- Tests should be added for new features
- Documentation examples should have corresponding tests when practical
- Test files use descriptive names following the pattern feature.test.ts
Build Process
Version Management
- Single source of truth: the deno.json version field
- No separate version.json file
- Update version before release commits
Build Scripts
build.sh
Runs tests, lint, type checking:
- Updates Rust and mdbook
- Updates Deno
- Formats markdown and TypeScript
- Fixes shebang pattern (sed command)
- Lints TypeScript
- Type checks TypeScript
- Runs tests with specific allowed commands
- Must include ‘false’ in allowed run commands for error handling tests
- All 172 tests must pass
build-site.sh
Generates API docs, builds mdbook site:
- Updates Rust and mdbook
- Generates API docs with deno doc --html
- Formats site source files
- Builds mdbook site
- Copies to docs/ directory
Release Process
- Update version in deno.json
- Run ./build.sh to verify all tests pass
- Run ./build-site.sh to regenerate documentation
- Commit with descriptive message
- Push to GitHub
- JSR automatically publishes from git tags
Current Status
Version: 0.23.2
Registry: JSR (jsr.io)
License: MIT
Test Coverage: 172 tests (all passing)
Recent Improvements:
- Enhanced documentation for accessibility
- Added Common Patterns guide
- Visual enhancements to mdbook site (custom CSS/JS)
- Aligned JSDoc with README for consistency
- All documentation sources consistent
Build Tools
- tools/mdbook-deno-script-preprocessor.ts: Processes Deno code blocks in documentation
- site/gitv.ts: Git version preprocessor for mdbook
- site/theme/: Custom CSS and JavaScript for enhanced documentation experience
Dependencies
- Deno: Runtime and tooling
- mdbook: Documentation site generator
- Rust toolchain: Required for mdbook
- Git: Version control and preprocessor data