async-job-state
TypeScript icon, indicating that this package has built-in type declarations

0.2.1 • Public • Published

async-job-state github, npm

A small helper library for running asynchronous jobs with progress reporting and remaining-time estimation.

TODO: better readme

Usage

Simple usage:

import {
  // job execution
  collectAsyncJob,
  startAsyncJob,
  asyncJobStreamTransform,

  // job creation helpers:
  indefiniteAsyncJob,
  arrayMapAsyncJob,
  rangeAsyncJob,
  sleep,

  // useful types:
  type AsyncJobState,
  AsyncJobPhase,
} from 'async-job-state';

const customTestJob = async function*(state: AsyncJobState) {
  yield state.itemIndex * 2;
  if(state.itemIndex >= 5) return;
};

const state = await startAsyncJob(customTestJob, {
  // optional delay between iterations
  delay: () => new Promise(r => setTimeout(r, 100)),
  onStart(state) {
    console.log('Job started at index:', state.itemIndex);
    return { myAdditionalStateProperty: 'test job!' };
  },
  onProgress(state, item) {
    console.log('Progress:', state.itemIndex, item);
    // do something with the item
  },
  onEnd(state) {
    console.log('Job ended at index:', state.itemIndex);
  }
});

console.log(state.myAdditionalStateProperty);
// cancel the job by setting state.cancelled = true

await state.donePromise;

// as an alternative to startAsyncJob (which returns the state), use collectAsyncJob to get the yielded items as an array:
const results = await collectAsyncJob(customTestJob, { /* options */ });

Common job helpers:

console.log(
  await collectAsyncJob(indefiniteAsyncJob((stop, state) => {
    // calling stop() will end the task after the current iteration
    if (state.itemIndex === 3) stop(); // you can `return stop()` instead to end the task immediately
    return state.itemIndex;
  }))
); // [0, 1, 2, 3]

console.log(
  await arrayMapAsyncJob(['a', 'b', 'c'], async (item, state) => {
    // callbacks can be async
    return await Promise.resolve(item.toUpperCase());
  })
); // ['A', 'B', 'C']

console.log(
  await collectAsyncJob(rangeAsyncJob(3, (item) => item * 10))
); // [0, 10, 20]


// stream status reporting
const { state, transform } = asyncJobStreamTransform({
  timingAverageWindow: 5,
  startImmediately: true, 
  // if startImmediately is not set to true, the job only starts automatically
  // when the first stream item is processed, so the first time measurement will be off
});

// optionally set state.itemsTotal to enable remaining-time estimates.

const jobStream = stream.Readable.from(customTestJob()).pipe(transform);
// use your stream as usual, state will be updated on each processed item

Types:

enum AsyncJobPhase {
    NotStarted = 0,
    Started = 1,
    Ended = 2
}

interface AsyncJobState {
  itemIndex: number;
  itemsTotal: number | undefined;
  cancelled: boolean;
  readonly phase: AsyncJobPhase;
  readonly startDate: Date;
  readonly endDate: Date | null;
  readonly clock: DurationIntervalClock | null;
  readonly estimatedRemainingSeconds: number | undefined;
  readonly error: any;
  readonly donePromise: Promise<void>;
}

interface DurationIntervalClock {
  durationsSamples: number[];
  intervalSamples: number[];
  checkHasGoodAverage(q?: number): boolean;
  lastDuration: number | undefined;
  lastInterval: number | undefined;
  averageDuration: number | undefined;
  averageInterval: number | undefined;
}

interface AsyncJobOptions<ResultValue, StateEx = void> {
    delay?: () => Promise<any>;
    timingAverageWindow?: number;

    onStart?: (state: AsyncJobState) 
      => MaybePromise<StateEx | AsyncJobStateReplacement<StateEx>>;

    onEnd?: (state: ExtendedAsyncJobState<StateEx>) 
      => MaybePromise<void | Partial<StateEx & AsyncJobState> | AsyncJobStateReplacement<StateEx>>;

    onProgress?: (state: ExtendedAsyncJobState<StateEx>, item: ResultValue) 
      => MaybePromise<void | Partial<StateEx & AsyncJobState> | AsyncJobStateReplacement<StateEx>>;
}

type AsyncJob<R> = AsyncIterable<Awaited<R>, void, void>;
type AsyncJobFactory<R> = (state: AsyncJobState) => AsyncJob<R>;

// Returning an object wrapped with this function from a callback causes the state to be completely replaced with that object.
// If a plain (unwrapped) object is returned from a callback instead, Object.assign is used to merge it into the existing state.
function asyncJobStateReplacement<T extends AsyncJobState>(state: T): AsyncJobStateReplacement<T>;

async function startAsyncJob<R, StateEx>(factory: AsyncJob<R> | AsyncJobFactory<R>, options: AsyncJobOptions<R, StateEx>): Promise<ExtendedAsyncJobState<StateEx>>;

async function collectAsyncJob<R, StateEx>(factory: AsyncJob<R> | AsyncJobFactory<R>, options?: AsyncJobOptions<R, StateEx>): Promise<R[]>;

Package Sidebar

Install

npm i async-job-state

Weekly Downloads

0

Version

0.2.1

License

ISC

Unpacked Size

56.2 kB

Total Files

7

Last publish

Collaborators

  • sebimoe