Source code for cinema_python.adaptors.paraview.cinemareader

r"""cinemareader is used by vtkCinemaDatabaseReader.
"""
from __future__ import print_function

import cinema_python.database.file_store as file_store
import cinema_python.images.querymaker_specb as CT
from cinema_python.images import querymaker
from paraview import vtk
from paraview.vtk.numpy_interface import dataset_adapter as dsa
from cinema_python.images.camera_utils import convert_pose_to_camera

from ast import literal_eval


[docs]def layer2img(layer): """converts a layer to vtkImageData.""" if not layer: return None img = vtk.vtkImageData() dims = [0, 0, 0] if layer.hasValueArray(): nvalues = layer.getValueArray() # now, the numpy array's shape matches the 2D image dims[0] = nvalues.shape[1] dims[1] = nvalues.shape[0] img.SetDimensions(dims[0], dims[1], 1) nvalues = nvalues.reshape(dims[0]*dims[1]) vvalues = dsa.numpyTovtkDataArray(nvalues, "Values") img.GetPointData().SetScalars(vvalues) elif layer.hasColorArray(): ncolors = layer.getColorArray() # now, the numpy array's shape matches the 2D image dims[0] = ncolors.shape[1] dims[1] = ncolors.shape[0] img.SetDimensions(dims[0], dims[1], 1) ncolors = ncolors.reshape((dims[0]*dims[1], -1)) vcolors = dsa.numpyTovtkDataArray(ncolors, "Colors") img.GetPointData().SetScalars(vcolors) ndepth = layer.getDepth() if ndepth is None: raise RuntimeError("Missing 'depth'") ndepth = ndepth.reshape(dims[0]*dims[1]) vdepth = dsa.numpyTovtkDataArray(ndepth, "Depth") img.GetPointData().AddArray(vdepth) nluminance = layer.getLuminance() if nluminance is not None: nluminance = nluminance.reshape((dims[0]*dims[1], -1)) vluminance = dsa.numpyTovtkDataArray(nluminance, "Luminance") img.GetPointData().AddArray(vluminance) # from paraview.vtk.vtkIOLegacy import vtkDataSetWriter # writer = vtkDataSetWriter() # writer.SetInputDataObject(img) # writer.SetFileName("/tmp/layer.vtk") # writer.Update() # del writer return img
[docs]class FileStoreAPI(object): def __init__(self, fs, qt): self.fs = fs self.qt = qt self.qt.setStore(self.fs)
[docs] def get_objects(self): """returns a list of viewable objects in the scene""" return []
[docs] def get_parents(self, objectname): """returns a list of names of the objects that the given object's parents""" return []
[docs] def get_visibility(self, object): """returns an objects default visbility""" return False
[docs] def get_timesteps(self): """returns a list of timesteps available in the store""" return []
[docs] def get_control_parameters(self, objectname): """returns control paramater names for the given object""" return []
[docs] def get_control_values(self, controlparametername): """returns sorted list of available values for a control parameter""" info = self.fs.get_parameter(controlparametername) values = info["values"] values.sort() return values
[docs] def get_control_values_as_strings(self, controlparametername): vals = self.get_control_values(controlparametername) return [str(x) for x in vals]
[docs] def get_field_name(self, objectname): """return the field name associated with an object""" return None
[docs] def get_field_values(self, objectname, valuetype): """returns fields for a specific object""" return []
[docs] def get_field_valuerange(self, objectname, valueanme): return []
[docs] def translate_query(self, q): return []
[docs] def get_spec(self): """returns current database spec """ return ""
[docs]class FileStoreSpecA(FileStoreAPI): def __init__(self, fs): FileStoreAPI.__init__(self, fs, querymaker.QueryMaker_SpecA()) self.__cameras = []
[docs] def get_objects(self): """returns a list of viewable objects in the scene""" objects = ["Cinema"] return objects
[docs] def get_parents(self, objectname): if objectname == "Cinema": return [] return ["Cinema"]
[docs] def get_visibility(self, objectname): """returns an objects default visbility""" return True
[docs] def get_control_parameters(self, objectname): """returns all parameters for this database""" params = [] for param in self.fs.parameter_list.keys(): params.append(param) return params
[docs] def get_cameras(self, ts): """returns a list of camera positions timestep is ignored.""" # use cached cameras, if available. if 0 in self.__cameras: return self.__cameras # if not cached, read and cache. md = self.fs.metadata try: eye = md["camera_eye"][0][:3] at = md["camera_at"][0][:3] up = md["camera_up"][0][:3] nearfar = md["camera_nearfar"][0][:2] angle = md["camera_angle"][0] except KeyError: print("Warning: Cannot initialiaze cameras." + " Interaction may not work correctly") return self.__cameras phis = [0] if "phi" in self.fs.parameter_list.keys(): phis = self.fs.get_parameter("phi")["values"] thetas = [0] if "theta" in self.fs.parameter_list.keys(): thetas = self.fs.get_parameter("theta")["values"] camera = vtk.vtkCamera() camera.SetPosition(eye) camera.SetFocalPoint(at) camera.SetViewUp(up) camera.SetViewAngle(angle) camera.SetClippingRange(nearfar[0], nearfar[1]) for phi in phis: for theta in thetas: c = vtk.vtkCamera() c.DeepCopy(camera) c.Azimuth(phi) c.Elevation(theta) c.OrthogonalizeViewUp() self.__cameras.append(c) return self.__cameras
[docs] def translate_query(self, q): if type(q) == str: q = literal_eval(q) assert type(q) == dict # be sure all params have a value in the new query query = {} for param in self.fs.parameter_list.keys(): default = [self.fs.get_parameter(param)["default"]] query[param] = q.get(param, default) layers = self.qt.translateQuery(query) # convert layers to image data layer = layers[0] img = vtk.vtkImageData() dims = [0, 0, 0] if layer.hasColorArray(): ncolors = layer.getColorArray() # now, the numpy array's shape matches the 2D image dims[0] = ncolors.shape[1] dims[1] = ncolors.shape[0] img.SetDimensions(dims[0], dims[1], 1) ncolors = ncolors.reshape((dims[0]*dims[1], -1)) vcolors = dsa.numpyTovtkDataArray(ncolors, "Colors") img.GetPointData().SetScalars(vcolors) nluminance = layer.getLuminance() if nluminance is not None: nluminance = nluminance.reshape((dims[0]*dims[1], -1)) vluminance = dsa.numpyTovtkDataArray(nluminance, "Luminance") img.GetPointData().AddArray(vluminance) return [img]
[docs] def get_spec(self): return "specA"
[docs]class FileStoreSpecB(FileStoreAPI): def __init__(self, fs): FileStoreAPI.__init__(self, fs, CT.QueryMaker_SpecB()) self.__cameras = {}
[docs] def get_objects(self): # to make our lives much easier, we returns the objects in a sorted # order from upstream to downstream in the pipeline. objs = self.fs.parameter_list["vis"]["values"] retval = [] all_parents = self.get_all_parents() def _add_object(me, obj, lst): if obj in lst: return parents = all_parents.get(obj, []) for p in parents: _add_object(me, p, lst) lst.append(obj) for o in objs: _add_object(self, o, retval) return retval
[docs] def get_all_parents(self): """builds a map with key is object name, and value is a list of objectname for its parents.""" md = self.fs.metadata pipeline = md.get("pipeline", []) idmap = {} for item in pipeline: idmap[item["id"]] = item["name"] parents = {} for item in pipeline: itemparents = [] for pid in item.get("parents", []): if pid in idmap: itemparents.append(idmap[pid]) parents[item["name"]] = itemparents return parents
[docs] def get_parents(self, objectname): ps = self.get_all_parents() return ps.get(objectname, [])
[docs] def get_visibility(self, objectname): md = self.fs.metadata pipeline = md.get("pipeline", []) for item in pipeline: if item["name"] == objectname: return item.get("visibility", 0) == 1 return False
[docs] def get_control_parameters(self, objectname): (tmp0, tmp1, controls) = self.fs.parameters_for_object(objectname) # I am going to prune any control parameters that are not the parameter # itself. This removes control parameters coming from up the pipeline. if objectname in controls: return [objectname] else: return []
[docs] def get_field_name(self, objectname): (tmp0, field, tmp1) = self.fs.parameters_for_object(objectname) return field
[docs] def get_field_values(self, objectname, valuetype): """returns fields for a specific object""" (tmp0, field, tmp1) = self.fs.parameters_for_object(objectname) if not field: return [] retval = [] p = self.fs.get_parameter(field) for (n, t) in zip(p["values"], p["types"]): if t == valuetype: retval.append(n) return retval
[docs] def get_field_valuerange(self, objectname, valuename): (tmp0, field, tmp1) = self.fs.parameters_for_object(objectname) if not field: return [] p = self.fs.get_parameter(field) if p and valuename in p["valueRanges"]: return list(p["valueRanges"][valuename]) return []
[docs] def get_timesteps(self): """returns a list of timesteps available in the store""" if "time" in self.fs.parameter_list: return list(self.get_control_values("time")) return []
[docs] def get_cameras(self, ts): """returns a list of camera positions for a specific timestep. timestep is ignored if camera is not affected by time """ try: tsindex = self.get_timesteps().index(ts) except ValueError: # no timesteps or invalid timestep. assert len(self.get_timesteps()) == 0 tsindex = 0 # use cached cameras, if available. if tsindex in self.__cameras: return self.__cameras[tsindex] # if not cached, read and cache. cameras = [] md = self.fs.metadata camera_model = md["camera_model"] if ((camera_model == "azimuth-elevation-roll") or (camera_model == "yaw-pitch-roll")): eye = md["camera_eye"][tsindex][:3] at = md["camera_at"][tsindex][:3] up = md["camera_up"][tsindex][:3] nearfar = md["camera_nearfar"][tsindex][:2] angle = md["camera_angle"][tsindex] poses = self.fs.get_parameter("pose")["values"] for pose in poses: neweye, newat, newup = convert_pose_to_camera( eye, at, up, pose, camera_model) camera = vtk.vtkCamera() camera.SetPosition(neweye) camera.SetFocalPoint(newat) camera.SetViewUp(newup) camera.SetViewAngle(angle) camera.SetClippingRange(nearfar[0], nearfar[1]) cameras.append(camera) self.__cameras[tsindex] = cameras return cameras
[docs] def translate_query(self, q): if type(q) == str: q = literal_eval(q) assert type(q) == dict # be sure all params have a value in the new query query = {} for param in self.fs.parameter_list.keys(): default = [self.fs.get_parameter(param)["default"]] query[param] = q.get(param, default) # update pose poseIndex = query.get("pose", -1) if type(poseIndex) == int: if poseIndex != -1: query["pose"] = self.fs.get_parameter( "pose")["values"][poseIndex] else: return [] # print ("query", q) layers = self.qt.translateQuery(query) # convert layers to image data return [layer2img(l) for l in layers]
[docs] def get_spec(self): return "specC"
__warning_count = {}
[docs]def load(filename): global __warning_count fs = file_store.FileStore(filename) fs.load() if fs.metadata.get("type") == "parametric-image-stack": return FileStoreSpecA(fs) # check if we support this cinema database. if fs.metadata.get("type") != "composite-image-stack": print("Only 'composite-image-stack' and 'parametric-image-stack'" + " file stores are supported.") raise RuntimeError( "Only 'composite-image-stack' and 'parametric-image-stack'" + " file stores are supported.") if fs.metadata.get("camera_model") != "azimuth-elevation-roll": print("Only 'azimuth-elevation-roll' cameras are supported.") raise RuntimeError( "Only 'azimuth-elevation-roll' cameras are supported.") if fs.metadata.get("value_mode") != 2: if filename not in __warning_count: __warning_count[filename] = True print("Warning: the cinema store '" + filename.strip() + "', encodes data values as RGB arrays which is known to " + "have issues in current implementation. Scalar" + "coloring may produce unexpected results.") return FileStoreSpecB(fs)