Module hub.core.chunk_engine

Expand source code
from collections import OrderedDict
from hub.client.log import logger
import hub
import numpy as np
from tqdm import tqdm  # type: ignore
from typing import (
    Any,
    Callable,
    Dict,
    Optional,
    Sequence,
    Union,
    List,
    Tuple,
)
from hub.api.info import Info
from hub.core.linked_sample import LinkedSample
from hub.core.meta.encode.base_encoder import LAST_SEEN_INDEX_COLUMN
from hub.core.serialize import HEADER_SIZE_BYTES
from hub.core.tensor_link import get_link_transform
from hub.core.version_control.commit_diff import CommitDiff
from hub.core.version_control.commit_node import CommitNode  # type: ignore
from hub.core.version_control.commit_chunk_set import CommitChunkSet  # type: ignore
from typing import Any, Dict, List, Optional, Sequence, Union, Callable
from hub.core.meta.encode.tile import TileEncoder
from hub.core.storage.provider import StorageProvider
from hub.core.storage import S3Provider, GCSProvider
from hub.core.tiling.deserialize import combine_chunks, translate_slices, coalesce_tiles
from hub.core.tiling.serialize import break_into_tiles
from hub.util.casting import get_empty_text_like_sample, intelligent_cast
from hub.util.empty_sample import is_empty_list
from hub.util.shape_interval import ShapeInterval
from hub.constants import (
    DEFAULT_MAX_CHUNK_SIZE,
    FIRST_COMMIT_ID,
    PARTIAL_NUM_SAMPLES,
    RANDOM_MAX_ALLOWED_CHUNK_SIZE,
    RANDOM_MINIMAL_CHUNK_SIZE,
    DEFAULT_MAX_CHUNK_SIZE,
    FIRST_COMMIT_ID,
    PARTIAL_NUM_SAMPLES,
    DEFAULT_TILING_THRESHOLD,
)
from hub.core.chunk.base_chunk import BaseChunk, InputSample
from hub.core.chunk.chunk_compressed_chunk import ChunkCompressedChunk
from hub.core.chunk.sample_compressed_chunk import SampleCompressedChunk
from hub.core.chunk.uncompressed_chunk import UncompressedChunk
from hub.core.fast_forwarding import ffw_chunk_id_encoder
from hub.core.index.index import Index, IndexEntry
from hub.core.meta.encode.chunk_id import CHUNK_ID_COLUMN, ChunkIdEncoder
from hub.core.meta.encode.sequence import SequenceEncoder
from hub.core.meta.tensor_meta import TensorMeta
from hub.core.storage.lru_cache import LRUCache
from hub.util.casting import get_dtype, get_htype
from hub.core.sample import Sample
from hub.util.chunk_engine import (
    check_samples_type,
    make_sequence,
    check_suboptimal_chunks,
    check_sample_shape,
)
from hub.util.keys import (
    get_chunk_id_encoder_key,
    get_sequence_encoder_key,
    get_tensor_commit_diff_key,
    get_tensor_meta_key,
    get_chunk_key,
    get_tensor_commit_chunk_set_key,
    get_tensor_meta_key,
    get_tensor_tile_encoder_key,
    get_tensor_info_key,
)
from hub.util.exceptions import (
    CorruptedMetaError,
    DynamicTensorNumpyError,
    ReadOnlyModeError,
    SampleHtypeMismatchError,
)
from hub.util.remove_cache import get_base_storage
from hub.util.image import convert_sample, convert_img_arr
from hub.util.class_label import convert_to_idx, convert_to_hash
from hub.compression import VIDEO_COMPRESSIONS
from hub.core.sample import Sample
from itertools import chain, repeat
from collections.abc import Iterable


