Source code for odmax.io

# I/O functionality for ODMax
import os
import cv2
from odmax import helpers
from datetime import datetime
import gpxpy
import piexif
from PIL import Image

# high level variables
PATH = os.path.dirname(__file__)
gpx_fmt_fn = os.path.join(PATH, "gpx.fmt")
if os.name == "posix":
    null_output = "/dev/null 2>&1"
else:
    null_output = "nul"

# pil uses specific encoder names such as "jpeg", translate if necessary using the below dict
pil_encoders = {
    "jpg": "jpeg"
}


[docs]def to_pil(array): """ Converts ND-array into PIL object :param array: ND-array with colors (3rd dimension) in RGB order :return: PIL image """ return Image.fromarray(cv2.cvtColor(array, cv2.COLOR_BGR2RGB))
[docs]def open_file(fn): """ Open file for reading by cv2. :param fn: video file (cv2 compatible) :return: cv2 pointer to file """ assert(isinstance(fn, str)), "No valid file provided, should be of type str" assert(os.path.isfile(fn)), f"File {fn} was not found" if isinstance(fn, str): # try to open file with openCV f = cv2.VideoCapture(fn) if not(f.isOpened()): raise(f"Could not recognise file {fn} as a proper video file") return f else: raise TypeError(f"{fn} should be a string pointing to a path")
[docs]def get_frame_number(f, time): """ Get the frame number belonging to the defined time in seconds. :param f: pointer to opened video file :param time: seconds from start of video :return: """ fps = f.get(cv2.CAP_PROP_FPS) frame_count = f.get(cv2.CAP_PROP_FRAME_COUNT) return int(min(frame_count, time * fps))
[docs]def read_frame(f, n): """ Reads frame number n from opened video file f. :param f: pointer to opened video file :param n: frame number :return: img, blob containing frame """ assert isinstance(f, cv2.VideoCapture) assert isinstance(n, int), f"{n} is not an integer" # check if frame is beyond length of movie if n > f.get(cv2.CAP_PROP_FRAME_COUNT): raise ValueError(f"The requested frame number {n} is larger than the available frames {cv2.CAP_PROP_FRAME_COUNT}") # wind to the right frame number f.set(cv2.CAP_PROP_POS_FRAMES, n) # extract this frame success, img = f.read() if success: return img else: raise IOError(f"The requested frame {n} could not be extracted. Perhaps the videofile is damaged.")
[docs]def write_frame(img, fn, encoder="jpg", exif_dict={}): """ Writes a frame to a file or bytestream. If a 6-face cube list is provided, 6 files will be written using "F", "R", "B", "L", "U", "D" as suffixes for "front", "right", "back", "left", "up" and "down". :param img: ndarray or list of 6 ndarrays of size [H, W, 3] :param fn: path or io.BytesIO object to write frame to :param encoder: PIL compatible encoder to use for writing :param exif_dict: dictionary with EXIF tag groups and tags within groups (e.g. "GPS") :return: """ # determine PIL encoder if encoder in pil_encoders: # translate p_encoder = pil_encoders[encoder] else: p_encoder = encoder try: exif = piexif.dump(exif_dict) except: raise ValueError(f"EXIF dict is invalid {exif_dict}") # now save with the intended metadata and filename to_pil(img).save(fn, p_encoder.lower(), exif=exif)
def get_exif(fn, fn_out): """ Reads the exif tag from a video and writes it to a file. :param fn: video filename :param fn_out: text file to write exif tag to :return: """ if not(os.path.isfile(fn)): raise IOError(f"File {fn} does not exist") if not(os.path.isdir(os.path.split(fn_out)[0])): raise IOError(f"Path {os.path.split(fn_out)[0]} is not available") # FIXME: complete this code raise NotImplementedError("Not implemented yet")
[docs]def get_gpx(fn): """ Reads the gpx track from a video and writes it to a file. :param fn: video filename :return: parsed gpx data """ if not(os.path.isfile(fn)): raise IOError(f"File {fn} does not exist") return gpxpy.parse(helpers.exiftool('-ee', '-p', f"{gpx_fmt_fn}", fn))