Source code for ccsdspy.converters

"""This class hold the implementation of the converter system, which applies 
post-process to decoded packet fields. This post-processing includes applying
linear/polynomial calibration curves, dictionary replacement, and time parsing.
"""

from datetime import datetime, timedelta

import numpy as np

__all__ = [
    "EnumConverterMissingKey",
    "Converter",
    "PolyConverter",
    "LinearConverter",
    "EnumConverter",
    "DatetimeConverter",
    "StringifyBytesConverter",
]


[docs] class EnumConverterMissingKey(RuntimeError): """During conversion a value was encountered which did not have a corresponding key in the replacement dictionary. """
[docs] class Converter: """Base class for all converter objects. This class is extended to create converters, and users may extend this class to write their own custom converters. To write a converter, one must create a subclass and override either the method `convert(*field_arrays)`. This method implements the conversion for an entire sequence of decoded packet field values in a single call. """ def __init__(self): raise NotImplementedError("This is a base class not meant to be instantiated directly")
[docs] def convert(self, field_array): """Convert a sequence of decoded packet field values. Parameters ---------- field_array : NumPy array decoded packet field values, must have at least one dimension Returns ------- converted_field_array : NumPy array converted form of the decoded packet field values """ raise NotImplementedError("This method must be overridden by a subclass")
[docs] class PolyConverter(Converter): """Post-processing conversion which applies calibration using a series of coefficients ordered from highest power to intercept. """ def __init__(self, coeffs): """Instantiate a PolyConverter object Parameters ---------- coeffs : list of float Polynomial coefficients ordered from highest power to intercept. """ self._coeffs = coeffs
[docs] def convert(self, field_array): """Apply the polynomial conversion. Parameters ---------- field_array : NumPy array decoded packet field values, must have at least one dimension Returns ------- converted : NumPy array converted form of the decoded packet field values """ converted = np.zeros(field_array.shape, dtype=np.float64) for power, coeff in enumerate(reversed(self._coeffs)): converted += coeff * field_array**power return converted
[docs] class LinearConverter(PolyConverter): """Post-processing conversion which applies a linear (y=mx+b) transformation. """ def __init__(self, slope, intercept): """Instantiate a LinearConverter""" super().__init__([slope, intercept])
[docs] class EnumConverter(Converter): """Post-processing conversion for applying dictionary replacement of integers to strings. If during conversion a value is encountered which does not have a corresponding key in the replacement dictionary, then a `:py:class:`~ccsdspy.converters.EnumConverterMissingKey` exception will be thrown. """ def __init__(self, replace_dict): """Initialize a EnumConverter. Parameters ---------- replace_dict : dict of int to string Replacement dictionary mapping integer values to string values Raises ------ TypeError Either one of the keys of the replacement dictionary is not an integer, or one of the values is not a string. """ self._replace_dict = replace_dict for key, value in replace_dict.items(): if not isinstance(key, int): raise TypeError( f"Found key in EnumConverter replacement dictionary that is " f"not an integer: {repr(key)}" ) if not isinstance(value, str): raise TypeError( f"Found value in EnumConverter replacement dictionary that is " f"not a string: {repr(value)}" )
[docs] def convert(self, field_array): """Apply the enum replacement conversion. Parameters ---------- field_array : NumPy array decoded packet field values, must have at least one dimension Returns ------- converted : NumPy array converted form of the decoded packet field values """ converted = np.zeros(field_array.shape, dtype=object) converted_mask = np.zeros(field_array.shape, dtype=bool) for key, value in self._replace_dict.items(): converted[field_array == key] = value converted_mask[field_array == key] = True if not converted_mask.all(): missing_keys = field_array[~converted_mask].tolist() raise EnumConverterMissingKey( f"The following were encountered which did not have " f"corresponding keys in the replacement dictionary: " f"{repr(missing_keys)}" ) return converted
[docs] class DatetimeConverter(Converter): """Post-processing conversion for converting timestamp fields to datetime instances, computed using offset(s) from a reference time. This class supports the offsets stored in multiple input fields, for example where one field is a coarse time (e.g. seconds) and a second field is a fine time (e.g. nanoseconds). To use multiple input fields, pass a tuple of input field names when this converter is added to the packet. """ _VALID_UNITS = ( "days", "hours", "minutes", "seconds", "milliseconds", "microseconds", "nanoseconds", ) _MILLISECONDS_PER_SECOND = 1_000 _MICROSECONDS_PER_SECOND = 1_000_000 _NANOSECONDS_PER_SECOND = 1_000_000_000 def __init__(self, since, units): """Initialize a DatetimeConverter Parameters ---------- since : datetime Reference datetime. The time stored in the field(s) is considered an offset to this reference. If this has timezone information attached to it, so will the converted datetimes. units : str or tuple of str Units string of tuples of units strings for the offset of each input field. Valid units are "days", "hours", "minutes", "seconds", "milliseconds", "microseconds", and "nanoseconds". Raises ------ TypeError One of the input arguments is not of the correct type ValueError One or more of the units are invalid """ if not isinstance(since, datetime): raise TypeError("Argument 'since' must be an instance of datetime") if isinstance(units, str): units_tuple = (units,) elif isinstance(units, tuple): units_tuple = units else: raise TypeError("Argument 'units' must be either a string or tuple") if not (set(units_tuple) <= set(self._VALID_UNITS)): raise ValueError("One or more units are invalid") self._since = since self._units = units_tuple
[docs] def convert(self, *field_arrays): """Apply the datetime conversion. Parameters ---------- field_arrays : list of NumPy array list of decoded packet field values, each must have at least one dimension Returns ------- converted : NumPy array of object (holding datetimes) converted form of the decoded packet field values Raises ------ ValueError Too many or too few units were provided, as compared to the input field arrays sent. """ assert len(field_arrays) > 0, "Must have at least one input field" converted = [] for field_values in zip(*field_arrays): converted_time = self._since for unit, offset_raw in zip(self._units, field_values): offset_raw = float(offset_raw) if unit == "days": converted_time += timedelta(days=offset_raw) elif unit == "hours": converted_time += timedelta(hours=offset_raw) elif unit == "minutes": converted_time += timedelta(minutes=offset_raw) elif unit == "seconds": converted_time += timedelta(seconds=offset_raw) elif unit == "milliseconds": converted_time += timedelta(seconds=offset_raw / self._MILLISECONDS_PER_SECOND) elif unit == "microseconds": converted_time += timedelta(seconds=offset_raw / self._MICROSECONDS_PER_SECOND) elif unit == "nanoseconds": converted_time += timedelta(seconds=offset_raw / self._NANOSECONDS_PER_SECOND) converted.append(converted_time) converted = np.array(converted, dtype=object) return converted
[docs] class StringifyBytesConverter(Converter): """Post-processing conversion which converts byte arrays or multi-byte numbers to strings in numeric representations such as binary, hexadecimal, or octal. To convert individual bytes, the input field should be defined as a `~ccsdspy.PacketArray` constructed with `data_type="uint"` and `bit_length=8`. Otherwise, each element is converted as a single entity. If the field is an array, the shape of the array is retained. The strings generated are not padded to a fixed length. The converted strings contain prefixes such as `0b` (binary), `0x` (hex), or `0o` (octal). If the number is signed and negative, the prefixes change to `-0b` (binary), `-0x` (hex), or `-0o` (octal). """ def __init__(self, format="hex"): """Instantiate a StringifyBytesConverter object Parameters ---------- format : {"bin", "hex", "oct"} Format used to encode the bytes in a string. """ if format not in ("bin", "hex", "oct"): raise ValueError( "The format= keyword passed to StringifyBytesConverter " f"must be either 'bin', 'hex', or 'oct'. Got {repr(format)}" ) self._format = format def _stringify_number(self, num, nbytes): """Internal helper method to convert a number to a string. Parameters ---------- number : int A single number to convert to string Returns -------- as_string : the byte converted to a string using the format specified when this object was created. """ if self._format == "bin": return bin(num) elif self._format == "hex": return hex(num) else: return oct(num)
[docs] def convert(self, field_array): """Apply the conversion. Parameters ---------- field_array : NumPy array decoded packet field values, must have at least two dimensions Returns ------- converted : NumPy array converted form of the converted packet field values """ # field_arrays may either be a 1-D array, or an N-D array where N>1 # (this includes jagged arrays where the outer array is of # dtype=object). These are implemented separately. ndims = len(field_array.shape) if ndims == 1 and field_array.dtype != object: converted = [] for num in field_array: as_string = self._stringify_number(num, field_array.itemsize) converted.append(as_string) else: converted = [] for i in range(field_array.shape[0]): cur_array_flat = field_array[i].flatten() n_items = cur_array_flat.shape[0] cur_shape = field_array[i].shape # Loop over elements, converting individually curr_array_strings = [] for element in cur_array_flat: as_string = self._stringify_number(element, cur_array_flat.itemsize) curr_array_strings.append(as_string) # Put back into original array shape curr_array_strings = np.array(curr_array_strings, dtype=object).reshape(cur_shape) converted.append(curr_array_strings) converted = np.array(converted, dtype=object) return converted