Source code for enigma2_http_api.utils

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
.. seealso::

    * http://dreambox.de/board/index.php?thread/13534-satellite-position-id-need-to-know-what-western-ones-mean-for-my-plugin-englisch/&s=3f1817729cb55399e2a345953329ce54e8a1a150&postID=89843
    * https://www.linuxtv.org/pipermail/linux-dvb/2005-June/002618.html
    * http://radiovibrations.com/dreambox/namespace.htm
    * http://www.dreambox-tools.info/print.php?threadid=1908&page=1&sid=1e12a523fe12d2073918670bff46ba23
    * https://wiki.neutrino-hd.de/wiki/Enigma:Services:Formatbeschreibung
    * http://radiovibrations.com/dreambox/services.htm

"""
import os
import datetime
import hashlib
import re
import codecs
import sys

# https://wiki.neutrino-hd.de/wiki/Enigma:Services:Formatbeschreibung
# Decimal value: 1=TV, 2=Radio, 4=NVOD, other=data

SERVICE_TYPE_TV = 0x01
SERVICE_TYPE_RADIO = 0x02
# NOTE(review): 0x10 is decimal 16, which does not appear in the type list
# below (22 == 0x16 is "advanced codec SD digital television") -- confirm.
SERVICE_TYPE_SD4 = 0x10
SERVICE_TYPE_HDTV = 0x19
SERVICE_TYPE_UHD = 0x1f
SERVICE_TYPE_OPT = 0xd3

# Known service types (values given in decimal):
# type 1 = digital television service
# type 2 = digital radio sound service
# type 4 = nvod reference service (NYI)
# type 10 = advanced codec digital radio sound service
# type 17 = MPEG-2 HD digital television service
# type 22 = advanced codec SD digital television
# type 24 = advanced codec SD NVOD reference service (NYI)
# type 25 = advanced codec HD digital television
# type 27 = advanced codec HD NVOD reference service (NYI)


#: Label:Service type map
SERVICE_TYPE = {
    'TV': SERVICE_TYPE_TV,
    'HDTV': SERVICE_TYPE_HDTV,
    'RADIO': SERVICE_TYPE_RADIO,
    'UHD': SERVICE_TYPE_UHD,
    'SD4': SERVICE_TYPE_SD4,
    'OPT': SERVICE_TYPE_OPT,
}

#: Service type:Label lookup map (inverse of :data:`SERVICE_TYPE`).
# ``dict.items()`` exists on Python 2 and Python 3, whereas ``iteritems()``
# is Python 2 only and made importing this module fail on Python 3.
SERVICE_TYPE_LOOKUP = {v: k for k, v in SERVICE_TYPE.items()}

#: Label - DVB-C services
NS_DVB_C_LABEL = 'DVB-C'
#: Label - DVB-S services
NS_DVB_S_LABEL = 'DVB-S'
#: Label - DVB-T services
NS_DVB_T_LABEL = 'DVB-T'
#: Label - namespace value ``0``
NS_FILE_LABEL = 'File'

# The two DVB-S namespaces known to this module; the distinguishing value
# lives in the upper 16 bits.
NS_DVB_S_ASTRA = 0x00c00000  # 192 << 16
NS_DVB_S_HOTBIRD = 0x00820000  # 130 << 16

#: Namespace - DVB-C services
NS_DVB_C = 0xffff0000

#: Namespace - DVB-S services (same value as :data:`NS_DVB_S_ASTRA`)
NS_DVB_S = NS_DVB_S_ASTRA

#: Namespace - DVB-T services
NS_DVB_T = 0xeeee0000

#: Label:Namespace map
NS = {
    NS_DVB_C_LABEL: NS_DVB_C,
    NS_DVB_S_LABEL: NS_DVB_S,
    NS_DVB_T_LABEL: NS_DVB_T,
}

#: Namespace:Label lookup map
NS_LOOKUP = {
    # full 32 bit namespace values
    NS_DVB_C: NS_DVB_C_LABEL,
    NS_DVB_S: NS_DVB_S_LABEL,
    NS_DVB_S_HOTBIRD: NS_DVB_S_LABEL,
    NS_DVB_T: NS_DVB_T_LABEL,
    0: NS_FILE_LABEL,
    # short form: only the upper 16 bits of the values above
    NS_DVB_C >> 16: NS_DVB_C_LABEL,
    NS_DVB_S >> 16: NS_DVB_S_LABEL,
    NS_DVB_S_HOTBIRD >> 16: NS_DVB_S_LABEL,
    NS_DVB_T >> 16: NS_DVB_T_LABEL,
}

#: ``(title key, description key)`` pairs under which the title and the
#: description of an event may be stored; the pairs are tried in this order
LISTING_ITEM_KEY_PAIRS = [
    ('eventname', 'descriptionExtended'),
    ('title', 'longdesc'),
    ('name', 'descriptionextended'),
    ('eventname', 'description'),
    ('title', 'longinfo'),
]

#: Format string of a normalised service reference: upper case hexadecimal
#: fields of fixed width, joined by colons
NORMALISED_SERVICEREFERENCE_FMT = ':'.join((
    '{service_type:04X}',
    '{sid:04X}',
    '{tsid:04X}',
    '{oid:04X}',
    '{ns:08X}',
))

# Matches a run length such as the " 47 Min." in a description like
# "sondern Magnums Leben in Gefahr... 47 Min."
PATTERN_RUNLENGTH = r'\s\d+\sMin\.'
RE_RUNLENGTH = re.compile(PATTERN_RUNLENGTH)


def guess_namespace_label(value, fallback='UNKNOWN'):
    """
    Try to guess a textual representation for given namespace value.

    The upper 16 bits of *value* are used as lookup key. When they are all
    zero the lower 16 bits are used instead, which is why both
    ``0x00c00000`` and ``0x00c0`` are recognised.

    :param value: namespace
    :param fallback: value to be returned if no matching label is found
    :return: label
    :rtype: basestring

    >>> guess_namespace_label(1234)
    'UNKNOWN'
    >>> guess_namespace_label(99, 'DVB-S')
    'DVB-S'
    >>> guess_namespace_label(0x0)
    'File'
    >>> guess_namespace_label(0x00c00000)
    'DVB-S'
    >>> guess_namespace_label(0x0c0)
    'DVB-S'
    >>> guess_namespace_label(0x00c0)
    'DVB-S'
    >>> guess_namespace_label(0x00c01234)
    'DVB-S'
    >>> guess_namespace_label(0x00820000)
    'DVB-S'
    >>> guess_namespace_label(0x0082)
    'DVB-S'
    >>> guess_namespace_label(0x00820082)
    'DVB-S'
    >>> guess_namespace_label(0x00000001)
    'UNKNOWN'
    >>> guess_namespace_label(0xffef1234)
    'UNKNOWN'
    >>> guess_namespace_label(0xeeeeffff)
    'DVB-T'
    """
    upper_word = (value >> 0x10) & 0xffff
    lower_word = value & 0xffff
    # Short form values carry the namespace in the lower word only.
    lookup_key = upper_word if upper_word != 0 else lower_word
    return NS_LOOKUP.get(lookup_key, fallback)
def normalise_servicereference(serviceref):
    """
    Build a normalised representation of *serviceref*, usable e.g. as a
    sorting hint.

    The reference is parsed with :func:`parse_servicereference` and the
    resulting fields are rendered with
    :data:`NORMALISED_SERVICEREFERENCE_FMT`.

    :param serviceref: service reference
    :return: normalised service reference

    >>> sref = '1:0:1:300:7:85:00c00000:0:0:0:'
    >>> normalise_servicereference(sref)
    '0001:0300:0007:0085:00C00000'
    >>> sref2 = '1:64:A:0:0:0:0:0:0:0::SKY Sport'
    >>> normalise_servicereference(sref2)
    '000A:0000:0000:0000:00000000'
    """
    return NORMALISED_SERVICEREFERENCE_FMT.format(
        **parse_servicereference(serviceref))
def parse_servicereference(serviceref):
    """
    Parse a Enigma2 style service reference string representation.

    The reference is split at ``:`` and the fields at positions 2 to 6 are
    read as hexadecimal numbers; all other fields are ignored.

    :param serviceref: Enigma2 style service reference
    :type serviceref: string
    :return: dictionary with the integer items ``service_type``, ``sid``,
        ``tsid``, ``oid`` and ``ns``
    :rtype: dict
    :raises IndexError: if *serviceref* has less than seven fields
    :raises ValueError: if one of the evaluated fields is not hexadecimal

    The examples compare sorted items instead of the ``dict`` representation
    because the latter depends on the interpreter's dictionary ordering.

    >>> sref = '1:0:1:300:7:85:00c00000:0:0:0:'
    >>> result = parse_servicereference(sref)
    >>> sorted(result.items())
    [('ns', 12582912), ('oid', 133), ('service_type', 1), ('sid', 768), ('tsid', 7)]
    >>> sref_g = create_servicereference(**result)
    >>> sref_g
    '1:0:1:300:7:85:00c00000:0:0:0:'
    >>> sref_g2 = create_servicereference(result)
    >>> sref_g2
    '1:0:1:300:7:85:00c00000:0:0:0:'
    >>> sref == sref_g
    True
    >>> sref2 = '1:64:A:0:0:0:0:0:0:0::SKY Sport'
    >>> result2 = parse_servicereference(sref2)
    >>> sorted(result2.items())
    [('ns', 0), ('oid', 0), ('service_type', 10), ('sid', 0), ('tsid', 0)]
    >>> sref3 = '1:0:0:0:0:0:0:0:0:0:/media/hdd/movie/20170921 2055 - DASDING - DASDING Sprechstunde - .ts'
    >>> result3 = parse_servicereference(sref3)
    >>> sorted(result3.items())
    [('ns', 0), ('oid', 0), ('service_type', 0), ('sid', 0), ('tsid', 0)]
    """
    parts = serviceref.split(":")
    # Field names in the order they appear, starting at position 2.
    field_names = ('service_type', 'sid', 'tsid', 'oid', 'ns')
    # Explicit indexing (instead of zip) keeps the IndexError for references
    # which are too short.
    return {
        name: int(parts[position], 16)
        for position, name in enumerate(field_names, 2)
    }
def create_servicereference(*args, **kwargs):
    """
    Generate a (Enigma2 style) service reference string representation.

    The parameters are either passed as keyword arguments or as a single
    positional :class:`dict`. A parameter which is not given defaults to
    ``0``.

    :param args[0]: Service Reference Parameter as dict
    :type args[0]: :class:`dict`
    :param service_type: Service Type
    :type service_type: int
    :param sid: SID
    :type sid: int
    :param tsid: TSID
    :type tsid: int
    :param oid: OID
    :type oid: int
    :param ns: Enigma2 Namespace
    :type ns: int
    :return: service reference
    """
    params = kwargs
    if len(args) == 1 and isinstance(args[0], dict):
        # A single positional dict takes precedence over keyword arguments.
        params = args[0]

    field_order = ('service_type', 'sid', 'tsid', 'oid', 'ns')
    values = [params.get(key, 0) for key in field_order]

    return '{:x}:0:{:x}:{:x}:{:x}:{:x}:{:08x}:0:0:0:'.format(1, *values)
def create_picon(*args, **kwargs):
    """
    Generate a (Enigma2 style) program icon string representation.

    The parameters are either passed as keyword arguments or as a single
    positional :class:`dict`. A numeric parameter which is not given
    defaults to ``0``. All numeric fields, including the service type, are
    rendered as upper case hexadecimal numbers, i.e. the same way
    :func:`create_servicereference` renders them.

    :param args[0]: Service Reference Parameter as dict
    :type args[0]: :class:`dict`
    :param service_type: Service Type
    :type service_type: int
    :param sid: SID
    :type sid: int
    :param tsid: TSID
    :type tsid: int
    :param oid: OID
    :type oid: int
    :param ns: Enigma2 Namespace
    :type ns: int
    :param extension: file extension appended to the name (default ``.png``)
    :type extension: str
    :return: program icon file name

    >>> create_picon(service_type=0x19, sid=0x2B66, tsid=0x3F3, oid=1, ns=0xC00000)
    '1_0_19_2B66_3F3_1_C00000_0_0_0.png'
    >>> create_picon({'service_type': 1, 'sid': 768}, extension='.svg')
    '1_0_1_300_0_0_0_0_0_0.png'
    """
    if len(args) == 1 and isinstance(args[0], dict):
        # A single positional dict replaces the keyword arguments entirely
        # (including ``extension``).
        kwargs = args[0]

    service_type = kwargs.get('service_type', 0)
    sid = kwargs.get('sid', 0)
    tsid = kwargs.get('tsid', 0)
    oid = kwargs.get('oid', 0)
    ns = kwargs.get('ns', 0)

    # The service type has to be hexadecimal like every other field: it was
    # formatted as decimal before, which produced e.g. ``1_0_25_...`` instead
    # of ``1_0_19_...`` for service type 0x19.
    trunk = '{:x}_0_{:x}_{:x}_{:x}_{:x}_{:x}_0_0_0'.format(
        1, service_type, sid, tsid, oid, ns).upper()

    # The extension is appended after upper-casing so it keeps its case.
    return trunk + kwargs.get('extension', '.png')
def filter_simple_events(data):
    """
    Reduce event rows to a few selected values.

    Keys of a row as listed by the original author:

    .. code::

        [
            u'begin', u'sname', u'end', u'title', u'id', u'now_timestamp',
            u'picon', u'longdesc', u'duration', u'duration_sec', u'sref',
            u'date', u'shortdesc', u'progress', u'tleft', u'begin_timestamp'],

    Only ``sref``, ``sname``, ``title``, ``longdesc`` and
    ``begin_timestamp`` are evaluated.

    :param data: iterable of event rows (mappings)
    :return: generator of ``(sname, title, longdesc, namespace, begin)``
        tuples where *namespace* is the namespace of the row's service
        reference as upper case hexadecimal string and *begin* is a
        :class:`datetime.datetime` created from ``begin_timestamp``
    """
    for event in data:
        namespace = '{:04X}'.format(
            parse_servicereference(event['sref'])['ns'])
        yield (
            event['sname'],
            event['title'],
            event['longdesc'],
            namespace,
            datetime.datetime.fromtimestamp(int(event['begin_timestamp'])),
        )
def pseudo_unique_id(item):
    """
    Generate a pseudo unique ID for an event item, movie item or timer item.

    The ID is the SHA1 hex digest of the item's title followed by its
    description with any run length (e.g. `` 47 Min.``) removed.
    Neither title nor description may be empty.

    Title and description are read from the attributes ``title`` and
    ``longinfo`` of *item*. If *item* lacks one of them it is treated as a
    mapping and the key pairs of :data:`LISTING_ITEM_KEY_PAIRS` are tried in
    order until a pair with a non-blank description is found.

    :param item: event, movie or timer
    :return: generated pseudo ID
    :rtype: str
    :raises AssertionError: if title or description is missing or blank

    >>> pseudo_unique_id({'event': 1})
    Traceback (most recent call last):
        ...
    AssertionError: name or desc may not be None
    >>> pseudo_unique_id({'eventname': 1, 'descriptionExtended': 'bla'})
    Traceback (most recent call last):
        ...
    AttributeError: 'int' object has no attribute 'strip'
    >>> pseudo_unique_id({'eventname': "x", 'descriptionExtended': 'bla'})
    '7a6615ef8ca6b06ac6a837741293759d3083a49c'
    >>> pseudo_unique_id({'eventname': "x", 'description': 'bla'})
    '7a6615ef8ca6b06ac6a837741293759d3083a49c'
    >>> pseudo_unique_id({'eventname': " ", 'descriptionExtended': ' '})
    Traceback (most recent call last):
        ...
    AssertionError: name or desc may not be empty
    >>> pseudo_unique_id({'title': "x", 'longdesc': 'bla'})
    '7a6615ef8ca6b06ac6a837741293759d3083a49c'
    >>> pseudo_unique_id({'name': "x", 'descriptionextended': 'bla'})
    '7a6615ef8ca6b06ac6a837741293759d3083a49c'
    >>> pseudo_unique_id({'name': "x", 'descriptionextended': ' 17 Min.'})
    Traceback (most recent call last):
        ...
    AssertionError: desc_mangled may not be empty
    >>> pseudo_unique_id({'name': "x", 'descriptionextended': 'Bla Bla 17 Min.'})
    '4db3822ad252366e57d9515b1e37d3449a45a0cb'
    >>> pseudo_unique_id({'name': "x", 'descriptionextended': 'Bla Bla 18 Min.'})
    '4db3822ad252366e57d9515b1e37d3449a45a0cb'
    >>> pseudo_unique_id({'eventname': "x", 'descriptionExtended': 'bla', 'description': ''})
    '7a6615ef8ca6b06ac6a837741293759d3083a49c'
    """
    try:
        name, desc = item.title, item.longinfo
    except AttributeError:
        # Not an object carrying both attributes: fall back to key lookups.
        name = desc = None
        for name_key, desc_key in LISTING_ITEM_KEY_PAIRS:
            try:
                name, desc = item[name_key], item[desc_key]
            except KeyError:
                # Incomplete pair: the previously found values are kept.
                continue
            if desc is not None and desc.strip():
                break

    if None in (name, desc):
        raise AssertionError("name or desc may not be None")

    # Both values are stripped before either one is checked.
    stripped_values = (name.strip(), desc.strip())
    if '' in stripped_values:
        raise AssertionError("name or desc may not be empty")

    # The run length is removed so that descriptions which differ only in
    # the run length result in the same ID.
    desc_mangled = RE_RUNLENGTH.sub('', desc)
    if not desc_mangled.strip():
        raise AssertionError("desc_mangled may not be empty")

    digest = hashlib.sha1(name.encode("utf-8"))
    digest.update(desc_mangled.encode("utf-8"))
    return digest.hexdigest()
def pseudo_unique_id_radio(item):
    """
    Generate a pseudo unique ID for a radio item.

    The item's ``sref`` and ``date`` values, joined by a comma, are used in
    place of a description.

    :param item: mapping providing ``title``, ``sref`` and ``date``
    :return: generated pseudo ID
    :rtype: str
    """
    surrogate = {
        "title": item.get("title"),
        "longdesc": "{:s},{:s}".format(item.get("sref"), item.get("date")),
    }
    return pseudo_unique_id(surrogate)


def pseudo_unique_id_any(item, is_radio=False):
    """
    Generate a pseudo unique ID for *item* on a best effort basis.

    :func:`pseudo_unique_id` is tried first. If it fails and *is_radio* is
    true :func:`pseudo_unique_id_radio` is tried next. Any exception raised
    by either attempt is swallowed.

    :param item: event, movie or timer
    :param is_radio: whether the radio variant may be used as fallback
    :return: generated pseudo ID or ``None`` if no ID could be generated
    """
    try:
        return pseudo_unique_id(item)
    except Exception:
        # Deliberately ignored: the radio variant may still succeed.
        pass

    try:
        if is_radio:
            return pseudo_unique_id_radio(item)
    except Exception:
        pass

    return None
def enigma_trunkname(path):
    """
    Determine the trunk of enigma2 specific files.

    The base name of *path* is split at ``.``. Accepted are names with one
    extension out of ``ts``/``eit`` or with such an extension followed by
    one out of ``ap``/``cuts``/``meta``/``sc``.

    :param path: filename
    :return: trunk
    :raises ValueError: if *path* is not a path, is empty, or does not carry
        the accepted extension(s)

    >>> enigma_trunkname(None)
    Traceback (most recent call last):
        ...
    ValueError: None
    >>> enigma_trunkname('')
    Traceback (most recent call last):
        ...
    ValueError: ''
    >>> enigma_trunkname(2)
    Traceback (most recent call last):
        ...
    ValueError: 2
    >>> enigma_trunkname('somefile')
    Traceback (most recent call last):
        ...
    ValueError: 'somefile' has no extension
    >>> enigma_trunkname('somefile.bla')
    Traceback (most recent call last):
        ...
    ValueError: 'somefile.bla' has bad extension 'bla'
    >>> enigma_trunkname('somefile.bla.bla')
    Traceback (most recent call last):
        ...
    ValueError: 'somefile.bla.bla' has bad extension 'bla'
    >>> enigma_trunkname('somefile.ts')
    'somefile'
    >>> enigma_trunkname('/tmp/somefile.ts')
    'somefile'
    >>> enigma_trunkname('somefile.ts.sc')
    'somefile'
    """
    # Accepted extensions by position: first extension, second extension.
    allowed_by_level = (
        ('ts', 'eit',),
        ('ap', 'cuts', 'meta', 'sc',),
    )

    try:
        w_path = os.path.basename(path)
    except (AttributeError, TypeError):
        # Python 2 raises AttributeError for non-path values such as None or
        # 2, Python 3 raises TypeError; both mean "not a path".
        raise ValueError(repr(path))

    if not w_path:
        raise ValueError(repr(path))

    parts = w_path.split('.')
    extensions = parts[1:]

    if not extensions:
        raise ValueError('{!r} has no extension'.format(path))
    if len(extensions) > len(allowed_by_level):
        # More dots than a trunk with two extensions can contain.
        raise ValueError(repr(path))

    for extension, allowed in zip(extensions, allowed_by_level):
        if extension not in allowed:
            raise ValueError(
                '{!r} has bad extension {!r}'.format(path, extension))

    return parts[0]
def set_output_encoding(encoding='utf-8'):
    """
    Force an encoding for ``sys.stdout`` and ``sys.stderr`` if none is set.

    Stolen from
    https://stackoverflow.com/questions/19696652/piping-output-causes-python-program-to-fail

    When piping to the terminal, python knows the encoding needed, and
    sets it automatically. But when piping to another program (for example,
    ``| less``), python can not check the output encoding. In that case, it
    is None. This situation is caught here for both stdout and stderr and
    the stream is replaced by a writer which encodes with *encoding*.

    Calling the function repeatedly is safe: a stream which already is a
    :class:`codecs.StreamWriter` is left untouched.

    :param encoding: encoding to be used if the stream has none
    """
    for stream_name in ('stdout', 'stderr'):
        stream = getattr(sys, stream_name)

        if isinstance(stream, codecs.StreamWriter):
            # Already wrapped (e.g. by a previous call): wrapping again
            # would encode the output twice.
            continue

        if stream.encoding is None:
            # The writer emits bytes. Text streams expose their byte stream
            # as ``buffer``; streams without that attribute accept the
            # encoded output directly.
            target = getattr(stream, 'buffer', stream)
            setattr(sys, stream_name, codecs.getwriter(encoding)(target))
if __name__ == '__main__':
    import doctest

    # doctest.testmod() returns (failed, attempted), so the number of
    # succeeded examples has to be derived; it was reported as the number of
    # attempted examples before.
    (FAILED, ATTEMPTED) = doctest.testmod()
    SUCCEEDED = ATTEMPTED - FAILED
    print("[doctest] SUCCEEDED/FAILED: {:d}/{:d}".format(SUCCEEDED, FAILED))