Task API
- pab.task.TaskList
Type for an explicit list of Tasks
alias of
list
[Task]
- pab.task.RawTasksData
Type for an explicit list of task data dictionaries.
alias of
List
[dict
]
- class pab.task.Task(id_: int, strat: pab.strategy.BaseStrategy, next_at: int, repeat_every: Optional[dict] = None)
Container for a strategy to be executed in the future
- RUN_ASAP: int = -10
Constant. Means job should be rescheduled to run ASAP.
- RUN_NEVER: int = -20
Constant. Means job should’t be rescheduled.
- id
Internal Task ID
- strategy: pab.strategy.BaseStrategy
Strategy object
- next_at: int
Next execution time as timestamp
- repeat_every: dict | None
Repetition data. A dict that functions as kwargs for datetime.timedelta
- last_start: int
Last execution start time as timestamp
- reschedule() None
Calculates next execution if applies and calls
schedule_for()
- repeats() bool
True if Task has repetition data.
- next_repetition_time() int
Returns next repetition time based on
last_start
andrepeat_every
.
- schedule_for(next_at: int) None
Updates
self.next_at
for a specific time. Will disable job if value isTask.RUN_NEVER
.
- process() None
Calls
_process()
and handlespab.strategy.RescheduleError
.
- _process() None
Runs strategy and updates schedule.
- class pab.task.TaskFileParser(root: pathlib.Path, blockchain: pab.blockchain.Blockchain, strategies: list[pab.strategy.BaseStrategy])
Parses a tasks file and loads a TaskList.
- load() pab.task.TaskList
Loads TaskList from tasks file.
- _load_tasks_json_or_exception(fhandle: TextIO) pab.task.RawTasksData
Parses TextIO input as JSON, validates and returns raw data. May raise
TasksLoadError
.
- _validate_raw_data(data: Any) None
Validates raw tasks data format.
- _create_tasklist(tasks: pab.task.RawTasksData) pab.task.TaskList
Creates a list of
Task
objects from raw data.
- _create_strat_from_data(data: dict) pab.strategy.BaseStrategy
Creates a single
Task
object from raw data.
- _find_strat_by_name(name: str) Optional[Callable]
Finds a strategy by name. May raise
UnkownStrategyError
.
- exception pab.task.TasksLoadError
- exception pab.task.UnkownStrategyError