async-iterable-split
TypeScript icon, indicating that this package has built-in type declarations

3.0.2 • Public • Published

async-iterable-split

version downloads license node-current

English 中文文档


Split an async iterable (value is Uint8Array) into multiple "sub async iterable(iterator)"s by line, size or needle.

Installation

npm i async-iterable-split

Quick Start

1. Split by line

Suppose we have a foo.txt file with the following contents:

this is the first line
hello world
this is the last line
import { createReadStream } from "node:fs";
import { Splitable } from "async-iterable-split";

const iterable = createReadStream("./foo.txt"); // Node.js readable stream is async iterable (since v10.0.0)
const splitable = new Splitable(iterable); // Any object that deploys the [Symbol.asyncIterator] interface can be used as a parameter for instantiating Splitable

while (await splitable.hasValue()) {
  const subIterator = splitable.splitLine(); // Create a "sub async iterable(iterator)", all the data iterated from this object is one row of the original
  const chunks = [];
  for await (const chunk of subIterator) {
    chunks.push(chunk);
  }
  // When the "for await of" loop exits, all the data of a row will be iterated out
  console.dir(Buffer.concat(chunks).toString("utf-8"));
}

The output looks like this:

'this is the first line'
'hello world'
'this is the last line'

You can also use the readLine method to get a whole line at once:

import { createReadStream } from "node:fs";
import { Splitable } from "async-iterable-split";

const splitable = new Splitable(createReadStream("./foo.txt"));
while (await splitable.hasValue()) {
  const line = await splitable.readLine(); // Type of "line" is Uint8Array
  console.dir(Buffer.from(line).toString("utf-8"));
}

Line break can be either LF or CRLF.

2. Split by size

Suppose we have a foo.txt file with the following contents:

abcdefghijklmnopqrstuvwxyz
import { createReadStream } from "node:fs";
import { Splitable } from "async-iterable-split";

const splitable = new Splitable(createReadStream("./foo.txt"));

while (await splitable.hasValue()) {
  const subIterator = splitable.splitSize(10); // Create a "sub async iterable(iterator)", all the data iterated from this object is 10 bytes of the original
  const chunks = [];
  for await (const chunk of subIterator) {
    chunks.push(chunk);
  }
  console.dir(Buffer.concat(chunks).toString("utf-8"));
}

The output looks like this:

'abcdefghij'
'klmnopqrst'
'uvwxyz'

You can also use the readSize method to get fixed-size data at once:

import { createReadStream } from "node:fs";
import { Splitable } from "async-iterable-split";

const splitable = new Splitable(createReadStream("./foo.txt"));
while (await splitable.hasValue()) {
  const part = await splitable.readSize(10);
  console.dir(Buffer.from(part).toString("utf-8"));
}

3. Split by needle

Suppose we have a foo.txt file with the following contents:

foobarbaz
import { createReadStream } from "node:fs";
import { Splitable } from "async-iterable-split";

const splitable = new Splitable(createReadStream("./foo.txt"));

while (await splitable.hasValue()) {
  const subIterator = splitable.splitBeforeNeedle(Buffer.from("ba")); // Create a "sub async iterable(iterator)", all the data iterated from this object is data before "ba" of the original
  const chunks = [];
  for await (const chunk of subIterator) {
    chunks.push(chunk);
  }
  console.dir(Buffer.concat(chunks).toString("utf-8"));
}

The output looks like this:

'foo'
'r'
'z'

You can also use the readBeforeNeedle method to get all the data before needle at once:

import { createReadStream } from "node:fs";
import { Splitable } from "async-iterable-split";

const splitable = new Splitable(createReadStream("./foo.txt"));
while (await splitable.hasValue()) {
  const part = await splitable.readBeforeNeedle(Buffer.from("ba"));
  console.dir(Buffer.from(part).toString("utf-8"));
}

4. Cross-use

The above three splitting methods can be used alone or cross-used. The following is an example of parsing an HTTP request message, there are parts split by line and parts split by size:

Server:

import { createServer } from "node:net";
import { Splitable } from "async-iterable-split";

