python教程栏目介绍python中的hook钩子函数
大量免费学习推荐,敬请访问python教程(视频)
1. 什么是hook
经常会听到钩子函数(hook function)这个概念,最近在看目标检测开源框架mmdetection,里面也出现大量hook的编程方式,那到底什么是hook?hook的作用是什么?
what is hook ?钩子hook,顾名思义,可以理解是一个挂钩,作用是有需要的时候挂一个东西上去。具体的解释是:钩子函数是把我们自己实现的hook函数在某一时刻挂接到目标挂载点上。
hook函数的作用 举个例子,hook的概念在windows桌面软件开发很常见,特别是各种事件触发的机制; 比如c++的mfc程序中,要监听鼠标左键按下的时间,mfc提供了一个onleftkeydown的钩子函数。很显然,mfc框架并没有为我们实现onleftkeydown具体的操作,只是为我们提供一个钩子,当我们需要处理的时候,只要去重写这个函数,把我们需要操作挂载在这个钩子里,如果我们不挂载,mfc事件触发机制中执行的就是空的操作。
从上面可知
hook函数是程序中预定义好的函数,这个函数处于原有程序流程当中(暴露一个钩子出来)
我们需要再在有流程中钩子定义的函数块中实现某个具体的细节,需要把我们的实现,挂接或者注册(register)到钩子里,使得hook函数对目标可用
hook 是一种编程机制,和具体的语言没有直接的关系
如果从设计模式上看,hook模式是模板方法的扩展
钩子只有注册的时候,才会使用,所以原有程序的流程中,没有注册或挂载时,执行的是空(即没有执行任何操作)
本文用python来解释hook的实现方式,并展示在开源项目中hook的应用案例。hook函数和我们常听到另外一个名称:回调函数(callback function)功能是类似的,可以按照同种模式来理解。
2. hook实现例子据我所知,hook函数最常使用在某种流程处理当中。这个流程往往有很多步骤。hook函数常常挂载在这些步骤中,为增加额外的一些操作,提供灵活性。
下面举一个简单的例子,这个例子的目的是实现一个通用往队列中插入内容的功能。流程步骤有2个
需要再插入队列前,对数据进行筛选 input_filter_fn
插入队列 insert_queue
class contentstash(object): """ content stash for online operation pipeline is 1. input_filter: filter some contents, no use to user 2. insert_queue(redis or other broker): insert useful content to queue """ def __init__(self): self.input_filter_fn = none self.broker = [] def register_input_filter_hook(self, input_filter_fn): """ register input filter function, parameter is content dict args: input_filter_fn: input filter function returns: """ self.input_filter_fn = input_filter_fn def insert_queue(self, content): """ insert content to queue args: content: dict returns: """ self.broker.append(content) def input_pipeline(self, content, use=false): """ pipeline of input for content stash args: use: is use, defaul false content: dict returns: """ if not use: return # input filter if self.input_filter_fn: _filter = self.input_filter_fn(content) # insert to queue if not _filter: self.insert_queue(content)# test## 实现一个你所需要的钩子实现:比如如果content 包含time就过滤掉,否则插入队列def input_filter_hook(content): """ test input filter hook args: content: dict returns: none or content """ if content.get('time') is none: return else: return content# 原有程序content = {'filename': 'test.jpg', 'b64_file': "#test", 'data': {"result": "cat", "probility": 0.9}}content_stash = contentstash('audit', work_dir='')# 挂上钩子函数, 可以有各种不同钩子函数的实现,但是要主要函数输入输出必须保持原有程序中一致,比如这里是contentcontent_stash.register_input_filter_hook(input_filter_hook)# 执行流程content_stash.input_pipeline(content)
3. hook在开源框架中的应用3.1 keras在深度学习训练流程中,hook函数体现的淋漓尽致。
一个训练过程(不包括数据准备),会轮询多次训练集,每次称为一个epoch,每个epoch又分为多个batch来训练。流程先后拆解成:
开始训练
训练一个epoch前
训练一个batch前
训练一个batch后
训练一个epoch后
评估验证集
结束训练
这些步骤是穿插在训练一个batch数据的过程中,这些可以理解成是钩子函数,我们可能需要在这些钩子函数中实现一些定制化的东西,比如在训练一个epoch后我们要保存下训练的模型,在结束训练时用最好的模型执行下测试集的效果等等。
keras中是通过各种回调函数来实现钩子hook功能的。这里放一个callback的父类,定制时只要继承这个父类,实现你过关注的钩子就可以了。
@keras_export('keras.callbacks.callback')class callback(object): """abstract base class used to build new callbacks. attributes: params: dict. training parameters (eg. verbosity, batch size, number of epochs...). model: instance of `keras.models.model`. reference of the model being trained. the `logs` dictionary that callback methods take as argument will contain keys for quantities relevant to the current batch or epoch (see method-specific docstrings). """ def __init__(self): self.validation_data = none # pylint: disable=g-missing-from-attributes self.model = none # whether this callback should only run on the chief worker in a # multi-worker setting. # todo(omalleyt): make this attr public once solution is stable. self._chief_worker_only = none self._supports_tf_logs = false def set_params(self, params): self.params = params def set_model(self, model): self.model = model @doc_controls.for_subclass_implementers @generic_utils.default def on_batch_begin(self, batch, logs=none): """a backwards compatibility alias for `on_train_batch_begin`.""" @doc_controls.for_subclass_implementers @generic_utils.default def on_batch_end(self, batch, logs=none): """a backwards compatibility alias for `on_train_batch_end`.""" @doc_controls.for_subclass_implementers def on_epoch_begin(self, epoch, logs=none): """called at the start of an epoch. subclasses should override for any actions to run. this function should only be called during train mode. arguments: epoch: integer, index of epoch. logs: dict. currently no data is passed to this argument for this method but that may change in the future. """ @doc_controls.for_subclass_implementers def on_epoch_end(self, epoch, logs=none): """called at the end of an epoch. subclasses should override for any actions to run. this function should only be called during train mode. arguments: epoch: integer, index of epoch. logs: dict, metric results for this training epoch, and for the validation epoch if validation is performed. validation result keys are prefixed with `val_`. """ @doc_controls.for_subclass_implementers @generic_utils.default def on_train_batch_begin(self, batch, logs=none): """called at the beginning of a training batch in `fit` methods. subclasses should override for any actions to run. arguments: batch: integer, index of batch within the current epoch. logs: dict, contains the return value of `model.train_step`. typically, the values of the `model`'s metrics are returned. example: `{'loss': 0.2, 'accuracy': 0.7}`. """ # for backwards compatibility. self.on_batch_begin(batch, logs=logs) @doc_controls.for_subclass_implementers @generic_utils.default def on_train_batch_end(self, batch, logs=none): """called at the end of a training batch in `fit` methods. subclasses should override for any actions to run. arguments: batch: integer, index of batch within the current epoch. logs: dict. aggregated metric results up until this batch. """ # for backwards compatibility. self.on_batch_end(batch, logs=logs) @doc_controls.for_subclass_implementers @generic_utils.default def on_test_batch_begin(self, batch, logs=none): """called at the beginning of a batch in `evaluate` methods. also called at the beginning of a validation batch in the `fit` methods, if validation data is provided. subclasses should override for any actions to run. arguments: batch: integer, index of batch within the current epoch. logs: dict, contains the return value of `model.test_step`. typically, the values of the `model`'s metrics are returned. example: `{'loss': 0.2, 'accuracy': 0.7}`. """ @doc_controls.for_subclass_implementers @generic_utils.default def on_test_batch_end(self, batch, logs=none): """called at the end of a batch in `evaluate` methods. also called at the end of a validation batch in the `fit` methods, if validation data is provided. subclasses should override for any actions to run. arguments: batch: integer, index of batch within the current epoch. logs: dict. aggregated metric results up until this batch. """ @doc_controls.for_subclass_implementers @generic_utils.default def on_predict_batch_begin(self, batch, logs=none): """called at the beginning of a batch in `predict` methods. subclasses should override for any actions to run. arguments: batch: integer, index of batch within the current epoch. logs: dict, contains the return value of `model.predict_step`, it typically returns a dict with a key 'outputs' containing the model's outputs. """ @doc_controls.for_subclass_implementers @generic_utils.default def on_predict_batch_end(self, batch, logs=none): """called at the end of a batch in `predict` methods. subclasses should override for any actions to run. arguments: batch: integer, index of batch within the current epoch. logs: dict. aggregated metric results up until this batch. """ @doc_controls.for_subclass_implementers def on_train_begin(self, logs=none): """called at the beginning of training. subclasses should override for any actions to run. arguments: logs: dict. currently no data is passed to this argument for this method but that may change in the future. """ @doc_controls.for_subclass_implementers def on_train_end(self, logs=none): """called at the end of training. subclasses should override for any actions to run. arguments: logs: dict. currently the output of the last call to `on_epoch_end()` is passed to this argument for this method but that may change in the future. """ @doc_controls.for_subclass_implementers def on_test_begin(self, logs=none): """called at the beginning of evaluation or validation. subclasses should override for any actions to run. arguments: logs: dict. currently no data is passed to this argument for this method but that may change in the future. """ @doc_controls.for_subclass_implementers def on_test_end(self, logs=none): """called at the end of evaluation or validation. subclasses should override for any actions to run. arguments: logs: dict. currently the output of the last call to `on_test_batch_end()` is passed to this argument for this method but that may change in the future. """ @doc_controls.for_subclass_implementers def on_predict_begin(self, logs=none): """called at the beginning of prediction. subclasses should override for any actions to run. arguments: logs: dict. currently no data is passed to this argument for this method but that may change in the future. """ @doc_controls.for_subclass_implementers def on_predict_end(self, logs=none): """called at the end of prediction. subclasses should override for any actions to run. arguments: logs: dict. currently no data is passed to this argument for this method but that may change in the future. """ def _implements_train_batch_hooks(self): """determines if this callback should be called for each train batch.""" return (not generic_utils.is_default(self.on_batch_begin) or not generic_utils.is_default(self.on_batch_end) or not generic_utils.is_default(self.on_train_batch_begin) or not generic_utils.is_default(self.on_train_batch_end))
这些钩子的原始程序是在模型训练流程中的
keras源码位置: tensorflow\python\keras\engine\training.py
部分摘录如下(## i am hook):
# container that configures and calls `tf.keras.callback`s. if not isinstance(callbacks, callbacks_module.callbacklist): callbacks = callbacks_module.callbacklist( callbacks, add_history=true, add_progbar=verbose != 0, model=self, verbose=verbose, epochs=epochs, steps=data_handler.inferred_steps) ## i am hook callbacks.on_train_begin() training_logs = none # handle fault-tolerance for multi-worker. # todo(omalleyt): fix the ordering issues that mean this has to # happen after `callbacks.on_train_begin`. data_handler._initial_epoch = ( # pylint: disable=protected-access self._maybe_load_initial_epoch_from_ckpt(initial_epoch)) for epoch, iterator in data_handler.enumerate_epochs(): self.reset_metrics() callbacks.on_epoch_begin(epoch) with data_handler.catch_stop_iteration(): for step in data_handler.steps(): with trace.trace( 'tracecontext', graph_type='train', epoch_num=epoch, step_num=step, batch_size=batch_size): ## i am hook callbacks.on_train_batch_begin(step) tmp_logs = train_function(iterator) if data_handler.should_sync: context.async_wait() logs = tmp_logs # no error, now safe to assign to logs. end_step = step + data_handler.step_increment callbacks.on_train_batch_end(end_step, logs) epoch_logs = copy.copy(logs) # run validation. ## i am hook callbacks.on_epoch_end(epoch, epoch_logs)
3.2 mmdetectionmmdetection是一个目标检测的开源框架,集成了许多不同的目标检测深度学习算法(pytorch版),如faster-rcnn, fpn, retianet等。里面也大量使用了hook,暴露给应用实现流程中具体部分。
详见https://github.com/open-mmlab/mmdetection
这里看一个训练的调用例子(摘录)(https://github.com/open-mmlab/mmdetection/blob/5d592154cca589c5113e8aadc8798bbc73630d98/mmdet/apis/train.py)
def train_detector(model, dataset, cfg, distributed=false, validate=false, timestamp=none, meta=none): logger = get_root_logger(cfg.log_level) # prepare data loaders # put model on gpus # build runner optimizer = build_optimizer(model, cfg.optimizer) runner = epochbasedrunner( model, optimizer=optimizer, work_dir=cfg.work_dir, logger=logger, meta=meta) # an ugly workaround to make .log and .log.json filenames the same runner.timestamp = timestamp # fp16 setting # register hooks runner.register_training_hooks(cfg.lr_config, optimizer_config, cfg.checkpoint_config, cfg.log_config, cfg.get('momentum_config', none)) if distributed: runner.register_hook(distsamplerseedhook()) # register eval hooks if validate: # support batch_size > 1 in validation eval_cfg = cfg.get('evaluation', {}) eval_hook = distevalhook if distributed else evalhook runner.register_hook(eval_hook(val_dataloader, **eval_cfg)) # user-defined hooks if cfg.get('custom_hooks', none): custom_hooks = cfg.custom_hooks assert isinstance(custom_hooks, list), \ f'custom_hooks expect list type, but got {type(custom_hooks)}' for hook_cfg in cfg.custom_hooks: assert isinstance(hook_cfg, dict), \ 'each item in custom_hooks expects dict type, but got ' \ f'{type(hook_cfg)}' hook_cfg = hook_cfg.copy() priority = hook_cfg.pop('priority', 'normal') hook = build_from_cfg(hook_cfg, hooks) runner.register_hook(hook, priority=priority)
4. 总结本文介绍了hook的概念和应用,并给出了python的实现细则。希望对比有帮助。总结如下:
hook函数是流程中预定义好的一个步骤,没有实现
挂载或者注册时, 流程执行就会执行这个钩子函数
回调函数和hook函数功能上是一致的
hook设计方式带来灵活性,如果流程中有一个步骤,你想让调用方来实现,你可以用hook函数
相关免费学习推荐:php编程(视频)
以上就是迅速掌握python中的hook钩子函数的详细内容。