Source code for pycovjson.write


from pycovjson.model import *
from pycovjson.read_netcdf import NetCDFReader as Reader
import time
import json
import uuid


class Writer(object):
    """Build a CoverageJSON coverage from a dataset and save it as JSON.

    The dataset is opened with ``NetCDFReader`` (imported as ``Reader``).
    Only the first entry of ``vars_to_write`` is currently written; see
    :meth:`_primary_variable`.
    """

    def __init__(self, output_name: str, dataset_path: str,
                 vars_to_write: list, tiled: bool = False,
                 tile_shape=None) -> None:
        """
        Writer class constructor

        :param output_name: Name of output file
        :param dataset_path: Path to dataset
        :param vars_to_write: List of variables to write
        :param tiled: Boolean value (default False)
        :param tile_shape: List containing shape of tiles (default: empty
            list)
        """
        self.output_name = output_name
        self.dataset_path = dataset_path
        # A ``None`` default replaces the former ``tile_shape=[]`` so that
        # instances never share one mutable list object.
        self.tile_shape = [] if tile_shape is None else tile_shape
        self.vars_to_write = vars_to_write
        self.urlTemplate = 'localhost:8080/{t}.covjson'
        self.tiled = tiled
        self.range_type = 'TiledNdArray' if tiled else 'NdArray'

        self.Reader = Reader(dataset_path)
        self.axis_dict = self.Reader.get_axes()
        self.axis_list = list(self.axis_dict.keys())

        # Reference systems: an optional temporal system followed by a
        # spatial system whose dimensionality depends on whether a 'z' axis
        # exists. Datasets with 'z' but no 't' previously ended up with no
        # reference system at all; they now get the 3-D spatial system.
        self.ref_list = []
        if 't' in self.axis_list:
            self.ref_list.append(TemporalReferenceSystem())
        if 'z' in self.axis_list:
            self.ref_list.append(SpatialReferenceSystem3d())
        else:
            self.ref_list.append(SpatialReferenceSystem2d())

    def write(self):
        """
        Writes Coverage object to disk

        Uses the tiled writer when ``self.tiled`` is true, otherwise the
        plain one. The target path is ``self.output_name``.
        """
        coverage = self._construct_coverage()
        if self.tiled:
            self.save_covjson_tiled(coverage, self.output_name)
        else:
            self._save_covjson(coverage, self.output_name)

    def _primary_variable(self):
        """
        Return the variable that the range and parameter are built from.

        :return: first entry of ``self.vars_to_write``
        :raises ValueError: if ``self.vars_to_write`` is empty
        """
        if not self.vars_to_write:
            raise ValueError('vars_to_write must contain at least one variable')
        return self.vars_to_write[0]

    def _construct_coverage(self):
        """
        Constructs Coverage object from constituent parts

        :return: result of ``Coverage(...).to_dict()``
        """
        coverage = Coverage(
            self._construct_domain(),
            self._construct_range(),
            self._construct_params(),
            self._construct_refs(),
        ).to_dict()
        return coverage

    def _construct_domain(self):
        """
        Constructs Domain object, populates with values

        The x and y values are always read; t and z values are read only
        when the corresponding axis is present, otherwise they stay empty
        lists.

        :return: domain object
        """
        domain_type = 'Grid'
        x_values = self.Reader.get_x().flatten().tolist()
        y_values = self.Reader.get_y().flatten().tolist()
        t_values = []
        z_values = []

        if 't' in self.axis_list:
            # Passed through exactly as the reader returns it.
            t_values = self.Reader.get_t()
        if 'z' in self.axis_list:
            z_values = self.Reader.get_z().flatten().tolist()

        return Domain(domain_type, x_values, y_values, z_values, t_values)

    def _construct_params(self):
        """
        Construct parameter object from constituent parts

        Describes the same variable that :meth:`_construct_range` writes.
        The earlier loop returned the parameter of the *last* listed
        variable while the range used the *first*, and failed with an
        unbound local when the list was empty.

        :return: Parameter object
        :raises ValueError: if ``self.vars_to_write`` is empty
        """
        variable = self._primary_variable()
        description = self.Reader.get_std_name(variable)
        unit = self.Reader.get_units(variable)
        symbol = self.Reader.dataset[variable].units
        label = self.Reader.dataset[variable].long_name
        return Parameter(description=description, variable_name=variable,
                         symbol=symbol, unit=unit, observed_property=label)

    def _construct_refs(self):
        """
        Construct reference object

        :return: Reference wrapping ``self.ref_list``
        """
        return Reference(self.ref_list)

    def _construct_range(self):
        """
        Construct range object

        In tiled mode every tile is also written to ``<n>.json`` (``n``
        counting from 1, relative to the current working directory) before
        the ``TiledNdArray`` range is returned.

        :return: range
        :raises ValueError: if ``self.vars_to_write`` is empty
        """
        # TODO iteration through variable list
        variable = self._primary_variable()
        axis_names = [name.lower() for name in self.Reader.get_axis(variable)]

        if self.tiled:
            tile_set_obj = TileSet(self.tile_shape, self.urlTemplate)
            variable_type = self.Reader.get_type(variable)
            variable_shape = self.Reader.get_shape(variable)
            print('Variable shape:', variable_shape)

            tiles = tile_set_obj.get_tiles(
                self.tile_shape, self.Reader.dataset[variable].values)
            for count, tile in enumerate(tiles, start=1):
                # tile[0] holds the tile's data array, tile[1] is passed on
                # as its axes.
                tile_range = {'ranges': Range(
                    'NdArray', data_type=variable_type, axes=tile[1],
                    shape=variable_shape,
                    values=tile[0].flatten().tolist()).to_dict()}
                self.save_covjson_range(tile_range, str(count) + '.json')

            # NOTE(review): the generated template is not used below; the
            # call is kept in case it has side effects on the tile set.
            tile_set_obj.generate_url_template(axis_names=axis_names)
            # NOTE(review): tile shape and URL are hard-coded here rather
            # than derived from self.tile_shape / the generated template --
            # confirm the intended values before relying on tiled output.
            tileset = [{'tileShape': [None, 173, 301],
                        'urlTemplate': 'http://localhost:8080/{t}.covjson'}]
            return Range('TiledNdArray', data_type=variable_type,
                         axes=axis_names, tile_sets=tileset,
                         shape=variable_shape)

        shape = self.Reader.get_shape(variable)
        values = self.Reader.get_values(variable).flatten().tolist()
        data_type = self.Reader.get_type(variable)
        return Range(range_type='NdArray', data_type=data_type, values=values,
                     shape=shape, variable_name=variable, axes=axis_names)

    # Adapted from
    # https://github.com/the-iea/ecem/blob/master/preprocess/ecem/util.py -
    # letmaik
    def _save_json(self, obj, path, **kw):
        """Save json object to disk (alias of :meth:`save_json`)."""
        self.save_json(obj, path, **kw)

    def _compact_domain(self, obj):
        """Mark domain axis values and referencing coordinates for compact output."""
        for axis in obj['domain']['axes'].values():
            self.compact(axis, 'values')
        for ref in obj['domain']['referencing']:
            self.no_indent(ref, 'coordinates')

    def _compact_ranges(self, obj):
        """Mark axisNames, shape and values of every range for compact output."""
        for range_obj in obj['ranges'].values():
            self.no_indent(range_obj, 'axisNames', 'shape')
            self.compact(range_obj, 'values')

    def _save_covjson(self, obj, path):
        """
        Skip indentation of certain fields to make JSON more compact but still human readable

        :param obj: coverage dictionary with 'domain' and 'ranges' entries
        :param path: output file path
        """
        self._compact_domain(obj)
        self._compact_ranges(obj)
        self.save_json(obj, path, indent=2)

    def save_covjson_tiled(self, obj, path):
        """
        Skip indentation of certain fields to make JSON more compact but still human readable

        Unlike :meth:`_save_covjson`, the 'ranges' entry is left untouched.

        :param obj: coverage dictionary with a 'domain' entry
        :param path: output file path
        """
        self._compact_domain(obj)
        self.save_json(obj, path, indent=2)

    def save_json(self, obj, path, **kw):
        """
        Serialise ``obj`` with ``CustomEncoder`` and write it to ``path``.

        :param obj: JSON-serialisable object (may contain ``Custom`` wrappers)
        :param path: output file path; overwritten if it exists
        :param kw: extra keyword arguments forwarded to ``json.dumps``
        """
        print("Converting....")
        # time.clock() was removed in Python 3.8; perf_counter() is the
        # documented replacement for measuring elapsed time.
        start = time.perf_counter()
        # json.dumps takes no file argument (everything after ``obj`` is
        # keyword-only), so the file handle is not passed to it. Serialising
        # before opening avoids truncating an existing file when encoding
        # fails.
        jsonstr = json.dumps(obj, cls=CustomEncoder, **kw)
        with open(path, 'w') as fp:
            fp.write(jsonstr)
        stop = time.perf_counter()
        print("Completed in: ", (stop - start), "seconds.")

    def save_covjson_range(self, obj, path):
        """
        Save a dictionary holding only a 'ranges' entry, with compact arrays.

        :param obj: dictionary with a 'ranges' entry
        :param path: output file path
        """
        self._compact_ranges(obj)
        self.save_json(obj, path, indent=2)

    def compact(self, obj, *names):
        """
        Wrap ``obj[name]`` in ``Custom`` with separators ``(',', ':')``.

        :param obj: mapping modified in place
        :param names: keys of ``obj`` to wrap
        """
        for name in names:
            obj[name] = Custom(obj[name], separators=(',', ':'))

    def no_indent(self, obj, *names):
        """
        Wrap ``obj[name]`` in ``Custom`` with no extra encoder arguments.

        :param obj: mapping modified in place
        :param names: keys of ``obj`` to wrap
        """
        for name in names:
            obj[name] = Custom(obj[name])
# From http://stackoverflow.com/a/25935321
class Custom:
    """Pair a value with the ``json.dumps`` keyword arguments used to render it."""

    def __init__(self, value, **custom_args):
        self.custom_args = custom_args
        self.value = value


class CustomEncoder(json.JSONEncoder):
    """JSON encoder that renders ``Custom`` values with their own options.

    Each ``Custom`` value is first emitted as a unique placeholder string;
    after the normal encoding pass the quoted placeholder is swapped for
    the separately rendered JSON text (used for no_indent / compact output).
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # placeholder key -> JSON text rendered for the wrapped value
        self._replacement_map = {}

    def default(self, o):
        """Return a placeholder for ``Custom`` objects; defer otherwise."""
        if not isinstance(o, Custom):
            return super().default(o)
        token = uuid.uuid4().hex
        rendered = json.dumps(o.value, **o.custom_args)
        self._replacement_map[token] = rendered
        return "@@%s@@" % (token,)

    def encode(self, o):
        """Encode ``o`` and substitute every recorded placeholder."""
        encoded = super().encode(o)
        for token, rendered in self._replacement_map.items():
            quoted_placeholder = '"@@%s@@"' % (token,)
            encoded = encoded.replace(quoted_placeholder, rendered)
        return encoded