# Creating Pipelines
## The `task` Decorator

Pyper's `task` decorator is the means by which we instantiate pipelines and control their behaviour:
```python
from pyper import task, Pipeline

def func(x: int):
    return x + 1

pipeline = task(func)
assert isinstance(pipeline, Pipeline)
```
This creates a `Pipeline` object consisting of one 'task' (one step of data transformation).
In addition to functions, anything callable in Python can be wrapped in `task` in the same way:
```python
from pyper import task

class Doubler:
    def __call__(self, x: int):
        return 2 * x

pipeline1 = task(Doubler())
pipeline2 = task(lambda x: x - 1)
pipeline3 = task(range)
```
The internal behaviour of a pipeline (e.g. the number of workers) is controlled by the different parameters for `task`. Refer to the API Reference for details.
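As a minimal sketch of what this looks like (assuming the `workers` parameter described in the API Reference; consult it for the exact set of parameters):

```python
from pyper import task

def slow_func(x: int):
    # Stand-in for an expensive transformation
    return x * 2

# Assumed parameter: `workers` sets how many concurrent workers
# execute this task (see the API Reference)
pipeline = task(slow_func, workers=4)
```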
## Pipeline Usage
Recall that a `Pipeline` is itself essentially a function. Calling a pipeline returns a `Generator` object (Python's mechanism for lazily iterating through data):
```python
from pyper import task

def func(x: int):
    return x + 1

if __name__ == "__main__":
    pipeline = task(func)
    for output in pipeline(x=0):
        print(output)
        #> 1
```
A `Pipeline` always takes the input of its first task and yields each output from its last task.
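To illustrate with more than one task, here is a minimal sketch (this assumes Pyper's `|` operator for composing tasks into a single pipeline):

```python
from pyper import task

def add_one(x: int):
    return x + 1

def double(x: int):
    return 2 * x

if __name__ == "__main__":
    # Assumed composition via `|`: the pipeline takes add_one's
    # input and yields double's outputs
    pipeline = task(add_one) | task(double)
    for output in pipeline(x=0):
        print(output)
        #> 2
```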
A pipeline that generates multiple outputs can be created using the `branch` parameter:
```python
from pyper import task

def func(x: int):
    yield x + 1
    yield x + 2
    yield x + 3

if __name__ == "__main__":
    pipeline = task(func, branch=True)
    for output in pipeline(x=0):
        print(output)
        #> 1
        #> 2
        #> 3
```
## Asynchronous Code
Asynchronous functions/callables are used to create `AsyncPipeline` objects, which behave in an intuitively analogous way to `Pipeline`:
```python
import asyncio
from pyper import task

async def func(x: int):
    return x + 1

async def main():
    pipeline = task(func)
    async for output in pipeline(x=0):
        print(output)
        #> 1

if __name__ == "__main__":
    asyncio.run(main())
```
Note that `AsyncPipeline` objects return an `AsyncGenerator`, which is iterated over with `async for` syntax.
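As a final sketch, the `branch` parameter from above should carry over to async tasks in the same way (an assumption based on the stated analogy with `Pipeline`, not shown explicitly on this page):

```python
import asyncio
from pyper import task

async def func(x: int):
    # An async generator produces multiple outputs per input
    yield x + 1
    yield x + 2

async def main():
    pipeline = task(func, branch=True)
    async for output in pipeline(x=0):
        print(output)
        #> 1
        #> 2

if __name__ == "__main__":
    asyncio.run(main())
```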