import { pipe } from 'it-pipe'
import PQueue from 'p-queue'
import { EventEmitter } from 'events'
import { TimeoutController } from 'timeout-abort-controller'
import pathJoin from './utils/path-join.js'
const DefaultTimeout = 30000 // 30 seconds
/**
* @module Sync
* @description
* The Sync Protocol for OrbitDB synchronizes the database operations {@link module:Log} between multiple peers.
*
* The Sync Protocol sends and receives heads between multiple peers,
* both when opening a database and when a database is updated, ie.
* new entries are appended to the log.
*
* When Sync is started, a peer subscribes to a pubsub topic of the log's id.
* Upon subscribing to the topic, peers already connected to the topic receive
* the subscription message and "dial" the subscribing peer using a libp2p
* custom protocol. Once connected to the subscribing peer on a direct
* peer-to-peer connection, the dialing peer and the subscribing peer exchange
* the heads of the Log each peer currently has. Once completed, the peers have
* the same "local state".
*
* Once the initial sync has completed, peers notify one another of updates to
* the log, ie. updates to the database, using the initially opened pubsub
* topic subscription. A peer with new heads broadcasts changes to other peers
* by publishing the updated heads to the pubsub topic. Peers subscribed to the
* same topic will then receive the update and will update their log's state,
* the heads, accordingly.
*
* The Sync Protocol is eventually consistent. It guarantees that once all
* messages have been sent and received, peers will observe the same log state
* and values. The Sync Protocol does not guarantee the order in which messages
* are received or even that a message is recieved at all, nor any timing on
* when messages are received.
*
* @example
* // Using defaults
* const sync = await Sync({ ipfs, log, onSynced: (peerId, heads) => ... })
*
* @example
* // Using all parameters
* const sync = await Sync({ ipfs, log, events, onSynced: (peerId, heads) => ..., start: false })
* sync.events.on('join', (peerId, heads) => ...)
* sync.events.on('leave', (peerId) => ...)
* sync.events.on('error', (err) => ...)
* await sync.start()
*/
/**
* Creates a Sync instance for sychronizing logs between multiple peers.
*
* @function
* @param {Object} params One or more parameters for configuring Sync.
* @param {IPFS} params.ipfs An IPFS instance.
* @param {Log} params.log The log instance to sync.
* @param {EventEmitter} [params.events] An event emitter to use. Events
* emitted are 'join', 'leave' and 'error'. If the parameter is not provided,
* an EventEmitter will be created.
* @param {onSynced} [params.onSynced] A callback function that is called after
* the peer has received heads from another peer.
* @param {Boolean} [params.start] True if sync should start automatically,
* false otherwise. Defaults to true.
* @return {module:Sync~Sync} sync An instance of the Sync Protocol.
* @memberof module:Sync
* @instance
*/
const Sync = async ({ ipfs, log, events, onSynced, start, timeout }) => {
/**
* @namespace module:Sync~Sync
* @description The instance returned by {@link module:Sync}.
*/
/**
* Callback function when new heads have been received from other peers.
* @callback module:Sync~Sync#onSynced
* @param {PeerID} peerId PeerID of the peer who we received heads from
* @param {Entry[]} heads An array of Log entries
*/
/**
* Event fired when when a peer has connected and the exchange of
* heads has been completed.
* @event module:Sync~Sync#join
* @param {PeerID} peerId PeerID of the peer who we received heads from
* @param {Entry[]} heads An array of Log entries
* @example
* sync.events.on('join', (peerID, heads) => ...)
*/
/**
* Event fired when a peer leaves the sync protocol.
* @event module:Sync~Sync#leave
* @param {PeerID} peerId PeerID of the peer who left
* @example
* sync.events.on('leave', (peerID) => ...)
*/
/**
* Event fired when an error occurs.
* @event module:Sync~Sync#error
* @param {Error} error The error that occured
* @example
* sync.events.on('error', (error) => ...)
*/
if (!ipfs) throw new Error('An instance of ipfs is required.')
if (!log) throw new Error('An instance of log is required.')
const libp2p = ipfs.libp2p
const pubsub = ipfs.libp2p.services.pubsub
const address = log.id
const headsSyncAddress = pathJoin('/orbitdb/heads/', address)
const queue = new PQueue({ concurrency: 1 })
/**
* Set of currently connected peers for the log for this Sync instance.
* @name peers
* @†ype Set
* @memberof module:Sync~Sync
* @instance
*/
const peers = new Set()
/**
* Event emitter that emits Sync changes. See Events section for details.
* @†ype EventEmitter
* @memberof module:Sync~Sync
* @instance
*/
events = events || new EventEmitter()
timeout = timeout || DefaultTimeout
let started = false
const onPeerJoined = async (peerId) => {
const heads = await log.heads()
events.emit('join', peerId, heads)
}
const sendHeads = (source) => {
return (async function * () {
const heads = await log.heads()
for await (const { bytes } of heads) {
yield bytes
}
})()
}
const receiveHeads = (peerId) => async (source) => {
for await (const value of source) {
const headBytes = value.subarray()
if (headBytes && onSynced) {
await onSynced(headBytes)
}
}
if (started) {
await onPeerJoined(peerId)
}
}
const handleReceiveHeads = async ({ connection, stream }) => {
const peerId = String(connection.remotePeer)
try {
peers.add(peerId)
await pipe(stream, receiveHeads(peerId), sendHeads, stream)
} catch (e) {
peers.delete(peerId)
events.emit('error', e)
}
}
const handlePeerSubscribed = async (event) => {
const task = async () => {
const { peerId: remotePeer, subscriptions } = event.detail
const peerId = String(remotePeer)
const subscription = subscriptions.find(e => e.topic === address)
if (!subscription) {
return
}
if (subscription.subscribe) {
if (peers.has(peerId)) {
return
}
const timeoutController = new TimeoutController(timeout)
const { signal } = timeoutController
try {
peers.add(peerId)
const stream = await libp2p.dialProtocol(remotePeer, headsSyncAddress, { signal })
await pipe(sendHeads, stream, receiveHeads(peerId))
} catch (e) {
console.error(e)
peers.delete(peerId)
if (e.code === 'ERR_UNSUPPORTED_PROTOCOL') {
// Skip peer, they don't have this database currently
} else {
events.emit('error', e)
}
} finally {
if (timeoutController) {
timeoutController.clear()
}
}
} else {
peers.delete(peerId)
events.emit('leave', peerId)
}
}
queue.add(task)
}
const handleUpdateMessage = async message => {
const { topic, data } = message.detail
const task = async () => {
try {
if (data && onSynced) {
await onSynced(data)
}
} catch (e) {
events.emit('error', e)
}
}
if (topic === address) {
queue.add(task)
}
}
/**
* Add a log entry to the Sync Protocol to be sent to peers.
* @function add
* @param {Entry} entry Log entry
* @memberof module:Sync~Sync
* @instance
*/
const add = async (entry) => {
if (started) {
await pubsub.publish(address, entry.bytes)
}
}
/**
* Stop the Sync Protocol.
* @function stop
* @memberof module:Sync~Sync
* @instance
*/
const stopSync = async () => {
if (started) {
started = false
await queue.onIdle()
pubsub.removeEventListener('subscription-change', handlePeerSubscribed)
pubsub.removeEventListener('message', handleUpdateMessage)
await libp2p.unhandle(headsSyncAddress)
await pubsub.unsubscribe(address)
peers.clear()
}
}
/**
* Start the Sync Protocol.
* @function start
* @memberof module:Sync~Sync
* @instance
*/
const startSync = async () => {
if (!started) {
// Exchange head entries with peers when connected
await libp2p.handle(headsSyncAddress, handleReceiveHeads)
pubsub.addEventListener('subscription-change', handlePeerSubscribed)
pubsub.addEventListener('message', handleUpdateMessage)
// Subscribe to the pubsub channel for this database through which updates are sent
await pubsub.subscribe(address)
started = true
}
}
// Start Sync automatically
if (start !== false) {
await startSync()
}
return {
add,
stop: stopSync,
start: startSync,
events,
peers
}
}
export { Sync as default }