class ChunkEngine:
    def __init__(
        self,
        key: str,
        cache: LRUCache,
        version_state: Dict[str, Any],
        meta_cache: Optional[LRUCache] = None,
    ):
        """Handles creating `Chunk`s and filling them with incoming samples.

        Data delegation:
            All samples must live inside a chunk. No chunks may contain partial samples, only 1 chunk per sample.
            A chunk holds the dynamic information for the samples they contain (like shape and byte ranges).
            For more information on the `Chunk` format, check out the `Chunk` class.

        ChunkIdEncoder:
            The `ChunkIdEncoder` bidirectionally maps samples to the chunk IDs they live in. For more information,
            see `ChunkIdEncoder`'s docstring.

        Example:
            Given:
                Sample sizes: [1 * MB, 1 * MB, 14 * MB, 15 * MB, 15 * MB]
                Min chunk size: 16 * MB
                Max chunk size: 32 * MB


            Basic logic:
                >>> chunks = []
                >>> chunks.append(sum([1 * MB, 1 * MB, 14 * MB, 15 * MB]))  # i=(0, 1, 2, 3)
                >>> chunks[-1]
                31 * MB
                >>> chunks.append(sum([15 * MB]))  # i=(4,)
                >>> chunks[-1]
                15 * MB

            Samples 0, 1, 2, and 3 can be stored in 1 chunk. sample 4 resides in its own chunk.

            If more samples come later: sizes = [15 * MB, 1 * MB]

            Basic logic:
                >>> len(chunks)
                2
                >>> chunks[-1]
                15 * MB
                >>> chunks[-1] += sum([15 * MB, 1 * MB])  # i=(5, 6)
                >>> chunks[-1]
                31 * MB
                >>> sum(chunks)
                62 * MB
                >>> len(chunks)
                2

            Because our max chunk size is 32 * MB, we try to fit as much data into this size as possible.


        Args:
            key (str): Tensor key.
            cache (LRUCache): Cache for which chunks and the metadata are stored.
            version_state (Dict[str, Any]): The version state of the dataset, includes commit_id, commit_node, branch, branch_commit_map and commit_node_map.
            meta_cache (LRUCache): Cache used for storing non chunk data such as tensor meta and chunk id encoder during transforms in memory.

        Raises:
            ValueError: If invalid max chunk size.
        """

        self.key = key
        self.cache = cache
        self.base_storage = get_base_storage(cache)
        self._meta_cache = meta_cache
        self.version_state = version_state
        self.compression = None
        self.chunk_class = BaseChunk

        # Per-commit cached objects: each `_x` is paired with `_x_commit_id` so the
        # corresponding property can invalidate the cache when the commit changes.
        self._tensor_meta: Optional[TensorMeta] = None
        self._tensor_meta_commit_id: Optional[str] = None

        self._chunk_id_encoder: Optional[ChunkIdEncoder] = None
        self._chunk_id_encoder_commit_id: Optional[str] = None

        self._sequence_encoder: Optional[SequenceEncoder] = None
        self._sequence_encoder_commit_id: Optional[str] = None

        self._tile_encoder: Optional[TileEncoder] = None
        self._tile_encoder_commit_id: Optional[str] = None

        self._commit_chunk_set: Optional[CommitChunkSet] = None
        self._commit_chunk_set_commit_id: Optional[str] = None

        self._commit_diff: Optional[CommitDiff] = None
        self._commit_diff_commit_id: Optional[str] = None

        self._active_appended_chunk: Optional[BaseChunk] = None
        self._active_updated_chunk: Optional[BaseChunk] = None

        self._info: Optional[Info] = None
        self._info_commit_id: Optional[str] = None

        self._all_chunk_engines: Optional[Dict[str, ChunkEngine]] = None
        self._is_temp_label_tensor: bool = False
        self._hash_label_map: Dict[int, str] = OrderedDict()

        # NOTE: property access — loads (and registers) the meta from the meta cache.
        tensor_meta = self.tensor_meta

        # Pick the chunk implementation based on the compression configuration.
        if tensor_meta.sample_compression:
            self.compression = tensor_meta.sample_compression
            self.chunk_class = SampleCompressedChunk
        elif tensor_meta.chunk_compression:
            self.compression = tensor_meta.chunk_compression
            self.chunk_class = ChunkCompressedChunk
        else:
            self.chunk_class = UncompressedChunk

        # Small fixed-shape uncompressed tensors may be cached as one contiguous array.
        self.cached_data: Optional[np.ndarray] = None
        self.cache_range: range = range(0)

        self._chunk_args: Optional[list] = None
        self._num_samples_per_chunk: Optional[int] = None

    @property
    def is_data_cachable(self):
        """Whether samples are tiny, fixed-shape and uncompressed, so reads can be served
        from a single cached array instead of per-chunk lookups."""
        meta = self.tensor_meta
        return (
            self.chunk_class == UncompressedChunk
            and meta.htype not in ["text", "json", "list"]
            and meta.max_shape
            and (meta.max_shape == meta.min_shape)
            and (np.prod(meta.max_shape) < 20)
        )

    @property
    def commit_id(self):
        # Current commit id, always read fresh from the dataset's version state.
        return self.version_state["commit_id"]

    @property
    def max_chunk_size(self):
        # no chunks may exceed this
        return (
            getattr(self.tensor_meta, "max_chunk_size", None) or DEFAULT_MAX_CHUNK_SIZE
        )

    @property
    def tiling_threshold(self):
        return (
            getattr(self.tensor_meta, "tiling_threshold", None)
            or DEFAULT_TILING_THRESHOLD
            or self.min_chunk_size
        )

    @property
    def chunk_args(self):
        if self._chunk_args is None:
            self._chunk_args = [
                self.min_chunk_size,
                self.max_chunk_size,
                self.tiling_threshold,
                self.tensor_meta,
                self.compression,
            ]
        return self._chunk_args

    @property
    def min_chunk_size(self):
        # Only the last chunk of a tensor may be smaller than this (half the max size).
        return self.max_chunk_size // 2

    @property
    def tensor_meta(self):
        commit_id = self.commit_id
        if self._tensor_meta is None or self._tensor_meta_commit_id != commit_id:
            key = get_tensor_meta_key(self.key, commit_id)
            self._tensor_meta = self.meta_cache.get_hub_object(key, TensorMeta)
            self._tensor_meta_commit_id = commit_id
            self.meta_cache.register_hub_object(key, self._tensor_meta)
        return self._tensor_meta

    @property
    def meta_cache(self) -> LRUCache:
        # Cache for metadata objects; falls back to the main chunk cache when no
        # separate meta cache was provided (non-transform case).
        return self._meta_cache or self.cache

    @property
    def chunk_id_encoder(self) -> ChunkIdEncoder:
        """Gets the chunk id encoder from cache, if one is not found it creates a blank encoder.
        For more information on what `ChunkIdEncoder` is used for, see the `__init__` docstring.

        Raises:
            CorruptedMetaError: If chunk id encoding was corrupted.

        Returns:
            ChunkIdEncoder: The chunk ID encoder handles the mapping between sample indices
                and their corresponding chunks.
        """
        commit_id = self.commit_id
        if (
            self._chunk_id_encoder is None
            or self._chunk_id_encoder_commit_id != commit_id
        ):
            commit_id = self.commit_id
            key = get_chunk_id_encoder_key(self.key, commit_id)
            if not self.chunk_id_encoder_exists:
                enc = ChunkIdEncoder()
                try:
                    self.meta_cache[key] = enc
                except ReadOnlyModeError:
                    pass
            else:
                enc = self.meta_cache.get_hub_object(key, ChunkIdEncoder)
            self._chunk_id_encoder = enc
            self._chunk_id_encoder_commit_id = commit_id
            self.meta_cache.register_hub_object(key, enc)
        return self._chunk_id_encoder

    @property
    def commit_chunk_set(self) -> Optional[CommitChunkSet]:
        """Commit chunk set for the current commit, created blank if missing.

        Returns:
            Optional[CommitChunkSet]: Tracks all chunks present in the current commit;
                None on the first commit, which doesn't need one.
        """
        commit_id = self.commit_id
        if commit_id == FIRST_COMMIT_ID:
            # the first commit doesn't need a commit chunk set
            return None
        cached = (
            self._commit_chunk_set is not None
            and self._commit_chunk_set_commit_id == commit_id
        )
        if not cached:
            key = get_tensor_commit_chunk_set_key(self.key, commit_id)
            if self.commit_chunk_set_exists:
                cset = self.meta_cache.get_hub_object(key, CommitChunkSet)
            else:
                cset = CommitChunkSet()
                try:
                    self.meta_cache[key] = cset
                except ReadOnlyModeError:
                    pass
            self._commit_chunk_set = cset
            self._commit_chunk_set_commit_id = commit_id
            self.meta_cache.register_hub_object(key, cset)
        return self._commit_chunk_set

    @property
    def commit_chunk_set_exists(self) -> bool:
        """Checks if the commit chunk set exists for the given tensor in the current commit."""
        commit_id = self.commit_id
        if (
            self._commit_chunk_set is not None
            and self._commit_chunk_set_commit_id == commit_id
        ):
            return True

        try:
            key = get_tensor_commit_chunk_set_key(self.key, commit_id)
            self.meta_cache[key]
            return True
        except KeyError:
            return False

    @property
    def commit_diff(self) -> CommitDiff:
        """Gets the commit diff from cache, if one is not found it creates a blank one.

        Returns:
            CommitDiff: The commit diff keeps track of all the changes in the current commit.
        """
        commit_id = self.commit_id
        if self._commit_diff is None or self._commit_diff_commit_id != commit_id:
            key = get_tensor_commit_diff_key(self.key, commit_id)
            if not self.commit_diff_exists:
                diff = CommitDiff(self.num_samples)
                try:
                    self.meta_cache[key] = diff
                except ReadOnlyModeError:
                    pass
            else:
                diff = self.meta_cache.get_hub_object(key, CommitDiff)
            self._commit_diff = diff
            self._commit_diff_commit_id = commit_id
            self.meta_cache.register_hub_object(key, diff)
        return self._commit_diff

    @property
    def commit_diff_exists(self) -> bool:
        commit_id = self.commit_id
        if self._commit_diff is not None and self._commit_diff_commit_id == commit_id:
            return True
        try:
            key = get_tensor_commit_diff_key(self.key, commit_id)
            self.meta_cache[key]
            return True
        except KeyError:
            return False

    @property
    def chunk_id_encoder_exists(self) -> bool:
        commit_id = self.commit_id
        if (
            self._chunk_id_encoder is not None
            and self._chunk_id_encoder_commit_id == commit_id
        ):
            return True
        try:
            key = get_chunk_id_encoder_key(self.key, commit_id)
            self.meta_cache[key]
            return True
        except KeyError:
            return False

    def _is_tiled_sample(self, global_sample_index):
        # A sample is tiled iff the tile encoder has an entry for its global index.
        return global_sample_index in self.tile_encoder

    @property
    def tile_encoder(self) -> TileEncoder:
        """Gets the tile encoder from cache, if one is not found it creates a blank encoder."""
        commit_id = self.commit_id
        if self._tile_encoder is None or self._tile_encoder_commit_id != commit_id:
            key = get_tensor_tile_encoder_key(self.key, commit_id)
            if not self.tile_encoder_exists:
                enc = TileEncoder()
                try:
                    self.meta_cache[key] = enc
                except ReadOnlyModeError:
                    pass
            else:
                enc = self.meta_cache.get_hub_object(key, TileEncoder)
            self._tile_encoder = enc
            self._tile_encoder_commit_id = commit_id
            self.meta_cache.register_hub_object(key, enc)
        return self._tile_encoder

    @property
    def tile_encoder_exists(self) -> bool:
        commit_id = self.commit_id
        if self._tile_encoder is not None and self._tile_encoder_commit_id == commit_id:
            return True

        try:
            key = get_tensor_tile_encoder_key(self.key, commit_id)
            self.meta_cache[key]
            return True
        except KeyError:
            return False

    @property
    def creds_encoder(self):
        # Base engine has no credentials encoder; subclasses may override
        # (presumably for linked/creds-backed tensors — confirm in subclass).
        return None

    @property
    def num_chunks(self) -> int:
        if not self.chunk_id_encoder_exists:
            return 0
        return self.chunk_id_encoder.num_chunks

    @property
    def num_samples(self) -> int:
        """Returns the length of the primary axis of the tensor.

        Ignores any applied indexing and returns the total length as recorded
        in the tensor meta.
        """
        return self.tensor_meta.length

    @property
    def last_chunk_key(self) -> str:
        """Storage key of the most recently appended chunk."""
        name = self.last_appended_chunk_name
        owning_commit = self.get_chunk_commit(name)
        return get_chunk_key(self.key, name, owning_commit)

    def get_chunk_key_for_id(self, chunk_id) -> str:
        """Storage key for the chunk with the given id, resolved to its owning commit."""
        name = ChunkIdEncoder.name_from_id(chunk_id)
        owning_commit = self.get_chunk_commit(name)
        return get_chunk_key(self.key, name, owning_commit)

    @property
    def active_appended_chunk(self):
        return self._active_appended_chunk

    @active_appended_chunk.setter
    def active_appended_chunk(self, value):
        if self.active_appended_chunk is not None:
            self.cache.remove_hub_object(self.active_appended_chunk.key)
        self._active_appended_chunk = value
        if value is not None:
            self.cache.register_hub_object(value.key, value)

    @property
    def active_updated_chunk(self):
        return self._active_updated_chunk

    @active_updated_chunk.setter
    def active_updated_chunk(self, value):
        if self.active_updated_chunk is not None:
            self.cache.remove_hub_object(self.active_updated_chunk.key)
        self._active_updated_chunk = value
        if value is not None:
            self.cache.register_hub_object(value.key, value)

    @property
    def last_appended_chunk_name(self) -> str:
        # Name of the chunk holding the most recently appended samples.
        return self.chunk_id_encoder.get_name_for_chunk(-1)

    @property
    def last_appended_chunk_id(self) -> str:
        # Id of the most recently appended chunk.
        # NOTE(review): annotated `str`, but encoder ids look numeric — confirm against ChunkIdEncoder.
        return self.chunk_id_encoder.get_id_for_chunk(-1)

    def last_appended_chunk(self) -> Optional[BaseChunk]:
        """Return the most recently appended chunk so new samples can continue filling it.

        Returns None when there are no chunks yet, or when the last sample is tiled
        (tiled samples own their chunks exclusively, so nothing can be appended there).
        """
        last_index = self.num_samples - 1
        if self.num_chunks == 0 or last_index in self.tile_encoder:
            return None
        chunk_name = self.last_appended_chunk_name
        chunk_commit_id = self.get_chunk_commit(chunk_name)
        chunk_key = get_chunk_key(self.key, chunk_name, chunk_commit_id)
        chunk = self.get_chunk(chunk_key)
        chunk.key = chunk_key  # type: ignore
        chunk.id = self.last_appended_chunk_id  # type: ignore
        if chunk_commit_id != self.commit_id:
            # chunk belongs to an ancestor commit; copy-on-write it into this commit
            chunk = self.copy_chunk_to_new_commit(chunk, chunk_name)
        if (
            self.active_appended_chunk is not None
            and self.active_appended_chunk.key != chunk_key
        ):
            # persist the previously active chunk before switching to this one
            self.write_chunk_to_storage(self.active_appended_chunk)
        self.active_appended_chunk = chunk
        return chunk

    def get_chunk(self, chunk_key: str, partial_chunk_bytes=0) -> BaseChunk:
        return self.cache.get_hub_object(
            chunk_key,
            self.chunk_class,
            self.chunk_args,
            partial_bytes=partial_chunk_bytes,
        )

    def get_chunk_from_chunk_id(
        self, chunk_id, copy: bool = False, partial_chunk_bytes=0
    ) -> BaseChunk:
        """Resolve a chunk id to a chunk object, optionally copying it into the current commit."""
        chunk_name = ChunkIdEncoder.name_from_id(chunk_id)
        owning_commit = self.get_chunk_commit(chunk_name)
        chunk_key = get_chunk_key(self.key, chunk_name, owning_commit)
        chunk = self.get_chunk(chunk_key, partial_chunk_bytes=partial_chunk_bytes)
        chunk.key = chunk_key  # type: ignore
        chunk.id = chunk_id  # type: ignore
        if copy and owning_commit != self.commit_id:
            # copy-on-write: chunk lives in an ancestor commit
            chunk = self.copy_chunk_to_new_commit(chunk, chunk_name)
        return chunk

    def get_video_chunk(self, chunk_id, copy: bool = False):
        """Returns video chunks. Chunk will contain presigned url to the video instead of data if the chunk is large."""
        chunk_name = ChunkIdEncoder.name_from_id(chunk_id)
        owning_commit = self.get_chunk_commit(chunk_name)
        chunk_key = get_chunk_key(self.key, chunk_name, owning_commit)

        stream = False
        if isinstance(self.base_storage, (S3Provider, GCSProvider)):
            # large remote chunks are streamed via url rather than downloaded
            object_size = self.base_storage.get_object_size(chunk_key)
            stream = object_size > self.min_chunk_size
        if stream:
            chunk = self.cache.get_hub_object(
                chunk_key, self.chunk_class, meta=self.chunk_args, url=True
            )
        else:
            chunk = self.cache.get_hub_object(
                chunk_key, self.chunk_class, meta=self.chunk_args
            )
        chunk.key = chunk_key  # type: ignore
        chunk.id = chunk_id  # type: ignore
        if copy and owning_commit != self.commit_id:
            chunk = self.copy_chunk_to_new_commit(chunk, chunk_name)
        return chunk, stream

    def copy_chunk_to_new_commit(self, chunk, chunk_name):
        """Copies the chunk to the current commit.

        Returns the copied chunk.
        """
        destination_key = get_chunk_key(self.key, chunk_name, self.commit_id)
        original_id = chunk.id
        copied = chunk.copy(self.chunk_args)
        copied.key = destination_key
        copied.id = original_id
        cset = self.commit_chunk_set
        if cset is not None:
            # record that this commit now owns a copy of the chunk
            cset.add(chunk_name)
        return copied

    def get_chunk_commit(self, chunk_name) -> str:
        """Returns the commit id that contains the chunk_name.

        Walks the commit tree from the current node toward the root and returns the
        first commit whose chunk set contains `chunk_name`.
        """
        cur_node: Optional[CommitNode] = self.version_state["commit_node"]
        while cur_node is not None:
            commit_id = cur_node.commit_id
            chunk_set_key = get_tensor_commit_chunk_set_key(self.key, commit_id)
            try:
                # the first commit doesn't contain a chunk set, don't repeatedly try to fetch from storage
                if commit_id == FIRST_COMMIT_ID:
                    chunk_set = set()
                else:
                    chunk_set = self.meta_cache.get_hub_object(
                        chunk_set_key, CommitChunkSet
                    ).chunks
            except Exception:
                # missing/unreadable chunk set: store a blank one so we don't refetch on the next walk
                commit_chunk_set = CommitChunkSet()
                try:
                    self.meta_cache[chunk_set_key] = commit_chunk_set
                except ReadOnlyModeError:
                    # put CommitChunkSet in hub_objects to keep in cache temporarily, but won't write to storage
                    # this shouldn't happen in latest version of hub, chunk set would always be present
                    self.meta_cache.hub_objects[chunk_set_key] = commit_chunk_set
                chunk_set = set()
            if chunk_name in chunk_set:
                return commit_id
            cur_node = cur_node.parent  # type: ignore
        # the first commit doesn't have a commit chunk set, so any chunk that wasn't found belongs to the first commit
        return FIRST_COMMIT_ID

    def _write_initialization(self):
        # Fast-forward the chunk id encoder before any write (presumably upgrades
        # data written by older hub versions — see ffw_chunk_id_encoder).
        ffw_chunk_id_encoder(self.chunk_id_encoder)

    def _convert_to_list(self, samples):
        """Whether `samples` should be materialized as a python list before chunking."""
        if self.chunk_class != UncompressedChunk:
            return True
        if isinstance(samples, np.ndarray):
            # keep ndarrays whole unless a single sample is chunk-sized
            return samples[0].nbytes >= self.min_chunk_size
        return True

    def check_each_sample(self, samples):
        # Hook for subclasses to validate/transform incoming samples; the base
        # engine accepts everything and returns None (no verified samples).
        return

    def _sanitize_samples(self, samples):
        """Normalize incoming samples and backfill tensor meta (htype/dtype) if unset.

        Returns:
            Tuple: (samples ready for chunking, verified samples — may be None when
            no verification/conversion was performed).
        """
        check_samples_type(samples)
        # empty lists are treated as "no value"
        samples = [None if is_empty_list(sample) else sample for sample in samples]
        verified_samples = self.check_each_sample(samples)
        tensor_meta = self.tensor_meta
        all_empty = all(sample is None for sample in samples)
        # htype/dtype can only be inferred from at least one real sample
        if tensor_meta.htype is None and not all_empty:
            tensor_meta.set_htype(get_htype(samples))
        if tensor_meta.dtype is None and not all_empty:
            tensor_meta.set_dtype(get_dtype(samples))
        if self._convert_to_list(samples):
            samples = list(samples)
        if self._is_temp_label_tensor:
            samples = verified_samples = convert_to_hash(samples, self._hash_label_map)
        elif tensor_meta.htype in ("image.gray", "image.rgb"):
            # force grayscale / RGB pixel layout for these htypes
            mode = "L" if tensor_meta.htype == "image.gray" else "RGB"
            converted = []
            for sample in samples:
                if isinstance(sample, Sample):
                    converted.append(convert_sample(sample, mode))
                elif isinstance(sample, np.ndarray):
                    converted.append(convert_img_arr(sample, mode))
                else:
                    raise SampleHtypeMismatchError(tensor_meta.htype, type(sample))
            samples = verified_samples = converted
        elif tensor_meta.htype == "class_label":
            samples = verified_samples = self._convert_class_labels(samples)
        return samples, verified_samples

    def _convert_class_labels(self, samples):
        """Convert class-label samples to integer indices, growing info.class_names as needed."""
        info_key = get_tensor_info_key(self.key, self.commit_id)
        tensor_info = self.cache.get_hub_object(info_key, Info)
        tensor_name = self.tensor_meta.name or self.key
        class_names = tensor_info.class_names
        labels, additions = convert_to_idx(samples, class_names)
        if additions:
            # new label strings appeared; append them and mark info dirty so it persists
            for new in additions:
                class_names.append(new[0])
                logger.info(
                    f"'{new[0]}' added to {tensor_name}.info.class_names at index {new[1]}"
                )
            tensor_info.is_dirty = True
        return labels

    def _samples_to_chunks(
        self,
        samples,
        start_chunk: Optional[BaseChunk] = None,
        register: bool = True,
        update_commit_diff: bool = False,
        update_tensor_meta: bool = True,
        start_chunk_row: Optional[int] = None,
        progressbar: bool = False,
    ):
        """Add samples to chunks, filling `start_chunk` while it has space,
        otherwise creating new chunks and appending the remaining samples to them.

        Args:
            samples (List[Any]): The list of samples to be added to chunks.
            start_chunk (BaseChunk, Optional): Chunk the samples should first be written into.
            register (bool): Whether new samples/chunks are registered with the chunk id encoder.
            update_commit_diff (bool): Whether to record the additions in the commit diff.
            update_tensor_meta (bool): Whether to update tensor meta; False during rechunking,
                where the meta will not change.
            start_chunk_row (int, Optional): Chunk row that needs to be updated; only used
                during the rechunking phase.
            progressbar (bool): Whether to display sample insertion progress.

        Returns:
            Tuple[List[BaseChunk], Dict[Any, Any]]: The updated chunks and a tile map
            when ``register`` is False; just the updated chunk list otherwise.
        """
        current_chunk = start_chunk

        updated_chunks = []
        if current_chunk is None:
            current_chunk = self._create_new_chunk(register)
            updated_chunks.append(current_chunk)
        enc = self.chunk_id_encoder
        tiles = {}
        nsamples = len(samples)
        if register and update_commit_diff:
            commit_diff = self.commit_diff
        if progressbar:
            pbar = tqdm(total=len(samples))
        while len(samples) > 0:
            num_samples_added = current_chunk.extend_if_has_space(
                samples, update_tensor_meta=update_tensor_meta
            )  # type: ignore
            self.register_new_creds(num_samples_added, samples)
            if num_samples_added == 0:
                # no space left on the current chunk: open a fresh one and retry
                current_chunk = self._create_new_chunk(register, row=start_chunk_row)
                if start_chunk_row is not None:
                    start_chunk_row += 1
                updated_chunks.append(current_chunk)
            elif num_samples_added == PARTIAL_NUM_SAMPLES:
                # a tiled sample was partially written; keep writing its tiles
                sample = samples[0]
                if register and sample.is_first_write:
                    enc.register_samples(1)
                if sample.is_last_write:
                    if register:
                        self.tile_encoder.register_sample(sample, self.num_samples - 1)
                        if update_commit_diff:
                            commit_diff.add_data(1)
                    else:
                        # unregistered (rechunking) path: report tiles to the caller
                        tiles[nsamples - len(samples)] = (
                            sample.sample_shape,
                            sample.tile_shape,
                        )
                    samples = samples[1:]
                if len(samples) > 0:
                    current_chunk = self._create_new_chunk(
                        register, row=start_chunk_row
                    )
                    if start_chunk_row is not None:
                        start_chunk_row += 1
                    updated_chunks.append(current_chunk)
            else:
                # one or more whole samples were written; advance past them
                if not updated_chunks:
                    updated_chunks.append(current_chunk)
                num = int(num_samples_added)
                if register:
                    enc.register_samples(num, row=start_chunk_row)
                    if update_commit_diff:
                        commit_diff.add_data(num)
                samples = samples[num:]
            if progressbar:
                pbar.update(num_samples_added)
        if progressbar:
            pbar.close()
        if register:
            return updated_chunks
        return updated_chunks, tiles

    def register_new_creds(self, num_samples_added, samples):
        # Hook for subclasses that track per-sample credentials; no-op here.
        return

    def update_creds(self, sample_index, sample):
        # Hook for subclasses that track per-sample credentials; no-op here.
        return

    def _extend(self, samples, progressbar, update_commit_diff=True):
        """Append `samples` to this tensor's chunks (no sequence/link handling)."""
        if isinstance(samples, hub.Tensor):
            # TODO optimize this: tensor sources are appended one sample at a time
            iterator = tqdm(samples) if progressbar else samples
            for sample in iterator:
                self._extend(
                    [sample],
                    update_commit_diff=update_commit_diff,
                    progressbar=False,
                )
            return
        if not len(samples):
            return
        samples, verified_samples = self._sanitize_samples(samples)
        self._samples_to_chunks(
            samples,
            start_chunk=self.last_appended_chunk(),
            register=True,
            progressbar=progressbar,
            update_commit_diff=update_commit_diff,
        )
        return verified_samples

    def extend(
        self,
        samples,
        progressbar: bool = False,
        link_callback: Optional[Callable] = None,
    ):
        """Append `samples` to the tensor, handling sequence tensors and link callbacks.

        Args:
            samples: Samples to append. For sequence tensors each item is itself an
                iterable of flat samples.
            progressbar (bool): Whether to display progress while appending.
            link_callback (Callable, Optional): Invoked for appended samples so linked
                tensors can be updated.
        """
        self.check_link_ready()
        self._write_initialization()
        # batch writes: suspend autoflush for the duration of the extend
        initial_autoflush = self.cache.autoflush
        self.cache.autoflush = False

        if self.is_sequence:
            samples = tqdm(samples) if progressbar else samples
            for sample in samples:
                # commit diff counts one per sequence item, not per flat sample
                verified_sample = self._extend(
                    sample, progressbar=False, update_commit_diff=False
                )
                self.sequence_encoder.register_samples(len(sample), 1)
                self.commit_diff.add_data(1)
                ls = verified_sample or sample
                if link_callback:
                    link_callback(ls, flat=False)
                    for s in ls:
                        s = None if is_empty_list(s) else s
                        link_callback(s, flat=True)

        else:
            verified_samples = self._extend(samples, progressbar)
            ls = verified_samples or samples
            if link_callback:
                for sample in ls:
                    sample = None if is_empty_list(sample) else sample
                    link_callback(sample, flat=None)

        # restore the caller's autoflush setting and flush if it was enabled
        self.cache.autoflush = initial_autoflush
        self.cache.maybe_flush()

    def _create_new_chunk(self, register=True, row: Optional[int] = None) -> BaseChunk:
        """Creates and returns a new `Chunk`. Automatically creates an ID for it and puts a reference in the cache."""
        new_id = self.chunk_id_encoder.generate_chunk_id(register=register, row=row)
        new_chunk = self.chunk_class(*self.chunk_args)  # type: ignore
        new_name = ChunkIdEncoder.name_from_id(new_id)  # type: ignore
        new_key = get_chunk_key(self.key, new_name, self.commit_id)
        cset = self.commit_chunk_set
        if cset is not None:
            cset.add(new_name)
        new_chunk.key = new_key  # type: ignore
        new_chunk.id = new_id  # type: ignore
        # unregistered chunks (rechunking) must not bump the tensor meta length
        new_chunk._update_tensor_meta_length = register
        if self.active_appended_chunk is not None:
            # persist the previous active chunk before replacing it
            self.write_chunk_to_storage(self.active_appended_chunk)
        self.active_appended_chunk = new_chunk
        return new_chunk

    def clear(self):
        """Delete every sample and all cached encoders/metadata for this tensor."""
        self.cache.check_readonly()

        commit_id = self.commit_id

        # wipe every chunk file under this tensor's chunk folder
        self.cache.clear(prefix=get_chunk_key(self.key, "", commit_id))

        def _drop(cache, key):
            # best-effort delete: the key may not have been written yet
            try:
                del cache[key]
            except KeyError:
                pass

        self._chunk_id_encoder = None
        _drop(self.meta_cache, get_chunk_id_encoder_key(self.key, commit_id))

        self._info = None
        _drop(self.cache, get_tensor_info_key(self.key, commit_id))

        self.commit_diff.clear_data()

        self._tile_encoder = None
        _drop(self.cache, get_tensor_tile_encoder_key(self.key, commit_id))

        self._sequence_encoder = None
        _drop(self.cache, get_sequence_encoder_key(self.key, commit_id))

        # reset the tensor meta to an empty state and mark it for persistence
        meta = self.tensor_meta
        meta.length = 0
        meta.min_shape = []
        meta.max_shape = []
        meta.is_dirty = True

        self.cache.maybe_flush()
        self.meta_cache.maybe_flush()

    def _replace_tiled_sample(self, global_sample_index: int, sample):
        """Replace a tiled sample wholesale with freshly written chunks."""
        chunks, tile_metas = self._samples_to_chunks(
            [sample], start_chunk=None, register=False
        )
        self.chunk_id_encoder._replace_chunks_for_tiled_sample(
            global_sample_index, [c.id for c in chunks]
        )
        if not tile_metas:
            # new data fits in a single chunk — the sample is no longer tiled
            del self.tile_encoder.entries[global_sample_index]
        else:
            self.tile_encoder.entries[global_sample_index] = tile_metas[0]

    def _update_tiled_sample(self, global_sample_index: int, index: Index, sample):
        """Update a tiled sample, rewriting only the tiles touched by `index`."""
        if len(index.values) == 1:
            # no sub-index: replace the entire sample
            self._replace_tiled_sample(global_sample_index, sample)
            return
        enc = self.chunk_id_encoder
        tile_enc = self.tile_encoder
        chunk_ids = enc[global_sample_index]
        sample_shape = tile_enc.get_sample_shape(global_sample_index)
        tile_shape = tile_enc.get_tile_shape(global_sample_index)
        # chunk ids arranged in the same grid layout as the tiles themselves
        ordered_tile_ids = np.array(chunk_ids).reshape(
            tile_enc.get_tile_layout_shape(global_sample_index)
        )
        tiles_index, sample_index = translate_slices(
            [v.value for v in index.values[1:]], sample_shape, tile_shape  # type: ignore
        )
        required_tile_ids = ordered_tile_ids[tiles_index]
        tiles = np.vectorize(
            lambda chunk_id: self.get_chunk_from_chunk_id(
                chunk_id, copy=True
            ).read_sample(0, is_tile=True),
            otypes=[object],
        )(required_tile_ids)
        # materialize the affected region, patch it in place, then re-tile it
        current_sample = coalesce_tiles(tiles, tile_shape, None, self.tensor_meta.dtype)
        new_sample = current_sample
        new_sample[sample_index] = sample
        new_tiles = break_into_tiles(
            new_sample, tile_enc.get_tile_shape(global_sample_index)
        )
        chunk_ids = required_tile_ids
        for chunk_id, tile in zip(chunk_ids.reshape(-1), new_tiles.reshape(-1)):
            chunk = self.get_chunk_from_chunk_id(int(chunk_id), copy=True)
            curr_shape = chunk.shapes_encoder[-1]
            assert curr_shape == tile.shape, (curr_shape, tile.shape)
            chunk.update_sample(0, tile)
            # flush the previously active updated chunk before switching to a new one
            if (
                self.active_updated_chunk is not None
                and self.active_updated_chunk.key != chunk.key  # type: ignore
            ):
                self.write_chunk_to_storage(self.active_updated_chunk)
            self.active_updated_chunk = chunk

    def pad_and_append(
        self,
        num_samples_to_pad: int,
        value,
        append_link_callback=None,
        update_link_callback=None,
    ):
        """Pads the tensor with empty samples and appends value at the end.

        Args:
            num_samples_to_pad (int): Number of empty samples to append before `value`.
            value: The sample appended after the padding.
            append_link_callback: Forwarded to `extend` as `link_callback`.
            update_link_callback: Forwarded to `update` as `link_callback` when
                the first appended sample is rewritten as an empty sample.
        """
        self.check_link_ready()
        update_first_sample = False
        if num_samples_to_pad > 0:
            if self.num_samples == 0:
                # set htype, dtype, shape, we later update it with empty sample
                self.extend([value], link_callback=append_link_callback)
                num_samples_to_pad -= 1
                update_first_sample = True

            htype = self.tensor_meta.htype
            if htype in ("json", "text", "list"):
                empty_sample = get_empty_text_like_sample(htype)
                empty_samples = [empty_sample] * num_samples_to_pad
            elif self.tensor_meta.is_link:
                empty_sample = None
                empty_samples = [None] * num_samples_to_pad
            else:
                # empty numeric sample: zero-length along every non-batch axis
                ndim = len(self.tensor_meta.max_shape)
                if self.is_sequence:
                    ndim += 1
                shape = tuple([num_samples_to_pad] + [0] * ndim)
                dtype = self.tensor_meta.dtype
                empty_sample = np.zeros(shape[1:], dtype=dtype)
                empty_samples = np.zeros(shape, dtype=dtype)  # type: ignore

            if update_first_sample:
                self.update(Index(0), empty_sample, link_callback=update_link_callback)

            # pad
            self.extend(empty_samples, link_callback=append_link_callback)

        self.extend([value], link_callback=append_link_callback)

    def update(
        self,
        index: Index,
        samples: Union[np.ndarray, Sequence[InputSample], InputSample],
        operator: Optional[str] = None,
        link_callback: Optional[Callable] = None,
    ):
        """Update data at `index` with `samples`."""
        self.check_link_ready()
        # sequence tensors route through the flat-sample-aware updater
        if self.is_sequence:
            update_fn = self._sequence_update
        else:
            update_fn = self._update
        update_fn(index, samples, operator, link_callback=link_callback)  # type: ignore

    def _get_samples_to_move(self, chunk) -> List[Sample]:
        """Collect trailing samples of `chunk` (up to half the max random chunk
        size worth of bytes) and return them in ascending index order."""
        needs_decompression = isinstance(chunk, ChunkCompressedChunk)
        collected: List[Sample] = []
        total_bytes = 0

        # walk backwards from the last sample; indices 0 and 1 always stay put
        for idx in range(chunk.num_samples - 1, 1, -1):
            data = chunk.read_sample(idx, decompress=needs_decompression)
            total_bytes += len(data)
            if total_bytes > int(RANDOM_MAX_ALLOWED_CHUNK_SIZE / 2):
                break
            collected.append(
                self._get_sample_object(
                    data,
                    chunk.shapes_encoder[idx],
                    chunk.compression,
                    chunk.dtype,
                    needs_decompression,
                )
            )
        # collected newest-first; flip back to storage order
        return collected[::-1]

    def _get_chunk_samples(self, chunk) -> List[Sample]:
        """Read every sample stored in `chunk`, wrapped as `Sample` objects."""
        needs_decompression = isinstance(chunk, ChunkCompressedChunk)
        return [
            self._get_sample_object(
                chunk.read_sample(idx, decompress=needs_decompression),
                chunk.shapes_encoder[idx],
                chunk.compression,
                chunk.dtype,
                needs_decompression,
            )
            for idx in range(chunk.num_samples)
        ]

    def _get_sample_object(
        self, sample_data, sample_shape, compression, dtype, decompress
    ):
        """Wrap raw chunk data in a `Sample` (or `LinkedSample` for linked tensors)."""
        if not decompress:
            # still-compressed bytes: keep buffer + codec info together
            sample = Sample(
                buffer=sample_data,
                shape=sample_shape,
                compression=compression,
                dtype=dtype,
            )
        else:
            sample = Sample(array=sample_data, shape=sample_shape)

        meta = self.tensor_meta
        if meta.htype in ("json", "text", "list"):
            sample.htype = meta.htype
        if meta.is_link:
            # linked tensors store the link path as text; rebuild the link object
            sample.htype = "text"
            sample = LinkedSample(sample.array[0])
        return sample

    def __rechunk(self, chunk: BaseChunk, chunk_row: int):
        """Split an oversized chunk by moving its trailing samples into a new chunk.

        Args:
            chunk (BaseChunk): The chunk that grew beyond the allowed size.
            chunk_row (int): Row of `chunk` in the chunk id encoder.
        """
        samples_to_move = self._get_samples_to_move(chunk=chunk)
        num_samples = len(samples_to_move)
        if num_samples == 0:
            return
        # insert the new chunk's id right after the current row
        new_chunk = self._create_new_chunk(register=True, row=chunk_row)
        new_chunk_row = chunk_row + 1

        # the moved samples leave both rows' counts; they are re-registered
        # when written into the new chunk below
        self.chunk_id_encoder.decrease_samples(row=chunk_row, num_samples=num_samples)
        self.chunk_id_encoder.decrease_samples(
            row=new_chunk_row, num_samples=num_samples
        )
        chunk.pop_multiple(num_samples=len(samples_to_move))
        samples, _ = self._sanitize_samples(samples_to_move)
        self._samples_to_chunks(
            samples,
            start_chunk=new_chunk,
            register=True,
            update_commit_diff=True,
            update_tensor_meta=False,  # data is unchanged, only its layout
            start_chunk_row=new_chunk_row,
        )

    def _merge_chunks(
        self,
        from_chunk: BaseChunk,
        from_chunk_row: int,
        to_chunk: BaseChunk,
        to_chunk_row: int,
    ):
        """Move every sample of `from_chunk` into `to_chunk`, then delete `from_chunk`.

        Args:
            from_chunk (BaseChunk): Chunk whose samples are moved out.
            from_chunk_row (int): Row of `from_chunk` in the chunk id encoder.
            to_chunk (BaseChunk): Destination chunk.
            to_chunk_row (int): Row of `to_chunk` in the chunk id encoder.

        Returns:
            bool: Always True (merge performed, or nothing needed moving).
        """
        samples_to_move = self._get_chunk_samples(chunk=from_chunk)
        num_samples = len(samples_to_move)
        if num_samples == 0:
            return True

        from_chunk.pop_multiple(num_samples=num_samples)
        samples, _ = self._sanitize_samples(samples_to_move)
        to_chunk.is_dirty = True
        self.active_updated_chunk = to_chunk
        self._samples_to_chunks(
            samples,
            start_chunk=to_chunk,
            register=True,
            update_commit_diff=True,
            update_tensor_meta=False,  # data is unchanged, only its layout
            start_chunk_row=to_chunk_row,
        )
        self.chunk_id_encoder.delete_chunk_id(row=from_chunk_row)
        # drop the now-empty source chunk from the cache (best-effort)
        try:
            del self.cache[from_chunk.key]  # type: ignore
        except KeyError:
            pass
        return True

    def _is_tiled(self, row: int) -> bool:
        """Checks whether the chunk at `row` belongs to a tiled sample.

        Args:
            row (int): Row of the chunk in the chunk id encoder.

        Returns:
            bool: True if the chunk at `row` shares its last-seen sample index
                with the previous or next row (several chunks holding pieces of
                one sample), False otherwise.
        """
        arr = self.chunk_id_encoder.array
        n_rows = len(arr)
        if row >= 1 and n_rows > 1:
            if arr[row][LAST_SEEN_INDEX_COLUMN] == arr[row - 1][LAST_SEEN_INDEX_COLUMN]:
                return True
        return (
            row + 1 < n_rows
            and arr[row][LAST_SEEN_INDEX_COLUMN] == arr[row + 1][LAST_SEEN_INDEX_COLUMN]
        )

    def _try_merge_with_next_chunk(self, chunk: BaseChunk, row: int) -> bool:
        """Merge the following chunk into `chunk` when the combined size stays small.

        Returns:
            bool: True if a merge was performed, False otherwise.
        """
        next_id = self.chunk_id_encoder.get_next_chunk_id(row)
        if next_id is None:
            return False
        next_row = row + 1
        if self._is_tiled(next_row):
            # tiled chunks hold pieces of a single sample; never merge them
            return False

        next_name = ChunkIdEncoder.name_from_id(next_id)  # type: ignore
        next_key = get_chunk_key(self.key, next_name, self.get_chunk_commit(next_name))
        next_size = self.cache.get_object_size(next_key)
        next_chunk = self.get_chunk_from_chunk_id(int(next_id))
        if next_size + chunk.num_data_bytes >= next_chunk.min_chunk_size:
            return False
        # small enough: pull all of the next chunk's samples into this one
        return self._merge_chunks(
            from_chunk=next_chunk,
            from_chunk_row=next_row,
            to_chunk=chunk,
            to_chunk_row=row,
        )

    def _try_merge_with_previous_chunk(self, chunk: BaseChunk, row: int) -> bool:
        """Merge `chunk` into the preceding chunk when the combined size stays small.

        Returns:
            bool: True if a merge was performed, False otherwise.
        """
        prev_id = self.chunk_id_encoder.get_prev_chunk_id(row)
        if prev_id is None:
            return False
        prev_row = row - 1
        if self._is_tiled(prev_row):
            # tiled chunks hold pieces of a single sample; never merge them
            return False

        prev_name = ChunkIdEncoder.name_from_id(prev_id)  # type: ignore
        prev_key = get_chunk_key(self.key, prev_name, self.get_chunk_commit(prev_name))
        prev_size = self.cache.get_object_size(prev_key)
        prev_chunk = self.get_chunk_from_chunk_id(int(prev_id))
        if prev_size + chunk.num_data_bytes >= prev_chunk.min_chunk_size:
            return False
        # small enough: push this chunk's samples into the previous one
        return self._merge_chunks(
            from_chunk=chunk,
            from_chunk_row=row,
            to_chunk=prev_chunk,
            to_chunk_row=prev_row,
        )

    def _try_merge_with_neighbor_and_split(self, chunk: BaseChunk, row: int):
        """Try merging `chunk` into its previous neighbor; fall back to the next."""
        merged_backwards = self._try_merge_with_previous_chunk(chunk, row)
        if not merged_backwards:
            self._try_merge_with_next_chunk(chunk, row)

    def _check_rechunk(self, chunk: BaseChunk, chunk_row: int):
        """Decide whether `chunk` should be merged (too small) or split (too big)."""
        nbytes = chunk.num_data_bytes
        undersized = (
            nbytes < RANDOM_MINIMAL_CHUNK_SIZE
            and self.max_chunk_size > RANDOM_MINIMAL_CHUNK_SIZE
        )
        oversized = (
            nbytes > RANDOM_MAX_ALLOWED_CHUNK_SIZE
            or nbytes > self.max_chunk_size + RANDOM_MINIMAL_CHUNK_SIZE
        )
        if undersized:
            self._try_merge_with_neighbor_and_split(chunk=chunk, row=chunk_row)
        elif oversized:
            self.__rechunk(chunk, chunk_row)

    def _update(
        self,
        index: Index,
        samples: Union[np.ndarray, Sequence[InputSample], InputSample],
        operator: Optional[str] = None,
        update_commit_diff: bool = True,
        link_callback: Optional[Callable] = None,
    ):
        """Update data at `index` with `samples`.

        Args:
            index (Index): Samples (and optional sub-regions) to overwrite.
            samples: New data, expanded to the number of indexed samples.
            operator (Optional[str]): If given, delegate to `_update_with_operator`
                for an in-place elementwise operation instead.
            update_commit_diff (bool): Whether to record the update in the commit diff.
            link_callback (Optional[Callable]): Called per updated sample so linked
                tensors stay in sync.

        Returns:
            The verified samples (htype-dependent), or the result of
            `_update_with_operator` when `operator` is given.
        """
        self._write_initialization()
        # any cached decoded data is now stale
        self.cached_data = None
        initial_autoflush = self.cache.autoflush
        self.cache.autoflush = False

        if operator is not None:
            return self._update_with_operator(index, samples, operator)

        enc = self.chunk_id_encoder
        index_length = index.length(self.num_samples)
        samples = make_sequence(samples, index_length)
        verified_samples = self.check_each_sample(samples)
        if self.tensor_meta.htype == "class_label":
            samples = self._convert_class_labels(samples)
        nbytes_after_updates = []
        global_sample_indices = tuple(index.values[0].indices(self.num_samples))
        is_sequence = self.is_sequence
        for i, sample in enumerate(samples):  # type: ignore
            sample = None if is_empty_list(sample) else sample
            global_sample_index = global_sample_indices[i]  # TODO!
            if self._is_tiled_sample(global_sample_index):
                self._update_tiled_sample(global_sample_index, index, sample)
            else:
                chunk = self.get_chunks_for_sample(global_sample_index, copy=True)[0]
                local_sample_index = enc.translate_index_relative_to_chunks(
                    global_sample_index
                )

                # a bare sample index replaces the whole sample; deeper index
                # entries patch a sub-region of the existing sample instead
                if len(index.values) <= 1 + int(self.is_sequence):
                    chunk.update_sample(local_sample_index, sample)
                else:
                    orig_sample = chunk.read_sample(local_sample_index, copy=True)
                    orig_sample[tuple(e.value for e in index.values[1:])] = sample
                    chunk.update_sample(local_sample_index, orig_sample)
                # flush the previously active updated chunk before switching
                if (
                    self.active_updated_chunk is not None
                    and self.active_updated_chunk.key != chunk.key  # type: ignore
                ):
                    self.write_chunk_to_storage(self.active_updated_chunk)
                self.active_updated_chunk = chunk

                # only care about deltas if it isn't the last chunk
                if chunk.key != self.last_chunk_key:  # type: ignore
                    nbytes_after_updates.append(chunk.nbytes)
                self._check_rechunk(
                    chunk, chunk_row=enc.__getitem__(global_sample_index, True)[0][1]
                )
            self.update_creds(global_sample_index, sample)
            if update_commit_diff:
                self.commit_diff.update_data(global_sample_index)
            # NOTE(review): this warning check runs once per sample; it looks
            # like it could run once after the loop — confirm before moving it.
            chunk_min, chunk_max = self.min_chunk_size, self.max_chunk_size
            check_suboptimal_chunks(nbytes_after_updates, chunk_min, chunk_max)

            if link_callback:
                new_sample = verified_samples[i] if verified_samples else sample
                link_callback(
                    global_sample_index,
                    sub_index=Index(index.values[1:]),
                    new_sample=new_sample,
                    flat=True if is_sequence else None,
                )
        self.cache.autoflush = initial_autoflush
        self.cache.maybe_flush()
        return verified_samples

    def _update_with_operator(
        self,
        index: Index,
        samples: Union[np.ndarray, Sequence[InputSample], InputSample],
        operator: str,
    ):
        """Update data at `index` with the output of an elementwise operation with `samples`.

        Args:
            index (Index): Samples (and optional sub-regions) to update.
            samples: Right-hand operand of the in-place operation.
            operator (str): Name of an in-place ndarray method, e.g. "__iadd__".

        Raises:
            NotImplementedError: If the indexed region has dynamic shapes.
        """
        try:
            if isinstance(samples, hub.core.tensor.Tensor):
                samples = samples.numpy()
            # split the index: index1 selects samples, index2 a sub-region
            if len(index) > 1:
                index1 = Index(index.values[:1])
                index2 = Index(index.values[1:])
            else:
                index1 = index
                index2 = None
            arr = self._numpy(index1, use_data_cache=False)
            view = arr
            if index2:
                for v in index2.values:
                    view = view[v.value]  # type: ignore
        except DynamicTensorNumpyError:
            raise NotImplementedError(
                "Inplace update operations are not available for dynamic tensors yet."
            )
        tensor_meta = self.tensor_meta

        dt, ht = tensor_meta.dtype, tensor_meta.htype
        samples = intelligent_cast(samples, dt, ht)
        # mutate the view in place, then write the whole array back
        getattr(view, operator)(samples)
        self._update(index1, arr)

    def read_bytes_for_sample(self, global_sample_index: int) -> bytes:
        """Return the raw, as-stored bytes of a single sample.

        Args:
            global_sample_index (int): Index of the sample across the whole tensor.

        Returns:
            bytes: The sample's stored bytes; b"" if the chunk holds no data.

        Raises:
            Exception: If the tensor is chunk-wise compressed (individual sample
                bytes cannot be separated from the compressed chunk).
            NotImplementedError: If the sample is tiled across multiple chunks.
        """
        if self.tensor_meta.chunk_compression:
            # BUGFIX: corrected "retreive" -> "retrieve" in the error message
            raise Exception(
                "Cannot retrieve original bytes for samples in chunk-wise compressed tensors."
            )
        enc = self.chunk_id_encoder
        chunks = self.get_chunks_for_sample(global_sample_index)
        if len(chunks) > 1:
            raise NotImplementedError(
                "read_bytes_for_sample() is not implemented for tiled samples."
            )
        chunk = chunks[0]
        buffer = chunk.memoryview_data
        if not buffer:
            return b""
        local_sample_index = enc.translate_index_relative_to_chunks(global_sample_index)
        # slice the chunk's data buffer using the sample's byte positions
        sb, eb = chunk.byte_positions_encoder[local_sample_index]
        return buffer[sb:eb].tobytes()

    def read_shape_for_sample(
        self,
        global_sample_index: int,
    ) -> Tuple[int, ...]:
        """Return the shape of the sample at `global_sample_index` without decoding its data."""
        enc = self.chunk_id_encoder
        if self._is_tiled_sample(global_sample_index):
            # tiled samples keep their full shape in the tile encoder
            return self.tile_encoder.get_sample_shape(global_sample_index)
        local_index = enc.translate_index_relative_to_chunks(global_sample_index)
        if self.is_video:
            chunk = self.get_video_chunk(enc[global_sample_index][0])[0]
        else:
            chunk_id, _, header_bytes = self.get_chunk_info(
                global_sample_index, fetch_chunks=False
            )
            chunk = self.get_chunk_from_chunk_id(
                chunk_id, partial_chunk_bytes=header_bytes
            )
        return tuple(int(dim) for dim in chunk.shapes_encoder[local_index])

    @property
    def is_fixed_shape(self):
        """True when every sample in the tensor has exactly the same shape."""
        meta = self.tensor_meta
        return meta.min_shape == meta.max_shape

    @property
    def num_samples_per_chunk(self):
        """Samples per chunk, cached lazily; only valid for fixed-shape tensors."""
        if self._num_samples_per_chunk is None:
            # the first row's last-seen sample index + 1 == samples in that chunk
            first_row = self.chunk_id_encoder.array[0, LAST_SEEN_INDEX_COLUMN]
            self._num_samples_per_chunk = first_row + 1
        return self._num_samples_per_chunk

    def read_sample_from_chunk(
        self,
        global_sample_index: int,
        chunk: BaseChunk,
        cast: bool = True,
        copy: bool = False,
        decompress: bool = True,
    ) -> np.ndarray:
        """Read the sample at `global_sample_index` out of an already-loaded chunk."""
        enc = self.chunk_id_encoder
        fixed_uncompressed = (
            self.is_fixed_shape and self.tensor_meta.sample_compression is None
        )
        if fixed_uncompressed:
            # fixed-shape uncompressed tensors pack a constant number of
            # samples per chunk, so the local index is a simple modulo
            local_index = global_sample_index % self.num_samples_per_chunk
        else:
            local_index = enc.translate_index_relative_to_chunks(global_sample_index)
        return chunk.read_sample(
            local_index, cast=cast, copy=copy, decompress=decompress
        )

    def _get_full_chunk(self, index) -> bool:
        """Reads samples from chunks and returns as a boolean that says whether we need to fetch full chunks or only specified subset of it.
        Args:
            index (Index): Represents the samples to read from chunks. See `Index` for more information.
        Returns:
            bool: True/False, whether to fetch a full chunk or only a part of it.
        """
        threshold = 10

        if type(index.values[0].value) == slice:
            start = index.values[0].value.start or 0
            stop = index.values[0].value.stop or self.num_samples
            step = index.values[0].value.step or 1

            if start < 0:
                start = self.num_samples + start

            if stop < 0:
                stop = self.num_samples + start

            numpy_array_length = (stop - start) // step
            return numpy_array_length > threshold
        return False

    def numpy(
        self,
        index: Index,
        aslist: bool = False,
        use_data_cache: bool = True,
        fetch_chunks: bool = False,
        pad_tensor: bool = False,
    ) -> Union[np.ndarray, List[np.ndarray]]:
        """Read the samples at `index` as a numpy array or list of arrays.

        Args:
            index (Index): Samples to read from chunks. See `Index`.
            aslist (bool): If True, return a list of per-sample arrays instead
                of one stacked array. Defaults to False.
            use_data_cache (bool): Allow the data cache to serve the read when
                possible. Defaults to True.
            fetch_chunks (bool): Fetch whole chunks from storage instead of only
                the required byte ranges. Treated as True for chunk-compressed
                tensors and for large slices regardless of this argument.
            pad_tensor (bool): If True, out-of-bounds indices yield an empty
                sample instead of raising.

        Raises:
            DynamicTensorNumpyError: If the samples being read do not all share
                one shape and `aslist` is False.

        Returns:
            Union[np.ndarray, List[np.ndarray]]: The requested data.
        """
        self.check_link_ready()
        # large slices are cheaper to serve from whole chunks
        if not fetch_chunks:
            fetch_chunks = self._get_full_chunk(index)
        reader = self._sequence_numpy if self.is_sequence else self._numpy
        return reader(index, aslist, use_data_cache, fetch_chunks, pad_tensor)

    def get_video_sample(self, global_sample_index, index, decompress=True):
        """Read a (possibly frame-sub-indexed) video sample."""
        enc = self.chunk_id_encoder
        local_index = enc.translate_index_relative_to_chunks(global_sample_index)
        chunk, stream = self.get_video_chunk(enc[global_sample_index][0])
        # the second index entry (if present) selects frames within the video
        sub_index = index.values[1].value if len(index.values) > 1 else None  # type: ignore
        sample = chunk.read_sample(
            local_index,
            sub_index=sub_index,
            stream=stream,
            decompress=decompress,
        )
        if not decompress:
            return sample
        # apply any remaining (spatial/channel) index entries
        return sample[tuple(entry.value for entry in index.values[2:])]

    def get_chunk_info(self, global_sample_index, fetch_chunks):
        """Returns the chunk_id, row and worst case header size of chunk containing the given sample.

        The worst-case header size is non-zero only when a partial (ranged)
        chunk read is worthwhile: remote object storage, partial reads
        requested, and not a chunk-compressed tensor.
        """
        enc = self.chunk_id_encoder
        out = enc.__getitem__(global_sample_index, return_row_index=True)
        chunk_id, row = out[0][0], out[0][1]

        worst_case_header_size = 0
        num_samples_in_chunk = -1
        # BUGFIX: the original used isinstance() on the chunk *class* object
        # (`isinstance(self.chunk_class, ChunkCompressedChunk)`), which is
        # always False, so chunk-compressed tensors were never excluded.
        # Compare the class itself instead.
        if (
            not fetch_chunks
            and isinstance(self.base_storage, (S3Provider, GCSProvider))
            and self.chunk_class is not ChunkCompressedChunk
        ):
            # samples in this chunk = delta of consecutive last-seen indices
            prev = enc.array[row - 1][LAST_SEEN_INDEX_COLUMN] if row > 0 else -1
            num_samples_in_chunk = enc.array[row][LAST_SEEN_INDEX_COLUMN] - prev
            worst_case_header_size += HEADER_SIZE_BYTES + 10  # 10 for version
            ENTRY_SIZE = 4
            if self.tensor_meta.max_shape == self.tensor_meta.min_shape:
                # fixed shape: a single shape entry covers the whole chunk
                num_shape_entries = 1 * (len(self.tensor_meta.min_shape) + 1)
                if self.is_text_like:
                    num_bytes_entries = num_samples_in_chunk * 3
                elif self.tensor_meta.sample_compression is None:
                    num_bytes_entries = 1 * 3
                else:
                    num_bytes_entries = num_samples_in_chunk * 3
            else:
                # dynamic shape: one shape entry per sample
                num_shape_entries = num_samples_in_chunk * (
                    1 + len(self.tensor_meta.max_shape)
                )
                num_bytes_entries = num_samples_in_chunk * 3
            bytes_enc_size = num_bytes_entries * ENTRY_SIZE
            shape_enc_size = num_shape_entries * ENTRY_SIZE
            worst_case_header_size += shape_enc_size
            worst_case_header_size += bytes_enc_size

        return chunk_id, row, worst_case_header_size

    def get_basic_sample(self, global_sample_index, index, fetch_chunks=False):
        """Read a non-tiled, non-video sample and apply any sub-index entries."""
        chunk_id, _, header_bytes = self.get_chunk_info(
            global_sample_index, fetch_chunks
        )
        local_index = self.chunk_id_encoder.translate_index_relative_to_chunks(
            global_sample_index
        )
        chunk = self.get_chunk_from_chunk_id(
            chunk_id, partial_chunk_bytes=header_bytes
        )
        # dicom samples are returned raw (no dtype cast)
        sample = chunk.read_sample(
            local_index, cast=self.tensor_meta.htype != "dicom"
        )
        return sample[tuple(entry.value for entry in index.values[1:])]

    def get_non_tiled_sample(self, global_sample_index, index, fetch_chunks=False):
        """Dispatch a single-chunk sample read to the video or basic reader."""
        if not self.is_video:
            return self.get_basic_sample(
                global_sample_index, index, fetch_chunks=fetch_chunks
            )
        return self.get_video_sample(global_sample_index, index)

    def get_full_tiled_sample(self, global_sample_index):
        """Assemble and return the complete array for a tiled sample."""
        tile_chunks = self.get_chunks_for_sample(global_sample_index)
        return combine_chunks(tile_chunks, global_sample_index, self.tile_encoder)

    def get_partial_tiled_sample(self, global_sample_index, index):
        """Read only the tiles of a tiled sample needed to satisfy `index`."""
        tile_enc = self.tile_encoder
        sample_shape = tile_enc.get_sample_shape(global_sample_index)
        tile_shape = tile_enc.get_tile_shape(global_sample_index)
        # chunk ids laid out in the same grid as the tiles themselves
        tile_id_grid = np.array(self.chunk_id_encoder[global_sample_index]).reshape(
            tile_enc.get_tile_layout_shape(global_sample_index)
        )
        tiles_index, sample_index = translate_slices(
            [v.value for v in index.values[1:]], sample_shape, tile_shape  # type: ignore
        )
        needed_ids = tile_id_grid[tiles_index]

        def _read_tile(chunk_id):
            return self.get_chunk_from_chunk_id(chunk_id).read_sample(0, is_tile=True)

        tiles = np.vectorize(_read_tile, otypes=[object])(needed_ids)
        # stitch the needed tiles together, then cut out the requested region
        stitched = coalesce_tiles(tiles, tile_shape, None, self.tensor_meta.dtype)
        return stitched[sample_index]

    def get_single_sample(
        self, global_sample_index, index, fetch_chunks=False, pad_tensor=False
    ):
        """Read one sample, handling padding and tiled/non-tiled layouts."""
        if pad_tensor and global_sample_index >= self.tensor_meta.length:
            # index is past the end: hand back an empty sample instead of raising
            empty = self.get_empty_sample()
            try:
                return empty[tuple(entry.value for entry in index.values[1:])]
            except IndexError:
                return empty

        if self._is_tiled_sample(global_sample_index):
            if len(index.values) == 1:
                return self.get_full_tiled_sample(global_sample_index)
            return self.get_partial_tiled_sample(global_sample_index, index)
        return self.get_non_tiled_sample(
            global_sample_index, index, fetch_chunks=fetch_chunks
        )

    def _numpy(
        self,
        index: Index,
        aslist: bool = False,
        use_data_cache: bool = True,
        fetch_chunks: bool = False,
        pad_tensor: bool = False,
    ) -> Union[np.ndarray, List[np.ndarray]]:
        """Reads samples from chunks and returns as a numpy array. If `aslist=True`, returns a sequence of numpy arrays.

        Args:
            index (Index): Represents the samples to read from chunks. See `Index` for more information.
            aslist (bool): If True, the samples will be returned as a list of numpy arrays. If False, returns a single numpy array. Defaults to False.
            use_data_cache (bool): If True, the data cache is used to speed up the read if possible. If False, the data cache is ignored. Defaults to True.
            fetch_chunks (bool): If True, full chunks will be retrieved from the storage, otherwise only required bytes will be retrieved.
                This will always be True even if specified as False in the following cases:
                - The tensor is ChunkCompressed
                - The chunk which is being accessed has more than 128 samples.
            pad_tensor (bool): If True, any index out of bounds will not throw an error, but instead will return an empty sample.

        Raises:
            DynamicTensorNumpyError: If shapes of the samples being read are not all the same.

        Returns:
            Union[np.ndarray, List[np.ndarray]]: Either a list of numpy arrays or a single numpy array (depending on the `aslist` argument).
        """
        length = self.num_samples
        last_shape = None
        if use_data_cache and self.is_data_cachable:
            samples = self.numpy_from_data_cache(index, length, aslist, pad_tensor)
        else:
            samples = []
            for global_sample_index in index.values[0].indices(length):
                sample = self.get_single_sample(
                    global_sample_index,
                    index,
                    fetch_chunks=fetch_chunks,
                    pad_tensor=pad_tensor,
                )
                samples.append(sample)
                # raises DynamicTensorNumpyError when shapes differ and not aslist
                check_sample_shape(sample.shape, last_shape, self.key, index, aslist)
                last_shape = sample.shape

        # when every sample is a scalar, unwrap to plain Python values
        if aslist and all(map(np.isscalar, samples)):
            samples = list(arr.item() for arr in samples)

        # a non-subscriptable first entry (plain int index) means one sample was requested
        if not index.values[0].subscriptable():
            samples = samples[0]

        if aslist:
            return samples
        return np.array(samples)

    def numpy_from_data_cache(self, index, length: int, aslist: bool, pad_tensor: bool = False):
        """Reads samples through the in-memory decoded-chunk cache.

        Only used when `self.is_data_cachable` is True (uncompressed, fixed-shape,
        tiny samples), so an entire chunk can be viewed as one contiguous array.

        Args:
            index: `Index` describing which samples (and sub-slices) to read.
            length (int): Total number of samples the index is resolved against.
            aslist (bool): If True, each sample is copied so callers may mutate it
                without corrupting the cache.
            pad_tensor (bool): If True, out-of-range indices yield empty samples
                instead of raising.

        Returns:
            List of numpy arrays, one per selected sample.
        """
        samples = []
        enc = self.chunk_id_encoder
        for global_sample_index in index.values[0].indices(length):
            if pad_tensor and global_sample_index >= self.tensor_meta.length:
                # Padded read past the end: fabricate an empty sample, applying
                # the sub-index when it fits.
                sample = self.get_empty_sample()
                try:
                    sample = sample[tuple(entry.value for entry in index.values[1:])]
                except IndexError:
                    pass
            else:
                if (
                    self.cached_data is None
                    or global_sample_index not in self.cache_range
                ):
                    # Cache miss: load the single chunk holding this sample and
                    # decode all of its samples into one contiguous array.
                    row = enc.__getitem__(global_sample_index, True)[0][1]
                    chunks = self.get_chunks_for_sample(global_sample_index)
                    assert len(chunks) == 1

                    chunk_arr = self.chunk_id_encoder.array

                    chunk = chunks[0]
                    # Sample range covered by this chunk, derived from the
                    # encoder's last-seen-index column.
                    first_sample = 0 if row == 0 else chunk_arr[row - 1][1] + 1
                    last_sample = self.chunk_id_encoder.array[row][1]
                    num_samples = last_sample - first_sample + 1

                    full_shape = (num_samples,) + tuple(self.tensor_meta.max_shape)
                    dtype = self.tensor_meta.dtype

                    # NOTE(review): bytearray copy makes the frombuffer view
                    # writable and decoupled from the chunk — presumably
                    # intentional; confirm.
                    data_bytes = bytearray(chunk.data_bytes)
                    self.cached_data = np.frombuffer(data_bytes, dtype).reshape(
                        full_shape
                    )
                    self.cache_range = range(first_sample, last_sample + 1)

                sample = self.cached_data[global_sample_index - self.cache_range.start]  # type: ignore

                # need to copy if aslist otherwise user might modify the returned data
                # if not aslist, we already do np.array(samples) while formatting which copies
                sample = sample.copy() if aslist else sample
                sample = sample[tuple(entry.value for entry in index.values[1:])]
            samples.append(sample)
        return samples

    def get_chunks_for_sample(
        self,
        global_sample_index: int,
        copy: bool = False,
    ) -> List[BaseChunk]:
        """Retrieves every `BaseChunk` that stores data for `global_sample_index`.

        Args:
            global_sample_index (int): Index relative to the entire tensor representing the sample.
            copy (bool): If True and a chunk exists in a different commit than the current one, it is copied. Defaults to False.

        Returns:
            List[BaseChunk]: Chunk objects containing `global_sample_index` (more than one for tiled samples).
        """
        chunks = []
        for chunk_id in self.chunk_id_encoder[global_sample_index]:
            chunks.append(self.get_chunk_from_chunk_id(chunk_id, copy))
        return chunks

    def validate_num_samples_is_synchronized(self):
        """Check if tensor meta length and chunk ID encoder are representing the same number of samples.
        Helpful for determining if a user has tampered with the tensor meta or the chunk ID encoder, or if
        the tensor was corrupted.

        Raises:
            CorruptedMetaError: tensor_meta and chunk_id_encoder must have the same num samples.
        """

        tensor_meta_length = self.tensor_meta.length

        # compare chunk ID encoder and tensor meta

        # update this if we change self.num_samples implementation later to use tensor meta length instead of chunk_id_encoder
        chunk_id_num_samples = self.num_samples

        if tensor_meta_length != chunk_id_num_samples:
            commit_id = self.commit_id
            tkey = get_tensor_meta_key(self.key, commit_id)
            ikey = get_chunk_id_encoder_key(self.key, commit_id)
            raise CorruptedMetaError(
                f"'{tkey}' and '{ikey}' have a record of different numbers of samples. Got {tensor_meta_length} and {chunk_id_num_samples} respectively."
            )

    def list_all_chunks(self) -> List[str]:
        """Return list of all chunks for current `version_state['commit_id']` and tensor"""
        if self.commit_id != FIRST_COMMIT_ID:
            # Non-initial commits track their chunks in the commit chunk set.
            return list(self.commit_chunk_set.chunks)  # type: ignore
        # First commit: derive chunk names from the chunk ID encoder.
        chunk_ids = self.chunk_id_encoder.array[:, CHUNK_ID_COLUMN]
        return [ChunkIdEncoder.name_from_id(cid) for cid in chunk_ids]  # type: ignore

    def list_all_chunks_path(self) -> List[str]:
        """Return list of paths to all chunks"""
        cid = self.commit_id
        paths = []
        for chunk_name in self.list_all_chunks():
            paths.append(get_chunk_key(self.key, chunk_name, cid))
        return paths

    def list_orphaned_chunks(self, storage):
        """Return paths for orphaned chunks (chunks that are not linked to the `current_version`).

        Args:
            storage: Iterable storage provider yielding all stored keys; scanned
                for chunk keys under this tensor's chunk prefix.

        Returns:
            List of chunk paths (relative to `storage`) that exist under the
            prefix but are not referenced by the current commit.
        """
        commit_id = self.commit_id
        prefix: str = f"{self.key}/chunks/"

        if commit_id != FIRST_COMMIT_ID:
            prefix = f"versions/{commit_id}/{prefix}"

        # Slice instead of str.replace so only the leading prefix is stripped,
        # even if the prefix text happens to occur again later in the key.
        all_chunks = [
            item[len(prefix) :] for item in storage if item.startswith(prefix)
        ]
        # Set for O(1) membership tests; scanning a list per chunk was O(n*m).
        linked_chunks = set(self.list_all_chunks())

        return [
            f"{prefix}{chunk}" for chunk in all_chunks if chunk not in linked_chunks
        ]

    def clear_unusd_chunks(self, storage: StorageProvider):
        """Delete chunks not referenced by the current version. Not implemented yet.

        NOTE(review): the name looks like a typo for ``clear_unused_chunks``;
        kept as-is because renaming would break existing callers.

        Raises:
            NotImplementedError: Always; needs StorageProvider support for listing all chunks.
        """
        # storage.delete_multiple(self.list_orphaned_chunks(storage))
        raise NotImplementedError(
            "requires StorageProvider to be able to list all chunks"
        )

    def pop(self, global_sample_index: int):
        """Removes the sample at `global_sample_index` from the tensor.

        For sequence tensors, all flat items belonging to the sequence entry are
        popped first (in reverse order so indices stay valid), then the sequence
        encoder entry itself is removed.

        Args:
            global_sample_index (int): Index relative to the entire tensor.

        Raises:
            ValueError: If the tensor is empty.
            IndexError: If the index is out of range.
        """
        self._write_initialization()
        if self.tensor_meta.length == 0:
            raise ValueError("There are no samples to pop")
        if global_sample_index < 0 or global_sample_index >= self.tensor_meta.length:
            raise IndexError(
                f"Index {global_sample_index} is out of range for tensor of length {self.tensor_meta.length}"
            )

        self.cached_data = None  # invalidate the decoded-chunk cache
        initial_autoflush = self.cache.autoflush
        # Batch all storage writes for this pop into one flush at the end.
        self.cache.autoflush = False
        try:
            self.commit_diff.pop(global_sample_index)
            if self.is_sequence:
                # pop in reverse order else indices get shifted
                for idx in reversed(range(*self.sequence_encoder[global_sample_index])):
                    self.pop_item(idx)
                self.sequence_encoder.pop(global_sample_index)
            else:
                self.pop_item(global_sample_index)
        finally:
            # Restore autoflush even if popping fails partway, so the cache is
            # not left permanently in non-flushing mode.
            self.cache.autoflush = initial_autoflush
        self.cache.maybe_flush()

    def pop_item(self, global_sample_index: int):
        """Removes a single flat sample from its chunk(s) and all encoders.

        Args:
            global_sample_index (int): Flat index of the sample to remove.
        """
        enc = self.chunk_id_encoder
        if not self._is_tiled_sample(global_sample_index):
            # Only needed for the in-chunk pop below; tiled samples delete
            # their chunks wholesale instead.
            local_sample_index = enc.translate_index_relative_to_chunks(
                global_sample_index
            )
        chunk_ids, rows, delete = enc.pop(global_sample_index)
        if len(chunk_ids) > 1:  # Tiled sample, delete all chunks
            del self.tile_encoder[global_sample_index]
        elif not delete:  # There are other samples in the last chunk
            chunk_to_update = self.get_chunk_from_chunk_id(chunk_ids[0], copy=True)
            chunk_to_update.pop(local_sample_index)

            self._check_rechunk(chunk_to_update, chunk_row=rows[0])

            if (
                self.active_updated_chunk is not None
                and self.active_updated_chunk.key != chunk_to_update.key  # type: ignore
            ):
                # A different chunk was pending; persist it before replacing it.
                self.write_chunk_to_storage(self.active_updated_chunk)
            self.active_updated_chunk = chunk_to_update
        if delete:
            # The popped sample's chunk(s) hold no other samples: drop them.
            for chunk_key in map(self.get_chunk_key_for_id, chunk_ids):
                self.check_remove_active_chunks(chunk_key)
                try:
                    del self.cache[chunk_key]
                except KeyError:
                    pass

        self.tensor_meta.pop(global_sample_index)

    def write_chunk_to_storage(self, chunk):
        """Persists `chunk` into the cache storage and marks it clean.

        No-op when `chunk` is None or has no unsaved changes.
        """
        if chunk is None:
            return
        if not chunk.is_dirty:
            return
        self.cache[chunk.key] = chunk
        chunk.is_dirty = False

    @property
    def is_sequence(self):
        """Whether this tensor stores sequences (delegates to the tensor meta)."""
        meta = self.tensor_meta
        return meta.is_sequence

    @property
    def is_video(self):
        """Whether this tensor holds video data (video compression or "video" htype)."""
        if self.compression in VIDEO_COMPRESSIONS:
            return True
        return self.tensor_meta.htype == "video"

    @property
    def sequence_encoder_exists(self) -> bool:
        """Whether a sequence encoder is available for the current commit,
        either already loaded in memory or serialized in the meta cache."""
        commit_id = self.commit_id
        already_loaded = (
            self._sequence_encoder is not None
            and self._sequence_encoder_commit_id == commit_id
        )
        if already_loaded:
            return True
        try:
            key = get_sequence_encoder_key(self.key, commit_id)
            self.meta_cache[key]
            return True
        except KeyError:
            return False

    @property
    def _sequence_length(self):
        """Number of top-level sequence entries in this tensor."""
        enc = self.sequence_encoder
        return enc.num_samples

    @property
    def sequence_encoder(self) -> SequenceEncoder:
        """Gets the sequence encoder from cache, if one is not found it creates a blank encoder.

        Returns None for non-sequence tensors. The loaded encoder is memoized
        per commit id, so switching commits triggers a reload.

        Raises:
            CorruptedMetaError: If sequence encoding was corrupted.

        Returns:
            A SequenceEncoder instance storing the start and end indices of each sequence in the tensor.
        """

        if not self.is_sequence:
            return  # type: ignore
        commit_id = self.commit_id
        if (
            self._sequence_encoder is None
            or self._sequence_encoder_commit_id != commit_id
        ):
            commit_id = self.commit_id
            key = get_sequence_encoder_key(self.key, commit_id)
            if not self.sequence_encoder_exists:
                # No serialized encoder yet: create a blank one and try to
                # persist it (silently skipped for read-only datasets).
                enc = SequenceEncoder()
                try:
                    self.meta_cache[key] = enc
                except ReadOnlyModeError:
                    pass
            else:
                enc = self.meta_cache.get_hub_object(key, SequenceEncoder)
            self._sequence_encoder = enc
            self._sequence_encoder_commit_id = commit_id
            self.meta_cache.register_hub_object(key, enc)
        return self._sequence_encoder

    def _sequence_numpy(
        self,
        index: Index,
        aslist: bool = False,
        use_data_cache: bool = True,
        fetch_chunks: bool = False,
        pad_tensor: bool = False,
    ):
        """Reads sequence samples: translates the (sequence, item) index into a
        flat index, reads the flat samples, then regroups them per sequence.

        Args:
            index (Index): First entry indexes sequences, second indexes items within them.
            aslist (bool): If True, returns a list of per-sequence arrays instead of one stacked array.
            use_data_cache (bool): Passed through to `_numpy`.
            fetch_chunks (bool): Passed through to `_numpy`.
            pad_tensor (bool): Passed through to `_numpy`.

        Raises:
            DynamicTensorNumpyError: If items vary in shape and a single stacked array was requested.

        Returns:
            Numpy array, list of arrays, or an empty sample when nothing was selected.
        """
        arr = self._numpy(
            self._get_flat_index_from_sequence_index(index),
            aslist=aslist,
            use_data_cache=use_data_cache,
            fetch_chunks=fetch_chunks,
            pad_tensor=pad_tensor,
        )
        if isinstance(arr, np.ndarray) and arr.size == 0:
            return self.get_empty_sample()
        if index.subscriptable_at(0) and index.subscriptable_at(1):
            if aslist:
                # Slice the flat result back into per-sequence groups; item
                # count may vary per sequence when _sequence_item_length is None.
                _item_length = self._sequence_item_length
                ret = []
                for i in index.values[0].indices(self._sequence_length):
                    item_length = _item_length or index.length_at(
                        1, -int(np.subtract(*self.sequence_encoder[i]))
                    )
                    ret.append(arr[:item_length])
                    arr = arr[item_length:]
                return ret
            else:
                try:
                    # Uniform item length: reshape into (num_sequences, items, ...).
                    return arr.reshape(  # type: ignore
                        index.length_at(0, self._sequence_length), -1, *arr.shape[1:]  # type: ignore
                    )
                except ValueError as ve:
                    raise DynamicTensorNumpyError(self.key, index, "shape") from ve
        return arr

    def _translate_2d_index(
        self, x: Optional[IndexEntry] = None, y: Optional[IndexEntry] = None
    ) -> IndexEntry:
        """Collapses a (sequence, item-within-sequence) index pair into a single
        flat `IndexEntry` over item indices.

        Args:
            x (Optional[IndexEntry]): Index over sequences. Defaults to all.
            y (Optional[IndexEntry]): Index over items within each sequence. Defaults to all.

        Returns:
            IndexEntry: Entry backed by a generator of flat item indices.
        """
        x = x or IndexEntry()
        y = y or IndexEntry()
        _item_length = self._sequence_item_length
        if _item_length is None:
            # Variable-length sequences: look up each sequence's (start, end)
            # range in the sequence encoder.

            def idx0_gen():
                for i in x.indices(self._sequence_length):
                    s, e = self.sequence_encoder[i]
                    for j in y.indices(e - s):
                        yield s + j

        else:
            # Fixed item length: the flat index is simple arithmetic.

            def idx0_gen():
                for i in x.indices(self._sequence_length):
                    for j in y.indices(_item_length):
                        yield i * _item_length + j

        # Attach a __len__ so downstream code can size the selection without
        # consuming the generator.
        idx0_gen.__len__ = (  # type: ignore
            (
                lambda: sum(
                    [
                        y.length(-np.subtract(*self.sequence_encoder[i]))
                        for i in x.indices(self._sequence_length)
                    ]
                )
            )
            if _item_length is None
            else (lambda: x.length(self._sequence_length) * y.length(_item_length))  # type: ignore
        )
        return IndexEntry(idx0_gen)  # type: ignore

    def _get_flat_index_from_sequence_index(self, index: Index) -> Index:
        """Converts a sequence-tensor index into an index over the flat
        (item-level) tensor: the first two entries are merged into one flat
        entry, the remaining entries pass through unchanged."""
        if len(index) == 1:
            # No item index supplied: select all items of each sequence.
            index = Index([index.values[0], IndexEntry()])
        if index.values[0].is_trivial() and index.values[1].is_trivial():
            return Index([IndexEntry(), *index.values[2:]])
        if index.subscriptable_at(0) or index.subscriptable_at(1):
            idx0 = self._translate_2d_index(index.values[0], index.values[1])
            return Index([idx0, *index.values[2:]])  # type: ignore
        # Both entries are scalars: flat index = sequence start + item offset.
        return Index(
            [
                IndexEntry(
                    self.sequence_encoder[index.values[0].value][0]  # type: ignore
                    + index.values[1].value
                ),
                *index.values[2:],
            ]
        )

    def _get_flat_samples_for_sequence_update(self, samples, index: Index):
        """Flattens incoming `samples` so they align with the flat (item-level)
        index produced by `_get_flat_index_from_sequence_index`."""
        ndim = self.ndim(index)
        if isinstance(samples, np.ndarray):
            if index.subscriptable_at(0) and index.subscriptable_at(1):
                diff = ndim - samples.ndim
                if diff < 0:
                    # More dims than the target: keep only the trailing ndim dims.
                    samples, diff = samples.reshape(samples.shape[-ndim:]), 0
                if diff > 1:
                    # Broadcast a single sample to every selected flat item.
                    return samples.reshape(1, *samples.shape).repeat(
                        self._translate_2d_index(*index.values[:2]).length(None), 0  # type: ignore
                    )
                elif diff == 1:
                    # Broadcast across selected sequences, then flatten.
                    return (
                        samples.reshape(1, *samples.shape)
                        .repeat(index.length_at(0, self._sequence_length), 0)
                        .reshape(-1, *samples.shape[1:])
                    )
                else:
                    # Exact rank: merge the (sequence, item) axes.
                    return samples.reshape(-1, *samples.shape[2:])
            return samples
        elif isinstance(samples, (str, bytes)):  # treated as scalars
            return samples
        elif isinstance(samples, Iterable):
            # Note: broadcasting is not supported here
            if index.subscriptable_at(0) and index.subscriptable_at(1):
                return list(chain(*samples))
            return samples
        else:
            return samples  # scalars

    def _sequence_update(
        self,
        index: Index,
        samples: Union[np.ndarray, Sequence[InputSample], InputSample],
        operator: Optional[str] = None,
        link_callback: Optional[Callable] = None,
    ):
        """Updates samples of a sequence tensor.

        Flattens the index and samples, performs the flat update, records the
        change in the commit diff (per sequence entry), then notifies linked
        tensors via `link_callback`.

        Args:
            index (Index): (sequence, item, ...) index of the data to update.
            samples: New data; array, nested iterable, or scalar-like.
            operator (Optional[str]): Passed through to `_update`.
            link_callback (Optional[Callable]): Invoked per updated sequence entry.
        """
        flat_idx = self._get_flat_index_from_sequence_index(index)
        flat_samples = self._get_flat_samples_for_sequence_update(samples, index)
        flat_verified_samples: List = self._update(
            flat_idx,
            flat_samples,
            operator,
            update_commit_diff=False,
            link_callback=link_callback,
        )
        i = 0
        verified_samples: Optional[List] = None
        if self.tensor_meta.htype == "class_label":
            samples = self._convert_class_labels(samples)
        if flat_verified_samples:
            # Regroup the flat verified samples into per-sequence lists,
            # mirroring the nested structure of `samples`.
            verified_samples = []
            for sample in samples:  # type: ignore
                verified_sample = []
                for _ in sample:  # type: ignore
                    verified_sample.append(flat_verified_samples[i])
                    i += 1
                verified_samples.append(verified_sample)

        # Commit diff is updated per sequence entry (not per flat item).
        list(
            map(
                self.commit_diff.update_data,
                index.values[0].indices(self._sequence_length),
            )
        )
        if link_callback:
            ls = verified_samples or samples

            # Decide whether one value is being broadcast to all sequences.
            if isinstance(ls, np.ndarray):
                broadcast = ls.ndim < self.ndim(index)
            elif isinstance(ls, (bytes, str)):  # scalars
                broadcast = True
            elif isinstance(ls, Iterable):
                broadcast = False
            else:
                broadcast = True
            seq_len = self._sequence_length
            if broadcast:
                ls = repeat(ls)  # type: ignore
            for i, sample in zip(index.values[0].indices(seq_len), ls):  # type: ignore
                link_callback(
                    i, sub_index=Index(index.values[1:]), new_sample=sample, flat=False
                )

    @property
    def _sequence_item_length(self):
        """Item count shared by all sequences, 0 when the tensor is empty, or
        None when a single common length cannot be determined (multiple
        encoder rows)."""
        enc = self.sequence_encoder
        row_count = len(enc._encoded)
        if row_count == 0:
            return 0
        if row_count != 1:
            return None
        start, end = enc[0]
        return end - start

    @property
    def _sequence_item_length_range(self):
        """Returns minimum and maximum length of items in a sequence"""
        enc = self.sequence_encoder
        row_count = len(enc._encoded)
        if row_count == 0:
            return 0, 0
        lengths = [enc[i][1] - enc[i][0] for i in range(row_count)]
        return min(lengths), max(lengths)

    def check_link_ready(self):
        """No-op hook called before reading shapes without a shape provider;
        presumably overridden by linked-tensor engines — TODO confirm."""
        return

    def shape(
        self, index: Index, sample_shape_provider: Optional[Callable] = None
    ) -> Tuple[Optional[int], ...]:
        """Computes the shape of the data selected by `index`.

        Dimensions that vary across samples are reported as None; dimensions
        indexed with a scalar are squeezed out of the result.

        Args:
            index (Index): Index applied to the tensor.
            sample_shape_provider (Optional[Callable]): Optional fast path that
                returns a sample's stored shape without reading chunk data.

        Returns:
            Tuple[Optional[int], ...]: Resulting shape; None marks a dynamic dimension.
        """
        shape = self.shape_interval.astuple()
        idxs = index.values
        skip_dims = 0
        if None in shape or self.tensor_meta.is_link:
            # Dynamic shape (or linked data): resolve the concrete shape of the
            # single selected sample where possible.
            if not idxs[0].subscriptable():
                if self.tensor_meta.htype in ("text", "json"):
                    shape = (1,)
                else:
                    if sample_shape_provider:
                        try:
                            shape = sample_shape_provider(idxs[0].value)  # type: ignore
                            if self.is_sequence:
                                if len(idxs) > 1 and not idxs[1].subscriptable():
                                    # A single item of the sequence is selected.
                                    shape = tuple(shape[idxs[1].value].tolist())  # type: ignore
                                    skip_dims += 1
                                else:
                                    # Collapse per-item shapes: keep a dim's size
                                    # only when it matches across every item.
                                    shape = (len(shape),) + (
                                        tuple(
                                            int(shape[0, i])  # type: ignore
                                            if np.all(shape[:, i] == shape[0, i])  # type: ignore
                                            else None
                                            for i in range(shape.shape[1])  # type: ignore
                                        )
                                        or (1,)
                                    )

                        except IndexError:  # Happens during transforms, sample shape tensor is not populated yet
                            shape = self.read_shape_for_sample(idxs[0].value)  # type: ignore
                    else:
                        self.check_link_ready()
                        shape = self.read_shape_for_sample(idxs[0].value)  # type: ignore
                skip_dims += 1
        elif not idxs[0].subscriptable():
            # Static shape and a single sample selected: drop the length dim.
            shape = shape[1:]
            skip_dims += 1
        shape = list(shape)  # type: ignore
        squeeze_dims = set()
        for i, idx in enumerate(idxs[skip_dims:]):
            if idx.subscriptable():
                # Range-like entry: its length bounds the dimension.
                shape[i] = idx.length(shape[i])  # type: ignore
            else:
                squeeze_dims.add(i)
        return tuple(shape[i] for i in range(len(shape)) if i not in squeeze_dims)

    def ndim(self, index: Optional[Index] = None) -> int:
        """Number of dimensions of the tensor (length dim included, plus one for
        sequences), minus one for every scalar entry in `index`."""
        dims = len(self.tensor_meta.min_shape) + 1
        if self.is_sequence:
            dims += 1
        if index:
            dims -= sum(1 for idx in index.values if not idx.subscriptable())
        return dims

    @property
    def shape_interval(self) -> ShapeInterval:
        """Returns a `ShapeInterval` object that describes this tensor's shape more accurately. Length is included.

        Note:
            If you are expecting a `tuple`, use `tensor.shape` instead.

        Example:
            >>> tensor.append(np.zeros((10, 10)))
            >>> tensor.append(np.zeros((10, 15)))
            >>> tensor.shape_interval
            ShapeInterval(lower=(2, 10, 10), upper=(2, 10, 15))
            >>> str(tensor.shape_interval)
            (2, 10, 10:15)

        Returns:
            ShapeInterval: Object containing `lower` and `upper` properties.
        """
        meta = self.tensor_meta
        if self.is_sequence:
            # Sequence tensors gain an item-count dimension after the length.
            seq_len = self._sequence_length
            item_min, item_max = self._sequence_item_length_range
            lower = [seq_len, item_min, *meta.min_shape]
            upper = [seq_len, item_max, *meta.max_shape]
        else:
            lower = [meta.length, *meta.min_shape]
            upper = [meta.length, *meta.max_shape]
        return ShapeInterval(lower, upper)

    def _transform_callback(self, sample, flat: Optional[bool]):
        """Used in transforms to handle linked tensors."""
        assert self._all_chunk_engines is not None
        for link_key, link_spec in self.tensor_meta.links.items():
            if flat is not None and link_spec["flatten_sequence"] != flat:
                continue
            transformed = get_link_transform(link_spec["append"])(sample)
            self._all_chunk_engines[link_key].extend([transformed])

    def get_empty_sample(self):
        """Returns an empty (zero-length) sample matching this tensor's htype and dtype.

        Raises:
            ValueError: If the tensor has no samples yet.
        """
        if self.num_samples == 0:
            raise ValueError("This tensor has no samples, cannot get empty sample.")
        meta = self.tensor_meta
        if meta.htype in ("text", "json", "list"):
            return get_empty_text_like_sample(meta.htype)
        rank = len(meta.max_shape)
        if self.is_sequence:
            rank += 1
        return np.ones((0,) * rank, dtype=meta.dtype)

    @property
    def is_text_like(self):
        """True for text-like htypes ("text", "json", "list") and for linked tensors."""
        meta = self.tensor_meta
        if meta.htype in {"text", "json", "list"}:
            return True
        return meta.is_link

    def check_remove_active_chunks(self, chunk_key):
        """Clears the active appended/updated chunk references when they match
        `chunk_key`, so a stale in-memory chunk is not written back later."""
        appended = self.active_appended_chunk
        if appended is not None and appended.key == chunk_key:
            self.active_appended_chunk = None
        updated = self.active_updated_chunk
        if updated is not None and updated.key == chunk_key:
            self.active_updated_chunk = None

