Source: databases/keyvalue-indexed.js

/**
 * @namespace Databases-KeyValueIndexed
 * @memberof module:Databases
 * @description
 * Key-Value database that uses an index in order to provide fast queries.
 *
 * For each key, the log entry holding its latest value is kept in a
 * LevelStorage-backed index stored under `<directory>/<address>/_index/`.
 * @example <caption>Open a KeyValueIndexed database</caption>
 * import { createHelia } from 'helia'
 * import { createOrbitDB, KeyValueIndexed } from 'orbitdb'
 *
 * const ipfs = await createHelia()
 * const orbitdb = await createOrbitDB({ ipfs })
 * const db = await orbitdb.open('my-kv', { Database: KeyValueIndexed() })
 *
 * @augments module:Databases~Database
 * @augments module:Databases.Databases-KeyValue
 */
import KeyValue from './keyvalue.js'
import LevelStorage from '../storage/level.js'
import pathJoin from '../utils/path-join.js'

const valueEncoding = 'json'

/**
 * Defines an index for a KeyValue database.
 *
 * The index consists of two LevelStorage instances:
 * - `index` maps each key to the log entry that set its current value.
 * - `_indexedEntries` records the hashes of log entries that update() has
 *   already processed.
 * @param {Object} [options={}] Index options.
 * @param {string} [options.directory] A location for storing the
 * index-related data.
 * @return {Function} An async function that opens the storages and resolves
 * to an index object with `get`, `iterator`, `update`, `close` and `drop`.
 */
const Index = ({ directory } = {}) => async () => {
  const index = await LevelStorage({ path: directory, valueEncoding })
  const indexedEntries = await LevelStorage({ path: pathJoin(directory, '/_indexedEntries/'), valueEncoding })

  // Whether the log entry with the given hash has already been processed
  const isIndexed = async (hash) => (await indexedEntries.get(hash)) === true

  // Only these operations affect the value stored for a key
  const isKeyOp = (op) => op === 'PUT' || op === 'DEL'

  /**
   * Applies the log entries that have not been processed yet to the index.
   *
   * NOTE(review): like the original implementation, this assumes
   * `log.traverse()` yields entries newest-first (in the log's sort order).
   * Under that assumption, the first entry seen for a key during a traversal
   * is the one that determines its value — confirm against Log.traverse.
   * @param {Log} log The log the entry was added to.
   * @param {Entry} entry The entry that triggered the update.
   */
  const update = async (log, entry) => {
    // Keys whose value has been decided during this traversal
    const keys = new Set()
    // Hashes of unprocessed entries referenced by already-visited entries
    const toBeIndexed = new Set()
    const latest = entry.hash

    // Called by log.traverse() after each visited entry; returning true
    // stops the traversal.
    const shouldStopTraverse = async ({ next }) => {
      // Remember every predecessor that is not processed yet, so the
      // traversal continues until all of them have been visited
      for (const hash of next) {
        if (!(await isIndexed(hash))) {
          toBeIndexed.add(hash)
        }
      }
      // Stop once the triggering entry is processed and nothing is pending
      return (await isIndexed(latest)) && toBeIndexed.size === 0
    }

    for await (const current of log.traverse(null, shouldStopTraverse)) {
      const { hash, payload } = current
      const { op, key } = payload

      if (await isIndexed(hash)) {
        // An already-processed entry still claims its key for this
        // traversal. This stops an older entry that arrives later (e.g. from
        // a concurrent writer) from overwriting the newer value already in
        // the index.
        if (isKeyOp(op)) {
          keys.add(key)
        }
        continue
      }

      if (isKeyOp(op) && !keys.has(key)) {
        keys.add(key)
        if (op === 'PUT') {
          await index.put(key, current)
        } else {
          await index.del(key)
        }
      }

      // Mark every visited entry as processed, including ones superseded by
      // a newer entry for the same key. If left unmarked:
      // - a later update would re-visit them with an empty `keys` set and
      //   write a stale value into the index;
      // - a superseded `latest` would never let the traversal stop early.
      await indexedEntries.put(hash, true)
      toBeIndexed.delete(hash)
    }
  }

  /**
   * Closes the index and its storages. The processed-entries storage is
   * closed even if closing the key index fails.
   */
  const close = async () => {
    try {
      await index.close()
    } finally {
      await indexedEntries.close()
    }
  }

  /**
   * Drops all records from the index and its storages.
   */
  const drop = async () => {
    await index.clear()
    await indexedEntries.clear()
  }

  return {
    get: index.get,
    iterator: index.iterator,
    update,
    close,
    drop
  }
}

