diff --git a/docs/source/utils.rst b/docs/source/utils.rst index 0a115350e..cd80e0fab 100644 --- a/docs/source/utils.rst +++ b/docs/source/utils.rst @@ -27,3 +27,4 @@ Utils set_log_level setup_seed set_download_dir + make_process_pipelines diff --git a/docs/source/whats_new.rst b/docs/source/whats_new.rst index 59b27db14..719a12cd4 100644 --- a/docs/source/whats_new.rst +++ b/docs/source/whats_new.rst @@ -39,6 +39,8 @@ Enhancements - Systematically set the annotations when loading data, eventually using the stim channel (PR :gh:`408` by `Pierre Guetschel`_) - Allow :func:`moabb.datasets.utils.dataset_search` to search across paradigms ``paradigm=None`` (PR :gh:`408` by `Pierre Guetschel`_) - Improving the review processing with more pre-commit bots (:gh:`435` by `Bruno Aristimunha`_) +- Add methods ``make_process_pipelines`` and ``make_labels_pipeline`` to :class:`moabb.paradigms.base.BaseProcessing` (:gh:`447` by `Pierre Guetschel`_) +- Pipelines' digests are now computed from the whole processing+classification pipeline (:gh:`447` by `Pierre Guetschel`_) - Update all dataset codes to remove white spaces and underscores (:gh:`448` by `Pierre Guetschel`_) - Add :func:`moabb.utils.depreciated_alias` decorator (:gh:`455` by `Pierre Guetschel`_) - Rename many dataset class names to standardize and deprecate old names (:gh:`455` by `Pierre Guetschel`_) diff --git a/examples/plot_disk_cache.py b/examples/plot_disk_cache.py index 0529088c0..c43c4b84d 100644 --- a/examples/plot_disk_cache.py +++ b/examples/plot_disk_cache.py @@ -225,7 +225,7 @@ # The main interest of the array cache is when the user passes a # computationally heavy but fixed additional preprocessing (for example # computing the covariance matrices of the epochs). This can be done by using -# the ``processing_pipeline`` argument. The output of this additional pipeline +# the ``postprocess_pipeline`` argument. 
The output of this additional pipeline # (necessary a numpy array) will be saved to avoid re-computing it each time. # # diff --git a/moabb/analysis/results.py b/moabb/analysis/results.py index c2507fab3..83ab0f15d 100644 --- a/moabb/analysis/results.py +++ b/moabb/analysis/results.py @@ -11,6 +11,7 @@ from mne import get_config, set_config from mne.datasets.utils import _get_path from sklearn.base import BaseEstimator +from sklearn.pipeline import Pipeline try: @@ -50,6 +51,11 @@ def get_digest(obj): return hashlib.md5(get_string_rep(obj)).hexdigest() +def get_pipeline_digest(process_pipeline, clf_pipeline): + full_pipeline = Pipeline(steps=[("process", process_pipeline), ("clf", clf_pipeline)]) + return get_digest(full_pipeline) + + class Results: """Class to hold results from the evaluation.evaluate method. @@ -110,7 +116,7 @@ def __init__( "{:%Y-%m-%d, %H:%M}".format(datetime.now()) ) - def add(self, results, pipelines): # noqa: C901 + def add(self, results, pipelines, process_pipeline): # noqa: C901 """Add results.""" def to_list(res): @@ -133,7 +139,7 @@ def to_list(res): with h5py.File(self.filepath, "r+") as f: for name, data_dict in results.items(): - digest = get_digest(pipelines[name]) + digest = get_pipeline_digest(process_pipeline, pipelines[name]) if digest not in f.keys(): # create pipeline main group if nonexistent f.create_group(digest) @@ -192,13 +198,20 @@ def to_list(res): ] ) - def to_dataframe(self, pipelines=None): + def to_dataframe(self, pipelines=None, process_pipeline=None): df_list = [] # get the list of pipeline hash digests = [] - if pipelines is not None: - digests = [get_digest(pipelines[name]) for name in pipelines] + if pipelines is not None and process_pipeline is not None: + digests = [ + get_pipeline_digest(process_pipeline, pipelines[name]) + for name in pipelines + ] + elif pipelines is not None or process_pipeline is not None: + raise ValueError( + "Either both of none of pipelines and process_pipeline must be specified." 
+ ) with h5py.File(self.filepath, "r") as f: for digest, p_group in f.items(): @@ -221,21 +234,23 @@ def to_dataframe(self, pipelines=None): return pd.concat(df_list, ignore_index=True) - def not_yet_computed(self, pipelines, dataset, subj): + def not_yet_computed(self, pipelines, dataset, subj, process_pipeline): """Check if a results has already been computed.""" ret = { k: pipelines[k] for k in pipelines.keys() - if not self._already_computed(pipelines[k], dataset, subj) + if not self._already_computed(pipelines[k], dataset, subj, process_pipeline) } return ret - def _already_computed(self, pipeline, dataset, subject, session=None): + def _already_computed( + self, pipeline, dataset, subject, process_pipeline, session=None + ): """Check if we have results for a current combination of pipeline / dataset / subject.""" with h5py.File(self.filepath, "r") as f: # get the digest from repr - digest = get_digest(pipeline) + digest = get_pipeline_digest(process_pipeline, pipeline) # check if digest present if digest not in f.keys(): diff --git a/moabb/datasets/base.py b/moabb/datasets/base.py index e9dd7f654..6cfd944d1 100644 --- a/moabb/datasets/base.py +++ b/moabb/datasets/base.py @@ -4,25 +4,14 @@ import re import traceback from dataclasses import dataclass -from enum import Enum from inspect import signature from pathlib import Path -from typing import Dict, Type, Union +from typing import Dict, Union -from sklearn.pipeline import Pipeline, make_pipeline +from sklearn.pipeline import Pipeline -from moabb.datasets.bids_interface import ( - BIDSInterfaceBase, - BIDSInterfaceEpochs, - BIDSInterfaceNumpyArray, - BIDSInterfaceRawEDF, -) -from moabb.datasets.preprocessing import ( - EpochsToEvents, - ForkPipelines, - RawToEvents, - SetRawAnnotations, -) +from moabb.datasets.bids_interface import StepType, _interface_map +from moabb.datasets.preprocessing import SetRawAnnotations log = logging.getLogger(__name__) @@ -98,21 +87,6 @@ def make(cls, dic: Union[None, Dict, 
"CacheConfig"] = None) -> "CacheConfig": raise ValueError(f"Expected dict or CacheConfig, got {type(dic)}") -class StepType(Enum): - """Enum for the different steps in the pipeline.""" - - RAW = "raw" - EPOCHS = "epochs" - ARRAY = "array" - - -_interface_map: Dict[StepType, Type[BIDSInterfaceBase]] = { - StepType.RAW: BIDSInterfaceRawEDF, - StepType.EPOCHS: BIDSInterfaceEpochs, - StepType.ARRAY: BIDSInterfaceNumpyArray, -} - - def apply_step(pipeline, obj): """Apply a pipeline to an object.""" if obj is None: @@ -224,12 +198,10 @@ def get_data( self, subjects=None, cache_config=None, - raw_pipeline=None, - epochs_pipeline=None, - array_pipeline=None, - events_pipeline=None, + process_pipeline=None, ): - """Return the data correspoonding to a list of subjects. + """ + Return the data correspoonding to a list of subjects. The returned data is a dictionary with the following structure:: @@ -259,27 +231,16 @@ def get_data( cache_config: dict | CacheConfig Configuration for caching of datasets. See ``CacheConfig`` for details. - raw_pipeline: sklearn.pipeline.Pipeline | sklearn.base.TransformerMixin - | None - Pipeline that necessarily takes a mne.io.Raw as input, - and necessarily returns a :class:`mne.io.Raw` as output. - epochs_pipeline: sklearn.pipeline.Pipeline | - sklearn.base.TransformerMixin | None - Pipeline that necessarily takes a mne.io.Raw as input, - and necessarily returns a :class:`mne.Epochs` as output. - array_pipeline: sklearn.pipeline.Pipeline | - sklearn.base.TransformerMixin | None - Pipeline either takes as input a :class:`mne.Epochs` if - epochs_pipeline is not ``None``, or a :class:`mne.io.Raw` - otherwise. It necessarily returns a :func:`numpy.ndarray` - as output. - If array_pipeline is not None, each run will be a - dict with keys "X" and "y" corresponding respectively to the array - itself and the corresponding labels. 
- events_pipeline: sklearn.pipeline.Pipeline | - sklearn.base.TransformerMixin | None - Pipeline used to generate the events. Only used if - ``array_pipeline`` is not ``None``. + process_pipeline: Pipeline | None + Optional processing pipeline to apply to the data. + To generate an adequate pipeline, we recommend using + :func:`moabb.utils.make_process_pipelines`. + This pipeline will receive :class:`mne.io.BaseRaw` objects. + The steps names of this pipeline should be elements of :class:`StepType`. + According to their name, the steps should either return a + :class:`mne.io.BaseRaw`, a :class:`mne.Epochs`, or a :func:`numpy.ndarray`. + This pipeline must be "fixed" because it will not be trained, + i.e. no call to ``fit`` will be made. Returns ------- @@ -292,35 +253,14 @@ def get_data( if not isinstance(subjects, list): raise ValueError("subjects must be a list") - if events_pipeline is None and array_pipeline is not None: - log.warning( - f"event_id not specified, using all the dataset's " - f"events to generate labels: {self.event_id}" - ) - events_pipeline = ( - RawToEvents(self.event_id) - if epochs_pipeline is None - else EpochsToEvents() - ) - cache_config = CacheConfig.make(cache_config) - steps = [] - steps.append((StepType.RAW, SetRawAnnotations(self.event_id))) - if raw_pipeline is not None: - steps.append((StepType.RAW, raw_pipeline)) - if epochs_pipeline is not None: - steps.append((StepType.EPOCHS, epochs_pipeline)) - if array_pipeline is not None: - array_events_pipeline = ForkPipelines( + if process_pipeline is None: + process_pipeline = Pipeline( [ - ("X", array_pipeline), - ("events", events_pipeline), + (StepType.RAW, SetRawAnnotations(self.event_id)), ] ) - steps.append((StepType.ARRAY, array_events_pipeline)) - if len(steps) == 0: - steps.append((StepType.RAW, make_pipeline(None))) data = dict() for subject in subjects: @@ -329,7 +269,7 @@ def get_data( data[subject] = self._get_single_subject_data_using_cache( subject, cache_config, - steps, 
+ process_pipeline, ) return data @@ -394,7 +334,9 @@ def download( verbose=verbose, ) - def _get_single_subject_data_using_cache(self, subject, cache_config, steps): + def _get_single_subject_data_using_cache( + self, subject, cache_config, process_pipeline + ): """Load a single subject's data using cache. Either load the data of a single subject from disk cache or from the @@ -402,6 +344,7 @@ def _get_single_subject_data_using_cache(self, subject, cache_config, steps): then eventually saves or overwrites the cache version depending on the parameters. """ + steps = list(process_pipeline.steps) splitted_steps = [] # list of (cached_steps, remaining_steps) if cache_config.use: splitted_steps += [ diff --git a/moabb/datasets/bids_interface.py b/moabb/datasets/bids_interface.py index 011815fe8..880eea768 100644 --- a/moabb/datasets/bids_interface.py +++ b/moabb/datasets/bids_interface.py @@ -18,8 +18,9 @@ import re from collections import OrderedDict from dataclasses import dataclass +from enum import Enum from pathlib import Path -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Dict, Type import mne import mne_bids @@ -436,3 +437,19 @@ def _write_file(self, bids_path, obj): overwrite=False, verbose=self.verbose, ) + + +class StepType(Enum): + """Enum corresponding to the type of data returned + by a pipeline step.""" + + RAW = "raw" + EPOCHS = "epochs" + ARRAY = "array" + + +_interface_map: Dict[StepType, Type[BIDSInterfaceBase]] = { + StepType.RAW: BIDSInterfaceRawEDF, + StepType.EPOCHS: BIDSInterfaceEpochs, + StepType.ARRAY: BIDSInterfaceNumpyArray, +} diff --git a/moabb/evaluations/base.py b/moabb/evaluations/base.py index 9767738a5..c78db516a 100644 --- a/moabb/evaluations/base.py +++ b/moabb/evaluations/base.py @@ -156,29 +156,40 @@ def process(self, pipelines, param_grid=None): for _, pipeline in pipelines.items(): if not (isinstance(pipeline, BaseEstimator)): raise (ValueError("pipelines must only contains Pipelines " "instance")) - for 
dataset in self.datasets: log.info("Processing dataset: {}".format(dataset.code)) - results = self.evaluate(dataset, pipelines, param_grid) + process_pipeline = self.paradigm.make_process_pipelines( + dataset, + return_epochs=self.return_epochs, + return_raws=self.return_raws, + postprocess_pipeline=None, + )[0] + # (we only keep the pipeline for the first frequency band, better ideas?) + + results = self.evaluate(dataset, pipelines, param_grid, process_pipeline) for res in results: - self.push_result(res, pipelines) + self.push_result(res, pipelines, process_pipeline) - return self.results.to_dataframe(pipelines=pipelines) + return self.results.to_dataframe( + pipelines=pipelines, process_pipeline=process_pipeline + ) - def push_result(self, res, pipelines): + def push_result(self, res, pipelines, process_pipeline): message = "{} | ".format(res["pipeline"]) message += "{} | {} | {}".format( res["dataset"].code, res["subject"], res["session"] ) message += ": Score %.3f" % res["score"] log.info(message) - self.results.add({res["pipeline"]: res}, pipelines=pipelines) + self.results.add( + {res["pipeline"]: res}, pipelines=pipelines, process_pipeline=process_pipeline + ) def get_results(self): return self.results.to_dataframe() @abstractmethod - def evaluate(self, dataset, pipelines, param_grid): + def evaluate(self, dataset, pipelines, param_grid, process_pipeline): """Evaluate results on a single dataset. This method return a generator. 
each results item is a dict with diff --git a/moabb/evaluations/evaluations.py b/moabb/evaluations/evaluations.py index 3d8f94cab..ed13be16b 100644 --- a/moabb/evaluations/evaluations.py +++ b/moabb/evaluations/evaluations.py @@ -33,7 +33,6 @@ except ImportError: _carbonfootprint = False - log = logging.getLogger(__name__) # Numpy ArrayLike is only available starting from Numpy 1.20 and Python 3.8 @@ -179,10 +178,12 @@ def _grid_search(self, param_grid, name_grid, name, grid_clf, X_, y_, cv): # flake8: noqa: C901 - def _evaluate(self, dataset, pipelines, param_grid): + def _evaluate(self, dataset, pipelines, param_grid, process_pipeline): with parallel_backend("threading"): results = Parallel(n_jobs=self.n_jobs_evaluation, verbose=1)( - delayed(self._evaluate_subject)(dataset, pipelines, param_grid, subject) + delayed(self._evaluate_subject)( + dataset, pipelines, param_grid, subject, process_pipeline + ) for subject in tqdm( dataset.subject_list, desc=f"{dataset.code}-WithinSession" ) @@ -191,11 +192,15 @@ def _evaluate(self, dataset, pipelines, param_grid): # Concatenate the results from all subjects yield from [res for subject_results in results for res in subject_results] - def _evaluate_subject(self, dataset, pipelines, param_grid, subject): + def _evaluate_subject( + self, dataset, pipelines, param_grid, subject, process_pipeline + ): # Progress Bar at subject level # check if we already have result for this subject/pipeline # we might need a better granularity, if we query the DB - run_pipes = self.results.not_yet_computed(pipelines, dataset, subject) + run_pipes = self.results.not_yet_computed( + pipelines, dataset, subject, process_pipeline + ) if len(run_pipes) == 0: return [] @@ -362,12 +367,14 @@ def score_explicit(self, clf, X_train, y_train, X_test, y_test): duration = time() - t_start return score, duration - def _evaluate_learning_curve(self, dataset, pipelines): + def _evaluate_learning_curve(self, dataset, pipelines, process_pipeline): # 
Progressbar at subject level for subject in tqdm(dataset.subject_list, desc=f"{dataset.code}-WithinSession"): # check if we already have result for this subject/pipeline # we might need a better granularity, if we query the DB - run_pipes = self.results.not_yet_computed(pipelines, dataset, subject) + run_pipes = self.results.not_yet_computed( + pipelines, dataset, subject, process_pipeline + ) if len(run_pipes) == 0: continue @@ -398,7 +405,7 @@ def _evaluate_learning_curve(self, dataset, pipelines): continue not_enough_data = False log.info( - f"Permutation: {perm_i+1}," + f"Permutation: {perm_i + 1}," f" Training samples: {len(subset_indices)}" ) @@ -440,11 +447,11 @@ def _evaluate_learning_curve(self, dataset, pipelines): ) yield res - def evaluate(self, dataset, pipelines, param_grid): + def evaluate(self, dataset, pipelines, param_grid, process_pipeline): if self.calculate_learning_curve: - yield from self._evaluate_learning_curve(dataset, pipelines) + yield from self._evaluate_learning_curve(dataset, pipelines, process_pipeline) else: - yield from self._evaluate(dataset, pipelines, param_grid) + yield from self._evaluate(dataset, pipelines, param_grid, process_pipeline) def is_valid(self, dataset): return True @@ -526,24 +533,28 @@ def _grid_search(self, param_grid, name_grid, name, grid_clf, X, y, cv, groups): return grid_clf # flake8: noqa: C901 - def evaluate(self, dataset, pipelines, param_grid): + def evaluate(self, dataset, pipelines, param_grid, process_pipeline): if not self.is_valid(dataset): raise AssertionError("Dataset is not appropriate for evaluation") # Progressbar at subject level results = [] with parallel_backend("threading"): for result in Parallel(n_jobs=self.n_jobs_evaluation, verbose=1)( - delayed(self.process_subject)(subject, param_grid, pipelines, dataset) + delayed(self.process_subject)( + subject, param_grid, pipelines, dataset, process_pipeline + ) for subject in dataset.subject_list ): results.extend(result) return results - def 
process_subject(self, subject, param_grid, pipelines, dataset): + def process_subject(self, subject, param_grid, pipelines, dataset, process_pipeline): # check if we already have result for this subject/pipeline # we might need a better granularity, if we query the DB - run_pipes = self.results.not_yet_computed(pipelines, dataset, subject) + run_pipes = self.results.not_yet_computed( + pipelines, dataset, subject, process_pipeline + ) if len(run_pipes) == 0: print(f"Subject {subject} already processed") return [] @@ -743,7 +754,7 @@ def _grid_search(self, param_grid, name_grid, name, clf, pipelines, X, y, cv, gr return pipelines[name] # flake8: noqa: C901 - def evaluate(self, dataset, pipelines, param_grid): + def evaluate(self, dataset, pipelines, param_grid, process_pipeline): if not self.is_valid(dataset): raise AssertionError("Dataset is not appropriate for evaluation") # this is a bit akward, but we need to check if at least one pipe @@ -752,7 +763,11 @@ def evaluate(self, dataset, pipelines, param_grid): # we might need a better granularity, if we query the DB run_pipes = {} for subject in dataset.subject_list: - run_pipes.update(self.results.not_yet_computed(pipelines, dataset, subject)) + run_pipes.update( + self.results.not_yet_computed( + pipelines, dataset, subject, process_pipeline + ) + ) if len(run_pipes) == 0: return @@ -807,7 +822,9 @@ def evaluate(self, dataset, pipelines, param_grid): ): subject = groups[test[0]] # now we can check if this subject has results - run_pipes = self.results.not_yet_computed(pipelines, dataset, subject) + run_pipes = self.results.not_yet_computed( + pipelines, dataset, subject, process_pipeline + ) # iterate over pipelines for name, clf in run_pipes.items(): if _carbonfootprint: diff --git a/moabb/paradigms/base.py b/moabb/paradigms/base.py index 4a10aa35d..8baf1cdea 100644 --- a/moabb/paradigms/base.py +++ b/moabb/paradigms/base.py @@ -9,12 +9,14 @@ from sklearn.pipeline import Pipeline, make_pipeline from 
sklearn.preprocessing import FunctionTransformer +from moabb.datasets.bids_interface import StepType from moabb.datasets.preprocessing import ( EpochsToEvents, EventsToLabels, ForkPipelines, RawToEpochs, RawToEvents, + SetRawAnnotations, get_crop_pipeline, get_filter_pipeline, get_resample_pipeline, @@ -126,14 +128,86 @@ def prepare_process(self, dataset): def used_events(self, dataset): pass + def make_process_pipelines( + self, dataset, return_epochs=False, return_raws=False, postprocess_pipeline=None + ): + """Return the pre-processing pipelines corresponding to this paradigm (one per frequency band). + Refer to the arguments of :func:`get_data` for more information.""" + if return_epochs and return_raws: + message = "Select only return_epochs or return_raws, not both" + raise ValueError(message) + + self.prepare_process(dataset) + + raw_pipelines = self._get_raw_pipelines() + epochs_pipeline = self._get_epochs_pipeline(return_epochs, return_raws, dataset) + array_pipeline = self._get_array_pipeline( + return_epochs, return_raws, dataset, postprocess_pipeline + ) + + if array_pipeline is not None: + events_pipeline = ( + self._get_events_pipeline(dataset) if return_raws else EpochsToEvents() + ) + else: + events_pipeline = None + + if events_pipeline is None and array_pipeline is not None: + log.warning( + f"event_id not specified, using all the dataset's " + f"events to generate labels: {dataset.event_id}" + ) + events_pipeline = ( + RawToEvents(dataset.event_id) + if epochs_pipeline is None + else EpochsToEvents() + ) + + process_pipelines = [] + for raw_pipeline in raw_pipelines: + steps = [] + steps.append((StepType.RAW, SetRawAnnotations(dataset.event_id))) + if raw_pipeline is not None: + steps.append((StepType.RAW, raw_pipeline)) + if epochs_pipeline is not None: + steps.append((StepType.EPOCHS, epochs_pipeline)) + if array_pipeline is not None: + array_events_pipeline = ForkPipelines( + [ + ("X", array_pipeline), + ("events", events_pipeline), + ] + ) + 
steps.append((StepType.ARRAY, array_events_pipeline)) + process_pipelines.append(Pipeline(steps)) + return process_pipelines + + def make_labels_pipeline(self, dataset, return_epochs=False, return_raws=False): + """Returns the pipeline that extracts the labels from the + output of the postprocess_pipeline. + Refer to the arguments of :func:`get_data` for more information.""" + if return_epochs: + labels_pipeline = make_pipeline( + EpochsToEvents(), + EventsToLabels(event_id=self.used_events(dataset)), + ) + elif return_raws: + labels_pipeline = make_pipeline( + self._get_events_pipeline(dataset), + EventsToLabels(event_id=self.used_events(dataset)), + ) + else: # return array + labels_pipeline = EventsToLabels(event_id=self.used_events(dataset)) + return labels_pipeline + def get_data( # noqa: C901 self, dataset, subjects=None, return_epochs=False, return_raws=False, - processing_pipeline=None, cache_config=None, + postprocess_pipeline=None, ): """ Return the data for a list of subject. @@ -159,6 +233,14 @@ def get_data( # noqa: C901 Mutually exclusive with return_epochs cache_config: dict | CacheConfig Configuration for caching of datasets. See :class:`moabb.datasets.base.CacheConfig` for details. + postprocess_pipeline: Pipeline | None + Optional pipeline to apply to the data after the preprocessing. + This pipeline will either receive :class:`mne.io.BaseRaw`, :class:`mne.Epochs` + or :func:`np.ndarray` as input, depending on the values of ``return_epochs`` + and ``return_raws``. + This pipeline must return an ``np.ndarray``. + This pipeline must be "fixed" because it will not be trained, + i.e. no call to ``fit`` will be made. 
Returns ------- @@ -176,49 +258,21 @@ def get_data( # noqa: C901 message = f"Dataset {dataset.code} is not valid for paradigm" raise AssertionError(message) - if return_epochs and return_raws: - message = "Select only return_epochs or return_raws, not both" - raise ValueError(message) - if subjects is None: subjects = dataset.subject_list - self.prepare_process(dataset) - raw_pipelines = self._get_raw_pipelines() - epochs_pipeline = self._get_epochs_pipeline(return_epochs, return_raws, dataset) - array_pipeline = self._get_array_pipeline( - return_epochs, return_raws, dataset, processing_pipeline + process_pipelines = self.make_process_pipelines( + dataset, return_epochs, return_raws, postprocess_pipeline ) - if return_epochs: - labels_pipeline = make_pipeline( - EpochsToEvents(), - EventsToLabels(event_id=self.used_events(dataset)), - ) - elif return_raws: - labels_pipeline = make_pipeline( - self._get_events_pipeline(dataset), - EventsToLabels(event_id=self.used_events(dataset)), - ) - else: # return array - labels_pipeline = EventsToLabels(event_id=self.used_events(dataset)) - - if array_pipeline is not None: - events_pipeline = ( - self._get_events_pipeline(dataset) if return_raws else EpochsToEvents() - ) - else: - events_pipeline = None + labels_pipeline = self.make_labels_pipeline(dataset, return_epochs, return_raws) data = [ dataset.get_data( subjects=subjects, cache_config=cache_config, - raw_pipeline=raw_pipeline, - epochs_pipeline=epochs_pipeline, - array_pipeline=array_pipeline, - events_pipeline=events_pipeline, + process_pipeline=process_pipeline, ) - for raw_pipeline in raw_pipelines + for process_pipeline in process_pipelines ] X = [] @@ -361,7 +415,7 @@ def _get_array_pipeline( ) ) if processing_pipeline is not None: - steps.append(("processing_pipeline", processing_pipeline)) + steps.append(("postprocess_pipeline", processing_pipeline)) if len(steps) == 0: return None return Pipeline(steps) diff --git a/moabb/tests/analysis.py 
b/moabb/tests/analysis.py index da77f3066..bdcac7c25 100644 --- a/moabb/tests/analysis.py +++ b/moabb/tests/analysis.py @@ -153,41 +153,55 @@ def tearDown(self): os.remove(path) def testCanAddSample(self): - self.obj.add(to_result_input(["a"], [d1]), to_pipeline_dict(["a"])) + self.obj.add( + to_result_input(["a"], [d1]), to_pipeline_dict(["a"]), "process_pipeline" + ) def testRecognizesAlreadyComputed(self): _in = to_result_input(["a"], [d1]) - self.obj.add(_in, to_pipeline_dict(["a"])) + self.obj.add(_in, to_pipeline_dict(["a"]), "process_pipeline") not_yet_computed = self.obj.not_yet_computed( - to_pipeline_dict(["a"]), d1["dataset"], d1["subject"] + to_pipeline_dict(["a"]), + d1["dataset"], + d1["subject"], + "process_pipeline", ) self.assertTrue(len(not_yet_computed) == 0) def testCanAddMultiplePipelines(self): _in = to_result_input(["a", "b", "c"], [d1, d1, d2]) - self.obj.add(_in, to_pipeline_dict(["a", "b", "c"])) + self.obj.add(_in, to_pipeline_dict(["a", "b", "c"]), "process_pipeline") def testCanAddMultipleValuesPerPipeline(self): _in = to_result_input(["a", "b"], [[d1, d2], [d2, d1]]) - self.obj.add(_in, to_pipeline_dict(["a", "b"])) + self.obj.add(_in, to_pipeline_dict(["a", "b"]), "process_pipeline") not_yet_computed = self.obj.not_yet_computed( - to_pipeline_dict(["a"]), d1["dataset"], d1["subject"] + to_pipeline_dict(["a"]), + d1["dataset"], + d1["subject"], + "process_pipeline", ) self.assertTrue(len(not_yet_computed) == 0, not_yet_computed) not_yet_computed = self.obj.not_yet_computed( - to_pipeline_dict(["b"]), d2["dataset"], d2["subject"] + to_pipeline_dict(["b"]), + d2["dataset"], + d2["subject"], + "process_pipeline", ) self.assertTrue(len(not_yet_computed) == 0, not_yet_computed) not_yet_computed = self.obj.not_yet_computed( - to_pipeline_dict(["b"]), d1["dataset"], d1["subject"] + to_pipeline_dict(["b"]), + d1["dataset"], + d1["subject"], + "process_pipeline", ) self.assertTrue(len(not_yet_computed) == 0, not_yet_computed) def 
testCanExportToDataframe(self): _in = to_result_input(["a", "b", "c"], [d1, d1, d2]) - self.obj.add(_in, to_pipeline_dict(["a", "b", "c"])) + self.obj.add(_in, to_pipeline_dict(["a", "b", "c"]), "process_pipeline") _in = to_result_input(["a", "b", "c"], [d2, d2, d3]) - self.obj.add(_in, to_pipeline_dict(["a", "b", "c"])) + self.obj.add(_in, to_pipeline_dict(["a", "b", "c"]), "process_pipeline") df = self.obj.to_dataframe() self.assertTrue( set(np.unique(df["pipeline"])) == set(("a", "b", "c")), diff --git a/moabb/tests/benchmark.py b/moabb/tests/benchmark.py index 5419360a0..c29e77bee 100644 --- a/moabb/tests/benchmark.py +++ b/moabb/tests/benchmark.py @@ -50,6 +50,7 @@ def test_nodataset(self): benchmark, pipelines=str(self.pp_dir), exclude_datasets=["FakeDataset"], + overwrite=True, ) def test_selectparadigm(self): @@ -68,6 +69,7 @@ def test_include_exclude(self): pipelines=str(self.pp_dir), include_datasets=["FakeDataset"], exclude_datasets=["AnotherDataset"], + overwrite=True, ) diff --git a/moabb/tests/evaluations.py b/moabb/tests/evaluations.py index 31bc328cd..dfd1c76b0 100644 --- a/moabb/tests/evaluations.py +++ b/moabb/tests/evaluations.py @@ -70,7 +70,13 @@ def tearDown(self): os.remove(path) def test_eval_results(self): - results = [r for r in self.eval.evaluate(dataset, pipelines, param_grid=None)] + process_pipeline = self.eval.paradigm.make_process_pipelines(dataset)[0] + results = [ + r + for r in self.eval.evaluate( + dataset, pipelines, param_grid=None, process_pipeline=process_pipeline + ) + ] # We should get 4 results, 2 sessions 2 subjects self.assertEqual(len(results), 4) @@ -113,8 +119,15 @@ def test_eval_grid_search(self): # Test grid search param_grid = {"C": {"csp__metric": ["euclid", "riemann"]}} + process_pipeline = self.eval.paradigm.make_process_pipelines(dataset)[0] results = [ - r for r in self.eval.evaluate(dataset, pipelines, param_grid=param_grid) + r + for r in self.eval.evaluate( + dataset, + pipelines, + param_grid=param_grid, + 
process_pipeline=process_pipeline, + ) ] # We should get 4 results, 2 sessions 2 subjects @@ -162,8 +175,12 @@ def test_correct_results_integrity(self): data_size={"policy": "ratio", "value": np.array([0.2, 0.5])}, n_perms=np.array([2, 2]), ) + process_pipeline = learning_curve_eval.paradigm.make_process_pipelines(dataset)[0] results = [ - r for r in learning_curve_eval.evaluate(dataset, pipelines, param_grid=None) + r + for r in learning_curve_eval.evaluate( + dataset, pipelines, param_grid=None, process_pipeline=process_pipeline + ) ] keys = results[0].keys() self.assertEqual(len(keys), 10) # 8 + 2 new for learning curve @@ -188,7 +205,12 @@ def test_all_policies_work(self): def test_data_sanity(self): # need this helper to iterate over the generator def run_evaluation(eval, dataset, pipelines): - list(eval.evaluate(dataset, pipelines, param_grid=None)) + process_pipeline = eval.paradigm.make_process_pipelines(dataset)[0] + list( + eval.evaluate( + dataset, pipelines, param_grid=None, process_pipeline=process_pipeline + ) + ) # E.g. 
if number of samples too high -> expect error kwargs = dict(paradigm=FakeImageryParadigm(), datasets=[dataset], n_perms=[2, 2]) @@ -201,7 +223,11 @@ def run_evaluation(eval, dataset, pipelines): # This one should run run_evaluation(should_work, dataset, pipelines) self.assertRaises( - ValueError, run_evaluation, too_many_samples, dataset, pipelines + ValueError, + run_evaluation, + too_many_samples, + dataset, + pipelines, ) def test_eval_grid_search(self): diff --git a/moabb/utils.py b/moabb/utils.py index a412af612..4ae4917ca 100644 --- a/moabb/utils.py +++ b/moabb/utils.py @@ -6,12 +6,17 @@ import random import re import sys +from typing import TYPE_CHECKING import numpy as np from mne import get_config, set_config from mne import set_log_level as sll +if TYPE_CHECKING: + from moabb.datasets.base import BaseDataset + from moabb.paradigms.base import BaseProcessing + log = logging.getLogger(__name__) @@ -155,6 +160,19 @@ def set_download_dir(path): set_config("MNE_DATA", path) +def make_process_pipelines( + processing: "BaseProcessing", + dataset: "BaseDataset", + return_epochs: bool = False, + return_raws: bool = False, + postprocess_pipeline=None, +): + """Shortcut for the method :func:`moabb.paradigms.base.BaseProcessing.make_process_pipelines`""" + return processing.make_process_pipelines( + dataset, return_epochs, return_raws, postprocess_pipeline + ) + + aliases_list = [] # list of tuples containing (old name, new name, expire version)