Module hub.core.query.query

Implements DatasetQuery, which evaluates a Python expression against every
sample of a hub Dataset and returns the indices of matching samples.

Source code
from typing import Any, Callable, List, Union, Optional
from hub.core.dataset import Dataset
from hub.core.io import IOBlock, SampleStreaming
from hub.core.index import Index
from hub.core.tensor import Tensor


import numpy as np


NP_RESULT = Union[np.ndarray, List[np.ndarray]]
NP_ACCESS = Callable[[str], NP_RESULT]


class DatasetQuery:
    """Evaluates a Python expression against every sample of a dataset.

    Tensor keys appear in the expression with "/" replaced by "." (see
    normalize_query_tensors); each referenced tensor is wrapped via
    export_tensor before evaluation.
    """

    def __init__(
        self,
        dataset,
        query: str,
        progress_callback: Callable[[int, bool], None] = lambda *_: None,
    ):
        self._dataset = dataset
        self._query = query
        self._pg_callback = progress_callback
        # Compile the expression once; execute() evaluates it per sample.
        self._cquery = compile(query, "", "eval")
        # Tensors referenced by the query, found by substring match on the
        # normalized ("/" -> ".") key.
        self._tensors = [
            tensor
            for tensor in dataset.tensors.keys()
            if normalize_query_tensors(tensor) in query
        ]
        self._blocks = expand(dataset, self._tensors)
        self._np_access: List[NP_ACCESS] = [
            _get_np(dataset, block) for block in self._blocks
        ]
        self._wrappers = self._export_tensors()
        self._groups = self._export_groups(self._wrappers)

    def execute(self) -> List[int]:
        """Runs the query and returns the global indices of matching samples."""
        idx_map: List[int] = list()

        for f, blk in zip(self._np_access, self._blocks):
            # Fetch each referenced tensor's values for the whole block at once.
            cache = {tensor: f(tensor) for tensor in self._tensors}
            for local_idx in range(len(blk)):
                # Evaluation namespace: one wrapper per referenced tensor, plus
                # group objects for dotted access to "a/b"-style keys.
                p = {
                    tensor: self._wrap_value(tensor, cache[tensor][local_idx])
                    for tensor in self._tensors
                }
                p.update(self._groups)
                if eval(self._cquery, p):
                    global_index = blk.indices()[local_idx]
                    idx_map.append(global_index)
                    self._pg_callback(local_idx, True)
                else:
                    self._pg_callback(local_idx, False)
        return idx_map

    def _wrap_value(self, tensor, val):
        if tensor in self._wrappers:
            return self._wrappers[tensor].with_value(val)
        else:
            return val

    def _export_tensors(self):
        return {
            tensor_key: export_tensor(tensor)
            for tensor_key, tensor in self._dataset.tensors.items()
        }

    def _export_groups(self, wrappers):
        return {
            extract_prefix(tensor_key): GroupTensor(
                self._dataset, wrappers, extract_prefix(tensor_key)
            )
            for tensor_key in self._dataset.tensors.keys()
            if "/" in tensor_key
        }


def normalize_query_tensors(tensor_key: str) -> str:
    # "images/cat" -> "images.cat": tensor keys appear dotted in query syntax.
    return tensor_key.replace("/", ".")


def extract_prefix(tensor_key: str) -> str:
    # "images/cat" -> "images": first path component of a tensor key.
    return tensor_key.split("/")[0]


def _get_np(dataset: Dataset, block: IOBlock) -> NP_ACCESS:
    idx = block.indices()

    def f(tensor: str) -> NP_RESULT:
        tensor_obj = dataset.tensors[tensor]
        # Reset any existing view so the block's global indices address the
        # full tensor.
        tensor_obj.index = Index()
        return tensor_obj[idx]

    return f


def expand(dataset, tensor: List[str]) -> List[IOBlock]:
    return SampleStreaming(dataset, tensor).list_blocks()


def export_tensor(tensor: Tensor):
    # class_label tensors get label-aware comparison semantics.
    if tensor.htype == "class_label":
        return ClassLabelsTensor(tensor)

    return EvalObject()


class EvalObject:
    """Wraps a single sample value; comparisons, arithmetic and aggregates are
    delegated to its numpy representation."""

    def __init__(self) -> None:
        self._val: Any = None
        self._numpy: Optional[Union[np.ndarray, List[np.ndarray]]] = None

    @property
    def value(self):
        return self._val

    def with_value(self, v: Any):
        self._val = v
        self._numpy = None
        return self

    @property
    def numpy_value(self):
        # Fetched lazily and cached; with_value() resets the cache.
        if self._numpy is None:
            self._numpy = self._val.numpy(
                aslist=self._val.is_dynamic, fetch_chunks=True
            )
        return self._numpy

    def contains(self, v: Any):
        return v in self.numpy_value

    def __getitem__(self, item):
        r = EvalObject()
        return r.with_value(self.value[item])

    @property
    def min(self):
        """Returns np.min() for the tensor"""
        return np.amin(self.numpy_value)

    @property
    def max(self):
        """Returns np.max() for the tensor"""
        return np.amax(self.numpy_value)

    @property
    def mean(self):
        """Returns np.mean() for the tensor"""
        return np.mean(self.numpy_value)

    @property
    def shape(self):
        """Returns shape of the underlying numpy array"""
        return self.value.shape  # type: ignore

    @property
    def size(self):
        """Returns size of the underlying numpy array"""
        return self.value.size  # type: ignore

    def _to_np(self, o):
        if isinstance(o, EvalObject):
            return o.numpy_value
        return o

    def __eq__(self, o: object) -> bool:
        val = self.numpy_value
        o = self._to_np(o)
        if isinstance(val, (list, np.ndarray)):
            # Sequence vs. sequence compares as sets; scalar vs. sequence
            # checks membership.
            if isinstance(o, (list, tuple)):
                return set(o) == set(val)
            else:
                return o in val
        else:
            return val == o

    def __lt__(self, o: object) -> bool:
        o = self._to_np(o)
        return self.numpy_value < o

    def __le__(self, o: object) -> bool:
        o = self._to_np(o)
        return self.numpy_value <= o

    def __gt__(self, o: object) -> bool:
        o = self._to_np(o)
        return self.numpy_value > o

    def __ge__(self, o: object) -> bool:
        o = self._to_np(o)
        return self.numpy_value >= o

    def __mod__(self, o: object):
        o = self._to_np(o)
        return self.numpy_value % o

    def __add__(self, o: object):
        o = self._to_np(o)
        return self.numpy_value + o

    def __sub__(self, o: object):
        o = self._to_np(o)
        return self.numpy_value - o

    def __truediv__(self, o: object):  # was __div__, which Python 3 never calls
        o = self._to_np(o)
        return self.numpy_value / o

    def __floordiv__(self, o: object):
        o = self._to_np(o)
        return self.numpy_value // o

    def __mul__(self, o: object):
        o = self._to_np(o)
        return self.numpy_value * o

    def __pow__(self, o: object):
        o = self._to_np(o)
        return self.numpy_value**o

    def __contains__(self, o: object):
        o = self._to_np(o)
        return self.contains(o)

    @property
    def sample_info(self):
        return self._val.sample_info


class GroupTensor:
    """Exposes a tensor group (keys sharing a "/" prefix) through attribute
    access, so a query can write images.cat for the tensor key "images/cat"."""

    def __init__(self, dataset: Dataset, wrappers, prefix: str) -> None:
        self.prefix = prefix
        self.dataset = dataset
        self.wrappers = wrappers
        self._subgroup = self.expand()

    def __getattr__(self, __name: str) -> Any:
        return self._subgroup[self.normalize_key(__name)]

    def expand(self):
        r = {}
        for tensor in [
            self.normalize_key(t)
            for t in self.dataset.tensors
            if t.startswith(self.prefix)
        ]:
            prefix = self.prefix + "/" + extract_prefix(tensor)
            if "/" in tensor:
                r[tensor] = GroupTensor(self.dataset, self.wrappers, prefix)
            else:
                r[tensor] = self.wrappers[prefix]

        return r

    def normalize_key(self, key: str) -> str:
        return key.replace(self.prefix + "/", "")


class ClassLabelsTensor(EvalObject):
    """EvalObject for class_label tensors: string labels in a query are mapped
    to numeric indices via the tensor's class_names."""

    def __init__(self, tensor: Tensor) -> None:
        super(ClassLabelsTensor, self).__init__()
        _classes = tensor.info["class_names"]  # type: ignore
        self._classes_dict = {v: idx for idx, v in enumerate(_classes)}

    def _norm_labels(self, o: object):
        o = self._to_np(o)
        if isinstance(o, str):
            return self._classes_dict[o]
        elif isinstance(o, int):
            return o
        elif isinstance(o, (list, tuple)):
            return o.__class__(map(self._norm_labels, o))
        return o  # anything else (e.g. numpy arrays) passes through unchanged

    def __eq__(self, o: object) -> bool:
        try:
            o = self._norm_labels(o)
        except KeyError:
            return False
        return super(ClassLabelsTensor, self).__eq__(o)

    def __lt__(self, o: object) -> bool:
        o = self._to_np(o)
        if isinstance(o, str):
            raise ValueError("label class is not comparable")
        return self.numpy_value < o

    def __le__(self, o: object) -> bool:
        o = self._to_np(o)
        if isinstance(o, str):
            raise ValueError("label class is not comparable")
        return self.numpy_value <= o

    def __gt__(self, o: object) -> bool:
        o = self._to_np(o)
        if isinstance(o, str):
            raise ValueError("label class is not comparable")
        return self.numpy_value > o

    def __ge__(self, o: object) -> bool:
        o = self._to_np(o)
        if isinstance(o, str):
            raise ValueError("label class is not comparable")
        return self.numpy_value >= o

    def contains(self, v: Any):
        v = self._to_np(v)
        if isinstance(v, str):
            v = self._classes_dict[v]
        return super(ClassLabelsTensor, self).contains(v)
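
Taken together, a typical flow is sketched below; the dataset path and the
labels tensor are illustrative assumptions, with hub.load as the standard
entry point for opening a dataset:

import hub
from hub.core.query.query import DatasetQuery

ds = hub.load("hub://org/dataset")                   # hypothetical path
matches = DatasetQuery(ds, "labels == 'cat'").execute()
print(f"{len(matches)} samples matched the query")   # global sample indices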

Functions

def expand(dataset, tensor)

    Streams the given tensors of the dataset and returns the resulting list of
    IOBlocks.

def export_tensor(tensor)

    Wraps a tensor for query evaluation: class_label tensors get a
    ClassLabelsTensor, all others a plain EvalObject.

def extract_prefix(tensor_key)

    Returns the first path component of a tensor key.

def normalize_query_tensors(tensor_key)

    Rewrites a tensor key into query syntax by replacing "/" with ".".
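
For illustration, the two key helpers behave as follows (results shown in
comments):

normalize_query_tensors("images/cat")  # -> "images.cat"
extract_prefix("images/cat")           # -> "images"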

Classes

class ClassLabelsTensor (tensor)

    EvalObject specialization for class_label tensors. Builds a lookup from the
    tensor's class_names to numeric label indices, so queries can compare
    against string labels; ordering comparisons against strings raise
    ValueError.

Ancestors

    EvalObject

Methods

def contains(self, v)

    Returns True if v occurs among the sample's values; string labels are first
    mapped to their numeric index.
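
A sketch of how string labels resolve, assuming a class_label tensor whose
info holds class_names = ["cat", "dog"]; ds and its labels tensor are
illustrative:

from hub.core.query.query import ClassLabelsTensor

labels = ClassLabelsTensor(ds.labels).with_value(ds.labels[0])

labels == "cat"   # "cat" is mapped to its numeric index before comparing
"dog" in labels   # contains() applies the same mapping
labels == "bird"  # unknown labels compare as False (the KeyError is caught)
labels < "cat"    # raises ValueError("label class is not comparable")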

class DatasetQuery (dataset, query, progress_callback=<function DatasetQuery.<lambda>>)

    Compiles the query expression once and evaluates it against every sample of
    the dataset. Tensor keys are written in the expression with "/" replaced by
    ".".

Methods

def execute(self)

    Evaluates the query for every sample, calling progress_callback with the
    sample's block-local index and whether it matched, and returns the global
    indices of all matching samples.
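
A minimal sketch of the progress callback, which receives each sample's
block-local index and whether it matched; ds and its labels tensor are
assumptions for illustration:

def on_progress(local_idx: int, matched: bool) -> None:
    if matched:
        print(f"block-local sample {local_idx} matched")

indices = DatasetQuery(ds, "labels == 0", progress_callback=on_progress).execute()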
class EvalObject

    Wraps a single sample during query evaluation. Comparisons, arithmetic and
    membership tests are delegated to the sample's numpy value, which is
    fetched lazily and cached until with_value() replaces the sample.

Subclasses

    ClassLabelsTensor

Instance variables

var max

    Returns np.max() for the tensor.

var mean

    Returns np.mean() for the tensor.

var min

    Returns np.min() for the tensor.

var numpy_value

    The sample's numpy value, computed on first access and cached.

var sample_info

    Metadata of the underlying sample.

var shape

    Returns shape of the underlying numpy array.

var size

    Returns size of the underlying numpy array.

var value

    The raw wrapped value.

Methods

def contains(self, v)

    Returns True if v occurs in the sample's numpy value.

def with_value(self, v)

    Replaces the wrapped value, invalidates the cached numpy value and returns
    self.
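
The semantics are easiest to see with a stand-in for a hub tensor; FakeTensor
below is a hypothetical stub exposing only the attributes EvalObject touches:

import numpy as np
from hub.core.query.query import EvalObject

class FakeTensor:
    def __init__(self, data):
        self._arr = np.asarray(data)
        self.is_dynamic = False     # numpy() returns a single array
        self.shape = self._arr.shape
        self.size = self._arr.size

    def numpy(self, aslist=False, fetch_chunks=True):
        return self._arr

e = EvalObject().with_value(FakeTensor([1, 2, 3]))
e.min, e.max, e.mean  # -> (1, 3, 2.0)
2 in e                # membership is tested against the numpy value
e.shape               # read from the wrapped value itself, not the cache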
class GroupTensor (dataset, wrappers, prefix)

    Exposes a tensor group (keys sharing a "/"-separated prefix) through
    attribute access, so a query can write images.cat for the tensor key
    "images/cat". Nested groups expand recursively.

Methods

def expand(self)

    Builds the mapping from sub-keys to nested GroupTensor instances or, for
    leaf keys, the tensor wrappers.

def normalize_key(self, key)

    Strips the group prefix from a tensor key.
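
For illustration, assuming a dataset ds with tensors "images/cat" and
"images/dog", the group resolves attribute access like this:

from hub.core.query.query import DatasetQuery, GroupTensor, export_tensor

wrappers = {key: export_tensor(t) for key, t in ds.tensors.items()}
images = GroupTensor(ds, wrappers, "images")

images.cat is wrappers["images/cat"]  # attribute access resolves leaf keys
# In a query string the same lookup is written with a dot:
DatasetQuery(ds, "images.cat.shape[0] > 0").execute()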