Unlock Deno's secret AsyncIterable superpowers!

A simpler, saner alternative to JavaScript streams. Built on async iterables, a standard JavaScript language primitive, proc eliminates backpressure problems, produces cleaner code, and is easier to work with. Run processes, transform data between formats, and use Array methods on async iterables.

📚 Full Documentation | 🚀 Quick Start | 📊 Performance Guide

Examples

Quick start

import { read, run } from "@j50n/proc";
import { fromCsvToRows, toTsv } from "@j50n/proc/transforms";

// Transform data between formats - CSV to TSV with filtering
await read("sales.csv")
  .transform(fromCsvToRows())
  .filter((row) => parseFloat(row[3]) > 1000)
  .transform(toTsv())
  .writeTo("high-value.tsv");

// Run processes and capture output
const lines = await run("ls", "-la").lines.collect();

// Chain processes like a shell pipeline
const result = await run("cat", "data.txt")
  .run("grep", "error")
  .run("wc", "-l")
  .lines.first;

// Work with async iterables using familiar Array methods
const commits = await run("git", "log", "--oneline")
  .lines
  .map(line => line.trim())
  .filter(line => line.includes("fix"))
  .take(5)
  .collect();

// Bridge event-driven code to async iteration
import { WritableIterable } from "@j50n/proc";

const ws = new WebSocket("wss://example.com/feed"); // illustrative endpoint
const messages = new WritableIterable<string>();
ws.onmessage = async (e) => await messages.write(e.data);
ws.onclose = () => messages.close();

for await (const msg of messages) {
  console.log("Received:", msg);
}

Why proc?

Simpler than streams — Async iterables are a core JavaScript language feature; the Streams API is a separate web specification. Pull-based iteration is easier to reason about than push-based streams. No complex coordination, no buffering logic, no backpressure headaches.

Backpressure solved — Traditional streams require careful coordination between producers and consumers. Async iterators eliminate this entirely—the consumer pulls when ready. No memory pressure, no dropped data, no complexity.
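
In practice, a slow consumer throttles the producer automatically. A minimal sketch, where handleLine stands in for any slow async operation of your own:

import { run } from "@j50n/proc";

// The loop pulls one line at a time; the process only produces more
// output as each iteration completes. No buffering logic required.
for await (const line of run("cat", "huge.log").lines) {
  await handleLine(line); // hypothetical slow async work
}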

Cleaner, more intuitive code — Use map, filter, reduce, flatMap, take, drop, and more, just like with Arrays. Errors propagate naturally through pipelines. One try-catch at the end handles everything.

Bridge push and pull — Convert callback-based APIs (events, WebSockets, sensors) into async iterables with WritableIterable. Automatic backpressure, natural error propagation, no coordination complexity.

WASM-powered data transforms — Convert between CSV, TSV, JSON, and Record formats with WebAssembly-accelerated parsing. For maximum throughput, use the flatdata CLI for multi-process streaming.

Powerful process management — Run commands, pipe between processes, capture output, and control execution with a clean, composable API. Shell-like pipelines with proper error handling.

Type-safe and ergonomic — Full TypeScript support with intuitive APIs that guide you toward correct usage.

Key Concepts

Properties vs Methods: Some APIs are properties (.lines, .status, .first) and some are methods (.collect(), .map(), .filter()). Properties don't use parentheses.
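
A minimal sketch of the distinction:

import { run } from "@j50n/proc";

const p = run("ls", "-la");
// .lines is a property (no parentheses); .collect() is a method.
const lines = await p.lines.collect();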

Resource Management: Always consume process output via .lines.collect(), .lines.forEach(), or similar. Unconsumed output causes resource leaks.
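
A minimal sketch of the difference:

import { run } from "@j50n/proc";

// Good: output is fully consumed, so the process can be cleaned up.
await run("ls", "-la").lines.forEach((line) => console.log(line));

// Bad: output is never consumed (a resource leak, per the note above).
// run("ls", "-la");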

Error Handling: Processes that exit with non-zero codes throw ExitCodeError when you consume their output. Use try-catch to handle failures.

Enumeration: enumerate() wraps iterables but doesn't add indices. Call .enum() on the result to get [item, index] tuples.

Type Hierarchy (for AI/advanced users)

  • Enumerable<T>: Base class providing Array-like methods for async iterables
  • ProcessEnumerable<T>: Extends Enumerable, adds process-specific features (.run(), .status, .pid)
  • Process: Low-level process handle (usually you use ProcessEnumerable instead)
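
A minimal sketch of how the layers show up in practice (types simplified; .pid is assumed to behave as a plain property, per the note on properties above):

import { enumerate, run } from "@j50n/proc";

// run() returns a ProcessEnumerable: Enumerable methods plus process features.
const proc = run("ls", "-la");
console.log(proc.pid); // process-specific property
const lines = await proc.lines.collect(); // inherited Enumerable behavior

// enumerate() returns a plain Enumerable over any (async) iterable.
const doubled = await enumerate([1, 2, 3]).map((n) => n * 2).collect();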

Stream and process large compressed files

import { read } from "@j50n/proc";

// Read, decompress, and count lines - all streaming, no temp files!
const lineCount = await read("war-and-peace.txt.gz")
  .transform(new DecompressionStream("gzip"))
  .lines
  .count();

console.log(`${lineCount} lines`); // 23,166 lines

Handle errors gracefully

import { run } from "@j50n/proc";

try {
  // Errors propagate through the entire pipeline
  await run("npm", "test")
    .lines
    .map(line => line.toUpperCase())
    .filter(line => line.includes("FAIL"))
    .toStdout();
} catch (error) {
  // Handle all errors in one place
  if (error instanceof Error && "code" in error) {
    console.error(`Tests failed with code ${(error as { code: number }).code}`);
  }
}
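
The error thrown here is the ExitCodeError described under Key Concepts; checking for a code property distinguishes a failed process from other errors.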

Transform async iterables

import { enumerate } from "@j50n/proc";

const data = ["apple", "banana", "cherry"];

const numbered = await enumerate(data)
  .enum()  // Adds [item, index] tuples
  .map(([fruit, i]) => `${i + 1}. ${fruit}`)
  .collect();

console.log(numbered); // ["1. apple", "2. banana", "3. cherry"]

Custom transformations with async generators

import { enumerate, read } from "@j50n/proc";

// Parse JSON lines with error recovery
async function* parseJsonLines(lines: AsyncIterable<string>) {
  for await (const line of lines) {
    try {
      const obj = JSON.parse(line.trim());
      if (obj.id && obj.timestamp) yield obj;
    } catch {
      // Skip invalid JSON
    }
  }
}

// logLines can be any AsyncIterable<string>, e.g. lines read from a file:
const logLines = read("app.log").lines;

const validEntries = await enumerate(logLines)
  .transform(parseJsonLines)
  .collect();

Process large files efficiently

import { read } from "@j50n/proc";

const errorCount = await read("app.log")
  .lines
  .filter(line => line.includes("ERROR"))
  .reduce((count) => count + 1, 0);

console.log(`Found ${errorCount} errors`);

Parallel processing with concurrency control

import { enumerate } from "@j50n/proc";

const urls = ["url1", "url2", "url3"];

await enumerate(urls)
  .concurrentMap(async (url) => {
    const response = await fetch(url);
    return { url, status: response.status };
  }, { concurrency: 5 })
  .forEach(result => console.log(result));