23 #include "socket_tcp.h" 32 static const int read_buffer_size = maximum_udp_datagram + datagram_udp_header_size;
37 ConnectionReader::SocketInfo::
39 _connection(connection)
48 bool ConnectionReader::SocketInfo::
50 return (_connection->get_socket()->is_exact_type(Socket_UDP::get_class_type()));
58 return _connection->get_socket();
64 ConnectionReader::ReaderThread::
67 Thread(make_thread_name(thread_name, thread_index),
68 make_thread_name(thread_name, thread_index)),
70 _thread_index(thread_index)
77 void ConnectionReader::ReaderThread::
79 _reader->thread_run(_thread_index);
90 const std::string &thread_name) :
93 if (!Thread::is_threading_supported()) {
95 if (num_threads != 0) {
96 if (net_cat.is_debug()) {
98 <<
"Threading support is not available.\n";
106 _tcp_header_size = tcp_header_size;
107 _polling = (num_threads <= 0);
114 _currently_polling_thread = -1;
116 std::string reader_thread_name = thread_name;
117 if (thread_name.empty()) {
118 reader_thread_name =
"ReaderThread";
121 for (i = 0; i < num_threads; i++) {
122 PT(ReaderThread) thread =
new ReaderThread(
this, reader_thread_name, i);
123 _threads.push_back(thread);
125 for (i = 0; i < num_threads; i++) {
126 _threads[i]->start(net_thread_priority,
true);
129 _manager->add_reader(
this);
136 ~ConnectionReader() {
137 if (_manager !=
nullptr) {
138 _manager->remove_reader(
this);
144 Sockets::iterator si;
145 for (si = _sockets.begin(); si != _sockets.end(); ++si) {
148 for (si = _removed_sockets.begin(); si != _removed_sockets.end(); ++si) {
149 SocketInfo *sinfo = (*si);
154 <<
"Reentrant deletion of ConnectionReader--don't delete these\n" 155 <<
"in response to connection_reset().\n";
158 sinfo->_connection.clear();
177 nassertr(connection !=
nullptr,
false);
182 Sockets::const_iterator si;
183 for (si = _sockets.begin(); si != _sockets.end(); ++si) {
184 if ((*si)->_connection == connection) {
190 _sockets.push_back(
new SocketInfo(connection));
208 Sockets::iterator si;
209 si = _sockets.begin();
210 while (si != _sockets.end() && (*si)->_connection != connection) {
213 if (si == _sockets.end()) {
217 _removed_sockets.push_back(*si);
235 Sockets::iterator si;
236 si = _sockets.begin();
237 while (si != _sockets.end() && (*si)->_connection != connection) {
240 if (si == _sockets.end()) {
245 SocketInfo *sinfo = (*si);
246 bool is_ok = !sinfo->_error;
265 SocketInfo *sinfo = get_next_available_socket(
false, -2);
266 if (sinfo !=
nullptr) {
267 double max_poll_cycle = get_net_max_poll_cycle();
268 if (max_poll_cycle < 0.0) {
270 while (sinfo !=
nullptr) {
271 process_incoming_data(sinfo);
272 sinfo = get_next_available_socket(
false, -2);
278 double stop = global_clock->get_short_time() + max_poll_cycle;
280 while (sinfo !=
nullptr) {
281 process_incoming_data(sinfo);
282 if (global_clock->get_short_time() >= stop) {
285 sinfo = get_next_available_socket(
false, -2);
305 return _threads.size();
337 _tcp_header_size = tcp_header_size;
345 return _tcp_header_size;
363 Threads::iterator ti;
364 for (ti = _threads.begin(); ti != _threads.end(); ++ti) {
374 void ConnectionReader::
375 flush_read_connection(
Connection *connection) {
377 SocketInfo sinfo(connection);
391 fdset.setForSocket(*(sinfo.get_socket()));
392 int num_results = fdset.WaitForRead(
true, 0);
393 while (num_results != 0) {
395 if (!process_incoming_data(&sinfo)) {
398 fdset.setForSocket(*(sinfo.get_socket()));
399 num_results = fdset.WaitForRead(
true, 0);
409 void ConnectionReader::
418 void ConnectionReader::
419 finish_socket(SocketInfo *sinfo) {
420 nassertv(sinfo->_busy);
423 sinfo->_busy =
false;
431 bool ConnectionReader::
432 process_incoming_data(SocketInfo *sinfo) {
434 if (sinfo->is_udp()) {
435 return process_raw_incoming_udp_data(sinfo);
437 return process_raw_incoming_tcp_data(sinfo);
440 if (sinfo->is_udp()) {
441 return process_incoming_udp_data(sinfo);
443 return process_incoming_tcp_data(sinfo);
451 bool ConnectionReader::
452 process_incoming_udp_data(SocketInfo *sinfo) {
454 DCAST_INTO_R(socket, sinfo->get_socket(),
false);
458 char buffer[read_buffer_size];
459 int bytes_read = read_buffer_size;
461 bool okflag = socket->
GetPacket(buffer, &bytes_read, addr);
464 finish_socket(sinfo);
467 }
else if (bytes_read == 0) {
470 if (_manager !=
nullptr) {
471 _manager->connection_reset(sinfo->_connection, 0);
473 finish_socket(sinfo);
480 if (bytes_read < datagram_udp_header_size) {
482 <<
"Did not read entire header, discarding UDP datagram.\n";
483 finish_socket(sinfo);
489 char *dp = buffer + datagram_udp_header_size;
490 bytes_read -= datagram_udp_header_size;
496 finish_socket(sinfo);
503 if (!header.verify_datagram(datagram)) {
505 <<
"Ignoring invalid UDP datagram.\n";
507 datagram.set_connection(sinfo->_connection);
510 if (net_cat.is_spam()) {
512 <<
"Received UDP datagram with " 513 << datagram_udp_header_size + datagram.get_length()
514 <<
" bytes on " << (
void *)datagram.get_connection()
515 <<
" from " << datagram.get_address() <<
"\n";
518 receive_datagram(datagram);
527 bool ConnectionReader::
528 process_incoming_tcp_data(SocketInfo *sinfo) {
530 DCAST_INTO_R(socket, sinfo->get_socket(),
false);
533 char buffer[read_buffer_size];
534 int header_bytes_read = 0;
537 while (header_bytes_read < _tcp_header_size) {
539 socket->
RecvData(buffer + header_bytes_read,
540 _tcp_header_size - header_bytes_read);
541 #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS) 542 while (bytes_read < 0 && socket->GetLastError() == LOCAL_BLOCKING_ERROR &&
545 bytes_read = socket->
RecvData(buffer + header_bytes_read,
546 _tcp_header_size - header_bytes_read);
548 #endif // SIMPLE_THREADS 550 if (bytes_read <= 0) {
552 if (_manager !=
nullptr) {
553 _manager->connection_reset(sinfo->_connection, 0);
555 finish_socket(sinfo);
559 header_bytes_read += bytes_read;
565 if (header_bytes_read != _tcp_header_size) {
568 <<
"Did not read entire header, discarding TCP datagram.\n";
569 finish_socket(sinfo);
574 int size = header.get_datagram_size(_tcp_header_size);
579 while (!_shutdown && (
int)datagram.
get_length() < size) {
582 int read_bytes = read_buffer_size;
583 #ifdef SIMPLE_THREADS 586 read_bytes = min(read_buffer_size, (
int)net_max_read_per_epoch);
590 socket->
RecvData(buffer, min(read_bytes,
592 #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS) 593 while (bytes_read < 0 && socket->GetLastError() == LOCAL_BLOCKING_ERROR &&
597 socket->
RecvData(buffer, min(read_bytes,
600 #endif // SIMPLE_THREADS 604 if (bytes_read <= 0) {
606 if (_manager !=
nullptr) {
607 _manager->connection_reset(sinfo->_connection, 0);
609 finish_socket(sinfo);
614 min(bytes_read, (
int)(size - datagram.
get_length()));
617 if (bytes_read > datagram_bytes) {
621 <<
"Discarding " << bytes_read - datagram_bytes
622 <<
" bytes following TCP datagram.\n";
629 finish_socket(sinfo);
636 if (!header.verify_datagram(datagram, _tcp_header_size)) {
638 <<
"Ignoring invalid TCP datagram.\n";
643 if (net_cat.is_spam()) {
645 <<
"Received TCP datagram with " 647 <<
" bytes on " << (
void *)datagram.get_connection()
651 receive_datagram(datagram);
660 bool ConnectionReader::
661 process_raw_incoming_udp_data(SocketInfo *sinfo) {
663 DCAST_INTO_R(socket, sinfo->get_socket(),
false);
667 char buffer[read_buffer_size];
668 int bytes_read = read_buffer_size;
670 bool okflag = socket->
GetPacket(buffer, &bytes_read, addr);
673 finish_socket(sinfo);
676 }
else if (bytes_read == 0) {
679 if (_manager !=
nullptr) {
680 _manager->connection_reset(sinfo->_connection, 0);
682 finish_socket(sinfo);
691 finish_socket(sinfo);
700 if (net_cat.is_spam()) {
702 <<
"Received raw UDP datagram with " << datagram.
get_length()
703 <<
" bytes on " << (
void *)datagram.get_connection()
707 receive_datagram(datagram);
715 bool ConnectionReader::
716 process_raw_incoming_tcp_data(SocketInfo *sinfo) {
718 DCAST_INTO_R(socket, sinfo->get_socket(),
false);
721 char buffer[read_buffer_size];
722 int bytes_read = socket->
RecvData(buffer, read_buffer_size);
723 #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS) 724 while (bytes_read < 0 && socket->GetLastError() == LOCAL_BLOCKING_ERROR &&
727 bytes_read = socket->
RecvData(buffer, read_buffer_size);
729 #endif // SIMPLE_THREADS 731 if (bytes_read <= 0) {
733 if (_manager !=
nullptr) {
734 _manager->connection_reset(sinfo->_connection, 0);
736 finish_socket(sinfo);
745 finish_socket(sinfo);
754 if (net_cat.is_spam()) {
756 <<
"Received raw TCP datagram with " << datagram.
get_length()
757 <<
" bytes on " << (
void *)datagram.get_connection()
761 receive_datagram(datagram);
769 void ConnectionReader::
770 thread_run(
int thread_index) {
772 nassertv(_threads[thread_index] == Thread::get_current_thread());
776 get_next_available_socket(
true, thread_index);
777 if (sinfo !=
nullptr) {
778 process_incoming_data(sinfo);
796 ConnectionReader::SocketInfo *ConnectionReader::
797 get_next_available_socket(
bool allow_block,
int current_thread_index) {
805 while (!_shutdown && _num_results > 0) {
806 nassertr(_next_index < (
int)_selecting_sockets.size(),
nullptr);
810 if (_fdset.
IsSetFor(*_selecting_sockets[i]->get_socket())) {
812 SocketInfo *sinfo = _selecting_sockets[i];
832 rebuild_select_list();
839 uint32_t timeout = (uint32_t)(get_net_max_block() * 1000.0);
843 #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS) 850 _num_results = _fdset.WaitForRead(
false, timeout);
853 if (_num_results == 0 && allow_block) {
860 }
else if (_num_results < 0) {
865 }
while (!_shutdown && interrupted);
871 }
while (!_shutdown && _num_results > 0);
881 void ConnectionReader::
882 rebuild_select_list() {
884 _selecting_sockets.clear();
887 Sockets::const_iterator si;
888 for (si = _sockets.begin(); si != _sockets.end(); ++si) {
889 SocketInfo *sinfo = (*si);
890 if (!sinfo->_busy && !sinfo->_error) {
891 _fdset.setForSocket(*sinfo->get_socket());
892 _selecting_sockets.push_back(sinfo);
898 if (!_removed_sockets.empty()) {
899 Sockets still_busy_sockets;
900 for (si = _removed_sockets.begin(); si != _removed_sockets.end(); ++si) {
901 SocketInfo *sinfo = (*si);
903 still_busy_sockets.push_back(sinfo);
908 _removed_sockets.swap(still_busy_sockets);
917 void ConnectionReader::
920 Sockets::const_iterator si;
921 for (si = _sockets.begin(); si != _sockets.end(); ++si) {
922 SocketInfo *sinfo = (*si);
923 if (!sinfo->_busy && !sinfo->_error) {
924 fdset.setForSocket(*sinfo->get_socket());
bool get_raw_mode() const
Returns the current setting of the raw mode flag.
static TrueClock * get_global_ptr()
Returns a pointer to the one TrueClock object in the world.
ConnectionManager * get_manager() const
Returns a pointer to the ConnectionManager object that serves this ConnectionReader.
Base functionality for a TCP connected socket This class is pretty useless by itself but it does hide...
A specific kind of Datagram, especially for sending across or receiving from a network.
Base functionality for a INET domain Socket This call should be the starting point for all other unix...
void set_tcp_header_size(int tcp_header_size)
Sets the header size of TCP packets.
void append_data(const void *data, size_t size)
Appends some more raw data to the end of the datagram.
std::string RecvData(int max_len)
Read the data from the connection - if error 0 if socket closed for read or length is 0 + bytes read ...
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
The primary interface to the low-level networking layer in this package.
void shutdown()
Terminates all threads cleanly.
bool GetPacket(char *data, int *max_len, Socket_Address &address)
Grabs a dataset off the listening UDP socket and fills in the source address information.
A lightweight C++ object whose constructor calls acquire() and whose destructor calls release() on a ...
static void consider_yield()
Possibly suspends the current thread for the rest of the current epoch, if it has run for enough this...
Socket_Address GetPeerName(void) const
Wrapper on berkly getpeername...
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
bool Active()
Ask if the socket is open (allocated)
static void force_yield()
Suspends the current thread for the rest of the current epoch.
This is an abstract base class for a family of classes that listen for activity on a socket and respo...
void poll()
Explicitly polls the available sockets to see if any of them have any noise.
const NetAddress & get_address() const
Retrieves the host from which the datagram was read, or to which it is scheduled to be sent.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
bool is_connection_ok(Connection *connection)
Returns true if the indicated connection has been added to the ConnectionReader and is being monitore...
Base functionality for a combination UDP Reader and Writer.
Similar to MutexHolder, but for a light mutex.
int get_tcp_header_size() const
Returns the current setting of TCP header size.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
bool remove_connection(Connection *connection)
Removes a socket from the list of sockets being monitored.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
An interface to whatever real-time clock we might have available in the current environment.
void set_raw_mode(bool mode)
Sets the ConnectionReader into raw mode (or turns off raw mode).
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
A simple place to store and manipulate tcp and port address for communication layer.
int get_num_threads() const
Returns the number of threads the ConnectionReader has been created with.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
bool add_connection(Connection *connection)
Adds a new socket to the list of sockets the ConnectionReader will monitor.
A thread; that is, a lightweight process.
void set_connection(const PT(Connection) &connection)
Specifies the socket to which the datagram should be written.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
static Integer set(Integer &var, Integer new_value)
Atomically changes the indicated variable and returns the original value.
bool IsSetFor(const Socket_IP &incon) const
check to see if a socket object has been marked for reading
void clear()
Marks the content as empty.
Represents a single TCP or UDP socket for input or output.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
ConnectionReader(ConnectionManager *manager, int num_threads, const std::string &thread_name=std::string())
Creates a new ConnectionReader with the indicated number of threads to handle requests.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
size_t get_length() const
Returns the number of bytes in the datagram.
Represents a network address to which UDP packets may be sent or to which a TCP socket may be bound.