Classes

class ChunkEngine (key, cache, version_state, meta_cache=None)

Handles creating Chunks and filling them with incoming samples.

Data delegation: All samples must live inside a chunk. No chunks may contain partial samples, only 1 chunk per sample. A chunk holds the dynamic information for the samples they contain (like shape and byte ranges). For more information on the Chunk format, check out the Chunk class.

ChunkIdEncoder

The ChunkIdEncoder bidirectionally maps samples to the chunk IDs they live in. For more information, see ChunkIdEncoder's docstring.

Example

Given: Sample sizes: [1 * MB, 1 * MB, 14 * MB, 15 * MB, 15 * MB] Min chunk size: 16 * MB Max chunk size: 32 * MB

Basic logic: >>> chunks = [] >>> chunks.append(sum([1 * MB, 1 * MB, 14 * MB, 15 * MB])) # i=(0, 1, 2, 3) >>> chunks[-1] 31 * MB >>> chunks.append(sum([15 * MB])) # i=(4,) >>> chunks[-1] 15 * MB

Samples 0, 1, 2, and 3 can be stored in 1 chunk. Sample 4 resides in its own chunk.

If more samples come later: sizes = [15 * MB, 1 * MB]

Basic logic: >>> len(chunks) 2 >>> chunks[-1] 15 * MB >>> chunks[-1] += sum([15 * MB, 1 * MB]) # i=(5, 6) >>> chunks[-1] 31 * MB >>> sum(chunks) 62 * MB >>> len(chunks) 2

