-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathpoint2.py
146 lines (134 loc) · 5.72 KB
/
point2.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
#!/usr/bin/env python3
import cv2
import warnings
import numpy as np
import depthai as dai
import pandas as pd
from memory import Memory
mem = Memory()
class Fwd_Collision_Detect():
    '''
    Creates a focussed point cloud that determines any obstacles
    directly in front of the robot and returns the minimum distance
    to the closest one.
    '''
    def __init__(self):
        # Point cloud loop constants - ranges
        # 7 x 100mm horizontal bins, 8 x 100mm vertical bins (mm)
        self.x_bins = pd.interval_range(start = -350, end = 350, periods = 7)
        self.y_bins = pd.interval_range(start = 0, end = 800, periods = 8)
        self.fx = 1.4 # values found by measuring known sized objects at known distances
        self.fy = 1.4 # originally 2.05
        self.pc_width = 640
        self.cx = self.pc_width / 2
        self.pc_height = 480
        self.cy = self.pc_height / 2
        self.pc_max_range = 5000.0  # mm - ignore anything further away
        self.pc_min_range = 200.0   # mm - ignore anything closer
        # Sparse pixel-coordinate grids, built once and reused every frame
        self.column, self.row = np.meshgrid(np.arange(self.pc_width), np.arange(self.pc_height), sparse=True)

    def record_min_dist(self, depth_image) -> float:
        '''
        Distills the point cloud down to a single value that is the distance
        to the nearest obstacle that is directly in front of the robot.

        depth_image: 480x640 depth frame with values in millimetres.
        Returns the distance in metres (also stored in short-term memory
        under "forward"), or None when nothing valid is in view.
        '''
        # Ignore points too close or too far away
        valid = (depth_image > self.pc_min_range) & (depth_image < self.pc_max_range)
        # Calculate the point cloud using simple extrapolation from depth.
        # Invalid points get x = pc_max_range so the in-scope filter below
        # removes them.
        z = np.where(valid, depth_image, 0.0)
        x = np.where(valid, (z * (self.column - self.cx) / self.cx / self.fx) + 120.0, self.pc_max_range)
        y = np.where(valid, 300.0 - (z * (self.row - self.cy) / self.cy / self.fy), self.pc_max_range)  # measured height is 268mm
        cloud = np.column_stack((x.flatten(), y.flatten(), z.flatten()))
        # Remove points that are projected to fall outside the field of view:
        # points below floor level, above 0.8m, or more than 0.35m to either
        # side of the robot are ignored
        in_scope = (cloud[:, 1] < 800) & (cloud[:, 1] > 0) & (cloud[:, 0] < 350) & (cloud[:, 0] > -350)
        in_scope = np.repeat(in_scope, 3).reshape(-1, 3)
        scope = np.where(in_scope, cloud, np.nan)
        scope = scope[~np.isnan(scope).any(axis=1)]
        # Place the points into a set of 10cm square bins
        x_index = pd.cut(scope[:, 0], self.x_bins)
        y_index = pd.cut(scope[:, 1], self.y_bins)
        binned_depths = pd.Series(scope[:, 2])
        # Simplify each bin to a single median value
        totals = binned_depths.groupby([y_index, x_index]).median()
        # Shape the simplified bins into a 2D array (rows = y bins, cols = x bins)
        totals = totals.values.reshape(8, 7)
        # The whole grid is NaN when nothing valid is in view; np.nanmin then
        # emits a RuntimeWarning and returns NaN, so suppress the warning and
        # test for NaN explicitly (NaN is truthy, so `if min_dist:` would
        # wrongly pass it through).
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", category=RuntimeWarning)
            min_dist = float(np.nanmin(totals))
        if np.isnan(min_dist):
            return None
        # Visualise the binned cloud as a false-colour image.  Skip the
        # display when the grid is flat: a zero span would divide by zero.
        im_totals = totals - min_dist
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", category=RuntimeWarning)
            span = float(np.nanmax(im_totals))
        if span > 0.0:
            disp = (im_totals / span * 255.0).astype(np.uint8)
            disp = cv2.applyColorMap(disp, cv2.COLORMAP_HOT)
            flipv = cv2.flip(disp, 0)
            resized = cv2.resize(flipv, (350, 400), interpolation=cv2.INTER_AREA)
            cv2.imshow("Resized point cloud image", resized)
        # Inject the result into the short-term memory of the robot
        min_dist = float(min_dist / 1000.0)  # mm -> m
        mem.storeState("forward", min_dist)
        return min_dist
