Skip to content

Commit

Permalink
proxy: internal backend for V2 API
Browse files Browse the repository at this point in the history
Replaces `res = mcp.internal(req)` with the standard V2 API flow.
Create a handle for an fgen using `mcp.internal_backend` as an argument,
then call wait against it like a normal pool.

Code is incomplete but the basics work.
  • Loading branch information
dormando committed Dec 27, 2024
1 parent 7d6bc7b commit 4448fb8
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 29 deletions.
12 changes: 7 additions & 5 deletions proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,7 @@ struct _io_pending_proxy_t {
// original struct ends here

mcp_rcontext_t *rctx; // pointer to request context.
mcp_resp_t *client_resp; // reference (currently pointing to a lua object)
int queue_handle; // queue slot to return this result to
bool ascii_multiget; // passed on from mcp_r_t
uint8_t io_type; // extstore IO or backend IO
Expand All @@ -546,7 +547,6 @@ struct _io_pending_proxy_t {
struct iovec iov[2]; // request string + tail buffer
int iovcnt; // 1 or 2...
unsigned int iovbytes; // total bytes in the iovec
mcp_resp_t *client_resp; // reference (currently pointing to a lua object)
bool flushed; // whether we've fully written this request to a backend.
bool background; // dummy IO for backgrounded awaits
bool qcount_incr; // HACK.
Expand Down Expand Up @@ -614,6 +614,7 @@ io_pending_proxy_t *mcp_queue_rctx_io(mcp_rcontext_t *rctx, mcp_request_t *rq, m
// internal request interface
int mcplib_internal(lua_State *L);
int mcplib_internal_run(mcp_rcontext_t *rctx);
void *mcp_rcontext_internal(mcp_rcontext_t *rctx, mcp_request_t *rq, mcp_resp_t *r);

// user stats interface
#define MAX_USTATS_DEFAULT 1024
Expand Down Expand Up @@ -703,10 +704,11 @@ struct mcp_funcgen_router {

#define RQUEUE_TYPE_NONE 0
#define RQUEUE_TYPE_POOL 1
#define RQUEUE_TYPE_FGEN 2
#define RQUEUE_TYPE_UOBJ 3 // user tracked object types past this point
#define RQUEUE_TYPE_UOBJ_REQ 4
#define RQUEUE_TYPE_UOBJ_RES 5
#define RQUEUE_TYPE_INT 2
#define RQUEUE_TYPE_FGEN 3
#define RQUEUE_TYPE_UOBJ 4 // user tracked object types past this point
#define RQUEUE_TYPE_UOBJ_REQ 5
#define RQUEUE_TYPE_UOBJ_RES 6
#define RQUEUE_ASSIGNED (1<<0)
#define RQUEUE_R_RESUME (1<<1)
#define RQUEUE_R_GOOD (1<<3)
Expand Down
69 changes: 46 additions & 23 deletions proxy_internal.c
Original file line number Diff line number Diff line change
Expand Up @@ -1636,30 +1636,8 @@ static void process_marithmetic_cmd(LIBEVENT_THREAD *t, mcp_parser_t *pr, mc_res

/*** Lua and internal handler ***/

int mcplib_internal(lua_State *L) {
luaL_checkudata(L, 1, "mcp.request");
mcp_resp_t *r = lua_newuserdatauv(L, sizeof(mcp_resp_t), 0);
memset(r, 0, sizeof(mcp_resp_t));
luaL_getmetatable(L, "mcp.response");
lua_setmetatable(L, -2);

lua_pushinteger(L, MCP_YIELD_INTERNAL);
return lua_yield(L, 2);
}

// we're pretending to be p_c_ascii(), but reusing our already tokenized code.
// the text parser should eventually move to the new tokenizer and we can
// merge all of this code together.
int mcplib_internal_run(mcp_rcontext_t *rctx) {
lua_State *L = rctx->Lc;
mcp_request_t *rq = luaL_checkudata(L, 1, "mcp.request");
mcp_resp_t *r = luaL_checkudata(L, 2, "mcp.response");
mc_resp *resp = resp_start_unlinked(rctx->c);
LIBEVENT_THREAD *t = rctx->c->thread;
static inline int _mcplib_internal_run(LIBEVENT_THREAD *t, mcp_request_t *rq, mcp_resp_t *r, mc_resp *resp) {
mcp_parser_t *pr = &rq->pr;
if (resp == NULL) {
return -1;
}

// TODO: meta no-op isn't handled here. haven't decided how yet.
switch (rq->pr.command) {
Expand Down Expand Up @@ -1755,6 +1733,51 @@ int mcplib_internal_run(mcp_rcontext_t *rctx) {
// kilobyte.
t->proxy_vm_extra_kb++;

return 0;
}

int mcplib_internal(lua_State *L) {
luaL_checkudata(L, 1, "mcp.request");
mcp_resp_t *r = lua_newuserdatauv(L, sizeof(mcp_resp_t), 0);
memset(r, 0, sizeof(mcp_resp_t));
luaL_getmetatable(L, "mcp.response");
lua_setmetatable(L, -2);

lua_pushinteger(L, MCP_YIELD_INTERNAL);
return lua_yield(L, 2);
}

// V2 API internal handling.
void *mcp_rcontext_internal(mcp_rcontext_t *rctx, mcp_request_t *rq, mcp_resp_t *r) {
LIBEVENT_THREAD *t = rctx->fgen->thread;
mc_resp *resp = resp_start_unlinked(rctx->c);
if (resp == NULL) {
return NULL;
}

// TODO: release resp here instead on error?
if (_mcplib_internal_run(t, rq, r, resp) != 0) {
return NULL;
}

return resp;
}

// we're pretending to be p_c_ascii(), but reusing our already tokenized code.
// the text parser should eventually move to the new tokenizer and we can
// merge all of this code together.
int mcplib_internal_run(mcp_rcontext_t *rctx) {
lua_State *L = rctx->Lc;
mcp_request_t *rq = luaL_checkudata(L, 1, "mcp.request");
mcp_resp_t *r = luaL_checkudata(L, 2, "mcp.response");
mc_resp *resp = resp_start_unlinked(rctx->c);
LIBEVENT_THREAD *t = rctx->c->thread;
if (resp == NULL) {
return -1;
}

_mcplib_internal_run(t, rq, r, resp);

if (resp->io_pending) {
// TODO (v2): here we move the IO from the temporary resp to the top
// resp, but this feels kludgy so I'm leaving an explicit note to find
Expand Down
10 changes: 10 additions & 0 deletions proxy_lua.c
Original file line number Diff line number Diff line change
Expand Up @@ -1800,6 +1800,10 @@ int proxy_register_libs(void *ctx, LIBEVENT_THREAD *t, void *state) {
luaL_newmetatable(L, "mcp.funcgen");
lua_pop(L, 1);

// mt for magical null wrapper for using internal cache as backend
luaL_newmetatable(L, "mcp.internal_be");
lua_pop(L, 1);

luaL_newlibtable(L, mcplib_f_routes);
} else {
// Change the extra space override for the configuration VM to just point
Expand Down Expand Up @@ -1833,6 +1837,12 @@ int proxy_register_libs(void *ctx, LIBEVENT_THREAD *t, void *state) {
luaL_newlibtable(L, mcplib_f_config);
}

// Create magic empty value to pass as an internal backend.
lua_newuserdatauv(L, 1, 0);
luaL_getmetatable(L, "mcp.internal_be");
lua_setmetatable(L, -2);
lua_setfield(L, -2, "internal_backend");

// create main library table.
//luaL_newlib(L, mcplib_f);
// TODO (v2): luaL_newlibtable() just pre-allocs the exact number of things
Expand Down
39 changes: 38 additions & 1 deletion proxy_luafgen.c
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ static int _mcplib_funcgen_gencall(lua_State *L) {
struct mcp_rqueue_s *frqu = &fgen->queue_list[x];
struct mcp_rqueue_s *rqu = &rc->qslots[x];
rqu->obj_type = frqu->obj_type;
if (frqu->obj_type == RQUEUE_TYPE_POOL) {
if (frqu->obj_type == RQUEUE_TYPE_POOL || frqu->obj_type == RQUEUE_TYPE_INT) {
rqu->obj_ref = 0;
rqu->obj = frqu->obj;
mcp_resp_t *r = mcp_prep_bare_resobj(L, fgen->thread);
Expand Down Expand Up @@ -657,6 +657,7 @@ int mcplib_funcgen_new_handle(lua_State *L) {
mcp_funcgen_t *fgen = lua_touserdata(L, 1);
mcp_pool_proxy_t *pp = NULL;
mcp_funcgen_t *fg = NULL;
void *test = NULL;

if (fgen->ready) {
proxy_lua_error(L, "cannot modify function generator after calling ready");
Expand All @@ -665,6 +666,8 @@ int mcplib_funcgen_new_handle(lua_State *L) {

if ((pp = luaL_testudata(L, 2, "mcp.pool_proxy")) != NULL) {
// good.
} else if ((test = luaL_testudata(L, 2, "mcp.internal_be")) != NULL) {
// also good.
} else if ((fg = luaL_testudata(L, 2, "mcp.funcgen")) != NULL) {
if (fg->is_router) {
proxy_lua_error(L, "cannot assign a router to a handle in new_handle");
Expand Down Expand Up @@ -698,6 +701,11 @@ int mcplib_funcgen_new_handle(lua_State *L) {
rqu->obj_ref = luaL_ref(L, LUA_REGISTRYINDEX);
rqu->obj_type = RQUEUE_TYPE_POOL;
rqu->obj = pp;
} else if (test) {
// pops test from the stack
rqu->obj_ref = luaL_ref(L, LUA_REGISTRYINDEX);
rqu->obj_type = RQUEUE_TYPE_INT;
rqu->obj = test;
} else {
// pops the fgen from the stack.
mcp_funcgen_reference(L);
Expand Down Expand Up @@ -1189,6 +1197,35 @@ void mcp_run_rcontext_handle(mcp_rcontext_t *rctx, int handle) {
p->return_cb = proxy_return_rqu_cb;
p->queue_handle = handle;
rctx->pending_reqs++;
} else if (rqu->obj_type == RQUEUE_TYPE_INT) {
mcp_request_t *rq = rqu->rq;
mc_resp *resp = mcp_rcontext_internal(rctx, rq, rqu->res_obj);
if (resp == NULL) {
// FIXME: error handling.
} else if (resp->io_pending) {
resp->io_pending->return_cb = proxy_return_rqu_cb;
// Add io object to extstore submission queue.
io_queue_t *q = conn_io_queue_get(rctx->c, IO_QUEUE_EXTSTORE);
io_pending_proxy_t *io = (io_pending_proxy_t *)rctx->resp->io_pending;

io->eio.next = q->stack_ctx;
q->stack_ctx = &io->eio;
assert(q->count >= 0);
q->count++;

io->rctx = rctx;
io->c = rctx->c;
io->ascii_multiget = rq->ascii_multiget;
// mark the buffer into the mcp_resp for freeing later.
rqu->res_obj->buf = io->eio.buf;
rctx->pending_reqs++;
} else {
io_pending_proxy_t *p = mcp_queue_rctx_io(rctx, NULL, NULL, rqu->res_obj);
p->return_cb = proxy_return_rqu_cb;
p->queue_handle = rctx->parent_handle;
p->background = true;
rctx->pending_reqs++;
}
} else if (rqu->obj_type == RQUEUE_TYPE_FGEN) {
// TODO: NULL the ->c post-return?
mcp_rcontext_t *subrctx = rqu->obj;
Expand Down
15 changes: 15 additions & 0 deletions t/proxyinternal3.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
function mcp_config_pools()
return nil
end

function mcp_config_routes(p)
local fg = mcp.funcgen_new()
local h = fg:new_handle(mcp.internal_backend)
fg:ready({ n = "internal", f = function(rctx)
return function(r)
return rctx:enqueue_and_wait(r, h)
end
end})

mcp.attach(mcp.CMD_ANY_STORAGE, fg)
end

0 comments on commit 4448fb8

Please sign in to comment.