-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrecorder.py
67 lines (54 loc) · 2.15 KB
/
recorder.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
import os
import gzip
from datetime import datetime
import numpy as np
import cv2
import os
class RecorderData(object):
    """Persist sensor readings under a single output directory.

    Three kinds of sources can be attached:

    * a camera (``subscribeCamera``) whose frames are written as PNG files
      into a ``webcam`` sub-directory,
    * a lidar (``subScribeLidar``) whose readings are written as ``.ply``
      files into a sub-directory named after ``lidar.NAME``,
    * any number of generic objects (``subscribe``) whose readings are
      appended with ``np.save`` to one file per object.

    Every source must expose ``getData()``; lidar and generic sources must
    also expose a ``NAME`` attribute.
    """

    def __init__(self, dir_name):
        """Create a recorder that writes below ``dir_name``.

        Args:
            dir_name: Base output directory. It is not created here.
        """
        self.directory = dir_name
        self.objects = []
        self.inputFiles = {}
        self.camera = None
        self.cameraDir = None
        # ``record`` checks ``self.lidar`` unconditionally, so it must exist
        # even when no lidar is ever subscribed.
        self.lidar = None
        self.lidarDir = None

    def subScribeLidar(self, lidar):
        """Attach the lidar source and create its output directory.

        Args:
            lidar: Object exposing ``NAME`` and ``getData()``. The value
                returned by ``getData()`` must offer ``save_to_disk(path)``.
        """
        self.lidar = lidar
        self.lidarDir = os.path.join(self.directory, lidar.NAME)
        os.makedirs(self.lidarDir, exist_ok=True)

    def subscribeCamera(self, obj):
        """Attach the camera source and create the ``webcam`` directory.

        Args:
            obj: Object exposing ``getData()``; the returned value is handed
                to ``cv2.imwrite``.
        """
        self.camera = obj
        self.cameraDir = os.path.join(self.directory, "webcam")
        os.makedirs(self.cameraDir, exist_ok=True)

    def subscribe(self, obj, compressedMode):
        """Attach a generic source and open its output file.

        Args:
            obj: Object exposing ``NAME`` and ``getData()``.
            compressedMode: When truthy the file is ``<NAME>.npz`` written
                through ``gzip.GzipFile``; otherwise it is ``<NAME>.npy``
                opened in binary append mode.
        """
        self.objects.append(obj)
        if compressedMode:
            # NOTE(review): despite the ".npz" suffix this is a gzip stream
            # of ``np.save`` records, not a NumPy zip archive.
            filename = os.path.join(self.directory, obj.NAME + ".npz")
            self.inputFiles[obj.NAME] = gzip.GzipFile(filename, "w")
        else:
            filename = os.path.join(self.directory, obj.NAME + ".npy")
            self.inputFiles[obj.NAME] = open(filename, "ba+")

    def record(self, timestamp):
        """Poll every attached source once and write whatever it returns.

        Sources that are not attached, or whose ``getData()`` returns
        ``None``, are skipped.

        Args:
            timestamp: Value used (via ``str``) in camera and lidar file
                names and stored alongside each generic reading.
        """
        idxstr = str(timestamp)

        if self.camera is not None:
            image = self.camera.getData()
            # Skip missing frames, mirroring the lidar and generic branches.
            if image is not None:
                cv2.imwrite(
                    os.path.join(self.cameraDir, 'webcam_frame_' + idxstr + '.png'),
                    image,
                )

        if self.lidar is not None:
            data = self.lidar.getData()
            if data is not None:
                filename = os.path.join(self.lidarDir, idxstr + '.ply')
                data.save_to_disk(filename)

        for obj in self.objects:
            data = obj.getData()
            if data is not None:
                # Each call appends one more ``np.save`` record to the
                # already-open handle for this source.
                np.save(
                    self.inputFiles[obj.NAME],
                    {"timestamp": timestamp, "data": data, "currentDateTime": datetime.now()},
                )

    def dispose(self):
        """Close every open output file and drop all source references.

        Safe to call more than once.
        """
        # Close via the handle table rather than ``self.objects`` so that a
        # repeated call (when ``objects`` is already ``None``) does not fail.
        for handle in self.inputFiles.values():
            handle.close()
        self.objects = None
        self.camera = None
        self.lidar = None