Creating Pipelines

The task Decorator

Pyper’s task decorator is the means by which we instantiate pipelines and control their behaviour:

from pyper import task, Pipeline

def func(x: int):
    return x + 1

pipeline = task(func)

assert isinstance(pipeline, Pipeline)

This creates a Pipeline object consisting of one ‘task’ (one step of data transformation).

In addition to functions, anything callable in Python can be wrapped in task in the same way:

from pyper import task

class Doubler:
    def __call__(self, x: int):
        return 2 * x

pipeline1 = task(Doubler())
pipeline2 = task(lambda x: x - 1)
pipeline3 = task(range)
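
As a quick plain-Python check (no Pyper involved), the built-in callable() confirms that each kind of object above qualifies as "anything callable". The NotCallable class is a hypothetical counter-example added only for contrast:

```python
class Doubler:
    def __call__(self, x: int) -> int:
        return 2 * x


class NotCallable:
    """Hypothetical class without __call__, used only for contrast."""


# A class instance with __call__, a lambda, and a built-in type.
candidates = [Doubler(), lambda x: x - 1, range]
all_callable = all(callable(c) for c in candidates)
plain_instance_callable = callable(NotCallable())

print(all_callable)             # True
print(plain_instance_callable)  # False
```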

The internal behaviour of a pipeline (e.g. the number of workers) is controlled by the parameters passed to task. Refer to the API Reference.

Pipeline Usage

Recall that a Pipeline is itself essentially a function. Calling a Pipeline returns a Generator object (Python’s mechanism for lazily iterating through data).

from pyper import task

def func(x: int):
    return x + 1

if __name__ == "__main__":
    pipeline = task(func)
    for output in pipeline(x=0):
        print(output)
        #> 1

A Pipeline always takes the input of its first task and yields each output from its last task.
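
To make "lazily" concrete, here is a minimal sketch using an ordinary Python generator rather than Pyper itself. The names lazy_outputs and calls are hypothetical stand-ins. The point is only that creating a generator runs nothing, and work happens when an output is requested:

```python
from typing import Iterator, List

calls: List[int] = []  # records each time the step actually runs


def add_one(x: int) -> int:
    calls.append(x)
    return x + 1


def lazy_outputs(x: int) -> Iterator[int]:
    # Hypothetical stand-in for calling a one-task pipeline.
    yield add_one(x)


gen = lazy_outputs(0)     # creating the generator does not run add_one
ran_before = list(calls)  # still empty
first = next(gen)         # add_one runs only when an output is requested
ran_after = list(calls)

print(ran_before, first, ran_after)
```

This is why the earlier example consumes the pipeline in a for loop: iterating is what drives the computation.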

A pipeline that generates multiple outputs can be created using the branch parameter:

from pyper import task

def func(x: int):
    yield x + 1
    yield x + 2
    yield x + 3

if __name__ == "__main__":
    pipeline = task(func, branch=True)
    for output in pipeline(x=0):
        print(output)
        #> 1
        #> 2
        #> 3
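
One way to picture what branch changes is the plain-Python model below. It is a conceptual sketch, not Pyper's implementation: run_single_task is a hypothetical helper, and it assumes that without branch the task's return value is passed on as a single output.

```python
import inspect
from typing import Any, Callable, Iterator


def run_single_task(func: Callable[..., Any], branch: bool, **kwargs: Any) -> Iterator[Any]:
    """Hypothetical model of a one-task pipeline (assumption, not Pyper code)."""
    result = func(**kwargs)
    if branch:
        # Each item produced by the task becomes its own output.
        yield from result
    else:
        # The return value is passed on as one output.
        yield result


def func(x: int):
    yield x + 1
    yield x + 2
    yield x + 3


branched = list(run_single_task(func, branch=True, x=0))
unbranched = list(run_single_task(func, branch=False, x=0))

print(branched)                               # [1, 2, 3]
print(len(unbranched))                        # 1
print(inspect.isgenerator(unbranched[0]))     # True
```

Under this model, the unbranched run yields one output, the generator object itself, while the branched run yields the three values individually.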

Asynchronous Code

Asynchronous functions and callables are used to create AsyncPipeline objects, which behave analogously to Pipeline:

import asyncio
from pyper import task

async def func(x: int):
    return x + 1

async def main():
    pipeline = task(func)
    async for output in pipeline(x=0):
        print(output)
        #> 1

if __name__ == "__main__":
    asyncio.run(main())

Note that calling an AsyncPipeline returns an AsyncGenerator, which is iterated over with async for syntax.
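
Besides an async for loop, an asynchronous generator can be drained with an async comprehension. The sketch below uses a plain async generator, outputs, as a hypothetical stand-in for an AsyncPipeline call, so it runs without Pyper:

```python
import asyncio
from typing import AsyncIterator, List


async def outputs(x: int) -> AsyncIterator[int]:
    # Hypothetical stand-in for calling an AsyncPipeline.
    yield x + 1
    yield x + 2


async def collect(x: int) -> List[int]:
    # An async comprehension is shorthand for an async for loop.
    return [item async for item in outputs(x)]


results = asyncio.run(collect(0))
print(results)  # [1, 2]
```

Note that a regular for loop or list() call would raise a TypeError here, since an asynchronous generator does not support the synchronous iteration protocol.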