Passa al contenuto principale

redisBus.js

Utilizzo nei microservizi

DBManager (legacy), alertingService, cacheManager, decision-engine, ibkr-keepalive, redisWsBridge, scheduler.

Funzioni e utilizzo

FunzioneParametri principaliMicroservizi che la usano
new RedisBus(options)options: url, name, channelsDBManager (legacy), alertingService, cacheManager, decision-engine, ibkr-keepalive, redisWsBridge, scheduler
connect()-DBManager (legacy), alertingService, cacheManager, decision-engine, ibkr-keepalive, redisWsBridge, scheduler
publish(channel, payload)channel + payload serializzabileDBManager (legacy), alertingService, cacheManager, decision-engine, ibkr-keepalive, redisWsBridge, scheduler
subscribe(channel, handler)channel + callbackDBManager (legacy), alertingService, cacheManager, decision-engine, ibkr-keepalive, redisWsBridge, scheduler
psubscribe(pattern, handler)pattern + callbackDBManager (legacy), alertingService, cacheManager, decision-engine, ibkr-keepalive, redisWsBridge, scheduler
get(key) / set(key,value,opts) / del(key)chiave + valore/opzioni TTLDBManager (legacy), alertingService, cacheManager, decision-engine, ibkr-keepalive, redisWsBridge, scheduler
xadd/xaddJson(stream, ... )stream + fields/eventDBManager (legacy), alertingService, cacheManager, decision-engine, ibkr-keepalive, redisWsBridge, scheduler
consumeLoop(opts)opts: stream/group/consumer/onMessageDBManager (legacy), alertingService, cacheManager, decision-engine, ibkr-keepalive, redisWsBridge, scheduler
setChannelConfig(key,cfg)`key: telemetrymetrics
close()-DBManager (legacy), alertingService, cacheManager, decision-engine, ibkr-keepalive, redisWsBridge, scheduler

Dettaglio funzioni

new RedisBus(options)

  • Cosa fa: Inizializza il bus Redis con canali e scheduler batching.
  • Parametri: options: url, name, channels

connect()

  • Cosa fa: Apre connessioni pub/sub e prepara il bus all'uso.
  • Parametri: -

publish(channel, payload)

  • Cosa fa: Pubblica evento/comando su canale Redis.
  • Parametri: channel + payload serializzabile

subscribe(channel, handler)

  • Cosa fa: Subscribe a canale specifico.
  • Parametri: channel + callback

psubscribe(pattern, handler)

  • Cosa fa: Subscribe con pattern wildcard (pub/sub).
  • Parametri: pattern + callback

get(key) / set(key,value,opts) / del(key)

  • Cosa fa: API KV helper su Redis.
  • Parametri: chiave + valore/opzioni TTL

xadd/xaddJson(stream, ... )

  • Cosa fa: Scrittura eventi su Redis Stream.
  • Parametri: stream + fields/event

consumeLoop(opts)

  • Cosa fa: Loop consumo stream con consumer group.
  • Parametri: opts: stream/group/consumer/onMessage

setChannelConfig(key,cfg)

  • Cosa fa: Aggiorna config runtime dei canali.
  • Parametri: key: telemetry|metrics|...; cfg runtime

close()

  • Cosa fa: Chiude connessioni Redis in shutdown controllato.
  • Parametri: -

Esempi di utilizzo

1) Inizializzazione nel servizio

const { RedisBus } = require('../shared/redisBus');

const bus = new RedisBus({
url: process.env.REDIS_URL,
name: 'scheduler',
env: process.env.ENV || 'LOCAL',
channels: {
telemetry: { on: true, params: { intervalsMs: 1000 } },
metrics: { on: true, params: { intervalsMs: 1000 } },
data: { on: true, params: { intervalsMs: 0 } },
logs: { on: true, params: { intervalsMs: 0 } },
events: { on: true, params: { intervalsMs: 0 } },
},
});

await bus.connect();

2) Publish su canale data/event

const redisDataChannel = `${process.env.ENV}.market-data-service.data`;
const redisEventsChannel = `${process.env.ENV}.scheduler.events`;

await bus.publish(redisDataChannel, {
type: 'marketData',
ticker: 'AAPL',
price: 189.21,
ts: Date.now(),
});

await bus.publish(redisEventsChannel, {
eventKey: 'scheduler.TASK.COMPLETED',
eventId: 'TASK.COMPLETED',
ts: new Date().toISOString(),
payload: { jobKey: 'market-daily', result: 'ok' },
});

3) Subscribe / Pattern subscribe

await bus.subscribe(`${process.env.ENV}.cachemanager.events`, async (parsed, raw) => {
// parsed = JSON se valido, altrimenti undefined
if (parsed?.eventId === 'CACHE.CLEARED') {
console.log('Cache cleared', parsed.payload);
}
});

await bus.psubscribe(`${process.env.ENV}.*.status.HOOK`, async (parsed, _raw, channel) => {
if (parsed?.type === 'job.done') {
console.log('job done from', channel, parsed.jobId, parsed.status);
}
});

4) KV cache con TTL

const key = bus.key('market-data', 'snapshot', 'aapl'); // es: PAPER:market-data:snapshot:aapl

await bus.set(key, { ticker: 'AAPL', price: 189.21 }, { EX: 60 });
const value = await bus.get(key);
await bus.del(key);

5) Stream + consumer group

await bus.xaddJson('stream:orders', {
type: 'ORDER.CREATED',
orderId: 'ord_123',
ts: new Date().toISOString(),
});

await bus.consumeLoop({
stream: 'stream:orders',
group: 'orders-workers',
consumer: 'worker-1',
onMessage: async ({ id, json }) => {
// process event
return true; // ack
},
onError: (err) => console.error('consumeLoop error', err),
});

6) Aggiornamento runtime canali

bus.setChannelConfig('telemetry', { on: false, params: { intervalsMs: 1000 } });
bus.setChannelConfig('events', { on: true, params: { intervalsMs: 0 } });

Percorso

  • trading-system/shared/redisBus.js