From 64c38052b9d4f44d817049a7658f7f1fb15bc665 Mon Sep 17 00:00:00 2001 From: rainingmaster <312841925@qq.com> Date: Mon, 19 Oct 2020 13:07:23 +0800 Subject: [PATCH] feature: support socket in some block phase --- config | 16 +++ src/event/ngx_http_lua_epoll.c | 178 ++++++++++++++++++++++++++++ src/event/ngx_http_lua_kqueue.c | 109 ++++++++++++++++++ src/event/ngx_http_lua_poll.c | 102 ++++++++++++++++ src/ngx_http_lua_event.h | 83 +++++++++++++ src/ngx_http_lua_module.c | 6 + src/ngx_http_lua_socket_tcp.c | 198 +++++++++++++++++++++++++++++++- t/058-tcp-socket.t | 42 +++++++ 8 files changed, 729 insertions(+), 5 deletions(-) create mode 100644 src/event/ngx_http_lua_epoll.c create mode 100644 src/event/ngx_http_lua_kqueue.c create mode 100644 src/event/ngx_http_lua_poll.c create mode 100644 src/ngx_http_lua_event.h diff --git a/config b/config index 4b32d38318..804b44fa83 100644 --- a/config +++ b/config @@ -298,6 +298,21 @@ HTTP_LUA_SRCS=" \ $ngx_addon_dir/src/ngx_http_lua_pipe.c \ " +if [[ $EVENT_MODULES =~ "ngx_epoll_module" ]] +then + HTTP_LUA_SRCS="$HTTP_LUA_SRCS $ngx_addon_dir/src/event/ngx_http_lua_poll.c" +fi + +if [[ $EVENT_MODULES =~ "ngx_poll_module" ]] +then + HTTP_LUA_SRCS="$HTTP_LUA_SRCS $ngx_addon_dir/src/event/ngx_http_lua_poll.c" +fi + +if [[ $EVENT_MODULES =~ "ngx_kqueue_module" ]] +then + HTTP_LUA_SRCS="$HTTP_LUA_SRCS $ngx_addon_dir/src/event/ngx_http_lua_kqueue.c" +fi + HTTP_LUA_DEPS=" \ $ngx_addon_dir/src/ddebug.h \ $ngx_addon_dir/src/ngx_http_lua_autoconf.h \ @@ -355,6 +370,7 @@ HTTP_LUA_DEPS=" \ $ngx_addon_dir/src/ngx_http_lua_log_ringbuf.h \ $ngx_addon_dir/src/ngx_http_lua_input_filters.h \ $ngx_addon_dir/src/ngx_http_lua_pipe.h \ + $ngx_addon_dir/src/ngx_http_lua_event.h \ " # ---------------------------------------- diff --git a/src/event/ngx_http_lua_epoll.c b/src/event/ngx_http_lua_epoll.c new file mode 100644 index 0000000000..86761e989c --- /dev/null +++ b/src/event/ngx_http_lua_epoll.c @@ -0,0 +1,178 @@ + +/* + * Copyright (C) Yichun Zhang (agentzh) + */ + + +#include +#include +#include +#include "../ngx_http_lua_event.h" + + +static ngx_int_t ngx_http_lua_epoll_init_event(ngx_conf_t *cf); + +static void ngx_http_lua_epoll_set_event(ngx_event_t *ev, ngx_int_t event); + +static ngx_int_t ngx_http_lua_epoll_process_events(ngx_http_request_t *r, + ngx_msec_t timer); + +static int ep = -1; +static struct epoll_event event_list[1]; + +ngx_http_lua_event_actions_t ngx_http_lua_epoll = { + ngx_http_lua_epoll_init_event, + ngx_http_lua_epoll_set_event, + ngx_http_lua_epoll_process_events, +}; + + +static ngx_int_t +ngx_http_lua_epoll_init_event(ngx_conf_t *cf) +{ + ep = epoll_create(cycle->connection_n / 2); + + if (ep == -1) { + ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno, + "epoll_create() failed"); + return NGX_ERROR; + } + + return NGX_OK; +} + + +static void +ngx_http_lua_epoll_set_event(ngx_event_t *ev, ngx_int_t event) +{ + int op; + uint32_t events, prev; + ngx_event_t *e; + ngx_connection_t *c; + struct epoll_event ee; + + c = ev->data; + + events = (uint32_t) event; + + if (event == NGX_READ_EVENT) { + e = c->write; + prev = EPOLLOUT; +#if (NGX_READ_EVENT != EPOLLIN|EPOLLRDHUP) + events = EPOLLIN|EPOLLRDHUP; +#endif + + } else { + e = c->read; + prev = EPOLLIN|EPOLLRDHUP; +#if (NGX_WRITE_EVENT != EPOLLOUT) + events = EPOLLOUT; +#endif + } + + if (e->active) { + op = EPOLL_CTL_MOD; + events |= prev; + + } else { + op = EPOLL_CTL_ADD; + } + +#if (NGX_HAVE_EPOLLEXCLUSIVE && NGX_HAVE_EPOLLRDHUP) + if (flags & NGX_EXCLUSIVE_EVENT) { + events &= ~EPOLLRDHUP; + } +#endif + + ee.events = events | (uint32_t) flags; + ee.data.ptr = (void *) ((uintptr_t) c | ev->instance); + + if (epoll_ctl(ep, op, c->fd, &ee) == -1) { + ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_errno, + "epoll_ctl(%d, %d) failed", op, c->fd); + return NGX_ERROR; + } + + ev->active = 1; + + return NGX_OK; +} + + +static ngx_int_t +ngx_http_lua_epoll_process_events(ngx_http_request_t *r, ngx_msec_t timer) +{ + int events; + uint32_t revents; + ngx_int_t instance, i; + ngx_uint_t level; + ngx_err_t err; + ngx_event_t *rev, *wev; + ngx_queue_t *queue; + ngx_connection_t *c; + + events = epoll_wait(ep, event_list, (int) nevents, timer); + + err = (events == -1) ? ngx_errno : 0; + + if (flags & NGX_UPDATE_TIME || ngx_event_timer_alarm) { + ngx_time_update(); + } + + if (err) { + ngx_log_error(NGX_LOG_ALERT, r->connection->log, err, "epoll_wait() failed"); + return NGX_ERROR; + } + + if (events == 0) { + ngx_log_error(NGX_LOG_ALERT, r->connection->log, 0, + "epoll_wait() returned no events without timeout"); + + return NGX_ERROR; + } + + c = event_list[0].data.ptr; + + instance = (uintptr_t) c & 1; + c = (ngx_connection_t *) ((uintptr_t) c & (uintptr_t) ~1); + + revents = event_list[0].events; + + if (revents & (EPOLLERR|EPOLLHUP)) { + ngx_log_debug2(NGX_LOG_DEBUG_EVENT, cycle->log, 0, + "epoll_wait() error on fd:%d ev:%04XD", + c->fd, revents); + + /* + * if the error events were returned, add EPOLLIN and EPOLLOUT + * to handle the events at least in one active handler + */ + + revents |= EPOLLIN|EPOLLOUT; + } + + rev = c->read; + + if ((revents & EPOLLIN) && rev->active) { + +#if (NGX_HAVE_EPOLLRDHUP) + if (revents & EPOLLRDHUP) { + rev->pending_eof = 1; + } +#endif + + rev->ready = 1; + rev->available = -1; + } + + wev = c->write; + + if ((revents & EPOLLOUT) && wev->active) { + wev->ready = 1; +#if (NGX_THREADS) + wev->complete = 1; +#endif + } + + return NGX_OK; +} diff --git a/src/event/ngx_http_lua_kqueue.c b/src/event/ngx_http_lua_kqueue.c new file mode 100644 index 0000000000..589e90dee1 --- /dev/null +++ b/src/event/ngx_http_lua_kqueue.c @@ -0,0 +1,109 @@ + +/* + * Copyright (C) Yichun Zhang (agentzh) + */ + + +#include +#include +#include +#include "../ngx_http_lua_event.h" + + +static ngx_int_t ngx_http_lua_kqueue_init_event(ngx_conf_t *cf); + +static void ngx_http_lua_kqueue_set_event(ngx_event_t *ev, ngx_int_t event); + +static ngx_int_t ngx_http_lua_kqueue_process_events(ngx_http_request_t *r, + ngx_msec_t timer); + +int ngx_lua_kqueue = -1; +static struct kevent change_list[1]; +static struct kevent event_list[1]; + +ngx_http_lua_event_actions_t ngx_http_lua_kqueue = { + ngx_http_lua_kqueue_init_event, + ngx_http_lua_kqueue_set_event, + ngx_http_lua_kqueue_process_events, +}; + + +static ngx_int_t +ngx_http_lua_kqueue_init_event(ngx_conf_t *cf) +{ + if (ngx_lua_kqueue == -1) { + ngx_lua_kqueue = kqueue(); + + if (ngx_lua_kqueue == -1) { + ngx_conf_log_error(NGX_LOG_ALERT, cf, 0, "kqueue() failed"); + + return NGX_ERROR; + } + } + + return NGX_OK; +} + + +static void +ngx_http_lua_kqueue_set_event(ngx_event_t *ev, ngx_int_t event) +{ + struct kevent *kev; + ngx_connection_t *c; + + c = ev->data; + + ev->active = 1; + + kev = &change_list[0]; + + kev->ident = c->fd; + kev->filter = (short) event; + kev->flags = EV_ADD|EV_ENABLE; + kev->udata = NGX_KQUEUE_UDATA_T ((uintptr_t) ev | ev->instance); +} + + +static ngx_int_t +ngx_http_lua_kqueue_process_events(ngx_http_request_t *r, ngx_msec_t timer) +{ + int events; + struct timespec ts; + ngx_event_t *ev; + ngx_int_t instance; + ngx_err_t err; + + ts.tv_sec = timer / 1000; + ts.tv_nsec = (timer % 1000) * 1000000; + + events = kevent(ngx_lua_kqueue, change_list, 1, event_list, 1, &ts); + + err = (events == -1) ? ngx_errno : 0; + + if (err) { + ngx_log_error(NGX_LOG_ALERT, r->connection->log, err, "kevent() failed"); + + return NGX_ERROR; + } + + if (events == 0) { + ngx_log_error(NGX_LOG_ALERT, r->connection->log, 0, + "kevent() returned no events without timeout"); + + return NGX_ERROR; + } + + ev = (ngx_event_t *) event_list[0].udata; + instance = (uintptr_t) ev & 1; + ev = (ngx_event_t *) ((uintptr_t) ev & (uintptr_t) ~1); + + ev->available = event_list[0].data; + + if (event_list[0].flags & EV_EOF) { + ev->pending_eof = 1; + } + + ev->ready = 1; + + return NGX_OK; +} diff --git a/src/event/ngx_http_lua_poll.c b/src/event/ngx_http_lua_poll.c new file mode 100644 index 0000000000..c5e7f9e0fd --- /dev/null +++ b/src/event/ngx_http_lua_poll.c @@ -0,0 +1,102 @@ + +/* + * Copyright (C) Yichun Zhang (agentzh) + */ + + +#include +#include +#include +#include "../ngx_http_lua_event.h" + + +static ngx_int_t ngx_http_lua_poll_init_event(ngx_conf_t *cf); + +static void ngx_http_lua_poll_set_event(ngx_event_t *ev, ngx_int_t event); + +static ngx_int_t ngx_http_lua_poll_process_events(ngx_http_request_t *r, + ngx_msec_t timer); + +static struct pollfd event_list[1]; + +ngx_http_lua_event_actions_t ngx_http_lua_poll = { + ngx_http_lua_poll_init_event, + ngx_http_lua_poll_set_event, + ngx_http_lua_poll_process_events, +}; + + +static ngx_int_t +ngx_http_lua_poll_init_event(ngx_conf_t *cf) +{ + return NGX_OK; +} + + +static void +ngx_http_lua_poll_set_event(ngx_event_t *ev, ngx_int_t event) +{ + ngx_connection_t *c; + + c = ev->data; + + ev->active = 1; + + if (event == NGX_READ_EVENT) { +#if (NGX_READ_EVENT != POLLIN) + event = POLLIN; +#endif + + } else { +#if (NGX_WRITE_EVENT != POLLOUT) + event = POLLOUT; +#endif + } + + event_list[0].fd = c->fd; + event_list[0].events = (short) event; + event_list[0].revents = 0; +} + + +static ngx_int_t +ngx_http_lua_poll_process_events(ngx_http_request_t *r, ngx_msec_t timer) +{ + int ready, revents; + ngx_event_t *ev; + ngx_err_t err; + ngx_connection_t *c; + + ready = poll(event_list, 1, (int) timer); + + err = (ready == -1) ? ngx_errno : 0; + + if (err) { + ngx_log_error(NGX_LOG_ALERT, r->connection->log, err, "poll() failed"); + + return NGX_ERROR; + } + + if (ready == 0) { + ngx_log_error(NGX_LOG_ALERT, r->connection->log, 0, + "poll() returned no events without timeout"); + + return NGX_ERROR; + } + + revents = event_list[0].revents; + c = ngx_cycle->files[event_list[0].fd]; + + if ((revents & POLLIN) && c->read->active) { + ev = c->read; + ev->ready = 1; + ev->available = -1; + } + + if ((revents & POLLOUT) && c->write->active) { + ev = c->write; + ev->ready = 1; + } + + return NGX_OK; +} diff --git a/src/ngx_http_lua_event.h b/src/ngx_http_lua_event.h new file mode 100644 index 0000000000..ef18467988 --- /dev/null +++ b/src/ngx_http_lua_event.h @@ -0,0 +1,83 @@ + +/* + * Copyright (C) Yichun Zhang (agentzh) + */ + + +#ifndef _NGX_HTTP_LUA_EVENT_H_INCLUDED_ +#define _NGX_HTTP_LUA_EVENT_H_INCLUDED_ + + +#include "ngx_http_lua_common.h" + + +typedef struct { + ngx_int_t (*init_event)(ngx_conf_t *cf); + void (*set_event)(ngx_event_t *ev, ngx_int_t event); + ngx_int_t (*process_events)(ngx_http_request_t *r, ngx_msec_t timer); +} ngx_http_lua_event_actions_t; + + +ngx_http_lua_event_actions_t ngx_http_lua_event_actions; + +extern ngx_http_lua_event_actions_t ngx_http_lua_epoll; +extern ngx_http_lua_event_actions_t ngx_http_lua_poll; +extern ngx_http_lua_event_actions_t ngx_http_lua_kqueue; + + +#define ngx_http_lua_set_event ngx_http_lua_event_actions.set_event +#define ngx_http_lua_process_events ngx_http_lua_event_actions.process_events + + +static ngx_inline ngx_int_t +ngx_http_lua_init_event(ngx_conf_t *cf) +{ + void ***ccf; + ngx_event_conf_t *ecf; + + ccf = ngx_get_conf(cf->cycle->conf_ctx, ngx_events_module); + if (ccf == NULL) { + /* will exit and get an error: no "events" section in configuration */ + return NGX_OK; + } + + ecf = (*ccf)[ngx_event_core_module.ctx_index]; + +#if (NGX_HAVE_EPOLL) && !(NGX_TEST_BUILD_EPOLL) + + if (ngx_strcmp(ecf->name, "epoll") == 0) { + ngx_http_lua_event_actions = ngx_http_lua_epoll; + } else + +#endif + +#if (NGX_HAVE_POLL) + + if (ngx_strcmp(ecf->name, "poll") == 0) { + ngx_http_lua_event_actions = ngx_http_lua_poll; + } else + +#endif + +#if (NGX_HAVE_KQUEUE) + + if (ngx_strcmp(ecf->name, "kqueue") == 0) { + ngx_http_lua_event_actions = ngx_http_lua_kqueue; + } else + +#endif + + { + ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, + "invalid event type \"%V\"", ecf->name); + + return NGX_ERROR; + } + + return ngx_http_lua_event_actions.init_event(cf); +} + + +#endif /* _NGX_HTTP_LUA_EVENT_H_INCLUDED_ */ + +/* vi:set ft=c ts=4 sw=4 et fdm=marker: */ diff --git a/src/ngx_http_lua_module.c b/src/ngx_http_lua_module.c index 7358a95639..c4643ba132 100644 --- a/src/ngx_http_lua_module.c +++ b/src/ngx_http_lua_module.c @@ -31,6 +31,7 @@ #include "ngx_http_lua_ssl_session_fetchby.h" #include "ngx_http_lua_headers.h" #include "ngx_http_lua_pipe.h" +#include "ngx_http_lua_event.h" static void *ngx_http_lua_create_main_conf(ngx_conf_t *cf); @@ -786,6 +787,11 @@ ngx_http_lua_init(ngx_conf_t *cf) cln->handler = ngx_http_lua_ngx_raw_header_cleanup; #endif + rc = ngx_http_lua_init_event(cf); + if (rc == NGX_ERROR) { + return rc; + } + if (lmcf->lua == NULL) { dd("initializing lua vm"); diff --git a/src/ngx_http_lua_socket_tcp.c b/src/ngx_http_lua_socket_tcp.c index e2c2cf2f01..994c530bc1 100644 --- a/src/ngx_http_lua_socket_tcp.c +++ b/src/ngx_http_lua_socket_tcp.c @@ -17,6 +17,7 @@ #include "ngx_http_lua_output.h" #include "ngx_http_lua_contentby.h" #include "ngx_http_lua_probe.h" +#include "ngx_http_lua_event.h" static int ngx_http_lua_socket_tcp(lua_State *L); @@ -159,6 +160,12 @@ static void ngx_http_lua_ssl_handshake_handler(ngx_connection_t *c); static int ngx_http_lua_ssl_free_session(lua_State *L); #endif static void ngx_http_lua_socket_tcp_close_connection(ngx_connection_t *c); +static ngx_int_t ngx_http_lua_socket_tcp_block_conn(ngx_http_request_t *r, + ngx_http_lua_socket_tcp_upstream_t *u); +static ngx_int_t ngx_http_lua_socket_tcp_block_write(ngx_http_request_t *r, + ngx_http_lua_socket_tcp_upstream_t *u); +static ngx_int_t ngx_http_lua_socket_tcp_block_read(ngx_http_request_t *r, + ngx_http_lua_socket_tcp_upstream_t *u); enum { @@ -446,7 +453,7 @@ ngx_http_lua_socket_tcp(lua_State *L) return luaL_error(L, "no ctx found"); } - ngx_http_lua_check_context(L, ctx, NGX_HTTP_LUA_CONTEXT_YIELDABLE); + /* ngx_http_lua_check_context(L, ctx, NGX_HTTP_LUA_CONTEXT_YIELDABLE); */ lua_createtable(L, 5 /* narr */, 1 /* nrec */); lua_pushlightuserdata(L, ngx_http_lua_lightudata_mask( @@ -888,7 +895,7 @@ ngx_http_lua_socket_tcp_connect(lua_State *L) return luaL_error(L, "no ctx found"); } - ngx_http_lua_check_context(L, ctx, NGX_HTTP_LUA_CONTEXT_YIELDABLE); + /* ngx_http_lua_check_context(L, ctx, NGX_HTTP_LUA_CONTEXT_YIELDABLE); */ luaL_checktype(L, 1, LUA_TTABLE); @@ -1477,11 +1484,16 @@ ngx_http_lua_socket_resolve_retval_handler(ngx_http_request_t *r, u->writer.last = &u->writer.out; #endif - ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module); + dd("setting data to %p", u); - coctx = ctx->cur_co_ctx; + ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module); - dd("setting data to %p", u); + if (rc == NGX_AGAIN && !(ctx->context & NGX_HTTP_LUA_CONTEXT_YIELDABLE)) { + rc = ngx_http_lua_socket_tcp_block_conn(r, u); + if (rc == NGX_ERROR) { + return ngx_http_lua_socket_conn_error_retval_handler(r, u, L); + } + } if (rc == NGX_OK) { ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, @@ -1517,6 +1529,8 @@ ngx_http_lua_socket_resolve_retval_handler(ngx_http_request_t *r, /* rc == NGX_AGAIN */ + coctx = ctx->cur_co_ctx; + ngx_http_lua_cleanup_pending_operation(coctx); coctx->cleanup = ngx_http_lua_coctx_cleanup; coctx->data = u; @@ -1780,6 +1794,10 @@ ngx_http_lua_socket_tcp_sslhandshake(lua_State *L) dd("ngx_ssl_handshake returned %d", (int) rc); + if (rc == NGX_AGAIN && !(ctx->context & NGX_HTTP_LUA_CONTEXT_YIELDABLE)) { + /* Do something */ + } + if (rc == NGX_AGAIN) { if (c->write->timer_set) { ngx_del_timer(c->write); @@ -2105,6 +2123,10 @@ ngx_http_lua_socket_tcp_receive_helper(ngx_http_request_t *r, rc = ngx_http_lua_socket_tcp_read(r, u); + if (rc == NGX_AGAIN && !(ctx->context & NGX_HTTP_LUA_CONTEXT_YIELDABLE)) { + rc = ngx_http_lua_socket_tcp_block_read(r, u); + } + if (rc == NGX_ERROR) { dd("read failed: %d", (int) u->ft_type); rc = ngx_http_lua_socket_tcp_receive_retval_handler(r, u, L); @@ -2917,6 +2939,10 @@ ngx_http_lua_socket_tcp_send(lua_State *L) dd("socket send returned %d", (int) rc); + if (rc == NGX_AGAIN && !(ctx->context & NGX_HTTP_LUA_CONTEXT_YIELDABLE)) { + rc = ngx_http_lua_socket_tcp_block_write(r, u); + } + if (rc == NGX_ERROR) { return ngx_http_lua_socket_write_error_retval_handler(r, u, L); } @@ -4499,6 +4525,10 @@ ngx_http_lua_socket_receiveuntil_iterator(lua_State *L) rc = ngx_http_lua_socket_tcp_read(r, u); + if (rc == NGX_AGAIN && !(ctx->context & NGX_HTTP_LUA_CONTEXT_YIELDABLE)) { + rc = ngx_http_lua_socket_tcp_block_read(r, u); + } + if (rc == NGX_ERROR) { dd("read failed: %d", (int) u->ft_type); rc = ngx_http_lua_socket_tcp_receive_retval_handler(r, u, L); @@ -6114,6 +6144,164 @@ ngx_http_lua_coctx_cleanup(void *data) ngx_http_lua_socket_tcp_finalize(u->request, u); } +/* + * TODO: + * 1. add other events + * 2. support in init_by_lua + */ +static ngx_int_t +ngx_http_lua_socket_tcp_block_conn(ngx_http_request_t *r, + ngx_http_lua_socket_tcp_upstream_t *u) +{ + ngx_int_t rc; + ngx_msec_t delta; + ngx_connection_t *c = u->peer.connection; + ngx_msec_t timer = u->connect_timeout; + +#if 0 + if (c->write->timer_set) { + ngx_del_timer(c->write); + } + + if (c->write->active) { + ngx_del_event(c->write, NGX_WRITE_EVENT, NGX_CLOSE_EVENT); + } +#endif + + ngx_http_lua_set_event(c->write, NGX_WRITE_EVENT); + + delta = ngx_current_msec; + + rc = ngx_http_lua_process_events(r, timer); + + if (rc == NGX_ERROR) { + ngx_http_lua_socket_handle_conn_error(r, u, + NGX_HTTP_LUA_SOCKET_FT_ERROR); + + return rc; + } + + ngx_time_update(); + + if (ngx_current_msec - delta >= timer) { + ngx_http_lua_socket_handle_conn_error(r, u, + NGX_HTTP_LUA_SOCKET_FT_TIMEOUT); + + return NGX_ERROR; + } + + return NGX_OK; +} + + +static ngx_int_t +ngx_http_lua_socket_tcp_block_write(ngx_http_request_t *r, + ngx_http_lua_socket_tcp_upstream_t *u) +{ + int rc; + ngx_msec_t delta; + ngx_connection_t *c = u->peer.connection; + ngx_msec_t timer = u->connect_timeout; + + if (c->write->timer_set) { + ngx_del_timer(c->write); + } + + if (c->write->active) { + ngx_del_event(c->write, NGX_WRITE_EVENT, NGX_CLOSE_EVENT); + } + + ngx_http_lua_set_event(c->write, NGX_WRITE_EVENT); + + delta = ngx_current_msec; + + rc = ngx_http_lua_process_events(r, timer); + + if (rc == NGX_ERROR) { + ngx_http_lua_socket_handle_write_error(r, u, + NGX_HTTP_LUA_SOCKET_FT_ERROR); + + return rc; + } + + ngx_time_update(); + + if (ngx_current_msec - delta >= timer) { + ngx_http_lua_socket_handle_write_error(r, u, + NGX_HTTP_LUA_SOCKET_FT_TIMEOUT); + + return NGX_ERROR; + } + + return NGX_OK; +} + + +static ngx_int_t +ngx_http_lua_socket_tcp_block_read(ngx_http_request_t *r, + ngx_http_lua_socket_tcp_upstream_t *u) +{ + int rc; + ngx_msec_t delta; + ngx_connection_t *c = u->peer.connection; + ngx_msec_t timer = u->connect_timeout; + + ngx_http_lua_set_event(c->read, NGX_READ_EVENT); + + delta = ngx_current_msec; + + for (;;) { + + if (c->read->timer_set) { + ngx_del_timer(c->read); + } + + if (c->read->active) { + ngx_del_event(c->read, NGX_READ_EVENT, NGX_CLOSE_EVENT); + } + + rc = ngx_http_lua_process_events(r, timer); + if (rc == NGX_ERROR) { + ngx_http_lua_socket_handle_read_error(r, u, + NGX_HTTP_LUA_SOCKET_FT_ERROR); + + return rc; + } + + if (c->read->pending_eof) { + ngx_http_lua_socket_handle_read_error(r, u, + NGX_HTTP_LUA_SOCKET_FT_CLOSED); + + return NGX_ERROR; + } + + ngx_time_update(); + + /* timeout */ + if (ngx_current_msec - delta >= timer) { + ngx_http_lua_socket_handle_read_error(r, u, + NGX_HTTP_LUA_SOCKET_FT_TIMEOUT); + + return NGX_ERROR; + } + + timer -= ngx_current_msec - delta; + + if (u->buffer.start != NULL) { + rc = ngx_http_lua_socket_tcp_read(r, u); + + if (rc == NGX_ERROR || rc == NGX_OK) { + return rc; + } + + /* NGX_AGAIN, continue in loop*/ + } + } + + /* unreachable */ + return NGX_ERROR; +} + #if (NGX_HTTP_SSL) diff --git a/t/058-tcp-socket.t b/t/058-tcp-socket.t index 593e49461f..2209543698 100644 --- a/t/058-tcp-socket.t +++ b/t/058-tcp-socket.t @@ -4367,3 +4367,45 @@ connect failed: missing the port number finish --- no_error_log [error] + + + +=== TEST 73: init_worker_by_lua +--- SKIP +--- http_config + header_filter_by_lua_block { + local sock = ngx.socket.tcp() + local ok, err = sock:connect("127.0.0.1", 8081) + ngx.log(ngx.ERR, err or "connect success") + + local err = sock:send("GET /apis/sgw-node/v1/node/ HTTP/1.1\nHost: 127.0.0.1:8081\n\n") + ngx.log(ngx.ERR, err or "send success") + + local content, err = sock:receive("*l") + ngx.log(ngx.ERR, err or "recv success") + ngx.log(ngx.ERR, content or "???") + } +--- config + location /t { + content_by_lua_block { + ngx.say("hello") + } + } +--- request +GET /t +--- response_body +hello +--- no_error_log +[error] +--- error_log +connected: 1 +request sent: 57 +received: HTTP/1.1 200 OK +received: Server: nginx +received: Content-Type: text/plain +received: Content-Length: 4 +received: Connection: close +received: +received: foo +failed to receive a line: closed [] +close: 1 nil