proc 0.21.9

proc lets you use child processes with AsyncIterable instead of the streams API, and it includes a library of higher-order functions for AsyncIterable (via Enumerable) that roughly matches what you can do with an array (map, filter, find), but for asynchronous code.

proc simplifies the process of converting a bash script into a Deno application. The intention is to make writing code that uses lots of IO and child processes almost as easy as shell scripting, but you also get proper error handling, type checking, and Deno's security-by-default.

Developer Documentation

Usage

import { run } from "https://deno.land/x/proc@0.21.9/mod.ts";

A Simple Example

Run ls -la as a child process. Decode stdout as lines of text. Print to console.

await run("ls", "-la").toStdout();

A Better Example

Don't worry about understanding everything in this example yet. This shows a little of what is possible using proc.

Given the text of War and Peace:

  • Read the file into an AsyncIterable of Uint8Array.
  • Uncompress it (the file is GZ'd).
  • Convert to lowercase using JavaScript, because the JavaScript conversion is more correct than the one in tr.
  • grep out all the words on word boundaries.
  • tee this into two streams (AsyncIterable of Uint8Array) of words.
    • Count the total number of words.
    • Use sort with uniq to count the unique words.

const [words1, words2] = read(
  fromFileUrl(import.meta.resolve("./warandpeace.txt.gz")),
)
  .transform(gunzip)
  .lines
  .map((line) => line.toLocaleLowerCase())
  .run({ buffer: true }, "grep", "-oE", "(\\w|')+")
  .tee();

const [uniqueWords, totalWords] = await Promise.all([
  words1.run("sort").run("uniq").lines.count(),
  words2.lines.count(),
]);

console.log(`Total:  ${totalWords.toLocaleString()}`);
console.log(`Unique: ${uniqueWords.toLocaleString()}`);

Up to the point where we run Promise.all, this is asynchronous, streaming, lazily evaluated code. It is trivially running three child processes (grep, sort, and uniq), a DecompressionStream transform, and in-process logic to normalize to lower-case. This is all happening concurrently, mostly in parallel, one buffer, one line, or one word at a time.

Running a Process

proc lets you run a process from Deno with as little boilerplate as possible.

import { run } from "https://deno.land/x/proc@0.21.9/mod.ts";

To ls -la:

await run("ls", "-la").toStdout();

To capture the lines as an array:

const lines: string[] = await run("ls", "-la").lines.collect();

Create a Command Programmatically

import { Cmd, run } from "https://deno.land/x/proc@0.21.9/mod.ts";

A command requires that the first parameter be defined and that it be either a string or a URL. Additional parameters are string values. This doesn't quite fit the signature of an ordinary string array, so use Cmd as the type of the array. This can be spread into run.

// Assume options.all is a defined boolean.

const cmd: Cmd = ["ls"];
if (options.all) {
  cmd.push("-la");
}

await run(...cmd).toStdout();

The command array is type Cmd, not string[]. You need to declare this explicitly.

Output

Process standard output, or stdout, is an AsyncIterable<Uint8Array>.

This can be efficiently piped to another process with run:

// Count the words and print the result.
await run("echo", "Hello, world.").run("wc", "-w").toStdout();

You can't assume much about the data you are receiving from a process. It may be written out line by line, or it may be in large or small chunks.

await run("echo", "Hello, world.").forEach((it) => console.dir(it));

// Uint8Array(14) [
//    72, 101, 108, 108, 111,
//    44,  32, 119, 111, 114,
//   108, 100,  46,  10
// ]

That's not very useful. Let's try again, converting to text.

await run("echo", "Hello,\nworld.").lines.forEach((it) => console.dir(it));

// Hello,
// world.

To convert the lines to an array, collect them.

const data: string[] = await run("echo", "Hello,\nworld.").lines.collect();
console.dir(data);

// [ "Hello,", "world." ]

If you just want to dump the output from the child process to stdout, there is an easy way to do that.

await run("echo", "Hello, world.").toStdout();

// Hello, world.

Input

proc supports standard input (stdin) of processes as AsyncIterable<Uint8Array | Uint8Array[] | string | string[]>. This means that you can pass in text data or byte data.

Note that for every string value (including each string in a string[]), proc will insert a line-feed character. This is not done for byte data in Uint8Array form, of course. If you need to use text data without the automatic line-feed characters, you will need to convert to bytes.
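
For example, here is a sketch of passing text through without the added line feeds by encoding it to bytes yourself; wc -c reports 13 because no newline is appended to the Uint8Array data:

// Encode the string to utf-8 bytes ourselves so that no "\n" is added.
await enumerate(["Hello, world."])
  .map((text) => new TextEncoder().encode(text))
  .run("wc", "-c")
  .toStdout();

// 13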

enumerate is a wrapper function that creates an AsyncIterable with higher-order functions. In the example, I am using it to iterate over a few Uint8Array instances that, together, spell out "Hello, world." This is providing stdin to wc -w.

// Count the words in "Hello, world."
await enumerate([
  new Uint8Array([72, 101, 108, 108, 111, 44, 32]),
  new Uint8Array([119, 111, 114, 108, 100, 46, 10]),
]).run("wc", "-w").toStdout();

// 2

This also works with text data. proc converts strings to bytes automatically.

await enumerate(["Hello, world."]).run("wc", "-w").toStdout();

// 2

Stderr and Error Handling

Standard input and standard output from a process are handled directly as iterable data. There is a third data stream, standard error, that is a bit of an outlier. Standard error is meant to be used either purely for error text from the process or for some combination of logging and errors.

We are going to discuss how to handle standard error and how this relates to error handling in the proc library. There are examples if you want to skip ahead.

Default behavior of stderr and errors:

  • all process stderr will be written to Deno.stderr
  • any exit code other than 0 will throw an ExitCodeError
  • if the process ends due to a signal, it will throw a SignalError
  • an error coming from upstream (stdin) will be wrapped in an UpstreamError

While the default behaviors are usually adequate, these can be overridden. There is no standard for standard error, so it may take some effort to get the results you want.

Taking Control of Stderr

You can capture stderr by defining fnStderr in the process options. This example adds a timestamp and colors the stderr text red.

// Note: `gray` and `red` are from the Deno std library "fmt/colors" module.
const decoratedStderr: ProcessOptions<void> = {
  fnStderr: async (stderr) => {
    for await (const line of stderr.lines) {
      console.error(`${gray(new Date().toISOString())} ${red(line)}`);
    }
  },
};

await run(
  { ...decoratedStderr },
  "bash",
  "-c",
  `
    echo "This goes to stderr." >&2 
    echo "This goes to stdout."
  `,
).toStdout();

Reinterpreting Process Errors

You can catch the ExitCodeError (or SignalError) thrown by a failed process and rethrow it as something more meaningful to your application, without scraping stderr at all.
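
A minimal sketch of the pattern, assuming ExitCodeError is exported from proc's mod.ts; the replacement error is just an illustration:

import { ExitCodeError, run } from "https://deno.land/x/proc@0.21.9/mod.ts";

try {
  await run("ls", "/no-such-directory").toStdout();
} catch (e) {
  if (e instanceof ExitCodeError) {
    // Reinterpret the generic exit-code error as an application-specific error.
    throw new Error("directory listing failed", { cause: e });
  }
  throw e;
}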

Throwing Errors based on Stderr

You can scrape stderr and use what you find to throw a more specific error. Keep the "contract" with the process in mind: every line of stderr should end up somewhere - printed, logged, or collected - so that nothing gets dropped. Throw the error at the end, once all lines have been processed.
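
A sketch of the simple version: collect stderr lines with fnStderr (while still echoing them), then build a better error from what was collected when run rejects. The bash command and the "ERROR:" convention are made up for illustration, and we assume the stderr handler has finished by the time run settles.

import { run } from "https://deno.land/x/proc@0.21.9/mod.ts";

// Collect stderr lines while still echoing them, so nothing is dropped.
const stderrLines: string[] = [];

try {
  await run(
    {
      fnStderr: async (stderr) => {
        for await (const line of stderr.lines) {
          console.error(line); // Keep the contract: every line goes somewhere.
          stderrLines.push(line);
        }
      },
    },
    "bash",
    "-c",
    `echo "ERROR: something broke" >&2; exit 1`,
  ).toStdout();
} catch (e) {
  // Once all lines have been processed, use them to throw a more specific error.
  const message = stderrLines.find((line) => line.startsWith("ERROR:"));
  throw new Error(message ?? "process failed", { cause: e });
}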

Throwing Errors based on Stderr (Advanced)

The full version of the pattern: scrape stderr, keep every line, and build the error from everything collected once the process has exited.

Reading Data

enumerate works with any iterable, including a ReadableStream (which is an AsyncIterable).

Reading from stdin

Deno provides Deno.stdin.readable, which gives you stdin as a ReadableStream<Uint8Array>. We can wrap this with enumerate(...) to convert it to lines of text (strings).

Text of example.ts:

import { enumerate } from "https://deno.land/x/proc@0.21.9/mod.ts";

for await (const line of enumerate(Deno.stdin.readable).lines) {
  console.log(line);
}

To print War and Peace, line by line, to console:

zcat warandpeace.txt.gz | deno run example.ts

This operation will consume stdin and close it.

Reading from File
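
proc's read function (used throughout these examples) reads a file as an AsyncIterable<Uint8Array> that you can transform, pipe through child processes, or collect. A minimal sketch, assuming read and gunzip from proc's mod.ts and fromFileUrl from Deno's std path module are in scope, using the warandpeace.txt.gz file from the other examples:

// Count the lines in the compressed file.
const lineCount = await read(
  fromFileUrl(import.meta.resolve("./warandpeace.txt.gz")),
)
  .transform(gunzip)
  .lines
  .count();

console.log(lineCount.toLocaleString());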

Performance

A few notes on performance.

Does Performance Matter?

For 90% of the code you write, the bottom line is that performance does not matter. For example, if you have some code that reads configuration on startup and dumps it into an object, that code might be complicated, but it won't matter if it runs in 10 milliseconds or 100 nanoseconds. Write clear code first and optimize once things are working. Follow this process, and you will quickly figure out which things do and don't matter.

The Cost of Iteration

We use iteration everywhere. Doing it wrong can kill your performance. Doing it right can get you close to (single threaded) C performance. This is a quick summary of what you can expect. To keep it short, I am just going to cover the high points and not show my work.

The fastest code you can write in pure JavaScript looks like asm.js. If you stick to counting for loops that index into arrays of simple values or data objects, or into typed arrays (like Uint8Array), you can expect that code to run at or near single-threaded C speed.

Expect for...of with iterables and generators to be about 10x slower. This includes array methods like map, filter, and reduce. Anything that has to call a function in a loop is going to have extra overhead.

Promise-driven asynchronous code is another 10x slower, or 100x slower than the asm.js-style code. This affects code written using proc, particularly Enumerable.
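
To make the comparison concrete, here is a sketch of the same summation written in the three styles; the 10x and 100x figures above are rough expectations, not exact ratios:

// asm.js style: a counting for loop indexing a typed array. Fastest.
function sumFast(xs: Float64Array): number {
  let total = 0;
  for (let i = 0; i < xs.length; i++) total += xs[i];
  return total;
}

// for...of over an iterable: expect roughly an order of magnitude slower.
function sumForOf(xs: Iterable<number>): number {
  let total = 0;
  for (const x of xs) total += x;
  return total;
}

// Promise-driven async iteration: expect roughly another order of magnitude slower.
async function sumAsync(xs: AsyncIterable<number>): Promise<number> {
  let total = 0;
  for await (const x of xs) total += x;
  return total;
}

const data = new Float64Array(10_000_000).fill(1);
console.log(sumFast(data), sumForOf(data));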

So does this mean you have to always use asm.js syntax? Not at all. for...of syntax and array methods make for cleaner code, and asynchronous operations are the whole reason we're here. Iteration performance is mostly about the inner loops. If your inner loops are tight, a little less efficiency in the outer loops won't matter much. Write clean code first. When things are working, look for opportunities to make it faster. Often this will mean a little profiling and rewriting a few routines in asm.js style. If you do it right, you should be able to get very good performance along with readable code.

See Async Iterators: These Promises Are Killing My Performance! on Medium, along with the supporting benchmarks in async-iteration on Github.

The Performance Overhead of JavaScript Promises and Async Await shows a couple of examples that isolate the performance difference to overhead due to promises.

sleep

sleep returns a Promise that resolves after a specified number of milliseconds.

console.log("Program starts");
await sleep(2000); // Pauses the execution for 2000 milliseconds
console.log("Program resumes after 2 seconds");

Working with Text Data

Streaming data doesn't have to be line-delimited text, but it probably will be most of the time. Many *nix tools work with this type of data or some variation of it.

Line-delimited text data is simply:

  • utf-8 encoded bytes
  • logically separated into lines with \n or alternately \r\n (Windows style) characters

Here is how you process text data in proc.

UTF-8 Lines

This is the "normal" way to work with line-delimited text. It should be a good solution most of the time.

The lines property converts the output to text, a line at a time.

await run("ls", "-la")
  .lines
  .forEach((it) => console.log(it));

Alternatively, you can use transform with the toLines transformer function.

await read(resolve("./warandpeace.txt.gz"))
  .transform(toLines)
  .forEach((it) => console.log(it));

The Enumerable.run method will automatically treat string values as lines, adding \n to them and converting back into utf-8 encoded bytes.
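
For example (a small sketch; wc -l reports 3 because each string is written to the child's stdin as its own line):

await enumerate(["one", "two", "three"]).run("wc", "-l").toStdout();

// 3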

Note that this always assumes string data passed to it is line-delimited. If that isn't the case (you may be working with buffered text data that is not delimited at all, for example), you must convert text data back to Uint8Array yourself or \n characters will be added.

Traditional Text and Lines

Deno provides a native TextDecoderStream to bulk-convert Uint8Array data into text. The boundaries are arbitrary. The characters will always be correct, but this can break within a word or within a control-character sequence. TextDecoderStream supports many standard character encodings.

To parse this data into lines, Deno provides TextLineStream. This splits the data into lines on \n and optionally \r.

These are meant to be used together to convert to text then split into lines.

The traditional stream implementations are a little slower than the utf-8-specialized transformers, but they support different character encodings and allow some flexibility in defining the split.

await read(resolve("./warandpeace.txt.gz"))
  .transform(gunzip)
  .transform(new TextDecoderStream())
  .transform(new TextLineStream())
  .map((line) => line.toLowerCase())
  .forEach((line) => console.log(line));

Note that most of the library assumes strings and arrays of strings represent line data. For text that is not divided on lines, you can use TextEncoderStream to convert back to utf-8 bytes. Note that unlike TextDecoderStream this does not support multiple encodings. This is in line with the official specification.
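
For example, a sketch that decodes the bytes to text, transforms the text without ever splitting on lines, and re-encodes it with TextEncoderStream before writing it back out:

await read(resolve("./warandpeace.txt.gz"))
  .transform(gunzip)
  .transform(new TextDecoderStream())
  .map((chunk) => chunk.toLowerCase())
  .transform(new TextEncoderStream())
  .writeTo(Deno.stdout.writable, { noclose: true });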

Not All Text Data is Text Data

There are many command-line utilities that use ANSI color and positioning sequences, as well as raw carriage returns (\r), to enhance the user experience at the console. The codes are normally interpreted by the terminal, but if you dump them to a file, you can see they make a mess. You've probably seen this in log files before.

This type of streamed text data can't be strictly interpreted as lines. You may be able to hack around the fluff. Use stripColor (Deno std library) to remove ANSI escape codes from strings. If the utility is using raw \r, you may have to deal with that as well.
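
For example, a sketch of stripping the color codes line by line; stripColor comes from the std fmt/colors module (pin whichever std version you use), and the ls flags are only there to force colored output:

import { stripColor } from "https://deno.land/std@0.204.0/fmt/colors.ts";

await run("ls", "--color=always", "-la")
  .lines
  .map((line) => stripColor(line))
  .forEach((line) => console.log(line));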

The best solution is to turn off color and progress for command-line utilities you use for processing. This is not always possible (Debian apt is a famous example of this).

Reference the ANSI escape code wiki page.

You can always get around this problem by never attempting to split on lines.

await run("apt", "install", "build-essential")
  .writeTo(Deno.stdout.writable, { noclose: true });

Transformers

proc ships with some useful transformers.

A transformer is a plain-old JavaScript function with this signature:

type Transformer<T, U> = (it: AsyncIterable<T>) => AsyncIterable<U>;

Transformers are functions (and may be defined using asynchronous generator functions). You can compose them into new functions relatively easily. The transform operation is like pipeThrough in streaming.

A transformer transforms objects from one type to another. It is like map but with complete control over the whole stream of data - including control over error handling.

You can create a transformer using an asynchronous generator. This one will transform strings to lower-case:

async function* toLower(texts: AsyncIterable<string>) {
  for await (const text of texts) {
    yield text.toLocaleLowerCase();
  }
}

Here it is in action:

const lowered = await enumerate(["A", "B", "C"])
  .transform(toLower)
  .collect();

assertEquals(lowered, ["a", "b", "c"], "Transformed to lower-case.");
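
Because transformers are plain functions, composing them is just function composition. A small sketch, assuming the gunzip and toLines transformers used elsewhere in this documentation are in scope:

// Compose three transformers into one: compressed bytes -> lines -> lower-cased lines.
function lowerCasedLines(bytes: AsyncIterable<Uint8Array>): AsyncIterable<string> {
  return toLower(toLines(gunzip(bytes)));
}

await read(fromFileUrl(import.meta.resolve("./warandpeace.txt.gz")))
  .transform(lowerCasedLines)
  .forEach((line) => console.log(line));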

Counting Words

This shell script counts total and unique words:

#!/bin/bash
set -e

# total word count
zcat ./warandpeace.txt.gz \
  | tr '[:upper:]' '[:lower:]' \
  | grep -oE "(\\w|'|’|-)+" \
  | wc -l 

#count unique words
zcat ./warandpeace.txt.gz \
  | tr '[:upper:]' '[:lower:]' \
  | grep -oE "(\\w|'|’|-)+" \
  | sort \
  | uniq \
  | wc -l

There are multiple approaches to doing the same thing in Deno using proc. You can run this in-process as a pure Typescript/JavaScript solution, run it as a shell script, or translate each command in the shell script into run methods.

⚠️ The tr used to convert to lowercase is not fully unicode compliant. Expect counts to be a little different between this code and the code that uses JavaScript's .toLocaleLowerCase(), which is fully unicode compliant.

Direct Translation from Bash

This is the equivalent of the shell script, using proc methods. It substitutes gunzip for zcat, parses each output into a number, and runs the two pipelines concurrently (and in parallel), since that is easy to do.

Otherwise, this is a direct translation where proc just controls the streaming from process to process. All the same child processes are being launched.

const [total, unique] = await Promise.all([
  read(fromFileUrl(import.meta.resolve("./warandpeace.txt.gz")))
    .run("gunzip")
    .run("tr", "[:upper:]", "[:lower:]")
    .run("grep", "-oE", "(\\w|'|’|-)+")
    .run("wc", "-l")
    .lines
    .map((n) => parseInt(n, 10))
    .first,

  read(fromFileUrl(import.meta.resolve("./warandpeace.txt.gz")))
    .run("gunzip")
    .run("tr", "[:upper:]", "[:lower:]")
    .run("grep", "-oE", "(\\w|'|’|-)+")
    .run("sort")
    .run("uniq")
    .run("wc", "-l")
    .lines
    .map((n) => parseInt(n, 10))
    .first,
]);

console.log(total);
console.log(unique);

Embedding a Shell Script

Another approach is to embed a shell script. No translation required here. This is a bash script run using /bin/bash. This moves the entire workload and its management into other processes. Consider this solution if your application is doing lots of other things concurrently.

Note that you give up some control over error handling with this approach, so be sure to test for the types of errors you think you may encounter. Shell scripts are notorious for edge-case bugs - which is why we reach for a "real" programming language when things start to get complex.

This is also a simple example of a generated script. We are injecting the full path of our text file as determined by the Deno script.

This example shows the total count.

await run(
  "/bin/bash",
  "-c",
  ` set -e
    zcat "${fromFileUrl(import.meta.resolve("./warandpeace.txt.gz"))}" \
      | tr '[:upper:]' '[:lower:]' \
      | grep -oE "(\\w|'|’|-)+" \
      | wc -l
  `,
)
  .lines
  .forEach((line) => console.log(line));

Doing All the Work in Deno

This is a streaming solution staying fully in Deno, in a single Typescript/JavaScript VM (not using child processes at all). This avoids (most of) the memory overhead that would be needed to process the document in memory (non-streaming), and it is fast.

This demonstrates transformer-composition in proc. Because transformers are just functions of iterable collections, you can compose them into logical units the same way you would any other code.

Transformer for Unique Words

We could shell out to sort and uniq, but this way is much faster. It only needs a little extra memory. It dumps the words, one at a time, into a Set. Then it yields the contents of the Set.

The set of unique words is much smaller than the original document, so the memory required is quite small.

export async function* distinct(words: AsyncIterable<string>) {
  const uniqueWords = new Set<string>();
  for await (const word of words) {
    uniqueWords.add(word);
  }
  yield* uniqueWords;
}

Transformer to Split into Words

Convert each line to lower case. Use a regular expression to split the line into words. Remove anything that contains no letters (pure symbols), anything with a number, and "CHAPTER" titles. The symbol characters in the regular expression are specific to the test document and probably won't work generally.

The document we are targeting, ./warandpeace.txt.gz, uses extended unicode letters and a few unicode symbols as well. We know that the Typescript solution below works correctly with unicode characters (note the u flag on the regular expression). Some of the *nix utilities were written a long time ago and still do not support unicode. In particular, tr does not translate case correctly all of the time, and I am not sure what grep is doing - it sort of works, but the regular expression language has subtle differences to what I am used to. A benefit of working in a tightly spec'd language like Typescript is you know what your code should be doing at all times. The counts are very close, but they are not exactly the same, so we know something is a little bit off with tr and/or grep.

export function split(lines: AsyncIterable<string>) {
  return enumerate(lines)
    .map((it) => it.toLocaleLowerCase())
    .flatMap((it) =>
      [...it.matchAll(/(\p{L}|\p{N}|['’-])+/gu)]
        .map((a) => a[0])
    )
    .filterNot((it) =>
      /^['’-]+$/.test(it) ||
      /[0-9]/.test(it) ||
      /CHAPTER/.test(it)
    );
}

Putting It All Together

Read the file. Uncompress it and convert to lines (string). Use the transformer function we created earlier, split, to split into words.

const words = read(
  fromFileUrl(import.meta.resolve("./warandpeace.txt.gz")),
)
  .transform(gunzip)
  .transform(toLines)
  .transform(split);

Now we need to get (1) a count of all words and (2) a count of unique words. We can use tee to create two copies of the stream - since we have to count twice. This gets around the limitation of being able to use an iterable only once and means we don't have to do extra work splitting the document into words two times.

const [w1, w2] = words.tee();

We can count the words in the first copy directly. For the second copy, we use the distinct transformer before counting.

const [count, unique] = await Promise.all([
  w1.count(),
  w2.transform(distinct).count(),
]);

console.log(`Total word count:  ${count.toLocaleString()}`);
console.log(`Unique word count: ${unique.toLocaleString()}`);

The results:

Total word count:  563,977
Unique word count: 18,609

Clean, readable code. Understandable error handling. Fast. The only downside is that the processing is done in-process (we only have one thread to work with in JavaScript). If you are doing other things at the same time, this will slow them down.

Concurrent Processes

proc supports concurrent operations with controlled (limited) concurrency. This is a way to run child processes in parallel without swamping your server.

If you have to work with S3 buckets, you know it is time consuming to determine how much storage space you are using/paying for, and where you are using the most storage. proc makes it possible to run aws s3 ls --summarize with parallelism matching the number of CPU cores available (or whatever concurrency you specify). The specific methods that support concurrent operations are .concurrentMap() and .concurrentUnorderedMap().

To list the s3 buckets in your AWS account from terminal:

aws s3 ls

The result looks something like this:

2013-07-11 17:08:50 mybucket
2013-07-24 14:55:44 mybucket2

Get all the bucket names in the account:

const buckets = await run("aws", "s3", "ls")
  .lines
  .map((b) => b.split(/\s+/g, 3))
  .map((b) => b[b.length - 1])
  .collect();

This is the shell command to get the total storage size in bytes from terminal:

aws s3 ls s3://mybucket --recursive --summarize

This will list all objects in the bucket, and we can ignore most of this. At the end of the operation, we are looking for a line that looks like this:

Total Size: 2.9 MiB

This is potentially a long-running operation (some buckets have a lot of objects), so we want to run this for many buckets at once, in parallel, and report the results as soon as they are available.

await enumerate(buckets).concurrentUnorderedMap(
  async (bucket) => {
    const answer: string = await run(
        "nice", "-19",
        "aws", "s3", "ls",
        `s3://${bucket}`,
        "--recursive", "--summarize")
      .lines
      .filter((line) => line.includes("Total Size:"))
      .map((line) => line.trim())
      .first;

    return { bucket, answer };
  },
).forEach(({ bucket, answer }) => console.log(`${bucket}\t${answer}`));

Use nice because this will eat your server otherwise. The method .concurrentUnorderedMap() will, by default, run one process for each CPU available concurrently until all work is done.

The result will look something like this:

mybucket  Total Size: 2.9 MiB
mybucket2 Total Size: 30.2 MiB

Input and Output

Use ReadableStream and WritableStream from Deno APIs for input and output.

Write to Stdout

Write to stdout a line at a time using console.log.

await range({ to: 3 })
  .forEach((line) => console.log(line.toString()));

Write to stdout as a WritableStream. In the case of stdout, we don't close it. To use writeTo, the data has to be in Uint8Array form. This also adds output buffering to consolidate the write operations into larger chunks.

Deno.stdout.writable is a WritableStream.

await range({ to: 10000 })
  .map((n) => n.toString())
  .transform(toBytes)
  .transform(buffer(8192))
  .writeTo(Deno.stdout.writable, { noclose: true });

Run a child process and stream output directly to stdout. This has no conversion to lines and no additional buffering, so it will also work with ANSI escape codes and positioning characters.

await run("ls", "-la")
  .writeTo(Deno.stdout.writable, { noclose: true });

Read from Stdin

Read stdin. Uncompress it and convert to lines (strings). Keep only the blank lines. Count them. Print the count.

Deno.stdin.readable is a ReadableStream which is an AsyncIterable<Uint8Array>.

console.log(
  await enumerate(Deno.stdin.readable)
    .transform(gunzip)
    .lines
    .filter((line) => line.trim().length === 0)
    .count(),
);