Task Management

Introduction

The Workflow section introduces how to run research workflows in a loosely-coupled way, but it can only execute one task when you use qrun.
To automatically generate and execute different tasks, Task Management provides a whole process including Task Generating, Task Storing, Task Training and Task Collecting.
With this module, users can run their tasks automatically in different periods, with different losses, or even with different models. The process of task generation, model training, and data merging and collecting is shown in the figure below.
This whole process can be used in Online Serving.
An example of the entire process is here.
Task Generating

A task consists of Model, Dataset, Record, or anything added by users.
The specific task template can be viewed in the Task section.
Even though the task template is fixed, users can customize their TaskGen to generate different tasks from the task template.
Here is the base class of TaskGen:
- class qlib.workflow.task.gen.TaskGen
The base class for generating different tasks
Example 1:
input: a specific task template and rolling steps
output: rolling version of the tasks
Example 2:
input: a specific task template and losses list
output: a set of tasks with different losses
- abstractmethod generate(task: dict) → List[dict]
Generate different tasks based on a task template
- Parameters:
task (dict) -- a task template
- Returns:
A list of tasks
- Return type:
List[dict]
Qlib provides a class RollingGen to generate a list of tasks of the dataset in different date segments.
This class allows users to verify the effect of data from different periods on the model in one experiment. More information is here.
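The pattern of Example 2 above (one template, many losses) can be sketched with a small self-contained generator. This is a minimal illustration of the TaskGen idea, not Qlib's actual API; the `model`/`kwargs`/`loss` layout of the template is a hypothetical structure chosen for the example:

```python
import copy

def generate_loss_tasks(task_template: dict, losses: list) -> list:
    """Generate one task per loss function from a fixed task template.

    Mirrors Example 2 of TaskGen: the template is fixed and only the
    loss hyperparameter varies. The template layout is hypothetical.
    """
    tasks = []
    for loss in losses:
        task = copy.deepcopy(task_template)  # never mutate the shared template
        task["model"]["kwargs"]["loss"] = loss
        tasks.append(task)
    return tasks

template = {"model": {"class": "LGBModel", "kwargs": {"loss": "mse"}}}
tasks = generate_loss_tasks(template, ["mse", "binary"])
```

A rolling generator such as RollingGen follows the same shape, except that it varies the dataset's date segments instead of the loss.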
Task Storing

To achieve higher efficiency and the possibility of cluster operation, Task Manager will store all tasks in MongoDB.
TaskManager can fetch undone tasks automatically and manage the lifecycle of a set of tasks with error handling.
Users MUST finish the configuration of MongoDB when using this module.
Users need to provide the MongoDB URL and database name to use TaskManager in initialization, or make a statement like this:
from qlib.config import C
C["mongo"] = {
    "task_url": "mongodb://localhost:27017/",  # your MongoDB url
    "task_db_name": "rolling_db",              # database name
}
- class qlib.workflow.task.manage.TaskManager(task_pool: str)
Here is what a task looks like when it is created by TaskManager:
{
    'def': pickle serialized task definition. using pickle will make it easier
    'filter': json-like data. This is for filtering the tasks.
    'status': 'waiting' | 'running' | 'done'
    'res': pickle serialized task result,
}
The task manager assumes that you will only update the tasks you fetched. MongoDB's fetch-one-and-update operation makes data updating secure.
This class can be used as a tool from the command line. Here are several examples. You can view the help of the manage module with the following commands:

python -m qlib.workflow.task.manage -h          # show manual of manage module CLI
python -m qlib.workflow.task.manage wait -h     # show manual of the wait command of manage
python -m qlib.workflow.task.manage -t <pool_name> wait
python -m qlib.workflow.task.manage -t <pool_name> task_stat
Note

Assumption: the data in MongoDB is encoded and the data out of MongoDB is decoded.
There are four statuses:
STATUS_WAITING: waiting for training
STATUS_RUNNING: training
STATUS_PART_DONE: finished some step and waiting for next step
STATUS_DONE: all work done
- __init__(task_pool: str)
Init Task Manager; remember to make the statement of the MongoDB URL and database name first. A TaskManager instance serves a specific task pool. The static methods of this module serve the whole MongoDB.
- Parameters:
task_pool (str) -- the name of the Collection in MongoDB
- static list() → list
List all the collections (task pools) of the database.
- Returns:
list
- replace_task(task, new_task)
Use a new task to replace an old one
- Parameters:
task -- old task
new_task -- new task
- insert_task(task)
Insert a task.
- Parameters:
task -- the task waiting to be inserted
- Returns:
pymongo.results.InsertOneResult
- insert_task_def(task_def)
Insert a task into the task_pool
- Parameters:
task_def (dict) -- the task definition
- Return type:
pymongo.results.InsertOneResult
- create_task(task_def_l, dry_run=False, print_nt=False) → List[str]
If the tasks in task_def_l are new, then insert the new tasks into the task_pool and record inserted_id. If a task is not new, then just query its _id.
- Parameters:
task_def_l (list) -- a list of tasks
dry_run (bool) -- if True, do not actually insert the new tasks into the task pool
print_nt (bool) -- if True, print the new tasks
- Returns:
a list of the _id of task_def_l
- Return type:
List[str]
- fetch_task(query={}, status='waiting') → dict
Use query to fetch tasks.
- Parameters:
query (dict, optional) -- the query dict. Defaults to {}.
status (str, optional) -- the status of tasks to fetch. Defaults to STATUS_WAITING.
- Returns:
a task (document in collection) after decoding
- Return type:
dict
- safe_fetch_task(query={}, status='waiting')
Fetch a task from the task_pool using query with a contextmanager
- Parameters:
query (dict) -- the dict of query
- Returns:
a task (document in collection) after decoding
- Return type:
dict
- query(query={}, decode=True)
Query tasks in the collection. This function may raise the exception pymongo.errors.CursorNotFound: cursor id not found if it takes too long to iterate the generator.

python -m qlib.workflow.task.manage -t <your task pool> query '{"_id": "615498be837d0053acbc5d58"}'

- Parameters:
query (dict) -- the dict of query
decode (bool) -- whether to decode the fetched documents
- Returns:
a task (document in collection) after decoding
- Return type:
dict
- re_query(_id) → dict
Use _id to query a task.
- Parameters:
_id (str) -- the _id of a document
- Returns:
a task (document in collection) after decoding
- Return type:
dict
- commit_task_res(task, res, status='done')
Commit the result to task['res'].
- Parameters:
task (dict) -- the task to commit
res (object) -- the result you want to save
status (str, optional) -- STATUS_WAITING, STATUS_RUNNING, STATUS_DONE, or STATUS_PART_DONE. Defaults to STATUS_DONE.
- return_task(task, status='waiting')
Return a task to the given status. Usually used in error handling.
- Parameters:
task (dict) -- the task to return
status (str, optional) -- STATUS_WAITING, STATUS_RUNNING, STATUS_DONE, or STATUS_PART_DONE. Defaults to STATUS_WAITING.
- remove(query={})
Remove tasks using query
- Parameters:
query (dict) -- the dict of query
- task_stat(query={}) → dict
Count the tasks in every status.
- Parameters:
query (dict, optional) -- the query dict. Defaults to {}.
- Returns:
dict
- reset_waiting(query={})
Reset all running tasks to waiting status. Can be used when some running tasks exit unexpectedly.
- Parameters:
query (dict, optional) -- the query dict. Defaults to {}.
- prioritize(task, priority: int)
Set the priority of a task.
- Parameters:
task (dict) -- the task queried from the database
priority (int) -- the target priority
- wait(query={})
When multiprocessing, the main process may fetch nothing from TaskManager because some tasks are still running. So the main process should wait until all tasks are trained by other processes or machines.
- Parameters:
query (dict, optional) -- the query dict. Defaults to {}.
More information about Task Manager can be found here.
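The status lifecycle that TaskManager manages (waiting → running → done, with return-on-error) can be sketched with a toy in-memory task pool. This is purely conceptual; the real class stores tasks in MongoDB and relies on atomic fetch-and-update, and the function names below merely mirror TaskManager's methods:

```python
# A toy in-memory task pool illustrating the status lifecycle that
# TaskManager implements on top of MongoDB. Illustrative only.
STATUS_WAITING, STATUS_RUNNING = "waiting", "running"
STATUS_PART_DONE, STATUS_DONE = "part_done", "done"

pool = [
    {"def": {"step": 0}, "status": STATUS_WAITING, "res": None},
    {"def": {"step": 1}, "status": STATUS_WAITING, "res": None},
]

def fetch_task(status=STATUS_WAITING):
    """Find one task in the given status and mark it as running."""
    for task in pool:
        if task["status"] == status:
            task["status"] = STATUS_RUNNING
            return task
    return None  # nothing left to do in this status

def commit_task_res(task, res, status=STATUS_DONE):
    """Save the result and move the task to a finished status."""
    task["res"] = res
    task["status"] = status

def return_task(task, status=STATUS_WAITING):
    """Error handling: put a fetched task back so it can be retried."""
    task["status"] = status

# Drain the pool the way a worker process would.
while (task := fetch_task()) is not None:
    commit_task_res(task, res=f"model-{task['def']['step']}")
```

Because every worker first flips a task from waiting to running before touching it, several workers (or machines) can safely drain the same pool, which is exactly what the MongoDB-backed implementation enables.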
Task Training

After generating and storing those tasks, it's time to run the tasks which are in the WAITING status.
Qlib provides a method called run_task to run those tasks in the task pool; however, users can also customize how tasks are executed.
An easy way to get a task_func is to use qlib.model.trainer.task_train directly.
It will run the whole workflow defined by the task, which includes Model, Dataset, Record.
- qlib.workflow.task.manage.run_task(task_func: Callable, task_pool: str, query: dict = {}, force_release: bool = False, before_status: str = 'waiting', after_status: str = 'done', **kwargs)
While the task pool is not empty (has WAITING tasks), use task_func to fetch and run tasks in task_pool
After running this method, here are 4 situations (before_status -> after_status):
STATUS_WAITING -> STATUS_DONE: use task["def"] as task_func param, it means that the task has not been started
STATUS_WAITING -> STATUS_PART_DONE: use task["def"] as task_func param
STATUS_PART_DONE -> STATUS_PART_DONE: use task["res"] as task_func param, it means that the task has been started but not completed
STATUS_PART_DONE -> STATUS_DONE: use task["res"] as task_func param
- Parameters:
task_func (Callable) --
def (task_def, **kwargs) -> <res which will be committed>
the function to run the task
task_pool (str) -- the name of the task pool (Collection in MongoDB)
query (dict) -- will use this dict to query task_pool when fetching tasks
force_release (bool) -- whether the program will forcibly release the resource
before_status (str) -- the tasks in before_status will be fetched and trained. Can be STATUS_WAITING, STATUS_PART_DONE.
after_status (str) -- the tasks will become after_status after being trained. Can be STATUS_DONE, STATUS_PART_DONE.
kwargs -- the params for task_func
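The task_func contract above (receive a decoded task definition, return a result to be committed) can be sketched as follows. The function body and the task layout are hypothetical; a real task_func would typically just call qlib.model.trainer.task_train:

```python
def my_task_func(task_def: dict, **kwargs) -> dict:
    """A hypothetical task_func for run_task.

    run_task passes task["def"] here for WAITING tasks (or task["res"]
    for PART_DONE tasks), and commits the return value to task["res"].
    """
    # Stand-in for real training; a genuine task_func might call
    # qlib.model.trainer.task_train(task_def) here instead.
    model_name = task_def.get("model", {}).get("class", "unknown")
    return {"trained": model_name}

res = my_task_func({"model": {"class": "LGBModel"}})
```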
Meanwhile, Qlib provides a module called Trainer.
- class qlib.model.trainer.Trainer
The trainer can train a list of models. There are Trainer and DelayTrainer, which can be distinguished by when it will finish real training.
- __init__()
- train(tasks: list, *args, **kwargs) → list
Given a list of task definitions, begin training, and return the models.
For Trainer, it finishes real training in this method. For DelayTrainer, it only does some preparation in this method.
- Parameters:
tasks -- a list of tasks
- Returns:
a list of models
- Return type:
list
- end_train(models: list, *args, **kwargs) → list
Given a list of models, finish anything needed at the end of training. The models may be Recorder, txt file, database, and so on.
For Trainer, it does some finishing touches in this method. For DelayTrainer, it finishes real training in this method.
- Parameters:
models -- a list of models
- Returns:
a list of models
- Return type:
list
- is_delay() → bool
Whether the Trainer will delay finishing end_train.
- Returns:
whether it is a DelayTrainer
- Return type:
bool
- has_worker() → bool
Some trainers have a backend worker to support parallel training. This method can tell whether the worker is enabled.
- Returns:
whether the worker is enabled
- Return type:
bool
- worker()
Start the worker.
- Raises:
NotImplementedError -- if the worker is not supported
The Trainer will train a list of tasks and return a list of model recorders.
Qlib offers two kinds of Trainer: TrainerR is the simplest way, while TrainerRM is based on TaskManager to help manage the lifecycle of tasks automatically.
If you do not want to use Task Manager to manage tasks, then using TrainerR to train a list of tasks generated by TaskGen is enough.
Here are the details about the different Trainers.
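The split between train and end_train described above can be sketched with a toy pair of trainers. These classes are illustrative stand-ins only; Qlib's real TrainerR/TrainerRM operate on recorders (and, for TrainerRM, on a MongoDB task pool):

```python
class ToyTrainer:
    """Finishes real training inside train(); end_train() is a no-op touch-up."""
    def train(self, tasks: list) -> list:
        return [f"model-for-{t}" for t in tasks]     # real training happens here
    def end_train(self, models: list) -> list:
        return models                                # nothing left to finish
    def is_delay(self) -> bool:
        return False

class ToyDelayTrainer(ToyTrainer):
    """Only prepares in train(); real training is delayed until end_train()."""
    def train(self, tasks: list) -> list:
        return list(tasks)                           # just preparation
    def end_train(self, prepared: list) -> list:
        return [f"model-for-{t}" for t in prepared]  # real training happens here
    def is_delay(self) -> bool:
        return True

tasks = ["task_a", "task_b"]
models = ToyTrainer().end_train(ToyTrainer().train(tasks))
delayed = ToyDelayTrainer()
models2 = delayed.end_train(delayed.train(tasks))    # same result, work deferred
```

Both paths produce the same models; the delayed variant simply moves the expensive step into end_train, which is why is_delay() lets callers tell the two apart.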
Task Collecting

Before collecting model training results, you need to use qlib.init to specify the path of mlruns.
To collect the results of tasks after training, Qlib provides Collector, Group and Ensemble to collect the results in a readable, expandable and loosely-coupled way.
Collector can collect objects from everywhere and process them, such as merging, grouping, averaging and so on. It has two step actions, including ``collect`` (collect anything into a dict) and ``process_collect`` (process the collected dict).
Group also has two steps, including ``group`` (grouping a set of objects based on `group_func` and transforming them into a dict) and ``reduce`` (making a dict into an ensemble based on some rules).
For example: {(A,B,C1): object, (A,B,C2): object} ---``group``---> {(A,B): {C1: object, C2: object}} ---``reduce``---> {(A,B): object}
Ensemble can merge the objects in an ensemble.
For example: {C1: object, C2: object} ---``Ensemble``---> object.
You can set the ensembles you want in the Collector's process_list.
Common ensembles include AverageEnsemble and RollingEnsemble. An AverageEnsemble is used to ensemble the results of different models in the same time period, while a RollingEnsemble is used to ensemble the results of models in different (rolling) time periods.
So the hierarchy is clear: the second step of Collector corresponds to Group, and the second step of Group corresponds to Ensemble.
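The group/reduce example above can be sketched in a few lines. This is a toy that assumes the ensemble is a simple average of numeric results; Qlib's real Collector/Group/Ensemble classes are more general and work on arbitrary objects:

```python
from collections import defaultdict

def group(collected: dict) -> dict:
    """{(A, B, C): value} -> {(A, B): {C: value}}: group on all but the last key part."""
    grouped = defaultdict(dict)
    for key, value in collected.items():
        *head, tail = key
        grouped[tuple(head)][tail] = value
    return dict(grouped)

def average_ensemble(objects: dict) -> float:
    """{C1: value, C2: value} -> value; a numeric stand-in for AverageEnsemble."""
    return sum(objects.values()) / len(objects)

def reduce_groups(grouped: dict) -> dict:
    """Apply the ensemble to each group: {(A, B): {C: value}} -> {(A, B): value}."""
    return {key: average_ensemble(objs) for key, objs in grouped.items()}

collected = {("A", "B", "C1"): 1.0, ("A", "B", "C2"): 3.0}
result = reduce_groups(group(collected))  # {("A", "B"): 2.0}
```

This mirrors the hierarchy stated above: grouping is the first step of Group, and the ensemble (here, the average) is plugged into Group's second step.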