安宗言

Reputation: 1

How to add a data pipeline in MMDetection and MMFewshot?

This is my config:

data = dict(
    samples_per_gpu=4,
    workers_per_gpu=2,
    train=dict(
        type='MyNWayKShotDataset',
        num_support_ways=7,
        num_support_shots=1,
        one_support_shot_per_image=True,
        num_used_support_shots=200,
        save_dataset=False,
        dataset=dict(
            type='FewShotVHRDataset',
            ann_cfg=[
                dict(
                    type='ann_file',
                    ann_file='datasets/NWPU_VHR-10/annotations/train.json')
            ],
            img_prefix='datasets/NWPU_VHR-10/train',
            multi_pipelines=dict(
                query=[
                    dict(type='LoadImageFromFile'),
                    dict(type='LoadAnnotations', with_bbox=True),
                    dict(
                        type='Resize', img_scale=(1000, 600), keep_ratio=True),
                    dict(type='RandomFlip', flip_ratio=0.5),
                    dict(
                        type='Normalize',
                        mean=[103.53, 116.28, 123.675],
                        std=[1.0, 1.0, 1.0],
                        to_rgb=False),
                    dict(type='DefaultFormatBundle'),
                    dict(
                        type='Collect', keys=['img', 'gt_bboxes', 'gt_labels'])
                ],
                query_origin=[
                    dict(type='LoadImageFromFile'),
                    dict(type='LoadAnnotations', with_bbox=True),
                    dict(
                        type='Resize', img_scale=(1000, 600), keep_ratio=True),
                    dict(type='RandomFlip', flip_ratio=0.5),
                    dict(
                        type='Normalize',
                        mean=[103.53, 116.28, 123.675],
                        std=[1.0, 1.0, 1.0],
                        to_rgb=False),
                    dict(type='DefaultFormatBundle'),
                    dict(
                        type='Collect', keys=['img', 'gt_bboxes', 'gt_labels'])
                ],
                support=[
                    dict(type='LoadImageFromFile'),
                    dict(type='LoadAnnotations', with_bbox=True),
                    dict(
                        type='Normalize',
                        mean=[103.53, 116.28, 123.675],
                        std=[1.0, 1.0, 1.0],
                        to_rgb=False),
                    dict(type='GenerateMask', target_size=(224, 224)),
                    dict(type='RandomFlip', flip_ratio=0.0),
                    dict(type='DefaultFormatBundle'),
                    dict(
                        type='Collect', keys=['img', 'gt_bboxes', 'gt_labels'])
                ]),
            classes='BASE_CLASSES',
            instance_wise=False,
            dataset_name='query_support_dataset')),

In multi_pipelines, there are three pipelines that process the data differently. The official code has only two pipelines, named 'query' and 'support'. I need to add a third pipeline named 'query_origin' that uses a different pre-processing method: it consumes the same data as 'query', just without data augmentation.
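
For reference, a minimal sketch of what an un-augmented 'query_origin' branch could look like (this is only an assumption about what "without data augmentation" should mean here: the random flip is disabled and the deterministic resize/normalize steps are kept, reusing the MMDetection transforms already used above):

query_origin=[
    dict(type='LoadImageFromFile'),
    dict(type='LoadAnnotations', with_bbox=True),
    # deterministic resize, same scale as the 'query' branch
    dict(type='Resize', img_scale=(1000, 600), keep_ratio=True),
    # flip_ratio=0.0 keeps the transform in the pipeline but never flips
    dict(type='RandomFlip', flip_ratio=0.0),
    dict(
        type='Normalize',
        mean=[103.53, 116.28, 123.675],
        std=[1.0, 1.0, 1.0],
        to_rgb=False),
    dict(type='DefaultFormatBundle'),
    dict(type='Collect', keys=['img', 'gt_bboxes', 'gt_labels'])
],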

The official documentation asks me to modify the DatasetWrapper, the DataloaderWrapper, build_dataset and build_dataloader (https://mmfewshot.readthedocs.io/en/latest/detection/customize_dataset.html). Here is the code I have modified:

  1. DatasetWrapper (partial): I modified __getitem__ and __len__ to apply two different data augmentation methods to the query data separately.
class MyNWayKShotDataset:
    def __init__(self,
                 query_dataset: BaseFewShotDataset,
                 support_dataset: Optional[BaseFewShotDataset],
                 num_support_ways: int,
                 num_support_shots: int,
                 one_support_shot_per_image: bool = False,
                 num_used_support_shots: int = 200,
                 repeat_times: int = 1) -> None:
        self.query_dataset = query_dataset
        #self.query_origin_dataset = query_dataset
        if support_dataset is None:
            self.support_dataset = self.query_dataset
        else:
            self.support_dataset = support_dataset
        self.CLASSES = self.query_dataset.CLASSES
        # The mode determines the behavior of fetching data;
        # the default mode is 'query'. To convert the dataset
        # into a 'support' dataset, simply call the function
        # convert_query_to_support().
        self._mode = 'query'
        self.num_support_ways = num_support_ways
        self.one_support_shot_per_image = one_support_shot_per_image
        self.num_used_support_shots = num_used_support_shots
        assert num_support_ways <= len(
            self.CLASSES
        ), 'the number of support ways cannot exceed the number of classes'
        self.num_support_shots = num_support_shots
        self.batch_indices = []
        self.data_infos_by_class = {i: [] for i in range(len(self.CLASSES))}
        self.prepare_support_shots()
        self.repeat_times = repeat_times
        # Disable the group sampler, because in the few shot setting
        # one group may only have two or three images.
        if hasattr(query_dataset, 'flag'):
            self.flag = np.zeros(
                len(self.query_dataset) * self.repeat_times, dtype=np.uint8)

        self._ori_len = len(self.query_dataset)

    def __getitem__(self, idx: int) -> Union[Dict, List[Dict]]:
        if self._mode == 'query':
            idx %= self._ori_len
            # load the image through the augmented 'query' pipeline
            query_data = self.query_dataset.prepare_train_img(idx, 'query')
            # load the same image again through the 'query_origin' pipeline
            # (intended to run without data augmentation)
            query_origin_data = self.query_dataset.prepare_train_img(
                idx, 'query_origin')
            # return both versions so they can be collated separately
            return {
                'query_data': query_data,
                'query_origin_data': query_origin_data
            }
        elif self._mode == 'support':
            # loads one batch of data in support pipeline
            b_idx = self.batch_indices[idx]
            support_data = [
                self.support_dataset.prepare_train_img(idx, 'support',
                                                       [gt_idx])
                for (idx, gt_idx) in b_idx
            ]
            return {'support_data': support_data}
        else:
            raise ValueError('not a valid data type')

    def __len__(self) -> int:
        """Length of dataset."""
        if self._mode == 'query':
            return len(self.query_dataset) * self.repeat_times
        elif self._mode == 'support':
            return len(self.batch_indices)
        else:
            raise ValueError(f'{self._mode} is not a valid mode')
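
To check this wrapper on its own, a small sketch like the following (assuming the modified build_dataset already builds MyNWayKShotDataset from this config; the config path is just a placeholder) should show both entries coming back from a single query sample:

# sketch: quick check of the modified __getitem__ in 'query' mode
from mmcv import Config
from mmfewshot.detection.datasets import build_dataset

cfg = Config.fromfile('configs/my_nwpu_config.py')  # placeholder path
dataset = build_dataset(cfg.data.train)  # expected: MyNWayKShotDataset

sample = dataset[0]
print(sample.keys())  # expected: dict_keys(['query_data', 'query_origin_data'])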
  2. build_dataset: I modified the 'elif isinstance(dataset, MyNWayKShotDataset)' branch:
def build_dataloader(dataset: Dataset,
                     samples_per_gpu: int,
                     workers_per_gpu: int,
                     num_gpus: int = 1,
                     dist: bool = True,
                     shuffle: bool = True,
                     seed: Optional[int] = None,
                     data_cfg: Optional[Dict] = None,
                     use_infinite_sampler: bool = False,
                     **kwargs) -> DataLoader:
    rank, world_size = get_dist_info()
    (sampler, batch_size, num_workers) = build_sampler(
        dist=dist,
        shuffle=shuffle,
        dataset=dataset,
        num_gpus=num_gpus,
        samples_per_gpu=samples_per_gpu,
        workers_per_gpu=workers_per_gpu,
        seed=seed,
        use_infinite_sampler=use_infinite_sampler)
    init_fn = partial(
        worker_init_fn, num_workers=num_workers, rank=rank,
        seed=seed) if seed is not None else None
    if isinstance(dataset, QueryAwareDataset):
        from mmfewshot.utils import multi_pipeline_collate_fn

        # `QueryAwareDataset` will return a list of DataContainer
        # `multi_pipeline_collate_fn` is designed to handle
        # the data with list[list[DataContainer]]
        data_loader = DataLoader(
            dataset,
            batch_size=batch_size,
            sampler=sampler,
            num_workers=num_workers,
            collate_fn=partial(
                multi_pipeline_collate_fn, samples_per_gpu=samples_per_gpu),
            pin_memory=False,
            worker_init_fn=init_fn,
            **kwargs)
    elif isinstance(dataset, MyNWayKShotDataset):
        from mmfewshot.utils import multi_pipeline_collate_fn
        from .dataloader_wrappers import MyNWayKShotDataloader

        # `NWayKShotDataset` will return a list of DataContainer
        # `multi_pipeline_collate_fn` is designed to handle
        # the data with list[list[DataContainer]]
        # initialize query dataloader
        query_data_loader = DataLoader(
            dataset,
            batch_size=batch_size,
            sampler=sampler,
            num_workers=num_workers,
            collate_fn=partial(
                multi_pipeline_collate_fn, samples_per_gpu=samples_per_gpu),
            pin_memory=False,
            worker_init_fn=init_fn,
            **kwargs)
        query_origin_data_loader = DataLoader(
            dataset,
            batch_size=batch_size,
            sampler=sampler,
            num_workers=num_workers,
            collate_fn=partial(
                multi_pipeline_collate_fn, samples_per_gpu=samples_per_gpu),
            pin_memory=False,
            worker_init_fn=init_fn,
            **kwargs)
        support_dataset = copy.deepcopy(dataset)
        # if infinite sampler is used, the length of batch indices in
        # support_dataset can be longer than the length of query dataset
        # as it can achieve better sample diversity
        if use_infinite_sampler:
            support_dataset.convert_query_to_support(len(dataset) * num_gpus)
        # create support dataset from query dataset and
        # sample batch indices with same length as query dataloader
        else:
            support_dataset.convert_query_to_support(
                len(query_data_loader) * num_gpus)

        (support_sampler, _, _) = build_sampler(
            dist=dist,
            shuffle=False,
            dataset=support_dataset,
            num_gpus=num_gpus,
            samples_per_gpu=1,
            workers_per_gpu=workers_per_gpu,
            seed=seed,
            use_infinite_sampler=use_infinite_sampler)
        # support dataloader is initialized with batch_size 1 as default.
        # each batch contains (num_support_ways * num_support_shots) images,
        # since changing batch_size is equal to changing num_support_shots.
        support_data_loader = DataLoader(
            support_dataset,
            batch_size=1,
            sampler=support_sampler,
            num_workers=num_workers,
            collate_fn=partial(multi_pipeline_collate_fn, samples_per_gpu=1),
            pin_memory=False,
            worker_init_fn=init_fn,
            **kwargs)

        # wrap the three dataloaders with the dataloader wrapper
        data_loader = MyNWayKShotDataloader(
            query_data_loader=query_data_loader,
            query_origin_data_loader=query_origin_data_loader,
            support_data_loader=support_data_loader)
    else:
        data_loader = DataLoader(
            dataset,
            batch_size=batch_size,
            sampler=sampler,
            num_workers=num_workers,
            collate_fn=partial(collate, samples_per_gpu=samples_per_gpu),
            pin_memory=False,
            worker_init_fn=init_fn,
            **kwargs)

    return data_loader
  3. DataloaderWrapper:
class MyNWayKShotDataloader:
    """A dataloader wrapper.

    It Create a iterator to generate query and support batch simultaneously.
    Each batch contains query data and support data, and the lengths are
    batch_size and (num_support_ways * num_support_shots) respectively.

    Args:
        query_data_loader (DataLoader): DataLoader of query dataset
        support_data_loader (DataLoader): DataLoader of support datasets.
    """

    def __init__(self, query_data_loader: DataLoader,
                 query_origin_data_loader: DataLoader,
                 support_data_loader: DataLoader) -> None:
        self.dataset = query_data_loader.dataset
        self.sampler = query_data_loader.sampler
        self.query_data_loader = query_data_loader
        self.query_origin_data_loader = query_origin_data_loader
        self.support_data_loader = support_data_loader

    def __iter__(self) -> Iterator:
        # if the infinite sampler is used, this part of the code only runs once
        self.query_iter = iter(self.query_data_loader)
        self.query_origin_iter = iter(self.query_origin_data_loader)
        self.support_iter = iter(self.support_data_loader)
        return self

    def __next__(self) -> Dict:
        # advance the query, query_origin and support iterators together;
        # next(iterator) is used instead of iterator.next(), which recent
        # PyTorch dataloader iterators no longer provide
        query_data = next(self.query_iter)
        query_origin_data = next(self.query_origin_iter)
        support_data = next(self.support_iter)

        return {'query_data': query_data,
                'query_origin_data': query_origin_data,
                'support_data': support_data}

    def __len__(self) -> int:
        return len(self.query_data_loader)
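
A short sketch of how a single batch from this wrapper could be inspected outside the runner (assuming the modified build_dataloader is the one importable from mmfewshot.detection.datasets; workers_per_gpu=0 keeps everything in the main process so any error inside the pipelines or the collate function is raised directly instead of hanging silently):

# sketch: pull one batch from the wrapped dataloader and inspect it
from mmfewshot.detection.datasets import build_dataloader

loader = build_dataloader(
    dataset,             # the MyNWayKShotDataset built earlier
    samples_per_gpu=4,
    workers_per_gpu=0,   # no worker subprocesses while debugging
    num_gpus=1,
    dist=False,
    shuffle=True)
batch = next(iter(loader))
print(batch.keys())  # expected: query_data, query_origin_data, support_data
for name, part in batch.items():
    # each part is whatever multi_pipeline_collate_fn produced for that loader
    print(name, type(part))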
  4. build_dataloader: I modified the 'elif isinstance(dataset, MyNWayKShotDataset)' branch:
def build_dataloader(dataset: Dataset,
                     samples_per_gpu: int,
                     workers_per_gpu: int,
                     num_gpus: int = 1,
                     dist: bool = True,
                     shuffle: bool = True,
                     seed: Optional[int] = None,
                     data_cfg: Optional[Dict] = None,
                     use_infinite_sampler: bool = False,
                     **kwargs) -> DataLoader:
    rank, world_size = get_dist_info()
    (sampler, batch_size, num_workers) = build_sampler(
        dist=dist,
        shuffle=shuffle,
        dataset=dataset,
        num_gpus=num_gpus,
        samples_per_gpu=samples_per_gpu,
        workers_per_gpu=workers_per_gpu,
        seed=seed,
        use_infinite_sampler=use_infinite_sampler)
    init_fn = partial(
        worker_init_fn, num_workers=num_workers, rank=rank,
        seed=seed) if seed is not None else None
    if isinstance(dataset, QueryAwareDataset):
        from mmfewshot.utils import multi_pipeline_collate_fn

        # `QueryAwareDataset` will return a list of DataContainer
        # `multi_pipeline_collate_fn` is designed to handle
        # the data with list[list[DataContainer]]
        data_loader = DataLoader(
            dataset,
            batch_size=batch_size,
            sampler=sampler,
            num_workers=num_workers,
            collate_fn=partial(
                multi_pipeline_collate_fn, samples_per_gpu=samples_per_gpu),
            pin_memory=False,
            worker_init_fn=init_fn,
            **kwargs)
    elif isinstance(dataset, MyNWayKShotDataset):
        from mmfewshot.utils import multi_pipeline_collate_fn
        from .dataloader_wrappers import MyNWayKShotDataloader

        # `NWayKShotDataset` will return a list of DataContainer
        # `multi_pipeline_collate_fn` is designed to handle
        # the data with list[list[DataContainer]]
        # initialize query dataloader
        query_data_loader = DataLoader(
            dataset,
            batch_size=batch_size,
            sampler=sampler,
            num_workers=num_workers,
            collate_fn=partial(
                multi_pipeline_collate_fn, samples_per_gpu=samples_per_gpu),
            pin_memory=False,
            worker_init_fn=init_fn,
            **kwargs)
        print('1111111',query_data_loader.dataset)
        query_origin_data_loader = DataLoader(
            dataset,
            batch_size=batch_size,
            sampler=sampler,
            num_workers=num_workers,
            collate_fn=partial(
                multi_pipeline_collate_fn, samples_per_gpu=samples_per_gpu),
            pin_memory=False,
            worker_init_fn=init_fn,
            **kwargs)
        print('22222', query_origin_data_loader.dataset)
        support_dataset = copy.deepcopy(dataset)
        # if infinite sampler is used, the length of batch indices in
        # support_dataset can be longer than the length of query dataset
        # as it can achieve better sample diversity
        if use_infinite_sampler:
            support_dataset.convert_query_to_support(len(dataset) * num_gpus)
        # create support dataset from query dataset and
        # sample batch indices with same length as query dataloader
        else:
            support_dataset.convert_query_to_support(
                len(query_data_loader) * num_gpus)

        (support_sampler, _, _) = build_sampler(
            dist=dist,
            shuffle=False,
            dataset=support_dataset,
            num_gpus=num_gpus,
            samples_per_gpu=1,
            workers_per_gpu=workers_per_gpu,
            seed=seed,
            use_infinite_sampler=use_infinite_sampler)
        # support dataloader is initialized with batch_size 1 as default.
        # each batch contains (num_support_ways * num_support_shots) images,
        # since changing batch_size is equal to changing num_support_shots.
        support_data_loader = DataLoader(
            support_dataset,
            batch_size=1,
            sampler=support_sampler,
            num_workers=num_workers,
            collate_fn=partial(multi_pipeline_collate_fn, samples_per_gpu=1),
            pin_memory=False,
            worker_init_fn=init_fn,
            **kwargs)

        # wrap the three dataloaders with the dataloader wrapper
        data_loader = MyNWayKShotDataloader(
            query_data_loader=query_data_loader,
            query_origin_data_loader=query_origin_data_loader,
            support_data_loader=support_data_loader)
    else:
        data_loader = DataLoader(
            dataset,
            batch_size=batch_size,
            sampler=sampler,
            num_workers=num_workers,
            collate_fn=partial(collate, samples_per_gpu=samples_per_gpu),
            pin_memory=False,
            worker_init_fn=init_fn,
            **kwargs)

    return data_loader
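
With the config at the top (num_support_ways=7, num_support_shots=1), each support batch therefore holds 7 images. Because the wrapper in item 3 advances all three iterators in lockstep, a quick length check on the loaders returned here is a useful sanity test before training (again only a sketch, under the same single-GPU, non-infinite-sampler assumptions as above):

# sketch: the query, query_origin and support loaders are stepped together,
# so for a single-GPU run their lengths are expected to match
wrapped = build_dataloader(
    dataset, samples_per_gpu=4, workers_per_gpu=0,
    num_gpus=1, dist=False, shuffle=True)
print(len(wrapped.query_data_loader),
      len(wrapped.query_origin_data_loader),
      len(wrapped.support_data_loader))
print(len(wrapped))  # the wrapper reports the length of the query loader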

Next, I ran train.py. However, the program got stuck at a certain step without throwing any errors. Upon inspection, it was stuck at the final step, 'runner.run(data_loaders, cfg.workflow)', of the train_detector function called by train.py.
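
Since nothing is raised, a minimal sketch of how the hang point could be captured, using only the standard-library faulthandler module added near the top of train.py:

# sketch: periodically dump the Python stack of every thread to stderr
# while the process appears to hang, without raising anything
import faulthandler
faulthandler.dump_traceback_later(60, repeat=True)

This only covers the main process; with workers_per_gpu greater than 0 the DataLoader workers are separate processes, so temporarily setting workers_per_gpu=0 makes the dump cover the data loading as well. For reference, here is the train_detector function: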

def train_detector(model: nn.Module,
                   dataset: Iterable,
                   cfg: ConfigDict,
                   distributed: bool = False,
                   validate: bool = False,
                   timestamp: Optional[str] = None,
                   meta: Optional[Dict] = None) -> None:
    cfg = compat_cfg(cfg)
    logger = get_root_logger(log_level=cfg.log_level)

    # prepare data loaders
    dataset = dataset if isinstance(dataset, (list, tuple)) else [dataset]

    train_dataloader_default_args = dict(
        samples_per_gpu=2,
        workers_per_gpu=2,
        # `num_gpus` will be ignored if distributed
        num_gpus=len(cfg.gpu_ids),
        dist=distributed,
        seed=cfg.seed,
        data_cfg=copy.deepcopy(cfg.data),
        use_infinite_sampler=cfg.use_infinite_sampler,
        persistent_workers=False)
    train_loader_cfg = {
        **train_dataloader_default_args,
        **cfg.data.get('train_dataloader', {})
    }
    data_loaders = [build_dataloader(ds, **train_loader_cfg) for ds in dataset]

    # put model on gpus
    if distributed:
        find_unused_parameters = cfg.get('find_unused_parameters', False)
        # Sets the `find_unused_parameters` parameter in
        # torch.nn.parallel.DistributedDataParallel
        model = MMDistributedDataParallel(
            model.cuda(),
            device_ids=[torch.cuda.current_device()],
            broadcast_buffers=False,
            find_unused_parameters=find_unused_parameters)
    else:
        # Please use MMCV >= 1.4.4 for CPU training!
        model = MMDataParallel(model, device_ids=cfg.gpu_ids)

    # build runner
    optimizer = build_optimizer(model, cfg.optimizer)

    # Infinite sampler will return an infinite stream of indices. It can NOT
    # be used in `EpochBasedRunner`, because the `EpochBasedRunner` will
    # enumerate the dataloader forever. Thus, `InfiniteEpochBasedRunner`
    # is designed to handle dataloader with infinite sampler.
    if cfg.use_infinite_sampler and cfg.runner['type'] == 'EpochBasedRunner':
        cfg.runner['type'] = 'InfiniteEpochBasedRunner'
    runner = build_runner(
        cfg.runner,
        default_args=dict(
            model=model,
            optimizer=optimizer,
            work_dir=cfg.work_dir,
            logger=logger,
            meta=meta))

    # an ugly workaround to make .log and .log.json filenames the same
    runner.timestamp = timestamp

    # fp16 setting
    fp16_cfg = cfg.get('fp16', None)
    if fp16_cfg is not None:
        optimizer_config = Fp16OptimizerHook(
            **cfg.optimizer_config, **fp16_cfg, distributed=distributed)
    elif distributed and 'type' not in cfg.optimizer_config:
        optimizer_config = OptimizerHook(**cfg.optimizer_config)
    else:
        optimizer_config = cfg.optimizer_config

    # register hooks
    runner.register_training_hooks(cfg.lr_config, optimizer_config,
                                   cfg.checkpoint_config, cfg.log_config,
                                   cfg.get('momentum_config', None))
    if distributed:
        if isinstance(runner, EpochBasedRunner):
            runner.register_hook(DistSamplerSeedHook())

    # register eval hooks
    if validate:
        # currently only support single images testing
        val_dataloader_default_args = dict(
            samples_per_gpu=1,
            workers_per_gpu=2,
            dist=distributed,
            shuffle=False,
            persistent_workers=False)
        val_dataloader_args = {
            **val_dataloader_default_args,
            **cfg.data.get('val_dataloader', {})
        }

        val_dataset = build_dataset(cfg.data.val, dict(test_mode=True))

        assert val_dataloader_args['samples_per_gpu'] == 1, \
            'currently only support single images testing'
        val_dataloader = build_dataloader(val_dataset, **val_dataloader_args)

        eval_cfg = cfg.get('evaluation', {})
        eval_cfg['by_epoch'] = cfg.runner['type'] != 'IterBasedRunner'

        # Prepare `model_init` dataset for model initialization. In most cases,
        # the `model_init` dataset contains support images and few shot
        # annotations. The meta-learning based detectors will extract the
        # features from images and save them as part of model parameters.
        # The `model_init` dataset can be manually configured or
        # randomly selected during runtime.
        if cfg.data.get('model_init', None) is not None:
            # The randomly selected few shot support during runtime can not be
            # configured offline. In such case, the copy datasets are designed
            # to directly copy the randomly generated support set for model
            # initialization. The copy datasets copy the `data_infos` by
            # passing it as argument and other arguments can be different
            # from training dataset.
            if cfg.data.model_init.pop('copy_from_train_dataset', False):
                if cfg.data.model_init.ann_cfg is not None:
                    warnings.warn(
                        'model_init dataset will copy support '
                        'dataset used for training and original '
                        'ann_cfg will be discarded', UserWarning)
                # modify dataset type to support copying data_infos operation
                cfg.data.model_init.type = \
                    get_copy_dataset_type(cfg.data.model_init.type)
                if not hasattr(dataset[0], 'get_support_data_infos'):
                    raise NotImplementedError(
                        f'`get_support_data_infos` is not implemented '
                        f'in {dataset[0].__class__.__name__}.')
                cfg.data.model_init.ann_cfg = [
                    dict(data_infos=dataset[0].get_support_data_infos())
                ]
            # The `model_init` dataset will be saved into checkpoint, which
            # allows model to be initialized with these data as default, if
            # the config of data is not be overwritten during testing.
            cfg.checkpoint_config.meta['model_init_ann_cfg'] = \
                cfg.data.model_init.ann_cfg
            samples_per_gpu = cfg.data.model_init.pop('samples_per_gpu', 1)
            workers_per_gpu = cfg.data.model_init.pop('workers_per_gpu', 1)
            model_init_dataset = build_dataset(cfg.data.model_init)
            # Note that `dist` should be False so that the models on all
            # GPUs get the same data and end up identically initialized.
            model_init_dataloader = build_dataloader(
                model_init_dataset,
                samples_per_gpu=samples_per_gpu,
                workers_per_gpu=workers_per_gpu,
                dist=False,
                shuffle=False)

            # eval hook for meta-learning based query-support detector, it
            # supports model initialization before regular evaluation.
            eval_hook = QuerySupportDistEvalHook \
                if distributed else QuerySupportEvalHook
            runner.register_hook(
                eval_hook(model_init_dataloader, val_dataloader, **eval_cfg),
                priority='LOW')
        else:
            # for the fine-tuned based methods, the evaluation is the
            # same as mmdet.
            eval_hook = DistEvalHook if distributed else EvalHook
            runner.register_hook(
                eval_hook(val_dataloader, **eval_cfg), priority='LOW')

    # user-defined hooks
    if cfg.get('custom_hooks', None):
        custom_hooks = cfg.custom_hooks
        assert isinstance(
            custom_hooks, list
        ), f'custom_hooks expect list type, but got {type(custom_hooks)}'
        for hook_cfg in cfg.custom_hooks:
            assert isinstance(
                hook_cfg, dict
            ), f'Each item in custom_hooks expects dict type, but ' \
               f'got {type(hook_cfg)}'
            hook_cfg = hook_cfg.copy()
            priority = hook_cfg.pop('priority', 'NORMAL')
            hook = build_from_cfg(hook_cfg, HOOKS)
            runner.register_hook(hook, priority=priority)

    if cfg.resume_from:
        runner.resume(cfg.resume_from)
    elif cfg.load_from:
        runner.load_checkpoint(cfg.load_from)
    runner.run(data_loaders, cfg.workflow)

I suspect there is an issue with the dataloader code, but without any errors being reported I can't pinpoint the problem accurately. Please tell me how this can be fixed. Thanks.

Upvotes: 0

Views: 142

Answers (0)