def getDepthFrame(frame):
    '''
    Convert a raw depth frame (millimetre values) into a false-colour
    BGR image for on-screen display.

    frame: 2D numpy array of depth values.
    Returns a BGR uint8 image of the same height/width.
    '''
    # Shift so the nearest point maps to zero
    shifted = frame - np.amin(frame)
    mean = np.mean(shifted)
    if mean == 0:
        # Uniform frame: the scale below would divide by zero
        scaled = np.zeros_like(shifted, dtype=np.uint8)
    else:
        # Scale so the mean depth maps to mid-brightness; clip before the
        # uint8 cast, otherwise values above 2x the mean wrap around
        scaled = np.clip(shifted / mean * 128.0, 0, 255).astype(np.uint8)
    return cv2.applyColorMap(scaled, cv2.COLORMAP_HOT)
# --- Script entry: build the DepthAI stereo-depth pipeline and stream frames ---
print("Creating stereo depth pipeline")
pipeline = dai.Pipeline()
# Two mono cameras feed the on-device stereo depth node
camLeft = pipeline.create(dai.node.MonoCamera)
camRight = pipeline.create(dai.node.MonoCamera)
stereo = pipeline.create(dai.node.StereoDepth)
# XLinkOut streams the computed depth frames back to the host
xoutDepth = pipeline.create(dai.node.XLinkOut)
camLeft.setBoardSocket(dai.CameraBoardSocket.LEFT)
camRight.setBoardSocket(dai.CameraBoardSocket.RIGHT)
# 480p matches the 640x480 grid assumed by Fwd_Collision_Detect
camLeft.setResolution(dai.MonoCameraProperties.SensorResolution.THE_480_P)
camRight.setResolution(dai.MonoCameraProperties.SensorResolution.THE_480_P)
stereo.setDefaultProfilePreset(dai.node.StereoDepth.PresetMode.HIGH_ACCURACY)
# 7x7 median filter smooths depth noise on-device
stereo.initialConfig.setMedianFilter(dai.StereoDepthProperties.MedianFilter.KERNEL_7x7)
stereo.setRectifyEdgeFillColor(0)
stereo.setLeftRightCheck(True)
stereo.setExtendedDisparity(False)
stereo.setSubpixel(True)
xoutDepth.setStreamName("depth")
# Wire the cameras into the stereo node and the depth output to the host link
camLeft.out.link(stereo.left)
camRight.out.link(stereo.right)
stereo.depth.link(xoutDepth.input)
print("Creating device")
device = dai.Device()
fc = Fwd_Collision_Detect()
with device:
    print("Starting pipeline")
    device.startPipeline(pipeline)
    print("Pipeline started")
    # Non-blocking queue, keep only the 3 latest frames to avoid lag
    qDepth = device.getOutputQueue(name = "depth", maxSize = 3, blocking = False)
    print("Ready to process images")
    while True:
        # Depth frame in millimetres as a numpy array
        frame = qDepth.get().getCvFrame()
        print("Min dist:",fc.record_min_dist(frame))
        # print(np.shape(frame)) # 480, 640
        im_frame = getDepthFrame(frame)
        cv2.imshow("False Depth Image", im_frame)
        # Press 'q' in the window to quit
        if cv2.waitKey(1) == ord("q"):
            break