# pyper.task

For convenience, we will use the following terminology on this page:

- Producer: The first task within a pipeline
- Producer-consumer: Any task after the first task within a pipeline
## task

```python
def __new__(
    cls,
    func: Optional[Callable] = None,
    /,
    *,
    branch: bool = False,
    join: bool = False,
    workers: int = 1,
    throttle: int = 0,
    multiprocess: bool = False,
    bind: Optional[Tuple[Tuple[Any], Dict[str, Any]]] = None):
```
Used to initialize a Pipeline object, consisting of one ‘task’ (one functional operation).
Pipelines created this way can be composed into new pipelines that contain multiple tasks.
### Parameters
#### func

- type: `Optional[Callable]`
- default: `None`

The function or callable object defining the logic of the task. This is a positional-only parameter.
```python
from pyper import task

@task
def add_one(x: int):
    return x + 1

# OR
def add_one(x: int):
    return x + 1

pipeline = task(add_one)
```
#### branch

- type: `bool`
- default: `False`

When `branch` is `False`, the output of the task is the value it returns. Setting `branch` to `True` allows a task to generate multiple outputs. This requires the task to return an `Iterable` (or `AsyncIterable`).
```python
from pyper import task

def create_data(x: int):
    return [x + 1, x + 2, x + 3]

if __name__ == "__main__":
    pipeline1 = task(create_data)
    for output in pipeline1(0):
        print(output)
    #> [1, 2, 3]

    pipeline2 = task(create_data, branch=True)
    for output in pipeline2(0):
        print(output)
    #> 1
    #> 2
    #> 3
```
This can be applied to generator functions (or async generator functions) to submit outputs lazily:
```python
from pyper import task

def create_data(x: int):
    yield 1
    yield 2
    yield 3

if __name__ == "__main__":
    pipeline = task(create_data, branch=True)
    for output in pipeline(0):
        print(output)
    #> 1
    #> 2
    #> 3
```
#### join

- type: `bool`
- default: `False`

When `join` is `False`, a producer-consumer takes each individual output from the previous task as input. When `True`, a producer-consumer takes a stream of inputs from the previous task.
```python
from typing import Iterable
from pyper import task

@task(branch=True)
def create_data(x: int):
    return [x + 1, x + 2, x + 3]

@task(branch=True, join=True)
def running_total(data: Iterable[int]):
    total = 0
    for item in data:
        total += item
        yield total

if __name__ == "__main__":
    pipeline = create_data | running_total
    for output in pipeline(0):
        print(output)
    #> 1
    #> 3
    #> 6
```
A producer cannot have `join` set to `True`.

A task with `join=True` can also be run with multiple workers, which will pull from the previous task in a thread-safe/process-safe way. Note, however, that the order of outputs cannot be maintained consistently when a joined task is run with more than one worker.
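Conceptually, `join` controls whether the downstream function is called once per element or once with the whole stream. A minimal sketch of the `running_total` example above, using plain Python rather than pyper, shows the `join=True` calling convention:

```python
def create_data(x):
    return [x + 1, x + 2, x + 3]

def running_total(data):
    # With join=True, this function receives a single iterable
    # covering all of the upstream task's outputs.
    total = 0
    for item in data:
        total += item
        yield total

# join=False would invoke the next task once per element;
# join=True passes one iterator over all elements in a single call.
outputs = list(running_total(iter(create_data(0))))
print(outputs)  #> [1, 3, 6]
```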
#### workers

- type: `int`
- default: `1`

The parameter `workers` takes an `int` value which determines the number of workers executing the task concurrently or in parallel.
```python
import time
from pyper import task

def slow_func(data: int):
    time.sleep(2)
    return data

if __name__ == "__main__":
    pipeline = task(range, branch=True) | task(slow_func, workers=20)
    # Runs in ~2 seconds
    for output in pipeline(20):
        print(output)
```
A producer cannot have `workers` set greater than `1`.
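The speed-up comes from overlapping waits across workers. As a rough stdlib analogy (not pyper's internals), a thread pool of 10 workers completes ten 0.1-second sleeps in roughly the time of one:

```python
import time
from concurrent.futures import ThreadPoolExecutor

def slow_func(data: int):
    time.sleep(0.1)  # simulate I/O-bound work
    return data

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=10) as pool:
    # The 10 workers sleep concurrently, so 10 calls take ~0.1s, not ~1s
    results = list(pool.map(slow_func, range(10)))
elapsed = time.perf_counter() - start

print(results)  #> [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
print(elapsed < 1.0)  #> True
```

Note that this only helps I/O-bound work; for CPU-bound work, see `multiprocess` below.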
#### throttle

- type: `int`
- default: `0`

The parameter `throttle` determines the maximum size of a task's output queue. The purpose of this parameter is to give finer control over memory in situations where:

- A producer/producer-consumer generates data very quickly
- A producer-consumer/consumer processes that data very slowly
```python
import time
from pyper import task

@task(branch=True, throttle=5000)
def fast_producer():
    for i in range(1_000_000):
        yield i

@task
def slow_consumer(data: int):
    time.sleep(10)
    return data
```
In the example above, workers on `fast_producer` are paused after `5000` values have been generated, until workers for `slow_consumer` are ready to start processing again. If no throttle were specified, workers for `fast_producer` would quickly flood its output queue with up to `1_000_000` values, which would all have to be allocated in memory.
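Throttling amounts to placing a bounded queue between tasks, where the producer's `put` blocks whenever the queue is full. A minimal sketch of this mechanism (an illustrative assumption, not pyper's actual implementation) with `queue.Queue`:

```python
import queue
import threading

q = queue.Queue(maxsize=5)  # like throttle=5: put() blocks when 5 items wait
results = []

def producer():
    for i in range(100):
        q.put(i)   # pauses here whenever the queue is full
    q.put(None)    # sentinel to signal completion

def consumer():
    while (item := q.get()) is not None:
        results.append(item)

threads = [threading.Thread(target=producer), threading.Thread(target=consumer)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(len(results))  #> 100
```

At no point are more than 5 values buffered, regardless of how far ahead the producer could run.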
#### multiprocess

- type: `bool`
- default: `False`

By default, synchronous tasks are run in `threading.Thread` workers and asynchronous tasks are run in `asyncio.Task` workers. The `multiprocess` parameter allows synchronous tasks to be run with `multiprocessing.Process` instead, benefiting heavily CPU-bound tasks.
```python
from pyper import task

def slow_func(data: int):
    for i in range(1, 10_000_000):
        i *= i
    return data

if __name__ == "__main__":
    pipeline = (
        task(range, branch=True)
        | task(slow_func, workers=20, multiprocess=True)
    )
    for output in pipeline(20):
        print(output)
```
An asynchronous task cannot set `multiprocess` to `True`.
See some considerations for when to set this parameter.
Note, also, that normal Python multiprocessing restrictions apply:
- Only picklable functions can be multiprocessed, which excludes certain types of functions like lambdas and closures.
- Arguments and return values of multiprocessed tasks must also be picklable, which excludes objects like file handles, connections, and (on Windows) generators.
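The picklability restriction can be checked directly with the `pickle` module; for example, a lambda cannot be serialized and so could not be dispatched to a process worker:

```python
import pickle

square = lambda x: x * x

# Module-level functions pickle by reference (module + name), but a
# lambda's name is "<lambda>" and cannot be looked up, so pickling fails.
try:
    pickle.dumps(square)
    picklable = True
except Exception:
    picklable = False

print(picklable)  #> False
```

Defining `square` as a normal top-level `def` instead would make it picklable and therefore usable with `multiprocess=True`.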
#### bind

- type: `Optional[Tuple[Tuple[Any], Dict[str, Any]]]`
- default: `None`

The parameter `bind` allows additional `args` and `kwargs` to be bound to a task when creating a pipeline.
```python
from pyper import task

def apply_multiplier(data: int, multiplier: int):
    return data * multiplier

if __name__ == "__main__":
    pipeline = (
        task(range, branch=True)
        | task(apply_multiplier, bind=task.bind(multiplier=10))
    )
    for output in pipeline(1, 4):
        print(output)
    #> 10
    #> 20
    #> 30
```
Given that each producer-consumer expects to be given one input argument, the purpose of the `bind` parameter is to allow functions to be defined flexibly in terms of the inputs they wish to take, as well as allowing tasks to access external states, like contexts.
## task.bind

```python
@staticmethod
def bind(*args, **kwargs):
```

`task.bind` is the utility method that can be used to supply arguments to the `bind` parameter, which uses `functools.partial` under the hood.

The method accepts normal valid `*args` and `**kwargs`.
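Since `task.bind` wraps `functools.partial`, the binding behaviour can be illustrated with `partial` alone, using the `apply_multiplier` example from above:

```python
from functools import partial

def apply_multiplier(data: int, multiplier: int):
    return data * multiplier

# task.bind(multiplier=10) pre-fills the multiplier kwarg, leaving a
# one-argument callable for the pipeline to feed each input into.
bound = partial(apply_multiplier, multiplier=10)

print(bound(3))  #> 30
```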