rikai.parquet package

Submodules

rikai.parquet.dataset module

Parquet-based Rikai Dataset that supports automatic UDT conversions.

class rikai.parquet.dataset.Dataset(query: Union[str, Path], columns: Optional[List[str]] = None, seed: Optional[int] = None, world_size: int = 1, rank: int = 0, offset: int = 0)

Bases: object

Rikai Dataset.

Dataset provides read access to a Rikai dataset. It supports

  • Read Rikai-encoded datasets on supported storage media, e.g., the local filesystem, AWS S3, or Google Cloud Storage (GCS).

  • Automatically deserialize data into semantic user-defined types (UDTs).

  • Distributed training by setting the world_size and rank parameters. When enabled, Parquet row-group-level partitioning is used to distribute data among the workers.

Parameters
  • query (str) – A SQL query, e.g., “SELECT image, annotation FROM s3://foo/bar”, or a dataset URI, e.g., “s3://foo/bar”

  • columns (List[str], optional) – Read only the given columns.

  • seed (int, optional) – Random seed for the shuffling process.

  • world_size (int) – Total number of distributed workers

  • rank (int) – The rank of this worker among all distributed workers.

  • offset (int) – Instruct the dataset to skip the first N records.

Notes

  • Typically, users should not use this class directly. Instead, they are encouraged to use framework-native readers, for example, rikai.pytorch.data.Dataset in PyTorch.
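
Examples

A minimal usage sketch; the dataset URI and column names are placeholders, and it assumes a Dataset instance is iterable and yields one dict per record:

from rikai.parquet.dataset import Dataset

# Hypothetical dataset URI and columns
dataset = Dataset("s3://foo/bar", columns=["image", "annotation"])
for row in dataset:  # assumed to yield one dict per record
    print(row["image"], row["annotation"])

# Distributed variant: worker 0 of 4 reads its share of the row groups
shard = Dataset("s3://foo/bar", world_size=4, rank=0)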

SPARK_PARQUET_ROW_METADATA = b'org.apache.spark.sql.parquet.row.metadata'
count() → int

Count the number of records in the dataset.

property metadata: dict

Rikai metadata.

to_pandas(limit=None)

Create a pandas DataFrame from the Parquet data in this Dataset.

Parameters

limit (int, default None) – The maximum number of rows to retrieve. If None, 0, or negative, all rows are retrieved.
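
Examples

A short sketch of previewing a dataset in pandas; the dataset URI is a placeholder:

from rikai.parquet.dataset import Dataset

ds = Dataset("s3://foo/bar")
print(ds.count())                 # total number of records
preview = ds.to_pandas(limit=10)  # only the first 10 rows
print(preview.head())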

rikai.parquet.resolver module

Extensible Dataset Resolver.

class rikai.parquet.resolver.BaseResolver

Bases: ABC

Abstract base class for concrete resolvers.

abstract get_schema(uri: str)

Return the schema of the dataset, specified by URI.

abstract resolve(uri: str) → Iterable[str]

Resolve a feature dataset URI and return a list of Parquet file URIs.

class rikai.parquet.resolver.DefaultResolver

Bases: BaseResolver

DefaultResolver supports feature datasets on the local filesystem or S3.

Supported URIs

SPARK_PARQUET_ROW_METADATA = b'org.apache.spark.sql.parquet.row.metadata'
get_schema(uri: str)

Get the schema of the dataset.

Parameters

uri (str) – The directory URI for the dataset.

Returns

JSON-formatted schema of the dataset.

Return type

Dict

resolve(uri: str) → Iterable[str]

Resolve a dataset via a filesystem URI.

Parameters

uri (str) – The directory / base URI for a dataset.

Returns

An iterator of Parquet file URIs.

Return type

Iterator[str]
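
Examples

A hedged sketch of resolving a dataset directory directly; the local path is a placeholder:

from rikai.parquet.resolver import DefaultResolver

resolver = DefaultResolver()
# Hypothetical local dataset directory; yields the URIs of the Parquet files inside
for part in resolver.resolve("/data/my_rikai_dataset"):
    print(part)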

class rikai.parquet.resolver.Resolver

Bases: object

Extensible Dataset Resolver

DEFAULT_SCHEME = None
classmethod get_schema(uri: str)

Get the schema of the dataset.

Parameters

uri (str) – URI of the dataset

classmethod register(scheme: str, resolver: BaseResolver)

Register a custom dataset resolver for the given scheme, providing integration with feature store registration.

Parameters
  • scheme (str) – Feature Dataset URI scheme

  • resolver (BaseResolver) – Parquet file resolver

Raises

KeyError – If the same scheme name has already been registered
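
Examples

A hedged sketch of programmatic registration; MyStoreResolver and the "mystore" scheme are hypothetical:

from rikai.parquet.resolver import Resolver
from my_store import MyStoreResolver  # hypothetical BaseResolver subclass

Resolver.register("mystore", MyStoreResolver())
files = list(Resolver.resolve("mystore://featureA/version/1"))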

classmethod reset()

Reset the Resolver, for testing purposes.

classmethod resolve(uri: Union[str, Path]) → Iterable[str]

Resolve the dataset URI and return a list of Parquet files.

classmethod set_default_scheme(default_scheme: str)

Change the default scheme used when a URI has no scheme.

Parameters

default_scheme (str) – If a URI has no scheme, the resolver registered for this scheme is used.
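
Examples

A short sketch; the "mystore" scheme continues the hypothetical registration above:

from rikai.parquet.resolver import Resolver

Resolver.set_default_scheme("mystore")
# A URI without a scheme now falls back to the "mystore" resolver
files = list(Resolver.resolve("featureA/version/1"))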

rikai.parquet.resolver.register(scheme: str)

A decorator that registers a custom dataset resolver for the given scheme, providing integration with feature store registration.

Parameters

scheme (str) – Feature Dataset URI scheme

Raises

ValueError – If the same scheme name has already been registered

Examples

# Extend Rikai with a smart feature store
from rikai.parquet.resolver import BaseResolver, register

@register("smart")
class SmartResolver(BaseResolver):
    def resolve(self, uri: str) -> Iterable[str]:
        # smart_client is a placeholder for the feature store's client
        return smart_client.get_files(uri)

    def get_schema(self, uri: str):
        return smart_client.get_schema(uri)

dataset = rikai.parquet.Dataset("smart://featureA/version/1")

rikai.parquet.writer module

Tools to write Rikai-format datasets to disk.

rikai.parquet.writer.df_to_rikai(df: DataFrame, dest_path: str, schema: pyspark.sql.types.StructType, partition_cols: Union[str, list] = None, max_rows_per_file: int = None, mode: str = None)

Write the given pandas DataFrame to the given location using the specified (PySpark) schema.

Parameters
  • df (pd.DataFrame) – The DataFrame to write to Rikai format

  • dest_path (str) – The destination path to write df to

  • schema (StructType) – The PySpark schema required to convert Rikai types and make the resulting Rikai dataset readable by Spark

  • partition_cols (str or list, default None) – Columns to partition on

  • max_rows_per_file (int, default None) – Passed to pyarrow.dataset.write_dataset

  • mode (str, default None) – Controls existing_data_behavior in PyArrow, one of {‘error’, ‘overwrite_or_ignore’, ‘delete_matching’}. None defaults to PyArrow’s current default (‘error’). See https://arrow.apache.org/docs/python/generated/pyarrow.dataset.write_dataset.html
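
Examples

A hedged sketch of writing a small pandas DataFrame as a Rikai dataset; the destination path and columns are placeholders:

import pandas as pd
from pyspark.sql.types import StructType, StructField, LongType, StringType

from rikai.parquet.writer import df_to_rikai

df = pd.DataFrame({"id": [1, 2, 3], "label": ["cat", "dog", "bird"]})
schema = StructType([
    StructField("id", LongType()),
    StructField("label", StringType()),
])
# Hypothetical destination; "overwrite_or_ignore" avoids failing on an existing directory
df_to_rikai(df, "/tmp/my_rikai_dataset", schema, mode="overwrite_or_ignore")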

Module contents

Utilities to manipulate Rikai Parquet datasets.