Skip to content

Commit

Permalink
data porter supports multiple schema (#14)
Browse files Browse the repository at this point in the history
  • Loading branch information
shunjizhan authored Dec 2, 2024
1 parent ccbbe10 commit a16dca0
Show file tree
Hide file tree
Showing 8 changed files with 75 additions and 59 deletions.
3 changes: 1 addition & 2 deletions data-porter/.env.example
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
DB_TYPE=
DB_SCHEMA=
DB_TABLES=
DB_PASSWORD=
DB_TABLES=

DUNE_API_KEY=
DUNE_TABLE_NAMES=
28 changes: 7 additions & 21 deletions data-porter/src/actions/pull.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ export interface RowBase {

export type Row = Extended<RowBase>;

interface QueryTarget {
schema: string,
tables?: string[],
interface DbTarget {
schema: string;
table: string;
}

interface DbData <T = Row> {
Expand All @@ -19,38 +19,24 @@ interface DbData <T = Row> {
rows: T[],
}

/**
 * Looks up the names of all tables listed under `schema` in
 * `information_schema.tables`.
 *
 * @param client - pg client used to run the lookup query
 * @param schema - schema name bound to the query's `$1` placeholder
 * @returns the `table_name` value of every returned row
 */
const getAllTables = async (client: Client, schema: string) => {
  console.log(`querying all tables under schema ${schema} ...`);

  // The schema is passed as a bound parameter, not interpolated into the SQL.
  const sql = `
SELECT table_name
FROM information_schema.tables
WHERE table_schema = $1
`;
  const { rows } = await client.query(sql, [schema]);

  return rows.map(({ table_name }) => table_name);
};

export const pullDataFromDb = async <T = Row>(
clientConfig: ClientConfig,
queryTarget: QueryTarget,
dbTargets: DbTarget[],
): Promise<DbData<T>[]> => {
const { schema, tables } = queryTarget;
const client = new Client(clientConfig);

const res: DbData<T>[] = [];
try {
await client.connect();

const tableNames = tables ?? await getAllTables(client, schema);

for (const table of tableNames) {
for (const { schema, table } of dbTargets) {
const { rows } = await client.query(`SELECT * FROM "${schema}"."${table}"`);
res.push({ schema,table,rows });
res.push({ schema, table, rows });
}

} catch (err) {
console.error('Error fetching data:', err);
throw err;
} finally {
await client.end();
}
Expand Down
3 changes: 2 additions & 1 deletion data-porter/src/actions/upload.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { DUNE_URL } from '../consts';
import axios from 'axios';

import { DUNE_URL } from '../consts';

interface UploadParams {
data: string,
description: string,
Expand Down
45 changes: 10 additions & 35 deletions data-porter/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,46 +1,21 @@
import { cleanEnv, str } from 'envalid';
import assert from 'assert';
import dotenv from 'dotenv';

import { DbType, getDbConfig } from './consts';
import {
env,
getClientConfig,
getQueryTarget,
} from './utils';
import {
pullDataFromDb,
transformData,
uploadToDune,
} from './actions';

dotenv.config();

const env = cleanEnv(process.env, {
DB_TYPE: str({ choices: Object.values(DbType) }),
DB_SCHEMA: str(),
DB_TABLES: str(),
DB_PASSWORD: str(),

DUNE_API_KEY: str(),
DUNE_TABLE_NAMES: str(),
});

const main = async () => {
console.log('fetching data from db ...');

const dbConfig = getDbConfig(env.DB_TYPE);
const tables = env.DB_TABLES.split(',');
assert(tables.length > 0, `invalid tables env: ${env.DB_TABLES}`);

const tableNames = env.DUNE_TABLE_NAMES.split(',');
assert(tables.length === tableNames.length, `db tables and dune table names count mismatch: ${tables.length} | ${tableNames.length}`);
console.log('constructing query params ...');
const { dbTargets, tableNames } = getQueryTarget();
const clientConfig = getClientConfig();

const clientConfig = {
...dbConfig,
password: env.DB_PASSWORD,
};

const queryTarget = {
schema: env.DB_SCHEMA,
tables,
};
const dbData = await pullDataFromDb(clientConfig, queryTarget);
console.log('fetching data from db ...');
const dbData = await pullDataFromDb(clientConfig, dbTargets);
console.log(`${dbData.length} tables fetched: ${dbData.map(({ table }) => table).join(', ') }`);

for (const [idx, { schema, table, rows }] of dbData.entries()) {
Expand Down
12 changes: 12 additions & 0 deletions data-porter/src/utils/getClientConfig.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import { ClientConfig } from 'pg';

import { env } from './parseEnv';
import { getDbConfig } from '../consts';

/**
 * Builds the pg client configuration: the settings returned by `getDbConfig`
 * for `env.DB_TYPE`, with `password` set from `env.DB_PASSWORD`.
 *
 * @returns the merged client configuration
 */
export const getClientConfig = (): ClientConfig => ({
  ...getDbConfig(env.DB_TYPE),
  password: env.DB_PASSWORD,
});
25 changes: 25 additions & 0 deletions data-porter/src/utils/getQueryTarget.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import assert from 'assert';

import { env } from './parseEnv';

/**
 * Parses the query targets from the environment.
 *
 * `env.DB_TABLES` is read as a comma-separated list of `schema.table` pairs and
 * `env.DUNE_TABLE_NAMES` as a comma-separated list of names; whitespace around
 * every entry is ignored. The two lists must have the same length.
 *
 * @returns `dbTargets` (one `{ schema, table }` per `DB_TABLES` entry) and
 *   `tableNames` (the trimmed `DUNE_TABLE_NAMES` entries), in input order
 * @throws Error when a `DB_TABLES` entry is not exactly `schema.table` with
 *   both parts non-empty, or when a `DUNE_TABLE_NAMES` entry is empty
 * @throws AssertionError when the two lists differ in length
 */
export const getQueryTarget = () => {
  const dbTargets = env.DB_TABLES
    .split(',')
    .map(pair => {
      // Trim BEFORE validating so whitespace-only parts (e.g. " .table") are
      // rejected instead of producing an empty identifier.
      const parts = pair.split('.').map(part => part.trim());
      const [schema, table] = parts;

      // Require exactly two parts: "a.b.c" would otherwise silently query "a"."b".
      if (parts.length !== 2 || !schema || !table) {
        throw new Error(`invalid schema.table: ${pair}`);
      }

      return { schema, table };
    });

  const tableNames = env.DUNE_TABLE_NAMES
    .split(',')
    .map(name => name.trim());

  if (tableNames.some(name => name.length === 0)) {
    throw new Error(`invalid dune table names: ${env.DUNE_TABLE_NAMES}`);
  }

  assert(dbTargets.length === tableNames.length,
    `db tables and dune table names count mismatch: ${dbTargets.length} | ${tableNames.length}`);

  return { dbTargets, tableNames };
};
3 changes: 3 additions & 0 deletions data-porter/src/utils/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
export * from './parseEnv';
export * from './getQueryTarget';
export * from './getClientConfig';
15 changes: 15 additions & 0 deletions data-porter/src/utils/parseEnv.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import { cleanEnv, str } from 'envalid';
import dotenv from 'dotenv';

import { DbType } from '../consts';

// Load variables from a .env file (if any) into process.env; this must run
// before cleanEnv reads process.env below.
dotenv.config();

/**
 * Validated environment variables for the data porter.
 * Every variable is declared with a `str()` validator.
 */
export const env = cleanEnv(process.env, {
// must be one of the DbType values
DB_TYPE: str({ choices: Object.values(DbType) }),
// comma-separated `schema.table` pairs (parsed by getQueryTarget)
DB_TABLES: str(),
DB_PASSWORD: str(),

DUNE_API_KEY: str(),
// comma-separated names; getQueryTarget asserts the count matches DB_TABLES
DUNE_TABLE_NAMES: str(),
});

0 comments on commit a16dca0

Please sign in to comment.