const server = createServer(async (socket) => {
  try {
    // Node.js TCP socket is also async iterable
    const splitable = new Splitable(socket);
    while (await splitable.hasValue()) {
      // httpReqParser will parses one HTTP request message at a time, one TCP connection may contain multiple HTTP messages
      console.log(await httpReqParser(splitable));
    }
  } catch (error) {
    console.log("got error", error);
    socket.destroy(error);
  }
}).listen(8888, "127.0.0.1", () => {
  console.log(server.address());
});

// This is just a simple HTTP request message parser, a production-ready parser needs more consideration.
async function httpReqParser(splitable) {
  const reqMsg = {};

  // Parse the request line, limiting the request line size to 65535 bytes
  const reqLine = Buffer.from(await splitable.readLine(65536)).toString("ascii");
  const reqInfos = reqLine.split(" ");
  reqMsg.method = reqInfos.shift();
  reqMsg.uri = reqInfos.shift();
  reqMsg.httpVersion = reqInfos.shift().replace("HTTP/", "");

  // Parse the request headers, limit the size of a single request header to 16384 bytes, and limit the count of request headers to 256
  reqMsg.headers = {};
  let headerCount = 0;
  while (await splitable.hasValue()) {
    if (headerCount > 256) {
      throw new Error("header count exceeded limit");
    }
    const header = Buffer.from(await splitable.readLine(16384)).toString("ascii");
    // If a blank line is encountered, it means the end of the request headers, jump out of the while loop
    if (header.length === 0) {
      break;
    }
    headerCount++;
    const [key, value] = header.split(":");
    reqMsg.headers[key.toLowerCase()] = value.trim();
  }

  // Parse the request body and limit the size of the request body to 1MB
  const bodySize = Number(reqMsg.headers["content-length"]) || 0;
  if (bodySize > 2 ** 20) {
    throw new Error("body size exceeded limit");
  }
  reqMsg.body = Buffer.from(await splitable.readSize(bodySize)).toString("utf-8");

  // return parsing result
  return reqMsg;
}

Client:

import { connect } from "node:net";

// Establish a TCP connection, and after successful establishment, send two consecutive HTTP request messages
const socket = connect({ host: "127.0.0.1", port: 8888 }, () => {
  socket.write(`GET / HTTP/1.1
Host: 127.0.0.1

`);

  socket.write(`POST /ping HTTP/1.1
Host: 127.0.0.1
Content-Type: text/plain; charset=utf-8
Content-Length: 8

👋ping`);
});

The server output looks like this:

{
  method: 'GET',
  uri: '/',
  httpVersion: '1.1',
  headers: { host: '127.0.0.1' },
  body: ''
}
{
  method: 'POST',
  uri: '/ping',
  httpVersion: '1.1',
  headers: {
    host: '127.0.0.1',
    'content-type': 'text/plain; charset=utf-8',
    'content-length': '8'
  },
  body: '👋ping'
}

Cautions

  1. The "sub async iterable(iterator)" created by splitXXX methods cannot be iterated at the same time. You must wait for the previous round of iteration to end before starting a new round of iteration.

  2. When calling readXXX methods, the corresponding "sub async iterable(iterator)" will be automatically created and iterated immediately. The iterated data will be temporarily stored in memory. When the iteration ends, All data chunks will be concated into one data chunk and resolve out. It is recommended to set a reasonable size limit to avoid memory leaks.

  3. splitable.hasValue() is used to check if any data can be iterated out,this method is asynchronous. It cannot be called again while it is in pending state, or a "sub async iterable(iterator)" is iterating.

  4. After a "sub async iterable(iterator)" is done, if you want to get the remaining original data, you can directly iterate the splitable, it's also an async iterable(iterator).


将一个“异步可迭代对象”(值为 Uint8Array)按行、大小或 needle 拆分成多个“子异步可迭代对象(迭代器)”。

安装

npm i async-iterable-split

快速开始

1. 按行拆分

假设我们有一个 foo.txt 文件,文件内容如下所示:

