Package hub
Expand source code
import threading
from queue import Queue
from botocore.config import Config
import numpy as np
import multiprocessing
import sys
from hub.util.check_latest_version import warn_if_update_required
if sys.platform == "darwin":
    multiprocessing.set_start_method("fork", force=True)
__pdoc__ = {
"api": False,
"cli": False,
"client": False,
"constants": False,
"config": False,
"integrations": False,
"tests": False,
"Dataset.clear_cache": False,
"Dataset.flush": False,
"Dataset.read_only": False,
"Dataset.size_approx": False,
"Dataset.token": False,
"Dataset.num_samples": False,
}
from .api.dataset import dataset as api_dataset
from .api.read import read
from .api.link import link
from .api.tiled import tiled
from .core.dataset import Dataset
from .core.transform import compute, compose
from .core.tensor import Tensor
from .util.bugout_reporter import hub_reporter
from .compression import SUPPORTED_COMPRESSIONS
from .htype import HTYPE_CONFIGURATIONS
from .htype import htype
from .integrations import huggingface
compressions = list(SUPPORTED_COMPRESSIONS)
htypes = sorted(list(HTYPE_CONFIGURATIONS))
list = api_dataset.list
exists = api_dataset.exists
load = api_dataset.load
empty = api_dataset.empty
like = api_dataset.like
delete = api_dataset.delete
rename = api_dataset.rename
copy = api_dataset.copy
deepcopy = api_dataset.deepcopy
ingest = api_dataset.ingest
ingest_kaggle = api_dataset.ingest_kaggle
ingest_dataframe = api_dataset.ingest_dataframe
ingest_huggingface = huggingface.ingest_huggingface
dataset = api_dataset.init
tensor = Tensor
__all__ = [
"tensor",
"read",
"link",
"__version__",
"load",
"empty",
"exists",
"compute",
"compose",
"copy",
"dataset",
"Dataset",
"deepcopy",
"like",
"list",
"ingest",
"ingest_kaggle",
"ingest_huggingface",
"compressions",
"htypes",
"config",
"delete",
"copy",
"rename",
]
__version__ = "2.7.5"
warn_if_update_required(__version__)
__encoded_version__ = np.array(__version__)
config = {"s3": Config(max_pool_connections=50, connect_timeout=300, read_timeout=300)}
hub_reporter.tags.append(f"version:{__version__}")
hub_reporter.system_report(publish=True)
hub_reporter.setup_excepthook(publish=True)
event_queue: Queue = Queue()
def send_event():
    while True:
        try:
            event = event_queue.get()
            client, event_dict = event
            client.send_event(event_dict)
        except Exception:
            pass

threading.Thread(target=send_event, daemon=True).start()
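The aliases defined in the source above (load, empty, dataset, read, and so on) form the package's public API. A minimal usage sketch, assuming the usual Dataset.create_tensor and tensor.append methods documented elsewhere in this reference; the local path and tensor name are hypothetical::

    import hub
    import numpy as np

    # Create a new local dataset, add a tensor and append a few samples.
    ds = hub.empty("./quickstart_ds", overwrite=True)
    ds.create_tensor("values", htype="generic")
    for i in range(3):
        ds.values.append(np.arange(i, i + 10))

    # Reopen it later with hub.load.
    ds2 = hub.load("./quickstart_ds")
    print(len(ds2.values))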
Sub-modules
hub.auto
hub.compression
hub.core
hub.htype
    "htype" is the class of a tensor: image, bounding box, generic tensor, etc …
hub.util
hub.visualizer
Functions
def compose(functions)
Takes a list of functions decorated using hub.compute and creates a pipeline that can be evaluated using .eval.
Example::

    pipeline = hub.compose([my_fn(a=3), another_function(b=2)])
    pipeline.eval(data_in, ds_out, scheduler="processed", num_workers=2)

The eval method evaluates the pipeline/transform function. It has the following arguments:

- data_in: Input passed to the transform to generate the output dataset. It should support __getitem__ and __len__. This can be a Hub dataset.
- ds_out (Dataset, optional): The dataset object to which the transform will be written. If this is not provided, data_in will be overwritten if it is a Hub dataset; otherwise an error will be raised.
    - It should already contain all keys generated in the output as tensors.
    - Its initial state should be either:
        - Empty, i.e. all tensors have no samples. In this case all samples are added to the dataset.
        - All tensors populated and of the same length. In this case new samples are appended to the dataset.
- num_workers (int): The number of workers to use for performing the transform. Defaults to 0. When set to 0, serial processing is always used, irrespective of the scheduler.
- scheduler (str): The scheduler to be used to compute the transformation. Supported values include: 'serial', 'threaded', 'processed' and 'ray'. Defaults to 'threaded'.
- progressbar (bool): Displays a progress bar if True (default).
- skip_ok (bool): If True, skips the check for output tensors generated. This allows the user to skip certain tensors in the function definition, which is especially useful for in-place transformations in which certain tensors are not modified. Defaults to False.

It raises the following errors:

- InvalidInputDataError: If data_in passed to the transform is invalid. It should support __getitem__ and __len__ operations. Using a scheduler other than "threaded" with a Hub dataset whose base storage is memory as data_in will also raise this.
- InvalidOutputDatasetError: If all the tensors of ds_out passed to the transform don't have the same length. Using a scheduler other than "threaded" with a Hub dataset whose base storage is memory as ds_out will also raise this.
- TensorMismatchError: If one or more of the outputs generated during the transform contain different tensors than the ones present in the ds_out provided to the transform.
- UnsupportedSchedulerError: If the scheduler passed is not recognized. Supported values include: 'serial', 'threaded', 'processed' and 'ray'.
- TransformError: All other exceptions raised if there are problems while running the pipeline.
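For concreteness, a hedged end-to-end sketch: a single hub.compute function is wrapped in a pipeline and evaluated over a plain Python list into an output dataset. The tensor name "values", the local path, and the create_tensor call are assumptions used for illustration::

    import hub
    import numpy as np

    @hub.compute
    def add_noise(sample_in, samples_out, scale=0.1):
        # Append one output sample to the (hypothetical) "values" tensor.
        samples_out.values.append(np.asarray(sample_in) + scale * np.random.randn(3))

    ds_out = hub.empty("./noisy_ds", overwrite=True)
    ds_out.create_tensor("values")

    data_in = [np.zeros(3) for _ in range(100)]  # anything with __getitem__ and __len__
    pipeline = hub.compose([add_noise(scale=0.5)])
    pipeline.eval(data_in, ds_out, scheduler="threaded", num_workers=2)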
Expand source code
def compose(functions: List[ComputeFunction]):  # noqa: DAR101, DAR102, DAR201, DAR401
    """Takes a list of functions decorated using hub.compute and creates a pipeline that can be evaluated using .eval."""
    if not functions:
        raise HubComposeEmptyListError
    for index, fn in enumerate(functions):
        if not isinstance(fn, ComputeFunction):
            raise HubComposeIncompatibleFunction(index)
    return Pipeline(functions)
def compute(fn, name=None)
Compute is a decorator for functions.
The decorated function should have at least 2 arguments; the first two correspond to sample_in and samples_out. There can be as many other arguments as required.
The output should be appended/extended to the second argument in a hub-like syntax.
Any value returned by the function will be ignored.
Example::

    @hub.compute
    def my_fn(sample_in: Any, samples_out, my_arg0, my_arg1=0):
        samples_out.my_tensor.append(my_arg0 * my_arg1)

    # This transform can be used using the eval method in one of these 2 ways:
    # Directly evaluating the method
    # here arg0 and arg1 correspond to the 3rd and 4th argument in my_fn
    my_fn(arg0, arg1).eval(data_in, ds_out, scheduler="threaded", num_workers=5)

    # As a part of a Transform pipeline containing other functions
    pipeline = hub.compose([my_fn(a, b), another_function(x=2)])
    pipeline.eval(data_in, ds_out, scheduler="processed", num_workers=2)

The eval method evaluates the pipeline/transform function. It has the following arguments:

- data_in: Input passed to the transform to generate the output dataset. It should support __getitem__ and __len__. This can be a Hub dataset.
- ds_out (Dataset, optional): The dataset object to which the transform will be written. If this is not provided, data_in will be overwritten if it is a Hub dataset; otherwise an error will be raised.
    - It should already contain all keys generated in the output as tensors.
    - Its initial state should be either:
        - Empty, i.e. all tensors have no samples. In this case all samples are added to the dataset.
        - All tensors populated and of the same length. In this case new samples are appended to the dataset.
- num_workers (int): The number of workers to use for performing the transform. Defaults to 0. When set to 0, serial processing is always used, irrespective of the scheduler.
- scheduler (str): The scheduler to be used to compute the transformation. Supported values include: 'serial', 'threaded', 'processed' and 'ray'. Defaults to 'threaded'.
- progressbar (bool): Displays a progress bar if True (default).
- skip_ok (bool): If True, skips the check for output tensors generated. This allows the user to skip certain tensors in the function definition, which is especially useful for in-place transformations in which certain tensors are not modified. Defaults to False.

It raises the following errors:

- InvalidInputDataError: If data_in passed to the transform is invalid. It should support __getitem__ and __len__ operations. Using a scheduler other than "threaded" with a Hub dataset whose base storage is memory as data_in will also raise this.
- InvalidOutputDatasetError: If all the tensors of ds_out passed to the transform don't have the same length. Using a scheduler other than "threaded" with a Hub dataset whose base storage is memory as ds_out will also raise this.
- TensorMismatchError: If one or more of the outputs generated during the transform contain different tensors than the ones present in the ds_out provided to the transform.
- UnsupportedSchedulerError: If the scheduler passed is not recognized. Supported values include: 'serial', 'threaded', 'processed' and 'ray'.
- TransformError: All other exceptions raised if there are problems while running the pipeline.
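A hedged sketch of the direct-eval path (without hub.compose). The tensor name "lengths", the local path, and the create_tensor call are assumptions for illustration::

    import hub

    @hub.compute
    def record_length(sample_in, samples_out, offset=0):
        # Append one value per input sample; the return value is ignored.
        samples_out.lengths.append(len(sample_in) + offset)

    ds_out = hub.empty("./lengths_ds", overwrite=True)
    ds_out.create_tensor("lengths")

    data_in = [[1, 2, 3], [4, 5], [6]]  # supports __getitem__ and __len__
    record_length(offset=1).eval(data_in, ds_out, scheduler="threaded", num_workers=2)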
Expand source code
def compute(
    fn,
    name: Optional[str] = None,
) -> Callable[..., ComputeFunction]:  # noqa: DAR101, DAR102, DAR201, DAR401
    """Compute is a decorator for functions."""

    def inner(*args, **kwargs):
        return ComputeFunction(fn, args, kwargs, name)

    return inner
def copy(src, dest, tensors=None, overwrite=False, src_creds=None, src_token=None, dest_creds=None, dest_token=None, num_workers=0, scheduler='threaded', progressbar=True)
Copies this dataset at src to dest. Version control history is not included.

Args

src : Union[str, Dataset, pathlib.Path]
    The Dataset or the path to the dataset to be copied.
dest : str, pathlib.Path
    Destination path to copy to.
tensors : List[str], optional
    Names of tensors (and groups) to be copied. If not specified, all tensors are copied.
overwrite : bool
    If True and a dataset exists at the destination, it will be overwritten. Defaults to False.
src_creds : dict, optional
    A dictionary containing credentials used to access the dataset at src.
    - If 'aws_access_key_id', 'aws_secret_access_key', 'aws_session_token' are present, these take precedence over credentials present in the environment or in a credentials file. Currently only works with s3 paths.
    - It supports 'aws_access_key_id', 'aws_secret_access_key', 'aws_session_token', 'endpoint_url', 'aws_region', 'profile_name' as keys.
src_token : str, optional
    Activeloop token, used for fetching credentials to the dataset at src if it is a Hub dataset. This is optional; tokens are normally autogenerated.
dest_creds : dict, optional
    Credentials required to create / overwrite datasets at dest.
dest_token : str, optional
    Token used for fetching credentials to dest.
num_workers : int
    The number of workers to use for copying. Defaults to 0. When set to 0, serial processing is always used, irrespective of the scheduler.
scheduler : str
    The scheduler to be used for copying. Supported values include: 'serial', 'threaded', 'processed' and 'ray'. Defaults to 'threaded'.
progressbar : bool
    Displays a progress bar if True (default).

Returns

Dataset
    New dataset object.

Raises

DatasetHandlerError
    If a dataset already exists at the destination path and overwrite is False.
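A hedged sketch of copying only selected tensors between two hypothetical local paths (the tensor names are placeholders)::

    import hub

    new_ds = hub.copy(
        "./source_ds",                      # or an already-loaded Dataset object
        "./backup_ds",
        tensors=["images", "labels"],       # hypothetical tensor names; omit to copy everything
        overwrite=True,
        num_workers=2,
        scheduler="threaded",
    )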
Expand source code
@staticmethod
def copy(
    src: Union[str, pathlib.Path, Dataset],
    dest: Union[str, pathlib.Path],
    tensors: Optional[List[str]] = None,
    overwrite: bool = False,
    src_creds=None,
    src_token=None,
    dest_creds=None,
    dest_token=None,
    num_workers: int = 0,
    scheduler="threaded",
    progressbar=True,
):
    """Copies this dataset at `src` to `dest`. Version control history is not included."""
    if isinstance(src, (str, pathlib.Path)):
        src = convert_pathlib_to_string_if_needed(src)
        src_ds = hub.load(src, read_only=True, creds=src_creds, token=src_token)
    else:
        src_ds = src
        src_ds.path = str(src_ds.path)
    dest = convert_pathlib_to_string_if_needed(dest)
    return src_ds.copy(
        dest,
        tensors=tensors,
        overwrite=overwrite,
        creds=dest_creds,
        token=dest_token,
        num_workers=num_workers,
        scheduler=scheduler,
        progressbar=progressbar,
    )
def dataset(path, read_only=None, overwrite=False, public=False, memory_cache_size=256, local_cache_size=0, creds=None, token=None, verbose=True, access_method='stream')
Returns a Dataset object referencing either a new or existing dataset.

Important

Using overwrite will delete all of your data if it exists! Be very careful when setting this parameter.

Examples

    ds = hub.dataset("hub://username/dataset")
    ds = hub.dataset("s3://mybucket/my_dataset")
    ds = hub.dataset("./datasets/my_dataset", overwrite=True)

Args

path : str, pathlib.Path
    The full path to the dataset. Can be:
    - a Hub cloud path of the form hub://username/datasetname. To write to Hub cloud datasets, ensure that you are logged in to Hub (use 'activeloop login' from the command line).
    - an s3 path of the form s3://bucketname/path/to/dataset. Credentials are required either in the environment or passed to the creds argument.
    - a local file system path of the form ./path/to/dataset or ~/path/to/dataset or path/to/dataset.
    - a memory path of the form mem://path/to/dataset which doesn't save the dataset but keeps it in memory instead. Should be used only for testing as it does not persist.
read_only : bool, optional
    Opens the dataset in read-only mode if this is passed as True. Defaults to False. Datasets stored on Hub cloud that your account does not have write access to will automatically open in read mode.
overwrite : bool
    WARNING: If set to True this overwrites the dataset if it already exists. This can NOT be undone! Defaults to False.
public : bool
    Defines if the dataset will have public access. Applicable only if Hub cloud storage is used and a new Dataset is being created. Defaults to True.
memory_cache_size : int
    The size of the memory cache to be used in MB.
local_cache_size : int
    The size of the local filesystem cache to be used in MB.
creds : dict, optional
    A dictionary containing credentials used to access the dataset at the path.
    - If aws_access_key_id, aws_secret_access_key, aws_session_token are present, these take precedence over credentials present in the environment or in a credentials file. Currently only works with s3 paths.
    - It supports 'aws_access_key_id', 'aws_secret_access_key', 'aws_session_token', 'endpoint_url', 'aws_region', 'profile_name' as keys.
token : str, optional
    Activeloop token, used for fetching credentials to the dataset at the path if it is a Hub dataset. This is optional; tokens are normally autogenerated.
verbose : bool
    If True, logs will be printed. Defaults to True.
access_method : str
    The access method to use for the dataset. Can be:
    - 'stream': Streams the data from the dataset, i.e. only fetches data when required. This is the default value.
    - 'download': Downloads the data to the local filesystem at the path specified in the environment variable HUB_DOWNLOAD_PATH.
        - Raises an exception if the environment variable is not set, or if the path is not empty.
        - Will also raise an exception if the dataset does not exist.
        - The 'download' access method can also be modified to specify num_workers and/or a scheduler. For example, 'download:2:processed' will use 2 workers and the processed scheduler, 'download:3' will use 3 workers and the default scheduler (threaded), and 'download:processed' will use a single worker and the processed scheduler.
    - 'local': Used when the download was already done in a previous run.
        - Doesn't download the data again.
        - Raises an exception if the HUB_DOWNLOAD_PATH environment variable is not set or the dataset is not found in HUB_DOWNLOAD_PATH.
    Note: Any changes made to the dataset in download/local mode will only be made to the local copy and will not be reflected in the original dataset.

Returns

Dataset object created using the arguments provided.

Raises

AgreementError
    When an agreement is rejected.
UserNotLoggedInException
    When the user is not logged in.
InvalidTokenException
    If the specified token is invalid.
TokenPermissionError
    When there are permission or other errors related to the token.
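A hedged sketch combining explicit S3 credentials with the download access method; the bucket, path, and credential values are placeholders, and HUB_DOWNLOAD_PATH must point to an empty directory::

    import hub

    ds = hub.dataset(
        "s3://my-bucket/my_dataset",            # hypothetical bucket/path
        creds={
            "aws_access_key_id": "...",
            "aws_secret_access_key": "...",
        },
        read_only=True,
        access_method="download:2:processed",   # 2 workers, processed scheduler
    )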
Expand source code
@staticmethod
def init(
    path: Union[str, pathlib.Path],
    read_only: Optional[bool] = None,
    overwrite: bool = False,
    public: bool = False,
    memory_cache_size: int = DEFAULT_MEMORY_CACHE_SIZE,
    local_cache_size: int = DEFAULT_LOCAL_CACHE_SIZE,
    creds: Optional[Union[Dict, str]] = None,
    token: Optional[str] = None,
    verbose: bool = True,
    access_method: str = "stream",
):
    """Returns a Dataset object referencing either a new or existing dataset."""
    access_method, num_workers, scheduler = parse_access_method(access_method)
    check_access_method(access_method, overwrite)
    path = convert_pathlib_to_string_if_needed(path)
    if creds is None:
        creds = {}
    try:
        storage, cache_chain = get_storage_and_cache_chain(
            path=path,
            read_only=read_only,
            creds=creds,
            token=token,
            memory_cache_size=memory_cache_size,
            local_cache_size=local_cache_size,
        )
        feature_report_path(path, "dataset", {"Overwrite": overwrite}, token=token)
    except Exception as e:
        if isinstance(e, UserNotLoggedInException):
            message = (
                f"Please log in through the CLI in order to create this dataset, "
                "or create an API token in the UI and pass it to this method using "
                "the ‘token’ parameter. The CLI commands are ‘activeloop login’ and "
                "‘activeloop register’."
            )
            raise UserNotLoggedInException(message)
        elif isinstance(e, TokenPermissionError):
            message = (
                f"You can not load or create this dataset. You do not have sufficient "
                f"permissions. Please make sure that you have sufficient permissions "
                f"to the path provided."
            )
            raise TokenPermissionError(message)
        raise
    ds_exists = dataset_exists(cache_chain)
    if overwrite and ds_exists:
        cache_chain.clear()
    try:
        if access_method == "stream":
            return dataset_factory(
                path=path,
                storage=cache_chain,
                read_only=read_only,
                public=public,
                token=token,
                verbose=verbose,
            )
        return get_local_dataset(
            access_method=access_method,
            path=path,
            read_only=read_only,
            memory_cache_size=memory_cache_size,
            local_cache_size=local_cache_size,
            creds=creds,
            token=token,
            verbose=verbose,
            ds_exists=ds_exists,
            num_workers=num_workers,
            scheduler=scheduler,
        )
    except AgreementError as e:
        raise e from None
def deepcopy(src, dest, tensors=None, overwrite=False, src_creds=None, src_token=None, dest_creds=None, dest_token=None, num_workers=0, scheduler='threaded', progressbar=True, public=False, verbose=True)
Copies the dataset at src to dest, including version control history.

Args

src : str, pathlib.Path
    Path to the dataset to be copied.
dest : str, pathlib.Path
    Destination path to copy to.
tensors : List[str], optional
    Names of tensors (and groups) to be copied. If not specified, all tensors are copied.
overwrite : bool
    If True and a dataset exists at the destination, it will be overwritten. Defaults to False.
src_creds : dict, optional
    A dictionary containing credentials used to access the dataset at src.
    - If 'aws_access_key_id', 'aws_secret_access_key', 'aws_session_token' are present, these take precedence over credentials present in the environment or in a credentials file. Currently only works with s3 paths.
    - It supports 'aws_access_key_id', 'aws_secret_access_key', 'aws_session_token', 'endpoint_url', 'aws_region', 'profile_name' as keys.
src_token : str, optional
    Activeloop token, used for fetching credentials to the dataset at src if it is a Hub dataset. This is optional; tokens are normally autogenerated.
dest_creds : dict, optional
    Credentials required to create / overwrite datasets at dest.
dest_token : str, optional
    Token used for fetching credentials to dest.
num_workers : int
    The number of workers to use for copying. Defaults to 0. When set to 0, serial processing is always used, irrespective of the scheduler.
scheduler : str
    The scheduler to be used for copying. Supported values include: 'serial', 'threaded', 'processed' and 'ray'. Defaults to 'threaded'.
progressbar : bool
    Displays a progress bar if True (default).
public : bool
    Defines if the dataset will have public access. Applicable only if Hub cloud storage is used and a new Dataset is being created. Defaults to False.
verbose : bool
    If True, logs will be printed. Defaults to True.

Returns

Dataset
    New dataset object.

Raises

DatasetHandlerError
    If a dataset already exists at the destination path and overwrite is False.
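Since deepcopy, unlike hub.copy, preserves version control history, a hedged sketch of backing up a Hub cloud dataset to a hypothetical S3 bucket (all paths and credential values are placeholders)::

    import hub

    hub.deepcopy(
        "hub://username/my_dataset",            # hypothetical source
        "s3://my-bucket/my_dataset_backup",
        dest_creds={"aws_access_key_id": "...", "aws_secret_access_key": "..."},
        overwrite=False,
        num_workers=4,
        scheduler="processed",
        progressbar=True,
    )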
Expand source code
@staticmethod
def deepcopy(
    src: Union[str, pathlib.Path],
    dest: Union[str, pathlib.Path],
    tensors: Optional[List[str]] = None,
    overwrite: bool = False,
    src_creds=None,
    src_token=None,
    dest_creds=None,
    dest_token=None,
    num_workers: int = 0,
    scheduler="threaded",
    progressbar=True,
    public: bool = False,
    verbose: bool = True,
):
    """Copies dataset at `src` to `dest` including version control history."""
    src = convert_pathlib_to_string_if_needed(src)
    dest = convert_pathlib_to_string_if_needed(dest)
    report_params = {
        "Overwrite": overwrite,
        "Num_Workers": num_workers,
        "Scheduler": scheduler,
        "Progressbar": progressbar,
        "Public": public,
    }
    if dest.startswith("hub://"):
        report_params["Dest"] = dest
    feature_report_path(src, "deepcopy", report_params, token=dest_token)
    src_ds = hub.load(
        src, read_only=True, creds=src_creds, token=src_token, verbose=False
    )
    src_storage = get_base_storage(src_ds.storage)
    dest_storage, cache_chain = get_storage_and_cache_chain(
        dest,
        creds=dest_creds,
        token=dest_token,
        read_only=False,
        memory_cache_size=DEFAULT_MEMORY_CACHE_SIZE,
        local_cache_size=DEFAULT_LOCAL_CACHE_SIZE,
    )
    if dataset_exists(cache_chain):
        if overwrite:
            cache_chain.clear()
        else:
            raise DatasetHandlerError(
                f"A dataset already exists at the given path ({dest}). If you want to copy to a new dataset, either specify another path or use overwrite=True."
            )
    metas: Dict[str, DatasetMeta] = {}

    def copy_func(keys, progress_callback=None):
        cache = generate_chain(
            src_storage,
            memory_cache_size=DEFAULT_MEMORY_CACHE_SIZE,
            local_cache_size=DEFAULT_LOCAL_CACHE_SIZE,
            path=src,
        )
        for key in keys:
            val = metas.get(key) or cache[key]
            if isinstance(val, HubMemoryObject):
                dest_storage[key] = val.tobytes()
            else:
                dest_storage[key] = val
            if progress_callback:
                progress_callback(1)

    def copy_func_with_progress_bar(pg_callback, keys):
        copy_func(keys, pg_callback)

    keys = src_storage._all_keys()
    if tensors is not None:
        required_tensors = src_ds._resolve_tensor_list(tensors)
        for t in required_tensors[:]:
            required_tensors.extend(src_ds[t].meta.links)
        required_tensor_paths = set(
            src_ds.meta.tensor_names[t] for t in required_tensors
        )
        all_tensors_in_src = src_ds._tensors()
        all_tensor_paths_in_src = [
            src_ds.meta.tensor_names[t] for t in all_tensors_in_src
        ]
        tensor_paths_to_exclude = [
            t for t in all_tensor_paths_in_src if t not in required_tensor_paths
        ]

        def fltr(k):
            for t in tensor_paths_to_exclude:
                if k.startswith(t + "/") or "/" + t + "/" in k:
                    return False
            return True

        def keep_group(g):
            for t in tensors:
                if t == g or t.startswith(g + "/"):
                    return True
            return False

        def process_meta(k):
            if posixpath.basename(k) == DATASET_META_FILENAME:
                meta = DatasetMeta.frombuffer(src_storage[k])
                if not meta.tensor_names:  # backward compatibility
                    meta.tensor_names = {t: t for t in meta.tensors}
                meta.tensors = list(
                    filter(
                        lambda t: meta.tensor_names[t] in required_tensor_paths,
                        meta.tensors,
                    )
                )
                meta.hidden_tensors = list(
                    filter(lambda t: t in meta.tensors, meta.hidden_tensors)
                )
                meta.groups = list(filter(keep_group, meta.groups))
                meta.tensor_names = {
                    k: v for k, v in meta.tensor_names.items() if k in meta.tensors
                }
                metas[k] = meta
            return k

        keys = filter(fltr, map(process_meta, keys))
    keys = list(keys)
    if tensors:
        assert metas
    len_keys = len(keys)
    if num_workers == 0:
        keys = [keys]
    else:
        keys = [keys[i::num_workers] for i in range(num_workers)]
    compute_provider = get_compute_provider(scheduler, num_workers)
    try:
        if progressbar:
            compute_provider.map_with_progressbar(
                copy_func_with_progress_bar,
                keys,
                len_keys,
                "Copying dataset",
            )
        else:
            compute_provider.map(copy_func, keys)
    finally:
        compute_provider.close()
    ret = dataset_factory(
        path=dest,
        storage=cache_chain,
        read_only=False,
        public=public,
        token=dest_token,
        verbose=verbose,
    )
    ret._register_dataset()
    return ret
def delete(path, force=False, large_ok=False, creds=None, token=None, verbose=False)
Deletes a dataset at a given path.
This is an IRREVERSIBLE operation. Data once deleted can not be recovered.

Args

path : str, pathlib.Path
    The path to the dataset to be deleted.
force : bool
    Delete data regardless of whether it looks like a hub dataset. All data at the path will be removed.
large_ok : bool
    Delete datasets larger than 1 GB. Disabled by default.
creds : dict, optional
    A dictionary containing credentials used to access the dataset at the path.
    - If aws_access_key_id, aws_secret_access_key, aws_session_token are present, these take precedence over credentials present in the environment or in a credentials file. Currently only works with s3 paths.
    - It supports 'aws_access_key_id', 'aws_secret_access_key', 'aws_session_token', 'endpoint_url', 'aws_region', 'profile_name' as keys.
token : str, optional
    Activeloop token, used for fetching credentials to the dataset at the path if it is a Hub dataset. This is optional; tokens are normally autogenerated.
verbose : bool
    If True, logs will be printed. Defaults to False.

Raises

DatasetHandlerError
    If a Dataset does not exist at the given path and force is False.
NotImplementedError
    When attempting to delete a managed view.
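A hedged sketch; the path is hypothetical and the operation cannot be undone::

    import hub

    if hub.exists("./scratch_ds"):
        hub.delete("./scratch_ds", large_ok=True)   # irreversible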
Expand source code
@staticmethod
def delete(
    path: Union[str, pathlib.Path],
    force: bool = False,
    large_ok: bool = False,
    creds: Optional[dict] = None,
    token: Optional[str] = None,
    verbose: bool = False,
) -> None:
    """Deletes a dataset at a given path."""
    path = convert_pathlib_to_string_if_needed(path)
    if creds is None:
        creds = {}
    feature_report_path(
        path, "delete", {"Force": force, "Large_OK": large_ok}, token=token
    )
    try:
        qtokens = ["/.queries/", "\\.queries\\"]
        for qt in qtokens:
            if qt in path:
                raise NotImplementedError(
                    "Deleting managed views by path is not supported. Load the source dataset and do `ds.delete_view(id)` instead."
                )
        ds = hub.load(path, verbose=False, token=token, creds=creds)
        ds.delete(large_ok=large_ok)
        if verbose:
            logger.info(f"{path} dataset deleted successfully.")
    except Exception as e:
        if force:
            base_storage = storage_provider_from_path(
                path=path,
                creds=creds,
                read_only=False,
                token=token,
            )
            base_storage.clear()
            remove_path_from_backend(path, token)
            if verbose:
                logger.info(f"{path} folder deleted successfully.")
        else:
            if isinstance(e, (DatasetHandlerError, PathNotEmptyException)):
                raise DatasetHandlerError(
                    "A Hub dataset wasn't found at the specified path. "
                    "This may be due to a corrupt dataset or a wrong path. "
                    "If you want to delete the data at the path regardless, use force=True"
                )
            raise
def empty(path, overwrite=False, public=False, memory_cache_size=256, local_cache_size=0, creds=None, token=None, verbose=True)
Creates an empty dataset.

Important

Using overwrite will delete all of your data if it exists! Be very careful when setting this parameter.

Args

path : str, pathlib.Path
    The full path to the dataset. Can be:
    - a Hub cloud path of the form hub://username/datasetname. To write to Hub cloud datasets, ensure that you are logged in to Hub (use 'activeloop login' from the command line).
    - an s3 path of the form s3://bucketname/path/to/dataset. Credentials are required either in the environment or passed to the creds argument.
    - a local file system path of the form ./path/to/dataset or ~/path/to/dataset or path/to/dataset.
    - a memory path of the form mem://path/to/dataset which doesn't save the dataset but keeps it in memory instead. Should be used only for testing as it does not persist.
overwrite : bool
    WARNING: If set to True this overwrites the dataset if it already exists. This can NOT be undone! Defaults to False.
public : bool
    Defines if the dataset will have public access. Applicable only if Hub cloud storage is used and a new Dataset is being created. Defaults to False.
memory_cache_size : int
    The size of the memory cache to be used in MB.
local_cache_size : int
    The size of the local filesystem cache to be used in MB.
creds : dict, optional
    A dictionary containing credentials used to access the dataset at the path.
    - If aws_access_key_id, aws_secret_access_key, aws_session_token are present, these take precedence over credentials present in the environment or in a credentials file. Currently only works with s3 paths.
    - It supports 'aws_access_key_id', 'aws_secret_access_key', 'aws_session_token', 'endpoint_url', 'aws_region', 'profile_name' as keys.
token : str, optional
    Activeloop token, used for fetching credentials to the dataset at the path if it is a Hub dataset. This is optional; tokens are normally autogenerated.
verbose : bool
    If True, logs will be printed. Defaults to True.

Returns

Dataset object created using the arguments provided.

Raises

DatasetHandlerError
    If a Dataset already exists at the given path and overwrite is False.
UserNotLoggedInException
    When the user is not logged in.
InvalidTokenException
    If the specified token is invalid.
TokenPermissionError
    When there are permission or other errors related to the token.
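A hedged sketch of creating an empty dataset and populating it. The tensor names, the local paths, and the create_tensor call (the usual Dataset method, not documented on this page) are assumptions::

    import hub
    import numpy as np

    ds = hub.empty("./my_empty_ds", overwrite=True)
    ds.create_tensor("images", htype="image", sample_compression="jpeg")
    ds.create_tensor("labels", htype="class_label")

    ds.images.append(hub.read("./cat.jpeg"))   # hypothetical local image file
    ds.labels.append(np.uint32(0))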
Expand source code
@staticmethod
def empty(
    path: Union[str, pathlib.Path],
    overwrite: bool = False,
    public: bool = False,
    memory_cache_size: int = DEFAULT_MEMORY_CACHE_SIZE,
    local_cache_size: int = DEFAULT_LOCAL_CACHE_SIZE,
    creds: Optional[dict] = None,
    token: Optional[str] = None,
    verbose: bool = True,
) -> Dataset:
    """Creates an empty dataset."""
    path = convert_pathlib_to_string_if_needed(path)
    if creds is None:
        creds = {}
    try:
        storage, cache_chain = get_storage_and_cache_chain(
            path=path,
            read_only=False,
            creds=creds,
            token=token,
            memory_cache_size=memory_cache_size,
            local_cache_size=local_cache_size,
        )
        feature_report_path(path, "empty", {"Overwrite": overwrite}, token=token)
    except Exception as e:
        if isinstance(e, UserNotLoggedInException):
            message = (
                f"Please log in through the CLI in order to create this dataset, "
                f"or create an API token in the UI and pass it to this method using the "
                f"‘token’ parameter. The CLI commands are ‘activeloop login’ and ‘activeloop register’."
            )
            raise UserNotLoggedInException(message)
        elif isinstance(e, TokenPermissionError):
            message = (
                "You do not have sufficient permissions to create a dataset at the specified path. "
                "Please make sure that you have write access to the path provided."
            )
            raise TokenPermissionError(message)
        raise
    if overwrite and dataset_exists(cache_chain):
        cache_chain.clear()
    elif dataset_exists(cache_chain):
        raise DatasetHandlerError(
            f"A dataset already exists at the given path ({path}). If you want to create"
            f" a new empty dataset, either specify another path or use overwrite=True. "
            f"If you want to load the dataset that exists at this path, use hub.load() instead."
        )
    read_only = storage.read_only
    return dataset_factory(
        path=path,
        storage=cache_chain,
        read_only=read_only,
        public=public,
        token=token,
        verbose=verbose,
    )
def exists(path, creds=None, token=None)
Checks if a dataset exists at the given path.

Args

path : str, pathlib.Path
    The path which needs to be checked.
creds : dict, optional
    A dictionary containing credentials used to access the dataset at the path.
token : str, optional
    Activeloop token, used for fetching credentials to the dataset at the path if it is a Hub dataset. This is optional; tokens are normally autogenerated.

Returns

A boolean confirming whether or not a dataset exists at the given path.
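A hedged sketch of guarding dataset creation on a hypothetical path::

    import hub

    path = "hub://username/my_dataset"   # hypothetical
    ds = hub.load(path) if hub.exists(path) else hub.empty(path)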
Expand source code
@staticmethod
def exists(
    path: Union[str, pathlib.Path],
    creds: Optional[dict] = None,
    token: Optional[str] = None,
) -> bool:
    """Checks if a dataset exists at the given `path`."""
    path = convert_pathlib_to_string_if_needed(path)
    if creds is None:
        creds = {}
    try:
        storage, cache_chain = get_storage_and_cache_chain(
            path=path,
            read_only=True,
            creds=creds,
            token=token,
            memory_cache_size=DEFAULT_MEMORY_CACHE_SIZE,
            local_cache_size=DEFAULT_LOCAL_CACHE_SIZE,
        )
    except TokenPermissionError:
        # Cloud Dataset does not exist
        return False
    return dataset_exists(storage)
def ingest(src, dest, images_compression='auto', dest_creds=None, progressbar=True, summary=True, **dataset_kwargs)
Ingests a dataset from a source and stores it as a structured dataset at the destination.

Note

- Currently only local source paths and image classification datasets / csv files are supported for automatic ingestion.
- Supported filetypes: png/jpeg/jpg/csv.
- All files and sub-directories with unsupported filetypes are ignored.
- Valid source directory structures for image classification look like::

      data/
          img0.jpg
          img1.jpg
          ...

  or::

      data/
          class0/
              cat0.jpg
              ...
          class1/
              dog0.jpg
              ...
          ...

  or::

      data/
          train/
              class0/
                  img0.jpg
                  ...
              ...
          val/
              class0/
                  img0.jpg
                  ...
              ...
          ...

- Classes defined as sub-directories can be accessed at ds["test/labels"].info.class_names.
- Support for train and test sub-directories is present under ds["train/images"], ds["train/labels"] and ds["test/images"], ds["test/labels"].
- Mapping filenames to classes from an external file is currently not supported.

Args

src : str, pathlib.Path
    Local path to where the unstructured dataset is stored, or path to a csv file.
dest : str, pathlib.Path
    Destination path where the structured dataset will be stored. Can be:
    - a Hub cloud path of the form hub://username/datasetname. To write to Hub cloud datasets, ensure that you are logged in to Hub (use 'activeloop login' from the command line).
    - an s3 path of the form s3://bucketname/path/to/dataset. Credentials are required either in the environment or passed to the creds argument.
    - a local file system path of the form ./path/to/dataset or ~/path/to/dataset or path/to/dataset.
    - a memory path of the form mem://path/to/dataset which doesn't save the dataset but keeps it in memory instead. Should be used only for testing as it does not persist.
images_compression : str
    For image classification datasets, this compression will be used for the images tensor. If images_compression is "auto", the compression will be automatically determined by the most common extension in the directory.
dest_creds : dict
    A dictionary containing credentials used to access the destination path of the dataset.
progressbar : bool
    Enables or disables the ingestion progress bar. Defaults to True.
summary : bool
    If True, a summary of skipped files will be printed after completion. Defaults to True.
**dataset_kwargs
    Any arguments passed here will be forwarded to the dataset creator function.

Returns

Dataset
    New dataset object with the structured dataset.

Raises

InvalidPathException
    If the source directory does not exist.
SamePathException
    If the source and destination paths are the same.
AutoCompressionError
    If the source directory is empty or does not contain a valid extension.
InvalidFileExtension
    If the most frequent file extension is found to be 'None' during auto-compression.
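A hedged sketch of ingesting a local image classification folder into a new Hub dataset. The paths are hypothetical, and the final line assumes the flat class-directory layout produces "images" and "labels" tensors as described in the Note above::

    import hub

    ds = hub.ingest(
        "./raw/animals",                  # folder laid out as class sub-directories
        "hub://username/animals",         # hypothetical destination
        images_compression="auto",
        progressbar=True,
        summary=True,
    )
    print(ds["labels"].info.class_names)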
Expand source code
@staticmethod
def ingest(
    src: Union[str, pathlib.Path],
    dest: Union[str, pathlib.Path],
    images_compression: str = "auto",
    dest_creds: dict = None,
    progressbar: bool = True,
    summary: bool = True,
    **dataset_kwargs,
) -> Dataset:
    """Ingests a dataset from a source and stores it as a structured dataset to destination."""
    dest = convert_pathlib_to_string_if_needed(dest)
    feature_report_path(
        dest,
        "ingest",
        {
            "Images_Compression": images_compression,
            "Progressbar": progressbar,
            "Summary": summary,
        },
    )
    src = convert_pathlib_to_string_if_needed(src)
    if isinstance(src, str):
        if os.path.isdir(dest) and os.path.samefile(src, dest):
            raise SamePathException(src)
        if src.endswith(".csv"):
            import pandas as pd  # type:ignore

            if not os.path.isfile(src):
                raise InvalidPathException(src)
            source = pd.read_csv(src, quotechar='"', skipinitialspace=True)
            ds = dataset.ingest_dataframe(
                source, dest, dest_creds, progressbar, **dataset_kwargs
            )
            return ds
        if not os.path.isdir(src):
            raise InvalidPathException(src)
        if images_compression == "auto":
            images_compression = get_most_common_extension(src)
            if images_compression is None:
                raise InvalidFileExtension(src)
        ds = hub.dataset(dest, creds=dest_creds, **dataset_kwargs)
        # TODO: support more than just image classification (and update docstring)
        unstructured = ImageClassification(source=src)  # TODO: auto detect compression
        unstructured.structure(
            ds,  # type: ignore
            use_progress_bar=progressbar,
            generate_summary=summary,
            image_tensor_args={"sample_compression": images_compression},
        )
    return ds  # type: ignore
def ingest_huggingface(src, dest, use_progressbar=True)
Converts Hugging Face datasets to hub format.

Note

If a DatasetDict looks like::

    {
        train: Dataset({features: ['data']}),
        validation: Dataset({features: ['data']}),
        test: Dataset({features: ['data']}),
    }

it will be converted to a Hub Dataset with tensors ['train/data', 'validation/data', 'test/data'].

Features of the type Sequence(feature=Value(dtype='string')) are not supported. Columns of such type are skipped.

Args

src : hfDataset, DatasetDict
    Hugging Face Dataset or DatasetDict to be converted. Data in different splits of a DatasetDict will be stored under the respective tensor groups.
dest : Dataset, str, pathlib.Path
    Destination dataset or path to it.
use_progressbar : bool
    Defines if a progress bar should be used to show conversion progress.

Returns

Dataset
    The destination Hub dataset.
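A hedged sketch of converting a small Hugging Face dataset. The datasets library call and the "glue"/"mrpc" identifiers are assumptions from the Hugging Face ecosystem rather than part of this page, and the final line assumes the usual Dataset.tensors mapping::

    import hub
    from datasets import load_dataset

    hf_ds = load_dataset("glue", "mrpc")          # a DatasetDict with train/validation/test splits
    ds = hub.ingest_huggingface(hf_ds, "./glue_mrpc_hub")
    print(ds.tensors.keys())                      # e.g. 'train/sentence1', 'validation/label', ...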
Expand source code
def ingest_huggingface(
    src,
    dest,
    use_progressbar=True,
) -> Dataset:
    """Converts hugging face datasets to hub format."""
    from datasets import DatasetDict

    if isinstance(dest, (str, pathlib.Path)):
        ds = hub.dataset(dest)
    else:
        ds = dest  # type: ignore

    if isinstance(src, DatasetDict):
        for split, src_ds in src.items():
            for key, feature in src_ds.features.items():
                _create_tensor_from_feature(f"{split}/{key}", feature, src_ds, ds)
    else:
        skipped_keys: Set[str] = set()
        features = tqdm(
            src.features.items(),
            desc=f"Converting...({len(skipped_keys)} skipped)",
            disable=not use_progressbar,
        )
        for key, feature in features:
            if not _create_tensor_from_feature(key, feature, src, ds):
                skipped_keys.add(key)
                features.set_description(f"Converting...({len(skipped_keys)} skipped)")
    return ds  # type: ignore
def ingest_kaggle(tag, src, dest, exist_ok=False, images_compression='auto', dest_creds=None, kaggle_credentials=None, progressbar=True, summary=True, **dataset_kwargs)
-
Download and ingest a Kaggle dataset and store it as a structured dataset at the destination.
Note
Currently only local source paths and image classification datasets are supported for automatic ingestion.
Args
- `tag (str)`: Kaggle dataset tag. Example: `"coloradokb/dandelionimages"` points to https://www.kaggle.com/coloradokb/dandelionimages
- `src (str, pathlib.Path)`: Local path to where the raw Kaggle dataset will be downloaded.
- `dest (str, pathlib.Path)`: Destination path where the structured dataset will be stored. Can be:
  - a Hub cloud path of the form `hub://username/datasetname`. To write to Hub cloud datasets, ensure that you are logged in to Hub (use 'activeloop login' from the command line).
  - an s3 path of the form `s3://bucketname/path/to/dataset`. Credentials are required in either the environment or passed to the creds argument.
  - a local file system path of the form `./path/to/dataset` or `~/path/to/dataset` or `path/to/dataset`.
  - a memory path of the form `mem://path/to/dataset` which doesn't save the dataset but keeps it in memory instead. Should be used only for testing as it does not persist.
- `exist_ok (bool)`: If the Kaggle dataset was already downloaded and `exist_ok` is True, ingestion will proceed without error.
- `images_compression (str)`: For image classification datasets, this compression will be used for the `images` tensor. If images_compression is "auto", compression will be determined automatically from the most common extension in the directory.
- `dest_creds (dict)`: A dictionary containing credentials used to access the destination path of the dataset.
- `kaggle_credentials (dict)`: A dictionary containing Kaggle credentials {"username": "YOUR_USERNAME", "key": "YOUR_KEY"}. If None, environment variables/the kaggle.json file will be used if available.
- `progressbar (bool)`: Enables or disables the ingestion progress bar. Set to True by default.
- `summary (bool)`: Generates an ingestion summary. Set to True by default.
- `**dataset_kwargs`: Any arguments passed here will be forwarded to the dataset creator function.
Returns
- `Dataset`: New dataset object with the structured dataset.
Raises
- `SamePathException`: If the source and destination paths are the same.
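For reference, a hedged sketch of a typical call using the tag from the example above; the local `src`/`dest` paths are hypothetical and valid Kaggle credentials (kaggle.json or environment variables) are assumed.
```
import hub

# Downloads the raw Kaggle files to `src`, then ingests them into a
# structured Hub dataset at `dest` (image classification only).
ds = hub.ingest_kaggle(
    tag="coloradokb/dandelionimages",
    src="./kaggle_raw/dandelionimages",   # hypothetical download location
    dest="./hub_dandelions",              # hypothetical destination dataset
    exist_ok=True,                        # reuse an earlier download if present
    images_compression="auto",
)
print(ds.tensors.keys())
```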
Expand source code
@staticmethod def ingest_kaggle( tag: str, src: Union[str, pathlib.Path], dest: Union[str, pathlib.Path], exist_ok: bool = False, images_compression: str = "auto", dest_creds: dict = None, kaggle_credentials: dict = None, progressbar: bool = True, summary: bool = True, **dataset_kwargs, ) -> Dataset: """Download and ingest a kaggle dataset and store it as a structured dataset to destination Note: Currently only local source paths and image classification datasets are supported for automatic ingestion. Args: tag (str): Kaggle dataset tag. Example: `"coloradokb/dandelionimages"` points to https://www.kaggle.com/coloradokb/dandelionimages src (str, pathlib.Path): Local path to where the raw kaggle dataset will be downlaoded to. dest (str, pathlib.Path): - Destination path where the structured dataset will be stored. Can be: - a Hub cloud path of the form `hub://username/datasetname`. To write to Hub cloud datasets, ensure that you are logged in to Hub (use 'activeloop login' from command line) - an s3 path of the form `s3://bucketname/path/to/dataset`. Credentials are required in either the environment or passed to the creds argument. - a local file system path of the form `./path/to/dataset` or `~/path/to/dataset` or `path/to/dataset`. - a memory path of the form `mem://path/to/dataset` which doesn't save the dataset but keeps it in memory instead. Should be used only for testing as it does not persist. exist_ok (bool): If the kaggle dataset was already downloaded and `exist_ok` is True, ingestion will proceed without error. images_compression (str): For image classification datasets, this compression will be used for the `images` tensor. If images_compression is "auto", compression will be automatically determined by the most common extension in the directory. dest_creds (dict): A dictionary containing credentials used to access the destination path of the dataset. kaggle_credentials (dict): A dictionary containing kaggle credentials {"username":"YOUR_USERNAME", "key": "YOUR_KEY"}. If None, environment variables/the kaggle.json file will be used if available. progressbar (bool): Enables or disables ingestion progress bar. Set to true by default. summary (bool): Generates ingestion summary. Set to true by default. **dataset_kwargs: Any arguments passed here will be forwarded to the dataset creator function. Returns: Dataset: New dataset object with structured dataset. Raises: SamePathException: If the source and destination path are same. """ src = convert_pathlib_to_string_if_needed(src) dest = convert_pathlib_to_string_if_needed(dest) feature_report_path( dest, "ingest_kaggle", { "Images_Compression": images_compression, "Exist_Ok": exist_ok, "Progressbar": progressbar, "Summary": summary, }, ) if os.path.isdir(src) and os.path.isdir(dest): if os.path.samefile(src, dest): raise SamePathException(src) download_kaggle_dataset( tag, local_path=src, kaggle_credentials=kaggle_credentials, exist_ok=exist_ok, ) ds = hub.ingest( src=src, dest=dest, images_compression=images_compression, dest_creds=dest_creds, progressbar=progressbar, summary=summary, **dataset_kwargs, ) return ds
def like(dest, src, tensors=None, overwrite=False, creds=None, token=None, public=False)
-
Copies the `source` dataset's structure to a new location. No samples are copied, only the meta/info for the dataset and its tensors.
Args
- `dest`: Empty Dataset or path where the new dataset will be created.
- `src (Union[str, Dataset])`: Path or dataset object that will be used as the template for the new dataset.
- `tensors (List[str], optional)`: Names of tensors (and groups) to be replicated. If not specified, all tensors in the source dataset are considered.
- `overwrite (bool)`: If True and a dataset exists at `destination`, it will be overwritten. Defaults to False.
- `creds (dict, optional)`: A dictionary containing credentials used to access the dataset at the path. If aws_access_key_id, aws_secret_access_key and aws_session_token are present, these take precedence over credentials present in the environment or in a credentials file. Currently only works with s3 paths. It supports 'aws_access_key_id', 'aws_secret_access_key', 'aws_session_token', 'endpoint_url', 'aws_region' and 'profile_name' as keys.
- `token (str, optional)`: Activeloop token, used for fetching credentials to the dataset at the path if it is a Hub dataset. This is optional; tokens are normally autogenerated.
- `public (bool)`: Defines if the dataset will have public access. Applicable only if Hub cloud storage is used and a new Dataset is being created. Defaults to False.
Returns
- `Dataset`: New dataset object.
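A small sketch of typical usage; the destination path is hypothetical and `hub://activeloop/mnist-train` is assumed to be an accessible public dataset.
```
import hub

# Replicate only the structure (tensors, htypes, compression settings);
# no samples are copied.
new_ds = hub.like(
    "./mnist_structure",             # hypothetical destination path
    "hub://activeloop/mnist-train",  # template dataset (assumed accessible)
    tensors=["images", "labels"],
    overwrite=True,
)
print(list(new_ds.tensors), len(new_ds))  # tensors exist but hold no samples
```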
Expand source code
@staticmethod def like( dest: Union[str, pathlib.Path], src: Union[str, Dataset, pathlib.Path], tensors: Optional[List[str]] = None, overwrite: bool = False, creds: Optional[dict] = None, token: Optional[str] = None, public: bool = False, ) -> Dataset: """Copies the `source` dataset's structure to a new location. No samples are copied, only the meta/info for the dataset and it's tensors. Args: dest: Empty Dataset or Path where the new dataset will be created. src (Union[str, Dataset]): Path or dataset object that will be used as the template for the new dataset. tensors (List[str], optional): Names of tensors (and groups) to be replicated. If not specified all tensors in source dataset are considered. overwrite (bool): If True and a dataset exists at `destination`, it will be overwritten. Defaults to False. creds (dict, optional): - A dictionary containing credentials used to access the dataset at the path. - If aws_access_key_id, aws_secret_access_key, aws_session_token are present, these take precedence over credentials present in the environment or in credentials file. Currently only works with s3 paths. - It supports 'aws_access_key_id', 'aws_secret_access_key', 'aws_session_token', 'endpoint_url', 'aws_region', 'profile_name' as keys. token (str, optional): Activeloop token, used for fetching credentials to the dataset at path if it is a Hub dataset. This is optional, tokens are normally autogenerated. public (bool): Defines if the dataset will have public access. Applicable only if Hub cloud storage is used and a new Dataset is being created. Defaults to False. Returns: Dataset: New dataset object. """ if isinstance(dest, Dataset): path = dest.path else: path = dest feature_report_path( path, "like", {"Overwrite": overwrite, "Public": public, "Tensors": tensors}, token=token, ) return dataset._like(dest, src, tensors, overwrite, creds, token, public)
def link(path, creds_key=None)
-
Utility that stores a link to raw data. Used to add data to a Hub Dataset without copying it.
Note
No data is actually loaded until you try to read the sample from a dataset. There are a few exceptions to this:
- If verify=True was specified DURING create_tensor of the tensor to which this is being added, some metadata is read to verify the integrity of the sample.
- If create_shape_tensor=True was specified DURING create_tensor of the tensor to which this is being added, the shape of the sample is read.
- If create_sample_info_tensor=True was specified DURING create_tensor of the tensor to which this is being added, the sample info is read.
Examples
>>> ds = hub.dataset("......")
Add the names of the creds you want to use (not needed for http/local urls):
>>> ds.add_creds_key("MY_S3_KEY")
>>> ds.add_creds_key("GCS_KEY")
Populate the added names with creds dictionaries. These creds are only present temporarily and will have to be repopulated on every reload:
>>> ds.populate_creds("MY_S3_KEY", {})
>>> ds.populate_creds("GCS_KEY", {})
Create a tensor that can contain links:
>>> ds.create_tensor("img", htype="link[image]", verify=True, create_shape_tensor=False, create_sample_info_tensor=False)
Populate the tensor with links:
>>> ds.img.append(hub.link("s3://abc/def.jpeg", creds_key="MY_S3_KEY"))
>>> ds.img.append(hub.link("gcs://ghi/jkl.png", creds_key="GCS_KEY"))
>>> ds.img.append(hub.link("https://picsum.photos/200/300"))  # http path doesn't need creds
>>> ds.img.append(hub.link("./path/to/cat.jpeg"))  # local path doesn't need creds
>>> ds.img.append(hub.link("s3://abc/def.jpeg"))  # raises an exception, as cloud paths always need a creds_key
>>> ds.img.append(hub.link("s3://abc/def.jpeg", creds_key="ENV"))  # uses creds from the environment
Accessing the data:
>>> for i in range(5):
...     ds.img[i].numpy()
Updating a sample:
>>> ds.img[0] = hub.link("./data/cat.jpeg")
Supported file types:
- Image: "bmp", "dib", "gif", "ico", "jpeg", "jpeg2000", "pcx", "png", "ppm", "sgi", "tga", "tiff", "webp", "wmf", "xbm"
- Audio: "flac", "mp3", "wav"
- Video: "mp4", "mkv", "avi"
- Dicom: "dcm"
Args
- `path (str)`: Path to a supported file.
- `creds_key (optional, str)`: The credential key to use to read data for this sample. The actual credentials are fetched from the dataset.
Returns
- `LinkedSample`: LinkedSample object that stores the path and creds.
Expand source code
def link( path: str, creds_key: Optional[str] = None, ) -> LinkedSample: """Utility that stores a link to raw data. Used to add data to a Hub Dataset without copying it. Note: No data is actually loaded until you try to read the sample from a dataset. There are a few exceptions to this:- - If verify=True was specified DURING create_tensor of the tensor to which this is being added, some metadata is read to verify the integrity of the sample. - If create_shape_tensor=True was specified DURING create_tensor of the tensor to which this is being added, the shape of the sample is read. - If create_sample_info_tensor=True was specified DURING create_tensor of the tensor to which this is being added, the sample info is read. Examples: >>> ds = hub.dataset("......") Add the names of the creds you want to use (not needed for http/local urls) >>> ds.add_creds_key("MY_S3_KEY") >>> ds.add_creds_key("GCS_KEY") Populate the names added with creds dictionary These creds are only present temporarily and will have to be repopulated on every reload >>> ds.populate_creds("MY_S3_KEY", {}) >>> ds.populate_creds("GCS_KEY", {}) Create a tensor that can contain links >>> ds.create_tensor("img", htype="link[image]", verify=True, create_shape_tensor=False, create_sample_info_tensor=False) Populate the tensor with links >>> ds.img.append(hub.link("s3://abc/def.jpeg", creds_key="MY_S3_KEY")) >>> ds.img.append(hub.link("gcs://ghi/jkl.png", creds_key="GCS_KEY")) >>> ds.img.append(hub.link(“https://picsum.photos/200/300”)) # http path doesn’t need creds >>> ds.img.append(hub.link(“./path/to/cat.jpeg”)) # local path doesn’t need creds >>> ds.img.append(hub.link(“s3://abc/def.jpeg”)) # this will throw an exception as cloud paths always need creds_key >>> ds.img.append(hub.link("s3://abc/def.jpeg", creds_key="ENV")) # this will use creds from environment Accessing the data >>> for i in range(5): >>> ds.img[i].numpy() Updating a sample >>> ds.img[0] = hub.link("./data/cat.jpeg") Supported file types: Image: "bmp", "dib", "gif", "ico", "jpeg", "jpeg2000", "pcx", "png", "ppm", "sgi", "tga", "tiff", "webp", "wmf", "xbm" Audio: "flac", "mp3", "wav" Video: "mp4", "mkv", "avi" Dicom: "dcm" Args: path (str): Path to a supported file. creds_key (optional, str): The credential key to use to read data for this sample. The actual credentials are fetched from the dataset. Returns: LinkedSample: LinkedSample object that stores path and creds. """ return LinkedSample(path, creds_key)
def list(workspace='', token=None)
-
List all available hub cloud datasets.
Args
- `workspace (str)`: Specify a user/organization name. If not given, returns a list of all datasets that can be accessed, regardless of what workspace they are in. Otherwise, lists all datasets in the given workspace.
- `token (str, optional)`: Activeloop token, used for fetching credentials for Hub datasets. This is optional; tokens are normally autogenerated.
Returns
List of dataset names.
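For example (a sketch; listing private datasets requires being logged in via `activeloop login` or passing a token):
```
import hub

# All datasets visible to the current account in the "activeloop" workspace.
datasets = hub.list("activeloop")
print(datasets[:5])
```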
Expand source code
@staticmethod @hub_reporter.record_call def list( workspace: str = "", token: Optional[str] = None, ) -> None: """List all available hub cloud datasets. Args: workspace (str): Specify user/organization name. If not given, returns a list of all datasets that can be accessed, regardless of what workspace they are in. Otherwise, lists all datasets in the given workspace. token (str, optional): Activeloop token, used for fetching credentials for Hub datasets. This is optional, tokens are normally autogenerated. Returns: List of dataset names. """ client = HubBackendClient(token=token) datasets = client.get_datasets(workspace=workspace) return datasets
def load(path, read_only=None, memory_cache_size=256, local_cache_size=0, creds=None, token=None, verbose=True, access_method='stream')
-
Loads an existing dataset.
Args
- `path (str, pathlib.Path)`: The full path to the dataset. Can be:
  - a Hub cloud path of the form `hub://username/datasetname`. To write to Hub cloud datasets, ensure that you are logged in to Hub (use 'activeloop login' from the command line).
  - an s3 path of the form `s3://bucketname/path/to/dataset`. Credentials are required in either the environment or passed to the creds argument.
  - a local file system path of the form `./path/to/dataset` or `~/path/to/dataset` or `path/to/dataset`.
  - a memory path of the form `mem://path/to/dataset` which doesn't save the dataset but keeps it in memory instead. Should be used only for testing as it does not persist.
- `read_only (bool, optional)`: Opens the dataset in read-only mode if this is passed as True. Defaults to False. Datasets stored on Hub cloud that your account does not have write access to will automatically open in read mode.
- `memory_cache_size (int)`: The size of the memory cache to be used, in MB.
- `local_cache_size (int)`: The size of the local filesystem cache to be used, in MB.
- `creds (dict, optional)`: A dictionary containing credentials used to access the dataset at the path. If aws_access_key_id, aws_secret_access_key and aws_session_token are present, these take precedence over credentials present in the environment or in a credentials file. Currently only works with s3 paths. It supports 'aws_access_key_id', 'aws_secret_access_key', 'aws_session_token', 'endpoint_url', 'aws_region' and 'profile_name' as keys.
- `token (str, optional)`: Activeloop token, used for fetching credentials to the dataset at the path if it is a Hub dataset. This is optional; tokens are normally autogenerated.
- `verbose (bool)`: If True, logs will be printed. Defaults to True.
- `access_method (str)`: The access method to use for the dataset. Can be:
  - 'stream' - Streams the data from the dataset, i.e. only fetches data when required. This is the default.
  - 'download' - Downloads the data to the local filesystem at the path specified in the environment variable HUB_DOWNLOAD_PATH. Raises an exception if the environment variable is not set or if the path is not empty, and also if the dataset does not exist. The 'download' access method can be modified to specify num_workers and/or a scheduler. For example, `'download:2:processed'` will use 2 workers and the processed scheduler, `'download:3'` will use 3 workers and the default (threaded) scheduler, and `'download:processed'` will use a single worker and the processed scheduler.
  - 'local' - Used when the download was already done in a previous run. Doesn't download the data again. Raises an exception if the dataset is not found in HUB_DOWNLOAD_PATH. Note: any changes made to the dataset in download/local mode will only be made to the local copy and will not be reflected in the original dataset.
Returns
Dataset object created using the arguments provided.
Raises
- `DatasetHandlerError`: If a Dataset does not exist at the given path.
- `AgreementError`: When the agreement is rejected.
- `UserNotLoggedInException`: When the user is not logged in.
- `InvalidTokenException`: If the specified token is invalid.
- `TokenPermissionError`: When there are permission or other errors related to the token.
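A short sketch of the different access methods; `hub://activeloop/mnist-train` is assumed to be an accessible public dataset, and the download variant additionally assumes the HUB_DOWNLOAD_PATH environment variable is set.
```
import hub

# Default: stream samples on demand.
ds = hub.load("hub://activeloop/mnist-train", read_only=True)
print(len(ds), list(ds.tensors))

# Download-first variant: 2 workers, "processed" scheduler
# (requires HUB_DOWNLOAD_PATH to point to an empty local directory).
# ds = hub.load("hub://activeloop/mnist-train", access_method="download:2:processed")
```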
Expand source code
@staticmethod def load( path: Union[str, pathlib.Path], read_only: Optional[bool] = None, memory_cache_size: int = DEFAULT_MEMORY_CACHE_SIZE, local_cache_size: int = DEFAULT_LOCAL_CACHE_SIZE, creds: Optional[dict] = None, token: Optional[str] = None, verbose: bool = True, access_method: str = "stream", ) -> Dataset: """Loads an existing dataset Args: path (str, pathlib.Path): - The full path to the dataset. Can be: - a Hub cloud path of the form `hub://username/datasetname`. To write to Hub cloud datasets, ensure that you are logged in to Hub (use 'activeloop login' from command line) - an s3 path of the form `s3://bucketname/path/to/dataset`. Credentials are required in either the environment or passed to the creds argument. - a local file system path of the form `./path/to/dataset` or `~/path/to/dataset` or `path/to/dataset`. - a memory path of the form `mem://path/to/dataset` which doesn't save the dataset but keeps it in memory instead. Should be used only for testing as it does not persist. read_only (bool, optional): Opens dataset in read only mode if this is passed as True. Defaults to False. Datasets stored on Hub cloud that your account does not have write access to will automatically open in read mode. memory_cache_size (int): The size of the memory cache to be used in MB. local_cache_size (int): The size of the local filesystem cache to be used in MB. creds (dict, optional): - A dictionary containing credentials used to access the dataset at the path. - If aws_access_key_id, aws_secret_access_key, aws_session_token are present, these take precedence over credentials present in the environment or in credentials file. Currently only works with s3 paths. - It supports 'aws_access_key_id', 'aws_secret_access_key', 'aws_session_token', 'endpoint_url', 'aws_region', 'profile_name' as keys. token (str, optional): Activeloop token, used for fetching credentials to the dataset at path if it is a Hub dataset. This is optional, tokens are normally autogenerated. verbose (bool): If True, logs will be printed. Defaults to True. access_method (str): - The access method to use for the dataset. Can be:- - 'stream' - Streams the data from the dataset i.e. only fetches data when required. This is the default. - 'download' - - Downloads the data to the local filesystem to the path specified in environment variable HUB_DOWNLOAD_PATH. - Raises an exception if the environment variable is not set, or if the path is not empty. - Will also raise an exception if the dataset does not exist. - The 'download' access method can also be modified to specify num_workers and/or scheduler. - For example: `'download:2:processed'`, will use 2 workers and use processed scheduler, while `'download:3'` will use 3 workers and default scheduler (threaded), and `'download:processed'` will use a single worker and use processed scheduler. - 'local' - Used when download was already done in a previous run. Doesn't download the data again. Raises an exception if the dataset is not found in HUB_DOWNLOAD_PATH. Note: Any changes made to the dataset in download/local mode will only be made to the local copy and will not be reflected in the original dataset. Returns: Dataset object created using the arguments provided. Raises: DatasetHandlerError: If a Dataset does not exist at the given path. 
AgreementError: When agreement is rejected UserNotLoggedInException: When user is not logged in InvalidTokenException: If the specified toke is invalid TokenPermissionError: When there are permission or other errors related to token """ access_method, num_workers, scheduler = parse_access_method(access_method) check_access_method(access_method, overwrite=False) path = convert_pathlib_to_string_if_needed(path) if creds is None: creds = {} try: storage, cache_chain = get_storage_and_cache_chain( path=path, read_only=read_only, creds=creds, token=token, memory_cache_size=memory_cache_size, local_cache_size=local_cache_size, ) feature_report_path(path, "load", {}, token=token) except Exception as e: if isinstance(e, UserNotLoggedInException): message = ( "Please log in through the CLI in order to load this dataset, " "or create an API token in the UI and pass it to this method using " "the ‘token’ parameter. The CLI commands are ‘activeloop login’ and " "‘activeloop register’." ) raise UserNotLoggedInException(message) elif isinstance(e, TokenPermissionError): message = ( "You do not have sufficient permissions to load a dataset from the specified path. " "Please make sure that you have read access to the path provided." ) raise TokenPermissionError(message) raise if not dataset_exists(cache_chain): raise DatasetHandlerError( f"A Hub dataset does not exist at the given path ({path}). Check the path provided or in case you want to create a new dataset, use hub.empty()." ) try: if access_method == "stream": return dataset_factory( path=path, storage=cache_chain, read_only=read_only, token=token, verbose=verbose, ) return get_local_dataset( access_method=access_method, path=path, read_only=read_only, memory_cache_size=memory_cache_size, local_cache_size=local_cache_size, creds=creds, token=token, verbose=verbose, ds_exists=True, num_workers=num_workers, scheduler=scheduler, ) except AgreementError as e: raise e from None
def read(path, verify=False, creds=None, compression=None, storage=None)
-
Utility that reads raw data from supported files into hub format.
- Recompresses data into format required by the tensor if permitted by the tensor htype.
- Simply copies the data in the file if file format matches sample_compression of the tensor, thus maximizing upload speeds.
Note
No data is actually loaded until you try to get a property of the returned `Sample`. This is useful for passing along to `tensor.append` and `tensor.extend`.
Examples
>>> ds.create_tensor("images", htype="image", sample_compression="jpeg")
>>> ds.images.append(hub.read("path/to/cat.jpg"))
>>> ds.images.shape
(1, 399, 640, 3)

>>> ds.create_tensor("videos", htype="video", sample_compression="mp4")
>>> ds.videos.append(hub.read("path/to/video.mp4"))
>>> ds.videos.shape
(1, 136, 720, 1080, 3)

>>> ds.create_tensor("images", htype="image", sample_compression="jpeg")
>>> ds.images.append(hub.read("https://picsum.photos/200/300"))
>>> ds.images[0].shape
(300, 200, 3)

Supported file types:
- Image: "bmp", "dib", "gif", "ico", "jpeg", "jpeg2000", "pcx", "png", "ppm", "sgi", "tga", "tiff", "webp", "wmf", "xbm"
- Audio: "flac", "mp3", "wav"
- Video: "mp4", "mkv", "avi"
- Dicom: "dcm"
Args
- `path (str)`: Path to a supported file.
- `verify (bool)`: If True, contents of the file are verified.
- `creds (optional, Dict)`: Credentials for s3, gcp and http urls.
- `compression (optional, str)`: Format of the file (see `hub.compression.SUPPORTED_COMPRESSIONS`). Only required if the path does not have an extension.
- `storage (optional, StorageProvider)`: Storage provider used to retrieve remote files. Useful if multiple files are being read from the same storage, to minimize the overhead of creating a new provider.
Returns
- `Sample`: Sample object. Call `sample.array` to get the `np.ndarray`.
Expand source code
def read( path: str, verify: bool = False, creds: Optional[Dict] = None, compression: Optional[str] = None, storage: Optional[StorageProvider] = None, ) -> Sample: """Utility that reads raw data from supported files into hub format. - Recompresses data into format required by the tensor if permitted by the tensor htype. - Simply copies the data in the file if file format matches sample_compression of the tensor, thus maximizing upload speeds. Note: No data is actually loaded until you try to get a property of the returned `Sample`. This is useful for passing along to `tensor.append` and `tensor.extend`. Examples: >>> ds.create_tensor("images", htype="image", sample_compression="jpeg") >>> ds.images.append(hub.read("path/to/cat.jpg")) >>> ds.images.shape (1, 399, 640, 3) >>> ds.create_tensor("videos", htype="video", sample_compression="mp4") >>> ds.videos.append(hub.read("path/to/video.mp4")) >>> ds.videos.shape (1, 136, 720, 1080, 3) >>> ds.create_tensor("images", htype="image", sample_compression="jpeg") >>> ds.images.append(hub.read("https://picsum.photos/200/300")) >>> ds.images[0].shape (300, 200, 3) Supported file types: Image: "bmp", "dib", "gif", "ico", "jpeg", "jpeg2000", "pcx", "png", "ppm", "sgi", "tga", "tiff", "webp", "wmf", "xbm" Audio: "flac", "mp3", "wav" Video: "mp4", "mkv", "avi" Dicom: "dcm" Args: path (str): Path to a supported file. verify (bool): If True, contents of the file are verified. creds (optional, Dict): Credentials for s3, gcp and http urls. compression (optional, str): Format of the file (see `hub.compression.SUPPORTED_COMPRESSIONS`). Only required if path does not have an extension. storage (optional, StorageProvider): Storage provider to use to retrieve remote files. Useful if multiple files are being read from same storage to minimize overhead of creating a new provider. Returns: Sample: Sample object. Call `sample.array` to get the `np.ndarray`. """ return Sample( path, verify=verify, compression=compression, creds=creds, storage=storage )
def rename(old_path, new_path, creds=None, token=None)
-
Renames the dataset at `old_path` to `new_path`.
Examples
>>> hub.rename("hub://username/image_ds", "hub://username/new_ds")
>>> hub.rename("s3://mybucket/my_ds", "s3://mybucket/renamed_ds")
Args
- `old_path (str, pathlib.Path)`: The path to the dataset to be renamed.
- `new_path (str, pathlib.Path)`: Path to the dataset after renaming.
- `creds (dict, optional)`: A dictionary containing credentials used to access the dataset at the path. This takes precedence over credentials present in the environment. Currently only works with s3 paths. It supports 'aws_access_key_id', 'aws_secret_access_key', 'aws_session_token', 'endpoint_url' and 'aws_region' as keys.
- `token (str, optional)`: Activeloop token, used for fetching credentials to the dataset at the path if it is a Hub dataset. This is optional; tokens are normally autogenerated.
Returns
Dataset object after renaming.
Raises
- `DatasetHandlerError`: If a Dataset does not exist at the given path or if the new path points to a different directory.
Expand source code
@staticmethod def rename( old_path: Union[str, pathlib.Path], new_path: Union[str, pathlib.Path], creds: Optional[dict] = None, token: Optional[str] = None, ) -> Dataset: """Renames dataset at `old_path` to `new_path`. Examples: ``` hub.rename("hub://username/image_ds", "hub://username/new_ds") hub.rename("s3://mybucket/my_ds", "s3://mybucket/renamed_ds") ``` Args: old_path (str, pathlib.Path): The path to the dataset to be renamed. new_path (str, pathlib.Path): Path to the dataset after renaming. creds (dict, optional): - A dictionary containing credentials used to access the dataset at the path. - This takes precedence over credentials present in the environment. Currently only works with s3 paths. - It supports 'aws_access_key_id', 'aws_secret_access_key', 'aws_session_token', 'endpoint_url' and 'aws_region' as keys. token (str, optional): Activeloop token, used for fetching credentials to the dataset at path if it is a Hub dataset. This is optional, tokens are normally autogenerated. Returns: Dataset object after renaming. Raises: DatasetHandlerError: If a Dataset does not exist at the given path or if new path is to a different directory. """ old_path = convert_pathlib_to_string_if_needed(old_path) new_path = convert_pathlib_to_string_if_needed(new_path) if creds is None: creds = {} feature_report_path(old_path, "rename", {}, token=token) ds = hub.load(old_path, verbose=False, token=token, creds=creds) ds.rename(new_path) return ds # type: ignore
Classes
class Dataset (storage, index=None, group_index='', read_only=None, public=False, token=None, verbose=True, version_state=None, path=None, is_iteration=False, link_creds=None, pad_tensors=False, lock=True, **kwargs)
-
Initializes a new or existing dataset.
Args
- `storage (LRUCache)`: The storage provider used to access the dataset.
- `index (Index, Optional)`: The Index object restricting the view of this dataset's tensors.
- `group_index (str)`: Name of the group this dataset instance represents.
- `read_only (bool, Optional)`: Opens the dataset in read-only mode if this is passed as True. Defaults to False. Datasets stored on Hub cloud that your account does not have write access to will automatically open in read mode.
- `public (bool, Optional)`: Applied only if storage is Hub cloud storage and a new Dataset is being created. Defines if the dataset will have public access.
- `token (str, Optional)`: Activeloop token, used for fetching credentials for Hub datasets. This is optional; tokens are normally autogenerated.
- `verbose (bool)`: If True, logs will be printed. Defaults to True.
- `version_state (Dict[str, Any], Optional)`: The version state of the dataset; includes commit_id, commit_node, branch, branch_commit_map and commit_node_map.
- `path (str, pathlib.Path)`: The path to the dataset.
- `is_iteration (bool)`: If this Dataset is being used as an iterator.
- `link_creds (LinkCreds, Optional)`: The LinkCreds object used to access tensors that have external data linked to them.
- `pad_tensors (bool)`: If True, shorter tensors will be padded to the length of the longest tensor.
- `**kwargs`: Passes subclass variables through without errors.
- `lock (bool)`: Whether the dataset should be locked for writing. Only applicable for s3, hub and gcs datasets. No effect if read_only=True.
Raises
- `ValueError`: If an existing local path is given, it must be a directory.
- `ImproperDatasetInitialization`: Exactly one argument out of 'path' and 'storage' needs to be specified. This is raised if neither is specified or if more than one is specified.
- `InvalidHubPathException`: If a Hub cloud path (a path starting with `hub://`) is specified and it isn't of the form `hub://username/datasetname`.
- `AuthorizationException`: If a Hub cloud path (a path starting with `hub://`) is specified and the user doesn't have access to the dataset.
- `PathNotEmptyException`: If the path to the dataset doesn't contain a Hub dataset and is also not empty.
- `LockedException`: If read_only is False but the dataset is locked for writing by another machine.
- `ReadOnlyModeError`: If read_only is False but write access is not available.
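Dataset objects are normally obtained through the top-level factory functions (hub.dataset, hub.empty, hub.load) rather than constructed directly. A minimal sketch, assuming a writable local path:
```
import numpy as np
import hub

ds = hub.empty("./my_dataset", overwrite=True)  # hypothetical local path

with ds:  # batches flushes until the block exits
    ds.create_tensor("images", htype="image", sample_compression="jpeg")
    ds.create_tensor("labels", htype="class_label")
    ds.images.append(np.random.randint(0, 255, (64, 64, 3), dtype=np.uint8))
    ds.labels.append(0)

print(len(ds), ds.images.shape)
```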
Expand source code
class Dataset: def __init__( self, storage: LRUCache, index: Optional[Index] = None, group_index: str = "", read_only: Optional[bool] = None, public: Optional[bool] = False, token: Optional[str] = None, verbose: bool = True, version_state: Optional[Dict[str, Any]] = None, path: Optional[Union[str, pathlib.Path]] = None, is_iteration: bool = False, link_creds=None, pad_tensors: bool = False, lock: bool = True, **kwargs, ): """Initializes a new or existing dataset. Args: storage (LRUCache): The storage provider used to access the dataset. index (Index, Optional): The Index object restricting the view of this dataset's tensors. group_index (str): Name of the group this dataset instance represents. read_only (bool, Optional): Opens dataset in read only mode if this is passed as True. Defaults to False. Datasets stored on Hub cloud that your account does not have write access to will automatically open in read mode. public (bool, Optional): Applied only if storage is Hub cloud storage and a new Dataset is being created. Defines if the dataset will have public access. token (str, Optional): Activeloop token, used for fetching credentials for Hub datasets. This is Optional, tokens are normally autogenerated. verbose (bool): If True, logs will be printed. Defaults to True. version_state (Dict[str, Any], Optional): The version state of the dataset, includes commit_id, commit_node, branch, branch_commit_map and commit_node_map. path (str, pathlib.Path): The path to the dataset. is_iteration (bool): If this Dataset is being used as an iterator. link_creds (LinkCreds, Optional): The LinkCreds object used to access tensors that have external data linked to them. pad_tensors (bool): If True, shorter tensors will be padded to the length of the longest tensor. **kwargs: Passing subclass variables through without errors. lock (bool): Whether the dataset should be locked for writing. Only applicable for s3, hub and gcs datasets. No effect if read_only=True. Raises: ValueError: If an existing local path is given, it must be a directory. ImproperDatasetInitialization: Exactly one argument out of 'path' and 'storage' needs to be specified. This is raised if none of them are specified or more than one are specifed. InvalidHubPathException: If a Hub cloud path (path starting with `hub://`) is specified and it isn't of the form `hub://username/datasetname`. AuthorizationException: If a Hub cloud path (path starting with `hub://`) is specified and the user doesn't have access to the dataset. PathNotEmptyException: If the path to the dataset doesn't contain a Hub dataset and is also not empty. LockedException: If read_only is False but the dataset is locked for writing by another machine. ReadOnlyModeError: If read_only is False but write access is not available. 
""" d: Dict[str, Any] = {} d["_client"] = d["org_id"] = d["ds_name"] = None # uniquely identifies dataset d["path"] = convert_pathlib_to_string_if_needed(path) or get_path_from_storage( storage ) d["storage"] = storage d["_read_only_error"] = read_only is False d["_read_only"] = DEFAULT_READONLY if read_only is None else read_only d["base_storage"] = get_base_storage(storage) d["_locked_out"] = False # User requested write access but was denied d["is_iteration"] = is_iteration d["is_first_load"] = version_state is None d["_is_filtered_view"] = False d["index"] = index or Index() d["group_index"] = group_index d["_token"] = token d["public"] = public d["verbose"] = verbose d["version_state"] = version_state or {} d["link_creds"] = link_creds d["_info"] = None d["_ds_diff"] = None d["_view_id"] = str(uuid.uuid4) d["_view_invalid"] = False d["_waiting_for_view_base_commit"] = False d["_new_view_base_commit"] = None d["_view_base"] = None d["_update_hooks"] = {} d["_commit_hooks"] = {} d["_parent_dataset"] = None d["_pad_tensors"] = pad_tensors d["_locking_enabled"] = lock self.__dict__.update(d) try: self._set_derived_attributes() except LockedException: raise LockedException( "This dataset cannot be open for writing as it is locked by another machine. Try loading the dataset with `read_only=True`." ) except ReadOnlyModeError as e: raise ReadOnlyModeError( "This dataset cannot be open for writing as you don't have permissions. Try loading the dataset with `read_only=True." ) self._first_load_init() self._initial_autoflush: List[ bool ] = [] # This is a stack to support nested with contexts def _lock_lost_handler(self): """This is called when lock is acquired but lost later on due to slow update.""" self.read_only = True always_warn( "Unable to update dataset lock as another machine has locked it for writing. Switching to read only mode." ) self._locked_out = True def __enter__(self): self._initial_autoflush.append(self.storage.autoflush) self.storage.autoflush = False return self def __exit__(self, exc_type, exc_val, exc_tb): self.storage.autoflush = self._initial_autoflush.pop() if not self._read_only: self.storage.maybe_flush() @property def num_samples(self) -> int: """Returns the length of the smallest tensor. Ignores any applied indexing and returns the total length. """ return min( map( len, filter( lambda t: t.key not in self.meta.hidden_tensors, self.version_state["full_tensors"].values(), ), ), default=0, ) @property def meta(self) -> DatasetMeta: """Returns the metadata of the dataset.""" return self.version_state["meta"] @property def client(self): """Returns the client of the dataset.""" return self._client def __len__(self): """Returns the length of the smallest tensor""" tensor_lengths = [len(tensor) for tensor in self.tensors.values()] length_fn = max if self._pad_tensors else min return length_fn(tensor_lengths, default=0) def __getstate__(self) -> Dict[str, Any]: """Returns a dict that can be pickled and used to restore this dataset. Note: Pickling a dataset does not copy the dataset, it only saves attributes that can be used to restore the dataset. If you pickle a local dataset and try to access it on a machine that does not have the data present, the dataset will not work. 
""" if self.path.startswith("mem://"): raise MemoryDatasetCanNotBePickledError keys = [ "path", "_read_only", "index", "group_index", "public", "storage", "_token", "verbose", "version_state", "org_id", "ds_name", "_is_filtered_view", "_view_id", "_view_invalid", "_new_view_base_commit", "_parent_dataset", "_pad_tensors", "_locking_enabled", ] state = {k: getattr(self, k) for k in keys} state["link_creds"] = self.link_creds return state def __setstate__(self, state: Dict[str, Any]): """Restores dataset from a pickled state. Args: state (dict): The pickled state used to restore the dataset. """ state["is_first_load"] = True state["_info"] = None state["is_iteration"] = False state["_read_only_error"] = False state["_initial_autoflush"] = [] state["_ds_diff"] = None state["_view_base"] = None state["_update_hooks"] = {} state["_commit_hooks"] = {} state["_waiting_for_view_base_commit"] = False state["_client"] = state["org_id"] = state["ds_name"] = None self.__dict__.update(state) self.__dict__["base_storage"] = get_base_storage(self.storage) # clear cache while restoring self.storage.clear_cache_without_flush() self._set_derived_attributes(verbose=False) def __getitem__( self, item: Union[ str, int, slice, List[int], Tuple[Union[int, slice, Tuple[int]]], Index ], is_iteration: bool = False, ): if isinstance(item, str): fullpath = posixpath.join(self.group_index, item) tensor = self._get_tensor_from_root(fullpath) if tensor is not None: return tensor[self.index] elif self._has_group_in_root(fullpath): ret = self.__class__( storage=self.storage, index=self.index, group_index=posixpath.join(self.group_index, item), read_only=self.read_only, token=self._token, verbose=False, version_state=self.version_state, path=self.path, link_creds=self.link_creds, pad_tensors=self._pad_tensors, ) elif "/" in item: splt = posixpath.split(item) ret = self[splt[0]][splt[1]] else: raise TensorDoesNotExistError(item) elif isinstance(item, (int, slice, list, tuple, Index)): ret = self.__class__( storage=self.storage, index=self.index[item], group_index=self.group_index, read_only=self._read_only, token=self._token, verbose=False, version_state=self.version_state, path=self.path, is_iteration=is_iteration, link_creds=self.link_creds, pad_tensors=self._pad_tensors, ) else: raise InvalidKeyTypeError(item) ret._view_base = self._view_base or self return ret @invalid_view_op @hub_reporter.record_call def create_tensor( self, name: str, htype: str = UNSPECIFIED, dtype: Union[str, np.dtype] = UNSPECIFIED, sample_compression: str = UNSPECIFIED, chunk_compression: str = UNSPECIFIED, hidden: bool = False, create_sample_info_tensor: bool = True, create_shape_tensor: bool = True, create_id_tensor: bool = True, verify: bool = False, exist_ok: bool = False, **kwargs, ): """Creates a new tensor in the dataset. Examples: ``` # create dataset ds = hub.dataset("path/to/dataset") # create tensors ds.create_tensor("images", htype="image", sample_compression="jpg") ds.create_tensor("videos", htype="video", sample_compression="mp4") ds.create_tensor("data") # append data ds.images.append(np.ones((400, 400, 3), dtype='uint8')) ds.videos.append(hub.read("videos/sample_video.mp4")) ds.data.append(np.zeros((100, 100, 2))) ``` Args: name (str): The name of the tensor to be created. htype (str): The class of data for the tensor. The defaults for other parameters are determined in terms of this value. For example, `htype="image"` would have `dtype` default to `uint8`. 
These defaults can be overridden by explicitly passing any of the other parameters to this function. May also modify the defaults for other parameters. dtype (str): Optionally override this tensor's `dtype`. All subsequent samples are required to have this `dtype`. sample_compression (str): All samples will be compressed in the provided format. If `None`, samples are uncompressed. chunk_compression (str): All chunks will be compressed in the provided format. If `None`, chunks are uncompressed. **kwargs: `htype` defaults can be overridden by passing any of the compatible parameters. To see all `htype`s and their correspondent arguments, check out `hub/htypes.py`. hidden (bool): If True, the tensor will be hidden from ds.tensors but can still be accessed via ds[tensor_name] create_sample_info_tensor (bool): If True, meta data of individual samples will be saved in a hidden tensor. This data can be accessed via `tensor[i].sample_info`. create_shape_tensor (bool): If True, an associated tensor containing shapes of each sample will be created. create_id_tensor (bool): If True, an associated tensor containing unique ids for each sample will be created. This is useful for merge operations. verify (bool): Valid only for link htypes. If True, all links will be verified before they are added to the tensor. exist_ok: If True, the group is created if it does not exist. If False, an error is raised if the group already exists. Returns: The new tensor, which can also be accessed by `self[name]`. Raises: TensorAlreadyExistsError: If the tensor already exists and `exist_ok` is False. TensorGroupAlreadyExistsError: Duplicate tensor groups are not allowed. InvalidTensorNameError: If `name` is in dataset attributes. NotImplementedError: If trying to override `chunk_compression`. TensorMetaInvalidHtype: If invalid htype is specified. ValueError: If an illegal argument is specified. """ # if not the head node, checkout to an auto branch that is newly created auto_checkout(self) name = filter_name(name, self.group_index) key = self.version_state["tensor_names"].get(name) is_sequence, is_link, htype = parse_complex_htype(htype) if key: if not exist_ok: raise TensorAlreadyExistsError(name) tensor = self.root[key] current_config = tensor._config new_config = { "htype": htype, "dtype": dtype, "sample_compression": sample_compression, "chunk_compression": chunk_compression, "hidden": hidden, "is_link": is_link, "is_sequence": is_sequence, } if current_config != new_config: raise ValueError( f"Tensor {name} already exists with different configuration. " f"Current config: {current_config}. " f"New config: {new_config}" ) return tensor elif name in self.version_state["full_tensors"]: key = f"{name}_{uuid.uuid4().hex[:4]}" else: key = name if name in self._groups: raise TensorGroupAlreadyExistsError(name) tensor_name = posixpath.split(name)[1] if not tensor_name or tensor_name in dir(self): raise InvalidTensorNameError(tensor_name) kwargs["is_sequence"] = kwargs.get("is_sequence") or is_sequence kwargs["is_link"] = kwargs.get("is_link") or is_link kwargs["verify"] = verify if is_link and ( sample_compression != UNSPECIFIED or chunk_compression != UNSPECIFIED ): warnings.warn( "Chunk_compression and sample_compression aren't valid for tensors with linked data. Ignoring these arguments." 
) sample_compression = UNSPECIFIED chunk_compression = UNSPECIFIED if not self._is_root(): return self.root.create_tensor( name=key, htype=htype, dtype=dtype, sample_compression=sample_compression, chunk_compression=chunk_compression, hidden=hidden, create_sample_info_tensor=create_sample_info_tensor, create_shape_tensor=create_shape_tensor, create_id_tensor=create_id_tensor, exist_ok=exist_ok, **kwargs, ) if "/" in name: self._create_group(posixpath.split(name)[0]) # Seperate meta and info htype_config = HTYPE_CONFIGURATIONS.get(htype, {}).copy() info_keys = htype_config.pop("_info", []) info_kwargs = {} meta_kwargs = {} for k, v in kwargs.items(): if k in info_keys: verify_htype_key_value(htype, k, v) info_kwargs[k] = v else: meta_kwargs[k] = v # Set defaults for k in info_keys: if k not in info_kwargs: if k == "class_names": info_kwargs[k] = htype_config[k].copy() else: info_kwargs[k] = htype_config[k] create_tensor( key, self.storage, htype=htype, dtype=dtype, sample_compression=sample_compression, chunk_compression=chunk_compression, version_state=self.version_state, hidden=hidden, **meta_kwargs, ) meta: DatasetMeta = self.meta ffw_dataset_meta(meta) meta.add_tensor(name, key, hidden=hidden) tensor = Tensor(key, self) # type: ignore tensor.meta.name = name self.version_state["full_tensors"][key] = tensor self.version_state["tensor_names"][name] = key if info_kwargs: tensor.info.update(info_kwargs) self.storage.maybe_flush() if create_sample_info_tensor and htype in ("image", "audio", "video", "dicom"): self._create_sample_info_tensor(name) if create_shape_tensor and htype not in ("text", "json"): self._create_sample_shape_tensor(name, htype=htype) if create_id_tensor: self._create_sample_id_tensor(name) return tensor def _create_sample_shape_tensor(self, tensor: str, htype: str): shape_tensor = get_sample_shape_tensor_key(tensor) self.create_tensor( shape_tensor, hidden=True, create_id_tensor=False, create_sample_info_tensor=False, create_shape_tensor=False, max_chunk_size=SAMPLE_INFO_TENSOR_MAX_CHUNK_SIZE, ) f = "append_len" if htype == "list" else "append_shape" self._link_tensors( tensor, shape_tensor, append_f=f, update_f=f, flatten_sequence=True ) def _create_sample_id_tensor(self, tensor: str): id_tensor = get_sample_id_tensor_key(tensor) self.create_tensor( id_tensor, hidden=True, create_id_tensor=False, create_sample_info_tensor=False, create_shape_tensor=False, ) self._link_tensors( tensor, id_tensor, append_f="append_id", flatten_sequence=False, ) def _create_sample_info_tensor(self, tensor: str): sample_info_tensor = get_sample_info_tensor_key(tensor) self.create_tensor( sample_info_tensor, htype="json", max_chunk_size=SAMPLE_INFO_TENSOR_MAX_CHUNK_SIZE, hidden=True, create_id_tensor=False, create_sample_info_tensor=False, create_shape_tensor=False, ) self._link_tensors( tensor, sample_info_tensor, "append_info", "update_info", flatten_sequence=True, ) def _hide_tensor(self, tensor: str): self._tensors()[tensor].meta.set_hidden(True) self.meta._hide_tensor(tensor) self.storage.maybe_flush() @invalid_view_op @hub_reporter.record_call def delete_tensor(self, name: str, large_ok: bool = False): """Delete a tensor from the dataset. Examples: ``` ds.delete_tensor("images/cats") ``` Args: name (str): The name of tensor to be deleted. large_ok (bool): Delete tensors larger than 1GB. Disabled by default. Returns: None Raises: TensorDoesNotExistError: If tensor of name `name` does not exist in the dataset. 
""" auto_checkout(self) name = filter_name(name, self.group_index) key = self.version_state["tensor_names"].get(name) if not key: raise TensorDoesNotExistError(name) if not tensor_exists(key, self.storage, self.version_state["commit_id"]): raise TensorDoesNotExistError(name) if not self._is_root(): return self.root.delete_tensor(name, large_ok) if not large_ok: chunk_engine = self.version_state["full_tensors"][key].chunk_engine size_approx = chunk_engine.num_samples * chunk_engine.min_chunk_size if size_approx > hub.constants.DELETE_SAFETY_SIZE: logger.info( f"Tensor {name} was too large to delete. Try again with large_ok=True." ) return with self: meta = self.meta key = self.version_state["tensor_names"].pop(name) if key not in meta.hidden_tensors: tensor_diff = Tensor(key, self).chunk_engine.commit_diff # if tensor was created in this commit, there's no diff for deleting it. if not tensor_diff.created: self._dataset_diff.tensor_deleted(name) delete_tensor(key, self) self.version_state["full_tensors"].pop(key) ffw_dataset_meta(meta) meta.delete_tensor(name) self.version_state["meta"] = meta for t_name in [ func(name) for func in ( get_sample_id_tensor_key, get_sample_info_tensor_key, get_sample_shape_tensor_key, ) ]: t_key = self.meta.tensor_names.get(t_name) if t_key and tensor_exists( t_key, self.storage, self.version_state["commit_id"] ): self.delete_tensor(t_name, large_ok=True) self.storage.flush() @invalid_view_op @hub_reporter.record_call def delete_group(self, name: str, large_ok: bool = False): """Delete a tensor group from the dataset. Examples: ``` ds.delete_group("images/dogs") ``` Args: name (str): The name of tensor group to be deleted. large_ok (bool): Delete tensor groups larger than 1GB. Disabled by default. Returns: None Raises: TensorGroupDoesNotExistError: If tensor group of name `name` does not exist in the dataset. """ auto_checkout(self) full_path = filter_name(name, self.group_index) if full_path not in self._groups: raise TensorGroupDoesNotExistError(name) if not self._is_root(): return self.root.delete_group(full_path, large_ok) if not large_ok: size_approx = self[name].size_approx() if size_approx > hub.constants.DELETE_SAFETY_SIZE: logger.info( f"Group {name} was too large to delete. Try again with large_ok=True." ) return with self: meta = self.version_state["meta"] ffw_dataset_meta(meta) tensors = [ posixpath.join(name, tensor) for tensor in self[name]._all_tensors_filtered(include_hidden=True) ] meta.delete_group(name) for tensor in tensors: key = self.version_state["tensor_names"][tensor] delete_tensor(key, self) self.version_state["tensor_names"].pop(tensor) self.version_state["full_tensors"].pop(key) self.storage.maybe_flush() @invalid_view_op @hub_reporter.record_call def create_tensor_like( self, name: str, source: "Tensor", unlink: bool = False ) -> "Tensor": """Copies the `source` tensor's meta information and creates a new tensor with it. No samples are copied, only the meta/info for the tensor is. Examples: ``` ds.create_tensor_like("cats", ds["images"]) ``` Args: name (str): Name for the new tensor. source (Tensor): Tensor who's meta/info will be copied. May or may not be contained in the same dataset. unlink (bool): Whether to unlink linked tensors. Returns: Tensor: New Tensor object. 
""" info = source.info.__getstate__().copy() meta = source.meta.__getstate__().copy() if unlink: meta["is_link"] = False del meta["min_shape"] del meta["max_shape"] del meta["length"] del meta["version"] del meta["name"] destination_tensor = self.create_tensor(name, **meta) destination_tensor.info.update(info) return destination_tensor def _rename_tensor(self, name, new_name): tensor = self[name] tensor.meta.name = new_name key = self.version_state["tensor_names"].pop(name) meta = self.meta if key not in meta.hidden_tensors: tensor_diff = tensor.chunk_engine.commit_diff # if tensor was created in this commit, tensor name has to be updated without adding it to diff. if not tensor_diff.created: self._dataset_diff.tensor_renamed(name, new_name) self.version_state["tensor_names"][new_name] = key ffw_dataset_meta(meta) meta.rename_tensor(name, new_name) for func in ( get_sample_id_tensor_key, get_sample_info_tensor_key, get_sample_shape_tensor_key, ): t_old, t_new = map(func, (name, new_name)) t_key = self.meta.tensor_names.get(t_old) if t_key and tensor_exists( t_key, self.storage, self.version_state["commit_id"] ): self._rename_tensor(t_old, t_new) return tensor @hub_reporter.record_call def rename_tensor(self, name: str, new_name: str) -> "Tensor": """Renames tensor with name `name` to `new_name` Args: name (str): Name of tensor to be renamed. new_name (str): New name of tensor. Returns: Tensor: Renamed tensor. Raises: TensorDoesNotExistError: If tensor of name `name` does not exist in the dataset. TensorAlreadyExistsError: Duplicate tensors are not allowed. TensorGroupAlreadyExistsError: Duplicate tensor groups are not allowed. InvalidTensorNameError: If `new_name` is in dataset attributes. RenameError: If `new_name` points to a group different from `name`. """ auto_checkout(self) if name not in self._tensors(): raise TensorDoesNotExistError(name) name = filter_name(name, self.group_index) new_name = filter_name(new_name, self.group_index) if posixpath.split(name)[0] != posixpath.split(new_name)[0]: raise RenameError("New name of tensor cannot point to a different group") if new_name in self.version_state["tensor_names"]: raise TensorAlreadyExistsError(new_name) if new_name in self._groups: raise TensorGroupAlreadyExistsError(new_name) new_tensor_name = posixpath.split(new_name)[1] if not new_tensor_name or new_tensor_name in dir(self): raise InvalidTensorNameError(new_name) tensor = self.root._rename_tensor(name, new_name) self.storage.maybe_flush() return tensor @hub_reporter.record_call def rename_group(self, name: str, new_name: str) -> None: """Renames group with name `name` to `new_name` Args: name (str): Name of group to be renamed. new_name (str): New name of group. Raises: TensorGroupDoesNotExistError: If tensor group of name `name` does not exist in the dataset. TensorAlreadyExistsError: Duplicate tensors are not allowed. TensorGroupAlreadyExistsError: Duplicate tensor groups are not allowed. InvalidTensorGroupNameError: If `name` is in dataset attributes. RenameError: If `new_name` points to a group different from `name`. 
""" auto_checkout(self) name = filter_name(name, self.group_index) new_name = filter_name(new_name, self.group_index) if name not in self._groups: raise TensorGroupDoesNotExistError(name) if posixpath.split(name)[0] != posixpath.split(new_name)[0]: raise RenameError("Names does not match.") if new_name in self.version_state["tensor_names"]: raise TensorAlreadyExistsError(new_name) if new_name in self._groups: raise TensorGroupAlreadyExistsError(new_name) new_tensor_name = posixpath.split(new_name)[1] if not new_tensor_name or new_tensor_name in dir(self): raise InvalidTensorGroupNameError(new_name) meta = self.meta meta.rename_group(name, new_name) root = self.root for tensor in filter( lambda x: x.startswith(name), map(lambda y: y.meta.name or y.key, self.tensors.values()), ): root._rename_tensor( tensor, posixpath.join(new_name, posixpath.relpath(tensor, name)), ) self.storage.maybe_flush() def __getattr__(self, key): try: return self.__getitem__(key) except TensorDoesNotExistError as ke: raise AttributeError( f"'{self.__class__}' object has no attribute '{key}'" ) from ke def __setattr__(self, name: str, value): if isinstance(value, (np.ndarray, np.generic)): raise TypeError( "Setting tensor attributes directly is not supported. To add a tensor, use the `create_tensor` method." + "To add data to a tensor, use the `append` and `extend` methods." ) else: return super().__setattr__(name, value) def __iter__(self): for i in range(len(self)): yield self.__getitem__(i, is_iteration=True) def _load_version_info(self): """Loads data from version_control_file otherwise assume it doesn't exist and load all empty""" if self.version_state: return branch = "main" version_state = {"branch": branch} try: version_info = load_version_info(self.storage) version_state["branch_commit_map"] = version_info["branch_commit_map"] version_state["commit_node_map"] = version_info["commit_node_map"] commit_id = version_state["branch_commit_map"][branch] version_state["commit_id"] = commit_id version_state["commit_node"] = version_state["commit_node_map"][commit_id] except Exception: version_state["branch_commit_map"] = {} version_state["commit_node_map"] = {} # used to identify that this is the first commit so its data will not be in similar directory structure to the rest commit_id = FIRST_COMMIT_ID commit_node = CommitNode(branch, commit_id) version_state["commit_id"] = commit_id version_state["commit_node"] = commit_node version_state["branch_commit_map"][branch] = commit_id version_state["commit_node_map"][commit_id] = commit_node # keeps track of the full unindexed tensors version_state["full_tensors"] = {} version_state["tensor_names"] = {} self.__dict__["version_state"] = version_state def _load_link_creds(self): if self.link_creds is not None: return link_creds_key = get_dataset_linked_creds_key() try: data_bytes = self.storage[link_creds_key] except KeyError: data_bytes = None if data_bytes is None: link_creds = LinkCreds() else: link_creds = LinkCreds.frombuffer(data_bytes) self.link_creds = link_creds def _lock(self, err=False): if not self._locking_enabled: return True storage = self.base_storage if storage.read_only and not self._locked_out: if err: raise ReadOnlyModeError() return False if isinstance(storage, tuple(_LOCKABLE_STORAGES)) and ( not self.read_only or self._locked_out ): try: # temporarily disable read only on base storage, to try to acquire lock, if exception, it will be again made readonly storage.disable_readonly() lock_dataset( self, lock_lost_callback=self._lock_lost_handler, ) except 
LockedException as e: self.read_only = True self.__dict__["_locked_out"] = True if err: raise e always_warn( "Checking out dataset in read only mode as another machine has locked this version for writing." ) return False return True def _unlock(self): unlock_dataset(self) def __del__(self): try: self._unlock() except Exception: # python shutting down pass def commit(self, message: Optional[str] = None, allow_empty=False) -> str: """Stores a snapshot of the current state of the dataset. Note: - Committing from a non-head node in any branch will lead to an auto checkout to a new branch. - This same behaviour will happen if new samples are added or existing samples are updated from a non-head node. Args: message (str, Optional): Used to describe the commit. allow_empty (bool): If True, commit even if there are no changes. Returns: str: The commit id of the saved commit that can be used to access the snapshot. Raises: Exception: if dataset is a filtered view. EmptyCommitError: if there are no changes and the user did not force an empty commit with allow_empty=True. """ if not allow_empty and not self.has_head_changes: raise EmptyCommitError( "There are no changes, commit is not done. Try again with allow_empty=True." ) return self._commit(message) @hub_reporter.record_call def merge( self, target_id: str, conflict_resolution: Optional[str] = None, delete_removed_tensors: bool = False, force: bool = False, ): """Merges the target_id into the current dataset. Args: target_id (str): The commit_id or branch to merge. conflict_resolution (str, Optional): The strategy to use to resolve merge conflicts. - Conflicts are scenarios where both the current dataset and the target id have made changes to the same sample/s since their common ancestor. - Must be one of the following: - None - this is the default value; an exception will be raised if there are conflicts. - "ours" - during conflicts, values from the current dataset will be used. - "theirs" - during conflicts, values from the target id will be used. delete_removed_tensors (bool): If True, deleted tensors will be deleted from the dataset. force (bool): Forces merge. - `force` = True will have these effects in the following cases of merge conflicts: - If a tensor is renamed on target but is missing from HEAD, the renamed tensor will be registered as a new tensor on the current branch. - If a tensor is renamed on both target and current branch, the tensor on target will be registered as a new tensor on the current branch. - If a tensor is renamed on target and a new tensor of the new name was created on the current branch, they will be merged. Raises: Exception: if dataset is a filtered view. ValueError: if the conflict resolution strategy is not one of None, "ours", or "theirs". """ if self._is_filtered_view: raise Exception( "Cannot perform version control operations on a filtered dataset view." ) if conflict_resolution not in [None, "ours", "theirs"]: raise ValueError( f"conflict_resolution must be one of None, 'ours', or 'theirs'. Got {conflict_resolution}" ) try_flushing(self) self._initial_autoflush.append(self.storage.autoflush) self.storage.autoflush = False merge(self, target_id, conflict_resolution, delete_removed_tensors, force) self.storage.autoflush = self._initial_autoflush.pop() def _commit(self, message: Optional[str] = None, hash: Optional[str] = None) -> str: if self._is_filtered_view: raise Exception( "Cannot perform version control operations on a filtered dataset view."
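# Usage sketch for `merge` as documented above. Assumptions: a dataset exists at
# "path/to/dataset" and it has a "dev" branch to be merged into "main".
#
#     ds = hub.load("path/to/dataset")
#     ds.checkout("main")
#     ds.merge("dev", conflict_resolution="theirs")  # prefer target values on conflict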
) try_flushing(self) self._initial_autoflush.append(self.storage.autoflush) self.storage.autoflush = False try: self._unlock() commit(self, message, hash) self._lock() finally: self.storage.autoflush = self._initial_autoflush.pop() self._info = None self._ds_diff = None [f() for f in list(self._commit_hooks.values())] # do not store commit message hub_reporter.feature_report(feature_name="commit", parameters={}) return self.commit_id # type: ignore def checkout(self, address: str, create: bool = False) -> Optional[str]: """Checks out to a specific commit_id or branch. If `create = True`, creates a new branch with name as address. Note: Checkout from a head node in any branch that contains uncommitted data will lead to an auto commit before the checkout. Args: address (str): The commit_id or branch to checkout to. create (bool): If True, creates a new branch with name as address. Returns: str: The commit_id of the dataset after checkout. Raises: Exception: If dataset is a filtered view. """ return self._checkout(address, create) def _checkout( self, address: str, create: bool = False, hash: Optional[str] = None ) -> Optional[str]: if self._is_filtered_view: raise Exception( "Cannot perform version control operations on a filtered dataset view." ) if self._locked_out: self.storage.disable_readonly() self._read_only = False self.base_storage.disable_readonly() try_flushing(self) self._initial_autoflush.append(self.storage.autoflush) self.storage.autoflush = False err = False try: self._unlock() checkout(self, address, create, hash) except Exception as e: err = True if self._locked_out: self.storage.enable_readonly() self._read_only = True self.base_storage.enable_readonly() raise e finally: if not (err and self._locked_out): self._lock() self.storage.autoflush = self._initial_autoflush.pop() self._info = None self._ds_diff = None # do not store address hub_reporter.feature_report( feature_name="checkout", parameters={"Create": str(create)} ) commit_node = self.version_state["commit_node"] if self.verbose: warn_node_checkout(commit_node, create) return self.commit_id @hub_reporter.record_call def log(self): """Displays the details of all the past commits.""" commit_node = self.version_state["commit_node"] print("---------------\nHub Version Log\n---------------\n") print(f"Current Branch: {self.version_state['branch']}") if self.has_head_changes: print("** There are uncommitted changes on this branch.") print() while commit_node: if not commit_node.is_head_node: print(f"{commit_node}\n") commit_node = commit_node.parent @hub_reporter.record_call def diff( self, id_1: Optional[str] = None, id_2: Optional[str] = None, as_dict=False ) -> Optional[Dict]: """Returns/displays the differences between commits/branches. For each tensor this contains information about the sample indexes that were added/modified as well as whether the tensor was created. Args: id_1 (str, Optional): The first commit_id or branch name. id_2 (str, Optional): The second commit_id or branch name. as_dict (bool, Optional): If True, returns a dictionary of the differences instead of printing them. This dictionary will have two keys - "tensor" and "dataset" which represents tensor level and dataset level changes, respectively. Defaults to False. Note: - If both `id_1` and `id_2` are None, the differences between the current state and the previous commit will be calculated. If you're at the head of the branch, this will show the uncommitted changes, if any. 
- If only `id_1` is provided, the differences between the current state and id_1 will be calculated. If you're at the head of the branch, this will take into account the uncommitted changes, if any. - If only `id_2` is provided, a ValueError will be raised. - If both `id_1` and `id_2` are provided, the differences between `id_1` and `id_2` will be calculated. Returns: Dict: The differences between the commits/branches if as_dict is True. - If `id_1` and `id_2` are None, a dictionary containing the differences between the current state and the previous commit will be returned. - If only `id_1` is provided, a dictionary containing the differences in the current state and `id_1` respectively will be returned. - If only `id_2` is provided, a ValueError will be raised. - If both `id_1` and `id_2` are provided, a dictionary containing the differences in `id_1` and `id_2` respectively will be returned. None: If as_dict is False. Example of a dict returned: ``` { "image": {"data_added": [3, 6], "data_updated": {0, 2}, "created": False, "info_updated": False, "data_transformed_in_place": False}, "label": {"data_added": [0, 3], "data_updated": {}, "created": True, "info_updated": False, "data_transformed_in_place": False}, "other/stuff" : {data_added: [3, 3], data_updated: {1, 2}, created: True, "info_updated": False, "data_transformed_in_place": False}, } ``` Here, 'data_added' is a range of sample indexes that were added to the tensor: - For example [3, 6] means that sample 3, 4 and 5 were added. - Another example [3, 3] means that no samples were added as the range is empty 'data_updated' is a set of sample indexes that were updated. - For example {0, 2} means that sample 0 and 2 were updated. 'created' is a boolean that is True if the tensor was created. 'info_updated' is a boolean that is True if the info of the tensor was updated. 'data_transformed_in_place' is a boolean that is True if the data of the tensor was transformed in place. Raises: ValueError: If `id_1` is None and `id_2` is not None. """ version_state, storage = self.version_state, self.storage res = get_changes_and_messages(version_state, storage, id_1, id_2) if as_dict: dataset_changes_1 = res[0] dataset_changes_2 = res[1] tensor_changes_1 = res[2] tensor_changes_2 = res[3] changes = {} if id_1 is None and id_2 is None: changes["dataset"] = dataset_changes_1 changes["tensor"] = tensor_changes_1 return changes changes["dataset"] = dataset_changes_1, dataset_changes_2 changes["tensor"] = tensor_changes_1, tensor_changes_2 return changes all_changes = get_all_changes_string(*res) print(all_changes) return None def _populate_meta(self, verbose=True): """Populates the meta information for the dataset.""" if dataset_exists(self.storage): if verbose and self.verbose: logger.info(f"{self.path} loaded successfully.") load_meta(self) elif not self.storage.empty(): # dataset does not exist, but the path was not empty raise PathNotEmptyException else: if self.read_only: # cannot create a new dataset when in read_only mode. 
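# Usage sketch tying together `checkout`, `log` and `diff` as documented above.
# Assumptions: `ds` is writable, already has an "images" tensor, and the branch
# name "dev" plus the file name are placeholders.
#
#     ds.checkout("dev", create=True)        # create and switch to a new branch
#     ds.images.append(hub.read("new.jpg"))  # make a change on the new branch
#     ds.commit("added one image")
#     ds.log()                               # print the commit history of this branch
#     changes = ds.diff(as_dict=True)        # {"dataset": ..., "tensor": ...}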
raise CouldNotCreateNewDatasetException(self.path) meta = DatasetMeta() key = get_dataset_meta_key(self.version_state["commit_id"]) self.version_state["meta"] = meta self.storage.register_hub_object(key, meta) self._register_dataset() self.flush() def _register_dataset(self): """overridden in HubCloudDataset""" def _send_query_progress(self, *args, **kwargs): """overridden in HubCloudDataset""" def _send_compute_progress(self, *args, **kwargs): """overridden in HubCloudDataset""" def _send_pytorch_progress(self, *args, **kwargs): """overridden in HubCloudDataset""" def _send_filter_progress(self, *args, **kwargs): """overridden in HubCloudDataset""" def _send_commit_event(self, *args, **kwargs): """overridden in HubCloudDataset""" def _send_dataset_creation_event(self, *args, **kwargs): """overridden in HubCloudDataset""" def _send_branch_creation_event(self, *args, **kwargs): """overridden in HubCloudDataset""" def _first_load_init(self): """overridden in HubCloudDataset""" @property def read_only(self): return self._read_only @property def has_head_changes(self): """Returns True if currently at head node and uncommitted changes are present.""" commit_node = self.version_state["commit_node"] return not commit_node.children and current_commit_has_change( self.version_state, self.storage ) def _set_read_only(self, value: bool, err: bool): storage = self.storage self.__dict__["_read_only"] = value if value: storage.enable_readonly() if isinstance(storage, LRUCache) and storage.next_storage is not None: storage.next_storage.enable_readonly() else: try: locked = self._lock(err=err) if locked: self.storage.disable_readonly() if ( isinstance(storage, LRUCache) and storage.next_storage is not None ): storage.next_storage.disable_readonly() else: self.__dict__["_read_only"] = True except LockedException as e: self.__dict__["_read_only"] = True raise e @read_only.setter @invalid_view_op def read_only(self, value: bool): self._set_read_only(value, True) @hub_reporter.record_call def pytorch( self, transform: Optional[Callable] = None, tensors: Optional[Sequence[str]] = None, tobytes: Union[bool, Sequence[str]] = False, num_workers: int = 1, batch_size: int = 1, drop_last: bool = False, collate_fn: Optional[Callable] = None, pin_memory: bool = False, shuffle: bool = False, buffer_size: int = 2048, use_local_cache: bool = False, use_progress_bar: bool = False, return_index: bool = True, pad_tensors: bool = False, ): """Converts the dataset into a pytorch Dataloader. Note: Pytorch does not support uint16, uint32, uint64 dtypes. These are implicitly type casted to int32, int64 and int64 respectively. This spins up it's own workers to fetch data. Args: transform (Callable, Optional): Transformation function to be applied to each sample. tensors (List, Optional): Optionally provide a list of tensor names in the ordering that your training script expects. For example, if you have a dataset that has "image" and "label" tensors, if `tensors=["image", "label"]`, your training script should expect each batch will be provided as a tuple of (image, label). tobytes (bool): If True, samples will not be decompressed and their raw bytes will be returned instead of numpy arrays. Can also be a list of tensors, in which case those tensors alone will not be decompressed. num_workers (int): The number of workers to use for fetching data in parallel. batch_size (int): Number of samples per batch to load. Default value is 1. 
drop_last (bool): Set to True to drop the last incomplete batch, if the dataset size is not divisible by the batch size. If False and the size of the dataset is not divisible by the batch size, then the last batch will be smaller. Default value is False. Read torch.utils.data.DataLoader docs for more details. collate_fn (Callable, Optional): Merges a list of samples to form a mini-batch of Tensor(s). Used when using batched loading from a map-style dataset. Read torch.utils.data.DataLoader docs for more details. pin_memory (bool): If True, the data loader will copy Tensors into CUDA pinned memory before returning them. Default value is False. Read torch.utils.data.DataLoader docs for more details. shuffle (bool): If True, the data loader will shuffle the data indices. Default value is False. Details about how hub shuffles data can be found at https://docs.activeloop.ai/how-hub-works/shuffling-in-ds.pytorch buffer_size (int): The size of the buffer used to shuffle the data in MBs. Defaults to 2048 MB. Increasing the buffer_size will increase the extent of shuffling. use_local_cache (bool): If True, the data loader will use a local cache to store data. This is useful when the dataset can fit on the machine and we don't want to fetch the data multiple times for each iteration. Default value is False. use_progress_bar (bool): If True, tqdm will be wrapped around the returned dataloader. Default value is False. return_index (bool): If True, the returned dataloader will have a key "index" that contains the index of the sample(s) in the original dataset. Default value is True. pad_tensors (bool): If True, shorter tensors will be padded to the length of the longest tensor. Default value is False. Returns: A torch.utils.data.DataLoader object. Raises: EmptyTensorError: If one or more tensors being passed to pytorch are empty. """ from hub.integrations import dataset_to_pytorch as to_pytorch dataloader = to_pytorch( self, transform=transform, tensors=tensors, tobytes=tobytes, num_workers=num_workers, batch_size=batch_size, drop_last=drop_last, collate_fn=collate_fn, pin_memory=pin_memory, shuffle=shuffle, buffer_size=buffer_size, use_local_cache=use_local_cache, return_index=return_index, pad_tensors=pad_tensors, ) if use_progress_bar: dataloader = tqdm(dataloader, desc=self.path, total=len(self) // batch_size) return dataloader @hub_reporter.record_call def filter( self, function: Union[Callable, str], num_workers: int = 0, scheduler: str = "threaded", progressbar: bool = True, save_result: bool = False, result_path: Optional[str] = None, result_ds_args: Optional[dict] = None, ): """Filters the dataset in accordance with the filter function `f(x: sample) -> bool`. Args: function (Callable, str): Filter function that takes a sample as argument and returns True/False if the sample should be included in the result. Also supports simplified expression evaluations. See `hub.core.query.query.DatasetQuery` for more details. num_workers (int): Level of parallelization of filter evaluations. `0` indicates in-place for-loop evaluation, multiprocessing is used otherwise. scheduler (str): Scheduler to use for multiprocessing evaluation. Defaults to `threaded`. progressbar (bool): Display progress bar while filtering. Defaults to True. save_result (bool): If True, the result of the filter will be saved to a dataset asynchronously. result_path (Optional, str): Path to save the filter result. Only applicable if `save_result` is True. result_ds_args (Optional, dict): Additional args for the result dataset. Only applicable if `save_result` is True.
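A hedged end-to-end sketch (tensor names "images"/"labels" and the query string are assumptions) showing a filtered view being fed to `pytorch()`:

```
view = ds.filter("labels == 2")
dataloader = view.pytorch(tensors=["images", "labels"], batch_size=32, shuffle=True)
for batch in dataloader:
    pass  # training step would go here
```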
Returns: View of Dataset with elements that satisfy filter function. Example: Following filters are identical and return dataset view where all the samples have label equals to 2. >>> dataset.filter(lambda sample: sample.labels.numpy() == 2) >>> dataset.filter('labels == 2') """ from hub.core.query import filter_dataset, query_dataset fn = query_dataset if isinstance(function, str) else filter_dataset result = fn( self, function, num_workers=num_workers, scheduler=scheduler, progressbar=progressbar, save_result=save_result, result_path=result_path, result_ds_args=result_ds_args, ) return result def _get_total_meta(self): """Returns tensor metas all together""" return { tensor_key: tensor_value.meta for tensor_key, tensor_value in self.version_state["full_tensors"].items() } def _set_derived_attributes(self, verbose: bool = True): """Sets derived attributes during init and unpickling.""" if self.is_first_load: self.storage.autoflush = True self._load_version_info() self._load_link_creds() self._set_read_only( self._read_only, err=self._read_only_error ) # TODO: weird fix for dataset unpickling self._populate_meta(verbose) # TODO: use the same scheme as `load_info` if self.index.is_trivial(): self.index = Index.from_json(self.meta.default_index) elif not self._read_only: self._lock() # for ref counting if not self.is_iteration: group_index = self.group_index group_filter = ( lambda t: (not group_index or t.key.startswith(group_index + "/")) and t.key not in self.meta.hidden_tensors ) group_tensors = filter( group_filter, self.version_state["full_tensors"].values() ) max_tensor_length = max(map(len, group_tensors), default=0) self.index.validate(max_tensor_length) @property def info(self): """Returns the information about the dataset.""" if self._info is None: path = get_dataset_info_key(self.version_state["commit_id"]) self.__dict__["_info"] = load_info(path, self) # type: ignore return self._info @info.setter def info(self, value): if isinstance(value, dict): info = self.info info.replace_with(value) else: raise TypeError("Info must be set with type Dict") @property def _dataset_diff(self): if self._ds_diff is None: self.__dict__["_ds_diff"] = load_dataset_diff(self) return self._ds_diff @hub_reporter.record_call def tensorflow( self, tensors: Optional[Sequence[str]] = None, tobytes: Union[bool, Sequence[str]] = False, ): """Converts the dataset into a tensorflow compatible format. See https://www.tensorflow.org/api_docs/python/tf/data/Dataset Args: tensors (List, Optional): Optionally provide a list of tensor names in the ordering that your training script expects. For example, if you have a dataset that has "image" and "label" tensors, if `tensors=["image", "label"]`, your training script should expect each batch will be provided as a tuple of (image, label). tobytes (bool): If True, samples will not be decompressed and their raw bytes will be returned instead of numpy arrays. Can also be a list of tensors, in which case those tensors alone will not be decompressed. Returns: tf.data.Dataset object that can be used for tensorflow training. """ return dataset_to_tensorflow(self, tensors=tensors, tobytes=tobytes) def flush(self): """Necessary operation after writes if caches are being used. Writes all the dirty data from the cache layers (if any) to the underlying storage. Here dirty data corresponds to data that has been changed/assigned and but hasn't yet been sent to the underlying storage. 
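A minimal sketch (the local path, tensor name and file name are assumptions) of when `flush` is typically called explicitly:

```
ds = hub.dataset("./flush_example")
ds.create_tensor("images", htype="image", sample_compression="jpeg")
ds.images.append(hub.read("cat.jpg"))  # "cat.jpg" is a placeholder file
ds.flush()  # push dirty cached chunks to the underlying storage
```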
""" self.storage.flush() def clear_cache(self): """Flushes (see Dataset.flush documentation) the contents of the cache layers (if any) and then deletes contents of all the layers of it. This doesn't delete data from the actual storage. This is useful if you have multiple datasets with memory caches open, taking up too much RAM. Also useful when local cache is no longer needed for certain datasets and is taking up storage space. """ if hasattr(self.storage, "clear_cache"): self.storage.clear_cache() def size_approx(self): """Estimates the size in bytes of the dataset. Includes only content, so will generally return an under-estimate. """ tensors = self.version_state["full_tensors"].values() chunk_engines = [tensor.chunk_engine for tensor in tensors] size = sum(c.num_chunks * c.min_chunk_size for c in chunk_engines) for group in self._groups_filtered: size += self[group].size_approx() return size @invalid_view_op @hub_reporter.record_call def rename(self, path: Union[str, pathlib.Path]): """Renames the dataset to `path`. Example: ``` ds = hub.load("hub://username/dataset") ds.rename("hub://username/renamed_dataset") ``` Args: path (str, pathlib.Path): New path to the dataset. Raises: RenameError: If `path` points to a different directory. """ path = convert_pathlib_to_string_if_needed(path) path = path.rstrip("/") if posixpath.split(path)[0] != posixpath.split(self.path)[0]: raise RenameError self.base_storage.rename(path) self.path = path @invalid_view_op @hub_reporter.record_call def delete(self, large_ok=False): """Deletes the entire dataset from the cache layers (if any) and the underlying storage. This is an IRREVERSIBLE operation. Data once deleted can not be recovered. Args: large_ok (bool): Delete datasets larger than 1GB. Disabled by default. """ if hasattr(self, "_view_entry"): self._view_entry.delete() return if hasattr(self, "_vds"): self._vds.delete(large_ok=large_ok) return if not large_ok: size = self.size_approx() if size > hub.constants.DELETE_SAFETY_SIZE: logger.info( f"Hub Dataset {self.path} was too large to delete. Try again with large_ok=True." ) return self._unlock() self.storage.clear() def summary(self): """Prints a summary of the dataset.""" pretty_print = summary_dataset(self) print(self) print(pretty_print) def __str__(self): path_str = "" if self.path: path_str = f"path='{self.path}', " mode_str = "" if self.read_only: mode_str = f"read_only=True, " index_str = f"index={self.index}, " if self.index.is_trivial(): index_str = "" group_index_str = ( f"group_index='{self.group_index}', " if self.group_index else "" ) return f"Dataset({path_str}{mode_str}{index_str}{group_index_str}tensors={self._all_tensors_filtered(include_hidden=False)})" __repr__ = __str__ def _get_tensor_from_root(self, name: str) -> Optional[Tensor]: """Gets a tensor from the root dataset. Acesses storage only for the first call. """ key = self.version_state["tensor_names"].get(name) return self.version_state["full_tensors"].get(key) def _has_group_in_root(self, name: str) -> bool: """Checks if a group exists in the root dataset. 
This is faster than checking `if group in self._groups:` """ return name in self.version_state["meta"].groups @property def token(self): """Get attached token of the dataset""" return self._token @property def _ungrouped_tensors(self) -> Dict[str, Tensor]: """Top level tensors in this group that do not belong to any sub groups""" return { posixpath.basename(k): self.version_state["full_tensors"][v] for k, v in self.version_state["tensor_names"].items() if posixpath.dirname(k) == self.group_index } def _all_tensors_filtered(self, include_hidden: bool = True) -> List[str]: """Names of all tensors belonging to this group, including those within sub groups""" hidden_tensors = self.meta.hidden_tensors tensor_names = self.version_state["tensor_names"] return [ posixpath.relpath(t, self.group_index) for t in tensor_names if (not self.group_index or t.startswith(self.group_index + "/")) and (include_hidden or tensor_names[t] not in hidden_tensors) ] def _tensors(self, include_hidden: bool = True) -> Dict[str, Tensor]: """All tensors belonging to this group, including those within sub groups. Always returns the sliced tensors.""" return { t: self.version_state["full_tensors"][ self.version_state["tensor_names"][posixpath.join(self.group_index, t)] ][self.index] for t in self._all_tensors_filtered(include_hidden) } @property def tensors(self) -> Dict[str, Tensor]: """All tensors belonging to this group, including those within sub groups. Always returns the sliced tensors.""" return self._tensors(include_hidden=False) @property def branches(self): """Lists all the branches of the dataset. Returns: List of branches. """ return list(self.version_state["branch_commit_map"]) @property def commits(self) -> List[Dict]: """Lists all the commits leading to the current dataset state. Returns: List of dictionaries containing commit information. """ commits = [] commit_node = self.version_state["commit_node"] while commit_node: if not commit_node.is_head_node: commit_info = { "commit": commit_node.commit_id, "author": commit_node.commit_user_name, "time": str(commit_node.commit_time)[:-7], "message": commit_node.commit_message, } commits.append(commit_info) commit_node = commit_node.parent return commits def get_commit_details(self, commit_id) -> Dict: commit_node: CommitNode = self.version_state["commit_node_map"].get(commit_id) if commit_node is None: raise KeyError(f"Commit {commit_id} not found in dataset.") return { "commit": commit_node.commit_id, "author": commit_node.commit_user_name, "time": str(commit_node.commit_time)[:-7], "message": commit_node.commit_message, } @property def _groups(self) -> List[str]: """Names of all groups in the root dataset""" return self.meta.groups # type: ignore @property def _groups_filtered(self) -> List[str]: """Names of all sub groups in this group""" groups_filtered = [] for g in self._groups: dirname, basename = posixpath.split(g) if dirname == self.group_index: groups_filtered.append(basename) return groups_filtered @property def groups(self) -> Dict[str, "Dataset"]: """All sub groups in this group""" return {g: self[g] for g in self._groups_filtered} @property def commit_id(self) -> Optional[str]: """The lasted committed commit_id of the dataset. 
If there are no commits, this returns None.""" commit_node = self.version_state["commit_node"] if not commit_node.is_head_node: return commit_node.commit_id parent = commit_node.parent if parent is None: return None else: return parent.commit_id @property def pending_commit_id(self) -> str: """The commit_id of the next commit that will be made to the dataset. If you're not at the head of the current branch, this will be the same as the commit_id. """ return self.version_state["commit_id"] @property def branch(self) -> str: """The current branch of the dataset""" return self.version_state["branch"] def _is_root(self) -> bool: return not self.group_index @property def parent(self): """Returns the parent of this group. Returns None if this is the root dataset.""" if self._is_root(): return None autoflush = self.storage.autoflush ds = self.__class__( storage=self.storage, index=self.index, group_index=posixpath.dirname(self.group_index), read_only=self.read_only, public=self.public, token=self._token, verbose=self.verbose, version_state=self.version_state, path=self.path, link_creds=self.link_creds, ) self.storage.autoflush = autoflush return ds @property def root(self): """Returns the root dataset of a group.""" if self._is_root(): return self autoflush = self.storage.autoflush ds = self.__class__( storage=self.storage, index=self.index, group_index="", read_only=self.read_only, public=self.public, token=self._token, verbose=self.verbose, version_state=self.version_state, path=self.path, link_creds=self.link_creds, ) self.storage.autoflush = autoflush return ds def _create_group(self, name: str) -> "Dataset": """Internal method used by `create_group` and `create_tensor`.""" meta: DatasetMeta = self.version_state["meta"] if not name or name in dir(self): raise InvalidTensorGroupNameError(name) fullname = name while name: if name in self.version_state["full_tensors"]: raise TensorAlreadyExistsError(name) meta.add_group(name) name, _ = posixpath.split(name) return self[fullname] @hub_reporter.record_call def create_group(self, name: str, exist_ok=False) -> "Dataset": """Creates a tensor group. Intermediate groups in the path are also created. Args: name: The name of the group to create. exist_ok: If True, the group is created if it does not exist. If False, an error is raised if the group already exists. Returns: The created group. Raises: TensorGroupAlreadyExistsError: If the group already exists and exist_ok is False. Examples: ``` ds.create_group("images") ds['images'].create_tensor("cats") ``` ds.create_groups("images/jpg/cats") ds["images"].create_tensor("png") ds["images/jpg"].create_group("dogs") """ if not self._is_root(): return self.root.create_group( posixpath.join(self.group_index, name), exist_ok=exist_ok ) name = filter_name(name) if name in self._groups: if not exist_ok: raise TensorGroupAlreadyExistsError(name) return self[name] return self._create_group(name) def rechunk( self, tensors: Optional[Union[str, List[str]]] = None, num_workers: int = 0, scheduler: str = "threaded", progressbar: bool = True, ): """Rewrites the underlying chunks to make their sizes optimal. This is usually needed in cases where a lot of updates have been made to the data. Args: tensors (str, List[str], Optional): Name/names of the tensors to rechunk. If None, all tensors in the dataset are rechunked. num_workers (int): The number of workers to use for rechunking. Defaults to 0. When set to 0, it will always use serial processing, irrespective of the scheduler. 
scheduler (str): The scheduler to be used for rechunking. Supported values include: 'serial', 'threaded', 'processed' and 'ray'. Defaults to 'threaded'. progressbar (bool): Displays a progress bar if True (default). """ if tensors is None: tensors = list(self.tensors) elif isinstance(tensors, str): tensors = [tensors] # identity function that rechunks @hub.compute def rechunking(sample_in, samples_out): for tensor in tensors: samples_out[tensor].append(sample_in[tensor]) rechunking().eval( self, num_workers=num_workers, scheduler=scheduler, progressbar=progressbar, skip_ok=True, ) # the below methods are used by cloudpickle dumps def __origin__(self): return None def __values__(self): return None def __type__(self): return None def __union_params__(self): return None def __tuple_params__(self): return None def __result__(self): return None def __args__(self): return None def __bool__(self): return True def extend(self, samples: Dict[str, Any], skip_ok: bool = False): """Appends multiple rows of samples to multiple tensors at once. This method expects all tensors being updated to be of the same length. Args: samples (Dict[str, Any]): Dictionary with tensor names as keys and samples as values. skip_ok (bool): Skip tensors not in `samples` if set to True. Raises: KeyError: If any tensor in the dataset is not a key in `samples` and `skip_ok` is False. TensorDoesNotExistError: If tensor in `samples` does not exist. ValueError: If all tensors being updated are not of the same length. NotImplementedError: If an error occurs while writing tiles. Exception: Error while attempting to rollback appends. """ if isinstance(samples, Dataset): samples = samples.tensors if not samples: return n = len(samples[next(iter(samples.keys()))]) for v in samples.values(): if len(v) != n: sizes = {k: len(v) for (k, v) in samples.items()} raise ValueError( f"Incoming samples are not of equal lengths. Incoming sample sizes: {sizes}" ) [f() for f in list(self._update_hooks.values())] for i in range(n): self.append({k: v[i] for k, v in samples.items()}) @invalid_view_op def append(self, sample: Dict[str, Any], skip_ok: bool = False): """Appends a sample to multiple tensors at once. This method expects all tensors being updated to be of the same length. Args: sample (dict): Dictionary with tensor names as keys and samples as values. skip_ok (bool): Skip tensors not in `sample` if set to True. Raises: KeyError: If any tensor in the dataset is not a key in `sample` and `skip_ok` is False. TensorDoesNotExistError: If tensor in `sample` does not exist. ValueError: If all tensors being updated are not of the same length. NotImplementedError: If an error occurs while writing tiles. Exception: Error while attempting to rollback appends. """ if isinstance(sample, Dataset): sample = sample.tensors if not skip_ok: for k in self.tensors: if k not in sample: raise KeyError( f"Required tensor not provided: {k}. Use ds.append(sample, skip_ok=True) to skip tensors." ) for k in sample: if k not in self._tensors(): raise TensorDoesNotExistError(k) if len(set(map(len, (self[k] for k in sample)))) != 1: raise ValueError( "When appending using Dataset.append, all tensors are expected to have the same length."
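# Usage sketch for `append` and `extend` as documented above. Assumptions: the
# dataset already has "images" and "labels" tensors, and the file names are placeholders.
#
#     ds.append({"images": hub.read("img0.jpg"), "labels": 0})
#     ds.extend({"images": [hub.read("img1.jpg"), hub.read("img2.jpg")],
#                "labels": [1, 2]})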
) [f() for f in list(self._update_hooks.values())] tensors_appended = [] with self: for k, v in sample.items(): try: tensor = self[k] enc = tensor.chunk_engine.chunk_id_encoder num_chunks = enc.num_chunks tensor.append(v) tensors_appended.append(k) except Exception as e: new_num_chunks = enc.num_chunks num_chunks_added = new_num_chunks - num_chunks if num_chunks_added > 1: # This is unlikely to happen, i.e. the sample passed the validation # steps and tiling but some error occurred while writing tiles to chunks raise NotImplementedError( "Unable to recover from error while writing tiles." ) from e elif num_chunks_added == 1: enc._encoded = enc._encoded[:-1] for k in tensors_appended: try: self[k].pop() except Exception as e2: raise Exception( "Error while attempting to roll back appends" ) from e2 raise e def _view_hash(self) -> str: """Generates a unique hash for a filtered dataset view.""" return hash_inputs( self.path, *[e.value for e in self.index.values], self.pending_commit_id, getattr(self, "_query", None), ) def _get_view_info( self, id: Optional[str] = None, message: Optional[str] = None, copy: bool = False, ): if self._view_invalid: raise DatasetViewSavingError( "This view cannot be saved as new changes were made at HEAD node after creation of this query view." ) commit_id = self.commit_id if self.has_head_changes: if self._new_view_base_commit: commit_id = self._view_base_commit else: if self._view_base: self._waiting_for_view_base_commit = True uid = self._view_id if uid not in self._update_hooks: def update_hook(): self._view_invalid = True self._waiting_for_view_base_commit = False del self._view_base._update_hooks[uid] del self._view_base._commit_hooks[uid] def commit_hook(): self._waiting_for_view_base_commit = False self._new_view_base_commit = self._view_base.commit_id del self._view_base._update_hooks[uid] del self._view_base._commit_hooks[uid] self._view_base._update_hooks[uid] = update_hook self._view_base._commit_hooks[uid] = commit_hook raise DatasetViewSavingError( "HEAD node has uncommitted changes. Commit them before saving views."
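# Workflow sketch implied by the error above: a view can only be saved once the
# HEAD changes of its base dataset are committed. The id "first_100" is illustrative.
#
#     ds.commit("checkpoint before saving views")
#     ds[:100].save_view(id="first_100")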
) tm = getattr(self, "_created_at", time()) id = self._view_hash() if id is None else id info = { "id": id, "virtual-datasource": not copy, "source-dataset": self.path, "source-dataset-version": commit_id, "created_at": tm, } if message is not None: info["message"] = message query = getattr(self, "_query", None) if query: info["query"] = query info["source-dataset-index"] = getattr(self, "_source_ds_idx", None) return info def _lock_queries_json(self): class _LockQueriesJson: def __enter__(self2): storage = self.base_storage self2.storage_read_only = storage.read_only if self._locked_out: # Ignore storage level lock since we have file level lock storage.read_only = False lock = Lock(storage, get_queries_lock_key()) lock.acquire(timeout=10, force=True) self2.lock = lock def __exit__(self2, *_, **__): self2.lock.release() self.base_storage.read_only = self2.storage_read_only return _LockQueriesJson() def _write_queries_json(self, data: dict): read_only = self.base_storage.read_only self.base_storage.disable_readonly() try: self.base_storage[get_queries_key()] = json.dumps(data).encode("utf-8") finally: if read_only: self.base_storage.enable_readonly() def _append_to_queries_json(self, info: dict): with self._lock_queries_json(): qjson = self._read_queries_json() idx = None for i in range(len(qjson)): if qjson[i]["id"] == info["id"]: idx = i break if idx is None: qjson.append(info) else: qjson[idx] = info self._write_queries_json(qjson) def _read_queries_json(self) -> list: try: return json.loads(self.base_storage[get_queries_key()].decode("utf-8")) except KeyError: return [] def _read_view_info(self, id: str): for info in self._read_queries_json(): if info["id"] == id: return info raise KeyError(f"View with id {id} not found.") def _write_vds( self, vds, info: dict, copy: Optional[bool] = False, num_workers: Optional[int] = 0, scheduler: str = "threaded", unlink=True, ): """Writes the indices of this view to a vds.""" vds._allow_view_updates = True try: with vds: if copy: self._copy( vds, num_workers=num_workers, scheduler=scheduler, unlink=unlink, create_vds_index_tensor=True, ) else: vds.create_tensor( "VDS_INDEX", dtype="uint64", create_shape_tensor=False, create_id_tensor=False, create_sample_info_tensor=False, ).extend(list(self.index.values[0].indices(self.num_samples))) info["first-index-subscriptable"] = self.index.subscriptable_at(0) if len(self.index) > 1: info["sub-sample-index"] = Index( self.index.values[1:] ).to_json() vds.info.update(info) finally: try: delattr(vds, "_allow_view_updates") except AttributeError: # Attribute already deleted by _copy() pass def _save_view_in_subdir( self, id: Optional[str], message: Optional[str], copy: bool, num_workers: int, scheduler: str, ): """Saves this view under ".queries" sub directory of same storage.""" info = self._get_view_info(id, message, copy) hash = info["id"] path = f".queries/{hash}" vds = self._sub_ds(path, empty=True, verbose=False) self._write_vds(vds, info, copy, num_workers, scheduler) self._append_to_queries_json(info) return vds def _save_view_in_user_queries_dataset( self, id: Optional[str], message: Optional[str], copy: bool, num_workers: int, scheduler: str, ): """Saves this view under hub://username/queries Only applicable for views of hub datasets. 
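A hedged sketch of how a view saved this way is loaded back later (username and hash are placeholders, following the path format used below):

```
view = hub.load(f"hub://{username}/queries/{view_hash}")
```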
""" if len(self.index.values) > 1: raise NotImplementedError("Storing sub-sample slices is not supported yet.") username = jwt.decode(self.token, options={"verify_signature": False})["id"] if username == "public": raise DatasetViewSavingError( "Unable to save view for read only dataset. Login to save the view to your user account." ) info = self._get_view_info(id, message, copy) base = self._view_base or self org_id, ds_name = base.org_id, base.ds_name hash = f"[{org_id}][{ds_name}]{info['id']}" info["id"] = hash queries_ds_path = f"hub://{username}/queries" try: queries_ds = hub.load( queries_ds_path, verbose=False, ) # create if doesn't exist except PathNotEmptyException: hub.delete(queries_ds_path, force=True) queries_ds = hub.empty(queries_ds_path, verbose=False) except DatasetHandlerError: queries_ds = hub.empty(queries_ds_path, verbose=False) queries_ds._unlock() # we don't need locking as no data will be added to this ds. path = f"hub://{username}/queries/{hash}" vds = hub.empty(path, overwrite=True, verbose=False) self._write_vds(vds, info, copy, num_workers, scheduler) queries_ds._append_to_queries_json(info) return vds def _save_view_in_path( self, path: str, id: Optional[str], message: Optional[str], copy: bool, num_workers: int, scheduler: str, **ds_args, ): """Saves this view at a given dataset path""" if os.path.abspath(path) == os.path.abspath(self.path): raise DatasetViewSavingError("Rewriting parent dataset is not allowed.") try: vds = hub.empty(path, **ds_args) except Exception as e: raise DatasetViewSavingError from e info = self._get_view_info(id, message, copy) self._write_vds(vds, info, copy, num_workers, scheduler) return vds def save_view( self, message: Optional[str] = None, path: Optional[Union[str, pathlib.Path]] = None, id: Optional[str] = None, optimize: bool = False, num_workers: int = 0, scheduler: str = "threaded", verbose: bool = True, **ds_args, ) -> str: """Saves a dataset view as a virtual dataset (VDS) Examples: ``` # Save to specified path vds_path = ds[:10].save_view(path="views/first_10", id="first_10") # vds_path = views/first_10 ``` # Path unspecified vds_path = ds[:100].save_view(id="first_100", message="first 100 samples") # vds_path = path/to/dataset/.queries/first_100 ``` # Random id vds_path = ds[:100].save_view() # vds_path = "path/to/dataset/.queries/92f41922ed0471ec2d27690b7351fc96bea060e6c5ee22b14f7ffa5f291aa068" ``` See `Dataset.get_view` to learn how to load views by id. These virtual datasets can also be loaded from their path like normal datasets. Args: message (Optional, str): Custom user message. path (Optional, str, pathlib.Path): - The VDS will be saved as a standalone dataset at the specified path. - If not specified, the VDS is saved under `.queries` subdirectory of the source dataset's storage. - If the user doesn't have write access to the source dataset and the source dataset is a hub cloud dataset, then the VDS is saved is saved under the user's hub account and can be accessed using `hub.load(f"hub://{username}/queries/{query_hash}")`. id (Optional, str): Unique id for this view. Random id will be generated if not specified. optimize (bool): - If True, the dataset view will be optimized by copying and rechunking the required data. This is necessary to achieve fast streaming speeds when training models using the dataset view. The optimization process will take some time, depending on the size of the data. 
- You can also choose to optimize the saved view later by calling its `optimize` method: See `hub.core.dataset.view_entry.ViewEntry.optimize`. num_workers (int): Number of workers to be used for the optimization process. Applicable only if `optimize=True`. Defaults to 0. scheduler (str): The scheduler to be used for optimization. Supported values include: 'serial', 'threaded', 'processed' and 'ray'. Only applicable if `optimize=True`. Defaults to 'threaded'. verbose (bool): If True, logs will be printed. Defaults to True. ds_args (dict): Additional args for creating VDS when path is specified. (See documentation for `hub.dataset()`) Note: Specifying `path` makes the view external. External views cannot be accessed using the parent dataset's `Dataset.get_view`, `Dataset.load_view`, `Dataset.delete_view` methods. They have to be loaded using `hub.load(path)`. Returns: str: Path to the saved VDS. Raises: ReadOnlyModeError: When attempting to save a view inplace and the user doesn't have write access. DatasetViewSavingError: If HEAD node has uncommitted changes. """ return self._save_view( path, id, message, optimize, num_workers, scheduler, verbose, False, **ds_args, ) def _save_view( self, path: Optional[Union[str, pathlib.Path]] = None, id: Optional[str] = None, message: Optional[str] = None, optimize: bool = False, num_workers: int = 0, scheduler: str = "threaded", verbose: bool = True, _ret_ds: bool = False, **ds_args, ) -> Union[str, Any]: """Saves a dataset view as a virtual dataset (VDS) Args: path (Optional, str, pathlib.Path): If specified, the VDS will be saved as a standalone dataset at the specified path. If not, the VDS is saved under `.queries` subdirectory of the source dataset's storage. If the user doesn't have write access to the source dataset and the source dataset is a hub cloud dataset, then the VDS is saved under the user's hub account and can be accessed using hub.load(f"hub://{username}/queries/{query_hash}"). id (Optional, str): Unique id for this view. message (Optional, str): Custom user message. optimize (bool): Whether the view should be optimized by copying the required data. Default False. num_workers (int): Number of workers to be used if `optimize` is True. scheduler (str): The scheduler to be used for optimization. Supported values include: 'serial', 'threaded', 'processed' and 'ray'. Only applicable if `optimize=True`. Defaults to 'threaded'. verbose (bool): If True, logs will be printed. Defaults to True. _ret_ds (bool): If True, the VDS is returned as such without converting it to a view. If False, the VDS path is returned. Default False. ds_args (dict): Additional args for creating VDS when path is specified. (See documentation for `hub.dataset()`) Returns: If _ret_ds is True, the VDS is returned, else path to the VDS is returned. Raises: ReadOnlyModeError: When attempting to save a view inplace and the user doesn't have write access. NotImplementedError: When attempting to save in-memory datasets. """ path = convert_pathlib_to_string_if_needed(path) ds_args["verbose"] = False vds = None if path is None and hasattr(self, "_vds"): vds = self._vds vds_id = vds.info["id"] if id is not None and vds_id != id: vds = None warnings.warn( f"This view is already saved with id '{vds_id}'. A copy of this view will be created with the provided id '{id}'" ) if vds is None: if path is None: if isinstance(self, MemoryProvider): raise NotImplementedError( "Saving views inplace is not supported for in-memory datasets."
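# Usage sketch for saving an optimized (materialized) copy of a view, per the
# `save_view` docstring above. The id "train_split" is an illustrative assumption.
#
#     vds_path = ds[:1000].save_view(id="train_split", optimize=True, num_workers=4)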
) if self.read_only and not (self._view_base or self)._locked_out: if isinstance(self, hub.core.dataset.HubCloudDataset): vds = self._save_view_in_user_queries_dataset( id, message, optimize, num_workers, scheduler ) else: raise ReadOnlyModeError( "Cannot save view in read only dataset. Specify a path to save the view in a different location." ) else: vds = self._save_view_in_subdir( id, message, optimize, num_workers, scheduler ) else: vds = self._save_view_in_path( path, id, message, optimize, num_workers, scheduler, **ds_args ) if verbose: log_visualizer_link(vds.path, self.path) if _ret_ds: return vds return vds.path def _get_view(self, inherit_creds=True, creds: Optional[Dict] = None): """Returns a view for this VDS. Only works if this Dataset is a virtual dataset. Returns: A view of the source dataset based on the indices from VDS. Args: inherit_creds (bool): Whether to inherit creds from the parent dataset in which this vds is stored. Default True. creds (optional, Dict): Creds for the source dataset. Used only if inherit_creds is False. Raises: Exception: If this is not a VDS. """ try: commit_id = self.info["source-dataset-version"] except KeyError: raise Exception("Dataset._get_view() works only for virtual datasets.") ds = ( self._parent_dataset if (inherit_creds and self._parent_dataset) else hub.load( self.info["source-dataset"], verbose=False, creds=creds, read_only=True ) ) try: orig_index = ds.index ds.index = Index() ds.checkout(commit_id) first_index_subscriptable = self.info.get("first-index-subscriptable", True) if first_index_subscriptable: index_entries = [ IndexEntry(self.VDS_INDEX.numpy().reshape(-1).tolist()) ] else: index_entries = [IndexEntry(int(self.VDS_INDEX.numpy()))] sub_sample_index = self.info.get("sub-sample-index") if sub_sample_index: index_entries += Index.from_json(sub_sample_index).values ret = ds[Index(index_entries)] ret._vds = self return ret finally: ds.index = orig_index def _get_empty_vds( self, vds_path: Optional[Union[str, pathlib.Path]] = None, query: Optional[str] = None, **vds_args, ): """Returns an empty VDS with this dataset as the source dataset. Internal. Args: vds_path (Optional, str, pathlib.Path): If specified, the vds will be saved at this path. Else the vds will be saved under the `.queries` subdirectory. query (Optional, str): Query string associated with this view. vds_args (dict): Additional args for creating the vds when a path is specified. Returns: Empty VDS with this dataset as the source dataset. """ view = self[:0] vds_path = convert_pathlib_to_string_if_needed(vds_path) if query: view._query = query return view._save_view(vds_path, _ret_ds=True, **vds_args) @staticmethod def _get_queries_ds_from_user_account(): username = get_user_name() if username == "public": return try: return hub.load(f"hub://{username}/queries", verbose=False) except DatasetHandlerError: return def _read_queries_json_from_user_account(self): queries_ds = Dataset._get_queries_ds_from_user_account() if not queries_ds: return [], None return ( list( filter( lambda x: x["source-dataset"] == self.path, queries_ds._read_queries_json(), ) ), queries_ds, ) def get_views(self, commit_id: Optional[str] = None) -> List[ViewEntry]: """Returns the list of views stored in this Dataset. Args: commit_id (str, optional): - Commit from which views should be returned. - If not specified, views from the currently checked out commit will be returned. Returns: List of `hub.core.dataset.view_entry.ViewEntry` instances.
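A short, hedged example of iterating over the stored views and loading each one:

```
for entry in ds.get_views():
    view = entry.load()  # returns the saved view as a Dataset
    print(len(view))
```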
""" commit_id = commit_id or self.commit_id queries = self._read_queries_json() f = lambda x: x["source-dataset-version"] == commit_id ret = map( partial(ViewEntry, dataset=self), filter(f, queries), ) if self.path.startswith("hub://"): queries, qds = self._read_queries_json_from_user_account() if queries: ret = chain( ret, map( partial(ViewEntry, dataset=qds, external=True), filter(f, queries), ), ) return list(ret) def get_view(self, id: str) -> ViewEntry: """Returns the dataset view corresponding to `id` Examples: ``` # save view ds[:100].save_view(id="first_100") # load view first_100 = ds.get_view("first_100").load() # 100 print(len(first_100)) ``` See `Dataset.save_view` to learn more about saving views. Args: id (str): id of required view. Returns: `hub.core.dataset.view_entry.ViewEntry` Raises: KeyError: If no such view exists. """ queries = self._read_queries_json() for q in queries: if q["id"] == id: return ViewEntry(q, self) if self.path.startswith("hub://"): queries, qds = self._read_queries_json_from_user_account() for q in queries: if q["id"] == f"[{self.org_id}][{self.ds_name}]{id}": return ViewEntry(q, qds, True) raise KeyError(f"No view with id {id} found in the dataset.") def load_view( self, id: str, optimize: Optional[bool] = False, num_workers: int = 0, scheduler: str = "threaded", progressbar: Optional[bool] = True, ): """Loads the view and returns the `hub.Dataset` by id. Equivalent to ds.get_view(id).load(). Args: id (str): id of the view to be loaded. optimize (bool): If True, the dataset view is optimized by copying and rechunking the required data before loading. This is necessary to achieve fast streaming speeds when training models using the dataset view. The optimization process will take some time, depending on the size of the data. num_workers (int): Number of workers to be used for the optimization process. Only applicable if `optimize=True`. Defaults to 0. scheduler (str): The scheduler to be used for optimization. Supported values include: 'serial', 'threaded', 'processed' and 'ray'. Only applicable if `optimize=True`. Defaults to 'threaded'. progressbar (bool): Whether to use progressbar for optimization. Only applicable if `optimize=True`. Defaults to True. Returns: Dataset: The loaded view. Raises: KeyError: if view with given id does not exist. """ if optimize: return ( self.get_view(id) .optimize( num_workers=num_workers, scheduler=scheduler, progressbar=progressbar, ) .load() ) return self.get_view(id).load() def delete_view(self, id: str): """Deletes the view with given view id Args: id (str): Id of the view to delete Raises: KeyError: if view with given id does not exist. 
""" try: with self._lock_queries_json(): qjson = self._read_queries_json() for i, q in enumerate(qjson): if q["id"] == id: qjson.pop(i) self.base_storage.subdir( ".queries/" + (q.get("path") or q["id"]) ).clear() self._write_queries_json(qjson) return except Exception: pass if self.path.startswith("hub://"): qds = Dataset._get_queries_ds_from_user_account() if qds: with qds._lock_queries_json(): qjson = qds._read_queries_json() for i, q in enumerate(qjson): if ( q["source-dataset"] == self.path and q["id"] == f"[{self.org_id}][{self.ds_name}]{id}" ): qjson.pop(i) qds.base_storage.subdir( ".queries/" + (q.get("path") or q["id"]) ).clear() qds._write_queries_json(qjson) return raise KeyError(f"No view with id {id} found in the dataset.") def _sub_ds( self, path, empty=False, memory_cache_size: int = DEFAULT_MEMORY_CACHE_SIZE, local_cache_size: int = DEFAULT_LOCAL_CACHE_SIZE, read_only=None, lock=True, verbose=True, ): """Loads a nested dataset. Internal. Note: Virtual datasets are returned as such, they are not converted to views. Args: path (str): Path to sub directory empty (bool): If True, all contents of the sub directory is cleared before initializing the sub dataset. memory_cache_size (int): Memory cache size for the sub dataset. local_cache_size (int): Local storage cache size for the sub dataset. read_only (bool): Loads the sub dataset in read only mode if True. Default False. lock (bool): Whether the dataset should be locked for writing. Only applicable for s3, hub and gcs datasets. No effect if read_only=True. verbose (bool): If True, logs will be printed. Defaults to True. Returns: Sub dataset """ sub_storage = self.base_storage.subdir(path) if empty: sub_storage.clear() if self.path.startswith("hub://"): path = posixpath.join(self.path, path) cls = hub.core.dataset.HubCloudDataset else: path = sub_storage.root cls = hub.core.dataset.Dataset ret = cls( generate_chain( sub_storage, memory_cache_size * MB, local_cache_size * MB, ), path=path, token=self._token, read_only=read_only, lock=lock, verbose=verbose, ) ret._parent_dataset = self return ret def _link_tensors( self, src: str, dest: str, append_f: str, update_f: Optional[str] = None, flatten_sequence: Optional[bool] = None, ): """Internal. Links a source tensor to a destination tensor. Appends / updates made to the source tensor will be reflected in the destination tensor. Args: src (str): Name of the source tensor. dest (str): Name of the destination tensor. append_f (str): Name of the linked tensor transform to be used for appending items to the destination tensor. This transform should be defined in `hub.core.tensor_link` module. update_f (str): Name of the linked tensor transform to be used for updating items in the destination tensor. This transform should be defined in `hub.core.tensor_link` module. flatten_sequence (bool, Optional): Whether appends and updates should be done per item or per sequence if the source tensor is a sequence tensor. Raises: TensorDoesNotExistError: If source or destination tensors do not exist in this dataset. ValueError: If source tensor is a sequence tensor and `flatten_sequence` argument is not specified. 
""" assert self._is_root() tensors = self._tensors() if src not in tensors: raise TensorDoesNotExistError(src) if dest not in tensors: raise TensorDoesNotExistError(dest) src_tensor = self[src] dest_key = self.version_state["tensor_names"][dest] if flatten_sequence is None: if src_tensor.is_sequence: raise ValueError( "`flatten_sequence` arg must be specified when linking a sequence tensor." ) flatten_sequence = False src_tensor.meta.add_link(dest_key, append_f, update_f, flatten_sequence) self.storage.maybe_flush() def _resolve_tensor_list(self, keys: List[str]) -> List[str]: ret = [] for k in keys: fullpath = posixpath.join(self.group_index, k) if ( self.version_state["tensor_names"].get(fullpath) in self.version_state["full_tensors"] ): ret.append(k) else: if fullpath[-1] != "/": fullpath = fullpath + "/" hidden = self.meta.hidden_tensors ret += filter( lambda t: t.startswith(fullpath) and t not in hidden, self.version_state["tensor_names"], ) return ret def _copy( self, dest: Union[str, pathlib.Path], tensors: Optional[List[str]] = None, overwrite: bool = False, creds=None, token=None, num_workers: int = 0, scheduler="threaded", progressbar=True, public: bool = False, unlink: bool = False, create_vds_index_tensor: bool = False, ): """Copies this dataset or dataset view to `dest`. Version control history is not included. Args: dest (str, pathlib.Path): Destination dataset or path to copy to. If a Dataset instance is provided, it is expected to be empty. tensors (List[str], optional): Names of tensors (and groups) to be copied. If not specified all tensors are copied. overwrite (bool): If True and a dataset exists at `destination`, it will be overwritten. Defaults to False. creds (dict, Optional): creds required to create / overwrite datasets at `dest`. token (str, Optional): token used to for fetching credentials to `dest`. num_workers (int): The number of workers to use for copying. Defaults to 0. When set to 0, it will always use serial processing, irrespective of the scheduler. scheduler (str): The scheduler to be used for copying. Supported values include: 'serial', 'threaded', 'processed' and 'ray'. Defaults to 'threaded'. progressbar (bool): Displays a progress bar if True (default). public (bool): Defines if the dataset will have public access. Applicable only if Hub cloud storage is used and a new Dataset is being created. Defaults to False. unlink (bool): Whether to copy the data from source for linked tensors. Does not apply for linked video tensors. create_vds_index_tensor (bool): If True, a hidden tensor called "VDS_INDEX" is created which contains the sample indices in the source view. Returns: Dataset: New dataset object. Raises: DatasetHandlerError: If a dataset already exists at destination path and overwrite is False. 
""" if isinstance(dest, str): path = dest else: path = dest.path report_params = { "Tensors": tensors, "Overwrite": overwrite, "Num_Workers": num_workers, "Scheduler": scheduler, "Progressbar": progressbar, "Public": public, } if path.startswith("hub://"): report_params["Dest"] = path feature_report_path(self.path, "copy", report_params, token=token) dest_ds = hub.api.dataset.dataset._like( dest, self, tensors=tensors, creds=creds, token=token, overwrite=overwrite, public=public, unlink=[ t for t in self.tensors if ( self.tensors[t].base_htype != "video" or hub.constants._UNLINK_VIDEOS ) ] if unlink else False, ) if not self.index.subscriptable_at(0): old_first_index = self.index.values[0] new_first_index = IndexEntry( slice(old_first_index.value, old_first_index.value + 1) ) self.index.values[0] = new_first_index reset_index = True else: reset_index = False try: for tensor in dest_ds.tensors: src = self[tensor] copy_f = ( ( _copy_tensor_unlinked_partial_sample if len(self.index) > 1 else _copy_tensor_unlinked_full_sample ) if unlink and src.is_link and (src.base_htype != "video" or hub.constants._UNLINK_VIDEOS) else _copy_tensor ) if progressbar: sys.stderr.write(f"Copying tensor: {tensor}.\n") hub.compute(copy_f, name="tensor copy transform")( tensor_name=tensor ).eval( self, dest_ds, num_workers=num_workers, scheduler=scheduler, progressbar=progressbar, skip_ok=True, check_lengths=False, ) dest_ds.flush() if create_vds_index_tensor: with dest_ds: try: dest_ds._allow_view_updates = True dest_ds.create_tensor( "VDS_INDEX", dtype=np.uint64, hidden=True, create_shape_tensor=False, create_id_tensor=False, create_sample_info_tensor=False, ) dest_ds.VDS_INDEX.extend(list(self.sample_indices)) finally: delattr(dest_ds, "_allow_view_updates") finally: if reset_index: dest_ds.meta.default_index = Index([IndexEntry(0)]).to_json() dest_ds.meta.is_dirty = True dest_ds.flush() dest_ds = dest_ds[0] self.index.values[0] = old_first_index return dest_ds def copy( self, dest: Union[str, pathlib.Path], tensors: Optional[List[str]] = None, overwrite: bool = False, creds=None, token=None, num_workers: int = 0, scheduler="threaded", progressbar=True, public: bool = False, ): """Copies this dataset or dataset view to `dest`. Version control history is not included. Args: dest (str, pathlib.Path): Destination dataset or path to copy to. If a Dataset instance is provided, it is expected to be empty. tensors (List[str], optional): Names of tensors (and groups) to be copied. If not specified all tensors are copied. overwrite (bool): If True and a dataset exists at `destination`, it will be overwritten. Defaults to False. creds (dict, Optional): creds required to create / overwrite datasets at `dest`. token (str, Optional): token used to for fetching credentials to `dest`. num_workers (int): The number of workers to use for copying. Defaults to 0. When set to 0, it will always use serial processing, irrespective of the scheduler. scheduler (str): The scheduler to be used for copying. Supported values include: 'serial', 'threaded', 'processed' and 'ray'. Defaults to 'threaded'. progressbar (bool): Displays a progress bar if True (default). public (bool): Defines if the dataset will have public access. Applicable only if Hub cloud storage is used and a new Dataset is being created. Defaults to False. Returns: Dataset: New dataset object. Raises: DatasetHandlerError: If a dataset already exists at destination path and overwrite is False. 
""" return self._copy( dest, tensors, overwrite, creds, token, num_workers, scheduler, progressbar, public, ) @invalid_view_op def reset(self): """Resets the uncommitted changes present in the branch. Note: - The uncommitted data is deleted from underlying storage, this is not a reversible operation. """ storage, version_state = self.storage, self.version_state if version_state["commit_node"].children: print("You are not at the head node of the branch, cannot reset.") return if not self.has_head_changes: print("There are no uncommitted changes on this branch.") return # delete metas first self._delete_metas() if self.commit_id is None: storage.clear() self._populate_meta() else: prefix = "/".join(("versions", self.pending_commit_id)) storage.clear(prefix=prefix) src_id, dest_id = self.commit_id, self.pending_commit_id # by doing this checkout, we get list of tensors in previous commit, which is what we require for copying metas and create_commit_chunk_set self.checkout(src_id) copy_metas(src_id, dest_id, storage, version_state) create_commit_chunk_sets(dest_id, storage, version_state) self.checkout(dest_id) load_meta(self) self._info = None self._ds_diff = None def _delete_metas(self): """Deletes all metas in the dataset.""" commit_id = self.pending_commit_id meta_keys = [get_dataset_meta_key(commit_id)] meta_keys.append(get_dataset_diff_key(commit_id)) meta_keys.append(get_dataset_info_key(commit_id)) for tensor in self.tensors: meta_keys.append(get_tensor_meta_key(commit_id, tensor)) meta_keys.append(get_tensor_tile_encoder_key(commit_id, tensor)) meta_keys.append(get_tensor_info_key(commit_id, tensor)) meta_keys.append(get_tensor_commit_chunk_set_key(commit_id, tensor)) meta_keys.append(get_tensor_commit_diff_key(commit_id, tensor)) meta_keys.append(get_chunk_id_encoder_key(commit_id, tensor)) meta_keys.append(get_sequence_encoder_key(commit_id, tensor)) for key in meta_keys: try: del self.storage[key] except KeyError: pass def add_creds_key(self, creds_key: str, managed: bool = False): """Adds a new creds key to the dataset. These keys are used for tensors that are linked to external data. Examples: ``` # create/load a dataset ds = hub.dataset("path/to/dataset") # add a new creds key ds.add_creds_key("my_s3_key") ``` Args: creds_key (str): The key to be added. managed (bool): If True, the creds corresponding to the key will be fetched from activeloop platform. Note, this is only applicable for datasets that are connected to activeloop platform. Defaults to False. Raises: ValueError: If the dataset is not connected to activeloop platform. """ if managed: raise ValueError( "Managed creds are not supported for datasets that are not connected to activeloop platform." ) self.link_creds.add_creds_key(creds_key) save_link_creds(self.link_creds, self.storage) def populate_creds(self, creds_key: str, creds: dict): """Populates the creds key added in add_creds_key with the given creds. These creds are used to fetch the external data. This needs to be done everytime the dataset is reloaded for datasets that contain links to external data. Examples: ``` # create/load a dataset ds = hub.dataset("path/to/dataset") # add a new creds key ds.add_creds_key("my_s3_key") # populate the creds ds.populate_creds("my_s3_key", {"aws_access_key_id": "my_access_key", "aws_secret_access_key": "my_secret_key"}) ``` """ self.link_creds.populate_creds(creds_key, creds) def update_creds_key(self, old_creds_key: str, new_creds_key: str): """Replaces the old creds key with the new creds key. 
This is used to replace the creds key used for external data.""" replaced_index = self.link_creds.replace_creds(old_creds_key, new_creds_key) save_link_creds(self.link_creds, self.storage, replaced_index=replaced_index) def change_creds_management(self, creds_key: str, managed: bool): """Changes the management status of the creds key. Args: creds_key (str): The key whose management status is to be changed. managed (bool): The target management status. If True, the creds corresponding to the key will be fetched from activeloop platform. Raises: ValueError: If the dataset is not connected to activeloop platform. KeyError: If the creds key is not present in the dataset. Examples: ``` # create/load a dataset ds = hub.dataset("path/to/dataset") # add a new creds key ds.add_creds_key("my_s3_key") # Populate the name added with creds dictionary # These creds are only present temporarily and will have to be repopulated on every reload ds.populate_creds("my_s3_key", {}) # Change the management status of the key to True. Before doing this, ensure that the creds have been created on activeloop platform # Now, this key will no longer use the credentials populated in the previous step but will instead fetch them from activeloop platform # These creds don't have to be populated again on every reload and will be fetched every time the dataset is loaded ds.change_creds_management("my_s3_key", True) ``` """ raise ValueError( "Managed creds are not supported for datasets that are not connected to activeloop platform." ) def get_creds_keys(self) -> List[str]: """Returns the list of creds keys added to the dataset. These are used to fetch external data in linked tensors""" return self.link_creds.creds_keys def visualize( self, width: Union[int, str, None] = None, height: Union[int, str, None] = None ): """ Visualizes the dataset in the Jupyter notebook. Args: width: Union[int, str, None] Optional width of the visualizer canvas. height: Union[int, str, None] Optional height of the visualizer canvas. Raises: Exception: If a dataset is not hub cloud dataset and the visualization happens in colab. 
""" from hub.visualizer import visualize hub_reporter.feature_report(feature_name="visualize", parameters={}) if is_colab(): raise Exception("Cannot visualize non hub cloud dataset in Colab.") else: visualize(self.storage, width=width, height=height) def __contains__(self, tensor: str): return tensor in self.tensors def _optimize_saved_view( self, id: str, external=False, unlink=True, num_workers=0, scheduler="threaded", progressbar=True, ): with self._lock_queries_json(): qjson = self._read_queries_json() idx = -1 for i in range(len(qjson)): if qjson[i]["id"] == id: idx = i break if idx == -1: raise KeyError(f"View with id {id} not found.") info = qjson[i] if not info["virtual-datasource"]: # Already optimized return info path = info.get("path", info["id"]) vds = self._sub_ds(".queries/" + path, verbose=False) view = vds._get_view(not external) new_path = path + "_OPTIMIZED" optimized = self._sub_ds(".queries/" + new_path, empty=True, verbose=False) view._copy( optimized, overwrite=True, unlink=unlink, create_vds_index_tensor=True, num_workers=num_workers, scheduler=scheduler, progressbar=progressbar, ) optimized.info.update(vds.info.__getstate__()) optimized.info["virtual-datasource"] = False optimized.info["path"] = new_path optimized.flush() info["virtual-datasource"] = False info["path"] = new_path self._write_queries_json(qjson) vds.base_storage.disable_readonly() try: vds.base_storage.clear() except Exception as e: warnings.warn( f"Error while deleting old view after writing optimized version: {e}" ) return info def _sample_indices(self, maxlen: int): vds_index = self._tensors(include_hidden=True).get("VDS_INDEX") if vds_index: return vds_index.numpy().reshape(-1).tolist() return self.index.values[0].indices(maxlen) @property def sample_indices(self): return self._sample_indices(min(t.num_samples for t in self.tensors.values())) def _enable_padding(self): self._pad_tensors = True def _disable_padding(self): self._pad_tensors = False @invalid_view_op def pop(self, index: Optional[int] = None): """ Removes a sample from all the tensors of the dataset. For any tensor if the index >= len(tensor), the sample won't be popped from it. Args: index (int, Optional): The index of the sample to be removed. If it is None, the index becomes the length of the longest tensor - 1. Raises: IndexError: If the index is out of range. """ max_len = max((t.num_samples for t in self.tensors.values()), default=0) if max_len == 0: raise IndexError("Can't pop from empty dataset.") if index is None: index = max_len - 1 if index < 0: raise IndexError("Pop doesn't support negative indices.") elif index >= max_len: raise IndexError( f"Index {index} is out of range. The longest tensor has {max_len} samples." ) for tensor in self.tensors.values(): if tensor.num_samples > index: tensor.pop(index) @property def is_view(self) -> bool: return ( not self.index.is_trivial() or hasattr(self, "_vds") or hasattr(self, "_view_entry") )
Subclasses
Instance variables
var branch
-
The current branch of the dataset
Expand source code
@property def branch(self) -> str: """The current branch of the dataset""" return self.version_state["branch"]
var branches
-
Lists all the branches of the dataset.
Returns
List of branches.
Expand source code
@property def branches(self): """Lists all the branches of the dataset. Returns: List of branches. """ return list(self.version_state["branch_commit_map"])
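A minimal usage sketch (the dataset path below is a placeholder): inspecting the current branch together with the list of all branches.
import hub
ds = hub.dataset("path/to/dataset")  # hypothetical path
print(ds.branch)    # e.g. "main"
print(ds.branches)  # e.g. ["main", "experiments"]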
var client
-
Returns the client of the dataset.
Expand source code
@property def client(self): """Returns the client of the dataset.""" return self._client
var commit_id
-
The last committed commit_id of the dataset. If there are no commits, this returns None.
Expand source code
@property def commit_id(self) -> Optional[str]: """The lasted committed commit_id of the dataset. If there are no commits, this returns None.""" commit_node = self.version_state["commit_node"] if not commit_node.is_head_node: return commit_node.commit_id parent = commit_node.parent if parent is None: return None else: return parent.commit_id
var commits
-
Lists all the commits leading to the current dataset state.
Returns
List of dictionaries containing commit information.
Expand source code
@property def commits(self) -> List[Dict]: """Lists all the commits leading to the current dataset state. Returns: List of dictionaries containing commit information. """ commits = [] commit_node = self.version_state["commit_node"] while commit_node: if not commit_node.is_head_node: commit_info = { "commit": commit_node.commit_id, "author": commit_node.commit_user_name, "time": str(commit_node.commit_time)[:-7], "message": commit_node.commit_message, } commits.append(commit_info) commit_node = commit_node.parent return commits
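As a sketch (placeholder path), the commit log can be walked as a list of dictionaries with "commit", "author", "time" and "message" keys.
import hub
ds = hub.dataset("path/to/dataset")  # hypothetical path
for commit_info in ds.commits:
    print(commit_info["commit"], commit_info["author"], commit_info["message"])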
var groups
-
All sub groups in this group
Expand source code
@property def groups(self) -> Dict[str, "Dataset"]: """All sub groups in this group""" return {g: self[g] for g in self._groups_filtered}
var has_head_changes
-
Returns True if currently at head node and uncommitted changes are present.
Expand source code
@property def has_head_changes(self): """Returns True if currently at head node and uncommitted changes are present.""" commit_node = self.version_state["commit_node"] return not commit_node.children and current_commit_has_change( self.version_state, self.storage )
var info
-
Returns the information about the dataset.
Expand source code
@property def info(self): """Returns the information about the dataset.""" if self._info is None: path = get_dataset_info_key(self.version_state["commit_id"]) self.__dict__["_info"] = load_info(path, self) # type: ignore return self._info
var is_view
-
Returns True if this dataset is a view, i.e. it has a non-trivial index or was loaded from a saved view.
Expand source code
@property def is_view(self) -> bool: return ( not self.index.is_trivial() or hasattr(self, "_vds") or hasattr(self, "_view_entry") )
var meta
-
Returns the metadata of the dataset.
Expand source code
@property def meta(self) -> DatasetMeta: """Returns the metadata of the dataset.""" return self.version_state["meta"]
var parent
-
Returns the parent of this group. Returns None if this is the root dataset.
Expand source code
@property def parent(self): """Returns the parent of this group. Returns None if this is the root dataset.""" if self._is_root(): return None autoflush = self.storage.autoflush ds = self.__class__( storage=self.storage, index=self.index, group_index=posixpath.dirname(self.group_index), read_only=self.read_only, public=self.public, token=self._token, verbose=self.verbose, version_state=self.version_state, path=self.path, link_creds=self.link_creds, ) self.storage.autoflush = autoflush return ds
var pending_commit_id
-
The commit_id of the next commit that will be made to the dataset. If you're not at the head of the current branch, this will be the same as the commit_id.
Expand source code
@property def pending_commit_id(self) -> str: """The commit_id of the next commit that will be made to the dataset. If you're not at the head of the current branch, this will be the same as the commit_id. """ return self.version_state["commit_id"]
var root
-
Returns the root dataset of a group.
Expand source code
@property def root(self): """Returns the root dataset of a group.""" if self._is_root(): return self autoflush = self.storage.autoflush ds = self.__class__( storage=self.storage, index=self.index, group_index="", read_only=self.read_only, public=self.public, token=self._token, verbose=self.verbose, version_state=self.version_state, path=self.path, link_creds=self.link_creds, ) self.storage.autoflush = autoflush return ds
var sample_indices
-
Returns the indices of the samples in the underlying dataset that are included in this dataset or view.
Expand source code
@property def sample_indices(self): return self._sample_indices(min(t.num_samples for t in self.tensors.values()))
var tensors
-
All tensors belonging to this group, including those within sub groups. Always returns the sliced tensors.
Expand source code
@property def tensors(self) -> Dict[str, Tensor]: """All tensors belonging to this group, including those within sub groups. Always returns the sliced tensors.""" return self._tensors(include_hidden=False)
Methods
def add_creds_key(self, creds_key, managed=False)
-
Adds a new creds key to the dataset. These keys are used for tensors that are linked to external data.
Examples
# create/load a dataset
ds = hub.dataset("path/to/dataset")
# add a new creds key
ds.add_creds_key("my_s3_key")
Args
creds_key
:str
- The key to be added.
managed
:bool
- If True, the creds corresponding to the key will be fetched from activeloop platform. Note, this is only applicable for datasets that are connected to activeloop platform. Defaults to False.
Raises
ValueError
- If the dataset is not connected to activeloop platform.
Expand source code
def add_creds_key(self, creds_key: str, managed: bool = False): """Adds a new creds key to the dataset. These keys are used for tensors that are linked to external data. Examples: ``` # create/load a dataset ds = hub.dataset("path/to/dataset") # add a new creds key ds.add_creds_key("my_s3_key") ``` Args: creds_key (str): The key to be added. managed (bool): If True, the creds corresponding to the key will be fetched from activeloop platform. Note, this is only applicable for datasets that are connected to activeloop platform. Defaults to False. Raises: ValueError: If the dataset is not connected to activeloop platform. """ if managed: raise ValueError( "Managed creds are not supported for datasets that are not connected to activeloop platform." ) self.link_creds.add_creds_key(creds_key) save_link_creds(self.link_creds, self.storage)
def append(self, sample, skip_ok=False)
-
Append samples to multiple tensors at once. This method expects all tensors being updated to be of the same length.
Args
sample
:dict
- Dictionary with tensor names as keys and samples as values.
skip_ok
:bool
- Skip tensors not in
sample
if set to True.
Raises
KeyError
- If any tensor in the dataset is not a key in
sample
and skip_ok
is False. TensorDoesNotExistError
- If tensor in
sample
does not exist. ValueError
- If all tensors being updated are not of the same length.
NotImplementedError
- If an error occurs while writing tiles.
Exception
- Error while attempting to rollback appends.
Expand source code
@invalid_view_op def append(self, sample: Dict[str, Any], skip_ok: bool = False): """Append samples to mutliple tensors at once. This method expects all tensors being updated to be of the same length. Args: sample (dict): Dictionary with tensor names as keys and samples as values. skip_ok (bool): Skip tensors not in `sample` if set to True. Raises: KeyError: If any tensor in the dataset is not a key in `sample` and `skip_ok` is False. TensorDoesNotExistError: If tensor in `sample` does not exist. ValueError: If all tensors being updated are not of the same length. NotImplementedError: If an error occurs while writing tiles. Exception: Error while attempting to rollback appends. """ if isinstance(sample, Dataset): sample = sample.tensors if not skip_ok: for k in self.tensors: if k not in sample: raise KeyError( f"Required tensor not provided: {k}. Use ds.append(sample, skip_ok=True) to skip tensors." ) for k in sample: if k not in self._tensors(): raise TensorDoesNotExistError(k) if len(set(map(len, (self[k] for k in sample)))) != 1: raise ValueError( "When appending using Dataset.append, all tensors are expected to have the same length." ) [f() for f in list(self._update_hooks.values())] tensors_appended = [] with self: for k, v in sample.items(): try: tensor = self[k] enc = tensor.chunk_engine.chunk_id_encoder num_chunks = enc.num_chunks tensor.append(v) tensors_appended.append(k) except Exception as e: new_num_chunks = enc.num_chunks num_chunks_added = new_num_chunks - num_chunks if num_chunks_added > 1: # This is unlikely to happen, i.e the sample passed the validation # steps and tiling but some error occured while writing tiles to chunks raise NotImplementedError( "Unable to recover from error while writing tiles." ) from e elif num_chunks_added == 1: enc._encoded = enc._encoded[:-1] for k in tensors_appended: try: self[k].pop() except Exception as e2: raise Exception( "Error while attepting to rollback appends" ) from e2 raise e
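A minimal sketch of a dict append, assuming a dataset that already has "images" and "labels" tensors (both names are placeholders).
import numpy as np
import hub

ds = hub.dataset("path/to/dataset")  # hypothetical path
# append one row across all tensors at once
ds.append({
    "images": np.zeros((28, 28, 3), dtype="uint8"),
    "labels": np.uint32(1),
})
# skip_ok=True allows omitting some tensors
ds.append({"images": np.ones((28, 28, 3), dtype="uint8")}, skip_ok=True)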
def change_creds_management(self, creds_key, managed)
-
Changes the management status of the creds key.
Args
creds_key
:str
- The key whose management status is to be changed.
managed
:bool
- The target management status. If True, the creds corresponding to the key will be fetched from activeloop platform.
Raises
ValueError
- If the dataset is not connected to activeloop platform.
KeyError
- If the creds key is not present in the dataset.
Examples
# create/load a dataset
ds = hub.dataset("path/to/dataset")
# add a new creds key
ds.add_creds_key("my_s3_key")
# Populate the name added with creds dictionary
# These creds are only present temporarily and will have to be repopulated on every reload
ds.populate_creds("my_s3_key", {})
# Change the management status of the key to True. Before doing this, ensure that the creds have been created on activeloop platform
# Now, this key will no longer use the credentials populated in the previous step but will instead fetch them from activeloop platform
# These creds don't have to be populated again on every reload and will be fetched every time the dataset is loaded
ds.change_creds_management("my_s3_key", True)
Expand source code
def change_creds_management(self, creds_key: str, managed: bool): """Changes the management status of the creds key. Args: creds_key (str): The key whose management status is to be changed. managed (bool): The target management status. If True, the creds corresponding to the key will be fetched from activeloop platform. Raises: ValueError: If the dataset is not connected to activeloop platform. KeyError: If the creds key is not present in the dataset. Examples: ``` # create/load a dataset ds = hub.dataset("path/to/dataset") # add a new creds key ds.add_creds_key("my_s3_key") # Populate the name added with creds dictionary # These creds are only present temporarily and will have to be repopulated on every reload ds.populate_creds("my_s3_key", {}) # Change the management status of the key to True. Before doing this, ensure that the creds have been created on activeloop platform # Now, this key will no longer use the credentials populated in the previous step but will instead fetch them from activeloop platform # These creds don't have to be populated again on every reload and will be fetched every time the dataset is loaded ds.change_creds_management("my_s3_key", True) ``` """ raise ValueError( "Managed creds are not supported for datasets that are not connected to activeloop platform." )
def checkout(self, address, create=False)
-
Checks out to a specific commit_id or branch. If
create = True
, creates a new branch with name as address.
Note
Checkout from a head node in any branch that contains uncommitted data will lead to an auto commit before the checkout.
Args
address
:str
- The commit_id or branch to checkout to.
create
:bool
- If True, creates a new branch with name as address.
Returns
str
- The commit_id of the dataset after checkout.
Raises
Exception
- If dataset is a filtered view.
Expand source code
def checkout(self, address: str, create: bool = False) -> Optional[str]: """Checks out to a specific commit_id or branch. If `create = True`, creates a new branch with name as address. Note: Checkout from a head node in any branch that contains uncommitted data will lead to an auto commit before the checkout. Args: address (str): The commit_id or branch to checkout to. create (bool): If True, creates a new branch with name as address. Returns: str: The commit_id of the dataset after checkout. Raises: Exception: If dataset is a filtered view. """ return self._checkout(address, create)
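A small sketch of a branch workflow (placeholder path; "main" is assumed to be the default branch).
import hub
ds = hub.dataset("path/to/dataset")  # hypothetical path
ds.checkout("experiments", create=True)  # create and switch to a new branch
ds.checkout("main")                      # switch back to an existing branch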
def commit(self, message=None, allow_empty=False)
-
Stores a snapshot of the current state of the dataset.
Note
- Committing from a non-head node in any branch will lead to an auto checkout to a new branch.
- This same behaviour will happen if new samples are added or existing samples are updated from a non-head node.
Args
message
:str, Optional
- Used to describe the commit.
allow_empty
:bool
- If True, commit even if there are no changes
Returns
str
- the commit id of the saved commit that can be used to access the snapshot.
Raises
Exception
- if dataset is a filtered view.
EmptyCommitError
- if there are no changes and the user has not forced an empty commit with allow_empty=True
Expand source code
def commit(self, message: Optional[str] = None, allow_empty=False) -> str: """Stores a snapshot of the current state of the dataset. Note: - Commiting from a non-head node in any branch, will lead to an auto checkout to a new branch. - This same behaviour will happen if new samples are added or existing samples are updated from a non-head node. Args: message (str, Optional): Used to describe the commit. allow_empty (bool): If True, commit even if there are no changes Returns: str: the commit id of the saved commit that can be used to access the snapshot. Raises: Exception: if dataset is a filtered view. EmptyCommitError: if there are no changes and user does not forced to commit unchanged data """ if not allow_empty and not self.has_head_changes: raise EmptyCommitError( "There are no changes, commit is not done. Try again with allow_empty=True." ) return self._commit(message)
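A sketch of committing head changes, assuming a dataset with a "labels" tensor (placeholder names).
import numpy as np
import hub

ds = hub.dataset("path/to/dataset")  # hypothetical path
ds.labels.append(np.uint32(2))
if ds.has_head_changes:
    commit_id = ds.commit("add one label")
    print(commit_id)  # id of the new snapshot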
def copy(self, dest, tensors=None, overwrite=False, creds=None, token=None, num_workers=0, scheduler='threaded', progressbar=True, public=False)
-
Copies this dataset or dataset view to
dest
. Version control history is not included.
Args
dest
:str, pathlib.Path
- Destination dataset or path to copy to. If a Dataset instance is provided, it is expected to be empty.
tensors
:List[str]
, optional- Names of tensors (and groups) to be copied. If not specified all tensors are copied.
overwrite
:bool
- If True and a dataset exists at
destination
, it will be overwritten. Defaults to False. creds
:dict, Optional
- creds required to create / overwrite datasets at
dest
. token
:str, Optional
- token used for fetching credentials to
dest
. num_workers
:int
- The number of workers to use for copying. Defaults to 0. When set to 0, it will always use serial processing, irrespective of the scheduler.
scheduler
:str
- The scheduler to be used for copying. Supported values include: 'serial', 'threaded', 'processed' and 'ray'. Defaults to 'threaded'.
progressbar
:bool
- Displays a progress bar if True (default).
public
:bool
- Defines if the dataset will have public access. Applicable only if Hub cloud storage is used and a new Dataset is being created. Defaults to False.
Returns
Dataset
- New dataset object.
Raises
DatasetHandlerError
- If a dataset already exists at destination path and overwrite is False.
Expand source code
def copy( self, dest: Union[str, pathlib.Path], tensors: Optional[List[str]] = None, overwrite: bool = False, creds=None, token=None, num_workers: int = 0, scheduler="threaded", progressbar=True, public: bool = False, ): """Copies this dataset or dataset view to `dest`. Version control history is not included. Args: dest (str, pathlib.Path): Destination dataset or path to copy to. If a Dataset instance is provided, it is expected to be empty. tensors (List[str], optional): Names of tensors (and groups) to be copied. If not specified all tensors are copied. overwrite (bool): If True and a dataset exists at `destination`, it will be overwritten. Defaults to False. creds (dict, Optional): creds required to create / overwrite datasets at `dest`. token (str, Optional): token used to for fetching credentials to `dest`. num_workers (int): The number of workers to use for copying. Defaults to 0. When set to 0, it will always use serial processing, irrespective of the scheduler. scheduler (str): The scheduler to be used for copying. Supported values include: 'serial', 'threaded', 'processed' and 'ray'. Defaults to 'threaded'. progressbar (bool): Displays a progress bar if True (default). public (bool): Defines if the dataset will have public access. Applicable only if Hub cloud storage is used and a new Dataset is being created. Defaults to False. Returns: Dataset: New dataset object. Raises: DatasetHandlerError: If a dataset already exists at destination path and overwrite is False. """ return self._copy( dest, tensors, overwrite, creds, token, num_workers, scheduler, progressbar, public, )
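A sketch of copying to a new location (both paths are placeholders).
import hub
ds = hub.dataset("path/to/source_dataset")  # hypothetical source path
copied = ds.copy(
    "path/to/destination_dataset",          # hypothetical destination path
    overwrite=True,
    num_workers=2,
    scheduler="threaded",
)
print(len(copied.tensors))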
def create_group(self, name, exist_ok=False)
-
Creates a tensor group. Intermediate groups in the path are also created.
Args
name
- The name of the group to create.
exist_ok
- If True, no error is raised if the group already exists and the existing group is returned. If False, an error is raised if the group already exists.
Returns
The created group.
Raises
TensorGroupAlreadyExistsError
- If the group already exists and exist_ok is False.
Examples
ds.create_group("images") ds['images'].create_tensor("cats")
ds.create_groups("images/jpg/cats") ds["images"].create_tensor("png") ds["images/jpg"].create_group("dogs")
Expand source code
@hub_reporter.record_call def create_group(self, name: str, exist_ok=False) -> "Dataset": """Creates a tensor group. Intermediate groups in the path are also created. Args: name: The name of the group to create. exist_ok: If True, the group is created if it does not exist. If False, an error is raised if the group already exists. Returns: The created group. Raises: TensorGroupAlreadyExistsError: If the group already exists and exist_ok is False. Examples: ``` ds.create_group("images") ds['images'].create_tensor("cats") ``` ds.create_groups("images/jpg/cats") ds["images"].create_tensor("png") ds["images/jpg"].create_group("dogs") """ if not self._is_root(): return self.root.create_group( posixpath.join(self.group_index, name), exist_ok=exist_ok ) name = filter_name(name) if name in self._groups: if not exist_ok: raise TensorGroupAlreadyExistsError(name) return self[name] return self._create_group(name)
def create_tensor(self, name, htype='unspecified', dtype='unspecified', sample_compression='unspecified', chunk_compression='unspecified', hidden=False, create_sample_info_tensor=True, create_shape_tensor=True, create_id_tensor=True, verify=False, exist_ok=False, **kwargs)
-
Creates a new tensor in the dataset.
Examples
# create dataset
ds = hub.dataset("path/to/dataset")
# create tensors
ds.create_tensor("images", htype="image", sample_compression="jpg")
ds.create_tensor("videos", htype="video", sample_compression="mp4")
ds.create_tensor("data")
# append data
ds.images.append(np.ones((400, 400, 3), dtype='uint8'))
ds.videos.append(hub.read("videos/sample_video.mp4"))
ds.data.append(np.zeros((100, 100, 2)))
Args
name
:str
- The name of the tensor to be created.
htype
:str
- The class of data for the tensor.
The defaults for other parameters are determined in terms of this value.
For example,
htype="image"
would havedtype
default touint8
. These defaults can be overridden by explicitly passing any of the other parameters to this function. May also modify the defaults for other parameters. dtype
:str
- Optionally override this tensor's
dtype
. All subsequent samples are required to have thisdtype
. sample_compression
:str
- All samples will be compressed in the provided format. If
None
, samples are uncompressed. chunk_compression
:str
- All chunks will be compressed in the provided format. If
None
, chunks are uncompressed. **kwargs
hub.htype
defaults can be overridden by passing any of the compatible parameters. To see all hub.htype
s and their corresponding arguments, check out hub/htypes.py
.hidden
:bool
- If True, the tensor will be hidden from ds.tensors but can still be accessed via ds[tensor_name]
create_sample_info_tensor
:bool
- If True, meta data of individual samples will be saved in a hidden tensor. This data can be accessed via
Tensor[i].sample_info
. create_shape_tensor
:bool
- If True, an associated tensor containing shapes of each sample will be created.
create_id_tensor
:bool
- If True, an associated tensor containing unique ids for each sample will be created. This is useful for merge operations.
verify
:bool
- Valid only for link htypes. If True, all links will be verified before they are added to the tensor.
exist_ok
- If True, no error is raised if the tensor already exists and the existing tensor is returned. If False, an error is raised if the tensor already exists.
Returns
The new tensor, which can also be accessed by
self[name]
.
Raises
TensorAlreadyExistsError
- If the tensor already exists and
exist_ok
is False. TensorGroupAlreadyExistsError
- Duplicate tensor groups are not allowed.
InvalidTensorNameError
- If
name
is in dataset attributes. NotImplementedError
- If trying to override
chunk_compression
. TensorMetaInvalidHtype
- If invalid htype is specified.
ValueError
- If an illegal argument is specified.
Expand source code
@invalid_view_op @hub_reporter.record_call def create_tensor( self, name: str, htype: str = UNSPECIFIED, dtype: Union[str, np.dtype] = UNSPECIFIED, sample_compression: str = UNSPECIFIED, chunk_compression: str = UNSPECIFIED, hidden: bool = False, create_sample_info_tensor: bool = True, create_shape_tensor: bool = True, create_id_tensor: bool = True, verify: bool = False, exist_ok: bool = False, **kwargs, ): """Creates a new tensor in the dataset. Examples: ``` # create dataset ds = hub.dataset("path/to/dataset") # create tensors ds.create_tensor("images", htype="image", sample_compression="jpg") ds.create_tensor("videos", htype="video", sample_compression="mp4") ds.create_tensor("data") # append data ds.images.append(np.ones((400, 400, 3), dtype='uint8')) ds.videos.append(hub.read("videos/sample_video.mp4")) ds.data.append(np.zeros((100, 100, 2))) ``` Args: name (str): The name of the tensor to be created. htype (str): The class of data for the tensor. The defaults for other parameters are determined in terms of this value. For example, `htype="image"` would have `dtype` default to `uint8`. These defaults can be overridden by explicitly passing any of the other parameters to this function. May also modify the defaults for other parameters. dtype (str): Optionally override this tensor's `dtype`. All subsequent samples are required to have this `dtype`. sample_compression (str): All samples will be compressed in the provided format. If `None`, samples are uncompressed. chunk_compression (str): All chunks will be compressed in the provided format. If `None`, chunks are uncompressed. **kwargs: `htype` defaults can be overridden by passing any of the compatible parameters. To see all `htype`s and their correspondent arguments, check out `hub/htypes.py`. hidden (bool): If True, the tensor will be hidden from ds.tensors but can still be accessed via ds[tensor_name] create_sample_info_tensor (bool): If True, meta data of individual samples will be saved in a hidden tensor. This data can be accessed via `tensor[i].sample_info`. create_shape_tensor (bool): If True, an associated tensor containing shapes of each sample will be created. create_id_tensor (bool): If True, an associated tensor containing unique ids for each sample will be created. This is useful for merge operations. verify (bool): Valid only for link htypes. If True, all links will be verified before they are added to the tensor. exist_ok: If True, the group is created if it does not exist. If False, an error is raised if the group already exists. Returns: The new tensor, which can also be accessed by `self[name]`. Raises: TensorAlreadyExistsError: If the tensor already exists and `exist_ok` is False. TensorGroupAlreadyExistsError: Duplicate tensor groups are not allowed. InvalidTensorNameError: If `name` is in dataset attributes. NotImplementedError: If trying to override `chunk_compression`. TensorMetaInvalidHtype: If invalid htype is specified. ValueError: If an illegal argument is specified. 
""" # if not the head node, checkout to an auto branch that is newly created auto_checkout(self) name = filter_name(name, self.group_index) key = self.version_state["tensor_names"].get(name) is_sequence, is_link, htype = parse_complex_htype(htype) if key: if not exist_ok: raise TensorAlreadyExistsError(name) tensor = self.root[key] current_config = tensor._config new_config = { "htype": htype, "dtype": dtype, "sample_compression": sample_compression, "chunk_compression": chunk_compression, "hidden": hidden, "is_link": is_link, "is_sequence": is_sequence, } if current_config != new_config: raise ValueError( f"Tensor {name} already exists with different configuration. " f"Current config: {current_config}. " f"New config: {new_config}" ) return tensor elif name in self.version_state["full_tensors"]: key = f"{name}_{uuid.uuid4().hex[:4]}" else: key = name if name in self._groups: raise TensorGroupAlreadyExistsError(name) tensor_name = posixpath.split(name)[1] if not tensor_name or tensor_name in dir(self): raise InvalidTensorNameError(tensor_name) kwargs["is_sequence"] = kwargs.get("is_sequence") or is_sequence kwargs["is_link"] = kwargs.get("is_link") or is_link kwargs["verify"] = verify if is_link and ( sample_compression != UNSPECIFIED or chunk_compression != UNSPECIFIED ): warnings.warn( "Chunk_compression and sample_compression aren't valid for tensors with linked data. Ignoring these arguments." ) sample_compression = UNSPECIFIED chunk_compression = UNSPECIFIED if not self._is_root(): return self.root.create_tensor( name=key, htype=htype, dtype=dtype, sample_compression=sample_compression, chunk_compression=chunk_compression, hidden=hidden, create_sample_info_tensor=create_sample_info_tensor, create_shape_tensor=create_shape_tensor, create_id_tensor=create_id_tensor, exist_ok=exist_ok, **kwargs, ) if "/" in name: self._create_group(posixpath.split(name)[0]) # Seperate meta and info htype_config = HTYPE_CONFIGURATIONS.get(htype, {}).copy() info_keys = htype_config.pop("_info", []) info_kwargs = {} meta_kwargs = {} for k, v in kwargs.items(): if k in info_keys: verify_htype_key_value(htype, k, v) info_kwargs[k] = v else: meta_kwargs[k] = v # Set defaults for k in info_keys: if k not in info_kwargs: if k == "class_names": info_kwargs[k] = htype_config[k].copy() else: info_kwargs[k] = htype_config[k] create_tensor( key, self.storage, htype=htype, dtype=dtype, sample_compression=sample_compression, chunk_compression=chunk_compression, version_state=self.version_state, hidden=hidden, **meta_kwargs, ) meta: DatasetMeta = self.meta ffw_dataset_meta(meta) meta.add_tensor(name, key, hidden=hidden) tensor = Tensor(key, self) # type: ignore tensor.meta.name = name self.version_state["full_tensors"][key] = tensor self.version_state["tensor_names"][name] = key if info_kwargs: tensor.info.update(info_kwargs) self.storage.maybe_flush() if create_sample_info_tensor and htype in ("image", "audio", "video", "dicom"): self._create_sample_info_tensor(name) if create_shape_tensor and htype not in ("text", "json"): self._create_sample_shape_tensor(name, htype=htype) if create_id_tensor: self._create_sample_id_tensor(name) return tensor
def create_tensor_like(self, name, source, unlink=False)
-
Copies the
source
tensor's meta information and creates a new tensor with it. No samples are copied, only the meta/info for the tensor is.
Examples
ds.create_tensor_like("cats", ds["images"])
Args
name
:str
- Name for the new tensor.
source
:Tensor
- Tensor whose meta/info will be copied. May or may not be contained in the same dataset.
unlink
:bool
- Whether to unlink linked tensors.
Returns
Tensor
- New Tensor object.
Expand source code
@invalid_view_op @hub_reporter.record_call def create_tensor_like( self, name: str, source: "Tensor", unlink: bool = False ) -> "Tensor": """Copies the `source` tensor's meta information and creates a new tensor with it. No samples are copied, only the meta/info for the tensor is. Examples: ``` ds.create_tensor_like("cats", ds["images"]) ``` Args: name (str): Name for the new tensor. source (Tensor): Tensor who's meta/info will be copied. May or may not be contained in the same dataset. unlink (bool): Whether to unlink linked tensors. Returns: Tensor: New Tensor object. """ info = source.info.__getstate__().copy() meta = source.meta.__getstate__().copy() if unlink: meta["is_link"] = False del meta["min_shape"] del meta["max_shape"] del meta["length"] del meta["version"] del meta["name"] destination_tensor = self.create_tensor(name, **meta) destination_tensor.info.update(info) return destination_tensor
def delete(self, large_ok=False)
-
Deletes the entire dataset from the cache layers (if any) and the underlying storage. This is an IRREVERSIBLE operation. Data once deleted cannot be recovered.
Args
large_ok
:bool
- Delete datasets larger than 1GB. Disabled by default.
Expand source code
@invalid_view_op @hub_reporter.record_call def delete(self, large_ok=False): """Deletes the entire dataset from the cache layers (if any) and the underlying storage. This is an IRREVERSIBLE operation. Data once deleted can not be recovered. Args: large_ok (bool): Delete datasets larger than 1GB. Disabled by default. """ if hasattr(self, "_view_entry"): self._view_entry.delete() return if hasattr(self, "_vds"): self._vds.delete(large_ok=large_ok) return if not large_ok: size = self.size_approx() if size > hub.constants.DELETE_SAFETY_SIZE: logger.info( f"Hub Dataset {self.path} was too large to delete. Try again with large_ok=True." ) return self._unlock() self.storage.clear()
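A sketch (placeholder path); deletion is irreversible, and datasets larger than 1GB require large_ok=True.
import hub
ds = hub.dataset("path/to/dataset")  # hypothetical path
ds.delete(large_ok=True)             # irreversible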
def delete_group(self, name, large_ok=False)
-
Delete a tensor group from the dataset.
Examples
ds.delete_group("images/dogs")
Args
name
:str
- The name of tensor group to be deleted.
large_ok
:bool
- Delete tensor groups larger than 1GB. Disabled by default.
Returns
None
Raises
TensorGroupDoesNotExistError
- If tensor group of name
name
does not exist in the dataset.
Expand source code
@invalid_view_op @hub_reporter.record_call def delete_group(self, name: str, large_ok: bool = False): """Delete a tensor group from the dataset. Examples: ``` ds.delete_group("images/dogs") ``` Args: name (str): The name of tensor group to be deleted. large_ok (bool): Delete tensor groups larger than 1GB. Disabled by default. Returns: None Raises: TensorGroupDoesNotExistError: If tensor group of name `name` does not exist in the dataset. """ auto_checkout(self) full_path = filter_name(name, self.group_index) if full_path not in self._groups: raise TensorGroupDoesNotExistError(name) if not self._is_root(): return self.root.delete_group(full_path, large_ok) if not large_ok: size_approx = self[name].size_approx() if size_approx > hub.constants.DELETE_SAFETY_SIZE: logger.info( f"Group {name} was too large to delete. Try again with large_ok=True." ) return with self: meta = self.version_state["meta"] ffw_dataset_meta(meta) tensors = [ posixpath.join(name, tensor) for tensor in self[name]._all_tensors_filtered(include_hidden=True) ] meta.delete_group(name) for tensor in tensors: key = self.version_state["tensor_names"][tensor] delete_tensor(key, self) self.version_state["tensor_names"].pop(tensor) self.version_state["full_tensors"].pop(key) self.storage.maybe_flush()
def delete_tensor(self, name, large_ok=False)
-
Delete a tensor from the dataset.
Examples
ds.delete_tensor("images/cats")
Args
name
:str
- The name of tensor to be deleted.
large_ok
:bool
- Delete tensors larger than 1GB. Disabled by default.
Returns
None
Raises
TensorDoesNotExistError
- If tensor of name
name
does not exist in the dataset.
Expand source code
@invalid_view_op @hub_reporter.record_call def delete_tensor(self, name: str, large_ok: bool = False): """Delete a tensor from the dataset. Examples: ``` ds.delete_tensor("images/cats") ``` Args: name (str): The name of tensor to be deleted. large_ok (bool): Delete tensors larger than 1GB. Disabled by default. Returns: None Raises: TensorDoesNotExistError: If tensor of name `name` does not exist in the dataset. """ auto_checkout(self) name = filter_name(name, self.group_index) key = self.version_state["tensor_names"].get(name) if not key: raise TensorDoesNotExistError(name) if not tensor_exists(key, self.storage, self.version_state["commit_id"]): raise TensorDoesNotExistError(name) if not self._is_root(): return self.root.delete_tensor(name, large_ok) if not large_ok: chunk_engine = self.version_state["full_tensors"][key].chunk_engine size_approx = chunk_engine.num_samples * chunk_engine.min_chunk_size if size_approx > hub.constants.DELETE_SAFETY_SIZE: logger.info( f"Tensor {name} was too large to delete. Try again with large_ok=True." ) return with self: meta = self.meta key = self.version_state["tensor_names"].pop(name) if key not in meta.hidden_tensors: tensor_diff = Tensor(key, self).chunk_engine.commit_diff # if tensor was created in this commit, there's no diff for deleting it. if not tensor_diff.created: self._dataset_diff.tensor_deleted(name) delete_tensor(key, self) self.version_state["full_tensors"].pop(key) ffw_dataset_meta(meta) meta.delete_tensor(name) self.version_state["meta"] = meta for t_name in [ func(name) for func in ( get_sample_id_tensor_key, get_sample_info_tensor_key, get_sample_shape_tensor_key, ) ]: t_key = self.meta.tensor_names.get(t_name) if t_key and tensor_exists( t_key, self.storage, self.version_state["commit_id"] ): self.delete_tensor(t_name, large_ok=True) self.storage.flush()
def delete_view(self, id)
-
Deletes the view with given view id
Args
id
:str
- Id of the view to delete
Raises
KeyError
- if view with given id does not exist.
Expand source code
def delete_view(self, id: str): """Deletes the view with given view id Args: id (str): Id of the view to delete Raises: KeyError: if view with given id does not exist. """ try: with self._lock_queries_json(): qjson = self._read_queries_json() for i, q in enumerate(qjson): if q["id"] == id: qjson.pop(i) self.base_storage.subdir( ".queries/" + (q.get("path") or q["id"]) ).clear() self._write_queries_json(qjson) return except Exception: pass if self.path.startswith("hub://"): qds = Dataset._get_queries_ds_from_user_account() if qds: with qds._lock_queries_json(): qjson = qds._read_queries_json() for i, q in enumerate(qjson): if ( q["source-dataset"] == self.path and q["id"] == f"[{self.org_id}][{self.ds_name}]{id}" ): qjson.pop(i) qds.base_storage.subdir( ".queries/" + (q.get("path") or q["id"]) ).clear() qds._write_queries_json(qjson) return raise KeyError(f"No view with id {id} found in the dataset.")
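A sketch that saves a view and then removes it again (placeholder path and view id).
import hub
ds = hub.dataset("path/to/dataset")  # hypothetical path
ds[:10].save_view(id="first_10")     # save a view of the first 10 samples
ds.delete_view("first_10")           # delete it by id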
def diff(self, id_1=None, id_2=None, as_dict=False)
-
Returns/displays the differences between commits/branches.
For each tensor this contains information about the sample indexes that were added/modified as well as whether the tensor was created.
Args
id_1
:str, Optional
- The first commit_id or branch name.
id_2
:str, Optional
- The second commit_id or branch name.
as_dict
:bool, Optional
- If True, returns a dictionary of the differences instead of printing them. This dictionary will have two keys - "tensor" and "dataset" which represent tensor-level and dataset-level changes, respectively. Defaults to False.
Note
- If both
id_1
and id_2
are None, the differences between the current state and the previous commit will be calculated. If you're at the head of the branch, this will show the uncommitted changes, if any. - If only
id_1
is provided, the differences between the current state and id_1 will be calculated. If you're at the head of the branch, this will take into account the uncommitted changes, if any. - If only
id_2
is provided, a ValueError will be raised. - If both
id_1
andid_2
are provided, the differences betweenid_1
andid_2
will be calculated.
Returns
Dict
-
The differences between the commits/branches if as_dict is True.
- If
id_1
andid_2
are None, a dictionary containing the differences between the current state and the previous commit will be returned. - If only
id_1
is provided, a dictionary containing the differences in the current state andid_1
respectively will be returned. - If only
id_2
is provided, a ValueError will be raised. - If both
id_1
andid_2
are provided, a dictionary containing the differences inid_1
andid_2
respectively will be returned.
- If
None
- If as_dict is False.
Example of a dict returned:
{ "image": {"data_added": [3, 6], "data_updated": {0, 2}, "created": False, "info_updated": False, "data_transformed_in_place": False}, "label": {"data_added": [0, 3], "data_updated": {}, "created": True, "info_updated": False, "data_transformed_in_place": False}, "other/stuff" : {data_added: [3, 3], data_updated: {1, 2}, created: True, "info_updated": False, "data_transformed_in_place": False}, }
Here, 'data_added' is a range of sample indexes that were added to the tensor:
- For example, [3, 6] means that samples 3, 4 and 5 were added.
- Another example: [3, 3] means that no samples were added, as the range is empty.
'data_updated' is a set of sample indexes that were updated.
- For example, {0, 2} means that samples 0 and 2 were updated.
'created' is a boolean that is True if the tensor was created.
'info_updated' is a boolean that is True if the info of the tensor was updated.
'data_transformed_in_place' is a boolean that is True if the data of the tensor was transformed in place.
Raises
ValueError
- If
id_1
is None andid_2
is not None.
Expand source code
@hub_reporter.record_call def diff( self, id_1: Optional[str] = None, id_2: Optional[str] = None, as_dict=False ) -> Optional[Dict]: """Returns/displays the differences between commits/branches. For each tensor this contains information about the sample indexes that were added/modified as well as whether the tensor was created. Args: id_1 (str, Optional): The first commit_id or branch name. id_2 (str, Optional): The second commit_id or branch name. as_dict (bool, Optional): If True, returns a dictionary of the differences instead of printing them. This dictionary will have two keys - "tensor" and "dataset" which represents tensor level and dataset level changes, respectively. Defaults to False. Note: - If both `id_1` and `id_2` are None, the differences between the current state and the previous commit will be calculated. If you're at the head of the branch, this will show the uncommitted changes, if any. - If only `id_1` is provided, the differences between the current state and id_1 will be calculated. If you're at the head of the branch, this will take into account the uncommitted changes, if any. - If only `id_2` is provided, a ValueError will be raised. - If both `id_1` and `id_2` are provided, the differences between `id_1` and `id_2` will be calculated. Returns: Dict: The differences between the commits/branches if as_dict is True. - If `id_1` and `id_2` are None, a dictionary containing the differences between the current state and the previous commit will be returned. - If only `id_1` is provided, a dictionary containing the differences in the current state and `id_1` respectively will be returned. - If only `id_2` is provided, a ValueError will be raised. - If both `id_1` and `id_2` are provided, a dictionary containing the differences in `id_1` and `id_2` respectively will be returned. None: If as_dict is False. Example of a dict returned: ``` { "image": {"data_added": [3, 6], "data_updated": {0, 2}, "created": False, "info_updated": False, "data_transformed_in_place": False}, "label": {"data_added": [0, 3], "data_updated": {}, "created": True, "info_updated": False, "data_transformed_in_place": False}, "other/stuff" : {data_added: [3, 3], data_updated: {1, 2}, created: True, "info_updated": False, "data_transformed_in_place": False}, } ``` Here, 'data_added' is a range of sample indexes that were added to the tensor: - For example [3, 6] means that sample 3, 4 and 5 were added. - Another example [3, 3] means that no samples were added as the range is empty 'data_updated' is a set of sample indexes that were updated. - For example {0, 2} means that sample 0 and 2 were updated. 'created' is a boolean that is True if the tensor was created. 'info_updated' is a boolean that is True if the info of the tensor was updated. 'data_transformed_in_place' is a boolean that is True if the data of the tensor was transformed in place. Raises: ValueError: If `id_1` is None and `id_2` is not None. """ version_state, storage = self.version_state, self.storage res = get_changes_and_messages(version_state, storage, id_1, id_2) if as_dict: dataset_changes_1 = res[0] dataset_changes_2 = res[1] tensor_changes_1 = res[2] tensor_changes_2 = res[3] changes = {} if id_1 is None and id_2 is None: changes["dataset"] = dataset_changes_1 changes["tensor"] = tensor_changes_1 return changes changes["dataset"] = dataset_changes_1, dataset_changes_2 changes["tensor"] = tensor_changes_1, tensor_changes_2 return changes all_changes = get_all_changes_string(*res) print(all_changes) return None
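A sketch of inspecting changes programmatically (placeholder path and branch name).
import hub
ds = hub.dataset("path/to/dataset")  # hypothetical path
changes = ds.diff(as_dict=True)      # uncommitted changes vs. the previous commit
print(changes["tensor"], changes["dataset"])
ds.diff("main")                      # prints the differences against the "main" branch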
def extend(self, samples, skip_ok=False)
-
Appends multiple rows of samples to multiple tensors at once. This method expects all tensors being updated to be of the same length.
Args
samples
:Dict[str, Any]
- Dictionary with tensor names as keys and samples as values.
skip_ok
:bool
- Skip tensors not in
samples
if set to True.
Raises
KeyError
- If any tensor in the dataset is not a key in
samples
and skip_ok
is False. TensorDoesNotExistError
- If tensor in
samples
does not exist. ValueError
- If all tensors being updated are not of the same length.
NotImplementedError
- If an error occurs while writing tiles.
Exception
- Error while attempting to rollback appends.
Expand source code
def extend(self, samples: Dict[str, Any], skip_ok: bool = False): """Appends multiple rows of samples to mutliple tensors at once. This method expects all tensors being updated to be of the same length. Args: samples (Dict[str, Any]): Dictionary with tensor names as keys and samples as values. skip_ok (bool): Skip tensors not in `samples` if set to True. Raises: KeyError: If any tensor in the dataset is not a key in `samples` and `skip_ok` is False. TensorDoesNotExistError: If tensor in `samples` does not exist. ValueError: If all tensors being updated are not of the same length. NotImplementedError: If an error occurs while writing tiles. Exception: Error while attempting to rollback appends. """ if isinstance(samples, Dataset): samples = samples.tensors if not samples: return n = len(samples[next(iter(samples.keys()))]) for v in samples.values(): if len(v) != n: sizes = {k: len(v) for (k, v) in samples.items()} raise ValueError( f"Incoming samples are not of equal lengths. Incoming sample sizes: {sizes}" ) [f() for f in list(self._update_hooks.values())] for i in range(n): self.append({k: v[i] for k, v in samples.items()})
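A sketch of a multi-row extend, assuming "images" and "labels" tensors (placeholder names); every value list must have the same length.
import numpy as np
import hub

ds = hub.dataset("path/to/dataset")  # hypothetical path
ds.extend({
    "images": [np.zeros((28, 28, 3), dtype="uint8") for _ in range(4)],
    "labels": [0, 1, 0, 1],
})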
def filter(self, function, num_workers=0, scheduler='threaded', progressbar=True, save_result=False, result_path=None, result_ds_args=None)
-
- Filters the dataset according to the filter function
f(x: sample) -> bool
Args
function
:Callable, str
- Filter function that takes sample as argument and returns True/False
if sample should be included in result. Also supports simplified expression evaluations.
See
DatasetQuery
for more details. num_workers
:int
- Level of parallelization of filter evaluations.
0
indicates in-place for-loop evaluation, multiprocessing is used otherwise. scheduler
:str
- Scheduler to use for multiprocessing evaluation.
threaded
is default progressbar
:bool
- Display progress bar while filtering. True is default
save_result
:bool
- If True, result of the filter will be saved to a dataset asynchronously.
result_path
:Optional, str
- Path to save the filter result. Only applicable if
save_result
is True. result_ds_args
:Optional, dict
- Additional args for result dataset. Only applicable if
save_result
is True.
Returns
View of Dataset with elements that satisfy filter function.
Example
The following filters are identical and return a dataset view where all the samples have a label equal to 2.
>>> dataset.filter(lambda sample: sample.labels.numpy() == 2)
>>> dataset.filter('labels == 2')
Expand source code
@hub_reporter.record_call def filter( self, function: Union[Callable, str], num_workers: int = 0, scheduler: str = "threaded", progressbar: bool = True, save_result: bool = False, result_path: Optional[str] = None, result_ds_args: Optional[dict] = None, ): """Filters the dataset in accordance of filter function `f(x: sample) -> bool` Args: function (Callable, str): Filter function that takes sample as argument and returns True/False if sample should be included in result. Also supports simplified expression evaluations. See `hub.core.query.query.DatasetQuery` for more details. num_workers (int): Level of parallelization of filter evaluations. `0` indicates in-place for-loop evaluation, multiprocessing is used otherwise. scheduler (str): Scheduler to use for multiprocessing evaluation. `threaded` is default progressbar (bool): Display progress bar while filtering. True is default save_result (bool): If True, result of the filter will be saved to a dataset asynchronously. result_path (Optional, str): Path to save the filter result. Only applicable if `save_result` is True. result_ds_args (Optional, dict): Additional args for result dataset. Only applicable if `save_result` is True. Returns: View of Dataset with elements that satisfy filter function. Example: Following filters are identical and return dataset view where all the samples have label equals to 2. >>> dataset.filter(lambda sample: sample.labels.numpy() == 2) >>> dataset.filter('labels == 2') """ from hub.core.query import filter_dataset, query_dataset fn = query_dataset if isinstance(function, str) else filter_dataset result = fn( self, function, num_workers=num_workers, scheduler=scheduler, progressbar=progressbar, save_result=save_result, result_path=result_path, result_ds_args=result_ds_args, ) return result
def get_commit_details(self, commit_id)
-
Expand source code
def get_commit_details(self, commit_id) -> Dict: commit_node: CommitNode = self.version_state["commit_node_map"].get(commit_id) if commit_node is None: raise KeyError(f"Commit {commit_id} not found in dataset.") return { "commit": commit_node.commit_id, "author": commit_node.commit_user_name, "time": str(commit_node.commit_time)[:-7], "message": commit_node.commit_message, }
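A quick sketch of the returned dictionary, assuming a commit was just made with ds.commit (documented elsewhere in this reference); the path and message are illustrative.
```
import hub

ds = hub.load("./my_dataset")                # illustrative path
commit_id = ds.commit("added new samples")   # commit() returns the new commit id
details = ds.get_commit_details(commit_id)
# e.g. {"commit": "...", "author": "...", "time": "2022-05-01 12:34:56", "message": "added new samples"}
print(details["message"])
```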
def get_creds_keys(self)
-
Returns the list of creds keys added to the dataset. These are used to fetch external data in linked tensors
Expand source code
def get_creds_keys(self) -> List[str]: """Returns the list of creds keys added to the dataset. These are used to fetch external data in linked tensors""" return self.link_creds.creds_keys
def get_view(self, id)
-
Returns the dataset view corresponding to id.
Examples
# save view
ds[:100].save_view(id="first_100")
# load view
first_100 = ds.get_view("first_100").load()
# 100
print(len(first_100))
See Dataset.save_view() to learn more about saving views.
Args
id : str
- id of the required view.
Returns
ViewEntry
- The view entry corresponding to the given id.
Raises
KeyError
- If no such view exists.
Expand source code
def get_view(self, id: str) -> ViewEntry: """Returns the dataset view corresponding to `id` Examples: ``` # save view ds[:100].save_view(id="first_100") # load view first_100 = ds.get_view("first_100").load() # 100 print(len(first_100)) ``` See `Dataset.save_view` to learn more about saving views. Args: id (str): id of required view. Returns: `hub.core.dataset.view_entry.ViewEntry` Raises: KeyError: If no such view exists. """ queries = self._read_queries_json() for q in queries: if q["id"] == id: return ViewEntry(q, self) if self.path.startswith("hub://"): queries, qds = self._read_queries_json_from_user_account() for q in queries: if q["id"] == f"[{self.org_id}][{self.ds_name}]{id}": return ViewEntry(q, qds, True) raise KeyError(f"No view with id {id} found in the dataset.")
def get_views(self, commit_id=None)
-
Returns list of views stored in this Dataset.
Args
commit_id : str, optional
- Commit from which views should be returned. If not specified, views from the currently checked out commit will be returned.
Returns
List of ViewEntry instances.
Expand source code
def get_views(self, commit_id: Optional[str] = None) -> List[ViewEntry]: """Returns list of views stored in this Dataset. Args: commit_id (str, optional): - Commit from which views should be returned. - If not specified, views from current commit is returned. - If not specified, views from the currently checked out commit will be returned. Returns: List of `hub.core.dataset.view_entry.ViewEntry` instances. """ commit_id = commit_id or self.commit_id queries = self._read_queries_json() f = lambda x: x["source-dataset-version"] == commit_id ret = map( partial(ViewEntry, dataset=self), filter(f, queries), ) if self.path.startswith("hub://"): queries, qds = self._read_queries_json_from_user_account() if queries: ret = chain( ret, map( partial(ViewEntry, dataset=qds, external=True), filter(f, queries), ), ) return list(ret)
def load_view(self, id, optimize=False, num_workers=0, scheduler='threaded', progressbar=True)
-
Loads the view and returns the Dataset by id. Equivalent to ds.get_view(id).load().
Args
id : str
- id of the view to be loaded.
optimize : bool
- If True, the dataset view is optimized by copying and rechunking the required data before loading. This is necessary to achieve fast streaming speeds when training models using the dataset view. The optimization process will take some time, depending on the size of the data.
num_workers : int
- Number of workers to be used for the optimization process. Only applicable if optimize=True. Defaults to 0.
scheduler : str
- The scheduler to be used for optimization. Supported values include: 'serial', 'threaded', 'processed' and 'ray'. Only applicable if optimize=True. Defaults to 'threaded'.
progressbar : bool
- Whether to use a progress bar for optimization. Only applicable if optimize=True. Defaults to True.
Returns
Dataset
- The loaded view.
Raises
KeyError
- If a view with the given id does not exist.
Expand source code
def load_view( self, id: str, optimize: Optional[bool] = False, num_workers: int = 0, scheduler: str = "threaded", progressbar: Optional[bool] = True, ): """Loads the view and returns the `hub.Dataset` by id. Equivalent to ds.get_view(id).load(). Args: id (str): id of the view to be loaded. optimize (bool): If True, the dataset view is optimized by copying and rechunking the required data before loading. This is necessary to achieve fast streaming speeds when training models using the dataset view. The optimization process will take some time, depending on the size of the data. num_workers (int): Number of workers to be used for the optimization process. Only applicable if `optimize=True`. Defaults to 0. scheduler (str): The scheduler to be used for optimization. Supported values include: 'serial', 'threaded', 'processed' and 'ray'. Only applicable if `optimize=True`. Defaults to 'threaded'. progressbar (bool): Whether to use progressbar for optimization. Only applicable if `optimize=True`. Defaults to True. Returns: Dataset: The loaded view. Raises: KeyError: if view with given id does not exist. """ if optimize: return ( self.get_view(id) .optimize( num_workers=num_workers, scheduler=scheduler, progressbar=progressbar, ) .load() ) return self.get_view(id).load()
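A sketch of the save/load round trip for views, assuming the dataset has at least 100 samples and no uncommitted changes; the path and view id are illustrative.
```
import hub

ds = hub.load("./my_dataset")          # illustrative path

# Save the first 100 samples as a view (see Dataset.save_view further down).
ds[:100].save_view(id="first_100")

# Load it back; equivalent to ds.get_view("first_100").load().
first_100 = ds.load_view("first_100")
print(len(first_100))

# Optionally copy/rechunk the underlying data for faster streaming.
fast_view = ds.load_view("first_100", optimize=True, num_workers=2)
```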
def log(self)
-
Displays the details of all the past commits.
Expand source code
@hub_reporter.record_call def log(self): """Displays the details of all the past commits.""" commit_node = self.version_state["commit_node"] print("---------------\nHub Version Log\n---------------\n") print(f"Current Branch: {self.version_state['branch']}") if self.has_head_changes: print("** There are uncommitted changes on this branch.") print() while commit_node: if not commit_node.is_head_node: print(f"{commit_node}\n") commit_node = commit_node.parent
def merge(self, target_id, conflict_resolution=None, delete_removed_tensors=False, force=False)
-
Merges the target_id into the current dataset.
Args
target_id : str
- The commit_id or branch to merge.
conflict_resolution : str, Optional
- The strategy to use to resolve merge conflicts. Conflicts are scenarios where both the current dataset and the target id have made changes to the same sample(s) since their common ancestor. Must be one of the following:
  - None - the default value; an exception is raised if there are conflicts.
  - "ours" - during conflicts, values from the current dataset will be used.
  - "theirs" - during conflicts, values from the target id will be used.
delete_removed_tensors : bool
- If True, deleted tensors will be deleted from the dataset.
force : bool
- Forces the merge. force=True has the following effects in these cases of merge conflicts:
  - If a tensor is renamed on target but is missing from HEAD, the renamed tensor will be registered as a new tensor on the current branch.
  - If a tensor is renamed on both the target and the current branch, the tensor on target will be registered as a new tensor on the current branch.
  - If a tensor is renamed on target and a new tensor with the new name was created on the current branch, they will be merged.
Raises
Exception
- If the dataset is a filtered view.
ValueError
- If the conflict resolution strategy is not one of None, "ours", or "theirs".
Expand source code
@hub_reporter.record_call def merge( self, target_id: str, conflict_resolution: Optional[str] = None, delete_removed_tensors: bool = False, force: bool = False, ): """Merges the target_id into the current dataset. Args: target_id (str): The commit_id or branch to merge. conflict_resolution (str, Optional): The strategy to use to resolve merge conflicts. - - Conflicts are scenarios where both the current dataset and the target id have made changes to the same sample/s since their common ancestor. - Must be one of the following - None - this is the default value, will raise an exception if there are conflicts. - "ours" - during conflicts, values from the current dataset will be used. - "theirs" - during conflicts, values from target id will be used. delete_removed_tensors (bool): If true, deleted tensors will be deleted from the dataset. force (bool): Forces merge. - - `force` = True will have these effects in the following cases of merge conflicts: - If tensor is renamed on target but is missing from HEAD, renamed tensor will be registered as a new tensor on current branch. - If tensor is renamed on both target and current branch, tensor on target will be registered as a new tensor on current branch. - If tensor is renamed on target and a new tensor of the new name was created on the current branch, they will be merged. Raises: Exception: if dataset is a filtered view. ValueError: if the conflict resolution strategy is not one of the None, "ours", or "theirs". """ if self._is_filtered_view: raise Exception( "Cannot perform version control operations on a filtered dataset view." ) if conflict_resolution not in [None, "ours", "theirs"]: raise ValueError( f"conflict_resolution must be one of None, 'ours', or 'theirs'. Got {conflict_resolution}" ) try_flushing(self) self._initial_autoflush.append(self.storage.autoflush) self.storage.autoflush = False merge(self, target_id, conflict_resolution, delete_removed_tensors, force) self.storage.autoflush = self._initial_autoflush.pop()
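A branching-and-merge sketch, assuming checkout and commit behave as documented elsewhere in this reference; the path, branch name and tensor name are illustrative.
```
import hub

ds = hub.load("./my_dataset")            # illustrative path; "labels" tensor assumed

ds.checkout("relabel", create=True)      # create and switch to a new branch
ds.labels[0] = 1                         # change a sample on the branch
ds.commit("fixed label 0")

ds.checkout("main")                      # "main" is the default branch
ds.merge("relabel")                      # no conflicts: changes are applied directly

# If both branches touched the same samples, choose a resolution strategy explicitly:
# ds.merge("relabel", conflict_resolution="theirs")
```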
def pop(self, index=None)
-
Removes a sample from all the tensors of the dataset. For any tensor, if index >= len(tensor), the sample won't be popped from it.
Args
index : int, Optional
- The index of the sample to be removed. If it is None, the index becomes the length of the longest tensor - 1.
Raises
IndexError
- If the index is out of range.
Expand source code
@invalid_view_op
def pop(self, index: Optional[int] = None):
    """
    Removes a sample from all the tensors of the dataset. For any tensor if the index >= len(tensor), the sample won't be popped from it.

    Args:
        index (int, Optional): The index of the sample to be removed. If it is None, the index becomes the length of the longest tensor - 1.

    Raises:
        IndexError: If the index is out of range.
    """
    max_len = max((t.num_samples for t in self.tensors.values()), default=0)
    if max_len == 0:
        raise IndexError("Can't pop from empty dataset.")

    if index is None:
        index = max_len - 1

    if index < 0:
        raise IndexError("Pop doesn't support negative indices.")
    elif index >= max_len:
        raise IndexError(
            f"Index {index} is out of range. The longest tensor has {max_len} samples."
        )

    for tensor in self.tensors.values():
        if tensor.num_samples > index:
            tensor.pop(index)
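A small sketch of pop on an illustrative local dataset:
```
import hub

ds = hub.load("./my_dataset")   # illustrative path

ds.pop(5)    # removes sample 5 from every tensor that has more than 5 samples
ds.pop()     # removes the last sample of the longest tensor wherever it exists
```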
def populate_creds(self, creds_key, creds)
-
Populates the creds key added in add_creds_key with the given creds. These creds are used to fetch the external data. This needs to be done every time the dataset is reloaded for datasets that contain links to external data.
Examples
# create/load a dataset
ds = hub.dataset("path/to/dataset")
# add a new creds key
ds.add_creds_key("my_s3_key")
# populate the creds
ds.populate_creds("my_s3_key", {"aws_access_key_id": "my_access_key", "aws_secret_access_key": "my_secret_key"})
Expand source code
def populate_creds(self, creds_key: str, creds: dict): """Populates the creds key added in add_creds_key with the given creds. These creds are used to fetch the external data. This needs to be done everytime the dataset is reloaded for datasets that contain links to external data. Examples: ``` # create/load a dataset ds = hub.dataset("path/to/dataset") # add a new creds key ds.add_creds_key("my_s3_key") # populate the creds ds.populate_creds("my_s3_key", {"aws_access_key_id": "my_access_key", "aws_secret_access_key": "my_secret_key"}) ``` """ self.link_creds.populate_creds(creds_key, creds)
def pytorch(self, transform=None, tensors=None, tobytes=False, num_workers=1, batch_size=1, drop_last=False, collate_fn=None, pin_memory=False, shuffle=False, buffer_size=2048, use_local_cache=False, use_progress_bar=False, return_index=True, pad_tensors=False)
-
Converts the dataset into a pytorch Dataloader.
Note
PyTorch does not support the uint16, uint32 and uint64 dtypes. These are implicitly cast to int32, int64 and int64 respectively. This method spins up its own workers to fetch data.
Args
transform : Callable, Optional
- Transformation function to be applied to each sample.
tensors : List, Optional
- Optionally provide a list of tensor names in the ordering that your training script expects. For example, if you have a dataset that has "image" and "label" tensors and tensors=["image", "label"], your training script should expect each batch to be provided as a tuple of (image, label).
tobytes : bool
- If True, samples will not be decompressed and their raw bytes will be returned instead of numpy arrays. Can also be a list of tensors, in which case those tensors alone will not be decompressed.
num_workers : int
- The number of workers to use for fetching data in parallel.
batch_size : int
- Number of samples per batch to load. Default value is 1.
drop_last : bool
- Set to True to drop the last incomplete batch if the dataset size is not divisible by the batch size. If False and the size of the dataset is not divisible by the batch size, then the last batch will be smaller. Default value is False. Read torch.utils.data.DataLoader docs for more details.
collate_fn : Callable, Optional
- Merges a list of samples to form a mini-batch of Tensor(s). Used when using batched loading from a map-style dataset. Read torch.utils.data.DataLoader docs for more details.
pin_memory : bool
- If True, the data loader will copy Tensors into CUDA pinned memory before returning them. Default value is False. Read torch.utils.data.DataLoader docs for more details.
shuffle : bool
- If True, the data loader will shuffle the data indices. Default value is False. Details about how hub shuffles data can be found at https://docs.activeloop.ai/how-hub-works/shuffling-in-ds.pytorch
buffer_size : int
- The size of the buffer used to shuffle the data in MBs. Defaults to 2048 MB. Increasing the buffer_size will increase the extent of shuffling.
use_local_cache : bool
- If True, the data loader will use a local cache to store data. This is useful when the dataset can fit on the machine and we don't want to fetch the data multiple times for each iteration. Default value is False.
use_progress_bar : bool
- If True, tqdm will be wrapped around the returned dataloader. Default value is False.
return_index : bool
- If True, the returned dataloader will have a key "index" that contains the index of the sample(s) in the original dataset. Default value is True.
pad_tensors : bool
- If True, shorter tensors will be padded to the length of the longest tensor. Default value is False.
Returns
A torch.utils.data.DataLoader object.
Raises
EmptyTensorError
- If one or more tensors being passed to pytorch are empty.
Expand source code
@hub_reporter.record_call def pytorch( self, transform: Optional[Callable] = None, tensors: Optional[Sequence[str]] = None, tobytes: Union[bool, Sequence[str]] = False, num_workers: int = 1, batch_size: int = 1, drop_last: bool = False, collate_fn: Optional[Callable] = None, pin_memory: bool = False, shuffle: bool = False, buffer_size: int = 2048, use_local_cache: bool = False, use_progress_bar: bool = False, return_index: bool = True, pad_tensors: bool = False, ): """Converts the dataset into a pytorch Dataloader. Note: Pytorch does not support uint16, uint32, uint64 dtypes. These are implicitly type casted to int32, int64 and int64 respectively. This spins up it's own workers to fetch data. Args: transform (Callable, Optional): Transformation function to be applied to each sample. tensors (List, Optional): Optionally provide a list of tensor names in the ordering that your training script expects. For example, if you have a dataset that has "image" and "label" tensors, if `tensors=["image", "label"]`, your training script should expect each batch will be provided as a tuple of (image, label). tobytes (bool): If True, samples will not be decompressed and their raw bytes will be returned instead of numpy arrays. Can also be a list of tensors, in which case those tensors alone will not be decompressed. num_workers (int): The number of workers to use for fetching data in parallel. batch_size (int): Number of samples per batch to load. Default value is 1. drop_last (bool): Set to True to drop the last incomplete batch, if the dataset size is not divisible by the batch size. If False and the size of dataset is not divisible by the batch size, then the last batch will be smaller. Default value is False. Read torch.utils.data.DataLoader docs for more details. collate_fn (Callable, Optional): merges a list of samples to form a mini-batch of Tensor(s). Used when using batched loading from a map-style dataset. Read torch.utils.data.DataLoader docs for more details. pin_memory (bool): If True, the data loader will copy Tensors into CUDA pinned memory before returning them. Default value is False. Read torch.utils.data.DataLoader docs for more details. shuffle (bool): If True, the data loader will shuffle the data indices. Default value is False. Details about how hub shuffles data can be found at https://docs.activeloop.ai/how-hub-works/shuffling-in-ds.pytorch buffer_size (int): The size of the buffer used to shuffle the data in MBs. Defaults to 2048 MB. Increasing the buffer_size will increase the extent of shuffling. use_local_cache (bool): If True, the data loader will use a local cache to store data. This is useful when the dataset can fit on the machine and we don't want to fetch the data multiple times for each iteration. Default value is False. use_progress_bar (bool): If True, tqdm will be wrapped around the returned dataloader. Default value is True. return_index (bool): If True, the returned dataloader will have a key "index" that contains the index of the sample(s) in the original dataset. Default value is True. pad_tensors (bool): If True, shorter tensors will be padded to the length of the longest tensor. Default value is False. Returns: A torch.utils.data.DataLoader object. Raises: EmptyTensorError: If one or more tensors being passed to pytorch are empty. 
""" from hub.integrations import dataset_to_pytorch as to_pytorch dataloader = to_pytorch( self, transform=transform, tensors=tensors, tobytes=tobytes, num_workers=num_workers, batch_size=batch_size, drop_last=drop_last, collate_fn=collate_fn, pin_memory=pin_memory, shuffle=shuffle, buffer_size=buffer_size, use_local_cache=use_local_cache, return_index=return_index, pad_tensors=pad_tensors, ) if use_progress_bar: dataloader = tqdm(dataloader, desc=self.path, total=len(self) // batch_size) return dataloader
def rechunk(self, tensors=None, num_workers=0, scheduler='threaded', progressbar=True)
-
Rewrites the underlying chunks to make their sizes optimal. This is usually needed in cases where a lot of updates have been made to the data.
Args
tensors : str, List[str], Optional
- Name/names of the tensors to rechunk. If None, all tensors in the dataset are rechunked.
num_workers : int
- The number of workers to use for rechunking. Defaults to 0. When set to 0, it will always use serial processing, irrespective of the scheduler.
scheduler : str
- The scheduler to be used for rechunking. Supported values include: 'serial', 'threaded', 'processed' and 'ray'. Defaults to 'threaded'.
progressbar : bool
- Displays a progress bar if True (default).
Expand source code
def rechunk( self, tensors: Optional[Union[str, List[str]]] = None, num_workers: int = 0, scheduler: str = "threaded", progressbar: bool = True, ): """Rewrites the underlying chunks to make their sizes optimal. This is usually needed in cases where a lot of updates have been made to the data. Args: tensors (str, List[str], Optional): Name/names of the tensors to rechunk. If None, all tensors in the dataset are rechunked. num_workers (int): The number of workers to use for rechunking. Defaults to 0. When set to 0, it will always use serial processing, irrespective of the scheduler. scheduler (str): The scheduler to be used for rechunking. Supported values include: 'serial', 'threaded', 'processed' and 'ray'. Defaults to 'threaded'. progressbar (bool): Displays a progress bar if True (default). """ if tensors is None: tensors = list(self.tensors) elif isinstance(tensors, str): tensors = [tensors] # identity function that rechunks @hub.compute def rechunking(sample_in, samples_out): for tensor in tensors: samples_out[tensor].append(sample_in[tensor]) rechunking().eval( self, num_workers=num_workers, scheduler=scheduler, progressbar=progressbar, skip_ok=True, )
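For instance, after many in-place updates (a sketch; the path and tensor name are illustrative):
```
import hub

ds = hub.load("./my_dataset")                               # illustrative path
ds.rechunk("images", num_workers=4, scheduler="processed")  # a single heavily-updated tensor, in parallel
ds.rechunk()                                                # or all tensors with the default settings
```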
def rename(self, path)
-
Renames the dataset to path.
Example
ds = hub.load("hub://username/dataset")
ds.rename("hub://username/renamed_dataset")
Args
path : str, pathlib.Path
- New path to the dataset.
Raises
RenameError
- If path points to a different directory.
Expand source code
@invalid_view_op @hub_reporter.record_call def rename(self, path: Union[str, pathlib.Path]): """Renames the dataset to `path`. Example: ``` ds = hub.load("hub://username/dataset") ds.rename("hub://username/renamed_dataset") ``` Args: path (str, pathlib.Path): New path to the dataset. Raises: RenameError: If `path` points to a different directory. """ path = convert_pathlib_to_string_if_needed(path) path = path.rstrip("/") if posixpath.split(path)[0] != posixpath.split(self.path)[0]: raise RenameError self.base_storage.rename(path) self.path = path
def rename_group(self, name, new_name)
-
Renames the group with name name to new_name.
Args
name : str
- Name of the group to be renamed.
new_name : str
- New name of the group.
Raises
TensorGroupDoesNotExistError
- If a tensor group with name name does not exist in the dataset.
TensorAlreadyExistsError
- Duplicate tensors are not allowed.
TensorGroupAlreadyExistsError
- Duplicate tensor groups are not allowed.
InvalidTensorGroupNameError
- If name is in dataset attributes.
RenameError
- If new_name points to a group different from name.
Expand source code
@hub_reporter.record_call def rename_group(self, name: str, new_name: str) -> None: """Renames group with name `name` to `new_name` Args: name (str): Name of group to be renamed. new_name (str): New name of group. Raises: TensorGroupDoesNotExistError: If tensor group of name `name` does not exist in the dataset. TensorAlreadyExistsError: Duplicate tensors are not allowed. TensorGroupAlreadyExistsError: Duplicate tensor groups are not allowed. InvalidTensorGroupNameError: If `name` is in dataset attributes. RenameError: If `new_name` points to a group different from `name`. """ auto_checkout(self) name = filter_name(name, self.group_index) new_name = filter_name(new_name, self.group_index) if name not in self._groups: raise TensorGroupDoesNotExistError(name) if posixpath.split(name)[0] != posixpath.split(new_name)[0]: raise RenameError("Names does not match.") if new_name in self.version_state["tensor_names"]: raise TensorAlreadyExistsError(new_name) if new_name in self._groups: raise TensorGroupAlreadyExistsError(new_name) new_tensor_name = posixpath.split(new_name)[1] if not new_tensor_name or new_tensor_name in dir(self): raise InvalidTensorGroupNameError(new_name) meta = self.meta meta.rename_group(name, new_name) root = self.root for tensor in filter( lambda x: x.startswith(name), map(lambda y: y.meta.name or y.key, self.tensors.values()), ): root._rename_tensor( tensor, posixpath.join(new_name, posixpath.relpath(tensor, name)), ) self.storage.maybe_flush()
def rename_tensor(self, name, new_name)
-
Renames the tensor with name name to new_name.
Args
name : str
- Name of the tensor to be renamed.
new_name : str
- New name of the tensor.
Returns
Tensor
- The renamed tensor.
Raises
TensorDoesNotExistError
- If a tensor with name name does not exist in the dataset.
TensorAlreadyExistsError
- Duplicate tensors are not allowed.
TensorGroupAlreadyExistsError
- Duplicate tensor groups are not allowed.
InvalidTensorNameError
- If new_name is in dataset attributes.
RenameError
- If new_name points to a group different from name.
Expand source code
@hub_reporter.record_call def rename_tensor(self, name: str, new_name: str) -> "Tensor": """Renames tensor with name `name` to `new_name` Args: name (str): Name of tensor to be renamed. new_name (str): New name of tensor. Returns: Tensor: Renamed tensor. Raises: TensorDoesNotExistError: If tensor of name `name` does not exist in the dataset. TensorAlreadyExistsError: Duplicate tensors are not allowed. TensorGroupAlreadyExistsError: Duplicate tensor groups are not allowed. InvalidTensorNameError: If `new_name` is in dataset attributes. RenameError: If `new_name` points to a group different from `name`. """ auto_checkout(self) if name not in self._tensors(): raise TensorDoesNotExistError(name) name = filter_name(name, self.group_index) new_name = filter_name(new_name, self.group_index) if posixpath.split(name)[0] != posixpath.split(new_name)[0]: raise RenameError("New name of tensor cannot point to a different group") if new_name in self.version_state["tensor_names"]: raise TensorAlreadyExistsError(new_name) if new_name in self._groups: raise TensorGroupAlreadyExistsError(new_name) new_tensor_name = posixpath.split(new_name)[1] if not new_tensor_name or new_tensor_name in dir(self): raise InvalidTensorNameError(new_name) tensor = self.root._rename_tensor(name, new_name) self.storage.maybe_flush() return tensor
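A short sketch; the path and tensor names are illustrative, and the rename must stay inside the same group:
```
import hub

ds = hub.load("./my_dataset")                   # illustrative path
ds.rename_tensor("lables", "labels")            # fix a misspelled tensor name
ds.rename_tensor("meta/boxes", "meta/bboxes")   # allowed: stays inside the "meta" group
# ds.rename_tensor("meta/boxes", "other/boxes") # would raise RenameError (different group)
```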
def reset(self)
-
Resets the uncommitted changes present in the branch.
Note
- The uncommitted data is deleted from the underlying storage; this is not a reversible operation.
Expand source code
@invalid_view_op def reset(self): """Resets the uncommitted changes present in the branch. Note: - The uncommitted data is deleted from underlying storage, this is not a reversible operation. """ storage, version_state = self.storage, self.version_state if version_state["commit_node"].children: print("You are not at the head node of the branch, cannot reset.") return if not self.has_head_changes: print("There are no uncommitted changes on this branch.") return # delete metas first self._delete_metas() if self.commit_id is None: storage.clear() self._populate_meta() else: prefix = "/".join(("versions", self.pending_commit_id)) storage.clear(prefix=prefix) src_id, dest_id = self.commit_id, self.pending_commit_id # by doing this checkout, we get list of tensors in previous commit, which is what we require for copying metas and create_commit_chunk_set self.checkout(src_id) copy_metas(src_id, dest_id, storage, version_state) create_commit_chunk_sets(dest_id, storage, version_state) self.checkout(dest_id) load_meta(self) self._info = None self._ds_diff = None
def save_view(self, message=None, path=None, id=None, optimize=False, num_workers=0, scheduler='threaded', verbose=True, **ds_args)
-
Saves a dataset view as a virtual dataset (VDS)
Examples
# Save to specified path
vds_path = ds[:10].save_view(path="views/first_10", id="first_10")
# vds_path = views/first_10

# Path unspecified
vds_path = ds[:100].save_view(id="first_100", message="first 100 samples")
# vds_path = path/to/dataset/.queries/first_100

# Random id
vds_path = ds[:100].save_view()
# vds_path = "path/to/dataset/.queries/92f41922ed0471ec2d27690b7351fc96bea060e6c5ee22b14f7ffa5f291aa068"
See Dataset.get_view() to learn how to load views by id. These virtual datasets can also be loaded from their path like normal datasets.
Args
message : Optional, str
- Custom user message.
path : Optional, str, pathlib.Path
- The VDS will be saved as a standalone dataset at the specified path.
- If not specified, the VDS is saved under the .queries subdirectory of the source dataset's storage.
- If the user doesn't have write access to the source dataset and the source dataset is a hub cloud dataset, then the VDS is saved under the user's hub account and can be accessed using hub.load(f"hub://{username}/queries/{query_hash}").
id : Optional, str
- Unique id for this view. A random id will be generated if not specified.
optimize : bool
- If True, the dataset view will be optimized by copying and rechunking the required data. This is necessary to achieve fast streaming speeds when training models using the dataset view. The optimization process will take some time, depending on the size of the data.
- You can also choose to optimize the saved view later by calling its optimize method: see ViewEntry.optimize().
num_workers : int
- Number of workers to be used for the optimization process. Applicable only if optimize=True. Defaults to 0.
scheduler : str
- The scheduler to be used for optimization. Supported values include: 'serial', 'threaded', 'processed' and 'ray'. Only applicable if optimize=True. Defaults to 'threaded'.
verbose : bool
- If True, logs will be printed. Defaults to True.
ds_args : dict
- Additional args for creating the VDS when path is specified. (See documentation for dataset.init())
Note
Specifying path makes the view external. External views cannot be accessed using the parent dataset's Dataset.get_view(), Dataset.load_view(), Dataset.delete_view() methods. They have to be loaded using hub.load(path).
Returns
str
- Path to the saved VDS.
Raises
ReadOnlyModeError
- When attempting to save a view in place and the user doesn't have write access.
DatasetViewSavingError
- If the HEAD node has uncommitted changes.
Expand source code
def save_view( self, message: Optional[str] = None, path: Optional[Union[str, pathlib.Path]] = None, id: Optional[str] = None, optimize: bool = False, num_workers: int = 0, scheduler: str = "threaded", verbose: bool = True, **ds_args, ) -> str: """Saves a dataset view as a virtual dataset (VDS) Examples: ``` # Save to specified path vds_path = ds[:10].save_view(path="views/first_10", id="first_10") # vds_path = views/first_10 ``` # Path unspecified vds_path = ds[:100].save_view(id="first_100", message="first 100 samples") # vds_path = path/to/dataset/.queries/first_100 ``` # Random id vds_path = ds[:100].save_view() # vds_path = "path/to/dataset/.queries/92f41922ed0471ec2d27690b7351fc96bea060e6c5ee22b14f7ffa5f291aa068" ``` See `Dataset.get_view` to learn how to load views by id. These virtual datasets can also be loaded from their path like normal datasets. Args: message (Optional, str): Custom user message. path (Optional, str, pathlib.Path): - The VDS will be saved as a standalone dataset at the specified path. - If not specified, the VDS is saved under `.queries` subdirectory of the source dataset's storage. - If the user doesn't have write access to the source dataset and the source dataset is a hub cloud dataset, then the VDS is saved is saved under the user's hub account and can be accessed using `hub.load(f"hub://{username}/queries/{query_hash}")`. id (Optional, str): Unique id for this view. Random id will be generated if not specified. optimize (bool): - If True, the dataset view will be optimized by copying and rechunking the required data. This is necessary to achieve fast streaming speeds when training models using the dataset view. The optimization process will take some time, depending on the size of the data. - You can also choose to optimize the saved view later by calling its `optimize` method: See `hub.core.dataset.view_entry.ViewEntry.optimize`. num_workers (int): Number of workers to be used for optimization process. Applicable only if `optimize=True`. Defaults to 0. scheduler (str): The scheduler to be used for optimization. Supported values include: 'serial', 'threaded', 'processed' and 'ray'. Only applicable if `optimize=True`. Defaults to 'threaded'. verbose (bool): If True, logs will be printed. Defaults to True. ds_args (dict): Additional args for creating VDS when path is specified. (See documentation for `hub.dataset()`) Note: Specifying `path` makes the view external. External views cannot be accessed using the parent dataset's `Dataset.get_view`, `Dataset.load_view`, `Dataset.delete_view` methods. They have to be loaded using `hub.\0load(path)`. Returns: str: Path to the saved VDS. Raises: ReadOnlyModeError: When attempting to save a view inplace and the user doesn't have write access. DatasetViewSavingError: If HEAD node has uncommitted changes. """ return self._save_view( path, id, message, optimize, num_workers, scheduler, verbose, False, **ds_args, )
def summary(self)
-
Prints a summary of the dataset.
Expand source code
def summary(self): """Prints a summary of the dataset.""" pretty_print = summary_dataset(self) print(self) print(pretty_print)
def tensorflow(self, tensors=None, tobytes=False)
-
Converts the dataset into a tensorflow compatible format.
See https://www.tensorflow.org/api_docs/python/tf/data/Dataset
Args
tensors : List, Optional
- Optionally provide a list of tensor names in the ordering that your training script expects. For example, if you have a dataset that has "image" and "label" tensors and tensors=["image", "label"], your training script should expect each batch to be provided as a tuple of (image, label).
tobytes : bool
- If True, samples will not be decompressed and their raw bytes will be returned instead of numpy arrays. Can also be a list of tensors, in which case those tensors alone will not be decompressed.
Returns
tf.data.Dataset object that can be used for tensorflow training.
Expand source code
@hub_reporter.record_call def tensorflow( self, tensors: Optional[Sequence[str]] = None, tobytes: Union[bool, Sequence[str]] = False, ): """Converts the dataset into a tensorflow compatible format. See https://www.tensorflow.org/api_docs/python/tf/data/Dataset Args: tensors (List, Optional): Optionally provide a list of tensor names in the ordering that your training script expects. For example, if you have a dataset that has "image" and "label" tensors, if `tensors=["image", "label"]`, your training script should expect each batch will be provided as a tuple of (image, label). tobytes (bool): If True, samples will not be decompressed and their raw bytes will be returned instead of numpy arrays. Can also be a list of tensors, in which case those tensors alone will not be decompressed. Returns: tf.data.Dataset object that can be used for tensorflow training. """ return dataset_to_tensorflow(self, tensors=tensors, tobytes=tobytes)
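A minimal iteration sketch, assuming tensorflow is installed and the dataset has "images" and "labels" tensors; the path and tensor names are illustrative.
```
import hub

ds = hub.load("./my_dataset")                       # illustrative path
tf_ds = ds.tensorflow(tensors=["images", "labels"])

# tf_ds is a regular tf.data.Dataset, so the usual batching/prefetching applies.
for batch in tf_ds.batch(32).prefetch(1).take(1):
    print(batch)
```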
def update_creds_key(self, old_creds_key, new_creds_key)
-
Replaces the old creds key with the new creds key. This is used to replace the creds key used for external data.
Expand source code
def update_creds_key(self, old_creds_key: str, new_creds_key: str): """Replaces the old creds key with the new creds key. This is used to replace the creds key used for external data.""" replaced_index = self.link_creds.replace_creds(old_creds_key, new_creds_key) save_link_creds(self.link_creds, self.storage, replaced_index=replaced_index)
def visualize(self, width=None, height=None)
-
Visualizes the dataset in the Jupyter notebook.
Args
width : Union[int, str, None]
- Optional width of the visualizer canvas.
height : Union[int, str, None]
- Optional height of the visualizer canvas.
Raises
Exception
- If the dataset is not a hub cloud dataset and the visualization happens in Colab.
Expand source code
def visualize( self, width: Union[int, str, None] = None, height: Union[int, str, None] = None ): """ Visualizes the dataset in the Jupyter notebook. Args: width: Union[int, str, None] Optional width of the visualizer canvas. height: Union[int, str, None] Optional height of the visualizer canvas. Raises: Exception: If a dataset is not hub cloud dataset and the visualization happens in colab. """ from hub.visualizer import visualize hub_reporter.feature_report(feature_name="visualize", parameters={}) if is_colab(): raise Exception("Cannot visualize non hub cloud dataset in Colab.") else: visualize(self.storage, width=width, height=height)
class tensor (key, dataset, index=None, is_iteration=False, chunk_engine=None)
-
Initializes a new tensor.
Note
This operation does not create a new tensor in the storage provider, and should normally only be performed by Hub internals.
Args
key : str
- The internal identifier for this tensor.
dataset : Dataset
- The dataset that this tensor is located in.
index
- The Index object restricting the view of this tensor. Can be an int, slice, or (used internally) an Index object.
is_iteration : bool
- If this tensor is being used as an iterator.
chunk_engine : ChunkEngine, optional
- The underlying chunk_engine for the tensor.
Raises
TensorDoesNotExistError
- If no tensor with key exists and a tensor_meta was not provided.
Expand source code
class Tensor: def __init__( self, key: str, dataset, index: Optional[Index] = None, is_iteration: bool = False, chunk_engine: Optional[ChunkEngine] = None, ): """Initializes a new tensor. Note: This operation does not create a new tensor in the storage provider, and should normally only be performed by Hub internals. Args: key (str): The internal identifier for this tensor. dataset (Dataset): The dataset that this tensor is located in. index: The Index object restricting the view of this tensor. Can be an int, slice, or (used internally) an Index object. is_iteration (bool): If this tensor is being used as an iterator. chunk_engine (ChunkEngine, optional): The underlying chunk_engine for the tensor Raises: TensorDoesNotExistError: If no tensor with `key` exists and a `tensor_meta` was not provided. """ self.key = key self.dataset = dataset self.storage: LRUCache = dataset.storage self.index = index or Index() self.version_state = dataset.version_state self.link_creds = dataset.link_creds self.is_iteration = is_iteration commit_id = self.version_state["commit_id"] if not self.is_iteration and not tensor_exists( self.key, self.storage, commit_id ): raise TensorDoesNotExistError(self.key) meta_key = get_tensor_meta_key(self.key, commit_id) meta = self.storage.get_hub_object(meta_key, TensorMeta) if chunk_engine is not None: self.chunk_engine = chunk_engine elif meta.is_link: self.chunk_engine = LinkedChunkEngine( self.key, self.storage, self.version_state, link_creds=dataset.link_creds, ) else: self.chunk_engine = ChunkEngine(self.key, self.storage, self.version_state) if not self.pad_tensor and not self.is_iteration: self.index.validate(self.num_samples) # An optimization to skip multiple .numpy() calls when performing inplace ops on slices: self._skip_next_setitem = False @property def pad_tensor(self): return self.dataset._pad_tensors def _write_initialization(self): self.storage.check_readonly() # if not the head node, checkout to an auto branch that is newly created if auto_checkout(self.dataset): self.chunk_engine = self.version_state["full_tensors"][ self.key ].chunk_engine @invalid_view_op def extend( self, samples: Union[np.ndarray, Sequence[InputSample], "Tensor"], progressbar: bool = False, ): """Extends the end of the tensor by appending multiple elements from a sequence. Accepts a sequence, a single batched numpy array, or a sequence of `hub.read` outputs, which can be used to load files. See examples down below. Example: Numpy input: >>> len(tensor) 0 >>> tensor.extend(np.zeros((100, 28, 28, 1))) >>> len(tensor) 100 File input: >>> len(tensor) 0 >>> tensor.extend([ hub.read("path/to/image1"), hub.read("path/to/image2"), ]) >>> len(tensor) 2 Args: samples (np.ndarray, Sequence, Sequence[Sample]): The data to add to the tensor. The length should be equal to the number of samples to add. progressbar (bool): Specifies whether a progressbar should be displayed while extending. Raises: TensorDtypeMismatchError: TensorDtypeMismatchError: Dtype for array must be equal to or castable to this tensor's dtype """ self._write_initialization() [f() for f in list(self.dataset._update_hooks.values())] self.chunk_engine.extend( samples, progressbar=progressbar, link_callback=self._append_to_links if self.meta.links else None, ) @property def info(self): """Returns the information about the tensor. Returns: TensorInfo: Information about the tensor. 
""" commit_id = self.version_state["commit_id"] chunk_engine = self.chunk_engine if chunk_engine._info is None or chunk_engine._info_commit_id != commit_id: path = get_tensor_info_key(self.key, commit_id) chunk_engine._info = load_info(path, self.dataset, self.key) chunk_engine._info_commit_id = commit_id self.storage.register_hub_object(path, chunk_engine._info) return chunk_engine._info @info.setter def info(self, value): if isinstance(value, dict): info = self.info info.replace_with(value) else: raise TypeError("Info must be set with type Dict") @invalid_view_op def append(self, sample: InputSample): """Appends a single sample to the end of the tensor. Can be an array, scalar value, or the return value from `hub.read`, which can be used to load files. See examples down below. Examples: Numpy input: >>> len(tensor) 0 >>> tensor.append(np.zeros((28, 28, 1))) >>> len(tensor) 1 File input: >>> len(tensor) 0 >>> tensor.append(hub.read("path/to/file")) >>> len(tensor) 1 Args: sample (InputSample): The data to append to the tensor. `Sample` is generated by `hub.read`. See the above examples. """ self.extend([sample], progressbar=False) def clear(self): """Deletes all samples from the tensor""" self.chunk_engine.clear() sample_id_key = get_sample_id_tensor_key(self.key) try: sample_id_tensor = Tensor(sample_id_key, self.dataset) sample_id_tensor.chunk_engine.clear() self.meta.links.clear() self.meta.is_dirty = True except TensorDoesNotExistError: pass def modified_samples( self, target_id: Optional[str] = None, return_indexes: Optional[bool] = False ): """Returns a slice of the tensor with only those elements that were modified/added. By default the modifications are calculated relative to the previous commit made, but this can be changed by providing a `target id`. Args: target_id (str, optional): The commit id or branch name to calculate the modifications relative to. Defaults to None. return_indexes (bool, optional): If True, returns the indexes of the modified elements. Defaults to False. Returns: Tensor: A new tensor with only the modified elements if `return_indexes` is False. Tuple[Tensor, List[int]]: A new tensor with only the modified elements and the indexes of the modified elements if `return_indexes` is True. Raises: TensorModifiedError: If a target id is passed which is not an ancestor of the current commit. """ current_commit_id = self.version_state["commit_id"] indexes = get_modified_indexes( self.key, current_commit_id, target_id, self.version_state, self.storage, ) tensor = self[indexes] if return_indexes: return tensor, indexes return tensor @property def meta(self): return self.chunk_engine.tensor_meta @property def shape(self) -> Tuple[Optional[int], ...]: """Get the shape of this tensor. Length is included. Note: If you don't want `None` in the output shape or want the lower/upper bound shapes, use `tensor.shape_interval` instead. Example: >>> tensor.append(np.zeros((10, 10))) >>> tensor.append(np.zeros((10, 15))) >>> tensor.shape (2, 10, None) Returns: tuple: Tuple where each value is either `None` (if that axis is dynamic) or an `int` (if that axis is fixed). """ sample_shape_tensor = self._sample_shape_tensor sample_shape_provider = ( self._sample_shape_provider(sample_shape_tensor) if sample_shape_tensor else None ) shape: Tuple[Optional[int], ...] 
shape = self.chunk_engine.shape( self.index, sample_shape_provider=sample_shape_provider ) if not shape and self.meta.max_shape: shape = (0,) * len(self.meta.max_shape) if self.meta.max_shape == [0, 0, 0]: shape = () return shape @property def size(self) -> Optional[int]: s = 1 for x in self.shape: if x is None: return None s *= x # not using np.prod to avoid overflow return s @property def ndim(self) -> int: return self.chunk_engine.ndim(self.index) @property def dtype(self) -> Optional[np.dtype]: if self.base_htype in ("json", "list"): return np.dtype(str) if self.meta.dtype: return np.dtype(self.meta.dtype) return None @property def is_sequence(self): return self.meta.is_sequence @property def is_link(self): return self.meta.is_link @property def verify(self): return self.is_link and self.meta.verify @property def htype(self): htype = self.meta.htype if self.is_sequence: htype = f"sequence[{htype}]" if self.is_link: htype = f"link[{htype}]" return htype @property def hidden(self) -> bool: return self.meta.hidden @property def base_htype(self): return self.meta.htype @property def shape_interval(self) -> ShapeInterval: """Returns a `ShapeInterval` object that describes this tensor's shape more accurately. Length is included. Note: If you are expecting a `tuple`, use `tensor.shape` instead. Example: >>> tensor.append(np.zeros((10, 10))) >>> tensor.append(np.zeros((10, 15))) >>> tensor.shape_interval ShapeInterval(lower=(2, 10, 10), upper=(2, 10, 15)) >>> str(tensor.shape_interval) (2, 10, 10:15) Returns: ShapeInterval: Object containing `lower` and `upper` properties. """ return self.chunk_engine.shape_interval @property def is_dynamic(self) -> bool: """Will return True if samples in this tensor have shapes that are unequal.""" return self.shape_interval.is_dynamic @property def num_samples(self) -> int: """Returns the length of the primary axis of the tensor. Ignores any applied indexing and returns the total length. """ if self.is_sequence: return self.chunk_engine._sequence_length return self.chunk_engine.num_samples def __len__(self): """Returns the length of the primary axis of the tensor. Accounts for indexing into the tensor object. Examples: >>> len(tensor) 0 >>> tensor.extend(np.zeros((100, 10, 10))) >>> len(tensor) 100 >>> len(tensor[5:10]) 5 Returns: int: The current length of this tensor. 
""" # catch corrupted datasets / user tampering ASAP self.chunk_engine.validate_num_samples_is_synchronized() return self.index.length(self.num_samples) def __getitem__( self, item: Union[int, slice, List[int], Tuple[Union[int, slice, Tuple[int]]], Index], is_iteration: bool = False, ): if not isinstance(item, (int, slice, list, tuple, Index)): raise InvalidKeyTypeError(item) return Tensor( self.key, self.dataset, index=self.index[item], is_iteration=is_iteration, chunk_engine=self.chunk_engine, ) def _get_bigger_dtype(self, d1, d2): if np.can_cast(d1, d2): if np.can_cast(d2, d1): return d1 else: return d2 else: if np.can_cast(d2, d1): return d2 else: return np.object def _infer_np_dtype(self, val: Any) -> np.dtype: # TODO refac if hasattr(val, "dtype"): return val.dtype elif isinstance(val, int): return np.array(0).dtype elif isinstance(val, float): return np.array(0.0).dtype elif isinstance(val, str): return np.array("").dtype elif isinstance(val, bool): return np.dtype(bool) elif isinstance(val, Sequence): return reduce(self._get_bigger_dtype, map(self._infer_np_dtype, val)) else: raise TypeError(f"Cannot infer numpy dtype for {val}") def __setitem__(self, item: Union[int, slice], value: Any): """Update samples with new values. Example: >>> tensor.append(np.zeros((10, 10))) >>> tensor.shape (1, 10, 10) >>> tensor[0] = np.zeros((3, 3)) >>> tensor.shape (1, 3, 3) """ self._write_initialization() [f() for f in list(self.dataset._update_hooks.values())] update_link_callback = self._update_links if self.meta.links else None if isinstance(value, Tensor): if value._skip_next_setitem: value._skip_next_setitem = False return value = value.numpy(aslist=True) item_index = Index(item) if ( hub.constants._ENABLE_RANDOM_ASSIGNMENT and isinstance(item, int) and item >= self.num_samples ): if self.is_sequence: raise NotImplementedError( "Random assignment is not supported for sequences yet." ) num_samples_to_pad = item - self.num_samples append_link_callback = self._append_to_links if self.meta.links else None self.chunk_engine.pad_and_append( num_samples_to_pad, value, append_link_callback=append_link_callback, update_link_callback=update_link_callback, ) return if not item_index.values[0].subscriptable() and not self.is_sequence: # we're modifying a single sample, convert it to a list as chunk engine expects multiple samples value = [value] self.chunk_engine.update( self.index[item_index], value, link_callback=update_link_callback, ) def __iter__(self): for i in range(len(self)): yield self.__getitem__(i, is_iteration=True) def numpy( self, aslist=False, fetch_chunks=False ) -> Union[np.ndarray, List[np.ndarray]]: """Computes the contents of the tensor in numpy format. Args: aslist (bool): If True, a list of np.ndarrays will be returned. Helpful for dynamic tensors. If False, a single np.ndarray will be returned unless the samples are dynamically shaped, in which case an error is raised. fetch_chunks (bool): If True, full chunks will be retrieved from the storage, otherwise only required bytes will be retrieved. This will always be True even if specified as False in the following cases: - The tensor is ChunkCompressed - The chunk which is being accessed has more than 128 samples. Raises: DynamicTensorNumpyError: If reading a dynamically-shaped array slice without `aslist=True`. ValueError: If the tensor is a link and the credentials are not populated. Returns: A numpy array containing the data represented by this tensor. 
""" return self.chunk_engine.numpy( self.index, aslist=aslist, fetch_chunks=fetch_chunks, pad_tensor=self.pad_tensor, ) def summary(self): pretty_print = summary_tensor(self) print(self) print(pretty_print) def __str__(self): index_str = f", index={self.index}" if self.index.is_trivial(): index_str = "" return f"Tensor(key={repr(self.meta.name or self.key)}{index_str})" __repr__ = __str__ def __array__(self) -> np.ndarray: return self.numpy() # type: ignore @_inplace_op def __iadd__(self, other): pass @_inplace_op def __isub__(self, other): pass @_inplace_op def __imul__(self, other): pass @_inplace_op def __itruediv__(self, other): pass @_inplace_op def __ifloordiv__(self, other): pass @_inplace_op def __imod__(self, other): pass @_inplace_op def __ipow__(self, other): pass @_inplace_op def __ilshift__(self, other): pass @_inplace_op def __irshift__(self, other): pass @_inplace_op def __iand__(self, other): pass @_inplace_op def __ixor__(self, other): pass @_inplace_op def __ior__(self, other): pass def data(self, aslist: bool = False) -> Any: htype = self.base_htype if htype in ("json", "text"): if self.ndim == 1: return {"value": self.numpy()[0]} else: return {"value": [sample[0] for sample in self.numpy(aslist=True)]} elif htype == "list": if self.ndim == 1: return {"value": list(self.numpy())} else: return {"value": list(map(list, self.numpy(aslist=True)))} elif self.htype == "video": data = {} data["frames"] = self.numpy(aslist=aslist) index = self.index if index.values[0].subscriptable(): root = Tensor(self.key, self.dataset) if len(index.values) > 1: data["timestamps"] = np.array( [ root[i, index.values[1].value].timestamps # type: ignore for i in index.values[0].indices(self.num_samples) ] ) else: data["timestamps"] = np.array( [ root[i].timestamps for i in index.values[0].indices(self.num_samples) ] ) else: data["timestamps"] = self.timestamps if aslist: data["timestamps"] = data["timestamps"].tolist() # type: ignore data["sample_info"] = self.sample_info return data elif htype == "class_label": labels = self.numpy(aslist=aslist) data = {"value": labels} class_names = self.info.class_names if class_names: data["text"] = convert_to_text(labels, self.info.class_names) return data elif htype in ("image", "image.rgb", "image.gray", "dicom"): return { "value": self.numpy(aslist=aslist), "sample_info": self.sample_info or {}, } else: return { "value": self.numpy(aslist=aslist), } def tobytes(self) -> bytes: """Returns the bytes of the tensor. - Only works for a single sample of tensor. - If the tensor is uncompressed, this returns the bytes of the numpy array. - If the tensor is sample compressed, this returns the compressed bytes of the sample. - If the tensor is chunk compressed, this raises an error. Returns: bytes: The bytes of the tensor. Raises: ValueError: If the tensor has multiple samples. 
""" if self.index.values[0].subscriptable() or len(self.index.values) > 1: raise ValueError("tobytes() can be used only on exatcly 1 sample.") idx = self.index.values[0].value return self.chunk_engine.read_bytes_for_sample(idx) # type: ignore def _append_to_links(self, sample, flat: Optional[bool]): for k, v in self.meta.links.items(): if flat is None or v["flatten_sequence"] == flat: v = get_link_transform(v["append"])(sample, self.link_creds) tensor = Tensor(k, self.dataset) if ( isinstance(v, np.ndarray) and tensor.dtype and v.dtype != tensor.dtype ): v = v.astype(tensor.dtype) # bc tensor.append(v) def _update_links( self, global_sample_index: int, sub_index: Index, new_sample, flat: Optional[bool], ): for k, v in self.meta.links.items(): if flat is None or v["flatten_sequence"] == flat: fname = v.get("update") if fname: func = get_link_transform(fname) tensor = Tensor(k, self.dataset) val = func( new_sample, tensor[global_sample_index], sub_index=sub_index, partial=not sub_index.is_trivial(), link_creds=self.link_creds, ) if val is not _NO_LINK_UPDATE: if ( isinstance(val, np.ndarray) and tensor.dtype and val.dtype != tensor.dtype ): val = val.astype(tensor.dtype) # bc tensor[global_sample_index] = val @property def _sample_info_tensor(self): ds = self.dataset return ds.version_state["full_tensors"].get( ds.version_state["tensor_names"].get(get_sample_info_tensor_key(self.key)) ) @property def _sample_shape_tensor(self): ds = self.dataset return ds.version_state["full_tensors"].get( ds.version_state["tensor_names"].get(get_sample_shape_tensor_key(self.key)) ) @property def _sample_id_tensor(self): return self.dataset._tensors().get(get_sample_id_tensor_key(self.key)) def _sample_shape_provider(self, sample_shape_tensor) -> Callable: if self.is_sequence: def get_sample_shape(global_sample_index: int): seq_pos = slice( *self.chunk_engine.sequence_encoder[global_sample_index] ) idx = Index([IndexEntry(seq_pos)]) shapes = sample_shape_tensor[idx].numpy() return shapes else: def get_sample_shape(global_sample_index: int): return tuple(sample_shape_tensor[global_sample_index].numpy().tolist()) return get_sample_shape def _get_sample_info_at_index(self, global_sample_index: int, sample_info_tensor): if self.is_sequence: return [ sample_info_tensor[i].data() for i in range(*self.chunk_engine.sequence_encoder[global_sample_index]) ] return sample_info_tensor[global_sample_index].data()["value"] def _sample_info(self, index: Index): sample_info_tensor = self._sample_info_tensor if sample_info_tensor is None: return None if index.subscriptable_at(0): return list( map( partial( self._get_sample_info_at_index, sample_info_tensor=sample_info_tensor, ), index.values[0].indices(self.num_samples), ) ) return self._get_sample_info_at_index(index.values[0].value, sample_info_tensor) # type: ignore @property def sample_info(self): return self._sample_info(self.index) def _linked_sample(self): if not self.is_link: raise ValueError("Not supported as the tensor is not a link.") if self.index.values[0].subscriptable() or len(self.index.values) > 1: raise ValueError("_linked_sample can be used only on exatcly 1 sample.") return self.chunk_engine.linked_sample(self.index.values[0].value) def _get_video_stream_url(self): if self.is_link: return self.chunk_engine.get_video_url(self.index.values[0].value) from hub.visualizer.video_streaming import get_video_stream_url return get_video_stream_url(self, self.index.values[0].value) def play(self): if ( get_compression_type(self.meta.sample_compression) != 
    def play(self):
        if (
            get_compression_type(self.meta.sample_compression) != VIDEO_COMPRESSION
            and self.htype != "link[video]"
        ):
            raise Exception("Only supported for video tensors.")
        if self.index.values[0].subscriptable():
            raise ValueError("Video streaming requires exactly 1 sample.")
        if len(self.index.values) > 1:
            warnings.warn(
                "Sub indexes to video sample will be ignored while streaming."
            )
        if is_colab():
            raise NotImplementedError("Video streaming is not supported on colab yet.")
        elif is_jupyter():
            return video_html(
                src=self._get_video_stream_url(),
                alt=f"{self.key}[{self.index.values[0].value}]",
            )
        else:
            webbrowser.open(self._get_video_stream_url())

    @invalid_view_op
    def pop(self, index: Optional[int] = None):
        """Removes an element at the given index."""
        if index is None:
            index = self.num_samples - 1
        self.chunk_engine.pop(index)
        [self.dataset[link].pop(index) for link in self.meta.links]

    @property
    def timestamps(self) -> np.ndarray:
        """Returns timestamps (in seconds) for video sample as numpy array.

        Examples:
            Return timestamps for all frames of the first video sample:

            >>> ds.video[0].timestamps

            Return timestamps for the 5th to 10th frame of the first video sample:

            >>> ds.video[0, 5:10].timestamps
            array([0.2002 , 0.23356667, 0.26693332, 0.33366665, 0.4004 ], dtype=float32)
        """
        if (
            get_compression_type(self.meta.sample_compression) != VIDEO_COMPRESSION
            and self.htype != "link[video]"
        ):
            raise Exception("Only supported for video tensors.")
        index = self.index
        if index.values[0].subscriptable():
            raise ValueError("Only supported for exactly 1 video sample.")
        if self.is_sequence:
            if len(index.values) == 1 or index.values[1].subscriptable():
                raise ValueError("Only supported for exactly 1 video sample.")
            sub_index = index.values[2].value if len(index.values) > 2 else None
        else:
            sub_index = index.values[1].value if len(index.values) > 1 else None
        global_sample_index = next(index.values[0].indices(self.num_samples))
        if self.is_link:
            sample = self.chunk_engine.get_video_url(global_sample_index)  # type: ignore
        else:
            sample = self.chunk_engine.get_video_sample(
                global_sample_index, index, decompress=False
            )
        nframes = self.shape[0]
        start, stop, step, reverse = normalize_index(sub_index, nframes)
        stamps = _read_timestamps(sample, start, stop, step, reverse)
        return stamps

    @property
    def _config(self):
        """Returns a summary of the configuration of the tensor."""
        tensor_meta = self.meta
        return {
            "htype": tensor_meta.htype or UNSPECIFIED,
            "dtype": tensor_meta.dtype or UNSPECIFIED,
            "sample_compression": tensor_meta.sample_compression or UNSPECIFIED,
            "chunk_compression": tensor_meta.chunk_compression or UNSPECIFIED,
            "hidden": tensor_meta.hidden,
            "is_link": tensor_meta.is_link,
            "is_sequence": tensor_meta.is_sequence,
        }

    @property
    def sample_indices(self):
        return self.dataset._sample_indices(self.num_samples)

    def _extract_value(self, htype):
        if self.base_htype != htype:
            raise Exception(f"Only supported for {htype} tensors.")
        if self.ndim == 1:
            return self.numpy()[0]
        else:
            return [sample[0] for sample in self.numpy(aslist=True)]

    def text(self):
        return self._extract_value("text")

    def dict(self):
        return self._extract_value("json")

    def list(self):
        if self.base_htype != "list":
            raise Exception("Only supported for list tensors.")
        if self.ndim == 1:
            return list(self.numpy())
        else:
            return list(map(list, self.numpy(aslist=True)))
Instance variables
var base_htype
-
Expand source code
@property
def base_htype(self):
    return self.meta.htype
var dtype
-
Expand source code
@property
def dtype(self) -> Optional[np.dtype]:
    if self.base_htype in ("json", "list"):
        return np.dtype(str)
    if self.meta.dtype:
        return np.dtype(self.meta.dtype)
    return None
var hidden
-
Expand source code
@property
def hidden(self) -> bool:
    return self.meta.hidden
var htype
-
Expand source code
@property
def htype(self):
    htype = self.meta.htype
    if self.is_sequence:
        htype = f"sequence[{htype}]"
    if self.is_link:
        htype = f"link[{htype}]"
    return htype
var info
-
Returns the information about the tensor.
Returns
TensorInfo
- Information about the tensor.
Expand source code
@property
def info(self):
    """Returns the information about the tensor.

    Returns:
        TensorInfo: Information about the tensor.
    """
    commit_id = self.version_state["commit_id"]
    chunk_engine = self.chunk_engine
    if chunk_engine._info is None or chunk_engine._info_commit_id != commit_id:
        path = get_tensor_info_key(self.key, commit_id)
        chunk_engine._info = load_info(path, self.dataset, self.key)
        chunk_engine._info_commit_id = commit_id
        self.storage.register_hub_object(path, chunk_engine._info)
    return chunk_engine._info
var is_dynamic
-
Will return True if samples in this tensor have shapes that are unequal.
Expand source code
@property
def is_dynamic(self) -> bool:
    """Will return True if samples in this tensor have shapes that are unequal."""
    return self.shape_interval.is_dynamic
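For intuition, a minimal sketch of how a tensor becomes dynamic; the tensor name and sample shapes below are hypothetical:

>>> ds.create_tensor("images")
>>> ds.images.append(np.zeros((10, 10)))
>>> ds.images.is_dynamic
False
>>> ds.images.append(np.zeros((10, 15)))
>>> ds.images.is_dynamic
True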
var is_link
-
Expand source code
@property
def is_link(self):
    return self.meta.is_link
var is_sequence
-
Expand source code
@property
def is_sequence(self):
    return self.meta.is_sequence
var meta
-
Expand source code
@property
def meta(self):
    return self.chunk_engine.tensor_meta
var ndim
-
Expand source code
@property
def ndim(self) -> int:
    return self.chunk_engine.ndim(self.index)
var num_samples
-
Returns the length of the primary axis of the tensor. Ignores any applied indexing and returns the total length.
Expand source code
@property
def num_samples(self) -> int:
    """Returns the length of the primary axis of the tensor.
    Ignores any applied indexing and returns the total length.
    """
    if self.is_sequence:
        return self.chunk_engine._sequence_length
    return self.chunk_engine.num_samples
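A sketch of the difference between num_samples and len() on an indexed view; the tensor name and lengths are hypothetical:

>>> len(ds.images)
100
>>> len(ds.images[0:5])
5
>>> ds.images[0:5].num_samples  # ignores the applied index
100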
var pad_tensor
-
Expand source code
@property
def pad_tensor(self):
    return self.dataset._pad_tensors
var sample_indices
-
Expand source code
@property
def sample_indices(self):
    return self.dataset._sample_indices(self.num_samples)
var sample_info
-
Expand source code
@property
def sample_info(self):
    return self._sample_info(self.index)
var shape
-
Get the shape of this tensor. Length is included.
Note
If you don't want None in the output shape or want the lower/upper bound shapes, use tensor.shape_interval instead.
Example
>>> tensor.append(np.zeros((10, 10)))
>>> tensor.append(np.zeros((10, 15)))
>>> tensor.shape
(2, 10, None)
Returns
tuple
- Tuple where each value is either None (if that axis is dynamic) or an int (if that axis is fixed).
Expand source code
@property
def shape(self) -> Tuple[Optional[int], ...]:
    """Get the shape of this tensor. Length is included.

    Note:
        If you don't want `None` in the output shape or want the lower/upper bound shapes,
        use `tensor.shape_interval` instead.

    Example:
        >>> tensor.append(np.zeros((10, 10)))
        >>> tensor.append(np.zeros((10, 15)))
        >>> tensor.shape
        (2, 10, None)

    Returns:
        tuple: Tuple where each value is either `None` (if that axis is dynamic) or an `int` (if that axis is fixed).
    """
    sample_shape_tensor = self._sample_shape_tensor
    sample_shape_provider = (
        self._sample_shape_provider(sample_shape_tensor)
        if sample_shape_tensor
        else None
    )
    shape: Tuple[Optional[int], ...]
    shape = self.chunk_engine.shape(
        self.index, sample_shape_provider=sample_shape_provider
    )
    if not shape and self.meta.max_shape:
        shape = (0,) * len(self.meta.max_shape)
        if self.meta.max_shape == [0, 0, 0]:
            shape = ()
    return shape
var shape_interval
-
Returns a ShapeInterval object that describes this tensor's shape more accurately. Length is included.
Note
If you are expecting a tuple, use tensor.shape instead.
Example
>>> tensor.append(np.zeros((10, 10)))
>>> tensor.append(np.zeros((10, 15)))
>>> tensor.shape_interval
ShapeInterval(lower=(2, 10, 10), upper=(2, 10, 15))
>>> str(tensor.shape_interval)
(2, 10, 10:15)
Returns
ShapeInterval
- Object containing lower and upper properties.
Expand source code
@property
def shape_interval(self) -> ShapeInterval:
    """Returns a `ShapeInterval` object that describes this tensor's shape more accurately. Length is included.

    Note:
        If you are expecting a `tuple`, use `tensor.shape` instead.

    Example:
        >>> tensor.append(np.zeros((10, 10)))
        >>> tensor.append(np.zeros((10, 15)))
        >>> tensor.shape_interval
        ShapeInterval(lower=(2, 10, 10), upper=(2, 10, 15))
        >>> str(tensor.shape_interval)
        (2, 10, 10:15)

    Returns:
        ShapeInterval: Object containing `lower` and `upper` properties.
    """
    return self.chunk_engine.shape_interval
var size
-
Expand source code
@property
def size(self) -> Optional[int]:
    s = 1
    for x in self.shape:
        if x is None:
            return None
        s *= x  # not using np.prod to avoid overflow
    return s
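A brief sketch of how size interacts with dynamic shapes, reusing the two hypothetical samples from the shape example above:

>>> tensor.shape
(2, 10, None)
>>> tensor.size is None  # a dynamic axis makes the total element count undefined
True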
var timestamps
-
Returns timestamps (in seconds) for video sample as numpy array.
Examples:
Return timestamps for all frames of the first video sample:
>>> ds.video[0].timestamps
Return timestamps for the 5th to 10th frame of the first video sample:
>>> ds.video[0, 5:10].timestamps
array([0.2002 , 0.23356667, 0.26693332, 0.33366665, 0.4004 ], dtype=float32)
Expand source code
@property
def timestamps(self) -> np.ndarray:
    """Returns timestamps (in seconds) for video sample as numpy array.

    Examples:
        Return timestamps for all frames of the first video sample:

        >>> ds.video[0].timestamps

        Return timestamps for the 5th to 10th frame of the first video sample:

        >>> ds.video[0, 5:10].timestamps
        array([0.2002 , 0.23356667, 0.26693332, 0.33366665, 0.4004 ], dtype=float32)
    """
    if (
        get_compression_type(self.meta.sample_compression) != VIDEO_COMPRESSION
        and self.htype != "link[video]"
    ):
        raise Exception("Only supported for video tensors.")
    index = self.index
    if index.values[0].subscriptable():
        raise ValueError("Only supported for exactly 1 video sample.")
    if self.is_sequence:
        if len(index.values) == 1 or index.values[1].subscriptable():
            raise ValueError("Only supported for exactly 1 video sample.")
        sub_index = index.values[2].value if len(index.values) > 2 else None
    else:
        sub_index = index.values[1].value if len(index.values) > 1 else None
    global_sample_index = next(index.values[0].indices(self.num_samples))
    if self.is_link:
        sample = self.chunk_engine.get_video_url(global_sample_index)  # type: ignore
    else:
        sample = self.chunk_engine.get_video_sample(
            global_sample_index, index, decompress=False
        )
    nframes = self.shape[0]
    start, stop, step, reverse = normalize_index(sub_index, nframes)
    stamps = _read_timestamps(sample, start, stop, step, reverse)
    return stamps
var verify
-
Expand source code
@property
def verify(self):
    return self.is_link and self.meta.verify
Methods
def append(self, sample)
-
Appends a single sample to the end of the tensor. Can be an array, scalar value, or the return value from
read()
, which can be used to load files. See examples down below.
Examples
Numpy input:
>>> len(tensor)
0
>>> tensor.append(np.zeros((28, 28, 1)))
>>> len(tensor)
1
File input:
>>> len(tensor)
0
>>> tensor.append(hub.read("path/to/file"))
>>> len(tensor)
1
Args
sample
:InputSample
- The data to append to the tensor. Sample is generated by read(). See the above examples.
Expand source code
@invalid_view_op
def append(self, sample: InputSample):
    """Appends a single sample to the end of the tensor. Can be an array, scalar value, or the return value from `hub.read`,
    which can be used to load files. See examples down below.

    Examples:
        Numpy input:

        >>> len(tensor)
        0
        >>> tensor.append(np.zeros((28, 28, 1)))
        >>> len(tensor)
        1

        File input:

        >>> len(tensor)
        0
        >>> tensor.append(hub.read("path/to/file"))
        >>> len(tensor)
        1

    Args:
        sample (InputSample): The data to append to the tensor. `Sample` is generated by `hub.read`. See the above examples.
    """
    self.extend([sample], progressbar=False)
def clear(self)
-
Deletes all samples from the tensor
Expand source code
def clear(self):
    """Deletes all samples from the tensor"""
    self.chunk_engine.clear()
    sample_id_key = get_sample_id_tensor_key(self.key)
    try:
        sample_id_tensor = Tensor(sample_id_key, self.dataset)
        sample_id_tensor.chunk_engine.clear()
        self.meta.links.clear()
        self.meta.is_dirty = True
    except TensorDoesNotExistError:
        pass
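Clearing is destructive: as the source above shows, it also clears the hidden sample-id tensor and the tensor's links. A short usage sketch with a hypothetical tensor name:

>>> len(ds.images)
100
>>> ds.images.clear()
>>> len(ds.images)
0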
def data(self, aslist=False)
-
Expand source code
def data(self, aslist: bool = False) -> Any:
    htype = self.base_htype
    if htype in ("json", "text"):
        if self.ndim == 1:
            return {"value": self.numpy()[0]}
        else:
            return {"value": [sample[0] for sample in self.numpy(aslist=True)]}
    elif htype == "list":
        if self.ndim == 1:
            return {"value": list(self.numpy())}
        else:
            return {"value": list(map(list, self.numpy(aslist=True)))}
    elif self.htype == "video":
        data = {}
        data["frames"] = self.numpy(aslist=aslist)
        index = self.index
        if index.values[0].subscriptable():
            root = Tensor(self.key, self.dataset)
            if len(index.values) > 1:
                data["timestamps"] = np.array(
                    [
                        root[i, index.values[1].value].timestamps  # type: ignore
                        for i in index.values[0].indices(self.num_samples)
                    ]
                )
            else:
                data["timestamps"] = np.array(
                    [
                        root[i].timestamps
                        for i in index.values[0].indices(self.num_samples)
                    ]
                )
        else:
            data["timestamps"] = self.timestamps
        if aslist:
            data["timestamps"] = data["timestamps"].tolist()  # type: ignore
        data["sample_info"] = self.sample_info
        return data
    elif htype == "class_label":
        labels = self.numpy(aslist=aslist)
        data = {"value": labels}
        class_names = self.info.class_names
        if class_names:
            data["text"] = convert_to_text(labels, self.info.class_names)
        return data
    elif htype in ("image", "image.rgb", "image.gray", "dicom"):
        return {
            "value": self.numpy(aslist=aslist),
            "sample_info": self.sample_info or {},
        }
    else:
        return {
            "value": self.numpy(aslist=aslist),
        }
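The dictionary returned by data() depends on the tensor's htype, as the branches in the source above show: most htypes return a "value" key, class_label tensors add "text" when class_names are set, image and dicom tensors add "sample_info", and video tensors instead return "frames", "timestamps" and "sample_info". A minimal sketch for a class_label tensor; the tensor name and class names are hypothetical:

>>> ds.labels.info.class_names
['cat', 'dog']
>>> sorted(ds.labels[0].data().keys())
['text', 'value']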
def dict(self)
-
Expand source code
def dict(self):
    return self._extract_value("json")
def extend(self, samples, progressbar=False)
-
Extends the end of the tensor by appending multiple elements from a sequence. Accepts a sequence, a single batched numpy array, or a sequence of
read()
outputs, which can be used to load files. See examples down below.
Example
Numpy input:
>>> len(tensor)
0
>>> tensor.extend(np.zeros((100, 28, 28, 1)))
>>> len(tensor)
100
File input:
>>> len(tensor)
0
>>> tensor.extend([
        hub.read("path/to/image1"),
        hub.read("path/to/image2"),
    ])
>>> len(tensor)
2
Args
samples
:np.ndarray, Sequence, Sequence[Sample]
- The data to add to the tensor. The length should be equal to the number of samples to add.
progressbar
:bool
- Specifies whether a progressbar should be displayed while extending.
Raises
TensorDtypeMismatchError
- Dtype for array must be equal to or castable to this tensor's dtype.
Expand source code
@invalid_view_op
def extend(
    self,
    samples: Union[np.ndarray, Sequence[InputSample], "Tensor"],
    progressbar: bool = False,
):
    """Extends the end of the tensor by appending multiple elements from a sequence. Accepts a sequence, a single batched numpy array,
    or a sequence of `hub.read` outputs, which can be used to load files. See examples down below.

    Example:
        Numpy input:

        >>> len(tensor)
        0
        >>> tensor.extend(np.zeros((100, 28, 28, 1)))
        >>> len(tensor)
        100

        File input:

        >>> len(tensor)
        0
        >>> tensor.extend([
                hub.read("path/to/image1"),
                hub.read("path/to/image2"),
            ])
        >>> len(tensor)
        2

    Args:
        samples (np.ndarray, Sequence, Sequence[Sample]): The data to add to the tensor.
            The length should be equal to the number of samples to add.
        progressbar (bool): Specifies whether a progressbar should be displayed while extending.

    Raises:
        TensorDtypeMismatchError: Dtype for array must be equal to or castable to this tensor's dtype
    """
    self._write_initialization()
    [f() for f in list(self.dataset._update_hooks.values())]
    self.chunk_engine.extend(
        samples,
        progressbar=progressbar,
        link_callback=self._append_to_links if self.meta.links else None,
    )
def list(self)
-
Expand source code
def list(self):
    if self.base_htype != "list":
        raise Exception("Only supported for list tensors.")
    if self.ndim == 1:
        return list(self.numpy())
    else:
        return list(map(list, self.numpy(aslist=True)))
def modified_samples(self, target_id=None, return_indexes=False)
-
Returns a slice of the tensor with only those elements that were modified/added. By default the modifications are calculated relative to the previous commit, but this can be changed by providing a target_id.
Args
target_id
:str, optional
- The commit id or branch name to calculate the modifications relative to. Defaults to None.
return_indexes
:bool, optional
- If True, returns the indexes of the modified elements. Defaults to False.
Returns
Tensor
- A new tensor with only the modified elements if return_indexes is False.
Tuple[Tensor, List[int]]
- A new tensor with only the modified elements and the indexes of the modified elements if return_indexes is True.
Raises
TensorModifiedError
- If a target id is passed which is not an ancestor of the current commit.
Expand source code
def modified_samples(
    self, target_id: Optional[str] = None, return_indexes: Optional[bool] = False
):
    """Returns a slice of the tensor with only those elements that were modified/added.
    By default the modifications are calculated relative to the previous commit made, but this can be changed by providing a `target id`.

    Args:
        target_id (str, optional): The commit id or branch name to calculate the modifications relative to. Defaults to None.
        return_indexes (bool, optional): If True, returns the indexes of the modified elements. Defaults to False.

    Returns:
        Tensor: A new tensor with only the modified elements if `return_indexes` is False.
        Tuple[Tensor, List[int]]: A new tensor with only the modified elements and the indexes of the modified elements if `return_indexes` is True.

    Raises:
        TensorModifiedError: If a target id is passed which is not an ancestor of the current commit.
    """
    current_commit_id = self.version_state["commit_id"]
    indexes = get_modified_indexes(
        self.key,
        current_commit_id,
        target_id,
        self.version_state,
        self.storage,
    )
    tensor = self[indexes]
    if return_indexes:
        return tensor, indexes
    return tensor
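A sketch of a typical call, assuming a dataset ds with at least one earlier commit; the tensor name, commit messages, and new_batch are hypothetical:

>>> first_commit = ds.commit("initial samples")
>>> ds.images.extend(new_batch)
>>> ds.commit("second batch")
>>> modified = ds.images.modified_samples(target_id=first_commit)
>>> modified, indexes = ds.images.modified_samples(first_commit, return_indexes=True)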
def numpy(self, aslist=False, fetch_chunks=False)
-
Computes the contents of the tensor in numpy format.
Args
aslist
:bool
- If True, a list of np.ndarrays will be returned. Helpful for dynamic tensors. If False, a single np.ndarray will be returned unless the samples are dynamically shaped, in which case an error is raised.
fetch_chunks
:bool
- If True, full chunks will be retrieved from the storage, otherwise only required bytes will be retrieved. This will always be True even if specified as False in the following cases:
- The tensor is ChunkCompressed
- The chunk which is being accessed has more than 128 samples.
Raises
DynamicTensorNumpyError
- If reading a dynamically-shaped array slice without aslist=True.
ValueError
- If the tensor is a link and the credentials are not populated.
Returns
A numpy array containing the data represented by this tensor.
Expand source code
def numpy(
    self, aslist=False, fetch_chunks=False
) -> Union[np.ndarray, List[np.ndarray]]:
    """Computes the contents of the tensor in numpy format.

    Args:
        aslist (bool): If True, a list of np.ndarrays will be returned. Helpful for dynamic tensors.
            If False, a single np.ndarray will be returned unless the samples are dynamically shaped,
            in which case an error is raised.
        fetch_chunks (bool): If True, full chunks will be retrieved from the storage, otherwise only required bytes will be retrieved.
            This will always be True even if specified as False in the following cases:
            - The tensor is ChunkCompressed
            - The chunk which is being accessed has more than 128 samples.

    Raises:
        DynamicTensorNumpyError: If reading a dynamically-shaped array slice without `aslist=True`.
        ValueError: If the tensor is a link and the credentials are not populated.

    Returns:
        A numpy array containing the data represented by this tensor.
    """
    return self.chunk_engine.numpy(
        self.index,
        aslist=aslist,
        fetch_chunks=fetch_chunks,
        pad_tensor=self.pad_tensor,
    )
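When samples have unequal shapes, request a list rather than a single stacked array. A brief sketch, reusing the hypothetical dynamic tensor from the shape example:

>>> tensor.append(np.zeros((10, 10)))
>>> tensor.append(np.zeros((10, 15)))
>>> [a.shape for a in tensor.numpy(aslist=True)]
[(10, 10), (10, 15)]
>>> tensor.numpy()  # would raise DynamicTensorNumpyError for this dynamic tensor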
def play(self)
-
Expand source code
def play(self):
    if (
        get_compression_type(self.meta.sample_compression) != VIDEO_COMPRESSION
        and self.htype != "link[video]"
    ):
        raise Exception("Only supported for video tensors.")
    if self.index.values[0].subscriptable():
        raise ValueError("Video streaming requires exactly 1 sample.")
    if len(self.index.values) > 1:
        warnings.warn(
            "Sub indexes to video sample will be ignored while streaming."
        )
    if is_colab():
        raise NotImplementedError("Video streaming is not supported on colab yet.")
    elif is_jupyter():
        return video_html(
            src=self._get_video_stream_url(),
            alt=f"{self.key}[{self.index.values[0].value}]",
        )
    else:
        webbrowser.open(self._get_video_stream_url())
def pop(self, index=None)
-
Removes an element at the given index.
Expand source code
@invalid_view_op
def pop(self, index: Optional[int] = None):
    """Removes an element at the given index."""
    if index is None:
        index = self.num_samples - 1
    self.chunk_engine.pop(index)
    [self.dataset[link].pop(index) for link in self.meta.links]
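A short usage sketch with a hypothetical tensor name:

>>> len(ds.images)
3
>>> ds.images.pop(1)  # remove the second sample
>>> ds.images.pop()   # no index given: removes the last sample
>>> len(ds.images)
1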
def summary(self)
-
Expand source code
def summary(self):
    pretty_print = summary_tensor(self)
    print(self)
    print(pretty_print)
def text(self)
-
Expand source code
def text(self):
    return self._extract_value("text")
def tobytes(self)
-
Returns the bytes of the tensor.
- Only works for a single sample of tensor.
- If the tensor is uncompressed, this returns the bytes of the numpy array.
- If the tensor is sample compressed, this returns the compressed bytes of the sample.
- If the tensor is chunk compressed, this raises an error.
Returns
bytes
- The bytes of the tensor.
Raises
ValueError
- If the tensor has multiple samples.
Expand source code
def tobytes(self) -> bytes:
    """Returns the bytes of the tensor.

    - Only works for a single sample of tensor.
    - If the tensor is uncompressed, this returns the bytes of the numpy array.
    - If the tensor is sample compressed, this returns the compressed bytes of the sample.
    - If the tensor is chunk compressed, this raises an error.

    Returns:
        bytes: The bytes of the tensor.

    Raises:
        ValueError: If the tensor has multiple samples.
    """
    if self.index.values[0].subscriptable() or len(self.index.values) > 1:
        raise ValueError("tobytes() can be used only on exactly 1 sample.")
    idx = self.index.values[0].value
    return self.chunk_engine.read_bytes_for_sample(idx)  # type: ignore
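A sketch of typical usage, assuming a sample-compressed image tensor; the tensor name and file path are hypothetical:

>>> ds.create_tensor("images", htype="image", sample_compression="jpeg")
>>> ds.images.append(hub.read("path/to/image.jpg"))
>>> raw = ds.images[0].tobytes()  # compressed bytes of the stored sample
>>> ds.images.tobytes()  # not indexed to a single sample: raises ValueError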