Skip to content

Instantly share code, notes, and snippets.

@seahawk1986
Last active October 9, 2017 20:33
Show Gist options
  • Save seahawk1986/616ba06abb7bab4c8dfc2031bcac4afe to your computer and use it in GitHub Desktop.
Save seahawk1986/616ba06abb7bab4c8dfc2031bcac4afe to your computer and use it in GitHub Desktop.
add svdrp support for detach command to frontend script
#!/usr/bin/env python2
# -*- coding: utf-8 -*-
import sys
import os
import codecs
import subprocess
import gobject
import socket
import string
import struct
import datetime
import time
import syslog
import pyudev
from collections import namedtuple
from pyudev.glib import GUDevMonitorObserver
import dbus
import dbus.service
from dbus.mainloop.glib import DBusGMainLoop
raw_response = namedtuple('raw_response', 'return_code end_of_message message')
response = namedtuple('response', 'return_code message')
class SVDRPClient(object):
def __init__(self, host='localhost', port=6419):
self.host = host
self.port = port
self.telnet = telnetlib.Telnet()
self.telnet.open(host, port)
self.read_response()
def __enter__(self, host='localhost', port=6419):
self.telnet.open(self.host, self.port)
print(self.read_response())
return self
def __exit__(self, type, value, traceback):
r = self.send_command('QUIT')
self.telnet.close()
def close(self):
self.telnet.close()
def send_command(self, line):
self.telnet.write(line.encode("UTF-8"))
self.telnet.write('\r\n')
return self.read_response()
def read_line(self):
line = self.telnet.read_until('\n', 10)
if line:
line = line.replace('\n', '').replace('\r', '')
if len(line) < 4:
return None
return raw_response(int(line[0:3]), line[3] == '-', line[4:])
def read_response(self):
msg = []
return_code = 0
line = self.read_line()
if line:
return_code = line.return_code
msg.append(line.message)
while line and line[1]:
line = self.read_line()
if line: msg.append(line.message)
return return_code, "\n".join(msg)
class dbusRemote():
'''wrapper for remote interface provided by the dbus2vdr plugin'''
def __init__(self):
self.dbusremote = bus.get_object("de.tvdr.vdr", "/Remote")
self.interface = 'de.tvdr.vdr.remote'
def sendkey(self, key):
answer, message = self.dbusremote.HitKey(dbus.String(key),
dbus_interface=self.interface)
if answer == 250:
return True
else:
return False
def sendkeys(self, keys):
answer, message = self.dbusremote.HitKeys(keys,
dbus_interface=self.interface
)
if answer == 250:
return True
else:
return False
def enable(self):
answer, message = self.dbusremote.Enable(dbus_interface=self.interface)
if answer == 250:
return True
def disable(self):
answer, message = self.dbusremote.Disable(
dbus_interface=self.interface)
if answer == 250:
return True
def status(self):
answer, message = self.dbusremote.Status(dbus_interface=self.interface)
if answer == 250:
return True
class dbusShutdown():
'''wrapper for shutdown interface provided by the dbus2vdr plugin'''
def __init__(self):
self.dbusshutdown = bus.get_object("de.tvdr.vdr", "/Shutdown")
self.interface = 'de.tvdr.vdr.shutdown'
def manualstart(self):
with open('/proc/uptime', 'r') as f:
uptime_seconds = float(f.readline().split()[0])
if uptime_seconds < 3600:
return self.dbusshutdown.ManualStart(dbus_interface=self.interface)
else:
return True
def confirmShutdown(self, user=False):
code, message, shutdownhooks, message = (
self.dbusshutdown.ConfirmShutdown(dbus.Boolean(user),
dbus_interface=self.interface,
timeout=120)
) # wait up to 120 second for a reply, because the lifeguard-addon is
# as fast as a snail
if code in [250, 990]:
return True
else:
syslog.syslog(u"vdr not ready for shutdown: %s: %s" % (
code, message))
return False
class dbusSetup():
'''wrapper for setup interface provided by the dbus2vdr plugin'''
def __init__(self):
self.dbussetup = bus.get_object("de.tvdr.vdr", "/Setup")
self.interface = 'de.tvdr.vdr.setup'
def vdrsetupget(self, option):
return self.dbussetup.Get(dbus.String(option),
dbus_interface=self.interface)
class dbusTimer():
def __init__(self):
self.dbustimer = bus.get_object("de.tvdr.vdr", "/Timers")
self.interface = 'de.tvdr.vdr.timer'
def wakeup_for_Timer(self):
status, tid, rel, startt, stopt, event = self.dbustimer.Next(
dbus_interface=self.interface)
if status == 250 and (rel <= (6 * 60)):
return True
else:
return False
class dbusDevices():
'''wrapper for device interface provided by the dbus2vdr plugin'''
def __init__(self):
self.dbusdevice = bus.get_object("de.tvdr.vdr", "/Devices")
self.interface = 'de.tvdr.vdr.device'
def get_primary(self):
index, number, has_decoder, is_primary, name = (
self.dbusdevice.GetPrimary(dbus_interface=self.interface))
return index, number, has_decoder, is_primary, name
def list(self):
devices = self.dbusdevice.List(dbus_interface=self.interface)
return devices
class dbusSkindesigner():
'''wrapper for plugin interface provided by the dbus2vdr plugin'''
def __init__(self):
self.dbusplugins = bus.get_object("de.tvdr.vdr",
"/Plugins/skindesigner")
self.interface = 'de.tvdr.vdr.plugin'
def svdrpcommand(self, cmd, arg):
answer, reply = self.dbusplugins.SVDRPCommand(
dbus.String(cmd), arg, dbus_interface=self.interface)
return answer, reply
def get_dbusPlugins():
'''wrapper for dbus plugin list'''
dbusplugins = bus.get_object("de.tvdr.vdr", "/Plugins")
raw = dbusplugins.List(dbus_interface="de.tvdr.vdr.pluginmanager")
plugins = {}
if len(raw) > 0:
for name, version in raw:
plugins[name] = version
return plugins
class dbusSofthddeviceFrontend():
'''handler for softhddevice's svdrp plugin command interface
provided by the dbus2vdr plugin'''
def __init__(self):
self.dbusfe = bus.get_object("de.tvdr.vdr", "/Plugins/softhddevice")
self.interface = 'de.tvdr.vdr.plugin'
def status(self):
code, mode = self.dbusfe.SVDRPCommand(dbus.String("STAT"),
dbus.String(None),
dbus_interface=self.interface)
return mode.split()[-1]
def attach(self):
display = dbus.String("-d %s" % (settings.env["DISPLAY"]))
reply, answer = self.dbusfe.SVDRPCommand(
dbus.String("ATTA"), display, dbus_interface=self.interface)
remote.enable()
settings.frontend_active = 1
settings.wants_shutdown = False
def detach(self):
remote.sendkeys(["menu", "back", "back", "back"])
time.sleep(0.5)
remote.sendkeys(["back", "back"])
time.sleep(0.5)
if settings.skindesigner:
plugins.svdrpcommand("DLIC", "")
if settings.conf['use_svdrp_for_detach'] == '0':
reply, answer = self.dbusfe.SVDRPCommand(
dbus.String("DETA"), dbus.String(None),
dbus_interface=self.interface)
else:
with SVDRPClient() as s:
s.send_command('plug softhddevice deta')
remote.disable()
if settings.wants_shutdown:
setUserInactive()
settings.frontend_active = 0
def resume(self, status):
reply, answer = self.dbusfe.SVDRPCommand(dbus.String("RESU"),
dbus.String(None),
dbus_interface=self.interface)
settings.frontend_active = 1
def setUserInactive():
dbusshutdown = bus.get_object("de.tvdr.vdr", "/Shutdown")
dbusshutdown.SetUserInactive(dbus_interface='de.tvdr.vdr.shutdown')
send_shutdown()
settings.time = gobject.timeout_add(300000, send_shutdown)
def detach():
# set background visible when frontend is detached
subprocess.call(["/usr/bin/feh", "--bg-fill",
settings.conf['logo_detached']], env=settings.env)
frontend.detach()
graphtftng_switch()
return True
def send_shutdown():
try:
if shutdown.confirmShutdown():
remote.enable()
remote.sendkey("POWER")
remote.disable()
except Exception as error:
syslog.syslog(error)
remote.enable()
remote.sendkey("POWER")
remote.disable()
finally:
return True
def soft_detach():
detach()
settings.timer = gobject.timeout_add(300000, send_shutdown)
return False
def graphtftng_switch():
if settings.graphtftng:
dbusgraph = bus.get_object("de.tvdr.vdr", "/Plugins/graphtftng")
if settings.frontend_active == 0:
dbusgraph.SVDRPCommand(
dbus.String('TVIEW'),
dbus.String(settings.conf['graphtftng_view']),
dbus_interface='de.tvdr.vdr.plugin')
elif settings.frontend_active == 1:
dbusgraph.SVDRPCommand(dbus.String('RVIEW'), dbus.String(None),
dbus_interface='de.tvdr.vdr.plugin')
def resume(status):
# set background visible when frontend is detached
subprocess.call(["/usr/bin/feh", "--bg-fill",
settings.conf['logo_attached']], env=settings.env)
if status == "SUSPENDED":
frontend.resume()
elif status == "SUSPEND_DETACHED":
frontend.attach()
graphtftng_switch()
class Settings():
''' read and store configuration, handle input devices using udev'''
def __init__(self):
global gobject
self.frontend_active = 0
self.env = os.environ
self.timer = None
self.wants_shutdown = False
self.updateDisplay()
self.manualstart = shutdown.manualstart()
self.acpi_wakeup = self.check_acpi()
self.context = pyudev.Context()
self.monitor = pyudev.Monitor.from_netlink(self.context)
self.monitor.filter_by(subsystem='input')
self.devices = {}
self.paths = {}
# struct for kernel input devices on 64-Bit linux systems
self.inputEventFormat = 'llHHi'
self.inputEventSize = 24
# on 32-Bit systems try
# inputEventFormat = 'iihhi'
# inputEventSize = 16
# (untested)
self.conf = {
'logo_attached': "/usr/share/yavdr/images/yavdr_logo.png",
'logo_detached':
"/usr/share/yavdr/images/yaVDR_background_detached.jpg",
'key_detach': "KEY_PROG1",
'key_power': "KEY_POWER2",
'start_always_detached': '0',
'graphtftng_view': "NonLiveTv",
'use_svdrp_for_detach': '0',
}
for i in self.conf:
if i in os.environ:
self.conf[i] = os.environ[i]
self.plugins = get_dbusPlugins()
self.check_graphtftng()
self.check_skindesigner()
try:
self.get_event_devices()
except:
syslog.syslog(
"Error: insufficient permissions for accessing input devices")
pass
def check_graphtftng(self):
if 'graphtftng' in self.plugins:
# print "found graphtftng"
self.graphtftng = True
else:
self.graphtftng = False
def check_skindesigner(self):
if 'skindesigner' in self.plugins:
self.skindesigner = True
else:
self.skindesigner = False
def get_event_devices(self):
'''filter all connected input devices and watch those
not used by eventlircd'''
for device in self.context.list_devices(subsystem='input',
ID_INPUT_KEYBOARD=True):
if device.sys_name.startswith('event') and not (
('eventlircd_enable' in device) or (
'eventlircd_enable' in device and
device['eventlircd_enable'] is ('true'))):
self.paths[device['DEVNAME']] = open(device['DEVNAME'], 'rb')
syslog.syslog(codecs.encode(u"watching %s: %s" % (
device.parent['NAME'], device['DEVNAME']), 'utf-8'))
self.devices[device['DEVNAME']] = gobject.io_add_watch(
self.paths[device['DEVNAME']], gobject.IO_IN,
self.evthandler)
self.observer = GUDevMonitorObserver(self.monitor)
self.observer.connect('device-event', self.udev_event)
self.monitor.start()
syslog.syslog("started udev monitoring of input devices")
def udev_event(self, observer, action, device):
'''callback function to add/remove input devices'''
if action == "add" and 'DEVNAME' in device:
syslog.syslog("added %s" % device['DEVNAME'])
if "eventlircd_enable" not in device:
self.paths[device['DEVNAME']] = open(device['DEVNAME'], 'rb')
syslog.syslog(codecs.encode(u"watching %s: %s - %s" % (
device.parent['NAME'], device['DEVNAME'],
self.paths[device['DEVNAME']]), 'utf-8'))
self.devices[device['DEVNAME']] = gobject.io_add_watch(
self.paths[device['DEVNAME']], gobject.IO_IN,
self.evthandler)
elif action == "remove" and 'DEVNAME' in device:
try:
self.paths[device['DEVNAME']].close()
except:
pass
try:
gobject.source_remove(self.devices[device['DEVNAME']])
syslog.syslog("removed %s" % device['DEVNAME'])
except:
pass # device already removed from watchlist
def GetActiveWindowTitle(self):
'''get title of active window'''
return subprocess.Popen(["xprop", "-id", subprocess.Popen(
["xprop", "-root", "_NET_ACTIVE_WINDOW"], stdout=subprocess.PIPE,
env=settings.env).communicate()[0].strip().split()[-1], "WM_NAME"],
stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate(
)[0].strip().split('"', 1)[-1][:-1]
def evthandler(self, path, *args):
'''callback function to handle keys sent by inputdevices'''
try:
event = path.read(self.inputEventSize)
except:
syslog.syslog('can not read from %s' % (path))
return False
(time1, time2, typeev, code, value) = struct.unpack(
self.inputEventFormat, event)
if self.frontend_active == 0:
# resume frontend on KEY_ENTER if no window has focus
if typeev == 1 and code == 28 and (self.GetActiveWindowTitle() ==
'WM_NAM'):
syslog.syslog("frontend attached by keyboard activity")
resume(frontend.status())
return True
def updateDisplay(self):
self.env["DISPLAY"] = ":1" + self.getTempDisplay()
def getTempDisplay(self):
tempdisplay = subprocess.check_output(["dbget", "vdr.tempdisplay"])
if len(tempdisplay) == 0:
tempdisplay = ".0"
return tempdisplay
def check_acpi(self):
try:
with open('/var/cache/vdr/acpiwakeup.time.old', 'r') as f:
timestr = f.read().splitlines()[0]
except IOError:
return False
else:
try:
wakeup = datetime.datetime.strptime(timestr, "%Y-%m-%d %H:%M:%S")
except ValueError:
syslog.syslog("WARNING: invalid timestamp in /var/cache/vdr/acpiwakeup.time.old")
return False
now = datetime.datetime.utcnow()
syslog.syslog(u"acip-wakeup.time.old hatte Wert: %s" % (
wakeup.ctime()))
syslog.syslog(u"vergleiche mit jetzt: %s" % (now.ctime()))
if wakeup < now:
d = now - wakeup
else:
d = wakeup - now
if d.seconds > 360:
syslog.syslog("assuming manual start")
return False
else:
syslog.syslog("assuming start for acpi-wakeup")
return True
class dbusService(dbus.service.Object):
def __init__(self, settings):
bus_name = dbus.service.BusName('de.yavdr.frontend', bus=bus)
dbus.service.Object.__init__(self, bus_name, '/frontend')
self.settings = settings
@dbus.service.method('de.yavdr.frontend', out_signature='b')
def deta(self):
detach()
return True
@dbus.service.method('de.yavdr.frontend', out_signature='b')
def atta(self):
resume(frontend.status())
return True
@dbus.service.method('de.yavdr.frontend', out_signature='b')
def toggle(self):
if frontend.status() == "NOT_SUSPENDED":
detach()
settings.frontend_active = 0
return True
else:
resume(frontend.status())
return True
@dbus.service.method('de.yavdr.frontend', in_signature='s',
out_signature='b')
def setBackground(self, path):
if os.path.isfile(path):
subprocess.call(["/usr/bin/feh", "--bg-fill", path],
env=settings.env)
syslog.syslog("setting Background to %s" % (path))
return True
else:
return False
@dbus.service.method('de.yavdr.frontend', in_signature="s",
out_signature='b')
def updateDisplay(self, display=None):
if display is not None:
self.settings.env["DISPLAY"] = display
return True
else:
return False
@dbus.service.method('de.yavdr.frontend', out_signature='s')
def checkDisplay(self):
return self.settings.env["DISPLAY"]
class lircConnection():
def __init__(self, socket_path="/var/run/lirc/lircd"):
self.socket_path = socket_path
self.try_connection()
self.callback = None
def connect_eventlircd(self):
self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.sock.connect(self.socket_path)
self.callback = gobject.io_add_watch(self.sock, gobject.IO_IN,
self.handler)
def try_connection(self):
try:
self.connect_eventlircd()
syslog.syslog(u"conntected to Lirc-Socket on %s" % (
self.socket_path))
return False
except:
gobject.timeout_add(1000, self.try_connection)
try:
if self.callback:
gobject.source_remove(self.callback)
except:
pass
syslog.syslog(
"Error: vdr-frontend could not connect to eventlircd socket")
return False
def handler(self, sock, *args):
'''callback function for activity on eventlircd socket'''
try:
buf = sock.recv(1024)
if not buf:
self.sock.close()
try:
if self.callback:
gobject.source_remove(self.callback)
except:
pass
syslog.syslog("Error reading from lircd socket")
self.try_connection()
return False
except:
sock.close()
try:
gobject.source_remove(self.callback)
except:
pass
syslog.syslog('retry lirc connection')
self.try_connection()
return True
lines = string.split(buf, "\n")
for line in lines[:-1]:
try:
code, count, cmd, device = string.split(line, " ")[:4]
if count != "0":
# syslog.syslog('repeated keypress')
return True
else:
try:
gobject.source_remove(settings.timer)
except:
pass
except:
syslog.syslog(line)
return True
if cmd == settings.conf['key_detach']: # "KEY_PROG1":
if frontend.status() == "NOT_SUSPENDED":
detach()
settings.frontend_active = 0
else:
resume(frontend.status())
elif cmd == settings.conf['key_power']: # "KEY_POWER2":
settings.wants_shutdown = True
if frontend.status() == "NOT_SUSPENDED":
settings.timer = gobject.timeout_add(15000, soft_detach)
settings.frontend_active = 0
else:
send_shutdown()
else:
settings.wants_shutdown = False
if settings.frontend_active == 0:
resume(frontend.status())
settings.frontend_active = 1
else:
pass
return True
if __name__ == '__main__':
DBusGMainLoop(set_as_default=True)
bus = dbus.SystemBus()
syslog.openlog(ident="vdr-frontend", logoption=syslog.LOG_PID)
# Initialise dbus-control classes
frontend = dbusSofthddeviceFrontend()
remote = dbusRemote()
shutdown = dbusShutdown()
setup = dbusSetup()
timer = dbusTimer()
devices = dbusDevices()
try:
plugins = dbusSkindesigner()
except Exception as e:
print(e)
settings = Settings()
# attach frontend if vdr has not been started for a timer or an acpi
# wakeup event
if ((settings.manualstart and not settings.acpi_wakeup)
and settings.conf['start_always_detached'] in ('0', '2')):
resume(frontend.status())
else:
# set background visible when frontend is detached
subprocess.call(["/usr/bin/feh", "--bg-fill",
settings.conf['logo_detached']], env=settings.env)
# change graphtftng view if the plugin has been loaded
graphtftng_switch()
# check for timer within the next 6 minutes
if not settings.manualstart and timer.wakeup_for_Timer():
settings.timer = gobject.timeout_add(300000, send_shutdown)
# no timer, just an acpi-wakeup event
elif (settings.acpi_wakeup and
setup.vdrsetupget("MinUserInactivity")[0] > 0):
interval, default, answer = setup.vdrsetupget("MinEventTimeout")
interval_ms = interval * 60000 # * 60s * 1000ms
settings.timer = gobject.timeout_add(interval_ms, setUserInactive)
# neither woken for a timer nor for a acpi event
elif setup.vdrsetupget("MinUserInactivity")[0] > 0:
interval, default, answer = setup.vdrsetupget("MinEventTimeout")
interval_ms = interval * 60000 # * 60s * 1000ms
settings.timer = gobject.timeout_add(interval_ms, setUserInactive)
remote.disable()
lircconnection = lircConnection()
dbusservice = dbusService(settings)
loop = gobject.MainLoop()
try:
loop.run()
except KeyboardInterrupt:
detach()
sys.exit()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment