-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathwega_radio.py
311 lines (255 loc) · 9.41 KB
/
wega_radio.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys
import time
import yaml
import argparse
import logging
import logging.config
from mpd import (MPDClient)
from threading import Timer
try:
import RPi.GPIO as GPIO
except RuntimeError:
print("Error importing RPi.GPIO! This is probably because you need superuser privileges. You can achieve this by"
" using 'sudo' to run your script")
class MusicDaemonClient:
    """
    Class to control the MusicPlayerDaemon

    Every public method opens a connection to the daemon, runs its commands and
    closes the connection again; no connection is kept open between calls.
    """

    def __init__(self, mpd_host, mpd_port):
        """
        :param mpd_host: The name of the host (e.g. localhost)
        :param mpd_port: The port of the daemon (e.g. 6600)
        """
        self.__log = logging.getLogger("MusicDaemonClient")
        self.__client = MPDClient(use_unicode=True)
        self.__host = mpd_host
        self.__port = mpd_port
        # test the mpd connection
        self.__connect()
        self.__disconnect()

    def __connect(self):
        """
        connect the client to the configured host and port
        """
        self.__client.connect(self.__host, self.__port)

    def __disconnect(self):
        """
        close the client and disconnect it from the daemon
        """
        self.__client.close()
        self.__client.disconnect()

    def __stop(self):
        """
        stop the playback and empty the queue (requires an open connection)
        """
        self.__client.stop()  # stop playing
        self.__client.clear()  # clear queue

    def __play(self, uri):
        """
        add the uri to the queue and start playing (requires an open connection)
        :param uri: the uri to play
        """
        self.__client.add(uri)
        self.__client.play()

    def stop(self):
        """
        stop playback
        """
        self.__log.debug("stop playback and clear the queue")
        self.__connect()
        try:
            self.__stop()
        finally:
            # disconnect even if a command failed, otherwise the next call
            # would try to connect an already connected client
            self.__disconnect()

    def play(self, uri):
        """
        start playback
        :param uri: the uri to play
        """
        self.__log.debug("play: {}".format(uri))
        self.__connect()
        try:
            # whatever is playing is stopped and removed from the queue first
            self.__stop()
            self.__play(uri)
        finally:
            self.__disconnect()

    def log_info(self):
        """
        write the status, the statistics and the current song of the daemon to the log

        The messages go to this class' own logger, so the method does not depend
        on a logger variable defined by the calling script.
        """
        info = self.info()
        self.__log.info("MPD Status: {}".format(info['status']))
        self.__log.info("MPD Stats: {}".format(info['stats']))
        self.__log.info("MPD Current Song: {}".format(info['current_song']))

    def info(self):
        """
        query the daemon
        :return: a dictionary with the keys 'status', 'stats' and 'current_song'
        """
        self.__connect()
        try:
            res = dict()
            res['status'] = self.__client.status()
            res['stats'] = self.__client.stats()
            res['current_song'] = self.__client.currentsong()
            return res
        finally:
            self.__disconnect()

    def teardown(self):
        """
        teardown this instance. The instance is unusable after this call!
        """
        self.__log.debug("teardown")
        self.__connect()
        try:
            self.__stop()
        finally:
            self.__disconnect()

    def mpd_version(self):
        """
        :return: the 'mpd_version' attribute of the connected client
        """
        self.__connect()
        try:
            return self.__client.mpd_version
        finally:
            self.__disconnect()
class GpioClient:
    """
    Small wrapper around the RPi.GPIO module (BCM channel numbering)
    """

    def __init__(self):
        """
        Select the BCM numbering mode and log the RPi.GPIO version
        """
        self.__log = logging.getLogger("GpioClient")
        GPIO.setmode(GPIO.BCM)
        version_message = "RPi.GPIO Version: {}".format(GPIO.VERSION)
        self.__log.info(version_message)

    def add_input_channel_callback(self, channel, rise_fall, callback):
        """
        Configure a channel as input (with pull-down) and attach an edge callback
        :param channel: the number of the GPIO channel (BCM)
        :param rise_fall: the edge selection handed to GPIO.add_event_detect
        :param callback: the function to call when the edge is detected
        """
        debug_message = "register callback for channel {}".format(channel)
        self.__log.debug(debug_message)
        GPIO.setup(channel, GPIO.IN, pull_up_down=GPIO.PUD_DOWN)
        # bouncetime=300 is passed through to RPi.GPIO together with the callback
        GPIO.add_event_detect(channel, rise_fall, callback=callback, bouncetime=300)

    def channel_state(self, channel):
        """
        Read the current input value of a channel
        :param channel: the channel number to read
        :return: whatever GPIO.input reports for the channel
        """
        state = GPIO.input(channel)
        return state

    def teardown(self):
        """
        Release the GPIO resources. GPIO is unusable after this call!
        """
        self.__log.debug("teardown")
        GPIO.cleanup()
