Welcome to proc

Running child processes and working with streams should be simple. proc makes it simple.

✨ The Big Idea: Treat processes and streams like arrays. Use map, filter, reduce on anything. Errors flow naturally through pipelines. No callbacks, no edge cases, no headaches.

What is proc?

proc is a Deno library that gives you two superpowers:

  1. Run child processes with a clean, composable API
  2. Work with async iterables using the Array methods you already know

But here's the real magic: errors just work. They flow through your pipelines naturally, like data. No edge cases, no separate error channels, no callbacks. One try-catch at the end handles everything.

💡 Tip: If you've ever struggled with JavaScript streams, you're going to love this.

A Taste of proc

Count lines in a compressed file—streaming, no temp files:

import { read } from "jsr:@j50n/proc@0.23.3";

const lines = await read("war-and-peace.txt.gz")
  .transform(new DecompressionStream("gzip"))
  .lines
  .count();

console.log(`${lines} lines`); // 23,166 lines

Chain processes like shell pipes:

import { run } from "jsr:@j50n/proc@0.23.3";

const result = await run("cat", "data.txt")
  .run("grep", "error")
  .run("wc", "-l")
  .lines.first;

Handle errors gracefully:

import { run } from "jsr:@j50n/proc@0.23.3";

try {
  await run("npm", "test")
    .lines
    .map((line) => line.toUpperCase())
    .filter((line) => line.includes("FAIL"))
    .forEach((line) => console.log(line));
} catch (error) {
  // All errors caught here—from the process, from map, from filter
  console.error(`Tests failed: ${error.code}`);
}

Why proc?

JavaScript streaming is fast, but error handling shouldn't break your brain. proc gives you:

  • Errors that propagate naturally through pipelines
  • Array methods on async iterables (map, filter, reduce, and more)
  • Process management that feels like shell scripting
  • Streaming everything for memory efficiency
  • Type safety with full TypeScript support

Who is this for?

  • DevOps engineers automating deployments, processing logs, and managing infrastructure
  • Data engineers processing large CSV files, log files, or streaming data
  • Backend developers building CLI tools, batch processors, or data pipelines
  • System administrators replacing Bash scripts with type-safe, testable Deno code
  • Anyone who needs to run child processes or work with large datasets efficiently

Ready to dive in?

Start with Installation or jump straight to the Quick Start.


Current Version: 0.23.3
Status: Stable, actively maintained, ready for production

Found a bug? Have a question? File an issue or check the FAQ.

Installation

Getting started with proc is simple—it's just a Deno import away.

Import from JSR

Add proc to your Deno project:

import * as proc from "jsr:@j50n/proc@0.23.3";

Or import just what you need:

import { run, enumerate, read } from "jsr:@j50n/proc@0.23.3";

That's it! No installation step, no package.json, no node_modules.

Permissions

proc needs permissions to run child processes and access files. When you run your script, Deno will prompt you, or you can grant them upfront:

deno run --allow-run --allow-read your-script.ts

Common permissions:

  • --allow-run - Required to run child processes
  • --allow-read - Needed to read files
  • --allow-write - Needed to write files
  • --allow-env - If your processes need environment variables

You can be more specific:

# Only allow running specific commands
deno run --allow-run=ls,grep,wc your-script.ts

# Only allow reading specific directories
deno run --allow-read=/var/log your-script.ts

Version Pinning

For production, pin to a specific version:

import { run } from "jsr:@j50n/proc@1.0.0";

For development, use the latest:

import { run } from "jsr:@j50n/proc";

Next Steps

Ready to write your first proc script? Head to the Quick Start guide.

Quick Start

Let's get you running code in 5 minutes.

Your First Process

Create a file called hello.ts:

import { run } from "jsr:@j50n/proc@0.23.3";

// Run a command and capture output
const lines = await run("echo", "Hello, proc!").lines.collect();
console.log(lines); // ["Hello, proc!"]

Run it:

deno run --allow-run hello.ts

What just happened?

  • run() started the echo command
  • .lines converted the output to text lines
  • .collect() gathered all lines into an array

Chaining Processes

Let's chain commands together, like shell pipes:

import { run } from "jsr:@j50n/proc@0.23.3";

const result = await run("echo", "HELLO WORLD")
  .run("tr", "A-Z", "a-z")  // Convert to lowercase
  .lines.first;

console.log(result); // "hello world"

Each .run() pipes the previous output to the next command's input.

Working with Files

Process a file line by line:

import { read } from "jsr:@j50n/proc@0.23.3";

const errorCount = await read("app.log")
  .lines
  .filter(line => line.includes("ERROR"))
  .count();

console.log(`Found ${errorCount} errors`);

Handling Errors

Errors propagate naturally—catch them once at the end:

import { run } from "jsr:@j50n/proc@0.23.3";

try {
  await run("false")  // This command exits with code 1
    .lines
    .collect();
} catch (error) {
  console.error(`Command failed: ${error.code}`);
}

No need to check errors at each step. They flow through the pipeline and you catch them once. For details, see Error Handling.

Using Array Methods

Work with async data using familiar Array methods:

import { enumerate } from "jsr:@j50n/proc@0.23.3";

const data = ["apple", "banana", "cherry"];

const numbered = await enumerate(data)
  .enum()  // Add indices
  .map(([fruit, i]) => `${i + 1}. ${fruit}`)
  .collect();

console.log(numbered);
// ["1. apple", "2. banana", "3. cherry"]

A Real Example

Let's find the 5 most recent commits that mention "fix":

import { run } from "jsr:@j50n/proc@0.23.3";

const commits = await run("git", "log", "--oneline")
  .lines
  .filter(line => line.includes("fix"))
  .take(5)
  .collect();

commits.forEach(commit => console.log(commit));

This chains multiple operations, all streaming, using minimal memory. For more complex examples, see Recipes.

What's Next?

Now that you've got the basics, read on for Key Concepts and Common Patterns, or jump straight to Recipes for copy-paste solutions.

Key Concepts

Before you dive deep, let's cover a few concepts that will make everything click.

Properties vs Methods

This trips up everyone at first. Some APIs are properties (no parentheses), some are methods (with parentheses).

Properties:

.lines    // Not .lines()
.status   // Not .status()
.first    // Not .first()
.last     // Not .last()

Methods:

.collect()
.map()
.filter()
.count()

Why? Properties are getters that return new objects or promises. Methods are functions you call. Your IDE will help, but when in doubt, check the docs.
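
For example, a minimal sketch that mixes both in one script (ls is just an illustrative command):

import { run } from "jsr:@j50n/proc@0.23.3";

// Properties: no parentheses
const first = await run("ls", "-la").lines.first;

// Methods: called with parentheses
const count = await run("ls", "-la").lines.count();

console.log(`first entry: ${first}, total lines: ${count}`);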

Error Propagation

Errors flow through pipelines like data—no need to check at every step:

try {
  await run("command1")
    .run("command2")
    .run("command3")
    .lines
    .forEach(process);
} catch (error) {
  // All errors caught here
  handle(error);
}

Errors from processes, transformations, or your own code all propagate to the same place. For details, see Error Handling.

Resource Management

Golden rule: Always consume process output.

Good:

await run("ls").lines.collect();  // ✅ Output consumed
await run("ls").lines.forEach(console.log);  // ✅ Output consumed

Bad:

const p = run("ls");  // ❌ Output never consumed = resource leak

Why? Unconsumed output keeps the process handle open. Always use .collect(), .forEach(), or iterate through the output.

Enumeration Pattern

enumerate() wraps an iterable to give it Array-like methods. To add indices, call .enum() on the result:

import { enumerate } from "jsr:@j50n/proc@0.23.3";

// enumerate() wraps the iterable, .enum() adds indices
const numbered = await enumerate(["a", "b", "c"])
  .enum()  // Returns [item, index] tuples
  .map(([item, i]) => `${i}: ${item}`)
  .collect();
// ["0: a", "1: b", "2: c"]

Why two steps? enumerate() gives you the methods (map, filter, etc.), while .enum() is just one of many operations you can perform. You might not always need indices:

// Use enumerate() without .enum() for other operations
const doubled = await enumerate([1, 2, 3])
  .map(n => n * 2)  // No indices needed
  .collect();

Streaming Everything

proc is lazy and streaming by default. Nothing happens until you consume the output.

// This doesn't run anything yet
const pipeline = run("cat", "huge-file.txt")
  .run("grep", "error")
  .lines
  .map(line => line.toUpperCase());

// Now it runs, one line at a time
for await (const line of pipeline) {
  console.log(line);  // Processes one line at a time
}

This means you can process files larger than memory. The data flows through, never all loaded at once.

Type Safety

proc is fully typed. Your IDE will guide you:

const lines: string[] = await run("ls").lines.collect();
//    ^-- TypeScript knows this is string[]

const count: number = await run("ls").lines.count();
//    ^-- TypeScript knows this is number

If you see a type error, you're probably using the API wrong. Check the docs!

Next Steps

Now that you understand the concepts, dive into Common Patterns next.

Common Patterns

This guide shows typical usage patterns to help you understand how proc works in practice.

Pattern: Run and Collect

The most basic pattern—run a command and get all output:

import { run } from "jsr:@j50n/proc@0.23.3";

const lines = await run("ls", "-la").lines.collect();
// lines is string[]

Key points:

  • .lines is a property (no parentheses) that returns an Enumerable
  • .collect() is a method that consumes the stream and returns an array
  • Always consume output to avoid resource leaks

Pattern: Process Pipeline

Chain commands like shell pipes:

import { run } from "jsr:@j50n/proc@0.23.3";

const count = await run("cat", "data.txt")
  .run("grep", "error")
  .run("wc", "-l")
  .lines.first;

Key points:

  • Each .run() pipes the previous command's stdout to the next command's stdin
  • .first is a property that returns a Promise<string | undefined>
  • Errors from any command in the chain propagate to a single try/catch around the pipeline (see the Error Handling pattern below)

Pattern: Transform and Filter

Process output with Array methods:

import { run } from "jsr:@j50n/proc@0.23.3";

const errors = await run("cat", "app.log")
  .lines
  .filter((line) => line.includes("ERROR"))
  .map((line) => line.trim())
  .take(10)
  .collect();

Key points:

  • .lines converts byte stream to line stream
  • Methods like .filter(), .map(), .take() work on the stream
  • Nothing executes until you call a terminal operation like .collect()

Pattern: Error Handling

Catch errors at the end of the pipeline:

import { run } from "jsr:@j50n/proc@0.23.3";

try {
  await run("npm", "test")
    .lines
    .forEach((line) => console.log(line));
} catch (error) {
  if (error.code) {
    console.error(`Process exited with code ${error.code}`);
  } else {
    console.error(`Error: ${error.message}`);
  }
}

Key points:

  • Processes that exit with non-zero codes throw ExitCodeError
  • The error has a .code property with the exit code
  • All errors (process, transform, your code) propagate to the same catch block

Pattern: Check Status Without Throwing

Get exit status without throwing an error:

import { run } from "jsr:@j50n/proc@0.23.3";

const p = run("some-command");
await p.lines.collect(); // Consume output first
const status = await p.status; // .status is a property

if (status.code !== 0) {
  console.error(`Command failed with code ${status.code}`);
}

Key points:

  • .status is a property that returns Promise<CommandStatus>
  • You must consume output before checking status
  • This doesn't throw on non-zero exit codes

Pattern: Enumerate with Indices

Add indices to any iterable:

import { enumerate } from "jsr:@j50n/proc@0.23.3";

const numbered = await enumerate(["a", "b", "c"])
  .enum() // Adds [item, index] tuples
  .map(([item, i]) => `${i + 1}. ${item}`)
  .collect();
// ["1. a", "2. b", "3. c"]

Key points:

  • enumerate() wraps an iterable to add Array-like methods
  • .enum() is a method that transforms items to [item, index] tuples
  • This is a two-step process: wrap, then enumerate

Pattern: Enumerate Without Indices

Use Array methods without adding indices:

import { enumerate } from "jsr:@j50n/proc@0.23.3";

const filtered = await enumerate(["a", "b", "c"])
  .filter((item) => item !== "b")
  .map((item) => item.toUpperCase())
  .collect();
// ["A", "C"]

Key points:

  • You don't need to call .enum() if you don't need indices
  • enumerate() just adds Array-like methods to any iterable

Pattern: Stream Large Files

Process files without loading into memory:

import { read } from "jsr:@j50n/proc@0.23.3";

const errorCount = await read("huge-log.txt")
  .lines
  .filter((line) => line.includes("ERROR"))
  .count();

Key points:

  • read() returns an Enumerable of bytes
  • .lines converts to line stream
  • Everything streams—no memory spike for large files

Pattern: Decompress and Process

Handle compressed files:

import { read } from "jsr:@j50n/proc@0.23.3";

const lines = await read("data.txt.gz")
  .transform(new DecompressionStream("gzip"))
  .lines
  .collect();

Key points:

  • .transform() applies a TransformStream
  • DecompressionStream is built into Deno
  • Everything streams—no temp files needed

Pattern: Concurrent Processing

Process items in parallel with concurrency control:

import { enumerate } from "jsr:@j50n/proc@0.23.3";

const urls = ["url1", "url2", "url3"];

await enumerate(urls)
  .concurrentMap(async (url) => {
    const response = await fetch(url);
    return { url, status: response.status };
  }, { concurrency: 5 })
  .forEach((result) => console.log(result));

Key points:

  • .concurrentMap() processes items in parallel
  • concurrency option limits how many run at once
  • Results maintain input order (use .concurrentUnorderedMap() for faster unordered results, as sketched below)
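
For the unordered variant mentioned in the last point, a minimal sketch (same illustrative URLs as above):

import { enumerate } from "jsr:@j50n/proc@0.23.3";

const urls = ["url1", "url2", "url3"];

// Results arrive as they complete, not in input order
await enumerate(urls)
  .concurrentUnorderedMap(async (url) => {
    const response = await fetch(url);
    return { url, status: response.status };
  }, { concurrency: 5 })
  .forEach((result) => console.log(result));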

Pattern: Build Objects with Reduce

Aggregate data into a single value:

import { run } from "jsr:@j50n/proc@0.23.3";

const wordCount = await run("cat", "data.txt")
  .lines
  .reduce((acc, line) => {
    const words = line.split(/\s+/);
    for (const word of words) {
      acc[word] = (acc[word] || 0) + 1;
    }
    return acc;
  }, {} as Record<string, number>);

Key points:

  • .reduce() works like Array.reduce()
  • Provide an initial value (second argument)
  • The accumulator can be any type

Pattern: Split Stream with Tee

Process the same stream multiple ways:

import { run } from "jsr:@j50n/proc@0.23.3";

const [errors, warnings] = run("cat", "app.log")
  .lines
  .tee(2);

const errorCount = errors.filter((line) => line.includes("ERROR")).count();
const warningCount = warnings.filter((line) => line.includes("WARN")).count();

console.log(`Errors: ${await errorCount}, Warnings: ${await warningCount}`);

Key points:

  • .tee(n) splits a stream into n identical streams
  • Each stream can be processed independently
  • All streams must be consumed

Anti-Pattern: Not Consuming Output

Don't do this:

const p = run("ls"); // ❌ Output never consumed

Why it's bad: Unconsumed output keeps the process handle open, causing resource leaks.

Do this instead:

await run("ls").lines.collect(); // ✅ Output consumed

Anti-Pattern: Mixing Sync and Async

Don't do this:

const lines = run("ls").lines; // ❌ Not awaited
lines.forEach((line) => console.log(line)); // ❌ Won't work

Why it's bad: .lines returns an Enumerable, but you need to await the terminal operation.

Do this instead:

await run("ls").lines.forEach((line) => console.log(line)); // ✅ Awaited

Error Handling

Error handling is proc's primary design goal. Errors flow through pipelines naturally, just like data.

The Problem

Traditional stream error handling requires managing errors at multiple points:

// With Deno.Command - manual error handling at each step
const cmd1 = new Deno.Command("cat", { args: ["file.txt"] });
const proc1 = cmd1.spawn();
const output1 = await proc1.output();
if (!output1.success) {
  throw new Error(`cat failed: ${output1.code}`);
}

const cmd2 = new Deno.Command("grep", { 
  args: ["pattern"],
  stdin: "piped"
});
const proc2 = cmd2.spawn();
// ... manually pipe output1 to proc2 stdin ...
const output2 = await proc2.output();
if (!output2.success) {
  throw new Error(`grep failed: ${output2.code}`);
}

With Node.js streams, you need error handlers on each stream:

stream1.on('error', handleError);
stream2.on('error', handleError);
stream3.on('error', handleError);

The proc Solution

Errors flow through pipelines like data. Handle them once, at the end:

try {
  await run("cat", "file.txt")
    .run("grep", "pattern")
    .run("wc", "-l")
    .lines
    .map(transform)
    .filter(predicate)
    .forEach(process);
} catch (error) {
  // All errors caught here:
  // - Process exit codes
  // - Transform errors
  // - Filter errors
  // - Your own errors
  console.error(`Pipeline failed: ${error.message}`);
}

One try-catch. No edge cases. No separate error channels.

How It Works

When something goes wrong anywhere in the pipeline:

  1. The error is captured
  2. Downstream operations are skipped
  3. The error propagates to your catch block

It's functional programming—errors are just another type of data flowing through.
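
A minimal sketch of that flow (missing.txt is assumed not to exist):

import { run } from "jsr:@j50n/proc@0.23.3";

try {
  await run("cat", "missing.txt")           // fails here
    .lines
    .map((line) => line.toUpperCase())      // skipped
    .forEach((line) => console.log(line));  // skipped
} catch (error) {
  console.error(`Caught once: ${error.message}`);
}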

Error Types

proc throws specific error types so you can handle them differently:

ExitCodeError

Thrown when a process exits with a non-zero code:

import { ExitCodeError } from "jsr:@j50n/proc@0.23.3";

try {
  await run("false").lines.collect();
} catch (error) {
  if (error instanceof ExitCodeError) {
    console.error(`Process failed with code ${error.code}`);
    console.error(`Command: ${error.command.join(" ")}`);
  }
}

SignalError

Thrown when a process is killed by a signal:

import { SignalError } from "jsr:@j50n/proc@0.23.3";

try {
  await run("sleep", "1000").lines.collect();
  // Kill it with Ctrl+C
} catch (error) {
  if (error instanceof SignalError) {
    console.error(`Process killed by signal: ${error.signal}`);
  }
}

UpstreamError

Thrown when an error comes from upstream in a pipeline:

import { UpstreamError } from "jsr:@j50n/proc@0.23.3";

try {
  await run("cat", "missing.txt")  // This fails
    .run("grep", "pattern")         // This gets UpstreamError
    .lines.collect();
} catch (error) {
  if (error instanceof UpstreamError) {
    console.error(`Upstream failure: ${error.cause}`);
  }
}

Checking Exit Status Without Throwing

Sometimes you want to check the exit code without throwing:

const p = run("some-command");
await p.lines.collect();  // Consume output
const status = await p.status;  // Check status

if (status.code !== 0) {
  console.error(`Command failed with code ${status.code}`);
}

Important: Consume the output first, then check status. Otherwise you'll leak resources.

Handling Specific Exit Codes

try {
  await run("grep", "pattern", "file.txt").lines.collect();
} catch (error) {
  if (error instanceof ExitCodeError) {
    if (error.code === 1) {
      // grep returns 1 when no matches found
      console.log("No matches found");
    } else {
      // Other errors
      throw error;
    }
  }
}

Errors in Transformations

Errors in your own code propagate the same way:

try {
  await run("cat", "numbers.txt")
    .lines
    .map(line => {
      const num = parseInt(line);
      if (isNaN(num)) {
        throw new Error(`Invalid number: ${line}`);
      }
      return num;
    })
    .forEach(console.log);
} catch (error) {
  // Catches both process errors AND your parsing errors
  console.error(`Pipeline failed: ${error.message}`);
}

Custom Error Handling

You can customize how errors are handled per process using the fnError option:

await run(
  {
    fnError: (error, stderrData) => {
      // Custom error handling
      if (error?.code === 1) {
        // Suppress or transform specific errors
        console.warn("Command returned 1, continuing anyway");
        return;
      }
      // Re-throw other errors
      throw error;
    }
  },
  "command"
).lines.collect();

Suppress All Errors

Sometimes you want to ignore failures:

// Ignore all errors from this command
await run(
  { fnError: () => {} },
  "command"
).lines.collect();

Transform Errors

Add context or change error types:

await run(
  {
    fnError: (error) => {
      throw new Error(`Database backup failed: ${error.message}`);
    }
  },
  "pg_dump", "mydb"
).lines.collect();

Working with Stderr

By default, stderr is passed through to Deno.stderr. You can capture and process it:

await run(
  {
    fnStderr: async (stderr) => {
      for await (const line of stderr.lines) {
        console.error(`[STDERR] ${line}`);
      }
    }
  },
  "command"
).lines.collect();

Collect Stderr

Capture stderr for analysis:

const stderrLines: string[] = [];

await run(
  {
    fnStderr: async (stderr) => {
      for await (const line of stderr.lines) {
        stderrLines.push(line);
      }
    }
  },
  "command"
).lines.collect();

console.log("Stderr output:", stderrLines);

Combine Stdout and Stderr

Process both streams together:

const allOutput: string[] = [];

await run(
  {
    fnStderr: async (stderr) => {
      for await (const line of stderr.lines) {
        allOutput.push(`[ERR] ${line}`);
      }
    }
  },
  "command"
).lines.forEach(line => {
  allOutput.push(`[OUT] ${line}`);
});

Best Practices

1. Catch at the End

Don't catch errors in the middle of a pipeline unless you're handling them specifically:

// ❌ Don't do this
let lines: string[] = [];
try {
  lines = await run("command").lines.collect();
} catch (e) {
  // Handle here
}
try {
  const filtered = lines.filter(predicate);
} catch (e) {
  // And here
}

// ✅ Do this
try {
  await run("command")
    .lines
    .filter(predicate)
    .forEach(process);
} catch (error) {
  // Handle once
}

2. Always Consume Output

Even if you don't care about the output, consume it:

// ❌ Resource leak
const p = run("command");
// Never consumed!

// ✅ Consume it
await run("command").lines.collect();
// Or
await run("command").lines.forEach(() => {});

3. Use Specific Error Types

Handle different errors differently:

try {
  await pipeline();
} catch (error) {
  if (error instanceof ExitCodeError) {
    // Process failed
  } else if (error instanceof SignalError) {
    // Process killed
  } else {
    // Something else
  }
}

4. Use Custom Handlers Sparingly

Only customize error handling when you have a specific need. The default behavior works well for most cases.

Why This Matters

Error handling is the primary reason proc exists. If you've ever:

  • Fought with stream error events
  • Debugged edge cases in error propagation
  • Written the same error handling code over and over
  • Lost errors in complex pipelines

...then you understand why this is revolutionary.

Errors just work. Like they should have all along.

Next Steps

Running Processes

Running a child process with proc is as simple as it gets.

Basic Usage

import { run } from "jsr:@j50n/proc@0.23.3";

// Run a command
await run("ls", "-la").lines.collect();

That's it. No boilerplate, no configuration, just run it.

Command and Arguments

The first parameter is the command, the rest are arguments:

run("command", "arg1", "arg2", "arg3")

Important: Arguments are separate parameters, not a single string:

// ✅ Correct
run("ls", "-la", "/home")

// ❌ Wrong - this won't work
run("ls -la /home")

Capturing Output

As an Array

const lines = await run("ls", "-la").lines.collect();
// lines is string[]

Line by Line

for await (const line of run("ls", "-la").lines) {
  console.log(line);
}

First or Last Line

const first = await run("ls").lines.first;
const last = await run("ls").lines.last;

As Raw Bytes

const bytes = await run("cat", "file.bin").collect();
// bytes is Uint8Array[]

Printing to Console

Send output directly to stdout:

await run("ls", "-la").toStdout();

This is perfect for commands where you just want to see the output.

Building Commands Dynamically

Sometimes you need to build a command from variables:

import { type Cmd, run } from "jsr:@j50n/proc@0.23.3";

const cmd: Cmd = ["ls"];

if (showAll) {
  cmd.push("-la");
}

if (directory) {
  cmd.push(directory);
}

await run(...cmd).toStdout();

The Cmd type is an array where the first element is the command (string or URL) and the rest are string arguments. Using the Cmd type ensures type safety when building commands dynamically.

Process Options

Customize process behavior with options:

await run(
  {
    cwd: "/tmp",           // Working directory
    env: { FOO: "bar" },   // Environment variables
  },
  "command",
  "arg1"
).lines.collect();

Working Directory

await run(
  { cwd: "/var/log" },
  "ls"
).toStdout();

Environment Variables

await run(
  { env: { PATH: "/custom/path" } },
  "command"
).lines.collect();

Checking Exit Status

Get the exit status without throwing:

const p = run("command");
await p.lines.collect();  // Consume output first
const status = await p.status;

console.log(`Exit code: ${status.code}`);
console.log(`Success: ${status.success}`);

Remember: Always consume output before checking status, or you'll leak resources.

Process ID

Get the process ID:

const p = run("sleep", "10");
console.log(`PID: ${p.pid}`);
await p.lines.collect();

Running with URLs

You can use URLs for the command:

const scriptUrl = new URL("./script.sh", import.meta.url);
await run(scriptUrl).toStdout();

Common Patterns

Silent Execution

Run a command and ignore output:

await run("command").lines.forEach(() => {});

Capture and Print

Capture output while also printing it:

const lines: string[] = [];
await run("command").lines.forEach(line => {
  console.log(line);
  lines.push(line);
});

Conditional Execution

if (needsProcessing) {
  await run("process-data").toStdout();
}

Error Handling

By default, non-zero exit codes throw ExitCodeError:

try {
  await run("false").lines.collect();
} catch (error) {
  console.error(`Command failed: ${error.code}`);
}

See Error Handling for complete details.

Performance Tips

Stream Instead of Collect

Process data as it arrives rather than loading everything into memory:

// ❌ Loads everything into memory
const lines = await run("cat", "huge-file.txt").lines.collect();
for (const line of lines) {
  process(line);
}

// ✅ Processes one line at a time
for await (const line of run("cat", "huge-file.txt").lines) {
  process(line);
}

Pipe Instead of Collect Intermediate Results

Chain processes instead of collecting intermediate results:

// ❌ Collects intermediate results, then feeds them back in
const lines1 = await run("cat", "file.txt").lines.collect();
const lines2 = await enumerate(lines1).run("grep", "pattern").lines.collect();

// ✅ Streams through
await run("cat", "file.txt")
  .run("grep", "pattern")
  .toStdout();

Use take() to Stop Early

Stop processing once you have what you need:

// Stops after finding 10 matches
const first10 = await run("grep", "ERROR", "huge.log")
  .lines
  .take(10)
  .collect();

Filter Before Expensive Operations

Reduce the amount of data flowing through expensive operations:

// ✅ Filter first (fast), then transform (expensive)
const result = await run("cat", "data.txt")
  .lines
  .filter(line => line.length > 0)  // Fast filter
  .map(expensiveTransform)          // Only runs on filtered data
  .collect();

For more performance optimization strategies, see Concurrent Processing and Streaming Large Files.

Next Steps

Process Pipelines

Chain processes together like shell pipes. It's beautiful.

The Basics

In a shell, you'd write:

cat file.txt | grep error | wc -l

In proc, you write:

const count = await run("cat", "file.txt")
  .run("grep", "error")
  .run("wc", "-l")
  .lines.first;

Each .run() pipes the previous output to the next command's stdin.

How It Works

run("command1")      // Produces output
  .run("command2")   // Receives command1's output as stdin
  .run("command3")   // Receives command2's output as stdin

The data flows through, one buffer at a time. Nothing is collected in memory unless you ask for it.

Real Examples

Count Lines

const lines = await run("cat", "file.txt")
  .run("wc", "-l")
  .lines.first;

console.log(`${lines} lines`);

Find and Count

const errorCount = await run("cat", "app.log")
  .run("grep", "ERROR")
  .run("wc", "-l")
  .lines.first;

Sort and Unique

const unique = await run("cat", "words.txt")
  .run("sort")
  .run("uniq")
  .lines.collect();

Case Conversion

const lowercase = await run("echo", "HELLO WORLD")
  .run("tr", "A-Z", "a-z")
  .lines.first;
// "hello world"

Mixing Processes and Transformations

You can mix process pipes with JavaScript transformations:

const result = await run("cat", "data.txt")
  .run("grep", "pattern")
  .lines
  .map(line => line.trim())
  .filter(line => line.length > 0)
  .collect();

The .lines converts bytes to text, then JavaScript takes over.

Complex Pipelines

Build sophisticated data processing pipelines:

const stats = await run("cat", "access.log")
  .run("grep", "ERROR")
  .run("cut", "-d", " ", "-f", "1")  // Extract IP addresses
  .run("sort")
  .run("uniq", "-c")                  // Count occurrences
  .run("sort", "-rn")                 // Sort by count
  .run("head", "-10")                 // Top 10
  .lines
  .collect();

console.log("Top 10 error sources:");
stats.forEach(line => console.log(line));

Branching Pipelines

Sometimes you need to process the same data in multiple ways. Use .tee() to split a pipeline into multiple branches:

const [branch1, branch2] = run("cat", "data.txt")
  .lines
  .tee();

// Process both branches concurrently
const [result1, result2] = await Promise.all([
  branch1.filter(line => line.includes("A")).collect(),
  branch2.filter(line => line.includes("B")).collect(),
]);

How it works: .tee() creates two independent iterables from one source. Each branch can be processed differently, and both can run concurrently.

Use cases:

  • Collect different subsets of data in one pass
  • Calculate multiple statistics simultaneously
  • Process data while also logging it

Important: Both branches must be consumed, or you'll leak resources.

Error Handling in Pipelines

Errors propagate through the entire pipeline:

try {
  await run("cat", "missing.txt")  // This fails
    .run("grep", "pattern")         // Never runs
    .run("wc", "-l")                // Never runs
    .lines.collect();
} catch (error) {
  // Catches the error from cat
  console.error(`Pipeline failed: ${error.message}`);
}

See Error Handling for details.

Performance Characteristics

Pipelines are:

  • Streaming - Data flows through, not collected in memory
  • Lazy - Nothing runs until you consume the output
  • Concurrent - All processes run at the same time
  • Efficient - Minimal memory usage, even for huge files

// This processes a 10GB file using ~constant memory
await run("cat", "huge-file.txt")
  .run("grep", "pattern")
  .run("wc", "-l")
  .lines.first;

Debugging Pipelines

Print intermediate results:

await run("cat", "file.txt")
  .run("grep", "pattern")
  .lines
  .map(line => {
    console.log(`Processing: ${line}`);
    return line;
  })
  .forEach(process);

Or split it up:

const step1 = run("cat", "file.txt");
const step2 = step1.run("grep", "pattern");
const step3 = step2.lines;

// Now you can inspect each step
for await (const line of step3) {
  console.log(line);
}

Common Patterns

Extract and Count

const count = await run("cat", "file.txt")
  .run("grep", "-o", "pattern")
  .lines.count();

Filter and Transform

const results = await run("cat", "data.csv")
  .run("grep", "-v", "^#")  // Remove comments
  .run("cut", "-d", ",", "-f", "1,3")  // Extract columns
  .lines
  .map(line => line.split(","))
  .collect();

Aggregate Data

const sum = await run("cat", "numbers.txt")
  .lines
  .map(line => parseInt(line))
  .reduce((acc, n) => acc + n, 0);

When to Use Pipelines

Use pipelines when:

  • You're processing large files
  • You want to chain Unix tools
  • You need streaming performance
  • You're replacing shell scripts

Use JavaScript when:

  • You need complex logic
  • You're working with structured data (JSON, etc.)
  • You need type safety
  • The operation is CPU-bound

Mix both for the best of both worlds!
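
For instance, a minimal sketch that lets grep do the fast text filtering and JavaScript handle the structured part (the file name and fields are illustrative):

import { run } from "jsr:@j50n/proc@0.23.3";

const userIds = await run("cat", "events.jsonl")
  .run("grep", "login")                 // cheap text filter in a process
  .lines
  .map((line) => JSON.parse(line))      // structured work in JavaScript
  .map((event) => event.userId)
  .collect();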

Next Steps

Working with Output

Capture, transform, and process command output.

Choosing Your Approach

Use .lines.collect() when you need all output as an array (small outputs only):

const lines = await run("ls").lines.collect();  // All lines in memory

Use .lines with for-await when processing large outputs line-by-line:

for await (const line of run("cat", "huge.log").lines) {
  process(line);  // Constant memory usage
}

Use .toStdout() when you just want to see the output:

await run("ls", "-la").toStdout();  // Prints directly to console

Use .first or .last when you only need one line:

const result = await run("git", "rev-parse", "HEAD").lines.first;

Getting Output

As Lines

import { run } from "jsr:@j50n/proc@0.23.3";

const lines = await run("ls", "-la").lines.collect();
// string[]

As Bytes

const bytes = await run("cat", "file.bin").collect();
// Uint8Array[]

First Line

const result = await run("git", "rev-parse", "HEAD").lines.first;
// Single string

To Stdout

await run("ls", "-la").toStdout();

Transforming Output

Map Lines

const uppercase = await run("cat", "file.txt")
  .lines
  .map(line => line.toUpperCase())
  .collect();

Filter Lines

const errors = await run("cat", "app.log")
  .lines
  .filter(line => line.includes("ERROR"))
  .collect();

Parse Output

const commits = await run("git", "log", "--oneline")
  .lines
  .map(line => {
    const [hash, ...message] = line.split(" ");
    return { hash, message: message.join(" ") };
  })
  .collect();

Streaming Output

Process output as it arrives:

for await (const line of run("tail", "-f", "app.log").lines) {
  if (line.includes("ERROR")) {
    console.error(line);
  }
}

Counting Output

const lineCount = await run("ls", "-la").lines.count();

Finding in Output

const match = await run("ps", "aux")
  .lines
  .find(line => line.includes("node"));

Real-World Examples

Parse JSON Output

const data = await run("curl", "https://api.example.com/data")
  .lines
  .map(line => JSON.parse(line))
  .collect();

Extract Fields

const pids = await run("ps", "aux")
  .lines
  .drop(1)  // Skip header
  .map(line => line.split(/\s+/)[1])
  .collect();

Aggregate Data

const total = await run("du", "-sh", "*")
  .lines
  .map(line => {
    const size = line.split("\t")[0];
    return parseInt(size);
  })
  .reduce((sum, size) => sum + size, 0);

Next Steps

Working with Input

Send data to process stdin.

Choosing Your Approach

Use .run() for process-to-process pipes (most common):

await run("cat", "file.txt").run("grep", "pattern").toStdout();

Use enumerate() for in-memory data:

await enumerate(["line1", "line2"]).run("grep", "1").toStdout();

Use read() for file input:

await read("input.txt").run("grep", "pattern").toStdout();

Use range() for generated sequences:

await range({ to: 100 }).map(n => n.toString()).run("shuf").toStdout();

Piping Between Processes

The most common way to provide input:

import { run } from "jsr:@j50n/proc@0.23.3";

await run("echo", "hello")
  .run("tr", "a-z", "A-Z")  // Receives "hello" as stdin
  .toStdout();
// HELLO

From Enumerable

Pipe any enumerable to a process:

import { enumerate } from "jsr:@j50n/proc@0.23.3";

const data = ["line 1", "line 2", "line 3"];

await enumerate(data)
  .run("grep", "2")
  .toStdout();
// line 2

From File

import { read } from "jsr:@j50n/proc@0.23.3";

await read("input.txt")
  .run("grep", "pattern")
  .toStdout();

Real-World Examples

Filter Data

await read("data.txt")
  .run("grep", "ERROR")
  .run("sort")
  .run("uniq")
  .toStdout();

Transform and Process

await read("input.txt")
  .lines
  .map(line => line.toUpperCase())
  .run("sort")
  .toStdout();

Generate and Process

import { range } from "jsr:@j50n/proc@0.23.3";

await range({ to: 100 })
  .map(n => n.toString())
  .run("shuf")  // Shuffle
  .run("head", "-10")
  .toStdout();

Next Steps

Resource Management

Avoid leaks and manage resources properly.

The Golden Rule

Always consume process output.

import { run } from "jsr:@j50n/proc@0.23.3";

// ❌ Resource leak
const p = run("ls");
// Output never consumed!

// ✅ Output consumed
await run("ls").lines.collect();

Why This Matters

Unconsumed output keeps the process handle open, preventing cleanup.

Ways to Consume Output

collect()

const lines = await run("ls").lines.collect();

forEach()

await run("ls").lines.forEach(line => {
  console.log(line);
});

for-await

for await (const line of run("ls").lines) {
  console.log(line);
}

toStdout()

await run("ls").toStdout();

Aggregations

const count = await run("ls").lines.count();
const first = await run("ls").lines.first;

Checking Status

Consume output before checking status:

const p = run("command");
await p.lines.collect();  // Consume first
const status = await p.status;  // Then check

Error Handling

Errors automatically clean up resources:

try {
  await run("false").lines.collect();
} catch (error) {
  // Resources cleaned up automatically
}

Long-Running Processes

For processes that run indefinitely:

// This is fine - consuming output as it arrives
for await (const line of run("tail", "-f", "log").lines) {
  process(line);
}

Best Practices

  1. Always consume output - Use collect(), forEach(), or iterate
  2. Check status after consuming - Don't check status first
  3. Let errors propagate - They clean up automatically
  4. Use try-finally for cleanup - If you need custom cleanup beyond what proc does automatically (see the sketch below)
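
For the last point, a minimal sketch of custom cleanup with try-finally (the temp file and command are just illustrations):

import { run } from "jsr:@j50n/proc@0.23.3";

const tmp = await Deno.makeTempFile();
try {
  await run("some-command", tmp).lines.forEach((line) => console.log(line));
} finally {
  await Deno.remove(tmp);  // custom cleanup always runs
}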

Next Steps

Understanding Enumerable

Enumerable is the heart of proc's async iterable magic. It wraps any iterable and gives you Array-like superpowers.

What is Enumerable?

Think of Enumerable as an Array, but for async data. It gives you map, filter, reduce, and more—but for data that arrives over time.

import { enumerate } from "jsr:@j50n/proc@0.23.3";

// Wrap any iterable
const nums = enumerate([1, 2, 3, 4, 5]);

// Use Array methods
const doubled = await nums
  .map(n => n * 2)
  .filter(n => n > 5)
  .collect();

console.log(doubled); // [6, 8, 10]

Why Enumerable?

JavaScript has Arrays for sync data and Streams for async data. But Streams are awkward:

// Streams are verbose
const stream = readableStream
  .pipeThrough(new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk * 2);
    }
  }))
  .pipeThrough(new TransformStream({
    transform(chunk, controller) {
      if (chunk > 5) controller.enqueue(chunk);
    }
  }));

Enumerable makes it simple:

// Enumerable is clean
const result = await enumerate(asyncIterable)
  .map(n => n * 2)
  .filter(n => n > 5)
  .collect();

Creating Enumerables

From Arrays

const nums = enumerate([1, 2, 3]);

From Async Generators

async function* generate() {
  yield 1;
  yield 2;
  yield 3;
}

const nums = enumerate(generate());

From Process Output

import { run } from "jsr:@j50n/proc@0.23.3";

const lines = run("ls", "-la").lines;
// lines is already an Enumerable<string>

From Files

import { read } from "jsr:@j50n/proc@0.23.3";

const bytes = read("file.txt");
// bytes is Enumerable<Uint8Array>

const lines = read("file.txt").lines;
// lines is Enumerable<string>

Consuming Enumerables

Collect to Array

const array = await enumerate([1, 2, 3]).collect();
// [1, 2, 3]

Iterate with for-await

for await (const item of enumerate([1, 2, 3])) {
  console.log(item);
}

Process Each Item

await enumerate([1, 2, 3]).forEach(item => {
  console.log(item);
});

Get First or Last

const first = await enumerate([1, 2, 3]).first;
const last = await enumerate([1, 2, 3]).last;

Lazy Evaluation

Enumerables are lazy—nothing happens until you consume them:

// This doesn't run anything yet
const pipeline = enumerate([1, 2, 3])
  .map(n => {
    console.log(`Processing ${n}`);
    return n * 2;
  });

// Now it runs
const result = await pipeline.collect();
// Logs: Processing 1, Processing 2, Processing 3

This is powerful for large datasets:

// Processes one line at a time, never loads entire file
for await (const line of read("huge-file.txt").lines) {
  process(line);
}

Chaining Operations

Chain as many operations as you want:

const result = await enumerate([1, 2, 3, 4, 5])
  .map(n => n * 2)        // [2, 4, 6, 8, 10]
  .filter(n => n > 5)     // [6, 8, 10]
  .map(n => n.toString()) // ["6", "8", "10"]
  .collect();

Each operation returns a new Enumerable, so you can keep chaining.

Type Safety

Enumerable is fully typed:

const nums: Enumerable<number> = enumerate([1, 2, 3]);

const strings: Enumerable<string> = nums.map(n => n.toString());
//    ^-- TypeScript knows this is Enumerable<string>

const result: string[] = await strings.collect();
//    ^-- TypeScript knows this is string[]

Your IDE will guide you with autocomplete and type errors.

Common Patterns

Transform and Collect

const result = await enumerate(data)
  .map(transform)
  .collect();

Filter and Count

const count = await enumerate(data)
  .filter(predicate)
  .count();

Find First Match

const match = await enumerate(data)
  .find(predicate);

Check if Any/All

const hasMatch = await enumerate(data).some(predicate);
const allMatch = await enumerate(data).every(predicate);

Performance

Enumerable is:

  • Streaming - Processes one item at a time
  • Lazy - Only runs when consumed
  • Memory efficient - Doesn't load everything at once
  • Fast - Minimal overhead

// This processes a 10GB file using constant memory
await read("huge-file.txt")
  .lines
  .filter(line => line.includes("ERROR"))
  .forEach(console.log);

Enumerable vs Array

Feature  | Array             | Enumerable
Data     | Sync              | Async
Memory   | All in memory     | Streaming
Size     | Limited by RAM    | Unlimited
Methods  | map, filter, etc. | map, filter, etc.
Lazy     | No                | Yes

Use Arrays for small, sync data. Use Enumerable for large, async data.

Caching Iterables

Sometimes you need to reuse an iterable's results. Use cache() to store results for replay:

import { cache, enumerate } from "jsr:@j50n/proc@0.23.3";

const expensive = enumerate(data)
  .map(expensiveOperation);

const cached = cache(expensive);

// First time - runs the operations
const result1 = await cached.collect();

// Second time - uses cached results, doesn't re-run
const result2 = await cached.collect();

Use cases:

  • Reuse expensive computations
  • Replay iterables multiple times
  • Share results across operations

Warning: Caching stores all results in memory. Only cache when:

  • The dataset is small enough to fit in memory
  • You need to iterate multiple times
  • The computation is expensive enough to justify memory usage

Writable Iterables

Create async iterables you can write to programmatically:

import { WritableIterable } from "jsr:@j50n/proc@0.23.3";

const writable = new WritableIterable<string>();

// Write to it
await writable.write("item1");
await writable.write("item2");
await writable.write("item3");
await writable.close();

// Read from it
const items = await writable.collect();
// ["item1", "item2", "item3"]

Use cases:

  • Generate data programmatically
  • Bridge between push and pull models
  • Create custom data sources
  • Implement producer-consumer patterns

Example: Event-driven data:

const events = new WritableIterable<Event>();

// Producer: write events as they occur
eventEmitter.on("data", async (event) => {
  await events.write(event);
});

eventEmitter.on("end", async () => {
  await events.close();
});

// Consumer: process events as they arrive
for await (const event of events) {
  processEvent(event);
}

Next Steps

Array-Like Methods

Enumerable gives you the Array methods you know and love, but for async data.

Transformations

map()

Transform each item:

const doubled = await enumerate([1, 2, 3])
  .map(n => n * 2)
  .collect();
// [2, 4, 6]

Works with async functions:

const results = await enumerate(urls)
  .map(async (url) => {
    const response = await fetch(url);
    return response.json();
  })
  .collect();

filter()

Keep only items that match:

const evens = await enumerate([1, 2, 3, 4])
  .filter(n => n % 2 === 0)
  .collect();
// [2, 4]

flatMap()

Map and flatten in one step:

const words = await enumerate(["hello world", "foo bar"])
  .flatMap(line => line.split(" "))
  .collect();
// ["hello", "world", "foo", "bar"]

Aggregations

reduce()

Combine items into a single value:

const sum = await enumerate([1, 2, 3, 4])
  .reduce((acc, n) => acc + n, 0);
// 10

Build complex objects:

const grouped = await enumerate(items)
  .reduce((acc, item) => {
    acc[item.category] = acc[item.category] || [];
    acc[item.category].push(item);
    return acc;
  }, {});

count()

Count items:

const total = await enumerate([1, 2, 3]).count();
// 3

some()

Check if any item matches:

const hasError = await enumerate(lines)
  .some(line => line.includes("ERROR"));

every()

Check if all items match:

const allPositive = await enumerate([1, 2, 3])
  .every(n => n > 0);

Finding Items

find()

Find first match:

const match = await enumerate([1, 2, 3, 4])
  .find(n => n > 2);
// 3

first

Get first item:

const first = await enumerate([1, 2, 3]).first;
// 1

last

Get last item:

const last = await enumerate([1, 2, 3]).last;
// 3

nth()

Get item at index:

const third = await enumerate([1, 2, 3, 4]).nth(2);
// 3 (zero-indexed)

Slicing

take()

Take first N items:

const first3 = await enumerate([1, 2, 3, 4, 5])
  .take(3)
  .collect();
// [1, 2, 3]

drop()

Skip first N items:

const rest = await enumerate([1, 2, 3, 4, 5])
  .drop(2)
  .collect();
// [3, 4, 5]

slice()

Get a range:

const middle = await enumerate([1, 2, 3, 4, 5])
  .slice(1, 4)
  .collect();
// [2, 3, 4]

Iteration

forEach()

Process each item:

await enumerate([1, 2, 3]).forEach(n => {
  console.log(n);
});

for-await

Use standard JavaScript iteration:

for await (const item of enumerate([1, 2, 3])) {
  console.log(item);
}

Collecting

collect()

Gather all items into an array:

const array = await enumerate([1, 2, 3]).collect();
// [1, 2, 3]

toArray()

Alias for collect():

const array = await enumerate([1, 2, 3]).toArray();

Utilities

enum()

Add indices to items:

const indexed = await enumerate(["a", "b", "c"])
  .enum()
  .collect();
// [["a", 0], ["b", 1], ["c", 2]]

Use with map:

const numbered = await enumerate(["a", "b", "c"])
  .enum()
  .map(([item, i]) => `${i + 1}. ${item}`)
  .collect();
// ["1. a", "2. b", "3. c"]

tee()

Split into multiple streams:

const [stream1, stream2] = enumerate([1, 2, 3]).tee();

const [sum, product] = await Promise.all([
  stream1.reduce((a, b) => a + b, 0),
  stream2.reduce((a, b) => a * b, 1),
]);

flatten()

Flatten nested iterables:

const flat = await enumerate([[1, 2], [3, 4]])
  .flatten()
  .collect();
// [1, 2, 3, 4]

Concurrent Operations

concurrentMap()

Map with controlled concurrency:

const results = await enumerate(urls)
  .concurrentMap(async (url) => {
    return await fetch(url);
  }, { concurrency: 5 })
  .collect();

Results are returned in order.

concurrentUnorderedMap()

Map with maximum concurrency:

const results = await enumerate(urls)
  .concurrentUnorderedMap(async (url) => {
    return await fetch(url);
  }, { concurrency: 5 })
  .collect();

Results are returned as they complete (faster).

Chaining Examples

Complex Pipeline

const result = await enumerate(data)
  .filter(item => item.active)
  .map(item => item.value)
  .filter(value => value > 0)
  .map(value => value * 2)
  .take(10)
  .collect();

Real-World Example

const topErrors = await read("app.log")
  .lines
  .filter(line => line.includes("ERROR"))
  .map(line => {
    const match = line.match(/ERROR: (.+)/);
    return match ? match[1] : line;
  })
  .reduce((acc, error) => {
    acc[error] = (acc[error] || 0) + 1;
    return acc;
  }, {});

Performance Tips

Use Streaming

Don't collect if you don't need to:

// ❌ Loads everything
const items = await enumerate(huge).collect();
for (const item of items) process(item);

// ✅ Streams
for await (const item of enumerate(huge)) {
  process(item);
}

Use take() for Limits

// Get first 10 matches
const matches = await enumerate(data)
  .filter(predicate)
  .take(10)
  .collect();

Use concurrentMap() for I/O

// Process 5 URLs at a time
const results = await enumerate(urls)
  .concurrentMap(fetch, { concurrency: 5 })
  .collect();

Next Steps

Transformations

Change data as it flows through your pipeline.

map()

Transform each item:

import { enumerate } from "jsr:@j50n/proc@0.23.3";

const doubled = await enumerate([1, 2, 3])
  .map(n => n * 2)
  .collect();
// [2, 4, 6]

With Async Functions

const results = await enumerate(urls)
  .map(async (url) => {
    const response = await fetch(url);
    return response.json();
  })
  .collect();

Type Transformations

const strings = await enumerate([1, 2, 3])
  .map(n => n.toString())
  .collect();
// ["1", "2", "3"]

Complex Transformations

const processed = await enumerate(rawData)
  .map(item => ({
    id: item.id,
    name: item.name.toUpperCase(),
    value: parseFloat(item.value),
    timestamp: new Date(item.timestamp)
  }))
  .collect();

flatMap()

Map and flatten in one step:

const words = await enumerate(["hello world", "foo bar"])
  .flatMap(line => line.split(" "))
  .collect();
// ["hello", "world", "foo", "bar"]

Expanding Items

const expanded = await enumerate([1, 2, 3])
  .flatMap(n => [n, n * 10])
  .collect();
// [1, 10, 2, 20, 3, 30]

Filtering While Mapping

const valid = await enumerate(data)
  .flatMap(item => {
    if (item.valid) {
      return [item.value];
    }
    return [];  // Skip invalid items
  })
  .collect();

filter()

Keep only matching items:

const evens = await enumerate([1, 2, 3, 4, 5])
  .filter(n => n % 2 === 0)
  .collect();
// [2, 4]

Complex Predicates

const active = await enumerate(users)
  .filter(user => 
    user.active && 
    user.lastLogin > cutoffDate &&
    user.role !== "guest"
  )
  .collect();

With Type Guards

const numbers = await enumerate(mixed)
  .filter((item): item is number => typeof item === "number")
  .collect();

transform()

Apply a TransformStream:

import { read } from "jsr:@j50n/proc@0.23.3";

const decompressed = await read("file.gz")
  .transform(new DecompressionStream("gzip"))
  .lines
  .collect();

Custom Transform

const transformed = await enumerate(data)
  .transform(new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    }
  }))
  .collect();

Chaining Transformations

Combine multiple transformations:

const result = await enumerate(data)
  .map(item => item.trim())
  .filter(item => item.length > 0)
  .map(item => item.toUpperCase())
  .filter(item => item.startsWith("A"))
  .collect();

Real-World Examples

Parse CSV

const data = await read("data.csv")
  .lines
  .drop(1)  // Skip header
  .map(line => line.split(","))
  .map(([name, age, city]) => ({
    name,
    age: parseInt(age),
    city
  }))
  .filter(row => row.age >= 18)
  .collect();

Extract URLs

const urls = await read("page.html")
  .lines
  .flatMap(line => {
    const matches = line.match(/https?:\/\/[^\s"']+/g);
    return matches || [];
  })
  .collect();

Clean Data

const cleaned = await enumerate(rawData)
  .map(item => item.trim())
  .filter(item => item.length > 0)
  .map(item => item.toLowerCase())
  .filter(item => !item.startsWith("#"))
  .collect();

Transform JSON Lines

const objects = await read("data.jsonl")
  .lines
  .map(line => JSON.parse(line))
  .filter(obj => obj.status === "active")
  .map(obj => ({
    id: obj.id,
    name: obj.name,
    value: obj.value * 1.1  // Apply 10% increase
  }))
  .collect();

Performance Tips

Lazy Evaluation

Transformations don't run until you consume:

// Nothing happens yet
const pipeline = enumerate(data)
  .map(expensive)
  .filter(predicate);

// Now it runs
const result = await pipeline.collect();

Early Filtering

Filter before expensive operations:

// ✅ Filter first
const result = await enumerate(data)
  .filter(cheap)      // Fast filter
  .map(expensive)     // Expensive operation
  .collect();

// ❌ Map first
const result = await enumerate(data)
  .map(expensive)     // Runs on everything
  .filter(cheap)      // Then filters
  .collect();

Use take() to Limit

// Stop after 10 matches
const first10 = await enumerate(huge)
  .filter(predicate)
  .take(10)
  .collect();

Common Patterns

Normalize Data

const normalized = await enumerate(data)
  .map(item => ({
    ...item,
    name: item.name.trim().toLowerCase(),
    email: item.email.toLowerCase(),
    phone: item.phone.replace(/\D/g, "")
  }))
  .collect();

Extract Fields

const names = await enumerate(users)
  .map(user => user.name)
  .collect();

Conditional Transform

const processed = await enumerate(items)
  .map(item => {
    if (item.type === "A") {
      return processTypeA(item);
    } else {
      return processTypeB(item);
    }
  })
  .collect();

Batch Transform

const batched = await enumerate(items)
  .map((item, i) => ({
    ...item,
    batch: Math.floor(i / 100)
  }))
  .collect();

Error Handling

Errors in transformations propagate:

try {
  await enumerate(data)
    .map(item => {
      if (!item.valid) {
        throw new Error(`Invalid item: ${item.id}`);
      }
      return item.value;
    })
    .collect();
} catch (error) {
  console.error(`Transform failed: ${error.message}`);
}

Next Steps

Aggregations

Combine many items into one result.

reduce()

The Swiss Army knife of aggregations:

import { enumerate } from "jsr:@j50n/proc@0.23.3";

const sum = await enumerate([1, 2, 3, 4])
  .reduce((acc, n) => acc + n, 0);
// 10

How It Works

// Start with initial value: 0
// Step 1: 0 + 1 = 1
// Step 2: 1 + 2 = 3
// Step 3: 3 + 3 = 6
// Step 4: 6 + 4 = 10

Building Objects

const grouped = await enumerate(items)
  .reduce((acc, item) => {
    const key = item.category;
    acc[key] = acc[key] || [];
    acc[key].push(item);
    return acc;
  }, {});

Calculating Statistics

const stats = await enumerate(numbers)
  .reduce((acc, n) => ({
    sum: acc.sum + n,
    count: acc.count + 1,
    min: Math.min(acc.min, n),
    max: Math.max(acc.max, n)
  }), { sum: 0, count: 0, min: Infinity, max: -Infinity });

const average = stats.sum / stats.count;

count()

Count items:

const total = await enumerate([1, 2, 3, 4, 5]).count();
// 5

Count Matches

const errorCount = await read("app.log")
  .lines
  .filter(line => line.includes("ERROR"))
  .count();

some()

Check if any item matches:

const hasError = await enumerate(lines)
  .some(line => line.includes("ERROR"));
// true or false

Early Exit

Stops as soon as it finds a match:

// Stops reading after first match
const hasLargeFile = await enumerate(files)
  .some(file => file.size > 1_000_000_000);

every()

Check if all items match:

const allPositive = await enumerate([1, 2, 3, 4])
  .every(n => n > 0);
// true

Validation

const allValid = await enumerate(records)
  .every(record => 
    record.name && 
    record.email && 
    record.age >= 0
  );

find()

Find first matching item:

const firstError = await enumerate(lines)
  .find(line => line.includes("ERROR"));
// First line with ERROR, or undefined

With Complex Predicate

const admin = await enumerate(users)
  .find(user => 
    user.role === "admin" && 
    user.active
  );

Real-World Examples

Sum Values

const total = await enumerate(orders)
  .map(order => order.amount)
  .reduce((sum, amount) => sum + amount, 0);

Count by Category

const counts = await enumerate(items)
  .reduce((acc, item) => {
    acc[item.category] = (acc[item.category] || 0) + 1;
    return acc;
  }, {});

Find Maximum

const max = await enumerate(numbers)
  .reduce((max, n) => Math.max(max, n), -Infinity);

Build Index

const index = await enumerate(items)
  .reduce((acc, item) => {
    acc[item.id] = item;
    return acc;
  }, {});

Concatenate Strings

const combined = await enumerate(words)
  .reduce((acc, word) => (acc ? acc + " " + word : word), "");

Collect Unique Values

const unique = await enumerate(items)
  .reduce((acc, item) => {
    acc.add(item);
    return acc;
  }, new Set());

Advanced Patterns

Running Average

const runningAvg = await enumerate(numbers)
  .reduce((acc, n) => {
    acc.sum += n;
    acc.count += 1;
    acc.average = acc.sum / acc.count;
    return acc;
  }, { sum: 0, count: 0, average: 0 });

Nested Grouping

const grouped = await enumerate(items)
  .reduce((acc, item) => {
    const cat = item.category;
    const type = item.type;
    
    acc[cat] = acc[cat] || {};
    acc[cat][type] = acc[cat][type] || [];
    acc[cat][type].push(item);
    
    return acc;
  }, {});

Frequency Map

const frequency = await enumerate(words)
  .reduce((acc, word) => {
    acc[word] = (acc[word] || 0) + 1;
    return acc;
  }, {});

// Find most common
const mostCommon = Object.entries(frequency)
  .sort((a, b) => b[1] - a[1])[0];

Accumulate with Transform

const processed = await enumerate(data)
  .reduce((acc, item) => {
    const transformed = transform(item);
    if (transformed.valid) {
      acc.push(transformed);
    }
    return acc;
  }, []);

Performance Tips

Use Specific Methods

// ❌ Slower
const count = await enumerate(items)
  .reduce((acc) => acc + 1, 0);

// ✅ Faster
const count = await enumerate(items).count();

Early Exit with some/every

// Stops at first match
const hasMatch = await enumerate(huge)
  .some(predicate);

// Better than
const matches = await enumerate(huge)
  .filter(predicate)
  .count();

Combine Operations

// ✅ One pass
const stats = await enumerate(numbers)
  .reduce((acc, n) => ({
    sum: acc.sum + n,
    count: acc.count + 1
  }), { sum: 0, count: 0 });

// ❌ Two passes
const sum = await enumerate(numbers).reduce((a, b) => a + b, 0);
const count = await enumerate(numbers).count();

Common Mistakes

Forgetting Initial Value

// ❌ Error with empty array
const sum = await enumerate([]).reduce((a, b) => a + b);

// ✅ Works with empty array
const sum = await enumerate([]).reduce((a, b) => a + b, 0);

Not Returning Accumulator

// ❌ Returns undefined
const result = await enumerate(items)
  .reduce((acc, item) => {
    acc.push(item);
    // Missing return!
  }, []);

// ✅ Returns accumulator
const result = await enumerate(items)
  .reduce((acc, item) => {
    acc.push(item);
    return acc;
  }, []);

Next Steps

Slicing and Sampling

Take portions of your data stream.

take()

Take first N items:

import { enumerate } from "jsr:@j50n/proc@0.23.3";

const first3 = await enumerate([1, 2, 3, 4, 5])
  .take(3)
  .collect();
// [1, 2, 3]

Early Exit

Stops reading after N items:

// Only reads first 10 lines
const preview = await read("huge-file.txt")
  .lines
  .take(10)
  .collect();

With Filter

// First 5 errors
const errors = await read("app.log")
  .lines
  .filter(line => line.includes("ERROR"))
  .take(5)
  .collect();

drop()

Skip first N items:

const rest = await enumerate([1, 2, 3, 4, 5])
  .drop(2)
  .collect();
// [3, 4, 5]

Skip Header

const data = await read("data.csv")
  .lines
  .drop(1)  // Skip header row
  .collect();

Combining drop() and take()

Get a range of items by combining drop and take:

const middle = await enumerate([1, 2, 3, 4, 5])
  .drop(1)
  .take(3)
  .collect();
// [2, 3, 4]

Pagination

const page = 2;
const pageSize = 10;

const items = await enumerate(allItems)
  .drop(page * pageSize)
  .take(pageSize)
  .collect();

first

Get first item:

const first = await enumerate([1, 2, 3]).first;
// 1

With Pipeline

const result = await run("ls", "-la")
  .lines
  .first;

last

Get last item:

const last = await enumerate([1, 2, 3]).last;
// 3

Note: Reads entire stream to find last item.

nth()

Get item at index:

const third = await enumerate([1, 2, 3, 4, 5]).nth(2);
// 3 (zero-indexed)

Real-World Examples

Preview File

console.log("First 10 lines:");
await read("file.txt")
  .lines
  .take(10)
  .forEach(line => console.log(line));

Skip and Take

// Lines 11-20
const batch = await read("file.txt")
  .lines
  .drop(10)
  .take(10)
  .collect();

Sample Data

// Every 10th item
const sample = await enumerate(data)
  .filter((_, i) => i % 10 === 0)
  .collect();

Find Nth Match

// 5th error
const fifthError = await read("app.log")
  .lines
  .filter(line => line.includes("ERROR"))
  .nth(4);  // Zero-indexed

Performance Tips

Use take() for Limits

// ✅ Stops early
const first100 = await enumerate(huge)
  .take(100)
  .collect();

// ❌ Reads everything
const all = await enumerate(huge).collect();
const first100 = all.slice(0, 100);  // Array slice, not Enumerable

Combine with Filter

// Efficient: stops after 10 matches
const matches = await enumerate(data)
  .filter(predicate)
  .take(10)
  .collect();

Next Steps

Concurrent Processing

Process multiple items in parallel with controlled concurrency. It's easier than you think.

The Problem

You have a list of URLs to fetch. Sequential is slow:

// Takes 10 seconds for 10 URLs (1 second each)
for (const url of urls) {
  await fetch(url);
}

Promise.all is fast but dangerous:

// Starts 1000 requests at once - might crash
await Promise.all(urls.map(url => fetch(url)));

The Solution

proc gives you controlled concurrency:

import { enumerate } from "jsr:@j50n/proc@0.23.3";

// Defaults to CPU count (usually 4-8)
const results = await enumerate(urls)
  .concurrentMap(async (url) => {
    return await fetch(url);
  })
  .collect();

// Or specify a limit
const results = await enumerate(urls)
  .concurrentMap(async (url) => {
    return await fetch(url);
  }, { concurrency: 5 })
  .collect();

Fast, but won't overwhelm your system.

Understanding JavaScript Concurrency

Important: JavaScript concurrency is not parallelism. You're running on a single thread.

What This Means

When you use concurrentMap or concurrentUnorderedMap, you're not creating threads or workers. You're managing multiple async operations on one thread. The JavaScript event loop switches between them when they're waiting.

This works great for:

  • Network requests - While waiting for a response, other operations run
  • File I/O - While waiting for disk reads/writes, other operations run
  • Process execution - While a child process runs, other operations continue
  • Database queries - While waiting for results, other operations run

This does NOT work for:

  • CPU-intensive calculations - Pure JavaScript math, parsing, etc. blocks everything
  • Synchronous operations - Anything that doesn't await blocks the thread
  • Heavy computation - You still only have one CPU core's worth of processing power

Example: What Works

// ✅ Good: I/O-bound operations run concurrently
const results = await enumerate(urls)
  .concurrentUnorderedMap(async (url) => {
    // While waiting for fetch, other URLs are being fetched
    const response = await fetch(url);
    return response.json();
  })
  .collect();
// This is genuinely faster - 10 URLs in ~1 second instead of ~10 seconds

Example: What Doesn't Work

// ❌ Bad: CPU-bound operations don't benefit
const results = await enumerate(numbers)
  .concurrentUnorderedMap(async (n) => {
    // This blocks the thread - no other operations can run
    let result = 0;
    for (let i = 0; i < 1000000; i++) {
      result += Math.sqrt(n * i);
    }
    return result;
  })
  .collect();
// This is NOT faster - still uses one CPU core sequentially

Why It Still Matters

Even though it's not true parallelism, concurrency is incredibly useful:

  1. I/O operations dominate - Most real-world tasks are waiting for network/disk
  2. Child processes run in parallel - When you run() a command, it uses a separate process
  3. Better resource utilization - Keep the CPU busy while waiting for I/O
  4. Simpler than threads - No race conditions, no locks, no shared memory issues

When You Need True Parallelism

If you need to parallelize CPU-intensive JavaScript code, use:

  • Web Workers (in browsers)
  • Worker Threads (in Node.js/Deno)
  • Child processes with run() - each process gets its own CPU

But for most tasks (fetching URLs, processing files, running commands), JavaScript's concurrency model is perfect.
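
For example, delegating CPU-heavy work to child processes really does use multiple cores, even though the coordinating JavaScript stays on one thread. A minimal sketch, assuming sha256sum is on the PATH and the file names are placeholders:

import { enumerate, run } from "jsr:@j50n/proc@0.23.3";

const files = ["a.dat", "b.dat", "c.dat"]; // placeholder file names

// Each sha256sum runs in its own OS process, so the hashing happens in
// parallel across cores while this script only coordinates the results.
const sums = await enumerate(files)
  .concurrentUnorderedMap(async (file) => await run("sha256sum", file).lines.first)
  .collect();

console.log(sums);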

When to Use Concurrent Processing

Use concurrentUnorderedMap() (recommended default) when:

  • Order doesn't matter - you want maximum speed
  • Processing independent tasks where results can arrive in any order
  • You'll sort or aggregate results anyway
  • This is usually what you want - it keeps all workers busy and delivers results as they complete
  • Example: Downloading files, processing logs, fetching data you'll aggregate

Use concurrentMap() when:

  • You must have results in the same order as input
  • Be aware: can bottleneck on the slowest item in each batch
  • If work isn't balanced, faster items wait for slower ones
  • Example: Fetching user profiles where display order must match input order

Use sequential processing when:

  • Tasks depend on each other
  • You must respect strict rate limits
  • Order is critical and tasks are fast
  • Example: Database transactions that must happen in sequence

Choose concurrency level based on:

  • I/O-bound tasks (network, disk): Start with 5-10, increase if you have bandwidth (see "Understanding JavaScript Concurrency" above)
  • CPU-bound tasks: Won't benefit from concurrency - use Worker Threads or child processes instead
  • Rate-limited APIs: Match the rate limit (e.g., 10 requests/second = concurrency 1 with 100ms delays)
  • Memory constraints: Lower concurrency if processing large data per task

concurrentUnorderedMap() - Results as They Complete

Process items concurrently, return results as they complete (fastest):

// Defaults to CPU count
const results = await enumerate([1, 2, 3, 4, 5])
  .concurrentUnorderedMap(async (n) => {
    await sleep(Math.random() * 1000);
    return n * 2;
  })
  .collect();
// [6, 2, 10, 4, 8] - order varies, but all workers stay busy

Why it's faster: Results are delivered as soon as they're ready. If item 3 finishes before item 1, you get item 3 immediately. No waiting for slower items.

Use when: You don't care about order (most cases). Better performance under real-world conditions where work isn't perfectly balanced.

Concurrency: Defaults to navigator.hardwareConcurrency (CPU count). Override with { concurrency: n } if needed.

concurrentMap() - Order Preserved

Process items concurrently, return results in input order:

const results = await enumerate([1, 2, 3, 4, 5])
  .concurrentMap(async (n) => {
    await sleep(Math.random() * 1000);
    return n * 2;
  }, { concurrency: 3 })
  .collect();
// [2, 4, 6, 8, 10] - always in order

Performance caveat: If item 1 takes 5 seconds and item 2 takes 1 second, item 2 waits for item 1 before being returned. This can create bottlenecks where fast items wait for slow ones.

Use when: You specifically need results in the same order as input. Only use if order truly matters for your use case.

Concurrency: Defaults to CPU count. Override with { concurrency: n } if needed.

Real-World Examples

Fetch Multiple URLs

const urls = [
  "https://api.example.com/1",
  "https://api.example.com/2",
  "https://api.example.com/3",
  // ... 100 more
];

// Uses CPU count by default
const data = await enumerate(urls)
  .concurrentUnorderedMap(async (url) => {
    const response = await fetch(url);
    return response.json();
  })
  .collect();

// Or limit for rate-limited APIs
const data = await enumerate(urls)
  .concurrentUnorderedMap(async (url) => {
    const response = await fetch(url);
    return response.json();
  }, { concurrency: 10 })
  .collect();

Process Files in Parallel

import { read } from "jsr:@j50n/proc@0.23.3";

const files = ["log1.txt", "log2.txt", "log3.txt"];

const results = await enumerate(files)
  .concurrentMap(async (file) => {
    const errors = await read(file)
      .lines
      .filter(line => line.includes("ERROR"))
      .count();
    return { file, errors };
  })
  .collect();

Download and Process

const downloads = await enumerate(imageUrls)
  .concurrentUnorderedMap(async (url) => {
    const response = await fetch(url);
    const blob = await response.blob();
    return processImage(blob);
  })
  .collect();

Choosing Concurrency

Default behavior: Both methods default to navigator.hardwareConcurrency (CPU count, typically 4-8). This is usually a good starting point.

When to override:

For I/O-bound tasks (network, disk):

  • Default is often fine
  • Increase to 10-20 if you have bandwidth and no rate limits
  • Decrease to 1-5 for rate-limited APIs

For CPU-bound tasks (where each task runs a child process):

  • Default (CPU count) is optimal
  • Don't increase - you'll just add overhead

For rate-limited APIs:

  • Set to match the rate limit
  • Add delays if needed

// Respect rate limits with low concurrency
const results = await enumerate(apiCalls)
  .concurrentUnorderedMap(async (call) => {
    const result = await makeApiCall(call);
    await sleep(100); // 10 requests per second
    return result;
  }, { concurrency: 1 })
  .collect();

Error Handling

Errors propagate naturally:

try {
  const results = await enumerate(urls)
    .concurrentMap(async (url) => {
      const response = await fetch(url);
      if (!response.ok) {
        throw new Error(`Failed: ${url}`);
      }
      return response.json();
    }, { concurrency: 5 })
    .collect();
} catch (error) {
  // First error stops everything
  console.error(`Failed: ${error.message}`);
}

Progress Tracking

Track progress as items complete:

let completed = 0;
const total = urls.length;

const results = await enumerate(urls)
  .concurrentMap(async (url) => {
    const result = await fetch(url);
    completed++;
    console.log(`Progress: ${completed}/${total}`);
    return result;
  }, { concurrency: 5 })
  .collect();

Combining with Other Operations

Chain concurrent operations with other methods:

const results = await enumerate(urls)
  .concurrentMap(fetch, { concurrency: 5 })
  .filter(response => response.ok)
  .concurrentMap(response => response.json(), { concurrency: 5 })
  .filter(data => data.active)
  .collect();

Performance Comparison

// Sequential: 10 seconds (one at a time)
for (const url of urls) {
  await fetch(url);
}

// concurrentMap (5): 2-4 seconds
// Can bottleneck if one item is slow - others wait
await enumerate(urls)
  .concurrentMap(fetch, { concurrency: 5 })
  .collect();

// concurrentUnorderedMap (5): 2 seconds
// Faster - no waiting, results delivered as ready
await enumerate(urls)
  .concurrentUnorderedMap(fetch, { concurrency: 5 })
  .collect();

Why unordered is faster: Imagine 5 tasks with times [1s, 1s, 1s, 1s, 5s]. With concurrentMap, the 5-second task blocks its batch. With concurrentUnorderedMap, the four 1-second tasks complete and return immediately while the 5-second task finishes in the background.

Advanced Patterns

Batch Processing

Process in batches:

const batchSize = 10;
for (let i = 0; i < items.length; i += batchSize) {
  const batch = items.slice(i, i + batchSize);
  const results = await enumerate(batch)
    .concurrentMap(process, { concurrency: 5 })
    .collect();
  await saveBatch(results);
}

Retry Failed Items

const results = await enumerate(urls)
  .concurrentMap(async (url) => {
    let attempts = 0;
    while (attempts < 3) {
      try {
        return await fetch(url);
      } catch (error) {
        attempts++;
        if (attempts === 3) throw error;
        await sleep(1000 * attempts);
      }
    }
  }, { concurrency: 5 })
  .collect();

Dynamic Concurrency

Adjust concurrency based on results:

let concurrency = 5;

for (const batch of batches) {
  const start = Date.now();
  
  const results = await enumerate(batch)
    .concurrentMap(process, { concurrency })
    .collect();
  
  const duration = Date.now() - start;
  
  // Adjust based on performance
  if (duration < 1000) concurrency = Math.min(concurrency + 1, 20);
  if (duration > 5000) concurrency = Math.max(concurrency - 1, 1);
}

Best Practices

  1. Prefer unordered - Use concurrentUnorderedMap unless you specifically need order
  2. Start conservative - Begin with low concurrency, increase if needed
  3. Monitor resources - Watch memory and network usage
  4. Respect rate limits - Don't overwhelm external services
  5. Handle errors - One error stops everything, handle gracefully
  6. Understand the bottleneck - concurrentMap can wait on slow items; unordered doesn't

Common Mistakes

Too Much Concurrency

// ❌ Might crash with 10,000 concurrent requests
await enumerate(hugeList)
  .concurrentMap(fetch, { concurrency: 10000 })
  .collect();

// ✅ Reasonable concurrency
await enumerate(hugeList)
  .concurrentMap(fetch, { concurrency: 10 })
  .collect();

Forgetting to Await

// ❌ Returns promises, not results
const promises = enumerate(urls)
  .concurrentMap(fetch, { concurrency: 5 });

// ✅ Await the results
const results = await enumerate(urls)
  .concurrentMap(fetch, { concurrency: 5 })
  .collect();

Next Steps

Streaming Large Files

Process files bigger than your RAM. It's easier than you think.

When to Stream vs Collect

Always stream when:

  • File is larger than available RAM (or even close to it)
  • You don't need all data at once
  • Processing can be done incrementally (line-by-line, record-by-record)
  • You want to start processing immediately without waiting for full download/read
  • Memory efficiency is important

Consider collecting when:

  • File is small (< 100MB) and fits comfortably in memory
  • You need random access to data
  • You need to process data multiple times
  • You need to sort or aggregate all data before processing
  • Memory is not a concern

Memory/Speed Tradeoffs:

  • Streaming: Constant memory (~64KB buffer), processes as data arrives, can't random access
  • Collecting: Memory = file size, all data available immediately, can random access, must wait for full load

Example decision:

// 10GB log file - MUST stream
for await (const line of read("huge.log").lines) {
  if (line.includes("ERROR")) console.log(line);
}

// 1MB config file - can collect
const config = await read("config.json").lines.collect();
const parsed = JSON.parse(config.join("\n"));

// 500MB data file - stream if processing once
const sum = await read("numbers.txt")
  .lines
  .map(line => parseFloat(line))
  .reduce((a, b) => a + b, 0);

The Problem

You have a 10GB log file. Loading it into memory crashes your program:

// ❌ Crashes with large files
const content = await Deno.readTextFile("huge.log");
const lines = content.split("\n");

The Solution

Stream it, one line at a time:

import { read } from "jsr:@j50n/proc@0.23.3";

// ✅ Constant memory, any file size
for await (const line of read("huge.log").lines) {
  if (line.includes("ERROR")) {
    console.log(line);
  }
}

How Streaming Works

Instead of loading everything:

  1. Read a chunk (buffer)
  2. Process it
  3. Discard it
  4. Repeat

Memory usage stays constant, no matter how big the file.
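
In code, that loop is just async iteration over the file; a minimal sketch, assuming each chunk arrives as a Uint8Array (as described in the File I/O chapter):

import { read } from "jsr:@j50n/proc@0.23.3";

// Only one chunk is held in memory at a time; each is released before the
// next one is read.
let bytes = 0;
for await (const chunk of read("huge.log")) {
  bytes += chunk.length;
}
console.log(`${bytes} bytes processed with a single small buffer`);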

Real Examples

Count Lines in Huge File

const count = await read("10gb-file.txt").lines.count();
console.log(`${count} lines`);

Uses ~constant memory, even for 10GB.

Find Pattern in Large File

const matches = await read("huge.log")
  .lines
  .filter(line => line.includes("ERROR"))
  .take(10)  // Stop after 10 matches
  .collect();

Stops reading once it finds 10 matches. Efficient!

Process CSV File

const data = await read("huge-data.csv")
  .lines
  .drop(1)  // Skip header
  .map(line => {
    const [id, name, value] = line.split(",");
    return { id, name, value: parseFloat(value) };
  })
  .filter(row => row.value > 100)
  .collect();

Aggregate Large Dataset

const sum = await read("numbers.txt")
  .lines
  .map(line => parseFloat(line))
  .reduce((acc, n) => acc + n, 0);

Compressed Files

Stream compressed files without extracting:

const lineCount = await read("huge.log.gz")
  .transform(new DecompressionStream("gzip"))
  .lines
  .count();

Decompresses on-the-fly, never stores uncompressed data.

Multiple Files

Process multiple large files:

import { enumerate, read } from "jsr:@j50n/proc@0.23.3";

const files = ["log1.txt", "log2.txt", "log3.txt"];

for (const file of files) {
  const errors = await read(file)
    .lines
    .filter(line => line.includes("ERROR"))
    .count();
  console.log(`${file}: ${errors} errors`);
}

Streaming Transformations

Chain transformations, all streaming:

const result = await read("data.txt")
  .lines
  .map(line => line.trim())
  .filter(line => line.length > 0)
  .map(line => line.toUpperCase())
  .filter(line => line.startsWith("ERROR"))
  .collect();

Each line flows through all transformations before the next line is read.

Writing Large Files

Transform a file and write the result (the transformed output is collected in memory before the write):

import { concat } from "jsr:@j50n/proc@0.23.3";

const processed = await read("input.txt")
  .lines
  .map(line => line.toUpperCase())
  .map(line => new TextEncoder().encode(line + "\n"))
  .collect();

await Deno.writeFile("output.txt", concat(processed));
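
If the transformed output is itself too large to hold in memory, you can write as you read using Deno's file APIs instead of collecting first. A minimal sketch:

import { read } from "jsr:@j50n/proc@0.23.3";

const file = await Deno.open("output.txt", { write: true, create: true, truncate: true });
const writer = file.writable.getWriter();
const encoder = new TextEncoder();

try {
  // Each line is encoded and written as it streams in, so memory stays flat.
  for await (const line of read("input.txt").lines) {
    await writer.write(encoder.encode(line.toUpperCase() + "\n"));
  }
} finally {
  await writer.close(); // also closes the underlying file
}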

Performance Tips

Use take() for Early Exit

// Stops reading after 100 matches
const first100 = await read("huge.txt")
  .lines
  .filter(predicate)
  .take(100)
  .collect();

Don't Collect Unless Needed

// ❌ Loads everything into memory
const lines = await read("huge.txt").lines.collect();
for (const line of lines) process(line);

// ✅ Streams
for await (const line of read("huge.txt").lines) {
  process(line);
}

Use Concurrent Processing

Process multiple files in parallel:

const results = await enumerate(files)
  .concurrentMap(async (file) => {
    return await read(file).lines.count();
  }, { concurrency: 3 })
  .collect();

Memory Usage

Streaming uses constant memory:

// File size: 10GB
// Memory used: ~64KB (buffer size)
await read("10gb-file.txt")
  .lines
  .forEach(line => process(line));

Real-World Example

Analyze a year of logs:

const errorsByDay = await read("year-of-logs.txt")
  .lines
  .filter(line => line.includes("ERROR"))
  .map(line => line.match(/\d{4}-\d{2}-\d{2}/)?.[0])
  .filter(date => date !== undefined)
  .reduce((acc, date) => {
    acc[date] = (acc[date] || 0) + 1;
    return acc;
  }, {});

// Show top 10 error days
Object.entries(errorsByDay)
  .sort((a, b) => b[1] - a[1])
  .slice(0, 10)
  .forEach(([date, count]) => {
    console.log(`${date}: ${count} errors`);
  });

Processes gigabytes of logs with minimal memory.

Common Patterns

Filter and Count

const count = await read("file.txt")
  .lines
  .filter(predicate)
  .count();

Transform and Save

const output = await read("input.txt")
  .lines
  .map(transform)
  .map(line => new TextEncoder().encode(line + "\n"))
  .collect();

await Deno.writeFile("output.txt", concat(output));

Aggregate Data

const stats = await read("data.txt")
  .lines
  .reduce((acc, line) => {
    // Update statistics
    return acc;
  }, initialStats);

Next Steps

File I/O

Read and write files with streaming efficiency.

Reading Files

Read as Bytes

import { read } from "jsr:@j50n/proc@0.23.3";

const bytes = await read("file.bin").collect();
// Uint8Array[]

Read as Lines

const lines = await read("file.txt").lines.collect();
// string[]

Stream Large Files

Process files larger than memory:

for await (const line of read("huge-file.txt").lines) {
  process(line);  // One line at a time
}

File Paths

Relative Paths

read("data.txt")  // Relative to current directory

Absolute Paths

read("/var/log/app.log")

URLs

const path = new URL("./data.txt", import.meta.url);
read(path)

Common Patterns

Count Lines

const lineCount = await read("file.txt").lines.count();

Find Pattern

const matches = await read("file.txt")
  .lines
  .filter(line => line.includes("ERROR"))
  .collect();

Transform Lines

const processed = await read("input.txt")
  .lines
  .map(line => line.toUpperCase())
  .collect();

Parse CSV

const data = await read("data.csv")
  .lines
  .drop(1)  // Skip header
  .map(line => {
    const [name, age, city] = line.split(",");
    return { name, age: parseInt(age), city };
  })
  .collect();

Writing Files

Write Array to File

const lines = ["line 1", "line 2", "line 3"];
const content = lines.join("\n");
await Deno.writeTextFile("output.txt", content);

Stream to File

import { concat } from "jsr:@j50n/proc@0.23.3";

const bytes = await read("input.txt")
  .lines
  .map(line => new TextEncoder().encode(line + "\n"))
  .collect();

await Deno.writeFile("output.txt", concat(bytes));

Working with Binary Files

Read Binary

const bytes = await read("image.png").collect();
const data = concat(bytes);

Process Binary

const processed = await read("data.bin")
  .map(chunk => {
    // Process each chunk
    return transform(chunk);
  })
  .collect();

Compressed Files

Read Compressed

const lines = await read("file.gz")
  .transform(new DecompressionStream("gzip"))
  .lines
  .collect();

See Decompressing Files for more.

Multiple Files

Process Multiple Files

const files = ["log1.txt", "log2.txt", "log3.txt"];

for (const file of files) {
  const errors = await read(file)
    .lines
    .filter(line => line.includes("ERROR"))
    .count();
  console.log(`${file}: ${errors} errors`);
}

Concurrent Processing

import { enumerate } from "jsr:@j50n/proc@0.23.3";

const results = await enumerate(files)
  .concurrentMap(async (file) => {
    const lines = await read(file).lines.count();
    return { file, lines };
  }, { concurrency: 3 })
  .collect();

Error Handling

File Not Found

try {
  const lines = await read("missing.txt").lines.collect();
} catch (error) {
  console.error(`Failed to read file: ${error.message}`);
}

Permission Denied

try {
  const lines = await read("/root/secret.txt").lines.collect();
} catch (error) {
  if (error instanceof Deno.errors.PermissionDenied) {
    console.error("Permission denied");
  }
}

Performance Tips

Stream, Don't Collect

// ❌ Loads entire file into memory
const lines = await read("huge.txt").lines.collect();

// ✅ Processes one line at a time
for await (const line of read("huge.txt").lines) {
  process(line);
}

Use Chunked Lines for Performance

For files with many small lines:

const chunks = await read("file.txt").chunkedLines.collect();
// Array of string arrays
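
A sketch of consuming chunked lines as they stream, assuming .chunkedLines yields arrays of strings as described above:

let errors = 0;
for await (const chunk of read("file.txt").chunkedLines) {
  // Each chunk is a string[], so the inner loop is plain synchronous iteration.
  for (const line of chunk) {
    if (line.includes("ERROR")) errors++;
  }
}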

Real-World Examples

Log Analysis

const errorsByType = await read("app.log")
  .lines
  .filter(line => line.includes("ERROR"))
  .reduce((acc, line) => {
    const type = line.match(/ERROR: (\w+)/)?.[1] || "unknown";
    acc[type] = (acc[type] || 0) + 1;
    return acc;
  }, {});

Data Extraction

const emails = await read("contacts.txt")
  .lines
  .map(line => line.match(/[\w.-]+@[\w.-]+\.\w+/))
  .filter(match => match !== null)
  .map(match => match[0])
  .collect();

File Conversion

const jsonLines = await read("data.csv")
  .lines
  .drop(1)  // Skip header
  .map(line => {
    const [name, age] = line.split(",");
    return JSON.stringify({ name, age: parseInt(age) });
  })
  .collect();

await Deno.writeTextFile("data.jsonl", jsonLines.join("\n"));

Next Steps

Range and Iteration

Generate sequences of numbers lazily.

Basic Range

import { range } from "jsr:@j50n/proc@0.23.3";

const numbers = await range({ to: 5 }).collect();
// [0, 1, 2, 3, 4]

Exclusive vs Inclusive

to (exclusive)

const nums = await range({ to: 3 }).collect();
// [0, 1, 2]

until (inclusive)

const nums = await range({ until: 3 }).collect();
// [0, 1, 2, 3]

Custom Start

const nums = await range({ from: 5, to: 10 }).collect();
// [5, 6, 7, 8, 9]

Custom Step

const evens = await range({ from: 0, to: 10, step: 2 }).collect();
// [0, 2, 4, 6, 8]

Counting Down

const countdown = await range({ from: 5, to: 0, step: -1 }).collect();
// [5, 4, 3, 2, 1]

Real-World Examples

Repeat N Times

await range({ to: 10 }).forEach(i => {
  console.log(`Iteration ${i}`);
});

Generate Test Data

const users = await range({ to: 100 })
  .map(i => ({
    id: i,
    name: `User ${i}`,
    email: `user${i}@example.com`
  }))
  .collect();

Batch Processing

const batchSize = 10;
const total = 100;

for await (const batch of range({ from: 0, to: total, step: batchSize })) {
  const items = data.slice(batch, batch + batchSize);
  await processBatch(items);
}

Pagination

const pages = Math.ceil(total / pageSize);

for await (const page of range({ to: pages })) {
  const items = await fetchPage(page);
  await processItems(items);
}

Retry Logic

for await (const attempt of range({ to: 3 })) {
  try {
    await operation();
    break;
  } catch (error) {
    if (attempt === 2) throw error;
    await sleep(1000 * (attempt + 1));
  }
}

Infinite Ranges

Warning: Don't collect infinite ranges!

// ❌ Never completes
const infinite = await range({ from: 0, to: Infinity }).collect();

// ✅ Use with take()
const first100 = await range({ from: 0, to: Infinity })
  .take(100)
  .collect();

Performance

Ranges are lazy—numbers generated on demand:

// Doesn't generate all numbers upfront
const huge = range({ to: 1_000_000_000 });

// Only generates what you use
const first10 = await huge.take(10).collect();

Next Steps

Zip and Enumerate

Combine and index iterables.

enumerate()

Wrap any iterable for Array-like methods:

import { enumerate } from "jsr:@j50n/proc@0.23.3";

const result = await enumerate([1, 2, 3])
  .map(n => n * 2)
  .collect();
// [2, 4, 6]

.enum()

Add indices to items:

const indexed = await enumerate(["a", "b", "c"])
  .enum()
  .collect();
// [["a", 0], ["b", 1], ["c", 2]]

Format with Indices

const numbered = await enumerate(["apple", "banana", "cherry"])
  .enum()
  .map(([fruit, i]) => `${i + 1}. ${fruit}`)
  .collect();
// ["1. apple", "2. banana", "3. cherry"]

zip()

Combine two iterables:

import { zip } from "jsr:@j50n/proc@0.23.3";

const names = ["Alice", "Bob", "Charlie"];
const ages = [25, 30, 35];

const people = await zip(names, ages)
  .map(([name, age]) => ({ name, age }))
  .collect();
// [{ name: "Alice", age: 25 }, ...]

Multiple Iterables

const combined = await zip(iter1, iter2)
  .map(([a, b]) => a + b)
  .collect();

Real-World Examples

Number Lines

await read("file.txt")
  .lines
  .enum()
  .map(([line, i]) => `${i + 1}: ${line}`)
  .forEach(console.log);

Combine Data Sources

const merged = await zip(
  read("names.txt").lines,
  read("emails.txt").lines
)
  .map(([name, email]) => ({ name, email }))
  .collect();

Track Progress

const items = [...]; // Large array

await enumerate(items)
  .enum()
  .forEach(([item, i]) => {
    console.log(`Processing ${i + 1}/${items.length}`);
    process(item);
  });

Next Steps

WritableIterable

WritableIterable is a fascinating utility that inverts the normal data flow: instead of pulling data from an iterable, you push data into it. It bridges the gap between push-based (callbacks, events) and pull-based (async iteration) programming models.

The Problem It Solves

Imagine you have a callback-based API (like event emitters, WebSocket messages, or sensor data) and you want to process it with proc's pipeline operations. You can't easily convert callbacks to an AsyncIterable... until now.

How It Works

WritableIterable is both:

  • Writable: You can .write() items to it
  • AsyncIterable: You can iterate over it with for await

It uses an internal queue to buffer items between the writer and reader, allowing them to operate at different speeds.

Basic Usage

import { WritableIterable, sleep } from "jsr:@j50n/proc@0.23.3";

const writable = new WritableIterable<number>();

// Write in background (simulating slow producer)
(async () => {
  await writable.write(1);
  await sleep(100);
  await writable.write(2);
  await sleep(100);
  await writable.write(3);
  await writable.close();
})();

// Read (items arrive as they're written)
const results: number[] = [];
for await (const item of writable) {
  console.log("Received:", item);
  results.push(item);
}

console.log(results); // [1, 2, 3]

This demonstrates the streaming nature: the reader receives items as they're written, not all at once.

⚠️ Important: You MUST call .close() when done writing, or iteration will hang forever waiting for more data.

Key Concepts

Push vs Pull

Traditional AsyncIterable (pull-based):

// Consumer pulls data
for await (const item of iterable) {
  // Process item
}

WritableIterable (push-based):

// Producer pushes data
await writable.write(item);

Backpressure

WritableIterable implements automatic backpressure. If the writer is faster than the reader, .write() will pause until the reader catches up. This prevents unbounded memory growth.
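
A minimal sketch of what that looks like: because the producer awaits each .write(), a slow consumer naturally paces it.

import { WritableIterable, sleep } from "jsr:@j50n/proc@0.23.3";

const stream = new WritableIterable<number>();

// Producer: would be fast on its own, but each awaited write() keeps it
// in step with the slower reader below.
(async () => {
  for (let i = 0; i < 5; i++) {
    await stream.write(i);
    console.log(`wrote ${i}`);
  }
  await stream.close();
})();

// Consumer: takes 200ms per item.
for await (const item of stream) {
  await sleep(200);
  console.log(`read ${item}`);
}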

Real-World Examples

Example 1: Event Stream to Pipeline

Convert DOM events into a processable stream:

import { WritableIterable, enumerate } from "jsr:@j50n/proc@0.23.3";

const clicks = new WritableIterable<MouseEvent>();

// Producer: capture clicks
document.addEventListener("click", async (event) => {
  await clicks.write(event);
});

// Consumer: process clicks
enumerate(clicks)
  .map(event => ({ x: event.clientX, y: event.clientY }))
  .filter(pos => pos.x > 100)
  .forEach(pos => console.log("Click at:", pos));

// Close when done (e.g., on page unload)
window.addEventListener("unload", () => clicks.close());

Example 2: WebSocket to Process

Feed WebSocket messages to a process:

import { WritableIterable, enumerate } from "jsr:@j50n/proc@0.23.3";

const messages = new WritableIterable<string>();

// Producer: WebSocket messages
const ws = new WebSocket("wss://example.com");
ws.onmessage = async (event) => {
  await messages.write(event.data);
};
ws.onclose = () => messages.close();

// Consumer: pipe to process
await enumerate(messages)
  .run("jq", ".")  // Pretty-print JSON
  .toStdout();

Example 3: Sensor Data Stream

Process sensor readings as they arrive:

import { WritableIterable, enumerate } from "jsr:@j50n/proc@0.23.3";

interface SensorReading {
  temperature: number;
  timestamp: number;
}

const readings = new WritableIterable<SensorReading>();

// Producer: sensor callback
sensor.onReading(async (reading) => {
  await readings.write(reading);
});

// Consumer: calculate moving average
const averages = await enumerate(readings)
  .map(r => r.temperature)
  .take(100)  // First 100 readings
  .reduce((acc, temp) => acc + temp, 0)
  .then(sum => sum / 100);

console.log(`Average: ${averages}°C`);
await readings.close();

Example 4: Manual Process stdin

Feed data to a process programmatically:

import { WritableIterable, enumerate } from "jsr:@j50n/proc@0.23.3";

const input = new WritableIterable<string>();

// Producer: generate data
(async () => {
  for (let i = 0; i < 10; i++) {
    await input.write(`line ${i}`);
  }
  await input.close();
})();

// Consumer: pipe to process
await enumerate(input)
  .run("grep", "5")
  .toStdout();
// Output: line 5

Error Handling

Errors propagate through the iteration:

import { WritableIterable } from "jsr:@j50n/proc@0.23.3";

const writable = new WritableIterable<number>();

// Write and close with error
(async () => {
  await writable.write(1);
  await writable.write(2);
  await writable.close(new Error("something failed"));
})();

try {
  for await (const item of writable) {
    console.log(item);
  }
} catch (error) {
  console.error("Error:", error.message);
}
// Output:
// 1
// 2
// Error: something failed

Cleanup with onclose

You can provide a cleanup callback:

import { WritableIterable } from "jsr:@j50n/proc@0.23.3";

const writable = new WritableIterable<string>({
  onclose: async () => {
    console.log("Cleaning up resources...");
    // Close connections, files, etc.
  }
});

await writable.write("data");
await writable.close();
// Output: Cleaning up resources...

API Reference

Constructor

new WritableIterable<T>(options?: { onclose?: () => void | Promise<void> })

  • options.onclose: Optional callback invoked when .close() is called

Methods

.write(item: T): Promise<void>

  • Write an item to the stream
  • Throws if already closed
  • Implements backpressure (pauses if reader is slow)

.close(error?: Error): Promise<void>

  • Close the stream
  • Must be called to end iteration
  • Safe to call multiple times
  • Optional error propagates to reader

Properties

.isClosed: boolean

  • Returns true if .close() has been called

Common Patterns

Pattern: Timed Data Generation

const timed = new WritableIterable<number>();

(async () => {
  for (let i = 0; i < 5; i++) {
    await timed.write(i);
    await new Promise(resolve => setTimeout(resolve, 1000));
  }
  await timed.close();
})();

for await (const item of timed) {
  console.log(item); // Prints 0, 1, 2, 3, 4 (one per second)
}

Pattern: Conditional Close

const conditional = new WritableIterable<number>();

(async () => {
  for (let i = 0; i < 100; i++) {
    await conditional.write(i);
    if (i === 10) {
      await conditional.close(); // Stop early
      break;
    }
  }
})();

const items = await enumerate(conditional).collect();
console.log(items.length); // 11 (0 through 10)

When to Use WritableIterable

Use it when:

  • Converting callback-based APIs to AsyncIterable
  • Feeding data to process stdin programmatically
  • Bridging event-driven and stream-based code
  • You need backpressure between producer and consumer

Don't use it when:

  • You already have an AsyncIterable (use enumerate() instead)
  • You're working with synchronous data (use arrays)
  • You need multi-consumer support (WritableIterable is single-consumer)

Performance Notes

  • Internal queue grows if writer is faster than reader
  • Backpressure prevents unbounded growth
  • Each .write() creates a Promise (small overhead)
  • Best for moderate data rates (not millions of items/second)

Comparison with Other Approaches

vs. Array

// Array: all data in memory
const data = [1, 2, 3];
for (const item of data) { }

// WritableIterable: streaming, backpressure
const writable = new WritableIterable<number>();
for await (const item of writable) { }

vs. TransformStream

// TransformStream: byte-oriented, Web Streams API
const { readable, writable } = new TransformStream();

// WritableIterable: value-oriented, AsyncIterable
const writable = new WritableIterable<T>();

vs. Channel (from other languages)

If you're familiar with Go channels or Rust channels, WritableIterable is similar but:

  • Single-consumer (not multi-consumer)
  • Unbuffered by default (backpressure on every write)
  • Integrates with AsyncIterable ecosystem

The "Interesting Little Beast"

What makes WritableIterable interesting:

  1. Inverted Control: Most iterables pull data; this one receives pushes
  2. Backpressure: Automatically slows down fast producers
  3. Bridge Pattern: Connects imperative (callbacks) to declarative (iteration)
  4. Error Propagation: Errors flow naturally through the iteration
  5. Simple API: Just .write(), .close(), and iterate

It's a small utility that solves a specific problem elegantly: turning push-based data sources into pull-based async iterables that work seamlessly with proc's pipeline operations.

Sleep

The sleep function pauses execution for a specified duration. While it might seem like an outlier in a process management library, it's surprisingly useful when working with async pipelines, rate limiting, and testing.

Basic Usage

import { sleep } from "jsr:@j50n/proc@0.23.3";

console.log("Starting...");
await sleep(2000);  // Pause for 2 seconds
console.log("Done!");

Why It's Included

When working with processes and async iterables, you often need to:

  • Rate limit operations
  • Add delays between retries
  • Simulate slow data sources for testing
  • Throttle concurrent operations
  • Add breathing room for external services

Having sleep built-in means you don't need to import it from another library or write the setTimeout wrapper yourself.
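
For reference, this is the one-line wrapper that sleep saves you from writing yourself:

// Hand-rolled equivalent of proc's sleep()
const sleep = (delayms: number): Promise<void> =>
  new Promise((resolve) => setTimeout(resolve, delayms));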

Common Use Cases

Rate Limiting API Calls

import { enumerate, sleep } from "jsr:@j50n/proc@0.23.3";

const urls = ["url1", "url2", "url3"];

await enumerate(urls)
  .concurrentMap(async (url) => {
    const response = await fetch(url);
    await sleep(1000);  // Wait 1 second between requests
    return response.json();
  }, { concurrency: 1 })
  .forEach(data => console.log(data));

Retry with Backoff

import { run, sleep } from "jsr:@j50n/proc@0.23.3";

async function runWithRetry(maxRetries = 3) {
  for (let i = 0; i < maxRetries; i++) {
    try {
      return await run("flaky-command").lines.collect();
    } catch (error) {
      if (i === maxRetries - 1) throw error;
      
      const delay = Math.pow(2, i) * 1000;  // Exponential backoff
      console.log(`Retry ${i + 1} after ${delay}ms...`);
      await sleep(delay);
    }
  }
}

Simulating Slow Data Sources

import { WritableIterable, sleep } from "jsr:@j50n/proc@0.23.3";

const slowData = new WritableIterable<number>();

// Simulate data arriving slowly
(async () => {
  for (let i = 0; i < 10; i++) {
    await slowData.write(i);
    await sleep(500);  // 500ms between items
  }
  await slowData.close();
})();

for await (const item of slowData) {
  console.log("Received:", item);
}

Throttling Process Output

import { run, sleep } from "jsr:@j50n/proc@0.23.3";

// Process lines slowly to avoid overwhelming downstream
await run("cat", "large-file.txt")
  .lines
  .map(async (line) => {
    await sleep(10);  // 10ms delay per line
    return line;
  })
  .toStdout();

Testing Concurrent Operations

import { enumerate, sleep } from "jsr:@j50n/proc@0.23.3";

// Verify concurrency limit works correctly
const startTimes: number[] = [];

await enumerate([1, 2, 3, 4, 5])
  .concurrentMap(async (n) => {
    startTimes.push(Date.now());
    await sleep(100);  // Simulate work
    return n;
  }, { concurrency: 2 })
  .collect();

// Analyze timing to verify only 2 ran concurrently

Time Constants

The library also provides time constants for readability:

import { sleep, SECONDS, MINUTES } from "jsr:@j50n/proc@0.23.3";

await sleep(5 * SECONDS);   // 5 seconds
await sleep(2 * MINUTES);   // 2 minutes

Available constants:

  • SECONDS = 1000 milliseconds
  • MINUTES = 60 seconds
  • HOURS = 60 minutes
  • DAYS = 24 hours
  • WEEKS = 7 days

API

function sleep(delayms: number): Promise<void>

Parameters:

  • delayms: Delay in milliseconds

Returns:

  • Promise that resolves after the specified delay

Notes

  • Uses setTimeout internally
  • Non-blocking (other async operations can run)
  • Minimum delay depends on JavaScript runtime (typically ~4ms)
  • For precise timing, consider using performance.now() to measure actual elapsed time

Counting Words

A classic example that shows the power of process pipelines.

Simple Word Count

Count total words in a file:

import { run } from "jsr:@j50n/proc@0.23.3";

const wordCount = await run("wc", "-w", "book.txt").lines.first;
console.log(`Total words: ${wordCount}`);

Unique Words

Count unique words:

const uniqueWords = await run("cat", "book.txt")
  .run("tr", "-cs", "A-Za-z", "\n")  // Extract words
  .run("tr", "A-Z", "a-z")            // Lowercase
  .run("sort")                         // Sort
  .run("uniq")                         // Unique
  .lines
  .count();

console.log(`Unique words: ${uniqueWords}`);

Word Frequency

Find most common words:

const topWords = await run("cat", "book.txt")
  .run("tr", "-cs", "A-Za-z", "\n")
  .run("tr", "A-Z", "a-z")
  .run("sort")
  .run("uniq", "-c")
  .run("sort", "-rn")
  .run("head", "-10")
  .lines
  .collect();

console.log("Top 10 words:");
topWords.forEach(line => console.log(line));

Pure JavaScript Version

Do it all in JavaScript:

import { read } from "jsr:@j50n/proc@0.23.3";

const wordCounts = await read("book.txt")
  .lines
  .flatMap(line => line.toLowerCase().match(/\w+/g) || [])
  .reduce((acc, word) => {
    acc[word] = (acc[word] || 0) + 1;
    return acc;
  }, {});

const topWords = Object.entries(wordCounts)
  .sort((a, b) => b[1] - a[1])
  .slice(0, 10);

console.log("Top 10 words:");
topWords.forEach(([word, count]) => {
  console.log(`${count} ${word}`);
});

Compressed Files

Count words in a compressed file:

const wordCount = await read("book.txt.gz")
  .transform(new DecompressionStream("gzip"))
  .lines
  .flatMap(line => line.match(/\w+/g) || [])
  .count();

console.log(`Total words: ${wordCount}`);

Multiple Files

Count words across multiple files:

import { enumerate, read } from "jsr:@j50n/proc@0.23.3";

const files = ["book1.txt", "book2.txt", "book3.txt"];

const results = await enumerate(files)
  .concurrentMap(async (file) => {
    const words = await read(file)
      .lines
      .flatMap(line => line.match(/\w+/g) || [])
      .count();
    return { file, words };
  }, { concurrency: 3 })
  .collect();

results.forEach(({ file, words }) => {
  console.log(`${file}: ${words} words`);
});

Filter Stop Words

Exclude common words:

const stopWords = new Set([
  "the", "a", "an", "and", "or", "but", "in", "on", "at", "to", "for"
]);

const meaningfulWords = await read("book.txt")
  .lines
  .flatMap(line => line.toLowerCase().match(/\w+/g) || [])
  .filter(word => !stopWords.has(word))
  .reduce((acc, word) => {
    acc[word] = (acc[word] || 0) + 1;
    return acc;
  }, {});

Word Length Distribution

Analyze word lengths:

const lengthDist = await read("book.txt")
  .lines
  .flatMap(line => line.match(/\w+/g) || [])
  .reduce((acc, word) => {
    const len = word.length;
    acc[len] = (acc[len] || 0) + 1;
    return acc;
  }, {});

console.log("Word length distribution:");
Object.entries(lengthDist)
  .sort((a, b) => parseInt(a[0]) - parseInt(b[0]))
  .forEach(([len, count]) => {
    console.log(`${len} letters: ${count} words`);
  });

Real-World Example: War and Peace

Analyze Tolstoy's War and Peace:

const [totalWords, uniqueWords] = await Promise.all([
  // Total words
  read("warandpeace.txt.gz")
    .transform(new DecompressionStream("gzip"))
    .lines
    .flatMap(line => line.match(/\w+/g) || [])
    .count(),
  
  // Unique words
  read("warandpeace.txt.gz")
    .transform(new DecompressionStream("gzip"))
    .lines
    .flatMap(line => line.toLowerCase().match(/\w+/g) || [])
    .reduce((acc, word) => {
      acc.add(word);
      return acc;
    }, new Set())
    .then(set => set.size)
]);

console.log(`Total words: ${totalWords.toLocaleString()}`);
console.log(`Unique words: ${uniqueWords.toLocaleString()}`);
console.log(`Vocabulary richness: ${(uniqueWords / totalWords * 100).toFixed(1)}%`);

Performance Comparison

Shell Pipeline (fast)

// Uses native Unix tools
const count = await run("cat", "book.txt")
  .run("wc", "-w")
  .lines.first;

JavaScript (flexible)

// More control, type-safe
const count = await read("book.txt")
  .lines
  .flatMap(line => line.match(/\w+/g) || [])
  .count();

Hybrid (best of both)

// Use Unix tools for heavy lifting, JavaScript for logic
const words = await run("cat", "book.txt")
  .run("tr", "-cs", "A-Za-z", "\n")
  .lines
  .filter(word => word.length > 5)  // JavaScript filter
  .count();

Next Steps

Processing Log Files

Analyze logs efficiently, even huge ones.

Count Errors

import { read } from "jsr:@j50n/proc@0.23.3";

const errorCount = await read("app.log")
  .lines
  .filter(line => line.includes("ERROR"))
  .count();

console.log(`${errorCount} errors found`);

Group by Error Type

const errorTypes = await read("app.log")
  .lines
  .filter(line => line.includes("ERROR"))
  .reduce((acc, line) => {
    const match = line.match(/ERROR: (\w+)/);
    const type = match ? match[1] : "unknown";
    acc[type] = (acc[type] || 0) + 1;
    return acc;
  }, {});

console.log("Errors by type:");
Object.entries(errorTypes)
  .sort((a, b) => b[1] - a[1])
  .forEach(([type, count]) => {
    console.log(`  ${type}: ${count}`);
  });

Extract Timestamps

const errors = await read("app.log")
  .lines
  .filter(line => line.includes("ERROR"))
  .map(line => {
    const timestamp = line.match(/\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}/)?.[0];
    const message = line.split("ERROR:")[1]?.trim();
    return { timestamp, message };
  })
  .collect();

Find Patterns

const suspiciousIPs = await read("access.log")
  .lines
  .map(line => {
    const ip = line.match(/\d+\.\d+\.\d+\.\d+/)?.[0];
    const status = line.match(/HTTP\/\d\.\d" (\d+)/)?.[1];
    return { ip, status };
  })
  .filter(entry => entry.status === "404")
  .reduce((acc, entry) => {
    if (entry.ip) {
      acc[entry.ip] = (acc[entry.ip] || 0) + 1;
    }
    return acc;
  }, {});

// Show IPs with > 100 404s
Object.entries(suspiciousIPs)
  .filter(([_, count]) => count > 100)
  .forEach(([ip, count]) => {
    console.log(`${ip}: ${count} 404s`);
  });

Time-Based Analysis

const errorsByHour = await read("app.log")
  .lines
  .filter(line => line.includes("ERROR"))
  .reduce((acc, line) => {
    const hour = line.match(/T(\d{2}):/)?.[1];
    if (hour) {
      acc[hour] = (acc[hour] || 0) + 1;
    }
    return acc;
  }, {});

console.log("Errors by hour:");
Object.entries(errorsByHour)
  .sort((a, b) => a[0].localeCompare(b[0]))
  .forEach(([hour, count]) => {
    console.log(`${hour}:00 - ${count} errors`);
  });

Multiple Log Files

import { enumerate } from "jsr:@j50n/proc@0.23.3";

const files = ["app1.log", "app2.log", "app3.log"];

const results = await enumerate(files)
  .concurrentMap(async (file) => {
    const errors = await read(file)
      .lines
      .filter(line => line.includes("ERROR"))
      .count();
    return { file, errors };
  }, { concurrency: 3 })
  .collect();

results.forEach(({ file, errors }) => {
  console.log(`${file}: ${errors} errors`);
});

Compressed Logs

const errors = await read("app.log.gz")
  .transform(new DecompressionStream("gzip"))
  .lines
  .filter(line => line.includes("ERROR"))
  .take(10)
  .collect();

Real-Time Monitoring

// Scan the log, flagging errors and warnings as each line streams in.
// Note: read() stops at the current end of file; see below for following a growing log.
for await (const line of read("app.log").lines) {
  if (line.includes("ERROR")) {
    console.error(`🔴 ${line}`);
  } else if (line.includes("WARN")) {
    console.warn(`⚠️  ${line}`);
  }
}
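
To keep following a file as new lines are appended, one option is to lean on tail (assuming it is installed), since run() streams its output line by line:

import { run } from "jsr:@j50n/proc@0.23.3";

// tail -F keeps the stream open and survives log rotation.
for await (const line of run("tail", "-F", "app.log").lines) {
  if (line.includes("ERROR")) {
    console.error(`🔴 ${line}`);
  }
}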

Next Steps

Decompressing Files

Process compressed files without creating temporary files. Stream everything.

Decompress and Count Lines

import { read } from "jsr:@j50n/proc@0.23.3";

const lineCount = await read("war-and-peace.txt.gz")
  .transform(new DecompressionStream("gzip"))
  .lines
  .count();

console.log(`${lineCount} lines`);

What's happening:

  • read() opens the file as a stream of bytes
  • .transform() pipes through the decompression stream
  • .lines converts bytes to text lines
  • .count() counts them

All streaming. No temp files. Constant memory usage.

Search in Compressed File

import { read } from "jsr:@j50n/proc@0.23.3";

const matches = await read("logs.gz")
  .transform(new DecompressionStream("gzip"))
  .lines
  .filter(line => line.includes("ERROR"))
  .collect();

console.log(`Found ${matches.length} errors`);

Process Multiple Compressed Files

import { read } from "jsr:@j50n/proc@0.23.3";

const files = ["log1.gz", "log2.gz", "log3.gz"];

for (const file of files) {
  const errors = await read(file)
    .transform(new DecompressionStream("gzip"))
    .lines
    .filter(line => line.includes("ERROR"))
    .count();
  
  console.log(`${file}: ${errors} errors`);
}

Decompress and Transform

import { read } from "jsr:@j50n/proc@0.23.3";

const data = await read("data.json.gz")
  .transform(new DecompressionStream("gzip"))
  .lines
  .map(line => JSON.parse(line))
  .filter(obj => obj.status === "active")
  .collect();

Supported Formats

The Web Streams API supports:

  • gzip - .gz files
  • deflate - zlib-wrapped deflate data (RFC 1950)
  • deflate-raw - Raw deflate

// Gzip
.transform(new DecompressionStream("gzip"))

// Deflate
.transform(new DecompressionStream("deflate"))

// Deflate-raw
.transform(new DecompressionStream("deflate-raw"))

Compress Output

You can also compress:

import { read, concat } from "jsr:@j50n/proc@0.23.3";

const compressed = await read("large-file.txt")
  .transform(new CompressionStream("gzip"))
  .collect();

await Deno.writeFile("large-file.txt.gz", concat(compressed));

Real-World Example: Log Analysis

Analyze compressed logs without extracting them:

import { read } from "jsr:@j50n/proc@0.23.3";

interface LogEntry {
  timestamp: string;
  level: string;
  message: string;
}

const errors = await read("app.log.gz")
  .transform(new DecompressionStream("gzip"))
  .lines
  .map(line => {
    const [timestamp, level, ...message] = line.split(" ");
    return { timestamp, level, message: message.join(" ") };
  })
  .filter(entry => entry.level === "ERROR")
  .collect();

console.log(`Found ${errors.length} errors`);
errors.slice(0, 10).forEach(e => {
  console.log(`${e.timestamp}: ${e.message}`);
});

Performance Tips

Stream, Don't Collect

// ❌ Loads entire file into memory
const lines = await read("huge.gz")
  .transform(new DecompressionStream("gzip"))
  .lines
  .collect();

// ✅ Processes one line at a time
for await (const line of read("huge.gz")
  .transform(new DecompressionStream("gzip"))
  .lines) {
  process(line);
}

Use Concurrent Processing

Process multiple files in parallel:

import { enumerate } from "jsr:@j50n/proc@0.23.3";

const files = ["log1.gz", "log2.gz", "log3.gz"];

const results = await enumerate(files)
  .concurrentMap(async (file) => {
    const errors = await read(file)
      .transform(new DecompressionStream("gzip"))
      .lines
      .filter(line => line.includes("ERROR"))
      .count();
    return { file, errors };
  }, { concurrency: 3 })
  .collect();

Why This Is Better

Traditional approach:

# Extract first
gunzip file.gz
# Then process
grep ERROR file
# Clean up
rm file

proc approach:

// One step, no temp files
await read("file.gz")
  .transform(new DecompressionStream("gzip"))
  .lines
  .filter(line => line.includes("ERROR"))
  .forEach(console.log);

Faster, cleaner, more memory-efficient.

Next Steps

Parallel Downloads

Download multiple files concurrently with controlled concurrency.

Basic Example

import { enumerate } from "jsr:@j50n/proc@0.23.3";

const urls = [
  "https://example.com/file1.json",
  "https://example.com/file2.json",
  "https://example.com/file3.json",
  // ... more URLs
];

const results = await enumerate(urls)
  .concurrentMap(async (url) => {
    const response = await fetch(url);
    if (!response.ok) {
      throw new Error(`Failed to fetch ${url}: ${response.status}`);
    }
    return {
      url,
      data: await response.json(),
      size: response.headers.get("content-length")
    };
  }, { concurrency: 5 })
  .collect();

console.log(`Downloaded ${results.length} files`);

Download and Save Files

Download files and save them to disk:

import { enumerate } from "jsr:@j50n/proc@0.23.3";

const downloads = [
  { url: "https://example.com/image1.jpg", path: "./downloads/image1.jpg" },
  { url: "https://example.com/image2.jpg", path: "./downloads/image2.jpg" },
  { url: "https://example.com/image3.jpg", path: "./downloads/image3.jpg" },
];

await enumerate(downloads)
  .concurrentMap(async ({ url, path }) => {
    const response = await fetch(url);
    const blob = await response.blob();
    const buffer = await blob.arrayBuffer();
    await Deno.writeFile(path, new Uint8Array(buffer));
    console.log(`Downloaded: ${path}`);
    return path;
  }, { concurrency: 3 })
  .collect();

console.log("All downloads complete");

With Progress Tracking

Track download progress:

import { enumerate } from "jsr:@j50n/proc@0.23.3";

let completed = 0;
const total = urls.length;

const results = await enumerate(urls)
  .concurrentMap(async (url) => {
    const response = await fetch(url);
    const data = await response.json();
    
    completed++;
    console.log(`Progress: ${completed}/${total} (${Math.round(completed/total*100)}%)`);
    
    return { url, data };
  }, { concurrency: 5 })
  .collect();

With Retry Logic

Retry failed downloads:

import { enumerate } from "jsr:@j50n/proc@0.23.3";

async function fetchWithRetry(url: string, maxRetries = 3): Promise<Response> {
  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    try {
      const response = await fetch(url);
      if (response.ok) return response;
      
      if (attempt === maxRetries) {
        throw new Error(`Failed after ${maxRetries} attempts: ${response.status}`);
      }
    } catch (error) {
      if (attempt === maxRetries) throw error;
      
      // Exponential backoff
      const delay = Math.min(1000 * Math.pow(2, attempt - 1), 10000);
      console.log(`Retry ${attempt}/${maxRetries} for ${url} after ${delay}ms`);
      await new Promise(resolve => setTimeout(resolve, delay));
    }
  }
  throw new Error("Unreachable");
}

const results = await enumerate(urls)
  .concurrentMap(async (url) => {
    const response = await fetchWithRetry(url);
    return await response.json();
  }, { concurrency: 5 })
  .collect();

Download Large Files

Stream large files to disk without loading into memory:

import { enumerate } from "jsr:@j50n/proc@0.23.3";

const largeFiles = [
  { url: "https://example.com/large1.zip", path: "./large1.zip" },
  { url: "https://example.com/large2.zip", path: "./large2.zip" },
];

await enumerate(largeFiles)
  .concurrentMap(async ({ url, path }) => {
    const response = await fetch(url);
    if (!response.body) throw new Error("No response body");
    
    // truncate so a re-run overwrites any existing partial file
    const file = await Deno.open(path, { write: true, create: true, truncate: true });
    await response.body.pipeTo(file.writable);
    
    console.log(`Downloaded: ${path}`);
    return path;
  }, { concurrency: 2 })
  .collect();

API Rate Limiting

Respect API rate limits:

import { enumerate } from "jsr:@j50n/proc@0.23.3";

const apiEndpoints = [
  "/api/users/1",
  "/api/users/2",
  // ... 100 more
];

// Add delay between requests
await enumerate(apiEndpoints)
  .concurrentMap(async (endpoint) => {
    const response = await fetch(`https://api.example.com${endpoint}`);
    const data = await response.json();
    
    // Wait 100ms after each request (at most 10 requests/second)
    await new Promise(resolve => setTimeout(resolve, 100));
    
    return data;
  }, { concurrency: 1 })  // Sequential to respect rate limit
  .collect();

Filter Failed Downloads

Continue even if some downloads fail:

import { enumerate } from "jsr:@j50n/proc@0.23.3";

const results = await enumerate(urls)
  .concurrentMap(async (url) => {
    try {
      const response = await fetch(url);
      if (!response.ok) return null;
      return { url, data: await response.json() };
    } catch (error) {
      console.error(`Failed to download ${url}:`, error.message);
      return null;
    }
  }, { concurrency: 5 })
  .filter(result => result !== null)
  .collect();

console.log(`Successfully downloaded ${results.length}/${urls.length} files`);

When to Use

Use parallel downloads when:

  • You have multiple independent files to fetch
  • Network latency is the bottleneck
  • The server can handle concurrent requests
  • You want to minimize total download time

Choose concurrency based on:

  • Server rate limits (respect them!)
  • Your network bandwidth
  • Server capacity

Start with 3-5 and adjust based on results, as in the sketch below.
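
To compare settings empirically, here is a minimal sketch (the CONCURRENCY environment variable is hypothetical; it needs --allow-env plus --allow-net) that times one run at a chosen concurrency:

import { enumerate } from "jsr:@j50n/proc@0.23.3";

// Hypothetical: the same kind of URL list as in the examples above
const urls = [
  "https://example.com/file1.json",
  "https://example.com/file2.json",
];

// Read the concurrency level from the environment so runs are easy to compare
const concurrency = Number(Deno.env.get("CONCURRENCY") ?? "4");

const start = performance.now();
const results = await enumerate(urls)
  .concurrentMap(async (url) => (await fetch(url)).json(), { concurrency })
  .collect();
const elapsed = Math.round(performance.now() - start);

console.log(`Fetched ${results.length} files in ${elapsed}ms at concurrency=${concurrency}`);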

Next Steps

Shell Script Replacement

Replace Bash scripts with type-safe Deno.

Why Replace Shell Scripts?

Shell scripts suffer from:

  • Hard-to-debug failures
  • No type safety
  • Limited error handling
  • Platform-specific quirks

proc gives you:

  • Full TypeScript
  • IDE support
  • Proper error handling
  • Cross-platform scripts

Common Patterns

File Operations

Bash:

#!/bin/bash
for file in *.txt; do
  wc -l "$file"
done

proc:

#!/usr/bin/env -S deno run --allow-read --allow-run
import { run } from "jsr:@j50n/proc@0.23.3";

for await (const entry of Deno.readDir(".")) {
  if (entry.name.endsWith(".txt")) {
    const count = await run("wc", "-l", entry.name).lines.first;
    console.log(count);
  }
}

Process Logs

Bash:

#!/bin/bash
grep ERROR app.log | wc -l

proc:

#!/usr/bin/env -S deno run --allow-read --allow-run
import { read } from "jsr:@j50n/proc@0.23.3";

const errors = await read("app.log")
  .lines
  .filter(line => line.includes("ERROR"))
  .count();

console.log(`${errors} errors`);

Backup Script

Bash:

#!/bin/bash
tar -czf backup-$(date +%Y%m%d).tar.gz /data

proc:

#!/usr/bin/env -S deno run --allow-read --allow-run
import { run } from "jsr:@j50n/proc@0.23.3";

const date = new Date().toISOString().split("T")[0].replace(/-/g, "");
await run("tar", "-czf", `backup-${date}.tar.gz`, "/data").toStdout();

System Monitoring

Bash:

#!/bin/bash
while true; do
  df -h | grep /dev/sda1
  sleep 60
done

proc:

#!/usr/bin/env -S deno run --allow-run
import { run, sleep } from "jsr:@j50n/proc@0.23.3";

while (true) {
  const usage = await run("df", "-h")
    .lines
    .find(line => line.includes("/dev/sda1"));
  
  console.log(usage);
  await sleep(60000);  // sleep() is exported from proc
}

Real Script Example

Complete deployment script:

#!/usr/bin/env -S deno run --allow-all
import { run } from "jsr:@j50n/proc@0.23.3";

console.log("🚀 Deploying application...");

try {
  // Pull latest code
  console.log("📥 Pulling latest code...");
  await run("git", "pull").toStdout();
  
  // Install dependencies
  console.log("📦 Installing dependencies...");
  await run("npm", "install").toStdout();
  
  // Run tests
  console.log("🧪 Running tests...");
  await run("npm", "test").toStdout();
  
  // Build
  console.log("🔨 Building...");
  await run("npm", "run", "build").toStdout();
  
  // Restart service
  console.log("🔄 Restarting service...");
  await run("systemctl", "restart", "myapp").toStdout();
  
  console.log("✅ Deployment complete!");
} catch (error) {
  console.error("❌ Deployment failed:", error.message);
  Deno.exit(1);
}

Benefits

  1. Type Safety - Catch errors before running
  2. IDE Support - Autocomplete and refactoring
  3. Error Handling - Proper try-catch
  4. Debugging - Use debugger, breakpoints
  5. Testing - Write unit tests (see the test sketch below)
  6. Portability - Works on any platform with Deno

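Here is a minimal sketch of benefit 5, assuming you factor script logic into small functions (countErrors is a hypothetical helper); it uses Deno's built-in test runner and jsr:@std/assert:

import { read } from "jsr:@j50n/proc@0.23.3";
import { assertEquals } from "jsr:@std/assert";

// Hypothetical helper extracted from a log-processing script
export async function countErrors(path: string): Promise<number> {
  return await read(path).lines
    .filter(line => line.includes("ERROR"))
    .count();
}

Deno.test("countErrors counts ERROR lines", async () => {
  const tmp = await Deno.makeTempFile({ suffix: ".log" });
  await Deno.writeTextFile(tmp, "ok\nERROR one\nok\nERROR two\n");
  assertEquals(await countErrors(tmp), 2);
  await Deno.remove(tmp);
});

Run it with deno test --allow-read --allow-write. For debugging (benefit 4), deno run --inspect-brk script.ts pauses on the first line so you can attach the standard V8 inspector.
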
Making Scripts Executable

Keep the shebang line shown in the examples above as the first line of the script, then:

chmod +x script.ts
./script.ts

Next Steps

API Reference

Complete API documentation is auto-generated from the source code using Deno's documentation tool.

📚 View Full API Documentation

The API documentation includes:

  • All exported functions - Complete signatures and descriptions
  • All classes and interfaces - Full type information
  • All methods and properties - Detailed documentation
  • Type definitions - Complete TypeScript types
  • Examples - Code examples from JSDoc

Core Functions

  • run() - Run a child process
  • enumerate() - Wrap an iterable
  • read() - Read a file

Classes

  • Enumerable - Array-like methods for async iterables
  • ProcessEnumerable - Process-specific enumerable
  • Process - Process management

Error Types

  • ExitCodeError - Thrown when a process exits with a non-zero code

Utilities

  • range() - Generate number ranges
  • zip() - Combine iterables
  • concat() - Concatenate byte arrays
  • cache() - Cache iterable results

Using the API Docs

The generated documentation includes:

Search

Use the search box to find any function, class, or type.

Type Information

Click on any type to see its definition and usage.

Examples

Most functions include working code examples.

Source

Click "Source" to view the implementation.

Integration with This Guide

This user guide provides:

  • Conceptual explanations - Why and when to use features
  • Tutorials - Step-by-step learning
  • Recipes - Real-world solutions
  • Best practices - How to use effectively

The API reference provides:

  • Complete signatures - Exact function parameters
  • Type definitions - TypeScript types
  • Technical details - Implementation specifics
  • All exports - Everything available

Use both together for complete understanding!

Keeping Docs Updated

The API documentation is regenerated every time the site is built, so it's always in sync with the code.

To regenerate manually:

deno doc --html --name="proc" --output=./site/src/api-docs ./mod.ts

Next Steps

Migration Guide

Migrating from other tools to proc.

From Deno.Command

Before:

const command = new Deno.Command("ls", { args: ["-la"] });
const output = await command.output();
const text = new TextDecoder().decode(output.stdout);

After:

import { run } from "jsr:@j50n/proc@0.23.3";
const lines = await run("ls", "-la").lines.collect();

From Shell Scripts

See Shell Script Replacement for detailed examples.

Key Differences

  • Properties vs methods: .lines not .lines()
  • Always consume output to avoid leaks
  • Errors propagate through pipelines
  • Use enumerate() then .enum() for indices (see the sketch below)
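
A minimal sketch pulling these differences together (ls is only a placeholder command; an error from any line would surface at a single try-catch around the whole block):

import { enumerate, run } from "jsr:@j50n/proc@0.23.3";

// .lines is a property, not a method
const listing = await run("ls", "-la").lines.collect();

// Consume output before checking status, or the handle stays open
const p = run("ls");
await p.lines.collect();
const status = await p.status;

// Indices come from .enum(), not from enumerate() itself
const indexed = await enumerate(["a", "b", "c"]).enum().collect();

console.log(listing.length, status.code, indexed);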

See Also

Frequently Asked Questions

General

What is proc?

proc is a Deno library for running child processes and working with async iterables. It gives you Array-like methods (map, filter, reduce) for streaming data, with error handling that actually makes sense.

Why should I use proc instead of Deno.Command?

Deno.Command is low-level and requires manual stream handling. proc gives you:

  • Automatic resource management
  • Natural error propagation
  • Array-like methods for data processing
  • Process pipelines that feel like shell pipes
  • Streaming by default

Is proc production-ready?

Yes! proc is stable, actively maintained, and used in production. The API is mature and unlikely to have breaking changes.

Does proc work with Node.js?

No, proc is Deno-only. It uses Deno-specific APIs like Deno.Command and requires Deno's permission system.

Usage

Why do I get "resource leak" errors?

You must consume process output. Unconsumed output keeps the process handle open:

// ❌ Resource leak
const p = run("ls");

// ✅ Consume output
await run("ls").lines.collect();

Is .lines a method or property?

Property. Use .lines not .lines():

// ✅ Correct
run("ls").lines;

// ❌ Wrong
run("ls").lines();

Same for .status, .first, .last.
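
For example (file.txt is just a placeholder):

import { run } from "jsr:@j50n/proc@0.23.3";

const first = await run("cat", "file.txt").lines.first;  // ✅ property
const last = await run("cat", "file.txt").lines.last;    // ✅ property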

How do I check exit code without throwing?

Consume output first, then check .status:

const p = run("command");
await p.lines.collect(); // Consume first
const status = await p.status; // Then check

if (status.code !== 0) {
  console.error("Failed");
}

Why doesn't enumerate() add indices?

enumerate() wraps an iterable. Use .enum() to add indices:

const result = await enumerate(["a", "b", "c"])
  .enum() // This adds indices
  .map(([item, i]) => `${i}: ${item}`)
  .collect();

How do I pipe processes together?

Use .run() method:

await run("cat", "file.txt")
  .run("grep", "pattern")
  .run("wc", "-l")
  .lines.first;

Can I use shell syntax like ls -la?

No, arguments must be separate:

// ✅ Correct
run("ls", "-la");

// ❌ Wrong
run("ls -la");

Error Handling

Do I need try-catch at every step?

No! That's the whole point. Errors propagate through the pipeline:

try {
  await run("cmd1")
    .run("cmd2")
    .run("cmd3")
    .lines.forEach(process);
} catch (error) {
  // All errors caught here
}

What happens when a process fails?

By default, non-zero exit codes throw ExitCodeError. You can catch it:

try {
  await run("false").lines.collect();
} catch (error) {
  if (error instanceof ExitCodeError) {
    console.error(`Exit code: ${error.code}`);
  }
}

Can I customize error handling?

Yes, use the fnError option. See Custom Error Handling.

Performance

Is proc fast?

Yes! proc is streaming by default, which means:

  • Constant memory usage, even for huge files
  • Concurrent process execution
  • Lazy evaluation (only runs when consumed; see the sketch below)
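
For example, building a pipeline is cheap; the work happens only when you consume it (a small sketch reusing the app.log example from earlier):

import { read } from "jsr:@j50n/proc@0.23.3";

// No I/O happens here: the pipeline is just a description of the work
const errors = read("app.log")
  .lines
  .filter(line => line.includes("ERROR"));

// The file is read, and the filter runs, only when the pipeline is consumed
console.log(`${await errors.count()} errors`);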

How do I process large files?

Stream them:

// Processes 10GB file with constant memory
for await (const line of read("huge.txt").lines) {
  process(line);
}

Can I process files in parallel?

Yes, use concurrentMap:

await enumerate(files)
  .concurrentMap(async (file) => {
    return await processFile(file);
  }, { concurrency: 5 })
  .collect();

Troubleshooting

My process hangs

You probably didn't consume the output:

// ❌ Hangs
const p = run("command");
await p.status; // Waiting for output to be consumed

// ✅ Works
const p = run("command");
await p.lines.collect(); // Consume first
await p.status;

I get type errors

Check if you're using properties as methods:

// ❌ Type error
run("ls").lines();

// ✅ Correct
run("ls").lines;

DecompressionStream type error

Add a type cast:

.transform(new DecompressionStream("gzip") as TransformStream<Uint8Array, Uint8Array>)

Or use --no-check flag.

Permission denied errors

Grant the necessary permissions:

deno run --allow-run --allow-read your-script.ts

Comparison

proc vs Deno.Command

Feature          Deno.Command   proc
Boilerplate      High           Low
Error handling   Manual         Automatic
Streaming        Manual         Built-in
Pipelines        Manual         .run()
Array methods    No             Yes

proc vs shell scripts

Feature          Shell      proc
Type safety      No         Yes
Error handling   Manual     Automatic
IDE support      Limited    Full
Debugging        Hard       Easy
Portability      Limited    Cross-platform

Getting Help

Where can I find examples?

See the recipes and tutorials in this guide, plus the code examples embedded in the API documentation.

How do I report bugs?

File an issue on GitHub.

Is there a Discord/Slack?

Not currently. Use GitHub issues for questions and discussions.

Contributing

Can I contribute?

Yes! Contributions are welcome. See the repository for guidelines.

How can I help?

  • Report bugs
  • Improve documentation
  • Add examples
  • Fix issues

Miscellaneous

Why "proc"?

Short for "process". Easy to type, easy to remember.

Who maintains proc?

proc is maintained by @j50n and contributors.

What's the license?

MIT License. Use it freely.

Can I use proc in commercial projects?

Yes! MIT license allows commercial use.