Eclipse SUMO - Simulation of Urban MObility
Connection.cpp
Go to the documentation of this file.
1/****************************************************************************/
2// Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.org/sumo
3// Copyright (C) 2012-2022 German Aerospace Center (DLR) and others.
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// https://www.eclipse.org/legal/epl-2.0/
7// This Source Code may also be made available under the following Secondary
8// Licenses when the conditions for such availability set forth in the Eclipse
9// Public License 2.0 are satisfied: GNU General Public License, version 2
10// or later which is available at
11// https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
12// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
13/****************************************************************************/
21// C++ TraCI client API implementation
22/****************************************************************************/
23#include <config.h>
24
25#include <thread>
26#include <chrono>
27#include <array>
29#include <libsumo/TraCIDefs.h>
30#include "Connection.h"
31
32
33namespace libtraci {
34// ===========================================================================
35// static member initializations
36// ===========================================================================
// Pointer to a Connection, initialised to nullptr; close() deletes it and resets it to nullptr.
// Presumably the connection currently in use - TODO confirm against Connection.h.
37Connection* Connection::myActive = nullptr;
// Map from a string key to a Connection pointer; close() erases the entry stored under myLabel,
// so the key is presumably the connection label - TODO confirm where entries are inserted.
38std::map<const std::string, Connection*> Connection::myConnections;
39
40
41// ===========================================================================
42// member method definitions
43// ===========================================================================
44#ifdef _MSC_VER
45/* Disable "decorated name length exceeded, name was truncated" warnings for the whole file. */
46#pragma warning(disable: 4503)
47#endif
/**
 * @brief Constructor: stores label and process pipe and sets up the socket for host:port.
 *
 * When pipe is not nullptr, a reader thread running Connection::readOutput is started.
 * The loop makes at most numRetries + 1 attempts. On a tcpip::SocketException it prints
 * the failure to std::cout and sleeps one second before the next attempt; on the last
 * attempt it calls close() and rethrows the exception instead.
 *
 * NOTE(review): listing line 55 (the statement inside the try block) is missing from this
 * view; presumably it is the socket connect call - confirm against the original file.
 *
 * @param host        host name handed to the socket
 * @param port        port handed to the socket
 * @param numRetries  number of additional attempts after the first one
 * @param label       stored in myLabel
 * @param pipe        stored in myProcessPipe; may be nullptr
 */
48Connection::Connection(const std::string& host, int port, int numRetries, const std::string& label, FILE* const pipe) :
49 myLabel(label), myProcessPipe(pipe), myProcessReader(nullptr), mySocket(host, port) {
50 if (pipe != nullptr) {
 // echo the output arriving on the pipe in a separate thread
51 myProcessReader = new std::thread(&Connection::readOutput, this);
52 }
53 for (int i = 0; i <= numRetries; i++) {
54 try {
 // leave the retry loop as soon as the (not shown) statement above did not throw
56 break;
57 } catch (tcpip::SocketException& e) {
58 if (i == numRetries) {
 // last attempt failed: clean up and pass the exception on to the caller
59 close();
60 throw;
61 }
62 std::cout << "Could not connect to TraCI server at " << host << ":" << port << " " << e.what() << std::endl;
63 std::cout << " Retrying in 1 second" << std::endl;
64 std::this_thread::sleep_for(std::chrono::seconds(1));
65 }
66 }
67}
68
69
// Reads myProcessPipe with fgets (at most 255 characters per call) until fgets returns nullptr
// and echoes what it reads line by line: lines starting with "Error:" or "Warning:" go to
// std::cerr, as do directly following lines that start with a space; all other lines go to std::cout.
// NOTE(review): the signature (listing line 71) is missing from this view; the constructor
// starts a thread on Connection::readOutput, which is presumably this function.
70void
72 std::array<char, 256> buffer;
 // true while the previously printed line went to std::cerr
73 bool errout = false;
74 while (fgets(buffer.data(), (int)buffer.size(), myProcessPipe) != nullptr) {
75 std::stringstream result;
76 result << buffer.data();
77 std::string line;
78 while (std::getline(result, line)) {
 // an indented line right after an error / warning is treated as its continuation
79 if ((errout && line[0] == ' ') || line.compare(0, 6, "Error:") == 0 || line.compare(0, 8, "Warning:") == 0) {
80 std::cerr << line << std::endl;
81 errout = true;
82 } else {
83 std::cout << line << std::endl;
84 errout = false;
85 }
86 }
87 }
88}
89
90
// Shuts the connection down. Visible steps: sends a two byte command message, checks the
// result state against libsumo::CMD_CLOSE and closes the socket; then, if a reader thread
// exists, joins and deletes it and closes myProcessPipe with pclose / _pclose; finally removes
// the entry for myLabel from myConnections, deletes myActive and resets it to nullptr.
// NOTE(review): listing lines 92, 93 and 98 are missing from this view (signature, the
// statement opening the block that is closed on line 105, and the command id write).
// NOTE(review): the function deletes myActive, not necessarily the object it was called on;
// confirm that callers only close the active connection.
91void
94 tcpip::Storage outMsg;
95 // command length
96 outMsg.writeUnsignedByte(1 + 1);
97 // command id
99 mySocket.sendExact(outMsg);
100
101 tcpip::Storage inMsg;
102 std::string acknowledgement;
 // the command id is checked (ignoreCommandId is false); the acknowledgement text is not used afterwards