/**
 * Defines a KeyValueIndexed database.
 *
 * The factory takes no options; any argument passed to it is ignored. The
 * index is always kept in LevelStorage under `<directory>/<address>/_index/`,
 * where `directory` defaults to './orbitdb'.
 * @return {module:Databases.Databases-KeyValueIndexed} A KeyValueIndexed
 * function.
 * @memberof module:Databases
 */
const KeyValueIndexed = () => async ({ ipfs, identity, address, name, access, directory, meta, headsStorage, entryStorage, indexStorage, referencesCount, syncAutomatically, onUpdate }) => {
  // Location of the index data
  const indexDirectory = pathJoin(directory || './orbitdb', `./${address}/_index/`)

  // Set up the index
  const index = await Index({ directory: indexDirectory })()

  // Keep the index up to date on every log update. Then forward the update to
  // the caller's own onUpdate handler (if any) instead of silently dropping it.
  const handleUpdate = async (log, entry) => {
    await index.update(log, entry)
    if (onUpdate) {
      await onUpdate(log, entry)
    }
  }

  // Set up the underlying KeyValue database.
  // NOTE(review): KeyValue receives the index directory rather than the
  // original `directory`. This preserves the existing on-disk layout —
  // confirm it is intended before changing it.
  const keyValueStore = await KeyValue()({ ipfs, identity, address, name, access, directory: indexDirectory, meta, headsStorage, entryStorage, indexStorage, referencesCount, syncAutomatically, onUpdate: handleUpdate })

  /**
   * Gets a value from the store by key.
   * @function
   * @param {string} key The key of the value to get.
   * @return {*} The value corresponding to key, or undefined if the key is
   * not in the index.
   * @memberof module:Databases.Databases-KeyValueIndexed
   * @instance
   */
  const get = async (key) => {
    const entry = await index.get(key)
    if (entry) {
      return entry.payload.value
    }
  }

  /**
   * Iterates over the key/value pairs stored in the index.
   * @function
   * @param {Object} [filters={}] Various filters to apply to the iterator.
   * @param {number} [filters.amount] The number of results to fetch. It is
   * passed through to the index iterator; omitted means no explicit limit.
   * @yields {{key: string, value: *, hash: string}} The next key, its value
   * and the hash of the entry that set it. The index is read with
   * `reverse: true`, i.e. reverse key order for a Level-backed store.
   * @memberof module:Databases.Databases-KeyValueIndexed
   * @instance
   */
  const iterator = async function * ({ amount } = {}) {
    const it = index.iterator({ amount, reverse: true })
    for await (const record of it) {
      // 'index' is a LevelStorage that returns a [key, value] pair
      const entry = record[1]
      const { key, value } = entry.payload
      const hash = entry.hash
      yield { key, value, hash }
    }
  }

  /**
   * Closes the underlying database and the index. The index is closed even
   * if closing the database fails.
   */
  const close = async () => {
    try {
      await keyValueStore.close()
    } finally {
      await index.close()
    }
  }

  /**
   * Drops all records from the underlying database and the index.
   */
  const drop = async () => {
    await keyValueStore.drop()
    await index.drop()
  }

  return {
    ...keyValueStore,
    get,
    iterator,
    close,
    drop
  }
}

// Database type identifier
KeyValueIndexed.type = 'keyvalue'

export default KeyValueIndexed