Because our max chunk size is 32 * MB, we try to fit as much data into this size as possible.

Args

key : str
Tensor key.
cache : LRUCache
Cache for which chunks and the metadata are stored.
version_state : Dict[str, Any]
The version state of the dataset, includes commit_id, commit_node, branch, branch_commit_map and commit_node_map.
meta_cache : LRUCache
Cache used for storing non chunk data such as tensor meta and chunk id encoder during transforms in memory.

Raises

ValueError
If invalid max chunk size.
Expand source code
class ChunkEngine:
    def __init__(
        self,
        key: str,
        cache: LRUCache,
        version_state: Dict[str, Any],
        meta_cache: Optional[LRUCache] = None,
    ):
        """Handles creating `Chunk`s and filling them with incoming samples.

        Data delegation:
            All samples must live inside a chunk. No chunks may contain partial samples, only 1 chunk per sample.
            A chunk holds the dynamic information for the samples they contain (like shape and byte ranges).
            For more information on the `Chunk` format, check out the `Chunk` class.

        ChunkIdEncoder:
            The `ChunkIdEncoder` bidirectionally maps samples to the chunk IDs they live in. For more information,
            see `ChunkIdEncoder`'s docstring.

        Example:
            Given:
                Sample sizes: [1 * MB, 1 * MB, 14 * MB, 15 * MB, 15 * MB]
                Min chunk size: 16 * MB
                Max chunk size: 32 * MB


            Basic logic:
                >>> chunks = []
                >>> chunks.append(sum([1 * MB, 1 * MB, 14 * MB, 15 * MB]))  # i=(0, 1, 2, 3)
                >>> chunks[-1]
                31 * MB
                >>> chunks.append(sum([15 * MB]))  # i=(4,)
                >>> chunks[-1]
                15 * MB

            Samples 0, 1, 2, and 3 can be stored in 1 chunk. sample 4 resides in its own chunk.

            If more samples come later: sizes = [15 * MB, 1 * MB]

            Basic logic:
                >>> len(chunks)
                2
                >>> chunks[-1]
                15 * MB
                >>> chunks[-1] += sum([15 * MB, 1 * MB])  # i=(5, 6)
                >>> chunks[-1]
                31 * MB
                >>> sum(chunks)
                62 * MB
                >>> len(chunks)
                2

            Because our max chunk size is 32 * MB, we try to fit as much data into this size as possible.


        Args:
            key (str): Tensor key.
            cache (LRUCache): Cache for which chunks and the metadata are stored.
            version_state (Dict[str, Any]): The version state of the dataset, includes commit_id, commit_node, branch, branch_commit_map and commit_node_map.
            meta_cache (LRUCache): Cache used for storing non chunk data such as tensor meta and chunk id encoder during transforms in memory.

        Raises:
            ValueError: If invalid max chunk size.
        """

        self.key = key
        self.cache = cache
        self.base_storage = get_base_storage(cache)
        self._meta_cache = meta_cache
        self.version_state = version_state
        self.compression = None
        self.chunk_class = BaseChunk

        # Per-commit memoized metadata objects. Each value is paired with the
        # commit id it was loaded for, so switching commits forces a reload.
        self._tensor_meta: Optional[TensorMeta] = None
        self._tensor_meta_commit_id: Optional[str] = None

        self._chunk_id_encoder: Optional[ChunkIdEncoder] = None
        self._chunk_id_encoder_commit_id: Optional[str] = None

        self._sequence_encoder: Optional[SequenceEncoder] = None
        self._sequence_encoder_commit_id: Optional[str] = None

        self._tile_encoder: Optional[TileEncoder] = None
        self._tile_encoder_commit_id: Optional[str] = None

        self._commit_chunk_set: Optional[CommitChunkSet] = None
        self._commit_chunk_set_commit_id: Optional[str] = None

        self._commit_diff: Optional[CommitDiff] = None
        self._commit_diff_commit_id: Optional[str] = None

        # In-memory chunks with pending writes (see write_chunk_to_storage).
        self._active_appended_chunk: Optional[BaseChunk] = None
        self._active_updated_chunk: Optional[BaseChunk] = None

        self._info: Optional[Info] = None
        self._info_commit_id: Optional[str] = None

        self._all_chunk_engines: Optional[Dict[str, ChunkEngine]] = None
        self._is_temp_label_tensor: bool = False
        self._hash_label_map: Dict[int, str] = OrderedDict()

        tensor_meta = self.tensor_meta

        # Chunk class is selected by the tensor's compression configuration.
        if tensor_meta.sample_compression:
            self.compression = tensor_meta.sample_compression
            self.chunk_class = SampleCompressedChunk
        elif tensor_meta.chunk_compression:
            self.compression = tensor_meta.chunk_compression
            self.chunk_class = ChunkCompressedChunk
        else:
            self.chunk_class = UncompressedChunk

        # Decoded-chunk data cache (see numpy_from_data_cache) and the sample
        # range it currently covers.
        self.cached_data: Optional[np.ndarray] = None
        self.cache_range: range = range(0)

        self._chunk_args = None
        self._num_samples_per_chunk: Optional[int] = None

    @property
    def is_data_cachable(self):
        """True when samples are uncompressed, fixed-shape and tiny, so whole
        chunks can be decoded once and served from `cached_data`."""
        meta = self.tensor_meta
        return (
            self.chunk_class == UncompressedChunk
            and meta.htype not in ["text", "json", "list"]
            and meta.max_shape
            and (meta.max_shape == meta.min_shape)
            and (np.prod(meta.max_shape) < 20)
        )

    @property
    def commit_id(self):
        """Commit id of the dataset's current version state."""
        state = self.version_state
        return state["commit_id"]

    @property
    def max_chunk_size(self):
        """Upper bound on chunk size; no chunks may exceed this."""
        configured = getattr(self.tensor_meta, "max_chunk_size", None)
        if configured:
            return configured
        return DEFAULT_MAX_CHUNK_SIZE

    @property
    def tiling_threshold(self):
        """Configured tiling threshold for this tensor, falling back to the
        global default and then to the min chunk size."""
        threshold = getattr(self.tensor_meta, "tiling_threshold", None)
        if threshold:
            return threshold
        return DEFAULT_TILING_THRESHOLD or self.min_chunk_size

    @property
    def chunk_args(self):
        """Cached list of chunk parameters (size bounds, tiling threshold,
        tensor meta, compression); built lazily on first access."""
        if self._chunk_args is None:
            args = [
                self.min_chunk_size,
                self.max_chunk_size,
                self.tiling_threshold,
                self.tensor_meta,
                self.compression,
            ]
            self._chunk_args = args
        return self._chunk_args

    @property
    def min_chunk_size(self):
        """Lower bound for chunk size; only the last chunk may fall below it."""
        return self.max_chunk_size // 2

    @property
    def tensor_meta(self):
        """TensorMeta for the current commit, loaded from the meta cache and
        memoized per commit id."""
        commit_id = self.commit_id
        if self._tensor_meta is None or self._tensor_meta_commit_id != commit_id:
            # (Re)load when nothing is cached yet or the commit has changed.
            key = get_tensor_meta_key(self.key, commit_id)
            self._tensor_meta = self.meta_cache.get_hub_object(key, TensorMeta)
            self._tensor_meta_commit_id = commit_id
            self.meta_cache.register_hub_object(key, self._tensor_meta)
        return self._tensor_meta

    @property
    def meta_cache(self) -> LRUCache:
        """Cache used for metadata; falls back to the main chunk cache when no
        dedicated meta cache was supplied."""
        dedicated = self._meta_cache
        fallback = self.cache
        return dedicated or fallback

    @property
    def chunk_id_encoder(self) -> ChunkIdEncoder:
        """Gets the chunk id encoder from cache, if one is not found it creates a blank encoder.
        For more information on what `ChunkIdEncoder` is used for, see the `__init__` docstring.

        The loaded encoder is memoized per commit id, so switching commits
        triggers a reload.

        Raises:
            CorruptedMetaError: If chunk id encoding was corrupted.

        Returns:
            ChunkIdEncoder: The chunk ID encoder handles the mapping between sample indices
                and their corresponding chunks.
        """
        commit_id = self.commit_id
        if (
            self._chunk_id_encoder is None
            or self._chunk_id_encoder_commit_id != commit_id
        ):
            commit_id = self.commit_id
            key = get_chunk_id_encoder_key(self.key, commit_id)
            if not self.chunk_id_encoder_exists:
                # No serialized encoder yet: create a blank one and try to
                # persist it (silently skipped for read-only datasets).
                enc = ChunkIdEncoder()
                try:
                    self.meta_cache[key] = enc
                except ReadOnlyModeError:
                    pass
            else:
                enc = self.meta_cache.get_hub_object(key, ChunkIdEncoder)
            self._chunk_id_encoder = enc
            self._chunk_id_encoder_commit_id = commit_id
            self.meta_cache.register_hub_object(key, enc)
        return self._chunk_id_encoder

    @property
    def commit_chunk_set(self) -> Optional[CommitChunkSet]:
        """Commit chunk set for the current commit.

        Returns:
            Optional[CommitChunkSet]: Tracks all chunks present in the current
            commit; ``None`` on the first commit, which never keeps one.
        """
        commit_id = self.commit_id
        if commit_id == FIRST_COMMIT_ID:
            # the first commit never tracks a chunk set
            return None
        cached = (
            self._commit_chunk_set is not None
            and self._commit_chunk_set_commit_id == commit_id
        )
        if not cached:
            key = get_tensor_commit_chunk_set_key(self.key, commit_id)
            if self.commit_chunk_set_exists:
                chunk_set = self.meta_cache.get_hub_object(key, CommitChunkSet)
            else:
                chunk_set = CommitChunkSet()
                try:
                    self.meta_cache[key] = chunk_set
                except ReadOnlyModeError:
                    pass
            self._commit_chunk_set = chunk_set
            self._commit_chunk_set_commit_id = commit_id
            self.meta_cache.register_hub_object(key, chunk_set)
        return self._commit_chunk_set

    @property
    def commit_chunk_set_exists(self) -> bool:
        """Whether a commit chunk set is present for this tensor in the current commit."""
        commit_id = self.commit_id
        cached = (
            self._commit_chunk_set is not None
            and self._commit_chunk_set_commit_id == commit_id
        )
        if cached:
            return True
        try:
            key = get_tensor_commit_chunk_set_key(self.key, commit_id)
            self.meta_cache[key]
        except KeyError:
            return False
        return True

    @property
    def commit_diff(self) -> CommitDiff:
        """Commit diff for the current commit; a blank one is created when missing.

        Returns:
            CommitDiff: Tracks all changes made in the current commit.
        """
        commit_id = self.commit_id
        stale = self._commit_diff is None or self._commit_diff_commit_id != commit_id
        if stale:
            key = get_tensor_commit_diff_key(self.key, commit_id)
            if self.commit_diff_exists:
                diff = self.meta_cache.get_hub_object(key, CommitDiff)
            else:
                diff = CommitDiff(self.num_samples)
                try:
                    self.meta_cache[key] = diff
                except ReadOnlyModeError:
                    pass
            self._commit_diff = diff
            self._commit_diff_commit_id = commit_id
            self.meta_cache.register_hub_object(key, diff)
        return self._commit_diff

    @property
    def commit_diff_exists(self) -> bool:
        """Whether a commit diff object exists for the current commit."""
        commit_id = self.commit_id
        if self._commit_diff is not None and self._commit_diff_commit_id == commit_id:
            return True
        try:
            self.meta_cache[get_tensor_commit_diff_key(self.key, commit_id)]
        except KeyError:
            return False
        return True

    @property
    def chunk_id_encoder_exists(self) -> bool:
        """Whether a chunk id encoder exists for the current commit."""
        commit_id = self.commit_id
        cached = (
            self._chunk_id_encoder is not None
            and self._chunk_id_encoder_commit_id == commit_id
        )
        if cached:
            return True
        try:
            self.meta_cache[get_chunk_id_encoder_key(self.key, commit_id)]
        except KeyError:
            return False
        return True

    def _is_tiled_sample(self, global_sample_index):
        """Return True when the sample at this global index is stored as tiles."""
        return global_sample_index in self.tile_encoder

    @property
    def tile_encoder(self) -> TileEncoder:
        """Tile encoder for the current commit; a blank one is created when missing."""
        commit_id = self.commit_id
        stale = self._tile_encoder is None or self._tile_encoder_commit_id != commit_id
        if stale:
            key = get_tensor_tile_encoder_key(self.key, commit_id)
            if self.tile_encoder_exists:
                enc = self.meta_cache.get_hub_object(key, TileEncoder)
            else:
                enc = TileEncoder()
                try:
                    self.meta_cache[key] = enc
                except ReadOnlyModeError:
                    pass
            self._tile_encoder = enc
            self._tile_encoder_commit_id = commit_id
            self.meta_cache.register_hub_object(key, enc)
        return self._tile_encoder

    @property
    def tile_encoder_exists(self) -> bool:
        """Whether a tile encoder exists for the current commit."""
        commit_id = self.commit_id
        if self._tile_encoder is not None and self._tile_encoder_commit_id == commit_id:
            return True
        try:
            self.meta_cache[get_tensor_tile_encoder_key(self.key, commit_id)]
        except KeyError:
            return False
        return True

    @property
    def creds_encoder(self):
        """This engine keeps no creds encoder; always ``None``."""
        return None

    @property
    def num_chunks(self) -> int:
        """Number of chunks for this tensor in the current commit (0 when no encoder exists)."""
        if self.chunk_id_encoder_exists:
            return self.chunk_id_encoder.num_chunks
        return 0

    @property
    def num_samples(self) -> int:
        """Total length of the tensor's primary axis.

        Ignores any applied indexing; always the full length.
        """
        return self.tensor_meta.length

    @property
    def last_chunk_key(self) -> str:
        """Storage key of the most recently appended chunk."""
        name = self.last_appended_chunk_name
        owning_commit = self.get_chunk_commit(name)
        return get_chunk_key(self.key, name, owning_commit)

    def get_chunk_key_for_id(self, chunk_id) -> str:
        """Resolve a chunk id to its storage key in the commit that owns the chunk."""
        name = ChunkIdEncoder.name_from_id(chunk_id)
        owning_commit = self.get_chunk_commit(name)
        return get_chunk_key(self.key, name, owning_commit)

    @property
    def active_appended_chunk(self):
        """Chunk currently receiving appends (kept pinned in the cache)."""
        return self._active_appended_chunk

    @active_appended_chunk.setter
    def active_appended_chunk(self, value):
        previous = self.active_appended_chunk
        if previous is not None:
            # only one appended chunk stays pinned in the cache at a time
            self.cache.remove_hub_object(previous.key)
        self._active_appended_chunk = value
        if value is not None:
            self.cache.register_hub_object(value.key, value)

    @property
    def active_updated_chunk(self):
        """Chunk currently receiving in-place updates (kept pinned in the cache)."""
        return self._active_updated_chunk

    @active_updated_chunk.setter
    def active_updated_chunk(self, value):
        previous = self.active_updated_chunk
        if previous is not None:
            # unpin the previously active updated chunk first
            self.cache.remove_hub_object(previous.key)
        self._active_updated_chunk = value
        if value is not None:
            self.cache.register_hub_object(value.key, value)

    @property
    def last_appended_chunk_name(self) -> str:
        """Name of the final chunk recorded in the chunk id encoder."""
        return self.chunk_id_encoder.get_name_for_chunk(-1)

    @property
    def last_appended_chunk_id(self) -> str:
        """Id of the final chunk recorded in the chunk id encoder.

        NOTE(review): annotated ``str`` but the encoder likely returns a
        numeric id — confirm against ``ChunkIdEncoder.get_id_for_chunk``.
        """
        return self.chunk_id_encoder.get_id_for_chunk(-1)

    def last_appended_chunk(self) -> Optional[BaseChunk]:
        """Return the trailing chunk so further appends can reuse it.

        Returns ``None`` when no chunks exist or the last sample is tiled.
        The chunk is copied into the current commit if it was written by an
        earlier commit, and becomes the active appended chunk.
        """
        last_index = self.num_samples - 1
        if self.num_chunks == 0 or last_index in self.tile_encoder:
            return None
        chunk_name = self.last_appended_chunk_name
        chunk_commit_id = self.get_chunk_commit(chunk_name)
        chunk_key = get_chunk_key(self.key, chunk_name, chunk_commit_id)
        chunk = self.get_chunk(chunk_key)
        chunk.key = chunk_key  # type: ignore
        chunk.id = self.last_appended_chunk_id  # type: ignore
        if chunk_commit_id != self.commit_id:
            # chunk belongs to an ancestor commit; copy-on-write into this one
            chunk = self.copy_chunk_to_new_commit(chunk, chunk_name)
        if (
            self.active_appended_chunk is not None
            and self.active_appended_chunk.key != chunk_key
        ):
            # flush the previously active chunk before replacing it
            self.write_chunk_to_storage(self.active_appended_chunk)
        self.active_appended_chunk = chunk
        return chunk

    def get_chunk(self, chunk_key: str, partial_chunk_bytes=0) -> BaseChunk:
        """Fetch the chunk stored under ``chunk_key`` from the cache.

        ``partial_chunk_bytes`` > 0 requests only a prefix of the chunk.
        """
        return self.cache.get_hub_object(
            chunk_key,
            self.chunk_class,
            self.chunk_args,
            partial_bytes=partial_chunk_bytes,
        )

    def get_chunk_from_chunk_id(
        self, chunk_id, copy: bool = False, partial_chunk_bytes=0
    ) -> BaseChunk:
        """Resolve ``chunk_id`` to a chunk object.

        When ``copy`` is True and the chunk lives in an ancestor commit, the
        chunk is copied into the current commit before being returned.
        """
        name = ChunkIdEncoder.name_from_id(chunk_id)
        owning_commit = self.get_chunk_commit(name)
        key = get_chunk_key(self.key, name, owning_commit)
        chunk = self.get_chunk(key, partial_chunk_bytes=partial_chunk_bytes)
        chunk.key = key  # type: ignore
        chunk.id = chunk_id  # type: ignore
        if copy and owning_commit != self.commit_id:
            chunk = self.copy_chunk_to_new_commit(chunk, name)
        return chunk

    def get_video_chunk(self, chunk_id, copy: bool = False):
        """Returns video chunks. Chunk will contain presigned url to the video instead of data if the chunk is large.

        Args:
            chunk_id: Encoded id of the chunk to fetch.
            copy (bool): If True and the chunk belongs to an earlier commit,
                copy it into the current commit first.

        Returns:
            Tuple[BaseChunk, bool]: The chunk and whether it is URL-streamed.
        """
        chunk_name = ChunkIdEncoder.name_from_id(chunk_id)
        chunk_commit_id = self.get_chunk_commit(chunk_name)
        chunk_key = get_chunk_key(self.key, chunk_name, chunk_commit_id)

        base_storage = self.base_storage
        stream = False
        if isinstance(base_storage, (S3Provider, GCSProvider)):
            chunk_size = base_storage.get_object_size(chunk_key)
            # large remote chunks are fetched as presigned URLs instead of data
            stream = chunk_size > self.min_chunk_size
            if stream:
                chunk = self.cache.get_hub_object(
                    chunk_key, self.chunk_class, meta=self.chunk_args, url=True
                )
        if not stream:
            chunk = self.cache.get_hub_object(
                chunk_key, self.chunk_class, meta=self.chunk_args
            )
        chunk.key = chunk_key  # type: ignore
        chunk.id = chunk_id  # type: ignore
        if copy and chunk_commit_id != self.commit_id:
            chunk = self.copy_chunk_to_new_commit(chunk, chunk_name)
        return chunk, stream

    def copy_chunk_to_new_commit(self, chunk, chunk_name):
        """Copy ``chunk`` into the current commit and return the copy.

        The copy keeps the original chunk id but is keyed under the current
        commit; ``chunk_name`` is recorded in the commit chunk set when one
        exists.
        """
        new_key = get_chunk_key(self.key, chunk_name, self.commit_id)
        original_id = chunk.id
        copied = chunk.copy(self.chunk_args)
        copied.key = new_key
        copied.id = original_id
        chunk_set = self.commit_chunk_set
        if chunk_set is not None:
            chunk_set.add(chunk_name)
        return copied

    def get_chunk_commit(self, chunk_name) -> str:
        """Returns the commit id that contains the chunk_name.

        Walks the commit tree from the current node towards the root and
        checks each commit's chunk set; falls back to the first commit when
        no chunk set mentions the chunk.
        """
        cur_node: Optional[CommitNode] = self.version_state["commit_node"]
        while cur_node is not None:
            commit_id = cur_node.commit_id
            chunk_set_key = get_tensor_commit_chunk_set_key(self.key, commit_id)
            try:
                # the first commit doesn't contain a chunk set, don't repeatedly try to fetch from storage
                if commit_id == FIRST_COMMIT_ID:
                    chunk_set = set()
                else:
                    chunk_set = self.meta_cache.get_hub_object(
                        chunk_set_key, CommitChunkSet
                    ).chunks
            except Exception:
                # chunk set missing or unreadable: install an empty one so later
                # lookups for this commit are cheap
                commit_chunk_set = CommitChunkSet()
                try:
                    self.meta_cache[chunk_set_key] = commit_chunk_set
                except ReadOnlyModeError:
                    # put CommitChunkSet in hub_objects to keep in cache temporarily, but won't write to storage
                    # this shouldn't happen in latest version of hub, chunk set would always be present
                    self.meta_cache.hub_objects[chunk_set_key] = commit_chunk_set
                chunk_set = set()
            if chunk_name in chunk_set:
                return commit_id
            cur_node = cur_node.parent  # type: ignore
        # the first commit doesn't have a commit chunk set, so any chunk that wasn't found belongs to the first commit
        return FIRST_COMMIT_ID

    def _write_initialization(self):
        """Fast-forward the chunk id encoder before any write takes place."""
        ffw_chunk_id_encoder(self.chunk_id_encoder)

    def _convert_to_list(self, samples):
        """Decide whether ``samples`` should be materialized into a Python list.

        Numpy batches of small samples headed for uncompressed chunks stay as
        arrays; everything else is converted to a list by the caller.

        Args:
            samples: Incoming batch (numpy array or any sequence).

        Returns:
            bool: True when the caller should wrap ``samples`` in ``list()``.
        """
        if self.chunk_class != UncompressedChunk:
            return True
        if isinstance(samples, np.ndarray):
            # guard against an empty batch before peeking at the first sample
            return len(samples) == 0 or samples[0].nbytes >= self.min_chunk_size
        return True

    def check_each_sample(self, samples):
        """Per-sample validation hook; does nothing here and returns ``None``."""
        return None

    def _sanitize_samples(self, samples):
        """Normalize an incoming batch before chunking.

        Sets tensor meta htype/dtype from the first non-empty batch, converts
        empty-list placeholders to ``None``, and applies htype-specific
        conversions (hashed temp labels, image mode conversion, class-label
        indexing).

        Args:
            samples: Raw samples passed by the caller.

        Returns:
            Tuple[list, Any]: Normalized samples and the "verified" samples
            (conversion results, or whatever ``check_each_sample`` returned).

        Raises:
            SampleHtypeMismatchError: If an image.gray/image.rgb sample is
                neither a ``Sample`` nor a numpy array.
        """
        check_samples_type(samples)
        # empty lists act as "no value" placeholders
        samples = [None if is_empty_list(sample) else sample for sample in samples]
        verified_samples = self.check_each_sample(samples)
        tensor_meta = self.tensor_meta
        all_empty = all(sample is None for sample in samples)
        if tensor_meta.htype is None and not all_empty:
            tensor_meta.set_htype(get_htype(samples))
        if tensor_meta.dtype is None and not all_empty:
            tensor_meta.set_dtype(get_dtype(samples))
        if self._convert_to_list(samples):
            samples = list(samples)
        if self._is_temp_label_tensor:
            samples = verified_samples = convert_to_hash(samples, self._hash_label_map)
        elif tensor_meta.htype in ("image.gray", "image.rgb"):
            mode = "L" if tensor_meta.htype == "image.gray" else "RGB"
            converted = []
            for sample in samples:
                if isinstance(sample, Sample):
                    converted.append(convert_sample(sample, mode))
                elif isinstance(sample, np.ndarray):
                    converted.append(convert_img_arr(sample, mode))
                else:
                    raise SampleHtypeMismatchError(tensor_meta.htype, type(sample))
            samples = verified_samples = converted
        elif tensor_meta.htype == "class_label":
            samples = verified_samples = self._convert_class_labels(samples)
        return samples, verified_samples

    def _convert_class_labels(self, samples):
        """Map class-label samples to integer indices.

        Labels not already present are appended to the tensor's
        ``info.class_names`` and each addition is logged.
        """
        info_key = get_tensor_info_key(self.key, self.commit_id)
        tensor_info = self.cache.get_hub_object(info_key, Info)
        tensor_name = self.tensor_meta.name or self.key
        class_names = tensor_info.class_names
        labels, additions = convert_to_idx(samples, class_names)
        if additions:
            for new in additions:
                class_names.append(new[0])
                logger.info(
                    f"'{new[0]}' added to {tensor_name}.info.class_names at index {new[1]}"
                )
            # class_names changed, so the info must be re-persisted
            tensor_info.is_dirty = True
        return labels

    def _samples_to_chunks(
        self,
        samples,
        start_chunk: Optional[BaseChunk] = None,
        register: bool = True,
        update_commit_diff: bool = False,
        update_tensor_meta: bool = True,
        start_chunk_row: Optional[int] = None,
        progressbar: bool = False,
    ):
        """Add samples to chunks, in case if there is a space on the start_chunk,
        otherwise creating new chunk and append samples to newly created chunk

        Args:
            samples (List[Any]): Parameter that shows the list of samples to be added to the chunk
            start_chunk (BaseChunk, Optional): Parameter that points to the chunk on which the samples should be added
            register (bool): Parameter that shows if we need to register the chunk
            update_commit_diff (bool): Parameter that shows if we need to update the commit diffs
            update_tensor_meta (bool): Parameter that shows if it is needed to update tensor metas, this will be false in case of rechunking as the meta will not be changed
            start_chunk_row (int, Optional): Parameter that shows the chunk row that needs to be updated, those params are needed only in rechunking phase.
            progressbar (bool): Parameter that shows if need to show sample insertion progress

        Returns:
            Tuple[List[BaseChunk], Dict[Any, Any]]: Updated chunks plus tile
            shapes when ``register`` is False; only the chunk list otherwise.
        """
        current_chunk = start_chunk

        updated_chunks = []
        if current_chunk is None:
            current_chunk = self._create_new_chunk(register)
            updated_chunks.append(current_chunk)
        enc = self.chunk_id_encoder
        tiles = {}
        nsamples = len(samples)
        if register and update_commit_diff:
            commit_diff = self.commit_diff
        if progressbar:
            pbar = tqdm(total=len(samples))
        while len(samples) > 0:
            num_samples_added = current_chunk.extend_if_has_space(
                samples, update_tensor_meta=update_tensor_meta
            )  # type: ignore
            self.register_new_creds(num_samples_added, samples)
            if num_samples_added == 0:
                # current chunk is full: open a fresh one and retry
                current_chunk = self._create_new_chunk(register, row=start_chunk_row)
                if start_chunk_row is not None:
                    start_chunk_row += 1
                updated_chunks.append(current_chunk)
            elif num_samples_added == PARTIAL_NUM_SAMPLES:
                # a tiled sample was partially written into the current chunk
                sample = samples[0]
                if register and sample.is_first_write:
                    enc.register_samples(1)
                if sample.is_last_write:
                    if register:
                        self.tile_encoder.register_sample(sample, self.num_samples - 1)
                        if update_commit_diff:
                            commit_diff.add_data(1)
                    else:
                        # unregistered path: report tile layout back to the caller
                        tiles[nsamples - len(samples)] = (
                            sample.sample_shape,
                            sample.tile_shape,
                        )
                    samples = samples[1:]
                if len(samples) > 0:
                    current_chunk = self._create_new_chunk(
                        register, row=start_chunk_row
                    )
                    if start_chunk_row is not None:
                        start_chunk_row += 1
                    updated_chunks.append(current_chunk)
            else:
                # whole samples were written to the current chunk
                if not updated_chunks:
                    updated_chunks.append(current_chunk)
                num = int(num_samples_added)
                if register:
                    enc.register_samples(num, row=start_chunk_row)
                    if update_commit_diff:
                        commit_diff.add_data(num)
                samples = samples[num:]
            if progressbar:
                pbar.update(num_samples_added)
        if progressbar:
            pbar.close()
        if register:
            return updated_chunks
        return updated_chunks, tiles

    def register_new_creds(self, num_samples_added, samples):
        """Hook called after samples are written; does nothing in this engine."""
        return None

    def update_creds(self, sample_index, sample):
        """Hook called when a sample is updated; does nothing in this engine."""
        return None

    def _extend(self, samples, progressbar, update_commit_diff=True):
        """Append ``samples`` to the tensor (non-sequence path).

        ``hub.Tensor`` inputs are appended one sample at a time; other inputs
        are sanitized and written via ``_samples_to_chunks``.

        Returns:
            The verified samples from sanitization, or ``None`` for
            Tensor/empty inputs.
        """
        if isinstance(samples, hub.Tensor):
            samples = tqdm(samples) if progressbar else samples
            for sample in samples:
                self._extend(
                    [sample],
                    update_commit_diff=update_commit_diff,
                    progressbar=False,
                )  # TODO optimize this
            return
        if len(samples) == 0:
            return
        samples, verified_samples = self._sanitize_samples(samples)
        self._samples_to_chunks(
            samples,
            start_chunk=self.last_appended_chunk(),
            register=True,
            progressbar=progressbar,
            update_commit_diff=update_commit_diff,
        )
        return verified_samples

    def extend(
        self,
        samples,
        progressbar: bool = False,
        link_callback: Optional[Callable] = None,
    ):
        """Append ``samples`` to the tensor with autoflush suspended.

        For sequence tensors each element of ``samples`` is itself a sequence
        that is flattened into the underlying tensor and recorded in the
        sequence encoder. ``link_callback`` (if given) is invoked for every
        appended sample so linked tensors stay in sync.
        """
        self.check_link_ready()
        self._write_initialization()
        # suspend autoflush so the whole batch is written in one flush
        initial_autoflush = self.cache.autoflush
        self.cache.autoflush = False

        if self.is_sequence:
            samples = tqdm(samples) if progressbar else samples
            for sample in samples:
                verified_sample = self._extend(
                    sample, progressbar=False, update_commit_diff=False
                )
                self.sequence_encoder.register_samples(len(sample), 1)
                # one sequence element counts as a single sample in the diff
                self.commit_diff.add_data(1)
                ls = verified_sample or sample
                if link_callback:
                    link_callback(ls, flat=False)
                    for s in ls:
                        s = None if is_empty_list(s) else s
                        link_callback(s, flat=True)

        else:
            verified_samples = self._extend(samples, progressbar)
            ls = verified_samples or samples
            if link_callback:
                for sample in ls:
                    sample = None if is_empty_list(sample) else sample
                    link_callback(sample, flat=None)

        self.cache.autoflush = initial_autoflush
        self.cache.maybe_flush()

    def _create_new_chunk(self, register=True, row: Optional[int] = None) -> BaseChunk:
        """Creates and returns a new `Chunk`. Automatically creates an ID for it and puts a reference in the cache.

        The new chunk becomes the active appended chunk; any previously
        active appended chunk is flushed to storage first.
        """
        chunk_id = self.chunk_id_encoder.generate_chunk_id(register=register, row=row)
        chunk = self.chunk_class(*self.chunk_args)  # type: ignore
        chunk_name = ChunkIdEncoder.name_from_id(chunk_id)  # type: ignore
        chunk_key = get_chunk_key(self.key, chunk_name, self.commit_id)
        if self.commit_chunk_set is not None:
            self.commit_chunk_set.add(chunk_name)
        chunk.key = chunk_key  # type: ignore
        chunk.id = chunk_id  # type: ignore
        # when not registering, the chunk must not bump the tensor length
        chunk._update_tensor_meta_length = register
        if self.active_appended_chunk is not None:
            self.write_chunk_to_storage(self.active_appended_chunk)
        self.active_appended_chunk = chunk
        return chunk

    def clear(self):
        """Clears all samples and cachables."""
        self.cache.check_readonly()

        commit_id = self.commit_id

        # delete every chunk under this tensor's chunk directory
        chunk_folder_path = get_chunk_key(self.key, "", commit_id)
        self.cache.clear(prefix=chunk_folder_path)

        # drop the chunk id encoder
        enc_key = get_chunk_id_encoder_key(self.key, commit_id)
        self._chunk_id_encoder = None
        try:
            del self.meta_cache[enc_key]
        except KeyError:
            pass

        # drop tensor info
        info_key = get_tensor_info_key(self.key, commit_id)
        try:
            self._info = None
            del self.cache[info_key]
        except KeyError:
            pass

        self.commit_diff.clear_data()

        # drop the tile encoder
        tile_encoder_key = get_tensor_tile_encoder_key(self.key, commit_id)
        try:
            self._tile_encoder = None
            del self.cache[tile_encoder_key]
        except KeyError:
            pass

        # drop the sequence encoder
        seq_encoder_key = get_sequence_encoder_key(self.key, commit_id)
        try:
            self._sequence_encoder = None
            del self.cache[seq_encoder_key]
        except KeyError:
            pass

        # reset the tensor meta to an empty tensor
        self.tensor_meta.length = 0
        self.tensor_meta.min_shape = []
        self.tensor_meta.max_shape = []
        self.tensor_meta.is_dirty = True

        self.cache.maybe_flush()
        self.meta_cache.maybe_flush()

    def _replace_tiled_sample(self, global_sample_index: int, sample):
        """Replace an entire tiled sample with freshly chunked data."""
        chunks, tiles = self._samples_to_chunks(
            [sample], start_chunk=None, register=False
        )
        new_ids = [chunk.id for chunk in chunks]
        self.chunk_id_encoder._replace_chunks_for_tiled_sample(
            global_sample_index, new_ids
        )
        if tiles:
            self.tile_encoder.entries[global_sample_index] = tiles[0]
        else:
            # the replacement fits in one chunk; the sample is no longer tiled
            del self.tile_encoder.entries[global_sample_index]

    def _update_tiled_sample(self, global_sample_index: int, index: Index, sample):
        """Update a (possibly partial) region of a tiled sample.

        When the index addresses the whole sample it is replaced outright;
        otherwise only the tiles overlapping the indexed region are read,
        patched, and written back.
        """
        if len(index.values) == 1:
            # index selects the entire sample: replace it wholesale
            self._replace_tiled_sample(global_sample_index, sample)
            return
        enc = self.chunk_id_encoder
        tile_enc = self.tile_encoder
        chunk_ids = enc[global_sample_index]
        sample_shape = tile_enc.get_sample_shape(global_sample_index)
        tile_shape = tile_enc.get_tile_shape(global_sample_index)
        # arrange chunk ids to mirror the spatial layout of the tiles
        ordered_tile_ids = np.array(chunk_ids).reshape(
            tile_enc.get_tile_layout_shape(global_sample_index)
        )
        tiles_index, sample_index = translate_slices(
            [v.value for v in index.values[1:]], sample_shape, tile_shape  # type: ignore
        )
        required_tile_ids = ordered_tile_ids[tiles_index]
        tiles = np.vectorize(
            lambda chunk_id: self.get_chunk_from_chunk_id(
                chunk_id, copy=True
            ).read_sample(0, is_tile=True),
            otypes=[object],
        )(required_tile_ids)
        current_sample = coalesce_tiles(tiles, tile_shape, None, self.tensor_meta.dtype)
        new_sample = current_sample
        new_sample[sample_index] = sample
        new_tiles = break_into_tiles(
            new_sample, tile_enc.get_tile_shape(global_sample_index)
        )
        chunk_ids = required_tile_ids
        for chunk_id, tile in zip(chunk_ids.reshape(-1), new_tiles.reshape(-1)):
            chunk = self.get_chunk_from_chunk_id(int(chunk_id), copy=True)
            curr_shape = chunk.shapes_encoder[-1]
            assert curr_shape == tile.shape, (curr_shape, tile.shape)
            chunk.update_sample(0, tile)
            if (
                self.active_updated_chunk is not None
                and self.active_updated_chunk.key != chunk.key  # type: ignore
            ):
                # flush the previously active updated chunk before switching
                self.write_chunk_to_storage(self.active_updated_chunk)
            self.active_updated_chunk = chunk

    def pad_and_append(
        self,
        num_samples_to_pad: int,
        value,
        append_link_callback=None,
        update_link_callback=None,
    ):
        """Pads the tensor with empty samples and appends value at the end.

        Args:
            num_samples_to_pad (int): Number of empty samples inserted before ``value``.
            value: The sample appended after the padding.
            append_link_callback: Callback invoked for appended samples (linked tensors).
            update_link_callback: Callback invoked when the first sample is rewritten.
        """
        self.check_link_ready()
        update_first_sample = False
        if num_samples_to_pad > 0:
            if self.num_samples == 0:
                # set htype, dtype, shape, we later update it with empty sample
                self.extend([value], link_callback=append_link_callback)
                num_samples_to_pad -= 1
                update_first_sample = True

            htype = self.tensor_meta.htype
            if htype in ("json", "text", "list"):
                empty_sample = get_empty_text_like_sample(htype)
                empty_samples = [empty_sample] * num_samples_to_pad
            elif self.tensor_meta.is_link:
                empty_sample = None
                empty_samples = [None] * num_samples_to_pad
            else:
                # numeric tensors: pad with zero-size arrays matching the rank
                ndim = len(self.tensor_meta.max_shape)
                if self.is_sequence:
                    ndim += 1
                shape = tuple([num_samples_to_pad] + [0] * ndim)
                dtype = self.tensor_meta.dtype
                empty_sample = np.zeros(shape[1:], dtype=dtype)
                empty_samples = np.zeros(shape, dtype=dtype)  # type: ignore

            if update_first_sample:
                self.update(Index(0), empty_sample, link_callback=update_link_callback)

            # pad
            self.extend(empty_samples, link_callback=append_link_callback)

        self.extend([value], link_callback=append_link_callback)

    def update(
        self,
        index: Index,
        samples: Union[np.ndarray, Sequence[InputSample], InputSample],
        operator: Optional[str] = None,
        link_callback: Optional[Callable] = None,
    ):
        """Update data at `index` with `samples`."""
        self.check_link_ready()
        # sequence tensors route through the sequence-aware updater
        handler = self._sequence_update if self.is_sequence else self._update
        handler(index, samples, operator, link_callback=link_callback)  # type: ignore

    def _get_samples_to_move(self, chunk) -> List[Sample]:
        """Collect trailing samples of ``chunk`` (bounded by half of
        ``RANDOM_MAX_ALLOWED_CHUNK_SIZE``) so they can be moved during rechunking.

        Samples are read from the end backwards and returned in ascending
        order. NOTE(review): the range stops before index 1, so samples 0 and
        1 are never moved — confirm this is intentional.
        """
        # chunk-compressed chunks must be decompressed to re-serialize samples
        decompress = isinstance(chunk, ChunkCompressedChunk)
        samples_to_move: List[Sample] = []
        sum_bytes = 0

        for idx in range(chunk.num_samples - 1, 1, -1):
            sample_data = chunk.read_sample(idx, decompress=decompress)
            sum_bytes += len(sample_data)
            if sum_bytes > int(RANDOM_MAX_ALLOWED_CHUNK_SIZE / 2):
                # the sample that crosses the budget is excluded
                break
            sample_shape = chunk.shapes_encoder[idx]
            new_sample = self._get_sample_object(
                sample_data, sample_shape, chunk.compression, chunk.dtype, decompress
            )
            samples_to_move.append(new_sample)
        samples_to_move.reverse()
        return samples_to_move

    def _get_chunk_samples(self, chunk) -> List[Sample]:
        """Read every sample stored in ``chunk`` back as ``Sample`` objects."""
        decompress = isinstance(chunk, ChunkCompressedChunk)
        return [
            self._get_sample_object(
                chunk.read_sample(idx, decompress=decompress),
                chunk.shapes_encoder[idx],
                chunk.compression,
                chunk.dtype,
                decompress,
            )
            for idx in range(chunk.num_samples)
        ]

    def _get_sample_object(
        self, sample_data, sample_shape, compression, dtype, decompress
    ):
        """Wrap raw chunk bytes (or a decompressed array) in a ``Sample``.

        Text-like htypes are tagged on the sample; linked tensors are
        rehydrated as ``LinkedSample``.
        """
        if decompress:
            sample = Sample(array=sample_data, shape=sample_shape)
        else:
            sample = Sample(
                buffer=sample_data,
                shape=sample_shape,
                compression=compression,
                dtype=dtype,
            )
        htype = self.tensor_meta.htype
        if htype in ("json", "text", "list"):
            sample.htype = htype
        if self.tensor_meta.is_link:
            sample.htype = "text"
            return LinkedSample(sample.array[0])
        return sample

    def __rechunk(self, chunk: BaseChunk, chunk_row: int):
        """Move trailing samples out of ``chunk`` into a new chunk inserted
        right after it, rebalancing chunk sizes.
        """
        samples_to_move = self._get_samples_to_move(chunk=chunk)
        num_samples = len(samples_to_move)
        if num_samples == 0:
            return
        new_chunk = self._create_new_chunk(register=True, row=chunk_row)
        new_chunk_row = chunk_row + 1

        # NOTE(review): both the original row and the freshly inserted row are
        # decremented here — confirm against ChunkIdEncoder.decrease_samples
        self.chunk_id_encoder.decrease_samples(row=chunk_row, num_samples=num_samples)
        self.chunk_id_encoder.decrease_samples(
            row=new_chunk_row, num_samples=num_samples
        )
        chunk.pop_multiple(num_samples=len(samples_to_move))
        samples, _ = self._sanitize_samples(samples_to_move)
        # tensor meta stays untouched: these samples are already counted
        self._samples_to_chunks(
            samples,
            start_chunk=new_chunk,
            register=True,
            update_commit_diff=True,
            update_tensor_meta=False,
            start_chunk_row=new_chunk_row,
        )

    def _merge_chunks(
        self,
        from_chunk: BaseChunk,
        from_chunk_row: int,
        to_chunk: BaseChunk,
        to_chunk_row: int,
    ):
        """Move all samples from ``from_chunk`` into ``to_chunk``, then delete
        ``from_chunk`` from the encoder and the cache.

        Args:
            from_chunk (BaseChunk): Chunk being emptied and removed.
            from_chunk_row (int): Encoder row of ``from_chunk``.
            to_chunk (BaseChunk): Chunk receiving the samples.
            to_chunk_row (int): Encoder row of ``to_chunk``.

        Returns:
            bool: Always True once the merge (or empty no-op) completes.
        """
        samples_to_move = self._get_chunk_samples(chunk=from_chunk)
        num_samples = len(samples_to_move)
        if num_samples == 0:
            return True

        from_chunk.pop_multiple(num_samples=num_samples)
        samples, _ = self._sanitize_samples(samples_to_move)
        to_chunk.is_dirty = True
        self.active_updated_chunk = to_chunk
        # tensor meta stays untouched: these samples are already counted
        self._samples_to_chunks(
            samples,
            start_chunk=to_chunk,
            register=True,
            update_commit_diff=True,
            update_tensor_meta=False,
            start_chunk_row=to_chunk_row,
        )
        self.chunk_id_encoder.delete_chunk_id(row=from_chunk_row)
        try:
            # drop the emptied chunk from the cache; it may already be gone
            del self.cache[from_chunk.key]  # type: ignore
        except KeyError:
            pass
        return True

    def _is_tiled(self, row: int) -> bool:
        """Check whether the chunk at `row` belongs to a tiled sample.

        A tiled sample spans several chunks, so consecutive rows in the chunk
        id encoder share the same last-seen sample index.

        Args:
            row (int): Row of the chunk in the chunk id encoder.

        Returns:
            bool: True if the chunk shares its sample index with the previous
                or next row, False otherwise.
        """
        arr = self.chunk_id_encoder.array
        n = len(arr)
        if (
            row >= 1
            and n > 1
            and arr[row][LAST_SEEN_INDEX_COLUMN] == arr[row - 1][LAST_SEEN_INDEX_COLUMN]
        ):
            return True
        if (
            row + 1 < n
            and arr[row][LAST_SEEN_INDEX_COLUMN] == arr[row + 1][LAST_SEEN_INDEX_COLUMN]
        ):
            return True
        return False

    def _try_merge_with_next_chunk(self, chunk: BaseChunk, row: int) -> bool:
        """Merge the next chunk's samples into `chunk` if the combined payload
        would still be below the minimum chunk size.

        Args:
            chunk (BaseChunk): The (undersized) chunk at `row`.
            row (int): Row of `chunk` in the chunk id encoder.

        Returns:
            bool: True if a merge happened, False otherwise.
        """
        next_chunk_id = self.chunk_id_encoder.get_next_chunk_id(row)
        if next_chunk_id is None:
            return False
        next_chunk_row = row + 1
        # Tiled chunks hold a fragment of a single sample and cannot be merged.
        if self._is_tiled(next_chunk_row):
            return False

        next_chunk_name = ChunkIdEncoder.name_from_id(next_chunk_id)  # type: ignore
        next_chunk_commit_id = self.get_chunk_commit(next_chunk_name)
        chunk_key = get_chunk_key(self.key, next_chunk_name, next_chunk_commit_id)
        # Use the stored object size so the size check avoids reading the chunk.
        next_chunk_size = self.cache.get_object_size(chunk_key)
        next_chunk = self.get_chunk_from_chunk_id(int(next_chunk_id))
        if next_chunk_size + chunk.num_data_bytes < next_chunk.min_chunk_size:
            # merge with next chunk
            return self._merge_chunks(
                from_chunk=next_chunk,
                from_chunk_row=next_chunk_row,
                to_chunk=chunk,
                to_chunk_row=row,
            )
        return False

    def _try_merge_with_previous_chunk(self, chunk: BaseChunk, row: int) -> bool:
        """Merge `chunk`'s samples into the previous chunk if the combined
        payload would still be below the minimum chunk size.

        Args:
            chunk (BaseChunk): The (undersized) chunk at `row`.
            row (int): Row of `chunk` in the chunk id encoder.

        Returns:
            bool: True if a merge happened, False otherwise.
        """
        prev_chunk_id = self.chunk_id_encoder.get_prev_chunk_id(row)
        if prev_chunk_id is None:
            return False

        prev_chunk_row = row - 1
        # Tiled chunks hold a fragment of a single sample and cannot be merged.
        if self._is_tiled(prev_chunk_row):
            return False

        prev_chunk_name = ChunkIdEncoder.name_from_id(prev_chunk_id)  # type: ignore
        prev_chunk_commit_id = self.get_chunk_commit(prev_chunk_name)
        prev_chunk_key = get_chunk_key(self.key, prev_chunk_name, prev_chunk_commit_id)
        # Use the stored object size so the size check avoids reading the chunk.
        prev_chunk_size = self.cache.get_object_size(prev_chunk_key)
        prev_chunk = self.get_chunk_from_chunk_id(int(prev_chunk_id))
        if prev_chunk_size + chunk.num_data_bytes < prev_chunk.min_chunk_size:
            # merge with previous chunk
            return self._merge_chunks(
                from_chunk=chunk,
                from_chunk_row=row,
                to_chunk=prev_chunk,
                to_chunk_row=prev_chunk_row,
            )
        return False

    def _try_merge_with_neighbor_and_split(self, chunk: BaseChunk, row: int):
        """Fold `chunk` into the previous chunk when possible; otherwise try
        merging the next chunk into it."""
        merged_backward = self._try_merge_with_previous_chunk(chunk, row)
        if not merged_backward:
            self._try_merge_with_next_chunk(chunk, row)

    def _check_rechunk(self, chunk: BaseChunk, chunk_row: int):
        """Rebalance `chunk` when its payload is far from the target size:
        merge undersized chunks into a neighbor, split oversized ones."""
        nbytes = chunk.num_data_bytes
        undersized = (
            nbytes < RANDOM_MINIMAL_CHUNK_SIZE
            and self.max_chunk_size > RANDOM_MINIMAL_CHUNK_SIZE
        )
        if undersized:
            self._try_merge_with_neighbor_and_split(chunk=chunk, row=chunk_row)
            return
        oversized = nbytes > RANDOM_MAX_ALLOWED_CHUNK_SIZE or nbytes > (
            self.max_chunk_size + RANDOM_MINIMAL_CHUNK_SIZE
        )
        if oversized:
            self.__rechunk(chunk, chunk_row)

    def _update(
        self,
        index: Index,
        samples: Union[np.ndarray, Sequence[InputSample], InputSample],
        operator: Optional[str] = None,
        update_commit_diff: bool = True,
        link_callback: Optional[Callable] = None,
    ):
        """Update data at `index` with `samples`.

        Args:
            index (Index): Target selection; the first entry picks global
                sample indices, further entries select a region inside each sample.
            samples: Replacement data, matched one-to-one with the selection.
            operator (Optional[str]): If given, delegates to
                `_update_with_operator` for an in-place elementwise operation.
            update_commit_diff (bool): Whether to record the update in the commit diff.
            link_callback (Optional[Callable]): Called per updated sample so
                linked tensors can be kept in sync.

        Returns:
            The verified samples as produced by `check_each_sample`.
        """
        self._write_initialization()
        self.cached_data = None
        # Batch all writes into a single flush at the end.
        initial_autoflush = self.cache.autoflush
        self.cache.autoflush = False

        if operator is not None:
            return self._update_with_operator(index, samples, operator)

        enc = self.chunk_id_encoder
        index_length = index.length(self.num_samples)
        samples = make_sequence(samples, index_length)
        verified_samples = self.check_each_sample(samples)
        if self.tensor_meta.htype == "class_label":
            samples = self._convert_class_labels(samples)
        nbytes_after_updates = []
        global_sample_indices = tuple(index.values[0].indices(self.num_samples))
        is_sequence = self.is_sequence
        for i, sample in enumerate(samples):  # type: ignore
            sample = None if is_empty_list(sample) else sample
            global_sample_index = global_sample_indices[i]  # TODO!
            if self._is_tiled_sample(global_sample_index):
                self._update_tiled_sample(global_sample_index, index, sample)
            else:
                chunk = self.get_chunks_for_sample(global_sample_index, copy=True)[0]
                local_sample_index = enc.translate_index_relative_to_chunks(
                    global_sample_index
                )

                if len(index.values) <= 1 + int(self.is_sequence):
                    # Whole-sample replacement.
                    chunk.update_sample(local_sample_index, sample)
                else:
                    # Partial update: read the sample, patch the sub-index, write back.
                    orig_sample = chunk.read_sample(local_sample_index, copy=True)
                    orig_sample[tuple(e.value for e in index.values[1:])] = sample
                    chunk.update_sample(local_sample_index, orig_sample)
                # Flush the previously active chunk before switching to a new one.
                if (
                    self.active_updated_chunk is not None
                    and self.active_updated_chunk.key != chunk.key  # type: ignore
                ):
                    self.write_chunk_to_storage(self.active_updated_chunk)
                self.active_updated_chunk = chunk

                # only care about deltas if it isn't the last chunk
                if chunk.key != self.last_chunk_key:  # type: ignore
                    nbytes_after_updates.append(chunk.nbytes)
                self._check_rechunk(
                    chunk, chunk_row=enc.__getitem__(global_sample_index, True)[0][1]
                )
            self.update_creds(global_sample_index, sample)
            if update_commit_diff:
                self.commit_diff.update_data(global_sample_index)
            chunk_min, chunk_max = self.min_chunk_size, self.max_chunk_size
            # NOTE(review): this check runs on every iteration; presumably
            # running it once after the loop would suffice — confirm intent.
            check_suboptimal_chunks(nbytes_after_updates, chunk_min, chunk_max)

            if link_callback:
                new_sample = verified_samples[i] if verified_samples else sample
                link_callback(
                    global_sample_index,
                    sub_index=Index(index.values[1:]),
                    new_sample=new_sample,
                    flat=True if is_sequence else None,
                )
        self.cache.autoflush = initial_autoflush
        self.cache.maybe_flush()
        return verified_samples

    def _update_with_operator(
        self,
        index: Index,
        samples: Union[np.ndarray, Sequence[InputSample], InputSample],
        operator: str,
    ):
        """Update data at `index` with the output of an elem-wise operation with samples.

        Args:
            index (Index): Target selection; entries beyond the first select a
                view inside each sample.
            samples: Right-hand operand of the in-place operation.
            operator (str): Name of an in-place ndarray method, e.g. "__iadd__".

        Raises:
            NotImplementedError: For dynamic (ragged) tensors.
        """
        try:
            if isinstance(samples, hub.core.tensor.Tensor):
                samples = samples.numpy()
            if len(index) > 1:
                # Split into sample selection (index1) and intra-sample view (index2).
                index1 = Index(index.values[:1])
                index2 = Index(index.values[1:])
            else:
                index1 = index
                index2 = None
            arr = self._numpy(index1, use_data_cache=False)
            view = arr
            if index2:
                for v in index2.values:
                    view = view[v.value]  # type: ignore
        except DynamicTensorNumpyError:
            raise NotImplementedError(
                "Inplace update operations are not available for dynamic tensors yet."
            )
        tensor_meta = self.tensor_meta

        dt, ht = tensor_meta.dtype, tensor_meta.htype
        samples = intelligent_cast(samples, dt, ht)
        # Mutate the view in place, then write the whole samples back.
        getattr(view, operator)(samples)
        self._update(index1, arr)

    def read_bytes_for_sample(self, global_sample_index: int) -> bytes:
        """Return the stored (possibly sample-compressed) bytes of a sample.

        Args:
            global_sample_index (int): Index relative to the entire tensor.

        Returns:
            bytes: The sample's raw byte payload; b"" when the chunk holds no data.

        Raises:
            ValueError: If the tensor is chunk-wise compressed — per-sample
                byte boundaries do not exist in the compressed chunk stream.
            NotImplementedError: If the sample is tiled across multiple chunks.
        """
        if self.tensor_meta.chunk_compression:
            # Fixed typo in message ("retreive" -> "retrieve") and raise a
            # specific exception type (still caught by `except Exception`).
            raise ValueError(
                "Cannot retrieve original bytes for samples in chunk-wise compressed tensors."
            )
        enc = self.chunk_id_encoder
        chunks = self.get_chunks_for_sample(global_sample_index)
        if len(chunks) > 1:
            raise NotImplementedError(
                "read_bytes_for_sample() is not implemented for tiled samples."
            )
        chunk = chunks[0]
        buffer = chunk.memoryview_data
        if not buffer:
            return b""
        local_sample_index = enc.translate_index_relative_to_chunks(global_sample_index)
        sb, eb = chunk.byte_positions_encoder[local_sample_index]
        return buffer[sb:eb].tobytes()

    def read_shape_for_sample(
        self,
        global_sample_index: int,
    ) -> Tuple[int, ...]:
        """Return the shape of a sample without decompressing its data.

        Tiled samples are answered from the tile encoder; video samples need
        the chunk itself; otherwise only the chunk header (shape encoder) is
        read, fetching partial bytes from storage where supported.

        Args:
            global_sample_index (int): Index relative to the entire tensor.

        Returns:
            Tuple[int, ...]: The sample's shape.
        """
        enc = self.chunk_id_encoder
        if self._is_tiled_sample(global_sample_index):
            return self.tile_encoder.get_sample_shape(global_sample_index)
        local_sample_index = enc.translate_index_relative_to_chunks(global_sample_index)
        if self.is_video:
            chunk_id = enc[global_sample_index][0]
            chunk = self.get_video_chunk(chunk_id)[0]
        else:
            chunk_id, _, worst_case_header_size = self.get_chunk_info(
                global_sample_index, fetch_chunks=False
            )
            chunk = self.get_chunk_from_chunk_id(
                chunk_id, partial_chunk_bytes=worst_case_header_size
            )
        return tuple(map(int, chunk.shapes_encoder[local_sample_index]))

    @property
    def is_fixed_shape(self):
        """Whether every sample recorded so far has the same shape."""
        meta = self.tensor_meta
        return meta.min_shape == meta.max_shape

    @property
    def num_samples_per_chunk(self):
        """Samples held by each chunk; only valid for fixed-shape tensors
        (see `read_sample_from_chunk`). Derived lazily from the first row of
        the chunk id encoder and cached on the instance.
        """
        if self._num_samples_per_chunk is None:
            first_row_last_index = self.chunk_id_encoder.array[
                0, LAST_SEEN_INDEX_COLUMN
            ]
            self._num_samples_per_chunk = first_row_last_index + 1
        return self._num_samples_per_chunk

    def read_sample_from_chunk(
        self,
        global_sample_index: int,
        chunk: BaseChunk,
        cast: bool = True,
        copy: bool = False,
        decompress: bool = True,
    ) -> np.ndarray:
        """Read a single sample out of `chunk`.

        For fixed-shape uncompressed tensors the chunk-local index is computed
        arithmetically; otherwise it is translated via the chunk id encoder.
        """
        enc = self.chunk_id_encoder
        fixed_uncompressed = (
            self.is_fixed_shape and self.tensor_meta.sample_compression is None
        )
        if fixed_uncompressed:
            local_index = global_sample_index % self.num_samples_per_chunk
        else:
            local_index = enc.translate_index_relative_to_chunks(global_sample_index)
        return chunk.read_sample(
            local_index, cast=cast, copy=copy, decompress=decompress
        )

    def _get_full_chunk(self, index) -> bool:
        """Reads samples from chunks and returns as a boolean that says whether we need to fetch full chunks or only specified subset of it.
        Args:
            index (Index): Represents the samples to read from chunks. See `Index` for more information.
        Returns:
            bool: True/False, whether to fetch a full chunk or only a part of it.
        """
        threshold = 10

        if type(index.values[0].value) == slice:
            start = index.values[0].value.start or 0
            stop = index.values[0].value.stop or self.num_samples
            step = index.values[0].value.step or 1

            if start < 0:
                start = self.num_samples + start

            if stop < 0:
                stop = self.num_samples + start

            numpy_array_length = (stop - start) // step
            return numpy_array_length > threshold
        return False

    def numpy(
        self,
        index: Index,
        aslist: bool = False,
        use_data_cache: bool = True,
        fetch_chunks: bool = False,
        pad_tensor: bool = False,
    ) -> Union[np.ndarray, List[np.ndarray]]:
        """Read the samples selected by `index` as numpy data.

        Args:
            index (Index): Samples to read; see `Index`.
            aslist (bool): Return a list of per-sample arrays instead of one
                stacked array. Defaults to False.
            use_data_cache (bool): Allow the whole-chunk data cache fast path.
                Defaults to True.
            fetch_chunks (bool): Retrieve full chunks instead of partial bytes.
                Forced to True when the read spans many samples.
            pad_tensor (bool): Return empty samples for out-of-range indices
                instead of raising.

        Raises:
            DynamicTensorNumpyError: If sample shapes differ and `aslist` is False.

        Returns:
            Union[np.ndarray, List[np.ndarray]]: The requested data.
        """
        self.check_link_ready()
        should_fetch = fetch_chunks or self._get_full_chunk(index)
        reader = self._sequence_numpy if self.is_sequence else self._numpy
        return reader(index, aslist, use_data_cache, should_fetch, pad_tensor)

    def get_video_sample(self, global_sample_index, index, decompress=True):
        """Read a (possibly sub-indexed) video sample.

        Args:
            global_sample_index (int): Index relative to the entire tensor.
            index (Index): Full index; values[1] (if present) selects frames,
                later entries index into the decoded frame array.
            decompress (bool): Whether to decode the video stream.

        Returns:
            The decoded and further-indexed frames, or the raw sample when
            `decompress` is False.
        """
        enc = self.chunk_id_encoder
        chunk_ids = enc[global_sample_index]
        local_sample_index = enc.translate_index_relative_to_chunks(global_sample_index)
        chunk, stream = self.get_video_chunk(chunk_ids[0])
        sub_index = index.values[1].value if len(index.values) > 1 else None  # type: ignore
        sample = chunk.read_sample(
            local_sample_index,
            sub_index=sub_index,
            stream=stream,
            decompress=decompress,
        )
        if decompress:
            # Apply any remaining per-frame indexing (e.g. height/width/channels).
            return sample[tuple(entry.value for entry in index.values[2:])]
        return sample

    def get_chunk_info(self, global_sample_index, fetch_chunks):
        """Returns the chunk_id, row and worst case header size of chunk containing the given sample."""
        enc = self.chunk_id_encoder
        out = enc.__getitem__(global_sample_index, return_row_index=True)
        chunk_id, row = out[0][0], out[0][1]

        worst_case_header_size = 0
        num_samples_in_chunk = -1
        # For remote object stores we can fetch just the chunk header plus the
        # requested bytes, so estimate an upper bound on the header size.
        # NOTE(review): `chunk_class` appears to be a class, making this
        # isinstance() always False — issubclass() may have been intended; confirm.
        if (
            not fetch_chunks
            and isinstance(self.base_storage, (S3Provider, GCSProvider))
            and not isinstance(self.chunk_class, ChunkCompressedChunk)
        ):
            # Samples in this chunk = delta of consecutive last-seen indices.
            prev = enc.array[row - 1][LAST_SEEN_INDEX_COLUMN] if row > 0 else -1
            num_samples_in_chunk = enc.array[row][LAST_SEEN_INDEX_COLUMN] - prev
            worst_case_header_size += HEADER_SIZE_BYTES + 10  # 10 for version
            ENTRY_SIZE = 4
            if self.tensor_meta.max_shape == self.tensor_meta.min_shape:
                # Fixed shape: the shape encoder needs only one entry.
                num_shape_entries = 1 * (len(self.tensor_meta.min_shape) + 1)
                if self.is_text_like:
                    num_bytes_entries = num_samples_in_chunk * 3
                elif self.tensor_meta.sample_compression is None:
                    num_bytes_entries = 1 * 3
                else:
                    num_bytes_entries = num_samples_in_chunk * 3
            else:
                # Dynamic shapes: one shape entry per sample.
                num_shape_entries = num_samples_in_chunk * (
                    1 + len(self.tensor_meta.max_shape)
                )
                num_bytes_entries = num_samples_in_chunk * 3
            bytes_enc_size = num_bytes_entries * ENTRY_SIZE
            shape_enc_size = num_shape_entries * ENTRY_SIZE
            worst_case_header_size += shape_enc_size
            worst_case_header_size += bytes_enc_size

        return chunk_id, row, worst_case_header_size

    def get_basic_sample(self, global_sample_index, index, fetch_chunks=False):
        """Read a plain (non-tiled, non-video) sample and apply sub-indexing.

        Args:
            global_sample_index (int): Index relative to the entire tensor.
            index (Index): Full index; entries after the first slice into the sample.
            fetch_chunks (bool): Fetch the whole chunk instead of partial bytes.

        Returns:
            The decoded sample, sub-indexed by `index.values[1:]`.
        """
        chunk_id, _, worst_case_header_size = self.get_chunk_info(
            global_sample_index, fetch_chunks
        )
        local_index = self.chunk_id_encoder.translate_index_relative_to_chunks(
            global_sample_index
        )
        chunk = self.get_chunk_from_chunk_id(
            chunk_id, partial_chunk_bytes=worst_case_header_size
        )
        # DICOM samples are returned uncast to preserve their native dtype.
        full_sample = chunk.read_sample(
            local_index, cast=self.tensor_meta.htype != "dicom"
        )
        return full_sample[tuple(entry.value for entry in index.values[1:])]

    def get_non_tiled_sample(self, global_sample_index, index, fetch_chunks=False):
        """Dispatch a single-chunk sample read to the video or basic reader."""
        if not self.is_video:
            return self.get_basic_sample(
                global_sample_index, index, fetch_chunks=fetch_chunks
            )
        return self.get_video_sample(global_sample_index, index)

    def get_full_tiled_sample(self, global_sample_index):
        """Assemble a complete tiled sample by combining all of its tile chunks."""
        tile_chunks = self.get_chunks_for_sample(global_sample_index)
        return combine_chunks(tile_chunks, global_sample_index, self.tile_encoder)

    def get_partial_tiled_sample(self, global_sample_index, index):
        """Read only the tiles of a tiled sample needed to satisfy `index`.

        The sample's chunk ids are arranged into their tile-grid layout, the
        requested sub-index is translated to a tile-grid slice plus an
        intra-sample slice, and only the required tile chunks are read and
        stitched back together.
        """
        tile_enc = self.tile_encoder
        chunk_ids = self.chunk_id_encoder[global_sample_index]
        sample_shape = tile_enc.get_sample_shape(global_sample_index)
        tile_shape = tile_enc.get_tile_shape(global_sample_index)
        # Arrange chunk ids into the tile grid layout.
        ordered_tile_ids = np.array(chunk_ids).reshape(
            tile_enc.get_tile_layout_shape(global_sample_index)
        )
        tiles_index, sample_index = translate_slices(
            [v.value for v in index.values[1:]], sample_shape, tile_shape  # type: ignore
        )
        required_tile_ids = ordered_tile_ids[tiles_index]
        # Each tile chunk holds exactly one sample fragment at local index 0.
        tiles = np.vectorize(
            lambda chunk_id: self.get_chunk_from_chunk_id(chunk_id).read_sample(
                0, is_tile=True
            ),
            otypes=[object],
        )(required_tile_ids)
        sample = coalesce_tiles(tiles, tile_shape, None, self.tensor_meta.dtype)
        sample = sample[sample_index]
        return sample

    def get_single_sample(
        self, global_sample_index, index, fetch_chunks=False, pad_tensor=False
    ):
        """Read one sample, handling padding and tiled/non-tiled layouts.

        Args:
            global_sample_index (int): Index relative to the entire tensor.
            index (Index): Full index; trailing entries slice into the sample.
            fetch_chunks (bool): Fetch full chunks instead of partial bytes.
            pad_tensor (bool): Return an empty sample for out-of-range indices.
        """
        if pad_tensor and global_sample_index >= self.tensor_meta.length:
            empty = self.get_empty_sample()
            try:
                return empty[tuple(entry.value for entry in index.values[1:])]
            except IndexError:
                # Empty sample can't satisfy the sub-index; return it whole.
                return empty

        if not self._is_tiled_sample(global_sample_index):
            return self.get_non_tiled_sample(
                global_sample_index, index, fetch_chunks=fetch_chunks
            )
        if len(index.values) == 1:
            return self.get_full_tiled_sample(global_sample_index)
        return self.get_partial_tiled_sample(global_sample_index, index)

    def _numpy(
        self,
        index: Index,
        aslist: bool = False,
        use_data_cache: bool = True,
        fetch_chunks: bool = False,
        pad_tensor: bool = False,
    ) -> Union[np.ndarray, List[np.ndarray]]:
        """Reads samples from chunks and returns as a numpy array. If `aslist=True`, returns a sequence of numpy arrays.

        Args:
            index (Index): Represents the samples to read from chunks. See `Index` for more information.
            aslist (bool): If True, the samples will be returned as a list of numpy arrays. If False, returns a single numpy array. Defaults to False.
            use_data_cache (bool): If True, the data cache is used to speed up the read if possible. If False, the data cache is ignored. Defaults to True.
            fetch_chunks (bool): If True, full chunks will be retrieved from the storage, otherwise only required bytes will be retrieved.
                This will always be True even if specified as False in the following cases:
                - The tensor is ChunkCompressed
                - The chunk which is being accessed has more than 128 samples.
            pad_tensor (bool): If True, any index out of bounds will not throw an error, but instead will return an empty sample.

        Raises:
            DynamicTensorNumpyError: If shapes of the samples being read are not all the same.

        Returns:
            Union[np.ndarray, List[np.ndarray]]: Either a list of numpy arrays or a single numpy array (depending on the `aslist` argument).
        """
        length = self.num_samples
        last_shape = None
        # Fast path: decode whole cached chunks when the tensor qualifies.
        if use_data_cache and self.is_data_cachable:
            samples = self.numpy_from_data_cache(index, length, aslist, pad_tensor)
        else:
            samples = []
            for global_sample_index in index.values[0].indices(length):
                sample = self.get_single_sample(
                    global_sample_index,
                    index,
                    fetch_chunks=fetch_chunks,
                    pad_tensor=pad_tensor,
                )
                samples.append(sample)
                # Verify shape consistency across samples (matters when aslist=False).
                check_sample_shape(sample.shape, last_shape, self.key, index, aslist)
                last_shape = sample.shape

        # If every sample is a scalar, return plain Python values.
        if aslist and all(map(np.isscalar, samples)):
            samples = list(arr.item() for arr in samples)

        # An integer primary index yields a single sample, not a batch.
        if not index.values[0].subscriptable():
            samples = samples[0]

        if aslist:
            return samples
        return np.array(samples)

    def numpy_from_data_cache(self, index, length, aslist, pad_tensor=False):
        """Read samples via the whole-chunk data cache (fixed-shape fast path).

        A full chunk is decoded into a single ndarray and cached; subsequent
        reads inside `cache_range` slice into it without touching storage.
        """
        samples = []
        enc = self.chunk_id_encoder
        for global_sample_index in index.values[0].indices(length):
            if pad_tensor and global_sample_index >= self.tensor_meta.length:
                # Out-of-range read with padding enabled: use an empty sample.
                sample = self.get_empty_sample()
                try:
                    sample = sample[tuple(entry.value for entry in index.values[1:])]
                except IndexError:
                    pass
            else:
                if (
                    self.cached_data is None
                    or global_sample_index not in self.cache_range
                ):
                    # Cache miss: decode the sample's entire chunk in one shot.
                    row = enc.__getitem__(global_sample_index, True)[0][1]
                    chunks = self.get_chunks_for_sample(global_sample_index)
                    assert len(chunks) == 1

                    chunk_arr = self.chunk_id_encoder.array

                    chunk = chunks[0]
                    first_sample = 0 if row == 0 else chunk_arr[row - 1][1] + 1
                    last_sample = self.chunk_id_encoder.array[row][1]
                    num_samples = last_sample - first_sample + 1

                    full_shape = (num_samples,) + tuple(self.tensor_meta.max_shape)
                    dtype = self.tensor_meta.dtype

                    data_bytes = bytearray(chunk.data_bytes)
                    self.cached_data = np.frombuffer(data_bytes, dtype).reshape(
                        full_shape
                    )
                    self.cache_range = range(first_sample, last_sample + 1)

                sample = self.cached_data[global_sample_index - self.cache_range.start]  # type: ignore

                # need to copy if aslist otherwise user might modify the returned data
                # if not aslist, we already do np.array(samples) while formatting which copies
                sample = sample.copy() if aslist else sample
                sample = sample[tuple(entry.value for entry in index.values[1:])]
            samples.append(sample)
        return samples

    def get_chunks_for_sample(
        self,
        global_sample_index: int,
        copy: bool = False,
    ) -> List[BaseChunk]:
        """Return the chunk(s) holding `global_sample_index`.

        Tiled samples span multiple chunks, hence the list return.

        Args:
            global_sample_index (int): Index relative to the entire tensor.
            copy (bool): Copy chunks that live in a different commit. Defaults to False.

        Returns:
            List[BaseChunk]: All chunks that contain the sample.
        """
        chunk_ids = self.chunk_id_encoder[global_sample_index]
        return [self.get_chunk_from_chunk_id(cid, copy) for cid in chunk_ids]

    def validate_num_samples_is_synchronized(self):
        """Verify tensor meta and chunk id encoder agree on the sample count.

        Helpful for detecting tampering with the tensor meta or the chunk id
        encoder, or a corrupted tensor.

        Raises:
            CorruptedMetaError: If the two counts differ.
        """
        meta_length = self.tensor_meta.length
        # num_samples is derived from the chunk id encoder; keep this check in
        # sync if that implementation ever changes.
        encoder_length = self.num_samples
        if meta_length == encoder_length:
            return
        commit_id = self.commit_id
        tkey = get_tensor_meta_key(self.key, commit_id)
        ikey = get_chunk_id_encoder_key(self.key, commit_id)
        raise CorruptedMetaError(
            f"'{tkey}' and '{ikey}' have a record of different numbers of samples. Got {meta_length} and {encoder_length} respectively."
        )

    def list_all_chunks(self) -> List[str]:
        """Return the names of all chunks for the current commit and tensor."""
        if self.commit_id == FIRST_COMMIT_ID:
            # On the first commit the chunk id encoder is the source of truth.
            ids = self.chunk_id_encoder.array[:, CHUNK_ID_COLUMN]
            return [ChunkIdEncoder.name_from_id(cid) for cid in ids]  # type: ignore
        # Later commits track their chunks in the commit chunk set.
        return list(self.commit_chunk_set.chunks)  # type: ignore

    def list_all_chunks_path(self) -> List[str]:
        """Return the storage keys of every chunk for the current commit."""
        commit_id = self.commit_id
        names = self.list_all_chunks()
        return [get_chunk_key(self.key, name, commit_id) for name in names]

    def list_orphaned_chunks(self, storage):
        """Return paths of chunks in `storage` that are not linked to the
        current version."""
        commit_id = self.commit_id
        prefix: str = f"{self.key}/chunks/"
        if commit_id != FIRST_COMMIT_ID:
            prefix = f"versions/{commit_id}/{prefix}"

        linked = set(self.list_all_chunks())
        orphans = []
        for item in storage:
            if not item.startswith(prefix):
                continue
            name = item.replace(prefix, "")
            if name not in linked:
                orphans.append(f"{prefix}{name}")
        return orphans

    def clear_unusd_chunks(self, storage: StorageProvider):
        """Delete orphaned chunks from `storage` (not implemented).

        NOTE: the misspelled name is part of the public interface and is kept.

        Args:
            storage (StorageProvider): Storage to purge orphaned chunks from.

        Raises:
            NotImplementedError: Always, until StorageProvider can list all chunks.
        """
        # storage.delete_multiple(self.list_orphaned_chunks(storage))
        raise NotImplementedError(
            "requires StorageProvider to be able to list all chunks"
        )

    def pop(self, global_sample_index: int):
        """Remove the sample at `global_sample_index` from the tensor.

        For sequence tensors every flat item of the sequence is popped (in
        reverse order so earlier indices are not shifted) before the sequence
        entry itself is removed.

        Args:
            global_sample_index (int): Index relative to the entire tensor.

        Raises:
            ValueError: If the tensor is empty.
            IndexError: If the index is out of range.
        """
        self._write_initialization()
        if self.tensor_meta.length == 0:
            raise ValueError("There are no samples to pop")
        if global_sample_index < 0 or global_sample_index >= self.tensor_meta.length:
            raise IndexError(
                f"Index {global_sample_index} is out of range for tensor of length {self.tensor_meta.length}"
            )

        self.cached_data = None
        # Defer flushing until all encoders and chunks are updated.
        initial_autoflush = self.cache.autoflush
        self.cache.autoflush = False

        self.commit_diff.pop(global_sample_index)
        if self.is_sequence:
            # pop in reverse order else indices get shifted
            for idx in reversed(range(*self.sequence_encoder[global_sample_index])):
                self.pop_item(idx)
            self.sequence_encoder.pop(global_sample_index)
        else:
            self.pop_item(global_sample_index)

        self.cache.autoflush = initial_autoflush
        self.cache.maybe_flush()

    def pop_item(self, global_sample_index):
        """Remove a single flat sample from its chunk(s) and all encoders.

        Args:
            global_sample_index (int): Flat index relative to the tensor.
        """
        enc = self.chunk_id_encoder
        if not self._is_tiled_sample(global_sample_index):
            # Only needed for non-tiled samples; tiled chunks hold one sample each.
            local_sample_index = enc.translate_index_relative_to_chunks(
                global_sample_index
            )
        chunk_ids, rows, delete = enc.pop(global_sample_index)
        if len(chunk_ids) > 1:  # Tiled sample, delete all chunks
            del self.tile_encoder[global_sample_index]
        elif not delete:  # There are other samples in the last chunk
            chunk_to_update = self.get_chunk_from_chunk_id(chunk_ids[0], copy=True)
            chunk_to_update.pop(local_sample_index)

            # Popping may leave the chunk undersized; rebalance if needed.
            self._check_rechunk(chunk_to_update, chunk_row=rows[0])

            # Flush the previously active chunk before switching to this one.
            if (
                self.active_updated_chunk is not None
                and self.active_updated_chunk.key != chunk_to_update.key  # type: ignore
            ):
                self.write_chunk_to_storage(self.active_updated_chunk)
            self.active_updated_chunk = chunk_to_update
        if delete:
            # Chunk(s) became empty: evict from the active set and the cache.
            for chunk_key in map(self.get_chunk_key_for_id, chunk_ids):
                self.check_remove_active_chunks(chunk_key)
                try:
                    del self.cache[chunk_key]
                except KeyError:
                    pass

        self.tensor_meta.pop(global_sample_index)

    def write_chunk_to_storage(self, chunk):
        """Persist `chunk` through the cache and clear its dirty flag.

        No-op when `chunk` is None or has no unsaved changes.
        """
        if chunk is None:
            return
        if not chunk.is_dirty:
            return
        self.cache[chunk.key] = chunk
        chunk.is_dirty = False

    @property
    def is_sequence(self):
        """Whether this tensor stores sequences (mirrors the tensor meta flag)."""
        meta = self.tensor_meta
        return meta.is_sequence

    @property
    def is_video(self):
        """Whether samples are videos, by compression or by htype."""
        has_video_compression = self.compression in VIDEO_COMPRESSIONS
        return has_video_compression or self.tensor_meta.htype == "video"

    @property
    def sequence_encoder_exists(self) -> bool:
        """Whether a sequence encoder is materialized for the current commit,
        either in memory or in the meta cache."""
        commit_id = self.commit_id
        in_memory = (
            self._sequence_encoder is not None
            and self._sequence_encoder_commit_id == commit_id
        )
        if in_memory:
            return True
        try:
            self.meta_cache[get_sequence_encoder_key(self.key, commit_id)]
        except KeyError:
            return False
        return True

    @property
    def _sequence_length(self):
        return self.sequence_encoder.num_samples

    @property
    def sequence_encoder(self) -> SequenceEncoder:
        """Gets the sequence encoder from cache; creates a blank one if missing.

        Returns None for non-sequence tensors. The encoder is cached on the
        instance and refreshed whenever the commit changes.

        Raises:
            CorruptedMetaError: If the sequence encoding was corrupted.

        Returns:
            A SequenceEncoder instance storing the start and end indices of each sequence in the tensor.
        """

        if not self.is_sequence:
            return  # type: ignore
        commit_id = self.commit_id
        if (
            self._sequence_encoder is None
            or self._sequence_encoder_commit_id != commit_id
        ):
            commit_id = self.commit_id
            key = get_sequence_encoder_key(self.key, commit_id)
            if not self.sequence_encoder_exists:
                enc = SequenceEncoder()
                try:
                    self.meta_cache[key] = enc
                except ReadOnlyModeError:
                    # Read-only datasets keep the blank encoder in memory only.
                    pass
            else:
                enc = self.meta_cache.get_hub_object(key, SequenceEncoder)
            self._sequence_encoder = enc
            self._sequence_encoder_commit_id = commit_id
            self.meta_cache.register_hub_object(key, enc)
        return self._sequence_encoder

    def _sequence_numpy(
        self,
        index: Index,
        aslist: bool = False,
        use_data_cache: bool = True,
        fetch_chunks: bool = False,
        pad_tensor: bool = False,
    ):
        """Reads data for a sequence tensor.

        Translates the (sequence, item, ...) ``index`` into a flat-sample index,
        reads the flat samples via ``self._numpy`` and then regroups the result
        per sequence — as a list of arrays when ``aslist``, otherwise by
        reshaping into ``(num_sequences, item_length, ...)``.
        """
        arr = self._numpy(
            self._get_flat_index_from_sequence_index(index),
            aslist=aslist,
            use_data_cache=use_data_cache,
            fetch_chunks=fetch_chunks,
            pad_tensor=pad_tensor,
        )
        if isinstance(arr, np.ndarray) and arr.size == 0:
            return self.get_empty_sample()
        if index.subscriptable_at(0) and index.subscriptable_at(1):
            if aslist:
                _item_length = self._sequence_item_length
                ret = []
                for i in index.values[0].indices(self._sequence_length):
                    # uniform item length if all sequences agree, otherwise
                    # derived per sequence from the encoder's (start, end) pair
                    item_length = _item_length or index.length_at(
                        1, -int(np.subtract(*self.sequence_encoder[i]))
                    )
                    ret.append(arr[:item_length])
                    arr = arr[item_length:]
                return ret
            else:
                try:
                    # reshape requires uniform item lengths; a mismatch raises
                    return arr.reshape(  # type: ignore
                        index.length_at(0, self._sequence_length), -1, *arr.shape[1:]  # type: ignore
                    )
                except ValueError as ve:
                    raise DynamicTensorNumpyError(self.key, index, "shape") from ve
        return arr

    def _translate_2d_index(
        self, x: Optional[IndexEntry] = None, y: Optional[IndexEntry] = None
    ) -> IndexEntry:
        """Collapses a (sequence, item) index pair into a single IndexEntry over
        flat samples.

        ``x`` indexes sequences, ``y`` indexes items within each sequence;
        either defaults to a trivial (select-all) entry.
        """
        x = x or IndexEntry()
        y = y or IndexEntry()
        _item_length = self._sequence_item_length
        if _item_length is None:
            # variable-length sequences: resolve each flat index via the
            # encoder's per-sequence (start, end) rows

            def idx0_gen():
                for i in x.indices(self._sequence_length):
                    s, e = self.sequence_encoder[i]
                    for j in y.indices(e - s):
                        yield s + j

        else:
            # fixed-length sequences: flat index is simple arithmetic

            def idx0_gen():
                for i in x.indices(self._sequence_length):
                    for j in y.indices(_item_length):
                        yield i * _item_length + j

        # attach a __len__ so the IndexEntry can report how many flat samples
        # the generator will yield without consuming it
        idx0_gen.__len__ = (  # type: ignore
            (
                lambda: sum(
                    [
                        y.length(-np.subtract(*self.sequence_encoder[i]))
                        for i in x.indices(self._sequence_length)
                    ]
                )
            )
            if _item_length is None
            else (lambda: x.length(self._sequence_length) * y.length(_item_length))  # type: ignore
        )
        return IndexEntry(idx0_gen)  # type: ignore

    def _get_flat_index_from_sequence_index(self, index: Index) -> Index:
        """Converts a sequence-aware index of the form (sequence, item, ...)
        into an index over flat samples.

        A 1-entry index first gets a trivial item entry appended; a fully
        trivial (sequence, item) pair maps straight to a trivial flat entry.
        """
        if len(index) == 1:
            index = Index([index.values[0], IndexEntry()])
        if index.values[0].is_trivial() and index.values[1].is_trivial():
            return Index([IndexEntry(), *index.values[2:]])
        if index.subscriptable_at(0) or index.subscriptable_at(1):
            idx0 = self._translate_2d_index(index.values[0], index.values[1])
            return Index([idx0, *index.values[2:]])  # type: ignore
        # both entries are single ints: flat index = sequence start + item offset
        return Index(
            [
                IndexEntry(
                    self.sequence_encoder[index.values[0].value][0]  # type: ignore
                    + index.values[1].value
                ),
                *index.values[2:],
            ]
        )

    def _get_flat_samples_for_sequence_update(self, samples, index: Index):
        """Flattens ``samples`` so they line up with the flat-sample index used
        for a sequence update.

        numpy arrays are reshaped/repeated to drop the (sequence, item) axes;
        str/bytes are treated as scalars; other iterables are chained together.
        """
        ndim = self.ndim(index)
        if isinstance(samples, np.ndarray):
            if index.subscriptable_at(0) and index.subscriptable_at(1):
                diff = ndim - samples.ndim
                if diff < 0:
                    # too many dims: keep only the trailing ndim axes
                    samples, diff = samples.reshape(samples.shape[-ndim:]), 0
                if diff > 1:
                    # broadcast one item across every (sequence, item) position
                    return samples.reshape(1, *samples.shape).repeat(
                        self._translate_2d_index(*index.values[:2]).length(None), 0  # type: ignore
                    )
                elif diff == 1:
                    # broadcast one item per sequence across the indexed sequences
                    return (
                        samples.reshape(1, *samples.shape)
                        .repeat(index.length_at(0, self._sequence_length), 0)
                        .reshape(-1, *samples.shape[1:])
                    )
                else:
                    # collapse the leading (sequence, item) axes into one flat axis
                    return samples.reshape(-1, *samples.shape[2:])
            return samples
        elif isinstance(samples, (str, bytes)):  # treated as scalars
            return samples
        elif isinstance(samples, Iterable):
            # Note: broadcasting is not supported here
            if index.subscriptable_at(0) and index.subscriptable_at(1):
                return list(chain(*samples))
            return samples
        else:
            return samples  # scalars

    def _sequence_update(
        self,
        index: Index,
        samples: Union[np.ndarray, Sequence[InputSample], InputSample],
        operator: Optional[str] = None,
        link_callback: Optional[Callable] = None,
    ):
        """Updates samples of a sequence tensor at ``index``.

        Flattens the index and samples, performs the flat update, regroups any
        verified samples per sequence, records the change in the commit diff,
        and finally notifies ``link_callback`` once per updated sequence.
        """
        flat_idx = self._get_flat_index_from_sequence_index(index)
        flat_samples = self._get_flat_samples_for_sequence_update(samples, index)
        flat_verified_samples: List = self._update(
            flat_idx,
            flat_samples,
            operator,
            update_commit_diff=False,
            link_callback=link_callback,
        )
        i = 0
        verified_samples: Optional[List] = None
        if self.tensor_meta.htype == "class_label":
            samples = self._convert_class_labels(samples)
        if flat_verified_samples:
            # regroup the flat verified samples back into per-sequence lists
            verified_samples = []
            for sample in samples:  # type: ignore
                verified_sample = []
                for _ in sample:  # type: ignore
                    verified_sample.append(flat_verified_samples[i])
                    i += 1
                verified_samples.append(verified_sample)

        # mark every touched sequence index as updated in the commit diff
        list(
            map(
                self.commit_diff.update_data,
                index.values[0].indices(self._sequence_length),
            )
        )
        if link_callback:
            ls = verified_samples or samples

            if isinstance(ls, np.ndarray):
                broadcast = ls.ndim < self.ndim(index)
            elif isinstance(ls, (bytes, str)):  # scalars
                broadcast = True
            elif isinstance(ls, Iterable):
                broadcast = False
            else:
                broadcast = True
            seq_len = self._sequence_length
            if broadcast:
                # a single sample is broadcast to every indexed sequence
                ls = repeat(ls)  # type: ignore
            for i, sample in zip(index.values[0].indices(seq_len), ls):  # type: ignore
                link_callback(
                    i, sub_index=Index(index.values[1:]), new_sample=sample, flat=False
                )

    @property
    def _sequence_item_length(self):
        """Item count per sequence when uniform: 0 for an empty encoder, the
        common length when the encoder has a single row, ``None`` when the
        sequence lengths vary."""
        enc = self.sequence_encoder
        nrows = len(enc._encoded)
        if nrows == 0:
            return 0
        if nrows != 1:
            return None
        start, end = enc[0]
        return end - start

    @property
    def _sequence_item_length_range(self):
        """Returns minimum and maximum length of items in a sequence"""
        enc = self.sequence_encoder
        nrows = len(enc._encoded)
        if nrows == 0:
            return 0, 0
        lengths = [enc[i][1] - enc[i][0] for i in range(nrows)]
        return min(lengths), max(lengths)

    def check_link_ready(self):
        """No-op in this engine; nothing to verify before reading linked data."""
        return None

    def shape(
        self, index: Index, sample_shape_provider: Optional[Callable] = None
    ) -> Tuple[Optional[int], ...]:
        """Computes the shape of this tensor under ``index``.

        ``None`` entries mean the dimension is dynamic (varies per sample).
        ``sample_shape_provider`` can supply per-sample shapes without reading
        chunk data; on IndexError (e.g. during transforms) the shape is read
        from the chunks instead.
        """
        shape = self.shape_interval.astuple()
        idxs = index.values
        skip_dims = 0
        if None in shape or self.tensor_meta.is_link:
            # dynamic shape (or linked tensor): resolve per sample when a
            # single sample is selected on the first axis
            if not idxs[0].subscriptable():
                if self.tensor_meta.htype in ("text", "json"):
                    shape = (1,)
                else:
                    if sample_shape_provider:
                        try:
                            shape = sample_shape_provider(idxs[0].value)  # type: ignore
                            if self.is_sequence:
                                if len(idxs) > 1 and not idxs[1].subscriptable():
                                    # one item of the sequence is selected
                                    shape = tuple(shape[idxs[1].value].tolist())  # type: ignore
                                    skip_dims += 1
                                else:
                                    # per dimension: keep the size if constant
                                    # across items, else mark dynamic (None)
                                    shape = (len(shape),) + (
                                        tuple(
                                            int(shape[0, i])  # type: ignore
                                            if np.all(shape[:, i] == shape[0, i])  # type: ignore
                                            else None
                                            for i in range(shape.shape[1])  # type: ignore
                                        )
                                        or (1,)
                                    )

                        except IndexError:  # Happens during transforms, sample shape tensor is not populated yet
                            shape = self.read_shape_for_sample(idxs[0].value)  # type: ignore
                    else:
                        self.check_link_ready()
                        shape = self.read_shape_for_sample(idxs[0].value)  # type: ignore
                skip_dims += 1
        elif not idxs[0].subscriptable():
            shape = shape[1:]
            skip_dims += 1
        shape = list(shape)  # type: ignore
        squeeze_dims = set()
        # apply remaining index entries: slices resize, integer indices squeeze
        for i, idx in enumerate(idxs[skip_dims:]):
            if idx.subscriptable():
                shape[i] = idx.length(shape[i])  # type: ignore
            else:
                squeeze_dims.add(i)
        return tuple(shape[i] for i in range(len(shape)) if i not in squeeze_dims)

    def ndim(self, index: Optional[Index] = None) -> int:
        """Number of dimensions of the tensor, minus one for each integer
        (squeezing) entry of ``index``."""
        base = len(self.tensor_meta.min_shape) + 1
        if self.is_sequence:
            base += 1
        if not index:
            return base
        squeezed = sum(1 for idx in index.values if not idx.subscriptable())
        return base - squeezed

    @property
    def shape_interval(self) -> ShapeInterval:
        """Describes this tensor's shape as lower/upper bounds; length is included.

        Note:
            If you are expecting a `tuple`, use `tensor.shape` instead.

        Example:
            >>> tensor.append(np.zeros((10, 10)))
            >>> tensor.append(np.zeros((10, 15)))
            >>> tensor.shape_interval
            ShapeInterval(lower=(2, 10, 10), upper=(2, 10, 15))
            >>> str(tensor.shape_interval)
            (2, 10, 10:15)

        Returns:
            ShapeInterval: Object containing `lower` and `upper` properties.
        """
        meta = self.tensor_meta
        if self.is_sequence:
            length = self._sequence_length
            lo_item, hi_item = self._sequence_item_length_range
            lower = [length, lo_item]
            upper = [length, hi_item]
        else:
            lower = upper = [meta.length]
        return ShapeInterval(lower + list(meta.min_shape), upper + list(meta.max_shape))

    def _transform_callback(self, sample, flat: Optional[bool]):
        """Used in transforms to handle linked tensors.

        Appends the transformed ``sample`` to every linked tensor whose
        ``flatten_sequence`` setting matches ``flat`` (all links when ``flat``
        is None). Requires ``self._all_chunk_engines`` to be populated.
        """
        assert self._all_chunk_engines is not None
        for k, v in self.tensor_meta.links.items():
            if flat is None or v["flatten_sequence"] == flat:
                self._all_chunk_engines[k].extend(
                    [get_link_transform(v["append"])(sample)]
                )

    def get_empty_sample(self):
        """Returns a zero-sized sample matching this tensor's htype and dtype.

        Raises:
            ValueError: If the tensor has no samples.
        """
        if self.num_samples == 0:
            raise ValueError("This tensor has no samples, cannot get empty sample.")
        meta = self.tensor_meta
        if meta.htype in ("text", "json", "list"):
            return get_empty_text_like_sample(meta.htype)
        # one extra axis for the sequence dimension when applicable
        rank = len(meta.max_shape) + (1 if self.is_sequence else 0)
        return np.ones((0,) * rank, dtype=meta.dtype)

    @property
    def is_text_like(self):
        """True for text/json/list htypes, and for linked tensors."""
        meta = self.tensor_meta
        if meta.is_link:
            return True
        return meta.htype in {"text", "json", "list"}

    def check_remove_active_chunks(self, chunk_key):
        """Drops the active appended/updated chunk references when their key
        matches ``chunk_key``."""
        appended = self.active_appended_chunk
        if appended is not None and appended.key == chunk_key:
            self.active_appended_chunk = None
        updated = self.active_updated_chunk
        if updated is not None and updated.key == chunk_key:
            self.active_updated_chunk = None

