koladata

Home
Overview
Fundamentals
Glossary
Cheatsheet
API Reference
Quick Recipes
Deep Dive
Common Pitfalls and Gotchas
Persistent Storage

View the Project on GitHub google/koladata

Koda Cheatsheet

go/koda-cheatsheet

Basic APIs

Import

>>> from koladata import kd

# Additional extension libraries if needed
# E.g. Numpy/Pandas conversions
>>> from koladata import kd_ext

Primitives and DataItem

# Primitive dtypes
>>> kd.INT32
DataItem(INT32, schema: SCHEMA)
>>> kd.INT64
DataItem(INT64, schema: SCHEMA)
>>> kd.FLOAT32
DataItem(FLOAT32, schema: SCHEMA)
>>> kd.FLOAT64
DataItem(FLOAT64, schema: SCHEMA)
>>> kd.STRING
DataItem(STRING, schema: SCHEMA)
>>> kd.BYTES
DataItem(BYTES, schema: SCHEMA)
>>> kd.BOOLEAN
DataItem(BOOLEAN, schema: SCHEMA)
>>> kd.MASK
DataItem(MASK, schema: SCHEMA)

# MASK type values
>>> kd.present
DataItem(present, schema: MASK)
>>> kd.missing
DataItem(missing, schema: MASK)

# DataItem creation
>>> i = kd.item(1)

>>> assert kd.is_item(i)
>>> assert kd.is_primitive(i)
>>> kd.get_dtype(i)
DataItem(INT32, schema: SCHEMA)

# DataItem creation with explicit dtypes
>>> kd.int32(1)
DataItem(1, schema: INT32)
>>> kd.int64(2)
DataItem(2, schema: INT64)
>>> kd.float32(1.1)
DataItem(1.1, schema: FLOAT32)
>>> kd.float64(2.2)
DataItem(2.2, schema: FLOAT64)
>>> kd.str('a')
DataItem('a', schema: STRING)
>>> kd.bytes(b'a')
DataItem(b'a', schema: BYTES)
>>> kd.bool(True)
DataItem(True, schema: BOOLEAN)
>>> kd.mask(None)
DataItem(missing, schema: MASK)

# Or use kd.item with explicit dtype
>>> kd.item(1, kd.INT32)
DataItem(1, schema: INT32)
>>> kd.item(2, kd.INT64)
DataItem(2, schema: INT64)

# kd.from_py is a universal converter
# Similar to kd.item, but the result has OBJECT schema
>>> kd.from_py(1)
DataItem(1, schema: OBJECT)

DataSlice

>>> ds = kd.slice([[1, 2], [3, None, 5]])
>>> assert kd.is_slice(ds)

>>> ds.get_size()
DataItem(5, schema: INT64)
>>> ds.get_ndim()
DataItem(2, schema: INT64)
>>> ds.get_shape()
JaggedShape(2, [2, 3])

>>> ds.get_dtype()
DataItem(INT32, schema: SCHEMA)

# DataSlice creation with explicit dtypes
>>> kd.int32([[1, 2], [3, None, 5]])
DataSlice([[1, 2], [3, None, 5]], schema: INT32, ...)
>>> kd.int64([[1, 2], [3, None, 5]])
DataSlice([[1, 2], [3, None, 5]], schema: INT64, ...)
>>> kd.float32([[1., 2.],[3., None, 5.]])
DataSlice([[1.0, 2.0], [3.0, None, 5.0]], schema: FLOAT32, ...)
>>> kd.float64([[1., 2.],[3., None, 5.]])
DataSlice([[1.0, 2.0], [3.0, None, 5.0]], schema: FLOAT64, ...)
>>> kd.str(['a', None, 'b'])
DataSlice(['a', None, 'b'], schema: STRING, ...)
>>> kd.bytes([b'a', None, b'b'])
DataSlice([b'a', None, b'b'], schema: BYTES, ...)
>>> kd.bool([True, None, False])
DataSlice([True, None, False], schema: BOOLEAN, ...)
>>> kd.mask([kd.present, kd.missing])
DataSlice([present, missing], schema: MASK, ...)

# Or use kd.slice with explicit dtype
>>> kd.slice([[1, 2], [3, None, 5]], kd.INT32)
DataSlice([[1, 2], [3, None, 5]], schema: INT32, ...)
>>> kd.slice([[1, 2], [3, None, 5]], kd.INT64)
DataSlice([[1, 2], [3, None, 5]], schema: INT64, ...)

# kd.from_py is a universal converter
# Unlike kd.slice, it converts Python lists into Koda Lists
# and returns a DataItem with OBJECT schema
>>> kd.from_py([[1, 2], [3, None, 5]])
DataItem(List[List[1, 2], List[3, None, 5]], schema: OBJECT, ...)

>>> ds = kd.slice([[1, 2], [3, None, 5]])

# Navigate DataSlice as "nested" lists
>>> ds.L[1].L[2]
DataItem(5, schema: INT32)

# Items at index 1 in the first dimension
>>> ds.L[1]
DataSlice([3, None, 5], schema: INT32, present: 2/3)


# Use in Python for-loop
>>> for i in ds.L:
...   for j in i.L:
...     print(j)
1
2
3
None
5

# Subslice
>>> ds.S[1, 2]
DataItem(5, schema: INT32)

# Take the third item in the last dimension
>>> ds.S[2]
DataSlice([None, 5], schema: INT32, ...)
>>> ds.S[1:, :2]
DataSlice([[3, None]], schema: INT32, ...)

# Get items at idx
>>> ds.take(2)
DataSlice([None, 5], schema: INT32, ...)
>>> ds.take(kd.slice([1, 2]))
DataSlice([2, 5], schema: INT32, ...)

# Expand the outermost dimension into a list
>>> kd.to_pylist(ds)
[DataSlice([1, 2], schema: INT32,...), DataSlice([3, None, 5], schema: INT32,...)]

# Convert to a nested Python list
>>> ds.to_py()
[[1, 2], [3, None, 5]]

# Reverse the order of items in the last dimension
>>> kd.reverse(ds)
DataSlice([[2, 1], [5, None, 3]], schema: INT32, ...)

# DataItem is 0-dim DataSlice
>>> i = kd.slice(1)
>>> i.get_ndim()
DataItem(0, ...)

# Visualize a DataSlice (go/koda-ds-display)
>>> ds.display()

Entities

Entities can be thought of as instances of protos or C++ structs. That is, they don’t directly store their own schema. Instead, their schema is stored at DataSlice level and all entities in a DataSlice share the same schema.

# Entity creation with named schema
>>> e = kd.new(x=1, y=2, schema='Point')
>>> es = kd.new(x=kd.slice([1, 2, None]),
...             y=kd.slice([4, None, 6]),
...             schema='Point')

>>> assert e.get_schema() == es.get_schema()

>>> assert e.is_entity()

# Use an existing schema
>>> s = kd.named_schema('Point', x=kd.INT32, y=kd.INT32)
>>> e = kd.new(x=1, y=2, schema=s)

# which is equivalent to
>>> e2 = s.new(x=1, y=2)
>>> kd.testing.assert_equivalent(e, e2)

# When `schema=` is not provided, a new
# schema is created for each invocation
>>> e1 = kd.new(x=1, y=2)
>>> e2 = kd.new(x=1, y=2)
>>> assert e1.get_schema() != e2.get_schema()

# Use provided itemids
>>> itemid = kd.new_itemid()
>>> e3 = kd.new(x=1, y=2, itemid=itemid)
>>> e4 = kd.new(x=1, y=2, itemid=itemid)
>>> assert e3.get_itemid() == e4.get_itemid()

# Get available attributes
# As all entities share the same schema,
# intersection= argument does not matter for them.
>>> kd.dir(e)
['x', 'y']

# Access attribute
>>> e.x
DataItem(1,...)
>>> e.get_attr('y')
DataItem(2,...)
>>> e.maybe('z')
DataItem(None,...)
>>> e.get_attr('z', default=0)
DataItem(0,...)
>>> es.get_attr('x', default=0)
DataSlice([1, 2, 0],...)

# Entities are immutable by default, modification is done
# by creating a new entity with the same ItemId and
# updated attributes
>>> e = kd.new(x=1, y=2, schema='Point')

# Update attributes
# Update a single attribute
>>> e1 = e.with_attr('x', 3)
>>> e1 = e.with_attr('z', 4)

# Also override schema
>>> e1 = e.with_attr('y', 'a', overwrite_schema=True)

# Remove a single attribute
>>> e1 = e.with_attr('x', None)

# Update/remove multiple attributes
>>> e2 = e.with_attrs(z=4, x=None)

# Also override schema for 'y'
>>> e2 = e.with_attrs(z=4, y='a', overwrite_schema=True)

# Create an update and apply it separately
>>> upd = kd.attrs(e, z=4, y=10)
>>> e3 = e.updated(upd)

# Allows mixing multiple updates
>>> e4 = e.updated(kd.attrs(e, z=4), kd.attrs(e, y=None))

# Update nested attributes
>>> nested = kd.new(a=kd.new(c=kd.new(e=1), d=2), b=3)
>>> nested = nested.updated(kd.attrs(nested.a.c, e=4),
...                         kd.attrs(nested.a, d=5),
...                         kd.attrs(nested, b=6))

Lists

# Create a list from a Python list
>>> l1 = kd.list([1, 2, 3])
>>> l2 = kd.list([[1, 2], [3], [4, 5]])

# Create multiple lists by imploding
# the last dimension of a DataSlice
>>> l3 = kd.implode(kd.slice([[1, 2], [3], [4, 5]]))

# l2 and l3 are different
>>> assert kd.is_item(l2)
>>> l2.get_size()
DataItem(1,...)
>>> l3.get_size()
DataItem(3,...)

>>> assert kd.is_list(l1)
>>> kd.list_size(l1)
DataItem(3, ...)

# Use provided itemids
>>> itemid = kd.new_listid()
>>> l4 = kd.list([1, 2, 3], itemid=itemid)
>>> l5 = kd.list([4, 5, 6], itemid=itemid)
>>> assert l4.get_itemid() == l5.get_itemid()

# Python-like list operations
>>> l1[0]
DataItem(1,...)
>>> kd.get_item(l1, 0)
DataItem(1,...)
>>> l1[1:]
DataSlice([2, 3],...)

# Slice by a DataSlice
>>> l1[kd.slice([0, 2])]
DataSlice([1, 3],...)
>>> l1[kd.slice([[2, 1], [0, None]])]
DataSlice([[3, 2], [1, None]],...)

# Explode a list
>>> l1[:]
DataSlice([1, 2, 3],...)
>>> kd.explode(l1)
DataSlice([1, 2, 3],...)

>>> kd.explode(l2, ndim=2)
DataSlice([[1, 2], [3], [4, 5]],...)

# Explode all lists repeatedly
>>> kd.explode(l2, ndim=-1)
DataSlice([[1, 2], [3], [4, 5]],...)

# Filter out items
>>> l6 = kd.list([1, 2, None, 4])
>>> l6.select_items(lambda x: x >= 2)
DataSlice([2, 4],...)

# Append a single item
>>> l7 = l1.with_list_append_update(4)

# Append multiple items
>>> l8 = l1.with_list_append_update(kd.slice([4, 5]))

# Note that list update does not support
# updating/removing existing items

# Create an update and apply it separately
>>> upd = kd.list_append_update(l1, 5)
>>> l1.updated(upd)
DataItem(List[1, 2, 3, 5], schema: LIST[INT32],...)

# Accumulating updates is not supported
# The last update overrides previous ones
# Note 5 is not appended
>>> l1.updated(kd.list_append_update(l1, 5),
...            kd.list_append_update(l1, kd.slice([6, 7])))
DataItem(List[1, 2, 3, 6, 7], schema: LIST[INT32],...)

# Returns a list with a different ItemId and items
# concatenated from the list items of arguments
>>> kd.concat_lists(kd.list([1, 2]), kd.list([3, 4]))
DataItem(List[1, 2, 3, 4], schema: LIST[INT32],...)

# Returns a list with a different ItemId and
# appended items
>>> kd.appended_list(kd.list([1, 2]), 3)
DataItem(List[1, 2, 3], schema: LIST[INT32],...)

>>> kd.appended_list(kd.list([1, 2]), kd.slice([3, 4]))
DataItem(List[1, 2, 3, 4], schema: LIST[INT32],...)

Dicts

# Create a dict from a Python dict
>>> d1 = kd.dict({'a': 1, 'b': 2})
>>> d2 = kd.dict(kd.slice(['a', 'b']),
...              kd.slice([1, 2])) # Same as above
>>> kd.testing.assert_equivalent(d1, d2)

# Create multiple dicts
>>> d2 = kd.dict(kd.slice([['a', 'b'], ['c']]),
...              kd.slice([[1, 2], [3]]))

>>> assert kd.is_dict(d1)

>>> d1.dict_size()
DataItem(2, schema: INT64)

# Use provided itemids
>>> itemid = kd.new_dictid()
>>> d3 = kd.dict({'a': 1, 'b': 2}, itemid=itemid)
>>> d4 = kd.dict({'c': 3, 'd': 4}, itemid=itemid)
>>> assert d3.get_itemid() == d4.get_itemid()

>>> k = d1.get_keys(); sorted(k.to_py()) # order of keys/values is arbitrary
['a', 'b']

>>> v = d1.get_values(); sorted(v.to_py()) # order of keys/values is arbitrary
[1, 2]

>>> d1['a']
DataItem(1, schema: INT32,...)

>>> kd.testing.assert_equivalent(kd.get_item(d1, 'a'),d1['a']) # Same as above

# Filter out keys/values
>>> d1.select_keys(lambda k: k != 'b')
DataSlice(['a']...)

>>> d1.select_values(lambda v: v > 1)
DataSlice([2]...)

# Dicts are immutable by default, modification is done
# by creating a new dict with the same ItemId and
# updated key/values

# Update a key/value
>>> d4 = d1.with_dict_update('c', 5)

# Update multiple key/values
>>> another_dict = kd.dict({'a': 3, 'c': 5})
>>> d5 = d1.with_dict_update(another_dict)
>>> d6 = d1.with_dict_update(kd.slice(['a', 'c']),
...                          kd.slice([3, 5])) # Same as above
>>> kd.testing.assert_equivalent(d5, d6)

# Note that dict update does not support
# removing values for now
>>> d2 = d1.with_dict_update('a', None) # Dict{'a': 1, 'b': 2} rather than Dict{'b': 2}
>>> sorted(d2.get_keys().to_py())
['a', 'b']


# Create an update and apply it separately
>>> upd = kd.dict_update(d1, another_dict)
>>> d6 = d1.updated(upd)

# Allows mixing multiple updates
>>> d7 = d1.updated(kd.dict_update(d1, 'c', 5),
...                 kd.dict_update(d1, another_dict))

Objects

Objects can be thought of as Python objects. They directly store their own schema as a schema attribute, similar to how Python objects store their class in the `__class__` attribute. This allows objects in a DataSlice to have different schemas. Entities, Lists, Dicts and primitives can be objects. Entities, Lists and Dicts store their own schema as an internal `__schema__` attribute, while a primitive's schema is determined by the type of its value.

# Entity objects
>>> o = kd.obj(x=1, y=2)
>>> os = kd.obj(x=kd.slice([1, 2, None]),
...             y=kd.slice([4, None, 6]))

>>> os = kd.slice([kd.obj(x=1),
...                kd.obj(y=2.0),
...                kd.obj(x=1.0, y='a')])

>>> os.get_schema()
DataItem(OBJECT, schema: SCHEMA, ...)
>>> os.get_obj_schema()
DataSlice([
  IMPLICIT_ENTITY(x=INT32),
  IMPLICIT_ENTITY(y=FLOAT32),
  IMPLICIT_ENTITY(x=FLOAT32, y=STRING),
], schema: SCHEMA, ...)

# Use provided itemids
>>> itemid = kd.new_itemid()
>>> o1 = kd.obj(x=1, y=2, itemid=itemid)
>>> o2 = kd.obj(x=1, y=2, itemid=itemid)
>>> assert o1.get_itemid() == o2.get_itemid()

# Get available attributes
>>> os1 = kd.slice([kd.obj(x=1), kd.obj(x=1.0, y='a')])

# kd.dir raises an error when objects have different
# attributes and intersection= is not specified
>>> kd.dir(os1)
Traceback (most recent call last):
ValueError: dir() cannot determine attribute names because objects have different attributes. Please specify intersection= explicitly.

# Attributes present in all objects
>>> kd.dir(os1, intersection=True)
['x']

# Attributes present in any object
>>> kd.dir(os1, intersection=False)
['x', 'y']

# Access attribute
>>> o.x
DataItem(1, schema: INT32, ...)
>>> o.get_attr('y')
DataItem(2, schema: INT32, ...)
>>> o.maybe('z')
DataItem(None, schema: NONE, ...)
>>> o.get_attr('z', default=0)
DataItem(0, schema: INT32, ...)
>>> os.get_attr('x', default=0)
DataSlice([1.0, 0.0, 1.0], schema: FLOAT32, ...)

# Objects are immutable by default, modification is done
# by creating a new object with the same ItemId and
# updated attributes
>>> o = kd.obj(x=1, y=2)

# Update a single attribute
>>> o1 = o.with_attr('x', 3)
>>> o1 = o.with_attr('z', 4)

# Also override schema
# no overwrite_schema=True is needed
>>> o1 = o.with_attr('y', 'a')

# Remove a single attribute
>>> o1 = o.with_attr('x', None)

# Update/remove multiple attributes
>>> o2 = o.with_attrs(z=4, x=None)

# Also override schema for 'y'
>>> o2 = o.with_attrs(z=4, y='a')

# Create an update and apply it separately
>>> upd = kd.attrs(o, z=4, y=10)
>>> o3 = o.updated(upd)

# Allows mixing multiple updates
>>> o4 = o.updated(kd.attrs(o, z=4), kd.attrs(o, y=None))

# Update nested attributes
>>> nested = kd.obj(a=kd.obj(c=kd.obj(e=1), d=2), b=3)
>>> nested = nested.updated(kd.attrs(nested.a.c, e=4),
...                         kd.attrs(nested.a, d=5),
...                         kd.attrs(nested, b=6))

# List and dict can be objects too
# To convert a list/dict to an object,
# use kd.obj()
>>> l = kd.list([1, 2, 3])
>>> l_obj = kd.obj(l)
>>> l_obj[:]
DataSlice([1, 2, 3], schema: INT32, ...)

>>> d = kd.dict({'a': 1, 'b': 2})
>>> d_obj = kd.obj(d)
>>> kd.sort(d_obj.get_keys())
DataSlice(['a', 'b'], schema: STRING, ...)
>>> d_obj['a']
DataItem(1, schema: INT32, ...)

# Convert an entity to an object
>>> e = kd.new(x=1, y=2)
>>> e_obj = kd.obj(e)

# Primitives can also be passed to kd.obj()
>>> p_obj = kd.obj(1)
>>> p_obj = kd.obj('a')

# An OBJECT DataSlice with entity, list,
# dict and primitive items
>>> kd.slice([kd.obj(a=1), 1, kd.obj(kd.list([1, 2])),
...           kd.obj(kd.dict({'a': 1}))])
DataSlice([Obj(a=1), 1, List[1, 2], Dict{'a'=1}], schema: OBJECT, ...)

Subslicing DataSlices

Subslicing is the operation of getting a subset of the items in a DataSlice. Subslicing slices all dimensions of a DataSlice, or only the last dimension when a single index is specified. The API is called “subslice” because it slices a DataSlice to create a sub-DataSlice.

See kd.subslice APIs for more details.

>>> ds = kd.slice([[1, 2, 3], [4, 5]])

# Slice by indices
>>> kd.subslice(ds, 1, 1)
DataItem(5, schema: INT32)
>>> kd.subslice(ds, 1, -1)
DataItem(5, schema: INT32)

# Slice by range
>>> kd.subslice(ds, 0, slice(1))
DataSlice([1], schema: INT32, ...)
>>> kd.subslice(ds, 0, slice(2, 4))
DataSlice([3], schema: INT32, ...)

# 'S[]' as Syntactic sugar
>>> ds.S[1, 1]
DataItem(5, schema: INT32)
>>> ds.S[0, :2]
DataSlice([1, 2], schema: INT32, ...)

# Multi-dimension
>>> ds = kd.slice([[[1, 2, 3], [4, 5, 6]],
...             [[7, 8, 9], [10, 11, 12]]])

>>> ds.S[1]
DataSlice([[2, 5], [8, 11]], schema: INT32, ...)
>>> ds.S[1:, 0, :2]
DataSlice([[7, 8]], schema: INT32, ...)
>>> ds.S[0, :2]
DataSlice([[1, 2], [7, 8]], schema: INT32, ...)
>>> ds.S[kd.slice([[0, 1], [1]]), 1]
DataSlice([[2, 5], [11]], schema: INT32, ...)

# Advanced slicing: slice by DataSlice
>>> ds = kd.slice([[1, 2, 3], [4, 5]])
>>> ds.S[kd.slice([0, 0]), 1:]
DataSlice([[2, 3], [2, 3]], schema: INT32, ...)
>>> ds.S[kd.slice([[0, 2], [0, 1]]),
...         kd.slice([[0, 0], [1, 0]])]
DataSlice([[1, None], [2, 4]], schema: INT32, ...)

# slice boundaries can also be DataSlices
>>> ds = kd.slice([[1, 2, 3], [4, 5]])
>>> ds.S[kd.slice([0, 0]):kd.slice([2, 1])]
DataSlice([[1, 2], [4]], schema: INT32, ...)
>>> ds.S[kd.slice([0, 0]):kd.slice([2, 1]), :]
DataSlice([[[1, 2, 3], [4, 5]], [[1, 2, 3]]], schema: INT32, ...)
>>> ds.S[:kd.index(ds) + 1]
DataSlice([[[1], [1, 2], [1, 2, 3]], [[4], [4, 5]]], schema: INT32, ...)
>>> kd.agg_sum(ds.S[:kd.index(ds) + 1])
DataSlice([[1, 3, 6], [4, 9]], schema: INT32, ...)

# Ellipsis
>>> ds = kd.slice([[[1, 2, 3], [4, 5]],
...                 [[6, 7]], [[8], [9, 10]]])
>>> ds.S[1:, ...]
DataSlice([[[6, 7]], [[8], [9, 10]]], schema: INT32, ...)
>>> ds.S[1:, ..., :1]
DataSlice([[[6]], [[8], [9]]], schema: INT32, ...)
>>> ds.S[1:, 0, ..., :1]
DataSlice([[6], [8]], schema: INT32, ...)

Python-like Slicing

While subslicing provides an efficient way to slice all dimensions, it is sometimes convenient to slice dimensions one by one, similar to iterating through a nested Python list. DataSlice provides a way to slice the first dimension using the `.L` notation, where L stands for Python list.

>>> ds = kd.slice([[1, 2, 3], [4, 5]])

# Note that 'l' is a one-dim DataSlice
# and 'i' is a DataItem
>>> for l in ds.L:
...   print(type(l))
...   for i in l.L:
...     print(type(i))
...     print(i)
<class 'koladata.types.data_slice.DataSlice'>
<class 'koladata.types.data_item.DataItem'>
1
<class 'koladata.types.data_item.DataItem'>
2
<class 'koladata.types.data_item.DataItem'>
3
<class 'koladata.types.data_slice.DataSlice'>
<class 'koladata.types.data_item.DataItem'>
4
<class 'koladata.types.data_item.DataItem'>
5

# A Python list of one-dim DataSlice
>>> ds.L[:]
DataSlice([[1, 2, 3], [4, 5]], schema: INT32,...)

>>> ds.L[1]
DataSlice([4, 5], schema: INT32,...)

Editable Containers

>>> x = kd.named_container()
>>> x.a = 1
>>> x.b = kd.list([1, 2, 3])
>>> kd.obj(**vars(x))
DataItem(Obj(a=1, b=List[1, 2, 3]), schema: OBJECT,...)

DataSlice Shape (a.k.a. Partition Tree)

>>> ds = kd.slice([[[1, 2], [3]], [[4, 5]]])
>>> shape = ds.get_shape()
>>> shape
JaggedShape(2, [2, 1], [2, 1, 2])
>>> kd.shapes.ndim(shape)
DataItem(3, schema: INT64)
>>> kd.shapes.size(shape)
DataItem(5, schema: INT64)

# Get the shape with N-1 dimensions
>>> shape[:-1]
JaggedShape(2, [2, 1])

# Get the shape with the first two dimensions
>>> shape[:2]
JaggedShape(2, [2, 1])

# Create a new shape directly
>>> shape1 = kd.shapes.new(2, [2, 1], [2, 1, 2])
>>> assert shape1 == shape

Changing shape of a DataSlice

>>> ds = kd.slice([[[1, 2], [3]], [[4, 5]]])

# Flatten all dimensions
>>> ds.flatten()
DataSlice([1, 2, 3, 4, 5], schema: INT32, ...)

# Flatten dimensions from the third (index 2) to the end;
# here that is only the last dimension, so nothing changes
>>> ds.flatten(2)
DataSlice([[[1, 2], [3]], [[4, 5]]], schema: INT32, ...)

# Flatten the last two dimensions
>>> ds.flatten(-2)
DataSlice([[1, 2, 3], [4, 5]], schema: INT32, ...)

# Flatten the first two dimensions (dims 0 and 1;
# the end dimension is exclusive)
>>> ds.flatten(0, 2)
DataSlice([[1, 2], [3], [4, 5]], schema: INT32, ...)

# When the start and end dimensions are the same,
# insert a new dimension
>>> ds.flatten(2, 2)
DataSlice([[[[1, 2]], [[3]]], [[[4, 5]]]], schema: INT32, ...)

# Reshape to the shape of another DataSlice
>>> ds1 = kd.slice([1, 2, 3, 4, 5])
>>> ds.reshape_as(ds1)
DataSlice([1, 2, 3, 4, 5], schema: INT32, ...)
>>> ds.reshape(ds1.get_shape())
DataSlice([1, 2, 3, 4, 5], schema: INT32, ...)
>>> ds2 = kd.slice([1, None, 3])

# Repeats values
>>> ds2.repeat(2)
DataSlice([[1, 1], [None, None], [3, 3]], schema: INT32, ...)
>>> kd.repeat(ds2, 2)
DataSlice([[1, 1], [None, None], [3, 3]], schema: INT32, ...)

# Repeats present values
>>> kd.repeat_present(ds2, 2)
DataSlice([[1, 1], [], [3, 3]], schema: INT32, ...)

Broadcasting and Aligning

# Expands x based on the shape of target
>>> kd.slice([100, 200]).expand_to(
...     kd.slice([[1,2],[3,4,5]]))
DataSlice([[100, 100], [200, 200, 200]], schema: INT32, ...)

# Implodes last ndim dimensions into lists,
# expands the slice of lists, explodes the result
>>> kd.slice([100, 200]).expand_to(
...     kd.slice([[1,2],[3,4,5]]), ndim=1)
DataSlice([[[100, 200], [100, 200]], [[100, 200], [100, 200], [100, 200]]], schema: INT32, ...)
>>> x=kd.slice([100, 200])
>>> target=kd.slice([[1,2],[3,4,5]])
>>> assert kd.is_expandable_to(x, target, 0)

# Whether either argument is expandable to the other
>>> assert kd.is_shape_compatible(x, target)

# Expands DataSlices to the same common shape
>>> kd.align(kd.slice([[1, 2, 3], [4, 5]]),
...          kd.slice('a'), kd.slice([1, 2]))
(DataSlice([[1, 2, 3], [4, 5]], schema: INT32, ...), DataSlice([['a', 'a', 'a'], ['a', 'a']], schema: STRING, ...), DataSlice([[1, 1, 1], [2, 2]], schema: INT32, ...))

ItemIds and UUIDs

# A new ItemId is allocated when a new
# object/entity/list/dict/schema is created
>>> o1 = kd.obj(x=1, y=2)
>>> o2 = kd.obj(x=1, y=2)
>>> assert o1.get_itemid() != o2.get_itemid()

# Get ItemId from object/entity/list/dict
>>> itemid = o1.get_itemid()

# ItemId is a 128-bit integer
# Print out the ItemId in the base-62 format
>>> str(itemid)
'Entity:$...'

>>> str(kd.list().get_itemid())
'List:$...'

>>> str(kd.dict().get_itemid())
'Dict:$...'

>>> str(kd.schema.new_schema().get_itemid())
'Schema:$...'

# Encode to/from base-62 number (as string).
>>> str_id = kd.encode_itemid(itemid)
>>> itemid1 = kd.decode_itemid(str_id)
>>> assert itemid1 == itemid

# Convert ItemId back to the original
# object/entity/list/dict
>>> kd.reify(itemid1, o1)
DataItem(Obj(x=1, y=2), schema: OBJECT, ...)

# int64 hash values
>>> kd.hash_itemid(itemid)
DataItem(..., schema: INT64)

# ItemIds can be allocated directly
>>> kd.new_itemid()
DataItem(Entity:..., schema: ITEMID)
>>> kd.new_listid()
DataItem(List:..., schema: ITEMID)
>>> kd.new_dictid()
DataItem(Dict:..., schema: ITEMID)

# UUIDs are uniquely determined by their inputs

# A new UUID is allocated when a new uu
# object/entity/schema is created
>>> o3 = kd.uuobj(x=1, y=2)
>>> o4 = kd.uuobj(x=1, y=2)
>>> assert o3.get_itemid() == o4.get_itemid()

>>> kd.uu(x=1, y=2) # UU Entity
DataItem(Entity(x=1, y=2), schema: ENTITY(x=INT32, y=INT32), ...)
>>> kd.uuobj(x=1, y=2) # UU Object
DataItem(Obj(x=1, y=2), schema: OBJECT, ...)
>>> kd.uu_schema(x=kd.INT32, y=kd.INT64) # UU Schema
DataItem(ENTITY(x=INT32, y=INT64), schema: SCHEMA, ...)

# UUID has a # prefix compared to $ for ItemId
>>> str(o3.get_itemid())
'Entity:#...'

# Compute UUID from attr values
>>> i1 = kd.uuid(x=1, y=2)
>>> i2 = kd.uuid(x=1, y=2)
>>> assert i1 == i2

# Generate uuids based on the values of the
# nested contents rather than their ItemId
>>> kd.deep_uuid(ds)
DataSlice([
  [
    [Entity:#..., Entity:#...],
    [Entity:#...],
  ],
  [[Entity:#..., Entity:#...]],
], schema: ITEMID, ...)

# Can be used to compare entities/lists/dicts/objects
# by value rather than ItemIds
>>> e1 = kd.new(a=1, b=kd.new(c=2))
>>> e2 = kd.new(a=1, b=kd.new(c=2))
>>> assert kd.deep_uuid(e1) == kd.deep_uuid(e2)

# UUID computation does not depend on schemas
# Note that o1_obj is an object and e1 is an entity
>>> o1_obj = kd.obj(a=1, b=kd.obj(c=2))
>>> assert kd.deep_uuid(o1_obj) == kd.deep_uuid(e1)

# A missing/removed attribute is different from
# an attribute which does not exist
>>> e3 = kd.new(a=1, b=kd.new(c=2), c=None)
>>> assert kd.deep_uuid(e1) != kd.deep_uuid(e3)

>>> l1 = kd.list([kd.obj(a=1), kd.obj(b=2)])
>>> l2 = kd.list([kd.obj(a=1), kd.obj(b=2)])
>>> assert kd.deep_uuid(l1) == kd.deep_uuid(l2)

# Compute UUIDs by aggregating items
# over the last dimension
>>> kd.agg_uuid(kd.slice([[1, 2, 3], [4, 5, 6]]))
DataSlice([Entity:#..., Entity:#...], schema: ITEMID, ...)

DataSlice Operators

Koda provides a comprehensive list of operators under the kd module. Most pointwise operators automatically broadcast all input DataSlices to the same shape.

Item Creation Operators

`kd.xxx_like(x)` creates a DataSlice of `xxx` with the **same shape and sparsity** as DataSlice `x`.

`kd.xxx_shaped_as(x)` creates a DataSlice of `xxx` with the **same shape** as DataSlice `x`.

`kd.xxx_shaped(shape)` creates a DataSlice of `xxx` with the shape `shape`.

>>> x = kd.slice([[1, None], [3, 4]])
>>> s = x.get_shape()

>>> kd.val_like(x, 1)
DataSlice([[1, None], [1, 1]], schema: INT32,...)
>>> kd.val_shaped_as(x, 1)
DataSlice([[1, 1], [1, 1]], schema: INT32,...)
>>> kd.val_shaped(s, 1)
DataSlice([[1, 1], [1, 1]], schema: INT32,...)

# Note there is kd.present_like
>>> kd.present_shaped_as(x)
DataSlice([[present, present], [present, present]], schema: MASK,...)
>>> kd.present_shaped(s)
DataSlice([[present, present], [present, present]], schema: MASK,...)

# Note there is kd.empty_like
>>> kd.empty_shaped_as(x)
DataSlice([[missing, missing], [missing, missing]], schema: MASK, present: 0/4)
>>> kd.empty_shaped(s)
DataSlice([[missing, missing], [missing, missing]], schema: MASK, present: 0/4)

# Empty Object creation
# Note obj_xxx does not take schema= argument
>>> kd.obj_like(x)
DataSlice([
  [Obj():..., None],
  [Obj():..., Obj():...],
], schema: OBJECT, present: 3/4, bag_id: ...)
>>> kd.obj_shaped_as(x)
DataSlice([
  [Obj():..., Obj():...],
  [Obj():..., Obj():...],
], schema: OBJECT, present: 4/4, bag_id: ...)
>>> kd.obj_shaped(s)
DataSlice([
  [Obj():..., Obj():...],
  [Obj():..., Obj():...],
], schema: OBJECT, present: 4/4, bag_id: ...)

# Non-empty Object creation
>>> attr_dss = {'y': kd.slice([[1, 2], [3, 4]])}
>>> o1 = kd.obj_like(x, **attr_dss); o1 # keeps sparsity of x
DataSlice([[Obj(y=1), None], [Obj(y=3), Obj(y=4)]], schema: OBJECT, ...)

>>> o2 = kd.obj_shaped_as(x, **attr_dss); o2
DataSlice([[Obj(y=1), Obj(y=2)], [Obj(y=3), Obj(y=4)]], schema: OBJECT,...)

>>> o3 = kd.obj_shaped(s, **attr_dss)
>>> kd.testing.assert_equivalent(o2, o3)

# With provided itemids
>>> itemid = kd.new_itemid_shaped_as(x)
>>> o1 = kd.obj_like(x, itemid=itemid)
>>> o2 = kd.obj_shaped_as(x, itemid=itemid)
>>> o3 = kd.obj_shaped(s, itemid=itemid)

# Empty Entity creation
>>> kd.new_like(x)
DataSlice([
  [Entity():..., None],
  [Entity():..., Entity():...],
], schema: ENTITY(), present: 3/4, bag_id: ...)
>>> kd.new_shaped_as(x)
DataSlice([
  [Entity():..., Entity():...],
  [Entity():..., Entity():...],
], schema: ENTITY(), present: 4/4, bag_id: ...)
>>> kd.new_shaped(s)
DataSlice([
  [Entity():..., Entity():...],
  [Entity():..., Entity():...],
], schema: ENTITY(), present: 4/4, bag_id: ...)

# Non-empty Entity creation
>>> kd.new_like(x, **attr_dss)
DataSlice([[Entity(y=1), None], [Entity(y=3), Entity(y=4)]], schema: ENTITY(y=INT32), present: 3/4,...)

>>> e1 = kd.new_shaped_as(x, **attr_dss)
>>> e2 = kd.new_shaped(s, **attr_dss)
>>> kd.testing.assert_equivalent(e1, e2.with_schema(e1.get_schema()))


# With provided itemids
>>> itemid = kd.new_itemid_shaped_as(x)
>>> kd.new_like(x, itemid=itemid)
DataSlice([
      [Entity():$..., None],
      [Entity():$..., Entity():$...],
    ], schema: ENTITY(), present: 3/4,...)

>>> e1 = kd.new_shaped_as(x, itemid=itemid)
>>> e2 = kd.new_shaped(s, itemid=itemid)
>>> kd.testing.assert_equivalent(e1, e2.with_schema(e1.get_schema()))

>>> schema = kd.schema.new_schema(y=kd.INT64)
>>> e = kd.new_like(x, schema=schema)

# Empty List creation
>>> kd.list_like(x)
DataSlice([[List[], None], [List[], List[]]], schema: LIST[OBJECT], present: 3/4, bag_id: ...)
>>> kd.list_shaped_as(x)
DataSlice([[List[], List[]], [List[], List[]]], schema: LIST[OBJECT], present: 4/4, bag_id: ...)
>>> kd.list_shaped(s)
DataSlice([[List[], List[]], [List[], List[]]], schema: LIST[OBJECT], present: 4/4, bag_id: ...)

# Non-empty List creation
>>> list_item_ds = kd.slice([[[1, 2], [3, 4]],[[5, 6], [7, 8]]])
>>> kd.list_like(x, list_item_ds)
DataSlice([[List[1, 2], None], [List[5, 6], List[7, 8]]], schema: LIST[INT32], present: 3/4,...)

>>> l1 = kd.list_shaped_as(x, list_item_ds); l1
DataSlice([[List[1, 2], List[3, 4]], [List[5, 6], List[7, 8]]], schema: LIST[INT32], present: 4/4,...)

>>> l2 = kd.list_shaped(s, list_item_ds)
>>> kd.testing.assert_equivalent(l1, l2)

# With provided itemids
>>> list_itemid = kd.new_listid_shaped_as(x)
>>> l1 = kd.list_like(x, itemid=list_itemid)

>>> l1 = kd.list_shaped_as(x, itemid=list_itemid)
>>> l2 = kd.list_shaped(s, itemid=list_itemid)
>>> kd.testing.assert_equivalent(l1, l2)

>>> schema = kd.list_schema(kd.INT64)
>>> l1 = kd.list_like(x, schema=schema)

# Empty Dict creation
>>> kd.dict_like(x)
DataSlice([[Dict{}, None], [Dict{}, Dict{}]], schema: DICT{OBJECT, OBJECT}, present: 3/4, bag_id: ...)
>>> kd.dict_shaped_as(x)
DataSlice([[Dict{}, Dict{}], [Dict{}, Dict{}]], schema: DICT{OBJECT, OBJECT}, present: 4/4, bag_id: ...)
>>> kd.dict_shaped(s)
DataSlice([[Dict{}, Dict{}], [Dict{}, Dict{}]], schema: DICT{OBJECT, OBJECT}, present: 4/4, bag_id: ...)

# Non-empty Dict creation
>>> key_ds = kd.slice([['a', 'b'],['c', 'd']])
>>> value_ds = kd.slice([[1, 2],[3, 4]])
>>> d1 = kd.dict_like(x, key_ds, value_ds)

>>> d2 = kd.dict_shaped_as(x, key_ds, value_ds)
>>> d3 = kd.dict_shaped(s, key_ds, value_ds)
>>> kd.testing.assert_equivalent(d2, d3)

# With provided itemids
>>> dict_itemid = kd.new_dictid_shaped_as(x)
>>> d1 = kd.dict_like(x, itemid=dict_itemid)
>>> d2 = kd.dict_shaped_as(x, itemid=dict_itemid)
>>> d3 = kd.dict_shaped(s, itemid=dict_itemid)
>>> kd.testing.assert_equivalent(d2, d3)

>>> schema = kd.dict_schema(kd.STRING, kd.INT32)
>>> d4 = kd.dict_like(x, schema=schema)

Pointwise Comparison Operators

>>> a = kd.slice([1, 2, 3])
>>> b = kd.slice([3, 2, 1])

# Use infix operators
>>> a > b
DataSlice([missing, missing, present], schema: MASK,...)
>>> a >= b
DataSlice([missing, present, present], schema: MASK,...)
>>> a < b
DataSlice([present, missing, missing], schema: MASK,...)
>>> a <= b
DataSlice([present, present, missing], schema: MASK,...)
>>> a == b
DataSlice([missing, present, missing], schema: MASK,...)
>>> a != b
DataSlice([present, missing, present], schema: MASK,...)

# Use kd operators
>>> kd.greater(a, b)
DataSlice([missing, missing, present], schema: MASK,...)
>>> kd.greater_equal(a, b)
DataSlice([missing, present, present], schema: MASK,...)
>>> kd.less(a, b)
DataSlice([present, missing, missing], schema: MASK,...)
>>> kd.less_equal(a, b)
DataSlice([present, present, missing], schema: MASK,...)
>>> kd.equal(a, b)
DataSlice([missing, present, missing], schema: MASK,...)
>>> kd.not_equal(a, b)
DataSlice([present, missing, present], schema: MASK,...)

DataSlice Comparison Operator

>>> a = kd.slice([1, None, 3])
>>> b = kd.slice([1, None, 3])

>>> assert kd.full_equal(a, b)

# Note it is different from
>>> assert not kd.all(a == b)

# Auto-alignment rule applies
>>> assert kd.full_equal(kd.item(1), kd.slice([1, 1]))

# Type promotion rule applies
>>> assert kd.full_equal(kd.item(1), kd.slice([1, 1.0]))

Mask Operators

# Masking
>>> a = kd.slice([kd.present, kd.missing, kd.present])
>>> b = kd.slice([kd.present, kd.missing, kd.missing])

>>> res1 = a & b; res1
DataSlice([present, missing, missing], schema: MASK, present: 1/3)
>>> res2 = kd.mask_and(a, b)
>>> kd.testing.assert_equivalent(res1, res2)


>>> res1 = a | b; res1
DataSlice([present, missing, present], schema: MASK, present: 2/3)
>>> res2 = kd.mask_or(a, b)
>>> kd.testing.assert_equivalent(res1, res2)

>>> a == b
DataSlice([present, missing, missing], schema: MASK, present: 1/3)
>>> kd.mask_equal(a, b) # unlike a == b, two missing values compare as equal
DataSlice([present, present, missing], schema: MASK, present: 2/3)

>>> a != b
DataSlice([missing, missing, missing], schema: MASK, present: 0/3)
>>> kd.mask_not_equal(a, b) # unlike a != b, present vs. missing compares as not equal
DataSlice([missing, missing, present], schema: MASK, present: 1/3)

Presence Checking Operators

>>> a = kd.slice([1, None, 3])

>>> kd.has(a)
DataSlice([present, missing, present], schema: MASK,...)
>>> kd.has_not(a)
DataSlice([missing, present, missing], schema: MASK,...)

>>> b = kd.slice([kd.obj(a=1), kd.obj(kd.list([1, 2])),
...               kd.obj(kd.dict({1: 2})), None, 1])

>>> kd.has_entity(b)
DataSlice([present, missing, missing, missing, missing], schema: MASK,...)
>>> kd.has_list(b)
DataSlice([missing, present, missing, missing, missing], schema: MASK,...)
>>> kd.has_dict(b)
DataSlice([missing, missing, present, missing, missing], schema: MASK,...)
>>> kd.has_primitive(b)
DataSlice([missing, missing, missing, missing, present], schema: MASK,...)

Masking and Coalesce Operators

# Masking
>>> a = kd.slice([1, None, 3])
>>> b = kd.slice([4, 5, 6])

# Use infix operator
>>> a & (a > 1)
DataSlice([None, None, 3], schema: INT32,...)
>>> kd.apply_mask(a, a > 1)
DataSlice([None, None, 3], schema: INT32,...)
>>> b & kd.has(a)
DataSlice([4, None, 6], schema: INT32,...)

# Use infix operator
>>> a | b
DataSlice([1, 5, 3], schema: INT32,...)
>>> kd.coalesce(a, b)
DataSlice([1, 5, 3], schema: INT32,...)

# Works with a DataSlice with a different schema
>>> c = kd.slice(['4', '5', '6'])
>>> a | c
DataSlice([1, '5', 3], schema: OBJECT,...)

# disjoint_coalesce requires the inputs to be disjoint and raises otherwise
>>> kd.disjoint_coalesce(a, c)
Traceback (most recent call last):
ValueError: kd.masking.disjoint_coalesce: `x` and `y` cannot overlap, but found the following intersecting values for the flattened and aligned inputs:
...

>>> d = kd.slice([None, '5', None])
>>> kd.disjoint_coalesce(a, d)
DataSlice([1, '5', 3], schema: OBJECT,...)

Conditional Selection Operators

>>> a = kd.slice([1, 2, 3])
>>> kd.select(a, a > 1)
DataSlice([2, 3], schema: INT32,...)
>>> a.select(a > 1)
DataSlice([2, 3], schema: INT32,...)
>>> kd.select(a, lambda x: x > 1)
DataSlice([2, 3], schema: INT32,...)

# Use expand_filter=False to avoid expanding
# the filter to the shape of the DataSlice
# so that empty dimensions are removed too
>>> b = kd.slice([[1, None], [None, 4]])
>>> fltr = kd.slice([kd.present, kd.missing])
>>> kd.select(b, fltr)
DataSlice([[1, None], []], schema: INT32,...)
>>> kd.select(b, fltr, expand_filter=False)
DataSlice([[1, None]], schema: INT32,...)

# Put items in present positions in filter.
>>> kd.inverse_select(
...     kd.slice([1, 2]),
...     kd.slice([kd.missing, kd.present, kd.present])
... )
DataSlice([None, 1, 2], schema: INT32,...)

>>> a = kd.slice([1, None, 3])
>>> kd.select_present(a)
DataSlice([1, 3], schema: INT32,...)
>>> kd.select(a, kd.has(a))
DataSlice([1, 3], schema: INT32,...)

>>> b = kd.slice([3, 2, 1])
>>> c = kd.slice([-1, 0, 1])
>>> kd.cond(c > 1, a, b)
DataSlice([3, 2, 1], schema: INT32,...)

Range Operator

>>> kd.range(1, 5)
DataSlice([1, 2, 3, 4], schema: INT64,...)
>>> kd.range(1, kd.slice([3, 5]))
DataSlice([[1, 2], [1, 2, 3, 4]], schema: INT64,...)
>>> kd.range(kd.slice([0, 3]), kd.slice([3, 5]))
DataSlice([[0, 1, 2], [3, 4]], schema: INT64,...)

Math Operators

>>> x = kd.slice([1, 2, 3])
>>> y = kd.slice([4, 5, 6])
>>> kd.testing.assert_equivalent(x + y, kd.math.add(x, y))
>>> kd.testing.assert_equivalent(x - y, kd.math.subtract(x, y))
>>> kd.testing.assert_equivalent(x * y, kd.math.multiply(x, y))
>>> kd.testing.assert_equivalent(x / y, kd.math.divide(x, y))
>>> kd.testing.assert_equivalent(x % y, kd.math.mod(x, y))
>>> kd.testing.assert_equivalent(x ** y, kd.math.pow(x, y))
>>> kd.testing.assert_equivalent(x // y, kd.math.floordiv(x, y))

>>> _ = kd.math.log(x)
>>> _ = kd.math.round(x)
>>> _ = kd.math.floor(x)
>>> _ = kd.math.ceil(x)
>>> _ = kd.math.pos(x)
>>> _ = kd.math.abs(x)
>>> _ = kd.math.neg(x)
>>> _ = kd.math.maximum(x, y)
>>> _ = kd.math.minimum(x, y)

>>> _ = kd.agg_min(x)
>>> _ = kd.agg_max(x)
>>> _ = kd.agg_sum(x)
>>> _ = kd.min(x)
>>> _ = kd.max(x)
>>> _ = kd.sum(x)

>>> _ = kd.math.cum_min(x)
>>> _ = kd.math.cum_max(x)
>>> _ = kd.math.cum_sum(x)
>>> _ = kd.math.agg_mean(x)
>>> _ = kd.math.agg_median(x)
>>> _ = kd.math.agg_std(x)
>>> _ = kd.math.agg_var(x)
>>> _ = kd.math.cdf(x)
>>> _ = kd.math.agg_inverse_cdf(x, 1e-3)

String Operators

>>> x = kd.str('Hello World')
>>> _ = kd.strings.length(x)
>>> _ = kd.strings.upper(x)
>>> _ = kd.strings.lower(x)
>>> _ = kd.strings.split(x, ' ')
>>> _ = kd.strings.contains(x, 'ello')
>>> _ = kd.strings.regex_extract(x, '(Hel+o)')
>>> _ = kd.strings.regex_match(x, r'\.+')
>>> _ = kd.strings.find(x, 'World')
>>> _ = kd.strings.replace(x, 'World', 'Earth')
>>> _ = kd.strings.substr(x, 2, 5)
>>> _ = kd.strings.count(x, 'o')
>>> kd.strings.agg_join(kd.slice(['a', 'b']), 'c')
DataItem('acb', schema: STRING)

>>> kd.strings.join(kd.slice(['a', 'b']), kd.slice(['c', 'd']))
DataSlice(['ac', 'bd'], schema: STRING,...)

>>> kd.strings.encode('abc')
DataItem(b'abc', schema: BYTES)
>>> kd.strings.decode(b'abc')
DataItem('abc', schema: STRING)

# Encode any bytes into base64 format string
# Useful for putting bytes into JSON
>>> kd.strings.encode_base64(b'abc')
DataItem('YWJj', schema: STRING)

>>> kd.strings.decode_base64('YWJj')
DataItem(b'abc', schema: BYTES)

String Format Operators

>>> kd.strings.printf('Hello %s', 'World')
DataItem('Hello World', schema: STRING)
>>> kd.strings.printf('Hello %s',
...   kd.slice(['World', 'Universe']))
DataSlice(['Hello World', 'Hello Universe'], schema: STRING,...)
>>> o = kd.obj(greeting='Hello', name='World')
>>> kd.strings.printf('%s %s', o.greeting, o.name)
DataItem('Hello World', schema: STRING)
>>> kd.strings.printf('%s %d', "Hello", 1)
DataItem('Hello 1', schema: STRING)

# Python-like f-string format API
# Note the ":s" format spec after each DataSlice variable name
>>> name = kd.str('World')
>>> kd.strings.fstr(f'Hello {name:s}')
DataItem('Hello World', schema: STRING)
>>> name = kd.slice(['World', 'Universe'])
>>> kd.strings.fstr(f'Hello {name:s}')
DataSlice(['Hello World', 'Hello Universe'], schema: STRING,...)
>>> o=kd.obj(greeting='Hello', name='World')
>>> kd.strings.fstr(f'{o.greeting:s} {o.name:s}')
DataItem('Hello World', schema: STRING)
>>> greeting, count = kd.str('Hello'), kd.int32(1)
>>> kd.strings.fstr(f'{greeting:s} {count:s}')
DataItem('Hello 1', schema: STRING)

# Use format to specify kwargs
>>> kd.strings.format('Hello {name}', name='World')
DataItem('Hello World', schema: STRING)
>>> kd.strings.format('Hello {name}',
...   name=kd.slice(['World', 'Universe']))
DataSlice(['Hello World', 'Hello Universe'], schema: STRING,...)
>>> kd.strings.format('{greeting} {count}',
...   greeting='Hello', count=1)
DataItem('Hello 1', schema: STRING)

Aggregational Operators

# collapse over the last dimension and
# return the value if all present items have the
# same value, or None otherwise
>>> x = kd.slice([[1], [], [2, None], [3, 4]])
>>> kd.collapse(x)
DataSlice([1, None, 2, None], schema: INT32, present: 2/4)

# 'count' returns counts of present items
>>> kd.agg_count(x)
DataSlice([1, 0, 1, 2], schema: INT64, present: 4/4)

# aggregates over the last 2 dimensions
>>> x1 = kd.agg_count(x, ndim=2); x1
DataItem(4, schema: INT64)

# aggregates across all dimensions
>>> kd.testing.assert_equivalent(kd.count(x), x1)

# aggregates over the last dimension and
# returns the accumulated count of present items
>>> kd.cum_count(x)
DataSlice([[1], [], [1, None], [1, 2]], schema: INT64, present: 4/5)

# aggregates over the last 2 dimensions
>>> kd.cum_count(x, ndim=2)
DataSlice([[1], [], [2, None], [3, 4]], schema: INT64, present: 4/5)

# 'index' returns indices starting from 0
# aggregates over the last dimension
>>> kd.index(x)
DataSlice([[0], [], [0, None], [0, 1]], schema: INT64, present: 4/5)

# indices at the second dimension
# note it is `dim` rather than `ndim` and
# the result is aligned with x
>>> kd.index(x, dim=1)
DataSlice([[0], [], [0, None], [0, 1]], schema: INT64, present: 4/5)

# indices at the first dimension
>>> kd.index(x, dim=0)
DataSlice([[0], [], [2, 2], [3, 3]], schema: INT64, present: 5/5)

>>> x = kd.slice([[kd.present, kd.present], [],
...               [kd.present, kd.missing], [kd.missing]])

# 'all' returns present if all items are present
# aggregates over the last dimension
>>> kd.agg_all(x)
DataSlice([present, present, missing, missing], schema: MASK, present: 2/4)

# aggregates across all dimensions
>>> kd.all(x)
DataItem(missing, schema: MASK)

# 'any' returns present if any item is present
>>> kd.agg_any(x)
DataSlice([present, missing, present, missing], schema: MASK, present: 2/4)

# aggregates across all dimensions
>>> kd.any(x)
DataItem(present, schema: MASK)

Type casting operators

# Same operators as for creating a slice of specific type.
>>> kd.int32(kd.slice([1., 2., 3.]))
DataSlice([1, 2, 3], schema: INT32, present: 3/3)
>>> kd.int64(kd.slice([1, 2, 3]))
DataSlice([1, 2, 3], schema: INT64, present: 3/3)
>>> kd.float32(kd.slice([1, 2, 3]))
DataSlice([1.0, 2.0, 3.0], schema: FLOAT32, present: 3/3)
>>> kd.float64(kd.slice([1., 2., 3.]))
DataSlice([1.0, 2.0, 3.0], schema: FLOAT64, present: 3/3)
>>> kd.bool(kd.slice([0, 1, 2]))
DataSlice([False, True, True], schema: BOOLEAN, present: 3/3)
>>> kd.str(kd.slice([1, 2, 3]))
DataSlice(['1', '2', '3'], schema: STRING, present: 3/3)
>>> kd.bytes(kd.slice([b'1', b'2', b'3']))
DataSlice([b'1', b'2', b'3'], schema: BYTES, present: 3/3)

# Dispatches to the relevant kd.* operator
>>> kd.cast_to(kd.slice([1, 2, 3]), kd.INT64)
DataSlice([1, 2, 3], schema: INT64, present: 3/3)

Expand_to Operators

>>> a = kd.slice([[1, 2], [3]])
>>> b = kd.slice([[[1, 2], [3]], [[4, 5]]])

>>> kd.expand_to(a, b)
DataSlice([[[1, 1], [2]], [[3, 3]]], schema: INT32, present: 5/5)

# which is equivalent to
>>> a.expand_to(b)
DataSlice([[[1, 1], [2]], [[3, 3]]], schema: INT32, present: 5/5)

>>> c = kd.slice([[1], [2, 3]])

>>> kd.expand_to(a, c)
Traceback (most recent call last):
ValueError: kd.shapes.expand_to_shape: DataSlice with shape=JaggedShape(2, [2, 1]) cannot be expanded to shape=JaggedShape(2, [1, 2])
...

>>> kd.expand_to(a, c, ndim=0)
Traceback (most recent call last):
ValueError: kd.shapes.expand_to_shape: DataSlice with shape=JaggedShape(2, [2, 1]) cannot be expanded to shape=JaggedShape(2, [1, 2])
...

>>> kd.expand_to(a, c, ndim=1)
DataSlice([[[1, 2]], [[3], [3]]], schema: INT32, present: 4/4)

>>> kd.expand_to(a, c, ndim=2)
DataSlice([[[[1, 2], [3]]], [[[1, 2], [3]], [[1, 2], [3]]]], schema: INT32, present: 9/9)

>>> kd.expand_to(a, c, ndim=3)
Traceback (most recent call last):
ValueError: kd.shapes.expand_to_shape: ndim must be a positive integer and <= x.ndim, got 3
...

Explosion and Implosion Operators

# kd.explode explodes the Lists ndim times
>>> a = kd.list([[[1, 2], [3]], [[4], [5]]])

>>> res1 = kd.explode(a, ndim=1); res1
DataSlice([List[List[1, 2], List[3]], List[List[4], List[5]]], schema: LIST[LIST[INT32]], present: 2/2, ...)

>>> kd.testing.assert_equivalent(a.explode(ndim=1), res1) # Same as above

>>> kd.explode(a, ndim=2)
DataSlice([[List[1, 2], List[3]], [List[4], List[5]]], schema: LIST[INT32], present: 4/4, ...)

>>> kd.explode(a, ndim=3)
DataSlice([[[1, 2], [3]], [[4], [5]]], schema: INT32, present: 5/5, ...)

# explode until the DataSlice does not contain
# Lists any more.
>>> kd.explode(a, ndim=-1)
DataSlice([[[1, 2], [3]], [[4], [5]]], schema: INT32, present: 5/5, ...)

# kd.implode implodes the DataSlice ndim times
>>> a = kd.slice([[[1, 2], [3]], [[4], [5]]])

>>> kd.implode(a, ndim=1)
DataSlice([[List[1, 2], List[3]], [List[4], List[5]]], schema: LIST[INT32], present: 4/4, ...)

# which is equivalent to
>>> a.implode(ndim=1)
DataSlice([[List[1, 2], List[3]], [List[4], List[5]]], schema: LIST[INT32], present: 4/4, ...)

>>> kd.implode(a, ndim=2)
DataSlice([List[List[1, 2], List[3]], List[List[4], List[5]]], schema: LIST[LIST[INT32]], present: 2/2, ...)

>>> kd.implode(a, ndim=3)
DataItem(List[List[List[1, 2], List[3]], List[List[4], List[5]]], schema: LIST[LIST[LIST[INT32]]], ...)

# implode until the result is a List DataItem
>>> kd.implode(a, ndim=-1)
DataItem(List[List[List[1, 2], List[3]], List[List[4], List[5]]], schema: LIST[LIST[LIST[INT32]]], ...)

Joining DataSlice Operators

# Stacks DataSlices and creates a new
# dimension at `ndim`
>>> a = kd.slice([[1, None], [4]])
>>> b = kd.slice([[7, 7], [7]])

>>> res1 = kd.stack(a, b, ndim=0); res1
DataSlice([[[1, 7], [None, 7]], [[4, 7]]], schema: INT32, present: 5/6)

>>> kd.testing.assert_equivalent(kd.stack(a, b), res1) # Same as above

>>> kd.stack(a, b, ndim=1)
DataSlice([[[1, None], [7, 7]], [[4], [7]]], schema: INT32, present: 5/6)

>>> kd.stack(a, b, ndim=2)
DataSlice([[[1, None], [4]], [[7, 7], [7]]], schema: INT32, present: 5/6)

>>> kd.stack(a, b, ndim=4)
Traceback (most recent call last):
ValueError: invalid ndim=4 for rank=2 stack...

# Zip DataSlices over the last dimension
>>> kd.zip(a, b)
DataSlice([[[1, 7], [None, 7]], [[4, 7]]], schema: INT32, present: 5/6)

# Zip supports auto-alignment
>>> kd.zip(a, 1)
DataSlice([[[1, 1], [None, 1]], [[4, 1]]], schema: INT32, present: 5/6)

# Zip is equivalent to kd.stack(..., ndim=0)
# but supports auto-alignment
>>> kd.testing.assert_equivalent(kd.stack(a, b), kd.zip(a,b))

>>> kd.stack(a, 1)
Traceback (most recent call last):
ValueError: all concat/stack args must have the same rank, got 2 and 0...

# Concatenates DataSlices on dimension `ndim`
>>> a = kd.slice([[[1, 2], [3]], [[5], [7, 8]]])
>>> b = kd.slice([[[1], [2]], [[3], [4]]])

>>> res1 = kd.concat(a, b, ndim=1); res1
DataSlice([[[1, 2, 1], [3, 2]], [[5, 3], [7, 8, 4]]], schema: INT32, present: 10/10)

>>> kd.testing.assert_equivalent(kd.concat(a, b), res1) # Same as above

>>> kd.concat(a, b, ndim=2)
DataSlice([[[1, 2], [3], [1], [2]], [[5], [7, 8], [3], [4]]], schema: INT32, present: 10/10)

>>> kd.concat(a, b, ndim=3)
DataSlice([[[1, 2], [3]], [[5], [7, 8]], [[1], [2]], [[3], [4]]], schema: INT32, present: 10/10)

>>> kd.concat(a, b, ndim=4)
Traceback (most recent call last):
ValueError: invalid ndim=4 for rank=3 concat...

Group_by Operators

>>> cities = kd.obj(
...   name=kd.slice(['sf', 'sj', 'la', 'nyc', 'albany']),
...   population=kd.slice([100, 200, 300, 400, 500]),
...   state=kd.slice(['ca', 'ca', 'ca', 'ny', 'ny'])
... )

# Group cities by state name over the last dimension
# Note the result has one more dimension
>>> cities_grouped = kd.group_by(cities, cities.state)

>>> state_population = kd.agg_sum(cities_grouped.population)

# We can now create state objects, each holding a list of its cities
>>> states = kd.obj(
...   cities=kd.implode(cities_grouped),
...   name=kd.collapse(cities_grouped.state),
...   population=state_population
... )

# Note that 'state' for 'la' is missing
>>> cities = kd.obj(
...   name=kd.slice(['sf', 'sj', 'la', 'nyc', 'albany']),
...   population=kd.slice([100, 200, 300, 400, 500]),
...   state=kd.slice(['ca', 'ca', None, 'ny', 'ny'])
... )

>>> cities_grouped = kd.group_by(cities, cities.state)

# Note that 'la' is dropped because its 'state' is missing
>>> cities_grouped.name
DataSlice([['sf', 'sj'], ['nyc', 'albany']], schema: STRING, present: 4/4, ...)

# Group by both state name and population
# Note that we need to convert MASK to BOOLEAN
# if we want to use it as group_by value
>>> cities_grouped = kd.group_by(cities, cities.state,
...   kd.cond(cities.population > 100, True, False))

>>> cities_grouped.name
DataSlice([['sf'], ['sj'], ['nyc', 'albany']], schema: STRING, present: 4/4, ...)

Unique Operator

>>> ds1 = kd.slice([[1, 1, 1.], [1, '1', None]])

# Get unique items over the last dimension
>>> kd.unique(ds1)
DataSlice([[1, 1.0], [1, '1']], schema: OBJECT, present: 4/4)

# Get unique items across all dimensions
>>> kd.unique(ds1.flatten())
DataSlice([1, 1.0, '1'], schema: OBJECT, present: 3/3)

>>> o1 = kd.obj(a=1)
>>> o2 = kd.obj(a=2)
>>> l = kd.list()
>>> d = kd.dict()
>>> ds2 = kd.slice([[o1, o1, o2], [o2, None, None]])

# For entities/objects/lists/dicts,
# compare their ItemIds
>>> kd.unique(ds2)
DataSlice([[Obj(a=1), Obj(a=2)], [Obj(a=2)]], schema: OBJECT, present: 3/3, ...)

Translate Operators

# One-to-many mapping
>>> docs = kd.obj(
...   did=kd.slice([[1, 2], [1, 2, 3]]),
...   score=kd.slice([[4, 5], [6, 7, 8]]),
... )
>>> queries = kd.obj(did=kd.slice(
...   [kd.list([1, 3, 1]), kd.list([3, 1])]))
>>> kd.translate(queries.did[:], docs.did, docs.score)
DataSlice([[4, None, 4], [8, 6]], schema: INT32, present: 4/5, ...)

# Many-to-one mapping
>>> docs = kd.obj(
...     qid=kd.slice([[1, 2, 2], [2, 2, 3]]),
...     score=kd.slice([[1, 2, 3], [4, 5, 6]]),
... )
>>> queries = kd.obj(
...   qid=kd.slice([[1, 2], [3, 2]]))

>>> new_docs = kd.translate_group(
...   queries.qid, docs.qid, docs)

# Add docs list since the result of
# translate_group is aligned with queries
>>> queries = queries.with_attrs(docs=kd.implode(new_docs))

>>> queries.docs[:].score
DataSlice([[[1], [2, 3]], [[6], [4, 5]]], schema: INT32, present: 6/6, ...)

# Many-to-many mapping is not directly
# supported as operator but can be done
# as multiple steps. E.g.
>>> docs = kd.obj(
...     qid=kd.slice([1, 2, 2, 2, 2, 3]),
...     score=kd.slice([1, 2, 3, 4, 5, 6]),
... )

>>> query_terms = kd.obj(
...     qid=kd.slice([1, 1, 2, 2, 5]),
...     term=kd.slice(['a', 'b', 'c', 'd', 'e']),
... )

>>> docs_grouped = kd.group_by(docs, docs.qid)
>>> terms_grouped = kd.group_by(query_terms, query_terms.qid)
>>> queries = kd.obj(
...   docs=kd.implode(docs_grouped),
...   terms=kd.implode(terms_grouped)
... )

Random Number and Sampling Operators

# Generate random integers
>>> x = kd.slice([[1, 2], [], [3, None], [None]])
>>> shape = x.get_shape()

# random int in [0, max_int)
>>> _ = kd.randint_like(x)

# random int in [0, 10)
>>> _ = kd.randint_like(x, 10)

# random int in [-5, 200)
>>> _ = kd.randint_like(x, -5, 200)

# setting the seed is supported
>>> _ = kd.randint_like(x, 10, seed=123)

# shaped_as creates same shape as x
>>> _ = kd.randint_shaped_as(x, 10)

# shaped creates using an explicit shape
>>> _ = kd.randint_shaped(shape, 10)

>>> ds = kd.slice([[1, 2, None, 4],
...             [5, None, None, 8]])
>>> key = kd.slice([['a', 'b', 'c', 'd'],
...              ['a', 'b', 'c', 'd']])

# Sampling is performed on the flattened 'ds'
# rather than on the last dimension.
# Sample each element with .5 probability.
>>> _ = kd.sample(ds, 0.5, 123)

# Use 'key' for stability
>>> _ = kd.sample(ds, 0.5, 123, key)

# Select 2 items from last dimension
>>> _ = kd.sample_n(ds, 2, 123)

# Select one item from the first row and
# two items from the second row
>>> _ = kd.sample_n(ds, kd.slice([1, 2]), 123)

# Use 'key' for stability
>>> _ = kd.sample_n(ds, 2, 123, key)

>>> ds2 = kd.slice([[1, 2, 3], [4, None]])

# Shuffle along the last dimension
>>> _ = kd.shuffle(ds2)

# Shuffle along the second-to-last dimension
>>> _ = kd.shuffle(ds2, 1)

# Shuffle items within the last two dimensions
>>> _ = kd.shuffle(ds2.flatten(-2)).reshape_as(ds2)

# Split a DataSlice into train, test, validation
# `examples` is a 1D slice loaded from a dataset
# examples = kd.shuffle(my_dataset)
# train_examples = examples.S[:800]
# test_examples = examples.S[800:900]
# validation_examples = examples.S[900:]

Sort and Rank Operators

>>> ds1 = kd.slice([[10, 5, 10, 5], [30, 10]])
>>> ds2 = kd.slice([[1, 2, 3, 4], [2, 1]])

# Ranking returns ranked positions
# Rank over the last dimension
>>> kd.ordinal_rank(ds1)
DataSlice([[2, 0, 3, 1], [1, 0]], schema: INT64, present: 6/6)

# Rank with tie breaker
>>> kd.ordinal_rank(ds1, tie_breaker=ds2)
DataSlice([[2, 0, 3, 1], [1, 0]], schema: INT64, present: 6/6)

# Rank with descending order
>>> kd.ordinal_rank(ds1, descending=True)
DataSlice([[0, 2, 1, 3], [0, 1]], schema: INT64, present: 6/6)

# Sorting returns sorted DataSlice
# Sort over the last dimension
>>> kd.sort(ds1)
DataSlice([[5, 5, 10, 10], [10, 30]], schema: INT32, present: 6/6)

# Sort with descending order
>>> kd.sort(ds1, descending=True)
DataSlice([[10, 10, 5, 5], [30, 10]], schema: INT32, present: 6/6)

# Sort based on sort_by
>>> kd.sort(ds1, sort_by=ds2)
DataSlice([[10, 5, 10, 5], [10, 30]], schema: INT32, present: 6/6)

# Find the n-th element in the last dimension
>>> kd.sort(ds1).take(2)
DataSlice([10, None], schema: INT32, present: 1/2)

>>> kd.sort(ds1).take(kd.slice([0, 1]))
DataSlice([5, 30], schema: INT32, present: 2/2)

Reverse Operator

>>> ds = kd.slice([[10, 5, 10, 5], [30, 10]])

# Reverse items in the last dimension
>>> kd.reverse(ds)
DataSlice([[5, 10, 5, 10], [10, 30]], schema: INT32, present: 6/6)

# Reverse items across all dimensions
>>> kd.reverse(ds.flatten()).reshape_as(ds)
DataSlice([[10, 30, 5, 10], [5, 10]], schema: INT32, present: 6/6)

map_py Operator

>>> ds = kd.slice([1, 2, 3])

# Pointwise
>>> kd.map_py(lambda x: x + 1, ds,
...           schema=kd.INT64)
DataSlice([2, 3, 4], schema: INT64, present: 3/3)

# Aggregational
>>> kd.map_py(lambda x: len(x), ds, ndim=1)
DataItem(3, schema: INT32)

# Aggregational but no dimension change
>>> kd.map_py(lambda x: sorted(x), ds, ndim=1)[:]
DataSlice([1, 2, 3], schema: INT32, present: 3/3,...)

# Expansion
>>> kd.map_py(lambda x: [x] * 3, ds,
...           schema=kd.list_schema(kd.INT32))[:]
DataSlice([[1, 1, 1], [2, 2, 2], [3, 3, 3]], schema: INT32...)

# Pass Objects
>>> a = kd.obj(x=kd.slice([[1, 2], [3, None, 5]]),
...            y=kd.slice([[6, 7], [8, 9, None]]))
>>> kd.map_py(lambda a: a.x + a.y, a,
...           schema=kd.INT32)
DataSlice([[7, 9], [11, None, None]], schema: INT32, present: 3/5)

>>> kd.map_py(lambda a: [obj.x - obj.y for obj in a],
...           a, ndim=1)[:]
DataSlice([[-5, -5], [-5, None, None]], schema: INT32, present: 3/5...)

# Pass Objects and primitives
>>> b = kd.slice([[1, 2], [3, None, 5]])
>>> kd.map_py(lambda a, b: a.x + a.y + b, a, b)
DataSlice([[8, 11], [14, None, None]], schema: INT32, present: 3/5)

# Return Objects
>>> f = lambda x: kd.obj(x=1) if x.y < 3 else kd.obj(y=1)
>>> res = kd.map_py(f, kd.obj(y=kd.slice([[1, 2], [3]])))

# With max_threads to speed up the I/O-bound
# executions (e.g. reading from disk or RPCs)
>>> _ = kd.map_py(f, kd.obj(y=kd.slice([[1, 2], [3]])), max_threads=20)

# Pass multiple arguments
>>> f = lambda x, y, z: x + y + z
>>> kd.map_py(f, 1, 2, 3) # positional
DataItem(6, schema: INT32)

>>> kd.map_py(f, z=3, y=2, x=1) # keyword
DataItem(6, schema: INT32)

>>> kd.map_py(f, 1, z=3, y=2) # mix of positional and keyword
DataItem(6, schema: INT32)

Python Function Operators

>>> a = kd.slice([1, 2, 3])

# Use Python for a simple transformation.
>>> def f1(a):
...   # Note that 'a' is passed as a DataSlice
...   return a + 1

>>> c = kd.apply_py(f1, a); c
DataSlice([2, 3, 4], schema: INT32, present: 3/3)

# Random Shuffle.
>>> import random

>>> def f2(a):
...   a = a.to_py()  # Use .to_py() to convert DataSlice to a standard python list.
...   random.shuffle(a)
...   return kd.slice(a)

>>> _ = kd.apply_py(f2, a)

# A simple aggregation.
>>> def f3(a, b):
...   return sum(x * y for x, y in zip(a.to_py(), b.to_py()))

>>> b = kd.slice([4, 5, 6])

>>> kd.apply_py(f3, a, b)
DataItem(32, schema: INT32)

# Using 'kwargs'.
>>> kd.apply_py(f3, a=a, b=b)
DataItem(32, schema: INT32)


# 'kd.map_py' executes a Python function pointwise (item by item) on the given slices.
>>> kd.map_py(lambda a: a + 1, a)
DataSlice([2, 3, 4], schema: INT32, present: 3/3)

# Conditional variant & schema for the resulting slice.
>>> kd.map_py_on_selected(
...     lambda a: a + 1, a > 1, a, schema=kd.FLOAT64
... )
DataSlice([None, 3.0, 4.0], schema: FLOAT64, present: 2/3)

>>> kd.map_py_on_cond(
...     lambda a, b: a + b, lambda a, b: a - b, a > 1, b=b, a=a, schema=kd.INT64
... )
DataSlice([-3, 7, 9], schema: INT64, present: 3/3)

Schema

Koda schema is used to describe the type of contents. There are five types of schemas:

Schema is used to specify the behaviors for:

Schemas can be attached to DataSlices or individual items. Attaching a schema to a DataSlice means declaring that each item has the same schema. Individual items can store their own schema in a special attribute __schema__; such an item is called an Object. This allows storing Objects with different schemas in one DataSlice. Primitives can be treated as Objects because their dtypes are embedded in their values.

DataSlice Schema refers to schemas attached to DataSlices. Embedded Schema refers to schemas embedded inside Koda Objects as the __schema__ attribute, or to dtypes for primitives. DataSlice Schema can be a primitive schema, an Entity schema, OBJECT, or ITEMID. Embedded Schema cannot be ITEMID.

Entity schemas can be either explicit or implicit. Explicit schemas are created using kd.schema.new_schema/list_schema/dict_schema() while implicit schemas are created as a by-product of kd.obj().

Schema Creation

kd.named_schema/list_schema/dict_schema/uu_schema(...) creates an explicit schema.

# Create a named schema
>>> Point = kd.named_schema('Point', x=kd.INT32, y=kd.FLOAT64)

# Attribute 'end' can be anything
>>> Line = kd.named_schema('Line', start=Point, end=kd.OBJECT)

# Get the attribute start's schema
>>> _ = Line.start

# Check if it is an Entity schema
>>> assert Point.is_entity_schema()
>>> assert Line.is_entity_schema()

# List schema
>>> ls1 = kd.list_schema(kd.INT64)

# List item schema is the Entity schema Point
>>> ls2 = kd.list_schema(Point)

# Get the List item schema
>>> _ = ls2.get_item_schema()

# Check if it is a List schema
>>> assert ls2.is_list_schema()

# Dict schema
# Dict value schema is kd.OBJECT. That is,
# each value's schema is stored in its own
# __schema__ attribute
>>> ds1 = kd.dict_schema(kd.STRING, kd.OBJECT)

# Dict value schema is the Entity schema Line
>>> ds2 = kd.dict_schema(kd.ITEMID, Line)

# Get the Dict key/value schema
>>> ds2.get_key_schema()
DataItem(ITEMID, schema: SCHEMA, ...)
>>> _ = ds2.get_value_schema()

# Check if it is a Dict schema
>>> assert ds2.is_dict_schema()

# UU schema
>>> uus1 = kd.uu_schema(x=kd.INT32, y=kd.FLOAT64)

# UU schemas with the same contents are the same
>>> uus2 = kd.uu_schema(x=kd.INT32, y=kd.FLOAT64)
>>> assert uus1 == uus2

# It is also an Entity schema
>>> assert uus1.is_entity_schema()

# In fact, named, list and dict schemas are also
# UU schemas
>>> Point1 = kd.named_schema('Point', x=kd.INT32, y=kd.FLOAT64)
>>> assert Point1.get_itemid() == Point.get_itemid()

# Create a non-UU schema whose ItemId is newly allocated
>>> s1 = kd.schema.new_schema(x=kd.INT32, y=kd.FLOAT64)
>>> s2 = kd.schema.new_schema(x=kd.INT32, y=kd.FLOAT64)

>>> assert s1.get_itemid() != s2.get_itemid()

Item Creation Using Schemas

Entity schemas can be used to create entities by using schema.new(...) or kd.new(..., schema=schema). List and dict schemas do not support schema.new syntax.

>>> Point = kd.named_schema('Point', x=kd.INT32, y=kd.FLOAT64)
>>> Line = kd.named_schema('Line', start=Point, end=kd.OBJECT)

>>> i1 = Point.new(x=1, y=2.3)

# which is equivalent to
>>> i1 = kd.new(x=1, y=2.3, schema=Point)
>>> i2 = Line.new(start=i1, end='4')

>>> s3 = kd.list_schema(kd.INT64)
>>> s4 = kd.list_schema(Point)

>>> i3 = kd.list([1, 2, 3, 4], schema=s3)
>>> i4 = kd.list([i1, i1], schema=s4)

>>> s5 = kd.dict_schema(kd.STRING, kd.OBJECT)
>>> s6 = kd.dict_schema(kd.ITEMID, Line)

>>> i5 = kd.dict({'a': kd.obj()}, schema=s5)
>>> i6 = kd.dict(kd.obj().get_itemid(), i2, schema=s6)

Item Creation without Schema

When no schema is provided to kd.new, a schema is automatically derived based on the provided arguments or keyword arguments. When a string is provided as the schema, a UU schema is created based on that name.

# kd.new() creates entities with derived schema
>>> i1 = kd.new(x=1, y=2.0, z='3')

# The resulting DataItem has an auto-derived schema
>>> assert i1.get_schema().x == kd.INT32
>>> assert i1.get_schema().y == kd.FLOAT32
>>> assert i1.get_schema().z == kd.STRING

>>> i2 = kd.new(x=1, y=2.0, z='3')

# Schemas are different because two schemas
# with different ItemIds are created
>>> assert i1.get_schema() != i2.get_schema()

>>> i3 = kd.new(x=1, y=2.0, schema='Point')
>>> i4 = kd.new(x=2, y=3.0, schema='Point')

# Schemas are the same because two uu schemas
# with the same ItemIds are created
>>> assert i3.get_schema() == i4.get_schema()

Object Creation With Implicit Schemas

kd.obj(...) creates an object with an implicit schema.

# An implicit schema is created and
# attached to the Koda object
>>> o1 = kd.obj(x=1, y=2.0, z='3')

# The schema of the DataItem is kd.OBJECT
>>> assert o1.get_schema() == kd.OBJECT

# The implicit schema is stored
# as __schema__ attribute and can be accessed
# using get_obj_schema()
>>> o1.get_obj_schema()
DataItem(IMPLICIT_ENTITY(x=INT32, y=FLOAT32, z=STRING), schema: SCHEMA, ...)

# A new and different implicit schema is created
# every time kd.obj() is called
>>> o2 = kd.obj(x=1, y=2.0, z='3')
>>> assert o1.get_obj_schema() != o2.get_obj_schema()

# A new implicit schema is created for each
# newly created object
>>> o_ds = kd.obj(x=kd.slice([1, 2]))

# The schema of the DataSlice is kd.OBJECT
>>> assert o_ds.get_schema() == kd.OBJECT

>>> obj_ss = o_ds.get_obj_schema()
>>> obj_ss.get_size()
DataItem(2, schema: INT64)
>>> obj_ss.take(0)
DataItem(IMPLICIT_ENTITY(x=INT32), schema: SCHEMA, ...)
>>> assert obj_ss.take(0) != obj_ss.take(1)

Debugging and Assertion Tools

The recommended workflow for debugging traced Koda flows is to run the flow eagerly and use standard Python tools to debug. However, in some cases this is cumbersome or even impossible. Here, we show some of the tracing-compatible tools at your disposal. These work by embedding the assertions and debug printing directly into the traced Functor.

Input and Output Schema Validation

To ensure runtime correctness of inputs and outputs, Koda provides the kd.check_inputs and kd.check_output decorators. Their functionality is preserved in traced functors, including those decorated with kd.trace_as_fn().

# Validates that both `hours` and `minutes` are INT32, and that the output is a
# STRING.
>>> @kd.check_inputs(hours=kd.INT32, minutes=kd.INT32)
... @kd.check_output(kd.STRING)
... def timestamp(hours, minutes):
...   return kd.str(hours) + ':' + kd.str(minutes)

>>> timestamp(kd.slice([10, 10, 10]), kd.slice([15, 30, 45]))
DataSlice(['10:15', '10:30', '10:45'], schema: STRING, present: 3/3)

>>> timestamp(kd.slice([10, 10, 10]), kd.slice([15.35, 30.12, 45.1]))
Traceback (most recent call last):
TypeError: ...

For structured data, the inputs and outputs are required to have the expected schema.

>>> Doc = kd.schema.named_schema('Doc', doc_id=kd.INT64, score=kd.FLOAT32)
>>> Query = kd.schema.named_schema(
...     'Query',
...     query_text=kd.STRING,
...     query_id=kd.INT32,
...     docs=kd.list_schema(Doc),
... )

>>> @kd.check_inputs(query=Query)
... @kd.check_output(Doc)
... def get_docs(query):
...   return query.docs[:]


>>> doc = Doc.new(doc_id=2, score=3.0)
>>> query = Query.new(query_text='foo', query_id=1, docs=kd.list([doc]))

>>> _ = get_docs(query)

>>> get_docs(doc)
Traceback (most recent call last):
TypeError: ...

In some cases, checking for the exact schema is undesirable as we may only care that some specific attributes are present. For this, we can use kd.duck_type and friends.

# Input: Asserts that `query` has a `docs` attribute that is a LIST of values
# with an INT64 attribute `doc_id`.
# Output: Asserts that the output has an INT64 `doc_id` attribute.
>>> @kd.check_inputs(
...     query=kd.duck_type(docs=kd.duck_list(kd.duck_type(doc_id=kd.INT64)))
... )
... @kd.check_output(kd.duck_type(doc_id=kd.INT64))
... def get_docs(query):
...   return query.docs[:]

>>> get_docs(query)
DataSlice([...], schema: ..., present: 1/1, ...)

>>> get_docs(doc)
Traceback (most recent call last):
TypeError: ...

Runtime Assertion through kd.with_assertion.

To add arbitrary runtime assertions, in case kd.check_inputs and kd.check_output are not enough, Koda provides kd.with_assertion. This can be used on any slice, with more relaxed restrictions on the condition compared to the schema assertion tools.

>>> def solve_quadratic_equation(a, b, c):
...   # Solving ax^2 + bx + c = 0
...   d = b * b - 4 * a * c
...   d = kd.assertion.with_assertion(d, d >= 0, 'no real solution')
...   return kd.tuple(
...     (-b - kd.math.sqrt(d)) / 2 / a,
...     (-b + kd.math.sqrt(d)) / 2 / a)

>>> solve_quadratic_equation(kd.item(1), kd.item(1), kd.item(-2))
(DataItem(-2.0, schema: FLOAT32), DataItem(1.0, schema: FLOAT32))

>>> solve_quadratic_equation(kd.item(1), kd.item(1), kd.item(5))
Traceback (most recent call last):
ValueError: [FAILED_PRECONDITION] no real solution...

>>> def is_smaller_than_vectorized(x, y):
...   # It's ambiguous how to treat non-scalar conditionals, so they are not
...   # supported. Non-scalar data must therefore be handled explicitly.
...   return kd.assertion.with_assertion(x, kd.all(x < y), 'x must be less than y')

>>> is_smaller_than_vectorized(kd.slice([3, 4]), kd.slice([5, 5]))
DataSlice([3, 4], schema: INT32, present: 2/2)

In the examples above, the error message is computed up-front. If the error message depends on some inputs and is expensive to compute, we may wish to avoid the computation in case the condition is true, and only compute it if it’s actually required. For this, kd.with_assertion supports passing a callback functor to compute the message.

>>> def is_smaller_than(x, y):
...   return kd.assertion.with_assertion(
...       x,
...       kd.all(kd.slice([x < y])),
...       lambda x, y: kd.format('x={x} must be smaller than y={y}', x=x, y=y),
...       x,  # Passed as input 0.
...       y,  # Passed as input 1.
...   )

>>> is_smaller_than(kd.slice(3), kd.slice(5))
DataItem(3, schema: INT32)

>>> is_smaller_than(kd.slice(5), kd.slice(3))
Traceback (most recent call last):
ValueError: [FAILED_PRECONDITION] x=5 must be smaller than y=3...

Note that the result of kd.assertion.with_assertion must be used (e.g. returned or fed into later computations) in order for the assertion to be properly embedded in the traced Functor.

>>> @kd.trace_as_fn()
... def is_smaller_than_bad(x, y):
...   kd.assertion.with_assertion(x, x < y, 'x must be less than y')
...   return x

>>> @kd.trace_as_fn()
... def is_smaller_than_good(x, y):
...   res = kd.assertion.with_assertion(x, x < y, 'x must be less than y')
...   return res

# Raises only in eager mode
>>> is_smaller_than_bad(kd.slice(5), kd.slice(2))
Traceback (most recent call last):
ValueError: [FAILED_PRECONDITION] x must be less than y...

# doesn't raise when traced, because the result of kd.assertion.with_assertion is not used.
>>> kd.fn(is_smaller_than_bad)(kd.slice(5), kd.slice(2))
DataItem(5, schema: INT32)

>>> is_smaller_than_good(kd.slice(5), kd.slice(2))
Traceback (most recent call last):
ValueError: [FAILED_PRECONDITION] x must be less than y...

Debug Printing with kd.with_print

Python print statements are useful for general debugging, but they are not traceable. For this reason, Koda provides the kd.with_print operator, which prints to stdout in both Python and C++.

>>> def print_input(x):
...   return kd.with_print(x, 'the input is:', x)

>>> print_input(kd.slice([1, 2, 3]))
the input is: [1, 2, 3]
    DataSlice([1, 2, 3], schema: INT32, present: 3/3)

As with kd.with_assertion, the operator result must be used for it to be properly embedded into the traced Functor.

The output may be truncated by default. Use e.g. kd.slices.get_repr to control the repr behavior.

Interoperability

From_py

# Primitives
>>> kd.from_py(1)
DataItem(1, schema: OBJECT)
>>> kd.from_py(1, schema=kd.INT64)
DataItem(1, schema: INT64)
>>> kd.from_py(1.0)
DataItem(1.0, schema: OBJECT)
>>> kd.from_py(1.0, schema=kd.FLOAT64)
DataItem(1.0, schema: FLOAT64)
>>> kd.from_py('a')
DataItem('a', schema: OBJECT)
>>> kd.from_py(b'a')
DataItem(b'a', schema: OBJECT)
>>> kd.from_py(True)
DataItem(True, schema: OBJECT)
>>> kd.from_py(None) # Missing item with OBJECT schema, not MASK
DataItem(None, schema: OBJECT)

# Entity/Object
>>> import dataclasses
>>> @dataclasses.dataclass
... class PyObj:
...   x: int
...   y: float

>>> py_obj = PyObj(x=1, y=2.0)

>>> kd.from_py(py_obj)
DataItem(Obj(x=1, y=2.0), schema: OBJECT, bag_id:...)

>>> kd.from_py(py_obj, schema=kd.OBJECT) # Same as above
DataItem(Obj(x=1, y=2.0), schema: OBJECT, bag_id:...)

>>> s1 = kd.named_schema('Point', x=kd.INT32,
...                      y=kd.FLOAT64)
>>> kd.from_py(py_obj, schema=s1) # Entity
DataItem(Entity(x=1, y=2.0), schema: Point(x=INT32, y=FLOAT64), bag_id:...)

# Infer the schema from the Python type
>>> s2 = kd.schema.schema_from_py(PyObj)
>>> kd.from_py(py_obj, schema=s2) # Entity
DataItem(Entity(x=1, y=2.0), schema: __schema_from_py__builtins.PyObj(x=INT64, y=FLOAT32), bag_id:...)

# dict_as_obj=True
>>> py_dict = {'x': 1, 'y': 2.0}
>>> kd.from_py(py_dict, dict_as_obj=True) # Object
DataItem(Obj(x=1, y=2.0), schema: OBJECT, bag_id:...)

>>> kd.from_py(py_dict, schema=kd.OBJECT)
DataItem(Dict{...'y'=2.0...}, schema: OBJECT, bag_id:...)

>>> kd.from_py(py_dict, dict_as_obj=True,
...            schema=s1) # Entity
DataItem(Entity(x=1, y=2.0), schema: Point(x=INT32, y=FLOAT64), bag_id:...)

# List
>>> py_list = [[1, 2], [3], [4, 5]]
>>> kd.from_py(py_list)
DataItem(List[List[1, 2], List[3], List[4, 5]], schema: OBJECT, bag_id:...)
>>> s3 = kd.list_schema(kd.list_schema(kd.INT64))
>>> kd.from_py(py_list, schema=s3)
DataItem(List[List[1, 2], List[3], List[4, 5]], schema: LIST[LIST[INT64]], bag_id:...)

# Infer the schema from Python type
>>> s4 = kd.schema.schema_from_py(list[list[int]])

# Dict
>>> py_dict = {'x': 1, 'y': 2.0}
>>> kd.from_py(py_dict)
DataItem(Dict{...'y'=2.0...}, schema: OBJECT, bag_id:...)
>>> s3 = kd.dict_schema(kd.STRING, kd.FLOAT64)
>>> kd.from_py(py_dict, schema=s3)
DataItem(Dict{...'y'=2.0...}, schema: DICT{STRING, FLOAT64}, bag_id:...)

# Use provided itemids
>>> id1 = kd.new_itemid()
>>> kd.from_py(py_obj, itemid=id1)
DataItem(Obj(x=1, y=2.0), schema: OBJECT, bag_id:...)
>>> id2 = kd.new_listid()
>>> kd.from_py(py_list, itemid=id2)
DataItem(List[List[1, 2], List[3], List[4, 5]], schema: OBJECT, bag_id:...)
>>> id3 = kd.new_dictid()
>>> kd.from_py(py_dict, itemid=id3)
DataItem(Dict{...'y'=2.0...}, schema: OBJECT, bag_id:...)

# Use from_dim to treat the first N levels of Python
# lists as DataSlice dimensions
>>> py_list = [[1, 2], [3], [4, 5]]
>>> kd.from_py(py_list, from_dim=2)
DataSlice([[1, 2], [3], [4, 5]], schema: OBJECT, present: 5/5)

>>> py_dicts = [{'x': 1, 'y': 2.0}, {'x': 3, 'y': 4.0}]

# Dicts as Objects
>>> kd.from_py(py_dicts, from_dim=1)
DataSlice([Dict{...'y'=2.0...}, Dict{...'y'=4.0...}], schema: OBJECT, present: 2/2, bag_id:...)
>>> kd.from_py(py_dicts, from_dim=1, schema=s3)
DataSlice([Dict{...'y'=2.0...}, Dict{...'y'=4.0...}], schema: DICT{STRING, FLOAT64}, present: 2/2, bag_id:...)

To_py

# Primitive DataItem
>>> kd.item(1.0).to_py()
1.0
>>> kd.int64(1).to_py()
1

# There is no MASK type in Python: present stays a
# DataItem, while missing becomes None
>>> kd.present.to_py()
DataItem(present, schema: MASK)

>>> str(kd.missing.to_py())
'None'

# DataSlice of primitives
>>> py_input = [[1, 2], [3], [4, 5]]
>>> ds = kd.slice(py_input)
>>> py_list = kd.to_py(ds)
>>> assert ds.to_py() == py_list # same as above
>>> assert py_list == py_input

# Entity DataItem
>>> e = kd.new(x=1, y=2.0)
>>> e.to_py()
Obj(x=1, y=2.0)

>>> e.to_py(obj_as_dict=True)
{'x': 1, 'y': 2.0}

>>> s = kd.named_schema('Point', x=kd.INT32, y=kd.FLOAT32)
>>> e1 = kd.new(x=1, schema=s)
>>> e1.to_py(obj_as_dict=True)
{'x': 1, 'y': None}

>>> e1.to_py(obj_as_dict=True,
...          include_missing_attrs=False)
{'x': 1}

# Object DataItem
>>> o = kd.obj(x=1, y=2.0)
>>> o.to_py()
Obj(x=1, y=2.0)

>>> o.to_py(obj_as_dict=True)
{'x': 1, 'y': 2.0}

# List DataItem
>>> l = kd.list([1, 2, 3])
>>> l.to_py()
[1, 2, 3]

# Dict DataItem
>>> d = kd.dict({'a': 1, 'b': 2})
>>> py_dict = d.to_py() # {'a': 1, 'b': 2}

# Nested Entity/Object/List/Dict item
# Each nested item counts as one level of depth.
# Items nested deeper than max_depth (default: 2)
# are kept as DataItems
>>> i = kd.obj(a=kd.obj(b=kd.obj(c=1),
...                     d=kd.list([2, 3]),
...                     e=kd.dict({'f': 4})))
>>> i.to_py()
Obj(a=Obj(b=DataItem(Obj(c=1), schema: OBJECT, bag_id: ...),
d=DataItem(List[2, 3], schema: LIST[INT32], bag_id: ...),
e=DataItem(Dict{'f'=4}, schema: DICT{STRING, INT32}, bag_id: ...)))

>>> i.to_py(max_depth=3)
Obj(a=Obj(b=Obj(c=1), d=[2, 3], e={'f': 4}))

# Use -1 to convert everything to Python
>>> i.to_py(max_depth=-1)
Obj(a=Obj(b=Obj(c=1), d=[2, 3], e={'f': 4}))

# Primitive DataSlice
>>> ds = kd.slice([[1, 2], [None, 3]])
>>> ds.to_py()
[[1, 2], [None, 3]]

# Entity DataSlice
>>> ds = kd.new(a=kd.slice([1, None, 3]),
...             y=kd.slice([4, 5, None]))
>>> res = ds.to_py(); res
[Obj(a=1, y=4), Obj(a=None, y=5), Obj(a=3, y=None)]

# Note that each call creates a new Python class for the objects
>>> assert res[0].__class__ != ds.to_py()[0].__class__

# Object DataSlice
>>> ds = kd.obj(a=kd.slice([1, None, 3]),
...             y=kd.slice([4, 5, None]))
>>> ds.to_py()
[Obj(a=1, y=4), Obj(a=None, y=5), Obj(a=3, y=None)]

# List DataSlice
>>> lists = kd.implode(kd.slice([[1, 2], [None, 3]]))
>>> lists.to_py()
[[1, 2], [None, 3]]

# Dict DataSlice
>>> dicts = kd.dict(kd.slice([['a', 'b'], ['c']]),
...                 kd.slice([[1, None], [3]]))
>>> dicts.to_py()
[{...'b': None...}, {'c': 3}]

# Use output_class argument to specify the exact class of the output:
>>> @dataclasses.dataclass
... class SomeObj:
...   a: int | None = None
...   y: int | None = None

# For a slice we can specify the type of the elements.
>>> res = ds.to_py(output_class=SomeObj); res
[SomeObj(a=1, y=4), SomeObj(a=None, y=5), SomeObj(a=3, y=None)]
>>> assert res[0].__class__ == SomeObj
>>> kd.list([kd.new(a=2, y=6)]).to_py(output_class=list[SomeObj])
[SomeObj(a=2, y=6)]
>>> kd.list([2, 3, 4]).to_py(output_class=tuple[int,...])
(2, 3, 4)

From/To Pytree

kd.from_pytree is equivalent to kd.from_py and kd.to_pytree is equivalent to kd.to_py(obj_as_dict=True).

From/To Numpy Array

>>> import numpy as np
>>> from koladata import kd_ext
>>> npkd = kd_ext.npkd
>>> arr1 = np.array([1, 2, 0, 3], dtype=np.int32)
>>> npkd.from_array(arr1)
DataSlice([1, 2, 0, 3], schema: INT32, present: 4/4)

>>> arr2 = np.array([1, 2, 0, 3], dtype=np.int64)
>>> npkd.from_array(arr2)
DataSlice([1, 2, 0, 3], schema: INT64, present: 4/4)

>>> arr3 = np.array([1., 2., 0., 3.], dtype=np.float64)
>>> npkd.from_array(arr3)
DataSlice([1.0, 2.0, 0.0, 3.0], schema: FLOAT64, present: 4/4)

# Numpy does not support sparse arrays
# None is converted to nan
>>> arr4 = np.float64([1., None, 0., 3.])
>>> npkd.from_array(arr4)
DataSlice([1.0, nan, 0.0, 3.0], schema: FLOAT64, present: 4/4)

>>> ds1 = kd.int32([1, 2, 3])
>>> npkd.to_array(ds1)
array([1, 2, 3], dtype=int32)

>>> ds2 = kd.int64([1, 2, 3])
>>> npkd.to_array(ds2)
array([1, 2, 3])

>>> ds3 = kd.float32([1., 2., 3.])
>>> npkd.to_array(ds3)
array([1., 2., 3.], dtype=float32)

# Numpy does not support sparse arrays
# Missing is converted to nan
>>> ds4 = kd.float64([1., None, 3.])
>>> npkd.to_array(ds4)
array([ 1., nan,  3.])

# With multidimensional array:
>>> ds5 = kd.slice([[1, 2], [3, 4]])
>>> npkd.to_array(ds5)
array([[1, 2],
           [3, 4]], dtype=int32)

# Numpy does not support mixed types
# All types are converted to strings
>>> ds6 = kd.slice([1, 2.0, '3'])
>>> npkd.to_array(ds6)
array(['1', '2.0', '3'], dtype='<U32')

# Entities/objects/lists/dicts are converted to
# DataItems and stored as objects in Numpy array
>>> ds7 = kd.obj(x=kd.slice([1, 2, 3]))
>>> arr = npkd.to_array(ds7); arr
array([DataItem(Obj(x=1), schema: OBJECT, bag_id: ...),
           DataItem(Obj(x=2), schema: OBJECT, bag_id: ...),
           DataItem(Obj(x=3), schema: OBJECT, bag_id: ...)], dtype=object)

# It does support round-trip conversion
>>> ds8 = npkd.from_array(arr)
>>> assert kd.full_equal(ds7, ds8)

From/To Pandas DataFrame

>>> import pandas as pd
>>> from koladata import kd_ext

>>> pdkd = kd_ext.pdkd

# Primitive conversion is almost the same as numpy
>>> df1 = pd.DataFrame({'x': [1, 2, 3]})
>>> pdkd.from_dataframe(df1)
DataSlice([Entity(x=1), Entity(x=2), Entity(x=3)], schema: ENTITY(x=INT64), present: 3/3, bag_id:...)

# Convert to objects instead of entities
>>> pdkd.from_dataframe(df1, as_obj=True)
DataSlice([Obj(x=1), Obj(x=2), Obj(x=3)], schema: OBJECT, present: 3/3, bag_id:...)

# A MultiIndex is converted to a multi-dimensional DataSlice
>>> index = pd.MultiIndex.from_arrays(
...   [[0, 0, 1, 3, 3], [0, 1, 0, 0, 1]])
>>> df2 = pd.DataFrame({'x': [1, 2, 3, 4, 5]}, index=index)
>>> pdkd.from_dataframe(df2)
DataSlice([[Entity(x=1), Entity(x=2)],
[Entity(x=3)],
[],
[Entity(x=4), Entity(x=5)]], schema: ENTITY(x=INT64), present: 5/5, bag_id: ...)

# Primitive DataSlice is converted to a column
# named 'self_'
>>> ds1 = kd.slice([1, 2, 3])
>>> pdkd.to_dataframe(ds1)
   self_
    0      1
    1      2
    2      3

>>> ds2 = kd.slice([[1, 2], [3, 4]])
>>> pdkd.to_dataframe(ds2)
         self_
    0 0      1
      1      2
    1 0      3
      1      4

# Entity attributes become columns
>>> ds3 = kd.new(x=kd.slice([1, 2, 3]))
>>> pdkd.to_dataframe(ds3)
       x
    0  1
    1  2
    2  3

# The union of object attributes becomes the columns
>>> ds4 = kd.slice([kd.obj(x=1), kd.obj(y=2),
...                 kd.obj(x=3, y=4)])
>>> pdkd.to_dataframe(ds4)
          x     y
    0     1  <NA>
    1  <NA>     2
    2     3     4

# Use 'cols' to specify the columns
# Can use attribute names or Exprs
>>> ds5 = kd.new(x=kd.slice([1, 2, 3]),
...              y=kd.slice([4, 5, 6]))
>>> pdkd.to_dataframe(ds5, cols=['x'])
       x
    0  1
    1  2
    2  3
>>> pdkd.to_dataframe(ds5, cols=[kd.S.y])
       S.y
    0    4
    1    5
    2    6

From/To Json

# Parse any JSON primitive or container, or a
# mixture. Uses OBJECT schema by default.
>>> kd.from_json('null')
DataItem(None, schema: OBJECT, bag_id:...)

>>> kd.from_json('true')
DataItem(True, schema: OBJECT, bag_id:...)
>>> kd.from_json('false')
DataItem(False, schema: OBJECT, bag_id:...)
>>> kd.from_json('1')
DataItem(1, schema: OBJECT, bag_id:...)
>>> kd.from_json('1.1')
DataItem(1.1, schema: OBJECT, bag_id:...)
>>> kd.from_json('"a"')
DataItem('a', schema: OBJECT, bag_id:...)
>>> kd.from_json('"\u2728"')
DataItem('✨', schema: OBJECT, bag_id:...)
>>> kd.from_json('["x", "y"]')
DataItem(List['x', 'y'], schema: OBJECT, bag_id:...)

# The input can be a DataSlice of strings.
# The output will be a DataSlice of the same
# shape.
>>> kd.from_json(kd.slice(['1', '"a"']))
DataSlice([1, 'a'], schema: OBJECT, present: 2/2, bag_id:...)

# Use an explicit schema to validate, filter, and
# convert while parsing.
>>> kd.from_json('1', schema=kd.FLOAT32)
DataItem(1.0, schema: FLOAT32, bag_id:...)
>>> kd.from_json('1', schema=kd.STRING)
DataItem('1', schema: STRING, bag_id:...)
>>> kd.from_json('false', schema=kd.MASK)
DataItem(missing, schema: MASK, bag_id:...)
>>> kd.from_json(
...     '{"x": 1}',
...     schema=kd.dict_schema(kd.STRING, kd.INT32)
... )
DataItem(Dict{'x'=1}, schema: DICT{STRING, INT32}, bag_id:...)
>>> kd.from_json(
...     '{"x": 1}',
...     schema=kd.schema.new_schema(x=kd.INT32)
... )
DataItem(Entity(x=1), schema: ENTITY(x=INT32), bag_id:...)

# JSON objects' key order is recorded by default.
>>> kd.from_json('{"x": 1, "a": 2, "y": 3}')
DataItem(Obj(a=2, json_object_keys=List['x', 'a', 'y'], json_object_values=List[1, 2, 3], x=1, y=3), schema: OBJECT, bag_id:...)

# Handle invalid inputs instead of throwing
# exceptions.
>>> kd.from_json('not valid', on_invalid=':(')
DataItem(':(', schema: OBJECT, bag_id:...)
>>> kd.from_json('1', schema=kd.MASK, on_invalid='?')
DataItem('?', schema: OBJECT, bag_id:...)

# Round-trip Koda -> JSON -> Koda. Schema must
# be preserved separately.
>>> a = kd.new(x=1, y=kd.list([2, 3]))
>>> b = kd.from_json(kd.to_json(a),
...                  schema=a.get_schema())
>>> assert a.to_py() == b.to_py()

# Format any JSON-compatible Koda value.
>>> kd.to_json(None)
DataItem(None, schema: STRING)
>>> kd.to_json(kd.present)
DataItem('true', schema: STRING)
>>> kd.to_json(kd.missing)
DataItem(None, schema: STRING)
>>> kd.to_json(True)
DataItem('true', schema: STRING)
>>> kd.to_json(False)
DataItem('false', schema: STRING)
>>> kd.to_json(123)
DataItem('123', schema: STRING)
>>> kd.to_json(1.23)
DataItem('1.23...', schema: STRING)
>>> kd.to_json(float('inf'))
DataItem('"inf"', schema: STRING)
>>> kd.to_json('foo')
DataItem('"foo"', schema: STRING)
>>> kd.to_json('✨')
DataItem('"\\u2728"', schema: STRING)
>>> kd.to_json(kd.list([1, 2, 3]))
DataItem('[1, 2, 3]', schema: STRING)
>>> kd.to_json(
...     kd.implode(kd.slice([[1], [2, 3]]), -1)
... )
DataItem('[[1], [2, 3]]', schema: STRING)
>>> kd.to_json(kd.dict({'x': 1, 'y': 2}))
DataItem('{"x": 1, "y": 2}', schema: STRING)
>>> kd.to_json(kd.new(x=1, y=2))
DataItem('{"x": 1, "y": 2}', schema: STRING)

# The input can be a DataSlice.
# The output will be a DataSlice of strings
# with the same shape.
>>> kd.to_json(kd.slice([[1], [2, 3]]))
DataSlice([['1'], ['2', '3']], schema: STRING, present: 3/3)

# Control indentation and padding.
>>> value = kd.new(x=1, y=2, z=kd.dict({'a': 'b'}))
>>> kd.to_json(value, indent=0)
DataItem('{\n"x": 1,\n"y": 2,\n"z": {\n"a": "b"\n}\n}', schema: STRING)
>>> kd.to_json(value, indent=2)
DataItem('{\n  "x": 1,\n  "y": 2,\n  "z": {\n    "a": "b"\n  }\n}', schema: STRING)

# Control unicode escaping.
>>> kd.to_json('✨')
DataItem('"\\u2728"', schema: STRING)
>>> kd.to_json('✨', ensure_ascii=False)
DataItem('"✨"', schema: STRING)

# Precisely control JSON object keys and values.
>>> kd.to_json(kd.new(
...     x=1,
...     y=2,
...     json_object_keys=kd.list(['y', 'x'])
... ))
DataItem('{"y": 2, "x": 1}', schema: STRING)
>>> kd.to_json(kd.new(
...     json_object_keys=kd.list(['y', 'x', 'y']),
...     json_object_values=kd.list([1, 2, 3]),
... ))
DataItem('{"y": 1, "x": 2, "y": 3}', schema: STRING)

# Round-trip JSON -> Koda -> JSON. Padding and
# number formatting differences are lost.
>>> a = kd.str('{"x": 1, "z": {"a": "b"}, "y": 2}')
>>> b = kd.to_json(kd.from_json(a))
>>> assert a.to_py() == b.to_py()

From/To Bytes (a.k.a. Serialization)

# Serialize DataSlice into bytes
>>> ds = kd.slice([1, 2, 3])
>>> s = kd.dumps(ds)
>>> ds = kd.loads(s)

# Serialize DataBag into bytes
>>> db = kd.new(x=1, y=2).get_bag()
>>> s = kd.dumps(db)
>>> db = kd.loads(s)

Bags of Attributes

DataBag

# Empty immutable bag creation
>>> db_immutable = kd.bag()
>>> assert not db_immutable.is_mutable()

# Mutable bag creation
>>> db1 = kd.mutable_bag()
>>> assert db1.is_mutable()

# Get the bag from the DataSlice
>>> obj = kd.uuobj(x=1, y=2)
>>> db2 = obj.get_bag()

# Bag created from item creation APIs
# under kd is immutable
>>> assert not db2.is_mutable()

# Attach a bag to the DataSlice
>>> obj1 = obj.with_bag(db1)

# Remove the bag from the DataSlice
>>> obj2 = obj1.no_bag()

# Approximate number of triples in the bag
>>> db2.get_approx_size()
5

# Print quick stats about the bag
>>> repr(db2)
'DataBag...'

# Print out data and schema triples
>>> db2.contents_repr()
DataBag...
SchemaBag...

# Print out data triples only
>>> db2.data_triples_repr()
DataBag...

# Print out schema triples only
>>> db2.schema_triples_repr()
SchemaBag...

# Create a new mutable bag by forking the bag
>>> db3 = db2.fork()
>>> assert db3.is_mutable()

# Create a new immutable bag by freezing the bag
>>> db4 = db3.freeze()
>>> assert not db4.is_mutable()

Merging DataBags

# Merge two bags by creating a new bag
# with db1 and db2 as fallbacks
# db2 overrides db1 in terms of conflicts
>>> _ = db1 << db2
>>> _ = kd.updated_bag(db1, db2) # Same as above

# Can be chained together
>>> _ = db1 << db2 << db3
>>> _ = kd.updated_bag(db1, db2, db3) # Same as above

# Merge two bags by creating a new bag
# with db2 and db1 as fallbacks
# db1 overrides db2 in terms of conflicts
>>> _ = db1 >> db2
>>> _ = kd.enriched_bag(db1, db2) # Same as above

# Can be chained together
>>> _ = db1 >> db2 >> db3
>>> _ = kd.enriched_bag(db1, db2, db3) # Same as above

# Merge db2 into db1 in place
# db2 overrides db1 in terms of conflicts
# schema conflicts are not allowed
>>> _ = db1.merge_inplace(db2)

# Merge db2 and db3 into db1 in place
>>> _ = db1.merge_inplace([db2, db3])

# Do not overwrite
>>> _ = db1.merge_inplace(db2, overwrite=False)

# Allow schema conflicts
>>> _ = db1.merge_inplace(db2, allow_schema_conflicts=True)

# Disallow data conflicts
>>> _ = db1.merge_inplace(db2, allow_data_conflicts=False)

# Merge db and its fallbacks into a single new bag
# It improves lookup performance (e.g. accessing attrs,
# list items), but merging all bags can be expensive
>>> db3 = db.merge_fallbacks()

Extract/Clone

>>> s1 = kd.uu_schema(x=kd.INT32, y=kd.INT32)
>>> s2 = kd.uu_schema(z=s1, w=s1)

>>> i1 = s2.new(z=s1.new(x=1, y=2),
...             w=s1.new(x=3, y=4))

# extract creates a copy of i1 in a new
# bag with the same ItemIds
>>> i2 = i1.extract()
>>> assert i2.get_itemid() == i1.get_itemid()
>>> assert i2.z.get_itemid() == i1.z.get_itemid()

# Extract a subset of attributes
>>> s3 = kd.uu_schema(w=s1)
>>> i3 = i1.extract(schema=s3)

>>> assert not i3.has_attr('z')

# clone creates a copy of i1 with a new ItemId
# and keeps the same ItemIds for attributes
>>> i4 = i1.clone()
>>> assert i4.get_itemid() != i1.get_itemid()
>>> assert i4.z.get_itemid() == i1.z.get_itemid()
>>> kd.dir(i4.z)
['x', 'y']

# use specific ItemIds instead of creating new ones
>>> id1 = kd.new_itemid()
>>> i5 = i1.clone(itemid=id1)
>>> assert i5.get_itemid() == id1
>>> assert i5.z.get_itemid() == i1.z.get_itemid()
>>> kd.dir(i5.z)
['x', 'y']

# shallow_clone creates a copy of i1 with a new
# ItemId and keeps the same ItemIds for top-level attributes
>>> i6 = i1.shallow_clone()
>>> assert i6.get_itemid() != i1.get_itemid()
>>> assert i6.z.get_itemid() == i1.z.get_itemid()
>>> kd.dir(i6.z)
[]

# use specific ItemIds instead of creating new ones
>>> id2 = kd.new_itemid()
>>> i7 = i1.shallow_clone(itemid=id2)
>>> assert i7.get_itemid() == id2
>>> assert i7.z.get_itemid() == i1.z.get_itemid()
>>> kd.dir(i7.z)
[]

# deep_clone creates a copy of i1 and its attributes
# with new ItemIds
>>> i8 = i1.deep_clone()
>>> assert i8.get_itemid() != i1.get_itemid()
>>> assert i8.z.get_itemid() != i1.z.get_itemid()
>>> kd.dir(i8.z)
['x', 'y']

# deep_clone does not support itemid= argument

Stub/Ref

An Entity/Object/Dict stub is an empty Entity/Object/Dict with the metadata needed to describe its schema in a new bag. A List stub is a list with list item stubs in a new bag. Stubs can be used to add new data to the same items in a newly created bag, which can then be merged into the original data.

An Entity/Object/List/Dict ref is a reference to the same Entity/Object/List/Dict without a bag attached.

>>> obj = kd.obj(x=1, y=kd.obj(z=1))

# Create an obj stub without attributes
>>> obj2 = obj.stub()
>>> assert obj2.get_itemid() == obj.get_itemid()
>>> assert kd.dir(obj2, intersection=False) == []

# Create a list stub with list item stubs
>>> l = kd.list([kd.obj(x=1), kd.obj(x=2)])
>>> l2 = l.stub()
>>> assert l2.get_itemid() == l.get_itemid()

# List items are stubbed as well
>>> assert kd.agg_all(l2[:] == l[:])
>>> assert kd.dir(l2[:], intersection=False) == []

# Create a dict stub without entries
>>> d = kd.dict({'a': kd.obj(x=1), 'b': kd.obj(x=2)})
>>> d2 = d.stub()
>>> assert d2.get_itemid() == d.get_itemid()
>>> assert kd.dict_size(d2) == 0

# Get a ref
>>> obj_ref = obj.ref()
>>> assert obj_ref.get_itemid() == obj.get_itemid()
>>> assert not obj_ref.has_bag()

Tracing (a.k.a. JIT Compilation)

Tracing

Tracing is a process of transforming computation logic represented by a Python function to a computation graph represented by a Koda Functor. Tracing provides a smooth transition from eager workflow useful for interactive iteration to lazy workflow which can have better performance and path to serving.

Trace a Python Function

To trace a Python function, we can use kd.trace_py_fn(f). Variadic arguments are not supported for tracing.

>>> a1 = kd.slice([1, 2])
>>> a2 = kd.slice([2, 3])
>>> b = kd.item('b')
>>> c = kd.item('c')

>>> def f1(a, b, c):
...   return kd.cond(kd.all(a > 1), b, c)

>>> f1(a1, b, c)
DataItem('c', schema: STRING)
>>> f1(a2, b, c)
DataItem('b', schema: STRING)

>>> f = kd.trace_py_fn(f1)

>>> f(a=a1, b=b, c=c)
DataItem('c', schema: STRING)
>>> f(a=a2, b=b, c=c)
DataItem('b', schema: STRING)

>>> def f2(a, b, c):
...   return b if kd.all(a > 1) else c

>>> f2(a1, b, c)
DataItem('c', schema: STRING)
>>> f2(a2, b, c)
DataItem('b', schema: STRING)

# Fails because f2 uses an "if" statement, and the
# tracer Expr I.a cannot be used as an "if" condition
>>> kd.trace_py_fn(f2)
Traceback (most recent call last):
TypeError: __bool__ disabled for 'arolla.abc.expr.Expr'...

# Use positional only and keyword only arguments
>>> def f3(pos_only, /, pos_or_kw, *, kwonly):
...   return kd.cond(kd.all(pos_only > 1), pos_or_kw, kwonly)

>>> f3(a1, b, kwonly=c)
DataItem('c', schema: STRING)
>>> f3(a2, pos_or_kw=b, kwonly=c)
DataItem('b', schema: STRING)

>>> f = kd.trace_py_fn(f3)
>>> f(a1, b, kwonly=c)
DataItem('c', schema: STRING)
>>> f(a2, pos_or_kw=b, kwonly=c)
DataItem('b', schema: STRING)

# Variadic arguments are not supported for tracing
>>> def f4(*args):
...   pass

# Fails to trace
>>> kd.trace_py_fn(f4)
Traceback (most recent call last):
ValueError: Failed to trace the function...Variadic arguments are only supported in tracing when they are actually not used in the function body...

>>> def f5(**kwargs):
...   pass

# Fails to trace
>>> kd.trace_py_fn(f5)
Traceback (most recent call last):
ValueError: Failed to trace the function...Variadic arguments are only supported in tracing when they are actually not used in the function body...

# A variadic argument can be added to allow passing
# unused arguments
>>> def f6(a1, a2, *unused):
...   return a1 + a2

# 'c' is unused
>>> kd.trace_py_fn(f6)(a1, a2, c)
DataSlice([3, 5], schema: INT32, present: 2/2)

How does tracing work?

There are three execution modes in Koda:

  • Eager (i.e. kd.eager): always execute eagerly
  • Auto (i.e. kd): execute eagerly in normal context or lazily in tracing mode
  • Lazy (i.e. kd.lazy): return Expr as result which can be executed lazily

kd.eager and kd are similar to np and jnp in JAX, respectively. There is no pure lazy mode (i.e. kd.lazy) in JAX, as JAX does not directly expose lazy operators.

To simplify the mental model and avoid unexpected surprises, tracing in Koda works the same way as the just-in-time (JIT) compilation in JAX. It involves three steps:

  • Functor signature is derived from Python function signature
  • The Python function is called with tracers (just replace arg with I.arg) and the result is a Koda Expr
  • A Functor is created from the Expr and function signature

This is different from TensorFlow 2, which analyzes the Python AST and is able to trace Python control logic (e.g. if/for). Therefore, users should primarily use kd operators in the Python function and avoid Python control logic (e.g. if/for) and kd.lazy/kd.functor operators unless they understand what these actually do.

In most cases, users only need kd for eager and tracing mode, and kd.lazy if they want to use Exprs directly. kd.eager is only useful for forcing eager execution in tracing mode.

>>> def f1(a):
...   b = kd.item(1) + kd.item(2)
...   return kd.math.add(a, b)

>>> f1(1)
DataItem(4, schema: INT32)

# Returns Functor(return=I.a + 1 + 2)
>>> kd.trace_py_fn(f1)
DataItem(Functor f1[a](returns=(I.a + (DataItem(1, schema: INT32) + DataItem(2, schema: INT32))...)
>>> kd.trace_py_fn(f1)(a=1)
DataItem(4, schema: INT32)

>>> def f2(a):
...   b = kd.eager.item(1) + kd.eager.item(2)
...   return kd.math.add(a, b)

>>> f2(1)
DataItem(4, schema: INT32)

# Returns Functor(return=I.a + 3)
>>> kd.trace_py_fn(f2)
DataItem(Functor f2[a](returns=(I.a + DataItem(3, schema: INT32))📍), schema: OBJECT, bag_id:...)

>>> kd.trace_py_fn(f2)(a=1)
DataItem(4, schema: INT32)

Invoke Another Function in a Traced Function

If we want the invoked function to be traced into a separate functor, we need to add the @kd.trace_as_fn() decorator to that function. Otherwise, its content is inlined into the traced function.

>>> def fn1(a, b):
...   x = a + 1
...   y = b + 2
...   z = 3
...   return x + y + z

>>> def fn2(c):
...   return fn1(c, 1)

# The content of fn1 is inlined
>>> kd.trace_py_fn(fn2)
DataItem(Functor fn2[c](
      returns=(((I.c + DataItem(1, schema: INT32))📍 + DataItem(3, schema: INT32))📍 + DataItem(3, schema: INT32))📍,
    ), schema: OBJECT, bag_id:...)

>>> @kd.trace_as_fn()
... def fn3(a, b):
...   x = a + 1
...   y = b + 2
...   z = 3
...   return x + y + z

>>> def fn4(c):
...   return fn3(c, 1)

# fn3 is traced into a separate functor instead of being inlined
>>> kd.trace_py_fn(fn4)
DataItem(Functor fn4[c](...), schema: OBJECT, bag_id: ...)

Avoid Tracing Some Operations

If some operators should always be executed eagerly, use kd.eager instead of kd.

>>> def f1(a):
...   b = kd.eager.item(1) + kd.eager.item(2)
...   return a + b

>>> kd.trace_py_fn(f1)
DataItem(Functor f1[a](returns=(I.a + DataItem(3, schema: INT32))📍), schema: OBJECT, bag_id:...)

Avoid Tracing Inside a Python Function

If you want to treat a Python function as a whole and execute its statements line by line, you can add the @kd.trace_as_fn(functor_factory=kd.py_fn) decorator to the Python function. This allows you to use any Python code, including if, for, or sending RPCs, which do not work with tracing, at the cost of losing all tracing benefits.

>>> a1 = kd.slice([1, 2])
>>> a2 = kd.slice([2, 3])
>>> b = kd.item('b')
>>> c = kd.item('c')

>>> @kd.trace_as_fn(functor_factory=kd.py_fn)
... def f1(a, b, c):
...   return b if kd.all(a > 1) else c

>>> f1(a1, b, c)
DataItem('c', schema: STRING)

>>> f1(a2, b, c)
DataItem('b', schema: STRING)

>>> traced_f1 = kd.trace_py_fn(f1)
>>> traced_f1(a1, b, c)
DataItem('c', schema: STRING)
>>> traced_f1(a2, b, c)
DataItem('b', schema: STRING)

# You can call f1 in another traced function
>>> def f2(a1, a2, b, c):
...   return f1(a1, b, c) + f1(a2, b, b)

>>> traced_f2 = kd.trace_py_fn(f2)
>>> traced_f2(a1, a2, b, c)
DataItem('cb', schema: STRING)

Improve Structure/Readability of Resulting Functor

By default, names of local variables are lost during tracing. To preserve these names, we can use kd.with_name. In the eager mode, kd.with_name is a no-op. In the lazy mode, it is just kd.lazy.with_name which adds a name annotation to the resulting Expr.

>>> def fn1(a, b):
...   x = a + 1
...   y = b + 2
...   z = 3
...   return x + y + z


# Note: the "📍" symbols in the expressions above are concise representations
# of `annotation.source_location` operators attached automatically during
# tracing to improve traceback in case of evaluation errors.

>>> kd.trace_py_fn(fn1)
DataItem(Functor fn1[a, b](
      returns=(((I.a + DataItem(1, schema: INT32))📍 + (I.b + DataItem(2, schema: INT32))📍)📍 + DataItem(3, schema: INT32))📍,
    ), schema: OBJECT, bag_id:...)

>>> def fn2(a, b):
...   x = kd.with_name(a + 1, 'x')
...   y = kd.with_name(b + 2, 'y')
...   z = 3
...   return x + y + z

>>> kd.trace_py_fn(fn2)
DataItem(Functor fn2[a, b](
      returns=((V.x📍 + V.y📍)📍 + DataItem(3, schema: INT32))📍,
      x=(I.a + DataItem(1, schema: INT32))📍,
      y=(I.b + DataItem(2, schema: INT32))📍,
    ), schema: OBJECT, bag_id:...)

Advanced: Expr

Most Koda users should rarely need Exprs directly; they should use tracing to convert eager logic into a computation graph. Exprs can still be useful for advanced users who want to express or manipulate computation logic (e.g. ranking/score logic) directly.

A Koda Expr is a tree-like structure representing computation logic. More precisely, it is a DAG (Directed Acyclic Graph) consisting of input nodes, literal nodes, and operator nodes.

Input nodes are terminal nodes representing data (e.g. DataSlices) to be provided at Expr evaluation time. In Koda, inputs are denoted as I.input_name. I.self (or S for short) is a special input node that allows passing a positional argument to kd.eval; its attributes can be accessed as I.self.attr_name or S.attr_name. See the Evaluating Koda Expr section for details.

Literal nodes are terminal nodes representing data (e.g. DataSlices, primitives) provided at Expr creation time.

Operator nodes are internal nodes representing an operation to be performed on the values of sub-Exprs. For example, I.a + I.b returns an Expr representing a + operation, with two input nodes as its children. Koda Expr provides a comprehensive list of operators for basic operations under kd.lazy module.

Tip: Almost all operators from kd module have their corresponding Expr version in kd.lazy module. A few exceptions (e.g. kd.eval(expr), kd.expr.pack_expr(expr), etc.) should be self-explanatory based on the context.

Useful Aliases

>>> I = kd.I
>>> V = kd.V
>>> S = kd.S

Creating Koda Expr

# Create an Expr to represent a + b
>>> expr1 = I.a + I.b

# which is equivalent to
>>> expr2 = kd.lazy.math.add(I.a, I.b)

# We can also use literals
>>> expr3 = I.a + 1

# Build Expr by composing other Exprs
>>> add_ab = I.a + I.b
>>> weighted_ab = I.w * add_ab
>>> score = kd.lazy.agg_sum(weighted_ab)

# Print out an Expr
>>> score
kd.agg_sum(I.w * (I.a + I.b), unspecified)

# Use [] for list explosion or dict lookup
>>> a = kd.slice([1, 2, 3])
>>> kd.eval(kd.lazy.implode(I.a)[:], a=a)
DataSlice([1, 2, 3], schema: INT32, present: 3/3, bag_id:...)
>>> b = kd.dict({1: 2, 3: 4})
>>> kd.eval(I.b[I.a], a=a, b=b)
DataSlice([2, None, 4], schema: INT32, present: 2/3, bag_id:...)

# Use . for accessing attributes on objects
>>> d = kd.obj(x=1, y=2)
>>> kd.eval(I.d.x + I.d.y, d=d)
DataItem(3, schema: INT32)

# Use () for functor calls
>>> kd.eval(I.f(a=1, b=2), f=kd.fn(I.a + I.b))
DataItem(3, schema: INT32)

# Check if it is an Expr
>>> assert kd.is_expr(expr1)

Evaluating Koda Expr

>>> expr = I.a + I.b
>>> res1 = kd.eval(expr, a=kd.slice([1, 2, 3]), b=kd.item(2)); res1
DataSlice([3, 4, 5], schema: INT32, present: 3/3)

# which is equivalent to
>>> res2 = expr.eval(a=kd.slice([1, 2, 3]), b=kd.item(2))
>>> kd.testing.assert_equivalent(res1, res2)

# inputs can be bound to the same DataSlice
>>> x = kd.slice([1, 2, 3])
>>> kd.eval(expr, a=x, b=x)
DataSlice([2, 4, 6], schema: INT32, present: 3/3)

# use positional argument
>>> (I.self * 2).eval(3)
DataItem(6, schema: INT32)
>>> res1 = (I.self.a * I.self.b).eval(kd.obj(a=1, b=2)); res1
DataItem(2, schema: INT32)

# Use S as shortcut for I.self
>>> res2 = (S.a * S.b).eval(kd.obj(a=1, b=2))
>>> kd.testing.assert_equivalent(res1, res2)

# Mix positional and keyword arguments
>>> (S.a * S.b * I.c).eval(kd.obj(a=1, b=2), c=3)
DataItem(6, schema: INT32)

Packing/Unpacking Koda Expr to/from DataItem

>>> add_ab = I.a + I.b
>>> weighted_ab = I.w * add_ab
>>> score = kd.lazy.agg_sum(weighted_ab)

# Pack Expr into a DataItem
>>> packed_expr = kd.expr.pack_expr(score)

# Packed Expr is a DataItem with schema EXPR
>>> assert not kd.is_expr(packed_expr)
>>> assert kd.is_item(packed_expr)
>>> assert packed_expr.get_schema() == kd.EXPR

# Creates a task object containing
# both data and expr
>>> task = kd.obj(expr=packed_expr,
...               a=kd.list([1, 2, 3]),
...               b=kd.list([4, 5, 6]),
...               w=kd.list([0.1, 0.2, 0.3]))

# Unpack Expr
>>> _= kd.eval(kd.expr.unpack_expr(task.expr),
...         a=task.a[:], b=task.b[:],
...         w=task.w[:])

Substituting Sub-Exprs

>>> expr1 = I.a + I.b
>>> expr2 = I.b + I.c
>>> expr3 = expr1 * expr2

# Substitute by providing one sub pair
>>> kd.expr.sub(expr3, I.a, I.d)
(I.d + I.b) * (I.b + I.c)

# which is equivalent to
>>> expr3.sub(I.a, I.d)
((I.a + I.b) * (I.b + I.c)).sub(I.a, I.d)

# Substitute by providing multiple sub pairs
>>> kd.expr.sub(expr3, (I.a, I.d), (I.b, I.c))
(I.d + I.c) * (I.c + I.c)

# Note the substitution is done by traversing
# expr post-order and comparing fingerprints
# of sub-Exprs in the original expression and
# these in sub pairs
>>> kd.expr.sub(I.x + I.y,
...            (I.x, I.z), (I.x + I.y, I.k))
I.k

>>> kd.expr.sub(I.x + I.y,
...            (I.x, I.y), (I.y + I.y, I.z))
I.y + I.y

Substituting Input Nodes

>>> expr = (I.a + I.b) * I.c

# Substitute by other inputs
>>> new_expr = kd.expr.sub_inputs(expr, b=I.c, c=I.a)

# which is equivalent to
>>> new_expr = expr.sub_inputs(b=I.c, c=I.a)

# Substitute by Exprs
>>> new_expr = kd.expr.sub_inputs(expr, c=I.a + I.b)

# Substitute by literals
>>> new_expr = kd.expr.sub_inputs(expr, b=kd.slice([2, 4]))

Defining Custom Operators

# Create a lambda operator 'score'
# under the existing namespace 'kd.core'
>>> @kd.optools.add_to_registry()
... @kd.optools.as_lambda_operator('kd.core.score')
... def score(a, b, w):
...   return kd.agg_sum(w * (a + b))

>>> a = kd.slice([1, 20, 3000])
>>> b = kd.slice([2, 20, 4000])
>>> w = kd.slice([1, 1, 0.1])
>>> kd.eval(score(I.a, I.b, I.w),
...         a=a, b=b, w=w)
DataItem(743.0, schema: FLOAT32)

# It can be used as eager op too
>>> kd.core.score(a, b, w)
DataItem(743.0, schema: FLOAT32)

# We can also add the operator to a
# different namespace 'E'
>>> @kd.optools.add_to_registry()
... @kd.optools.as_lambda_operator('E.my_func')
... def my_func(a):
...   return kd.math.add(a, a)


# Create the operator container 'E'
>>> from arolla import arolla
>>> E = arolla.OperatorsContainer(unsafe_extra_namespaces=['E']).E
>>> kd.eval(E.my_func(I.a), a=1)
DataItem(2, schema: INT32)

>>> expect_data_slice = kd.optools.constraints.expect_data_slice

# Create an operator based on Py function
>>> @kd.optools.add_to_registry()
... @kd.optools.as_py_function_operator(
...     'kd.core.fn',
...     qtype_constraints=[
...         expect_data_slice(arolla.P.a),
...         expect_data_slice(arolla.P.pos)
...     ],
...     qtype_inference_expr=kd.qtypes.DATA_SLICE,
... )
... def fn(a, pos):
...   return kd.slice(a.S[pos].to_py() + 1)

>>> kd.eval(kd.lazy.core.fn(kd.slice([1, 2, 3]), 1))
DataItem(3, schema: INT32)

# Register operator alias so that
# kd.math.add is same as E.Add
>>> kd.optools.add_alias('kd.math.add', 'E.Add')

Advanced: Functor

Most Koda users should rarely need to create Functors directly; they should use tracing to convert eager logic into Functors.

A Koda Functor is a special DataItem containing a single item representing a callable function. It can be called/run using kd.call(fn, **inputs) or simply fn(**inputs).

Creating Koda Functor

Koda Functors can be created from the following objects:

  • Koda Expr (e.g. kd.functor.expr_fn(expr, **vars))
  • Format string similar to Python f-string f'text {input:s} text' (e.g. kd.functor.fstr_fn(str_fmt, **vars))
  • Python function (e.g. kd.trace_py_fn(pfn)) through tracing
  • Python function (e.g. kd.py_fn(pfn, **vars), kd.trace_py_fn(pfn, tracing=False)) by wrapping the Python function directly

kd.fn(fn_obj, **vars) is the universal adaptor for kd.functor.expr_fn and kd.trace_py_fn, where fn_obj can be an Expr or a Python function.

Conceptually, creating a functor is equivalent to declaring a Python function where the function signature consists of a function name and parameters and the function body consists of local variable assignments and a return statement.

  • Koda functor does not have a functor name, which is similar to a Python lambda function
  • Koda functor parameters are not explicitly declared but automatically derived from fn_obj and **vars
  • Local variables are denoted as V.var_name; variable assignments are done through **vars in kd.fn, and a variable can refer to other variables
  • Return statement is represented as fn_obj in kd.fn and can refer to local variables
# Create a Functor from Expr
>>> fn1 = kd.functor.expr_fn(I.a + I.b)

# Check if it is a Functor
>>> assert kd.is_fn(fn1)

# Create a Functor with local variables
>>> fn2 = kd.functor.expr_fn(V.a + I.b, c=I.d, a=V.c + I.d)

# Create a Functor from format string
>>> fn3 = kd.functor.fstr_fn(
...   f'a:{I.a:s} + b:{I.b:s} = {(I.a + I.b):s}')

# Create a Functor with local variables
>>> fn4 = kd.functor.fstr_fn(
...   f'{I.s.name:s} ({V.names:s})\n',
...   names=kd.lazy.strings.agg_join(I.s.cities[:].name, ', '))
>>> fn5 = kd.functor.fstr_fn(f'A: {V.fn(s=I.s1):s}'
...                          f'B: {V.fn(s=I.s2):s}',
...                          fn=fn4)

>>> def foo(a, b):
...   return a.get_size() + b.get_size()

# Create a Functor from Py function
>>> fn6 = kd.trace_py_fn(foo)  # with tracing
>>> fn7 = kd.py_fn(foo)  # no tracing

# Equivalent to the above
>>> fn7 = kd.trace_py_fn(foo, tracing=False)

# With default value for 'y'
>>> fn8 = kd.py_fn(lambda x, y: x + y, y=1)

# Use the universal adapter kd.fn
>>> fn9 = kd.fn(I.a + I.b)
>>> fn10 = kd.fn(lambda x, y: x + y)
>>> fn11 = kd.fn(fn9)  # no-op

# Create a functor calling another functor
>>> fn12 = kd.fn(kd.lazy.call(fn1, a=I.c, b=I.d))
>>> fn13 = kd.functor.fstr_fn(
...   f'result {V.f(a=I.c, b=I.d):s}', f=fn1)

Calling Koda Functor

>>> f = kd.fn(V.a + I.b, c=I.d, a=V.c + I.d)

# Pass inputs as **kwargs
# Preferred when input names are static
>>> f(b=2, d=3)
DataItem(8, schema: INT32)
>>> kd.call(f, b=2, d=3) # Same as above
DataItem(8, schema: INT32)

>>> # Call a functor from another functor
>>> f = kd.fn(I.x + I.y)
>>> g = kd.fn(V.nf(x=I.x, y=2), nf=f)
>>> g(x=1)
DataItem(3, schema: INT32)

# Alternative
>>> g = kd.fn(kd.lazy.call(V.nf, x=I.x, y=2), nf=f)
>>> g(x=1)
DataItem(3, schema: INT32)

# Functors can also be called inside an `fstr_fn` format string
>>> f = kd.fn(I.x + I.y)
>>> g = kd.functor.fstr_fn(f'result: {I.f(x=I.x, y=V.y):s}',
...                 y=1)
>>> g(f=f, x=1)
DataItem('result: 2', schema: STRING)

Partially Binding Koda Functor Parameters

>>> fn1 = kd.fn(I.a + I.b)

# Bind inputs with default values
>>> fn2 = fn1.bind(a=1)

>>> fn2(a=3, b=2)
DataItem(5, schema: INT32)
>>> fn2(b=2)
DataItem(3, schema: INT32)

# We can also bind positionally.
>>> fn_add = kd.fn(lambda x, y: x + y)
>>> fn_add_5 = fn_add.bind(5)  # 5 is bound to x
>>> fn_add_5(6)
DataItem(11, schema: INT32)
>>> fn_add_5(y=6)
DataItem(11, schema: INT32)

# Exprs can also be bound to arguments.
>>> fn3 = fn1.bind(a=I.b + 1)
>>> fn3(b=2)
DataItem(5, schema: INT32)

# Binding DataItem params also works.
>>> fn4 = fn1.bind(a=kd.item(1))

# Raises because binding a DataSlice is not supported
>>> fn5 = fn1.bind(a=kd.slice([1, 2]))
Traceback (most recent call last):
...
ValueError: variable [a] must be a data item, but has shape: JaggedShape(2)

# We have to use kd.list instead
>>> fn5 = fn1.bind(a=kd.list([1, 2]))

map_py_fn Functor

Also see kd.map_py() above.

# Pointwise
>>> _ = kd.functor.map_py_fn(lambda x, y: x + y)

# Aggregation
>>> _ = kd.functor.map_py_fn(lambda x: len(x), ndim=1)

# Aggregational, but with no dimension change
>>> _ = kd.functor.map_py_fn(lambda x: sorted(x), ndim=1)

# Expansion
>>> _ = kd.functor.map_py_fn(lambda x: [x] * 10)

Advanced: Iterables Operators

Koda Iterables are sequences of items of the same type (e.g. DataSlice, DataBag). An Iterable is a higher-level concept built on top of DataSlice/DataBag and is primarily used for streaming and parallel computation.

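Note the key difference between Iterables and DataSlices: the elements of an Iterable are whole DataSlices or DataBags, whereas the elements of a DataSlice are individual items.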

Create Iterables

Iterables can be created directly from individual items, or by yielding values from kd.for_ and kd.while_ (see the sections below).

# create from individual items
# 1/2/3/4 are wrapped into DataItems
>>> kd.iterables.make(1, 2, 3, 4)
ITERABLE[DATA_SLICE]{sequence(DataItem(1, schema: INT32), DataItem(2, schema: INT32), DataItem(3, schema: INT32), DataItem(4, schema: INT32), value_qtype=DATA_SLICE)}

# Iterable of DataBags
>>> o = kd.obj()
>>> kd.iterables.make(kd.attrs(o, a=1),
...                   kd.attrs(o, b=2))
ITERABLE[DATA_BAG]{sequence(DataBag ...:
  ..., DataBag ...:
  ..., value_qtype=DATA_BAG)}

# unordered may improve performance of
# parallel computation because items are
# added whenever they are ready
>>> def expensive_computation(x):
...   for i in range(1000):
...     x += i
...   return x
>>>

>>> kd.iterables.make_unordered(
...     expensive_computation(1),
...     expensive_computation(2),
...     expensive_computation(3))
ITERABLE[DATA_SLICE]{sequence(...DataItem(499501, schema: INT32)..., value_qtype=DATA_SLICE)}

Combine multiple Iterables

>>> i1 = kd.iterables.make(1, 2, 3, 4)
>>> i2 = kd.iterables.make(5, 6, 7, 8)

# Chain Iterables with the provided order
>>> kd.iterables.chain(i1, i2)
ITERABLE[DATA_SLICE]{sequence(DataItem(1, schema: INT32), DataItem(2, schema: INT32), DataItem(3, schema: INT32), DataItem(4, schema: INT32), DataItem(5, schema: INT32), DataItem(6, schema: INT32), DataItem(7, schema: INT32), DataItem(8, schema: INT32), value_qtype=DATA_SLICE)}

# Interleave Iterables
# the order within each iterable is preserved
# the order of interleaving of different
# iterables can be arbitrary
>>> kd.iterables.interleave(i1, i2)
ITERABLE[DATA_SLICE]{sequence(..., value_qtype=DATA_SLICE)}

Transform Iterables

# Apply fn over Iterable items
# fn must return Iterables which are chained
>>> kd.functor.flat_map_chain(
...     kd.iterables.make(1, 10),
...     lambda x: kd.iterables.make(x, x * 2, x * 3),
... )
ITERABLE[DATA_SLICE]{sequence(DataItem(1, schema: INT32), DataItem(2, schema: INT32), DataItem(3, schema: INT32), DataItem(10, schema: INT32), DataItem(20, schema: INT32), DataItem(30, schema: INT32), value_qtype=DATA_SLICE)}

# Apply fn over Iterable items
# fn must return Iterables which are interleaved
>>> kd.functor.flat_map_interleaved(
...     kd.iterables.make(1, 10),
...     lambda x: kd.iterables.make(x, x * 2, x * 3),
... )
ITERABLE[DATA_SLICE]{sequence(..., value_qtype=DATA_SLICE)}

Reduce one Iterable into one DataSlice/DataBag

# Concatenate DataSlices in an Iterable
# using kd.concat(*dss, ndim)
# For now, only DataSlices with ndim>0 are supported
>>> i = kd.iterables.make(kd.slice([1, 2, 3]),
...                       kd.slice([4, 5]))

>>> kd.iterables.reduce_concat(i, initial_value=kd.slice([]))
DataSlice([1, 2, 3, 4, 5], schema: INT32, present: 5/5)

# Merge DataBags in an Iterable
# using kd.updated_bag(**bags)
>>> o = kd.obj(a=1)
>>> i = kd.iterables.make(kd.attrs(o, b=2),
...                       kd.attrs(o, a=20, c=3))
>>> merged_bag = kd.iterables.reduce_updated_bag(i,
...     initial_value=kd.attrs(o, a=10))
>>> o.updated(merged_bag)
DataItem(Obj(a=20, b=2, c=3), schema: OBJECT, bag_id:...)

# Reduce by applying a functor cumulatively to
# the items of an iterable similar to functools.reduce
>>> i = kd.iterables.make(kd.int32(2), kd.int32(3),
...                       kd.int32(5), kd.int32(7))

>>> kd.functor.reduce(lambda a, b: a * b,
...                   i, initial_value=1)
DataItem(210, schema: INT32)

Advanced: Control/loop Operators

kd.if_

kd.if_ is different from kd.cond in three ways:

  • kd.if_ provides short-circuited if (i.e. only one branch is executed) while kd.cond does not.
  • kd.if_ takes a MASK DataItem as condition while kd.cond can take a MASK DataSlice with any ndim.
  • kd.if_ takes Koda functors as yes/no branches while kd.cond takes DataSlices.
>>> def p(a):
...   print(a)
...   return a

>>> f1 = kd.py_fn(p)

# kd.cond executes both branches
>>> kd.cond(kd.missing, f1(1), f1(2))
1
2
DataItem(2, schema: INT32)

>>> def q(a):
...   a += 1
...   print(a)
...   return a

>>> f2 = kd.py_fn(q)

# kd.if_ executes only one branch
# Note it takes functors rather than DataSlices as inputs
>>> kd.if_(kd.missing, f1, f2, 1)
2
DataItem(2, schema: INT32)

Multiple conditions

# Pure Python version
>>> def foo(ds, cond1, cond2, cond3):
...   if cond1:
...     return ds + 1
...   elif cond2:
...     return ds - 1
...   elif cond3:
...     return ds / 2
...   else:
...     return ds * 2

# kd.if_ version with lambdas
>>> def foo(ds, cond1, cond2, cond3):
...   return kd.if_(
...       cond1,
...       lambda x: x + 1,
...       lambda x: kd.if_(
...           cond2,
...           lambda x: x - 1,
...           lambda x: kd.if_(
...               cond3,
...               lambda x: x / 2,
...               lambda x: x * 2,
...               x),
...           x),
...       ds)

# kd.if_ version using functions
# Note the order of branches defined is reversed
>>> def foo(ds, cond1, cond2, cond3):
...   def branch3(x):
...     return kd.if_(cond3,
...                   lambda y: y / 2,
...                   lambda y: y * 2,
...                   x)

...   def branch2(x):
...     return kd.if_(cond2,
...                   lambda y: y - 1,
...                   branch3,
...                   x)

...   def branch1(x):
...     return x + 1

...   return kd.if_(cond1, branch1, branch2, ds)

kd.for_

For more detailed usage, please refer to the docstrings. Note that kd.for_ takes an Iterable rather than a DataSlice as input. It iterates over the elements of the Iterable (i.e. DataSlices or DataBags) rather than over the items of a DataSlice.

>>> inputs = [kd.slice([1, 2]), kd.slice([3, 4])]

# Python version
>>> def foo(iterable):
...   returns = 0
...   for i in iterable:
...     returns = i + returns
...   return returns

>>> foo(inputs)
DataSlice([4, 6], schema: INT32, present: 2/2)

# Koda version
>>> def foo(iterable):
...   return kd.for_(
...       iterable,
...       lambda i, returns:
...           kd.namedtuple(returns=(i + returns)),
...       returns=0)

>>> foo(kd.iterables.make(*inputs))
DataSlice([4, 6], schema: INT32, present: 2/2)

finalize_fn can be used to execute logic after all elements in the iterable have been processed.

>>> inputs = [kd.slice([1, 2]), kd.slice([3, 4])]

# Python version
>>> def bar(iterable):
...   a = 0
...   b = 1
...   for i in iterable:
...     a = i + a
...     b = i * b
...   return a + b

>>> bar(inputs)
DataSlice([7, 14], schema: INT32, present: 2/2)

# Koda version
# Note body_fn and finalize_fn return namedtuple
>>> def bar(iterable):
...   return kd.for_(
...       iterable,
...       lambda i, a, b, returns:
...           kd.namedtuple(a=(i + a), b=(i * b)),
...       finalize_fn=lambda a, b, returns:
...           kd.namedtuple(returns=(a + b)),
...       a=0,
...       b=1,
...       returns=0)

>>> bar(kd.iterables.make(*inputs))
DataSlice([7, 14], schema: INT32, present: 2/2)

condition_fn can be used to stop the loop early. It must return a MASK DataItem.

>>> inputs = [kd.item(1), kd.item(2),
...           kd.item(3), kd.item(4)]

# Python version
>>> def baz(iterable):
...   returns = 0
...   for i in iterable:
...     if returns > 5:
...       break
...     returns = i + returns
...   return returns

>>> baz(inputs)
DataItem(6, schema: INT32)

# Koda version
>>> def baz(iterable):
...   return kd.for_(
...       iterable,
...       lambda i, returns:
...           kd.namedtuple(returns=(i + returns)),
...       condition_fn=lambda returns: returns < 5,
...       returns=0)

>>> baz(kd.iterables.make(*inputs))
DataItem(6, schema: INT32)

Instead of returning a single value, kd.for_ can yield multiple values during iteration and return an Iterable with all yielded values chained together. This is done by including yields or yields_interleaved in the namedtuple returned by body_fn. When yields_interleaved is specified, the behavior is the same as with yields, except that the values are interleaved instead of chained.

>>> inputs = [kd.item(1), kd.item(2),
...           kd.item(3), kd.item(4)]

# Python version
>>> def baz(iterable):
...   res = kd.item(0)
...   for i in iterable:
...     if res > 5:
...       break
...     yield res
...     yield res + 1
...     res = i + res

>>> baz(inputs)
<generator object baz at ...>

# Koda version
>>> def baz(iterable):
...   return kd.for_(
...       iterable,
...       lambda i, res:
...           kd.namedtuple(
...               res=(i + res),
...               yields=kd.iterables.make(res, res + 1)),
...       condition_fn=lambda res: res < 5,
...       res=0,
...       yields=kd.iterables.make())

>>> baz(kd.iterables.make(*inputs))
ITERABLE[DATA_SLICE]{sequence(DataItem(0, schema: INT32), DataItem(1, schema: INT32), DataItem(1, schema: INT32), DataItem(2, schema: INT32), DataItem(3, schema: INT32), DataItem(4, schema: INT32), value_qtype=DATA_SLICE)}

kd.while_

For more detailed usage, please refer to the docstrings.

>>> ds = kd.slice([5, 4, 6])

# Python version
>>> def factorial(n):
...   returns = 1
...   while kd.any(n > 1):
...     returns = returns * n
...     n = kd.cond(n > 1, n - 1, n)
...   return returns

>>> factorial(ds)
DataSlice([120, 24, 720], schema: INT32, present: 3/3)

# Koda version
>>> def factorial(n):
...   return kd.while_(
...       lambda n, returns: kd.any(n > 1),
...       lambda n, returns: kd.namedtuple(
...           returns=returns * n,
...           n=kd.cond(n > 1, n - 1, n),
...       ),
...       n=n,
...       returns=1)

>>> factorial(ds)
DataSlice([120, 24, 720], schema: INT32, present: 3/3)

Instead of returning a single value, kd.while_ can yield multiple values during iteration and return an Iterable with all yielded values chained together. This is done by including yields or yields_interleaved in the namedtuple returned by body_fn. When yields_interleaved is specified, the behavior is the same as with yields, except that the values are interleaved instead of chained.

>>> ds = kd.slice([5, 4, 6])

>>> def foo(n):
...   returns = 1
...   while kd.any(n > 1):
...     yield kd.select(n, n > 1)
...     n = kd.cond(n > 1, n - 1, n)
...   return returns

>>> foo(ds)
<generator object foo at ...>

# Koda version
>>> def foo(n):
...   return kd.while_(
...       lambda n: kd.any(n > 1),
...       lambda n: kd.namedtuple(
...           n=kd.cond(n > 1, n - 1, n),
...           yields=kd.iterables.make(
...               kd.select(n, n > 1)
...           )
...       ),
...       n=n,
...       yields=kd.iterables.make())

>>> foo(ds)
ITERABLE[DATA_SLICE]{sequence(DataSlice([5, 4, 6], schema: INT32, present: 3/3), DataSlice([4, 3, 5], schema: INT32, present: 3/3), DataSlice([3, 2, 4], schema: INT32, present: 3/3), DataSlice([2, 3], schema: INT32, present: 2/2), DataSlice([2], schema: INT32, present: 1/1), value_qtype=DATA_SLICE)}

Advanced: Multithreaded evaluation

kd.parallel.call_multithreaded

Normally, Koda functor evaluation is single-threaded:

>>> import time

>>> @kd.trace_as_fn(functor_factory=kd.py_fn)
... def f1(x):
...   time.sleep(0.1)
...   print('Start f1')
...   time.sleep(0.5)
...   print('Finish f1')
...   return x + 1

>>> @kd.trace_as_fn(functor_factory=kd.py_fn)
... def f2(x):
...   time.sleep(0.2)
...   print('Start f2')
...   time.sleep(0.5)
...   print('Finish f2')
...   return x + 2

>>> def f(x):
...   return f1(x) + f2(x)

>>> kd.fn(f)(5)
Start f1
Finish f1
Start f2
Finish f2
DataItem(13, schema: INT32)

However, kd.parallel.call_multithreaded allows evaluating independent parts of the computation in parallel:

>>> kd.parallel.call_multithreaded(f, 5)
Start f1
Start f2
Finish f1
Finish f2
DataItem(13, schema: INT32)

We can control the maximum number of threads:

>>> kd.parallel.call_multithreaded(f, 5, max_threads=1)
Start f1
Finish f1
Start f2
Finish f2
DataItem(13, schema: INT32)

What can be evaluated in parallel?

In a single functor, its variables are evaluated in parallel when using kd.parallel.call_multithreaded. If you create your functor via tracing, the variables are the nodes annotated with kd.with_name. This lets you balance the level of parallelism against the overhead of starting a new parallel task for each simple operation.

# No parallel execution, everything is serial.
>>> kd.parallel.call_multithreaded(
...   lambda x, y: x ** 2 + y ** 2,
...   x=1, y=2)
DataItem(5.0, schema: FLOAT32)

# x ** 2 and y ** 2 are computed in parallel.
>>> kd.parallel.call_multithreaded(
...   lambda x, y:
...       kd.with_name(x ** 2, 'a')
...       + kd.with_name(y ** 2, 'b'),
...   x=1, y=2)
DataItem(5.0, schema: FLOAT32)

If a functor calls another functor, then inside the parallel evaluation we create additional variables for the inputs of that call and for the result of the call.

Since variables are evaluated in parallel, this means that effectively we will evaluate all inputs to such call in parallel, then evaluate the sub-functor (in parallel to any other variables that might be evaluated), and then evaluate the part of the computation that uses the result of the sub-functor.

>>> @kd.trace_as_fn()
... def f(x, y):
...   return x + y

# In the call below, first we evaluate x**2, y**2,
# x**3, y**3 in parallel. As soon as either both
# of x**2 and y**2, or both of x**3 and y**3
# finish, we start evaluating the corresponding
# f() sub-functor call, and when both of those are
# ready we add them up to produce final result.
>>> kd.parallel.call_multithreaded(
...   lambda x, y:
...       f(x ** 2, y ** 2) + f(x ** 3, y ** 3),
...   x=1, y=2)
DataItem(14.0, schema: FLOAT32)

Note that without the @kd.trace_as_fn() annotation on f(x, y) everything would end up being a single variable in the traced functor, and no parallelization would happen at all.

Other operations that call a sub-functor, such as kd.map, kd.if_, or kd.while_, also perform all sub-functor calls in parallel mode. This means that the variables inside the sub-functor are evaluated in parallel, and that the sub-functor calls themselves can be parallelized when one does not depend on the result of another.

>>> @kd.trace_as_fn(functor_factory=kd.py_fn)
... def process(x):
...   time.sleep(0.1 * x.to_py())
...   print('Start', x)
...   time.sleep(0.5)
...   print('Finish', x)
...   return x + 1

>>> kd.parallel.call_multithreaded(
...     lambda x: kd.map(process, x), kd.range(3))
Start 0
Start 1
Start 2
Finish 0
Finish 1
Finish 2
DataSlice([1, 2, 3], schema: INT64, present: 3/3)

What else can be evaluated in parallel?

If the sub-functor itself has several variables, or calls yet another functor, the corresponding tasks will all be evaluated in parallel in kd.parallel.call_multithreaded. Moreover, if a variable in a sub-functor depends only on some of the inputs, it may start evaluating even before all inputs to the sub-functor call are ready.

>>> @kd.trace_as_fn(functor_factory=kd.py_fn)
... def step(x, msg, pause):
...   time.sleep(pause.to_py() * 0.1)
...   print('Start', msg)
...   time.sleep(pause.to_py() * 0.5)
...   print('Finish', msg)
...   return x + 1

>>> @kd.trace_as_fn()
... def f(x, y):
...   return (
...       step(x, 'inner1', 0.1)
...       + step(y, 'inner2', 0.1))

>>> @kd.trace_as_fn()
... def g():
...   return f(
...       step(1, 'outer1', 0.1),
...       step(1, 'outer2', 0.3))

>>> kd.parallel.call_multithreaded(g)
Start outer1
Start outer2
Finish outer1
Start inner1
Finish inner1
Finish outer2
Start inner2
Finish inner2
DataItem(6, schema: INT32)

Similarly, if a sub-functor returns a tuple/namedtuple, the parts of the outside computation that use only one field of that tuple/namedtuple can start evaluation even before the entire tuple is completed:

>>> @kd.trace_as_fn(functor_factory=kd.py_fn)
... def step(x, msg, pause):
...   time.sleep(pause.to_py())
...   print('Start', msg)
...   time.sleep(pause.to_py()*5)
...   print('Finish', msg)
...   return x + 1

>>> @kd.trace_as_fn(return_type_as=(
...     kd.types.DataSlice, kd.types.DataSlice))
... def f(x):
...   return (
...       step(x, 'inner1', 0.1),
...       step(x, 'inner2', 0.3))

>>> @kd.trace_as_fn()
... def g():
...   inner = f(1)
...   return (
...       step(inner[0], 'outer1', 0.1)
...       + step(inner[1], 'outer2', 0.1))

>>> kd.parallel.call_multithreaded(g)
Start inner1
Start inner2
Finish inner1
Start outer1
Finish outer1
Finish inner2
Start outer2
Finish outer2
DataItem(6, schema: INT32)

Interactions with external multithreading primitives

In the implementation of kd.parallel.call_multithreaded, we analyze the functor statically to make sure we evaluate all dependencies of a computation before trying to evaluate it. This allows the evaluation to never use one of the threads for a blocking wait, so even with max_threads=1 the computation should succeed (of course it won’t be faster than serial in that case).

However, if you use a blocking wait inside one of the operations (most likely via py_fn, since the operations in the Koda standard library never do that), then it is possible to get into a state where all max_threads threads are blocked.

If you are waiting for an external event, such as an RPC, then the computation will eventually resume when that event happens. However, if you are waiting for an event that is triggered by another branch of your own computation, that branch might never get scheduled because all threads are blocked, resulting in a deadlock.

Therefore, please never use a blocking wait on an event that is triggered by another branch of your computation. In particular, please never call kd.parallel.call_multithreaded from inside a function annotated with kd.trace_as_fn(functor_factory=kd.py_fn) that is itself being evaluated via kd.parallel.call_multithreaded!

Streaming

Operations with iterables, such as kd.for_, become streaming operations when using kd.parallel.call_multithreaded. In other words, the subsequent processing of an iterable item may start even before the next item(s) of the same iterable have been computed, for example:

>>> @kd.trace_as_fn(functor_factory=kd.py_fn)
... def step(x):
...   time.sleep(0.1 * x.to_py())
...   print('Start', x)
...   time.sleep(0.5 * x.to_py())
...   print('Finish', x)
...   return x + 10

>>> def f():
...   base = kd.iterables.make(1, 2, 3)
...   first = kd.for_(
...       base,
...       lambda x: kd.namedtuple(
...           yields=kd.iterables.make(step(x))),
...       yields=kd.iterables.make())
...   second = kd.for_(
...       first,
...       lambda x: kd.namedtuple(
...           yields=kd.iterables.make(step(x))),
...       yields=kd.iterables.make())
...   return kd.functor.reduce(
...       lambda x, y: x + y, second, initial_value=0)

>>> kd.parallel.call_multithreaded(f)
Start 1
Start 2
Start 3
Finish 1
Finish 2
Start 11
Finish 3
Start 12
Start 13
Finish 11
Finish 12
Finish 13
DataItem(66, schema: INT32)

If your entire computation returns an iterable, then you need to use kd.parallel.yield_multithreaded instead which allows you to get the results as a Python iterator:

>>> @kd.trace_as_fn(functor_factory=kd.py_fn)
... def step(x):
...   time.sleep(0.1 * x.to_py())
...   print('Start', x)
...   time.sleep(0.5 * x.to_py())
...   print('Finish', x)
...   return x + 10

>>> def f():
...   base = kd.iterables.make(1, 2, 3)
...   return kd.for_(
...       base,
...       lambda x: kd.namedtuple(
...           yields=kd.iterables.make(step(x))),
...       yields=kd.iterables.make())

>>> for item in kd.parallel.yield_multithreaded(f):
...   print('Got', item)
Start 1
Start 2
Start 3
Finish 1
Got 11
Finish 2
Got 12
Finish 3
Got 13

Advanced: Mutable Workflow

While the immutable workflow is sufficient for most Koda users, it is possible to use a mutable workflow by managing DataBags manually in order to achieve better performance.

Note that code using mutable APIs cannot be traced and has more limited options for productionization.

Creating Mutable Entities/Objects/Lists/Dicts from a DataBag

>>> db = kd.mutable_bag()

# Create a mutable entity and modify its attributes
>>> e = db.new(x=1, y=2)
>>> e.x = 3
>>> e.z = 'a'

# immutable version e.with_attrs(y=None)
>>> e.y = None; e
DataItem(Entity(x=3, z='a'), schema: ENTITY(x=INT32, y=INT32, z=STRING), bag_id:...)

# same as 'e.z = None' if 'z' already exists, no-op otherwise
>>> del e.z; e
DataItem(Entity(x=3), schema: ENTITY(x=INT32, y=INT32, z=STRING), bag_id:...)

# Create a mutable object and modify its attributes
>>> o = db.obj(x=1, y=2)
>>> o.x = 'a'
>>> o.z = 3

# immutable version o.with_attrs(y=None).
>>> o.y = None; o
DataItem(Obj(x='a', y=None, z=3), schema: OBJECT, bag_id:...)

# remove both the attribute and its schema
>>> del o.z; o
DataItem(Obj(x='a', y=None), schema: OBJECT, bag_id:...)

# Create a mutable list and
# modify its items
>>> l = db.list([1, 2, 3])
>>> l[1] = 20
>>> l.append(4)
>>> l.append(kd.slice([5, 6]))
>>> l[2] = None; l
DataItem(List[1, 20, None, 4, 5, 6], schema: LIST[INT32], bag_id:...)

>>> del l[:4]; l
DataItem(List[5, 6], schema: LIST[INT32], bag_id:...)

# Create a mutable dict and modify its entries
>>> d = db.dict({'a': 1, 'b': 2})
>>> d['a'] = 20
>>> d['c'] = 30
>>> d['a'] = None
>>> del d['b'];  # same as above
>>> d[kd.slice(['a', 'b', 'c'])]
DataSlice([None, None, 30], schema: INT32, present: 1/3, bag_id:...)


# Other APIs work similarly
>>> _ = db.new_like(e)
>>> _ = db.new_shaped(e.get_shape())
>>> _ = db.implode(kd.slice([1, 2, 3]))

Forking and Freezing DataBags

An immutable DataBag can be forked to create a mutable DataBag at a cost of O(1). Similarly, a mutable DataBag can be frozen to create an immutable DataBag at a cost of O(1).

>>> e = kd.new(x=1, y=2)
>>> e1 = e.fork_bag()
>>> e1.x = 3
>>> e2 = e1.fork_bag()
>>> e2.x = 4
>>> e.x
DataItem(1, schema: INT32, bag_id: ...)
>>> e1.x
DataItem(3, schema: INT32, bag_id: ...)
>>> e2.x
DataItem(4, schema: INT32, bag_id: ...)

>>> o = db.new(x=1, y=2)
>>> o1 = o.fork_bag()

>>> o2 = o1.freeze_bag()
>>> assert not o2.get_bag().is_mutable()

>>> l = db.list([1, 2, 3])
>>> l1 = l.fork_bag()
>>> l1.append(4)

>>> d = db.dict({'a': 1, 'b': 2})
>>> d1 = d.fork_bag()
>>> d1['a'] = 20

Moving Data from a DataSlice/DataBag to Another DataBag

>>> db1 = kd.mutable_bag()
>>> db2 = kd.mutable_bag()

>>> o1 = db1.new(x=1)

>>> o23 = db2.new(x=kd.slice([2, 3]))
>>> o4 = db2.new(x=4)

# Merge the entire db2 into db1
>>> db1.merge_inplace(db2)
DataBag ...:
...

# Move o4 (which lives in db2) into db1
>>> o4_in_db1 = db1.adopt(o4)

# Equivalent to the following
>>> db1.merge_inplace(o4.get_bag())
DataBag ...:
...

>>> o4_in_db1 = o4.with_bag(db1)

# adopt avoids extracting the same data multiple times
# The code below has two extractions
>>> o5 = db1.obj(y=o4)
>>> o6 = db1.obj(y=o4)

# The code below has one extraction
>>> o4_in_db1 = db1.adopt(o4)
>>> o5 = db1.obj(y=o4_in_db1)
>>> o6 = db1.obj(y=o4_in_db1)

Unit Testing

kd.testing.assert_equal

# Primitive DataSlices
>>> ds1 = kd.slice([1, 2, 3])
>>> ds2 = kd.slice([1, 2, 3])
>>> kd.testing.assert_equal(ds1, ds2)

# Mixed primitive DataSlices
>>> ds3 = kd.slice([1, 2., '3'])
>>> ds4 = kd.slice([1, 2., '3'])
>>> kd.testing.assert_equal(ds3, ds4)

# It compares schemas too for DataSlices
>>> ds5 = kd.slice([1, 2, 3])
>>> ds6 = kd.slice([1, 2, 3], kd.INT64)
>>> kd.testing.assert_equal(ds5, ds6)
Traceback (most recent call last):
...
AssertionError: DataSlices are not equal by fingerprint:
...

>>> kd.testing.assert_equal(ds5, kd.int32(ds6))

# It compares DataBags too for DataSlices
>>> ds7 = kd.uuobj(x=1)
>>> ds8 = kd.uuobj(x=1)
>>> kd.testing.assert_equal(ds7, ds8)
Traceback (most recent call last):
...
AssertionError: DataSlices are not equal by fingerprint:
...

>>> kd.testing.assert_equal(ds7.no_bag(), ds8.no_bag())

# It works for JaggedShapes
>>> kd.testing.assert_equal(ds1.get_shape(),
...                         ds2.get_shape())
>>> kd.testing.assert_equal(ds1.get_shape(),
...                         ds8.get_shape())
Traceback (most recent call last):
...
AssertionError: QValues not equal by fingerprint:
...
  actual:
    JaggedShape(3)
  expected:
    JaggedShape()

# It works for DataBags too, and it checks that the DataBags are the same instance
>>> db1 = kd.bag()
>>> db2 = kd.bag()
>>> kd.testing.assert_equal(db1, db2)
Traceback (most recent call last):
...
AssertionError: QValues not equal by fingerprint:
...

>>> kd.testing.assert_equal(db1, db1)

kd.testing.assert_equivalent

# Unlike assert_equal, it recursively checks that the DataSlices' contents are
# equivalent rather than requiring them to be the same instance
>>> ds1 = kd.uuobj(x=1)
>>> ds2 = kd.uuobj(x=1)
>>> kd.testing.assert_equivalent(ds1, ds2)

>>> ds3 = kd.uuobj(x=kd.slice([1, 2]))

>>> kd.testing.assert_equivalent(ds1, ds3.S[0])

# It works for DataBags too: it checks that the DataBags have the same content
# and that their respective fallbacks have the same content.
>>> kd.testing.assert_equivalent(ds1.get_bag(),
...                              ds2.get_bag())

# DataBag comparison is tricky, so comparing
# DataBag contents directly is not recommended.

kd.testing.assert_allclose

It works similarly to numpy.testing.assert_allclose.

>>> ds1 = kd.slice([2.71, 2.71])
>>> ds2 = kd.slice([2.7100, 2.710])

>>> kd.testing.assert_allclose(ds1, ds2)

>>> ds3 = kd.float32(3.145678)
>>> ds4 = kd.float32(3.144)

>>> kd.testing.assert_allclose(ds3, ds4)
Traceback (most recent call last):
...
AssertionError: the values are not close up to the given tolerance:
...

>>> kd.testing.assert_allclose(ds3, ds4, atol=0.01)
>>> kd.testing.assert_allclose(ds3, ds4, rtol=0.01)

>>> # Note that it compares schemas too
>>> ds5 = kd.float64(3.145678)
>>> kd.testing.assert_allclose(ds3, ds5)
Traceback (most recent call last):
...
AssertionError: DataItem(3.145678, schema: FLOAT32) and DataItem(3.145678, schema: FLOAT64) have different schemas
...

kd.testing.assert_unordered_equal

>>> ds1 = kd.slice([1, 2., '3'])
>>> ds2 = kd.slice(['3', 2., 1])

>>> kd.testing.assert_unordered_equal(ds1, ds2)

>>> # Ignore the order of items in the last dimension
>>> ds3 = kd.slice([[1, 2], [None, 3]])
>>> ds4 = kd.slice([[2, 1], [3, None]])
>>> kd.testing.assert_unordered_equal(ds3, ds4)

>>> ds5 = kd.slice([[1, 3], [None, 2]])
>>> ds6 = kd.slice([[2, 1], [3, None]])
>>> kd.testing.assert_unordered_equal(ds5, ds6)
Traceback (most recent call last):
...
AssertionError: Unordered DataSlice DataSlice([[1, 3], [None, 2]], schema: INT32, present: 3/4, shape: JaggedShape(2, 2)) != DataSlice([[2, 1], [3, None]], schema: INT32, present: 3/4, shape: JaggedShape(2, 2))

Comparing Lists

Note: kd.testing.assert_equivalent checks that the given Lists have the same shape and schema, and that the corresponding values are equivalent. Notably, it does not require the lists themselves to have the same ItemIds: it is an assertion about their contents, not about their identities.

>>> l1 = kd.list([1, 2, 3])
>>> l2 = kd.list([1, 2, 3])
>>> kd.testing.assert_equivalent(l1, l2)

>>> l3 = kd.list([kd.uuobj(x=1), kd.uuobj(x=2)])
>>> l4 = kd.list([kd.uuobj(x=1), kd.uuobj(x=2)])
>>> kd.testing.assert_equivalent(l3, l4)

>>> l5 = kd.list([kd.obj(x=1), kd.obj(x=2)])
>>> l6 = kd.list([kd.obj(x=1), kd.obj(x=3)])
>>> kd.testing.assert_equivalent(l5, l6)
Traceback (most recent call last):
...
AssertionError: Expected: is equal to DataItem(List[...], schema: LIST[OBJECT])
Actual: DataItem(List[...], schema: LIST[OBJECT]), with difference:
modified:
expected[1].x:
DataItem(3, schema: INT32)
-> actual[1].x:
DataItem(2, schema: INT32)

# Nested lists
>>> l7 = kd.list([[1, 2, 3],
...               [kd.uuobj(x=1), kd.uuobj(x=2)]])
>>> l8 = kd.list([[1, 2, 3],
...               [kd.uuobj(x=1), kd.uuobj(x=2)]])
>>> kd.testing.assert_equivalent(l7, l8)

Comparing Dicts

Note: kd.testing.assert_equivalent checks that the given Dicts have the same shape and schema. In addition, it verifies that the keys fetched from their corresponding DataBag(s) are the same (regardless of their order in the last dimension) and that the returned Dict values for those keys are equivalent.

>>> d1 = kd.dict({'a': 1, 'b': 2})
>>> d2 = kd.dict({'a': 1, 'b': 2})
>>> kd.testing.assert_equivalent(d1, d2)

>>> d3 = kd.dict({'a': kd.uuobj(x=1)})
>>> d4 = kd.dict({'a': kd.uuobj(x=1)})
>>> kd.testing.assert_equivalent(d3, d4)

>>> d5 = kd.dict({'a': kd.obj(x=1)})
>>> d6 = kd.dict({'a': kd.obj(x=2)})
>>> kd.testing.assert_equivalent(d5, d6)
Traceback (most recent call last):
...
AssertionError: Expected: is equal to DataItem(Dict{'a'=...}, schema: DICT{STRING, OBJECT})
Actual: DataItem(Dict{'a'=...}, schema: DICT{STRING, OBJECT}), with difference:
modified:
expected['a'].x:
DataItem(2, schema: INT32)
-> actual['a'].x:
DataItem(1, schema: INT32)

# Only check keys/values
>>> kd.testing.assert_dicts_keys_equal(
...   d1, kd.slice(['b', 'a']))  # Order does not matter
>>> kd.testing.assert_dicts_values_equal(
...   d1, kd.slice([2, 1]))  # The values and their counts matter; order does not.

Comparing Complex Objects

When the Koda equality operator x == y is applied to complex objects, it simply compares their ItemIds. This differs from Python, where x == y on built-in containers compares their contents. To compare contents in Koda, we can use kd.deep_uuid(x) == kd.deep_uuid(y). However, this does not produce a helpful error message explaining which sub-parts differ. To get a better error message, the current recommendation is to convert both values to pytrees and use Python comparison assertions.

>>> i1 = kd.obj(a=kd.obj(b=kd.obj(c=1),
...                      d=kd.list([2, 3]),
...                      e=kd.dict({'f': 4})))
>>> i2 = kd.obj(a=kd.obj(b=kd.obj(c=1),
...                      d=kd.list([2, 3]),
...                      e=kd.dict({'f': 4})))

>>> assert i1 != i2

# To get a better error message, do this in unit tests
>>> class MockSelf:
...   def assertEqual(self, a, b):
...     assert a == b
>>> self = MockSelf()
>>> self.assertEqual(i1.to_pytree(max_depth=-1),
...                  i2.to_pytree(max_depth=-1))