Source: database.js

/**
 * @module Database
 * @description
 * Database is the base class for OrbitDB data stores and handles all
 * lower-level add operations and database syncing using IPFS.
 */
import { EventEmitter } from 'events'
import PQueue from 'p-queue'
import Sync from './sync.js'
import { Log, Entry } from './oplog/index.js'
import { ComposedStorage, LRUStorage, IPFSBlockStorage, LevelStorage } from './storage/index.js'
import pathJoin from './utils/path-join.js'

// Size given to each LRUStorage cache that Database creates when the caller
// does not supply its own storage.
const defaultCacheSize = 1000
// Fallback for params.referencesCount when no usable value is supplied.
const defaultReferencesCount = 16

/**
 * Creates an instance of Database.
 * @function
 * @param {Object} params One or more parameters for configuring Database.
 * @param {IPFS} params.ipfs An IPFS instance.
 * @param {Identity} [params.identity] An Identity instance.
 * @param {string} [params.address] The address of the database.
 * @param {string} [params.name] The name of the database.
 * @param {module:AccessControllers} [params.access] An AccessController
 * instance.
 * @param {string} [params.directory] A location for storing Database-related
 * data. Defaults to ./orbitdb/[params.address].
 * @param {*} [params.meta={}] The database's metadata.
 * @param {module:Storage} [params.headsStorage] A compatible storage
 * instance for storing log heads. Defaults to ComposedStorage.
 * @param {module:Storage} [params.entryStorage] A compatible storage instance
 * for storing log entries. Defaults to ComposedStorage.
 * @param {module:Storage} [params.indexStorage] A compatible storage
 * instance for storing an index of log entries. Defaults to ComposedStorage.
 * @param {number} [params.referencesCount=16]  The maximum distance between
 * references to other entries.
 * @param {boolean} [params.syncAutomatically=false] If true, sync databases
 * automatically. Otherwise, false.
 * @param {function} [params.onUpdate] A function callback. Fired when an
 * entry is added to the oplog.
 * @return {module:Databases~Database} An instance of Database.
 * @instance
 */
