rikai.parquet package¶
Submodules¶
rikai.parquet.dataset module¶
Parquet-based Rikai Dataset that supports automatic UDT conversions.
- class rikai.parquet.dataset.Dataset(query: Union[str, Path], columns: Optional[List[str]] = None, seed: Optional[int] = None, world_size: int = 1, rank: int = 0, offset: int = 0)¶
Bases:
object
Rikai Dataset.
Dataset provides read access to a Rikai dataset. It supports:
- Reading Rikai-encoded datasets on the supported storage media, i.e., local filesystem, AWS S3, or Google GCS.
- Automatically deserializing data into semantic user-defined types (UDT).
- Distributed training, enabled by setting the world_size and rank parameters. When enabled, parquet row-group level partitioning is used to distribute data among the workers.
- Parameters
query (str) – A SQL query, e.g., “SELECT image, annotation FROM s3://foo/bar”, or a dataset URI, e.g., “s3://foo/bar”
columns (List[str], optional) – Read only the given columns
seed (int, optional) – Random seed for shuffling process.
world_size (int) – Total number of distributed workers
rank (int) – The rank of this worker in all the distributed workers
offset (int) – Instruct the dataset to skip the first N records.
Notes
Typically, users should not use this class directly. Instead, they are encouraged to use framework-native readers, for example, rikai.pytorch.data.Dataset in PyTorch.
- SPARK_PARQUET_ROW_METADATA = b'org.apache.spark.sql.parquet.row.metadata'¶
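A minimal usage sketch (the dataset URI and column names are placeholders; iteration over rows is assumed from the read access described above, as exercised by the framework-native readers built on this class):

from rikai.parquet.dataset import Dataset

# Read two columns from a (hypothetical) dataset URI; UDT values such as
# images are deserialized automatically.
dataset = Dataset("s3://foo/bar", columns=["image", "annotation"])

# Shard the same dataset across 4 distributed workers: this worker (rank 1)
# only reads the parquet row groups assigned to it.
shard = Dataset("s3://foo/bar", world_size=4, rank=1)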
rikai.parquet.resolver module¶
Extensible Dataset Resolver.
- class rikai.parquet.resolver.DefaultResolver¶
Bases:
BaseResolver
DefaultResolver supports datasets on the local filesystem or S3.
Supported URIs
“/path/to/dataset”
“s3://path/to/dataset”
- SPARK_PARQUET_ROW_METADATA = b'org.apache.spark.sql.parquet.row.metadata'¶
- class rikai.parquet.resolver.Resolver¶
Bases:
object
Extensible Dataset Resolver
- DEFAULT_SCHEME = None¶
- classmethod get_schema(uri: str)¶
Get the schema of the dataset
- Parameters
uri (str) – URI of the dataset
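For example (a hypothetical dataset URI):

from rikai.parquet.resolver import Resolver

# Fetch the schema stored with a (hypothetical) dataset.
schema = Resolver.get_schema("s3://foo/bar")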
- classmethod register(scheme: str, resolver: BaseResolver)¶
Register a custom dataset resolver with the given scheme, providing integration with feature store registration.
- Parameters
scheme (str) – Feature Dataset URI scheme
resolver (BaseResolver) – Parquet file resolver
- Raises
KeyError – If the same scheme name has already been registered
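A sketch of registering a resolver instance directly (SmartResolver is a hypothetical BaseResolver subclass, as in the decorator example further below):

from rikai.parquet.resolver import Resolver

# Register a hypothetical resolver under the "smart" scheme; raises
# KeyError if "smart" is already registered.
Resolver.register("smart", SmartResolver())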
- classmethod reset()¶
Reset the Resolver, for testing purposes.
- rikai.parquet.resolver.register(scheme: str)¶
A decorator that registers a custom dataset resolver with the given scheme, providing integration with feature store registration.
- Parameters
scheme (str) – Feature Dataset URI scheme
- Raises
ValueError – If the same scheme name has already been registered
Examples
from typing import Iterable

# Extend Rikai with a smart feature store
@register("smart")
class SmartResolver(rikai.parquet.resolver.BaseResolver):
    def resolve(self, uri: str) -> Iterable[str]:
        return smart_client.get_files(uri)

dataset = rikai.parquet.Dataset("smart://featureA/version/1")
rikai.parquet.writer module¶
Tools to write Rikai format datasets to disk
- rikai.parquet.writer.df_to_rikai(df: DataFrame, dest_path: str, schema: pyspark.sql.types.StructType, partition_cols: Union[str, list] = None, max_rows_per_file: int = None, mode: str = None)¶
Write the given pandas DataFrame to the given location using the specified (pyspark) schema.
- Parameters
df (pd.DataFrame) – The DataFrame to write to Rikai format
dest_path (str) – The destination path to write df to
schema (StructType) – The pyspark schema required to convert Rikai types and make the resulting Rikai dataset readable by Spark
partition_cols (str or list, default None) – Columns to partition on
max_rows_per_file (int, default None) – Passed to pyarrow.dataset.write_dataset
mode (str, default None) – Controls existing_data_behavior in pyarrow; one of {‘error’, ‘overwrite_or_ignore’, ‘delete_matching’}. None falls back to pyarrow’s current default (‘error’). See https://arrow.apache.org/docs/python/generated/pyarrow.dataset.write_dataset.html
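A minimal sketch, assuming a local destination path and a trivial two-column schema (both are placeholders):

import pandas as pd
from pyspark.sql.types import LongType, StringType, StructField, StructType
from rikai.parquet.writer import df_to_rikai

df = pd.DataFrame({"id": [1, 2], "label": ["cat", "dog"]})
schema = StructType([
    StructField("id", LongType()),
    StructField("label", StringType()),
])

# Write the DataFrame as a Rikai parquet dataset; overwrite_or_ignore
# avoids an error if the destination already exists.
df_to_rikai(df, "/tmp/rikai_example", schema, mode="overwrite_or_ignore")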
Module contents¶
Utilities to manipulate Rikai parquet datasets