Subclasses

Instance variables

var active_appended_chunk
Expand source code
@property
def active_appended_chunk(self):
    return self._active_appended_chunk
var active_updated_chunk
Expand source code
@property
def active_updated_chunk(self):
    return self._active_updated_chunk
var chunk_args
Expand source code
@property
def chunk_args(self):
    if self._chunk_args is None:
        self._chunk_args = [
            self.min_chunk_size,
            self.max_chunk_size,
            self.tiling_threshold,
            self.tensor_meta,
            self.compression,
        ]
    return self._chunk_args
var chunk_id_encoder

Gets the chunk id encoder from cache, if one is not found it creates a blank encoder. For more information on what ChunkIdEncoder is used for, see the __init__ docstring.

Raises

CorruptedMetaError
If chunk id encoding was corrupted.

Returns

ChunkIdEncoder
The chunk ID encoder handles the mapping between sample indices and their corresponding chunks.
Expand source code
@property
def chunk_id_encoder(self) -> ChunkIdEncoder:
    """Gets the chunk id encoder from cache, if one is not found it creates a blank encoder.
    For more information on what `ChunkIdEncoder` is used for, see the `__init__` docstring.

    Raises:
        CorruptedMetaError: If chunk id encoding was corrupted.

    Returns:
        ChunkIdEncoder: The chunk ID encoder handles the mapping between sample indices
            and their corresponding chunks.
    """
    commit_id = self.commit_id
    if (
        self._chunk_id_encoder is None
        or self._chunk_id_encoder_commit_id != commit_id
    ):
        commit_id = self.commit_id
        key = get_chunk_id_encoder_key(self.key, commit_id)
        if not self.chunk_id_encoder_exists:
            enc = ChunkIdEncoder()
            try:
                self.meta_cache[key] = enc
            except ReadOnlyModeError:
                pass
        else:
            enc = self.meta_cache.get_hub_object(key, ChunkIdEncoder)
        self._chunk_id_encoder = enc
        self._chunk_id_encoder_commit_id = commit_id
        self.meta_cache.register_hub_object(key, enc)
    return self._chunk_id_encoder