this is the first line
hello world
this is the last line
import { createReadStream } from "node:fs";
import { Splitable } from "async-iterable-split";

const iterable = createReadStream("./foo.txt"); // Node.js 的可读流是异步可迭代对象(从 v10.0.0 开始)
const splitable = new Splitable(iterable); // 任何部署了 [Symbol.asyncIterator] 接口的对象,都可以作为实例化 Splitable 的参数

while (await splitable.hasValue()) {
  const subIterator = splitable.splitLine(); // 创建一个“子异步可迭代对象(迭代器)”,该对象迭代出的全部数据是原始数据中的一行
  const chunks = [];
  for await (const chunk of subIterator) {
    chunks.push(chunk);
  }
  // 当 for await of 循环退出时,一行的数据就全部迭代出来了
  console.dir(Buffer.concat(chunks).toString("utf-8"));
}

输出如下所示:

'this is the first line'
'hello world'
'this is the last line'

也可以使用 readLine 方法一次性获取一整行的数据:

import { createReadStream } from "node:fs";
import { Splitable } from "async-iterable-split";

const splitable = new Splitable(createReadStream("./foo.txt"));
while (await splitable.hasValue()) {
  const line = await splitable.readLine(); // "line" 的类型是 Uint8Array
  console.dir(Buffer.from(line).toString("utf-8"));
}

换行符可以是 LF 也可以是 CRLF

2. 按大小拆分

假设我们有一个 foo.txt 文件,文件内容如下所示:

abcdefghijklmnopqrstuvwxyz
import { createReadStream } from "node:fs";
import { Splitable } from "async-iterable-split";

const splitable = new Splitable(createReadStream("./foo.txt"));

while (await splitable.hasValue()) {
  const subIterator = splitable.splitSize(10); // 创建一个子异步可迭代对象(迭代器),该对象迭代出的全部数据是原始数据中的 10 个字节
  const chunks = [];
  for await (const chunk of subIterator) {
    chunks.push(chunk);
  }
  console.dir(Buffer.concat(chunks).toString("utf-8"));
}

输出如下所示:

'abcdefghij'
'klmnopqrst'
'uvwxyz'

也可以使用 readSize 方法一次性获取固定大小的数据:

import { createReadStream } from "node:fs";
import { Splitable } from "async-iterable-split";

const splitable = new Splitable(createReadStream("./foo.txt"));
while (await splitable.hasValue()) {
  const part = await splitable.readSize(10);
  console.dir(Buffer.from(part).toString("utf-8"));
}

3. 按 needle 拆分

假设我们有一个 foo.txt 文件,文件内容如下所示:

foobarbaz
import { createReadStream } from "node:fs";
import { Splitable } from "async-iterable-split";

const splitable = new Splitable(createReadStream("./foo.txt"));

while (await splitable.hasValue()) {
  const subIterator = splitable.splitBeforeNeedle(Buffer.from("ba")); // 创建一个子异步可迭代对象(迭代器),该对象迭代出的数据是原始数据中 "ba" 前面的部分
  const chunks = [];
  for await (const chunk of subIterator) {
    chunks.push(chunk);
  }
  console.dir(Buffer.concat(chunks).toString("utf-8"));
}

输出如下所示:

'foo'
'r'
'z'

也可以使用 readBeforeNeedle 方法一次性获取 needle 前面的所有数据:

import { createReadStream } from "node:fs";
import { Splitable } from "async-iterable-split";

const splitable = new Splitable(createReadStream("./foo.txt"));
while (await splitable.hasValue()) {
  const part = await splitable.readBeforeNeedle(Buffer.from("ba"));
  console.dir(Buffer.from(part).toString("utf-8"));
}

4. 交叉使用

以上三种拆分方式即可以单独使用,也可以交叉使用,下面是一个解析 HTTP 请求报文的示例,即有按行拆分的部分,也有按大小拆分的部分:

服务端

import { createServer } from "node:net";
import { Splitable } from "async-iterable-split";