class WegaRadioControl:
    """
    Controls the WEGA radio

    Connects the GPIO buttons of the radio with the MusicPlayerDaemon: every
    edge on one of the button channels (re)starts a short timer, and when the
    timer fires the button states are read and the daemon is switched to the
    matching state.
    """
    # GPIO channel (BCM) of the 'off' button
    SWITCH_OFF_GPIO_CHANNEL = 17
    # GPIO channels (BCM) of the station buttons, in button order
    RADIO_GPIO_CHANNELS = [5, 6, 13, 19]

    def __init__(self, radio_config=None):
        """
        :param radio_config: dictionary with the keys 'mpd_host', 'mpd_port' and
                             'stations'. When omitted, the module-level 'config'
                             loaded by the script's main section is used.
        """
        if radio_config is None:
            radio_config = config

        self.__log = logging.getLogger("WegaRadioControl")
        self.__mpdClient = MusicDaemonClient(radio_config['mpd_host'], radio_config['mpd_port'])
        self.__gpioClient = GpioClient()
        self.__stations = dict()
        # not started here; only created so that cancel() is always safe to call
        self.__timer = Timer(0.2, self.__set_radio_state)

        # configure the station callbacks
        self.__setup_stations(radio_config['stations'])

        # register a callback to switch the radio off
        self.__log.info(
            "register callback for the 'off' button on channel {}".format(WegaRadioControl.SWITCH_OFF_GPIO_CHANNEL))
        self.__gpioClient.add_input_channel_callback(WegaRadioControl.SWITCH_OFF_GPIO_CHANNEL, GPIO.BOTH,
                                                     self.__on_channel)

    def __setup_stations(self, stations):
        """
        Setup the stations
        :param stations: a list with stations (each one a mapping with at least
                         the keys 'name' and 'uri')
        """
        if len(stations) > 4:
            self.__log.warning("You defined more than 4 stations. I only setup the first 4 stations in your list.")
        if len(stations) < 4:
            self.__log.warning("You defined less than 4 stations. I setup the buttons von left to right with your {}"
                               " stations".format(len(stations)))

        # zip stops at the shorter sequence, so at most one station per button
        # channel is registered and surplus channels stay unused
        for ch, station in zip(WegaRadioControl.RADIO_GPIO_CHANNELS, stations):
            self.__stations[ch] = station
            self.__log.info("register callback for '{}' on channel {}".format(station['name'], ch))
            self.__gpioClient.add_input_channel_callback(ch, GPIO.BOTH, self.__on_channel)

    def __set_radio_state(self):
        """
        checks the button states and switches the radio in the correct state
        """
        # NOTE(review): the nesting below was reconstructed from a listing that had
        # lost its indentation - confirm that the 'else' belongs to the test of the
        # 'off' channel and that the status is logged in both cases.
        if self.__gpioClient.channel_state(WegaRadioControl.SWITCH_OFF_GPIO_CHANNEL):
            for ch in self.__stations:
                if self.__gpioClient.channel_state(ch):
                    # the first active station channel wins
                    self.__switch_to_station(ch)
                    break
        else:
            self.__switch_off(WegaRadioControl.SWITCH_OFF_GPIO_CHANNEL)
        self.__mpdClient.log_info()

    def __debounce_buttons(self):
        """
        a helper function to debounce the buttons

        Cancels the pending timer and starts a new one, so the radio state is
        only evaluated 0.2 seconds after the last reported edge.
        """
        self.__timer.cancel()
        self.__timer = Timer(0.2, self.__set_radio_state)
        self.__timer.start()

    def __on_channel(self, channel):
        """
        callback for all channels
        :param channel: the GPIO channel that reported the edge (only logged)
        """
        self.__log.debug("on channel {}".format(channel))
        self.__debounce_buttons()

    def __switch_off(self, _):
        """
        function for the off switch
        :param _: ignored
        """
        self.__log.info("switch off the radio")
        self.__mpdClient.stop()

    def __switch_to_station(self, channel):
        """
        switch to the station with the given GPIO-Channel
        :param channel: the GPIO-Channel
        """
        station = self.__stations[channel]
        self.__log.info("switch to station: {}".format(station['name']))
        self.__mpdClient.play(station['uri'])

    def teardown(self):
        """
        This instance is unusable after this call!
        """
        self.__log.info("teardown")
        self.__timer.cancel()
        self.__mpdClient.teardown()
        self.__gpioClient.teardown()
