rikai.spark.functions package¶
Submodules¶
rikai.spark.functions.geometry module¶
Geometry related PySpark UDFs
- rikai.spark.functions.geometry.area(box: Box2d) float ¶
A UDF to calculate the area of a bounding box.
Example
>>> spark.sql("SELECT * FROM dataset WHERE area(prediction.box) > 15")
- rikai.spark.functions.geometry.box2d(coords) Box2d ¶
Build a Box2d from
[xmin,ymin,xmax,ymax]
array.
- rikai.spark.functions.geometry.box2d_from_center(coords) Box2d ¶
Build a Box2d from a center-point based coordinate array:
[center_x, center_y, width, height]
.
- rikai.spark.functions.geometry.box2d_from_top_left(coords) Box2d ¶
Build
Box2d
from a top-left based coordinate array:[x0, y0, width, height]
Example
Coco dataset is one public dataset that use top-left coordinates
>>> #! pyspark --packages ai.eto:rikai_2.1:0.0.1 >>> import json >>> from rikai.spark.functions.geometry import box2d_from_top_left >>> >>> with open("coco_sample/annotations/train_sample.json") as fobj: ... coco = json.load(fobj) >>> anno_df = ( ... spark ... .createDataFrame(coco["annotations"][:5]) ... .withColumn("box2d", box2d_from_top_left("bbox")) ... ) >>> anno_df.show() +--------------------+-----------+--------+--------------------+ | bbox|category_id|image_id| box2d| +--------------------+-----------+--------+--------------------+ |[505.24, 0.0, 47....| 72| 318219|Box2d(x=505.24, y...| |[470.68, 0.0, 45....| 72| 318219|Box2d(x=470.68, y...| |[442.51, 0.0, 43....| 72| 318219|Box2d(x=442.51, y...| |[380.74, 112.85, ...| 72| 554625|Box2d(x=380.74, y...| |[339.13, 32.99, 3...| 72| 554625|Box2d(x=339.13, y...| +--------------------+-----------+--------+--------------------+ >>> anno_df.printSchema() root |-- bbox: array (nullable = true) | |-- element: double (containsNull = true) |-- category_id: long (nullable = true) |-- image_id: long (nullable = true) |-- box2d: box2d (nullable = true)
rikai.spark.functions.io module¶
I/O related PySpark UDFs.
rikai.spark.functions.vision module¶
Vision related Spark UDFs.
- rikai.spark.functions.vision.crop(img: Image, box: Union[Box2d, List[Box2d]])¶
Crop image specified by the bounding boxes, and returns the cropped images.
- Parameters
- Returns
Return a list of cropped images.
- Return type
[Image]
Examples
>>> spark.sql("SELECT crop(img, boxes) as patches FROM detections")
- rikai.spark.functions.vision.image_copy(img: Image, uri: str) Image ¶
Copy the image to a new destination, specified by the URI.
- rikai.spark.functions.vision.numpy_to_image(array: ndarray, uri: str, format: str = None, **kwargs) Image ¶
Convert a numpy array to image, and upload to external storage.
- Parameters
array (
numpy.ndarray
) – Image data.uri (str) – The base directory to copy the image to.
format (str, optional) – The image format to save as. See supported formats for details.
kwargs (dict, optional) – Optional arguments to pass to PIL.Image.save.
- Returns
Return a new image pointed to the new URI.
- Return type
Example
>>> spark.createDataFrame(..).registerTempTable("df") >>> >>> spark.sql("""SELECT numpy_to_image( ... resize(grayscale(image)), ... lit('s3://asset') ... ) AS new_image FROM df""")
- rikai.spark.functions.vision.spectrogram_image(video: Union[VideoStream, YouTubeVideo], output_uri: str, segment: Segment = Segment(start_fno=0, end_fno=-1), size: int = 224, max_samples: int = 15000, image_format: str = None, **image_kwargs) Image ¶
Applies ffmpeg filter to generate spectrogram image.
- Parameters
video (VideoStream or YouTubeVideo) – A video object whose audio track will be converted to spectrogram
output_uri (str) – The uri to which the spectrogram image will be written to
segment (Segment) – A Segment object, localizing video in time to (start_fno, end_fno)
max_samples (Int) – Yield at most this many frames (-1 means no max)
size (Int) – Sets resolution of frequency, time spectrogram image.
image_format (str, optional) –
The image format to save as. See supported formats for details.
image_kwargs (dict, optional) –
Optional arguments to pass to PIL.Image.save.
- Returns
Return an Image of the audio spectrogram.
- Return type
- rikai.spark.functions.vision.to_image(image_data: Union[bytes, bytearray, str, Path]) Image ¶
Build an
Image
from bytes, file-like object, str, orPath
.- Parameters
image_data (bytes, bytearray, str, Path) – The resource identifier or bytes of the source image.
- Returns
img – An Image from the given embedded data or URI
- Return type
Example
>>> df = spark.read.format("image").load("<path-to-data>") >>> >>> df.withColumn("new_image", to_image("image.data"))
- rikai.spark.functions.vision.video_metadata(video: Union[str, VideoStream]) dict ¶
Return useful video stream metadata like width, height, num_frames, duration, bit_rate, frame_rate, codec, and size about the given video
- Parameters
video (str or VideoStream) – The video uri or the Video object
- Returns
result – The keys are: width, height, num_frames, duration, bit_rate, frame_rate, codec, size, and _errors
- Return type
Notes
The frame_rate is rounded to the nearest whole number of frames per sec
Examples
The following returns the fps rounded to the nearest integer:
import rikai.spark.functions as RF (spark.createDataFrame([(VideoStream(<uri>),)], [‘video’]) .withColumn(‘meta’, RF.video_metadata(‘video’)) .select(‘meta.data.frame_rate’))
- rikai.spark.functions.vision.video_to_images(video: Union[VideoStream, YouTubeVideo], output_uri: str, segment: Segment = Segment(start_fno=0, end_fno=-1), sample_rate: int = 1, max_samples: int = 15000, quality: str = 'worst', image_format: str = 'png', **image_kwargs) list ¶
Extract video frames into a list of images.
- Parameters
video (Video) – An video object, either YouTubeVideo or VideoStream.
output_uri (str) – Frames will be written as <output_uri>/<fno>.<img_format>
segment (Segment, default Segment(0, -1)) – A Segment object, localizing video in time to (start_fno, end_fno)
sample_rate (int, default 1) – Keep 1 out of every sample_rate frames.
max_samples (int, default 15000) – Return at most this many frames (-1 means no max)
quality (str, default 'worst') – Either ‘worst’ (lowest bitrate) or ‘best’ (highest bitrate) See: https://pythonhosted.org/Pafy/index.html#Pafy.Pafy.getbest
image_format (str, optional) –
The image format to save as. See supported formats for details.
image_kwargs (dict, optional) –
Optional arguments to pass to PIL.Image.save.
------ –
List – Return a list of images from video indexed by frame number.
Module contents¶
Domain-specific Pyspark UDFs
- rikai.spark.functions.init(spark: SparkSession)¶
Register all rikai UDFs