var chunk_id_encoder_exists
Expand source code
@property
def chunk_id_encoder_exists(self) -> bool:
    commit_id = self.commit_id
    if (
        self._chunk_id_encoder is not None
        and self._chunk_id_encoder_commit_id == commit_id
    ):
        return True
    try:
        key = get_chunk_id_encoder_key(self.key, commit_id)
        self.meta_cache[key]
        return True
    except KeyError:
        return False
var commit_chunk_set

Gets the commit chunk set from cache, if one is not found it creates a blank one.

Returns

Optional[CommitChunkSet]
The commit chunk set keeps track of all the chunks present in the current commit, returns None for the first commit.
Expand source code
@property
def commit_chunk_set(self) -> Optional[CommitChunkSet]:
    """Gets the commit chunk set from cache, if one is not found it creates a blank one.

    Returns:
        Optional[CommitChunkSet]: The commit chunk set keeps track of all the chunks present in the current commit, returns None for the first commit.
    """
    commit_id = self.commit_id
    if commit_id == FIRST_COMMIT_ID:
        # the first commit doesn't need a commit chunk set
        return None
    if (
        self._commit_chunk_set is None
        or self._commit_chunk_set_commit_id != commit_id
    ):
        key = get_tensor_commit_chunk_set_key(self.key, commit_id)
        if not self.commit_chunk_set_exists:
            cset = CommitChunkSet()
            try:
                self.meta_cache[key] = cset
            except ReadOnlyModeError:
                pass
        else:
            cset = self.meta_cache.get_hub_object(key, CommitChunkSet)
        self._commit_chunk_set = cset
        self._commit_chunk_set_commit_id = commit_id
        self.meta_cache.register_hub_object(key, cset)
    return self._commit_chunk_set