103 check_resultState(inMsg, libsumo::CMD_CLOSE, false, &acknowledgement);
104 mySocket.close();
105 }
106 if (myProcessReader != nullptr) {
 // wait for the reader thread to finish before closing the pipe it reads from
107 myProcessReader->join();
108 delete myProcessReader;
109 myProcessReader = nullptr;
110#ifdef WIN32
111 _pclose(myProcessPipe);
112#else
113 pclose(myProcessPipe);
114#endif
115 }
116 myConnections.erase(myLabel);
117 delete myActive;
118 myActive = nullptr;
119}
120
121
// Sends a message consisting of a length byte (10), a command id (write not shown) and the
// double `time`, clears mySubscriptionResults, reads the number of subscription responses and
// dispatches every response to readVariableSubscription or readContextSubscription.
// NOTE(review): listing lines 123, 128, 134, 136, 140 and 141 are missing from this view
// (signature, command id write, the reception / check of inMsg, presumably a second clear,
// and the condition that selects between variable and context responses).
122void
124 tcpip::Storage outMsg;
125 // command length
126 outMsg.writeUnsignedByte(1 + 1 + 8);
127 // command id
129 outMsg.writeDouble(time);
130 // send request message
131 mySocket.sendExact(outMsg);
132
133 tcpip::Storage inMsg;
135 mySubscriptionResults.clear();
137 int numSubs = inMsg.readInt();
138 while (numSubs-- > 0) {
 // the command id is not checked here (ignoreCommandId is true); it is only used for dispatching
139 const int responseID = check_commandGetResult(inMsg, 0, -1, true);
142 readVariableSubscription(responseID, inMsg);
143 } else {
144 readContextSubscription(responseID, inMsg);
145 }
146 }
147}
148
149
// Sends a message consisting of a length byte (6), a command id (write not shown) and the
// int `order` as client index.
// NOTE(review): listing lines 151, 156 and 162 are missing from this view (signature, command
// id write and whatever is done with inMsg, presumably the result state check).
150void
152 tcpip::Storage outMsg;
153 // command length
154 outMsg.writeUnsignedByte(1 + 1 + 4);
155 // command id
157 // client index
158 outMsg.writeInt(order);
159 mySocket.sendExact(outMsg);
160
161 tcpip::Storage inMsg;
163}
164
165
/**
 * @brief Assembles a command message in myOutput.
 *
 * Visible behaviour: throws libsumo::FatalTraCIError("Not connected.") from a guard whose
 * condition is not shown, resets myOutput and computes the message length: 2 bytes, plus 1
 * if varID >= 0, plus 4 + the length of *objID if additionally objID is not nullptr, plus
 * the size of add if add is not nullptr. Lengths above 255 are written as an int enlarged
 * by 4; the object id is written as a string if varID >= 0 and objID is not nullptr.
 *
 * NOTE(review): listing lines 168, 184, 186, 189, 191 and 198 are missing from this view
 * (the guard condition, the length / command id / variable id writes and the handling of add).
 *
 * @param cmdID  command id (its use is on a line not shown)
 * @param varID  variable id; negative values mean that neither variable nor object id are part of the message
 * @param objID  object id, may be nullptr
 * @param add    additional content, may be nullptr
 */
