cxflow

cxflow core module.

Classes

  • MainLoop: cxflow main loop for training and model inference.
class cxflow.MainLoop(model, dataset, hooks=(), train_stream_name='train', extra_streams=(), buffer=0, on_empty_batch='error', on_empty_stream='error', on_unused_sources='warn', fixed_batch_size=None, fixed_epoch_size=None, skip_zeroth_epoch=False)[source]

Bases: cxflow.utils.misc.CaughtInterrupts

cxflow main loop for training and model inference.

Inheritance diagram of MainLoop

EMPTY_ACTIONS = ['ignore', 'warn', 'error']

Possible actions to be taken when a batch/stream is empty.

UNUSED_SOURCE_ACTIONS = ['ignore', 'warn', 'error']

Possible actions to be taken when a stream source is unused by the trained model.

__init__(model, dataset, hooks=(), train_stream_name='train', extra_streams=(), buffer=0, on_empty_batch='error', on_empty_stream='error', on_unused_sources='warn', fixed_batch_size=None, fixed_epoch_size=None, skip_zeroth_epoch=False)[source]
Parameters:
  • model (AbstractModel) – trained model
  • dataset (AbstractDataset) – loaded dataset
  • hooks (Iterable[AbstractHook]) – training hooks
  • train_stream_name (str) – name of the training stream
  • extra_streams (List[str]) – additional stream names to be evaluated between epochs
  • buffer (int) – size of the batch buffer, 0 means no buffer
  • on_empty_batch (str) – action to take when batch is empty; one of MainLoop.EMPTY_ACTIONS
  • on_empty_stream (str) – action to take when stream is empty; one of MainLoop.EMPTY_ACTIONS
  • on_unused_sources (str) – action to take when stream provides an unused sources; one of UNUSED_SOURCE_ACTIONS
  • fixed_batch_size (Optional[int]) – if specified, main_loop removes all batches that do not have the specified size
  • fixed_epoch_size (Optional[int]) – if specified, cut the train stream to epochs of at most fixed_epoch_size batches
  • skip_zeroth_epoch (bool) – if specified, main loop skips the 0th epoch
Raises:

AssertionError – in case of unsupported value of on_empty_batch, on_empty_stream or on_unused_sources

_check_sources(batch)[source]

Check for unused and missing sources.

Parameters:batch (Dict[str, object]) – batch to be checked
Raises:ValueError – if a source is missing or unused and self._on_unused_sources is set to error
Return type:None
_create_epoch_data(streams=None)[source]

Create empty epoch data double dict.

Return type:Mapping[str, object]
_run_epoch(stream, train)[source]

Iterate through the given stream and evaluate/train the model with the received batches.

Calls cxflow.hooks.AbstractHook.after_batch() events.

Parameters:
  • stream (StreamWrapper) – stream to iterate
  • train (bool) – if set to True, the model will be trained
Raises:
  • ValueError – in case of empty batch when on_empty_batch is set to error
  • ValueError – in case of empty stream when on_empty_stream is set to error
  • ValueError – in case of two batch variables having different lengths
Return type:

None

_run_zeroth_epoch(streams)[source]

Run zeroth epoch on the specified streams.

Calls
Parameters:streams (Iterable[str]) – stream names to be evaluated
Return type:None
_try_run(run_func)[source]

Try running the given function (training/prediction).

Calls
Parameters:run_func (Callable[[], None]) – function to be run
Return type:None
epochs_done

Number of training epochs done in the last call of self._run_training().

Return type:Optional[int]
evaluate_stream(stream)[source]

Evaluate the given stream.

Parameters:
  • stream (StreamWrapper) – stream to be evaluated
  • stream_name – stream name
Return type:

None

extra_streams

List of extra stream names as specified in self.__init__().

Return type:List[str]
fixed_epoch_size

Fixed epoch size parameter as specified in self.__init__().

Return type:Optional[int]
get_stream(stream_name)[source]

Get a StreamWrapper with the given name.

Parameters:stream_name (str) – stream name
Return type:StreamWrapper
Returns:dataset function name providing the respective stream
Raises:AttributeError – if the dataset does not provide the function creating the stream
run_evaluation(stream_name)[source]

Run the main loop with the given stream in the prediction mode.

Parameters:stream_name (str) – name of the stream to be evaluated
Return type:None
run_training(trace=None)[source]

Run the main loop in the training mode.

Calls
Return type:None
train_by_stream(stream)[source]

Train the model with the given stream.

Parameters:stream (StreamWrapper) – stream to train with
Return type:None