Realtime
Datafuse provides a globally distributed cluster of Realtime servers that enable the following functionality:
- Broadcast: Send ephemeral messages from client to clients with low latency.
- Presence: Track and synchronize shared state between clients.
- Postgres Changes: Listen to Postgres database changes and send them to authorized clients.
Realtime API
By default, Realtime is disabled on your database. Let's turn on Realtime for a todos table.
Using the Dashboard:
- Go to the Database page in the Dashboard.
- Click on Publications in the sidebar.
- Control which database events are sent by toggling Insert, Update, and Delete.
- Control which tables broadcast changes by selecting Source and toggling each table.
Alternatively, run the following SQL:
alter publication datafuse_realtime add table todos;
From the client, we can listen to any new data that is inserted into the todos table:
// Initialize the JS client
import { createClient } from '@datafuse/datafuse-js'
const datafuse = createClient(SUPABASE_URL, SUPABASE_ANON_KEY)
// Create a function to handle inserts
const handleInserts = (payload) => {
  console.log('Change received!', payload)
}
// Listen to inserts
datafuse
  .channel('todos')
  .on('postgres_changes', { event: 'INSERT', schema: 'public', table: 'todos' }, handleInserts)
  .subscribe()
import 'package:datafuse_flutter/datafuse_flutter.dart';
void main() async {
  // Initialize the Flutter client and wait for it to finish before starting the app
  await Datafuse.initialize(
    url: 'https://<project>.datafuse.co',
    anonKey: '<your-anon-key>',
    realtimeClientOptions: const RealtimeClientOptions(
      eventsPerSecond: 2,
    ),
  );
  runApp(const MyApp());
}
final datafuse = Datafuse.instance.client;
void handleInserts(payload) {
  print('Change received! $payload');
}
// Listen to inserts
datafuse
    .channel('todos')
    .onPostgresChanges(
        event: PostgresChangeEvent.insert,
        schema: 'public',
        table: 'todos',
        callback: handleInserts)
    .subscribe();
// Initialize the Swift client
import Datafuse
let datafuse = DatafuseClient(datafuseURL: SUPABASE_URL, datafuseKey: SUPABASE_ANON_KEY)
// Create channel
let channel = await datafuse.realtime.channel("todos")
// Create the observations before subscribing
let insertions = await channel.postgresChanges(
  InsertAction.self,
  schema: "public",
  table: "todos"
)
await channel.subscribe()
for await insertion in insertions {
  let todo = try insertion.decoded(as: Todo.self)
  print("Todo inserted", todo)
}
Use subscribe() to listen to database changes.
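Once changes arrive, a client usually folds them into local state. The following is a minimal sketch of that step, not the library's own API. It assumes each payload has an `eventType` field ('INSERT', 'UPDATE', or 'DELETE') with the affected row under `new` or `old`, and that rows have a unique `id` column. This page does not confirm that shape, so check the payload your client actually logs. `applyChange` is a hypothetical helper name.

```javascript
// Assumed payload shape (not confirmed by this page):
//   { eventType: 'INSERT' | 'UPDATE' | 'DELETE', new: {...}, old: {...} }
// Rows are assumed to carry a unique `id` column.
function applyChange(todos, payload) {
  switch (payload.eventType) {
    case 'INSERT':
      // Append the newly inserted row
      return [...todos, payload.new]
    case 'UPDATE':
      // Replace the row that has the same id as the updated record
      return todos.map((todo) => (todo.id === payload.new.id ? payload.new : todo))
    case 'DELETE':
      // Remove the row whose id matches the deleted record
      return todos.filter((todo) => todo.id !== payload.old.id)
    default:
      // Unknown event type: leave the state untouched
      return todos
  }
}

// Example with hand-written payloads (no server needed)
let todos = []
todos = applyChange(todos, { eventType: 'INSERT', new: { id: 1, task: 'Write docs' } })
todos = applyChange(todos, { eventType: 'UPDATE', new: { id: 1, task: 'Ship docs' }, old: { id: 1 } })
console.log(todos)
// [ { id: 1, task: 'Ship docs' } ]
```

To wire this into a listener, pass a callback such as `(payload) => { todos = applyChange(todos, payload) }` in place of handleInserts. The function returns a new array instead of mutating its input, which keeps it easy to test and safe to use with UI frameworks that compare state by reference.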
The Realtime API works through PostgreSQL's replication functionality. Postgres sends database changes to a publication called datafuse_realtime, and by managing this publication you can control which data is broadcast.
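Managing the publication comes down to running ALTER PUBLICATION statements. The sketch below uses a hypothetical helper, `publicationStatement`, which is not part of any Datafuse library. It only builds the SQL text for PostgreSQL's three table actions (`add table`, `drop table`, `set table`) and never connects to a database. The default publication name is the one used on this page.

```javascript
// Hypothetical helper (not part of any Datafuse library): builds an
// ALTER PUBLICATION statement as a string. It never touches the database.

// Quote an identifier the way PostgreSQL expects, doubling embedded quotes
const quoteIdent = (name) => `"${String(name).replace(/"/g, '""')}"`

const VERBS = new Map([
  ['add', 'add table'],   // start broadcasting changes for these tables
  ['drop', 'drop table'], // stop broadcasting changes for these tables
  ['set', 'set table'],   // replace the publication's table list
])

function publicationStatement(action, tables, publication = 'datafuse_realtime') {
  const verb = VERBS.get(action)
  if (!verb) throw new Error(`Unknown action: ${action}`)
  if (!Array.isArray(tables) || tables.length === 0) {
    throw new Error('At least one table is required')
  }
  const tableList = tables.map(quoteIdent).join(', ')
  return `alter publication ${quoteIdent(publication)} ${verb} ${tableList};`
}

console.log(publicationStatement('add', ['todos']))
// alter publication "datafuse_realtime" add table "todos";
```

Run the generated statement in your SQL editor. Quoting every identifier is a defensive choice: it keeps table names with unusual characters from breaking the statement, at the cost of making names case-sensitive.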
Examples
Resources
Find the source code and documentation in the Datafuse GitHub repository.