166void
167Connection::createCommand(int cmdID, int varID, const std::string* const objID, tcpip::Storage* add) const {
169 throw libsumo::FatalTraCIError("Not connected.");
170 }
171 myOutput.reset();
172 // command length
173 int length = 1 + 1;
174 if (varID >= 0) {
175 length += 1;
176 if (objID != nullptr) {
 // a string takes 4 bytes on top of its characters
177 length += 4 + (int)objID->length();
178 }
179 }
180 if (add != nullptr) {
181 length += (int)add->size();
182 }
183 if (length <= 255) {
185 } else {
 // long form: the int length field itself adds 4 bytes to the message
187 myOutput.writeInt(length + 4);
188 }
190 if (varID >= 0) {
192 if (objID != nullptr) {
193 myOutput.writeString(*objID);
194 }
195 }
196 // additional values
197 if (add != nullptr) {
199 }
200}
201
202
/**
 * @brief Builds and sends a subscription request and reads the response.
 *
 * A domain of -1 selects a variable subscription, any other value a context subscription
 * (domain and range are then part of the message). If vars consists of the single value -1,
 * defaults are requested: two variables for a non-context vehicle variable subscription, one
 * variable otherwise. Otherwise every entry of vars is written, followed by the storage form
 * of its entry in params if there is one. The message is sent in the long form (a 0 byte
 * followed by an int length). After the result state check the response is read unless vars
 * is empty.
 *
 * NOTE(review): listing lines 206, 223, 224 and 229-233 are missing from this view (the
 * guard condition and the writes of the default variable ids).
 *
 * @throws tcpip::SocketException from a guard whose condition is not shown
 */
203void
204Connection::subscribe(int domID, const std::string& objID, double beginTime, double endTime,
205 int domain, double range, const std::vector<int>& vars, const libsumo::TraCIResults& params) {
207 throw tcpip::SocketException("Socket is not initialised");
208 }
209 const bool isContext = domain != -1;
210 tcpip::Storage outMsg;
211 outMsg.writeUnsignedByte(domID); // command id
212 outMsg.writeDouble(beginTime);
213 outMsg.writeDouble(endTime);
214 outMsg.writeString(objID);
215 if (isContext) {
216 outMsg.writeUnsignedByte(domain);
217 outMsg.writeDouble(range);
218 }
 // a single -1 asks for the default variables of the domain
219 if (vars.size() == 1 && vars.front() == -1) {
220 if (domID == libsumo::CMD_SUBSCRIBE_VEHICLE_VARIABLE && !isContext) {
221 // default for vehicles is edge id and lane position
222 outMsg.writeUnsignedByte(2);
225 } else {
226 // default for detectors is vehicle number, for all others (and contexts) id list
227 outMsg.writeUnsignedByte(1);
228 const bool isDetector = domID == libsumo::CMD_SUBSCRIBE_INDUCTIONLOOP_VARIABLE
234 }
235 } else {
236 outMsg.writeUnsignedByte((int)vars.size());
237 for (const int v : vars) {
238 outMsg.writeUnsignedByte(v);
 // a variable may carry a parameter which directly follows the variable id
239 const auto& paramEntry = params.find(v);
240 if (paramEntry != params.end()) {
241 outMsg.writeStorage(*libsumo::StorageHelper::toStorage(*paramEntry->second));
242 }
243 }
244 }
 // long message form: 0 byte + int length; the 5 accounts for these two fields
