View Issue Details
ID | Project | Category | View Status | Date Submitted | Last Update |
---|---|---|---|---|---|
0000297 | LDMud 3.3 | Networking | public | 2004-11-27 00:00 | 2018-01-29 21:57 |
Reporter | Assigned To | Gnomi | |||
Priority | normal | Severity | feature | Reproducibility | N/A |
Status | resolved | Resolution | fixed | ||
Fixed in Version | 3.3.719 | ||||
Summary | 0000297: Better buffering of data sent to players | ||||
Description | Short: Better buffering of data sent to players From: Gnomi Date: 2002-07-14 Type: Feature State: New Driver: 3.2.9-dev.440 If too much data is sent to the player over a slow connection, it can happen that the socket_write() consistently fails with EWOULDBLOCK (== EAGAIN under Linux). The driver then discards the message even if a few seconds later the transmission would succeed. Reason for this behaviour is that the buffer for text to send is of fixed size and has to be emptied. By making the buffer a FIFO (and extending the backend loop by a new call to some send_pending_data()) this problem could be solved. When implementing this, consider that the mudlib must be able to detect and shutdown continually starved connections before the buffers grow too big. Maybe a control efun set_max_buffer_size() for each interactive (with an 'unlimited' setting available). This might require calls to forcefully 'abandon' data from the mudlib, and an internal setting to let data expire once a certain buffer size is reached (set_buffer_mode(): BUF_LIMITED, BUF_UNLIMITED, BUF_FIFO). | ||||
Tags | No tags attached. | ||||
Attached Files | writebuffer.diff (40,041 bytes)
Index: trunk.comm/doc/efun/binary_message =================================================================== --- trunk.comm/doc/efun/binary_message (Revision 2455) +++ trunk.comm/doc/efun/binary_message (Arbeitskopie) @@ -24,14 +24,9 @@ charset filters, but isn't important enough to waste bandwith on a synchronous transmission. - If the client uses MCCP compression, the efun always uses - add_message() and flushes the buffer after adding the message - (equivalent a flag setting of 0b11). - HISTORY Introduced in 3.2.1@40. Argument 'flags' introduced in 3.2.1@60. - LDMud 3.3.477 added the MCCP behaviour. SEE ALSO set_connection_charset(E) Index: trunk.comm/src/pkg-mccp.c =================================================================== --- trunk.comm/src/pkg-mccp.c (Revision 2455) +++ trunk.comm/src/pkg-mccp.c (Arbeitskopie) @@ -166,57 +166,6 @@ } /* start_compress() */ /*-------------------------------------------------------------------------*/ -static Bool -process_compressed (interactive_t * ip) - -/* Try to send any pending compressed-but-not-sent data in for <ip>. - * Return TRUE on success. - */ - -{ - int iStart, nBlock, nWrite, len; - - if (!ip->out_compress) - return MY_TRUE; - - len = ip->out_compress->next_out - ip->out_compress_buf; - if (len > 0) - { - for (iStart = 0; iStart < len; iStart += nWrite) - { - nBlock = UMIN (len - iStart, 4096); - if ((nWrite = - socket_write(ip->socket, ip->out_compress_buf + iStart, nBlock)) < 0) - { - if (errno == EAGAIN) - break; -#ifdef ENOSR - if (errno == ENOSR) - break; -#endif /* ENOSR */ - - /* write error */ - return MY_FALSE; - } - if (nWrite <= 0) - break; - } - - if (iStart) - { - if (iStart < len) - memmove (ip->out_compress_buf, ip->out_compress_buf + iStart, - len - iStart); - - ip->out_compress->next_out = ip->out_compress_buf + len - iStart; - } - } - - /* success */ - return MY_TRUE; -} /* process_compressed() */ - -/*-------------------------------------------------------------------------*/ Bool end_compress (interactive_t * ip, Bool force) @@ -227,6 +176,8 @@ { unsigned char dummy[1]; + unsigned char buf[256]; + size_t len; Bool retval = MY_TRUE; if (!ip->out_compress) @@ -235,28 +186,25 @@ ip->out_compress->avail_in = 0; ip->out_compress->next_in = dummy; + ip->out_compress->next_out = buf; + ip->out_compress->avail_out = sizeof(buf); + /* No terminating signature is needed - receiver will get Z_STREAM_END */ if (deflate (ip->out_compress, Z_FINISH) != Z_STREAM_END && !force) return MY_FALSE; - /* try to send any residual data */ - if (!process_compressed (ip) && !force) - { - printf("%s MCCP-DEBUG: '%s' mccp had error while ending\n" - , time_stamp(), get_txt(ip->ob->name)); - retval = MY_FALSE; - /* This is a write error - no sense in trying it again, so - * get rid of all the buffers anyway. - */ - } - - /* reset compression values */ + len = ip->out_compress->next_out - buf; + + /* first reset compression values */ deflateEnd(ip->out_compress); xfree(ip->out_compress_buf); xfree(ip->out_compress); ip->compressing = 0; ip->out_compress = NULL; ip->out_compress_buf = NULL; + + /* try to send any residual data */ + comm_socket_write((char*) buf, len, ip); printf("%s MCCP-DEBUG: '%s' mccp ended\n" , time_stamp(), get_txt(ip->ob->name)); Index: trunk.comm/src/settings/default =================================================================== --- trunk.comm/src/settings/default (Revision 2455) +++ trunk.comm/src/settings/default (Arbeitskopie) @@ -81,12 +81,12 @@ with_max_callouts=0 -# If PThreads are used, this is the max amount of data held pending -# for writing. If the amount is exceeded, the oldest data blocks +# This is the max amount of data held pending for writing. +# If the amount is exceeded, the oldest data blocks # are discarded. # If 0, any amount of data is allowed. -with_pthread_write_max_size=100000 +with_write_buffer_max_size=100000 # --- Timing --- Index: trunk.comm/src/settings/unitopia =================================================================== --- trunk.comm/src/settings/unitopia (Revision 2455) +++ trunk.comm/src/settings/unitopia (Arbeitskopie) @@ -15,6 +15,7 @@ with_max_mapping_size=50000 with_max_byte_transfer=50000 with_read_file_max_size=50000 +with_write_buffer_max_size=524288 # --- Timing --- with_time_to_clean_up=5400 Index: trunk.comm/src/main.c =================================================================== --- trunk.comm/src/main.c (Revision 2455) +++ trunk.comm/src/main.c (Arbeitskopie) @@ -1048,7 +1048,7 @@ , cMaxFile /* --max-file */ , cMaxMapping /* --max-mapping */ , cMaxMappingKeys /* --max-mapping-keys */ - , cMaxThreadPend /* --max-thread-pending */ + , cMaxWriteBuffer /* --max-write-buffer */ , cMinMalloc /* --min-malloc */ , cMinSmallMalloc /* --min-small-malloc */ , cNoERQ /* --no-erq */ @@ -1326,15 +1326,12 @@ " Set to 0, reads and writes of any size are allowed.\n" } - , { 0, "max-thread-pending", cMaxThreadPend, MY_TRUE - , " --max-thread-pending <size>\n" - , " --max-thread-pending <size>\n" - " The maximum number of bytes to be kept pending by the socket write\n" - " thread.\n" + , { 0, "max-write-buffer", cMaxWriteBuffer, MY_TRUE + , " --max-write-buffer<size>\n" + , " --max-write-buffer <size>\n" + " The maximum number of bytes to be kept pending for each socket\n" + " to write.\n" " Set to 0, an unlimited amount of data can be kept pending.\n" -#ifndef USE_PTHREADS - " (Ignored since pthreads are not supported)\n" -#endif } , { 's', NULL, cSwap, MY_TRUE @@ -1850,9 +1847,7 @@ printf(" Runtime limits: max read file size: %7d\n" " max byte read/write: %7d\n" " max socket buf size: %7d\n" -#if defined(USE_PTHREADS) - " max pthread write size: %7d\n" -#endif /* USE_PTHREADS */ + " max write buf size: %7d\n" " max eval cost: %9d %s\n" " catch eval cost: %7d\n" " master eval cost: %7d\n" @@ -1871,9 +1866,7 @@ #endif , READ_FILE_MAX_SIZE, MAX_BYTE_TRANSFER , SET_BUFFER_SIZE_MAX -#if defined(USE_PTHREADS) - , PTHREAD_WRITE_MAX_SIZE -#endif /* USE_PTHREADS */ + , WRITE_BUFFER_MAX_SIZE , MAX_COST #if defined(DYNAMIC_COSTS) , "(dynamic)" @@ -2411,12 +2404,12 @@ break; } - case cMaxThreadPend: + case cMaxWriteBuffer: { long val = atoi(pValue); if (val >= 0) - pthread_write_max_size = val; + write_buffer_max_size = val; else fprintf(stderr, "Illegal value for limit '%s' ignored.\n", pValue); break; Index: trunk.comm/src/comm.h =================================================================== --- trunk.comm/src/comm.h (Revision 2455) +++ trunk.comm/src/comm.h (Arbeitskopie) @@ -129,16 +129,19 @@ * The instances are kept in a linked list from the interactive_t * structure. */ -#ifdef USE_PTHREADS struct write_buffer_s { struct write_buffer_s *next; size_t length; +#ifndef USE_PTHREADS + size_t pos; +#endif +#if defined(USE_PTHREADS) && defined(USE_MCCP) Bool compress; /* should the buffer get compressed by mccp? */ - int errorno; /* After writing, the errno */ + int errorno; /* After writing, the errno */ +#endif char buffer[1 /* .length */ ]; }; -#endif /* --- struct input_to_s: input_to() datastructure * @@ -177,7 +180,8 @@ svalue_t prompt; /* The prompt to print. */ struct sockaddr_in addr; /* Address of connected user */ - CBool msg_discarded; /* True if an earlier msg had been discarded */ + char msg_discarded; /* != 0 if an earlier msg had been discarded, + index into the message to be sent. */ CBool set_input_to; /* True if input_to was set in this cycle */ CBool closing; /* True when closing this socket. */ CBool tn_enabled; /* True: telnet machine enabled */ @@ -277,12 +281,12 @@ pthread_mutex_t write_mutex; pthread_cond_t write_cond; pthread_t write_thread; + struct write_buffer_s *write_current; /* Buffer currently written */ + struct write_buffer_s *written_first; /* List of written buffers */ +#endif struct write_buffer_s *write_first; /* List of buffers to write */ struct write_buffer_s *write_last; unsigned long write_size; - struct write_buffer_s *write_current; /* Buffer currently written */ - struct write_buffer_s *written_first; /* List of written buffers */ -#endif #ifdef USE_TLS tls_session_t tls_session; @@ -384,7 +388,7 @@ extern char *message_flush; extern char *domain_name; -extern long pthread_write_max_size; +extern long write_buffer_max_size; #ifdef COMM_STAT extern unsigned long add_message_calls; @@ -409,6 +413,7 @@ extern void interactive_unlock (interactive_t *ip UNUSED); extern void interactive_cleanup (interactive_t *ip UNUSED); #endif /* USE_PTHREADS */ +extern Bool comm_socket_write (char *msg, size_t size, interactive_t *ip); extern void comm_cleanup_interactives (void); extern void add_message VARPROT((const char *, ...), printf, 1, 2); extern void flush_all_player_mess(void); Index: trunk.comm/src/autoconf/configure.in =================================================================== --- trunk.comm/src/autoconf/configure.in (Revision 2455) +++ trunk.comm/src/autoconf/configure.in (Arbeitskopie) @@ -167,7 +167,7 @@ AC_MY_ARG_WITH(access-file,ACCESS.ALLOW,,[access permissions file]) AC_MY_ARG_WITH(access-log,access.allow.log,,[access log file]) -AC_MY_ARG_WITH(pthreads-write-max-size,100000,,[max size of write buffer per thread]) +AC_MY_ARG_WITH(write-buffer-max-size,100000,,[max size of write buffer per interactive]) AC_MY_ARG_WITH(erq-max-reply,1024,,) AC_MY_ARG_WITH(erq-max-send,1024,,) AC_MY_ARG_WITH(erq-debug,0,,[ERQ debug level]) @@ -419,7 +419,7 @@ AC_TEXT_VAL_FROM_WITH(malloc) -AC_INT_VAL_FROM_WITH(pthreads_write_max_size) +AC_INT_VAL_FROM_WITH(write_buffer_max_size) AC_INT_VAL_FROM_WITH(erq_max_reply) AC_INT_VAL_FROM_WITH(erq_max_send) AC_INT_VAL_FROM_WITH(erq_debug) @@ -2775,7 +2775,7 @@ AC_SUBST(val_erq_max_reply) AC_SUBST(val_erq_max_send) AC_SUBST(val_erq_debug) -AC_SUBST(val_pthreads_write_max_size) +AC_SUBST(val_write_buffer_max_size) AC_SUBST(val_read_file_max_size) AC_SUBST(val_time_to_clean_up) AC_SUBST(val_time_to_swap) Index: trunk.comm/src/config.h.in =================================================================== --- trunk.comm/src/config.h.in (Revision 2455) +++ trunk.comm/src/config.h.in (Arbeitskopie) @@ -60,11 +60,11 @@ */ #define MAX_CALLOUTS @val_max_callouts@ -/* When USE_PTHREADS is used, define this to the maximum amount of data - * to be held pending for writing. +/* Define this to the maximum amount of data + * to be held pending for writing per interactive. * A value of 0 means 'unlimited'. */ -#define PTHREAD_WRITE_MAX_SIZE @val_pthreads_write_max_size@ +#define WRITE_BUFFER_MAX_SIZE @val_write_buffer_max_size@ /* --- Timing --- */ Index: trunk.comm/src/gcollect.c =================================================================== --- trunk.comm/src/gcollect.c (Revision 2455) +++ trunk.comm/src/gcollect.c (Arbeitskopie) @@ -2026,12 +2026,6 @@ if (all_players[i] == NULL) continue; -#ifdef USE_PTHREADS - /* The threaded write buffers are allocated with malloc() and are - * thus out of our view. - */ -#endif /* USE_PTHREADS */ - for ( it = all_players[i]->input_to; it != NULL; it = it->next) { clear_memory_reference(it); @@ -2217,6 +2211,17 @@ /* The thread write buffers are allocated with malloc() and are * thus out of our view. */ +#else + if (all_players[i]->write_first) + { + struct write_buffer_s *tmp = all_players[i]->write_first; + + do + { + note_ref(tmp); + tmp = tmp->next; + } while (tmp != NULL); + } #endif /* USE_PTHREADS */ #ifdef USE_MCCP if (all_players[i]->out_compress != NULL) Index: trunk.comm/src/comm.c =================================================================== --- trunk.comm/src/comm.c (Revision 2455) +++ trunk.comm/src/comm.c (Arbeitskopie) @@ -61,9 +61,12 @@ * structure. Advantage is that the driver is no longer bothered by blocking * sockets. * - * TODO: Generalize the background buffer and either use pthreads, or a call - * TODO:: from the backend loop to write the data. Also don't - * TODO:: immediately discard EWOULDBLOCK-failed messages. +#else + * The data that can not written directly to the sockets is put instead into + * an intermediate buffer, from which the backend loop will continue writing + * when possible. The buffers are stored in a linked list in the interactive_s + * structure. + * #endif * * TODO: Fiona says: The telnet code is frustrating. It would be better if @@ -232,9 +235,8 @@ * a 'null format string'. */ -long pthread_write_max_size = PTHREAD_WRITE_MAX_SIZE; +long write_buffer_max_size = WRITE_BUFFER_MAX_SIZE; /* Amount of data held pending in the pthread fifo queue. - * Evaluated only with USE_PTHREADS. */ @@ -493,6 +495,8 @@ #ifdef USE_PTHREADS static void *writer_thread(void *arg); +#else +static INLINE ssize_t comm_send_buf(char *msg, size_t size, interactive_t *ip); #endif #ifdef USE_IPV6 @@ -804,12 +808,16 @@ fprintf(stderr, "------\n"); /* Disconnect the user */ +#ifndef PTHREADS + comm_send_buf(msg, strlen(msg), ip); +#else #ifdef USE_TLS if(ip->tls_status == TLS_ACTIVE) tls_write(ip, msg, strlen(msg)); else #endif socket_write(ip->socket, msg, strlen(msg)); +#endif /* PTHREADS */ remove_interactive(ip->ob, MY_TRUE); /* Unset mutex */ @@ -1437,47 +1445,33 @@ /*-------------------------------------------------------------------------*/ #ifdef USE_PTHREADS -static int -thread_socket_write( SOCKET_T s UNUSED, char *msg, size_t size - , interactive_t *ip, Bool docompress -#ifndef USE_MCCP - UNUSED -#endif /* USE_MCCP */ - ) +Bool +comm_socket_write (char *msg, size_t size, interactive_t *ip) /* Stand in for socket_write(): take the data to be written and append - * it to the buffer list of <ip>. <docompress> denotes the MCCP compression - * setting. + * it to the buffer list of <ip>. */ { -#ifdef __MWERKS__ -# pragma unused(s) -# ifndef USE_MCCP -# pragma unused(docompress) -# endif /* USE_MCCP */ -#endif - struct write_buffer_s *b; if (size == 0) - return 0; + return MY_TRUE; /* Get a new buffer for the data to be written */ b = malloc(sizeof(struct write_buffer_s) + size - 1); if (!b) - outofmem(sizeof(struct write_buffer_s) + size - 1, "thread_socket_write()"); + outofmem(sizeof(struct write_buffer_s) + size - 1, "comm_socket_write()"); b->length = size; b->next = NULL; #ifdef USE_MCCP if (ip->out_compress) - b->compress = docompress; + b->compress = MY_TRUE; else + b->compress = MY_FALSE; #endif - b->compress = 0; - memcpy(b->buffer, msg, size); /* Chain in the new buffer */ @@ -1493,8 +1487,8 @@ /* Make sure that the amount of data pending never exceeds * the maximum. */ - while (pthread_write_max_size != 0 - && ip->write_size >= (unsigned long)pthread_write_max_size) + while (write_buffer_max_size != 0 + && ip->write_size >= (unsigned long)write_buffer_max_size) { struct write_buffer_s *tmp = ip->write_first; ip->write_first = tmp->next; @@ -1511,8 +1505,8 @@ pthread_cond_signal(&ip->write_cond); errno = 0; - return size; -} /* thread_socket_write() */ + return MY_TRUE; +} /* comm_socket_write() */ /*-------------------------------------------------------------------------*/ static void @@ -1693,7 +1687,7 @@ if (ip->write_first) { /* We have to move the buffer out of the to-write list, - * so that thread_socket_write() won't remove it if the + * so that comm_socket_write() won't remove it if the * data limit is reached. On the other hand a GC might * happen while we're still printing, erasing the * written list. @@ -1701,7 +1695,7 @@ buf = ip->write_first; ip->write_first = buf->next; /* If this was the last buffer, .write_first will become - * NULL and the next call to thread_socket_write() will + * NULL and the next call to comm_socket_write() will * set both .write_first and .write_last. */ ip->write_size -= buf->length; @@ -1741,6 +1735,247 @@ return NULL; } /* writer_thread() */ +#else + +/*-------------------------------------------------------------------------*/ +static INLINE ssize_t +comm_send_buf (char *msg, size_t size, interactive_t *ip) + +/* Low level send routine, just tries to send the data without buffering. + A return value -1 indicates a total failure (i.e. the connection should + be dropped), otherwise the amount of data sent (even if it's 0). +*/ +{ + int retries; /* Number of retries left when sending data */ + ssize_t n; /* Bytes that have been sent */ + + for (retries = 6;;) + { +#ifdef USE_TLS + if ((n = (ip->tls_status == TLS_INACTIVE ? + (int)socket_write(ip->socket, msg, size): + (int)tls_write(ip, msg, size))) != -1) +#else + if ((n = (int)socket_write(ip->socket, msg, size)) != -1) +#endif + { + break; + } + + switch (errno) + { + + case EINTR: + if (--retries) + continue; + n = 0; + break; + + case EWOULDBLOCK: + n = 0; + break; + + case EMSGSIZE: + fprintf(stderr, "%s comm: write EMSGSIZE.\n", time_stamp()); + break; + + case EINVAL: + fprintf(stderr, "%s comm: write EINVAL.\n", time_stamp()); + break; + + case ENETUNREACH: + fprintf(stderr, "%s comm: write ENETUNREACH.\n", time_stamp()); + break; + + case EHOSTUNREACH: + fprintf(stderr, "%s comm: write EHOSTUNREACH.\n", time_stamp()); + break; + + case EPIPE: + fprintf(stderr, "%s comm: write EPIPE detected\n", time_stamp()); + break; + + case ECONNRESET: + fprintf(stderr, "%s comm: write ECONNRESET detected\n", time_stamp()); + break; + + default: + { + int e = errno; + fprintf(stderr, "%s comm: write: unknown errno %d (%s)\n" + , time_stamp(), e, strerror(e)); + } + } + + if (n == 0) + break; + + ip->do_close = FLAG_DO_CLOSE; + return -1; + + } /* for (retries) */ + + /* EWOULDBLOCK and EINTR are not real failures. */ + if (n == -1) + return 0; + +#ifdef COMM_STAT + inet_packets++; + inet_volume += n; +#endif + + return n; +} /* comm_send_buf() */ + +/*-------------------------------------------------------------------------*/ +Bool +comm_socket_write (char *msg, size_t size, interactive_t *ip) + +/* Stand in for socket_write(): take the data to be written, compress and + * encrypt them if needed and send them to <ip>. If no data can be send + * right now append them to the buffer. + */ + +{ + struct write_buffer_s *b; + char *buf; + size_t length; + + if (size == 0) + return MY_TRUE; + +#ifdef USE_MCCP + if (ip->out_compress) + { + int status; + + ip->out_compress->next_in = (unsigned char *) msg; + ip->out_compress->avail_in = size; + + ip->out_compress->next_out = ip->out_compress_buf; + ip->out_compress->avail_out = COMPRESS_BUF_SIZE; + + status = deflate(ip->out_compress, Z_SYNC_FLUSH); + + if (status != Z_OK) + { + fprintf(stderr, "%s comm: MCCP compression error: %d\n" + , time_stamp(), status); + return MY_FALSE; + } + + /* ok.. perhaps i should take care that all data in message_buf + * is compressed, but i guess there is no chance that 1024 byte + * compressed won't fit into the 8192 byte buffer + */ + + length = ip->out_compress->next_out - ip->out_compress_buf; + buf = (char *) ip->out_compress_buf; + } + else +#endif /* USE_MCCP */ + { + buf = msg; + length = size; + } + + /* now sending the buffer... */ + if (ip->write_first == NULL) + { + /* Try writing to the socket first. */ + ssize_t n = comm_send_buf(buf, length, ip); + + if (n == -1) + return MY_FALSE; + else if (n == length) + { + /* We're done. */ + return MY_TRUE; + } + else + { + buf += n; + length -= n; + } + } + + /* We have to enqueue the message. */ + b = xalloc(sizeof(struct write_buffer_s) + size - 1); + if (!b) + outofmem(sizeof(struct write_buffer_s) + size - 1, "comm_socket_write()"); + + b->length = length; + b->pos = 0; + b->next = NULL; + memcpy(b->buffer, buf, length); + + /* Chain in the new buffer */ + if(ip->write_first) + ip->write_last = ip->write_last->next = b; + else + ip->write_first = ip->write_last = b; + ip->write_size += size; + + /* Make sure that the amount of data pending never exceeds + * the maximum. + */ + while (write_buffer_max_size != 0 + && ip->write_size >= (unsigned long)write_buffer_max_size) + { + struct write_buffer_s *tmp = ip->write_first; + ip->write_first = tmp->next; + ip->write_size -= tmp->length; + xfree(tmp); + + if (!ip->msg_discarded) + ip->msg_discarded = 1; + } + + return MY_TRUE; +} /* comm_socket_write() */ + +/*-------------------------------------------------------------------------*/ +static void +comm_write_pending (interactive_t * ip) + +/* Send as much from the write buffer of <ip> as possible. + */ + +{ + /* First, if a previous call had to discard the message, inform the user. + */ + if (ip->msg_discarded) + { + ssize_t n; + char msg[] = "\n*** Text lost in transmission ***\n"; + + n = comm_send_buf(msg, sizeof(msg) - ip->msg_discarded, ip); + + ip->msg_discarded += n; + if (ip->msg_discarded >= sizeof(msg)) + ip->msg_discarded = 0; + } + + while (ip->write_first != NULL) + { + struct write_buffer_s *buf; + ssize_t n; + + buf = ip->write_first; + n = comm_send_buf(buf->buffer + buf->pos, buf->length - buf->pos, ip); + + if (n == -1) + return; + + buf->pos += n; + if (buf->pos < buf->length) + return; + + ip->write_first = buf->next; + ip->write_size -= buf->length; + xfree(buf); + } +} /* comm_write_pending() */ #endif /* USE_PTHREADS */ /*-------------------------------------------------------------------------*/ @@ -1807,7 +2042,6 @@ va_list va; interactive_t *ip; /* The interactive user */ object_t *snooper; /* Snooper of <ip> */ - int n; source = NULL; srcstr = NULL; @@ -1867,15 +2101,6 @@ return; } - /* First, if a previous call had to discard the message, inform the user. - */ - if (ip->msg_discarded) - { - ip->msg_discarded = MY_FALSE; - add_message("%s", "\n*** Text lost in transmission ***\n"); - /* msg_discarded might be TRUE again now */ - } - old_message_length = ip->message_length; /* --- Compose the final message --- */ @@ -2078,7 +2303,6 @@ do /* while (srclen != 0) */ { - int retries; /* Number of retries left when sending data */ ptrdiff_t chunk; /* Current size of data in .message_buf[] */ char c; /* Currently processed character */ @@ -2146,149 +2370,12 @@ /* Write .message_buf[] to the network. */ -#if !defined(USE_PTHREADS) && defined(USE_MCCP) - if (ip->out_compress) + if (!comm_socket_write(ip->message_buf, (size_t)chunk, ip)) { - ip->out_compress->next_in = (unsigned char *) ip->message_buf; - ip->out_compress->avail_in = chunk; - - ip->out_compress->avail_out = COMPRESS_BUF_SIZE - - (ip->out_compress->next_out - - ip->out_compress_buf); - - { - int status = deflate(ip->out_compress, Z_SYNC_FLUSH); - - if (status != Z_OK) - return; - } - - /* ok.. perhaps i should take care that all data in message_buf - * is compressed, but i guess there is no chance that 1024 byte - * compressed won't fit into the 8192 byte buffer - */ - - length = ip->out_compress->next_out - ip->out_compress_buf; - } -#endif /* USE_MCCP && !USE_PTHREADS */ - - /* now sending the buffer... */ - - for (retries = 6;;) - { - -#if defined(USE_PTHREADS) - - if ((n = (int)thread_socket_write(ip->socket, ip->message_buf, (size_t)chunk, ip, MY_TRUE)) != -1) - { - break; - } - -#elif defined(USE_MCCP) - if (ip->out_compress) /* here we choose the correct buffer */ - { -#ifdef USE_TLS - if ((n = (ip->tls_status == TLS_INACTIVE ? - (int)socket_write(ip->socket, ip->out_compress_buf, (size_t)length): - (int)tls_write(ip, ip->out_compress_buf, (size_t)length))) != -1) -#else - if ((n = (int)socket_write(ip->socket, ip->out_compress_buf, (size_t)length)) != -1) -#endif - { - break; - } - } - else -#endif -#if !defined(USE_PTHREADS) - { -#ifdef USE_TLS - if ((n = (ip->tls_status == TLS_INACTIVE ? - (int)socket_write(ip->socket, ip->message_buf, (size_t)chunk): - (int)tls_write(ip, ip->message_buf, (size_t)chunk))) != -1) -#else - if ((n = (int)socket_write(ip->socket, ip->message_buf, (size_t)chunk)) != -1) -#endif - { - break; - } - } -#endif - switch (errno) { - case EINTR: - if (--retries) - continue; - ip->msg_discarded = MY_TRUE; - fprintf(stderr, - "%s comm: write EINTR. Message discarded.\n", time_stamp()); - if (old_message_length) - remove_flush_entry(ip); - return; - - case EWOULDBLOCK: - ip->msg_discarded = MY_TRUE; - if (d_flag) - fprintf(stderr, - "%s comm: write EWOULDBLOCK. Message discarded.\n", time_stamp()); - if (old_message_length) - remove_flush_entry(ip); - return; - - case EMSGSIZE: - fprintf(stderr, "%s comm: write EMSGSIZE.\n", time_stamp()); - return; - - case EINVAL: - fprintf(stderr, "%s comm: write EINVAL.\n", time_stamp()); - break; - - case ENETUNREACH: - fprintf(stderr, "%s comm: write ENETUNREACH.\n", time_stamp()); - break; - - case EHOSTUNREACH: - fprintf(stderr, "%s comm: write EHOSTUNREACH.\n", time_stamp()); - break; - - case EPIPE: - fprintf(stderr, "%s comm: write EPIPE detected\n", time_stamp()); - break; - - default: - { - int e = errno; - perror("write"); - fprintf(stderr, "%s comm: write: unknown errno %d\n" - , time_stamp(), e); - } - } if (old_message_length) remove_flush_entry(ip); - ip->do_close = FLAG_DO_CLOSE; return; - - } /* for (retries) */ - -#ifdef COMM_STAT - inet_packets++; - inet_volume += n; -#endif - -#if defined(USE_MCCP) && !defined(USE_PTHREADS) - if (ip->out_compress) - { - /* we update the compressed buffer here */ - ip->out_compress->next_out = ip->out_compress_buf + length - n; - if (n != length) - fprintf(stderr, "%s write socket (compressed): wrote %ld, " - "should be %td.\n" - , time_stamp(), (long)n, chunk); } - else -#endif - if (n != chunk) - fprintf(stderr, "%s write socket: wrote %ld, should be %td.\n" - , time_stamp(), (long)n, chunk); /* Continue with the processing of source */ dest = &ip->message_buf[0]; @@ -2548,6 +2635,15 @@ if (socket_number(ip->socket) >= nfds) nfds = socket_number(ip->socket)+1; } + +#ifndef USE_PTHREADS + if (ip->write_first != NULL || ip->msg_discarded) + { + /* There is something to write. */ + FD_SET(ip->socket, &writefds); + } +#endif /* USE_PTHREADS */ + } /* for (all players) */ #ifdef ERQ_DEMON if (erq_demon >= 0) @@ -3015,6 +3111,12 @@ } #endif +#ifndef USE_PTHREADS + if (FD_ISSET(ip->socket, &writefds)) + { + comm_write_pending(ip); + } +#endif /* Skip players which have reached the ip->maxNumCmds limit * for this second. We let the data accumulate on the socket. */ @@ -3370,19 +3472,9 @@ } if (length > ip->chars_ready) { -#ifdef USE_PTHREADS - thread_socket_write(ip->socket, ip->text + ip->chars_ready - , (size_t)(length - ip->chars_ready), ip, MY_FALSE); -#else -#ifdef USE_TLS - if (ip->tls_status != TLS_INACTIVE) - tls_write(ip, ip->text + ip->chars_ready - , (size_t)(length - ip->chars_ready)); - else -#endif /* USE_TLS */ - socket_write(ip->socket, ip->text + ip->chars_ready - , (size_t)(length - ip->chars_ready)); -#endif + comm_socket_write(ip->text + ip->chars_ready + , (size_t)(length - ip->chars_ready) + , ip); ip->chars_ready = length; } } @@ -3619,6 +3711,7 @@ /* buffer list is returned by thread */ interactive_cleanup(interactive); #endif + #ifdef USE_MCCP if (interactive->out_compress) end_compress(interactive, MY_TRUE); @@ -3626,6 +3719,12 @@ * a second chance at it. */ #endif + +#ifndef USE_PTHREADS + /* If there is anything left, try now. */ + comm_write_pending(interactive); +#endif + #ifdef USE_TLS tls_deinit_connection(interactive); #endif @@ -3670,6 +3769,15 @@ if (interactive->trace_prefix) free_mstring(interactive->trace_prefix); +#ifndef USE_PTHREADS + while (interactive->write_first) + { + struct write_buffer_s *tmp = interactive->write_first; + interactive->write_first = tmp->next; + xfree(tmp); + } +#endif + /* Unlink the interactive structure from the shadow sentence * of the object. */ @@ -3918,7 +4026,7 @@ new_interactive->input_to = NULL; put_number(&new_interactive->prompt, 0); new_interactive->modify_command = NULL; - new_interactive->msg_discarded = MY_FALSE; + new_interactive->msg_discarded = 0; new_interactive->set_input_to = MY_FALSE; new_interactive->closing = MY_FALSE; new_interactive->tn_enabled = MY_TRUE; @@ -3954,6 +4062,8 @@ new_interactive->next_player_for_flush = NULL; new_interactive->previous_player_for_flush = NULL; + new_interactive->write_first = new_interactive->write_last = NULL; + new_interactive->write_size = 0; #ifdef USE_PTHREADS new_interactive->flush_on_cleanup = MY_FALSE; pthread_mutex_init(&new_interactive->write_mutex, NULL); @@ -3965,8 +4075,6 @@ pthread_mutexattr_destroy(&mutexattr); } pthread_cond_init(&new_interactive->write_cond, NULL); - new_interactive->write_first = new_interactive->write_last = NULL; - new_interactive->write_size = 0; new_interactive->write_current = NULL; new_interactive->written_first = NULL; pthread_create(&new_interactive->write_thread, NULL, writer_thread, new_interactive); @@ -5540,42 +5648,19 @@ if (ip->text[0] == input_escape && ! (find_no_bang(ip) & IGNORE_BANG) ) { -#ifdef USE_PTHREADS if (to > &ip->text[ip->chars_ready]) { - thread_socket_write(ip->socket, &ip->text[ip->chars_ready], - (size_t)(to - &ip->text[ip->chars_ready]), ip, MY_FALSE); + comm_socket_write(&ip->text[ip->chars_ready] + , (size_t)(to - &ip->text[ip->chars_ready]) + , ip); ip->chars_ready = to - ip->text; } - if (to > first) { - thread_socket_write(ip->socket, "\b \b", 3, ip, MY_FALSE); - to--; - ip->chars_ready--; - } -#else - if (to > &ip->text[ip->chars_ready]) + if (to > first) { -#ifdef USE_TLS - if (ip->tls_status != TLS_INACTIVE) - tls_write(ip, &ip->text[ip->chars_ready] - , (size_t)(to - &ip->text[ip->chars_ready])); - else -#endif - socket_write(ip->socket, &ip->text[ip->chars_ready], - (size_t)(to - &ip->text[ip->chars_ready])); - ip->chars_ready = to - ip->text; - } - if (to > first) { -#ifdef USE_TLS - if (ip->tls_status != TLS_INACTIVE) - tls_write(ip, "\b \b", 3); - else -#endif - socket_write(ip->socket, "\b \b", 3); + comm_socket_write("\b \b", 3, ip); to--; ip->chars_ready--; } -#endif goto ts_data; } } /* if (ip->tn_enabled) */ @@ -7281,11 +7366,6 @@ * codes for colours and other things needs to bypass the allowed * charset filters, but isn't important enough to waste bandwith * on a synchronous transmission. - * -#ifdef USE_MCCP - * If the client uses MCCP compression add_message ist always used - * with flushing buffer _after_ the Message. -#endif */ { @@ -7334,11 +7414,7 @@ save_command_giver = command_giver; command_giver = current_object; -#ifdef USE_MCCP - if ((sp->u.number & 1)||ip->out_compress) -#else if (sp->u.number & 1) -#endif { /* Write before flush... */ @@ -7348,13 +7424,9 @@ sending_telnet_command = MY_FALSE; -#ifdef USE_MCCP - if ((sp->u.number & 2)||ip->out_compress) - add_message(message_flush); -#else if (sp->u.number & 2) add_message(message_flush); -#endif /* USE_MCCP */ + wrote = mstrsize(msg); } else @@ -7366,46 +7438,8 @@ * to the socket now. */ - for (i = 6; i > 0; i--) { -#ifdef USE_PTHREADS - wrote = (mp_int)thread_socket_write(ip->socket, get_txt(msg) - , mstrsize(msg), ip, MY_TRUE); -#else -#ifdef USE_TLS - if (ip->tls_status != TLS_INACTIVE) - wrote = (mp_int)tls_write(ip, get_txt(msg), mstrsize(msg)); - else -#endif /* USE_TLS */ - wrote = (mp_int)socket_write(ip->socket, get_txt(msg), mstrsize(msg)); -#endif - if (wrote != -1 || errno != EINTR || i != 1) - break; - } - if (wrote == -1) - { - /* TODO: Use strerror()? */ - switch(errno) - { - case EINTR: - fprintf(stderr - , "%s comm: write EINTR. Message discarded.\n" - , time_stamp()); - break; - case EWOULDBLOCK: - fprintf(stderr, - "%s comm: write EWOULDBLOCK. Message discarded.\n" - , time_stamp()); - break; - case EMSGSIZE: - fprintf(stderr, "%s comm: write EMSGSIZE.\n" - , time_stamp()); - break; - default: - perror("write"); - ip->do_close = FLAG_DO_CLOSE; - break; - } - } + comm_socket_write(get_txt(msg), mstrsize(msg), ip); + } /* if (type of write) */ command_giver = save_command_giver; writebuffer2.diff (31,638 bytes)
Index: trunk.inputto/doc/master/privilege_violation =================================================================== --- trunk.inputto/doc/master/privilege_violation (Revision 2537) +++ trunk.inputto/doc/master/privilege_violation (Arbeitskopie) @@ -1,5 +1,5 @@ SYNOPSIS - int privilege_violation(string op, mixed who, mixed arg, mixed arg2) + int privilege_violation(string op, mixed who, mixed arg, mixed arg2, mixed arg3) DESCRIPTION Validate the execution of a privileged operation. @@ -18,6 +18,8 @@ bind_lambda Bind a lambda-closure to object <arg>. call_out_info Return an array with all call_out informations. + configure_interactive Set option <arg2> with value <arg3> as + default (<arg>==0) or for object <arg>. enable_telnet Enable/disable telnet (<arg2>) for object <arg>. execute_command Execute command string <arg2> for the object <arg>. Index: trunk.inputto/doc/efun/set_driver_hook =================================================================== --- trunk.inputto/doc/efun/set_driver_hook (Revision 2537) +++ trunk.inputto/doc/efun/set_driver_hook (Arbeitskopie) @@ -88,6 +88,11 @@ Optional hook to notify the mudlib about the termination of the erq demon. + H_MSG_DISCARDED + arg: lambda closure, lfun closure, string + Optional hook to specify a message or take other measures + when a message had to be discarded. + See hooks(C) for a detailed discussion. HISTORY Index: trunk.inputto/doc/concepts/hooks =================================================================== --- trunk.inputto/doc/concepts/hooks (Revision 2537) +++ trunk.inputto/doc/concepts/hooks (Arbeitskopie) @@ -125,6 +125,11 @@ the erq demon. + H_MSG_DISCARDED + Optional hook to specify a message or take other measures + when a message had to be discarded. + + HISTORY The hooks concept was introduced in 3.2.1 H_MOVE_OBJECT0/1 were introduced in 3.2.1@1 Index: trunk.inputto/src/pkg-mccp.c =================================================================== --- trunk.inputto/src/pkg-mccp.c (Revision 2537) +++ trunk.inputto/src/pkg-mccp.c (Arbeitskopie) @@ -204,7 +204,7 @@ ip->out_compress_buf = NULL; /* try to send any residual data */ - comm_socket_write((char*) buf, len, ip); + comm_socket_write((char*) buf, len, ip, WB_NONDISCARDABLE); printf("%s MCCP-DEBUG: '%s' mccp ended\n" , time_stamp(), get_txt(ip->ob->name)); Index: trunk.inputto/src/make_func.y =================================================================== --- trunk.inputto/src/make_func.y (Revision 2537) +++ trunk.inputto/src/make_func.y (Arbeitskopie) @@ -1652,6 +1652,8 @@ return H_PRINT_PROMPT; if ( !strcmp(name, "REGEXP_PACKAGE") ) return H_REGEXP_PACKAGE; + if ( !strcmp(name, "MSG_DISCARDED") ) + return H_MSG_DISCARDED; return -1; } Index: trunk.inputto/src/prolang.y =================================================================== --- trunk.inputto/src/prolang.y (Revision 2537) +++ trunk.inputto/src/prolang.y (Arbeitskopie) @@ -207,6 +207,7 @@ H_DEFAULT_PROMPT: SH(T_CLOSURE) SH(T_STRING), \ H_PRINT_PROMPT: SH(T_CLOSURE) SH(T_STRING), \ H_REGEXP_PACKAGE: SH(T_NUMBER), \ + H_MSG_DISCARDED: SH(T_CLOSURE) SH(T_STRING), \ #undef SH Index: trunk.inputto/src/comm.c =================================================================== --- trunk.inputto/src/comm.c (Revision 2537) +++ trunk.inputto/src/comm.c (Arbeitskopie) @@ -134,6 +134,7 @@ #include "i-eval_cost.h" #include "../mudlib/sys/comm.h" +#include "../mudlib/sys/configuration.h" #include "../mudlib/sys/driver_hook.h" #include "../mudlib/sys/input_to.h" @@ -225,8 +226,10 @@ * a 'null format string'. */ -long write_buffer_max_size = WRITE_BUFFER_MAX_SIZE; +p_int write_buffer_max_size = WRITE_BUFFER_MAX_SIZE; /* Amount of data held pending in the write fifo queue. + * 0: No queue. + * -1: Infinite queue. */ @@ -469,7 +472,9 @@ static void send_wont(int); static void send_do(int); static void send_dont(int); +static void add_flush_entry(interactive_t *ip); static void remove_flush_entry(interactive_t *ip); +static void clear_message_buf(interactive_t *ip); static void new_player(object_t *receiver, SOCKET_T new_socket, struct sockaddr_in *addr, size_t len, int login_port); #ifdef ERQ_DEMON @@ -753,7 +758,6 @@ default: putc('\n', stderr); } fprintf(stderr, " .supress_go_ahead: %02hhx\n", (unsigned char)ip->supress_go_ahead); - fprintf(stderr, " .msg_discarded: %hd\n", ip->msg_discarded); fprintf(stderr, " .text_end: %hd (%p)\n", ip->text_end, ip->text+ip->text_end); fprintf(stderr, " .command_start: %hd (%p)\n", ip->command_start, ip->text+ip->command_start); fprintf(stderr, " .command_end: %hd (%p)\n", ip->command_end, ip->text+ip->command_end); @@ -1350,7 +1354,7 @@ case EPIPE: fprintf(stderr, "%s comm: write EPIPE detected\n", time_stamp()); break; - + case ECONNRESET: fprintf(stderr, "%s comm: write ECONNRESET detected\n", time_stamp()); break; @@ -1362,10 +1366,10 @@ , time_stamp(), e, strerror(e)); } } - + if (n == 0) break; - + ip->do_close = FLAG_DO_CLOSE; return -1; @@ -1385,11 +1389,13 @@ /*-------------------------------------------------------------------------*/ Bool -comm_socket_write (char *msg, size_t size, interactive_t *ip) +comm_socket_write (char *msg, size_t size, interactive_t *ip, write_buffer_flag_t flags) /* Stand in for socket_write(): take the data to be written, compress and * encrypt them if needed and send them to <ip>. If no data can be send - * right now append them to the buffer. + * right now append them to the buffer. Returns false if no more data + * is accepted at this time (messages are discarded or the connection + * was dropped). */ { @@ -1401,30 +1407,64 @@ return MY_TRUE; #ifdef USE_MCCP + /* We cannot discard already compressed packets, + * because the zlib will generate a checksum + * over all bytes. So we have to check before + * compressing the message whether we can send + * or put them in the write buffer. + * + * To provide a consistent behavior we do this + * also for uncompressed connections. + */ +#endif + + if (!(flags & WB_NONDISCARDABLE) && ip->write_first) + { + p_int max_size; + + max_size = ip->write_max_size; + if (max_size == -2) + max_size = write_buffer_max_size; + + if (max_size >= 0 && ip->write_size >= (p_uint) max_size) + { + /* Buffer overflow. */ + if (ip->msg_discarded != DM_NONE) + return MY_FALSE; /* Message will be or was sent. */ + + /* Notify the master about it. */ + ip->msg_discarded = DM_SEND_INFO; + add_flush_entry(ip); + + return MY_FALSE; + } + } + +#ifdef USE_MCCP if (ip->out_compress) { int status; - + ip->out_compress->next_in = (unsigned char *) msg; ip->out_compress->avail_in = size; - + ip->out_compress->next_out = ip->out_compress_buf; ip->out_compress->avail_out = COMPRESS_BUF_SIZE; status = deflate(ip->out_compress, Z_SYNC_FLUSH); - + if (status != Z_OK) { fprintf(stderr, "%s comm: MCCP compression error: %d\n" , time_stamp(), status); return MY_FALSE; } - + /* ok.. perhaps i should take care that all data in message_buf * is compressed, but i guess there is no chance that 1024 byte * compressed won't fit into the 8192 byte buffer */ - + length = ip->out_compress->next_out - ip->out_compress_buf; buf = (char *) ip->out_compress_buf; } @@ -1440,7 +1480,7 @@ { /* Try writing to the socket first. */ ssize_t n = comm_send_buf(buf, length, ip); - + if (n == -1) return MY_FALSE; else if (n == length) @@ -1448,45 +1488,34 @@ /* We're done. */ return MY_TRUE; } - else + else if (n > 0) { buf += n; length -= n; } } - + /* We have to enqueue the message. */ - b = xalloc(sizeof(struct write_buffer_s) + size - 1); + + b = xalloc(sizeof(struct write_buffer_s) + length - 1); if (!b) - outofmem(sizeof(struct write_buffer_s) + size - 1, "comm_socket_write()"); + outofmem(sizeof(struct write_buffer_s) + length - 1, "comm_socket_write()"); b->length = length; b->pos = 0; + b->flags = flags; b->next = NULL; memcpy(b->buffer, buf, length); - /* Chain in the new buffer */ - if(ip->write_first) + /* Chain in the new buffer */ + if (ip->write_first) ip->write_last = ip->write_last->next = b; else - ip->write_first = ip->write_last = b; - ip->write_size += size; - - /* Make sure that the amount of data pending never exceeds - * the maximum. - */ - while (write_buffer_max_size != 0 - && ip->write_size >= (unsigned long)write_buffer_max_size) - { - struct write_buffer_s *tmp = ip->write_first; - ip->write_first = tmp->next; - ip->write_size -= tmp->length; - xfree(tmp); + ip->write_last = ip->write_first = b; - if (!ip->msg_discarded) - ip->msg_discarded = 1; - } - + ip->write_size += length; + ip->msg_discarded = DM_NONE; + return MY_TRUE; } /* comm_socket_write() */ @@ -1498,35 +1527,21 @@ */ { - /* First, if a previous call had to discard the message, inform the user. - */ - if (ip->msg_discarded) - { - ssize_t n; - char msg[] = "\n*** Text lost in transmission ***\n"; - - n = comm_send_buf(msg, sizeof(msg) - ip->msg_discarded, ip); - - ip->msg_discarded += n; - if (ip->msg_discarded >= sizeof(msg)) - ip->msg_discarded = 0; - } - while (ip->write_first != NULL) { struct write_buffer_s *buf; ssize_t n; - + buf = ip->write_first; n = comm_send_buf(buf->buffer + buf->pos, buf->length - buf->pos, ip); if (n == -1) return; - + buf->pos += n; if (buf->pos < buf->length) return; - + ip->write_first = buf->next; ip->write_size -= buf->length; xfree(buf); @@ -1534,6 +1549,63 @@ } /* comm_write_pending() */ /*-------------------------------------------------------------------------*/ +static void +add_discarded_message (interactive_t *ip) + +/* Calls the H_MSG_DISCARDED driver hook and adds the + * message to the write buffer of <ip>. <ip> is removed + * from the list of dirty interactives if it's clean afterwards. + */ +{ + string_t *discarded_msg; + + if (driver_hook[H_MSG_DISCARDED].type == T_CLOSURE) + { + if (driver_hook[H_MSG_DISCARDED].x.closure_type == CLOSURE_LAMBDA) + { + free_object(driver_hook[H_MSG_DISCARDED].u.lambda->ob, "add_discarded_message"); + driver_hook[H_MSG_DISCARDED].u.lambda->ob = ref_object(ip->ob, "add_discarded_message"); + } + + push_ref_valid_object(inter_sp, ip->ob, "add_discarded_message"); + + call_lambda(&driver_hook[H_MSG_DISCARDED], 1); + + if (inter_sp->type == T_STRING) + { + /* The new discarded_msg. Adopt the reference. */ + discarded_msg = inter_sp->u.str; + + inter_sp->type = T_INVALID; + inter_sp--; + } + else + discarded_msg = NULL; + } + else if (driver_hook[H_MSG_DISCARDED].type == T_STRING) + { + discarded_msg = ref_mstring(driver_hook[H_MSG_DISCARDED].u.str); + } + else + { + discarded_msg = ref_mstring(STR_DISCARDED_MSG); + } + + if (discarded_msg) + { + /* Append it to the write buffer. */ + comm_socket_write(get_txt(discarded_msg), mstrsize(discarded_msg) + , ip, WB_NONDISCARDABLE); + free_mstring(discarded_msg); + } + + ip->msg_discarded = DM_INFO_WAS_SENT; + + if (!ip->message_length) + remove_flush_entry(ip); +} + +/*-------------------------------------------------------------------------*/ void add_message (const char *fmt, ...) @@ -1838,6 +1910,11 @@ dest = &ip->message_buf[old_message_length]; end = &ip->message_buf[sizeof ip->message_buf]; + /* If there's any recursive call, let it begin + * at the start. + */ + ip->message_length = 0; + /* This loop advances source until it reaches the end. * Every character encountered is copied, translated or fed * into the telnet machine. @@ -1925,10 +2002,10 @@ /* Write .message_buf[] to the network. */ - if (!comm_socket_write(ip->message_buf, (size_t)chunk, ip)) + if (!comm_socket_write(ip->message_buf, (size_t)chunk, ip, 0)) { if (old_message_length) - remove_flush_entry(ip); + clear_message_buf(ip); return; } @@ -1946,18 +2023,11 @@ { /* Buffer became 'dirty': add this interactive to the list. */ - if ( NULL != (ip->next_player_for_flush = first_player_for_flush) ) - { - O_GET_INTERACTIVE(first_player_for_flush)-> - previous_player_for_flush = - command_giver; - } - ip->previous_player_for_flush = NULL; - first_player_for_flush = command_giver; + add_flush_entry(ip); } if ( !length && old_message_length ) /* buffer has become empty */ { - remove_flush_entry(ip); + clear_message_buf(ip); } } /* add_message() */ @@ -1999,19 +2069,35 @@ /*-------------------------------------------------------------------------*/ static void +add_flush_entry (interactive_t *ip) + +/* Add the given interactive <ip> to the list of 'dirty' interactives. + * The function is safe to call for interactives already in the list. + */ + +{ + if ( ip->previous_player_for_flush || first_player_for_flush == ip->ob) + return; + + if ( NULL != (ip->next_player_for_flush = first_player_for_flush) ) + { + O_GET_INTERACTIVE(first_player_for_flush)-> + previous_player_for_flush = ip->ob; + } + ip->previous_player_for_flush = NULL; + first_player_for_flush = ip->ob; +} /* add_flush_entry() */ + +/*-------------------------------------------------------------------------*/ +static void remove_flush_entry (interactive_t *ip) /* Remove the given interactive <ip> from the list of 'dirty' interactives * and make sure it is really clean. The function is safe to call for * interactives not in the list. - * - * This function is called after an interactive sent all pending data (or - * failing while doing so). */ { - ip->message_length = 0; - /* To make it safe for calling the function even for interactives * not in the flush list, we check that <ip> is either in the middle * or at the end of the flush list (one or both of the .previous @@ -2050,19 +2136,43 @@ { object_t *p, *np; + interactive_t *ip; object_t *save = command_giver; for ( p = first_player_for_flush; p != NULL; p = np) { - np = O_GET_INTERACTIVE(p)->next_player_for_flush; + ip = O_GET_INTERACTIVE(p); + np = ip->next_player_for_flush; /* add_message() will clobber (p)->next_player_for_flush! */ command_giver = p; add_message(message_flush); + + if(ip->msg_discarded == DM_SEND_INFO) + add_discarded_message(ip); } command_giver = save; } /* flush_all_player_mess() */ /*-------------------------------------------------------------------------*/ +static void +clear_message_buf (interactive_t *ip) + +/* Clear the buffer of the given interactive <ip> and remove it from + * the list of 'dirty' interactives if there is nothing else to do. + * The function is safe to call for interactives not in the list. + * + * This function is called after an interactive sent all pending data (or + * failing while doing so). + */ + +{ + ip->message_length = 0; + + if (ip->msg_discarded != DM_SEND_INFO) + remove_flush_entry(ip); +} /* clear_message_buf() */ + +/*-------------------------------------------------------------------------*/ Bool get_message (char *buff) @@ -2191,7 +2301,7 @@ nfds = socket_number(ip->socket)+1; } - if (ip->write_first != NULL || ip->msg_discarded) + if (ip->write_first != NULL) { /* There is something to write. */ FD_SET(ip->socket, &writefds); @@ -3025,7 +3135,7 @@ { comm_socket_write(ip->text + ip->chars_ready , (size_t)(length - ip->chars_ready) - , ip); + , ip, 0); ip->chars_ready = length; } } @@ -3532,7 +3642,6 @@ new_interactive->input_to = NULL; put_number(&new_interactive->prompt, 0); new_interactive->modify_command = NULL; - new_interactive->msg_discarded = 0; new_interactive->closing = MY_FALSE; new_interactive->tn_enabled = MY_TRUE; new_interactive->do_close = 0; @@ -3564,9 +3673,11 @@ new_interactive->socket = new_socket; new_interactive->next_player_for_flush = NULL; new_interactive->previous_player_for_flush = NULL; + new_interactive->msg_discarded = DM_NONE; new_interactive->write_first = new_interactive->write_last = NULL; new_interactive->write_size = 0; + new_interactive->write_max_size = -2; /* Add the new interactive structure to the list of users */ @@ -5143,12 +5254,12 @@ { comm_socket_write(&ip->text[ip->chars_ready] , (size_t)(to - &ip->text[ip->chars_ready]) - , ip); + , ip, 0); ip->chars_ready = to - ip->text; } if (to > first) { - comm_socket_write("\b \b", 3, ip); + comm_socket_write("\b \b", 3, ip, 0); to--; ip->chars_ready--; } @@ -6909,7 +7020,7 @@ * to the socket now. */ - if (comm_socket_write(get_txt(msg), mstrsize(msg), ip)) + if (comm_socket_write(get_txt(msg), mstrsize(msg), ip, 0)) wrote = mstrsize(msg); } /* if (type of write) */ @@ -8781,4 +8892,80 @@ return sp; } /* f_net_connect() */ +/*-------------------------------------------------------------------------*/ +svalue_t * +f_configure_interactive (svalue_t *sp) + +/* EFUN configure_interactive() + * + * void configure_interactive(object ob, int what, mixed data) + * + * Sets the option <what> to the value <data> on the interactive <ob> + * or the default for all interactives if <ob> is 0. + * + * <what> == IC_MAX_WRITE_BUFFER_SIZE + * + * If the first argument <ob> is not this_object(), the privilege violation + * ("configure_interactive", this_object(), ob, what, data) occurs. + */ + +{ + object_t *ob; + interactive_t *ip; + + if (sp[-2].type == T_OBJECT) + { + ob = sp[-2].u.ob; + + if (!O_SET_INTERACTIVE(ip, ob)) + { + errorf("Bad arg 1 to configure_interactive(): " + "Object '%s' is not interactive.\n" + , get_txt(ob->name) + ); + return sp; /* NOTREACHED */ + } + } + else + { + ob = NULL; + ip = NULL; + } + + if (ob != current_object + && !privilege_violation_n(STR_CONFIGURE_INTERACTIVE, ob, sp, 2)) + { + sp = pop_n_elems(3, sp); + return sp; + } + + switch(sp[-1].u.number) + { + default: + errorf("Illegal value %"PRIdPINT" for configure_interactive().\n", sp[-1].u.number); + return sp; /* NOTREACHED */ + + case IC_MAX_WRITE_BUFFER_SIZE: + { + int max; + + if (sp->type != T_NUMBER) + efun_exp_arg_error(3, TF_NUMBER, sp->type, sp); + + max = sp->u.number; + if (max < 0) + max = -1; + + if (!ip) + write_buffer_max_size = max; + else + ip->write_max_size = max; + break; + } + } + + sp = pop_n_elems(3, sp); + return sp; +} /* f_configure_interactive() */ + /***************************************************************************/ Index: trunk.inputto/src/comm.h =================================================================== --- trunk.inputto/src/comm.h (Revision 2537) +++ trunk.inputto/src/comm.h (Arbeitskopie) @@ -126,14 +126,32 @@ * The instances are kept in a linked list from the interactive_t * structure. */ +enum write_buffer_flags +{ + WB_NONDISCARDABLE = 0x0001, /* This message must be sent. */ +}; + +typedef uint32 write_buffer_flag_t; + struct write_buffer_s { struct write_buffer_s *next; size_t length; size_t pos; + write_buffer_flag_t flags; char buffer[1 /* .length */ ]; }; +/* Indicates discarded messages. */ +enum discarded_msg_states +{ + DM_NONE = 0, + DM_SEND_INFO, + DM_INFO_WAS_SENT, +}; + +typedef char discarded_msg_state_t; + /* --- struct input_to_s: input_to() datastructure * * input-to structures describe a pending input_to() for a given @@ -186,9 +204,9 @@ CBool outgoing_conn; /* TRUE if the connection was created by * net_connect(). */ - - short msg_discarded; /* != 0 if an earlier msg had been discarded, - index into the message to be sent. */ + discarded_msg_state_t msg_discarded; + /* Indicates if an earlier message had + * been discarded. */ short text_end; /* first free char in buffer */ short command_start; /* used for charmode */ short command_end; /* where we are up to in player cmd buffer */ @@ -247,14 +265,20 @@ /* The send buffer. */ #ifdef USE_MCCP - unsigned char compressing; - z_stream * out_compress; - unsigned char * out_compress_buf; -#endif + unsigned char compressing; + z_stream * out_compress; + unsigned char * out_compress_buf; +#endif struct write_buffer_s *write_first; /* List of buffers to write */ struct write_buffer_s *write_last; - unsigned long write_size; + p_uint write_size; + p_int write_max_size; + /* Maximum write_size. + * 0: No write buffer. + * -1: Infinite write buffer. + * -2: Use global variable write_buffer_max_size. + */ #ifdef USE_TLS tls_session_t tls_session; @@ -356,7 +380,7 @@ extern char *message_flush; extern char *domain_name; -extern long write_buffer_max_size; +extern p_int write_buffer_max_size; #ifdef COMM_STAT extern unsigned long add_message_calls; @@ -372,7 +396,7 @@ extern void initialize_host_ip_number(const char *, const char *); extern void prepare_ipc(void); extern void ipc_remove(void); -extern Bool comm_socket_write (char *msg, size_t size, interactive_t *ip); +extern Bool comm_socket_write (char *msg, size_t size, interactive_t *ip, uint32 flags); extern void add_message VARPROT((const char *, ...), printf, 1, 2); extern void flush_all_player_mess(void); extern Bool get_message(char *buff); @@ -437,6 +461,7 @@ extern svalue_t *f_set_max_commands (svalue_t *sp); extern svalue_t *f_enable_telnet (svalue_t *sp); extern svalue_t *f_net_connect (svalue_t *sp); +extern svalue_t *f_configure_interactive(svalue_t *sp); extern void refresh_access_data(void (*add_entry)(struct sockaddr_in *, int, long*) ); Index: trunk.inputto/src/func_spec =================================================================== --- trunk.inputto/src/func_spec (Revision 2537) +++ trunk.inputto/src/func_spec (Arbeitskopie) @@ -502,6 +502,7 @@ object blueprint(string|object default: F_THIS_OBJECT); /* PRELIMINARY */ object clone_object(string|object); object *clones(void|int|string|object, void|int); +void configure_interactive(object, int, mixed); void destruct(null|object); int exec(object, object); object find_object(string); Index: trunk.inputto/src/string_spec =================================================================== --- trunk.inputto/src/string_spec (Revision 2537) +++ trunk.inputto/src/string_spec (Arbeitskopie) @@ -20,6 +20,7 @@ DANGLING_V_CL "dangling var closure" DEFAULT_PROMPT "> " DESTRUCTED "destructed" +DISCARDED_MSG "\n*** Text lost in transmission ***\n" EFUN "efun" EFUN_CLOSURE "<efun closure>" FATAL_ERROR "Fatal Error" @@ -70,6 +71,7 @@ ATTACH_ERQ_DEMON "attach_erq_demon" BIND_LAMBDA "bind_lambda" CALL_OUT_INFO "call_out_info" +CONFIGURE_INTERACTIVE "configure_interactive" SEND_ERQ "erq" INPUT_TO "input_to" SEND_UDP "send_udp" Index: trunk.inputto/src/interpret.c =================================================================== --- trunk.inputto/src/interpret.c (Revision 2537) +++ trunk.inputto/src/interpret.c (Arbeitskopie) @@ -6652,6 +6652,79 @@ } /* privilege_violation4() */ /*-------------------------------------------------------------------------*/ +Bool +privilege_violation_n ( string_t *what, object_t *whom, svalue_t *sp, int num_arg) + +/* Call the mudlib to check for a privilege violation: + * + * master->privilege_violation(what, current_object, whom, + * sp[-num_arg+1], ...., sp) + * + * where <what> describes the type of the violation, and <whom> and the last + * <num_arg> values of the stack are data used in the violation. <sp> is + * also the current stack setting. All strings are not counted. + * + * If the apply returns a positive number, the privilege is granted and + * the function returns TRUE. + * If the apply returns 0, the privilege is gently denied and the function + * returns FALSE. + * If the apply returns something else, or if the lfun doesn't exist, + * an error is raised. + * + * If the current_object is the master or simul_efun object, this function + * immediately returns TRUE. + * + * If the lfun doesn't exist, or returns anything else but a positive + * number, an error is raised. + * + * <inter_sp> is updated to <sp>, <inter_pc> is assumed to be correct. + */ + +{ + svalue_t *svp, *arg; + int num; + + /* Trust these objects */ + if (current_object == master_ob) return MY_TRUE; + if (current_object == simul_efun_object) return MY_TRUE; + + /* Set up the lfun call */ + + arg = sp + 1 - num_arg; + + push_ref_string(sp, what); + push_ref_valid_object(sp, current_object, "privilege_violation"); + if (!whom) + { + push_number(sp, 0); + } + else + { + push_ref_object(sp, whom, "privilege_violation"); + } + + for (num = num_arg; num--; arg++) + { + sp++; + assign_svalue_no_free(sp, arg); + } + + inter_sp = sp; + svp = apply_master(STR_PRIVILEGE, 3 + num_arg); + + /* Was it the proper lfun to call? */ + if (!svp || svp->type != T_NUMBER || svp->u.number < 0) + { + inter_sp = sp - 3 - num_arg; + errorf("privilege violation : %s\n", get_txt(what)); + /* TODO: Print full args and types */ + } + + /* Return the result */ + return svp->u.number > 0; +} /* privilege_violation_n() */ + +/*-------------------------------------------------------------------------*/ static Bool trace_test (int b) Index: trunk.inputto/src/interpret.h =================================================================== --- trunk.inputto/src/interpret.h (Revision 2537) +++ trunk.inputto/src/interpret.h (Arbeitskopie) @@ -181,6 +181,7 @@ extern Bool privilege_violation(string_t *what, svalue_t *arg, svalue_t *sp); extern Bool privilege_violation2(string_t *what, svalue_t *arg, svalue_t *arg2, svalue_t *sp); extern Bool privilege_violation4(string_t *what, object_t *whom, string_t *how_str, int how_num, svalue_t *sp); +extern Bool privilege_violation_n(string_t *what, object_t *whom, svalue_t *sp, int num_arg); extern svalue_t *sapply_int(string_t *fun, object_t *ob, int num_arg, Bool b_ign_prot, Bool b_use_default); #define sapply(f,o,n) sapply_int(f,o,n, MY_FALSE, MY_TRUE) Index: trunk.inputto/CHANGELOG =================================================================== --- trunk.inputto/CHANGELOG (Revision 2538) +++ trunk.inputto/CHANGELOG (Arbeitskopie) @@ -1,6 +1,13 @@ This file lists all changes made to the game driver in all glory detail. See the file HISTORY for a user-oriented summary of all the changes. +??-Apr-2009 (Gnomi) + - (comm.c, interpret.c, doc/) Made the maximum write buffer size + configurable per interactive via a new efun configure_interactive. + New optional driver hook H_MSG_DISCARDED to inform the master about + discarded messages and send a custom error message to the interactive. + Already compressed packages wont be discarded (Bug #297). + 07-Apr-2009 (Gnomi) - (pkg-tls.h) Corrected the definition of tls_check_certificate. - (pkg-gnutls.h, pkg-openssl.h) Forgot an include. Index: trunk.inputto/mudlib/sys/configuration.h =================================================================== --- trunk.inputto/mudlib/sys/configuration.h (Revision 0) +++ trunk.inputto/mudlib/sys/configuration.h (Revision 0) @@ -0,0 +1,11 @@ +#ifndef LPC_CONFIGURATION_H_ +#define LPC_ CONFIGURATION_H_ + +/* Definition of argument values for configure_interactive(). + */ + +/* Possible options for configure_interactive(). + */ +#define IC_MAX_WRITE_BUFFER_SIZE 0 + +#endif /* LPC_CONFIGURATION_H_ */ Index: trunk.inputto/mudlib/sys/driver_hook.h =================================================================== --- trunk.inputto/mudlib/sys/driver_hook.h (Revision 2537) +++ trunk.inputto/mudlib/sys/driver_hook.h (Arbeitskopie) @@ -28,8 +28,9 @@ #define H_DEFAULT_PROMPT 21 #define H_PRINT_PROMPT 22 #define H_REGEXP_PACKAGE 23 +#define H_MSG_DISCARDED 24 -#define NUM_DRIVER_HOOKS 24 /* Number of hooks */ +#define NUM_DRIVER_HOOKS 25 /* Number of hooks */ #endif /* LPC_DRIVER_HOOK_ */ | ||||
|
I attached a patch that implements the write buffers without pthreads. The buffer has (as it were with pthreads) a limit that is configured during compile time or given as a command line argument. So now we need to give the mudlib some control over the buffer. I'm thinking about an efun for setting the maximum buffer size and a driver hook to inform about abandoned data. |
|
First part is committed as r2489. |
|
I attached a patch for the second part (configuration efun and driver hook). The driver hook is called in the backend loop (while processing dirty interactives) because I didn't want to call LPC code from add_message, which is not reentrant. |
|
Committed as r2551. |
Date Modified | Username | Field | Change |
---|---|---|---|
2004-11-27 00:00 |
|
New Issue | |
2009-01-05 03:38 | Gnomi | Status | new => assigned |
2009-01-05 03:38 | Gnomi | Assigned To | => Gnomi |
2009-01-05 03:38 | Gnomi | File Added: writebuffer.diff | |
2009-01-05 03:44 | Gnomi | Note Added: 0000851 | |
2009-01-05 03:44 | Gnomi | Relationship added | parent of 0000401 |
2009-01-05 03:44 | Gnomi | Relationship added | parent of 0000211 |
2009-01-05 03:46 | Gnomi | File Deleted: writebuffer.diff | |
2009-01-05 03:47 | Gnomi | File Added: writebuffer.diff | |
2009-01-14 08:06 | Gnomi | Note Added: 0000889 | |
2009-04-07 13:19 | Gnomi | File Added: writebuffer2.diff | |
2009-04-07 13:22 | Gnomi | Note Added: 0001011 | |
2009-04-08 01:10 | Gnomi | File Deleted: writebuffer2.diff | |
2009-04-08 01:10 | Gnomi | File Added: writebuffer2.diff | |
2009-04-08 01:52 | Gnomi | File Deleted: writebuffer2.diff | |
2009-04-08 01:52 | Gnomi | File Added: writebuffer2.diff | |
2009-04-16 10:59 | Gnomi | Note Added: 0001045 | |
2009-04-16 10:59 | Gnomi | Status | assigned => resolved |
2009-04-16 10:59 | Gnomi | Fixed in Version | => 3.3.719 |
2009-04-16 10:59 | Gnomi | Resolution | open => fixed |
2009-05-05 13:50 | zesstra | Project | LDMud => LDMud 3.3 |
2009-05-19 16:04 | Gnomi | Relationship replaced | has duplicate 0000401 |
2009-05-19 16:08 | Gnomi | Relationship replaced | has duplicate 0000211 |
2010-11-16 09:42 | Gnomi | Source_changeset_attached | => ldmud.git master ddd34aac |
2018-01-29 18:59 | Gnomi | Source_changeset_attached | => ldmud.git master ddd34aac |
2018-01-29 21:57 | Gnomi | Source_changeset_attached | => ldmud.git master ddd34aac |