Realtime Client

Listens to changes in a PostgreSQL database via WebSockets.

This client is for use with the Wesbitty Realtime server.

Usage

Creating a Socket connection

You can set up one connection to be used across the whole app.

import { RealtimeClient } from '@wesjetpkg/realtime'

var client = new RealtimeClient(process.env.REALTIME_URL)
client.connect()

REALTIME_URL is 'ws://localhost:4000/socket' when developing locally and 'wss://<project_ref>.wesbitty.com/realtime/v1' when connecting to your Wesbitty project.

You can pass in your JWT if you have enabled JWT authorization in the Wesbitty Realtime server.

import { RealtimeClient } from '@wesjetpkg/realtime'

var client = new RealtimeClient(process.env.REALTIME_URL, { params: { apikey: 'token123' }})
client.connect()

See Realtime: Websocket Connection Authorization for more information.

Socket Hooks

client.onOpen(() => console.log('Socket opened.'))
client.onClose(() => console.log('Socket closed.'))
client.onError((e) => console.log('Socket error', e.message))

Subscribing to events

You can listen to INSERT, UPDATE, DELETE, or all * events.

You can subscribe to events on the whole database, schema, table, or individual columns using channel(). Channels are multiplexed over the Socket connection.

To join a channel, you must provide a topic, which takes one of the following forms:

  • realtime - entire database
  • realtime:{schema} - where {schema} is the Postgres Schema
  • realtime:{schema}:{table} - where {table} is the Postgres table name
  • realtime:{schema}:{table}:{col}=eq.{val} - where {col} is the column name, and {val} is the value which you want to match
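
The topic formats above can be assembled with a small helper. The following is a minimal sketch: `buildTopic` and `RowFilter` are hypothetical names and are not part of the package. It covers the schema, table and column-filter forms only.

```typescript
// Hypothetical helper, not part of @wesjetpkg/realtime.
// Builds a topic string in the schema, table and row-filter formats listed above.
type RowFilter = { column: string; value: string | number }

function buildTopic(schema: string, table?: string, filter?: RowFilter): string {
  let topic = `realtime:${schema}`
  if (table) {
    topic += `:${table}`
    // A row filter is only appended once a table is named.
    if (filter) {
      topic += `:${filter.column}=eq.${filter.value}`
    }
  }
  return topic
}

console.log(buildTopic('public'))                                      // realtime:public
console.log(buildTopic('public', 'users'))                             // realtime:public:users
console.log(buildTopic('public', 'users', { column: 'id', value: 1 })) // realtime:public:users:id=eq.1
```

The resulting string is what would be passed to client.channel().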

Examples

// Listen to events on the entire database.
var databaseChanges = client.channel('realtime:*')
databaseChanges.on('*', (e) => console.log(e))
databaseChanges.on('INSERT', (e) => console.log(e))
databaseChanges.on('UPDATE', (e) => console.log(e))
databaseChanges.on('DELETE', (e) => console.log(e))
databaseChanges.subscribe()

// Listen to events on a schema, using the format `realtime:{SCHEMA}`
var publicSchema = client.channel('realtime:public')
publicSchema.on('*', (e) => console.log(e))
publicSchema.on('INSERT', (e) => console.log(e))
publicSchema.on('UPDATE', (e) => console.log(e))
publicSchema.on('DELETE', (e) => console.log(e))
publicSchema.subscribe()

// Listen to events on a table, using the format `realtime:{SCHEMA}:{TABLE}`
var usersTable = client.channel('realtime:public:users')
usersTable.on('*', (e) => console.log(e))
usersTable.on('INSERT', (e) => console.log(e))
usersTable.on('UPDATE', (e) => console.log(e))
usersTable.on('DELETE', (e) => console.log(e))
usersTable.subscribe()

// Listen to events on a row, using the format `realtime:{SCHEMA}:{TABLE}:{COL}=eq.{VAL}`
var rowChanges = client.channel('realtime:public:users:id=eq.1')
rowChanges.on('*', (e) => console.log(e))
rowChanges.on('INSERT', (e) => console.log(e))
rowChanges.on('UPDATE', (e) => console.log(e))
rowChanges.on('DELETE', (e) => console.log(e))
rowChanges.subscribe()

Removing a subscription

You can unsubscribe from a topic using channel.unsubscribe().

Disconnect the socket

Call disconnect() on the socket:

let { error, data } = await client.disconnect()

Duplicate Join Subscriptions

While the client may join any number of topics on any number of channels, the client may only hold a single subscription for each unique topic at any given time. When attempting to create a duplicate subscription, the server will close the existing channel, log a warning, and spawn a new channel for the topic. The client will have their channel.onClose callbacks fired for the existing channel, and the new channel join will have its receive hooks processed as normal.
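
To make the single-subscription rule concrete, here is an illustrative model of the behaviour described above. It is a sketch with plain objects, not the library's or the server's implementation; `TopicRegistry` and `FakeChannel` are hypothetical names.

```typescript
// Illustrative model only: mimics the documented behaviour with plain objects.
// `TopicRegistry` and `FakeChannel` are hypothetical names, not library types.
type FakeChannel = { topic: string; id: number; onClose: () => void }

class TopicRegistry {
  private channels: { [topic: string]: FakeChannel } = {}
  private nextId = 1

  // Joins a topic. If a channel already exists for it, that channel is
  // closed (its onClose callback fires) before the new one is stored.
  join(topic: string, onClose: () => void): FakeChannel {
    const existing = this.channels[topic]
    if (existing) {
      console.warn(`duplicate subscription for "${topic}", closing channel ${existing.id}`)
      existing.onClose()
    }
    const channel: FakeChannel = { topic, id: this.nextId++, onClose }
    this.channels[topic] = channel
    return channel
  }

  // Number of topics that currently hold a subscription.
  size(): number {
    return Object.keys(this.channels).length
  }
}

const registry = new TopicRegistry()
const closed: number[] = []
const first = registry.join('realtime:public:users', () => { closed.push(1) })
const second = registry.join('realtime:public:users', () => { closed.push(2) })
console.log(closed, registry.size(), first.id, second.id)
```

Only the first channel's close callback fires, and the registry still holds exactly one entry for the topic.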

Channel Hooks

channel.onError( () => console.log("there was an error!") )
channel.onClose( () => console.log("the channel has gone away gracefully") )
  • onError hooks are invoked if the socket connection drops, or the channel crashes on the server. In either case, a channel rejoin is attempted automatically in an exponential backoff manner.
  • onClose hooks are invoked in only two cases: 1) the channel was explicitly closed on the server, or 2) the client explicitly closed it by calling channel.unsubscribe().
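
The exponential backoff mentioned above can be pictured with a small schedule function. This is a sketch under assumptions: `backoffMs`, the 1000 ms base and the 10000 ms cap are hypothetical, since the intervals the client actually uses are not stated here.

```typescript
// Hypothetical backoff schedule; the intervals the client actually uses are
// not stated in this document.
function backoffMs(tries: number, baseMs = 1000, maxMs = 10000): number {
  // tries starts at 1, and the delay doubles on each attempt until it reaches maxMs.
  const delay = baseMs * 2 ** (Math.max(tries, 1) - 1)
  return Math.min(delay, maxMs)
}

console.log([1, 2, 3, 4, 5].map((n) => backoffMs(n))) // [1000, 2000, 4000, 8000, 10000]
```

Capping the delay keeps rejoin attempts from drifting arbitrarily far apart after a long outage.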

Subscription Hooks


publicSchema
  .subscribe()
  .receive('ok', () => console.log('Connected.'))
  .receive('error', () => console.log('Failed.'))
  .receive('timeout', () => console.log('Timed out, retrying.'))

Event Responses

Events are returned in the following format.

type Response = {
  // the change timestamp. eg: "2020-10-13T10:09:22Z".
  commit_timestamp: string 

  // the database schema. eg: "public".
  schema: string 
  
  // the database table. eg: "users".
  table: string 
  
  // the event type.
  type: 'INSERT' | 'UPDATE' | 'DELETE'
  
  // all the columns for this table. See "column" type below.
  columns: column[] 
  
  // the new values. eg: { "id": "9", "age": "12" }.
  record: object 

  // the previous values. eg: { "id": "9", "age": "11" }. Only works if the table has `REPLICA IDENTITY FULL`.
  old_record: object 

  // any change errors.
  errors: null | string[]
}

type column = {
  // any special flags for the column. eg: ["key"]
  flags: string[] 
  
  // the column name. eg: "user_id"
  name: string 
  
  // the column type. eg: "uuid"
  type: string 
  
  // the type modifier. eg: 4294967295
  type_modifier: number 
}
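
As a sketch of consuming this payload, the hypothetical helper below (`describeChange`, with a local `ChangeEvent` type trimmed to the fields it uses) turns one event into a log line. Reading `old_record` for DELETE events is an assumption about where the useful values are, not something this document states.

```typescript
// Local copies of the documented shapes, trimmed to the fields used here.
type ChangeType = 'INSERT' | 'UPDATE' | 'DELETE'

type ChangeEvent = {
  commit_timestamp: string
  schema: string
  table: string
  type: ChangeType
  record: { [key: string]: unknown }
  old_record: { [key: string]: unknown }
  errors: null | string[]
}

// Hypothetical helper: summarises one change event as a single line.
function describeChange(e: ChangeEvent): string {
  if (e.errors && e.errors.length > 0) {
    return `${e.type} on ${e.schema}.${e.table} failed: ${e.errors.join('; ')}`
  }
  // Assumption: for DELETE, the previous values are the interesting ones.
  const row = e.type === 'DELETE' ? e.old_record : e.record
  return `${e.type} on ${e.schema}.${e.table} at ${e.commit_timestamp}: ${JSON.stringify(row)}`
}

const exampleEvent: ChangeEvent = {
  commit_timestamp: '2020-10-13T10:09:22Z',
  schema: 'public',
  table: 'users',
  type: 'UPDATE',
  record: { id: '9', age: '12' },
  old_record: { id: '9', age: '11' },
  errors: null,
}

console.log(describeChange(exampleEvent))
// UPDATE on public.users at 2020-10-13T10:09:22Z: {"id":"9","age":"12"}
```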

Made with love by Wesbitty, Inc.

Type aliases

BaseValue

BaseValue: null | string | number | boolean

ChannelParams

ChannelParams: { selfBroadcast?: undefined | false | true }

Type declaration

  • [key: string]: any
  • Optional selfBroadcast?: undefined | false | true

Columns

Columns: { flags?: string[]; name: string; type: string; type_modifier?: undefined | number }[]

Message

Message: { event: string; payload: any; ref: string; topic: string }

Type declaration

  • event: string
  • payload: any
  • ref: string
  • topic: string

Options

Options: { decode?: Function; encode?: Function; headers?: undefined | {}; heartbeatIntervalMs?: undefined | number; logger?: Function; longpollerTimeout?: undefined | number; params?: undefined | {}; reconnectAfterMs?: Function; timeout?: undefined | number; transport?: WebSocket }

Type declaration

  • Optional decode?: Function
  • Optional encode?: Function
  • Optional headers?: undefined | {}
  • Optional heartbeatIntervalMs?: undefined | number
  • Optional logger?: Function
  • Optional longpollerTimeout?: undefined | number
  • Optional params?: undefined | {}
  • Optional reconnectAfterMs?: Function
  • Optional timeout?: undefined | number
  • Optional transport?: WebSocket

Record

Record: {}

RecordValue

RecordValue: BaseValue | BaseValue[]

Variables

Const DEFAULT_TIMEOUT

DEFAULT_TIMEOUT: 10000 = 10000

Const VSN

VSN: string = "0.0.1"

Const WS_CLOSE_NORMAL

WS_CLOSE_NORMAL: 1000 = 1000

Const version

version: "0.0.0" = "0.0.0"

Functions

Const convertCell

  • convertCell(type: string, value: RecordValue): RecordValue
  • If the value of the cell is null, returns null. Otherwise converts the string value to the correct type.

    example

    convertCell('bool', 't') //=> true

    example

    convertCell('int8', '10') //=> 10

    example

    convertCell('_int4', '{1,2,3,4}') //=> [1,2,3,4]

    Parameters

    • type: string

      A postgres column type

    • value: RecordValue

    Returns RecordValue
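
The three examples above can be reproduced with a simplified sketch. `convertCellSketch` is a hypothetical name, and it covers only bool, integer and flat integer-array types. The actual function presumably handles more.

```typescript
type BaseValue = null | string | number | boolean
type RecordValue = BaseValue | BaseValue[]

// Simplified sketch covering only bool, integer and flat array types.
// The name `convertCellSketch` is hypothetical.
function convertCellSketch(type: string, value: RecordValue): RecordValue {
  if (value === null || typeof value !== 'string') {
    return value // null stays null; non-string values pass through unchanged
  }
  // A leading underscore marks a Postgres array type, e.g. "_int4".
  if (type.charAt(0) === '_') {
    const inner = value.slice(1, -1) // drop the surrounding braces
    if (inner === '') return []
    const elementType = type.slice(1)
    return inner.split(',').map((v) => convertCellSketch(elementType, v) as BaseValue)
  }
  switch (type) {
    case 'bool':
      return value === 't' ? true : value === 'f' ? false : value
    case 'int2':
    case 'int4':
    case 'int8':
      return Number(value)
    default:
      return value
  }
}

console.log(convertCellSketch('bool', 't'))          // true
console.log(convertCellSketch('int8', '10'))         // 10
console.log(convertCellSketch('_int4', '{1,2,3,4}')) // [1, 2, 3, 4]
```

Note that converting int8 through Number loses precision for values beyond Number.MAX_SAFE_INTEGER.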

Const convertChangeData

  • convertChangeData(columns: Columns, record: Record, options?: { skipTypes?: string[] }): Record
  • Takes an array of columns and an object of string values then converts each string value to its mapped type.

    example

    convertChangeData([{name: 'first_name', type: 'text'}, {name: 'age', type: 'int4'}], {first_name: 'Paul', age:'33'}, {}) //=>{ first_name: 'Paul', age: 33 }

    Parameters

    • columns: Columns
    • record: Record
    • Default value options: { skipTypes?: string[] } = {}

      The map of various options that can be applied to the mapper

      • Optional skipTypes?: string[]

    Returns Record
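
The effect of skipTypes can be illustrated with a reduced sketch. `convertRowSketch`, `Column` and `Row` are hypothetical names; the sketch converts only int4 values and leaves every other type untouched.

```typescript
type Column = { name: string; type: string }
type Row = { [key: string]: null | string | number }

// Hypothetical sketch: converts int4 strings to numbers, leaves other types
// alone, and leaves any type listed in skipTypes unconverted.
function convertRowSketch(
  columns: Column[],
  record: Row,
  options: { skipTypes?: string[] } = {}
): Row {
  const skipTypes = options.skipTypes ?? []
  const result: Row = {}
  for (const key of Object.keys(record)) {
    // Look up the column definition that matches this key, if any.
    let columnType: string | undefined
    for (const c of columns) {
      if (c.name === key) columnType = c.type
    }
    const value = record[key]
    const shouldConvert =
      columnType === 'int4' && typeof value === 'string' && skipTypes.indexOf('int4') === -1
    result[key] = shouldConvert ? Number(value) : value
  }
  return result
}

const cols: Column[] = [{ name: 'first_name', type: 'text' }, { name: 'age', type: 'int4' }]
console.log(convertRowSketch(cols, { first_name: 'Paul', age: '33' }))
// { first_name: 'Paul', age: 33 }
console.log(convertRowSketch(cols, { first_name: 'Paul', age: '33' }, { skipTypes: ['int4'] }))
// { first_name: 'Paul', age: '33' }
```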

Const convertColumn

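  • convertColumn(columnName: string, columns: Columns, record: Record, skipTypes: string[]): RecordValue
  • Converts the value of an individual column.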

    example

    convertColumn('age', [{name: 'first_name', type: 'text'}, {name: 'age', type: 'int4'}], {first_name: 'Paul', age: '33'}, []) //=> 33

    example

    convertColumn('age', [{name: 'first_name', type: 'text'}, {name: 'age', type: 'int4'}], {first_name: 'Paul', age: '33'}, ['int4']) //=> "33"

    Parameters

    • columnName: string

      The column that you want to convert

    • columns: Columns
    • record: Record

      The map of string values

    • skipTypes: string[]

      An array of types that should not be converted

    Returns RecordValue

Const noop

Const toArray

  • Converts a Postgres Array into a native JS array

    example

    toArray('{}', 'int4') //=> []

    example

    toArray('{"[2021-01-01,2021-12-31)","(2021-01-01,2021-12-32]"}', 'daterange') //=> ['[2021-01-01,2021-12-31)', '(2021-01-01,2021-12-32]']

    example

    toArray([1,2,3,4], 'int4') //=> [1,2,3,4]

    Parameters

    Returns RecordValue
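
One way to parse the array literals shown in these examples is sketched below. `parsePgArraySketch` is a hypothetical name, and the approach (try JSON parsing, then fall back to a comma split) is an assumption rather than the documented implementation. It returns elements as-is, without per-element type conversion.

```typescript
// Hypothetical sketch of one way to parse a Postgres array literal.
// Elements are returned as-is (no per-element type conversion).
function parsePgArraySketch(value: string | unknown[]): unknown[] {
  if (typeof value !== 'string') {
    return value // already a native array
  }
  const inner = value.slice(1, -1) // drop the surrounding braces
  if (inner === '') return []
  try {
    // Works when elements are numbers or double-quoted strings.
    return JSON.parse('[' + inner + ']')
  } catch {
    // Fall back to a plain split for unquoted text elements.
    return inner.split(',')
  }
}

console.log(parsePgArraySketch('{}'))          // []
console.log(parsePgArraySketch('{1,2,3,4}'))   // [1, 2, 3, 4]
console.log(parsePgArraySketch('{"[2021-01-01,2021-12-31)","(2021-01-01,2021-12-32]"}'))
// ['[2021-01-01,2021-12-31)', '(2021-01-01,2021-12-32]']
```

Quoted elements matter here because range values contain commas, which a plain split would break apart.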

Const toBoolean

Const toJson

Const toNumber

Const toTimestampString

Object literals

Const DEFAULT_HEADERS

DEFAULT_HEADERS: object

X-Client-Info

X-Client-Info: string = `realtime/${version}`