245 tcpip::Storage complete;
246 complete.writeUnsignedByte(0);
247 complete.writeInt(5 + (int)outMsg.size());
248 complete.writeStorage(outMsg);
249 // send message
250 mySocket.sendExact(complete);
251
252 tcpip::Storage inMsg;
253 check_resultState(inMsg, domID);
254 if (!vars.empty()) {
255 const int responseID = check_commandGetResult(inMsg, domID);
256 if (isContext) {
257 readContextSubscription(responseID, inMsg);
258 } else {
259 readVariableSubscription(responseID, inMsg);
260 }
261 }
262}
263
264
/**
 * @brief Receives a message into inMsg and validates the status response at its start.
 *
 * Reads length, command id, result type and description. A command id different from
 * `command` raises a libsumo::TraCIException unless ignoreCommandId is set. A
 * std::invalid_argument thrown while reading is converted into a libsumo::TraCIException.
 * Depending on the result type an exception is thrown or, if acknowledgement is not
 * nullptr, an acknowledgement text is stored there; unknown result types always throw.
 * Finally the number of bytes read is compared with the announced length.
 *
 * NOTE(review): the case labels of the switch (listing lines 286, 288 and 290) are missing
 * from this view, so which result type maps to which branch is not visible here.
 *
 * @param inMsg            storage that receives the message
 * @param command          command id the status is expected to refer to
 * @param ignoreCommandId  if true the received command id is not compared with `command`
 * @param acknowledgement  optional output for the acknowledgement text, may be nullptr
 * @throws libsumo::TraCIException on mismatch, error status or wrong length
 */
265void
266Connection::check_resultState(tcpip::Storage& inMsg, int command, bool ignoreCommandId, std::string* acknowledgement) {
267 mySocket.receiveExact(inMsg);
268 int cmdLength;
269 int cmdId;
270 int resultType;
271 int cmdStart;
272 std::string msg;
273 try {
 // remember where the status starts for the length check at the end
274 cmdStart = inMsg.position();
275 cmdLength = inMsg.readUnsignedByte();
276 cmdId = inMsg.readUnsignedByte();
277 if (command != cmdId && !ignoreCommandId) {
278 throw libsumo::TraCIException("#Error: received status response to command: " + toString(cmdId) + " but expected: " + toString(command));
279 }
280 resultType = inMsg.readUnsignedByte();
281 msg = inMsg.readString();
282 } catch (std::invalid_argument&) {
283 throw libsumo::TraCIException("#Error: an exception was thrown while reading result state message");
284 }
285 switch (resultType) {
287 throw libsumo::TraCIException(msg);
289 throw libsumo::TraCIException(".. Sent command is not implemented (" + toString(command) + "), [description: " + msg + "]");
291 if (acknowledgement != nullptr) {
292 (*acknowledgement) = ".. Command acknowledged (" + toString(command) + "), [description: " + msg + "]";
293 }
294 break;
295 default:
296 throw libsumo::TraCIException(".. Answered with unknown result code(" + toString(resultType) + ") to command(" + toString(command) + "), [description: " + msg + "]");
297 }
 // the status must end exactly where its length byte says
298 if ((cmdStart + cmdLength) != (int) inMsg.position()) {
299 throw libsumo::TraCIException("#Error: command at position " + toString(cmdStart) + " has wrong length");
300 }
301}
302
303
304int
305Connection::check_commandGetResult(tcpip::Storage& inMsg, int command, int expectedType, bool ignoreCommandId) const {
306 int length = inMsg.readUnsignedByte();
307 if (length == 0) {
308 length = inMsg.readInt();
309 }
310 int cmdId = inMsg.readUnsignedByte();
311 if (!ignoreCommandId && cmdId != (command + 0x10)) {
312 throw libsumo::TraCIException("#Error: received response with command id: " + toString(cmdId) + "but expected: " + toString(command + 0x10));
313 }
314 if (expectedType >= 0) {
315 // not called from the TraCITestClient but from within the Connection
316 inMsg.readUnsignedByte(); // variableID
317 inMsg.readString(); // objectID
318 int valueDataType = inMsg.readUnsignedByte();
319 if (valueDataType != expectedType) {
320 throw libsumo::TraCIException("Expected " + toString(expectedType) + " but got " + toString(valueDataType));
321 }
322 }
323 return cmdId;
324}
325
326
// Assembles the command via createCommand, resets myInput, checks the result state of the
// response against `command` and returns a reference to myInput (the reusable input storage,
// so the returned content is replaced by the next call that resets myInput).
// NOTE(review): listing lines 327 and 330 are missing from this view (the return type and
// the statement between createCommand and the reset, presumably the send of myOutput).
328Connection::doCommand(int command, int var, const std::string& id, tcpip::Storage* add) {
329 createCommand(command, var, &id, add);
331 myInput.reset();
332 check_resultState(myInput, command);
333 return myInput;
334}
335
336
// NOTE(review): only fragments of this function are visible (listing lines 338-340 and 342
// are missing). The visible part resets myInput; the Doxygen index of this page places
// addFilter(int var, tcpip::Storage* add) at listing line 338, so this is presumably its body.
337void
341 myInput.reset();
343}
344
345
/**
 * @brief Reads variableCount subscription values from inMsg and stores them in into[objectID].
 *
 * Every value consists of a variable id byte, a status byte and a type byte followed by the
 * type specific payload. A status other than libsumo::RTYPE_OK and an unhandled type both
 * raise a libsumo::TraCIException. The payloads handled by the visible code are: a double,
 * a string, a position of two doubles (z is set to 0), a position of three doubles, a color
 * of four bytes, an int, a string list (int count followed by the strings) and a compound.
 * A compound is only evaluated if it has exactly two items: string + double becomes a
 * TraCIRoadPosition, string + string a two element TraCIStringList; for any other compound
 * nothing is stored and no further bytes are consumed.
 *
 * NOTE(review): apart from libsumo::TYPE_COLOR the case labels (listing lines 356, 359,
 * 362, 370, 387, 390 and 399) are missing from this view.
 *
 * @param inMsg          storage positioned at the first value
 * @param objectID       key of the object the values belong to
 * @param variableCount  number of values to read
 * @param into           result container, filled as into[objectID][variableID]
 */
