-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathmain.py
86 lines (64 loc) · 2.71 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
#!/usr/bin/python3
# stdlib
import datetime, time, logging
# Self libraries
import linky
# ----------------------------- #
# Setup #
# ----------------------------- #
# Logger registered under the 'linky' name; used by everything below.
log = logging.getLogger('linky')

# Configuration must be loaded first: the serial device and the database
# credentials are both read from it.
log.debug('Loading config...')
config = linky.load_config()
# NOTE(review): this prints the whole config, which appears to include
# config['database']['password'] -- confirm debug logs are not shared.
log.debug(f'Config loaded! Values: {config}')

# Serial handle for the configured device.
terminal = linky.setup_serial(config['device'])

# Verify the database server is reachable before entering the main loop
# (per the original comment, this also creates the schema if it is missing).
db_settings = config['database']
linky.test_db_connection(
    db_settings['server'],
    db_settings['user'],
    db_settings['password'],
    db_settings['name'],
)
# ----------------------------- #
# Main loop #
# ----------------------------- #
def _current_day(use_utc):
    """Return today's day of the month, in UTC if *use_utc* is true, else local time."""
    tz = datetime.timezone.utc if use_utc else None
    return datetime.datetime.now(tz).day


def _parse_field(line, label):
    """Return the integer value carried by a line starting with *label*.

    The value is taken from the second whitespace-separated field.
    Returns None when the line does not start with *label* or when the value
    is missing or not an integer (e.g. a truncated or corrupted line), so the
    caller can simply wait for the next line instead of crashing.
    """
    if not line.startswith(label):
        return None
    fields = line.split()
    try:
        return int(fields[1])
    except (IndexError, ValueError):
        return None


# The setting is read once; nothing in this loop changes it.
use_utc = config.get('use_utc', False)

# Day of the last completed cycle, used to detect the first record of a new day.
previous_loop_day = _current_day(use_utc)

while True:
    log.debug("Cycle begins")
    data_BASE = None
    data_PAPP = None
    current_loop_day = _current_day(use_utc)

    # Now beginning to read data from Linky
    log.debug("Opening terminal...")
    terminal.open()
    try:
        # Read continuously until both values have been seen. The test is
        # `is None` rather than truthiness because 0 is a valid reading and
        # must not be mistaken for "not read yet".
        while data_BASE is None or data_PAPP is None:
            # Undecodable bytes are replaced rather than raised, so one
            # corrupted line cannot kill the script; such a line simply
            # fails to parse below and is skipped.
            line = terminal.readline().decode('ascii', errors='replace')
            log.debug(f"Current line: {line}")

            base_value = _parse_field(line, 'BASE')
            if base_value is not None:
                data_BASE = base_value

            papp_value = _parse_field(line, 'PAPP')
            if papp_value is not None:
                data_PAPP = papp_value

        # We have BASE and PAPP, we can now close the connection
        log.debug(f"Output parsed: BASE={data_BASE}, PAPP={data_PAPP}. Closing terminal.")
    finally:
        # Always release the port, even if reading raised, so the next
        # run (or a restart) can open it again.
        terminal.close()

    # Connecting to database
    # NOTE(review): a new connection is opened every cycle and never closed
    # here -- confirm that linky.insert_stream() / insert_dailies() close it.
    log.debug("Connecting to database")
    db, cr = linky.open_db(
        config['database']['server'],
        config['database']['user'],
        config['database']['password'],
        config['database']['name'],
    )

    # First record of the day? Generate the dailies record.
    if current_loop_day != previous_loop_day:
        log.debug("First record of the day! Inserting dailies record.")
        linky.insert_dailies(config, db, cr, data_BASE)
        # Remember the day that was just handled (the value compared above).
        previous_loop_day = current_loop_day

    # Inserting values
    log.debug("Inserting stream record")
    linky.insert_stream(config, db, cr, data_BASE, data_PAPP)

    log.debug("Cycle ends, sleeping for 60 seconds")
    time.sleep(60)