const Database = async ({ ipfs, identity, address, name, access, directory, meta, headsStorage, entryStorage, indexStorage, referencesCount, syncAutomatically, onUpdate }) => {
  /**
   * @namespace module:Databases~Database
   * @description The instance returned by {@link module:Database~Database}.
   */

  /**
   * Event fired when an update occurs.
   * @event module:Databases~Database#update
   * @param {module:Entry} entry An entry.
   * @example
   * database.events.on('update', (entry) => ...)
   */

  /**
   * Event fired when a close occurs.
   * @event module:Databases~Database#close
   * @example
   * database.events.on('close', () => ...)
   */

  /**
   * Event fired when a drop occurs.
   * @event module:Databases~Database#drop
   * @example
   * database.events.on('drop', () => ...)
   */

  /** Events inherited from Sync */

  /**
   * Event fired when a peer has connected to the database.
   * @event module:Databases~Database#join
   * @param {PeerID} peerId PeerID of the peer who connected
   * @param {Entry[]} heads An array of Log entries
   * @example
   * database.events.on('join', (peerID, heads) => ...)
   */

  /**
   * Event fired when a peer has disconnected from the database.
   * @event module:Databases~Database#leave
   * @param {PeerID} peerId PeerID of the peer who disconnected
   * @example
   * database.events.on('leave', (peerID) => ...)
   */

  // All on-disk data for this database lives under <directory>/<address>/.
  directory = pathJoin(directory || './orbitdb', `./${address}/`)
  meta = meta || {}

  // Coerce once and keep the coerced value, so that a numeric string such as
  // '5' is stored as the number 5 instead of leaking through as a string.
  // Anything that does not coerce to a value greater than -1 (e.g. undefined,
  // which becomes NaN, or -1 and below) falls back to the default.
  const parsedReferencesCount = Number(referencesCount)
  referencesCount = parsedReferencesCount > -1 ? parsedReferencesCount : defaultReferencesCount

  // Each storage the caller did not supply defaults to an LRU cache composed
  // with a persistent backend: IPFS blocks for entries, Level for heads/index.
  entryStorage = entryStorage || await ComposedStorage(
    await LRUStorage({ size: defaultCacheSize }),
    await IPFSBlockStorage({ ipfs, pin: true })
  )

  headsStorage = headsStorage || await ComposedStorage(
    await LRUStorage({ size: defaultCacheSize }),
    await LevelStorage({ path: pathJoin(directory, '/log/_heads/') })
  )

  indexStorage = indexStorage || await ComposedStorage(
    await LRUStorage({ size: defaultCacheSize }),
    await LevelStorage({ path: pathJoin(directory, '/log/_index/') })
  )

  const log = await Log(identity, { logId: address, access, entryStorage, headsStorage, indexStorage })

  const events = new EventEmitter()

  // Local appends (addOperation) and entries received through sync
  // (applyOperation) are both funnelled through this single-concurrency queue.
  const queue = new PQueue({ concurrency: 1 })

  /**
   * Adds an operation to the oplog.
   * @function addOperation
   * @param {*} op Some operation to add to the oplog.
   * @return {string} The hash of the operation.
   * @memberof module:Databases~Database
   * @instance
   * @async
   */
  const addOperation = async (op) => {
    const task = async () => {
      const entry = await log.append(op, { referencesCount })
      await sync.add(entry)
      // onUpdate is awaited before the 'update' event is emitted.
      if (onUpdate) {
        await onUpdate(log, entry)
      }
      events.emit('update', entry)
      return entry.hash
    }
    const hash = await queue.add(task)
    // Resolve only once the queue has drained, not merely this task.
    await queue.onIdle()
    return hash
  }

  // Handler given to Sync as `onSynced`. Decodes the received bytes with
  // Entry.decode and joins the resulting entry into the log. onUpdate and the
  // 'update' event fire only when log.joinEntry reports that the log changed.
  const applyOperation = async (bytes) => {
    const task = async () => {
      const entry = await Entry.decode(bytes)
      if (entry) {
        const updated = await log.joinEntry(entry)
        if (updated) {
          if (onUpdate) {
            await onUpdate(log, entry)
          }
          events.emit('update', entry)
        }
      }
    }
    await queue.add(task)
  }

  /**
   * Closes the database, stopping sync and closing the oplog.
   * @memberof module:Databases~Database
   * @instance
   * @async
   */
  const close = async () => {
    await sync.stop()
    // Let already-queued operations finish before the log is closed.
    await queue.onIdle()
    await log.close()
    // The access controller's close() is optional.
    if (access && access.close) {
      await access.close()
    }
    events.emit('close')
  }

  /**
   * Drops the database, clearing the oplog.
   * @memberof module:Databases~Database
   * @instance
   * @async
   */
  const drop = async () => {
    // Let already-queued operations finish before the log is cleared.
    await queue.onIdle()
    await log.clear()
    // The access controller's drop() is optional.
    if (access && access.drop) {
      await access.drop()
    }
    events.emit('drop')
  }

  // Created after the handlers above because it needs applyOperation; the
  // handlers only dereference `sync` when they are invoked later.
  const sync = await Sync({ ipfs, log, events, onSynced: applyOperation, start: syncAutomatically })

  return {
    /**
     * The address of the database.
     * @type {string}
     * @memberof module:Databases~Database
     * @instance
     */
    address,
    /**
     * The name of the database.
     * @type {string}
     * @memberof module:Databases~Database
     * @instance
     */
    name,
    /**
     * The identity the database was created with.
     * @type {Identity}
     * @memberof module:Databases~Database
     * @instance
     */
    identity,
    /**
     * The database's metadata. An empty object if none was given.
     * @type {*}
     * @memberof module:Databases~Database
     * @instance
     */
    meta,
    // close, drop and addOperation are documented where they are defined above.
    close,
    drop,
    addOperation,
    /**
     * The underlying [operations log]{@link module:Log~Log} of the database.
     * @type {module:Log~Log}
     * @memberof module:Databases~Database
     * @instance
     */
    log,
    /**
     * A [sync]{@link module:Sync~Sync} instance of the database.
     * @type {module:Sync~Sync}
     * @memberof module:Databases~Database
     * @instance
     */
    sync,
    /**
     * Set of currently connected peers for this Database instance.
     * @type {Set}
     * @memberof module:Databases~Database
     * @instance
     */
    peers: sync.peers,
    /**
     * Event emitter that emits Database changes. See Events section for details.
     * @type {EventEmitter}
     * @memberof module:Databases~Database
     * @instance
     */
    events,
    /**
     * The [access controller]{@link module:AccessControllers} instance of the database.
     * @memberof module:Databases~Database
     * @instance
     */
    access
  }
}

export default Database