1 //===-- Communication.cpp ---------------------------------------*- C++ -*-===//
2 //
3 // The LLVM Compiler Infrastructure
4 //
5 // This file is distributed under the University of Illinois Open Source
6 // License. See LICENSE.TXT for details.
7 //
8 //===----------------------------------------------------------------------===//
9
10 // C Includes
11 // C++ Includes
12 // Other libraries and framework includes
13 // Project includes
14 #include "lldb/Core/Communication.h"
15 #include "lldb/Core/Connection.h"
16 #include "lldb/Core/Log.h"
17 #include "lldb/Core/Timer.h"
18 #include "lldb/Core/Event.h"
19 #include "lldb/Host/Host.h"
20 #include "lldb/Host/HostThread.h"
21 #include "lldb/Host/ThreadLauncher.h"
22 #include <string.h>
23
24 using namespace lldb;
25 using namespace lldb_private;
26
27 ConstString &
GetStaticBroadcasterClass()28 Communication::GetStaticBroadcasterClass ()
29 {
30 static ConstString class_name ("lldb.communication");
31 return class_name;
32 }
33
34 //----------------------------------------------------------------------
35 // Constructor
36 //----------------------------------------------------------------------
Communication(const char * name)37 Communication::Communication(const char *name) :
38 Broadcaster (NULL, name),
39 m_connection_sp (),
40 m_read_thread_enabled (false),
41 m_read_thread_did_exit (false),
42 m_bytes(),
43 m_bytes_mutex (Mutex::eMutexTypeRecursive),
44 m_write_mutex (Mutex::eMutexTypeNormal),
45 m_synchronize_mutex (Mutex::eMutexTypeNormal),
46 m_callback (NULL),
47 m_callback_baton (NULL),
48 m_close_on_eof (true)
49
50 {
51 lldb_private::LogIfAnyCategoriesSet (LIBLLDB_LOG_OBJECT | LIBLLDB_LOG_COMMUNICATION,
52 "%p Communication::Communication (name = %s)",
53 this, name);
54
55 SetEventName (eBroadcastBitDisconnected, "disconnected");
56 SetEventName (eBroadcastBitReadThreadGotBytes, "got bytes");
57 SetEventName (eBroadcastBitReadThreadDidExit, "read thread did exit");
58 SetEventName (eBroadcastBitReadThreadShouldExit, "read thread should exit");
59 SetEventName (eBroadcastBitPacketAvailable, "packet available");
60 SetEventName (eBroadcastBitNoMorePendingInput, "no more pending input");
61
62 CheckInWithManager();
63 }
64
65 //----------------------------------------------------------------------
66 // Destructor
67 //----------------------------------------------------------------------
~Communication()68 Communication::~Communication()
69 {
70 lldb_private::LogIfAnyCategoriesSet (LIBLLDB_LOG_OBJECT | LIBLLDB_LOG_COMMUNICATION,
71 "%p Communication::~Communication (name = %s)",
72 this, m_broadcaster_name.AsCString(""));
73 Clear();
74 }
75
76 void
Clear()77 Communication::Clear()
78 {
79 SetReadThreadBytesReceivedCallback (NULL, NULL);
80 Disconnect (NULL);
81 StopReadThread (NULL);
82 }
83
84 ConnectionStatus
Connect(const char * url,Error * error_ptr)85 Communication::Connect (const char *url, Error *error_ptr)
86 {
87 Clear();
88
89 lldb_private::LogIfAnyCategoriesSet (LIBLLDB_LOG_COMMUNICATION, "%p Communication::Connect (url = %s)", this, url);
90
91 lldb::ConnectionSP connection_sp (m_connection_sp);
92 if (connection_sp.get())
93 return connection_sp->Connect (url, error_ptr);
94 if (error_ptr)
95 error_ptr->SetErrorString("Invalid connection.");
96 return eConnectionStatusNoConnection;
97 }
98
99 ConnectionStatus
Disconnect(Error * error_ptr)100 Communication::Disconnect (Error *error_ptr)
101 {
102 lldb_private::LogIfAnyCategoriesSet (LIBLLDB_LOG_COMMUNICATION, "%p Communication::Disconnect ()", this);
103
104 lldb::ConnectionSP connection_sp (m_connection_sp);
105 if (connection_sp.get())
106 {
107 ConnectionStatus status = connection_sp->Disconnect (error_ptr);
108 // We currently don't protect connection_sp with any mutex for
109 // multi-threaded environments. So lets not nuke our connection class
110 // without putting some multi-threaded protections in. We also probably
111 // don't want to pay for the overhead it might cause if every time we
112 // access the connection we have to take a lock.
113 //
114 // This unique pointer will cleanup after itself when this object goes away,
115 // so there is no need to currently have it destroy itself immediately
116 // upon disconnnect.
117 //connection_sp.reset();
118 return status;
119 }
120 return eConnectionStatusNoConnection;
121 }
122
123 bool
IsConnected() const124 Communication::IsConnected () const
125 {
126 lldb::ConnectionSP connection_sp (m_connection_sp);
127 if (connection_sp.get())
128 return connection_sp->IsConnected ();
129 return false;
130 }
131
132 bool
HasConnection() const133 Communication::HasConnection () const
134 {
135 return m_connection_sp.get() != NULL;
136 }
137
138 size_t
Read(void * dst,size_t dst_len,uint32_t timeout_usec,ConnectionStatus & status,Error * error_ptr)139 Communication::Read (void *dst, size_t dst_len, uint32_t timeout_usec, ConnectionStatus &status, Error *error_ptr)
140 {
141 lldb_private::LogIfAnyCategoriesSet (LIBLLDB_LOG_COMMUNICATION,
142 "%p Communication::Read (dst = %p, dst_len = %" PRIu64 ", timeout = %u usec) connection = %p",
143 this,
144 dst,
145 (uint64_t)dst_len,
146 timeout_usec,
147 m_connection_sp.get());
148
149 if (m_read_thread_enabled)
150 {
151 // We have a dedicated read thread that is getting data for us
152 size_t cached_bytes = GetCachedBytes (dst, dst_len);
153 if (cached_bytes > 0 || timeout_usec == 0)
154 {
155 status = eConnectionStatusSuccess;
156 return cached_bytes;
157 }
158
159 if (m_connection_sp.get() == NULL)
160 {
161 if (error_ptr)
162 error_ptr->SetErrorString("Invalid connection.");
163 status = eConnectionStatusNoConnection;
164 return 0;
165 }
166 // Set the timeout appropriately
167 TimeValue timeout_time;
168 if (timeout_usec != UINT32_MAX)
169 {
170 timeout_time = TimeValue::Now();
171 timeout_time.OffsetWithMicroSeconds (timeout_usec);
172 }
173
174 Listener listener ("Communication::Read");
175 listener.StartListeningForEvents (this, eBroadcastBitReadThreadGotBytes | eBroadcastBitReadThreadDidExit);
176 EventSP event_sp;
177 while (listener.WaitForEvent (timeout_time.IsValid() ? &timeout_time : NULL, event_sp))
178 {
179 const uint32_t event_type = event_sp->GetType();
180 if (event_type & eBroadcastBitReadThreadGotBytes)
181 {
182 return GetCachedBytes (dst, dst_len);
183 }
184
185 if (event_type & eBroadcastBitReadThreadDidExit)
186 {
187 if (GetCloseOnEOF ())
188 Disconnect (NULL);
189 break;
190 }
191 }
192 return 0;
193 }
194
195 // We aren't using a read thread, just read the data synchronously in this
196 // thread.
197 lldb::ConnectionSP connection_sp (m_connection_sp);
198 if (connection_sp.get())
199 {
200 return connection_sp->Read (dst, dst_len, timeout_usec, status, error_ptr);
201 }
202
203 if (error_ptr)
204 error_ptr->SetErrorString("Invalid connection.");
205 status = eConnectionStatusNoConnection;
206 return 0;
207 }
208
209
210 size_t
Write(const void * src,size_t src_len,ConnectionStatus & status,Error * error_ptr)211 Communication::Write (const void *src, size_t src_len, ConnectionStatus &status, Error *error_ptr)
212 {
213 lldb::ConnectionSP connection_sp (m_connection_sp);
214
215 Mutex::Locker locker(m_write_mutex);
216 lldb_private::LogIfAnyCategoriesSet (LIBLLDB_LOG_COMMUNICATION,
217 "%p Communication::Write (src = %p, src_len = %" PRIu64 ") connection = %p",
218 this,
219 src,
220 (uint64_t)src_len,
221 connection_sp.get());
222
223 if (connection_sp.get())
224 return connection_sp->Write (src, src_len, status, error_ptr);
225
226 if (error_ptr)
227 error_ptr->SetErrorString("Invalid connection.");
228 status = eConnectionStatusNoConnection;
229 return 0;
230 }
231
232
233 bool
StartReadThread(Error * error_ptr)234 Communication::StartReadThread (Error *error_ptr)
235 {
236 if (error_ptr)
237 error_ptr->Clear();
238
239 if (m_read_thread.IsJoinable())
240 return true;
241
242 lldb_private::LogIfAnyCategoriesSet (LIBLLDB_LOG_COMMUNICATION,
243 "%p Communication::StartReadThread ()", this);
244
245
246 char thread_name[1024];
247 snprintf(thread_name, sizeof(thread_name), "<lldb.comm.%s>", m_broadcaster_name.AsCString());
248
249 m_read_thread_enabled = true;
250 m_read_thread_did_exit = false;
251 m_read_thread = ThreadLauncher::LaunchThread(thread_name, Communication::ReadThread, this, error_ptr);
252 if (!m_read_thread.IsJoinable())
253 m_read_thread_enabled = false;
254 return m_read_thread_enabled;
255 }
256
257 bool
StopReadThread(Error * error_ptr)258 Communication::StopReadThread (Error *error_ptr)
259 {
260 if (!m_read_thread.IsJoinable())
261 return true;
262
263 lldb_private::LogIfAnyCategoriesSet (LIBLLDB_LOG_COMMUNICATION,
264 "%p Communication::StopReadThread ()", this);
265
266 m_read_thread_enabled = false;
267
268 BroadcastEvent (eBroadcastBitReadThreadShouldExit, NULL);
269
270 // error = m_read_thread.Cancel();
271
272 Error error = m_read_thread.Join(nullptr);
273 return error.Success();
274 }
275
276 bool
JoinReadThread(Error * error_ptr)277 Communication::JoinReadThread (Error *error_ptr)
278 {
279 if (!m_read_thread.IsJoinable())
280 return true;
281
282 Error error = m_read_thread.Join(nullptr);
283 return error.Success();
284 }
285
286 size_t
GetCachedBytes(void * dst,size_t dst_len)287 Communication::GetCachedBytes (void *dst, size_t dst_len)
288 {
289 Mutex::Locker locker(m_bytes_mutex);
290 if (m_bytes.size() > 0)
291 {
292 // If DST is NULL and we have a thread, then return the number
293 // of bytes that are available so the caller can call again
294 if (dst == NULL)
295 return m_bytes.size();
296
297 const size_t len = std::min<size_t>(dst_len, m_bytes.size());
298
299 ::memcpy (dst, m_bytes.c_str(), len);
300 m_bytes.erase(m_bytes.begin(), m_bytes.begin() + len);
301
302 return len;
303 }
304 return 0;
305 }
306
307 void
AppendBytesToCache(const uint8_t * bytes,size_t len,bool broadcast,ConnectionStatus status)308 Communication::AppendBytesToCache (const uint8_t * bytes, size_t len, bool broadcast, ConnectionStatus status)
309 {
310 lldb_private::LogIfAnyCategoriesSet (LIBLLDB_LOG_COMMUNICATION,
311 "%p Communication::AppendBytesToCache (src = %p, src_len = %" PRIu64 ", broadcast = %i)",
312 this, bytes, (uint64_t)len, broadcast);
313 if ((bytes == NULL || len == 0)
314 && (status != lldb::eConnectionStatusEndOfFile))
315 return;
316 if (m_callback)
317 {
318 // If the user registered a callback, then call it and do not broadcast
319 m_callback (m_callback_baton, bytes, len);
320 }
321 else if (bytes != NULL && len > 0)
322 {
323 Mutex::Locker locker(m_bytes_mutex);
324 m_bytes.append ((const char *)bytes, len);
325 if (broadcast)
326 BroadcastEventIfUnique (eBroadcastBitReadThreadGotBytes);
327 }
328 }
329
330 size_t
ReadFromConnection(void * dst,size_t dst_len,uint32_t timeout_usec,ConnectionStatus & status,Error * error_ptr)331 Communication::ReadFromConnection (void *dst,
332 size_t dst_len,
333 uint32_t timeout_usec,
334 ConnectionStatus &status,
335 Error *error_ptr)
336 {
337 lldb::ConnectionSP connection_sp (m_connection_sp);
338 if (connection_sp.get())
339 return connection_sp->Read (dst, dst_len, timeout_usec, status, error_ptr);
340 return 0;
341 }
342
343 bool
ReadThreadIsRunning()344 Communication::ReadThreadIsRunning ()
345 {
346 return m_read_thread_enabled;
347 }
348
349 lldb::thread_result_t
ReadThread(lldb::thread_arg_t p)350 Communication::ReadThread (lldb::thread_arg_t p)
351 {
352 Communication *comm = (Communication *)p;
353
354 Log *log(lldb_private::GetLogIfAllCategoriesSet (LIBLLDB_LOG_COMMUNICATION));
355
356 if (log)
357 log->Printf ("%p Communication::ReadThread () thread starting...", p);
358
359 uint8_t buf[1024];
360
361 Error error;
362 ConnectionStatus status = eConnectionStatusSuccess;
363 bool done = false;
364 while (!done && comm->m_read_thread_enabled)
365 {
366 size_t bytes_read = comm->ReadFromConnection (buf, sizeof(buf), 5 * TimeValue::MicroSecPerSec, status, &error);
367 if (bytes_read > 0)
368 comm->AppendBytesToCache (buf, bytes_read, true, status);
369 else if ((bytes_read == 0)
370 && status == eConnectionStatusEndOfFile)
371 {
372 if (comm->GetCloseOnEOF ())
373 comm->Disconnect ();
374 comm->AppendBytesToCache (buf, bytes_read, true, status);
375 }
376
377 switch (status)
378 {
379 case eConnectionStatusSuccess:
380 break;
381
382 case eConnectionStatusEndOfFile:
383 done = true;
384 break;
385 case eConnectionStatusError: // Check GetError() for details
386 if (error.GetType() == eErrorTypePOSIX && error.GetError() == EIO)
387 {
388 // EIO on a pipe is usually caused by remote shutdown
389 comm->Disconnect ();
390 done = true;
391 }
392 if (log)
393 error.LogIfError (log,
394 "%p Communication::ReadFromConnection () => status = %s",
395 p,
396 Communication::ConnectionStatusAsCString (status));
397 break;
398 case eConnectionStatusInterrupted: // Synchronization signal from SynchronizeWithReadThread()
399 // The connection returns eConnectionStatusInterrupted only when there is no
400 // input pending to be read, so we can signal that.
401 comm->BroadcastEvent (eBroadcastBitNoMorePendingInput);
402 break;
403 case eConnectionStatusNoConnection: // No connection
404 case eConnectionStatusLostConnection: // Lost connection while connected to a valid connection
405 done = true;
406 // Fall through...
407 case eConnectionStatusTimedOut: // Request timed out
408 if (log)
409 error.LogIfError (log,
410 "%p Communication::ReadFromConnection () => status = %s",
411 p,
412 Communication::ConnectionStatusAsCString (status));
413 break;
414 }
415 }
416 log = lldb_private::GetLogIfAllCategoriesSet (LIBLLDB_LOG_COMMUNICATION);
417 if (log)
418 log->Printf ("%p Communication::ReadThread () thread exiting...", p);
419
420 comm->m_read_thread_did_exit = true;
421 // Let clients know that this thread is exiting
422 comm->BroadcastEvent (eBroadcastBitNoMorePendingInput);
423 comm->BroadcastEvent (eBroadcastBitReadThreadDidExit);
424 return NULL;
425 }
426
427 void
SetReadThreadBytesReceivedCallback(ReadThreadBytesReceived callback,void * callback_baton)428 Communication::SetReadThreadBytesReceivedCallback
429 (
430 ReadThreadBytesReceived callback,
431 void *callback_baton
432 )
433 {
434 m_callback = callback;
435 m_callback_baton = callback_baton;
436 }
437
438 void
SynchronizeWithReadThread()439 Communication::SynchronizeWithReadThread ()
440 {
441 // Only one thread can do the synchronization dance at a time.
442 Mutex::Locker locker(m_synchronize_mutex);
443
444 // First start listening for the synchronization event.
445 Listener listener("Communication::SyncronizeWithReadThread");
446 listener.StartListeningForEvents(this, eBroadcastBitNoMorePendingInput);
447
448 // If the thread is not running, there is no point in synchronizing.
449 if (!m_read_thread_enabled || m_read_thread_did_exit)
450 return;
451
452 // Notify the read thread.
453 m_connection_sp->InterruptRead();
454
455 // Wait for the synchronization event.
456 EventSP event_sp;
457 listener.WaitForEvent(NULL, event_sp);
458 }
459
460 void
SetConnection(Connection * connection)461 Communication::SetConnection (Connection *connection)
462 {
463 Disconnect (NULL);
464 StopReadThread(NULL);
465 m_connection_sp.reset(connection);
466 }
467
468 const char *
ConnectionStatusAsCString(lldb::ConnectionStatus status)469 Communication::ConnectionStatusAsCString (lldb::ConnectionStatus status)
470 {
471 switch (status)
472 {
473 case eConnectionStatusSuccess: return "success";
474 case eConnectionStatusError: return "error";
475 case eConnectionStatusTimedOut: return "timed out";
476 case eConnectionStatusNoConnection: return "no connection";
477 case eConnectionStatusLostConnection: return "lost connection";
478 case eConnectionStatusEndOfFile: return "end of file";
479 case eConnectionStatusInterrupted: return "interrupted";
480 }
481
482 static char unknown_state_string[64];
483 snprintf(unknown_state_string, sizeof (unknown_state_string), "ConnectionStatus = %i", status);
484 return unknown_state_string;
485 }
486