rikai.spark.functions package

Submodules

rikai.spark.functions.geometry module

Geometry-related PySpark UDFs.

rikai.spark.functions.geometry.area(box: Box2d) float

A UDF to calculate the area of a bounding box.
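For reference, the area of an axis-aligned box is its width times its height. The sketch below assumes a corner-based layout (xmin, ymin, xmax, ymax). The `Box2d` dataclass here is a hypothetical stand-in, not Rikai's own class or implementation.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Box2d:
    """Hypothetical stand-in for a 2-D box stored as corner coordinates."""

    xmin: float
    ymin: float
    xmax: float
    ymax: float

    @property
    def width(self) -> float:
        return self.xmax - self.xmin

    @property
    def height(self) -> float:
        return self.ymax - self.ymin


def area(box: Box2d) -> float:
    """Area of an axis-aligned bounding box: width * height."""
    return box.width * box.height


print(area(Box2d(10.0, 20.0, 15.0, 24.0)))  # 5.0 * 4.0 -> 20.0
```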

Example

>>> spark.sql("SELECT * FROM dataset WHERE area(prediction.box) > 15")

rikai.spark.functions.geometry.box2d(coords) Box2d

Build a Box2d from an [xmin, ymin, xmax, ymax] array.

rikai.spark.functions.geometry.box2d_from_center(coords) Box2d

Build a Box2d from a center-point based coordinate array: [center_x, center_y, width, height].
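The center-based conversion offsets the center by half the width and half the height. A minimal sketch, assuming the target is corner coordinates (xmin, ymin, xmax, ymax); `center_to_corners` is a hypothetical helper, not part of Rikai.

```python
from typing import Sequence, Tuple


def center_to_corners(coords: Sequence[float]) -> Tuple[float, float, float, float]:
    """Convert [center_x, center_y, width, height] to (xmin, ymin, xmax, ymax)."""
    if len(coords) != 4:
        raise ValueError(f"expected 4 values, got {len(coords)}")
    cx, cy, w, h = coords
    half_w, half_h = w / 2, h / 2
    return (cx - half_w, cy - half_h, cx + half_w, cy + half_h)


print(center_to_corners([50.0, 40.0, 20.0, 10.0]))  # (40.0, 35.0, 60.0, 45.0)
```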

rikai.spark.functions.geometry.box2d_from_top_left(coords) Box2d

Build a Box2d from a top-left based coordinate array: [x0, y0, width, height].
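The top-left conversion keeps the origin and adds the width and height to reach the opposite corner. A minimal sketch, assuming corner coordinates (xmin, ymin, xmax, ymax) as the target; `top_left_to_corners` is a hypothetical helper, not Rikai's implementation.

```python
from typing import Sequence, Tuple


def top_left_to_corners(coords: Sequence[float]) -> Tuple[float, float, float, float]:
    """Convert a top-left based [x0, y0, width, height] array to (xmin, ymin, xmax, ymax)."""
    if len(coords) != 4:
        raise ValueError(f"expected 4 values, got {len(coords)}")
    x0, y0, w, h = coords
    return (x0, y0, x0 + w, y0 + h)


# A COCO-style "bbox" value: [x, y, width, height]
print(top_left_to_corners([10.0, 20.0, 30.0, 40.0]))  # (10.0, 20.0, 40.0, 60.0)
```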

Example

The COCO dataset is one public dataset that uses top-left coordinates:

>>> #! pyspark --packages ai.eto:rikai_2.1:0.0.1
>>> import json
>>> from rikai.spark.functions.geometry import box2d_from_top_left
>>>
>>> with open("coco_sample/annotations/train_sample.json") as fobj:
...     coco = json.load(fobj)
>>> anno_df = (
...     spark
...     .createDataFrame(coco["annotations"][:5])
...     .withColumn("box2d", box2d_from_top_left("bbox"))
... )
>>> anno_df.show()
+--------------------+-----------+--------+--------------------+
|                bbox|category_id|image_id|               box2d|
+--------------------+-----------+--------+--------------------+
|[505.24, 0.0, 47....|         72|  318219|Box2d(x=505.24, y...|
|[470.68, 0.0, 45....|         72|  318219|Box2d(x=470.68, y...|
|[442.51, 0.0, 43....|         72|  318219|Box2d(x=442.51, y...|
|[380.74, 112.85, ...|         72|  554625|Box2d(x=380.74, y...|
|[339.13, 32.99, 3...|         72|  554625|Box2d(x=339.13, y...|
+--------------------+-----------+--------+--------------------+
>>> anno_df.printSchema()
root
 |-- bbox: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- category_id: long (nullable = true)
 |-- image_id: long (nullable = true)
 |-- box2d: box2d (nullable = true)

rikai.spark.functions.io module

I/O-related PySpark UDFs.

rikai.spark.functions.io.copy(source: str, dest: str) str

Copy a file from source to dest.

Parameters
  • source (str) – The source URI to copy from

  • dest (str) – The destination URI or the destination directory. If dest is a URI ending with “/”, it represents a directory.

Returns

Return the URI of the destination.

Return type

str
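The directory rule for dest can be sketched as follows. This assumes that when dest ends with "/" the source file name is appended to it, and that any other dest is used as the full target URI. `resolve_destination` is a hypothetical helper that only computes the destination URI; it does not copy any data.

```python
import posixpath
from urllib.parse import urlparse


def resolve_destination(source: str, dest: str) -> str:
    """Return the final destination URI for copying source to dest.

    Assumption: a dest ending in "/" is a directory, so the source's
    file name is appended; any other dest is taken as the target URI.
    """
    if dest.endswith("/"):
        # Take the last path component of the source URI as the file name.
        filename = posixpath.basename(urlparse(source).path)
        if not filename:
            raise ValueError(f"cannot derive a file name from {source!r}")
        return dest + filename
    return dest


print(resolve_destination("s3://bucket/raw/cat.jpg", "s3://bucket/copies/"))
# s3://bucket/copies/cat.jpg
```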

rikai.spark.functions.vision module

Vision-related Spark UDFs.

rikai.spark.functions.vision.crop(img: Image, box: Union[Box2d, List[Box2d]])

Crop the image to the specified bounding boxes and return the cropped images.

Parameters
  • img (Image) – An image object to be cropped.

  • box (Box2d or List of Box2d) – A single 2-D bounding box or a list of such boxes.

Returns

Return a list of cropped images.

Return type

[Image]
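The single-box-or-list behavior can be illustrated without Spark. This is a minimal sketch under loud assumptions: the image is a hypothetical list of pixel rows rather than a Rikai Image, boxes are integer (xmin, ymin, xmax, ymax) tuples with exclusive xmax and ymax, and `crop` here is a stand-in, not the UDF itself. As in the documented return type, the result is always a list, even for one box.

```python
from typing import List, Sequence, Tuple, Union

# Hypothetical box layout: (xmin, ymin, xmax, ymax) in integer pixels,
# with xmax and ymax treated as exclusive.
Box = Tuple[int, int, int, int]
# Hypothetical image representation: a list of rows of pixel values.
Pixels = List[List[int]]


def crop(img: Pixels, box: Union[Box, Sequence[Box]]) -> List[Pixels]:
    """Crop img to each box and return one patch per box, always as a list."""
    if len(box) == 0:
        return []
    # A single box is a flat tuple of ints; anything else is a list of boxes.
    boxes = [box] if isinstance(box[0], int) else list(box)
    patches = []
    for xmin, ymin, xmax, ymax in boxes:
        # Rows are indexed by y and columns by x.
        patches.append([row[xmin:xmax] for row in img[ymin:ymax]])
    return patches


img = [
    [0, 1, 2, 3],
    [4, 5, 6, 7],
    [8, 9, 10, 11],
]
print(crop(img, (1, 0, 3, 2)))  # [[[1, 2], [5, 6]]]
```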

Examples

>>> spark.sql("SELECT crop(img, boxes) as patches FROM detections")

rikai.spark.functions.vision.image_copy(img: Image, uri: str) Image

Copy the image to a new destination, specified by the URI.

Parameters
  • img (Image) – An image object

  • uri (str) – The base directory to copy the image to.

Returns

Return a new image pointing to the new URI.

Return type

Image

rikai.spark.functions.vision.numpy_to_image(array: ndarray, uri: str, format: str = None, **kwargs) Image

Convert a numpy array to image, and upload to external storage.

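Parameters
  • array (ndarray) – The image data.

  • uri (str) – The URI to write the image to.

  • format (str, optional) – The image format to save as.

  • kwargs (dict, optional) – Optional arguments to pass to PIL.Image.save.

Returns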

Return a new image pointing to the new URI.

Return type

Image

Example

>>> spark.createDataFrame(..).createOrReplaceTempView("df")
>>>
>>> spark.sql("""SELECT numpy_to_image(
...        resize(grayscale(image)),
...        's3://asset'
...    ) AS new_image FROM df""")

rikai.spark.functions.vision.spectrogram_image(video: Union[VideoStream, YouTubeVideo], output_uri: str, segment: Segment = Segment(start_fno=0, end_fno=-1), size: int = 224, max_samples: int = 15000, image_format: str = None, **image_kwargs) Image

Applies an ffmpeg filter to generate a spectrogram image.

Parameters
  • video (VideoStream or YouTubeVideo) – A video object whose audio track will be converted to a spectrogram.

  • output_uri (str) – The URI to which the spectrogram image will be written.

  • segment (Segment) – A Segment object, localizing video in time to (start_fno, end_fno).

  • size (int) – Sets the resolution of the (frequency, time) spectrogram image.

  • max_samples (int) – Yield at most this many frames (-1 means no max).

  • image_format (str, optional) –

    The image format to save as. See supported formats for details.

  • image_kwargs (dict, optional) –

    Optional arguments to pass to PIL.Image.save.

Returns

Return an Image of the audio spectrogram.

Return type

Image

rikai.spark.functions.vision.to_image(image_data: Union[bytes, bytearray, str, Path]) Image

Build an Image from bytes, bytearray, str, or Path.

Parameters

image_data (bytes, bytearray, str, Path) – The resource identifier or bytes of the source image.

Returns

img – An Image from the given embedded data or URI.

Return type

Image
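The accepted input types split into two cases: raw bytes that can be embedded directly, and a string or path that identifies a resource. A minimal sketch of that dispatch, assuming exactly this split; `ImageSource` and this `to_image` are hypothetical stand-ins, not Rikai's Image class or the UDF's actual code.

```python
from dataclasses import dataclass
from pathlib import Path
from typing import Optional, Union


@dataclass
class ImageSource:
    """Hypothetical stand-in: an image is either embedded bytes or a URI."""

    data: Optional[bytes] = None
    uri: Optional[str] = None


def to_image(image_data: Union[bytes, bytearray, str, Path]) -> ImageSource:
    """Dispatch on the input type, mirroring the documented accepted types."""
    if isinstance(image_data, (bytes, bytearray)):
        # Raw bytes are embedded directly in the image object.
        return ImageSource(data=bytes(image_data))
    if isinstance(image_data, (str, Path)):
        # Strings and paths are treated as a resource identifier (URI).
        return ImageSource(uri=str(image_data))
    raise TypeError(f"unsupported type: {type(image_data).__name__}")


print(to_image("s3://bucket/cat.png"))
```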

Example

>>> from rikai.spark.functions.vision import to_image
>>> df = spark.read.format("image").load("<path-to-data>")
>>>
>>> df.withColumn("new_image", to_image("image.data"))

rikai.spark.functions.vision.video_metadata(video: Union[str, VideoStream]) dict

Return useful metadata about the given video stream, such as width, height, num_frames, duration, bit_rate, frame_rate, codec, and size.

Parameters

video (str or VideoStream) – The video uri or the Video object

Returns

result – The keys are: width, height, num_frames, duration, bit_rate, frame_rate, codec, size, and _errors.

Return type

dict

Notes

The frame_rate is rounded to the nearest whole number of frames per second.
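Common video frame rates are not whole numbers; NTSC-style video runs at 30000/1001 (about 29.97) fps. A minimal sketch of the rounding, assuming the rate arrives as a rational string in the style ffprobe prints; `rounded_frame_rate` is a hypothetical helper, not Rikai's code.

```python
from fractions import Fraction


def rounded_frame_rate(rate: str) -> int:
    """Round a rational frame rate string such as "30000/1001" to whole fps."""
    fps = Fraction(rate)  # accepts "30000/1001" as well as plain "25"
    return round(fps)  # round() on a Fraction returns an int


print(rounded_frame_rate("30000/1001"))  # 30
```

Note that Python's round() breaks exact ties toward the even integer (29.5 becomes 30, 12.5 becomes 12); whether Rikai handles ties the same way is not stated here.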

Examples

The following returns the fps rounded to the nearest integer:

>>> import rikai.spark.functions as RF
>>> from rikai.types.video import VideoStream
>>> (spark.createDataFrame([(VideoStream(<uri>),)], ['video'])
...     .withColumn('meta', RF.video_metadata('video'))
...     .select('meta.data.frame_rate'))

rikai.spark.functions.vision.video_to_images(video: Union[VideoStream, YouTubeVideo], output_uri: str, segment: Segment = Segment(start_fno=0, end_fno=-1), sample_rate: int = 1, max_samples: int = 15000, quality: str = 'worst', image_format: str = 'png', **image_kwargs) list

Extract video frames into a list of images.

Parameters
  • video (Video) – A video object, either YouTubeVideo or VideoStream.

  • output_uri (str) – Frames will be written as <output_uri>/<fno>.<image_format>

  • segment (Segment, default Segment(0, -1)) – A Segment object, localizing video in time to (start_fno, end_fno)

  • sample_rate (int, default 1) – Keep 1 out of every sample_rate frames.

  • max_samples (int, default 15000) – Return at most this many frames (-1 means no max)

  • quality (str, default 'worst') – Either ‘worst’ (lowest bitrate) or ‘best’ (highest bitrate). See: https://pythonhosted.org/Pafy/index.html#Pafy.Pafy.getbest

  • image_format (str, default 'png') –

    The image format to save as. See supported formats for details.

  • image_kwargs (dict, optional) –

    Optional arguments to pass to PIL.Image.save.

Returns

Return a list of images from the video, indexed by frame number.

Return type

list
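How segment, sample_rate, and max_samples combine to pick frames can be sketched in plain Python. This rests on assumptions not confirmed above: end_fno == -1 means "until the last frame", end_fno is otherwise exclusive, and sampling counts from start_fno. `selected_frames` and `frame_uri` are hypothetical helpers; the second one only formats the documented <output_uri>/<fno>.<image_format> naming.

```python
from typing import List


def selected_frames(
    num_frames: int,
    start_fno: int = 0,
    end_fno: int = -1,
    sample_rate: int = 1,
    max_samples: int = 15000,
) -> List[int]:
    """Frame numbers kept for a segment, a sampling rate, and a sample cap.

    Assumptions: end_fno == -1 means "until the last frame", end_fno is
    exclusive otherwise, and sampling counts from start_fno.
    """
    if sample_rate < 1:
        raise ValueError("sample_rate must be >= 1")
    stop = num_frames if end_fno == -1 else min(end_fno, num_frames)
    # Keep 1 out of every sample_rate frames within the segment.
    frames = list(range(start_fno, stop, sample_rate))
    if max_samples != -1:
        frames = frames[:max_samples]
    return frames


def frame_uri(output_uri: str, fno: int, image_format: str = "png") -> str:
    """Build the documented <output_uri>/<fno>.<image_format> path."""
    return f"{output_uri.rstrip('/')}/{fno}.{image_format}"


print(selected_frames(10, sample_rate=3))  # [0, 3, 6, 9]
print(frame_uri("s3://bucket/frames", 42))  # s3://bucket/frames/42.png
```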

Module contents

Domain-specific PySpark UDFs.

rikai.spark.functions.init(spark: SparkSession)

Register all rikai UDFs with the given SparkSession.