Task Management

Introduction

The Workflow section introduces how to run research workflows in a loosely-coupled way, but it can only execute one task when using qrun. To automatically generate and execute different tasks, Task Management provides a whole process including Task Generating, Task Storing, Task Training, and Task Collecting. With this module, users can run their tasks automatically over different periods, with different losses, or even with different models. The process of task generation, model training, and data merging and collecting is shown in the following figure.

[Figure: Task-Gen-Recorder-Collector.svg]

This whole process can be used in Online Serving.

An example of the entire process can be found here.

Task Generating

A task consists of Model, Dataset, Record, or anything added by users. The specific task template can be viewed in the Task section. Even though the task template is fixed, users can customize their TaskGen to generate different tasks based on the task template.
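For intuition, the following is a rough sketch of what such a task template might look like. The class names, segments, and handler placeholder here are illustrative assumptions only; see the Task section for real templates.

# A hypothetical task template; all concrete values are illustrative placeholders.
task_template = {
    "model": {"class": "LGBModel", "module_path": "qlib.contrib.model.gbdt"},
    "dataset": {
        "class": "DatasetH",
        "module_path": "qlib.data.dataset",
        "kwargs": {
            "handler": "<handler config>",  # placeholder for a real data handler config
            "segments": {
                "train": ("2008-01-01", "2014-12-31"),
                "test": ("2015-01-01", "2016-12-31"),
            },
        },
    },
    "record": [{"class": "SignalRecord", "module_path": "qlib.workflow.record_temp"}],
}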

Here is the base class of TaskGen:

class qlib.workflow.task.gen.TaskGen

The base class for generating different tasks

Example 1:

input: a specific task template and rolling steps

output: rolling version of the tasks

Example 2:

input: a specific task template and losses list

output: a set of tasks with different losses

abstractmethod generate(task: dict) → List[dict]

Generate different tasks based on a task template

Parameters:

task (dict) -- a task template

Returns:

A list of tasks

Return type:

List[dict]

Qlib provides a class RollingGen to generate a list of tasks on the dataset in different date segments. This class allows users to verify the effect of data from different periods on the model in one experiment. More information is here.
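As a sketch of how the interface above can be used, the following toy generator implements "Example 2" from the docstring (varying only the loss). It is a hypothetical illustration, not part of Qlib; RollingGen follows the same pattern for rolling date segments.

import copy
from typing import List

from qlib.workflow.task.gen import TaskGen

class LossTaskGen(TaskGen):
    """Hypothetical TaskGen: expand one template into tasks that differ only in loss."""

    def __init__(self, losses: List[str]):
        self.losses = losses

    def generate(self, task: dict) -> List[dict]:
        tasks = []
        for loss in self.losses:
            t = copy.deepcopy(task)  # never mutate the shared template
            t.setdefault("model", {}).setdefault("kwargs", {})["loss"] = loss
            tasks.append(t)
        return tasks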

Task Storing

To achieve higher efficiency and support cluster operation, Task Manager stores all tasks in MongoDB. TaskManager can fetch undone tasks automatically and manage the lifecycle of a set of tasks with error handling. Users MUST finish the configuration of MongoDB when using this module.

Users need to provide the MongoDB URL and database name when initializing in order to use TaskManager, or make a statement like the following.

from qlib.config import C
C["mongo"] = {
    "task_url" : "mongodb://localhost:27017/", # your MongoDB url
    "task_db_name" : "rolling_db" # database name
}
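Alternatively, the same settings can be passed when initializing Qlib; a minimal sketch, assuming qlib.init accepts a mongo keyword that is merged into the config (URL, database name, and data path below are placeholders):

import qlib

qlib.init(
    provider_uri="~/.qlib/qlib_data/cn_data",  # placeholder data path
    mongo={
        "task_url": "mongodb://localhost:27017/",  # your MongoDB url
        "task_db_name": "rolling_db",              # database name
    },
)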
class qlib.workflow.task.manage.TaskManager(task_pool: str)

Here is what a task will look like when it is created by TaskManager

{
    'def': pickle serialized task definition.  using pickle will make it easier
    'filter': json-like data. This is for filtering the tasks.
    'status': 'waiting' | 'running' | 'done'
    'res': pickle serialized task result,
}

The task manager assumes that you will only update the tasks you fetched. MongoDB's fetch-one-and-update makes the data updating secure.

This class can be used as a tool from the command line. Here are several examples. You can view the help of the manage module with the following commands:

python -m qlib.workflow.task.manage -h # show manual of manage module CLI
python -m qlib.workflow.task.manage wait -h # show manual of the wait command of manage

python -m qlib.workflow.task.manage -t <pool_name> wait
python -m qlib.workflow.task.manage -t <pool_name> task_stat

Note

Assumption: the data in MongoDB was encoded and the data out of MongoDB was decoded

Here are the four statuses:

STATUS_WAITING: waiting for training

STATUS_RUNNING: training

STATUS_PART_DONE: finished some step and waiting for next step

STATUS_DONE: all work done

__init__(task_pool: str)

Initialize the Task Manager; remember to declare the MongoDB URL and database name first. A TaskManager instance serves a specific task pool. The static methods of this module serve the whole MongoDB.

Parameters:

task_pool (str) -- the name of Collection in MongoDB

static list() → list

List all the collections (task_pool) of the db.

Returns:

list

replace_task(task, new_task)

Use a new task to replace an old one

Parameters:
  • task -- old task

  • new_task -- new task

insert_task(task)

Insert a task.

Parameters:

task -- the task waiting to be inserted

Returns:

pymongo.results.InsertOneResult

insert_task_def(task_def)

Insert a task into the task_pool

Parameters:

task_def (dict) -- the task definition

Return type:

pymongo.results.InsertOneResult

create_task(task_def_l, dry_run=False, print_nt=False) → List[str]

If the tasks in task_def_l are new, then insert new tasks into the task_pool, and record inserted_id. If a task is not new, then just query its _id.

Parameters:
  • task_def_l (list) -- a list of tasks

  • dry_run (bool) -- whether to insert those new tasks into the task pool

  • print_nt (bool) -- whether to print the new tasks

Returns:

a list of the _id of task_def_l

Return type:

List[str]

fetch_task(query={}, status='waiting') → dict

Use query to fetch tasks.

Parameters:
  • query (dict, optional) -- query dict. Defaults to {}.

  • status (str, optional) -- the status of the tasks to fetch. Defaults to STATUS_WAITING.

Returns:

a task (document in collection) after decoding

Return type:

dict

safe_fetch_task(query={}, status='waiting')

Fetch a task from the task_pool using the query, within a contextmanager

Parameters:

query (dict) -- the dict of query

Returns:

a task (document in collection) after decoding

Return type:

dict

query(query={}, decode=True)

Query tasks in the collection. This function may raise the exception pymongo.errors.CursorNotFound: cursor id not found if it takes too long to iterate the generator.

python -m qlib.workflow.task.manage -t <your task pool> query '{"_id": "615498be837d0053acbc5d58"}'

Parameters:
  • query (dict) -- the dict of query

  • decode (bool)

Returns:

a task (document in collection) after decoding

Return type:

dict

re_query(_id) → dict

Use _id to query the task.

Parameters:

_id (str) -- _id of a document

Returns:

a task (document in collection) after decoding

Return type:

dict

commit_task_res(task, res, status='done')

Commit the result to task['res'].

Parameters:
  • task (dict) -- the task whose result will be committed

  • res (object) -- the result you want to save

  • status (str, optional) -- STATUS_WAITING, STATUS_RUNNING, STATUS_DONE, STATUS_PART_DONE. Defaults to STATUS_DONE.

return_task(task, status='waiting')

Return a task to the given status. Typically used in error handling.

Parameters:
  • task (dict) -- the task to return

  • status (str, optional) -- STATUS_WAITING, STATUS_RUNNING, STATUS_DONE, STATUS_PART_DONE. Defaults to STATUS_WAITING.

remove(query={})

Remove the task(s) matching the query

Parameters:

query (dict) -- the dict of query

task_stat(query={}) → dict

Count the tasks in every status.

Parameters:

query (dict, optional) -- the query dict. Defaults to {}.

Returns:

dict

reset_waiting(query={})

Reset all running tasks to waiting status. Can be used when some running tasks exit unexpectedly.

Parameters:

query (dict, optional) -- the query dict. Defaults to {}.

prioritize(task, priority: int)

Set the priority of a task

Parameters:
  • task (dict) -- The task query from the database

  • priority (int) -- the target priority

wait(query={})

When multiprocessing, the main process may fetch nothing from the TaskManager because there are still some running tasks. So the main process should wait until all tasks are trained well by other processes or machines.

Parameters:

query (dict, optional) -- the query dict. Defaults to {}.
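A minimal usage sketch of the methods above ("my_task_pool" is an arbitrary collection name and tasks stands for a list of task templates, e.g. produced by a TaskGen):

from qlib.workflow.task.manage import TaskManager

tasks = [...]  # placeholder: a list of task definition dicts
tm = TaskManager(task_pool="my_task_pool")
tm.create_task(tasks)                                     # insert the new tasks and get their _ids
print(tm.task_stat())                                     # count the tasks in every status
task = tm.fetch_task(status=TaskManager.STATUS_WAITING)   # fetch one decoded waiting task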

More information about Task Manager can be found here.

Task Training

After generating and storing those tasks, it's time to run the tasks that are in the WAITING status. Qlib provides a method called run_task to run the tasks in the task pool, but users can also customize how tasks are executed. An easy way to get a task_func is to use qlib.model.trainer.task_train directly. It will run the whole workflow defined by the task, which includes Model, Dataset, and Record.

qlib.workflow.task.manage.run_task(task_func: Callable, task_pool: str, query: dict = {}, force_release: bool = False, before_status: str = 'waiting', after_status: str = 'done', **kwargs)

While the task pool is not empty (has WAITING tasks), use task_func to fetch and run tasks in task_pool

After running this method, here are 4 situations (before_status -> after_status):

STATUS_WAITING -> STATUS_DONE: use task["def"] as task_func param, it means that the task has not been started

STATUS_WAITING -> STATUS_PART_DONE: use task["def"] as task_func param

STATUS_PART_DONE -> STATUS_PART_DONE: use task["res"] as task_func param, it means that the task has been started but not completed

STATUS_PART_DONE -> STATUS_DONE: use task["res"] as task_func param

Parameters:
  • task_func (Callable) --

    def (task_def, **kwargs) -> <res which will be committed>

    the function to run the task

  • task_pool (str) -- the name of the task pool (Collection in MongoDB)

  • query (dict) -- will use this dict to query task_pool when fetching tasks

  • force_release (bool) -- whether the program will forcibly release the resource

  • before_status (str) -- the tasks in before_status will be fetched and trained. Can be STATUS_WAITING, STATUS_PART_DONE.

  • after_status (str) -- the trained tasks will become after_status. Can be STATUS_DONE, STATUS_PART_DONE.

  • kwargs -- the params for task_func
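Putting it together, a minimal sketch of training every WAITING task in a pool with the built-in task_train (pool and experiment names are placeholders; extra keyword arguments are forwarded to task_func):

from qlib.model.trainer import task_train
from qlib.workflow.task.manage import run_task

# fetch each waiting task and run its whole workflow, recording results under the experiment
run_task(task_train, task_pool="my_task_pool", experiment_name="rolling_exp")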

At the same time, Qlib provides a module called Trainer.

class qlib.model.trainer.Trainer

The trainer can train a list of models. There are Trainer and DelayTrainer, which can be distinguished by when they finish the real training.

__init__()

train(tasks: list, *args, **kwargs) → list

Given a list of task definitions, begin training, and return the models.

For Trainer, it finishes real training in this method. For DelayTrainer, it only does some preparation in this method.

Parameters:

tasks -- a list of tasks

Returns:

a list of models

Return type:

list

end_train(models: list, *args, **kwargs) → list

Given a list of models, finish anything that still needs to be done at the end of training. The models may be Recorder, txt file, database, and so on.

For Trainer, it does some finishing touches in this method. For DelayTrainer, it finishes real training in this method.

Parameters:

models -- a list of models

Returns:

a list of models

Return type:

list

is_delay() → bool

Whether the Trainer will delay finishing end_train.

Returns:

whether it is a DelayTrainer

Return type:

bool

has_worker() → bool

Some trainers have a backend worker to support parallel training. This method can tell whether the worker is enabled.

Returns:

whether the worker is enabled

Return type:

bool

worker()

Start the worker

Raises:

NotImplementedError: -- If the worker is not supported

Trainer will train a list of tasks and return a list of model recorders. Qlib offers two kinds of Trainer: TrainerR is the simplest way, while TrainerRM is based on TaskManager to help manage the lifecycle of tasks automatically. If you do not want to use Task Manager to manage tasks, then using TrainerR to train the list of tasks generated by TaskGen is enough. Here are the details about different Trainers.
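For example, a minimal sketch of using TrainerR without Task Manager (the experiment name is an arbitrary placeholder and tasks stands for the list produced by a TaskGen):

from qlib.model.trainer import TrainerR

tasks = [...]  # placeholder: tasks generated by a TaskGen
trainer = TrainerR(experiment_name="rolling_exp")
recorders = trainer.train(tasks)   # a list of model recorders, one per trained task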

Task Collecting

Before collecting model training results, you need to use qlib.init to specify the path of mlruns.
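A sketch, assuming the experiment manager can be pointed at an mlruns directory through qlib.init's exp_manager argument (the paths and names below are placeholders and the exact config keys may differ across Qlib versions):

from pathlib import Path

import qlib

qlib.init(
    provider_uri="~/.qlib/qlib_data/cn_data",  # placeholder data path
    exp_manager={
        "class": "MLflowExpManager",
        "module_path": "qlib.workflow.expm",
        "kwargs": {
            "uri": "file:" + str(Path("mlruns").resolve()),  # where results are stored
            "default_exp_name": "Experiment",
        },
    },
)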

To collect the results of tasks after training, Qlib provides Collector, Group, and Ensemble to collect the results in a readable, expandable, and loosely-coupled way.

Collector can collect objects from anywhere and process them, such as merging, grouping, averaging, and so on. It has two step actions: collect (collect anything into a dict) and process_collect (process the collected dict).
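A toy sketch of the two steps, assuming the Collector base class accepts a process_list and already implements process_collect (the in-memory source here is made up for illustration):

from qlib.workflow.task.collect import Collector

class InMemoryCollector(Collector):
    """Hypothetical Collector: step 1 gathers objects from an in-memory dict;
    step 2 (process_collect) applies process_list, e.g. a Group or an Ensemble."""

    def __init__(self, source: dict, process_list=None):
        super().__init__(process_list=process_list or [])
        self.source = source

    def collect(self) -> dict:
        # step 1: collect anything into a dict
        return dict(self.source)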

Group also has two steps: group (grouping a set of objects based on group_func and transforming them into a dict) and reduce (reducing the dict into an ensemble based on some rules). For example: {(A,B,C1): object, (A,B,C2): object} ---group---> {(A,B): {C1: object, C2: object}} ---reduce---> {(A,B): object}
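The same example in plain Python (no Qlib API involved; the keys and the averaging "reduce" rule are made up to make the two steps concrete):

flat = {("A", "B", "C1"): 1.0, ("A", "B", "C2"): 3.0}

# group: split each key into an outer part (A, B) and an inner key C*
grouped = {}
for (a, b, c), obj in flat.items():
    grouped.setdefault((a, b), {})[c] = obj
# grouped == {("A", "B"): {"C1": 1.0, "C2": 3.0}}

# reduce: collapse each inner dict into a single object, here by averaging
reduced = {key: sum(inner.values()) / len(inner) for key, inner in grouped.items()}
# reduced == {("A", "B"): 2.0}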

Ensemble can merge the objects in an ensemble. For example: {C1: object, C2: object} ---Ensemble---> object. You can set the ensembles you want in the process_list of Collector. Common ensembles include AverageEnsemble and RollingEnsemble. AverageEnsemble is used to ensemble the results of different models over the same time period, while RollingEnsemble is used to ensemble the results of models over different (rolling) time periods.

So the hierarchy is: the second step of Collector corresponds to Group, and the second step of Group corresponds to Ensemble.

For more information, please refer to Collector, Group, and Ensemble, or see the example here.