Skip to content

Commit

Permalink
feat: do TTL in worker with a GUC
Browse files Browse the repository at this point in the history
  • Loading branch information
steve-chavez committed Jul 29, 2021
1 parent 3f9d1ba commit f909f31
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 9 deletions.
5 changes: 4 additions & 1 deletion sql/pg_net--0.1.sql
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@ create table net.http_request_queue(
url text not null,
headers jsonb not null,
body bytea,
timeout_milliseconds int not null
timeout_milliseconds int not null,
created timestamptz not null default now()
);

create index created_idx on net.http_request_queue (created);

-- Associates a response with a request
-- API: Private
create table net._http_response(
Expand Down
49 changes: 49 additions & 0 deletions src/worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,15 @@

#include "utils/jsonb.h"

#include "tcop/utility.h"

#include <curl/curl.h>
#include <curl/multi.h>

PG_MODULE_MAGIC;

static char *ttl = NULL;

void _PG_init(void);
void worker_main(Datum main_arg) pg_attribute_noreturn();
bool isExtensionLoaded(void);
Expand All @@ -40,6 +44,7 @@ typedef struct _CurlData
} CurlData;

static volatile sig_atomic_t got_sigterm = false;
static volatile sig_atomic_t got_sighup = false;

static void
handle_sigterm(SIGNAL_ARGS)
Expand All @@ -51,6 +56,16 @@ handle_sigterm(SIGNAL_ARGS)
errno = save_errno;
}

static void
handle_sighup(SIGNAL_ARGS)
{
int save_errno = errno;
got_sighup = true;
if (MyProc)
SetLatch(&MyProc->procLatch);
errno = save_errno;
}

static size_t
body_cb(void *contents, size_t size, size_t nmemb, void *userp)
{
Expand Down Expand Up @@ -202,6 +217,7 @@ worker_main(Datum main_arg)
int hashFlags = 0;

pqsignal(SIGTERM, handle_sigterm);
pqsignal(SIGHUP, handle_sighup);

BackgroundWorkerUnblockSignals();

Expand All @@ -225,6 +241,7 @@ worker_main(Datum main_arg)
StringInfoData select_query;
StringInfoData query_insert_response_ok;
StringInfoData query_insert_response_bad;
StringInfoData delete_query;

/* Wait 10 seconds */
WaitLatch(&MyProc->procLatch,
Expand All @@ -238,10 +255,34 @@ worker_main(Datum main_arg)
continue;
}

if (got_sighup) {
got_sighup = false;
ProcessConfigFile(PGC_SIGHUP);
}

StartTransactionCommand();
PushActiveSnapshot(GetTransactionSnapshot());
SPI_connect();

initStringInfo(&delete_query);
appendStringInfo(&delete_query, "DELETE FROM net.http_request_queue WHERE created < clock_timestamp() - $1");

{

int argCount = 1;
Oid argTypes[1];
Datum argValues[1];

argTypes[0] = INTERVALOID;
argValues[0] = DirectFunctionCall3(interval_in, CStringGetDatum(ttl), ObjectIdGetDatum(InvalidOid), Int32GetDatum(-1));

if (SPI_execute_with_args(delete_query.data, argCount, argTypes, argValues, NULL,
false, 0) != SPI_OK_DELETE)
{
elog(ERROR, "SPI_exec failed: %s", delete_query.data);
}
}

initStringInfo(&select_query);

appendStringInfo(&select_query, "\
Expand Down Expand Up @@ -447,6 +488,14 @@ _PG_init(void)
worker.bgw_main_arg = (Datum) 0;
worker.bgw_notify_pid = 0;
RegisterBackgroundWorker(&worker);

DefineCustomStringVariable("pg_net.ttl",
"time to live for request/response rows",
"should be a valid interval type",
&ttl,
"3 days",
PGC_SIGHUP, 0,
NULL, NULL, NULL);
}

PG_FUNCTION_INFO_V1(_urlencode_string);
Expand Down
2 changes: 2 additions & 0 deletions test/fixtures.sql
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
create extension pg_net;
alter system set pg_net.ttl TO '2 seconds';
select pg_reload_conf();
16 changes: 8 additions & 8 deletions test/test_http_requests_deleted_after_ttl.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,45 +3,45 @@
import pytest
from sqlalchemy import text


@pytest.mark.skip(reason="pending implementation")
def test_http_requests_deleted_after_ttl(sess):
"""Check that http requests are deleted within a few seconds of their ttl"""
# Create a request
(request_id,) = sess.execute(
"""
select net.http_get(
url:='https://httpbin.org/anything',
ttl:='2 seconds'
'https://httpbin.org/anything'
);
"""
).fetchone()

# Commit so background worker can start
sess.commit()

# Sleep a while for ensuring the request is completed
time.sleep(2)

# Confirm that the request was retrievable
response = sess.execute(
text(
"""
select * from net.http_collect_response(:request_id, async:=false);
select * from net.http_collect_response(:request_id);
"""
),
{"request_id": request_id},
).fetchone()
assert response[0] == "SUCCESS"

# Sleep until after request should have been deleted
time.sleep(5)
time.sleep(3)

# Ensure collecting the resposne now results in an error
response = sess.execute(
text(
"""
select * from net.http_collect_response(:request_id, async:=true);
select * from net.http_collect_response(:request_id);
"""
),
{"request_id": request_id},
).fetchone()
# TODO an ERROR status doesn't seem correct here
assert response[0] == "ERROR"
assert "not found" in response[2]

0 comments on commit f909f31

Please sign in to comment.