346void
347Connection::readVariables(tcpip::Storage& inMsg, const std::string& objectID, int variableCount, libsumo::SubscriptionResults& into) {
348 while (variableCount > 0) {
349
350 const int variableID = inMsg.readUnsignedByte();
351 const int status = inMsg.readUnsignedByte();
352 const int type = inMsg.readUnsignedByte();
353
354 if (status == libsumo::RTYPE_OK) {
355 switch (type) {
357 into[objectID][variableID] = std::make_shared<libsumo::TraCIDouble>(inMsg.readDouble());
358 break;
360 into[objectID][variableID] = std::make_shared<libsumo::TraCIString>(inMsg.readString());
361 break;
 // two coordinates only, z is filled with 0
363 auto p = std::make_shared<libsumo::TraCIPosition>();
364 p->x = inMsg.readDouble();
365 p->y = inMsg.readDouble();
366 p->z = 0.;
367 into[objectID][variableID] = p;
368 break;
369 }
371 auto p = std::make_shared<libsumo::TraCIPosition>();
372 p->x = inMsg.readDouble();
373 p->y = inMsg.readDouble();
374 p->z = inMsg.readDouble();
375 into[objectID][variableID] = p;
376 break;
377 }
378 case libsumo::TYPE_COLOR: {
379 auto c = std::make_shared<libsumo::TraCIColor>();
380 c->r = (unsigned char)inMsg.readUnsignedByte();
381 c->g = (unsigned char)inMsg.readUnsignedByte();
382 c->b = (unsigned char)inMsg.readUnsignedByte();
383 c->a = (unsigned char)inMsg.readUnsignedByte();
384 into[objectID][variableID] = c;
385 break;
386 }
388 into[objectID][variableID] = std::make_shared<libsumo::TraCIInt>(inMsg.readInt());
389 break;
391 auto sl = std::make_shared<libsumo::TraCIStringList>();
392 int n = inMsg.readInt();
393 for (int i = 0; i < n; ++i) {
394 sl->value.push_back(inMsg.readString());
395 }
396 into[objectID][variableID] = sl;
397 }
398 break;
400 int n = inMsg.readInt();
401 if (n == 2) {
 // the type byte of the first item is skipped, the item is read as a string
402 inMsg.readUnsignedByte();
403 const std::string s = inMsg.readString();
404 const int secondType = inMsg.readUnsignedByte();
405 if (secondType == libsumo::TYPE_DOUBLE) {
406 auto r = std::make_shared<libsumo::TraCIRoadPosition>();
407 r->edgeID = s;
408 r->pos = inMsg.readDouble();
409 into[objectID][variableID] = r;
410 } else if (secondType == libsumo::TYPE_STRING) {
411 auto sl = std::make_shared<libsumo::TraCIStringList>();
412 sl->value.push_back(s);
413 sl->value.push_back(inMsg.readString());
414 into[objectID][variableID] = sl;
415 }
416 }
417 }
418 break;
419
420 // TODO Other data types
421
422 default:
423 throw libsumo::TraCIException("Unimplemented subscription type: " + toString(type));
424 }
425 } else {
426 throw libsumo::TraCIException("Subscription response error: variableID=" + toString(variableID) + " status=" + toString(status));
427 }
428
429 variableCount--;
430 }
431}
432
433
// Reads the object id and the variable count (one byte) from inMsg and lets readVariables
// store the values in mySubscriptionResults[responseID].
// NOTE(review): the signature (listing line 435) is missing from this view; the Doxygen
// index of this page lists readVariableSubscription(int responseID, tcpip::Storage& inMsg) there.
434void
436 const std::string objectID = inMsg.readString();
437 const int variableCount = inMsg.readUnsignedByte();
438 readVariables(inMsg, objectID, variableCount, mySubscriptionResults[responseID]);
439}
440
441
// Reads a context subscription response: context object id, context domain (read and
// discarded), variable count (one byte) and number of objects (int); then reads the values
// of every object via readVariables into myContextSubscriptionResults[responseID][contextID].
// NOTE(review): the signature (listing line 443) is missing from this view; the Doxygen
// index of this page lists readContextSubscription(int responseID, tcpip::Storage& inMsg) there.
442void
444 const std::string contextID = inMsg.readString();
445 inMsg.readUnsignedByte(); // context domain
446 const int variableCount = inMsg.readUnsignedByte();
447 int numObjects = inMsg.readInt();
448 // the following also instantiates the empty map to get comparable results with libsumo
449 // see also https://github.com/eclipse/sumo/issues/7288
450 libsumo::SubscriptionResults& results = myContextSubscriptionResults[responseID][contextID];
451 while (numObjects-- > 0) {
452 std::string objectID = inMsg.readString();
453 readVariables(inMsg, objectID, variableCount, results);
454 }
455}
456
457
458}
459
460
461/****************************************************************************/
An error which is not recoverable.
Definition: TraCIDefs.h:149
static std::shared_ptr< tcpip::Storage > toStorage(const TraCIResult &v)
Definition: StorageHelper.h:33
An error which allows to continue.
Definition: TraCIDefs.h:138
void simulationStep(double time)
Sends a SimulationStep command.
Definition: Connection.cpp:123
Connection(const std::string &host, int port, int numRetries, const std::string &label, FILE *const pipe)
Constructor, connects to the specified SUMO server.
Definition: Connection.cpp:48
void close()
ends the simulation and closes the connection
Definition: Connection.cpp:92
void createCommand(int cmdID, int varID, const std::string *const objID, tcpip::Storage *add=nullptr) const
Sends a GetVariable / SetVariable request if mySocket is connected. Otherwise writes to myOutput only...
Definition: Connection.cpp:167
int check_commandGetResult(tcpip::Storage &inMsg, int command, int expectedType=-1, bool ignoreCommandId=false) const
Checks the header of a command response (command id and, optionally, the value type) and returns the received command id.
Definition: Connection.cpp:305
void addFilter(int var, tcpip::Storage *add=nullptr)
Definition: Connection.cpp:338
void readVariableSubscription(int responseID, tcpip::Storage &inMsg)
Definition: Connection.cpp:435
tcpip::Socket mySocket
The socket.
Definition: Connection.h:167
std::map< int, libsumo::SubscriptionResults > mySubscriptionResults
Definition: Connection.h:173
void check_resultState(tcpip::Storage &inMsg, int command, bool ignoreCommandId=false, std::string *acknowledgement=0)
Validates the result state of a command.
Definition: Connection.cpp:266
tcpip::Storage myInput
The reusable input storage.
Definition: Connection.h:171
FILE *const myProcessPipe
Definition: Connection.h:164
void readVariables(tcpip::Storage &inMsg, const std::string &objectID, int variableCount, libsumo::SubscriptionResults &into)
Definition: Connection.cpp:347
std::map< int, libsumo::ContextSubscriptionResults > myContextSubscriptionResults
Definition: Connection.h:174
tcpip::Storage myOutput
The reusable output storage.
Definition: Connection.h:169
void setOrder(int order)
Sends a SetOrder command.
Definition: Connection.cpp:151
void subscribe(int domID, const std::string &objID, double beginTime, double endTime, int domain, double range, const std::vector< int > &vars, const libsumo::TraCIResults &params)
Sends a SubscribeContext or a SubscribeVariable request.
Definition: Connection.cpp:204
tcpip::Storage & doCommand(int command, int var, const std::string &id, tcpip::Storage *add=nullptr)
Definition: Connection.cpp:328
static std::map< const std::string, Connection * > myConnections
Definition: Connection.h:177
const std::string myLabel
Definition: Connection.h:163
void readContextSubscription(int responseID, tcpip::Storage &inMsg)
Definition: Connection.cpp:443
static Connection * myActive
Definition: Connection.h:176
static std::string toString(const T &t, std::streamsize accuracy=PRECISION)
Definition: Connection.h:145
std::thread * myProcessReader
Definition: Connection.h:165
bool receiveExact(Storage &)
Receive a complete TraCI message from Socket::socket_.
Definition: socket.cpp:536
void sendExact(const Storage &)
Definition: socket.cpp:439
bool has_client_connection() const
Definition: socket.cpp:568
void connect()
Connects to host_:port_.
Definition: socket.cpp:367
void close()
Definition: socket.cpp:391
virtual std::string readString()
Definition: storage.cpp:180
virtual void writeString(const std::string &s)
Definition: storage.cpp:197
virtual unsigned int position() const
Definition: storage.cpp:76
virtual void writeInt(int)
Definition: storage.cpp:321
virtual void writeDouble(double)
Definition: storage.cpp:354
virtual int readUnsignedByte()
Definition: storage.cpp:155
void reset()
Definition: storage.cpp:85
virtual void writeUnsignedByte(int)
Definition: storage.cpp:165
StorageType::size_type size() const
Definition: storage.h:119
virtual void writeStorage(tcpip::Storage &store)
Definition: storage.cpp:388
virtual double readDouble()
Definition: storage.cpp:362
virtual int readInt()
Definition: storage.cpp:311
TRACI_CONST int TYPE_COLOR
TRACI_CONST int LAST_STEP_VEHICLE_NUMBER
TRACI_CONST int POSITION_3D
TRACI_CONST int RTYPE_NOTIMPLEMENTED
TRACI_CONST int TRACI_ID_LIST
TRACI_CONST int VAR_ROAD_ID
TRACI_CONST int TYPE_COMPOUND
TRACI_CONST int RESPONSE_SUBSCRIBE_PARKINGAREA_VARIABLE
TRACI_CONST int RESPONSE_SUBSCRIBE_INDUCTIONLOOP_VARIABLE
TRACI_CONST int POSITION_2D
TRACI_CONST int RESPONSE_SUBSCRIBE_OVERHEADWIRE_VARIABLE
TRACI_CONST int CMD_CLOSE
TRACI_CONST int CMD_SETORDER
TRACI_CONST int TYPE_STRINGLIST
TRACI_CONST int TYPE_INTEGER
TRACI_CONST int RESPONSE_SUBSCRIBE_BUSSTOP_VARIABLE
TRACI_CONST int CMD_ADD_SUBSCRIPTION_FILTER
std::map< std::string, libsumo::TraCIResults > SubscriptionResults
{object->{variable->value}}
Definition: TraCIDefs.h:300
TRACI_CONST int VAR_LANEPOSITION
TRACI_CONST int CMD_SUBSCRIBE_VEHICLE_VARIABLE
TRACI_CONST int TYPE_DOUBLE
TRACI_CONST int CMD_SUBSCRIBE_LANEAREA_VARIABLE
TRACI_CONST int CMD_SUBSCRIBE_INDUCTIONLOOP_VARIABLE
TRACI_CONST int CMD_SUBSCRIBE_MULTIENTRYEXIT_VARIABLE
TRACI_CONST int RTYPE_ERR
TRACI_CONST int CMD_SIMSTEP
TRACI_CONST int CMD_SUBSCRIBE_LANE_VARIABLE
TRACI_CONST int RTYPE_OK
std::map< int, std::shared_ptr< libsumo::TraCIResult > > TraCIResults
{variable->value}
Definition: TraCIDefs.h:298
TRACI_CONST int CMD_SUBSCRIBE_EDGE_VARIABLE
TRACI_CONST int TYPE_STRING