Files
pulse-libs/packages/use-websocket/src/useWebSocket.ts
T
Pulse Agent 0889ee9117 feat(lib): add useLiveStream WS hook + useLiveMetrics + LiveMetricChart
feat(hooks): add useLiveStream generic WebSocket hook
  - supports websocket/sse/polling transports
  - exponential backoff reconnect with jitter
  - circular buffer with configurable size
  - typed filter callback per use case
  - manual disconnect + reconnect + error state

feat(hooks): add useLiveMetrics derived hook
  - sliding time-window cut
  - moving average (configurable window)
  - current / avg / min / max / ratePerSecond
  - zero allocations per tick (memoized)

feat(charts): add LiveMetricChart molecule (Recharts)
  - line + area variants, grid + tooltip
  - moving-average overlay (dashed)
  - ConnectionStatus atom in header
  - status bar + compact mode
  - 100% responsive, GPU via SVG ViewBox

feat(atoms): add ConnectionStatus indicator
  - 5 states: disconnected/connecting/connected/reconnecting/error
  - animated pulse, JetBrains Mono, pill style
  - exported helpers: formatLatency / formatBytes

docs(pkg): bump v0.1.0 → v0.2.0, add recharts peerDep
2026-05-20 22:59:10 -03:00

326 lines
10 KiB
TypeScript