var commit_chunk_set_exists

Checks if the commit chunk set exists for the given tensor in the current commit.

Expand source code
@property
def commit_chunk_set_exists(self) -> bool:
    """Checks if the commit chunk set exists for the given tensor in the current commit."""
    commit_id = self.commit_id
    if (
        self._commit_chunk_set is not None
        and self._commit_chunk_set_commit_id == commit_id
    ):
        return True

    try:
        key = get_tensor_commit_chunk_set_key(self.key, commit_id)
        self.meta_cache[key]
        return True
    except KeyError:
        return False
var commit_diff

Gets the commit diff from cache, if one is not found it creates a blank one.

Returns

CommitDiff
The commit diff keeps track of all the changes in the current commit.
Expand source code
@property
def commit_diff(self) -> CommitDiff:
    """Gets the commit diff from cache, if one is not found it creates a blank one.

    Returns:
        CommitDiff: The commit diff keeps track of all the changes in the current commit.
    """
    commit_id = self.commit_id
    if self._commit_diff is None or self._commit_diff_commit_id != commit_id:
        key = get_tensor_commit_diff_key(self.key, commit_id)
        if not self.commit_diff_exists:
            diff = CommitDiff(self.num_samples)
            try:
                self.meta_cache[key] = diff
            except ReadOnlyModeError:
                pass
        else:
            diff = self.meta_cache.get_hub_object(key, CommitDiff)
        self._commit_diff = diff
        self._commit_diff_commit_id = commit_id
        self.meta_cache.register_hub_object(key, diff)
    return self._commit_diff
var commit_diff_exists
Expand source code
@property
def commit_diff_exists(self) -> bool:
    commit_id = self.commit_id
    if self._commit_diff is not None and self._commit_diff_commit_id == commit_id:
        return True
    try:
        key = get_tensor_commit_diff_key(self.key, commit_id)
        self.meta_cache[key]
        return True
    except KeyError:
        return False
var commit_id
Expand source code
@property
def commit_id(self):
    return self.version_state["commit_id"]
var creds_encoder
Expand source code
@property
def creds_encoder(self):
    return None
var is_data_cachable
Expand source code
@property
def is_data_cachable(self):
    tensor_meta = self.tensor_meta
    return (
        self.chunk_class == UncompressedChunk
        and tensor_meta.htype not in ["text", "json", "list"]
        and tensor_meta.max_shape
        and (tensor_meta.max_shape == tensor_meta.min_shape)
        and (np.prod(tensor_meta.max_shape) < 20)
    )
var is_fixed_shape
Expand source code
@property
def is_fixed_shape(self):
    tensor_meta = self.tensor_meta
    return tensor_meta.min_shape == tensor_meta.max_shape
var is_sequence
Expand source code
@property
def is_sequence(self):
    return self.tensor_meta.is_sequence
var is_text_like
Expand source code
@property
def is_text_like(self):
    return (
        self.tensor_meta.htype in {"text", "json", "list"}
        or self.tensor_meta.is_link
    )
var is_video
Expand source code
@property
def is_video(self):
    return (
        self.compression in VIDEO_COMPRESSIONS or self.tensor_meta.htype == "video"
    )
var last_appended_chunk_id
Expand source code
@property
def last_appended_chunk_id(self) -> str:
    return self.chunk_id_encoder.get_id_for_chunk(-1)
var last_appended_chunk_name
Expand source code
@property
def last_appended_chunk_name(self) -> str:
    return self.chunk_id_encoder.get_name_for_chunk(-1)
var last_chunk_key
Expand source code
@property
def last_chunk_key(self) -> str:
    last_chunk_name = self.last_appended_chunk_name
    commit_id = self.get_chunk_commit(last_chunk_name)
    return get_chunk_key(self.key, last_chunk_name, commit_id)
var max_chunk_size
Expand source code
@property
def max_chunk_size(self):
    # no chunks may exceed this
    return (
        getattr(self.tensor_meta, "max_chunk_size", None) or DEFAULT_MAX_CHUNK_SIZE
    )
var meta_cache
Expand source code
@property
def meta_cache(self) -> LRUCache:
    return self._meta_cache or self.cache
var min_chunk_size
Expand source code
@property
def min_chunk_size(self):
    # only the last chunk may be less than this
    return self.max_chunk_size // 2
var num_chunks
Expand source code
@property
def num_chunks(self) -> int:
    if not self.chunk_id_encoder_exists:
        return 0
    return self.chunk_id_encoder.num_chunks
var num_samples

Returns the length of the primary axis of the tensor. Ignores any applied indexing and returns the total length.

Expand source code
@property
def num_samples(self) -> int:
    """Returns the length of the primary axis of the tensor.
    Ignores any applied indexing and returns the total length.
    """
    return self.tensor_meta.length
var num_samples_per_chunk
Expand source code
@property
def num_samples_per_chunk(self):
    # should only be called if self.is_fixed_shape
    if self._num_samples_per_chunk is None:
        self._num_samples_per_chunk = (
            self.chunk_id_encoder.array[0, LAST_SEEN_INDEX_COLUMN] + 1
        )
    return self._num_samples_per_chunk
var sequence_encoder

Gets the sequence encoder from cache; if one is not found it creates a blank encoder.

Raises

CorruptedMetaError
If sequence encoding was corrupted.

Returns

A SequenceEncoder instance storing the start and end indices of each sequence in the tensor.

Expand source code
@property
def sequence_encoder(self) -> SequenceEncoder:
    """Gets the shape encoder from cache, if one is not found it creates a blank encoder.

    Raises:
        CorruptedMetaError: If shape encoding was corrupted.

    Returns:
        A SequenceEncoder instance storing the start and end indices of each sequence in the tensor.
    """

    if not self.is_sequence:
        return  # type: ignore
    commit_id = self.commit_id
    if (
        self._sequence_encoder is None
        or self._sequence_encoder_commit_id != commit_id
    ):
        commit_id = self.commit_id
        key = get_sequence_encoder_key(self.key, commit_id)
        if not self.sequence_encoder_exists:
            enc = SequenceEncoder()
            try:
                self.meta_cache[key] = enc
            except ReadOnlyModeError:
                pass
        else:
            enc = self.meta_cache.get_hub_object(key, SequenceEncoder)
        self._sequence_encoder = enc
        self._sequence_encoder_commit_id = commit_id
        self.meta_cache.register_hub_object(key, enc)
    return self._sequence_encoder
var sequence_encoder_exists
Expand source code
@property
def sequence_encoder_exists(self) -> bool:
    commit_id = self.commit_id
    if (
        self._sequence_encoder is not None
        and self._sequence_encoder_commit_id == commit_id
    ):
        return True
    try:
        key = get_sequence_encoder_key(self.key, commit_id)
        self.meta_cache[key]
        return True
    except KeyError:
        return False
var shape_interval

Returns a ShapeInterval object that describes this tensor's shape more accurately. Length is included.

Note

If you are expecting a tuple, use tensor.shape instead.

Example

>>> tensor.append(np.zeros((10, 10)))
>>> tensor.append(np.zeros((10, 15)))
>>> tensor.shape_interval
ShapeInterval(lower=(2, 10, 10), upper=(2, 10, 15))
>>> str(tensor.shape_interval)
(2, 10, 10:15)

Returns

ShapeInterval
Object containing lower and upper properties.
Expand source code
@property
def shape_interval(self) -> ShapeInterval:
    """Returns a `ShapeInterval` object that describes this tensor's shape more accurately. Length is included.

    Note:
        If you are expecting a `tuple`, use `tensor.shape` instead.

    Example:
        >>> tensor.append(np.zeros((10, 10)))
        >>> tensor.append(np.zeros((10, 15)))
        >>> tensor.shape_interval
        ShapeInterval(lower=(2, 10, 10), upper=(2, 10, 15))
        >>> str(tensor.shape_interval)
        (2, 10, 10:15)

    Returns:
        ShapeInterval: Object containing `lower` and `upper` properties.
    """
    meta = self.tensor_meta
    if self.is_sequence:
        seq_length = self._sequence_length
        min_item_length, max_item_length = self._sequence_item_length_range
        min_length = [seq_length, min_item_length]
        max_length = [seq_length, max_item_length]
    else:
        min_length = max_length = [meta.length]
    min_shape = min_length + list(meta.min_shape)
    max_shape = max_length + list(meta.max_shape)

    return ShapeInterval(min_shape, max_shape)
var tensor_meta
Expand source code
@property
def tensor_meta(self):
    commit_id = self.commit_id
    if self._tensor_meta is None or self._tensor_meta_commit_id != commit_id:
        key = get_tensor_meta_key(self.key, commit_id)
        self._tensor_meta = self.meta_cache.get_hub_object(key, TensorMeta)
        self._tensor_meta_commit_id = commit_id
        self.meta_cache.register_hub_object(key, self._tensor_meta)
    return self._tensor_meta
var tile_encoder

Gets the tile encoder from cache, if one is not found it creates a blank encoder.

Expand source code
@property
def tile_encoder(self) -> TileEncoder:
    """Gets the tile encoder from cache, if one is not found it creates a blank encoder."""
    commit_id = self.commit_id
    if self._tile_encoder is None or self._tile_encoder_commit_id != commit_id:
        key = get_tensor_tile_encoder_key(self.key, commit_id)
        if not self.tile_encoder_exists:
            enc = TileEncoder()
            try:
                self.meta_cache[key] = enc
            except ReadOnlyModeError:
                pass
        else:
            enc = self.meta_cache.get_hub_object(key, TileEncoder)
        self._tile_encoder = enc
        self._tile_encoder_commit_id = commit_id
        self.meta_cache.register_hub_object(key, enc)
    return self._tile_encoder
var tile_encoder_exists
Expand source code
@property
def tile_encoder_exists(self) -> bool:
    commit_id = self.commit_id
    if self._tile_encoder is not None and self._tile_encoder_commit_id == commit_id:
        return True

    try:
        key = get_tensor_tile_encoder_key(self.key, commit_id)
        self.meta_cache[key]
        return True
    except KeyError:
        return False
var tiling_threshold
Expand source code
@property
def tiling_threshold(self):
    return (
        getattr(self.tensor_meta, "tiling_threshold", None)
        or DEFAULT_TILING_THRESHOLD
        or self.min_chunk_size
    )

Methods

def check_each_sample(self, samples)
Expand source code
def check_each_sample(self, samples):
    return
Expand source code
def check_link_ready(self):
    return
def check_remove_active_chunks(self, chunk_key)
Expand source code
def check_remove_active_chunks(self, chunk_key):
    if (
        self.active_appended_chunk is not None
        and self.active_appended_chunk.key == chunk_key
    ):
        self.active_appended_chunk = None
    if (
        self.active_updated_chunk is not None
        and self.active_updated_chunk.key == chunk_key
    ):
        self.active_updated_chunk = None
def clear(self)

Clears all samples and cachables.

Expand source code
def clear(self):
    """Clears all samples and cachables."""
    self.cache.check_readonly()

    commit_id = self.commit_id

    chunk_folder_path = get_chunk_key(self.key, "", commit_id)
    self.cache.clear(prefix=chunk_folder_path)

    enc_key = get_chunk_id_encoder_key(self.key, commit_id)
    self._chunk_id_encoder = None
    try:
        del self.meta_cache[enc_key]
    except KeyError:
        pass

    info_key = get_tensor_info_key(self.key, commit_id)
    try:
        self._info = None
        del self.cache[info_key]
    except KeyError:
        pass

    self.commit_diff.clear_data()

    tile_encoder_key = get_tensor_tile_encoder_key(self.key, commit_id)
    try:
        self._tile_encoder = None
        del self.cache[tile_encoder_key]
    except KeyError:
        pass

    seq_encoder_key = get_sequence_encoder_key(self.key, commit_id)
    try:
        self._sequence_encoder = None
        del self.cache[seq_encoder_key]
    except KeyError:
        pass

    self.tensor_meta.length = 0
    self.tensor_meta.min_shape = []
    self.tensor_meta.max_shape = []
    self.tensor_meta.is_dirty = True

    self.cache.maybe_flush()
    self.meta_cache.maybe_flush()
def clear_unusd_chunks(self, storage)
Expand source code
def clear_unusd_chunks(self, storage: StorageProvider):
    # storage.delete_multiple(self.list_orphaned_chunks(storage))
    raise NotImplementedError(
        "requires StorageProvider to be able to list all chunks"
    )
def copy_chunk_to_new_commit(self, chunk, chunk_name)

Copies the chunk to the current commit.

Returns the copied chunk.

Expand source code
def copy_chunk_to_new_commit(self, chunk, chunk_name):
    """Copies the chunk to the current commit.

    Returns the copied chunk.
    """
    new_chunk_key = get_chunk_key(self.key, chunk_name, self.commit_id)
    chunk_id = chunk.id
    chunk = chunk.copy(self.chunk_args)
    chunk.key = new_chunk_key
    chunk.id = chunk_id
    if self.commit_chunk_set is not None:
        self.commit_chunk_set.add(chunk_name)
    return chunk
def extend(self, samples, progressbar=False, link_callback=None)
Expand source code
def extend(
    self,
    samples,
    progressbar: bool = False,
    link_callback: Optional[Callable] = None,
):
    self.check_link_ready()
    self._write_initialization()
    initial_autoflush = self.cache.autoflush
    self.cache.autoflush = False

    if self.is_sequence:
        samples = tqdm(samples) if progressbar else samples
        for sample in samples:
            verified_sample = self._extend(
                sample, progressbar=False, update_commit_diff=False
            )
            self.sequence_encoder.register_samples(len(sample), 1)
            self.commit_diff.add_data(1)
            ls = verified_sample or sample
            if link_callback:
                link_callback(ls, flat=False)
                for s in ls:
                    s = None if is_empty_list(s) else s
                    link_callback(s, flat=True)

    else:
        verified_samples = self._extend(samples, progressbar)
        ls = verified_samples or samples
        if link_callback:
            for sample in ls:
                sample = None if is_empty_list(sample) else sample
                link_callback(sample, flat=None)

    self.cache.autoflush = initial_autoflush
    self.cache.maybe_flush()
def get_basic_sample(self, global_sample_index, index, fetch_chunks=False)
Expand source code
def get_basic_sample(self, global_sample_index, index, fetch_chunks=False):
    enc = self.chunk_id_encoder
    chunk_id, row, worst_case_header_size = self.get_chunk_info(
        global_sample_index, fetch_chunks
    )
    local_sample_index = enc.translate_index_relative_to_chunks(global_sample_index)
    chunk = self.get_chunk_from_chunk_id(
        chunk_id, partial_chunk_bytes=worst_case_header_size
    )
    return chunk.read_sample(
        local_sample_index, cast=self.tensor_meta.htype != "dicom"
    )[tuple(entry.value for entry in index.values[1:])]
def get_chunk(self, chunk_key, partial_chunk_bytes=0)
Expand source code
def get_chunk(self, chunk_key: str, partial_chunk_bytes=0) -> BaseChunk:
    """Fetch the chunk stored under `chunk_key` through the cache.

    A positive `partial_chunk_bytes` requests only that many leading bytes
    of the chunk instead of the full object.
    """
    return self.cache.get_hub_object(
        chunk_key, self.chunk_class, self.chunk_args, partial_bytes=partial_chunk_bytes
    )
def get_chunk_commit(self, chunk_name)

Returns the commit id that contains the chunk_name.

Expand source code
def get_chunk_commit(self, chunk_name) -> str:
    """Returns the commit id that contains the chunk_name.

    Walks the commit tree upwards from the current commit, checking each
    commit's chunk set for `chunk_name`. Falls back to the first commit if
    the chunk is not found anywhere.
    """
    cur_node: Optional[CommitNode] = self.version_state["commit_node"]
    while cur_node is not None:
        commit_id = cur_node.commit_id
        chunk_set_key = get_tensor_commit_chunk_set_key(self.key, commit_id)
        try:
            # the first commit doesn't contain a chunk set, don't repeatedly try to fetch from storage
            if commit_id == FIRST_COMMIT_ID:
                chunk_set = set()
            else:
                chunk_set = self.meta_cache.get_hub_object(
                    chunk_set_key, CommitChunkSet
                ).chunks
        except Exception:
            # Chunk set missing/unreadable: materialize an empty one so
            # subsequent lookups for this commit are cheap.
            commit_chunk_set = CommitChunkSet()
            try:
                self.meta_cache[chunk_set_key] = commit_chunk_set
            except ReadOnlyModeError:
                # put CommitChunkSet in hub_objects to keep in cache temporarily, but won't write to storage
                # this shouldn't happen in latest version of hub, chunk set would always be present
                self.meta_cache.hub_objects[chunk_set_key] = commit_chunk_set
            chunk_set = set()
        if chunk_name in chunk_set:
            return commit_id
        cur_node = cur_node.parent  # type: ignore
    # the first commit doesn't have a commit chunk set, so any chunk that wasn't found belongs to the first commit
    return FIRST_COMMIT_ID
def get_chunk_from_chunk_id(self, chunk_id, copy=False, partial_chunk_bytes=0)
Expand source code
def get_chunk_from_chunk_id(
    self, chunk_id, copy: bool = False, partial_chunk_bytes=0
) -> BaseChunk:
    """Resolve a numeric chunk id to a chunk object.

    Args:
        chunk_id: Chunk id as stored in the chunk id encoder.
        copy (bool): If True and the chunk lives in an earlier commit, copy
            it into the current commit so it can be safely mutated.
        partial_chunk_bytes (int): If > 0, fetch only this many leading bytes.
    """
    chunk_name = ChunkIdEncoder.name_from_id(chunk_id)
    chunk_commit_id = self.get_chunk_commit(chunk_name)
    chunk_key = get_chunk_key(self.key, chunk_name, chunk_commit_id)
    chunk = self.get_chunk(chunk_key, partial_chunk_bytes=partial_chunk_bytes)
    # Tag the chunk with its storage key and id for later writes.
    chunk.key = chunk_key  # type: ignore
    chunk.id = chunk_id  # type: ignore
    if copy and chunk_commit_id != self.commit_id:
        chunk = self.copy_chunk_to_new_commit(chunk, chunk_name)
    return chunk
def get_chunk_info(self, global_sample_index, fetch_chunks)

Returns the chunk_id, row and worst case header size of chunk containing the given sample.

Expand source code
def get_chunk_info(self, global_sample_index, fetch_chunks):
    """Returns the chunk_id, row and worst case header size of chunk containing the given sample.

    Args:
        global_sample_index: Index of the sample relative to the whole tensor.
        fetch_chunks (bool): If True, the caller will fetch whole chunks
            anyway, so no header-size estimate is computed.

    Returns:
        Tuple of ``(chunk_id, row, worst_case_header_size)``. The header size
        is non-zero only when a partial (byte-range) read is worthwhile:
        remote storage (S3/GCS) and a non-chunk-compressed chunk format.
    """
    enc = self.chunk_id_encoder
    out = enc.__getitem__(global_sample_index, return_row_index=True)
    chunk_id, row = out[0][0], out[0][1]

    worst_case_header_size = 0
    num_samples_in_chunk = -1
    if (
        not fetch_chunks
        and isinstance(self.base_storage, (S3Provider, GCSProvider))
        # BUG FIX: `self.chunk_class` is a class object, so the original
        # `isinstance(self.chunk_class, ChunkCompressedChunk)` was always
        # False and partial reads were estimated even for chunk-compressed
        # tensors (which must always be fetched whole, per `numpy()` docs).
        # `issubclass` implements the intended check.
        and not issubclass(self.chunk_class, ChunkCompressedChunk)
    ):
        # Number of samples in this chunk = delta between consecutive
        # last-seen-index entries in the encoder.
        prev = enc.array[row - 1][LAST_SEEN_INDEX_COLUMN] if row > 0 else -1
        num_samples_in_chunk = enc.array[row][LAST_SEEN_INDEX_COLUMN] - prev
        worst_case_header_size += HEADER_SIZE_BYTES + 10  # 10 for version
        ENTRY_SIZE = 4
        if self.tensor_meta.max_shape == self.tensor_meta.min_shape:
            # Fixed-shape tensor: the shape encoder holds a single entry.
            num_shape_entries = 1 * (len(self.tensor_meta.min_shape) + 1)
            if self.is_text_like:
                num_bytes_entries = num_samples_in_chunk * 3
            elif self.tensor_meta.sample_compression is None:
                num_bytes_entries = 1 * 3
            else:
                num_bytes_entries = num_samples_in_chunk * 3
        else:
            # Dynamic shapes: one shape entry per sample.
            num_shape_entries = num_samples_in_chunk * (
                1 + len(self.tensor_meta.max_shape)
            )
            num_bytes_entries = num_samples_in_chunk * 3
        bytes_enc_size = num_bytes_entries * ENTRY_SIZE
        shape_enc_size = num_shape_entries * ENTRY_SIZE
        worst_case_header_size += shape_enc_size
        worst_case_header_size += bytes_enc_size

    return chunk_id, row, worst_case_header_size
