cxflow.datasets

Module with the cxflow dataset concept (AbstractDataset) and the Python BaseDataset.

Classes

  • AbstractDataset: This concept prescribes the API that is required from every cxflow dataset.
  • BaseDataset: Base class for datasets written in python.
  • DownloadableDataset: Dataset base class implementing routines for downloading and extracting data via the cxflow dataset download command.
  • StreamWrapper: Dataset stream wrapper which manages buffering, epoch cutting etc.
class cxflow.datasets.AbstractDataset(config_str)

Bases: object

This concept prescribes the API that is required from every cxflow dataset.

Every cxflow dataset has to have a constructor which takes a YAML string config. Additionally, one may implement any <stream_name>_stream method in order to make the <stream_name> stream available in cxflow.MainLoop.

All the defined stream methods should return a Stream.
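For illustration only, a minimal class satisfying this concept might look like the sketch below; the class name RangeDataset and the numbers source are made up and not part of cxflow.

import yaml

class RangeDataset:   # hypothetical dataset used only to illustrate the concept
    """Satisfies the AbstractDataset concept without inheriting from it."""

    def __init__(self, config_str: str):
        # every cxflow dataset is constructed from a YAML string config
        self._config = yaml.safe_load(config_str)

    def train_stream(self):
        # a <stream_name>_stream method makes the respective stream available;
        # the stream is an iterable of batches, i.e. mappings of source name -> sequence of values
        for start in range(0, 100, 10):
            yield {'numbers': list(range(start, start + 10))}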

Inheritance diagram of AbstractDataset

__init__(config_str)[source]

Create a new dataset configured with the given YAML string (obligatory).

The configuration must contain the dataset entry and may contain the output_dir entry.

Parameters:config_str (str) – YAML string config
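For illustration only, such a config string might look as follows; the batch_size key is a made-up dataset parameter, not prescribed by cxflow.

import yaml

config_str = """
dataset:               # obligatory entry with the dataset parameters
  batch_size: 32
output_dir: ./outputs  # optional entry with the output directory
"""
print(yaml.safe_load(config_str))  # {'dataset': {'batch_size': 32}, 'output_dir': './outputs'}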
class cxflow.datasets.BaseDataset(config_str)

Bases: datasets.AbstractDataset

Base class for datasets written in Python.

In the inherited class, one should:
  • override the _configure_dataset method
  • (optional) implement the train_stream method if the dataset is intended to be used with cxflow train ...
  • (optional) implement a <stream_name>_stream method in order to make the <stream_name> stream available
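A minimal sketch of such a subclass might look as follows; the batch_size parameter and the numbers source are illustrative, not part of the cxflow API.

from cxflow.datasets import BaseDataset

class MyDataset(BaseDataset):   # hypothetical example dataset

    def _configure_dataset(self, batch_size: int = 10, output_dir=None, **kwargs) -> None:
        # receives output_dir plus the **kwargs parsed from config['dataset']
        self._batch_size = batch_size

    def train_stream(self):
        # enables `cxflow train ...`; yields batches of the made-up `numbers` source
        for start in range(0, 100, self._batch_size):
            yield {'numbers': list(range(start, start + self._batch_size))}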
Inheritance diagram of BaseDataset

__init__(config_str)[source]

Create a new dataset.

Decode the given YAML config string and pass the obtained **kwargs to _configure_dataset().

Parameters:config_str (str) – dataset configuration as YAML string
_configure_dataset(output_dir, **kwargs)[source]

Configure the dataset with **kwargs decoded from YAML configuration.

Parameters:
  • output_dir (Optional[str]) – output directory for logging and any additional outputs (None if no output dir is available)
  • kwargs – dataset configuration as **kwargs parsed from config['dataset']
Raises:

NotImplementedError – if not overridden

stream_info()[source]

Check and report source names, dtypes and shapes of all the streams available.

Return type:None
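For instance, with the hypothetical MyDataset sketched above, the overview might be obtained as follows:

dataset = MyDataset('dataset: {batch_size: 20}')  # hypothetical dataset and config
dataset.stream_info()  # reports source names, dtypes and shapes of the available streams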
class cxflow.datasets.DownloadableDataset(config_str)

Bases: datasets.BaseDataset

DownloadableDataset is a dataset base class implementing routines for downloading and extracting data via the cxflow dataset download command.

The typical use-case is that the data_root and download_urls variables are passed to the dataset constructor. Alternatively, these properties may be implemented directly in the subclass.
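A minimal sketch of the constructor-configured variant, with placeholder names, paths and URLs, might look as follows:

from cxflow.datasets import DownloadableDataset

class MyDownloadableDataset(DownloadableDataset):   # hypothetical example dataset

    def train_stream(self):
        # load the files extracted under self.data_root and yield batches
        ...

# the matching config['dataset'] entry could then contain, e.g.:
#   data_root: /tmp/my_data
#   download_urls: ['https://example.com/my_data.tar.gz']
# so that `cxflow dataset download <path-to-config>` fetches and extracts the data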

Inheritance diagram of DownloadableDataset

_configure_dataset(data_root=None, download_urls=None, **kwargs)[source]

Save the passed values and use them as a default property implementation.

Parameters:
  • data_root (Optional[str]) – directory to which the files will be downloaded
  • download_urls (Optional[Iterable[str]]) – list of URLs to be downloaded
Return type:

None

data_root

Path to the data root directory.

Return type:str
download()[source]

Download and extract the required files unless they are already present.

If not already downloaded, download all files specified by download_urls(). Then, extract the downloaded files to data_root().

cxflow CLI example
cxflow dataset download <path-to-config>
Return type:None
download_urls

A list of URLs to be downloaded.

Return type:Iterable[str]
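Alternatively, as noted above, the properties may be implemented directly; a hypothetical sketch with placeholder path and URLs:

from cxflow.datasets import DownloadableDataset

class FixedUrlDataset(DownloadableDataset):   # hypothetical example dataset

    @property
    def data_root(self) -> str:
        return '/tmp/fixed_url_data'   # placeholder path

    @property
    def download_urls(self):
        return ['https://example.com/part1.zip',   # placeholder URLs
                'https://example.com/part2.zip']

    def train_stream(self):
        ...  # load the extracted files and yield batches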
class cxflow.datasets.StreamWrapper(stream_fn, buffer_size=0, epoch_size=-1, name=None, profile=None)

Bases: object

Dataset stream wrapper which manages buffering, epoch cutting etc.

The main features are:
  • resets the underlying dataset stream after the iteration reaches its end
  • if specified, uses a producer-consumer buffer for batches, allowing simultaneous batch production and training
  • if specified, produces epochs of a fixed size
  • logs the timings to the given profile

Caution

A buffered StreamWrapper must be used within a with-resource block so that the enqueueing thread can be properly managed.

non-buffered StreamWrapper
stream = StreamWrapper(dataset.train_stream, name='train')
for batch in stream:  # 1st epoch
    ...  # do stuff with the batch
for batch in stream:  # 2nd epoch (the underlying stream is reset automatically)
    ...  # do stuff with the batch
buffered StreamWrapper with fixed-size epochs
stream = StreamWrapper(dataset.train_stream, name='train', buffer_size=16, epoch_size=1000)
with stream:  # we would get an error without the with-resource directive
    for batch in stream:  # 1st epoch (1000 batches)
        ...  # do stuff with the batch
Inheritance diagram of StreamWrapper

__enter__()[source]

If buffered, start the enqueueing thread.

Return type:Iterator[Mapping[str, Sequence[Any]]]
__exit__(*args)[source]

If buffered, terminate the enqueueing thread.

Return type:None
__init__(stream_fn, buffer_size=0, epoch_size=-1, name=None, profile=None)[source]

Create a new StreamWrapper.

Parameters:
  • stream_fn – callable that returns the raw dataset stream to be wrapped
  • buffer_size (int) – capacity of the producer-consumer buffer; the default 0 disables buffering
  • epoch_size (int) – if positive, the produced epochs are cut to the given number of batches; -1 sets no limit
  • name (Optional[str]) – optional stream name
  • profile – profile to which the stream timings are logged
__iter__()[source]

Get stream iterator.

Return type:Iterator[Mapping[str, Sequence[Any]]]
__next__()[source]

Return next batch or end epoch with StopIteration.

Return type:Mapping[str, Sequence[Any]]
Returns:next batch
Raises:StopIteration – at the end of the epoch
_dequeue_batch()[source]

Return a single batch from the queue or None signaling the epoch end.

Raises:ChildProcessError – if the enqueueing thread ended unexpectedly
Return type:Optional[Mapping[str, Sequence[Any]]]
_enqueue_batches(stop_event)[source]

Enqueue all the stream batches. If specified, stop after epoch_size batches.

Note

The epoch end is signaled by enqueuing None.

Stop enqueuing when:
  • the stop_event is set
  • the stream ends and no epoch size is specified
  • the specified number of batches has been enqueued

Note

This is used only with buffer > 0.

Parameters:stop_event (Event) – event signaling stop instruction
Return type:None
_epoch_limit_reached()[source]

Return True if the number of produced batches has reached the specified epoch_size.

Always return False if no limit was specified.

Return type:bool
_get_stream()[source]

Create the raw dataset stream iterator if it does not exist yet and return it.

Return type:Iterator[+T_co]
_next_batch()[source]

Return a single batch or None signaling epoch end.

Note

The epoch end is signaled by returning None.

Stop when:
  • the stream ends and no epoch size is specified
  • the specified number of batches has been returned

Return type:Optional[Mapping[str, Sequence[Any]]]
Returns:a single batch or None signaling epoch end
_start_thread()[source]

Start an enqueueing thread.

_stop_thread()[source]

Stop the enqueueing thread. Keep the queue content and stream state.

allow_buffering

A resource that allows the stream object to prepare batches in advance.

After the construction of the stream wrapper, the buffering is disabled. This function makes it possible to allow buffering only when there is some spare CPU time. A good place to allow buffering is e.g., during the training procedure in the cxflow.models.AbstractModel.run() method, whenever the GIL is released.

Usage
# the training method of a model
def run(self, batch, train, stream):
    preprocess_batch_in_python(batch)  # this function holds the GIL and fully utilizes the CPU
    with stream.allow_buffering:
        call_native_backend(batch)  # this function is blocking, but releases the GIL
                                    # we can use the GIL and the spare CPU to prepare the next batch
Return type:ReleasedSemaphore
Returns:A resource object that allows buffering when in use.
empty()[source]

Return whether the buffer is empty.

Return type:bool
name

Stream name.

Return type:Optional[str]