Mercurial > njs
changeset 2616:db32e358a1fd
Fetch: added keepalive support for ngx.fetch() API.
This closes #957 feature request on Github.
| author | Dmitry Volyntsev <xeioex@nginx.com> |
|---|---|
| date | Wed, 03 Sep 2025 20:27:16 -0700 |
| parents | 59872fdb9291 |
| children | 4909fc840713 |
| files | nginx/ngx_http_js_module.c nginx/ngx_js.c nginx/ngx_js.h nginx/ngx_js_fetch.c nginx/ngx_js_http.c nginx/ngx_js_http.h nginx/ngx_qjs_fetch.c nginx/ngx_stream_js_module.c nginx/t/js_fetch_https_keepalive.t nginx/t/js_fetch_keepalive.t nginx/t/stream_js_fetch_keepalive.t |
| diffstat | 11 files changed, 1309 insertions(+), 70 deletions(-) [+] |
line wrap: on
line diff
--- a/nginx/ngx_http_js_module.c Fri Sep 12 19:10:32 2025 -0700 +++ b/nginx/ngx_http_js_module.c Wed Sep 03 20:27:16 2025 -0700 @@ -565,6 +565,34 @@ 0, NULL }, + { ngx_string("js_fetch_keepalive"), + NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1, + ngx_conf_set_num_slot, + NGX_HTTP_LOC_CONF_OFFSET, + offsetof(ngx_http_js_loc_conf_t, fetch_keepalive), + NULL }, + + { ngx_string("js_fetch_keepalive_requests"), + NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1, + ngx_conf_set_num_slot, + NGX_HTTP_LOC_CONF_OFFSET, + offsetof(ngx_http_js_loc_conf_t, fetch_keepalive_requests), + NULL }, + + { ngx_string("js_fetch_keepalive_time"), + NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1, + ngx_conf_set_msec_slot, + NGX_HTTP_LOC_CONF_OFFSET, + offsetof(ngx_http_js_loc_conf_t, fetch_keepalive_time), + NULL }, + + { ngx_string("js_fetch_keepalive_timeout"), + NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1, + ngx_conf_set_msec_slot, + NGX_HTTP_LOC_CONF_OFFSET, + offsetof(ngx_http_js_loc_conf_t, fetch_keepalive_timeout), + NULL }, + ngx_null_command };
--- a/nginx/ngx_js.c Fri Sep 12 19:10:32 2025 -0700 +++ b/nginx/ngx_js.c Wed Sep 03 20:27:16 2025 -0700 @@ -10,6 +10,7 @@ #include <ngx_core.h> #include <math.h> #include "ngx_js.h" +#include "ngx_js_http.h" typedef struct { @@ -3986,6 +3987,11 @@ conf->max_response_body_size = NGX_CONF_UNSET_SIZE; conf->timeout = NGX_CONF_UNSET_MSEC; + conf->fetch_keepalive = NGX_CONF_UNSET_UINT; + conf->fetch_keepalive_requests = NGX_CONF_UNSET_UINT; + conf->fetch_keepalive_time = NGX_CONF_UNSET_MSEC; + conf->fetch_keepalive_timeout = NGX_CONF_UNSET_MSEC; + return conf; } @@ -4097,6 +4103,17 @@ ngx_conf_merge_size_value(conf->max_response_body_size, prev->max_response_body_size, 1048576); + ngx_conf_merge_uint_value(conf->fetch_keepalive, prev->fetch_keepalive, 0); + ngx_conf_merge_uint_value(conf->fetch_keepalive_requests, + prev->fetch_keepalive_requests, 1000); + ngx_conf_merge_msec_value(conf->fetch_keepalive_time, + prev->fetch_keepalive_time, 3600000); + ngx_conf_merge_msec_value(conf->fetch_keepalive_timeout, + prev->fetch_keepalive_timeout, 60000); + + ngx_queue_init(&conf->fetch_keepalive_cache); + ngx_queue_init(&conf->fetch_keepalive_free); + if (ngx_js_merge_vm(cf, (ngx_js_loc_conf_t *) conf, (ngx_js_loc_conf_t *) prev, init_vm)
--- a/nginx/ngx_js.h Fri Sep 12 19:10:32 2025 -0700 +++ b/nginx/ngx_js.h Wed Sep 03 20:27:16 2025 -0700 @@ -13,6 +13,7 @@ #include <ngx_config.h> #include <ngx_core.h> #include <ngx_event.h> +#include <ngx_event_connect.h> #include <njs.h> #include <njs_rbtree.h> #include <njs_arr.h> @@ -133,7 +134,14 @@ \ size_t buffer_size; \ size_t max_response_body_size; \ - ngx_msec_t timeout + ngx_msec_t timeout; \ + \ + ngx_uint_t fetch_keepalive; \ + ngx_uint_t fetch_keepalive_requests; \ + ngx_msec_t fetch_keepalive_time; \ + ngx_msec_t fetch_keepalive_timeout; \ + ngx_queue_t fetch_keepalive_cache; \ + ngx_queue_t fetch_keepalive_free #if defined(NGX_HTTP_SSL) || defined(NGX_STREAM_SSL)
--- a/nginx/ngx_js_fetch.c Fri Sep 12 19:10:32 2025 -0700 +++ b/nginx/ngx_js_fetch.c Wed Sep 03 20:27:16 2025 -0700 @@ -7,10 +7,6 @@ */ -#include <ngx_config.h> -#include <ngx_core.h> -#include <ngx_event.h> -#include <ngx_event_connect.h> #include "ngx_js.h" #include "ngx_js_http.h" @@ -550,6 +546,13 @@ goto fail; } + if (u.host.len >= NGX_JS_HOST_MAX_LEN) { + njs_vm_error(vm, "Host name too long"); + goto fail; + } + + http->host = u.host; + http->port = u.port; http->response.url = request.url; http->buffer_size = http->conf->buffer_size; http->max_response_body_size = http->conf->max_response_body_size; @@ -681,18 +684,22 @@ continue; } + if (h[i].key.len == 10 + && ngx_strncasecmp(h[i].key.data, (u_char *) "Connection", 10) + == 0) + { + continue; + } + njs_chb_append(&http->chain, h[i].key.data, h[i].key.len); njs_chb_append_literal(&http->chain, ": "); njs_chb_append(&http->chain, h[i].value.data, h[i].value.len); njs_chb_append_literal(&http->chain, CRLF); } - njs_chb_append_literal(&http->chain, "Connection: close" CRLF); - -#if (NGX_SSL) - http->tls_name.data = u.host.data; - http->tls_name.len = u.host.len; -#endif + if (!http->keepalive) { + njs_chb_append_literal(&http->chain, "Connection: close" CRLF); + } if (request.body.len != 0) { njs_chb_sprintf(&http->chain, 32, "Content-Length: %uz" CRLF CRLF, @@ -1154,7 +1161,8 @@ http->log = log; http->conf = conf; - http->http_parse.content_length_n = -1; + http->content_length_n = -1; + http->keepalive = (conf->fetch_keepalive > 0); http->append_headers = ngx_js_fetch_append_headers; http->ready_handler = ngx_js_fetch_process_done;
--- a/nginx/ngx_js_http.c Fri Sep 12 19:10:32 2025 -0700 +++ b/nginx/ngx_js_http.c Wed Sep 03 20:27:16 2025 -0700 @@ -7,19 +7,35 @@ */ -#include <ngx_config.h> -#include <ngx_core.h> -#include <ngx_event.h> -#include <ngx_event_connect.h> #include "ngx_js.h" #include "ngx_js_http.h" +typedef struct { + ngx_js_loc_conf_t *conf; + ngx_queue_t queue; + ngx_connection_t *connection; + + ngx_flag_t ssl; + size_t host_len; + u_char host[NGX_JS_HOST_MAX_LEN]; + in_port_t port; +} ngx_js_http_keepalive_cache_t; + + +#define ngx_js_http_version(major, minor) ((major) * 1000 + (minor)) + + static void ngx_js_http_resolve_handler(ngx_resolver_ctx_t *ctx); static void ngx_js_http_next(ngx_js_http_t *http); static void ngx_js_http_write_handler(ngx_event_t *wev); static void ngx_js_http_read_handler(ngx_event_t *rev); static void ngx_js_http_dummy_handler(ngx_event_t *ev); +static void ngx_js_http_keepalive_close_handler(ngx_event_t *ev); +static void ngx_js_http_keepalive_dummy_handler(ngx_event_t *ev); + +static ngx_int_t ngx_js_http_get_keepalive_connection(ngx_js_http_t *http); +static ngx_int_t ngx_js_http_free_keepalive_connection(ngx_js_http_t *http); static ngx_int_t ngx_js_http_process_status_line(ngx_js_http_t *http); static ngx_int_t ngx_js_http_process_headers(ngx_js_http_t *http); @@ -177,12 +193,13 @@ static void ngx_js_http_close_connection(ngx_connection_t *c) { - ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0, - "js http close connection: %d", c->fd); + ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0, + "js http close connection: %p:%d", c, c->fd); #if (NGX_SSL) if (c->ssl) { c->ssl->no_wait_shutdown = 1; + c->ssl->no_send_shutdown = 1; if (ngx_ssl_shutdown(c) == NGX_AGAIN) { c->ssl->handler = ngx_js_http_close_connection; @@ -193,6 +210,7 @@ c->destroyed = 1; + ngx_destroy_pool(c->pool); ngx_close_connection(c); } @@ -210,18 +228,32 @@ void ngx_js_http_close_peer(ngx_js_http_t *http) { - if (http->peer.connection != NULL) { + if (http->peer.connection == NULL) { + return; + } + + ngx_log_debug0(NGX_LOG_DEBUG_EVENT, http->log, 0, + "js http close peer"); + + if (http->keepalive) { + if (ngx_js_http_free_keepalive_connection(http) != NGX_OK) { + ngx_js_http_close_connection(http->peer.connection); + } + + } else { ngx_js_http_close_connection(http->peer.connection); - http->peer.connection = NULL; } + + http->peer.connection = NULL; } void ngx_js_http_connect(ngx_js_http_t *http) { - ngx_int_t rc; - ngx_addr_t *addr; + ngx_int_t rc; + ngx_addr_t *addr; + ngx_connection_t *c; addr = &http->addrs[http->naddr]; @@ -235,38 +267,51 @@ http->peer.log = http->log; http->peer.log_error = NGX_ERROR_ERR; - rc = ngx_event_connect_peer(&http->peer); + rc = ngx_js_http_get_keepalive_connection(http); + if (rc != NGX_OK) { + rc = ngx_event_connect_peer(&http->peer); + if (rc == NGX_ERROR) { + ngx_js_http_error(http, "connect failed"); + return; + } - if (rc == NGX_ERROR) { - ngx_js_http_error(http, "connect failed"); - return; + if (rc == NGX_BUSY || rc == NGX_DECLINED) { + ngx_js_http_next(http); + return; + } } - if (rc == NGX_BUSY || rc == NGX_DECLINED) { - ngx_js_http_next(http); - return; + c = http->peer.connection; + + c->requests++; + c->data = http; + + if (c->pool == NULL) { + /* we need separate pool here to be able to cache SSL connections */ + c->pool = ngx_create_pool(128, http->log); + if (c->pool == NULL) { + ngx_js_http_error(http, "create pool failed"); + return; + } } - http->peer.connection->data = http; - http->peer.connection->pool = http->pool; - - http->peer.connection->write->handler = ngx_js_http_write_handler; - http->peer.connection->read->handler = ngx_js_http_read_handler; + c->write->handler = ngx_js_http_write_handler; + c->read->handler = ngx_js_http_read_handler; http->process = ngx_js_http_process_status_line; - ngx_add_timer(http->peer.connection->read, http->conf->timeout); - ngx_add_timer(http->peer.connection->write, http->conf->timeout); + ngx_add_timer(c->read, http->conf->timeout); + ngx_add_timer(c->write, http->conf->timeout); #if (NGX_SSL) - if (http->ssl != NULL && http->peer.connection->ssl == NULL) { + if (http->ssl != NULL && c->ssl == NULL) { ngx_js_http_ssl_init_connection(http); return; } #endif if (rc == NGX_OK) { - ngx_js_http_write_handler(http->peer.connection->write); + ngx_js_http_write_handler(c->write); } } @@ -346,10 +391,10 @@ goto failed; } - if (ngx_ssl_check_host(c, &http->tls_name) != NGX_OK) { + if (ngx_ssl_check_host(c, &http->host) != NGX_OK) { ngx_log_error(NGX_LOG_ERR, c->log, 0, "js http SSL certificate does not match \"%V\"", - &http->tls_name); + &http->host); goto failed; } } @@ -380,7 +425,7 @@ u_char *p; /* as per RFC 6066, literal IPv4 and IPv6 addresses are not permitted */ - ngx_str_t *name = &http->tls_name; + ngx_str_t *name = &http->host; if (name->len == 0 || *name->data == '[') { goto done; @@ -571,6 +616,10 @@ return; } + if (rc == NGX_DONE) { + break; + } + continue; } @@ -631,6 +680,10 @@ http->response.status_text.len = hp->status_text_end - hp->status_text; http->process = ngx_js_http_process_headers; + if (http->keepalive) { + http->keepalive = (hp->http_version >= ngx_js_http_version(1, 1)); + } + return http->process(http); } @@ -694,21 +747,33 @@ && ngx_strncasecmp(hp->header_start, (u_char *) "chunked", vlen) == 0) { - hp->chunked = 1; + http->chunked = 1; + } + + if (len == (sizeof("Connection") - 1) + && ngx_strncasecmp(hp->header_name_start, + (u_char *) "Connection", len) == 0) + { + if (vlen == (sizeof("close") - 1) + && ngx_strncasecmp(hp->header_start, (u_char *) "close", + vlen) == 0) + { + http->keepalive = 0; + } } if (len == (sizeof("Content-Length") - 1) && ngx_strncasecmp(hp->header_name_start, (u_char *) "Content-Length", len) == 0) { - hp->content_length_n = ngx_atoof(hp->header_start, vlen); - if (hp->content_length_n == NGX_ERROR) { + http->content_length_n = ngx_atoof(hp->header_start, vlen); + if (http->content_length_n == NGX_ERROR) { ngx_js_http_error(http, "invalid http content length"); return NGX_ERROR; } if (!http->header_only - && hp->content_length_n + && http->content_length_n > (off_t) http->max_response_body_size) { ngx_js_http_error(http, @@ -762,22 +827,22 @@ } if (!http->header_only - && http->http_parse.chunked - && http->http_parse.content_length_n == -1) + && http->chunked + && http->content_length_n == -1) { ngx_js_http_error(http, "invalid http chunked response"); return NGX_ERROR; } if (http->header_only - || http->http_parse.content_length_n == -1 - || size == http->http_parse.content_length_n) + || http->content_length_n == -1 + || size == http->content_length_n) { http->ready_handler(http); return NGX_DONE; } - if (size < http->http_parse.content_length_n) { + if (size < http->content_length_n) { return NGX_AGAIN; } @@ -787,7 +852,7 @@ b = http->buffer; - if (http->http_parse.chunked) { + if (http->chunked) { rc = ngx_js_http_parse_chunked(&http->http_chunk_parse, b, &http->response.chain); if (rc == NGX_ERROR) { @@ -798,7 +863,7 @@ size = njs_chb_size(&http->response.chain); if (rc == NGX_OK) { - http->http_parse.content_length_n = size; + http->content_length_n = size; } if (size > http->max_response_body_size * 10) { @@ -814,11 +879,11 @@ if (http->header_only) { need = 0; - } else if (http->http_parse.content_length_n == -1) { + } else if (http->content_length_n == -1) { need = http->max_response_body_size - size; } else { - need = http->http_parse.content_length_n - size; + need = http->content_length_n - size; } chsize = ngx_min(need, b->last - b->pos); @@ -1074,7 +1139,7 @@ b->pos = p + 1; hp->state = sw_start; - hp->http_version = hp->http_major * 1000 + hp->http_minor; + hp->http_version = ngx_js_http_version(hp->http_major, hp->http_minor); return NGX_OK; } @@ -1549,3 +1614,242 @@ return NGX_OK; } + + +static ngx_int_t +ngx_js_http_get_keepalive_connection(ngx_js_http_t *http) +{ + ngx_str_t *host; + ngx_queue_t *q; + ngx_connection_t *c; + ngx_js_loc_conf_t *conf; + ngx_js_http_keepalive_cache_t *cache; + + if (!http->keepalive) { + return NGX_DECLINED; + } + + conf = http->conf; + + host = &http->host; + + for (q = ngx_queue_head(&conf->fetch_keepalive_cache); + q != ngx_queue_sentinel(&conf->fetch_keepalive_cache); + q = ngx_queue_next(q)) + { + cache = ngx_queue_data(q, ngx_js_http_keepalive_cache_t, queue); + + if (host->len != cache->host_len) { + continue; + } + + if ((http->ssl != NULL) != (cache->ssl != 0)) { + continue; + } + + if (ngx_strncasecmp(host->data, cache->host, host->len) != 0) { + continue; + } + + if (http->port != cache->port) { + continue; + } + + c = cache->connection; + ngx_queue_remove(q); + ngx_queue_insert_head(&conf->fetch_keepalive_free, q); + + goto found; + } + + return NGX_DECLINED; + +found: + + ngx_log_debug2(NGX_LOG_DEBUG_EVENT, http->log, 0, + "js http keepalive using cached connection: %p:%d", + c, c->fd); + + c->idle = 0; + c->sent = 0; + c->data = NULL; + c->log = http->log; + c->pool->log = http->log; + c->read->log = http->log; + c->write->log = http->log; + + if (c->read->timer_set) { + ngx_del_timer(c->read); + } + + http->peer.cached = 1; + http->peer.connection = c; + + return NGX_OK; +} + + +static ngx_int_t +ngx_js_http_free_keepalive_connection(ngx_js_http_t *http) +{ + ngx_uint_t i; + ngx_queue_t *q; + ngx_connection_t *c; + ngx_js_loc_conf_t *conf; + ngx_js_http_keepalive_cache_t *cache; + + ngx_log_debug0(NGX_LOG_DEBUG_EVENT, http->log, 0, + "js http free keepalive connection"); + + c = http->peer.connection; + + if (c == NULL + || c->read->eof + || c->read->error + || c->read->timedout + || c->write->error + || c->write->timedout) + { + return NGX_ERROR; + } + + if (c->requests >= http->conf->fetch_keepalive_requests) { + return NGX_DONE; + } + + if (ngx_current_msec - c->start_time > http->conf->fetch_keepalive_time) { + return NGX_DONE; + } + + if (ngx_terminate || ngx_exiting) { + return NGX_DONE; + } + + if (ngx_handle_read_event(c->read, 0) != NGX_OK) { + return NGX_ERROR; + } + + ngx_log_debug2(NGX_LOG_DEBUG_EVENT, http->log, 0, + "js http free keepalive connection, " + "saving connection: %p:%d", c, c->fd); + + conf = http->conf; + + if (ngx_queue_empty(&conf->fetch_keepalive_cache) + && ngx_queue_empty(&conf->fetch_keepalive_free)) + { + cache = ngx_pcalloc(ngx_cycle->pool, + sizeof(ngx_js_http_keepalive_cache_t) * conf->fetch_keepalive); + if (cache == NULL) { + return NGX_ERROR; + } + + for (i = 0; i < conf->fetch_keepalive; i++) { + ngx_queue_insert_head(&conf->fetch_keepalive_free, + &cache[i].queue); + cache[i].conf = conf; + } + } + + if (ngx_queue_empty(&conf->fetch_keepalive_free)) { + /* evict from cache */ + q = ngx_queue_last(&conf->fetch_keepalive_cache); + ngx_queue_remove(q); + + cache = ngx_queue_data(q, ngx_js_http_keepalive_cache_t, queue); + ngx_log_debug1(NGX_LOG_DEBUG_EVENT, http->log, 0, + "js http free keepalive connection, evicting: %d", + cache->connection->fd); + ngx_js_http_close_connection(cache->connection); + + } else { + q = ngx_queue_head(&conf->fetch_keepalive_free); + ngx_queue_remove(q); + cache = ngx_queue_data(q, ngx_js_http_keepalive_cache_t, queue); + } + + ngx_queue_insert_head(&conf->fetch_keepalive_cache, q); + + c = http->peer.connection; + http->peer.connection = NULL; + + cache->connection = c; + + cache->ssl = (http->ssl != NULL); + ngx_memcpy(cache->host, http->host.data, http->host.len); + cache->host_len = http->host.len; + cache->port = http->port; + + c->read->delayed = 0; + ngx_add_timer(c->read, conf->fetch_keepalive_timeout); + + if (c->write->timer_set) { + ngx_del_timer(c->write); + } + + c->data = cache; + c->write->handler = ngx_js_http_keepalive_dummy_handler; + c->read->handler = ngx_js_http_keepalive_close_handler; + + c->idle = 1; + c->log = ngx_cycle->log; + c->pool->log = ngx_cycle->log; + c->read->log = ngx_cycle->log; + c->write->log = ngx_cycle->log; + + if (c->read->ready) { + ngx_js_http_keepalive_close_handler(c->read); + } + + return NGX_OK; +} + + +static void +ngx_js_http_keepalive_dummy_handler(ngx_event_t *ev) +{ + ngx_log_debug0(NGX_LOG_DEBUG_EVENT, ev->log, 0, + "js http keepalive dummy handler"); +} + + +static void +ngx_js_http_keepalive_close_handler(ngx_event_t *ev) +{ + ssize_t n; + ngx_connection_t *c; + ngx_js_loc_conf_t *conf; + ngx_js_http_keepalive_cache_t *cache; + u_char buf[1]; + + c = ev->data; + + ngx_log_debug1(NGX_LOG_DEBUG_HTTP, ev->log, 0, + "js http keepalive close handler: %d", c->fd); + + if (c->close || ev->timedout) { + goto close; + } + + n = recv(c->fd, buf, 1, MSG_PEEK); + + if (n == -1 && ngx_socket_errno == NGX_EAGAIN) { + ev->ready = 0; + + if (ngx_handle_read_event(c->read, 0) != NGX_OK) { + goto close; + } + + return; + } + +close: + + cache = c->data; + conf = cache->conf; + + ngx_js_http_close_connection(c); + + ngx_queue_remove(&cache->queue); + ngx_queue_insert_head(&conf->fetch_keepalive_free, &cache->queue); +}
--- a/nginx/ngx_js_http.h Fri Sep 12 19:10:32 2025 -0700 +++ b/nginx/ngx_js_http.h Wed Sep 03 20:27:16 2025 -0700 @@ -11,9 +11,11 @@ #define _NGX_JS_HTTP_H_INCLUDED_ +#define NGX_JS_HOST_MAX_LEN 256 + + typedef struct ngx_js_http_s ngx_js_http_t; - typedef struct { ngx_uint_t state; unsigned http_major:16; @@ -23,8 +25,6 @@ u_char *status_text; u_char *status_text_end; ngx_uint_t count; - ngx_flag_t chunked; - off_t content_length_n; u_char *header_name_start; u_char *header_name_end; @@ -114,6 +114,7 @@ ngx_addr_t *addrs; ngx_uint_t naddrs; ngx_uint_t naddr; + ngx_str_t host; in_port_t port; ngx_peer_connection_t peer; @@ -124,8 +125,11 @@ unsigned header_only; + ngx_flag_t chunked; + ngx_flag_t keepalive; + off_t content_length_n; + #if (NGX_SSL) - ngx_str_t tls_name; ngx_ssl_t *ssl; njs_bool_t ssl_verify; #endif
--- a/nginx/ngx_qjs_fetch.c Fri Sep 12 19:10:32 2025 -0700 +++ b/nginx/ngx_qjs_fetch.c Wed Sep 03 20:27:16 2025 -0700 @@ -5,10 +5,6 @@ */ -#include <ngx_config.h> -#include <ngx_core.h> -#include <ngx_event.h> -#include <ngx_event_connect.h> #include "ngx_js.h" #include "ngx_js_http.h" @@ -270,7 +266,14 @@ goto fail; } + if (u.host.len >= NGX_JS_HOST_MAX_LEN) { + JS_ThrowInternalError(cx, "Host name too long"); + goto fail; + } + http = &fetch->http; + http->host = u.host; + http->port = u.port; http->response.url = request.url; http->buffer_size = ngx_qjs_external_buffer_size(cx, external); http->max_response_body_size = @@ -418,18 +421,22 @@ continue; } + if (h[i].key.len == 10 + && ngx_strncasecmp(h[i].key.data, (u_char *) "Connection", 10) + == 0) + { + continue; + } + njs_chb_append(&http->chain, h[i].key.data, h[i].key.len); njs_chb_append_literal(&http->chain, ": "); njs_chb_append(&http->chain, h[i].value.data, h[i].value.len); njs_chb_append_literal(&http->chain, CRLF); } - njs_chb_append_literal(&http->chain, "Connection: close" CRLF); - -#if (NGX_SSL) - http->tls_name.data = u.host.data; - http->tls_name.len = u.host.len; -#endif + if (!http->keepalive) { + njs_chb_append_literal(&http->chain, "Connection: close" CRLF); + } if (request.body.len != 0) { njs_chb_sprintf(&http->chain, 32, "Content-Length: %uz" CRLF CRLF, @@ -1213,7 +1220,8 @@ http->conf = conf; - http->http_parse.content_length_n = -1; + http->content_length_n = -1; + http->keepalive = (conf->fetch_keepalive > 0); ngx_qjs_arg(http->response.header_value) = JS_UNDEFINED;
--- a/nginx/ngx_stream_js_module.c Fri Sep 12 19:10:32 2025 -0700 +++ b/nginx/ngx_stream_js_module.c Wed Sep 03 20:27:16 2025 -0700 @@ -351,6 +351,34 @@ offsetof(ngx_stream_js_srv_conf_t, timeout), NULL }, + { ngx_string("js_fetch_keepalive"), + NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1, + ngx_conf_set_num_slot, + NGX_STREAM_SRV_CONF_OFFSET, + offsetof(ngx_stream_js_srv_conf_t, fetch_keepalive), + NULL }, + + { ngx_string("js_fetch_keepalive_requests"), + NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1, + ngx_conf_set_num_slot, + NGX_STREAM_SRV_CONF_OFFSET, + offsetof(ngx_stream_js_srv_conf_t, fetch_keepalive_requests), + NULL }, + + { ngx_string("js_fetch_keepalive_time"), + NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1, + ngx_conf_set_msec_slot, + NGX_STREAM_SRV_CONF_OFFSET, + offsetof(ngx_stream_js_srv_conf_t, fetch_keepalive_time), + NULL }, + + { ngx_string("js_fetch_keepalive_timeout"), + NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1, + ngx_conf_set_msec_slot, + NGX_STREAM_SRV_CONF_OFFSET, + offsetof(ngx_stream_js_srv_conf_t, fetch_keepalive_timeout), + NULL }, + #if (NGX_STREAM_SSL) { ngx_string("js_fetch_ciphers"),
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/nginx/t/js_fetch_https_keepalive.t Wed Sep 03 20:27:16 2025 -0700 @@ -0,0 +1,345 @@ +#!/usr/bin/perl + +# (C) Dmitry Volyntsev +# (C) F5, Inc. + +# Tests for http njs module, fetch method, https keepalive support. + +############################################################################### + +use warnings; +use strict; + +use Test::More; + +BEGIN { use FindBin; chdir($FindBin::Bin); } + +use lib 'lib'; +use Test::Nginx; + +############################################################################### + +select STDERR; $| = 1; +select STDOUT; $| = 1; + +my $t = Test::Nginx->new()->has(qw/http http_ssl rewrite/) + ->write_file_expand('nginx.conf', <<'EOF'); + +%%TEST_GLOBALS%% + +daemon off; + +events { +} + +http { + %%TEST_GLOBALS_HTTP%% + + js_import test.js; + + server { + listen 127.0.0.1:8080; + server_name localhost; + + resolver 127.0.0.1:%%PORT_8981_UDP%%; + resolver_timeout 1s; + + location /njs { + js_content test.njs; + } + + location /https { + js_content test.https; + + js_fetch_keepalive 4; + js_fetch_ciphers HIGH:!aNull:!MD5; + js_fetch_protocols TLSv1.1 TLSv1.2; + js_fetch_trusted_certificate myca.crt; + } + + location /sni_isolation { + js_content test.sni_isolation; + + js_fetch_keepalive 4; + js_fetch_ciphers HIGH:!aNull:!MD5; + js_fetch_protocols TLSv1.1 TLSv1.2; + js_fetch_trusted_certificate myca.crt; + } + + location /plain_vs_https_isolation { + js_content test.plain_vs_https_isolation; + + js_fetch_keepalive 4; + js_fetch_ciphers HIGH:!aNull:!MD5; + js_fetch_protocols TLSv1.1 TLSv1.2; + js_fetch_trusted_certificate myca.crt; + } + } + + server { + listen 127.0.0.1:8081 ssl; + server_name ka.example.com; + + keepalive_requests 100; + + ssl_certificate ka.example.com.chained.crt; + ssl_certificate_key ka.example.com.key; + + location /loc { + return 200 CONN:$connection_requests; + } + } + + server { + listen 127.0.0.1:8081 ssl; + server_name 1.example.com; + + ssl_certificate 1.example.com.chained.crt; + ssl_certificate_key 1.example.com.key; + + location /loc { + return 200 "You are at 1.example.com."; + } + } + + server { + listen 127.0.0.1:8082; + server_name plain.example.com; + + keepalive_requests 100; + + location /loc { + return 200 PLAIN:$connection_requests; + } + } +} + +EOF + +my $p1 = port(8081); +my $p2 = port(8082); + +$t->write_file('test.js', <<EOF); + function test_njs(r) { + r.return(200, njs.version); + } + + function https(r) { + var url = `https://\${r.args.domain}:$p1/loc`; + var opt = {}; + + if (r.args.verify != null && r.args.verify == "false") { + opt.verify = false; + } + + ngx.fetch(url, opt) + .then(reply => reply.text()) + .then(body => r.return(200, body)) + .catch(e => r.return(501, e.message)) + } + + async function sni_isolation(r) { + try { + let resp = await ngx.fetch(`https://ka.example.com:$p1/loc`); + let body1 = await resp.text(); + + resp = await ngx.fetch(`https://1.example.com:$p1/loc`); + let body2 = await resp.text(); + + resp = await ngx.fetch(`https://ka.example.com:$p1/loc`); + let body3 = await resp.text(); + + r.return(200, `\${body1}|\${body2}|\${body3}`); + + } catch (e) { + r.return(501, e.message); + } + } + + async function plain_vs_https_isolation(r) { + try { + let resp = await ngx.fetch(`https://ka.example.com:$p1/loc`); + let body1 = await resp.text(); + + resp = await ngx.fetch(`http://plain.example.com:$p2/loc`); + let body2 = await resp.text(); + + resp = await ngx.fetch(`https://ka.example.com:$p1/loc`); + let body3 = await resp.text(); + + r.return(200, `\${body1}|\${body2}|\${body3}`); + + } catch (e) { + r.return(501, e.message); + } + } + + export default {njs: test_njs, https, sni_isolation, + plain_vs_https_isolation}; +EOF + +my $d = $t->testdir(); + +$t->write_file('openssl.conf', <<EOF); +[ req ] +default_bits = 2048 +encrypt_key = no +distinguished_name = req_distinguished_name +x509_extensions = myca_extensions +[ req_distinguished_name ] +[ myca_extensions ] +basicConstraints = critical,CA:TRUE +EOF + +$t->write_file('myca.conf', <<EOF); +[ ca ] +default_ca = myca + +[ myca ] +new_certs_dir = $d +database = $d/certindex +default_md = sha256 +policy = myca_policy +serial = $d/certserial +default_days = 1 +x509_extensions = myca_extensions + +[ myca_policy ] +commonName = supplied + +[ myca_extensions ] +basicConstraints = critical,CA:TRUE +EOF + +system('openssl req -x509 -new ' + . "-config $d/openssl.conf -subj /CN=myca/ " + . "-out $d/myca.crt -keyout $d/myca.key " + . ">>$d/openssl.out 2>&1") == 0 + or die "Can't create self-signed certificate for CA: $!\n"; + +foreach my $name ('intermediate', '1.example.com', 'ka.example.com') { + system("openssl req -new " + . "-config $d/openssl.conf -subj /CN=$name/ " + . "-out $d/$name.csr -keyout $d/$name.key " + . ">>$d/openssl.out 2>&1") == 0 + or die "Can't create certificate signing req for $name: $!\n"; +} + +$t->write_file('certserial', '1000'); +$t->write_file('certindex', ''); + +system("openssl ca -batch -config $d/myca.conf " + . "-keyfile $d/myca.key -cert $d/myca.crt " + . "-subj /CN=intermediate/ -in $d/intermediate.csr " + . "-out $d/intermediate.crt " + . ">>$d/openssl.out 2>&1") == 0 + or die "Can't sign certificate for intermediate: $!\n"; + +foreach my $name ('1.example.com', 'ka.example.com') { + system("openssl ca -batch -config $d/myca.conf " + . "-keyfile $d/intermediate.key -cert $d/intermediate.crt " + . "-subj /CN=$name/ -in $d/$name.csr -out $d/$name.crt " + . ">>$d/openssl.out 2>&1") == 0 + or die "Can't sign certificate for $name $!\n"; + $t->write_file("$name.chained.crt", $t->read_file("$name.crt") + . $t->read_file('intermediate.crt')); +} + +$t->try_run('no njs.fetch'); + +$t->plan(5); + +$t->run_daemon(\&dns_daemon, port(8981), $t); +$t->waitforfile($t->testdir . '/' . port(8981)); + +############################################################################### + +like(http_get('/https?domain=localhost'), + qr/connect failed/s, 'fetch https wrong CN certificate'); +like(http_get('/https?domain=ka.example.com'), + qr/CONN:1$/s, 'fetch https keepalive'); +like(http_get('/https?domain=ka.example.com'), + qr/CONN:2$/s, 'fetch https keepalive reused'); +like(http_get('/sni_isolation'), + qr/CONN:1\|You are at 1\.example\.com\.\|CONN:2$/s, + 'fetch https keepalive SNI isolation'); +like(http_get('/plain_vs_https_isolation'), + qr/CONN:1\|PLAIN:1\|CONN:2$/s, + 'fetch https->plain->https keepalive isolation'); + +############################################################################### + +sub reply_handler { + my ($recv_data, $port, %extra) = @_; + + my (@name, @rdata); + + use constant NOERROR => 0; + use constant A => 1; + use constant IN => 1; + + # default values + + my ($hdr, $rcode, $ttl) = (0x8180, NOERROR, 3600); + + # decode name + + my ($len, $offset) = (undef, 12); + while (1) { + $len = unpack("\@$offset C", $recv_data); + last if $len == 0; + $offset++; + push @name, unpack("\@$offset A$len", $recv_data); + $offset += $len; + } + + $offset -= 1; + my ($id, $type, $class) = unpack("n x$offset n2", $recv_data); + + my $name = join('.', @name); + + if ($type == A) { + push @rdata, rd_addr($ttl, '127.0.0.1'); + } + + $len = @name; + pack("n6 (C/a*)$len x n2", $id, $hdr | $rcode, 1, scalar @rdata, + 0, 0, @name, $type, $class) . join('', @rdata); +} + +sub rd_addr { + my ($ttl, $addr) = @_; + + my $code = 'split(/\./, $addr)'; + + return pack 'n3N', 0xc00c, A, IN, $ttl if $addr eq ''; + + pack 'n3N nC4', 0xc00c, A, IN, $ttl, eval "scalar $code", eval($code); +} + +sub dns_daemon { + my ($port, $t) = @_; + + my ($data, $recv_data); + my $socket = IO::Socket::INET->new( + LocalAddr => '127.0.0.1', + LocalPort => $port, + Proto => 'udp', + ) + or die "Can't create listening socket: $!\n"; + + local $SIG{PIPE} = 'IGNORE'; + + # signal we are ready + + open my $fh, '>', $t->testdir() . '/' . $port; + close $fh; + + while (1) { + $socket->recv($recv_data, 65536); + $data = reply_handler($recv_data, $port); + $socket->send($data); + } +} + +###############################################################################
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/nginx/t/js_fetch_keepalive.t Wed Sep 03 20:27:16 2025 -0700 @@ -0,0 +1,289 @@ +#!/usr/bin/perl + +# (C) Dmitry Volyntsev +# (C) F5, Inc. + +# Tests for http njs module, fetch method keepalive. + +############################################################################### + +use warnings; +use strict; + +use Test::More; +use IO::Socket::INET; + +use Socket qw/ CRLF /; + +BEGIN { use FindBin; chdir($FindBin::Bin); } + +use lib 'lib'; +use Test::Nginx; + +############################################################################### + +select STDERR; $| = 1; +select STDOUT; $| = 1; + +my $t = Test::Nginx->new()->has(qw/http/) + ->write_file_expand('nginx.conf', <<'EOF'); + +%%TEST_GLOBALS%% + +daemon off; + +events { +} + +http { + %%TEST_GLOBALS_HTTP%% + + js_import test.js; + + server { + listen 127.0.0.1:8080; + server_name localhost; + + location /engine { + js_content test.engine; + } + + location /keepalive { + js_fetch_keepalive 4; + js_fetch_keepalive_requests 100; + js_fetch_keepalive_time 60s; + js_fetch_keepalive_timeout 60s; + js_content test.keepalive; + } + + location /keepalive_simultaneous { + js_fetch_keepalive 4; + js_content test.keepalive_simultaneous; + } + + location /keepalive_requests { + js_fetch_keepalive 4; + js_fetch_keepalive_requests 2; + js_content test.keepalive; + } + + location /keepalive_time { + js_fetch_keepalive 4; + js_fetch_keepalive_time 100ms; + js_content test.keepalive; + } + + location /keepalive_timeout { + js_fetch_keepalive 4; + js_fetch_keepalive_timeout 100ms; + js_content test.keepalive; + } + + location /no_keepalive { + js_fetch_keepalive 0; + js_content test.keepalive; + } + } + + server { + listen 127.0.0.1:8081; + keepalive_requests 100; + keepalive_timeout 60s; + + location /count { + add_header Connection-ID $connection_requests; + return 200 $connection_requests; + } + + location /count_close { + add_header Connection close; + add_header Connection-ID $connection_requests; + return 200 $connection_requests; + } + + location /count_close_mixed { + add_header cOnNeCtiOn ClOsE; + add_header Connection-ID $connection_requests; + return 200 $connection_requests; + } + } +} + +EOF + +my $p1 = port(8081); +my $p2 = port(8082); + +$t->write_file('test.js', <<EOF); + function engine(r) { + r.return(200, njs.engine); + } + + function sleep(milliseconds) { + return new Promise(resolve => setTimeout(resolve, milliseconds)); + } + + async function keepalive(r) { + const path = r.args.path; + let port = $p1; + if (r.args.port) { + port = r.args.port; + } + + let responses = []; + for (let i = 0; i < 3; i++) { + let resp = await ngx.fetch(`http://127.0.0.1:\${port}/\${path}`) + .then(resp => resp.text()) + .catch(err => err.message); + responses.push(resp.trim()); + + if (r.args.sleep) { + await sleep(Number(r.args.sleep)); + } + } + + r.return(200, responses.toString()); + } + + async function keepalive_simultaneous(r) { + let promises = []; + for (let i = 0; i < Number(r.args.n); i++) { + promises.push(ngx.fetch('http://127.0.0.1:$p1/count')); + } + + let results = await Promise.all(promises); + let bodies = await Promise.all(results.map(r => r.text())); + let responses = bodies.map(b => parseInt(b.trim())); + + r.return(200, JSON.stringify(responses)); + } + + export default {engine, keepalive, keepalive_simultaneous}; +EOF + +$t->try_run('no js_fetch_keepalive'); + +$t->run_daemon(\&http_daemon, $p2); +$t->waitforsocket('127.0.0.1:' . $p2); + +$t->plan(16); + +############################################################################### + +like(http_get('/no_keepalive?path=count'), qr/1,1,1/, + 'no keepalive connections'); +like(http_get('/keepalive?path=count_close'), qr/1,1,1/, + 'upstream Connection: close (HTTP/1.1)'); +like(http_get('/keepalive?path=count_close_mixed'), qr/1,1,1/, + 'upstream Connection: close, mixed-case (HTTP/1.1)'); +like(http_get('/keepalive?path=count'), qr/1,2,3/, + 'keepalive reuses connection'); +like(http_get('/keepalive?path=count'), qr/4,5,6/, + 'keepalive reuses connection across requests'); +like(http_get('/keepalive_simultaneous?n=8'), qr/1,1,1,1,1,1,1,1/, + 'keepalive simultaneous requests'); +like(http_get('/keepalive_simultaneous?n=8'), qr/2,2,2,2,1,1,1,1/, + 'keepalive simultaneous requests reused connections'); +like(http_get('/keepalive_requests?path=count'), qr/1,2,1/, + 'keepalive with limited requests per connection'); + +like(http_get('/keepalive_time?path=count'), qr/1,2,3/, + 'keepalive with time limit, first round'); + +select undef, undef, undef, 0.15; + +like(http_get('/keepalive_time?path=count'), qr/4,1,2/, + 'keepalive with time limit, second round'); + +like(http_get('/keepalive_timeout?path=count'), qr/1,2,3/, + 'keepalive with timeout limit, first round'); + +select undef, undef, undef, 0.15; + +like(http_get('/keepalive_timeout?path=count'), qr/1,2,3/, + 'keepalive with timeout limit, second round'); + +like(http_get("/keepalive?path=broken_keepalive&port=$p2&sleep=1"), qr/1,1,1/, + 'upstream broken keepalive (connection closed by upstream)'); +like(http_get("/keepalive?path=http10&port=$p2"), qr/1,1,1/, + 'upstream HTTP/1.0 (no keepalive)'); +like(http_get("/keepalive?path=count&port=$p2&sleep=1"), qr/1,2,3/, + 'normal keepalive'); +like(http_get("/keepalive?path=assumed_keepalive&port=$p2&sleep=1"), qr/4,5,6/, + 'assumed keepalive'); + +############################################################################### + +sub http_daemon { + my $port = shift; + + my $server = IO::Socket::INET->new( + Proto => 'tcp', + LocalAddr => '127.0.0.1:' . $port, + Listen => 5, + Reuse => 1 + ) or die "Can't create listening socket: $!\n"; + + my $ccount = 0; + my $rcount = 0; + + # dumb server which is able to keep connections alive + + while (my $client = $server->accept()) { + Test::Nginx::log_core('||', + "connection from " . $client->peerhost()); + $client->autoflush(1); + $ccount++; + $rcount = 0; + + while (1) { + my $headers = ''; + my $uri = ''; + + while (<$client>) { + Test::Nginx::log_core('||', $_); + $headers .= $_; + last if (/^\x0d?\x0a?$/); + } + + last if $headers eq ''; + $rcount++; + + $uri = $1 if $headers =~ /^\S+\s+([^ ]+)\s+HTTP/i; + my $body = $rcount; + + if ($uri eq '/broken_keepalive') { + print $client + "HTTP/1.1 200 OK" . CRLF . + "Content-Length: " . length($body) . CRLF . + "Connection: keep-alive" . CRLF . CRLF . + $body; + + last; + + } elsif ($uri eq '/assumed_keepalive') { + print $client + "HTTP/1.1 200 OK" . CRLF . + "Content-Length: " . length($body) . CRLF . CRLF . + $body; + + } elsif ($uri eq '/count') { + print $client + "HTTP/1.1 200 OK" . CRLF . + "Content-Length: " . length($body) . CRLF . + "Connection: keep-alive" . CRLF . CRLF . + $body; + + } elsif ($uri eq '/http10') { + print $client + "HTTP/1.0 200 OK" . CRLF . + "Content-Length: " . length($body) . CRLF . CRLF . + $body; + } + } + + close $client; + } +} + +###############################################################################
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/nginx/t/stream_js_fetch_keepalive.t Wed Sep 03 20:27:16 2025 -0700 @@ -0,0 +1,200 @@ +#!/usr/bin/perl + +# (C) Dmitry Volyntsev +# (C) F5, Inc. + +# Tests for stream njs module, fetch method keepalive. + +############################################################################### + +use warnings; +use strict; + +use Test::More; +use IO::Socket::INET; + +BEGIN { use FindBin; chdir($FindBin::Bin); } + +use lib 'lib'; +use Test::Nginx; +use Test::Nginx::Stream qw/ stream /; + +############################################################################### + +select STDERR; $| = 1; +select STDOUT; $| = 1; + +my $t = Test::Nginx->new()->has(qw/http stream/) + ->write_file_expand('nginx.conf', <<'EOF'); + +%%TEST_GLOBALS%% + +daemon off; + +events { +} + +http { + %%TEST_GLOBALS_HTTP%% + + js_import test.js; + + server { + listen 127.0.0.1:8080; + server_name localhost; + + location /engine { + js_content test.engine; + } + } + + server { + listen 127.0.0.1:8081; + keepalive_requests 100; + keepalive_timeout 60s; + + location /count { + add_header Connection-ID $connection_requests; + return 200 $connection_requests; + } + + location /headers { + return 200 "Connection: $http_connection"; + } + } +} + +stream { + %%TEST_GLOBALS_STREAM%% + + js_import test.js; + js_var $message; + + server { + listen 127.0.0.1:8082; + js_fetch_keepalive 4; + js_fetch_keepalive_requests 100; + js_fetch_keepalive_time 60s; + js_fetch_keepalive_timeout 60s; + js_preread test.keepalive; + return $message; + } + + server { + listen 127.0.0.1:8083; + js_fetch_keepalive 0; + js_preread test.keepalive; + return $message; + } + + server { + listen 127.0.0.1:8084; + js_fetch_keepalive 4; + js_fetch_keepalive_requests 2; + js_preread test.keepalive; + return $message; + } + + server { + listen 127.0.0.1:8085; + js_fetch_keepalive 4; + js_fetch_keepalive_time 100ms; + js_preread test.keepalive; + return $message; + } + + server { + listen 127.0.0.1:8086; + js_fetch_keepalive 4; + js_fetch_keepalive_timeout 100ms; + js_preread test.keepalive; + return $message; + } + + server { + listen 127.0.0.1:8087; + js_fetch_keepalive 4; + js_preread test.keepalive_simultaneous; + return $message; + } +} + +EOF + +my $p1 = port(8081); + +$t->write_file('test.js', <<EOF); + function engine(r) { + r.return(200, njs.engine); + } + + async function keepalive(s) { + let responses = []; + + for (let i = 0; i < 3; i++) { + let resp = await ngx.fetch('http://127.0.0.1:$p1/count'); + let body = await resp.text(); + responses.push(parseInt(body.trim())); + } + + s.variables.message = JSON.stringify(responses); + s.done(); + } + + async function keepalive_simultaneous(s) { + let promises = []; + let n = 8; + for (let i = 0; i < n; i++) { + promises.push(ngx.fetch('http://127.0.0.1:$p1/count')); + } + + let results = await Promise.all(promises); + let bodies = await Promise.all(results.map(r => r.text())); + let responses = bodies.map(b => parseInt(b.trim())); + + s.variables.message = JSON.stringify(responses); + s.done(); + } + + export default {engine, keepalive, keepalive_simultaneous}; +EOF + +$t->try_run('no stream js_fetch_keepalive'); + +$t->plan(10); + +############################################################################### + +like(stream('127.0.0.1:' . port(8083))->io('GO'), qr/\[1,1,1]/, + 'no keepalive connections'); +like(stream('127.0.0.1:' . port(8082))->io('GO'), qr/\[1,2,3]/, + 'keepalive reuses connection'); +like(stream('127.0.0.1:' . port(8082))->io('GO'), qr/\[4,5,6]/, + 'keepalive reuses connection across sessions'); + +like(stream('127.0.0.1:' . port(8087))->io('GO'), qr/^\[(1,){7}1\]$/, + 'keepalive simultaneous requests'); +like(stream('127.0.0.1:' . port(8087))->io('GO'), + qr/\[2,2,2,2,1,1,1,1\]/, + 'keepalive simultaneous requests reused connections'); + +like(stream('127.0.0.1:' . port(8084))->io('GO'), qr/\[1,2,1]/, + 'keepalive with limited requests per connection'); + +like(stream('127.0.0.1:' . port(8085))->io('GO'), qr/\[1,2,3]/, + 'keepalive with time limit, first round'); + +select undef, undef, undef, 0.15; + +like(stream('127.0.0.1:' . port(8085))->io('GO'), qr/\[4,1,2]/, + 'keepalive with time limit, second round'); + +like(stream('127.0.0.1:' . port(8086))->io('GO'), qr/\[1,2,3]/, + 'keepalive with timeout limit, first round'); + +select undef, undef, undef, 0.15; + +like(stream('127.0.0.1:' . port(8086))->io('GO'), qr/\[1,2,3]/, + 'keepalive with timeout limit, second round'); + +###############################################################################