def get_chunk_key_for_id(self, chunk_id)
Expand source code
def get_chunk_key_for_id(self, chunk_id) -> str:
    """Map a chunk id to its full storage key within its owning commit."""
    name = ChunkIdEncoder.name_from_id(chunk_id)
    return get_chunk_key(self.key, name, self.get_chunk_commit(name))
def get_chunks_for_sample(self, global_sample_index, copy=False)

Retrieves the Chunk object corresponding to global_sample_index.

Args

global_sample_index : int
Index relative to the entire tensor representing the sample.
copy : bool
If True and the chunk exists in a different commit to the current commit, it will be copied. Defaults to False.

Returns

List[BaseChunk]
BaseChunk objects that contains global_sample_index.
Expand source code
def get_chunks_for_sample(
    self,
    global_sample_index: int,
    copy: bool = False,
) -> List[BaseChunk]:
    """Retrieves the `Chunk` objects containing `global_sample_index`.

    Args:
        global_sample_index (int): Index relative to the entire tensor representing the sample.
        copy (bool): If True and a chunk exists in a different commit to the current commit, it will be copied. Defaults to False.

    Returns:
        List[BaseChunk]: One chunk for a regular sample, several for a tiled one.
    """
    chunk_ids = self.chunk_id_encoder[global_sample_index]
    return [self.get_chunk_from_chunk_id(cid, copy) for cid in chunk_ids]
def get_empty_sample(self)
Expand source code
def get_empty_sample(self):
    """Return an empty sample matching this tensor's htype/dtype/rank.

    Raises:
        ValueError: If the tensor has no samples yet.
    """
    if not self.num_samples:
        raise ValueError("This tensor has no samples, cannot get empty sample.")
    meta = self.tensor_meta
    if meta.htype in ("text", "json", "list"):
        return get_empty_text_like_sample(meta.htype)
    # Zero-length array with the same rank as stored samples
    # (+1 leading dimension for sequence tensors).
    rank = len(meta.max_shape) + (1 if self.is_sequence else 0)
    return np.ones((0,) * rank, dtype=meta.dtype)
def get_full_tiled_sample(self, global_sample_index)
Expand source code
def get_full_tiled_sample(self, global_sample_index):
    """Reassemble a complete tiled sample from all of its tile chunks."""
    tiles = self.get_chunks_for_sample(global_sample_index)
    return combine_chunks(tiles, global_sample_index, self.tile_encoder)
def get_non_tiled_sample(self, global_sample_index, index, fetch_chunks=False)
Expand source code
def get_non_tiled_sample(self, global_sample_index, index, fetch_chunks=False):
    """Dispatch a single-chunk sample read to the video or basic reader."""
    if not self.is_video:
        return self.get_basic_sample(
            global_sample_index, index, fetch_chunks=fetch_chunks
        )
    return self.get_video_sample(global_sample_index, index)
def get_partial_tiled_sample(self, global_sample_index, index)
Expand source code
def get_partial_tiled_sample(self, global_sample_index, index):
    """Assemble only the tiles of a tiled sample that `index` touches.

    Required tiles are fetched individually, stitched with `coalesce_tiles`,
    and the translated within-sample index is applied to the result.
    """
    tile_enc = self.tile_encoder
    chunk_ids = self.chunk_id_encoder[global_sample_index]
    sample_shape = tile_enc.get_sample_shape(global_sample_index)
    tile_shape = tile_enc.get_tile_shape(global_sample_index)
    # Chunk ids are stored in tile order; reshape them to the tile grid.
    ordered_tile_ids = np.array(chunk_ids).reshape(
        tile_enc.get_tile_layout_shape(global_sample_index)
    )
    # Convert the requested slices into (tile-grid slices, within-sample slices).
    tiles_index, sample_index = translate_slices(
        [v.value for v in index.values[1:]], sample_shape, tile_shape  # type: ignore
    )
    required_tile_ids = ordered_tile_ids[tiles_index]
    # Read tile 0 of each required chunk (each tile chunk holds one tile).
    tiles = np.vectorize(
        lambda chunk_id: self.get_chunk_from_chunk_id(chunk_id).read_sample(
            0, is_tile=True
        ),
        otypes=[object],
    )(required_tile_ids)
    sample = coalesce_tiles(tiles, tile_shape, None, self.tensor_meta.dtype)
    sample = sample[sample_index]
    return sample
def get_single_sample(self, global_sample_index, index, fetch_chunks=False, pad_tensor=False)
Expand source code
def get_single_sample(
    self, global_sample_index, index, fetch_chunks=False, pad_tensor=False
):
    """Read one sample, choosing between padded, tiled and regular paths."""
    # Out-of-range reads return an (indexed) empty sample when padding is on.
    if pad_tensor and global_sample_index >= self.tensor_meta.length:
        empty = self.get_empty_sample()
        try:
            return empty[tuple(entry.value for entry in index.values[1:])]
        except IndexError:
            return empty

    if not self._is_tiled_sample(global_sample_index):
        return self.get_non_tiled_sample(
            global_sample_index, index, fetch_chunks=fetch_chunks
        )
    # Tiled: a bare sample index means the full sample; otherwise only the
    # tiles covered by the sub-index are assembled.
    if len(index.values) == 1:
        return self.get_full_tiled_sample(global_sample_index)
    return self.get_partial_tiled_sample(global_sample_index, index)
def get_video_chunk(self, chunk_id, copy=False)

Returns video chunks. Chunk will contain presigned url to the video instead of data if the chunk is large.

Expand source code
def get_video_chunk(self, chunk_id, copy: bool = False):
    """Returns video chunks. Chunk will contain presigned url to the video instead of data if the chunk is large.

    Returns:
        Tuple ``(chunk, stream)`` — `stream` is True when the chunk is on
        remote storage (S3/GCS) and exceeds `min_chunk_size`, in which case
        the chunk holds a URL rather than downloaded bytes.
    """
    chunk_name = ChunkIdEncoder.name_from_id(chunk_id)
    chunk_commit_id = self.get_chunk_commit(chunk_name)
    chunk_key = get_chunk_key(self.key, chunk_name, chunk_commit_id)

    base_storage = self.base_storage
    stream = False
    if isinstance(base_storage, (S3Provider, GCSProvider)):
        chunk_size = base_storage.get_object_size(chunk_key)
        # Large remote chunks are streamed via a presigned URL instead of
        # being downloaded in full.
        stream = chunk_size > self.min_chunk_size
        if stream:
            chunk = self.cache.get_hub_object(
                chunk_key, self.chunk_class, meta=self.chunk_args, url=True
            )
    if not stream:
        chunk = self.cache.get_hub_object(
            chunk_key, self.chunk_class, meta=self.chunk_args
        )
    # Tag the chunk with its storage key and id for later writes.
    chunk.key = chunk_key  # type: ignore
    chunk.id = chunk_id  # type: ignore
    if copy and chunk_commit_id != self.commit_id:
        chunk = self.copy_chunk_to_new_commit(chunk, chunk_name)
    return chunk, stream
def get_video_sample(self, global_sample_index, index, decompress=True)
Expand source code
def get_video_sample(self, global_sample_index, index, decompress=True):
    """Read a video sample, optionally streaming and frame-sub-indexing.

    The second index entry (if any) selects frames via the chunk reader's
    `sub_index`; remaining entries are applied to the decoded array only
    when `decompress` is True.
    """
    enc = self.chunk_id_encoder
    chunk_ids = enc[global_sample_index]
    local_sample_index = enc.translate_index_relative_to_chunks(global_sample_index)
    chunk, stream = self.get_video_chunk(chunk_ids[0])
    sub_index = index.values[1].value if len(index.values) > 1 else None  # type: ignore
    sample = chunk.read_sample(
        local_sample_index,
        sub_index=sub_index,
        stream=stream,
        decompress=decompress,
    )
    if decompress:
        return sample[tuple(entry.value for entry in index.values[2:])]
    return sample
def last_appended_chunk(self)
Expand source code
def last_appended_chunk(self) -> Optional[BaseChunk]:
    """Return the most recently appended chunk, promoting it to the active chunk.

    Returns None when there are no chunks yet, or when the last sample is
    tiled (present in the tile encoder). Any previously active appended
    chunk is written to storage before being replaced.
    """
    last_index = self.num_samples - 1
    if self.num_chunks == 0 or last_index in self.tile_encoder:
        return None
    chunk_name = self.last_appended_chunk_name
    chunk_commit_id = self.get_chunk_commit(chunk_name)
    chunk_key = get_chunk_key(self.key, chunk_name, chunk_commit_id)
    chunk = self.get_chunk(chunk_key)
    chunk.key = chunk_key  # type: ignore
    chunk.id = self.last_appended_chunk_id  # type: ignore
    # Chunks owned by earlier commits must be copied before mutation.
    if chunk_commit_id != self.commit_id:
        chunk = self.copy_chunk_to_new_commit(chunk, chunk_name)
    if (
        self.active_appended_chunk is not None
        and self.active_appended_chunk.key != chunk_key
    ):
        # Persist the previously active chunk before swapping it out.
        self.write_chunk_to_storage(self.active_appended_chunk)
    self.active_appended_chunk = chunk
    return chunk
def list_all_chunks(self)

Return list of all chunks for current version_state['commit_id'] and tensor

Expand source code
def list_all_chunks(self) -> List[str]:
    """Return list of all chunks for current `version_state['commit_id']` and tensor"""
    if self.commit_id != FIRST_COMMIT_ID:
        # Later commits track their chunks explicitly in the commit chunk set.
        return list(self.commit_chunk_set.chunks)  # type: ignore
    # The first commit has no chunk set; derive names from the id encoder.
    ids = self.chunk_id_encoder.array[:, CHUNK_ID_COLUMN]
    return [ChunkIdEncoder.name_from_id(cid) for cid in ids]  # type: ignore
def list_all_chunks_path(self)

Return list of paths to all chunks

Expand source code
def list_all_chunks_path(self) -> List[str]:
    """Return list of paths to all chunks"""
    cid = self.commit_id
    return [get_chunk_key(self.key, name, cid) for name in self.list_all_chunks()]
def list_orphaned_chunks(self, storage)

Return paths for orphaned chunks (chunks that are not linked to the current_version)

Expand source code
def list_orphaned_chunks(self, storage):
    """Return paths for orphaned chunks (chunks that are not linked to the `current_version`).

    Args:
        storage: Storage provider to scan; iterating it yields object keys.

    Returns:
        List of chunk paths under this tensor's chunk prefix that are not
        referenced by the current commit.
    """

    commit_id = self.commit_id
    prefix: str = f"{self.key}/chunks/"

    if commit_id != FIRST_COMMIT_ID:
        prefix = f"versions/{commit_id}/{prefix}"

    all_chunks = [
        item.replace(prefix, "") for item in storage if item.startswith(prefix)
    ]
    # Set lookup: the original list membership test made this loop
    # O(len(all_chunks) * len(linked_chunks)).
    linked_chunks = set(self.list_all_chunks())

    return [
        f"{prefix}{chunk}" for chunk in all_chunks if chunk not in linked_chunks
    ]
def ndim(self, index=None)
Expand source code
def ndim(self, index: Optional[Index] = None) -> int:
    """Number of dimensions of the (optionally indexed) tensor.

    Base rank is `min_shape` rank + 1 (the sample axis), +1 for sequences;
    each non-subscriptable index entry removes one dimension.
    """
    dims = len(self.tensor_meta.min_shape) + 1
    if self.is_sequence:
        dims += 1
    if index:
        dims -= sum(1 for idx in index.values if not idx.subscriptable())
    return dims
def numpy(self, index, aslist=False, use_data_cache=True, fetch_chunks=False, pad_tensor=False)

Reads samples from chunks and returns as a numpy array. If aslist=True, returns a sequence of numpy arrays.

Args

index : Index
Represents the samples to read from chunks. See Index for more information.
aslist : bool
If True, the samples will be returned as a list of numpy arrays. If False, returns a single numpy array. Defaults to False.
use_data_cache : bool
If True, the data cache is used to speed up the read if possible. If False, the data cache is ignored. Defaults to True.
fetch_chunks : bool
If True, full chunks will be retrieved from the storage, otherwise only required bytes will be retrieved. This will always be True even if specified as False in the following cases: - The tensor is ChunkCompressed - The chunk which is being accessed has more than 128 samples.
pad_tensor : bool
If True, any index out of bounds will not throw an error, but instead will return an empty sample.

Raises

DynamicTensorNumpyError
If shapes of the samples being read are not all the same.

Returns

Union[np.ndarray, List[np.ndarray]]
Either a list of numpy arrays or a single numpy array (depending on the aslist argument).
Expand source code
def numpy(
    self,
    index: Index,
    aslist: bool = False,
    use_data_cache: bool = True,
    fetch_chunks: bool = False,
    pad_tensor: bool = False,
) -> Union[np.ndarray, List[np.ndarray]]:
    """Reads samples from chunks and returns as a numpy array. If `aslist=True`, returns a sequence of numpy arrays.

    Args:
        index (Index): Represents the samples to read from chunks. See `Index` for more information.
        aslist (bool): If True, the samples will be returned as a list of numpy arrays. If False, returns a single numpy array. Defaults to False.
        use_data_cache (bool): If True, the data cache is used to speed up the read if possible. If False, the data cache is ignored. Defaults to True.
        fetch_chunks (bool): If True, full chunks will be retrieved from the storage, otherwise only required bytes will be retrieved.
            This will always be True even if specified as False in the following cases:
            - The tensor is ChunkCompressed
            - The chunk which is being accessed has more than 128 samples.
        pad_tensor (bool): If True, any index out of bounds will not throw an error, but instead will return an empty sample.

    Raises:
        DynamicTensorNumpyError: If shapes of the samples being read are not all the same.

    Returns:
        Union[np.ndarray, List[np.ndarray]]: Either a list of numpy arrays or a single numpy array (depending on the `aslist` argument).
    """
    self.check_link_ready()
    if not fetch_chunks:
        # Certain access patterns force whole-chunk fetches regardless.
        fetch_chunks = self._get_full_chunk(index)
    reader = self._sequence_numpy if self.is_sequence else self._numpy
    return reader(index, aslist, use_data_cache, fetch_chunks, pad_tensor)
def numpy_from_data_cache(self, index, length, aslist, pad_tensor=False)
Expand source code
def numpy_from_data_cache(self, index, length, aslist, pad_tensor=False):
    """Read samples through the whole-chunk data cache.

    Decodes an entire chunk's raw bytes into one ndarray (`self.cached_data`)
    covering the sample range `self.cache_range`, so consecutive reads from
    the same chunk avoid refetching/redecoding.

    NOTE(review): the single `frombuffer(...).reshape(num_samples, *max_shape)`
    assumes a fixed-shape, uncompressed sample layout — callers are expected
    to guarantee this before choosing the cache path; confirm against the
    caller in `_numpy`.
    """
    samples = []
    enc = self.chunk_id_encoder
    for global_sample_index in index.values[0].indices(length):
        if pad_tensor and global_sample_index >= self.tensor_meta.length:
            # Out-of-range read with padding: substitute an empty sample.
            sample = self.get_empty_sample()
            try:
                sample = sample[tuple(entry.value for entry in index.values[1:])]
            except IndexError:
                pass
        else:
            if (
                self.cached_data is None
                or global_sample_index not in self.cache_range
            ):
                # Cache miss: load the containing chunk and decode it whole.
                row = enc.__getitem__(global_sample_index, True)[0][1]
                chunks = self.get_chunks_for_sample(global_sample_index)
                assert len(chunks) == 1

                chunk_arr = self.chunk_id_encoder.array

                chunk = chunks[0]
                # Sample span covered by this chunk, derived from the
                # encoder's last-seen-index column.
                first_sample = 0 if row == 0 else chunk_arr[row - 1][1] + 1
                last_sample = self.chunk_id_encoder.array[row][1]
                num_samples = last_sample - first_sample + 1

                full_shape = (num_samples,) + tuple(self.tensor_meta.max_shape)
                dtype = self.tensor_meta.dtype

                data_bytes = bytearray(chunk.data_bytes)
                self.cached_data = np.frombuffer(data_bytes, dtype).reshape(
                    full_shape
                )
                self.cache_range = range(first_sample, last_sample + 1)

            sample = self.cached_data[global_sample_index - self.cache_range.start]  # type: ignore

            # need to copy if aslist otherwise user might modify the returned data
            # if not aslist, we already do np.array(samples) while formatting which copies
            sample = sample.copy() if aslist else sample
            sample = sample[tuple(entry.value for entry in index.values[1:])]
        samples.append(sample)
    return samples
def pad_and_append(self, num_samples_to_pad, value, append_link_callback=None, update_link_callback=None)

Pads the tensor with empty samples and appends value at the end.

Expand source code
def pad_and_append(
    self,
    num_samples_to_pad: int,
    value,
    append_link_callback=None,
    update_link_callback=None,
):
    """Pads the tensor with empty samples and appends value at the end.

    Args:
        num_samples_to_pad (int): Number of empty samples inserted before `value`.
        value: The sample appended after the padding.
        append_link_callback: Passed to `extend` for linked-tensor updates.
        update_link_callback: Passed to `update` when the first (bootstrap)
            sample is rewritten as an empty sample.
    """
    self.check_link_ready()
    update_first_sample = False
    if num_samples_to_pad > 0:
        if self.num_samples == 0:
            # set htype, dtype, shape, we later update it with empty sample
            self.extend([value], link_callback=append_link_callback)
            num_samples_to_pad -= 1
            update_first_sample = True

        # Build the empty filler appropriate for the tensor's htype.
        htype = self.tensor_meta.htype
        if htype in ("json", "text", "list"):
            empty_sample = get_empty_text_like_sample(htype)
            empty_samples = [empty_sample] * num_samples_to_pad
        elif self.tensor_meta.is_link:
            empty_sample = None
            empty_samples = [None] * num_samples_to_pad
        else:
            # Numeric tensors: zero-length samples with the stored rank.
            ndim = len(self.tensor_meta.max_shape)
            if self.is_sequence:
                ndim += 1
            shape = tuple([num_samples_to_pad] + [0] * ndim)
            dtype = self.tensor_meta.dtype
            empty_sample = np.zeros(shape[1:], dtype=dtype)
            empty_samples = np.zeros(shape, dtype=dtype)  # type: ignore

        if update_first_sample:
            self.update(Index(0), empty_sample, link_callback=update_link_callback)

        # pad
        self.extend(empty_samples, link_callback=append_link_callback)

    self.extend([value], link_callback=append_link_callback)
def pop(self, global_sample_index)
Expand source code
def pop(self, global_sample_index: int):
    """Remove the sample at `global_sample_index` from the tensor.

    Raises:
        ValueError: If the tensor is empty.
        IndexError: If the index is out of range.
    """
    self._write_initialization()
    length = self.tensor_meta.length
    if length == 0:
        raise ValueError("There are no samples to pop")
    if not 0 <= global_sample_index < length:
        raise IndexError(
            f"Index {global_sample_index} is out of range for tensor of length {length}"
        )

    # Invalidate the data cache and batch all writes until the end.
    self.cached_data = None
    initial_autoflush = self.cache.autoflush
    self.cache.autoflush = False

    self.commit_diff.pop(global_sample_index)
    if not self.is_sequence:
        self.pop_item(global_sample_index)
    else:
        # pop in reverse order else indices get shifted
        for idx in reversed(range(*self.sequence_encoder[global_sample_index])):
            self.pop_item(idx)
        self.sequence_encoder.pop(global_sample_index)

    self.cache.autoflush = initial_autoflush
    self.cache.maybe_flush()
def pop_item(self, global_sample_index)
Expand source code
def pop_item(self, global_sample_index):
    """Remove one flat sample from its chunk(s) and from all encoders."""
    enc = self.chunk_id_encoder
    if not self._is_tiled_sample(global_sample_index):
        local_sample_index = enc.translate_index_relative_to_chunks(
            global_sample_index
        )
    # NOTE(review): `local_sample_index` is only bound for non-tiled samples;
    # the `elif` branch below assumes a tiled sample always spans multiple
    # chunk ids — confirm, otherwise this raises UnboundLocalError.
    chunk_ids, rows, delete = enc.pop(global_sample_index)
    if len(chunk_ids) > 1:  # Tiled sample, delete all chunks
        del self.tile_encoder[global_sample_index]
    elif not delete:  # There are other samples in the last chunk
        chunk_to_update = self.get_chunk_from_chunk_id(chunk_ids[0], copy=True)
        chunk_to_update.pop(local_sample_index)

        self._check_rechunk(chunk_to_update, chunk_row=rows[0])

        if (
            self.active_updated_chunk is not None
            and self.active_updated_chunk.key != chunk_to_update.key  # type: ignore
        ):
            # Flush the previously active updated chunk before replacing it.
            self.write_chunk_to_storage(self.active_updated_chunk)
        self.active_updated_chunk = chunk_to_update
    if delete:
        # The chunk(s) became unreferenced: drop them from cache/storage.
        for chunk_key in map(self.get_chunk_key_for_id, chunk_ids):
            self.check_remove_active_chunks(chunk_key)
            try:
                del self.cache[chunk_key]
            except KeyError:
                pass

    self.tensor_meta.pop(global_sample_index)
def read_bytes_for_sample(self, global_sample_index)
Expand source code
def read_bytes_for_sample(self, global_sample_index: int) -> bytes:
    """Return the raw stored bytes of one sample (no decompression/casting).

    Args:
        global_sample_index (int): Index relative to the entire tensor.

    Returns:
        bytes: The sample's bytes exactly as stored in its chunk.

    Raises:
        Exception: For chunk-wise compressed tensors, where per-sample bytes
            cannot be separated from the chunk.
        NotImplementedError: For tiled samples spanning multiple chunks.
    """
    if self.tensor_meta.chunk_compression:
        # Message typo fixed: "retreive" -> "retrieve".
        raise Exception(
            "Cannot retrieve original bytes for samples in chunk-wise compressed tensors."
        )
    enc = self.chunk_id_encoder
    chunks = self.get_chunks_for_sample(global_sample_index)
    if len(chunks) > 1:
        raise NotImplementedError(
            "read_bytes_for_sample() is not implemented for tiled samples."
        )
    chunk = chunks[0]
    buffer = chunk.memoryview_data
    if not buffer:
        return b""
    local_sample_index = enc.translate_index_relative_to_chunks(global_sample_index)
    # Byte span of the sample inside the chunk's data buffer.
    sb, eb = chunk.byte_positions_encoder[local_sample_index]
    return buffer[sb:eb].tobytes()
def read_sample_from_chunk(self, global_sample_index, chunk, cast=True, copy=False, decompress=True)
Expand source code
def read_sample_from_chunk(
    self,
    global_sample_index: int,
    chunk: BaseChunk,
    cast: bool = True,
    copy: bool = False,
    decompress: bool = True,
) -> np.ndarray:
    """Read one sample out of an already-resolved chunk.

    Fixed-shape uncompressed tensors pack a constant number of samples per
    chunk, so the local index is a simple modulo; otherwise it is translated
    through the chunk id encoder.
    """
    enc = self.chunk_id_encoder
    fixed_layout = (
        self.is_fixed_shape and self.tensor_meta.sample_compression is None
    )
    if fixed_layout:
        local_index = global_sample_index % self.num_samples_per_chunk
    else:
        local_index = enc.translate_index_relative_to_chunks(global_sample_index)
    return chunk.read_sample(
        local_index, cast=cast, copy=copy, decompress=decompress
    )
def read_shape_for_sample(self, global_sample_index)
Expand source code
def read_shape_for_sample(
    self,
    global_sample_index: int,
) -> Tuple[int, ...]:
    """Return the shape of one sample without decoding its data.

    Tiled samples come from the tile encoder; video samples resolve their
    chunk via `get_video_chunk`; everything else reads the chunk's shape
    encoder, possibly via a partial (header-only) fetch.
    """
    enc = self.chunk_id_encoder
    if self._is_tiled_sample(global_sample_index):
        return self.tile_encoder.get_sample_shape(global_sample_index)
    local_sample_index = enc.translate_index_relative_to_chunks(global_sample_index)
    if self.is_video:
        chunk_id = enc[global_sample_index][0]
        chunk = self.get_video_chunk(chunk_id)[0]
    else:
        # Only the worst-case header bytes are needed to read shapes.
        chunk_id, _, worst_case_header_size = self.get_chunk_info(
            global_sample_index, fetch_chunks=False
        )
        chunk = self.get_chunk_from_chunk_id(
            chunk_id, partial_chunk_bytes=worst_case_header_size
        )
    return tuple(map(int, chunk.shapes_encoder[local_sample_index]))
def register_new_creds(self, num_samples_added, samples)
Expand source code
def register_new_creds(self, num_samples_added, samples):
    """Hook invoked after samples are added; no-op in this chunk engine."""
    return None
def shape(self, index, sample_shape_provider=None)
Expand source code
def shape(
    self, index: Index, sample_shape_provider: Optional[Callable] = None
) -> Tuple[Optional[int], ...]:
    """Shape of the tensor at `index`; dynamic dimensions are reported as None.

    Args:
        index (Index): Index being applied. Subscriptable entries keep their
            dimension (with the length adjusted), scalar entries squeeze it.
        sample_shape_provider (Optional[Callable]): Optional fast path that
            returns shape information for a single sample without reading
            chunks.
    """
    shape = self.shape_interval.astuple()
    idxs = index.values
    skip_dims = 0
    if None in shape or self.tensor_meta.is_link:
        # Dynamic (or linked) shape: a concrete per-sample shape must be resolved.
        if not idxs[0].subscriptable():
            if self.tensor_meta.htype in ("text", "json"):
                shape = (1,)
            else:
                if sample_shape_provider:
                    try:
                        shape = sample_shape_provider(idxs[0].value)  # type: ignore
                        if self.is_sequence:
                            if len(idxs) > 1 and not idxs[1].subscriptable():
                                # A concrete item within the sequence is selected.
                                shape = tuple(shape[idxs[1].value].tolist())  # type: ignore
                                skip_dims += 1
                            else:
                                # Per dimension: keep the size when all items
                                # agree, otherwise None (dynamic).
                                shape = (len(shape),) + (
                                    tuple(
                                        int(shape[0, i])  # type: ignore
                                        if np.all(shape[:, i] == shape[0, i])  # type: ignore
                                        else None
                                        for i in range(shape.shape[1])  # type: ignore
                                    )
                                    or (1,)
                                )

                    except IndexError:  # Happens during transforms, sample shape tensor is not populated yet
                        shape = self.read_shape_for_sample(idxs[0].value)  # type: ignore
                else:
                    self.check_link_ready()
                    shape = self.read_shape_for_sample(idxs[0].value)  # type: ignore
            skip_dims += 1
    elif not idxs[0].subscriptable():
        # Static shape and a single sample selected: drop the sample axis.
        shape = shape[1:]
        skip_dims += 1
    shape = list(shape)  # type: ignore
    squeeze_dims = set()
    for i, idx in enumerate(idxs[skip_dims:]):
        if idx.subscriptable():
            shape[i] = idx.length(shape[i])  # type: ignore
        else:
            squeeze_dims.add(i)
    return tuple(shape[i] for i in range(len(shape)) if i not in squeeze_dims)
def update(self, index, samples, operator=None, link_callback=None)

Update data at index with samples.

Expand source code
def update(
    self,
    index: Index,
    samples: Union[np.ndarray, Sequence[InputSample], InputSample],
    operator: Optional[str] = None,
    link_callback: Optional[Callable] = None,
):
    """Update data at `index` with `samples`."""
    self.check_link_ready()
    handler = self._sequence_update if self.is_sequence else self._update
    handler(index, samples, operator, link_callback=link_callback)  # type: ignore
def update_creds(self, sample_index, sample)
Expand source code
def update_creds(self, sample_index, sample):
    """Hook invoked on sample creds updates; no-op in this chunk engine."""
    return None
def validate_num_samples_is_synchronized(self)

Check if tensor meta length and chunk ID encoder are representing the same number of samples. Helpful for determining if a user has tampered with the tensor meta or the chunk ID encoder, or if the tensor was corrupted.

Raises

CorruptedMetaError
tensor_meta and chunk_id_encoder must have the same num samples.
Expand source code
def validate_num_samples_is_synchronized(self):
    """Check if tensor meta length and chunk ID encoder are representing the same number of samples.
    Helpful for determining if a user has tampered with the tensor meta or the chunk ID encoder, or if
    the tensor was corrupted.

    Raises:
        CorruptedMetaError: tensor_meta and chunk_id_encoder must have the same num samples.
    """
    meta_len = self.tensor_meta.length
    # self.num_samples is currently backed by the chunk id encoder; revisit
    # this check if that implementation ever changes to use the meta length.
    encoder_len = self.num_samples
    if meta_len == encoder_len:
        return
    commit_id = self.commit_id
    tkey = get_tensor_meta_key(self.key, commit_id)
    ikey = get_chunk_id_encoder_key(self.key, commit_id)
    raise CorruptedMetaError(
        f"'{tkey}' and '{ikey}' have a record of different numbers of samples. Got {meta_len} and {encoder_len} respectively."
    )
def write_chunk_to_storage(self, chunk)
Expand source code
def write_chunk_to_storage(self, chunk):
    """Persist a dirty chunk through the cache; None/clean chunks are skipped."""
    if chunk is None:
        return
    if not chunk.is_dirty:
        return
    self.cache[chunk.key] = chunk
    chunk.is_dirty = False