const server = createServer(async (socket) => {
  try {
    // Node.js 中的 TCP socket 也是异步可迭代对象
    const splitable = new Splitable(socket);
    while (await splitable.hasValue()) {
      // httpReqParser 每次解析一个 HTTP 请求报文,一个 TCP 连接可能包含多个 HTTP 报文
      console.log(await httpReqParser(splitable));
    }
  } catch (error) {
    console.log("got error", error);
    socket.destroy(error);
  }
}).listen(8888, "127.0.0.1", () => {
  console.log(server.address());
});

// 这只是一个简单的 HTTP 请求报文解析器,一个可用于生产环境的解析器要考虑更多。
async function httpReqParser(splitable) {
  const reqMsg = {};

  // 解析请求行,将请求行大小限制在 65535 个字节以内
  const reqLine = Buffer.from(await splitable.readLine(65536)).toString("ascii");
  const reqInfos = reqLine.split(" ");
  reqMsg.method = reqInfos.shift();
  reqMsg.uri = reqInfos.shift();
  reqMsg.httpVersion = reqInfos.shift().replace("HTTP/", "");

  // 解析请求头,将单个请求头大小限制在 16384 个字节以内,请求头个数限制在 256 个以内
  reqMsg.headers = {};
  let headerCount = 0;
  while (await splitable.hasValue()) {
    if (headerCount > 256) {
      throw new Error("header count exceeded limit");
    }
    const header = Buffer.from(await splitable.readLine(16384)).toString("ascii");
    // 如果遇到了空行代表请求头部分结束了,跳出 while 循环
    if (header.length === 0) {
      break;
    }
    headerCount++;
    const [key, value] = header.split(":");
    reqMsg.headers[key.toLowerCase()] = value.trim();
  }

  // 解析请求体,将请求体大小限制在 1MB 以内
  const bodySize = Number(reqMsg.headers["content-length"]) || 0;
  if (bodySize > 2 ** 20) {
    throw new Error("body size exceeded limit");
  }
  reqMsg.body = Buffer.from(await splitable.readSize(bodySize)).toString("utf-8");

  // 返回解析结果
  return reqMsg;
}

客户端

import { connect } from "node:net";

// 建立 TCP 连接,并在建立成功后,连续发送两条 HTTP 请求报文
const socket = connect({ host: "127.0.0.1", port: 8888 }, () => {
  socket.write(`GET / HTTP/1.1
Host: 127.0.0.1

`);

  socket.write(`POST /ping HTTP/1.1
Host: 127.0.0.1
Content-Type: text/plain; charset=utf-8
Content-Length: 8

👋ping`);
});

服务端的输出如下所示:

{
  method: 'GET',
  uri: '/',
  httpVersion: '1.1',
  headers: { host: '127.0.0.1' },
  body: ''
}
{
  method: 'POST',
  uri: '/ping',
  httpVersion: '1.1',
  headers: {
    host: '127.0.0.1',
    'content-type': 'text/plain; charset=utf-8',
    'content-length': '8'
  },
  body: '👋ping'
}

注意事项

  1. 通过 splitXXX 方法创建的“子异步可迭代对象(迭代器)”不能被同时迭代,必须等前一轮迭代结束了,才能开启新的一轮迭代。

  2. 调用 readXXX 方法时会自动创建对应的“子异步可迭代对象(迭代器)”并立即对其进行迭代,迭代出的数据会暂存在内存中,当迭代结束时,会将所有数据块合并成一个完整的数据块 resolve 出来。建议使用这些方法时设置一个合理的大小限制,避免内存泄漏。

  3. splitable.hasValue() 用于判断是否还可以迭代出数据,该方法是异步的。当它处于 pending 状态时不可再次调用,亦不可迭代“子异步可迭代对象(迭代器)”。

  4. 当迭代完某个“子异步可迭代对象(迭代器)”后,如果想获取剩余的原始数据,可以直接迭代 splitable,它也是一个异步可迭代对象(迭代器)。

Package Sidebar

Install

npm i async-iterable-split

Weekly Downloads

2

Version

3.0.2

License

MIT

Unpacked Size

36.8 kB

Total Files

5

Last publish

Collaborators

  • haochuan9421