Creating Pipelines
The task
Decorator
Pyper’s task
decorator is the means by which we instantiate pipelines and control their behaviour:
from pyper import task, Pipeline
@task
def func(x: int):
return x + 1
assert isinstance(func, Pipeline)
This creates a Pipeline
object consisting of one ‘task’ (one step of data transformation).
The task
decorator can also be used more dynamically, which is preferable in most cases as this separates execution logic from the functional definitions themselves:
from pyper import task
def func(x: int):
return x + 1
pipeline = task(func)
In addition to functions, anything callable
in Python can be wrapped in task
in the same way:
from pyper import task
class Doubler:
def __call__(self, x: int):
return 2 * x
pipeline1 = task(Doubler())
pipeline2 = task(lambda x: x - 1)
pipeline3 = task(range)
The internal behaviour of a pipeline (e.g number of workers) is controlled by the different parameters for task
. Refer to the API Reference
Pipeline Usage
Recall that a Pipeline
is itself essentially a function. Pipelines return a Generator object (Python’s mechanism for lazily iterating through data).
from pyper import task
def func(x: int):
return x + 1
if __name__ == "__main__":
pipeline = task(func)
for output in pipeline(x=0):
print(output)
#> 1
A Pipeline always takes the input of its first task, and yields each output from its last task
A pipeline that generates multiple outputs can be created using the branch
parameter:
from pyper import task
def func(x: int):
yield x + 1
yield x + 2
yield x + 3
if __name__ == "__main__":
pipeline = task(func, branch=True)
for output in pipeline(x=0):
print(output)
#> 1
#> 2
#> 3
Asynchronous Code
Asynchronous functions/callables are used to create AsyncPipeline
objects, which behave in an intuitively analogous way to Pipeline
:
import asyncio
from pyper import task
async def func(x: int):
return x + 1
async def main():
pipeline = task(func)
async for output in pipeline(x=0):
print(output)
#> 1
if __name__ == "__main__":
asyncio.run(main())
Note that AsyncPipeline
objects return an AsyncGenerator
which is iterated over with async for
syntax.