class MyLogger:
    """
    File-like object that forwards everything written to it to a logger.

    Used to redirect stdout and stderr into the logging system.
    """

    def __init__(self, logger, level):
        """
        :param logger: the logger that receives the written text
        :param level: the logging level used for every message
        """
        self.logger = logger
        self.level = level

    def write(self, message):
        """
        Log the message without trailing whitespace
        :param message: the text written to the stream
        """
        # Only log if there is a message (not just a new line)
        text = message.rstrip()
        if text != "":
            self.logger.log(self.level, text)

    def flush(self):
        """
        Do nothing; nothing is buffered here.

        The method exists because callers of a stream replacement (for example
        print(..., flush=True)) expect the object to have a flush() method.
        """
        pass
def log_mpd_status(music_daemon_client):
    """
    Let the given client write its MPD information to the log
    :param music_daemon_client: the MusicDaemonClient whose log_info() is called
    """
    write_status = music_daemon_client.log_info
    write_status()
def load_config():
    """
    load the YAML config file for the application

    The filename is taken from the required command line option -c/--config.
    Missing 'mpd_host' / 'mpd_port' entries are filled with 'localhost' / 6600.

    :return: the config dictionary
    """
    parser = argparse.ArgumentParser(description="pyWegaRadio")
    parser.add_argument("-c", "--config", help="filename of the application config (YAML)", required=True)
    args = parser.parse_args()

    with open(args.config) as application_config:
        # safe_load instead of yaml.load: yaml.load without an explicit Loader can
        # construct arbitrary Python objects and is rejected by current PyYAML
        # releases; a plain configuration file only needs the safe subset.
        cfg = yaml.safe_load(application_config)

    if cfg is None:
        # an empty YAML document loads as None
        cfg = dict()

    # setting defaults (existing values are kept)
    cfg.setdefault('mpd_host', 'localhost')
    cfg.setdefault('mpd_port', 6600)
    return cfg
if __name__ == '__main__':
    # 'config' and 'log' are deliberately module-level names: other parts of
    # this file read them as globals.
    config = load_config()  # load the config file
    logging.config.dictConfig(config['logging'])  # configure logging
    log = logging.getLogger("daemon")  # get the global "daemon" logger

    # Replace stdout with the "daemon" logger at DEBUG level
    sys.stdout = MyLogger(log, logging.DEBUG)
    # Replace stderr with the "daemon" logger at ERROR level
    sys.stderr = MyLogger(log, logging.ERROR)

    log.info("start...")

    musicDaemonClient = MusicDaemonClient(config['mpd_host'], config['mpd_port'])
    log.info("MPD Version: {}".format(musicDaemonClient.mpd_version()))  # print the version of the daemon
    log_mpd_status(musicDaemonClient)

    radioControl = WegaRadioControl()
    try:
        # the buttons are handled by callbacks; the main thread only writes a
        # periodic heartbeat with the MPD status
        while True:
            time.sleep(300)  # sleep in seconds
            log.info("still alive...")
            log_mpd_status(musicDaemonClient)
    except KeyboardInterrupt:
        log.info("KeyboardInterrupt: Ctrl+c")  # on Ctrl+C
    finally:
        # run the teardown on every exit path (also when the heartbeat raised),
        # so the timer is cancelled and the GPIO channels are released
        radioControl.teardown()
        log.info("exit...")