-
Notifications
You must be signed in to change notification settings - Fork 377
/
Copy pathopenscenario_configuration.py
420 lines (345 loc) · 17.5 KB
/
openscenario_configuration.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
#!/usr/bin/env python
# Copyright (c) 2019 Intel Corporation
#
# This work is licensed under the terms of the MIT license.
# For a copy, see <https://opensource.org/licenses/MIT>.
"""
This module provides the key configuration parameters for a scenario based on OpenSCENARIO
"""
import logging
import os
import xml.etree.ElementTree as ET
import xmlschema
import carla
# pylint: disable=line-too-long
from srunner.scenarioconfigs.scenario_configuration import ActorConfigurationData, ScenarioConfiguration
# pylint: enable=line-too-long
from srunner.scenariomanager.carla_data_provider import CarlaDataProvider # workaround
from srunner.tools.openscenario_parser import OpenScenarioParser
class OpenScenarioConfiguration(ScenarioConfiguration):

    """
    Scenario configuration that is read from an OpenSCENARIO 1.0 (.xosc) file.

    Constructing an instance validates the file against the XSD, resolves the
    parameter references, loads the referenced catalogs, makes sure the CARLA
    world matches the requested road network and collects the actors together
    with their initial speed and transform.

    Limitations:
    - Only one Story + Init is supported per Storyboard
    """

    # Parsed XSD, shared by all instances so that the schema file is read only once
    # instead of once per scenario and once more per catalog.
    _xsd_schema = None

    def __init__(self, filename, client, custom_params):
        """
        Parse and validate the OpenSCENARIO file.

        Args:
            filename: Path of the .xosc file.
            client: CARLA client; its get_world(), load_world() and
                generate_opendrive_world() methods are used.
            custom_params: Mapping handed to OpenScenarioParser.set_parameters()
                together with the XML tree. None is treated as an empty dict.

        Raises:
            AttributeError: If the OpenSCENARIO version, the road network, an actor
                position/speed or the overall result is invalid.
            Any exception raised by the XML parser or by the XSD validation.
        """
        self.xml_tree = ET.parse(filename)
        self.filename = filename
        self._custom_params = custom_params if custom_params is not None else {}

        self._validate_openscenario_configuration()
        self.client = client

        self.catalogs = {}

        self.other_actors = []
        self.ego_vehicles = []
        self.trigger_points = []
        self.weather = carla.WeatherParameters()

        self.storyboard = self.xml_tree.find("Storyboard")
        self.stories = self.storyboard.findall("Story")
        self.init = self.storyboard.find("Init")

        logging.basicConfig()
        self.logger = logging.getLogger("[SR:OpenScenarioConfiguration]")

        self._global_parameters = {}

        self._set_parameters()
        self._parse_openscenario_configuration()

    @classmethod
    def _get_xsd_schema(cls):
        """
        Return the OpenSCENARIO 1.0 XSD schema, loading it on first use.

        The schema is stored on OpenScenarioConfiguration itself, hence subclasses
        and all instances share the same parsed object.
        """
        if OpenScenarioConfiguration._xsd_schema is None:
            xsd_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), "../openscenario/OpenSCENARIO.xsd")
            OpenScenarioConfiguration._xsd_schema = xmlschema.XMLSchema(xsd_file)
        return OpenScenarioConfiguration._xsd_schema

    @staticmethod
    def _strip_opendrive_prolog(content):
        """
        Return content starting at the literal '<OpenDRIVE>' tag.

        If the tag is not present (e.g. because the root element carries attributes)
        the content is returned unchanged. Slicing with the -1 returned by find()
        would otherwise keep only the last character.
        """
        index = content.find('<OpenDRIVE>')
        if index < 0:
            return content
        return content[index:]

    def _validate_openscenario_configuration(self):
        """
        Validate the given OpenSCENARIO config against the 1.0 XSD

        Note: This will throw if the config is not valid. But this is fine here.
        """
        self._get_xsd_schema().validate(self.xml_tree)

    def _validate_openscenario_catalog_configuration(self, catalog_xml_tree):
        """
        Validate the given OpenSCENARIO catalog config against the 1.0 XSD

        Note: This will throw if the catalog config is not valid. But this is fine here.
        """
        self._get_xsd_schema().validate(catalog_xml_tree)

    def _parse_openscenario_configuration(self):
        """
        Parse the given OpenSCENARIO config file, set and validate parameters

        The order matters: catalogs have to be loaded before the actors are
        extracted, and the world has to be set before actor transforms are resolved.
        """
        OpenScenarioParser.set_osc_filepath(os.path.dirname(self.filename))

        self._check_version()
        self._load_catalogs()

        self._set_scenario_name()
        self._set_carla_town()
        self._set_actor_information()

        self._validate_result()

    def _check_version(self):
        """
        Ensure correct OpenSCENARIO version is used

        Raises:
            AttributeError: If the FileHeader does not state revMajor="1" and revMinor="0".
        """
        header = self.xml_tree.find("FileHeader")
        if not (header.attrib.get('revMajor') == "1" and header.attrib.get('revMinor') == "0"):
            raise AttributeError("Only OpenSCENARIO 1.0 is supported")

    def _load_catalogs(self):
        """
        Read Catalog xml files into dictionary for later use

        The result is stored in self.catalogs as {catalog name: {entry name: XML element}}.
        A catalog whose file cannot be found is skipped with a warning.

        NOTE: Catalogs must have distinct names, even across different types
        """
        catalogs = self.xml_tree.find("CatalogLocations")
        # find() returns None if the element is missing; an Element must be compared
        # with "is None" as an empty (childless) element is falsy.
        if catalogs is None:
            return

        catalog_types = ["Vehicle",
                         "Controller",
                         "Pedestrian",
                         "MiscObject",
                         "Environment",
                         "Maneuver",
                         "Trajectory",
                         "Route"]
        for catalog_type in catalog_types:
            catalog = catalogs.find(catalog_type + "Catalog")
            if catalog is None:
                continue

            catalog_path = catalog.find("Directory").attrib.get('path') + "/" + catalog_type + "Catalog.xosc"
            # Relative catalog paths are interpreted relative to the scenario file
            if not os.path.isabs(catalog_path) and "xosc" in self.filename:
                catalog_path = os.path.dirname(os.path.abspath(self.filename)) + "/" + catalog_path

            if not os.path.isfile(catalog_path):
                self.logger.warning(" The %s path for the %s Catalog is invalid", catalog_path, catalog_type)
                continue

            xml_tree = ET.parse(catalog_path)
            self._validate_openscenario_catalog_configuration(xml_tree)
            catalog = xml_tree.find("Catalog")
            entries = {}
            self.catalogs[catalog.attrib.get("name")] = entries
            for entry in catalog:
                entries[entry.attrib.get("name")] = entry

    def _set_scenario_name(self):
        """
        Extract the scenario name from the OpenSCENARIO header information

        A description starting with "CARLA:" has this prefix removed and switches
        the parser to the CARLA coordinate system.
        """
        header = self.xml_tree.find("FileHeader")
        self.name = header.attrib.get('description', 'Unknown')

        if self.name.startswith("CARLA:"):
            self.name = self.name[6:]
            OpenScenarioParser.set_use_carla_coordinate_system()

    def _set_carla_town(self):
        """
        Extract the CARLA town (level) from the RoadNetwork information from OpenSCENARIO

        Afterwards the CARLA world is reloaded if its map differs from the requested
        one, and the resulting world is handed to the CarlaDataProvider.

        Note: The specification allows multiple Logics elements within the RoadNetwork element.
              Hence, there can be multiple towns specified. We just use the _last_ one.

        Raises:
            AttributeError: If no town is specified or the referenced .xodr file does not exist.
        """
        for logic in self.xml_tree.find("RoadNetwork").findall("LogicFile"):
            self.town = logic.attrib.get('filepath', None)

        # Fail early with the same error _validate_result() would report, instead of
        # running into a TypeError/unset attribute below or loading an unnamed world.
        if not getattr(self, "town", None):
            raise AttributeError("CARLA level not defined")

        if ".xodr" in self.town:
            if not os.path.isabs(self.town):
                self.town = os.path.dirname(os.path.abspath(self.filename)) + "/" + self.town
            if not os.path.exists(self.town):
                raise AttributeError("The provided RoadNetwork '{}' does not exist".format(self.town))

        # workaround for relative positions during init
        world = self.client.get_world()
        wmap = None
        if world:
            # NOTE(review): the return value is unused; presumably this call only
            # checks that the server answers - confirm before removing.
            world.get_settings()
            wmap = world.get_map()

        if world is None or (wmap is not None and wmap.name != self.town):
            if ".xodr" in self.town:
                with open(self.town) as od_file:
                    data = self._strip_opendrive_prolog(od_file.read())

                old_map = ""
                if wmap is not None:
                    old_map = self._strip_opendrive_prolog(wmap.to_opendrive())

                # Only regenerate the world if the OpenDRIVE content really differs
                if data != old_map:
                    self.logger.warning(" Wrong OpenDRIVE map in use. Forcing reload of CARLA world")

                    vertex_distance = 2.0  # in meters
                    wall_height = 1.0      # in meters
                    extra_width = 0.6      # in meters
                    world = self.client.generate_opendrive_world(str(data),
                                                                 carla.OpendriveGenerationParameters(
                                                                     vertex_distance=vertex_distance,
                                                                     wall_height=wall_height,
                                                                     additional_width=extra_width,
                                                                     smooth_junctions=True,
                                                                     enable_mesh_visibility=True))
            else:
                self.logger.warning(" Wrong map in use. Forcing reload of CARLA world")
                self.client.load_world(self.town)
                world = self.client.get_world()

            CarlaDataProvider.set_world(world)
            if CarlaDataProvider.is_sync_mode():
                world.tick()
            else:
                world.wait_for_tick()
        else:
            CarlaDataProvider.set_world(world)

    def _set_parameters(self):
        """
        Parse the complete scenario definition file, and replace all parameter references
        with the actual values

        Set _global_parameters.
        """
        self.xml_tree, self._global_parameters = OpenScenarioParser.set_parameters(self.xml_tree, self._custom_params)

        for elem in self.xml_tree.iter():
            if elem.find('ParameterDeclarations') is not None:
                # The return value is not needed: the tree keeps referencing elem itself.
                # Presumably set_parameters() updates the element in place - TODO confirm.
                OpenScenarioParser.set_parameters(elem)

        OpenScenarioParser.set_global_parameters(self._global_parameters)

    def _set_actor_information(self):
        """
        Extract all actors and their corresponding specification

        Every ScenarioObject is turned into an actor configuration, either from the
        referenced catalog entry or from the inline Vehicle/Pedestrian/MiscObject
        definition. Afterwards the initial transform of every actor is resolved.

        NOTE: The rolename property has to be unique!

        Raises:
            AttributeError: If an initial position cannot be resolved, including
                relative positions that reference each other or an unknown entity.
        """
        for entity in self.xml_tree.iter("Entities"):
            for obj in entity.iter("ScenarioObject"):
                rolename = obj.attrib.get('name', 'simulation')
                args = {}
                for prop in obj.iter("Property"):
                    args[prop.get('name')] = prop.get('value')

                actor_controller = list(obj.iter("ObjectController"))

                for catalog_reference in obj.iter("CatalogReference"):
                    entry = OpenScenarioParser.get_catalog_entry(self.catalogs, catalog_reference)
                    if entry.tag == "Vehicle":
                        self._extract_vehicle_information(entry, rolename, entry, args, actor_controller)
                    elif entry.tag == "Pedestrian":
                        self._extract_pedestrian_information(entry, rolename, entry, args, actor_controller)
                    elif entry.tag == "MiscObject":
                        self._extract_misc_information(entry, rolename, entry, args, actor_controller)
                    else:
                        self.logger.debug(
                            " A CatalogReference specifies a reference that is not an Entity. Skipping...")

                for vehicle in obj.iter("Vehicle"):
                    self._extract_vehicle_information(obj, rolename, vehicle, args, actor_controller)

                for pedestrian in obj.iter("Pedestrian"):
                    self._extract_pedestrian_information(obj, rolename, pedestrian, args, actor_controller)

                for misc in obj.iter("MiscObject"):
                    self._extract_misc_information(obj, rolename, misc, args, actor_controller)

        # Set transform for all actors
        # This has to be done in a multi-stage loop to resolve relative position settings
        all_actor_transforms_set = False
        while not all_actor_transforms_set:
            all_actor_transforms_set = True
            progress_made = False
            last_error = None
            for actor in self.other_actors + self.ego_vehicles:
                if actor.transform is not None:
                    continue
                try:
                    actor.transform = self._get_actor_transform(actor.rolename)
                except AttributeError as error:
                    # Only errors naming a reference object ("Object '<name>' ...") may be
                    # retried later; everything else is a real failure.
                    if "Object '" not in str(error):
                        raise
                    ref_actor_rolename = str(error).split('\'')[1]
                    for ref_actor in self.other_actors + self.ego_vehicles:
                        if ref_actor.rolename == ref_actor_rolename:
                            # The reference already has a transform, so waiting for
                            # another pass cannot help.
                            if ref_actor.transform is not None:
                                raise
                            break
                    last_error = error
                if actor.transform is None:
                    all_actor_transforms_set = False
                else:
                    progress_made = True

            # A pass without any newly resolved transform would repeat forever
            # (circular references or a reference to an entity that is no actor).
            if not all_actor_transforms_set and not progress_made:
                raise AttributeError(
                    "Unable to resolve the initial positions of all actors: {}".format(last_error))

    def _extract_vehicle_information(self, obj, rolename, vehicle, args, actor_controller):
        """
        Helper function to _set_actor_information for getting vehicle information from XML tree

        Args:
            obj: Element whose Property children provide 'type' and 'color'.
            rolename: Name of the scenario object.
            vehicle: Vehicle element providing 'name' (model) and 'vehicleCategory'.
            args: Property dictionary of the scenario object.
            actor_controller: List of ObjectController elements of the scenario object.

        The actor is appended to ego_vehicles if its 'type' property equals
        'ego_vehicle', otherwise to other_actors.
        """
        color = None
        model = vehicle.attrib.get('name', "vehicle.*")
        category = vehicle.attrib.get('vehicleCategory', "car")
        ego_vehicle = False
        for prop in obj.iter("Property"):
            prop_name = prop.get('name', '')
            if prop_name == 'type':
                ego_vehicle = prop.get('value') == 'ego_vehicle'
            if prop_name == 'color':
                color = prop.get('value')

        speed = self._get_actor_speed(rolename)
        new_actor = ActorConfigurationData(
            model, None, rolename, speed, color=color, category=category, args=args, controller=actor_controller)

        if ego_vehicle:
            self.ego_vehicles.append(new_actor)
        else:
            self.other_actors.append(new_actor)

    def _extract_pedestrian_information(self, obj, rolename, pedestrian, args, actor_controller):
        """
        Helper function to _set_actor_information for getting pedestrian information from XML tree

        obj is not used; it is kept so that all _extract_* helpers share one signature.
        The actor is always appended to other_actors.
        """
        model = pedestrian.attrib.get('model', "walker.*")

        speed = self._get_actor_speed(rolename)
        new_actor = ActorConfigurationData(model, None, rolename, speed,
                                           category="pedestrian", args=args, controller=actor_controller)

        self.other_actors.append(new_actor)

    def _extract_misc_information(self, obj, rolename, misc, args, actor_controller):
        """
        Helper function to _set_actor_information for getting misc object information from XML tree

        The categories "barrier" and "guardRail" are mapped to fixed static props,
        any other category uses the 'name' attribute as model. obj is not used; it is
        kept so that all _extract_* helpers share one signature.
        The actor is always appended to other_actors.
        """
        category = misc.attrib.get('miscObjectCategory')
        if category == "barrier":
            model = "static.prop.streetbarrier"
        elif category == "guardRail":
            model = "static.prop.chainbarrier"
        else:
            model = misc.attrib.get('name')
        new_actor = ActorConfigurationData(model, None, rolename, category="misc",
                                           args=args, controller=actor_controller)

        self.other_actors.append(new_actor)

    def _get_actor_transform(self, actor_name):
        """
        Get the initial actor transform provided by the Init section

        Note: - The OpenScenario specification allows multiple definitions. Every valid
                definition overwrites the previous one, i.e. the _last_ one is used
              - The OpenScenario specification allows different ways of specifying a position.
                We currently support the specification with absolute world coordinates and the relative positions
                RelativeWorld, RelativeObject and RelativeLane
              - When using relative positions the relevant reference point (e.g. transform of another actor)
                should be defined before!

        Returns:
            The transform of the actor, or a default carla.Transform() if the Init
            section does not provide a usable position.
        """
        actor_transform = carla.Transform()

        actor_found = False

        for private_action in self.init.iter("Private"):
            if private_action.attrib.get('entityRef', None) != actor_name:
                continue
            if actor_found:
                # pylint: disable=line-too-long
                self.logger.warning(
                    " Warning: The actor '%s' was already assigned an initial position. Overwriting pose!", actor_name)
                # pylint: enable=line-too-long
            actor_found = True
            for position in private_action.iter('Position'):
                transform = OpenScenarioParser.convert_position_to_transform(
                    position, actor_list=self.other_actors + self.ego_vehicles)
                if transform:
                    actor_transform = transform

        if not actor_found:
            # pylint: disable=line-too-long
            self.logger.warning(
                " Warning: The actor '%s' was not assigned an initial position. Using (0,0,0)", actor_name)
            # pylint: enable=line-too-long
        return actor_transform

    def _get_actor_speed(self, actor_name):
        """
        Get the initial actor speed provided by the Init section

        Returns:
            The value of the last AbsoluteTargetSpeed found for the actor, 0 if none is given.

        Raises:
            AttributeError: If a negative speed is specified.
        """
        actor_speed = 0
        actor_found = False

        for private_action in self.init.iter("Private"):
            if private_action.attrib.get('entityRef', None) != actor_name:
                continue
            if actor_found:
                # pylint: disable=line-too-long
                self.logger.warning(
                    " Warning: The actor '%s' was already assigned an initial speed. Overwriting inital speed!", actor_name)
                # pylint: enable=line-too-long
            actor_found = True

            for longitudinal_action in private_action.iter('LongitudinalAction'):
                for speed_action in longitudinal_action.iter('SpeedAction'):
                    for target in speed_action.iter('SpeedActionTarget'):
                        for absolute in target.iter('AbsoluteTargetSpeed'):
                            speed = float(absolute.attrib.get('value', 0))
                            if speed < 0:
                                raise AttributeError(
                                    "Speed value of actor {} must not be negative.".format(actor_name))
                            actor_speed = speed
        return actor_speed

    def _validate_result(self):
        """
        Check that the current scenario configuration is valid

        Raises:
            AttributeError: If the scenario name or the town is missing.

        A scenario without ego vehicles only triggers a warning.
        """
        if not self.name:
            raise AttributeError("No scenario name found")

        if not self.town:
            raise AttributeError("CARLA level not defined")

        if not self.ego_vehicles:
            self.logger.warning(" No ego vehicles defined in scenario")