crnt - v0.0.31
    Interface Stream<T>

    Stream is an abstraction over AsyncIterable that supports concurrent processing with batching.

    interface Stream<T> {
        map<U>(fn: (value: T) => Promise<U>, config?: MapOption): Stream<U>;
        mapBatch<U>(
            fn: (value: T[]) => Promise<U[]>,
            config?: BatchOption,
        ): Stream<U>;
        toArray(): Promise<T[]>;
    }

    Type Parameters

    • T

      The type of items in the stream

    Methods

    • map<U>(fn: (value: T) => Promise<U>, config?: MapOption): Stream<U>

      Maps a function over each item in the stream with controlled concurrency.

      This is essentially mapBatch with batchSize: 1: each item is processed individually, subject to the configured concurrency limit.

      Type Parameters

      • U

        The type of items returned by the mapping function

      Parameters

      • fn: (value: T) => Promise<U>

        Async function to process each item

      • Optional config: MapOption

        Configuration options for concurrency and progress tracking

      Returns Stream<U>

      New stream with mapped items

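      // Process items with controlled concurrency
      const delay = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));

      const results = await Stream([1, 2, 3, 4, 5])
        .map(async (x) => {
          await delay(100); // Simulate async work
          return x * 2;
        }, { concurrency: 3 }) // At most 3 calls in flight at once
        .toArray(); // map returns a Stream; collect it into an array
      // Results: [2, 4, 6, 8, 10]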
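
      To make "controlled concurrency" concrete, here is a minimal sketch of one common way to cap the number of in-flight calls: a fixed pool of workers that each claim the next unprocessed index. The helper name mapWithConcurrency is hypothetical and is not part of crnt; the library's actual implementation and its ordering guarantees are not shown in this page. The sketch assumes results should come back in input order, as the example above suggests.

      ```typescript
      // Hypothetical helper, not part of crnt: runs `fn` over `items` with at most
      // `concurrency` calls in flight and returns the results in input order.
      async function mapWithConcurrency<T, U>(
        items: T[],
        fn: (value: T) => Promise<U>,
        concurrency: number,
      ): Promise<U[]> {
        const results: U[] = new Array(items.length);
        let next = 0; // Index of the next unclaimed item

        // Each worker keeps claiming the next index until the input is exhausted.
        async function worker(): Promise<void> {
          while (next < items.length) {
            const i = next++;
            results[i] = await fn(items[i]);
          }
        }

        // Never start more workers than there are items, and always at least one.
        const workerCount = Math.max(1, Math.min(concurrency, items.length));
        await Promise.all(Array.from({ length: workerCount }, () => worker()));
        return results;
      }
      ```

      Under these assumptions, mapWithConcurrency([1, 2, 3, 4, 5], async (x) => x * 2, 3) resolves to [2, 4, 6, 8, 10] while never running more than three calls at once. Writing each result to its own index is what keeps the output ordered even when later items finish first.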

    • mapBatch<U>(fn: (value: T[]) => Promise<U[]>, config?: BatchOption): Stream<U>

      Maps a function over batches of items in the stream with controlled concurrency.

      Items are grouped into batches of the specified size, and batches are processed with the specified concurrency. This is ideal for bulk operations like database inserts, API calls with batch endpoints, or any operation that benefits from processing multiple items together.

      Type Parameters

      • U

        The type of items returned by the mapping function

      Parameters

      • fn: (value: T[]) => Promise<U[]>

        Async function to process each batch of items

      • Optional config: BatchOption

        Configuration options for batching, concurrency, timeouts, and progress

      Returns Stream<U>

      New stream with items from processed batches

      // Process items in batches
      const results = await Stream([1, 2, 3, 4, 5, 6])
        .mapBatch(async (batch) => {
          // batch is [1, 2, 3], then [4, 5, 6]
          console.log('Processing batch:', batch);
          return batch.map((x) => x * 2);
        }, {
          batchSize: 3,
          concurrency: 2,
          batchDelay: 1000, // Flush incomplete batches after 1s
        })
        .toArray(); // mapBatch returns a Stream; collect it into an array
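
      The grouping step described above can be sketched for a plain in-memory array as follows. The names chunk and mapInBatches are hypothetical and are not part of crnt. The sketch covers only batchSize and concurrency; it does not model batchDelay, because with a finite array the final batch is simply shorter and there is nothing to time out on.

      ```typescript
      // Hypothetical helpers, not part of crnt.

      // Split `items` into consecutive batches of at most `batchSize` items.
      function chunk<T>(items: T[], batchSize: number): T[][] {
        if (!Number.isInteger(batchSize) || batchSize < 1) {
          throw new RangeError("batchSize must be a positive integer");
        }
        const batches: T[][] = [];
        for (let i = 0; i < items.length; i += batchSize) {
          batches.push(items.slice(i, i + batchSize));
        }
        return batches;
      }

      // Run `fn` once per batch with at most `concurrency` batches in flight,
      // then flatten the per-batch outputs back into one array in batch order.
      async function mapInBatches<T, U>(
        items: T[],
        fn: (batch: T[]) => Promise<U[]>,
        batchSize: number,
        concurrency: number,
      ): Promise<U[]> {
        const batches = chunk(items, batchSize);
        const outputs: U[][] = new Array(batches.length);
        let next = 0; // Index of the next unclaimed batch

        async function worker(): Promise<void> {
          while (next < batches.length) {
            const i = next++;
            outputs[i] = await fn(batches[i]);
          }
        }

        const workerCount = Math.max(1, Math.min(concurrency, batches.length));
        await Promise.all(Array.from({ length: workerCount }, () => worker()));
        return ([] as U[]).concat(...outputs);
      }
      ```

      With seven items and a batch size of 3, chunk produces [1, 2, 3], [4, 5, 6], and [7], so the mapping function is called three times rather than seven. That reduction in call count is the reason batching suits bulk inserts and batch endpoints.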

    • toArray(): Promise<T[]>

      Collects all items from the stream into an array.

      Returns Promise<T[]>

      Promise that resolves to an array containing all items from the stream
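
      Because Stream is described as an abstraction over AsyncIterable, collecting it can be pictured as draining an async iterable with for await. The sketch below uses a hypothetical collect helper and a hypothetical countTo generator as the source; neither is part of crnt, and the library's own toArray may be implemented differently.

      ```typescript
      // Hypothetical helper, not part of crnt: drains any AsyncIterable into an array.
      async function collect<T>(source: AsyncIterable<T>): Promise<T[]> {
        const items: T[] = [];
        for await (const item of source) {
          items.push(item);
        }
        return items;
      }

      // Hypothetical example source: an async generator that yields 1..n.
      async function* countTo(n: number): AsyncGenerator<number> {
        for (let i = 1; i <= n; i++) {
          yield i;
        }
      }
      ```

      Here collect(countTo(3)) resolves to [1, 2, 3]. A collector written this way resolves only after the source finishes, so it holds every item in memory and never resolves for an unbounded source.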