Package hub

Source code:
import threading
from queue import Queue
from botocore.config import Config
import numpy as np
import multiprocessing
import sys
from hub.util.check_latest_version import warn_if_update_required

if sys.platform == "darwin":
    multiprocessing.set_start_method("fork", force=True)

__pdoc__ = {
    "api": False,
    "auto": False,
    "cli": False,
    "client": False,
    "constants": False,
    "config": False,
    "integrations": False,
    "tests": False,
    "util": False,
    "Dataset.clear_cache": False,
    "Dataset.flush": False,
    "Dataset.read_only": False,
    "Dataset.size_approx": False,
    "Dataset.token": False,
    "Dataset.num_samples": False,
}
from .api.dataset import dataset as api_dataset
from .api.read import read
from .api.link import link
from .api.tiled import tiled
from .core.dataset import Dataset
from .core.transform import compute, compose
from .core.tensor import Tensor
from .util.bugout_reporter import hub_reporter
from .compression import SUPPORTED_COMPRESSIONS
from .htype import HTYPE_CONFIGURATIONS
from .integrations import huggingface

compressions = list(SUPPORTED_COMPRESSIONS)
htypes = sorted(list(HTYPE_CONFIGURATIONS))
list = api_dataset.list
exists = api_dataset.exists
load = api_dataset.load
empty = api_dataset.empty
like = api_dataset.like
delete = api_dataset.delete
rename = api_dataset.rename
copy = api_dataset.copy
deepcopy = api_dataset.deepcopy
ingest = api_dataset.ingest
ingest_kaggle = api_dataset.ingest_kaggle
ingest_dataframe = api_dataset.ingest_dataframe
ingest_huggingface = huggingface.ingest_huggingface
dataset = api_dataset.init
tensor = Tensor

__all__ = [
    "tensor",
    "read",
    "link",
    "__version__",
    "load",
    "empty",
    "exists",
    "compute",
    "compose",
    "copy",
    "dataset",
    "Dataset",
    "deepcopy",
    "like",
    "list",
    "ingest",
    "ingest_kaggle",
    "ingest_huggingface",
    "compressions",
    "htypes",
    "config",
    "delete",
    "copy",
    "rename",
]

__version__ = "2.5.0"
warn_if_update_required(__version__)
__encoded_version__ = np.array(__version__)
config = {"s3": Config(max_pool_connections=50, connect_timeout=300, read_timeout=300)}


hub_reporter.tags.append(f"version:{__version__}")
hub_reporter.system_report(publish=True)
hub_reporter.setup_excepthook(publish=True)

event_queue: Queue = Queue()


def send_event():
    while True:
        try:
            event = event_queue.get()
            client, event_dict = event
            client.send_event(event_dict)
        except Exception:
            pass


threading.Thread(target=send_event, daemon=True).start()

Sub-modules

hub.compression
hub.core
hub.htype

"htype" is the class of a tensor: image, bounding box, generic tensor, etc …

hub.visualizer

Functions

def compose(functions)

Takes a list of functions decorated using hub.compute and creates a pipeline that can be evaluated using .eval

Example::

pipeline = hub.compose([my_fn(a=3), another_function(b=2)])
pipeline.eval(data_in, ds_out, scheduler="processed", num_workers=2)

The eval method evaluates the pipeline/transform function.

It has the following arguments:

  • data_in: Input passed to the transform to generate output dataset.

    • It should support __getitem__ and __len__. This can be a Hub dataset.
  • ds_out (Dataset, optional): The dataset object to which the transform output will be written.

    • If this is not provided, data_in will be overwritten if it is a Hub dataset; otherwise an error will be raised.
    • It should have all keys being generated in the output already present as tensors.
    • Its initial state should be either:
      • Empty, i.e. all tensors have no samples. In this case all samples are added to the dataset.
      • All tensors are populated and have the same length. In this case new samples are appended to the dataset.
  • num_workers (int): The number of workers to use for performing the transform.

    • Defaults to 0. When set to 0, it will always use serial processing, irrespective of the scheduler.
  • scheduler (str): The scheduler to be used to compute the transformation.

    • Supported values include: 'serial', 'threaded', 'processed' and 'ray'. Defaults to 'threaded'.
  • progressbar (bool): Displays a progress bar if True (default).

  • skip_ok (bool): If True, skips the check for output tensors generated.

    • This allows the user to skip certain tensors in the function definition.
    • This is especially useful for inplace transformations in which certain tensors are not modified. Defaults to False.

It raises the following errors:

  • InvalidInputDataError: If data_in passed to transform is invalid. It should support __getitem__ and __len__ operations. This is also raised when data_in is a Hub dataset whose base storage is memory and a scheduler other than "threaded" is used.

  • InvalidOutputDatasetError: If the tensors of ds_out passed to transform don't all have the same length. This is also raised when ds_out is a Hub dataset whose base storage is memory and a scheduler other than "threaded" is used.

  • TensorMismatchError: If one or more of the outputs generated during transform contain different tensors than the ones present in 'ds_out' provided to transform.

  • UnsupportedSchedulerError: If the scheduler passed is not recognized. Supported values include: 'serial', 'threaded', 'processed' and 'ray'.

  • TransformError: All other exceptions raised if there are problems while running the pipeline.
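
As a sketch of the setup .eval expects (reusing my_fn and another_function from the example above; the local path and tensor name are hypothetical), every tensor the pipeline writes must already exist in ds_out:

import hub

ds_out = hub.empty("./transformed_ds")   # hypothetical local path
ds_out.create_tensor("my_tensor")        # create every tensor the pipeline will write

pipeline = hub.compose([my_fn(a=3), another_function(b=2)])
pipeline.eval(data_in, ds_out, scheduler="threaded", num_workers=2)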

Source code:
def compose(functions: List[ComputeFunction]):  # noqa: DAR101, DAR102, DAR201, DAR401
    """Takes a list of functions decorated using hub.compute and creates a pipeline that can be evaluated using .eval

    Example::

        pipeline = hub.compose([my_fn(a=3), another_function(b=2)])
        pipeline.eval(data_in, ds_out, scheduler="processed", num_workers=2)

    The __eval__ method evaluates the pipeline/transform function.

    It has the following arguments:

    - `data_in`: Input passed to the transform to generate output dataset.
        - It should support \__getitem__ and \__len__. This can be a Hub dataset.

    - `ds_out (Dataset, optional)`: The dataset object to which the transform output will be written.
        - If this is not provided, data_in will be overwritten if it is a Hub dataset; otherwise an error will be raised.
        - It should have all keys being generated in the output already present as tensors.
        - Its initial state should be either:
            - Empty, i.e. all tensors have no samples. In this case all samples are added to the dataset.
            - All tensors are populated and have the same length. In this case new samples are appended to the dataset.

    - `num_workers (int)`: The number of workers to use for performing the transform.
        - Defaults to 0. When set to 0, it will always use serial processing, irrespective of the scheduler.

    - `scheduler (str)`: The scheduler to be used to compute the transformation.
        - Supported values include: 'serial', 'threaded', 'processed' and 'ray'. Defaults to 'threaded'.

    - `progressbar (bool)`: Displays a progress bar if True (default).

    - `skip_ok (bool)`: If True, skips the check for output tensors generated.
        - This allows the user to skip certain tensors in the function definition.
        - This is especially useful for inplace transformations in which certain tensors are not modified. Defaults to False.

    It raises the following errors:

    - `InvalidInputDataError`: If data_in passed to transform is invalid. It should support \__getitem__ and \__len__ operations. This is also raised when data_in is a Hub dataset whose base storage is memory and a scheduler other than "threaded" is used.

    - `InvalidOutputDatasetError`: If the tensors of ds_out passed to transform don't all have the same length. This is also raised when ds_out is a Hub dataset whose base storage is memory and a scheduler other than "threaded" is used.

    - `TensorMismatchError`: If one or more of the outputs generated during transform contain different tensors than the ones present in 'ds_out' provided to transform.

    - `UnsupportedSchedulerError`: If the scheduler passed is not recognized. Supported values include: 'serial', 'threaded', 'processed' and 'ray'.

    - `TransformError`: All other exceptions raised if there are problems while running the pipeline.
    """
    if not functions:
        raise HubComposeEmptyListError
    for index, fn in enumerate(functions):
        if not isinstance(fn, ComputeFunction):
            raise HubComposeIncompatibleFunction(index)
    return Pipeline(functions)
def compute(fn, name=None)

Compute is a decorator for functions.

The function should have at least 2 arguments; the first two correspond to sample_in and samples_out.

There can be as many other arguments as required.

The output should be appended/extended to the second argument using hub-like syntax.

Any value returned by the fn will be ignored.

Example::

@hub.compute
def my_fn(sample_in: Any, samples_out, my_arg0, my_arg1=0):
    samples_out.my_tensor.append(my_arg0 * my_arg1)

# This transform can be used via the eval method in one of these 2 ways:

# Directly evaluating the method
# here arg0 and arg1 correspond to the 3rd and 4th argument in my_fn
my_fn(arg0, arg1).eval(data_in, ds_out, scheduler="threaded", num_workers=5)

# As a part of a Transform pipeline containing other functions
pipeline = hub.compose([my_fn(a, b), another_function(x=2)])
pipeline.eval(data_in, ds_out, scheduler="processed", num_workers=2)

The eval method evaluates the pipeline/transform function.

It has the following arguments:

  • data_in: Input passed to the transform to generate output dataset.

    • It should support __getitem__ and __len__. This can be a Hub dataset.
  • ds_out (Dataset, optional): The dataset object to which the transform output will be written.

    • If this is not provided, data_in will be overwritten if it is a Hub dataset; otherwise an error will be raised.
    • It should have all keys being generated in the output already present as tensors.
    • Its initial state should be either:
      • Empty, i.e. all tensors have no samples. In this case all samples are added to the dataset.
      • All tensors are populated and have the same length. In this case new samples are appended to the dataset.
  • num_workers (int): The number of workers to use for performing the transform.

    • Defaults to 0. When set to 0, it will always use serial processing, irrespective of the scheduler.
  • scheduler (str): The scheduler to be used to compute the transformation.

    • Supported values include: 'serial', 'threaded', 'processed' and 'ray'. Defaults to 'threaded'.
  • progressbar (bool): Displays a progress bar if True (default).

  • skip_ok (bool): If True, skips the check for output tensors generated.

    • This allows the user to skip certain tensors in the function definition.
    • This is especially useful for inplace transformations in which certain tensors are not modified. Defaults to False.

It raises the following errors:

  • InvalidInputDataError: If data_in passed to transform is invalid. It should support __getitem__ and __len__ operations. This is also raised when data_in is a Hub dataset whose base storage is memory and a scheduler other than "threaded" is used.

  • InvalidOutputDatasetError: If the tensors of ds_out passed to transform don't all have the same length. This is also raised when ds_out is a Hub dataset whose base storage is memory and a scheduler other than "threaded" is used.

  • TensorMismatchError: If one or more of the outputs generated during transform contain different tensors than the ones present in 'ds_out' provided to transform.

  • UnsupportedSchedulerError: If the scheduler passed is not recognized. Supported values include: 'serial', 'threaded', 'processed' and 'ray'.

  • TransformError: All other exceptions raised if there are problems while running the pipeline.
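
Putting it together, a minimal sketch (the dataset path, tensor name, and input list below are hypothetical):

import hub
import numpy as np

@hub.compute
def double(sample_in, samples_out, factor=2):
    # "values" is a hypothetical tensor name; it must already exist in ds_out
    samples_out.values.append(np.int64(sample_in * factor))

ds_out = hub.empty("./doubled_ds")   # hypothetical local path
ds_out.create_tensor("values")

# data_in only needs __getitem__ and __len__, so a plain list works
double(factor=3).eval(list(range(10)), ds_out, scheduler="threaded", num_workers=0)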

Source code:
def compute(
    fn,
    name: Optional[str] = None,
) -> Callable[..., ComputeFunction]:  # noqa: DAR101, DAR102, DAR201, DAR401
    """Compute is a decorator for functions.

    The function should have at least 2 arguments; the first two correspond to `sample_in` and `samples_out`.

    There can be as many other arguments as required.

    The output should be appended/extended to the second argument using hub-like syntax.

    Any value returned by the fn will be ignored.

    Example::

        @hub.compute
        def my_fn(sample_in: Any, samples_out, my_arg0, my_arg1=0):
            samples_out.my_tensor.append(my_arg0 * my_arg1)

        # This transform can be used via the eval method in one of these 2 ways:

        # Directly evaluating the method
        # here arg0 and arg1 correspond to the 3rd and 4th argument in my_fn
        my_fn(arg0, arg1).eval(data_in, ds_out, scheduler="threaded", num_workers=5)

        # As a part of a Transform pipeline containing other functions
        pipeline = hub.compose([my_fn(a, b), another_function(x=2)])
        pipeline.eval(data_in, ds_out, scheduler="processed", num_workers=2)

    The __eval__ method evaluates the pipeline/transform function.

    It has the following arguments:

    - `data_in`: Input passed to the transform to generate output dataset.
        - It should support \__getitem__ and \__len__. This can be a Hub dataset.

    - `ds_out (Dataset, optional)`: The dataset object to which the transform output will be written.
        - If this is not provided, data_in will be overwritten if it is a Hub dataset; otherwise an error will be raised.
        - It should have all keys being generated in the output already present as tensors.
        - Its initial state should be either:
            - Empty, i.e. all tensors have no samples. In this case all samples are added to the dataset.
            - All tensors are populated and have the same length. In this case new samples are appended to the dataset.

    - `num_workers (int)`: The number of workers to use for performing the transform.
        - Defaults to 0. When set to 0, it will always use serial processing, irrespective of the scheduler.

    - `scheduler (str)`: The scheduler to be used to compute the transformation.
        - Supported values include: 'serial', 'threaded', 'processed' and 'ray'. Defaults to 'threaded'.

    - `progressbar (bool)`: Displays a progress bar if True (default).

    - `skip_ok (bool)`: If True, skips the check for output tensors generated.
        - This allows the user to skip certain tensors in the function definition.
        - This is especially useful for inplace transformations in which certain tensors are not modified. Defaults to False.

    It raises the following errors:

    - `InvalidInputDataError`: If data_in passed to transform is invalid. It should support \__getitem__ and \__len__ operations. This is also raised when data_in is a Hub dataset whose base storage is memory and a scheduler other than "threaded" is used.

    - `InvalidOutputDatasetError`: If the tensors of ds_out passed to transform don't all have the same length. This is also raised when ds_out is a Hub dataset whose base storage is memory and a scheduler other than "threaded" is used.

    - `TensorMismatchError`: If one or more of the outputs generated during transform contain different tensors than the ones present in 'ds_out' provided to transform.

    - `UnsupportedSchedulerError`: If the scheduler passed is not recognized. Supported values include: 'serial', 'threaded', 'processed' and 'ray'.

    - `TransformError`: All other exceptions raised if there are problems while running the pipeline.
    """

    def inner(*args, **kwargs):
        return ComputeFunction(fn, args, kwargs, name)

    return inner
def copy(src, dest, overwrite=False, src_creds=None, src_token=None, dest_creds=None, dest_token=None, num_workers=0, scheduler='threaded', progressbar=True)

Copies the dataset at src to dest. Version control history is not included.

Args

src : Union[str, Dataset]
The Dataset or the path to the dataset to be copied.
dest : str
Destination path to copy to.
overwrite : bool
If True and a dataset exists at destination, it will be overwritten. Defaults to False.
src_creds : dict, optional

A dictionary containing credentials used to access the dataset at src.

  • If 'aws_access_key_id', 'aws_secret_access_key', 'aws_session_token' are present, these take precedence over credentials present in the environment or in credentials file. Currently only works with s3 paths.
  • It supports 'aws_access_key_id', 'aws_secret_access_key', 'aws_session_token', 'endpoint_url', 'region', 'profile_name' as keys.
src_token : str, optional
Activeloop token, used for fetching credentials to the dataset at src if it is a Hub dataset. This is optional, tokens are normally autogenerated.
dest_creds : dict, optional
Credentials required to create / overwrite the dataset at dest.
dest_token : str, optional
Token used for fetching credentials to dest.
num_workers : int
The number of workers to use for copying. Defaults to 0. When set to 0, it will always use serial processing, irrespective of the scheduler.
scheduler : str
The scheduler to be used for copying. Supported values include: 'serial', 'threaded', 'processed' and 'ray'. Defaults to 'threaded'.
progressbar : bool
Displays a progress bar if True (default).

Returns

Dataset
New dataset object.

Raises

DatasetHandlerError
If a dataset already exists at destination path and overwrite is False.
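
For example (all paths and credential values below are placeholders):

import hub

new_ds = hub.copy(
    src="./my_local_ds",                    # placeholder source path
    dest="s3://my-bucket/my_copied_ds",     # placeholder destination
    dest_creds={"aws_access_key_id": "...", "aws_secret_access_key": "..."},
    num_workers=4,
    scheduler="threaded",
)
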
Source code:
@staticmethod
def copy(
    src: Union[str, Dataset],
    dest: str,
    overwrite: bool = False,
    src_creds=None,
    src_token=None,
    dest_creds=None,
    dest_token=None,
    num_workers: int = 0,
    scheduler="threaded",
    progressbar=True,
):
    """Copies this dataset at `src` to `dest`. Version control history is not included.

    Args:
        src (Union[str, Dataset]): The Dataset or the path to the dataset to be copied.
        dest (str): Destination path to copy to.
        overwrite (bool): If True and a dataset exists at `destination`, it will be overwritten. Defaults to False.
        src_creds (dict, optional): A dictionary containing credentials used to access the dataset at `src`.
            -
            - If 'aws_access_key_id', 'aws_secret_access_key', 'aws_session_token' are present, these take precedence over credentials present in the environment or in credentials file. Currently only works with s3 paths.
            - It supports 'aws_access_key_id', 'aws_secret_access_key', 'aws_session_token', 'endpoint_url', 'region', 'profile_name' as keys.
        src_token (str, optional): Activeloop token, used for fetching credentials to the dataset at `src` if it is a Hub dataset. This is optional, tokens are normally autogenerated.
        dest_creds (dict, optional): Credentials required to create / overwrite the dataset at `dest`.
        dest_token (str, optional): Token used for fetching credentials to `dest`.
        num_workers (int): The number of workers to use for copying. Defaults to 0. When set to 0, it will always use serial processing, irrespective of the scheduler.
        scheduler (str): The scheduler to be used for copying. Supported values include: 'serial', 'threaded', 'processed' and 'ray'.
            Defaults to 'threaded'.
        progressbar (bool): Displays a progress bar if True (default).

    Returns:
        Dataset: New dataset object.

    Raises:
        DatasetHandlerError: If a dataset already exists at destination path and overwrite is False.
    """
    if isinstance(src, str):
        src_ds = hub.load(src, read_only=True, creds=src_creds, token=src_token)
    else:
        src_ds = src
    return src_ds.copy(
        dest,
        overwrite=overwrite,
        creds=dest_creds,
        token=dest_token,
        num_workers=num_workers,
        scheduler=scheduler,
        progressbar=progressbar,
    )
def dataset(path, read_only=False, overwrite=False, public=False, memory_cache_size=256, local_cache_size=0, creds=None, token=None, verbose=True)

Returns a Dataset object referencing either a new or existing dataset.

Important

Using overwrite will delete all of your data if it exists! Be very careful when setting this parameter.

Examples

ds = hub.dataset("hub://username/dataset")
ds = hub.dataset("s3://mybucket/my_dataset")
ds = hub.dataset("./datasets/my_dataset", overwrite=True)

Args

path : str

The full path to the dataset. Can be:

  • a Hub cloud path of the form hub://username/datasetname. To write to Hub cloud datasets, ensure that you are logged in to Hub (use 'activeloop login' from command line)
  • an s3 path of the form s3://bucketname/path/to/dataset. Credentials are required in either the environment or passed to the creds argument.
  • a local file system path of the form ./path/to/dataset or ~/path/to/dataset or path/to/dataset.
  • a memory path of the form mem://path/to/dataset which doesn't save the dataset but keeps it in memory instead. Should be used only for testing as it does not persist.
read_only : bool
Opens dataset in read only mode if this is passed as True. Defaults to False. Datasets stored on Hub cloud that your account does not have write access to will automatically open in read mode.
overwrite : bool
WARNING: If set to True this overwrites the dataset if it already exists. This can NOT be undone! Defaults to False.
public : bool
Defines if the dataset will have public access. Applicable only if Hub cloud storage is used and a new Dataset is being created. Defaults to False.
memory_cache_size : int
The size of the memory cache to be used in MB.
local_cache_size : int
The size of the local filesystem cache to be used in MB.
creds : dict, str, optional
A dictionary containing credentials or path to credentials file used to access the dataset at the path. If aws_access_key_id, aws_secret_access_key, aws_session_token are present, these take precedence over credentials present in the environment or in credentials file. Currently only works with s3 paths. It supports 'aws_access_key_id', 'aws_secret_access_key', 'aws_session_token', 'endpoint_url', 'region', 'profile_name' as keys.
token : str, optional
Activeloop token, used for fetching credentials to the dataset at path if it is a Hub dataset. This is optional, tokens are normally autogenerated.
verbose : bool
If True, logs will be printed. Defaults to True.

Returns

Dataset object created using the arguments provided.

Raises

AgreementError
When agreement is rejected
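
A further example of opening an S3-backed dataset read-only with explicit credentials (bucket, path, and keys are placeholders):

import hub

ds = hub.dataset(
    "s3://my-bucket/my_dataset",        # placeholder bucket/path
    read_only=True,
    creds={
        "aws_access_key_id": "...",     # placeholder credentials
        "aws_secret_access_key": "...",
    },
)
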
Source code:
@staticmethod
def init(
    path: str,
    read_only: bool = DEFAULT_READONLY,
    overwrite: bool = False,
    public: bool = False,
    memory_cache_size: int = DEFAULT_MEMORY_CACHE_SIZE,
    local_cache_size: int = DEFAULT_LOCAL_CACHE_SIZE,
    creds: Optional[Union[Dict, str]] = None,
    token: Optional[str] = None,
    verbose: bool = True,
):
    """Returns a Dataset object referencing either a new or existing dataset.

    Important:
        Using `overwrite` will delete all of your data if it exists! Be very careful when setting this parameter.

    Examples:
        ```
        ds = hub.dataset("hub://username/dataset")
        ds = hub.dataset("s3://mybucket/my_dataset")
        ds = hub.dataset("./datasets/my_dataset", overwrite=True)
        ```

    Args:
        path (str): The full path to the dataset. Can be:
            -
            - a Hub cloud path of the form `hub://username/datasetname`. To write to Hub cloud datasets, ensure that you are logged in to Hub (use 'activeloop login' from command line)
            - an s3 path of the form `s3://bucketname/path/to/dataset`. Credentials are required in either the environment or passed to the creds argument.
            - a local file system path of the form `./path/to/dataset` or `~/path/to/dataset` or `path/to/dataset`.
            - a memory path of the form `mem://path/to/dataset` which doesn't save the dataset but keeps it in memory instead. Should be used only for testing as it does not persist.
        read_only (bool): Opens dataset in read only mode if this is passed as True. Defaults to False.
            Datasets stored on Hub cloud that your account does not have write access to will automatically open in read mode.
        overwrite (bool): WARNING: If set to True this overwrites the dataset if it already exists. This can NOT be undone! Defaults to False.
        public (bool): Defines if the dataset will have public access. Applicable only if Hub cloud storage is used and a new Dataset is being created. Defaults to False.
        memory_cache_size (int): The size of the memory cache to be used in MB.
        local_cache_size (int): The size of the local filesystem cache to be used in MB.
        creds (dict, str, optional): A dictionary containing credentials or path to credentials file used to access the dataset at the path.
            If aws_access_key_id, aws_secret_access_key, aws_session_token are present, these take precedence over credentials present in the environment or in credentials file. Currently only works with s3 paths.
            It supports 'aws_access_key_id', 'aws_secret_access_key', 'aws_session_token', 'endpoint_url', 'region', 'profile_name' as keys.
        token (str, optional): Activeloop token, used for fetching credentials to the dataset at path if it is a Hub dataset. This is optional, tokens are normally autogenerated.
        verbose (bool): If True, logs will be printed. Defaults to True.

    Returns:
        Dataset object created using the arguments provided.

    Raises:
        AgreementError: When agreement is rejected
    """
    if creds is None:
        creds = {}

    feature_report_path(path, "dataset", {"Overwrite": overwrite})

    storage, cache_chain = get_storage_and_cache_chain(
        path=path,
        read_only=read_only,
        creds=creds,
        token=token,
        memory_cache_size=memory_cache_size,
        local_cache_size=local_cache_size,
    )
    if overwrite and dataset_exists(cache_chain):
        cache_chain.clear()

    try:
        read_only = storage.read_only
        return dataset_factory(
            path=path,
            storage=cache_chain,
            read_only=read_only,
            public=public,
            token=token,
            verbose=verbose,
        )
    except AgreementError as e:
        raise e from None
def deepcopy(src, dest, overwrite=False, src_creds=None, src_token=None, dest_creds=None, dest_token=None, num_workers=0, scheduler='threaded', progressbar=True, public=False)

Copies the dataset at src to dest, including version control history.

Args

src : str
Path to the dataset to be copied.
dest : str
Destination path to copy to.
overwrite : bool
If True and a dataset exists at destination, it will be overwritten. Defaults to False.
src_creds : dict, optional

A dictionary containing credentials used to access the dataset at src.

  • If 'aws_access_key_id', 'aws_secret_access_key', 'aws_session_token' are present, these take precedence over credentials present in the environment or in credentials file. Currently only works with s3 paths.
  • It supports 'aws_access_key_id', 'aws_secret_access_key', 'aws_session_token', 'endpoint_url', 'region', 'profile_name' as keys.
src_token : str, optional
Activeloop token, used for fetching credentials to the dataset at src if it is a Hub dataset. This is optional, tokens are normally autogenerated.
dest_creds : dict, optional
Credentials required to create / overwrite the dataset at dest.
dest_token : str, optional
Token used for fetching credentials to dest.
num_workers : int
The number of workers to use for copying. Defaults to 0. When set to 0, it will always use serial processing, irrespective of the scheduler.
scheduler : str
The scheduler to be used for copying. Supported values include: 'serial', 'threaded', 'processed' and 'ray'. Defaults to 'threaded'.
progressbar : bool
Displays a progress bar if True (default).
public : bool
Defines if the dataset will have public access. Applicable only if Hub cloud storage is used and a new Dataset is being created. Defaults to False.

Returns

Dataset
New dataset object.

Raises

DatasetHandlerError
If a dataset already exists at destination path and overwrite is False.
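
For example (both paths are hypothetical), copying a Hub cloud dataset to local storage together with its version history:

import hub

ds = hub.deepcopy(
    src="hub://username/source_ds",      # hypothetical source
    dest="./local_copy",                 # hypothetical destination
    overwrite=True,
    num_workers=2,
    scheduler="threaded",
)
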
Source code:
@staticmethod
def deepcopy(
    src: str,
    dest: str,
    overwrite: bool = False,
    src_creds=None,
    src_token=None,
    dest_creds=None,
    dest_token=None,
    num_workers: int = 0,
    scheduler="threaded",
    progressbar=True,
    public: bool = False,
):
    """Copies dataset at `src` to `dest` including version control history.

    Args:
        src (str): Path to the dataset to be copied.
        dest (str): Destination path to copy to.
        overwrite (bool): If True and a dataset exists at `destination`, it will be overwritten. Defaults to False.
        src_creds (dict, optional): A dictionary containing credentials used to access the dataset at `src`.
            -
            - If 'aws_access_key_id', 'aws_secret_access_key', 'aws_session_token' are present, these take precedence over credentials present in the environment or in credentials file. Currently only works with s3 paths.
            - It supports 'aws_access_key_id', 'aws_secret_access_key', 'aws_session_token', 'endpoint_url', 'region', 'profile_name' as keys.
        src_token (str, optional): Activeloop token, used for fetching credentials to the dataset at `src` if it is a Hub dataset. This is optional, tokens are normally autogenerated.
        dest_creds (dict, optional): Credentials required to create / overwrite the dataset at `dest`.
        dest_token (str, optional): Token used for fetching credentials to `dest`.
        num_workers (int): The number of workers to use for copying. Defaults to 0. When set to 0, it will always use serial processing, irrespective of the scheduler.
        scheduler (str): The scheduler to be used for copying. Supported values include: 'serial', 'threaded', 'processed' and 'ray'.
            Defaults to 'threaded'.
        progressbar (bool): Displays a progress bar if True (default).
        public (bool): Defines if the dataset will have public access. Applicable only if Hub cloud storage is used and a new Dataset is being created. Defaults to False.

    Returns:
        Dataset: New dataset object.

    Raises:
        DatasetHandlerError: If a dataset already exists at destination path and overwrite is False.
    """
    report_params = {
        "Overwrite": overwrite,
        "Num_Workers": num_workers,
        "Scheduler": scheduler,
        "Progressbar": progressbar,
        "Public": public,
    }
    if dest.startswith("hub://"):
        report_params["Dest"] = dest
    feature_report_path(src, "deepcopy", report_params)
    src_ds = hub.load(src, read_only=True, creds=src_creds, token=src_token)
    src_storage = get_base_storage(src_ds.storage)

    dest_storage, cache_chain = get_storage_and_cache_chain(
        dest,
        creds=dest_creds,
        token=dest_token,
        read_only=False,
        memory_cache_size=DEFAULT_MEMORY_CACHE_SIZE,
        local_cache_size=DEFAULT_LOCAL_CACHE_SIZE,
    )

    if dataset_exists(cache_chain):
        if overwrite:
            cache_chain.clear()
        else:
            raise DatasetHandlerError(
                f"A dataset already exists at the given path ({dest}). If you want to copy to a new dataset, either specify another path or use overwrite=True."
            )

    def copy_func(keys, progress_callback=None):
        cache = generate_chain(
            src_storage,
            memory_cache_size=DEFAULT_MEMORY_CACHE_SIZE,
            local_cache_size=DEFAULT_LOCAL_CACHE_SIZE,
            path=src,
        )
        for key in keys:
            if isinstance(cache[key], HubMemoryObject):
                dest_storage[key] = cache[key].tobytes()
            else:
                dest_storage[key] = cache[key]
            if progress_callback:
                progress_callback(1)

    def copy_func_with_progress_bar(pg_callback, keys):
        copy_func(keys, pg_callback)

    keys = list(src_storage._all_keys())
    len_keys = len(keys)
    if num_workers == 0:
        keys = [keys]
    else:
        keys = [keys[i::num_workers] for i in range(num_workers)]
    compute_provider = get_compute_provider(scheduler, num_workers)
    try:
        if progressbar:
            compute_provider.map_with_progressbar(
                copy_func_with_progress_bar,
                keys,
                len_keys,
                "Copying dataset",
            )
        else:
            compute_provider.map(copy_func, keys)
    finally:
        compute_provider.close()

    return dataset_factory(
        path=dest,
        storage=cache_chain,
        read_only=False,
        public=public,
        token=dest_token,
    )
def delete(path, force=False, large_ok=False, creds=None, token=None, verbose=False)

Deletes a dataset at a given path.

This is an IRREVERSIBLE operation. Data once deleted can not be recovered.

Args

path : str
The path to the dataset to be deleted.
force : bool
Delete data regardless of whether it looks like a hub dataset. All data at the path will be removed.
large_ok : bool
Delete datasets larger than 1GB. Disabled by default.
creds : dict, optional

A dictionary containing credentials used to access the dataset at the path.

  • If aws_access_key_id, aws_secret_access_key, aws_session_token are present, these take precedence over credentials present in the environment or in credentials file. Currently only works with s3 paths.
  • It supports 'aws_access_key_id', 'aws_secret_access_key', 'aws_session_token', 'endpoint_url', 'region', 'profile_name' as keys.
token : str, optional
Activeloop token, used for fetching credentials to the dataset at path if it is a Hub dataset. This is optional, tokens are normally autogenerated.
verbose : bool
If True, logs will be printed. Defaults to False.

Raises

DatasetHandlerError
If a Dataset does not exist at the given path and force = False.
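
Usage sketch (the path is hypothetical):

import hub

# delete a dataset larger than 1 GB; set force=True to clear a path that
# does not look like a valid Hub dataset
hub.delete("hub://username/old_dataset", large_ok=True)
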
Source code:
@staticmethod
def delete(
    path: str,
    force: bool = False,
    large_ok: bool = False,
    creds: Optional[dict] = None,
    token: Optional[str] = None,
    verbose: bool = False,
) -> None:
    """Deletes a dataset at a given path.

    This is an __IRREVERSIBLE__ operation. Data once deleted can not be recovered.

    Args:
        path (str): The path to the dataset to be deleted.
        force (bool): Delete data regardless of whether
            it looks like a hub dataset. All data at the path will be removed.
        large_ok (bool): Delete datasets larger than 1GB. Disabled by default.
        creds (dict, optional): A dictionary containing credentials used to access the dataset at the path.
            -
            - If aws_access_key_id, aws_secret_access_key, aws_session_token are present, these take precedence over credentials present in the environment or in credentials file. Currently only works with s3 paths.
            - It supports 'aws_access_key_id', 'aws_secret_access_key', 'aws_session_token', 'endpoint_url', 'region', 'profile_name' as keys.
        token (str, optional): Activeloop token, used for fetching credentials to the dataset at path if it is a Hub dataset. This is optional, tokens are normally autogenerated.
        verbose (bool): If True, logs will be printed. Defaults to False.

    Raises:
        DatasetHandlerError: If a Dataset does not exist at the given path and force = False.
    """
    if creds is None:
        creds = {}

    feature_report_path(path, "delete", {"Force": force, "Large_OK": large_ok})

    try:
        ds = hub.load(path, verbose=False, token=token, creds=creds)
        ds.delete(large_ok=large_ok)
        if verbose:
            logger.info(f"{path} dataset deleted successfully.")
    except Exception as e:
        if force:
            base_storage = storage_provider_from_path(
                path=path,
                creds=creds,
                read_only=False,
                token=token,
            )
            base_storage.clear()
            remove_path_from_backend(path, token)
            if verbose:
                logger.info(f"{path} folder deleted successfully.")
        else:
            if isinstance(e, (DatasetHandlerError, PathNotEmptyException)):
                raise DatasetHandlerError(
                    "A Hub dataset wasn't found at the specified path. "
                    "This may be due to a corrupt dataset or a wrong path. "
                    "If you want to delete the data at the path regardless, use force=True"
                )
            raise
def empty(path, overwrite=False, public=False, memory_cache_size=256, local_cache_size=0, creds=None, token=None)

Creates an empty dataset

Important

Using overwrite will delete all of your data if it exists! Be very careful when setting this parameter.

Args

path : str

The full path to the dataset. Can be:

  • a Hub cloud path of the form hub://username/datasetname. To write to Hub cloud datasets, ensure that you are logged in to Hub (use 'activeloop login' from command line)
  • an s3 path of the form s3://bucketname/path/to/dataset. Credentials are required in either the environment or passed to the creds argument.
  • a local file system path of the form ./path/to/dataset or ~/path/to/dataset or path/to/dataset.
  • a memory path of the form mem://path/to/dataset which doesn't save the dataset but keeps it in memory instead. Should be used only for testing as it does not persist.
overwrite : bool
WARNING: If set to True this overwrites the dataset if it already exists. This can NOT be undone! Defaults to False.
public : bool
Defines if the dataset will have public access. Applicable only if Hub cloud storage is used and a new Dataset is being created. Defaults to False.
memory_cache_size : int
The size of the memory cache to be used in MB.
local_cache_size : int
The size of the local filesystem cache to be used in MB.
creds : dict, optional

A dictionary containing credentials used to access the dataset at the path.

  • If aws_access_key_id, aws_secret_access_key, aws_session_token are present, these take precedence over credentials present in the environment or in credentials file. Currently only works with s3 paths.
  • It supports 'aws_access_key_id', 'aws_secret_access_key', 'aws_session_token', 'endpoint_url', 'region', 'profile_name' as keys.
token : str, optional
Activeloop token, used for fetching credentials to the dataset at path if it is a Hub dataset. This is optional, tokens are normally autogenerated.

Returns

Dataset object created using the arguments provided.

Raises

DatasetHandlerError
If a Dataset already exists at the given path and overwrite is False.
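
A short sketch of creating an empty dataset and populating it (the dataset path, tensor names, and image path are hypothetical):

import hub

ds = hub.empty("./new_dataset", overwrite=True)   # hypothetical local path
ds.create_tensor("images", htype="image", sample_compression="jpeg")
ds.create_tensor("labels", htype="class_label")

with ds:
    ds.images.append(hub.read("./photos/cat0.jpg"))   # hypothetical image file
    ds.labels.append(0)
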
Source code:
@staticmethod
def empty(
    path: str,
    overwrite: bool = False,
    public: bool = False,
    memory_cache_size: int = DEFAULT_MEMORY_CACHE_SIZE,
    local_cache_size: int = DEFAULT_LOCAL_CACHE_SIZE,
    creds: Optional[dict] = None,
    token: Optional[str] = None,
) -> Dataset:
    """Creates an empty dataset

    Important:
        Using `overwrite` will delete all of your data if it exists! Be very careful when setting this parameter.

    Args:
        path (str): The full path to the dataset. Can be:
            -
            - a Hub cloud path of the form `hub://username/datasetname`. To write to Hub cloud datasets, ensure that you are logged in to Hub (use 'activeloop login' from command line)
            - an s3 path of the form `s3://bucketname/path/to/dataset`. Credentials are required in either the environment or passed to the creds argument.
            - a local file system path of the form `./path/to/dataset` or `~/path/to/dataset` or `path/to/dataset`.
            - a memory path of the form `mem://path/to/dataset` which doesn't save the dataset but keeps it in memory instead. Should be used only for testing as it does not persist.
        overwrite (bool): __WARNING__: If set to True this overwrites the dataset if it already exists. This can __NOT__ be undone! Defaults to False.
        public (bool): Defines if the dataset will have public access. Applicable only if Hub cloud storage is used and a new Dataset is being created. Defaults to False.
        memory_cache_size (int): The size of the memory cache to be used in MB.
        local_cache_size (int): The size of the local filesystem cache to be used in MB.
        creds (dict, optional): A dictionary containing credentials used to access the dataset at the path.
            -
            - If aws_access_key_id, aws_secret_access_key, aws_session_token are present, these take precedence over credentials present in the environment or in credentials file. Currently only works with s3 paths.
            - It supports 'aws_access_key_id', 'aws_secret_access_key', 'aws_session_token', 'endpoint_url', 'region', 'profile_name' as keys.
        token (str, optional): Activeloop token, used for fetching credentials to the dataset at path if it is a Hub dataset. This is optional, tokens are normally autogenerated.

    Returns:
        Dataset object created using the arguments provided.

    Raises:
        DatasetHandlerError: If a Dataset already exists at the given path and overwrite is False.
    """
    if creds is None:
        creds = {}

    feature_report_path(path, "empty", {"Overwrite": overwrite})

    storage, cache_chain = get_storage_and_cache_chain(
        path=path,
        read_only=False,
        creds=creds,
        token=token,
        memory_cache_size=memory_cache_size,
        local_cache_size=local_cache_size,
    )

    if overwrite and dataset_exists(cache_chain):
        cache_chain.clear()
    elif dataset_exists(cache_chain):
        raise DatasetHandlerError(
            f"A dataset already exists at the given path ({path}). If you want to create a new empty dataset, either specify another path or use overwrite=True. If you want to load the dataset that exists at this path, use hub.load() instead."
        )

    read_only = storage.read_only
    return dataset_factory(
        path=path,
        storage=cache_chain,
        read_only=read_only,
        public=public,
        token=token,
    )
def exists(path, creds=None, token=None)

Checks if a dataset exists at the given path.

Args

path : str
the path which needs to be checked.
creds : dict, optional
A dictionary containing credentials used to access the dataset at the path.
token : str, optional
Activeloop token, used for fetching credentials to the dataset at path if it is a Hub dataset. This is optional, tokens are normally autogenerated.

Returns

A boolean confirming whether the dataset exists or not at the given path.
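
For example (the path is hypothetical):

import hub

if not hub.exists("./maybe_a_dataset"):
    ds = hub.empty("./maybe_a_dataset")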

Source code:
@staticmethod
def exists(
    path: str, creds: Optional[dict] = None, token: Optional[str] = None
) -> bool:
    """Checks if a dataset exists at the given `path`.
    Args:
        path (str): the path which needs to be checked.
        creds (dict, optional): A dictionary containing credentials used to access the dataset at the path.
        token (str, optional): Activeloop token, used for fetching credentials to the dataset at path if it is a Hub dataset. This is optional, tokens are normally autogenerated.
    Returns:
        A boolean confirming whether the dataset exists or not at the given path.
    """
    if creds is None:
        creds = {}
    try:
        storage, cache_chain = get_storage_and_cache_chain(
            path=path,
            read_only=True,
            creds=creds,
            token=token,
            memory_cache_size=DEFAULT_MEMORY_CACHE_SIZE,
            local_cache_size=DEFAULT_LOCAL_CACHE_SIZE,
        )
    except (AuthorizationException):
        # Cloud Dataset does not exist
        return False
    if dataset_exists(storage):
        return True
    else:
        return False
def ingest(src, dest, images_compression='auto', dest_creds=None, progressbar=True, summary=True, **dataset_kwargs)

Ingests a dataset from a source and stores it as a structured dataset at the destination.

Note

  • Currently only local source paths and image classification datasets / csv files are supported for automatic ingestion.
  • Supported filetypes: png/jpeg/jpg/csv.
  • All files and sub-directories with unsupported filetypes are ignored.
  • Valid source directory structures for image classification look like:
    data/
        img0.jpg
        img1.jpg
        ...

or

    data/
        class0/
            cat0.jpg
            ...
        class1/
            dog0.jpg
            ...
        ...

or

    data/
        train/
            class0/
                img0.jpg
                ...
            ...
        val/
            class0/
                img0.jpg
                ...
            ...
        ...
  • Classes defined as sub-directories can be accessed at ds["test/labels"].info.class_names.
  • Support for train and test sub-directories is present under ds["train/images"], ds["train/labels"] and ds["test/images"], ds["test/labels"].
  • Mapping filenames to classes from an external file is currently not supported.

Args

src : str
Local path to where the unstructured dataset is stored or path to csv file.
dest : str

Destination path where the structured dataset will be stored. Can be:

  • a Hub cloud path of the form hub://username/datasetname. To write to Hub cloud datasets, ensure that you are logged in to Hub (use 'activeloop login' from command line)
  • an s3 path of the form s3://bucketname/path/to/dataset. Credentials are required in either the environment or passed to the creds argument.
  • a local file system path of the form ./path/to/dataset or ~/path/to/dataset or path/to/dataset.
  • a memory path of the form mem://path/to/dataset which doesn't save the dataset but keeps it in memory instead. Should be used only for testing as it does not persist.
images_compression : str
For image classification datasets, this compression will be used for the images tensor. If images_compression is "auto", compression will be automatically determined by the most common extension in the directory.
dest_creds : dict
A dictionary containing credentials used to access the destination path of the dataset.
progressbar : bool
Enables or disables ingestion progress bar. Defaults to True.
summary : bool
If True, a summary of skipped files will be printed after completion. Defaults to True.
**dataset_kwargs
Any arguments passed here will be forwarded to the dataset creator function.

Returns

Dataset
New dataset object with structured dataset.

Raises

InvalidPathException
If the source directory does not exist.
SamePathException
If the source and destination paths are the same.
AutoCompressionError
If the source directory is empty or does not contain a valid extension.
InvalidFileExtension
If the most frequent file extension is found to be 'None' during auto-compression.
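
A usage sketch for an image-classification directory (paths are hypothetical, and ./animals is assumed to follow one of the layouts above):

import hub

ds = hub.ingest(
    src="./animals",                        # hypothetical source directory
    dest="hub://username/animals",          # hypothetical destination
    images_compression="auto",
    progressbar=True,
    summary=True,
)
# for a single-split layout the class names land on the "labels" tensor
print(ds["labels"].info.class_names)
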
Source code:
@staticmethod
def ingest(
    src,
    dest: str,
    images_compression: str = "auto",
    dest_creds: dict = None,
    progressbar: bool = True,
    summary: bool = True,
    **dataset_kwargs,
) -> Dataset:
    """Ingests a dataset from a source and stores it as a structured dataset to destination

    Note:
        - Currently only local source paths and image classification datasets / csv files are supported for automatic ingestion.
        - Supported filetypes: png/jpeg/jpg/csv.
        - All files and sub-directories with unsupported filetypes are ignored.
        - Valid source directory structures for image classification look like:

        ```
            data/
                img0.jpg
                img1.jpg
                ...

        ```
        or
        ```
            data/
                class0/
                    cat0.jpg
                    ...
                class1/
                    dog0.jpg
                    ...
                ...

        ```
        or
        ```
            data/
                train/
                    class0/
                        img0.jpg
                        ...
                    ...
                val/
                    class0/
                        img0.jpg
                        ...
                    ...
                ...
        ```

        - Classes defined as sub-directories can be accessed at `ds["test/labels"].info.class_names`.
        - Support for train and test sub directories is present under ds["train/images"], ds["train/labels"] and ds["test/images"], ds["test/labels"]
        - Mapping filenames to classes from an external file is currently not supported.

    Args:
        src (str): Local path to where the unstructured dataset is stored or path to csv file.
        dest (str): Destination path where the structured dataset will be stored. Can be:
            -
            - a Hub cloud path of the form `hub://username/datasetname`. To write to Hub cloud datasets, ensure that you are logged in to Hub (use 'activeloop login' from command line)
            - an s3 path of the form `s3://bucketname/path/to/dataset`. Credentials are required in either the environment or passed to the creds argument.
            - a local file system path of the form `./path/to/dataset` or `~/path/to/dataset` or `path/to/dataset`.
            - a memory path of the form `mem://path/to/dataset` which doesn't save the dataset but keeps it in memory instead. Should be used only for testing as it does not persist.
        images_compression (str): For image classification datasets, this compression will be used for the `images` tensor. If images_compression is "auto", compression will be automatically determined by the most common extension in the directory.
        dest_creds (dict): A dictionary containing credentials used to access the destination path of the dataset.
        progressbar (bool): Enables or disables ingestion progress bar. Defaults to True.
        summary (bool): If True, a summary of skipped files will be printed after completion. Defaults to True.
        **dataset_kwargs: Any arguments passed here will be forwarded to the dataset creator function.

    Returns:
        Dataset: New dataset object with structured dataset.

    Raises:
        InvalidPathException: If the source directory does not exist.
        SamePathException: If the source and destination path are same.
        AutoCompressionError: If the source directory is empty or does not contain a valid extension.
        InvalidFileExtension: If the most frequent file extension is found to be 'None' during auto-compression.
    """

    feature_report_path(
        dest,
        "ingest",
        {
            "Images_Compression": images_compression,
            "Progressbar": progressbar,
            "Summary": summary,
        },
    )

    if isinstance(src, str):
        if os.path.isdir(dest) and os.path.samefile(src, dest):
            raise SamePathException(src)

        if src.endswith(".csv"):
            import pandas as pd  # type:ignore

            if not os.path.isfile(src):
                raise InvalidPathException(src)
            source = pd.read_csv(src, quotechar='"', skipinitialspace=True)
            ds = dataset.ingest_dataframe(
                source, dest, dest_creds, progressbar, **dataset_kwargs
            )
            return ds

        if not os.path.isdir(src):
            raise InvalidPathException(src)

        if images_compression == "auto":
            images_compression = get_most_common_extension(src)
            if images_compression is None:
                raise InvalidFileExtension(src)

        ds = hub.dataset(dest, creds=dest_creds, **dataset_kwargs)

        # TODO: support more than just image classification (and update docstring)
        unstructured = ImageClassification(source=src)

        # TODO: auto detect compression
        unstructured.structure(
            ds,  # type: ignore
            use_progress_bar=progressbar,
            generate_summary=summary,
            image_tensor_args={"sample_compression": images_compression},
        )
    return ds  # type: ignore
def ingest_huggingface(src, dest, use_progressbar=True)

Converts Hugging Face datasets to Hub format.

Note

  • if DatasetDict looks like:
    {
        train: Dataset({
            features: ['data']
        }),
        validation: Dataset({
            features: ['data']
        }),
        test: Dataset({
            features: ['data']
        }),
    }

it will be converted to a Hub Dataset with tensors ['train/data', 'validation/data', 'test/data'].

Features of the type Sequence(feature=Value(dtype='string')) are not supported. Columns of such type are skipped.

Args

src : hfDataset, DatasetDict
Hugging Face Dataset or DatasetDict to be converted. Data in different splits of a DatasetDict will be stored under respective tensor groups.
dest : Dataset, str
Destination dataset or path to it.
use_progressbar : bool
Defines if progress bar should be used to show conversion progress.

Returns

Dataset
The destination Hub dataset.
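
For example (the destination path is hypothetical, and "mnist" stands for any Hugging Face dataset identifier):

import hub
from datasets import load_dataset

hf_ds = load_dataset("mnist")                       # a DatasetDict with train/test splits
ds = hub.ingest_huggingface(hf_ds, "./hub_mnist")   # hypothetical destination path
# each split becomes a tensor group, e.g. "train/<column>" and "test/<column>"
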
Source code:
def ingest_huggingface(
    src,
    dest,
    use_progressbar=True,
) -> Dataset:
    """Converts hugging face datasets to hub format.

    Note:
        - if DatasetDict looks like:
        ```
            {
                train: Dataset({
                    features: ['data']
                }),
                validation: Dataset({
                    features: ['data']
                }),
                test: Dataset({
                    features: ['data']
                }),
            }
        ```
        it will be converted to a Hub `Dataset` with tensors `['train/data', 'validation/data', 'test/data']`.

        Features of the type `Sequence(feature=Value(dtype='string'))` are not supported. Columns of such type are skipped.

    Args:
        src (hfDataset, DatasetDict): Hugging Face Dataset or DatasetDict to be converted. Data in different splits of a
            DatasetDict will be stored under respective tensor groups.
        dest (Dataset, str): Destination dataset or path to it.
        use_progressbar (bool): Defines if progress bar should be used to show conversion progress.

    Returns:
        Dataset: The destination Hub dataset.

    """
    from datasets import DatasetDict

    if isinstance(dest, str):
        ds = hub.dataset(dest)
    else:
        ds = dest  # type: ignore

    if isinstance(src, DatasetDict):
        for split, src_ds in src.items():
            for key, feature in src_ds.features.items():
                _create_tensor_from_feature(f"{split}/{key}", feature, src_ds, ds)
    else:
        skipped_keys: Set[str] = set()
        features = tqdm(
            src.features.items(),
            desc=f"Converting...({len(skipped_keys)} skipped)",
            disable=not use_progressbar,
        )
        for key, feature in features:
            if not _create_tensor_from_feature(key, feature, src, ds):
                skipped_keys.add(key)
                features.set_description(f"Converting...({len(skipped_keys)} skipped)")
    return ds  # type: ignore
def ingest_kaggle(tag, src, dest, exist_ok=False, images_compression='auto', dest_creds=None, kaggle_credentials=None, progressbar=True, summary=True, **dataset_kwargs)

Download and ingest a Kaggle dataset and store it as a structured dataset at the destination.

Note

Currently only local source paths and image classification datasets are supported for automatic ingestion.

Args

tag : str
Kaggle dataset tag. Example: "coloradokb/dandelionimages" points to https://www.kaggle.com/coloradokb/dandelionimages
src : str
Local path to where the raw Kaggle dataset will be downloaded.
dest : str

Destination path where the structured dataset will be stored. Can be:

  • a Hub cloud path of the form hub://username/datasetname. To write to Hub cloud datasets, ensure that you are logged in to Hub (use 'activeloop login' from command line)
  • an s3 path of the form s3://bucketname/path/to/dataset. Credentials are required in either the environment or passed to the creds argument.
  • a local file system path of the form ./path/to/dataset or ~/path/to/dataset or path/to/dataset.
  • a memory path of the form mem://path/to/dataset which doesn't save the dataset but keeps it in memory instead. Should be used only for testing as it does not persist.
exist_ok : bool
If the kaggle dataset was already downloaded and exist_ok is True, ingestion will proceed without error.
images_compression : str
For image classification datasets, this compression will be used for the images tensor. If images_compression is "auto", compression will be automatically determined by the most common extension in the directory.
dest_creds : dict
A dictionary containing credentials used to access the destination path of the dataset.
kaggle_credentials : dict
A dictionary containing kaggle credentials {"username":"YOUR_USERNAME", "key": "YOUR_KEY"}. If None, environment variables/the kaggle.json file will be used if available.
progressbar : bool
Enables or disables ingestion progress bar. Set to true by default.
summary : bool
Generates ingestion summary. Set to true by default.
**dataset_kwargs
Any arguments passed here will be forwarded to the dataset creator function.

Returns

Dataset
New dataset object with structured dataset.

Raises

SamePathException
If the source and destination paths are the same.
Expand source code
@staticmethod
def ingest_kaggle(
    tag: str,
    src: str,
    dest: str,
    exist_ok: bool = False,
    images_compression: str = "auto",
    dest_creds: dict = None,
    kaggle_credentials: dict = None,
    progressbar: bool = True,
    summary: bool = True,
    **dataset_kwargs,
) -> Dataset:
    """Download and ingest a kaggle dataset and store it as a structured dataset to destination

    Note:
        Currently only local source paths and image classification datasets are supported for automatic ingestion.

    Args:
        tag (str): Kaggle dataset tag. Example: `"coloradokb/dandelionimages"` points to https://www.kaggle.com/coloradokb/dandelionimages
        src (str): Local path where the raw Kaggle dataset will be downloaded.
        dest (str): Destination path where the structured dataset will be stored. Can be:
            -
            - a Hub cloud path of the form `hub://username/datasetname`. To write to Hub cloud datasets, ensure that you are logged in to Hub (use 'activeloop login' from command line)
            - an s3 path of the form `s3://bucketname/path/to/dataset`. Credentials are required in either the environment or passed to the creds argument.
            - a local file system path of the form `./path/to/dataset` or `~/path/to/dataset` or `path/to/dataset`.
            - a memory path of the form `mem://path/to/dataset` which doesn't save the dataset but keeps it in memory instead. Should be used only for testing as it does not persist.
        exist_ok (bool): If the kaggle dataset was already downloaded and `exist_ok` is True, ingestion will proceed without error.
        images_compression (str): For image classification datasets, this compression will be used for the `images` tensor. If images_compression is "auto", compression will be automatically determined by the most common extension in the directory.
        dest_creds (dict): A dictionary containing credentials used to access the destination path of the dataset.
        kaggle_credentials (dict): A dictionary containing kaggle credentials {"username":"YOUR_USERNAME", "key": "YOUR_KEY"}. If None, environment variables/the kaggle.json file will be used if available.
        progressbar (bool): Enables or disables ingestion progress bar. Set to true by default.
        summary (bool): Generates ingestion summary. Set to true by default.
        **dataset_kwargs: Any arguments passed here will be forwarded to the dataset creator function.

    Returns:
        Dataset: New dataset object with structured dataset.

    Raises:
        SamePathException: If the source and destination paths are the same.
    """

    feature_report_path(
        dest,
        "ingest_kaggle",
        {
            "Images_Compression": images_compression,
            "Exist_Ok": exist_ok,
            "Progressbar": progressbar,
            "Summary": summary,
        },
    )

    if os.path.isdir(src) and os.path.isdir(dest):
        if os.path.samefile(src, dest):
            raise SamePathException(src)

    download_kaggle_dataset(
        tag,
        local_path=src,
        kaggle_credentials=kaggle_credentials,
        exist_ok=exist_ok,
    )

    ds = hub.ingest(
        src=src,
        dest=dest,
        images_compression=images_compression,
        dest_creds=dest_creds,
        progressbar=progressbar,
        summary=summary,
        **dataset_kwargs,
    )

    return ds
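A hedged usage sketch built from the documented arguments; the local source and destination paths are placeholders:

import hub

ds = hub.ingest_kaggle(
    tag="coloradokb/dandelionimages",
    src="./kaggle_raw/dandelionimages",    # raw Kaggle files are downloaded here
    dest="./hub_dandelionimages",          # structured Hub dataset is written here
    images_compression="auto",
)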
def like(path, source, overwrite=False, creds=None, token=None, public=False)

Copies the source dataset's structure to a new location. No samples are copied, only the meta/info for the dataset and its tensors.

Args

path : str
Path where the new dataset will be created.
source : Union[str, Dataset]
Path or dataset object that will be used as the template for the new dataset.
overwrite : bool
If True and a dataset already exists at the given path, it will be overwritten. Defaults to False.
creds : dict, optional

A dictionary containing credentials used to access the dataset at the path.

  • If aws_access_key_id, aws_secret_access_key, aws_session_token are present, these take precedence over credentials present in the environment or in credentials file. Currently only works with s3 paths.
  • It supports 'aws_access_key_id', 'aws_secret_access_key', 'aws_session_token', 'endpoint_url', 'region', 'profile_name' as keys.
token : str, optional
Activeloop token, used for fetching credentials to the dataset at path if it is a Hub dataset. This is optional, tokens are normally autogenerated.
public : bool
Defines if the dataset will have public access. Applicable only if Hub cloud storage is used and a new Dataset is being created. Defaults to False.

Returns

Dataset
New dataset object.
Expand source code
@staticmethod
def like(
    path: str,
    source: Union[str, Dataset],
    overwrite: bool = False,
    creds: Optional[dict] = None,
    token: Optional[str] = None,
    public: bool = False,
) -> Dataset:
    """Copies the `source` dataset's structure to a new location. No samples are copied, only the meta/info for the dataset and it's tensors.

    Args:
        path (str): Path where the new dataset will be created.
        source (Union[str, Dataset]): Path or dataset object that will be used as the template for the new dataset.
        overwrite (bool): If True and a dataset already exists at `path`, it will be overwritten. Defaults to False.
        creds (dict, optional): A dictionary containing credentials used to access the dataset at the path.
            -
            - If aws_access_key_id, aws_secret_access_key, aws_session_token are present, these take precedence over credentials present in the environment or in credentials file. Currently only works with s3 paths.
            - It supports 'aws_access_key_id', 'aws_secret_access_key', 'aws_session_token', 'endpoint_url', 'region', 'profile_name' as keys.
        token (str, optional): Activeloop token, used for fetching credentials to the dataset at path if it is a Hub dataset. This is optional, tokens are normally autogenerated.
        public (bool): Defines if the dataset will have public access. Applicable only if Hub cloud storage is used and a new Dataset is being created. Defaults to False.

    Returns:
        Dataset: New dataset object.
    """

    feature_report_path(path, "like", {"Overwrite": overwrite, "Public": public})

    destination_ds = dataset.empty(
        path,
        creds=creds,
        overwrite=overwrite,
        token=token,
        public=public,
    )
    source_ds = source
    if isinstance(source, str):
        source_ds = dataset.load(source)

    for tensor_name in source_ds.tensors:  # type: ignore
        destination_ds.create_tensor_like(tensor_name, source_ds[tensor_name])

    destination_ds.info.update(source_ds.info.__getstate__())  # type: ignore

    return destination_ds
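For example, a minimal sketch that mirrors an existing dataset's structure into a new, empty dataset (both paths are placeholders):

import hub

template = hub.load("./existing_dataset")
empty_copy = hub.like("./empty_copy", template, overwrite=True)
print(list(empty_copy.tensors))   # same tensor names and meta as the template, zero samples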

def link(path, creds_key=None)

Utility that stores a link to raw data. Used to add data to a Hub Dataset without copying it.

Note

No data is actually loaded until you try to read the sample from a dataset. There are a few exceptions to this:

  • If verify=True was specified DURING create_tensor of the tensor to which this is being added, some metadata is read to verify the integrity of the sample.
  • If create_shape_tensor=True was specified DURING create_tensor of the tensor to which this is being added, the shape of the sample is read.
  • If create_sample_info_tensor=True was specified DURING create_tensor of the tensor to which this is being added, the sample info is read.

Examples

>>> ds = hub.dataset("......")

Add the names of the creds you want to use (not needed for http/local urls)

>>> ds.add_creds("MY_S3_KEY")
>>> ds.add_creds("GCS_KEY")

Populate the added names with a creds dictionary. These creds are only present temporarily and will have to be repopulated on every reload.

>>> ds.populate_creds("MY_S3_KEY", {})
>>> ds.populate_creds("GCS_KEY", {})

Create a tensor that can contain links

>>> ds.create_tensor("img", htype="link[image]", verify=True, create_shape_tensor=False, create_sample_info_tensor=False)

Populate the tensor with links

>>> ds.img.append(hub.link("s3://abc/def.jpeg", creds_key="MY_S3_KEY"))
>>> ds.img.append(hub.link("gcs://ghi/jkl.png", creds_key="GCS_KEY"))
>>> ds.img.append(hub.link("https://picsum.photos/200/300")) # doesn't need creds
>>> ds.img.append(hub.link("s3://abc/def.jpeg"))  # will use creds from environment
>>> ds.img.append(hub.link("s3://abc/def.jpeg", creds_key="ENV"))  # this will also use creds from environment

Accessing the data

>>> for i in range(5):
...     ds.img[i].numpy()

Updating a sample

>>> ds.img[0] = hub.link("./data/cat.jpeg")

Supported file types:

Image: "bmp", "dib", "gif", "ico", "jpeg", "jpeg2000", "pcx", "png", "ppm", "sgi", "tga", "tiff", "webp", "wmf", "xbm"
Audio: "flac", "mp3", "wav"
Video: "mp4", "mkv", "avi"
Dicom: "dcm"

Args

path : str
Path to a supported file.
creds_key : optional, str
The credential key to use to read data for this sample. The actual credentials are fetched from the dataset.

Returns

LinkedSample
LinkedSample object that stores path and creds.
Expand source code
def link(
    path: str,
    creds_key: Optional[str] = None,
) -> LinkedSample:
    """Utility that stores a link to raw data. Used to add data to a Hub Dataset without copying it.

    Note:
        No data is actually loaded until you try to read the sample from a dataset.
        There are a few exceptions to this:

        - If verify=True was specified DURING create_tensor of the tensor to which this is being added, some metadata is read to verify the integrity of the sample.
        - If create_shape_tensor=True was specified DURING create_tensor of the tensor to which this is being added, the shape of the sample is read.
        - If create_sample_info_tensor=True was specified DURING create_tensor of the tensor to which this is being added, the sample info is read.

    Examples:
        >>> ds = hub.dataset("......")

        Add the names of the creds you want to use (not needed for http/local urls)
        >>> ds.add_creds("MY_S3_KEY")
        >>> ds.add_creds("GCS_KEY")

        Populate the names added with creds dictionary
        These creds are only present temporarily and will have to be repopulated on every reload
        >>> ds.populate_creds("MY_S3_KEY", {})
        >>> ds.populate_creds("GCS_KEY", {})

        Create a tensor that can contain links
        >>> ds.create_tensor("img", htype="link[image]", verify=True, create_shape_tensor=False, create_sample_info_tensor=False)

        Populate the tensor with links
        >>> ds.img.append(hub.link("s3://abc/def.jpeg", creds_key="MY_S3_KEY"))
        >>> ds.img.append(hub.link("gcs://ghi/jkl.png", creds_key="GCS_KEY"))
        >>> ds.img.append(hub.link("https://picsum.photos/200/300")) # doesn't need creds
        >>> ds.img.append(hub.link("s3://abc/def.jpeg"))  # will use creds from environment
        >>> ds.img.append(hub.link("s3://abc/def.jpeg", creds_key="ENV"))  # this will also use creds from environment

        Accessing the data
        >>> for i in range(5):
        ...     ds.img[i].numpy()

        Updating a sample
        >>> ds.img[0] = hub.link("./data/cat.jpeg")

    Supported file types:

        Image: "bmp", "dib", "gif", "ico", "jpeg", "jpeg2000", "pcx", "png", "ppm", "sgi", "tga", "tiff", "webp", "wmf", "xbm"
        Audio: "flac", "mp3", "wav"
        Video: "mp4", "mkv", "avi"
        Dicom: "dcm"

    Args:
        path (str): Path to a supported file.
        creds_key (optional, str): The credential key to use to read data for this sample. The actual credentials are fetched from the dataset.

    Returns:
        LinkedSample: LinkedSample object that stores path and creds.
    """
    return LinkedSample(path, creds_key)
def list(workspace='', token=None)

List all available hub cloud datasets.

Args

workspace : str
Specify user/organization name. If not given, returns a list of all datasets that can be accessed, regardless of what workspace they are in. Otherwise, lists all datasets in the given workspace.
token : str, optional
Activeloop token, used for fetching credentials for Hub datasets. This is optional, tokens are normally autogenerated.

Returns

List of dataset names.

Expand source code
@staticmethod
@hub_reporter.record_call
def list(
    workspace: str = "",
    token: Optional[str] = None,
) -> List[str]:
    """List all available hub cloud datasets.

    Args:
        workspace (str): Specify user/organization name. If not given,
            returns a list of all datasets that can be accessed, regardless of what workspace they are in.
            Otherwise, lists all datasets in the given workspace.
        token (str, optional): Activeloop token, used for fetching credentials for Hub datasets. This is optional, tokens are normally autogenerated.

    Returns:
        List of dataset names.
    """
    client = HubBackendClient(token=token)
    datasets = client.get_datasets(workspace=workspace)
    return datasets
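A short usage sketch; the workspace name is a placeholder:

import hub

all_visible = hub.list()             # every dataset the current token can access
in_workspace = hub.list("my_org")    # datasets under one user/organization
print(in_workspace)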
def load(path, read_only=False, memory_cache_size=256, local_cache_size=0, creds=None, token=None, verbose=True)

Loads an existing dataset

Args

path : str

The full path to the dataset. Can be:

  • a Hub cloud path of the form hub://username/datasetname. To write to Hub cloud datasets, ensure that you are logged in to Hub (use 'activeloop login' from command line)
  • an s3 path of the form s3://bucketname/path/to/dataset. Credentials are required in either the environment or passed to the creds argument.
  • a local file system path of the form ./path/to/dataset or ~/path/to/dataset or path/to/dataset.
  • a memory path of the form mem://path/to/dataset which doesn't save the dataset but keeps it in memory instead. Should be used only for testing as it does not persist.
read_only : bool
Opens dataset in read only mode if this is passed as True. Defaults to False. Datasets stored on Hub cloud that your account does not have write access to will automatically open in read mode.
memory_cache_size : int
The size of the memory cache to be used in MB.
local_cache_size : int
The size of the local filesystem cache to be used in MB.
creds : dict, optional

A dictionary containing credentials used to access the dataset at the path.

  • If aws_access_key_id, aws_secret_access_key, aws_session_token are present, these take precedence over credentials present in the environment or in credentials file. Currently only works with s3 paths.
  • It supports 'aws_access_key_id', 'aws_secret_access_key', 'aws_session_token', 'endpoint_url', 'region', 'profile_name' as keys.
token : str, optional
Activeloop token, used for fetching credentials to the dataset at path if it is a Hub dataset. This is optional, tokens are normally autogenerated.
verbose : bool
If True, logs will be printed. Defaults to True.

Returns

Dataset object created using the arguments provided.

Raises

DatasetHandlerError
If a Dataset does not exist at the given path.
AgreementError
When agreement is rejected
Expand source code
@staticmethod
def load(
    path: str,
    read_only: bool = DEFAULT_READONLY,
    memory_cache_size: int = DEFAULT_MEMORY_CACHE_SIZE,
    local_cache_size: int = DEFAULT_LOCAL_CACHE_SIZE,
    creds: Optional[dict] = None,
    token: Optional[str] = None,
    verbose: bool = True,
) -> Dataset:
    """Loads an existing dataset

    Args:
        path (str): The full path to the dataset. Can be:
            -
            - a Hub cloud path of the form `hub://username/datasetname`. To write to Hub cloud datasets, ensure that you are logged in to Hub (use 'activeloop login' from command line)
            - an s3 path of the form `s3://bucketname/path/to/dataset`. Credentials are required in either the environment or passed to the creds argument.
            - a local file system path of the form `./path/to/dataset` or `~/path/to/dataset` or `path/to/dataset`.
            - a memory path of the form `mem://path/to/dataset` which doesn't save the dataset but keeps it in memory instead. Should be used only for testing as it does not persist.
        read_only (bool): Opens dataset in read only mode if this is passed as True. Defaults to False.
            Datasets stored on Hub cloud that your account does not have write access to will automatically open in read mode.
        memory_cache_size (int): The size of the memory cache to be used in MB.
        local_cache_size (int): The size of the local filesystem cache to be used in MB.
        creds (dict, optional): A dictionary containing credentials used to access the dataset at the path.
            -
            - If aws_access_key_id, aws_secret_access_key, aws_session_token are present, these take precedence over credentials present in the environment or in credentials file. Currently only works with s3 paths.
            - It supports 'aws_access_key_id', 'aws_secret_access_key', 'aws_session_token', 'endpoint_url', 'region', 'profile_name' as keys.
        token (str, optional): Activeloop token, used for fetching credentials to the dataset at path if it is a Hub dataset. This is optional, tokens are normally autogenerated.
        verbose (bool): If True, logs will be printed. Defaults to True.

    Returns:
        Dataset object created using the arguments provided.

    Raises:
        DatasetHandlerError: If a Dataset does not exist at the given path.
        AgreementError: When agreement is rejected
    """
    if creds is None:
        creds = {}

    feature_report_path(path, "load", {})

    storage, cache_chain = get_storage_and_cache_chain(
        path=path,
        read_only=read_only,
        creds=creds,
        token=token,
        memory_cache_size=memory_cache_size,
        local_cache_size=local_cache_size,
    )

    if not dataset_exists(cache_chain):
        raise DatasetHandlerError(
            f"A Hub dataset does not exist at the given path ({path}). Check the path provided or in case you want to create a new dataset, use hub.empty()."
        )

    try:
        read_only = storage.read_only
        return dataset_factory(
            path=path,
            storage=cache_chain,
            read_only=read_only,
            token=token,
            verbose=verbose,
        )
    except AgreementError as e:
        raise e from None
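A minimal sketch of opening an existing dataset read-only with larger caches (the hub:// path is a placeholder):

import hub

ds = hub.load(
    "hub://my_org/my_dataset",
    read_only=True,
    memory_cache_size=512,    # MB of in-memory cache
    local_cache_size=2048,    # MB of local filesystem cache
)
print(len(ds), list(ds.tensors))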
def read(path, verify=False, creds=None, compression=None, storage=None)

Utility that reads raw data from supported files into hub format.

  • Recompresses data into format required by the tensor if permitted by the tensor htype.
  • Simply copies the data in the file if file format matches sample_compression of the tensor, thus maximizing upload speeds.

Note

No data is actually loaded until you try to get a property of the returned Sample. This is useful for passing along to tensor.append and tensor.extend.

Examples

>>> ds.create_tensor("images", htype="image", sample_compression="jpeg")
>>> ds.images.append(hub.read("path/to/cat.jpg"))
>>> ds.images.shape
(1, 399, 640, 3)
>>> ds.create_tensor("videos", htype="video", sample_compression="mp4")
>>> ds.videos.append(hub.read("path/to/video.mp4"))
>>> ds.videos.shape
(1, 136, 720, 1080, 3)
>>> ds.create_tensor("images", htype="image", sample_compression="jpeg")
>>> ds.images.append(hub.read("https://picsum.photos/200/300"))
>>> ds.images[0].shape
(300, 200, 3)

Supported file types:

Image: "bmp", "dib", "gif", "ico", "jpeg", "jpeg2000", "pcx", "png", "ppm", "sgi", "tga", "tiff", "webp", "wmf", "xbm"
Audio: "flac", "mp3", "wav"
Video: "mp4", "mkv", "avi"
Dicom: "dcm"

Args

path : str
Path to a supported file.
verify : bool
If True, contents of the file are verified.
creds : optional, Dict
Credentials for s3 and gcp for urls.
compression : optional, str
Format of the file (see hub.compression.SUPPORTED_COMPRESSIONS). Only required if path does not have an extension.
storage : optional, StorageProvider
Storage provider to use to retrieve remote files. Useful if multiple files are being read from same storage to minimize overhead of creating a new provider.

Returns

Sample
Sample object. Call sample.array to get the np.ndarray.
Expand source code
def read(
    path: str,
    verify: bool = False,
    creds: Optional[Dict] = None,
    compression: Optional[str] = None,
    storage: Optional[StorageProvider] = None,
) -> Sample:
    """Utility that reads raw data from supported files into hub format.

    - Recompresses data into format required by the tensor if permitted by the tensor htype.
    - Simply copies the data in the file if file format matches sample_compression of the tensor, thus maximizing upload speeds.

    Note:
        No data is actually loaded until you try to get a property of the returned `Sample`. This is useful for passing along to
            `tensor.append` and `tensor.extend`.

    Examples:
        >>> ds.create_tensor("images", htype="image", sample_compression="jpeg")
        >>> ds.images.append(hub.read("path/to/cat.jpg"))
        >>> ds.images.shape
        (1, 399, 640, 3)

        >>> ds.create_tensor("videos", htype="video", sample_compression="mp4")
        >>> ds.videos.append(hub.read("path/to/video.mp4"))
        >>> ds.videos.shape
        (1, 136, 720, 1080, 3)

        >>> ds.create_tensor("images", htype="image", sample_compression="jpeg")
        >>> ds.images.append(hub.read("https://picsum.photos/200/300"))
        >>> ds.images[0].shape
        (300, 200, 3)

    Supported file types:

        Image: "bmp", "dib", "gif", "ico", "jpeg", "jpeg2000", "pcx", "png", "ppm", "sgi", "tga", "tiff", "webp", "wmf", "xbm"
        Audio: "flac", "mp3", "wav"
        Video: "mp4", "mkv", "avi"
        Dicom: "dcm"

    Args:
        path (str): Path to a supported file.
        verify (bool):  If True, contents of the file are verified.
        creds (optional, Dict): Credentials for s3 and gcp for urls.
        compression (optional, str): Format of the file (see `hub.compression.SUPPORTED_COMPRESSIONS`). Only required if path does not have an extension.
        storage (optional, StorageProvider): Storage provider to use to retrieve remote files. Useful if multiple files are being read from same storage to minimize overhead of creating a new provider.

    Returns:
        Sample: Sample object. Call `sample.array` to get the `np.ndarray`.
    """
    return Sample(
        path, verify=verify, compression=compression, creds=creds, storage=storage
    )
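One case the examples above do not show: when the path has no file extension, pass compression explicitly (the path below is a placeholder):

import hub

sample = hub.read("/tmp/downloaded_blob", compression="jpeg", verify=True)
arr = sample.array    # decoding happens lazily, on this first access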
def rename(old_path, new_path, creds=None, token=None)

Renames dataset at old_path to new_path.

Examples

hub.rename("hub://username/image_ds", "hub://username/new_ds")
hub.rename("s3://mybucket/my_ds", "s3://mybucket/renamed_ds")

Args

old_path : str
The path to the dataset to be renamed.
new_path : str
Path to the dataset after renaming.
creds : dict, optional

A dictionary containing credentials used to access the dataset at the path.

  • This takes precedence over credentials present in the environment. Currently only works with s3 paths.
  • It supports 'aws_access_key_id', 'aws_secret_access_key', 'aws_session_token', 'endpoint_url' and 'region' as keys.
token : str, optional
Activeloop token, used for fetching credentials to the dataset at path if it is a Hub dataset. This is optional, tokens are normally autogenerated.

Returns

Dataset object after renaming.

Raises

DatasetHandlerError
If a Dataset does not exist at the given path or if the new path points to a different directory.
Expand source code
@staticmethod
def rename(
    old_path: str,
    new_path: str,
    creds: Optional[dict] = None,
    token: Optional[str] = None,
) -> Dataset:
    """Renames dataset at `old_path` to `new_path`.
    Examples:
        ```
        hub.rename("hub://username/image_ds", "hub://username/new_ds")
        hub.rename("s3://mybucket/my_ds", "s3://mybucket/renamed_ds")
        ```

    Args:
        old_path (str): The path to the dataset to be renamed.
        new_path (str): Path to the dataset after renaming.
        creds (dict, optional): A dictionary containing credentials used to access the dataset at the path.
            -
            - This takes precedence over credentials present in the environment. Currently only works with s3 paths.
            - It supports 'aws_access_key_id', 'aws_secret_access_key', 'aws_session_token', 'endpoint_url' and 'region' as keys.
        token (str, optional): Activeloop token, used for fetching credentials to the dataset at path if it is a Hub dataset. This is optional, tokens are normally autogenerated.

    Returns:
        Dataset object after renaming.

    Raises:
        DatasetHandlerError: If a Dataset does not exist at the given path or if the new path points to a different directory.
    """
    if creds is None:
        creds = {}

    feature_report_path(old_path, "rename", {})

    ds = hub.load(old_path, verbose=False, token=token, creds=creds)
    ds.rename(new_path)

    return ds  # type: ignore

Classes

class Dataset (storage, index=None, group_index='', read_only=False, public=False, token=None, verbose=True, version_state=None, path=None, is_iteration=False, link_creds=None, **kwargs)

Initializes a new or existing dataset.

Args

storage : LRUCache
The storage provider used to access the dataset.
index : Index, Optional
The Index object restricting the view of this dataset's tensors.
group_index : str
Name of the group this dataset instance represents.
read_only : bool
Opens dataset in read only mode if this is passed as True. Defaults to False. Datasets stored on Hub cloud that your account does not have write access to will automatically open in read mode.
public : bool, Optional
Applied only if storage is Hub cloud storage and a new Dataset is being created. Defines if the dataset will have public access.
token : str, Optional
Activeloop token, used for fetching credentials for Hub datasets. This is Optional, tokens are normally autogenerated.
verbose : bool
If True, logs will be printed. Defaults to True.
version_state : Dict[str, Any], Optional
The version state of the dataset, includes commit_id, commit_node, branch, branch_commit_map and commit_node_map.
path
The path to the dataset.
is_iteration : bool
If this Dataset is being used as an iterator.
link_creds : LinkCreds, Optional
The LinkCreds object used to access tensors that have external data linked to them.
**kwargs
Passing subclass variables through without errors.

Raises

ValueError
If an existing local path is given, it must be a directory.
ImproperDatasetInitialization
Exactly one argument out of 'path' and 'storage' needs to be specified. This is raised if none of them are specified or more than one is specified.
InvalidHubPathException
If a Hub cloud path (path starting with hub://) is specified and it isn't of the form hub://username/datasetname.
AuthorizationException
If a Hub cloud path (path starting with hub://) is specified and the user doesn't have access to the dataset.
PathNotEmptyException
If the path to the dataset doesn't contain a Hub dataset and is also not empty.
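In practice a Dataset is rarely constructed directly; it is returned by the top-level factory functions. A minimal sketch, using local placeholder paths:

import hub

ds = hub.empty("./my_dataset")     # create a new, empty dataset
ds = hub.load("./my_dataset")      # open an existing dataset
ds = hub.dataset("./my_dataset")   # general entry point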
Expand source code
class Dataset:
    def __init__(
        self,
        storage: LRUCache,
        index: Optional[Index] = None,
        group_index: str = "",
        read_only: bool = DEFAULT_READONLY,
        public: Optional[bool] = False,
        token: Optional[str] = None,
        verbose: bool = True,
        version_state: Optional[Dict[str, Any]] = None,
        path: Optional[str] = None,
        is_iteration: bool = False,
        link_creds=None,
        **kwargs,
    ):
        """Initializes a new or existing dataset.

        Args:
            storage (LRUCache): The storage provider used to access the dataset.
            index (Index, Optional): The Index object restricting the view of this dataset's tensors.
            group_index (str): Name of the group this dataset instance represents.
            read_only (bool): Opens dataset in read only mode if this is passed as True. Defaults to False.
                Datasets stored on Hub cloud that your account does not have write access to will automatically open in read mode.
            public (bool, Optional): Applied only if storage is Hub cloud storage and a new Dataset is being created. Defines if the dataset will have public access.
            token (str, Optional): Activeloop token, used for fetching credentials for Hub datasets. This is Optional, tokens are normally autogenerated.
            verbose (bool): If True, logs will be printed. Defaults to True.
            version_state (Dict[str, Any], Optional): The version state of the dataset, includes commit_id, commit_node, branch, branch_commit_map and commit_node_map.
            path: The path to the dataset.
            is_iteration (bool): If this Dataset is being used as an iterator.
            link_creds (LinkCreds, Optional): The LinkCreds object used to access tensors that have external data linked to them.
            **kwargs: Passing subclass variables through without errors.


        Raises:
            ValueError: If an existing local path is given, it must be a directory.
            ImproperDatasetInitialization: Exactly one argument out of 'path' and 'storage' needs to be specified.
                This is raised if none of them are specified or more than one is specified.
            InvalidHubPathException: If a Hub cloud path (path starting with `hub://`) is specified and it isn't of the form `hub://username/datasetname`.
            AuthorizationException: If a Hub cloud path (path starting with `hub://`) is specified and the user doesn't have access to the dataset.
            PathNotEmptyException: If the path to the dataset doesn't contain a Hub dataset and is also not empty.
        """
        d: Dict[str, Any] = {}
        d["_client"] = d["org_id"] = d["ds_name"] = None
        # uniquely identifies dataset
        d["path"] = path or get_path_from_storage(storage)
        d["storage"] = storage
        d["_read_only"] = read_only
        d["_locked_out"] = False  # User requested write access but was denied
        d["is_iteration"] = is_iteration
        d["is_first_load"] = version_state is None
        d["index"] = index or Index()
        d["group_index"] = group_index
        d["_token"] = token
        d["public"] = public
        d["verbose"] = verbose
        d["version_state"] = version_state or {}
        d["link_creds"] = link_creds
        d["_info"] = None
        d["_ds_diff"] = None
        self.__dict__.update(d)
        self._set_derived_attributes()
        self._first_load_init()
        self._initial_autoflush: List[
            bool
        ] = []  # This is a stack to support nested with contexts
        self._is_filtered_view = False
        self._view_info = None

    def _lock_lost_handler(self):
        """This is called when lock is acquired but lost later on due to slow update."""
        self.read_only = True
        always_warn(
            "Unable to update dataset lock as another machine has locked it for writing. Switching to read only mode."
        )
        self._locked_out = True

    def __enter__(self):
        self._initial_autoflush.append(self.storage.autoflush)
        self.storage.autoflush = False
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.storage.autoflush = self._initial_autoflush.pop()
        if not self._read_only:
            self.storage.maybe_flush()
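    # A hedged usage sketch of the context manager above (variable and tensor
    # names are placeholders): inside `with ds:` autoflush is suspended, so many
    # small writes are batched and flushed once when the block exits.
    #
    #     with ds:
    #         for arr in numpy_arrays:
    #             ds.images.append(arr)
    #     # autoflush is restored here; maybe_flush() runs unless read-only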

    @property
    def num_samples(self) -> int:
        """Returns the length of the smallest tensor.
        Ignores any applied indexing and returns the total length.
        """
        return min(
            map(
                len,
                filter(
                    lambda t: t.key not in self.meta.hidden_tensors,
                    self.version_state["full_tensors"].values(),
                ),
            ),
            default=0,
        )

    @property
    def meta(self) -> DatasetMeta:
        """Returns the metadata of the dataset."""
        return self.version_state["meta"]

    def __len__(self):
        """Returns the length of the smallest tensor"""
        tensor_lengths = [len(tensor) for tensor in self.tensors.values()]
        return min(tensor_lengths, default=0)

    def __getstate__(self) -> Dict[str, Any]:
        """Returns a dict that can be pickled and used to restore this dataset.

        Note:
            Pickling a dataset does not copy the dataset, it only saves attributes that can be used to restore the dataset.
            If you pickle a local dataset and try to access it on a machine that does not have the data present, the dataset will not work.
        """
        if self.path.startswith("mem://"):
            raise MemoryDatasetCanNotBePickledError
        keys = [
            "path",
            "_read_only",
            "index",
            "group_index",
            "public",
            "storage",
            "_token",
            "verbose",
            "version_state",
            "org_id",
            "ds_name",
            "_is_filtered_view",
            "_view_info",
        ]
        state = {k: getattr(self, k) for k in keys}
        state["link_creds"] = self.link_creds
        return state

    def __setstate__(self, state: Dict[str, Any]):
        """Restores dataset from a pickled state.

        Args:
            state (dict): The pickled state used to restore the dataset.
        """
        state["is_first_load"] = True
        state["_info"] = None
        state["is_iteration"] = False
        self.__dict__.update(state)

        # clear cache while restoring
        self.storage.clear_cache_without_flush()
        self._initial_autoflush = []
        self.is_first_load = True
        self._info = None
        self._ds_diff = None
        self._set_derived_attributes(verbose=False)

    def __getitem__(
        self,
        item: Union[
            str, int, slice, List[int], Tuple[Union[int, slice, Tuple[int]]], Index
        ],
        is_iteration: bool = False,
    ):
        if isinstance(item, str):
            fullpath = posixpath.join(self.group_index, item)
            tensor = self._get_tensor_from_root(fullpath)
            if tensor is not None:
                return tensor[self.index]
            elif self._has_group_in_root(fullpath):
                return self.__class__(
                    storage=self.storage,
                    index=self.index,
                    group_index=posixpath.join(self.group_index, item),
                    read_only=self.read_only,
                    token=self._token,
                    verbose=False,
                    version_state=self.version_state,
                    path=self.path,
                    link_creds=self.link_creds,
                )
            elif "/" in item:
                splt = posixpath.split(item)
                return self[splt[0]][splt[1]]
            else:
                raise TensorDoesNotExistError(item)
        elif isinstance(item, (int, slice, list, tuple, Index)):
            return self.__class__(
                storage=self.storage,
                index=self.index[item],
                group_index=self.group_index,
                read_only=self._read_only,
                token=self._token,
                verbose=False,
                version_state=self.version_state,
                path=self.path,
                is_iteration=is_iteration,
                link_creds=self.link_creds,
            )
        else:
            raise InvalidKeyTypeError(item)
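    # A hedged sketch of the indexing behaviour implemented above (tensor and
    # group names are placeholders): string keys resolve to tensors or groups,
    # while ints/slices/lists/tuples return a re-indexed Dataset view.
    #
    #     ds["images"]           # Tensor
    #     ds["train/images"]     # Tensor inside the "train" group
    #     ds[0:100]              # Dataset view over the first 100 samples
    #     ds[0:100]["images"]    # the same view, narrowed to one tensor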

    @invalid_view_op
    @hub_reporter.record_call
    def create_tensor(
        self,
        name: str,
        htype: str = UNSPECIFIED,
        dtype: Union[str, np.dtype] = UNSPECIFIED,
        sample_compression: str = UNSPECIFIED,
        chunk_compression: str = UNSPECIFIED,
        hidden: bool = False,
        create_sample_info_tensor: bool = True,
        create_shape_tensor: bool = True,
        create_id_tensor: bool = True,
        verify: bool = False,
        **kwargs,
    ):
        """Creates a new tensor in the dataset.

        Examples:
            ```
            # create dataset
            ds = hub.dataset("path/to/dataset")

            # create tensors
            ds.create_tensor("images", htype="image", sample_compression="jpg")
            ds.create_tensor("videos", htype="video", sample_compression="mp4")
            ds.create_tensor("data")

            # append data
            ds.images.append(np.ones((400, 400, 3), dtype='uint8'))
            ds.videos.append(hub.read("videos/sample_video.mp4"))
            ds.data.append(np.zeros((100, 100, 2)))
            ```

        Args:
            name (str): The name of the tensor to be created.
            htype (str): The class of data for the tensor.
                The defaults for other parameters are determined in terms of this value.
                For example, `htype="image"` would have `dtype` default to `uint8`.
                These defaults can be overridden by explicitly passing any of the other parameters to this function.
                May also modify the defaults for other parameters.
            dtype (str): Optionally override this tensor's `dtype`. All subsequent samples are required to have this `dtype`.
            sample_compression (str): All samples will be compressed in the provided format. If `None`, samples are uncompressed.
            chunk_compression (str): All chunks will be compressed in the provided format. If `None`, chunks are uncompressed.
            **kwargs: `htype` defaults can be overridden by passing any of the compatible parameters.
                To see all `htype`s and their correspondent arguments, check out `hub/htypes.py`.
            hidden (bool): If True, the tensor will be hidden from ds.tensors but can still be accessed via ds[tensor_name]
            create_sample_info_tensor (bool): If True, meta data of individual samples will be stored in a hidden tensor. This data can be accessed via `tensor[i].sample_info`.
            create_shape_tensor (bool): If True, an associated tensor containing shapes of each sample will be created.
            create_id_tensor (bool): If True, an associated tensor containing unique ids for each sample will be created.
                This is useful for merge operations.
            verify (bool): Valid only for link htypes. If True, all links will be verified before they are added to the tensor.

        Returns:
            The new tensor, which can also be accessed by `self[name]`.

        Raises:
            TensorAlreadyExistsError: Duplicate tensors are not allowed.
            TensorGroupAlreadyExistsError: Duplicate tensor groups are not allowed.
            InvalidTensorNameError: If `name` is in dataset attributes.
            NotImplementedError: If trying to override `chunk_compression`.
            TensorMetaInvalidHtype: If invalid htype is specified.
            ValueError: If an illegal argument is specified.
        """
        # if not the head node, checkout to an auto branch that is newly created
        auto_checkout(self)

        name = filter_name(name, self.group_index)
        key = self.version_state["tensor_names"].get(name)
        if key:
            raise TensorAlreadyExistsError(name)
        else:
            if name in self.version_state["full_tensors"]:
                key = f"{name}_{uuid.uuid4().hex[:4]}"
            else:
                key = name

        if name in self._groups:
            raise TensorGroupAlreadyExistsError(name)

        tensor_name = posixpath.split(name)[1]
        if not tensor_name or tensor_name in dir(self):
            raise InvalidTensorNameError(tensor_name)

        is_sequence, is_link, htype = parse_complex_htype(htype)
        kwargs["is_sequence"] = kwargs.get("is_sequence") or is_sequence
        kwargs["is_link"] = kwargs.get("is_link") or is_link
        kwargs["verify"] = verify
        if is_link and (
            sample_compression != UNSPECIFIED or chunk_compression != UNSPECIFIED
        ):
            warnings.warn(
                "Chunk_compression and sample_compression aren't valid for tensors with linked data. Ignoring these arguments."
            )
            sample_compression = UNSPECIFIED
            chunk_compression = UNSPECIFIED

        if not self._is_root():
            return self.root.create_tensor(
                key, htype, dtype, sample_compression, chunk_compression, **kwargs
            )

        if "/" in name:
            self._create_group(posixpath.split(name)[0])

        # Separate meta and info

        htype_config = HTYPE_CONFIGURATIONS.get(htype, {})
        info_keys = htype_config.copy().pop("_info", [])
        info_kwargs = {}
        meta_kwargs = {}
        for k, v in kwargs.items():
            if k in info_keys:
                verify_htype_key_value(htype, k, v)
                info_kwargs[k] = v
            else:
                meta_kwargs[k] = v

        # Set defaults
        for k in info_keys:
            if k not in info_kwargs:
                info_kwargs[k] = htype_config[k]

        create_tensor(
            key,
            self.storage,
            htype=htype,
            dtype=dtype,
            sample_compression=sample_compression,
            chunk_compression=chunk_compression,
            version_state=self.version_state,
            hidden=hidden,
            **meta_kwargs,
        )
        meta: DatasetMeta = self.meta
        ffw_dataset_meta(meta)
        meta.add_tensor(name, key, hidden=hidden)
        tensor = Tensor(key, self)  # type: ignore
        tensor.meta.name = name
        self.version_state["full_tensors"][key] = tensor
        self.version_state["tensor_names"][name] = key
        if info_kwargs:
            tensor.info.update(info_kwargs)
        self.storage.maybe_flush()
        if create_sample_info_tensor and htype in ("image", "audio", "video", "dicom"):
            self._create_sample_info_tensor(name)
        if create_shape_tensor and htype not in ("text", "json"):
            self._create_sample_shape_tensor(name, htype=htype)
        if create_id_tensor:
            self._create_sample_id_tensor(name)
        return tensor

    def _create_sample_shape_tensor(self, tensor: str, htype: str):
        shape_tensor = get_sample_shape_tensor_key(tensor)
        self.create_tensor(
            shape_tensor,
            hidden=True,
            create_id_tensor=False,
            create_sample_info_tensor=False,
            create_shape_tensor=False,
            max_chunk_size=SAMPLE_INFO_TENSOR_MAX_CHUNK_SIZE,
        )
        f = "append_len" if htype == "list" else "append_shape"
        self._link_tensors(
            tensor, shape_tensor, append_f=f, update_f=f, flatten_sequence=True
        )

    def _create_sample_id_tensor(self, tensor: str):
        id_tensor = get_sample_id_tensor_key(tensor)
        self.create_tensor(
            id_tensor,
            hidden=True,
            create_id_tensor=False,
            create_sample_info_tensor=False,
            create_shape_tensor=False,
        )
        self._link_tensors(
            tensor,
            id_tensor,
            append_f="append_id",
            flatten_sequence=False,
        )

    def _create_sample_info_tensor(self, tensor: str):
        sample_info_tensor = get_sample_info_tensor_key(tensor)
        self.create_tensor(
            sample_info_tensor,
            htype="json",
            max_chunk_size=SAMPLE_INFO_TENSOR_MAX_CHUNK_SIZE,
            hidden=True,
            create_id_tensor=False,
            create_sample_info_tensor=False,
            create_shape_tensor=False,
        )
        self._link_tensors(
            tensor,
            sample_info_tensor,
            "append_info",
            "update_info",
            flatten_sequence=True,
        )

    def _hide_tensor(self, tensor: str):
        self._tensors()[tensor].meta.set_hidden(True)
        self.meta._hide_tensor(tensor)
        self.storage.maybe_flush()

    @invalid_view_op
    @hub_reporter.record_call
    def delete_tensor(self, name: str, large_ok: bool = False):
        """Delete a tensor from the dataset.

        Examples:
            ```
            ds.delete_tensor("images/cats")
            ```

        Args:
            name (str): The name of tensor to be deleted.
            large_ok (bool): Delete tensors larger than 1GB. Disabled by default.

        Returns:
            None

        Raises:
            TensorDoesNotExistError: If tensor of name `name` does not exist in the dataset.
        """
        auto_checkout(self)

        name = filter_name(name, self.group_index)
        key = self.version_state["tensor_names"].get(name)

        if not key:
            raise TensorDoesNotExistError(name)

        if not tensor_exists(key, self.storage, self.version_state["commit_id"]):
            raise TensorDoesNotExistError(name)

        if not self._is_root():
            return self.root.delete_tensor(name, large_ok)

        if not large_ok:
            chunk_engine = self.version_state["full_tensors"][key].chunk_engine
            size_approx = chunk_engine.num_samples * chunk_engine.min_chunk_size
            if size_approx > hub.constants.DELETE_SAFETY_SIZE:
                logger.info(
                    f"Tensor {name} was too large to delete. Try again with large_ok=True."
                )
                return

        with self:
            meta = self.meta
            key = self.version_state["tensor_names"].pop(name)
            if key not in meta.hidden_tensors:
                tensor_diff = Tensor(key, self).chunk_engine.commit_diff
                # if tensor was created in this commit, there's no diff for deleting it.
                if not tensor_diff.created:
                    self._dataset_diff.tensor_deleted(name)
            delete_tensor(key, self)
            self.version_state["full_tensors"].pop(key)
            ffw_dataset_meta(meta)
            meta.delete_tensor(name)
            self.version_state["meta"] = meta

        for t_name in [
            func(name)
            for func in (
                get_sample_id_tensor_key,
                get_sample_info_tensor_key,
                get_sample_shape_tensor_key,
            )
        ]:
            t_key = self.meta.tensor_names.get(t_name)
            if t_key and tensor_exists(
                t_key, self.storage, self.version_state["commit_id"]
            ):
                self.delete_tensor(t_name)

        self.storage.maybe_flush()

    @invalid_view_op
    @hub_reporter.record_call
    def delete_group(self, name: str, large_ok: bool = False):
        """Delete a tensor group from the dataset.

        Examples:
            ```
            ds.delete_group("images/dogs")
            ```

        Args:
            name (str): The name of tensor group to be deleted.
            large_ok (bool): Delete tensor groups larger than 1GB. Disabled by default.

        Returns:
            None

        Raises:
            TensorGroupDoesNotExistError: If tensor group of name `name` does not exist in the dataset.
        """
        auto_checkout(self)

        full_path = filter_name(name, self.group_index)

        if full_path not in self._groups:
            raise TensorGroupDoesNotExistError(name)

        if not self._is_root():
            return self.root.delete_group(full_path, large_ok)

        if not large_ok:
            size_approx = self[name].size_approx()
            if size_approx > hub.constants.DELETE_SAFETY_SIZE:
                logger.info(
                    f"Group {name} was too large to delete. Try again with large_ok=True."
                )
                return

        with self:
            meta = self.version_state["meta"]
            ffw_dataset_meta(meta)
            tensors = [
                posixpath.join(name, tensor)
                for tensor in self[name]._all_tensors_filtered(include_hidden=True)
            ]
            meta.delete_group(name)
            for tensor in tensors:
                key = self.version_state["tensor_names"][tensor]
                delete_tensor(key, self)
                self.version_state["tensor_names"].pop(tensor)
                self.version_state["full_tensors"].pop(key)

        self.storage.maybe_flush()

    @invalid_view_op
    @hub_reporter.record_call
    def create_tensor_like(self, name: str, source: "Tensor") -> "Tensor":
        """Copies the `source` tensor's meta information and creates a new tensor with it. No samples are copied, only the meta/info for the tensor is.

        Examples:
            ```
            ds.create_tensor_like("cats", ds["images"])
            ```

        Args:
            name (str): Name for the new tensor.
            source (Tensor): Tensor whose meta/info will be copied. May or may not be contained in the same dataset.

        Returns:
            Tensor: New Tensor object.
        """

        info = source.info.__getstate__().copy()
        meta = source.meta.__getstate__().copy()
        del meta["min_shape"]
        del meta["max_shape"]
        del meta["length"]
        del meta["version"]
        del meta["name"]

        destination_tensor = self.create_tensor(name, **meta)
        destination_tensor.info.update(info)
        return destination_tensor

    def _rename_tensor(self, name, new_name):
        tensor = self[name]
        tensor.meta.name = new_name
        key = self.version_state["tensor_names"].pop(name)
        meta = self.meta
        if key not in meta.hidden_tensors:
            tensor_diff = tensor.chunk_engine.commit_diff
            # if tensor was created in this commit, tensor name has to be updated without adding it to diff.
            if not tensor_diff.created:
                self._dataset_diff.tensor_renamed(name, new_name)
        self.version_state["tensor_names"][new_name] = key
        ffw_dataset_meta(meta)
        meta.rename_tensor(name, new_name)

        for func in (
            get_sample_id_tensor_key,
            get_sample_info_tensor_key,
            get_sample_shape_tensor_key,
        ):
            t_old, t_new = map(func, (name, new_name))
            t_key = self.meta.tensor_names.get(t_old)
            if t_key and tensor_exists(
                t_key, self.storage, self.version_state["commit_id"]
            ):
                self._rename_tensor(t_old, t_new)

        return tensor

    @hub_reporter.record_call
    def rename_tensor(self, name: str, new_name: str) -> "Tensor":
        """Renames tensor with name `name` to `new_name`

        Args:
            name (str): Name of tensor to be renamed.
            new_name (str): New name of tensor.

        Returns:
            Tensor: Renamed tensor.

        Raises:
            TensorDoesNotExistError: If tensor of name `name` does not exist in the dataset.
            TensorAlreadyExistsError: Duplicate tensors are not allowed.
            TensorGroupAlreadyExistsError: Duplicate tensor groups are not allowed.
            InvalidTensorNameError: If `new_name` is in dataset attributes.
            RenameError: If `new_name` points to a group different from `name`.
        """
        auto_checkout(self)

        if name not in self._tensors():
            raise TensorDoesNotExistError(name)

        name = filter_name(name, self.group_index)
        new_name = filter_name(new_name, self.group_index)

        if posixpath.split(name)[0] != posixpath.split(new_name)[0]:
            raise RenameError("New name of tensor cannot point to a different group")

        if new_name in self.version_state["tensor_names"]:
            raise TensorAlreadyExistsError(new_name)

        if new_name in self._groups:
            raise TensorGroupAlreadyExistsError(new_name)

        new_tensor_name = posixpath.split(new_name)[1]
        if not new_tensor_name or new_tensor_name in dir(self):
            raise InvalidTensorNameError(new_name)

        tensor = self.root._rename_tensor(name, new_name)

        self.storage.maybe_flush()
        return tensor

    @hub_reporter.record_call
    def rename_group(self, name: str, new_name: str) -> None:
        """Renames group with name `name` to `new_name`

        Args:
            name (str): Name of group to be renamed.
            new_name (str): New name of group.

        Raises:
            TensorGroupDoesNotExistError: If tensor group of name `name` does not exist in the dataset.
            TensorAlreadyExistsError: Duplicate tensors are not allowed.
            TensorGroupAlreadyExistsError: Duplicate tensor groups are not allowed.
            InvalidTensorGroupNameError: If `name` is in dataset attributes.
            RenameError: If `new_name` points to a group different from `name`.
        """
        auto_checkout(self)

        name = filter_name(name, self.group_index)
        new_name = filter_name(new_name, self.group_index)

        if name not in self._groups:
            raise TensorGroupDoesNotExistError(name)

        if posixpath.split(name)[0] != posixpath.split(new_name)[0]:
            raise RenameError("Names does not match.")

        if new_name in self.version_state["tensor_names"]:
            raise TensorAlreadyExistsError(new_name)

        if new_name in self._groups:
            raise TensorGroupAlreadyExistsError(new_name)

        new_tensor_name = posixpath.split(new_name)[1]
        if not new_tensor_name or new_tensor_name in dir(self):
            raise InvalidTensorGroupNameError(new_name)

        meta = self.meta
        meta.rename_group(name, new_name)

        root = self.root
        for tensor in filter(
            lambda x: x.startswith(name),
            map(lambda y: y.meta.name or y.key, self.tensors.values()),
        ):
            root._rename_tensor(
                tensor,
                posixpath.join(new_name, posixpath.relpath(tensor, name)),
            )

        self.storage.maybe_flush()

    __getattr__ = __getitem__

    def __setattr__(self, name: str, value):
        if isinstance(value, (np.ndarray, np.generic)):
            raise TypeError(
                "Setting tensor attributes directly is not supported. To add a tensor, use the `create_tensor` method."
                + "To add data to a tensor, use the `append` and `extend` methods."
            )
        else:
            return super().__setattr__(name, value)

    def __iter__(self):
        for i in range(len(self)):
            yield self.__getitem__(i, is_iteration=True)

    def _load_version_info(self):
        """Loads data from version_control_file otherwise assume it doesn't exist and load all empty"""
        if self.version_state:
            return

        branch = "main"
        version_state = {"branch": branch}
        try:
            version_info = load_version_info(self.storage)
            version_state["branch_commit_map"] = version_info["branch_commit_map"]
            version_state["commit_node_map"] = version_info["commit_node_map"]
            commit_id = version_state["branch_commit_map"][branch]
            version_state["commit_id"] = commit_id
            version_state["commit_node"] = version_state["commit_node_map"][commit_id]
        except Exception:
            version_state["branch_commit_map"] = {}
            version_state["commit_node_map"] = {}
            # used to identify that this is the first commit so its data will not be in similar directory structure to the rest
            commit_id = FIRST_COMMIT_ID
            commit_node = CommitNode(branch, commit_id)
            version_state["commit_id"] = commit_id
            version_state["commit_node"] = commit_node
            version_state["branch_commit_map"][branch] = commit_id
            version_state["commit_node_map"][commit_id] = commit_node
        # keeps track of the full unindexed tensors
        version_state["full_tensors"] = {}
        version_state["tensor_names"] = {}
        self.__dict__["version_state"] = version_state

    def _load_link_creds(self):
        if self.link_creds is not None:
            return

        link_creds_key = get_dataset_linked_creds_key()
        try:
            data_bytes = self.storage[link_creds_key]
        except KeyError:
            data_bytes = None
        if data_bytes is None:
            link_creds = LinkCreds()
        else:
            link_creds = LinkCreds.frombuffer(data_bytes)
        self.link_creds = link_creds

    def _lock(self, err=False):
        storage = get_base_storage(self.storage)

        if isinstance(storage, tuple(_LOCKABLE_STORAGES)) and (
            not self.read_only or self._locked_out
        ):
            try:
                # temporarily disable read-only on the base storage to try to acquire the lock; on exception it is made read-only again
                storage.disable_readonly()
                lock_dataset(
                    self,
                    lock_lost_callback=self._lock_lost_handler,
                )
            except LockedException as e:
                self.read_only = True
                self.__dict__["_locked_out"] = True
                if err:
                    raise e
                always_warn(
                    "Checking out dataset in read only mode as another machine has locked this version for writing."
                )
                return False
        return True

    def _unlock(self):
        unlock_dataset(self)

    def __del__(self):
        try:
            self._unlock()
        except Exception:  # python shutting down
            pass

    def commit(self, message: Optional[str] = None, allow_empty=False) -> str:
        """Stores a snapshot of the current state of the dataset.

        Note:
            - Committing from a non-head node in any branch will lead to an auto checkout to a new branch.
            - The same behaviour occurs if new samples are added or existing samples are updated from a non-head node.

        Args:
            message (str, Optional): Used to describe the commit.
            allow_empty (bool): If True, commit even if there are no changes

        Returns:
            str: the commit id of the stored commit that can be used to access the snapshot.

        Raises:
            Exception: if dataset is a filtered view.
            EmptyCommitError: if there are no changes and the commit was not forced with `allow_empty=True`.
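
        Example:
            A minimal usage sketch; the tensor name and message below are placeholders, and `ds` is
            assumed to be an already loaded dataset with write access:

            ```
            with ds:
                ds.labels.append(1)                      # assumes a "labels" tensor exists
            commit_id = ds.commit("added one label")     # returns the new commit id
            ```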
        """
        if not allow_empty and not self.has_head_changes:
            raise EmptyCommitError(
                "There are no changes, commit is not done. Try again with allow_empty=True."
            )

        return self._commit(message)

    @hub_reporter.record_call
    def merge(
        self,
        target_id: str,
        conflict_resolution: Optional[str] = None,
        delete_removed_tensors: bool = False,
        force: bool = False,
    ):
        """Merges the target_id into the current dataset.

        Args:
            target_id (str): The commit_id or branch to merge.
            conflict_resolution (str, Optional): The strategy to use to resolve merge conflicts.
                - Conflicts are scenarios where both the current dataset and the target id have made changes to the same sample(s) since their common ancestor.
                - Must be one of the following:
                    - None - this is the default value; an exception will be raised if there are conflicts.
                    - "ours" - during conflicts, values from the current dataset will be used.
                    - "theirs" - during conflicts, values from the target id will be used.
            delete_removed_tensors (bool): If True, tensors that were deleted on the target will also be deleted from this dataset.
            force (bool): Forces the merge.
                - `force=True` will have these effects in the following cases of merge conflicts:
                    - If a tensor is renamed on target but is missing from HEAD, the renamed tensor will be registered as a new tensor on the current branch.
                    - If a tensor is renamed on both the target and the current branch, the tensor on target will be registered as a new tensor on the current branch.
                    - If a tensor is renamed on target and a new tensor with the new name was created on the current branch, they will be merged.

        Raises:
            Exception: if dataset is a filtered view.
            ValueError: if the conflict resolution strategy is not one of None, "ours", or "theirs".
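
        Example:
            An illustrative sketch; the branch name is a placeholder and `ds` is assumed to be a
            loaded dataset whose version history contains that branch:

            ```
            ds.checkout("main")
            ds.merge("dev", conflict_resolution="theirs")   # prefer the incoming values on conflicts
            ```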
        """
        if self._is_filtered_view:
            raise Exception(
                "Cannot perform version control operations on a filtered dataset view."
            )

        if conflict_resolution not in [None, "ours", "theirs"]:
            raise ValueError(
                f"conflict_resolution must be one of None, 'ours', or 'theirs'. Got {conflict_resolution}"
            )

        try_flushing(self)

        self._initial_autoflush.append(self.storage.autoflush)
        self.storage.autoflush = False

        merge(self, target_id, conflict_resolution, delete_removed_tensors, force)

        self.storage.autoflush = self._initial_autoflush.pop()

    def _commit(self, message: Optional[str] = None, hash: Optional[str] = None) -> str:
        if self._is_filtered_view:
            raise Exception(
                "Cannot perform version control operations on a filtered dataset view."
            )

        try_flushing(self)

        self._initial_autoflush.append(self.storage.autoflush)
        self.storage.autoflush = False
        try:
            self._unlock()
            commit(self, message, hash)
            self._lock()
        finally:
            self.storage.autoflush = self._initial_autoflush.pop()
        self._info = None
        self._ds_diff = None

        # do not store commit message
        hub_reporter.feature_report(feature_name="commit", parameters={})

        return self.commit_id  # type: ignore

    def checkout(self, address: str, create: bool = False) -> Optional[str]:
        """Checks out to a specific commit_id or branch. If `create=True`, creates a new branch named `address`.

        Note:
            Checking out from the head node of any branch that contains uncommitted data will lead to an auto commit before the checkout.

        Args:
            address (str): The commit_id or branch to checkout to.
            create (bool): If True, creates a new branch with name as address.

        Returns:
            str: The commit_id of the dataset after checkout.

        Raises:
            Exception: If dataset is a filtered view.
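
        Example:
            An illustrative sketch; the branch name and commit id are placeholders for values from
            your own dataset:

            ```
            ds.checkout("alt_branch", create=True)   # create and switch to a new branch
            commit_id = ds.commit("work on alt_branch")
            ds.checkout("main")                      # switch back to an existing branch
            ds.checkout(commit_id)                   # or check out a specific commit id
            ```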
        """
        return self._checkout(address, create)

    def _checkout(
        self, address: str, create: bool = False, hash: Optional[str] = None
    ) -> Optional[str]:
        if self._is_filtered_view:
            raise Exception(
                "Cannot perform version control operations on a filtered dataset view."
            )
        if self._locked_out:
            self.storage.disable_readonly()
            self._read_only = False
            base_storage = get_base_storage(self.storage)
            base_storage.disable_readonly()
        try_flushing(self)
        self._initial_autoflush.append(self.storage.autoflush)
        self.storage.autoflush = False
        err = False
        try:
            self._unlock()
            checkout(self, address, create, hash)
        except Exception as e:
            err = True
            if self._locked_out:
                self.storage.enable_readonly()
                self._read_only = True
                base_storage.enable_readonly()
            raise e
        finally:
            if not (err and self._locked_out):
                self._lock()
            self.storage.autoflush = self._initial_autoflush.pop()
        self._info = None
        self._ds_diff = None

        # do not store address
        hub_reporter.feature_report(
            feature_name="checkout", parameters={"Create": str(create)}
        )
        commit_node = self.version_state["commit_node"]
        if self.verbose:
            warn_node_checkout(commit_node, create)

        return self.commit_id

    @hub_reporter.record_call
    def log(self):
        """Displays the details of all the past commits."""
        commit_node = self.version_state["commit_node"]
        print("---------------\nHub Version Log\n---------------\n")
        print(f"Current Branch: {self.version_state['branch']}")
        if self.has_head_changes:
            print("** There are uncommitted changes on this branch.")
        print()
        while commit_node:
            if not commit_node.is_head_node:
                print(f"{commit_node}\n")
            commit_node = commit_node.parent

    @hub_reporter.record_call
    def diff(
        self, id_1: Optional[str] = None, id_2: Optional[str] = None, as_dict=False
    ) -> Optional[Dict]:
        """Returns/displays the differences between commits/branches.

        For each tensor this contains information about the sample indexes that were added/modified as well as whether the tensor was created.

        Args:
            id_1 (str, Optional): The first commit_id or branch name.
            id_2 (str, Optional): The second commit_id or branch name.
            as_dict (bool, Optional): If True, returns a dictionary of the differences instead of printing them.
                This dictionary will have two keys - "tensor" and "dataset", which represent tensor-level and dataset-level changes, respectively.
                Defaults to False.

        Note:
            - If both `id_1` and `id_2` are None, the differences between the current state and the previous commit will be calculated. If you're at the head of the branch, this will show the uncommitted changes, if any.
            - If only `id_1` is provided, the differences between the current state and id_1 will be calculated. If you're at the head of the branch, this will take into account the uncommitted changes, if any.
            - If only `id_2` is provided, a ValueError will be raised.
            - If both `id_1` and `id_2` are provided, the differences between `id_1` and `id_2` will be calculated.

        Returns:
            Dict: The differences between the commits/branches if as_dict is True.

                - If `id_1` and `id_2` are None, a dictionary containing the differences between the current state and the previous commit will be returned.
                - If only `id_1` is provided, a dictionary containing the differences in the current state and `id_1` respectively will be returned.
                - If only `id_2` is provided, a ValueError will be raised.
                - If both `id_1` and `id_2` are provided, a dictionary containing the differences in `id_1` and `id_2` respectively will be returned.
            None: If as_dict is False.

            Example of a dict returned:
            ```
            {
                "image": {"data_added": [3, 6], "data_updated": {0, 2}, "created": False, "info_updated": False, "data_transformed_in_place": False},
                "label": {"data_added": [0, 3], "data_updated": {}, "created": True, "info_updated": False, "data_transformed_in_place": False},
                "other/stuff": {"data_added": [3, 3], "data_updated": {1, 2}, "created": True, "info_updated": False, "data_transformed_in_place": False},
            }
            ```

            Here, 'data_added' is a range of sample indexes that were added to the tensor:

            - For example [3, 6] means that sample 3, 4 and 5 were added.
            - Another example: [3, 3] means that no samples were added, as the range is empty.

            'data_updated' is a set of sample indexes that were updated.

            - For example {0, 2} means that sample 0 and 2 were updated.

            'created' is a boolean that is True if the tensor was created.

            'info_updated' is a boolean that is True if the info of the tensor was updated.

            'data_transformed_in_place' is a boolean that is True if the data of the tensor was transformed in place.


        Raises:
            ValueError: If `id_1` is None and `id_2` is not None.
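
        Example:
            An illustrative sketch; the commit ids are placeholders for values from your own
            version history:

            ```
            ds.diff()                                  # print uncommitted changes vs. the previous commit
            changes = ds.diff(as_dict=True)            # same information as a dictionary
            changes = ds.diff("commit_id_1", "commit_id_2", as_dict=True)
            ```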
        """
        version_state, storage = self.version_state, self.storage
        res = get_changes_and_messages(version_state, storage, id_1, id_2)
        if as_dict:
            dataset_changes_1 = res[0]
            dataset_changes_2 = res[1]
            tensor_changes_1 = res[2]
            tensor_changes_2 = res[3]
            changes = {}
            if id_1 is None and id_2 is None:
                changes["dataset"] = dataset_changes_1
                changes["tensor"] = tensor_changes_1
                return changes
            changes["dataset"] = dataset_changes_1, dataset_changes_2
            changes["tensor"] = tensor_changes_1, tensor_changes_2
            return changes
        all_changes = get_all_changes_string(*res)
        print(all_changes)
        return None

    def _populate_meta(self, verbose=True):
        """Populates the meta information for the dataset."""
        if dataset_exists(self.storage):
            if verbose and self.verbose:
                logger.info(f"{self.path} loaded successfully.")
            load_meta(self)

        elif not self.storage.empty():
            # dataset does not exist, but the path was not empty
            raise PathNotEmptyException

        else:
            if self.read_only:
                # cannot create a new dataset when in read_only mode.
                raise CouldNotCreateNewDatasetException(self.path)
            meta = DatasetMeta()
            key = get_dataset_meta_key(self.version_state["commit_id"])
            self.version_state["meta"] = meta
            self.storage.register_hub_object(key, meta)
            self._register_dataset()
            self.flush()

    def _register_dataset(self):
        """overridden in HubCloudDataset"""

    def _send_query_progress(self, *args, **kwargs):
        """overridden in HubCloudDataset"""

    def _send_compute_progress(self, *args, **kwargs):
        """overridden in HubCloudDataset"""

    def _send_pytorch_progress(self, *args, **kwargs):
        """overridden in HubCloudDataset"""

    def _send_filter_progress(self, *args, **kwargs):
        """overridden in HubCloudDataset"""

    def _send_commit_event(self, *args, **kwargs):
        """overridden in HubCloudDataset"""

    def _send_dataset_creation_event(self, *args, **kwargs):
        """overridden in HubCloudDataset"""

    def _send_branch_creation_event(self, *args, **kwargs):
        """overridden in HubCloudDataset"""

    def _first_load_init(self):
        """overridden in HubCloudDataset"""

    @property
    def read_only(self):
        return self._read_only

    @property
    def has_head_changes(self):
        """Returns True if currently at head node and uncommitted changes are present."""
        commit_node = self.version_state["commit_node"]
        return not commit_node.children and current_commit_has_change(
            self.version_state, self.storage
        )

    def _set_read_only(self, value: bool, err: bool):
        storage = self.storage
        self.__dict__["_read_only"] = value
        if value:
            storage.enable_readonly()
            if isinstance(storage, LRUCache) and storage.next_storage is not None:
                storage.next_storage.enable_readonly()
        else:
            try:
                locked = self._lock(err=err)
                if locked:
                    self.storage.disable_readonly()
                    if (
                        isinstance(storage, LRUCache)
                        and storage.next_storage is not None
                    ):
                        storage.next_storage.disable_readonly()
                else:
                    self.__dict__["_read_only"] = True
            except LockedException as e:
                self.__dict__["_read_only"] = True
                raise e

    @read_only.setter
    @invalid_view_op
    def read_only(self, value: bool):
        self._set_read_only(value, True)

    @hub_reporter.record_call
    def pytorch(
        self,
        transform: Optional[Callable] = None,
        tensors: Optional[Sequence[str]] = None,
        tobytes: Union[bool, Sequence[str]] = False,
        num_workers: int = 1,
        batch_size: int = 1,
        drop_last: bool = False,
        collate_fn: Optional[Callable] = None,
        pin_memory: bool = False,
        shuffle: bool = False,
        buffer_size: int = 2048,
        use_local_cache: bool = False,
        use_progress_bar: bool = False,
    ):
        """Converts the dataset into a pytorch Dataloader.

        Note:
            Pytorch does not support uint16, uint32, uint64 dtypes. These are implicitly cast to int32, int64 and int64 respectively.
            This spins up its own workers to fetch data.

        Args:
            transform (Callable, Optional): Transformation function to be applied to each sample.
            tensors (List, Optional): Optionally provide a list of tensor names in the ordering that your training script expects. For example, if you have a dataset that has "image" and "label" tensors, if `tensors=["image", "label"]`, your training script should expect each batch will be provided as a tuple of (image, label).
            tobytes (bool): If True, samples will not be decompressed and their raw bytes will be returned instead of numpy arrays. Can also be a list of tensors, in which case those tensors alone will not be decompressed.
            num_workers (int): The number of workers to use for fetching data in parallel.
            batch_size (int): Number of samples per batch to load. Default value is 1.
            drop_last (bool): Set to True to drop the last incomplete batch, if the dataset size is not divisible by the batch size.
                If False and the size of dataset is not divisible by the batch size, then the last batch will be smaller. Default value is False.
                Read torch.utils.data.DataLoader docs for more details.
            collate_fn (Callable, Optional): merges a list of samples to form a mini-batch of Tensor(s). Used when using batched loading from a map-style dataset.
                Read torch.utils.data.DataLoader docs for more details.
            pin_memory (bool): If True, the data loader will copy Tensors into CUDA pinned memory before returning them. Default value is False.
                Read torch.utils.data.DataLoader docs for more details.
            shuffle (bool): If True, the data loader will shuffle the data indices. Default value is False. Details about how hub shuffles data can be found at https://docs.activeloop.ai/how-hub-works/shuffling-in-ds.pytorch
            buffer_size (int): The size of the buffer used to shuffle the data in MBs. Defaults to 2048 MB. Increasing the buffer_size will increase the extent of shuffling.
            use_local_cache (bool): If True, the data loader will use a local cache to store data. This is useful when the dataset can fit on the machine and we don't want to fetch the data multiple times for each iteration. Default value is False.
            use_progress_bar (bool): If True, tqdm will be wrapped around the returned dataloader. Default value is False.

        Returns:
            A torch.utils.data.DataLoader object.
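
        Example:
            A minimal training-loop sketch; the tensor names are placeholders and `ds` is assumed
            to contain "image" and "label" tensors:

            ```
            dataloader = ds.pytorch(tensors=["image", "label"], batch_size=16, shuffle=True)
            for images, labels in dataloader:
                ...  # training step
            ```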
        """
        from hub.integrations import dataset_to_pytorch as to_pytorch

        dataloader = to_pytorch(
            self,
            transform=transform,
            tensors=tensors,
            tobytes=tobytes,
            num_workers=num_workers,
            batch_size=batch_size,
            drop_last=drop_last,
            collate_fn=collate_fn,
            pin_memory=pin_memory,
            shuffle=shuffle,
            buffer_size=buffer_size,
            use_local_cache=use_local_cache,
        )

        if use_progress_bar:
            dataloader = tqdm(dataloader, desc=self.path, total=len(self) // batch_size)

        return dataloader

    @hub_reporter.record_call
    def filter(
        self,
        function: Union[Callable, str],
        num_workers: int = 0,
        scheduler: str = "threaded",
        progressbar: bool = True,
        store_result: bool = False,
        result_path: Optional[str] = None,
        result_ds_args: Optional[dict] = None,
    ):
        """Filters the dataset in accordance with the filter function `f(x: sample) -> bool`.

        Args:
            function (Callable, str): Filter function that takes sample as argument and returns True/False
                if sample should be included in result. Also supports simplified expression evaluations.
                See `hub.core.query.query.DatasetQuery` for more details.
            num_workers (int): Level of parallelization of filter evaluations.
                `0` indicates in-place for-loop evaluation, multiprocessing is used otherwise.
            scheduler (str): Scheduler to use for multiprocessing evaluation.
                `threaded` is the default.
            progressbar (bool): Display a progress bar while filtering. True is the default.
            store_result (bool): If True, result of the filter will be saved to a dataset asynchronously.
            result_path (Optional, str): Path to save the filter result. Only applicable if `store_result` is True.
            result_ds_args (Optional, dict): Additional args for result dataset. Only applicable if `store_result` is True.

        Returns:
            A view of the Dataset with the elements that satisfy the filter function.


        Example:
            The following filters are identical and return a dataset view where all samples have a label equal to 2.
            >>> dataset.filter(lambda sample: sample.labels.numpy() == 2)
            >>> dataset.filter('labels == 2')
        """
        from hub.core.query import filter_dataset, query_dataset
        from hub.core.query import DatasetQuery

        fn = query_dataset if isinstance(function, str) else filter_dataset
        return fn(
            self,
            function,
            num_workers=num_workers,
            scheduler=scheduler,
            progressbar=progressbar,
            store_result=store_result,
            result_path=result_path,
            result_ds_args=result_ds_args,
        )

    def _get_total_meta(self):
        """Returns all tensor metas together."""
        return {
            tensor_key: tensor_value.meta
            for tensor_key, tensor_value in self.version_state["full_tensors"].items()
        }

    def _set_derived_attributes(self, verbose: bool = True):
        """Sets derived attributes during init and unpickling."""
        if self.is_first_load:
            self.storage.autoflush = True
            self._load_version_info()
            self._load_link_creds()
            self._set_read_only(
                self._read_only, False
            )  # TODO: weird fix for dataset unpickling
            self._populate_meta(verbose)  # TODO: use the same scheme as `load_info`
        elif not self._read_only:
            self._lock()  # for ref counting
        if not self.is_iteration:
            self.index.validate(self.num_samples)

    @property
    def info(self):
        """Returns the information about the dataset."""
        if self._info is None:
            path = get_dataset_info_key(self.version_state["commit_id"])
            self.__dict__["_info"] = load_info(path, self)  # type: ignore
        return self._info

    @info.setter
    def info(self, value):
        if isinstance(value, dict):
            info = self.info
            info.replace_with(value)
        else:
            raise TypeError("Info must be set with type Dict")

    @property
    def _dataset_diff(self):
        if self._ds_diff is None:
            self.__dict__["_ds_diff"] = load_dataset_diff(self)
        return self._ds_diff

    @hub_reporter.record_call
    def tensorflow(
        self,
        tensors: Optional[Sequence[str]] = None,
        tobytes: Union[bool, Sequence[str]] = False,
    ):
        """Converts the dataset into a tensorflow compatible format.

        See:
            https://www.tensorflow.org/api_docs/python/tf/data/Dataset

        Args:
            tensors (List, Optional): Optionally provide a list of tensor names in the ordering that your training script expects. For example, if you have a dataset that has "image" and "label" tensors, if `tensors=["image", "label"]`, your training script should expect each batch will be provided as a tuple of (image, label).
            tobytes (bool): If True, samples will not be decompressed and their raw bytes will be returned instead of numpy arrays. Can also be a list of tensors, in which case those tensors alone will not be decompressed.

        Returns:
            tf.data.Dataset object that can be used for tensorflow training.
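
        Example:
            A minimal sketch; the tensor names are placeholders and `ds` is assumed to contain
            "image" and "label" tensors:

            ```
            tf_ds = ds.tensorflow(tensors=["image", "label"])
            for sample in tf_ds.batch(16):
                ...  # training step
            ```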
        """
        return dataset_to_tensorflow(self, tensors=tensors, tobytes=tobytes)

    def flush(self):
        """Necessary operation after writes if caches are being used.
        Writes all the dirty data from the cache layers (if any) to the underlying storage.
        Here dirty data corresponds to data that has been changed/assigned but hasn't yet been sent to the
        underlying storage.
        """
        self.storage.flush()

    def clear_cache(self):
        """Flushes (see Dataset.flush documentation) the contents of the cache layers (if any) and then deletes their contents.
        This doesn't delete data from the actual storage.
        This is useful if you have multiple datasets with memory caches open, taking up too much RAM.
        Also useful when local cache is no longer needed for certain datasets and is taking up storage space.
        """
        if hasattr(self.storage, "clear_cache"):
            self.storage.clear_cache()

    def size_approx(self):
        """Estimates the size in bytes of the dataset.
        Includes only content, so will generally return an under-estimate.
        """
        tensors = self.version_state["full_tensors"].values()
        chunk_engines = [tensor.chunk_engine for tensor in tensors]
        size = sum(c.num_chunks * c.min_chunk_size for c in chunk_engines)
        for group in self._groups_filtered:
            size += self[group].size_approx()
        return size

    @invalid_view_op
    @hub_reporter.record_call
    def rename(self, path: str):
        """Renames the dataset to `path`.

        Example:
            ```
            ds = hub.load("hub://username/dataset")
            ds.rename("hub://username/renamed_dataset")
            ```

        Args:
            path (str): New path to the dataset.

        Raises:
            RenameError: If `path` points to a different directory.
        """
        path = path.rstrip("/")
        if posixpath.split(path)[0] != posixpath.split(self.path)[0]:
            raise RenameError
        storage = get_base_storage(self.storage)
        storage.rename(path)
        self.path = path

    @invalid_view_op
    @hub_reporter.record_call
    def delete(self, large_ok=False):
        """Deletes the entire dataset from the cache layers (if any) and the underlying storage.
        This is an IRREVERSIBLE operation. Data once deleted can not be recovered.

        Args:
            large_ok (bool): Delete datasets larger than 1GB. Disabled by default.
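
        Example:
            An illustrative sketch; `ds` is assumed to be a dataset you own and no longer need:

            ```
            ds.delete(large_ok=True)   # required for datasets larger than 1GB
            ```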
        """

        if not large_ok:
            size = self.size_approx()
            if size > hub.constants.DELETE_SAFETY_SIZE:
                logger.info(
                    f"Hub Dataset {self.path} was too large to delete. Try again with large_ok=True."
                )
                return

        self._unlock()
        self.storage.clear()

    def summary(self):
        pretty_print = summary_dataset(self)

        print(self)
        print(pretty_print)

    def __str__(self):
        path_str = ""
        if self.path:
            path_str = f"path='{self.path}', "

        mode_str = ""
        if self.read_only:
            mode_str = "read_only=True, "

        index_str = f"index={self.index}, "
        if self.index.is_trivial():
            index_str = ""

        group_index_str = (
            f"group_index='{self.group_index}', " if self.group_index else ""
        )

        return f"Dataset({path_str}{mode_str}{index_str}{group_index_str}tensors={self._all_tensors_filtered(include_hidden=False)})"

    __repr__ = __str__

    def _get_tensor_from_root(self, name: str) -> Optional[Tensor]:
        """Gets a tensor from the root dataset.
        Accesses storage only on the first call.
        """
        key = self.version_state["tensor_names"].get(name)
        ret = self.version_state["full_tensors"].get(key)
        return ret

    def _has_group_in_root(self, name: str) -> bool:
        """Checks if a group exists in the root dataset.
        This is faster than checking `if group in self._groups:`
        """
        return name in self.version_state["meta"].groups

    @property
    def token(self):
        """Returns the token attached to the dataset."""
        return self._token

    @property
    def _ungrouped_tensors(self) -> Dict[str, Tensor]:
        """Top level tensors in this group that do not belong to any sub groups"""
        return {
            posixpath.basename(k): self.version_state["full_tensors"][v]
            for k, v in self.version_state["tensor_names"].items()
            if posixpath.dirname(k) == self.group_index
        }

    def _all_tensors_filtered(self, include_hidden: bool = True) -> List[str]:
        """Names of all tensors belonging to this group, including those within sub groups"""
        hidden_tensors = self.meta.hidden_tensors
        tensor_names = self.version_state["tensor_names"]
        return [
            posixpath.relpath(t, self.group_index)
            for t in tensor_names
            if (not self.group_index or t.startswith(self.group_index + "/"))
            and (include_hidden or tensor_names[t] not in hidden_tensors)
        ]

    def _tensors(self, include_hidden: bool = True) -> Dict[str, Tensor]:
        """All tensors belonging to this group, including those within sub groups. Always returns the sliced tensors."""
        return {
            t: self.version_state["full_tensors"][
                self.version_state["tensor_names"][posixpath.join(self.group_index, t)]
            ][self.index]
            for t in self._all_tensors_filtered(include_hidden)
        }

    @property
    def tensors(self) -> Dict[str, Tensor]:
        """All tensors belonging to this group, including those within sub groups. Always returns the sliced tensors."""
        return self._tensors(include_hidden=False)

    @property
    def branches(self):
        """Lists all the branches of the dataset.
        Returns:
            List of branches.
        """
        return list(self.version_state["branch_commit_map"])

    @property
    def commits(self) -> List[Dict]:
        """Lists all the commits leading to the current dataset state.
        Returns:
            List of dictionaries containing commit information.
        """
        commits = []
        commit_node = self.version_state["commit_node"]
        while commit_node:
            if not commit_node.is_head_node:
                commit_info = {
                    "commit": commit_node.commit_id,
                    "author": commit_node.commit_user_name,
                    "time": str(commit_node.commit_time)[:-7],
                    "message": commit_node.commit_message,
                }
                commits.append(commit_info)
            commit_node = commit_node.parent
        return commits

    def get_commit_details(self, commit_id) -> Dict:
        commit_node: CommitNode = self.version_state["commit_node_map"].get(commit_id)
        if commit_node is None:
            raise KeyError(f"Commit {commit_id} not found in dataset.")
        return {
            "commit": commit_node.commit_id,
            "author": commit_node.commit_user_name,
            "time": str(commit_node.commit_time)[:-7],
            "message": commit_node.commit_message,
        }

    @property
    def _groups(self) -> List[str]:
        """Names of all groups in the root dataset"""
        return self.meta.groups  # type: ignore

    @property
    def _groups_filtered(self) -> List[str]:
        """Names of all sub groups in this group"""
        groups_filtered = []
        for g in self._groups:
            dirname, basename = posixpath.split(g)
            if dirname == self.group_index:
                groups_filtered.append(basename)
        return groups_filtered

    @property
    def groups(self) -> Dict[str, "Dataset"]:
        """All sub groups in this group"""
        return {g: self[g] for g in self._groups_filtered}

    @property
    def commit_id(self) -> Optional[str]:
        """The last committed commit_id of the dataset. If there are no commits, this returns None."""
        commit_node = self.version_state["commit_node"]
        if not commit_node.is_head_node:
            return commit_node.commit_id

        parent = commit_node.parent

        if parent is None:
            return None
        else:
            return parent.commit_id

    @property
    def pending_commit_id(self) -> str:
        """The commit_id of the next commit that will be made to the dataset.
        If you're not at the head of the current branch, this will be the same as the commit_id.
        """
        return self.version_state["commit_id"]

    @property
    def branch(self) -> str:
        """The current branch of the dataset"""
        return self.version_state["branch"]

    def _is_root(self) -> bool:
        return not self.group_index

    @property
    def parent(self):
        """Returns the parent of this group. Returns None if this is the root dataset."""
        if self._is_root():
            return None
        autoflush = self.storage.autoflush
        ds = self.__class__(
            storage=self.storage,
            index=self.index,
            group_index=posixpath.dirname(self.group_index),
            read_only=self.read_only,
            public=self.public,
            token=self._token,
            verbose=self.verbose,
            version_state=self.version_state,
            path=self.path,
            link_creds=self.link_creds,
        )
        self.storage.autoflush = autoflush
        return ds

    @property
    def root(self):
        """Returns the root dataset of a group."""
        if self._is_root():
            return self
        autoflush = self.storage.autoflush
        ds = self.__class__(
            storage=self.storage,
            index=self.index,
            group_index="",
            read_only=self.read_only,
            public=self.public,
            token=self._token,
            verbose=self.verbose,
            version_state=self.version_state,
            path=self.path,
            link_creds=self.link_creds,
        )
        self.storage.autoflush = autoflush
        return ds

    def _create_group(self, name: str) -> "Dataset":
        """Internal method used by `create_group` and `create_tensor`."""
        meta: DatasetMeta = self.version_state["meta"]
        if not name or name in dir(self):
            raise InvalidTensorGroupNameError(name)
        fullname = name
        while name:
            if name in self.version_state["full_tensors"]:
                raise TensorAlreadyExistsError(name)
            meta.add_group(name)
            name, _ = posixpath.split(name)
        return self[fullname]

    @hub_reporter.record_call
    def create_group(self, name: str) -> "Dataset":
        """Creates a tensor group. Intermediate groups in the path are also created.

        Examples:

            ```
            ds.create_group("images")
            ds['images'].create_tensor("cats")
            ```

            ```
            ds.create_group("images/jpg/cats")
            ds["images"].create_tensor("png")
            ds["images/jpg"].create_group("dogs")
            ```
        """
        if not self._is_root():
            return self.root.create_group(posixpath.join(self.group_index, name))
        name = filter_name(name)
        if name in self._groups:
            raise TensorGroupAlreadyExistsError(name)
        return self._create_group(name)

    def rechunk(
        self,
        tensors: Optional[Union[str, List[str]]] = None,
        num_workers: int = 0,
        scheduler: str = "threaded",
        progressbar: bool = True,
    ):
        """Rewrites the underlying chunks to make their sizes optimal.
        This is usually needed in cases where a lot of updates have been made to the data.

        Args:
            tensors (str, List[str], Optional): Name/names of the tensors to rechunk.
                If None, all tensors in the dataset are rechunked.
            num_workers (int): The number of workers to use for rechunking. Defaults to 0. When set to 0, it will always use serial processing, irrespective of the scheduler.
            scheduler (str): The scheduler to be used for rechunking. Supported values include: 'serial', 'threaded', 'processed' and 'ray'.
                Defaults to 'threaded'.
            progressbar (bool): Displays a progress bar if True (default).
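
        Example:
            An illustrative sketch; the tensor name is a placeholder:

            ```
            ds.rechunk()                                                  # rechunk every tensor
            ds.rechunk("images", num_workers=2, scheduler="processed")
            ```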
        """

        if tensors is None:
            tensors = list(self.tensors)
        elif isinstance(tensors, str):
            tensors = [tensors]

        # identity function that rechunks
        @hub.compute
        def rechunking(sample_in, samples_out):
            for tensor in tensors:
                samples_out[tensor].append(sample_in[tensor])

        rechunking().eval(
            self,
            num_workers=num_workers,
            scheduler=scheduler,
            progressbar=progressbar,
            skip_ok=True,
        )

    # the below methods are used by cloudpickle dumps
    def __origin__(self):
        return None

    def __values__(self):
        return None

    def __type__(self):
        return None

    def __union_params__(self):
        return None

    def __tuple_params__(self):
        return None

    def __result__(self):
        return None

    def __args__(self):
        return None

    def __bool__(self):
        return True

    def extend(self, samples: Dict[str, Any], skip_ok: bool = False):
        """Appends multiple rows of samples to multiple tensors at once. This method expects all tensors being updated to be of the same length.
        Args:
            samples (Dict[str, Any]): Dictionary with tensor names as keys and samples as values.
            skip_ok (bool): Skip tensors not in `samples` if set to True.
        Raises:
            KeyError: If any tensor in the dataset is not a key in `samples` and `skip_ok` is False.
            TensorDoesNotExistError: If tensor in `samples` does not exist.
            ValueError: If all tensors being updated are not of the same length.
            NotImplementedError: If an error occurs while writing tiles.
            Exception: Error while attempting to rollback appends.
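
        Example:
            A minimal sketch; the tensor names and values are placeholders and `ds` is assumed to
            already have "images" and "labels" tensors:

            ```
            ds.extend({
                "images": [np.ones((28, 28)), np.zeros((28, 28))],
                "labels": [0, 1],
            })
            ```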
        """
        if isinstance(samples, Dataset):
            samples = samples.tensors
        if not samples:
            return
        n = len(samples[next(iter(samples.keys()))])
        for v in samples.values():
            if len(v) != n:
                sizes = {k: len(v) for (k, v) in samples.items()}
                raise ValueError(
                    f"Incoming samples are not of equal lengths. Incoming sample sizes: {sizes}"
                )
        for i in range(n):
            self.append({k: v[i] for k, v in samples.items()})

    def append(self, sample: Dict[str, Any], skip_ok: bool = False):
        """Append samples to multiple tensors at once. This method expects all tensors being updated to be of the same length.
        Args:
            sample (dict): Dictionary with tensor names as keys and samples as values.
            skip_ok (bool): Skip tensors not in `sample` if set to True.
        Raises:
            KeyError: If any tensor in the dataset is not a key in `sample` and `skip_ok` is False.
            TensorDoesNotExistError: If tensor in `sample` does not exist.
            ValueError: If all tensors being updated are not of the same length.
            NotImplementedError: If an error occurs while writing tiles.
            Exception: Error while attempting to rollback appends.
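
        Example:
            A minimal sketch; the tensor names and values are placeholders and `ds` is assumed to
            already have "images" and "labels" tensors:

            ```
            ds.append({"images": np.ones((28, 28)), "labels": 0})
            ds.append({"images": np.zeros((28, 28))}, skip_ok=True)   # skip tensors not provided
            ```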
        """
        if isinstance(sample, Dataset):
            sample = sample.tensors
        if not skip_ok:
            for k in self.tensors:
                if k not in sample:
                    raise KeyError(
                        f"Required tensor not provided: {k}. Use ds.append(sample, skip_ok=True) to skip tensors."
                    )
        for k in sample:
            if k not in self._tensors():
                raise TensorDoesNotExistError(k)
        if len(set(map(len, (self[k] for k in sample)))) != 1:
            raise ValueError(
                "When appending using Dataset.append, all tensors are expected to have the same length."
            )
        tensors_appended = []
        with self:
            for k, v in sample.items():
                try:
                    tensor = self[k]
                    enc = tensor.chunk_engine.chunk_id_encoder
                    num_chunks = enc.num_chunks
                    tensor.append(v)
                    tensors_appended.append(k)
                except Exception as e:
                    new_num_chunks = enc.num_chunks
                    num_chunks_added = new_num_chunks - num_chunks
                    if num_chunks_added > 1:
                        # This is unlikely to happen, i.e the sample passed the validation
                        # steps and tiling but some error occured while writing tiles to chunks
                        raise NotImplementedError(
                            "Unable to recover from error while writing tiles."
                        ) from e
                    elif num_chunks_added == 1:
                        enc._encoded = enc._encoded[:-1]
                    for k in tensors_appended:
                        try:
                            self[k]._pop()
                        except Exception as e2:
                            raise Exception(
                                "Error while attempting to rollback appends"
                            ) from e2
                    raise e

    def _view_hash(self) -> str:
        """Generates a unique hash for a filtered dataset view."""
        return hash_inputs(
            self.path,
            *[e.value for e in self.index.values],
            self.pending_commit_id,
            getattr(self, "_query", None),
        )

    def _get_view_info(self):
        if self._view_info is None:
            tm = getattr(self, "_created_at", time())
            hash = self._view_hash()
            info = {
                "id": hash,
                "description": "Virtual Datasource",
                "virtual-datasource": True,
                "source-dataset": self.path,
                "source-dataset-version": self.version_state["commit_id"],
                "created_at": tm,
            }

            query = getattr(self, "_query", None)
            if query:
                info["query"] = query
                info["source-dataset-index"] = getattr(self, "_source_ds_idx", None)
            self._view_info = info
        return self._view_info

    @staticmethod
    def _write_queries_json(ds, info):
        base_storage = get_base_storage(ds.storage)
        storage_read_only = base_storage.read_only
        if ds._locked_out:
            # Ignore storage level lock since we have file level lock
            base_storage.read_only = False
        lock = Lock(base_storage, get_queries_lock_key())
        lock.acquire(timeout=10, force=True)
        queries_key = get_queries_key()
        try:
            try:
                queries = json.loads(base_storage[queries_key].decode("utf-8"))
            except KeyError:
                queries = []
            queries.append(info)
            base_storage[queries_key] = json.dumps(queries).encode("utf-8")
        finally:
            lock.release()
            base_storage.read_only = storage_read_only

    def _write_vds(self, vds):
        """Writes the indices of this view to a vds."""
        info = self._get_view_info()
        with vds:
            vds.info.update(info)
            vds.create_tensor("VDS_INDEX", dtype="uint64").extend(
                list(self.index.values[0].indices(len(self)))
            )

    def _store_view_in_subdir(self):
        """Stores this view under ".queries" sub directory of same storage."""

        info = self._get_view_info()
        hash = info["id"]
        path = f".queries/{hash}"
        self.flush()
        get_base_storage(self.storage).subdir(path).clear()
        vds = self._sub_ds(path, empty=True)
        self._write_vds(vds)
        Dataset._write_queries_json(self, info)
        return vds

    def _store_view_in_user_queries_dataset(self):
        """Stores this view under hub://username/queries
        Only applicable for views of hub datasets.
        """
        if len(self.index.values) > 1:
            raise NotImplementedError("Storing sub-sample slices is not supported yet.")
        username = get_user_name()
        if username == "public":
            raise NotLoggedInError("Unable to save query result. Not logged in.")

        info = self._get_view_info()
        hash = info["id"]

        queries_ds_path = f"hub://{username}/queries"

        try:
            queries_ds = hub.dataset(
                queries_ds_path, verbose=False
            )  # create if doesn't exist
        except PathNotEmptyException:
            hub.delete(queries_ds_path, force=True)
            queries_ds = hub.dataset(queries_ds_path, verbose=False)

        queries_ds._unlock()  # we don't need locking as no data will be added to this ds.

        path = f"hub://{username}/queries/{hash}"

        vds = hub.empty(path, overwrite=True)

        self._write_vds(vds)

        Dataset._write_queries_json(queries_ds, info)

        return vds

    def _store_view_in_path(self, path, **ds_args):
        """Stores this view at a given dataset path"""
        vds = hub.dataset(path, **ds_args)
        self._write_vds(vds)
        return vds

    def store(self, path: Optional[str] = None, **ds_args) -> str:
        """Stores a dataset view as a virtual dataset (VDS)

        Args:
            path (Optional, str): If specified, the VDS will be stored as a standalone dataset at the specified path. If not,
                the VDS is stored under the `.queries` subdirectory of the source dataset's storage. If the user doesn't have
                write access to the source dataset and the source dataset is a hub cloud dataset, then the VDS is stored
                under the user's hub account and can be accessed using hub.load(f"hub://{username}/queries/{query_hash}").
            ds_args (dict): Additional args for creating VDS when path is specified. (See documentation for `hub.dataset()`)

        Returns:
            str: Path to the stored VDS.
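
        Example:
            An illustrative sketch; the query string and destination path are placeholders:

            ```
            view = ds.filter("labels == 2")
            vds_path = view.store()                        # saved under the source dataset's .queries
            vds_path = view.store("path/to/view_dataset")  # or stored as a standalone dataset
            ```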
        """
        return self._store(path, False, **ds_args)

    def _store(self, path: Optional[str] = None, _ret_ds: bool = False, **ds_args):
        """Stores a dataset view as a virtual dataset (VDS)

        Args:
            path (Optional, str): If specified, the VDS will be stored as a standalone dataset at the specified path. If not,
                the VDS is stored under the `.queries` subdirectory of the source dataset's storage. If the user doesn't have
                write access to the source dataset and the source dataset is a hub cloud dataset, then the VDS is stored
                under the user's hub account and can be accessed using hub.load(f"hub://{username}/queries/{query_hash}").
            _ret_ds (bool): If True, the VDS is returned as such without converting it to a view. If False, the VDS path is returned.
                Default False.
            ds_args (dict): Additional args for creating VDS when path is specified. (See documentation for `hub.dataset()`)

        Returns:
            If _ret_ds is True, the VDS is returned, else path to the VDS is returned.

        Raises:
            NotImplementedError: When storing sub-sample slices and saving views inplace for in-memory datasets.
            ReadOnlyModeError: When attempting to save a view inplace and the user doesn't have write access.
        """
        if len(self.index.values) > 1:
            raise NotImplementedError("Storing sub-sample slices is not supported yet.")

        if path is None and hasattr(self, "_vds"):
            vds = self._vds
        elif path is None:
            if isinstance(self, MemoryProvider):
                raise NotImplementedError(
                    "Saving views inplace is not supported for in-memory datasets."
                )
            if self.read_only:
                if isinstance(self, hub.core.dataset.HubCloudDataset):
                    vds = self._store_view_in_user_queries_dataset()
                else:
                    raise ReadOnlyModeError(
                        "Cannot save view in read only dataset. Specify a path to store the view in a different location."
                    )
            else:
                vds = self._store_view_in_subdir()
        else:
            vds = self._store_view_in_path(path, **ds_args)
        if _ret_ds:
            return vds
        return vds.path

    def _get_view(self):
        """Returns a view for this VDS. Only works if this Dataset is a virtual dataset.

        Returns:
            A view of the source dataset based on the indices from VDS.

        Raises:
            Exception: If this is not a VDS.
        """
        try:
            ds = hub.dataset(path=self.info["source-dataset"], verbose=False)
        except KeyError:
            raise Exception("Dataset._get_view() works only for virtual datasets.")
        ds = ds[self.VDS_INDEX.numpy().reshape(-1).tolist()]
        ds._vds = self
        return ds

    def _get_empty_vds(
        self, vds_path: Optional[str] = None, query: Optional[str] = None, **vds_args
    ):
        """Returns an empty VDS with this dataset as the source dataset. Internal.

        Args:
            vds_path (Optional, str): If specified, the vds will be stored at this path. Else the vds will be stored
                under `.queries` subdirectory.
            query (Optional, str): Query string associated with this view.
            vds_args (dict): Additional args for creating vds when path is specified.

        Returns:
            Empty VDS with this dataset as the source dataset.
        """
        view = self[:0]
        if query:
            view._query = query
        return view._store(vds_path, _ret_ds=True, **vds_args)

    def _get_query_history(self) -> List[str]:
        """
        Internal. Returns a list of hashes which can be passed to Dataset._get_stored_vds to get a dataset view.
        """
        try:
            queries = json.loads(self.storage[get_queries_key()].decode("utf-8"))
            return queries
        except KeyError:
            return []

    def _sub_ds(
        self,
        path,
        empty=False,
        memory_cache_size: int = DEFAULT_MEMORY_CACHE_SIZE,
        local_cache_size: int = DEFAULT_LOCAL_CACHE_SIZE,
    ):
        """Loads a nested dataset. Internal.
        Note: Virtual datasets are returned as such, they are not converted to views.

        Args:
            path (str): Path to sub directory
            empty (bool): If True, all contents of the sub directory are cleared before initializing the sub dataset.
            memory_cache_size (int): Memory cache size for the sub dataset.
            local_cache_size (int): Local storage cache size for the sub dataset.

        Returns:
            Sub dataset
        """
        base_storage = get_base_storage(self.storage)
        sub_storage = base_storage.subdir(path)

        if self.path.startswith("hub://"):
            path = posixpath.join(self.path, path)
            cls = hub.core.dataset.HubCloudDataset
        else:
            path = sub_storage.root
            cls = hub.core.dataset.Dataset

        return cls(
            generate_chain(
                sub_storage,
                memory_cache_size * MB,
                local_cache_size * MB,
            ),
            path=path,
            token=self._token,
        )

    def _get_stored_vds(self, hash: str):
        """Returns a vds stored under the `.queries` subdirectory given its hash.

        Args:
            hash (str): Hash of the required vds.

        Returns:
            VDS with the specified hash.
        """
        return self._sub_ds(".queries/" + hash)

    def _link_tensors(
        self,
        src: str,
        dest: str,
        append_f: str,
        update_f: Optional[str] = None,
        flatten_sequence: Optional[bool] = None,
    ):
        """Internal. Links a source tensor to a destination tensor. Appends / updates made to the source tensor will be reflected in the destination tensor.

        Args:
            src (str): Name of the source tensor.
            dest (str): Name of the destination tensor.
            append_f (str): Name of the linked tensor transform to be used for appending items to the destination tensor. This transform should be defined in `hub.core.tensor_link` module.
            update_f (str): Name of the linked tensor transform to be used for updating items in the destination tensor. This transform should be defined in `hub.core.tensor_link` module.
            flatten_sequence (bool, Optional): Whether appends and updates should be done per item or per sequence if the source tensor is a sequence tensor.

        Raises:
            TensorDoesNotExistError: If source or destination tensors do not exist in this dataset.
            ValueError: If source tensor is a sequence tensor and `flatten_sequence` argument is not specified.
        """
        assert self._is_root()
        tensors = self._tensors()
        if src not in tensors:
            raise TensorDoesNotExistError(src)
        if dest not in tensors:
            raise TensorDoesNotExistError(dest)
        src_tensor = self[src]
        dest_key = self.version_state["tensor_names"][dest]
        if flatten_sequence is None:
            if src_tensor.is_sequence:
                raise ValueError(
                    "`flatten_sequence` arg must be specified when linking a sequence tensor."
                )
            flatten_sequence = False
        src_tensor.meta.add_link(dest_key, append_f, update_f, flatten_sequence)
        self.storage.maybe_flush()

    def copy(
        self,
        dest: str,
        overwrite: bool = False,
        creds=None,
        token=None,
        num_workers: int = 0,
        scheduler="threaded",
        progressbar=True,
        public: bool = False,
    ):
        """Copies this dataset or dataset view to `dest`. Version control history is not included.

        Args:
            dest (str): Destination path to copy to.
            overwrite (bool): If True and a dataset exists at `dest`, it will be overwritten. Defaults to False.
            creds (dict, Optional): creds required to create / overwrite datasets at `dest`.
            token (str, Optional): token used for fetching credentials for `dest`.
            num_workers (int): The number of workers to use for copying. Defaults to 0. When set to 0, it will always use serial processing, irrespective of the scheduler.
            scheduler (str): The scheduler to be used for copying. Supported values include: 'serial', 'threaded', 'processed' and 'ray'.
                Defaults to 'threaded'.
            progressbar (bool): Displays a progress bar if True (default).
            public (bool): Defines if the dataset will have public access. Applicable only if Hub cloud storage is used and a new Dataset is being created. Defaults to False.

        Returns:
            Dataset: New dataset object.

        Raises:
            DatasetHandlerError: If a dataset already exists at destination path and overwrite is False.
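
        Example:
            An illustrative sketch; the destination path is a placeholder:

            ```
            new_ds = ds.copy("path/to/copy", overwrite=True, num_workers=2)
            ```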
        """
        report_params = {
            "Overwrite": overwrite,
            "Num_Workers": num_workers,
            "Scheduler": scheduler,
            "Progressbar": progressbar,
            "Public": public,
        }
        if dest.startswith("hub://"):
            report_params["Dest"] = dest
        feature_report_path(self.path, "copy", report_params)
        dest_ds = hub.like(
            dest,
            self,
            creds=creds,
            token=token,
            overwrite=overwrite,
            public=public,
        )
        with dest_ds:
            dest_ds.info.update(self.info)
        for tensor in self.tensors:
            if progressbar:
                sys.stderr.write(f"Copying tensor: {tensor}.\n")
            hub.compute(_copy_tensor, name="tensor copy transform")(
                tensor_name=tensor
            ).eval(
                self,
                dest_ds,
                num_workers=num_workers,
                scheduler=scheduler,
                progressbar=progressbar,
                skip_ok=True,
                check_lengths=False,
            )
        return dest_ds

    @invalid_view_op
    def reset(self):
        """Resets the uncommitted changes present in the branch.
        Note: The uncommitted data is deleted from the underlying storage; this is not a reversible operation.
        """
        storage, version_state = self.storage, self.version_state
        if version_state["commit_node"].children:
            print("You are not at the head node of the branch, cannot reset.")
            return
        if not self.has_head_changes:
            print("There are no uncommitted changes on this branch.")
            return

        # delete metas first
        self._delete_metas()

        if self.commit_id is None:
            storage.clear()
            self._populate_meta()
        else:
            prefix = "/".join(("versions", self.pending_commit_id))
            storage.clear(prefix=prefix)
            copy_metas(self.commit_id, self.pending_commit_id, storage, version_state)
            create_commit_chunk_sets(self.commit_id, storage, version_state)
        load_meta(self)
        self._info = None
        self._ds_diff = None

    def _delete_metas(self):
        """Deletes all metas in the dataset."""
        commit_id = self.pending_commit_id
        meta_keys = [get_dataset_meta_key(commit_id)]
        meta_keys.append(get_dataset_diff_key(commit_id))
        meta_keys.append(get_dataset_info_key(commit_id))

        for tensor in self.tensors:
            meta_keys.append(get_tensor_meta_key(commit_id, tensor))
            meta_keys.append(get_tensor_tile_encoder_key(commit_id, tensor))
            meta_keys.append(get_tensor_info_key(commit_id, tensor))
            meta_keys.append(get_tensor_commit_chunk_set_key(commit_id, tensor))
            meta_keys.append(get_tensor_commit_diff_key(commit_id, tensor))
            meta_keys.append(get_chunk_id_encoder_key(commit_id, tensor))
            meta_keys.append(get_sequence_encoder_key(commit_id, tensor))

        for key in meta_keys:
            try:
                del self.storage[key]
            except KeyError:
                pass

    def add_creds(self, creds_key: str):
        """Adds a new creds key to the dataset. These keys are used for tensors that are linked to external data.

        Examples:
            ```
            # create/load a dataset
            ds = hub.dataset("path/to/dataset")

            # add a new creds key
            ds.add_creds("my_s3_key")
            ```

        Args:
            creds_key (str): The key to be added.
        """
        self.link_creds.add_creds(creds_key)
        save_link_creds(self.link_creds, self.storage)

    def populate_creds(self, creds_key: str, creds: dict):
        """Populates the creds key added in add_creds with the given creds. These creds are used to fetch the external data.
        This needs to be done every time the dataset is reloaded for datasets that contain links to external data.

        Examples:
            ```
            # create/load a dataset
            ds = hub.dataset("path/to/dataset")

            # add a new creds key
            ds.add_creds("my_s3_key")

            # populate the creds
            ds.populate_creds("my_s3_key", {"aws_access_key_id": "my_access_key", "aws_secret_access_key": "my_secret_key"})
            ```

        """
        self.link_creds.populate_creds(creds_key, creds)

    def get_creds(self) -> List[str]:
        """Returns the list of creds keys added to the dataset. These are used to fetch external data in linked tensors"""
        return self.link_creds.creds_keys

    def visualize(
        self, width: Union[int, str, None] = None, height: Union[int, str, None] = None
    ):
        """
        Visualizes the dataset in the Jupyter notebook.

        Args:
            width (Union[int, str, None]): Optional width of the visualizer canvas.
            height (Union[int, str, None]): Optional height of the visualizer canvas.

        Raises:
            Exception: If the dataset is not a Hub cloud dataset and the visualization happens in Colab.
        """
        from hub.visualizer import visualize

        hub_reporter.feature_report(feature_name="visualize", parameters={})
        if is_colab:
            raise Exception("Cannot visualize local dataset in Colab.")
        else:
            visualize(self.storage, width=width, height=height)

Subclasses

Instance variables

var branch

The current branch of the dataset

Expand source code
@property
def branch(self) -> str:
    """The current branch of the dataset"""
    return self.version_state["branch"]
var branches

Lists all the branches of the dataset.

Returns

List of branches.

Expand source code
@property
def branches(self):
    """Lists all the branches of the dataset.
    Returns:
        List of branches.
    """
    return list(self.version_state["branch_commit_map"])
var commit_id

The last committed commit_id of the dataset. If there are no commits, this returns None.

Expand source code
@property
def commit_id(self) -> Optional[str]:
    """The lasted committed commit_id of the dataset. If there are no commits, this returns None."""
    commit_node = self.version_state["commit_node"]
    if not commit_node.is_head_node:
        return commit_node.commit_id

    parent = commit_node.parent

    if parent is None:
        return None
    else:
        return parent.commit_id
var commits

Lists all the commits leading to the current dataset state.

Returns

List of dictionaries containing commit information.

Expand source code
@property
def commits(self) -> List[Dict]:
    """Lists all the commits leading to the current dataset state.
    Returns:
        List of dictionaries containing commit information.
    """
    commits = []
    commit_node = self.version_state["commit_node"]
    while commit_node:
        if not commit_node.is_head_node:
            commit_info = {
                "commit": commit_node.commit_id,
                "author": commit_node.commit_user_name,
                "time": str(commit_node.commit_time)[:-7],
                "message": commit_node.commit_message,
            }
            commits.append(commit_info)
        commit_node = commit_node.parent
    return commits
var groups

All sub groups in this group

Expand source code
@property
def groups(self) -> Dict[str, "Dataset"]:
    """All sub groups in this group"""
    return {g: self[g] for g in self._groups_filtered}
var has_head_changes

Returns True if currently at head node and uncommitted changes are present.

Expand source code
@property
def has_head_changes(self):
    """Returns True if currently at head node and uncommitted changes are present."""
    commit_node = self.version_state["commit_node"]
    return not commit_node.children and current_commit_has_change(
        self.version_state, self.storage
    )
var info

Returns the information about the dataset.

Expand source code
@property
def info(self):
    """Returns the information about the dataset."""
    if self._info is None:
        path = get_dataset_info_key(self.version_state["commit_id"])
        self.__dict__["_info"] = load_info(path, self)  # type: ignore
    return self._info
var meta

Returns the metadata of the dataset.

Expand source code
@property
def meta(self) -> DatasetMeta:
    """Returns the metadata of the dataset."""
    return self.version_state["meta"]
var parent

Returns the parent of this group. Returns None if this is the root dataset.

Expand source code
@property
def parent(self):
    """Returns the parent of this group. Returns None if this is the root dataset."""
    if self._is_root():
        return None
    autoflush = self.storage.autoflush
    ds = self.__class__(
        storage=self.storage,
        index=self.index,
        group_index=posixpath.dirname(self.group_index),
        read_only=self.read_only,
        public=self.public,
        token=self._token,
        verbose=self.verbose,
        version_state=self.version_state,
        path=self.path,
        link_creds=self.link_creds,
    )
    self.storage.autoflush = autoflush
    return ds
var pending_commit_id

The commit_id of the next commit that will be made to the dataset. If you're not at the head of the current branch, this will be the same as the commit_id.

Expand source code
@property
def pending_commit_id(self) -> str:
    """The commit_id of the next commit that will be made to the dataset.
    If you're not at the head of the current branch, this will be the same as the commit_id.
    """
    return self.version_state["commit_id"]
var root

Returns the root dataset of a group.

Expand source code
@property
def root(self):
    """Returns the root dataset of a group."""
    if self._is_root():
        return self
    autoflush = self.storage.autoflush
    ds = self.__class__(
        storage=self.storage,
        index=self.index,
        group_index="",
        read_only=self.read_only,
        public=self.public,
        token=self._token,
        verbose=self.verbose,
        version_state=self.version_state,
        path=self.path,
        link_creds=self.link_creds,
    )
    self.storage.autoflush = autoflush
    return ds
var tensors

All tensors belonging to this group, including those within sub groups. Always returns the sliced tensors.

Expand source code
@property
def tensors(self) -> Dict[str, Tensor]:
    """All tensors belonging to this group, including those within sub groups. Always returns the sliced tensors."""
    return self._tensors(include_hidden=False)
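
As a quick illustration of these instance variables, a minimal sketch that reads several of them together ("path/to/dataset" is a placeholder path):

# hypothetical read-only inspection of version-control state
ds = hub.dataset("path/to/dataset")
print(ds.branch)             # current branch name
print(ds.branches)           # all branch names
print(ds.commit_id)          # last committed id, or None if no commits yet
print(ds.has_head_changes)   # True if uncommitted changes are present
for name in ds.tensors:      # tensor names, excluding hidden tensors
    print(name)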

Methods

def add_creds(self, creds_key)

Adds a new creds key to the dataset. These keys are used for tensors that are linked to external data.

Examples

# create/load a dataset
ds = hub.dataset("path/to/dataset")

# add a new creds key
ds.add_creds("my_s3_key")

Args

creds_key : str
The key to be added.
Expand source code
def add_creds(self, creds_key: str):
    """Adds a new creds key to the dataset. These keys are used for tensors that are linked to external data.

    Examples:
        ```
        # create/load a dataset
        ds = hub.dataset("path/to/dataset")

        # add a new creds key
        ds.add_creds("my_s3_key")
        ```

    Args:
        creds_key (str): The key to be added.
    """
    self.link_creds.add_creds(creds_key)
    save_link_creds(self.link_creds, self.storage)
def append(self, sample, skip_ok=False)

Append samples to multiple tensors at once. This method expects all tensors being updated to be of the same length.

Args

sample : dict
Dictionary with tensor names as keys and samples as values.
skip_ok : bool
Skip tensors not in sample if set to True.

Raises

KeyError
If any tensor in the dataset is not a key in sample and skip_ok is False.
TensorDoesNotExistError
If tensor in sample does not exist.
ValueError
If all tensors being updated are not of the same length.
NotImplementedError
If an error occurs while writing tiles.
Exception
Error while attempting to rollback appends.
Expand source code
def append(self, sample: Dict[str, Any], skip_ok: bool = False):
    """Append samples to mutliple tensors at once. This method expects all tensors being updated to be of the same length.
    Args:
        sample (dict): Dictionary with tensor names as keys and samples as values.
        skip_ok (bool): Skip tensors not in `sample` if set to True.
    Raises:
        KeyError: If any tensor in the dataset is not a key in `sample` and `skip_ok` is False.
        TensorDoesNotExistError: If tensor in `sample` does not exist.
        ValueError: If all tensors being updated are not of the same length.
        NotImplementedError: If an error occurs while writing tiles.
        Exception: Error while attempting to rollback appends.
    """
    if isinstance(sample, Dataset):
        sample = sample.tensors
    if not skip_ok:
        for k in self.tensors:
            if k not in sample:
                raise KeyError(
                    f"Required tensor not provided: {k}. Use ds.append(sample, skip_ok=True) to skip tensors."
                )
    for k in sample:
        if k not in self._tensors():
            raise TensorDoesNotExistError(k)
    if len(set(map(len, (self[k] for k in sample)))) != 1:
        raise ValueError(
            "When appending using Dataset.append, all tensors are expected to have the same length."
        )
    tensors_appended = []
    with self:
        for k, v in sample.items():
            try:
                tensor = self[k]
                enc = tensor.chunk_engine.chunk_id_encoder
                num_chunks = enc.num_chunks
                tensor.append(v)
                tensors_appended.append(k)
            except Exception as e:
                new_num_chunks = enc.num_chunks
                num_chunks_added = new_num_chunks - num_chunks
                if num_chunks_added > 1:
                    # This is unlikely to happen, i.e. the sample passed the validation
                    # steps and tiling but some error occurred while writing tiles to chunks
                    raise NotImplementedError(
                        "Unable to recover from error while writing tiles."
                    ) from e
                elif num_chunks_added == 1:
                    enc._encoded = enc._encoded[:-1]
                for k in tensors_appended:
                    try:
                        self[k]._pop()
                    except Exception as e2:
                        raise Exception(
                            "Error while attepting to rollback appends"
                        ) from e2
                raise e
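
A minimal usage sketch, assuming the dataset already has "images" and "labels" tensors (names and paths are placeholders):

import numpy as np
import hub

ds = hub.dataset("path/to/dataset")  # placeholder path
with ds:
    # one row is appended across both tensors; a failure triggers a rollback
    ds.append({
        "images": np.zeros((28, 28, 3), dtype="uint8"),
        "labels": 1,
    })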
def checkout(self, address, create=False)

Checks out to a specific commit_id or branch. If create = True, creates a new branch with name as address.

Note

Checkout from a head node in any branch that contains uncommitted data will lead to an auto commit before the checkout.

Args

address : str
The commit_id or branch to checkout to.
create : bool
If True, creates a new branch with name as address.

Returns

str
The commit_id of the dataset after checkout.

Raises

Exception
If dataset is a filtered view.
Expand source code
def checkout(self, address: str, create: bool = False) -> Optional[str]:
    """Checks out to a specific commit_id or branch. If `create = True`, creates a new branch with name as address.

    Note:
        Checkout from a head node in any branch that contains uncommitted data will lead to an auto commit before the checkout.

    Args:
        address (str): The commit_id or branch to checkout to.
        create (bool): If True, creates a new branch with name as address.

    Returns:
        str: The commit_id of the dataset after checkout.

    Raises:
        Exception: If dataset is a filtered view.
    """
    return self._checkout(address, create)
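
A small branching sketch, assuming the dataset already has at least one commit ("dev" and the path are placeholders):

import hub

ds = hub.dataset("path/to/dataset")   # placeholder path
ds.checkout("dev", create=True)       # create and switch to a new branch
# ... make and commit changes on "dev" ...
ds.checkout("main")                   # switch back by branch name or commit_id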
def commit(self, message=None, allow_empty=False)

Stores a snapshot of the current state of the dataset.

Note

  • Committing from a non-head node in any branch will lead to an auto checkout to a new branch.
  • This same behaviour will happen if new samples are added or existing samples are updated from a non-head node.

Args

message : str, Optional
Used to describe the commit.
allow_empty : bool
If True, commit even if there are no changes

Returns

str
the commit id of the stored commit that can be used to access the snapshot.

Raises

Exception
if dataset is a filtered view.
EmptyCommitError
if there are no changes and the user has not forced an empty commit
Expand source code
def commit(self, message: Optional[str] = None, allow_empty=False) -> str:
    """Stores a snapshot of the current state of the dataset.

    Note:
        - Committing from a non-head node in any branch will lead to an auto checkout to a new branch.
        - This same behaviour will happen if new samples are added or existing samples are updated from a non-head node.

    Args:
        message (str, Optional): Used to describe the commit.
        allow_empty (bool): If True, commit even if there are no changes

    Returns:
        str: the commit id of the stored commit that can be used to access the snapshot.

    Raises:
        Exception: if dataset is a filtered view.
        EmptyCommitError: if there are no changes and the user has not forced an empty commit
    """
    if not allow_empty and not self.has_head_changes:
        raise EmptyCommitError(
            "There are no changes, commit is not done. Try again with allow_empty=True."
        )

    return self._commit(message)
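
A minimal commit sketch; the "labels" tensor and the path are placeholders assumed to exist:

import hub

ds = hub.dataset("path/to/dataset")   # placeholder path
with ds:
    ds.labels.append(0)
commit_id = ds.commit("add one label")
print(commit_id)                      # id of the stored snapshot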
def copy(self, dest, overwrite=False, creds=None, token=None, num_workers=0, scheduler='threaded', progressbar=True, public=False)

Copies this dataset or dataset view to dest. Version control history is not included.

Args

dest : str
Destination path to copy to.
overwrite : bool
If True and a dataset exists at destination, it will be overwritten. Defaults to False.
creds : dict, Optional
creds required to create / overwrite datasets at dest.
token : str, Optional
token used for fetching credentials to dest.
num_workers : int
The number of workers to use for copying. Defaults to 0. When set to 0, it will always use serial processing, irrespective of the scheduler.
scheduler : str
The scheduler to be used for copying. Supported values include: 'serial', 'threaded', 'processed' and 'ray'. Defaults to 'threaded'.
progressbar : bool
Displays a progress bar if True (default).
public : bool
Defines if the dataset will have public access. Applicable only if Hub cloud storage is used and a new Dataset is being created. Defaults to False.

Returns

Dataset
New dataset object.

Raises

DatasetHandlerError
If a dataset already exists at destination path and overwrite is False.
Expand source code
def copy(
    self,
    dest: str,
    overwrite: bool = False,
    creds=None,
    token=None,
    num_workers: int = 0,
    scheduler="threaded",
    progressbar=True,
    public: bool = False,
):
    """Copies this dataset or dataset view to `dest`. Version control history is not included.

    Args:
        dest (str): Destination path to copy to.
        overwrite (bool): If True and a dataset exists at `destination`, it will be overwritten. Defaults to False.
        creds (dict, Optional): creds required to create / overwrite datasets at `dest`.
        token (str, Optional): token used for fetching credentials to `dest`.
        num_workers (int): The number of workers to use for copying. Defaults to 0. When set to 0, it will always use serial processing, irrespective of the scheduler.
        scheduler (str): The scheduler to be used for copying. Supported values include: 'serial', 'threaded', 'processed' and 'ray'.
            Defaults to 'threaded'.
        progressbar (bool): Displays a progress bar if True (default).
        public (bool): Defines if the dataset will have public access. Applicable only if Hub cloud storage is used and a new Dataset is being created. Defaults to False.

    Returns:
        Dataset: New dataset object.

    Raises:
        DatasetHandlerError: If a dataset already exists at destination path and overwrite is False.
    """
    report_params = {
        "Overwrite": overwrite,
        "Num_Workers": num_workers,
        "Scheduler": scheduler,
        "Progressbar": progressbar,
        "Public": public,
    }
    if dest.startswith("hub://"):
        report_params["Dest"] = dest
    feature_report_path(self.path, "copy", report_params)
    dest_ds = hub.like(
        dest,
        self,
        creds=creds,
        token=token,
        overwrite=overwrite,
        public=public,
    )
    with dest_ds:
        dest_ds.info.update(self.info)
    for tensor in self.tensors:
        if progressbar:
            sys.stderr.write(f"Copying tensor: {tensor}.\n")
        hub.compute(_copy_tensor, name="tensor copy transform")(
            tensor_name=tensor
        ).eval(
            self,
            dest_ds,
            num_workers=num_workers,
            scheduler=scheduler,
            progressbar=progressbar,
            skip_ok=True,
            check_lengths=False,
        )
    return dest_ds
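
A sketch of copying to a new location, assuming write access to the (placeholder) destination path:

import hub

ds = hub.dataset("path/to/dataset")   # placeholder source path
copied = ds.copy(
    "path/to/copy",                   # placeholder destination path
    num_workers=2,
    scheduler="threaded",
)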
def create_group(self, name)

Creates a tensor group. Intermediate groups in the path are also created.

Examples

ds.create_group("images")
ds['images'].create_tensor("cats")
ds.create_groups("images/jpg/cats")
ds["images"].create_tensor("png")
ds["images/jpg"].create_group("dogs")
Expand source code
@hub_reporter.record_call
def create_group(self, name: str) -> "Dataset":
    """Creates a tensor group. Intermediate groups in the path are also created.

    Examples:

        ```
        ds.create_group("images")
        ds['images'].create_tensor("cats")

        ds.create_group("images/jpg/cats")
        ds["images"].create_tensor("png")
        ds["images/jpg"].create_group("dogs")
        ```
    """
    if not self._is_root():
        return self.root.create_group(posixpath.join(self.group_index, name))
    name = filter_name(name)
    if name in self._groups:
        raise TensorGroupAlreadyExistsError(name)
    return self._create_group(name)
def create_tensor(self, name, htype='unspecified', dtype='unspecified', sample_compression='unspecified', chunk_compression='unspecified', hidden=False, create_sample_info_tensor=True, create_shape_tensor=True, create_id_tensor=True, verify=False, **kwargs)

Creates a new tensor in the dataset.

Examples

# create dataset
ds = hub.dataset("path/to/dataset")

# create tensors
ds.create_tensor("images", htype="image", sample_compression="jpg")
ds.create_tensor("videos", htype="video", sample_compression="mp4")
ds.create_tensor("data")

# append data
ds.images.append(np.ones((400, 400, 3), dtype='uint8'))
ds.videos.append(hub.read("videos/sample_video.mp4"))
ds.data.append(np.zeros((100, 100, 2)))

Args

name : str
The name of the tensor to be created.
htype : str
The class of data for the tensor. The defaults for other parameters are determined in terms of this value. For example, htype="image" would have dtype default to uint8. These defaults can be overridden by explicitly passing any of the other parameters to this function. May also modify the defaults for other parameters.
dtype : str
Optionally override this tensor's dtype. All subsequent samples are required to have this dtype.
sample_compression : str
All samples will be compressed in the provided format. If None, samples are uncompressed.
chunk_compression : str
All chunks will be compressed in the provided format. If None, chunks are uncompressed.
**kwargs
hub.htype defaults can be overridden by passing any of the compatible parameters. To see all hub.htypes and their correspondent arguments, check out hub/htypes.py.
hidden : bool
If True, the tensor will be hidden from ds.tensors but can still be accessed via ds[tensor_name]
create_sample_info_tensor : bool
If True, meta data of individual samples will be stored in a hidden tensor. This data can be accessed via Tensor[i].sample_info.
create_shape_tensor : bool
If True, an associated tensor containing shapes of each sample will be created.
create_id_tensor : bool
If True, an associated tensor containing unique ids for each sample will be created. This is useful for merge operations.
verify : bool
Valid only for link htypes. If True, all links will be verified before they are added to the tensor.

Returns

The new tensor, which can also be accessed by self[name].

Raises

TensorAlreadyExistsError
Duplicate tensors are not allowed.
TensorGroupAlreadyExistsError
Duplicate tensor groups are not allowed.
InvalidTensorNameError
If name is in dataset attributes.
NotImplementedError
If trying to override chunk_compression.
TensorMetaInvalidHtype
If invalid htype is specified.
ValueError
If an illegal argument is specified.
Expand source code
@invalid_view_op
@hub_reporter.record_call
def create_tensor(
    self,
    name: str,
    htype: str = UNSPECIFIED,
    dtype: Union[str, np.dtype] = UNSPECIFIED,
    sample_compression: str = UNSPECIFIED,
    chunk_compression: str = UNSPECIFIED,
    hidden: bool = False,
    create_sample_info_tensor: bool = True,
    create_shape_tensor: bool = True,
    create_id_tensor: bool = True,
    verify: bool = False,
    **kwargs,
):
    """Creates a new tensor in the dataset.

    Examples:
        ```
        # create dataset
        ds = hub.dataset("path/to/dataset")

        # create tensors
        ds.create_tensor("images", htype="image", sample_compression="jpg")
        ds.create_tensor("videos", htype="video", sample_compression="mp4")
        ds.create_tensor("data")

        # append data
        ds.images.append(np.ones((400, 400, 3), dtype='uint8'))
        ds.videos.append(hub.read("videos/sample_video.mp4"))
        ds.data.append(np.zeros((100, 100, 2)))
        ```

    Args:
        name (str): The name of the tensor to be created.
        htype (str): The class of data for the tensor.
            The defaults for other parameters are determined in terms of this value.
            For example, `htype="image"` would have `dtype` default to `uint8`.
            These defaults can be overridden by explicitly passing any of the other parameters to this function.
            May also modify the defaults for other parameters.
        dtype (str): Optionally override this tensor's `dtype`. All subsequent samples are required to have this `dtype`.
        sample_compression (str): All samples will be compressed in the provided format. If `None`, samples are uncompressed.
        chunk_compression (str): All chunks will be compressed in the provided format. If `None`, chunks are uncompressed.
        **kwargs: `htype` defaults can be overridden by passing any of the compatible parameters.
            To see all `htype`s and their correspondent arguments, check out `hub/htypes.py`.
        hidden (bool): If True, the tensor will be hidden from ds.tensors but can still be accessed via ds[tensor_name]
        create_sample_info_tensor (bool): If True, meta data of individual samples will be stored in a hidden tensor. This data can be accessed via `tensor[i].sample_info`.
        create_shape_tensor (bool): If True, an associated tensor containing shapes of each sample will be created.
        create_id_tensor (bool): If True, an associated tensor containing unique ids for each sample will be created.
            This is useful for merge operations.
        verify (bool): Valid only for link htypes. If True, all links will be verified before they are added to the tensor.

    Returns:
        The new tensor, which can also be accessed by `self[name]`.

    Raises:
        TensorAlreadyExistsError: Duplicate tensors are not allowed.
        TensorGroupAlreadyExistsError: Duplicate tensor groups are not allowed.
        InvalidTensorNameError: If `name` is in dataset attributes.
        NotImplementedError: If trying to override `chunk_compression`.
        TensorMetaInvalidHtype: If invalid htype is specified.
        ValueError: If an illegal argument is specified.
    """
    # if not the head node, checkout to an auto branch that is newly created
    auto_checkout(self)

    name = filter_name(name, self.group_index)
    key = self.version_state["tensor_names"].get(name)
    if key:
        raise TensorAlreadyExistsError(name)
    else:
        if name in self.version_state["full_tensors"]:
            key = f"{name}_{uuid.uuid4().hex[:4]}"
        else:
            key = name

    if name in self._groups:
        raise TensorGroupAlreadyExistsError(name)

    tensor_name = posixpath.split(name)[1]
    if not tensor_name or tensor_name in dir(self):
        raise InvalidTensorNameError(tensor_name)

    is_sequence, is_link, htype = parse_complex_htype(htype)
    kwargs["is_sequence"] = kwargs.get("is_sequence") or is_sequence
    kwargs["is_link"] = kwargs.get("is_link") or is_link
    kwargs["verify"] = verify
    if is_link and (
        sample_compression != UNSPECIFIED or chunk_compression != UNSPECIFIED
    ):
        warnings.warn(
            "Chunk_compression and sample_compression aren't valid for tensors with linked data. Ignoring these arguments."
        )
        sample_compression = UNSPECIFIED
        chunk_compression = UNSPECIFIED

    if not self._is_root():
        return self.root.create_tensor(
            key, htype, dtype, sample_compression, chunk_compression, **kwargs
        )

    if "/" in name:
        self._create_group(posixpath.split(name)[0])

    # Separate meta and info

    htype_config = HTYPE_CONFIGURATIONS.get(htype, {})
    info_keys = htype_config.copy().pop("_info", [])
    info_kwargs = {}
    meta_kwargs = {}
    for k, v in kwargs.items():
        if k in info_keys:
            verify_htype_key_value(htype, k, v)
            info_kwargs[k] = v
        else:
            meta_kwargs[k] = v

    # Set defaults
    for k in info_keys:
        if k not in info_kwargs:
            info_kwargs[k] = htype_config[k]

    create_tensor(
        key,
        self.storage,
        htype=htype,
        dtype=dtype,
        sample_compression=sample_compression,
        chunk_compression=chunk_compression,
        version_state=self.version_state,
        hidden=hidden,
        **meta_kwargs,
    )
    meta: DatasetMeta = self.meta
    ffw_dataset_meta(meta)
    meta.add_tensor(name, key, hidden=hidden)
    tensor = Tensor(key, self)  # type: ignore
    tensor.meta.name = name
    self.version_state["full_tensors"][key] = tensor
    self.version_state["tensor_names"][name] = key
    if info_kwargs:
        tensor.info.update(info_kwargs)
    self.storage.maybe_flush()
    if create_sample_info_tensor and htype in ("image", "audio", "video", "dicom"):
        self._create_sample_info_tensor(name)
    if create_shape_tensor and htype not in ("text", "json"):
        self._create_sample_shape_tensor(name, htype=htype)
    if create_id_tensor:
        self._create_sample_id_tensor(name)
    return tensor
def create_tensor_like(self, name, source)

Copies the source tensor's meta information and creates a new tensor with it. No samples are copied, only the meta/info for the tensor is.

Examples

ds.create_tensor_like("cats", ds["images"])

Args

name : str
Name for the new tensor.
source : Tensor
Tensor whose meta/info will be copied. May or may not be contained in the same dataset.

Returns

Tensor
New Tensor object.
Expand source code
@invalid_view_op
@hub_reporter.record_call
def create_tensor_like(self, name: str, source: "Tensor") -> "Tensor":
    """Copies the `source` tensor's meta information and creates a new tensor with it. No samples are copied, only the meta/info for the tensor is.

    Examples:
        ```
        ds.create_tensor_like("cats", ds["images"])
        ```

    Args:
        name (str): Name for the new tensor.
        source (Tensor): Tensor whose meta/info will be copied. May or may not be contained in the same dataset.

    Returns:
        Tensor: New Tensor object.
    """

    info = source.info.__getstate__().copy()
    meta = source.meta.__getstate__().copy()
    del meta["min_shape"]
    del meta["max_shape"]
    del meta["length"]
    del meta["version"]
    del meta["name"]

    destination_tensor = self.create_tensor(name, **meta)
    destination_tensor.info.update(info)
    return destination_tensor
def delete(self, large_ok=False)

Deletes the entire dataset from the cache layers (if any) and the underlying storage. This is an IRREVERSIBLE operation. Data once deleted can not be recovered.

Args

large_ok : bool
Delete datasets larger than 1GB. Disabled by default.
Expand source code
@invalid_view_op
@hub_reporter.record_call
def delete(self, large_ok=False):
    """Deletes the entire dataset from the cache layers (if any) and the underlying storage.
    This is an IRREVERSIBLE operation. Data once deleted can not be recovered.

    Args:
        large_ok (bool): Delete datasets larger than 1GB. Disabled by default.
    """

    if not large_ok:
        size = self.size_approx()
        if size > hub.constants.DELETE_SAFETY_SIZE:
            logger.info(
                f"Hub Dataset {self.path} was too large to delete. Try again with large_ok=True."
            )
            return

    self._unlock()
    self.storage.clear()
def delete_group(self, name, large_ok=False)

Delete a tensor group from the dataset.

Examples

ds.delete_group("images/dogs")

Args

name : str
The name of tensor group to be deleted.
large_ok : bool
Delete tensor groups larger than 1GB. Disabled by default.

Returns

None

Raises

TensorGroupDoesNotExistError
If tensor group of name name does not exist in the dataset.
Expand source code
@invalid_view_op
@hub_reporter.record_call
def delete_group(self, name: str, large_ok: bool = False):
    """Delete a tensor group from the dataset.

    Examples:
        ```
        ds.delete_group("images/dogs")
        ```

    Args:
        name (str): The name of tensor group to be deleted.
        large_ok (bool): Delete tensor groups larger than 1GB. Disabled by default.

    Returns:
        None

    Raises:
        TensorGroupDoesNotExistError: If tensor group of name `name` does not exist in the dataset.
    """
    auto_checkout(self)

    full_path = filter_name(name, self.group_index)

    if full_path not in self._groups:
        raise TensorGroupDoesNotExistError(name)

    if not self._is_root():
        return self.root.delete_group(full_path, large_ok)

    if not large_ok:
        size_approx = self[name].size_approx()
        if size_approx > hub.constants.DELETE_SAFETY_SIZE:
            logger.info(
                f"Group {name} was too large to delete. Try again with large_ok=True."
            )
            return

    with self:
        meta = self.version_state["meta"]
        ffw_dataset_meta(meta)
        tensors = [
            posixpath.join(name, tensor)
            for tensor in self[name]._all_tensors_filtered(include_hidden=True)
        ]
        meta.delete_group(name)
        for tensor in tensors:
            key = self.version_state["tensor_names"][tensor]
            delete_tensor(key, self)
            self.version_state["tensor_names"].pop(tensor)
            self.version_state["full_tensors"].pop(key)

    self.storage.maybe_flush()
def delete_tensor(self, name, large_ok=False)

Delete a tensor from the dataset.

Examples

ds.delete_tensor("images/cats")

Args

name : str
The name of tensor to be deleted.
large_ok : bool
Delete tensors larger than 1GB. Disabled by default.

Returns

None

Raises

TensorDoesNotExistError
If tensor of name name does not exist in the dataset.
Expand source code
@invalid_view_op
@hub_reporter.record_call
def delete_tensor(self, name: str, large_ok: bool = False):
    """Delete a tensor from the dataset.

    Examples:
        ```
        ds.delete_tensor("images/cats")
        ```

    Args:
        name (str): The name of tensor to be deleted.
        large_ok (bool): Delete tensors larger than 1GB. Disabled by default.

    Returns:
        None

    Raises:
        TensorDoesNotExistError: If tensor of name `name` does not exist in the dataset.
    """
    auto_checkout(self)

    name = filter_name(name, self.group_index)
    key = self.version_state["tensor_names"].get(name)

    if not key:
        raise TensorDoesNotExistError(name)

    if not tensor_exists(key, self.storage, self.version_state["commit_id"]):
        raise TensorDoesNotExistError(name)

    if not self._is_root():
        return self.root.delete_tensor(name, large_ok)

    if not large_ok:
        chunk_engine = self.version_state["full_tensors"][key].chunk_engine
        size_approx = chunk_engine.num_samples * chunk_engine.min_chunk_size
        if size_approx > hub.constants.DELETE_SAFETY_SIZE:
            logger.info(
                f"Tensor {name} was too large to delete. Try again with large_ok=True."
            )
            return

    with self:
        meta = self.meta
        key = self.version_state["tensor_names"].pop(name)
        if key not in meta.hidden_tensors:
            tensor_diff = Tensor(key, self).chunk_engine.commit_diff
            # if tensor was created in this commit, there's no diff for deleting it.
            if not tensor_diff.created:
                self._dataset_diff.tensor_deleted(name)
        delete_tensor(key, self)
        self.version_state["full_tensors"].pop(key)
        ffw_dataset_meta(meta)
        meta.delete_tensor(name)
        self.version_state["meta"] = meta

    for t_name in [
        func(name)
        for func in (
            get_sample_id_tensor_key,
            get_sample_info_tensor_key,
            get_sample_shape_tensor_key,
        )
    ]:
        t_key = self.meta.tensor_names.get(t_name)
        if t_key and tensor_exists(
            t_key, self.storage, self.version_state["commit_id"]
        ):
            self.delete_tensor(t_name)

    self.storage.maybe_flush()
def diff(self, id_1=None, id_2=None, as_dict=False)

Returns/displays the differences between commits/branches.

For each tensor this contains information about the sample indexes that were added/modified as well as whether the tensor was created.

Args

id_1 : str, Optional
The first commit_id or branch name.
id_2 : str, Optional
The second commit_id or branch name.
as_dict : bool, Optional
If True, returns a dictionary of the differences instead of printing them. This dictionary will have two keys - "tensor" and "dataset" which represent tensor-level and dataset-level changes, respectively. Defaults to False.

Note

  • If both id_1 and id_2 are None, the differences between the current state and the previous commit will be calculated. If you're at the head of the branch, this will show the uncommitted changes, if any.
  • If only id_1 is provided, the differences between the current state and id_1 will be calculated. If you're at the head of the branch, this will take into account the uncommitted changes, if any.
  • If only id_2 is provided, a ValueError will be raised.
  • If both id_1 and id_2 are provided, the differences between id_1 and id_2 will be calculated.

Returns

Dict

The differences between the commits/branches if as_dict is True.

  • If id_1 and id_2 are None, a dictionary containing the differences between the current state and the previous commit will be returned.
  • If only id_1 is provided, a dictionary containing the differences in the current state and id_1 respectively will be returned.
  • If only id_2 is provided, a ValueError will be raised.
  • If both id_1 and id_2 are provided, a dictionary containing the differences in id_1 and id_2 respectively will be returned.
None
If as_dict is False.

Example of a dict returned:

{
    "image": {"data_added": [3, 6], "data_updated": {0, 2}, "created": False, "info_updated": False, "data_transformed_in_place": False},
    "label": {"data_added": [0, 3], "data_updated": {}, "created": True, "info_updated": False, "data_transformed_in_place": False},
    "other/stuff" : {data_added: [3, 3], data_updated: {1, 2}, created: True, "info_updated": False, "data_transformed_in_place": False},
}

Here, 'data_added' is a range of sample indexes that were added to the tensor:

  • For example [3, 6] means that sample 3, 4 and 5 were added.
  • Another example [3, 3] means that no samples were added as the range is empty

'data_updated' is a set of sample indexes that were updated.

  • For example {0, 2} means that sample 0 and 2 were updated.

'created' is a boolean that is True if the tensor was created.

'info_updated' is a boolean that is True if the info of the tensor was updated.

'data_transformed_in_place' is a boolean that is True if the data of the tensor was transformed in place.

Raises

ValueError
If id_1 is None and id_2 is not None.
Expand source code
@hub_reporter.record_call
def diff(
    self, id_1: Optional[str] = None, id_2: Optional[str] = None, as_dict=False
) -> Optional[Dict]:
    """Returns/displays the differences between commits/branches.

    For each tensor this contains information about the sample indexes that were added/modified as well as whether the tensor was created.

    Args:
        id_1 (str, Optional): The first commit_id or branch name.
        id_2 (str, Optional): The second commit_id or branch name.
        as_dict (bool, Optional): If True, returns a dictionary of the differences instead of printing them.
            This dictionary will have two keys - "tensor" and "dataset" which represent tensor-level and dataset-level changes, respectively.
            Defaults to False.

    Note:
        - If both `id_1` and `id_2` are None, the differences between the current state and the previous commit will be calculated. If you're at the head of the branch, this will show the uncommitted changes, if any.
        - If only `id_1` is provided, the differences between the current state and id_1 will be calculated. If you're at the head of the branch, this will take into account the uncommitted changes, if any.
        - If only `id_2` is provided, a ValueError will be raised.
        - If both `id_1` and `id_2` are provided, the differences between `id_1` and `id_2` will be calculated.

    Returns:
        Dict: The differences between the commits/branches if as_dict is True.

            - If `id_1` and `id_2` are None, a dictionary containing the differences between the current state and the previous commit will be returned.
            - If only `id_1` is provided, a dictionary containing the differences in the current state and `id_1` respectively will be returned.
            - If only `id_2` is provided, a ValueError will be raised.
            - If both `id_1` and `id_2` are provided, a dictionary containing the differences in `id_1` and `id_2` respectively will be returned.
        None: If as_dict is False.

        Example of a dict returned:
        ```
        {
            "image": {"data_added": [3, 6], "data_updated": {0, 2}, "created": False, "info_updated": False, "data_transformed_in_place": False},
            "label": {"data_added": [0, 3], "data_updated": {}, "created": True, "info_updated": False, "data_transformed_in_place": False},
            "other/stuff" : {data_added: [3, 3], data_updated: {1, 2}, created: True, "info_updated": False, "data_transformed_in_place": False},
        }
        ```

        Here, 'data_added' is a range of sample indexes that were added to the tensor:

        - For example [3, 6] means that sample 3, 4 and 5 were added.
        - Another example [3, 3] means that no samples were added as the range is empty

        'data_updated' is a set of sample indexes that were updated.

        - For example {0, 2} means that sample 0 and 2 were updated.

        'created' is a boolean that is True if the tensor was created.

        'info_updated' is a boolean that is True if the info of the tensor was updated.

        'data_transformed_in_place' is a boolean that is True if the data of the tensor was transformed in place.


    Raises:
        ValueError: If `id_1` is None and `id_2` is not None.
    """
    version_state, storage = self.version_state, self.storage
    res = get_changes_and_messages(version_state, storage, id_1, id_2)
    if as_dict:
        dataset_changes_1 = res[0]
        dataset_changes_2 = res[1]
        tensor_changes_1 = res[2]
        tensor_changes_2 = res[3]
        changes = {}
        if id_1 is None and id_2 is None:
            changes["dataset"] = dataset_changes_1
            changes["tensor"] = tensor_changes_1
            return changes
        changes["dataset"] = dataset_changes_1, dataset_changes_2
        changes["tensor"] = tensor_changes_1, tensor_changes_2
        return changes
    all_changes = get_all_changes_string(*res)
    print(all_changes)
    return None
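
A sketch of inspecting the uncommitted changes as a dictionary (path is a placeholder):

import hub

ds = hub.dataset("path/to/dataset")   # placeholder path
changes = ds.diff(as_dict=True)       # current state vs. previous commit
print(changes["dataset"])             # dataset-level changes
print(changes["tensor"])              # per-tensor changes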
def extend(self, samples, skip_ok=False)

Appends multiple rows of samples to multiple tensors at once. This method expects all tensors being updated to be of the same length.

Args

samples : Dict[str, Any]
Dictionary with tensor names as keys and samples as values.
skip_ok : bool
Skip tensors not in samples if set to True.

Raises

KeyError
If any tensor in the dataset is not a key in samples and skip_ok is False.
TensorDoesNotExistError
If tensor in samples does not exist.
ValueError
If all tensors being updated are not of the same length.
NotImplementedError
If an error occurs while writing tiles.
Exception
Error while attempting to rollback appends.
Expand source code
def extend(self, samples: Dict[str, Any], skip_ok: bool = False):
    """Appends multiple rows of samples to mutliple tensors at once. This method expects all tensors being updated to be of the same length.
    Args:
        samples (Dict[str, Any]): Dictionary with tensor names as keys and samples as values.
        skip_ok (bool): Skip tensors not in `samples` if set to True.
    Raises:
        KeyError: If any tensor in the dataset is not a key in `samples` and `skip_ok` is False.
        TensorDoesNotExistError: If tensor in `samples` does not exist.
        ValueError: If all tensors being updated are not of the same length.
        NotImplementedError: If an error occurs while writing tiles.
        Exception: Error while attempting to rollback appends.
    """
    if isinstance(samples, Dataset):
        samples = samples.tensors
    if not samples:
        return
    n = len(samples[next(iter(samples.keys()))])
    for v in samples.values():
        if len(v) != n:
            sizes = {k: len(v) for (k, v) in samples.items()}
            raise ValueError(
                f"Incoming samples are not of equal lengths. Incoming sample sizes: {sizes}"
            )
    for i in range(n):
        self.append({k: v[i] for k, v in samples.items()})
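
A minimal sketch, assuming existing "images" and "labels" tensors (names and path are placeholders):

import numpy as np
import hub

ds = hub.dataset("path/to/dataset")   # placeholder path
with ds:
    ds.extend({
        "images": [np.zeros((28, 28, 3), dtype="uint8")] * 4,
        "labels": [0, 1, 0, 1],       # both value lists must have equal length
    })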
def filter(self, function, num_workers=0, scheduler='threaded', progressbar=True, store_result=False, result_path=None, result_ds_args=None)

Filters the dataset in accordance with the filter function f(x: sample) -> bool

Args

function : Callable, str
Filter function that takes sample as argument and returns True/False if sample should be included in result. Also supports simplified expression evaluations. See DatasetQuery for more details.
num_workers : int
Level of parallelization of filter evaluations. 0 indicates in-place for-loop evaluation, multiprocessing is used otherwise.
scheduler : str
Scheduler to use for multiprocessing evaluation. threaded is default
progressbar : bool
Display progress bar while filtering. True is default
store_result : bool
If True, result of the filter will be saved to a dataset asynchronously.
result_path : Optional, str
Path to save the filter result. Only applicable if store_result is True.
result_ds_args : Optional, dict
Additional args for result dataset. Only applicable if store_result is True.

Returns

View of the Dataset with elements that satisfy the filter function

Example

The following filters are identical and return a dataset view where all the samples have a label equal to 2.

>>> dataset.filter(lambda sample: sample.labels.numpy() == 2)
>>> dataset.filter('labels == 2')
Expand source code
@hub_reporter.record_call
def filter(
    self,
    function: Union[Callable, str],
    num_workers: int = 0,
    scheduler: str = "threaded",
    progressbar: bool = True,
    store_result: bool = False,
    result_path: Optional[str] = None,
    result_ds_args: Optional[dict] = None,
):
    """Filters the dataset in accordance of filter function `f(x: sample) -> bool`

    Args:
        function (Callable, str): Filter function that takes sample as argument and returns True/False
            if sample should be included in result. Also supports simplified expression evaluations.
            See `hub.core.query.query.DatasetQuery` for more details.
        num_workers (int): Level of parallelization of filter evaluations.
            `0` indicates in-place for-loop evaluation, multiprocessing is used otherwise.
        scheduler (str): Scheduler to use for multiprocessing evaluation.
            `threaded` is default
        progressbar (bool): Display progress bar while filtering. True is default
        store_result (bool): If True, result of the filter will be saved to a dataset asynchronously.
        result_path (Optional, str): Path to save the filter result. Only applicable if `store_result` is True.
        result_ds_args (Optional, dict): Additional args for result dataset. Only applicable if `store_result` is True.

    Returns:
        View of the Dataset with elements that satisfy the filter function


    Example:
        The following filters are identical and return a dataset view where all the samples have a label equal to 2.
        >>> dataset.filter(lambda sample: sample.labels.numpy() == 2)
        >>> dataset.filter('labels == 2')
    """
    from hub.core.query import filter_dataset, query_dataset
    from hub.core.query import DatasetQuery

    fn = query_dataset if isinstance(function, str) else filter_dataset
    return fn(
        self,
        function,
        num_workers=num_workers,
        scheduler=scheduler,
        progressbar=progressbar,
        store_result=store_result,
        result_path=result_path,
        result_ds_args=result_ds_args,
    )
def get_commit_details(self, commit_id)
Expand source code
def get_commit_details(self, commit_id) -> Dict:
    commit_node: CommitNode = self.version_state["commit_node_map"].get(commit_id)
    if commit_node is None:
        raise KeyError(f"Commit {commit_id} not found in dataset.")
    return {
        "commit": commit_node.commit_id,
        "author": commit_node.commit_user_name,
        "time": str(commit_node.commit_time)[:-7],
        "message": commit_node.commit_message,
    }
def get_creds(self)

Returns the list of creds keys added to the dataset. These are used to fetch external data in linked tensors

Expand source code
def get_creds(self) -> List[str]:
    """Returns the list of creds keys added to the dataset. These are used to fetch external data in linked tensors"""
    return self.link_creds.creds_keys
def log(self)

Displays the details of all the past commits.

Expand source code
@hub_reporter.record_call
def log(self):
    """Displays the details of all the past commits."""
    commit_node = self.version_state["commit_node"]
    print("---------------\nHub Version Log\n---------------\n")
    print(f"Current Branch: {self.version_state['branch']}")
    if self.has_head_changes:
        print("** There are uncommitted changes on this branch.")
    print()
    while commit_node:
        if not commit_node.is_head_node:
            print(f"{commit_node}\n")
        commit_node = commit_node.parent
def merge(self, target_id, conflict_resolution=None, delete_removed_tensors=False, force=False)

Merges the target_id into the current dataset.

Args

target_id : str
The commit_id or branch to merge.
conflict_resolution : str, Optional

The strategy to use to resolve merge conflicts.

  • Conflicts are scenarios where both the current dataset and the target id have made changes to the same sample/s since their common ancestor.
  • Must be one of the following
    • None - this is the default value, will raise an exception if there are conflicts.
    • "ours" - during conflicts, values from the current dataset will be used.
    • "theirs" - during conflicts, values from target id will be used.
delete_removed_tensors : bool
If True, tensors that were deleted in the target commit/branch will also be deleted from the current dataset.
force : bool

Forces merge.

  • force = True will have these effects in the following cases of merge conflicts:
    • If tensor is renamed on target but is missing from HEAD, renamed tensor will be registered as a new tensor on current branch.
    • If tensor is renamed on both target and current branch, tensor on target will be registered as a new tensor on current branch.
    • If tensor is renamed on target and a new tensor of the new name was created on the current branch, they will be merged.

Raises

Exception
if dataset is a filtered view.
ValueError
if the conflict resolution strategy is not one of None, "ours", or "theirs".
Expand source code
@hub_reporter.record_call
def merge(
    self,
    target_id: str,
    conflict_resolution: Optional[str] = None,
    delete_removed_tensors: bool = False,
    force: bool = False,
):
    """Merges the target_id into the current dataset.

    Args:
        target_id (str): The commit_id or branch to merge.
        conflict_resolution (str, Optional): The strategy to use to resolve merge conflicts.
            -
            - Conflicts are scenarios where both the current dataset and the target id have made changes to the same sample/s since their common ancestor.
            - Must be one of the following
                - None - this is the default value, will raise an exception if there are conflicts.
                - "ours" - during conflicts, values from the current dataset will be used.
                - "theirs" - during conflicts, values from target id will be used.
        delete_removed_tensors (bool): If True, tensors that were deleted in the target commit/branch will also be deleted from the current dataset.
        force (bool): Forces merge.
            -
            - `force` = True will have these effects in the following cases of merge conflicts:
                - If tensor is renamed on target but is missing from HEAD, renamed tensor will be registered as a new tensor on current branch.
                - If tensor is renamed on both target and current branch, tensor on target will be registered as a new tensor on current branch.
                - If tensor is renamed on target and a new tensor of the new name was created on the current branch, they will be merged.

    Raises:
        Exception: if dataset is a filtered view.
        ValueError: if the conflict resolution strategy is not one of None, "ours", or "theirs".
    """
    if self._is_filtered_view:
        raise Exception(
            "Cannot perform version control operations on a filtered dataset view."
        )

    if conflict_resolution not in [None, "ours", "theirs"]:
        raise ValueError(
            f"conflict_resolution must be one of None, 'ours', or 'theirs'. Got {conflict_resolution}"
        )

    try_flushing(self)

    self._initial_autoflush.append(self.storage.autoflush)
    self.storage.autoflush = False

    merge(self, target_id, conflict_resolution, delete_removed_tensors, force)

    self.storage.autoflush = self._initial_autoflush.pop()
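
A sketch of a simple merge, assuming a "dev" branch exists and "main" is checked out (names and path are placeholders):

import hub

ds = hub.dataset("path/to/dataset")            # placeholder path
ds.checkout("main")
ds.merge("dev", conflict_resolution="theirs")  # prefer "dev" values on conflicts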
def populate_creds(self, creds_key, creds)

Populates the creds key added in add_creds with the given creds. These creds are used to fetch the external data. This needs to be done every time the dataset is reloaded for datasets that contain links to external data.

Examples

# create/load a dataset
ds = hub.dataset("path/to/dataset")

# add a new creds key
ds.add_creds("my_s3_key")

# populate the creds
ds.populate_creds("my_s3_key", {"aws_access_key_id": "my_access_key", "aws_secret_access_key": "my_secret_key"})
Expand source code
def populate_creds(self, creds_key: str, creds: dict):
    """Populates the creds key added in add_creds with the given creds. These creds are used to fetch the external data.
    This needs to be done every time the dataset is reloaded for datasets that contain links to external data.

    Examples:
        ```
        # create/load a dataset
        ds = hub.dataset("path/to/dataset")

        # add a new creds key
        ds.add_creds("my_s3_key")

        # populate the creds
        ds.populate_creds("my_s3_key", {"aws_access_key_id": "my_access_key", "aws_secret_access_key": "my_secret_key"})
        ```

    """
    self.link_creds.populate_creds(creds_key, creds)
def pytorch(self, transform=None, tensors=None, tobytes=False, num_workers=1, batch_size=1, drop_last=False, collate_fn=None, pin_memory=False, shuffle=False, buffer_size=2048, use_local_cache=False, use_progress_bar=False)

Converts the dataset into a pytorch Dataloader.

Note

Pytorch does not support uint16, uint32, uint64 dtypes. These are implicitly type cast to int32, int64 and int64 respectively. This spins up its own workers to fetch data.

Args

transform : Callable, Optional
Transformation function to be applied to each sample.
tensors : List, Optional
Optionally provide a list of tensor names in the ordering that your training script expects. For example, if you have a dataset that has "image" and "label" tensors, if tensors=["image", "label"], your training script should expect each batch will be provided as a tuple of (image, label).
tobytes : bool
If True, samples will not be decompressed and their raw bytes will be returned instead of numpy arrays. Can also be a list of tensors, in which case those tensors alone will not be decompressed.
num_workers : int
The number of workers to use for fetching data in parallel.
batch_size : int
Number of samples per batch to load. Default value is 1.
drop_last : bool
Set to True to drop the last incomplete batch, if the dataset size is not divisible by the batch size. If False and the size of dataset is not divisible by the batch size, then the last batch will be smaller. Default value is False. Read torch.utils.data.DataLoader docs for more details.
collate_fn : Callable, Optional
merges a list of samples to form a mini-batch of Tensor(s). Used when using batched loading from a map-style dataset. Read torch.utils.data.DataLoader docs for more details.
pin_memory : bool
If True, the data loader will copy Tensors into CUDA pinned memory before returning them. Default value is False. Read torch.utils.data.DataLoader docs for more details.
shuffle : bool
If True, the data loader will shuffle the data indices. Default value is False. Details about how hub shuffles data can be found at https://docs.activeloop.ai/how-hub-works/shuffling-in-ds.pytorch
buffer_size : int
The size of the buffer used to shuffle the data in MBs. Defaults to 2048 MB. Increasing the buffer_size will increase the extent of shuffling.
use_local_cache : bool
If True, the data loader will use a local cache to store data. This is useful when the dataset can fit on the machine and we don't want to fetch the data multiple times for each iteration. Default value is False.
use_progress_bar : bool
If True, tqdm will be wrapped around the returned dataloader. Default value is False.

Returns

A torch.utils.data.DataLoader object.

Expand source code
@hub_reporter.record_call
def pytorch(
    self,
    transform: Optional[Callable] = None,
    tensors: Optional[Sequence[str]] = None,
    tobytes: Union[bool, Sequence[str]] = False,
    num_workers: int = 1,
    batch_size: int = 1,
    drop_last: bool = False,
    collate_fn: Optional[Callable] = None,
    pin_memory: bool = False,
    shuffle: bool = False,
    buffer_size: int = 2048,
    use_local_cache: bool = False,
    use_progress_bar: bool = False,
):
    """Converts the dataset into a pytorch Dataloader.

    Note:
        Pytorch does not support uint16, uint32, uint64 dtypes. These are implicitly cast to int32, int64 and int64 respectively.
        This spins up its own workers to fetch data.

    Args:
        transform (Callable, Optional): Transformation function to be applied to each sample.
        tensors (List, Optional): Optionally provide a list of tensor names in the ordering that your training script expects. For example, if you have a dataset that has "image" and "label" tensors, if `tensors=["image", "label"]`, your training script should expect each batch will be provided as a tuple of (image, label).
        tobytes (bool): If True, samples will not be decompressed and their raw bytes will be returned instead of numpy arrays. Can also be a list of tensors, in which case those tensors alone will not be decompressed.
        num_workers (int): The number of workers to use for fetching data in parallel.
        batch_size (int): Number of samples per batch to load. Default value is 1.
        drop_last (bool): Set to True to drop the last incomplete batch, if the dataset size is not divisible by the batch size.
            If False and the size of dataset is not divisible by the batch size, then the last batch will be smaller. Default value is False.
            Read torch.utils.data.DataLoader docs for more details.
        collate_fn (Callable, Optional): merges a list of samples to form a mini-batch of Tensor(s). Used when using batched loading from a map-style dataset.
            Read torch.utils.data.DataLoader docs for more details.
        pin_memory (bool): If True, the data loader will copy Tensors into CUDA pinned memory before returning them. Default value is False.
            Read torch.utils.data.DataLoader docs for more details.
        shuffle (bool): If True, the data loader will shuffle the data indices. Default value is False. Details about how hub shuffles data can be found at https://docs.activeloop.ai/how-hub-works/shuffling-in-ds.pytorch
        buffer_size (int): The size of the buffer used to shuffle the data in MBs. Defaults to 2048 MB. Increasing the buffer_size will increase the extent of shuffling.
        use_local_cache (bool): If True, the data loader will use a local cache to store data. This is useful when the dataset can fit on the machine and we don't want to fetch the data multiple times for each iteration. Default value is False.
        use_progress_bar (bool): If True, tqdm will be wrapped around the returned dataloader. Default value is False.

    Returns:
        A torch.utils.data.DataLoader object.
    """
    from hub.integrations import dataset_to_pytorch as to_pytorch

    dataloader = to_pytorch(
        self,
        transform=transform,
        tensors=tensors,
        tobytes=tobytes,
        num_workers=num_workers,
        batch_size=batch_size,
        drop_last=drop_last,
        collate_fn=collate_fn,
        pin_memory=pin_memory,
        shuffle=shuffle,
        buffer_size=buffer_size,
        use_local_cache=use_local_cache,
    )

    if use_progress_bar:
        dataloader = tqdm(dataloader, desc=self.path, total=len(self) // batch_size)

    return dataloader
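
A minimal training-loop sketch, assuming a dataset with "images" and "labels" tensors (placeholder names); as described above, passing tensors=[...] yields each batch as a tuple in that order:

import hub

ds = hub.dataset("path/to/dataset")  # hypothetical path
dataloader = ds.pytorch(
    tensors=["images", "labels"], batch_size=32, shuffle=True, num_workers=2
)

for images, labels in dataloader:
    # feed the batch to your model/optimizer here
    pass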
def rechunk(self, tensors=None, num_workers=0, scheduler='threaded', progressbar=True)

Rewrites the underlying chunks to make their sizes optimal. This is usually needed in cases where a lot of updates have been made to the data.

Args

tensors : str, List[str], Optional
Name/names of the tensors to rechunk. If None, all tensors in the dataset are rechunked.
num_workers : int
The number of workers to use for rechunking. Defaults to 0. When set to 0, it will always use serial processing, irrespective of the scheduler.
scheduler : str
The scheduler to be used for rechunking. Supported values include: 'serial', 'threaded', 'processed' and 'ray'. Defaults to 'threaded'.
progressbar : bool
Displays a progress bar if True (default).
Expand source code
def rechunk(
    self,
    tensors: Optional[Union[str, List[str]]] = None,
    num_workers: int = 0,
    scheduler: str = "threaded",
    progressbar: bool = True,
):
    """Rewrites the underlying chunks to make their sizes optimal.
    This is usually needed in cases where a lot of updates have been made to the data.

    Args:
        tensors (str, List[str], Optional): Name/names of the tensors to rechunk.
            If None, all tensors in the dataset are rechunked.
        num_workers (int): The number of workers to use for rechunking. Defaults to 0. When set to 0, it will always use serial processing, irrespective of the scheduler.
        scheduler (str): The scheduler to be used for rechunking. Supported values include: 'serial', 'threaded', 'processed' and 'ray'.
            Defaults to 'threaded'.
        progressbar (bool): Displays a progress bar if True (default).
    """

    if tensors is None:
        tensors = list(self.tensors)
    elif isinstance(tensors, str):
        tensors = [tensors]

    # identity function that rechunks
    @hub.compute
    def rechunking(sample_in, samples_out):
        for tensor in tensors:
            samples_out[tensor].append(sample_in[tensor])

    rechunking().eval(
        self,
        num_workers=num_workers,
        scheduler=scheduler,
        progressbar=progressbar,
        skip_ok=True,
    )
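
A short sketch, assuming a dataset whose "images" tensor has accumulated many updates (the path and tensor name are placeholders):

import hub

ds = hub.dataset("path/to/dataset")  # hypothetical path

# rewrite only the "images" chunks, using two worker threads
ds.rechunk(tensors="images", num_workers=2, scheduler="threaded")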
def rename(self, path)

Renames the dataset to path.

Example

ds = hub.load("hub://username/dataset")
ds.rename("hub://username/renamed_dataset")

Args

path : str
New path to the dataset.

Raises

RenameError
If path points to a different directory.
Expand source code
@invalid_view_op
@hub_reporter.record_call
def rename(self, path: str):
    """Renames the dataset to `path`.

    Example:
        ```
        ds = hub.load("hub://username/dataset")
        ds.rename("hub://username/renamed_dataset")
        ```

    Args:
        path (str): New path to the dataset.

    Raises:
        RenameError: If `path` points to a different directory.
    """
    path = path.rstrip("/")
    if posixpath.split(path)[0] != posixpath.split(self.path)[0]:
        raise RenameError
    storage = get_base_storage(self.storage)
    storage.rename(path)
    self.path = path
def rename_group(self, name, new_name)

Renames group name to new_name

Args

name : str
Name of group to be renamed.
new_name : str
New name of group.

Raises

TensorGroupDoesNotExistError
If tensor group of name name does not exist in the dataset.
TensorAlreadyExistsError
Duplicate tensors are not allowed.
TensorGroupAlreadyExistsError
Duplicate tensor groups are not allowed.
InvalidTensorGroupNameError
If name is in dataset attributes.
RenameError
If new_name points to a group different from name.
Expand source code
@hub_reporter.record_call
def rename_group(self, name: str, new_name: str) -> None:
    """Renames group with name `name` to `new_name`

    Args:
        name (str): Name of group to be renamed.
        new_name (str): New name of group.

    Raises:
        TensorGroupDoesNotExistError: If tensor group of name `name` does not exist in the dataset.
        TensorAlreadyExistsError: Duplicate tensors are not allowed.
        TensorGroupAlreadyExistsError: Duplicate tensor groups are not allowed.
        InvalidTensorGroupNameError: If `name` is in dataset attributes.
        RenameError: If `new_name` points to a group different from `name`.
    """
    auto_checkout(self)

    name = filter_name(name, self.group_index)
    new_name = filter_name(new_name, self.group_index)

    if name not in self._groups:
        raise TensorGroupDoesNotExistError(name)

    if posixpath.split(name)[0] != posixpath.split(new_name)[0]:
        raise RenameError("Names does not match.")

    if new_name in self.version_state["tensor_names"]:
        raise TensorAlreadyExistsError(new_name)

    if new_name in self._groups:
        raise TensorGroupAlreadyExistsError(new_name)

    new_tensor_name = posixpath.split(new_name)[1]
    if not new_tensor_name or new_tensor_name in dir(self):
        raise InvalidTensorGroupNameError(new_name)

    meta = self.meta
    meta.rename_group(name, new_name)

    root = self.root
    for tensor in filter(
        lambda x: x.startswith(name),
        map(lambda y: y.meta.name or y.key, self.tensors.values()),
    ):
        root._rename_tensor(
            tensor,
            posixpath.join(new_name, posixpath.relpath(tensor, name)),
        )

    self.storage.maybe_flush()
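
A short sketch, assuming a dataset with a tensor group "images/train" (placeholder names); the new name must stay inside the same parent group:

import hub

ds = hub.dataset("path/to/dataset")  # hypothetical path
ds.rename_group("images/train", "images/training")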
def rename_tensor(self, name, new_name)

Renames tensor name to new_name

Args

name : str
Name of tensor to be renamed.
new_name : str
New name of tensor.

Returns

Tensor
Renamed tensor.

Raises

TensorDoesNotExistError
If tensor of name name does not exist in the dataset.
TensorAlreadyExistsError
Duplicate tensors are not allowed.
TensorGroupAlreadyExistsError
Duplicate tensor groups are not allowed.
InvalidTensorNameError
If new_name is in dataset attributes.
RenameError
If new_name points to a group different from name.
Expand source code
@hub_reporter.record_call
def rename_tensor(self, name: str, new_name: str) -> "Tensor":
    """Renames tensor with name `name` to `new_name`

    Args:
        name (str): Name of tensor to be renamed.
        new_name (str): New name of tensor.

    Returns:
        Tensor: Renamed tensor.

    Raises:
        TensorDoesNotExistError: If tensor of name `name` does not exist in the dataset.
        TensorAlreadyExistsError: Duplicate tensors are not allowed.
        TensorGroupAlreadyExistsError: Duplicate tensor groups are not allowed.
        InvalidTensorNameError: If `new_name` is in dataset attributes.
        RenameError: If `new_name` points to a group different from `name`.
    """
    auto_checkout(self)

    if name not in self._tensors():
        raise TensorDoesNotExistError(name)

    name = filter_name(name, self.group_index)
    new_name = filter_name(new_name, self.group_index)

    if posixpath.split(name)[0] != posixpath.split(new_name)[0]:
        raise RenameError("New name of tensor cannot point to a different group")

    if new_name in self.version_state["tensor_names"]:
        raise TensorAlreadyExistsError(new_name)

    if new_name in self._groups:
        raise TensorGroupAlreadyExistsError(new_name)

    new_tensor_name = posixpath.split(new_name)[1]
    if not new_tensor_name or new_tensor_name in dir(self):
        raise InvalidTensorNameError(new_name)

    tensor = self.root._rename_tensor(name, new_name)

    self.storage.maybe_flush()
    return tensor
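
A short sketch, assuming a dataset with a top-level "labels" tensor (placeholder name); as with groups, the new name cannot move the tensor to a different group:

import hub

ds = hub.dataset("path/to/dataset")  # hypothetical path
targets = ds.rename_tensor("labels", "targets")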
def reset(self)

Resets the uncommitted changes present in the branch. Note: the uncommitted data is deleted from the underlying storage; this is not a reversible operation.

Expand source code
@invalid_view_op
def reset(self):
    """Resets the uncommitted changes present in the branch.
    Note: The uncommitted data is deleted from underlying storage, this is not a reversible operation.
    """
    storage, version_state = self.storage, self.version_state
    if version_state["commit_node"].children:
        print("You are not at the head node of the branch, cannot reset.")
        return
    if not self.has_head_changes:
        print("There are no uncommitted changes on this branch.")
        return

    # delete metas first
    self._delete_metas()

    if self.commit_id is None:
        storage.clear()
        self._populate_meta()
    else:
        prefix = "/".join(("versions", self.pending_commit_id))
        storage.clear(prefix=prefix)
        copy_metas(self.commit_id, self.pending_commit_id, storage, version_state)
        create_commit_chunk_sets(self.commit_id, storage, version_state)
    load_meta(self)
    self._info = None
    self._ds_diff = None
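
A short sketch, assuming a dataset with a "labels" tensor (placeholder name); remember that the discarded data cannot be recovered:

import hub
import numpy as np

ds = hub.dataset("path/to/dataset")  # hypothetical path
ds.labels.append(np.array(1))  # an uncommitted change
ds.reset()                     # discards the change permanently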
def store(self, path=None, **ds_args)

Stores a dataset view as a virtual dataset (VDS)

Args

path : Optional, str
If specified, the VDS will be stored as a standalone dataset at the specified path. If not, the VDS is stored under the .queries subdirectory of the source dataset's storage. If the user doesn't have write access to the source dataset and the source dataset is a hub cloud dataset, then the VDS is stored under the user's hub account and can be accessed using hub.load(f"hub://{username}/queries/{query_hash}").
ds_args : dict
Additional args for creating VDS when path is specified. (See documentation for dataset.init())

Returns

str
Path to the stored VDS.
Expand source code
def store(self, path: Optional[str] = None, **ds_args) -> str:
    """Stores a dataset view as a virtual dataset (VDS)

    Args:
        path (Optional, str): If specified, the VDS will be stored as a standalone dataset at the specified path. If not,
            the VDS is stored under the `.queries` subdirectory of the source dataset's storage. If the user doesn't have
            write access to the source dataset and the source dataset is a hub cloud dataset, then the VDS is stored
            under the user's hub account and can be accessed using hub.load(f"hub://{username}/queries/{query_hash}").
        ds_args (dict): Additional args for creating VDS when path is specified. (See documentation for `hub.dataset()`)

    Returns:
        str: Path to the stored VDS.
    """
    return self._store(path, False, **ds_args)
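
A short sketch, assuming a simple index view of the dataset; the target path is a placeholder, and omitting it stores the view under the source dataset's .queries subdirectory as described above:

import hub

ds = hub.dataset("path/to/dataset")    # hypothetical path
view = ds[0:100]                       # a dataset view

vds_path = view.store("path/to/view")  # hypothetical path for the VDS
print(vds_path)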
def summary(self)
Expand source code
def summary(self):
    pretty_print = summary_dataset(self)

    print(self)
    print(pretty_print)
def tensorflow(self, tensors=None, tobytes=False)

Converts the dataset into a tensorflow compatible format.

See

https://www.tensorflow.org/api_docs/python/tf/data/Dataset

Args

tensors : List, Optional
Optionally provide a list of tensor names in the ordering that your training script expects. For example, if you have a dataset that has "image" and "label" tensors, if tensors=["image", "label"], your training script should expect each batch will be provided as a tuple of (image, label).
tobytes : bool
If True, samples will not be decompressed and their raw bytes will be returned instead of numpy arrays. Can also be a list of tensors, in which case those tensors alone will not be decompressed.

Returns

tf.data.Dataset object that can be used for tensorflow training.

Expand source code
@hub_reporter.record_call
def tensorflow(
    self,
    tensors: Optional[Sequence[str]] = None,
    tobytes: Union[bool, Sequence[str]] = False,
):
    """Converts the dataset into a tensorflow compatible format.

    See:
        https://www.tensorflow.org/api_docs/python/tf/data/Dataset

    Args:
        tensors (List, Optional): Optionally provide a list of tensor names in the ordering that your training script expects. For example, if you have a dataset that has "image" and "label" tensors, if `tensors=["image", "label"]`, your training script should expect each batch will be provided as a tuple of (image, label).
        tobytes (bool): If True, samples will not be decompressed and their raw bytes will be returned instead of numpy arrays. Can also be a list of tensors, in which case those tensors alone will not be decompressed.

    Returns:
        tf.data.Dataset object that can be used for tensorflow training.
    """
    return dataset_to_tensorflow(self, tensors=tensors, tobytes=tobytes)
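
A short sketch, assuming a dataset with "images" and "labels" tensors (placeholder names); the exact per-element structure depends on the dataset, so the loop below only inspects one element:

import hub

ds = hub.dataset("path/to/dataset")  # hypothetical path
tf_ds = ds.tensorflow(tensors=["images", "labels"])

for sample in tf_ds.take(1):
    print(sample)  # structure follows the requested tensors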
def visualize(self, width=None, height=None)

Visualizes the dataset in the Jupyter notebook.

Args

width : Union[int, str, None]
Optional width of the visualizer canvas.
height : Union[int, str, None]
Optional height of the visualizer canvas.

Raises

Exception
If the dataset is not a Hub cloud dataset and the visualization happens in Colab.
Expand source code
def visualize(
    self, width: Union[int, str, None] = None, height: Union[int, str, None] = None
):
    """
    Visualizes the dataset in the Jupyter notebook.

    Args:
        width: Union[int, str, None] Optional width of the visualizer canvas.
        height: Union[int, str, None] Optional height of the visualizer canvas.

    Raises:
        Exception: If the dataset is not a Hub cloud dataset and the visualization happens in Colab.
    """
    from hub.visualizer import visualize

    hub_reporter.feature_report(feature_name="visualize", parameters={})
    if is_colab:
        raise Exception("Cannot visualize local dataset in Colab.")
    else:
        visualize(self.storage, width=width, height=height)
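
A short sketch, assuming the call is made from a Jupyter notebook against a Hub cloud dataset (the path is a placeholder):

import hub

ds = hub.load("hub://username/dataset")  # hypothetical path
ds.visualize(width=800, height=600)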
class tensor (key, dataset, index=None, is_iteration=False, chunk_engine=None)

Initializes a new tensor.

Note

This operation does not create a new tensor in the storage provider, and should normally only be performed by Hub internals.

Args

key : str
The internal identifier for this tensor.
dataset : Dataset
The dataset that this tensor is located in.
index
The Index object restricting the view of this tensor. Can be an int, slice, or (used internally) an Index object.
is_iteration : bool
If this tensor is being used as an iterator.
chunk_engine : ChunkEngine, optional
The underlying chunk_engine for the tensor

Raises

TensorDoesNotExistError
If no tensor with key exists and a tensor_meta was not provided.
Expand source code
class Tensor:
    def __init__(
        self,
        key: str,
        dataset,
        index: Optional[Index] = None,
        is_iteration: bool = False,
        chunk_engine: Optional[ChunkEngine] = None,
    ):
        """Initializes a new tensor.

        Note:
            This operation does not create a new tensor in the storage provider,
            and should normally only be performed by Hub internals.

        Args:
            key (str): The internal identifier for this tensor.
            dataset (Dataset): The dataset that this tensor is located in.
            index: The Index object restricting the view of this tensor.
                Can be an int, slice, or (used internally) an Index object.
            is_iteration (bool): If this tensor is being used as an iterator.
            chunk_engine (ChunkEngine, optional): The underlying chunk_engine for the tensor

        Raises:
            TensorDoesNotExistError: If no tensor with `key` exists and a `tensor_meta` was not provided.
        """
        self.key = key
        self.dataset = dataset
        self.storage: LRUCache = dataset.storage
        self.index = index or Index()
        self.version_state = dataset.version_state
        self.link_creds = dataset.link_creds
        self.is_iteration = is_iteration
        commit_id = self.version_state["commit_id"]

        if not self.is_iteration and not tensor_exists(
            self.key, self.storage, commit_id
        ):
            raise TensorDoesNotExistError(self.key)

        meta_key = get_tensor_meta_key(self.key, commit_id)
        meta = self.storage.get_hub_object(meta_key, TensorMeta)
        if chunk_engine is not None:
            self.chunk_engine = chunk_engine
        elif meta.is_link:
            self.chunk_engine = LinkedChunkEngine(
                self.key,
                self.storage,
                self.version_state,
                link_creds=dataset.link_creds,
            )
        else:
            self.chunk_engine = ChunkEngine(self.key, self.storage, self.version_state)

        if not self.is_iteration:
            self.index.validate(self.num_samples)

        # An optimization to skip multiple .numpy() calls when performing inplace ops on slices:
        self._skip_next_setitem = False

    def _write_initialization(self):
        self.storage.check_readonly()
        # if not the head node, checkout to an auto branch that is newly created
        if auto_checkout(self.dataset):
            self.chunk_engine = self.version_state["full_tensors"][
                self.key
            ].chunk_engine

    @invalid_view_op
    def extend(
        self,
        samples: Union[np.ndarray, Sequence[InputSample], "Tensor"],
        progressbar: bool = False,
    ):

        """Extends the end of the tensor by appending multiple elements from a sequence. Accepts a sequence, a single batched numpy array,
        or a sequence of `hub.read` outputs, which can be used to load files. See examples down below.

        Example:
            Numpy input:

            >>> len(tensor)
            0
            >>> tensor.extend(np.zeros((100, 28, 28, 1)))
            >>> len(tensor)
            100


            File input:

            >>> len(tensor)
            0
            >>> tensor.extend([
                    hub.read("path/to/image1"),
                    hub.read("path/to/image2"),
                ])
            >>> len(tensor)
            2


        Args:
            samples (np.ndarray, Sequence, Sequence[Sample]): The data to add to the tensor.
                The length should be equal to the number of samples to add.
            progressbar (bool): Specifies whether a progressbar should be displayed while extending.

        Raises:
            TensorDtypeMismatchError: Dtype for array must be equal to or castable to this tensor's dtype
        """
        self.check_link_ready()
        self._write_initialization()
        self.chunk_engine.extend(
            samples,
            progressbar=progressbar,
            link_callback=self._append_to_links if self.meta.links else None,
        )

    @property
    def info(self):
        """Returns the information about the tensor.

        Returns:
            TensorInfo: Information about the tensor.
        """
        commit_id = self.version_state["commit_id"]
        chunk_engine = self.chunk_engine
        if chunk_engine._info is None or chunk_engine._info_commit_id != commit_id:
            path = get_tensor_info_key(self.key, commit_id)
            chunk_engine._info = load_info(path, self.dataset, self.key)
            chunk_engine._info_commit_id = commit_id
            self.storage.register_hub_object(path, chunk_engine._info)
        return chunk_engine._info

    @info.setter
    def info(self, value):
        if isinstance(value, dict):
            info = self.info
            info.replace_with(value)
        else:
            raise TypeError("Info must be set with type Dict")

    @invalid_view_op
    def append(self, sample: InputSample):
        """Appends a single sample to the end of the tensor. Can be an array, scalar value, or the return value from `hub.read`,
        which can be used to load files. See examples down below.

        Examples:
            Numpy input:

            >>> len(tensor)
            0
            >>> tensor.append(np.zeros((28, 28, 1)))
            >>> len(tensor)
            1

            File input:

            >>> len(tensor)
            0
            >>> tensor.append(hub.read("path/to/file"))
            >>> len(tensor)
            1

        Args:
            sample (InputSample): The data to append to the tensor. `Sample` is generated by `hub.read`. See the above examples.
        """
        self.extend([sample], progressbar=False)

    def clear(self):
        """Deletes all samples from the tensor"""
        self.chunk_engine.clear()
        sample_id_key = get_sample_id_tensor_key(self.key)
        try:
            sample_id_tensor = Tensor(sample_id_key, self.dataset)
            sample_id_tensor.chunk_engine.clear()
            self.meta.links.clear()
            self.meta.is_dirty = True
        except TensorDoesNotExistError:
            pass

    def modified_samples(
        self, target_id: Optional[str] = None, return_indexes: Optional[bool] = False
    ):
        """Returns a slice of the tensor with only those elements that were modified/added.
        By default the modifications are calculated relative to the previous commit made, but this can be changed by providing a `target id`.

        Args:
            target_id (str, optional): The commit id or branch name to calculate the modifications relative to. Defaults to None.
            return_indexes (bool, optional): If True, returns the indexes of the modified elements. Defaults to False.

        Returns:
            Tensor: A new tensor with only the modified elements if `return_indexes` is False.
            Tuple[Tensor, List[int]]: A new tensor with only the modified elements and the indexes of the modified elements if `return_indexes` is True.

        Raises:
            TensorModifiedError: If a target id is passed which is not an ancestor of the current commit.
        """
        current_commit_id = self.version_state["commit_id"]
        indexes = get_modified_indexes(
            self.key,
            current_commit_id,
            target_id,
            self.version_state,
            self.storage,
        )
        tensor = self[indexes]
        if return_indexes:
            return tensor, indexes
        return tensor

    @property
    def meta(self):
        return self.chunk_engine.tensor_meta

    @property
    def shape(self) -> Tuple[Optional[int], ...]:
        """Get the shape of this tensor. Length is included.

        Note:
            If you don't want `None` in the output shape or want the lower/upper bound shapes,
            use `tensor.shape_interval` instead.

        Example:
            >>> tensor.append(np.zeros((10, 10)))
            >>> tensor.append(np.zeros((10, 15)))
            >>> tensor.shape
            (2, 10, None)

        Returns:
            tuple: Tuple where each value is either `None` (if that axis is dynamic) or
                an `int` (if that axis is fixed).
        """
        sample_shape_tensor = self._sample_shape_tensor
        sample_shape_provider = (
            self._sample_shape_provider(sample_shape_tensor)
            if sample_shape_tensor
            else None
        )
        if sample_shape_provider is None:
            self.check_link_ready()
        shape: Tuple[Optional[int], ...]
        shape = self.chunk_engine.shape(
            self.index, sample_shape_provider=sample_shape_provider
        )
        if not shape and self.meta.max_shape:
            shape = (0,) * len(self.meta.max_shape)
        if self.meta.max_shape == [0, 0, 0]:
            shape = ()
        return shape

    @property
    def size(self) -> Optional[int]:
        s = 1
        for x in self.shape:
            if x is None:
                return None
            s *= x  # not using np.prod to avoid overflow
        return s

    @property
    def ndim(self) -> int:
        return self.chunk_engine.ndim(self.index)

    @property
    def dtype(self) -> Optional[np.dtype]:
        if self.base_htype in ("json", "list"):
            return np.dtype(str)
        if self.meta.dtype:
            return np.dtype(self.meta.dtype)
        return None

    @property
    def is_sequence(self):
        return self.meta.is_sequence

    @property
    def is_link(self):
        return self.meta.is_link

    @property
    def verify(self):
        return self.is_link and self.meta.verify

    @property
    def htype(self):
        htype = self.meta.htype
        if self.is_sequence:
            htype = f"sequence[{htype}]"
        if self.is_link:
            htype = f"link[{htype}]"
        return htype

    @property
    def hidden(self) -> bool:
        return self.meta.hidden

    @property
    def base_htype(self):
        return self.meta.htype

    @property
    def shape_interval(self) -> ShapeInterval:
        """Returns a `ShapeInterval` object that describes this tensor's shape more accurately. Length is included.

        Note:
            If you are expecting a `tuple`, use `tensor.shape` instead.

        Example:
            >>> tensor.append(np.zeros((10, 10)))
            >>> tensor.append(np.zeros((10, 15)))
            >>> tensor.shape_interval
            ShapeInterval(lower=(2, 10, 10), upper=(2, 10, 15))
            >>> str(tensor.shape_interval)
            (2, 10, 10:15)

        Returns:
            ShapeInterval: Object containing `lower` and `upper` properties.
        """
        return self.chunk_engine.shape_interval

    @property
    def is_dynamic(self) -> bool:
        """Will return True if samples in this tensor have shapes that are unequal."""
        return self.shape_interval.is_dynamic

    @property
    def num_samples(self) -> int:
        """Returns the length of the primary axis of the tensor.
        Ignores any applied indexing and returns the total length.
        """
        if self.is_sequence:
            return self.chunk_engine._sequence_length
        return self.meta.length

    def __len__(self):
        """Returns the length of the primary axis of the tensor.
        Accounts for indexing into the tensor object.

        Examples:
            >>> len(tensor)
            0
            >>> tensor.extend(np.zeros((100, 10, 10)))
            >>> len(tensor)
            100
            >>> len(tensor[5:10])
            5

        Returns:
            int: The current length of this tensor.
        """

        # catch corrupted datasets / user tampering ASAP
        self.chunk_engine.validate_num_samples_is_synchronized()

        return self.index.length(self.num_samples)

    def __getitem__(
        self,
        item: Union[int, slice, List[int], Tuple[Union[int, slice, Tuple[int]]], Index],
        is_iteration: bool = False,
    ):
        if not isinstance(item, (int, slice, list, tuple, Index)):
            raise InvalidKeyTypeError(item)
        return Tensor(
            self.key,
            self.dataset,
            index=self.index[item],
            is_iteration=is_iteration,
            chunk_engine=self.chunk_engine,
        )

    def _get_bigger_dtype(self, d1, d2):
        if np.can_cast(d1, d2):
            if np.can_cast(d2, d1):
                return d1
            else:
                return d2
        else:
            if np.can_cast(d2, d1):
                return d2
            else:
                return np.object

    def _infer_np_dtype(self, val: Any) -> np.dtype:
        # TODO refac
        if hasattr(val, "dtype"):
            return val.dtype
        elif isinstance(val, int):
            return np.array(0).dtype
        elif isinstance(val, float):
            return np.array(0.0).dtype
        elif isinstance(val, str):
            return np.array("").dtype
        elif isinstance(val, bool):
            return np.dtype(bool)
        elif isinstance(val, Sequence):
            return reduce(self._get_bigger_dtype, map(self._infer_np_dtype, val))
        else:
            raise TypeError(f"Cannot infer numpy dtype for {val}")

    def __setitem__(self, item: Union[int, slice], value: Any):
        """Update samples with new values.

        Example:
            >>> tensor.append(np.zeros((10, 10)))
            >>> tensor.shape
            (1, 10, 10)
            >>> tensor[0] = np.zeros((3, 3))
            >>> tensor.shape
            (1, 3, 3)
        """
        self.check_link_ready()
        self._write_initialization()
        update_link_callback = self._update_links if self.meta.links else None
        if isinstance(value, Tensor):
            if value._skip_next_setitem:
                value._skip_next_setitem = False
                return
            value = value.numpy(aslist=True)
        item_index = Index(item)

        if (
            hub.constants._ENABLE_RANDOM_ASSIGNMENT
            and isinstance(item, int)
            and item >= self.num_samples
        ):
            num_samples_to_pad = item - self.num_samples
            append_link_callback = self._append_to_links if self.meta.links else None

            self.chunk_engine.pad_and_append(
                num_samples_to_pad,
                value,
                append_link_callback=append_link_callback,
                update_link_callback=update_link_callback,
            )
            return

        self.chunk_engine.update(
            self.index[item_index],
            value,
            link_callback=update_link_callback,
        )

    def __iter__(self):
        for i in range(len(self)):
            yield self.__getitem__(i, is_iteration=True)

    def check_link_ready(self):
        if not self.is_link:
            return
        missing_keys = self.link_creds.missing_keys
        if missing_keys:
            raise ValueError(
                f"Not all credentials are populated for a tensor with linked data. Missing: {missing_keys}. Populate with `dataset.populate_creds(key, value)`."
            )

    def numpy(
        self, aslist=False, fetch_chunks=False
    ) -> Union[np.ndarray, List[np.ndarray]]:
        """Computes the contents of the tensor in numpy format.

        Args:
            aslist (bool): If True, a list of np.ndarrays will be returned. Helpful for dynamic tensors.
                If False, a single np.ndarray will be returned unless the samples are dynamically shaped, in which case
                an error is raised.
            fetch_chunks (bool): If True, full chunks will be retrieved from the storage, otherwise only required bytes will be retrieved.
                This will always be True even if specified as False in the following cases:
                - The tensor is ChunkCompressed
                - The chunk which is being accessed has more than 128 samples.

        Raises:
            DynamicTensorNumpyError: If reading a dynamically-shaped array slice without `aslist=True`.
            ValueError: If the tensor is a link and the credentials are not populated.

        Returns:
            A numpy array containing the data represented by this tensor.
        """
        self.check_link_ready()
        return self.chunk_engine.numpy(
            self.index, aslist=aslist, fetch_chunks=fetch_chunks
        )

    def summary(self):
        pretty_print = summary_tensor(self)

        print(self)
        print(pretty_print)

    def __str__(self):
        index_str = f", index={self.index}"
        if self.index.is_trivial():
            index_str = ""
        return f"Tensor(key={repr(self.meta.name or self.key)}{index_str})"

    __repr__ = __str__

    def __array__(self) -> np.ndarray:
        return self.numpy()  # type: ignore

    @_inplace_op
    def __iadd__(self, other):
        pass

    @_inplace_op
    def __isub__(self, other):
        pass

    @_inplace_op
    def __imul__(self, other):
        pass

    @_inplace_op
    def __itruediv__(self, other):
        pass

    @_inplace_op
    def __ifloordiv__(self, other):
        pass

    @_inplace_op
    def __imod__(self, other):
        pass

    @_inplace_op
    def __ipow__(self, other):
        pass

    @_inplace_op
    def __ilshift__(self, other):
        pass

    @_inplace_op
    def __irshift__(self, other):
        pass

    @_inplace_op
    def __iand__(self, other):
        pass

    @_inplace_op
    def __ixor__(self, other):
        pass

    @_inplace_op
    def __ior__(self, other):
        pass

    def data(self, aslist: bool = False) -> Any:
        htype = self.base_htype
        if htype in ("json", "text"):

            if self.ndim == 1:
                return self.numpy()[0]
            else:
                return [sample[0] for sample in self.numpy(aslist=True)]
        elif htype == "list":
            if self.ndim == 1:
                return list(self.numpy())
            else:
                return list(map(list, self.numpy(aslist=True)))
        else:
            return self.numpy(aslist=aslist)

    def tobytes(self) -> bytes:
        """Returns the bytes of the tensor.

        - Only works for a single sample of tensor.
        - If the tensor is uncompressed, this returns the bytes of the numpy array.
        - If the tensor is sample compressed, this returns the compressed bytes of the sample.
        - If the tensor is chunk compressed, this raises an error.

        Returns:
            bytes: The bytes of the tensor.

        Raises:
            ValueError: If the tensor has multiple samples.
        """
        if self.index.values[0].subscriptable() or len(self.index.values) > 1:
            raise ValueError("tobytes() can be used only on exatcly 1 sample.")
        self.check_link_ready()
        idx = self.index.values[0].value
        if self.is_link:
            return self.chunk_engine.get_hub_read_sample(idx).compressed_bytes  # type: ignore
        return self.chunk_engine.read_bytes_for_sample(idx)  # type: ignore

    def _pop(self):
        self.chunk_engine._pop()
        [self.dataset[link]._pop() for link in self.meta.links]

    def _append_to_links(self, sample, flat: Optional[bool]):
        for k, v in self.meta.links.items():
            if flat is None or v["flatten_sequence"] == flat:
                v = get_link_transform(v["append"])(sample, self.link_creds)
                tensor = Tensor(k, self.dataset)
                if (
                    isinstance(v, np.ndarray)
                    and tensor.dtype
                    and v.dtype != tensor.dtype
                ):
                    v = v.astype(tensor.dtype)  # bc
                tensor.append(v)

    def _update_links(
        self,
        global_sample_index: int,
        sub_index: Index,
        new_sample,
        flat: Optional[bool],
    ):
        for k, v in self.meta.links.items():
            if flat is None or v["flatten_sequence"] == flat:
                fname = v.get("update")
                if fname:
                    func = get_link_transform(fname)
                    tensor = Tensor(k, self.dataset)
                    val = func(
                        new_sample,
                        tensor[global_sample_index],
                        sub_index=sub_index,
                        partial=not sub_index.is_trivial(),
                        link_creds=self.link_creds,
                    )
                    if val is not _NO_LINK_UPDATE:
                        if (
                            isinstance(val, np.ndarray)
                            and tensor.dtype
                            and val.dtype != tensor.dtype
                        ):
                            val = val.astype(tensor.dtype)  # bc
                        tensor[global_sample_index] = val

    @property
    def _sample_info_tensor(self):
        return self.dataset._tensors().get(get_sample_info_tensor_key(self.key))

    @property
    def _sample_shape_tensor(self):
        return self.dataset._tensors().get(get_sample_shape_tensor_key(self.key))

    @property
    def _sample_id_tensor(self):
        return self.dataset._tensors().get(get_sample_id_tensor_key(self.key))

    def _sample_shape_provider(self, sample_shape_tensor) -> Callable:
        if self.is_sequence:

            def get_sample_shape(global_sample_index: int):
                seq_pos = slice(
                    *self.chunk_engine.sequence_encoder[global_sample_index]
                )
                idx = Index([IndexEntry(seq_pos)])
                shapes = sample_shape_tensor[idx].numpy()
                return shapes

        else:

            def get_sample_shape(global_sample_index: int):
                return tuple(sample_shape_tensor[global_sample_index].numpy().tolist())

        return get_sample_shape

    def _get_sample_info_at_index(self, global_sample_index: int, sample_info_tensor):
        if self.is_sequence:
            return [
                sample_info_tensor[i].data()
                for i in range(*self.chunk_engine.sequence_encoder[global_sample_index])
            ]
        return sample_info_tensor[global_sample_index].data()

    def _sample_info(self, index: Index):
        sample_info_tensor = self._sample_info_tensor
        if sample_info_tensor is None:
            return None
        if index.subscriptable_at(0):
            return list(
                map(
                    partial(
                        self._get_sample_info_at_index,
                        sample_info_tensor=sample_info_tensor,
                    ),
                    index.values[0].indices(self.num_samples),
                )
            )
        return self._get_sample_info_at_index(index.values[0].value, sample_info_tensor)  # type: ignore

    @property
    def sample_info(self):
        return self._sample_info(self.index)

    def _linked_sample(self):
        if not self.is_link:
            raise ValueError("Not supported as the tensor is not a link.")
        if self.index.values[0].subscriptable() or len(self.index.values) > 1:
            raise ValueError("_linked_sample can be used only on exatcly 1 sample.")
        return self.chunk_engine.linked_sample(self.index.values[0].value)

    def _get_video_stream_url(self):
        from hub.visualizer.video_streaming import get_video_stream_url

        return get_video_stream_url(self, self.index.values[0].value)

    def play(self):
        if get_compression_type(self.meta.sample_compression) != VIDEO_COMPRESSION:
            raise Exception("Only supported for video tensors.")
        if self.index.values[0].subscriptable():
            raise ValueError("Video streaming requires exactly 1 sample.")
        if len(self.index.values) > 1:
            warnings.warn(
                "Sub indexes to video sample will be ignored while streaming."
            )
        if is_colab():
            raise NotImplementedError("Video streaming is not supported on colab yet.")
        elif is_jupyter():
            return video_html(
                src=self._get_video_stream_url(),
                alt=f"{self.key}[{self.index.values[0].value}]",
            )
        else:
            webbrowser.open(self._get_video_stream_url())
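
Tensors are normally obtained from a dataset rather than constructed directly. A minimal sketch, assuming a new local dataset (the path, tensor name, and image file are placeholders):

import hub
import numpy as np

ds = hub.empty("path/to/new_dataset")  # hypothetical path
ds.create_tensor("images", htype="image", sample_compression="jpeg")

ds.images.append(hub.read("path/to/image.jpg"))          # hypothetical file
ds.images.append(np.zeros((28, 28, 3), dtype=np.uint8))  # raw arrays are also accepted
print(len(ds.images), ds.images.shape)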

Instance variables

var base_htype
Expand source code
@property
def base_htype(self):
    return self.meta.htype
var dtype
Expand source code
@property
def dtype(self) -> Optional[np.dtype]:
    if self.base_htype in ("json", "list"):
        return np.dtype(str)
    if self.meta.dtype:
        return np.dtype(self.meta.dtype)
    return None
var hidden
Expand source code
@property
def hidden(self) -> bool:
    return self.meta.hidden
var htype
Expand source code
@property
def htype(self):
    htype = self.meta.htype
    if self.is_sequence:
        htype = f"sequence[{htype}]"
    if self.is_link:
        htype = f"link[{htype}]"
    return htype
var info

Returns the information about the tensor.

Returns

TensorInfo
Information about the tensor.
Expand source code
@property
def info(self):
    """Returns the information about the tensor.

    Returns:
        TensorInfo: Information about the tensor.
    """
    commit_id = self.version_state["commit_id"]
    chunk_engine = self.chunk_engine
    if chunk_engine._info is None or chunk_engine._info_commit_id != commit_id:
        path = get_tensor_info_key(self.key, commit_id)
        chunk_engine._info = load_info(path, self.dataset, self.key)
        chunk_engine._info_commit_id = commit_id
        self.storage.register_hub_object(path, chunk_engine._info)
    return chunk_engine._info
var is_dynamic

Will return True if samples in this tensor have shapes that are unequal.

Expand source code
@property
def is_dynamic(self) -> bool:
    """Will return True if samples in this tensor have shapes that are unequal."""
    return self.shape_interval.is_dynamic
Expand source code
@property
def is_link(self):
    return self.meta.is_link
var is_sequence
Expand source code
@property
def is_sequence(self):
    return self.meta.is_sequence
var meta
Expand source code
@property
def meta(self):
    return self.chunk_engine.tensor_meta
var ndim
Expand source code
@property
def ndim(self) -> int:
    return self.chunk_engine.ndim(self.index)
var num_samples

Returns the length of the primary axis of the tensor. Ignores any applied indexing and returns the total length.

Expand source code
@property
def num_samples(self) -> int:
    """Returns the length of the primary axis of the tensor.
    Ignores any applied indexing and returns the total length.
    """
    if self.is_sequence:
        return self.chunk_engine._sequence_length
    return self.meta.length
var sample_info
Expand source code
@property
def sample_info(self):
    return self._sample_info(self.index)
var shape

Get the shape of this tensor. Length is included.

Note

If you don't want None in the output shape or want the lower/upper bound shapes, use tensor.shape_interval instead.

Example

>>> tensor.append(np.zeros((10, 10)))
>>> tensor.append(np.zeros((10, 15)))
>>> tensor.shape
(2, 10, None)

Returns

tuple
Tuple where each value is either None (if that axis is dynamic) or an int (if that axis is fixed).
Expand source code
@property
def shape(self) -> Tuple[Optional[int], ...]:
    """Get the shape of this tensor. Length is included.

    Note:
        If you don't want `None` in the output shape or want the lower/upper bound shapes,
        use `tensor.shape_interval` instead.

    Example:
        >>> tensor.append(np.zeros((10, 10)))
        >>> tensor.append(np.zeros((10, 15)))
        >>> tensor.shape
        (2, 10, None)

    Returns:
        tuple: Tuple where each value is either `None` (if that axis is dynamic) or
            an `int` (if that axis is fixed).
    """
    sample_shape_tensor = self._sample_shape_tensor
    sample_shape_provider = (
        self._sample_shape_provider(sample_shape_tensor)
        if sample_shape_tensor
        else None
    )
    if sample_shape_provider is None:
        self.check_link_ready()
    shape: Tuple[Optional[int], ...]
    shape = self.chunk_engine.shape(
        self.index, sample_shape_provider=sample_shape_provider
    )
    if not shape and self.meta.max_shape:
        shape = (0,) * len(self.meta.max_shape)
    if self.meta.max_shape == [0, 0, 0]:
        shape = ()
    return shape
var shape_interval

Returns a ShapeInterval object that describes this tensor's shape more accurately. Length is included.

Note

If you are expecting a tuple, use tensor.shape instead.

Example

>>> tensor.append(np.zeros((10, 10)))
>>> tensor.append(np.zeros((10, 15)))
>>> tensor.shape_interval
ShapeInterval(lower=(2, 10, 10), upper=(2, 10, 15))
>>> str(tensor.shape_interval)
(2, 10, 10:15)

Returns

ShapeInterval
Object containing lower and upper properties.
Expand source code
@property
def shape_interval(self) -> ShapeInterval:
    """Returns a `ShapeInterval` object that describes this tensor's shape more accurately. Length is included.

    Note:
        If you are expecting a `tuple`, use `tensor.shape` instead.

    Example:
        >>> tensor.append(np.zeros((10, 10)))
        >>> tensor.append(np.zeros((10, 15)))
        >>> tensor.shape_interval
        ShapeInterval(lower=(2, 10, 10), upper=(2, 10, 15))
        >>> str(tensor.shape_interval)
        (2, 10, 10:15)

    Returns:
        ShapeInterval: Object containing `lower` and `upper` properties.
    """
    return self.chunk_engine.shape_interval
var size
Expand source code
@property
def size(self) -> Optional[int]:
    s = 1
    for x in self.shape:
        if x is None:
            return None
        s *= x  # not using np.prod to avoid overflow
    return s
var verify
Expand source code
@property
def verify(self):
    return self.is_link and self.meta.verify

Methods

def append(self, sample)

Appends a single sample to the end of the tensor. Can be an array, scalar value, or the return value from read(), which can be used to load files. See examples down below.

Examples

Numpy input:

>>> len(tensor)
0
>>> tensor.append(np.zeros((28, 28, 1)))
>>> len(tensor)
1

File input:

>>> len(tensor)
0
>>> tensor.append(hub.read("path/to/file"))
>>> len(tensor)
1

Args

sample : InputSample
The data to append to the tensor. Sample is generated by read(). See the above examples.
Expand source code
@invalid_view_op
def append(self, sample: InputSample):
    """Appends a single sample to the end of the tensor. Can be an array, scalar value, or the return value from `hub.read`,
    which can be used to load files. See examples down below.

    Examples:
        Numpy input:

        >>> len(tensor)
        0
        >>> tensor.append(np.zeros((28, 28, 1)))
        >>> len(tensor)
        1

        File input:

        >>> len(tensor)
        0
        >>> tensor.append(hub.read("path/to/file"))
        >>> len(tensor)
        1

    Args:
        sample (InputSample): The data to append to the tensor. `Sample` is generated by `hub.read`. See the above examples.
    """
    self.extend([sample], progressbar=False)
Expand source code
def check_link_ready(self):
    if not self.is_link:
        return
    missing_keys = self.link_creds.missing_keys
    if missing_keys:
        raise ValueError(
            f"Not all credentials are populated for a tensor with linked data. Missing: {missing_keys}. Populate with `dataset.populate_creds(key, value)`."
        )
def clear(self)

Deletes all samples from the tensor

Expand source code
def clear(self):
    """Deletes all samples from the tensor"""
    self.chunk_engine.clear()
    sample_id_key = get_sample_id_tensor_key(self.key)
    try:
        sample_id_tensor = Tensor(sample_id_key, self.dataset)
        sample_id_tensor.chunk_engine.clear()
        self.meta.links.clear()
        self.meta.is_dirty = True
    except TensorDoesNotExistError:
        pass
def data(self, aslist=False)
Expand source code
def data(self, aslist: bool = False) -> Any:
    htype = self.base_htype
    if htype in ("json", "text"):

        if self.ndim == 1:
            return self.numpy()[0]
        else:
            return [sample[0] for sample in self.numpy(aslist=True)]
    elif htype == "list":
        if self.ndim == 1:
            return list(self.numpy())
        else:
            return list(map(list, self.numpy(aslist=True)))
    else:
        return self.numpy(aslist=aslist)
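
A short sketch, assuming a "captions" tensor with the "text" htype (placeholder names); data() unwraps the numpy output into plain Python values for json/text/list htypes:

import hub

ds = hub.dataset("path/to/dataset")  # hypothetical path with a "captions" text tensor
print(ds.captions[0].data())         # a single string
print(ds.captions[0:3].data())       # a list of strings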
def extend(self, samples, progressbar=False)

Extends the end of the tensor by appending multiple elements from a sequence. Accepts a sequence, a single batched numpy array, or a sequence of read() outputs, which can be used to load files. See examples down below.

Example

Numpy input:

>>> len(tensor)
0
>>> tensor.extend(np.zeros((100, 28, 28, 1)))
>>> len(tensor)
100

File input:

>>> len(tensor)
0
>>> tensor.extend([
        hub.read("path/to/image1"),
        hub.read("path/to/image2"),
    ])
>>> len(tensor)
2

Args

samples : np.ndarray, Sequence, Sequence[Sample]
The data to add to the tensor. The length should be equal to the number of samples to add.
progressbar : bool
Specifies whether a progressbar should be displayed while extending.

Raises

TensorDtypeMismatchError
Dtype for array must be equal to or castable to this tensor's dtype
Expand source code
@invalid_view_op
def extend(
    self,
    samples: Union[np.ndarray, Sequence[InputSample], "Tensor"],
    progressbar: bool = False,
):

    """Extends the end of the tensor by appending multiple elements from a sequence. Accepts a sequence, a single batched numpy array,
    or a sequence of `hub.read` outputs, which can be used to load files. See examples down below.

    Example:
        Numpy input:

        >>> len(tensor)
        0
        >>> tensor.extend(np.zeros((100, 28, 28, 1)))
        >>> len(tensor)
        100


        File input:

        >>> len(tensor)
        0
        >>> tensor.extend([
                hub.read("path/to/image1"),
                hub.read("path/to/image2"),
            ])
        >>> len(tensor)
        2


    Args:
        samples (np.ndarray, Sequence, Sequence[Sample]): The data to add to the tensor.
            The length should be equal to the number of samples to add.
        progressbar (bool): Specifies whether a progressbar should be displayed while extending.

    Raises:
        TensorDtypeMismatchError: Dtype for array must be equal to or castable to this tensor's dtype
    """
    self.check_link_ready()
    self._write_initialization()
    self.chunk_engine.extend(
        samples,
        progressbar=progressbar,
        link_callback=self._append_to_links if self.meta.links else None,
    )
def modified_samples(self, target_id=None, return_indexes=False)

Returns a slice of the tensor with only those elements that were modified/added. By default the modifications are calculated relative to the previous commit made, but this can be changed by providing a target id.

Args

target_id : str, optional
The commit id or branch name to calculate the modifications relative to. Defaults to None.
return_indexes : bool, optional
If True, returns the indexes of the modified elements. Defaults to False.

Returns

Tensor
A new tensor with only the modified elements if return_indexes is False.
Tuple[Tensor, List[int]]
A new tensor with only the modified elements and the indexes of the modified elements if return_indexes is True.

Raises

TensorModifiedError
If a target id is passed which is not an ancestor of the current commit.
Expand source code
def modified_samples(
    self, target_id: Optional[str] = None, return_indexes: Optional[bool] = False
):
    """Returns a slice of the tensor with only those elements that were modified/added.
    By default the modifications are calculated relative to the previous commit made, but this can be changed by providing a `target id`.

    Args:
        target_id (str, optional): The commit id or branch name to calculate the modifications relative to. Defaults to None.
        return_indexes (bool, optional): If True, returns the indexes of the modified elements. Defaults to False.

    Returns:
        Tensor: A new tensor with only the modified elements if `return_indexes` is False.
        Tuple[Tensor, List[int]]: A new tensor with only the modified elements and the indexes of the modified elements if `return_indexes` is True.

    Raises:
        TensorModifiedError: If a target id is passed which is not an ancestor of the current commit.
    """
    current_commit_id = self.version_state["commit_id"]
    indexes = get_modified_indexes(
        self.key,
        current_commit_id,
        target_id,
        self.version_state,
        self.storage,
    )
    tensor = self[indexes]
    if return_indexes:
        return tensor, indexes
    return tensor
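
A short sketch, assuming a dataset with a "labels" tensor (placeholder name), where an earlier commit is used as the comparison point:

import hub
import numpy as np

ds = hub.dataset("path/to/dataset")  # hypothetical path with a "labels" tensor
commit_id = ds.commit("checkpoint")  # remember a point to diff against

ds.labels.append(np.array(1))        # a later modification

# slice containing only the samples added/modified since `commit_id`
changed, indexes = ds.labels.modified_samples(target_id=commit_id, return_indexes=True)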
def numpy(self, aslist=False, fetch_chunks=False)

Computes the contents of the tensor in numpy format.

Args

aslist : bool
If True, a list of np.ndarrays will be returned. Helpful for dynamic tensors. If False, a single np.ndarray will be returned unless the samples are dynamically shaped, in which case an error is raised.
fetch_chunks : bool
If True, full chunks will be retrieved from the storage, otherwise only required bytes will be retrieved. This will always be True even if specified as False in the following cases: - The tensor is ChunkCompressed - The chunk which is being accessed has more than 128 samples.

Raises

DynamicTensorNumpyError
If reading a dynamically-shaped array slice without aslist=True.
ValueError
If the tensor is a link and the credentials are not populated.

Returns

A numpy array containing the data represented by this tensor, or a list of numpy arrays if aslist is True.

Expand source code
def numpy(
    self, aslist=False, fetch_chunks=False
) -> Union[np.ndarray, List[np.ndarray]]:
    """Computes the contents of the tensor in numpy format.

    Args:
        aslist (bool): If True, a list of np.ndarrays will be returned. Helpful for dynamic tensors.
            If False, a single np.ndarray will be returned unless the samples are dynamically shaped, in which case
            an error is raised.
        fetch_chunks (bool): If True, full chunks will be retrieved from the storage, otherwise only required bytes will be retrieved.
            This will always be True even if specified as False in the following cases:
            - The tensor is ChunkCompressed
            - The chunk which is being accessed has more than 128 samples.

    Raises:
        DynamicTensorNumpyError: If reading a dynamically-shaped array slice without `aslist=True`.
        ValueError: If the tensor is a link and the credentials are not populated.

    Returns:
        A numpy array containing the data represented by this tensor.
    """
    self.check_link_ready()
    return self.chunk_engine.numpy(
        self.index, aslist=aslist, fetch_chunks=fetch_chunks
    )
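
A short sketch contrasting the two modes; the tensor name and shapes are illustrative only:

>>> ds.images.numpy().shape                 # fixed-shape tensor: one stacked array
(100, 512, 512, 3)
>>> arrays = ds.images.numpy(aslist=True)   # list of per-sample arrays; required for dynamic shapes
>>> len(arrays)
100
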
def play(self)
Expand source code
def play(self):
    # Streaming is only implemented for video-compressed tensors.
    if get_compression_type(self.meta.sample_compression) != VIDEO_COMPRESSION:
        raise Exception("Only supported for video tensors.")
    # Exactly one sample must be selected, e.g. tensor[0].play().
    if self.index.values[0].subscriptable():
        raise ValueError("Video streaming requires exactly 1 sample.")
    if len(self.index.values) > 1:
        warnings.warn(
            "Sub indexes to video sample will be ignored while streaming."
        )
    if is_colab():
        raise NotImplementedError("Video streaming is not supported on colab yet.")
    elif is_jupyter():
        # Render an inline HTML video element pointing at the stream URL.
        return video_html(
            src=self._get_video_stream_url(),
            alt=f"{self.key}[{self.index.values[0].value}]",
        )
    else:
        # Otherwise open the stream in the default web browser.
        webbrowser.open(self._get_video_stream_url())
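
play() streams a single video sample: it renders an inline player in Jupyter, opens the default browser elsewhere, and raises on Colab or on non-video tensors. A hypothetical call, assuming a video tensor named videos:

>>> ds.videos[0].play()
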
def summary(self)
Expand source code
def summary(self):
    # Build a human-readable summary and print it after the tensor's repr.
    pretty_print = summary_tensor(self)

    print(self)
    print(pretty_print)
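
For example (tensor name assumed), this prints the tensor followed by a formatted summary of its properties:

>>> ds.images.summary()
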
def tobytes(self)

Returns the bytes of the tensor.

  • Only works for a single sample of a tensor.
  • If the tensor is uncompressed, this returns the bytes of the numpy array.
  • If the tensor is sample compressed, this returns the compressed bytes of the sample.
  • If the tensor is chunk compressed, this raises an error.

Returns

bytes
The bytes of the tensor.

Raises

ValueError
If the tensor has multiple samples.
Expand source code
def tobytes(self) -> bytes:
    """Returns the bytes of the tensor.

    - Only works for a single sample of tensor.
    - If the tensor is uncompressed, this returns the bytes of the numpy array.
    - If the tensor is sample compressed, this returns the compressed bytes of the sample.
    - If the tensor is chunk compressed, this raises an error.

    Returns:
        bytes: The bytes of the tensor.

    Raises:
        ValueError: If the tensor has multiple samples.
    """
    if self.index.values[0].subscriptable() or len(self.index.values) > 1:
        raise ValueError("tobytes() can be used only on exatcly 1 sample.")
    self.check_link_ready()
    idx = self.index.values[0].value
    if self.is_link:
        return self.chunk_engine.get_hub_read_sample(idx).compressed_bytes  # type: ignore
    return self.chunk_engine.read_bytes_for_sample(idx)  # type: ignore
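
A hedged sketch of writing one sample's raw bytes back to disk; the tensor name and its jpeg sample compression are assumptions:

>>> raw = ds.images[0].tobytes()   # compressed jpeg bytes for a sample-compressed tensor
>>> with open("sample0.jpg", "wb") as f:
...     f.write(raw)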