Skip to content

Commit

Permalink
Added brain for memory
Browse files Browse the repository at this point in the history
Added direct caching for cache_miss in Pusher
  • Loading branch information
rennokki committed Jul 6, 2023
1 parent 576666b commit 2a25d6f
Show file tree
Hide file tree
Showing 12 changed files with 135 additions and 18 deletions.
13 changes: 13 additions & 0 deletions src/brain/brain.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import type * as FN from '@soketi/impl/types';

export abstract class Brain {
abstract get(key: string): Promise<FN.Brain.BrainRecord['value']|null>;
abstract getWithMetadata(key: string): Promise<FN.Brain.BrainRecord|null>;
abstract set(key: string, value: FN.Brain.BrainRecord['value'], ttlSeconds?: number): Promise<void>;
abstract has(key: string): Promise<boolean>;
abstract delete(key: string): Promise<void>;

async cleanup(): Promise<void> {
//
}
}
2 changes: 2 additions & 0 deletions src/brain/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export * from './brain';
export * from './local-brain';
44 changes: 44 additions & 0 deletions src/brain/local-brain.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import type * as FN from '@soketi/impl/types';
import { Brain } from './brain';

export class LocalBrain extends Brain {
memory: Map<string, FN.Brain.BrainRecord> = new Map();

constructor() {
super();

setInterval(() => {
for (let [key, { ttlSeconds, setTime }] of [...this.memory]) {
let currentTime = parseInt((new Date().getTime() / 1000) as unknown as string);

if (ttlSeconds > 0 && (setTime + ttlSeconds) <= currentTime) {
this.memory.delete(key);
}
}
}, 1_000);
}

async get(key: string): Promise<FN.JSON.Value|null> {
return (await this.getWithMetadata(key))?.value ?? null;
}

async getWithMetadata(key: string): Promise<FN.Brain.BrainRecord|null> {
return this.memory.get(key) ?? null;
}

async set(key: string, value: FN.JSON.Value, ttlSeconds = -1): Promise<void> {
this.memory.set(key, {
value,
ttlSeconds,
setTime: parseInt((new Date().getTime() / 1000) as unknown as string),
});
}

async has(key: string): Promise<boolean> {
return Boolean(this.memory.get(key));
}

async delete(key: string): Promise<void> {
this.memory.delete(key);
}
}
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
export * as Brain from './brain';
export * as Gossiper from './gossiper';
export * as Pusher from './pusher';
export * as Webhooks from './webhooks';
Expand Down
15 changes: 10 additions & 5 deletions src/pusher/ws/pusher-connections.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type * as FN from '@soketi/impl/types';
import { Brain } from '../../brain';
import { Connections as BaseConnections } from '../../ws';
import { EncryptedPrivateChannelManager, PresenceChannelManager, PrivateChannelManager, PublicChannelManager } from '../channels';
import { PusherConnection, Utils } from '../';
Expand All @@ -10,6 +11,7 @@ export class PusherConnections extends BaseConnections implements FN.Pusher.Push
constructor(
protected app: FN.Pusher.PusherApps.App,
protected readonly gossiper: Gossiper,
protected readonly brain: Brain,
) {
super();

Expand Down Expand Up @@ -508,16 +510,19 @@ export class PusherConnections extends BaseConnections implements FN.Pusher.Push
}

async sendMissedCacheIfExists(conn: FN.Pusher.PusherWS.PusherConnection, channel: string) {
// TODO: Caching module
/* let cachedEvent = await this.env.APPS.get(
let cachedEvent = await this.brain.get(
`app_${this.app.id}_channel_${channel}_cache_miss`,
);

if (cachedEvent) {
conn.sendJson({ event: 'pusher:cache_miss', channel, data: cachedEvent });
conn.sendJson({
event: 'pusher:cache_miss',
channel,
data: cachedEvent,
});
} else {
// TODO: this.webhooks.sendCacheMissed(channel);
} */
// TODO: Send webhook event.
}
}

