from typing import Any, Dict, List, Union

import numpy as np
import sapien
import torch

from mani_skill import ASSET_DIR
from mani_skill.agents.robots.fetch.fetch import Fetch
from mani_skill.agents.robots.panda.panda import Panda
from mani_skill.agents.robots.panda.panda_wristcam import PandaWristCam
from mani_skill.agents.robots.xmate3.xmate3 import Xmate3Robotiq
from mani_skill.envs.sapien_env import BaseEnv
from mani_skill.envs.utils.randomization.pose import random_quaternions
from mani_skill.sensors.camera import CameraConfig
from mani_skill.utils import common, sapien_utils
from mani_skill.utils.building import actors
from mani_skill.utils.io_utils import load_json
from mani_skill.utils.registration import register_env
from mani_skill.utils.scene_builder.table import TableSceneBuilder
from mani_skill.utils.structs.actor import Actor
from mani_skill.utils.structs.pose import Pose
from mani_skill.utils.structs.types import GPUMemoryConfig, SimConfig
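
# module-level flag so the model-sampling warning in _load_scene is printed
# only once per process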
WARNED_ONCE = False
@register_env("PickSingleYCB-v1", max_episode_steps=50, asset_download_ids=["ycb"])
class PickSingleYCBEnv(BaseEnv):
"""
**Task Description:**
Pick up a random object sampled from the [YCB dataset](https://www.ycbbenchmarks.com/) and move it to a random goal position
**Randomizations:**
- the object's xy position is randomized on top of a table in the region [0.1, 0.1] x [-0.1, -0.1]. It is placed flat on the table
- the object's z-axis rotation is randomized
- the object geometry is randomized by randomly sampling any YCB object. (during reconfiguration)
**Success Conditions:**
- the object position is within goal_thresh (default 0.025) euclidean distance of the goal position
- the robot is static (q velocity < 0.2)
**Goal Specification:**
- 3D goal position (also visualized in human renders)
**Additional Notes**
- On GPU simulation, in order to collect data from every possible object in the YCB database we recommend using at least 128 parallel environments or more, otherwise you will need to reconfigure in order to sample new objects.
"""
_sample_video_link = "https://github.com/haosulab/ManiSkill/raw/main/figures/environment_demos/PickSingleYCB-v1_rt.mp4"
SUPPORTED_ROBOTS = ["panda", "panda_wristcam", "fetch"]
agent: Union[Panda, PandaWristCam, Fetch]
goal_thresh = 0.025
def __init__(
self,
*args,
robot_uids="panda_wristcam",
robot_init_qpos_noise=0.02,
num_envs=1,
reconfiguration_freq=None,
**kwargs,
):
self.robot_init_qpos_noise = robot_init_qpos_noise
self.model_id = None
self.all_model_ids = np.array(
[
k
for k in load_json(
ASSET_DIR / "assets/mani_skill2_ycb/info_pick_v0.json"
).keys()
if k
not in [
"022_windex_bottle",
"028_skillet_lid",
"029_plate",
"059_chain",
] # NOTE (arth): ignore these non-graspable/hard to grasp ycb objects
]
)
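        # by default, reconfigure on every reset when simulating a single
        # environment (so each episode can sample a new YCB object) and never
        # automatically reconfigure when simulating parallel environments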
        if reconfiguration_freq is None:
            if num_envs == 1:
                reconfiguration_freq = 1
            else:
                reconfiguration_freq = 0
        super().__init__(
            *args,
            robot_uids=robot_uids,
            reconfiguration_freq=reconfiguration_freq,
            num_envs=num_envs,
            **kwargs,
        )

    @property
    def _default_sim_config(self):
        return SimConfig(
            gpu_memory_config=GPUMemoryConfig(
                max_rigid_contact_count=2**20, max_rigid_patch_count=2**19
            )
        )

    @property
    def _default_sensor_configs(self):
        pose = sapien_utils.look_at(eye=[0.3, 0, 0.6], target=[-0.1, 0, 0.1])
        return [CameraConfig("base_camera", pose, 128, 128, np.pi / 2, 0.01, 100)]

    @property
    def _default_human_render_camera_configs(self):
        pose = sapien_utils.look_at([0.6, 0.7, 0.6], [0.0, 0.0, 0.35])
        return CameraConfig("render_camera", pose, 512, 512, 1, 0.01, 100)

    def _load_agent(self, options: dict):
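        # spawn the robot root 0.615m behind the world origin; the same root
        # pose is set again in _initialize_episode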
        super()._load_agent(options, sapien.Pose(p=[-0.615, 0, 0]))

    def _load_scene(self, options: dict):
        global WARNED_ONCE
        self.table_scene = TableSceneBuilder(
            env=self, robot_init_qpos_noise=self.robot_init_qpos_noise
        )
        self.table_scene.build()

        # randomize the list of all possible models in the YCB dataset
        # then sub-scene i will load model model_ids[i % number_of_ycb_objects]
        model_ids = self._batched_episode_rng.choice(self.all_model_ids, replace=True)
        if (
            self.num_envs > 1
            and self.num_envs < len(self.all_model_ids)
            and self.reconfiguration_freq <= 0
            and not WARNED_ONCE
        ):
            WARNED_ONCE = True
            print(
                """There are fewer parallel environments than available models to sample from.
                Not all models will be used during interaction even after resets unless you call env.reset(options=dict(reconfigure=True))
                or set reconfiguration_freq to be >= 1."""
            )

        self._objs: List[Actor] = []
        self.obj_heights = []
        for i, model_id in enumerate(model_ids):
            # TODO: before official release we will finalize a metadata dataclass that these build functions should return.
            builder = actors.get_actor_builder(
                self.scene,
                id=f"ycb:{model_id}",
            )
            builder.initial_pose = sapien.Pose(p=[0, 0, 0])
            builder.set_scene_idxs([i])
            self._objs.append(builder.build(name=f"{model_id}-{i}"))
            self.remove_from_state_dict_registry(self._objs[-1])
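        # merge the per-sub-scene actors into a single Actor view so the
        # heterogeneous objects can be accessed through one batched interface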
        self.obj = Actor.merge(self._objs, name="ycb_object")
        self.add_to_state_dict_registry(self.obj)

        self.goal_site = actors.build_sphere(
            self.scene,
            radius=self.goal_thresh,
            color=[0, 1, 0, 1],
            name="goal_site",
            body_type="kinematic",
            add_collision=False,
            initial_pose=sapien.Pose(),
        )
        self._hidden_objects.append(self.goal_site)

    def _after_reconfigure(self, options: dict):
        self.object_zs = []
        for obj in self._objs:
            collision_mesh = obj.get_first_collision_mesh()
            # this value is used to set object pose so the bottom is at z=0
            self.object_zs.append(-collision_mesh.bounding_box.bounds[0, 2])
        self.object_zs = common.to_tensor(self.object_zs, device=self.device)

    def _initialize_episode(self, env_idx: torch.Tensor, options: dict):
        with torch.device(self.device):
            b = len(env_idx)
            self.table_scene.initialize(env_idx)
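            # sample the object's xy position uniformly in [-0.1, 0.1]^2 and
            # place it flat on the table (mesh bottom at z=0), with a random
            # rotation about the z-axis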
            xyz = torch.zeros((b, 3))
            xyz[:, :2] = torch.rand((b, 2)) * 0.2 - 0.1
            xyz[:, 2] = self.object_zs[env_idx]
            qs = random_quaternions(b, lock_x=True, lock_y=True)
            self.obj.set_pose(Pose.create_from_pq(p=xyz, q=qs))
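            # sample a goal position in the same xy region, 0 to 0.3m above
            # the object's spawn height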
            goal_xyz = torch.zeros((b, 3))
            goal_xyz[:, :2] = torch.rand((b, 2)) * 0.2 - 0.1
            goal_xyz[:, 2] = torch.rand((b)) * 0.3 + xyz[:, 2]
            self.goal_site.set_pose(Pose.create_from_pq(goal_xyz))

            # initialize the robot arm to a position higher above the table
            # than the default used for other tabletop tasks
            if self.robot_uids == "panda" or self.robot_uids == "panda_wristcam":
                # fmt: off
                qpos = np.array(
                    [0.0, 0, 0, -np.pi * 2 / 3, 0, np.pi * 2 / 3, np.pi / 4, 0.04, 0.04]
                )
                # fmt: on
                qpos[:-2] += self._episode_rng.normal(
                    0, self.robot_init_qpos_noise, len(qpos) - 2
                )
                self.agent.reset(qpos)
                self.agent.robot.set_root_pose(sapien.Pose([-0.615, 0, 0]))
            elif self.robot_uids == "xmate3_robotiq":
                qpos = np.array([0, 0.6, 0, 1.3, 0, 1.3, -1.57, 0, 0])
                qpos[:-2] += self._episode_rng.normal(
                    0, self.robot_init_qpos_noise, len(qpos) - 2
                )
                self.agent.reset(qpos)
                self.agent.robot.set_root_pose(sapien.Pose([-0.562, 0, 0]))
            else:
                raise NotImplementedError(self.robot_uids)

    def evaluate(self):
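        # success requires the object to be within goal_thresh of the goal and
        # the robot to be static; grasping is reported for reward shaping and
        # observations but is not itself required for success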
        obj_to_goal_pos = self.goal_site.pose.p - self.obj.pose.p
        is_obj_placed = torch.linalg.norm(obj_to_goal_pos, axis=1) <= self.goal_thresh
        is_grasped = self.agent.is_grasping(self.obj)
        is_robot_static = self.agent.is_static(0.2)
        return dict(
            is_grasped=is_grasped,
            obj_to_goal_pos=obj_to_goal_pos,
            is_obj_placed=is_obj_placed,
            is_robot_static=is_robot_static,
            # duplicate of is_grasped, kept for compatibility; reusing the
            # value avoids a second grasp check
            is_grasping=is_grasped,
            success=torch.logical_and(is_obj_placed, is_robot_static),
        )

    def _get_obs_extra(self, info: Dict):
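        # always expose the tcp pose, goal position, and grasp flag; add
        # privileged object poses only when the observation mode includes state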
        obs = dict(
            tcp_pose=self.agent.tcp.pose.raw_pose,
            goal_pos=self.goal_site.pose.p,
            is_grasped=info["is_grasped"],
        )
        if "state" in self.obs_mode:
            obs.update(
                tcp_to_goal_pos=self.goal_site.pose.p - self.agent.tcp.pose.p,
                obj_pose=self.obj.pose.raw_pose,
                tcp_to_obj_pos=self.obj.pose.p - self.agent.tcp.pose.p,
                obj_to_goal_pos=self.goal_site.pose.p - self.obj.pose.p,
            )
        return obs

    def compute_dense_reward(self, obs: Any, action: torch.Tensor, info: Dict):
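        # staged dense reward: reach the object, grasp it, carry it to the
        # goal, then hold the arm still; the total reward saturates at 6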
        tcp_to_obj_dist = torch.linalg.norm(
            self.obj.pose.p - self.agent.tcp.pose.p, axis=1
        )
        reaching_reward = 1 - torch.tanh(5 * tcp_to_obj_dist)
        reward = reaching_reward

        is_grasped = info["is_grasped"]
        reward += is_grasped

        obj_to_goal_dist = torch.linalg.norm(
            self.goal_site.pose.p - self.obj.pose.p, axis=1
        )
        place_reward = 1 - torch.tanh(5 * obj_to_goal_dist)
        reward += place_reward * is_grasped
        reward += info["is_obj_placed"] * is_grasped
        static_reward = 1 - torch.tanh(
            5 * torch.linalg.norm(self.agent.robot.get_qvel()[..., :-2], axis=1)
        )
        reward += static_reward * info["is_obj_placed"] * is_grasped

        reward[info["success"]] = 6
        return reward

    def compute_normalized_dense_reward(
        self, obs: Any, action: torch.Tensor, info: Dict
    ):
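        # the dense reward above saturates at 6 on success, so dividing by 6
        # normalizes it to [0, 1]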
        return self.compute_dense_reward(obs=obs, action=action, info=info) / 6
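

# ---------------------------------------------------------------------------
# Minimal usage sketch (not part of the upstream file): one way to create and
# step the registered environment through gymnasium. It assumes mani_skill and
# gymnasium are installed and that the YCB assets have been downloaded (e.g.
# with `python -m mani_skill.utils.download_asset ycb`).
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import gymnasium as gym

    # the @register_env decorator above has already registered PickSingleYCB-v1
    env = gym.make("PickSingleYCB-v1", num_envs=4, obs_mode="state")
    obs, _ = env.reset(seed=0)
    for _ in range(10):
        # random actions just to demonstrate the step loop
        action = env.action_space.sample()
        obs, reward, terminated, truncated, info = env.step(action)
    env.close()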