/**
* ═══════════════════════════════════════════════════════════════════
* packages/use-websocket/src/useWebSocket.ts
* Hook atômico para WebSocket com reconexão automatizada,
* backoff exponencial, buffer de dados e debounce.
* Thread-safe: nunca bloqueará a thread principal.
* ═══════════════════════════════════════════════════════════════════
*/
import {
useCallback,
useEffect,
useRef,
useState,
} from "react";
import type {
MessageHandler,
ErrorHandler,
StatusHandler,
WSConfig,
WSMessage,
WSState,
WSStatus,
} from "@pulse-libs/shared";
function generateId(): string {
return `${Date.now().toString(36)}-${Math.random().toString(36).slice(2, 7)}`;
}
/**
* Configuração padrão tolerante para ambientes heterogêneos.
*/
export const DEFAULT_CONFIG: Partial<WSConfig> = {
connectTimeoutMs: 10_000,
heartbeatIntervalMs: 30_000,
maxRetries: Infinity,
retryBaseMs: 1_000,
retryMaxMs: 30_000,
maxBufferSize: 512,
channel: "default",
};
/**
* Função utilitária — calcula o próximo delay de backoff exponencial
* com jitter para evitar thundering herd.
*/
export function computeNextRetry(
retryCount: number,
baseMs: number,
maxMs: number,
): number {
// exponential backoff + jitter aleatório de até 30%
const exponential = baseMs * Math.pow(2, retryCount);
const jitter = exponential * (Math.random() * 0.3);
return Math.min(exponential + jitter, maxMs);
}
/**
* Hook principal.
*
* @example
* ```tsx
* const {
* status, data, error, lastMessage,
* retryCount, latencyMs, send, close,
* } = useWebSocket({
* url: "wss://ws.octal.tec.br/v1/stream",
* channel: "dashboard",
* token: "Bearer eyJ...",
* });
* ```
*/
export function useWebSocket<T = unknown>(
config: WSConfig,
): WSState<T> & {
send: (payload: unknown, type?: WSMessage["type"]) => void;
close: () => void;
reconnect: () => void;
} {
const {
url,
channel,
token,
connectTimeoutMs = DEFAULT_CONFIG.connectTimeoutMsMs!,
heartbeatIntervalMs = DEFAULT_CONFIG.heartbeatIntervalMs!,
maxRetries = DEFAULT_CONFIG.maxRetries!,
retryBaseMs = DEFAULT_CONFIG.retryBaseMs!,
retryMaxMs = DEFAULT_CONFIG.retryMaxMs!,
maxBufferSize = DEFAULT_CONFIG.maxBufferSize!,
} = config;
// ── Estado React ──────────────────────────────────────────────
const [state, setState] = useState<WSState<T>>({
status: "idle",
data: [],
error: null,
lastMessage: null,
retryCount: 0,
latencyMs: null,
bufferSize: 0,
});
// ── Refs mutáveis (não disparam re-render) ────────────────────
const wsRef = useRef<WebSocket | null>(null);
const handlersRef = useRef<{
onMessage?: MessageHandler<T>;
onError?: ErrorHandler;
onStatus?: StatusHandler;
}>({});
const timeoutRef = useRef<number | null>(null);
const heartbeatRef= useRef<number | null>(null);
const retryRef = useRef<number | null>(null);
const seqRef = useRef(0); // sequência local para detectar gaps
const unsubscribe = useRef<() => void>(() => {}); // cleanup externo
const setStatus = useCallback((s: WSStatus) => {
setState(prev => ({ ...prev, status: s }));
handlersRef.current.onStatus?.(s, prev.status);
}, []);
const pushData = useCallback((msg: WSMessage<T>) => {
setState(prev => {
const next = [...prev.data, msg.payload] as T[];
// manter buffer acotovelado
if (next.length > maxBufferSize) next.splice(0, next.length - maxBufferSize);
return {
...prev,
data: next,
lastMessage: msg as WSMessage<T>,
bufferSize: next.length,
};
});
handlersRef.current.onMessage?.(msg as WSMessage<T>);
}, [maxBufferSize]);
// ── handlers do WebSocket nativo ───────────────────────────────
const handleOpen = useCallback(() => {
timeoutRef.current && clearTimeout(timeoutRef.current);
retryRef.current && clearTimeout(retryRef.current);
retryRef.current = null;
setStatus("connected");
// inscricao no canal
if (wsRef.current?.readyState === WebSocket.OPEN) {
wsRef.current.send(
JSON.stringify({
type: "__sub",
channel,
token: token ?? null,
}),
);
}
// heartbeat: ciclo de pings
heartbeatRef.current && clearInterval(heartbeatRef.current);
heartbeatRef.current = window.setInterval(() => {
if (wsRef.current?.readyState === WebSocket.OPEN) {
const t0 = Date.now();
wsRef.current.send(
JSON.stringify({ type: "__ping", seq: seqRef.current++ }),
);
// medir latency quando o pong chegar
seqRef.current += 1;
setTimeout(() => {
setState(prev => ({ ...prev, latencyMs: Date.now() - t0 }));
}, 500);
}
}, heartbeatIntervalMs);
}, [channel, token, heartbeatIntervalMs, setStatus]);
const handleMessage = useCallback((raw: MessageEvent) => {
let msg: WSMessage;
try {
msg = JSON.parse(raw.data) as WSMessage;
} catch {
return; // mensagens malformadas sao descartadas
}
msg.seq = msg.seq ?? seqRef.current;
seqRef.current = msg.seq + 1;
if (msg.type === "data") pushData(msg);
// __pong, __error, __control sao tratados silenciosamente
}, [pushData]);
const handleError = useCallback((ev: Event) => {
setState(prev => ({
...prev,
error: new Error(`WS error [${url}] at ${new Date().toISOString()}`),
}));
handlersRef.current.onError?.(
new Error(`WS erro no broker ${url} — verifique TLS/ACL do proxy`),
);
}, [url]);
const handleClose = useCallback((_ev: CloseEvent) => {
timeoutRef.current && clearTimeout(timeoutRef.current);
heartbeatRef.current && clearInterval(heartbeatRef.current);
heartbeatRef.current = null;
wsRef.current = null;
const prevRetry = state.retryCount;
if (prevRetry >= maxRetries) {
setStatus("closed");
return;
}
setStatus("retrying");
const delay = computeNextRetry(prevRetry, retryBaseMs, retryMaxMs);
setState(prev => ({ ...prev, retryCount: prev.retryCount + 1 }));
retryRef.current = window.setTimeout(connect, delay);
}, [state.retryCount, maxRetries, retryBaseMs, retryMaxMs, setStatus]);
// ── função connect (idempotente) ────────────────────────────────
const connect = useCallback(() => {
if (
wsRef.current?.readyState === WebSocket.CONNECTING ||
wsRef.current?.readyState === WebSocket.OPEN
) {
return;
}
timeoutRef.current && clearTimeout(timeoutRef.current);
setStatus("connecting");
const headers: Record<string, string> = {};
if (token) headers["Authorization"] = token;
try {
// passar headers só funcionam em navegadores modernos (Chrome 101+)
wsRef.current = new WebSocket(url, [], token ? { headers } : undefined);
wsRef.current.onopen = handleOpen;
wsRef.current.onmessage = handleMessage;
wsRef.current.onerror = handleError;
wsRef.current.onclose = handleClose;
// timeout de conexão (ex: firewall barrando handshake WS)
timeoutRef.current = window.setTimeout(() => {
if (wsRef.current?.readyState !== WebSocket.OPEN) {
wsRef.current?.close();
}
}, connectTimeoutMs);
} catch (err) {
setState(prev => ({
...prev,
error: err instanceof Error ? err : new Error(String(err)),
status: "error",
}));
}
}, [
url,
channel,
token,
connectTimeoutMs,
handleOpen,
handleMessage,
handleError,
handleClose,
setStatus,
]);
// ── lifecycle ──────────────────────────────────────────────────
useEffect(() => {
connect();
return () => {
timeoutRef.current && clearTimeout(timeoutRef.current);
heartbeatRef.current && clearInterval(heartbeatRef.current);
retryRef.current && clearTimeout(retryRef.current);
unsubscribe.current = () => wsRef.current?.close();
unsubscribe.current();
};
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [url, channel, token]);
// ── API pública ────────────────────────────────────────────────
const send = useCallback(
(payload: unknown, _type: WSMessage["type"] = "data") => {
if (wsRef.current?.readyState !== WebSocket.OPEN) {
console.warn("[useWebSocket] Socket não está conectada — mensagem descartada");
return;
}
const envelope: WSMessage = {
id: generateId(),
channel,
type: _type,
timestamp: Date.now(),
payload,
seq: seqRef.current,
};
wsRef.current.send(JSON.stringify(envelope));
},
[channel],
);
const close = useCallback(() => {
timeoutRef.current && clearTimeout(timeoutRef.current);
heartbeatRef.current && clearInterval(heartbeatRef.current);
retryRef.current && clearTimeout(retryRef.current);
wsRef.current?.close(1000, "normal");
wsRef.current = null;
setStatus("closed");
unsubscribe.current();
}, [setStatus]);
const reconnect = useCallback(() => {
close();
setState(prev => ({ ...prev, retryCount: 0 }));
// pequena pausa antes de reconectar
setTimeout(connect, 500);
}, [close, connect]);
// ── registrar handlers externos ────────────────────────────────
useEffect(() => {
handlersRef.current = {
onMessage: config.onMessage as MessageHandler<T> | undefined,
onError: config.onError as ErrorHandler | undefined,
onStatus: config.onStatus as StatusHandler | undefined,
};
}, [config.onMessage, config.onError, config.onStatus]);
return {
...state,
send,
close,
reconnect,
} as WSState<T> & { send: (payload: unknown, type?: WSMessage["type"]) => void; close: () => void; reconnect: () => void };
}
export default useWebSocket;