async getChannelManagerFor(channel: string): Promise<FN.Pusher.Channels.ChannelManager> {
Expand Down
29 changes: 29 additions & 0 deletions tests/brain/local-brain.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import { LocalBrain } from '../../src/brain';
import { describe, test, expect } from 'vitest';

describe('brain/local-brain', () => {
test('basic storage', async () => {
const brain = new LocalBrain();

await brain.set('test', { test: 'object' });

expect(await brain.has('test')).toBe(true);
expect(await brain.get('test')).toEqual({ test: 'object' });

await brain.delete('test');

expect(await brain.has('test')).toBe(false);
expect(await brain.get('test')).toBe(null);
});

test('basic storage with ttl', async () => {
const brain = new LocalBrain();

await brain.set('test', { test: 'object' }, 1);

await new Promise((resolve) => setTimeout(resolve, 1_100));

expect(await brain.has('test')).toBe(false);
expect(await brain.get('test')).toBe(null);
});
});
9 changes: 6 additions & 3 deletions tests/pusher/channels/presence.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,19 @@ import { PusherConnection, PusherConnections } from '../../../src/pusher/ws';
import { Router as WsRouter } from '../../../src/ws';
import { describe, test, expect, beforeEach } from 'vitest';
import { createHmac } from 'crypto';
import { Brain, LocalBrain } from '../../../src/brain';

const pusherUtil = require('pusher/lib/util');
const Pusher = require('pusher');

let apps: TestAppsManager;
let gossiper: NoGossiper;
let brain: Brain;

beforeEach(() => {
apps = new TestAppsManager();
gossiper = new NoGossiper();
brain = new LocalBrain();

AppsRegistry.registerDriver('default', apps);

Expand All @@ -36,7 +39,7 @@ beforeEach(() => {
describe('pusher/channels/presence', () => {
test('join and leave', () => new Promise<void>(async (done) => {
const app = await AppsRegistry.getById('app-id') as TestApp;
const conns = new LocalConnections(app, gossiper);
const conns = new LocalConnections(app, gossiper, brain);
const user = {
user_id: '1',
user_info: {
Expand Down Expand Up @@ -90,7 +93,7 @@ describe('pusher/channels/presence', () => {

test('connect and disconnect', async () => new Promise<void>(async (done) => {
const app = await AppsRegistry.getById('app-id') as TestApp;
const conns = new LocalConnections(app, gossiper);
const conns = new LocalConnections(app, gossiper, brain);
const user = {
user_id: '1',
user_info: {
Expand Down Expand Up @@ -138,7 +141,7 @@ describe('pusher/channels/presence', () => {

test('connect but get unauthorized', async () => new Promise<void>(async (done) => {
const app = await AppsRegistry.getById('app-id') as TestApp;
const conns = new LocalConnections(app, gossiper);
const conns = new LocalConnections(app, gossiper, brain);
const user = {
user_id: '1',
user_info: {
Expand Down
9 changes: 6 additions & 3 deletions tests/pusher/channels/private-encrypted.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,19 @@ import { PusherConnection, PusherConnections } from '../../../src/pusher/ws';
import { Router as WsRouter } from '../../../src/ws';
import { describe, test, expect, beforeEach } from 'vitest';
import { createHmac } from 'crypto';
import { Brain, LocalBrain } from '../../../src/brain';

const pusherUtil = require('pusher/lib/util');
const Pusher = require('pusher');

let apps: TestAppsManager;
let gossiper: NoGossiper;
let brain: Brain;

beforeEach(() => {
apps = new TestAppsManager();
gossiper = new NoGossiper();
brain = new LocalBrain();

AppsRegistry.registerDriver('default', apps);
AppsRegistry.initializeApp({}).then((app: TestApp) => {
Expand All @@ -35,7 +38,7 @@ beforeEach(() => {
describe('pusher/channels/private-encrypted', () => {
test('join and leave', () => new Promise<void>(async (done) => {
const app = await AppsRegistry.getById('app-id') as TestApp;
const conns = new LocalConnections(app, gossiper);
const conns = new LocalConnections(app, gossiper, brain);

const conn = new PusherConnection('test', {
send: (message) => {
Expand Down Expand Up @@ -71,7 +74,7 @@ describe('pusher/channels/private-encrypted', () => {

test('connect and disconnect', async () => new Promise<void>(async (done) => {
const app = await AppsRegistry.getById('app-id') as TestApp;
const conns = new LocalConnections(app, gossiper);
const conns = new LocalConnections(app, gossiper, brain);

WsRouter.onConnectionClosed(async (conn) => {
await conns.unsubscribeFromAllChannels(conn);
Expand Down Expand Up @@ -109,7 +112,7 @@ describe('pusher/channels/private-encrypted', () => {

test('connect but get unauthorized', async () => new Promise<void>(async (done) => {
const app = await AppsRegistry.getById('app-id') as TestApp;
const conns = new LocalConnections(app, gossiper);
const conns = new LocalConnections(app, gossiper, brain);

const conn = new PusherConnection('test', {
send: (message) => {
Expand Down
9 changes: 6 additions & 3 deletions tests/pusher/channels/private.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,19 @@ import { PusherConnection, PusherConnections } from '../../../src/pusher/ws';
import { Router as WsRouter } from '../../../src/ws';
import { describe, test, expect, beforeEach } from 'vitest';
import { createHmac } from 'crypto';
import { Brain, LocalBrain } from '../../../src/brain';

const pusherUtil = require('pusher/lib/util');
const Pusher = require('pusher');

let apps: TestAppsManager;
let gossiper: NoGossiper;
let brain: Brain;

beforeEach(() => {
apps = new TestAppsManager();
gossiper = new NoGossiper();
brain = new LocalBrain();

AppsRegistry.registerDriver('default', apps);
AppsRegistry.initializeApp({}).then((app: TestApp) => {
Expand All @@ -35,7 +38,7 @@ beforeEach(() => {
describe('pusher/channels/private', () => {
test('join and leave', () => new Promise<void>(async (done) => {
const app = await AppsRegistry.getById('app-id') as TestApp;
const conns = new LocalConnections(app, gossiper);
const conns = new LocalConnections(app, gossiper, brain);

const conn = new PusherConnection('test', {
send: (message) => {
Expand Down Expand Up @@ -71,7 +74,7 @@ describe('pusher/channels/private', () => {

test('connect and disconnect', async () => new Promise<void>(async (done) => {
const app = await AppsRegistry.getById('app-id') as TestApp;
const conns = new LocalConnections(app, gossiper);
const conns = new LocalConnections(app, gossiper, brain);

WsRouter.onConnectionClosed(async (conn) => {
await conns.unsubscribeFromAllChannels(conn);
Expand Down Expand Up @@ -110,7 +113,7 @@ describe('pusher/channels/private', () => {

test('connect but get unauthorized', async () => new Promise<void>(async (done) => {
const app = await AppsRegistry.getById('app-id') as TestApp;
const conns = new LocalConnections(app, gossiper);
const conns = new LocalConnections(app, gossiper, brain);

const conn = new PusherConnection('test', {
send: (message) => {
Expand Down
7 changes: 5 additions & 2 deletions tests/pusher/channels/public.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,19 @@ import { PusherConnection, PusherConnections } from '../../../src/pusher/ws';
import { Router as WsRouter } from '../../../src/ws';
import { describe, test, expect, beforeEach } from 'vitest';
import { createHmac } from 'crypto';
import { Brain, LocalBrain } from '../../../src/brain';

const pusherUtil = require('pusher/lib/util');
const Pusher = require('pusher');

let apps: TestAppsManager;
let gossiper: NoGossiper;
let brain: Brain;

beforeEach(() => {
apps = new TestAppsManager();
gossiper = new NoGossiper();
brain = new LocalBrain();

AppsRegistry.registerDriver('default', apps);
AppsRegistry.initializeApp({}).then((app: TestApp) => {
Expand All @@ -35,7 +38,7 @@ beforeEach(() => {
describe('pusher/channels/public', () => {
test('join and leave', async () => {
const app = await AppsRegistry.getById('app-id') as TestApp;
const conns = new LocalConnections(app, gossiper);
const conns = new LocalConnections(app, gossiper, brain);

const conn = new PusherConnection('test', {
send: (message) => {
Expand Down Expand Up @@ -66,7 +69,7 @@ describe('pusher/channels/public', () => {

test('connect and disconnect', async () => new Promise<void>(async (done) => {
const app = await AppsRegistry.getById('app-id') as TestApp;
const conns = new LocalConnections(app, gossiper);
const conns = new LocalConnections(app, gossiper, brain);

WsRouter.onConnectionClosed(async (conn) => {
await conns.unsubscribeFromAllChannels(conn);
Expand Down
7 changes: 5 additions & 2 deletions tests/pusher/ws.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,19 @@ import { App, AppsManager, AppsRegistry } from '../../src/pusher/apps';
import { PusherConnection, PusherConnections } from '../../src/pusher/ws';
import { describe, test, expect, beforeEach } from 'vitest';
import { createHmac } from 'crypto';
import { Brain, LocalBrain } from '../../src/brain';

const pusherUtil = require('pusher/lib/util');
const Pusher = require('pusher');

let apps: TestAppsManager;
let gossiper: NoGossiper;
let brain: Brain;

beforeEach(() => {
apps = new TestAppsManager();
gossiper = new NoGossiper();
brain = new LocalBrain();

AppsRegistry.registerDriver('default', apps);
AppsRegistry.initializeApp({}).then((app: TestApp) => {
Expand All @@ -26,7 +29,7 @@ beforeEach(() => {
describe('pusher/ws', () => {
test('handle ping pongs', () => new Promise<void>(async (done) => {
const app = await AppsRegistry.getById('app-id') as TestApp;
const conns = new LocalConnections(app, gossiper);
const conns = new LocalConnections(app, gossiper, brain);

const conn = new PusherConnection('test', {
send: async (message) => {
Expand All @@ -49,7 +52,7 @@ describe('pusher/ws', () => {
apps.apps.set('app-id', app);
apps.apps.set('app-key', app);

const conns = new LocalConnections(app, gossiper);
const conns = new LocalConnections(app, gossiper, brain);

const otherConn = new PusherConnection('other', {
send: async (message) => {
Expand Down
8 changes: 8 additions & 0 deletions types/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,14 @@ export namespace Gossip {
type Payload = JSON.Object|JSON.Array|JSON.Value;
}

export namespace Brain {
type BrainRecord = {
value: JSON.Value;
ttlSeconds: number;
setTime: number;
};
}

export declare namespace WS {
type ConnectionID = string;
type Message = JSON.Object|JSON.Array|JSON.Value;
Expand Down

0 comments on commit 2a25d6f

Please sign in to comment.