Source: oplog/log.js

  1. /**
  2. * @module Log
  3. * @description
  4. * Log is a verifiable, append-only log CRDT.
  5. *
  6. * Implemented as a Merkle-CRDT as per the paper
  7. * ["Merkle-CRDTs: Merkle-DAGs meet CRDTs"]{@link https://arxiv.org/abs/2004.00107}
  8. */
  9. import LRU from 'lru'
  10. import PQueue from 'p-queue'
  11. import Entry from './entry.js'
  12. import Clock, { tickClock } from './clock.js'
  13. import Heads from './heads.js'
  14. import ConflictResolution from './conflict-resolution.js'
  15. import MemoryStorage from '../storage/memory.js'
  16. const { LastWriteWins, NoZeroes } = ConflictResolution
  17. const randomId = () => new Date().getTime().toString()
  18. const maxClockTimeReducer = (res, acc) => Math.max(res, acc.clock.time)
  19. // Default storage for storing the Log and its entries. Default: Memory. Options: Memory, LRU, IPFS.
  20. const DefaultStorage = MemoryStorage
  21. // Default AccessController for the Log.
  22. // Default policy is that anyone can write to the Log.
  23. // Signature of an entry will always be verified regardless of AccessController policy.
  24. // Any object that implements the function `canAppend()` that returns true|false can be
  25. // used as an AccessController.
  26. const DefaultAccessController = async () => {
  27. // An AccessController may do any async initialization stuff here...
  28. return {
  29. canAppend: async (entry) => true
  30. }
  31. }
/**
 * Create a new Log instance
 * @function
 * @param {Object} identity The identity of the log's owner. Required.
 * @param {Object} [options]
 * @param {string} [options.logId] ID of the log. Defaults to a
 * timestamp-based id.
 * @param {Array<Entry>} [options.logHeads] Set the heads of the log
 * @param {Object} [options.access] AccessController. Defaults to an
 * AccessController that allows anyone to write.
 * @param {module:Storage} [options.entryStorage] A compatible storage instance
 * for storing log entries. Defaults to MemoryStorage.
 * @param {module:Storage} [options.headsStorage] A compatible storage
 * instance for storing log heads. Defaults to MemoryStorage.
 * @param {module:Storage} [options.indexStorage] A compatible storage
 * instance for storing an index of log entries. Defaults to MemoryStorage.
 * @param {Function} [options.sortFn] The sort function - by default LastWriteWins
 * @return {Promise<module:Log~Log>} An instance of Log
 * @throws {Error} If `identity` is missing or `logHeads` is not an array
 * @memberof module:Log
 * @instance
 */
  53. const Log = async (identity, { logId, logHeads, access, entryStorage, headsStorage, indexStorage, sortFn } = {}) => {
  54. /**
  55. * @namespace Log
  56. * @description The instance returned by {@link module:Log}
  57. */
  58. if (identity == null) {
  59. throw new Error('Identity is required')
  60. }
  61. if (logHeads != null && !Array.isArray(logHeads)) {
  62. throw new Error('\'logHeads\' argument must be an array')
  63. }
  64. // Set Log's id
  65. const id = logId || randomId()
  66. // Access Controller
  67. access = access || await DefaultAccessController()
  68. // Oplog entry storage
  69. const _entries = entryStorage || await DefaultStorage()
  70. // Entry index for keeping track which entries are already in the log
  71. const _index = indexStorage || await DefaultStorage()
  72. // Heads storage
  73. headsStorage = headsStorage || await DefaultStorage()
  74. // Add heads to the state storage, ie. init the log state
  75. const _heads = await Heads({ storage: headsStorage, heads: logHeads })
  76. // Conflict-resolution sorting function
  77. sortFn = NoZeroes(sortFn || LastWriteWins)
  78. // Internal queues for processing appends and joins in their call-order
  79. const appendQueue = new PQueue({ concurrency: 1 })
  80. const joinQueue = new PQueue({ concurrency: 1 })
  81. /**
  82. * Returns the clock of the log.
  83. * @return {module:Clock}
  84. * @memberof module:Log~Log
  85. * @instance
  86. */
  87. const clock = async () => {
  88. // Find the latest clock from the heads
  89. const maxTime = Math.max(0, (await heads()).reduce(maxClockTimeReducer, 0))
  90. return Clock(identity.publicKey, maxTime)
  91. }
  92. /**
  93. * Returns the current heads of the log
  94. *
  95. * @return {Array<module:Log~Entry>}
  96. * @memberof module:Log~Log
  97. * @instance
  98. */
  99. const heads = async () => {
  100. const res = await _heads.all()
  101. return res.sort(sortFn).reverse()
  102. }
  103. /**
  104. * Returns all entries in the log
  105. *
  106. * @return {Array<module:Log~Entry>}
  107. * @memberof module:Log~Log
  108. * @instance
  109. */
  110. const values = async () => {
  111. const values = []
  112. for await (const entry of traverse()) {
  113. values.unshift(entry)
  114. }
  115. return values
  116. }
  117. /**
  118. * Retrieve an entry
  119. *
  120. * @param {string} hash The hash of the entry to retrieve
  121. * @return {module:Log~Entry}
  122. * @memberof module:Log~Log
  123. * @instance
  124. */
  125. const get = async (hash) => {
  126. const bytes = await _entries.get(hash)
  127. if (bytes) {
  128. const entry = await Entry.decode(bytes)
  129. return entry
  130. }
  131. }
  132. const has = async (hash) => {
  133. const entry = await _index.get(hash)
  134. return entry != null
  135. }
  136. /**
  137. * Append an new entry to the log
  138. *
  139. * @param {data} data Payload to add to the entry
  140. * @param {Object} options
  141. * @param {number} options.referencesCount TODO
  142. * @return {module:Log~Entry} Entry that was appended
  143. * @memberof module:Log~Log
  144. * @instance
  145. */
  146. const append = async (data, options = { referencesCount: 0 }) => {
  147. const task = async () => {
  148. // 1. Prepare entry
  149. // 2. Authorize entry
  150. // 3. Store entry
  151. // 4. return Entry
  152. // Get current heads of the log
  153. const heads_ = await heads()
  154. // Create the next pointers from heads
  155. const nexts = heads_.map(entry => entry.hash)
  156. // Get references (pointers) to multiple entries in the past
  157. // (skips the heads which are covered by the next field)
  158. const refs = await getReferences(heads_, options.referencesCount + heads_.length)
  159. // Create the entry
  160. const entry = await Entry.create(
  161. identity,
  162. id,
  163. data,
  164. tickClock(await clock()),
  165. nexts,
  166. refs
  167. )
  168. // Authorize the entry
  169. const canAppend = await access.canAppend(entry)
  170. if (!canAppend) {
  171. throw new Error(`Could not append entry:\nKey "${identity.hash}" is not allowed to write to the log`)
  172. }
  173. // The appended entry is now the latest head
  174. await _heads.set([entry])
  175. // Add entry to the entry storage
  176. await _entries.put(entry.hash, entry.bytes)
  177. // Add entry to the entry index
  178. await _index.put(entry.hash, true)
  179. // Return the appended entry
  180. return entry
  181. }
  182. return appendQueue.add(task)
  183. }
  184. /**
  185. * Join two logs.
  186. *
  187. * Joins another log into this one.
  188. *
  189. * @param {module:Log~Log} log Log to join with this Log
  190. *
  191. * @example
  192. *
  193. * await log1.join(log2)
  194. *
  195. * @memberof module:Log~Log
  196. * @instance
  197. */
  198. const join = async (log) => {
  199. if (!log) {
  200. throw new Error('Log instance not defined')
  201. }
  202. if (!isLog(log)) {
  203. throw new Error('Given argument is not an instance of Log')
  204. }
  205. if (_entries.merge) {
  206. await _entries.merge(log.storage)
  207. }
  208. const heads = await log.heads()
  209. for (const entry of heads) {
  210. await joinEntry(entry)
  211. }
  212. }
  213. /**
  214. * Join an entry into a log.
  215. *
  216. * @param {module:Log~Entry} entry Entry to join with this Log
  217. *
  218. * @example
  219. *
  220. * await log.joinEntry(entry)
  221. *
  222. * @memberof module:Log~Log
  223. * @instance
  224. */
  225. const joinEntry = async (entry) => {
  226. const task = async () => {
  227. /* 1. Check if the entry is already in the log and return early if it is */
  228. const isAlreadyInTheLog = await has(entry.hash)
  229. if (isAlreadyInTheLog) {
  230. return false
  231. }
  232. const verifyEntry = async (entry) => {
  233. // Check that the Entry belongs to this Log
  234. if (entry.id !== id) {
  235. throw new Error(`Entry's id (${entry.id}) doesn't match the log's id (${id}).`)
  236. }
  237. // Verify if entry is allowed to be added to the log
  238. const canAppend = await access.canAppend(entry)
  239. if (!canAppend) {
  240. throw new Error(`Could not append entry:\nKey "${entry.identity}" is not allowed to write to the log`)
  241. }
  242. // Verify signature for the entry
  243. const isValid = await Entry.verify(identity, entry)
  244. if (!isValid) {
  245. throw new Error(`Could not validate signature for entry "${entry.hash}"`)
  246. }
  247. }
  248. /* 2. Verify the entry */
  249. await verifyEntry(entry)
  250. /* 3. Find missing entries and connections (=path in the DAG) to the current heads */
  251. const headsHashes = (await heads()).map(e => e.hash)
  252. const hashesToAdd = new Set([entry.hash])
  253. const hashesToGet = new Set([...entry.next, ...entry.refs])
  254. const connectedHeads = new Set()
  255. const traverseAndVerify = async () => {
  256. const getEntries = Array.from(hashesToGet.values()).filter(has).map(get)
  257. const entries = await Promise.all(getEntries)
  258. for (const e of entries) {
  259. hashesToGet.delete(e.hash)
  260. await verifyEntry(e)
  261. hashesToAdd.add(e.hash)
  262. for (const hash of [...e.next, ...e.refs]) {
  263. const isInTheLog = await has(hash)
  264. if (!isInTheLog && !hashesToAdd.has(hash)) {
  265. hashesToGet.add(hash)
  266. } else if (headsHashes.includes(hash)) {
  267. connectedHeads.add(hash)
  268. }
  269. }
  270. }
  271. if (hashesToGet.size > 0) {
  272. await traverseAndVerify()
  273. }
  274. }
  275. await traverseAndVerify()
  276. /* 4. Add missing entries to the index (=to the log) */
  277. for (const hash of hashesToAdd.values()) {
  278. await _index.put(hash, true)
  279. }
  280. /* 5. Remove heads which new entries are connect to */
  281. for (const hash of connectedHeads.values()) {
  282. await _heads.remove(hash)
  283. }
  284. /* 6. Add new entry to entries (for pinning) */
  285. await _entries.put(entry.hash, entry.bytes)
  286. /* 6. Add the new entry to heads (=union with current heads) */
  287. await _heads.add(entry)
  288. return true
  289. }
  290. return joinQueue.add(task)
  291. }
  292. /**
  293. * TODO
  294. * @memberof module:Log~Log
  295. * @instance
  296. */
  297. const traverse = async function * (rootEntries, shouldStopFn) {
  298. // By default, we don't stop traversal and traverse
  299. // until the end of the log
  300. const defaultStopFn = () => false
  301. shouldStopFn = shouldStopFn || defaultStopFn
  302. // Start traversal from given entries or from current heads
  303. rootEntries = rootEntries || (await heads())
  304. // Sort the given given root entries and use as the starting stack
  305. let stack = rootEntries.sort(sortFn)
  306. // Keep a record of all the hashes of entries we've traversed and yielded
  307. const traversed = {}
  308. // Keep a record of all the hashes we are fetching or have already fetched
  309. let toFetch = []
  310. const fetched = {}
  311. // A function to check if we've seen a hash
  312. const notIndexed = (hash) => !(traversed[hash] || fetched[hash])
  313. // Current entry during traversal
  314. let entry
  315. // Start traversal and process stack until it's empty (traversed the full log)
  316. while (stack.length > 0) {
  317. stack = stack.sort(sortFn)
  318. // Get the next entry from the stack
  319. entry = stack.pop()
  320. if (entry) {
  321. const { hash, next } = entry
  322. // If we have an entry that we haven't traversed yet, process it
  323. if (!traversed[hash]) {
  324. // Yield the current entry
  325. yield entry
  326. // If we should stop traversing, stop here
  327. const done = await shouldStopFn(entry)
  328. if (done === true) {
  329. break
  330. }
  331. // Add to the hash indices
  332. traversed[hash] = true
  333. fetched[hash] = true
  334. // Add the next and refs hashes to the list of hashes to fetch next,
  335. // filter out traversed and fetched hashes
  336. toFetch = [...toFetch, ...next].filter(notIndexed)
  337. // Function to fetch an entry and making sure it's not a duplicate (check the hash indices)
  338. const fetchEntries = (hash) => {
  339. if (!traversed[hash] && !fetched[hash]) {
  340. fetched[hash] = true
  341. return get(hash)
  342. }
  343. }
  344. // Fetch the next/reference entries
  345. const nexts = await Promise.all(toFetch.map(fetchEntries))
  346. // Add the next and refs fields from the fetched entries to the next round
  347. toFetch = nexts
  348. .filter(e => e !== null && e !== undefined)
  349. .reduce((res, acc) => Array.from(new Set([...res, ...acc.next])), [])
  350. .filter(notIndexed)
  351. // Add the fetched entries to the stack to be processed
  352. stack = [...nexts, ...stack]
  353. }
  354. }
  355. }
  356. }
  357. /**
  358. * Async iterator over the log entries
  359. *
  360. * @param {Object} options
  361. * @param {amount} options.amount Number of entried to return. Default: return all entries.
  362. * @param {string} options.gt Beginning hash of the iterator, non-inclusive
  363. * @param {string} options.gte Beginning hash of the iterator, inclusive
  364. * @param {string} options.lt Ending hash of the iterator, non-inclusive
  365. * @param {string} options.lte Ending hash of the iterator, inclusive
  366. * @return {Symbol.asyncIterator} Iterator object of log entries
  367. *
  368. * @examples
  369. *
  370. * (async () => {
  371. * log = await Log(testIdentity, { logId: 'X' })
  372. *
  373. * for (let i = 0; i <= 100; i++) {
  374. * await log.append('entry' + i)
  375. * }
  376. *
  377. * let it = log.iterator({
  378. * lte: 'zdpuApFd5XAPkCTmSx7qWQmQzvtdJPtx2K5p9to6ytCS79bfk',
  379. * amount: 10
  380. * })
  381. *
  382. * for await (let entry of it) {
  383. * console.log(entry.payload) // 'entry100', 'entry99', ..., 'entry91'
  384. * }
  385. * })()
  386. *
  387. * @memberof module:Log~Log
  388. * @instance
  389. */
  390. const iterator = async function * ({ amount = -1, gt, gte, lt, lte } = {}) {
  391. // TODO: write comments on how the iterator algorithm works
  392. if (amount === 0) {
  393. return
  394. }
  395. if (typeof lte === 'string') {
  396. lte = [await get(lte)]
  397. }
  398. if (typeof lt === 'string') {
  399. const entry = await get(lt)
  400. const nexts = await Promise.all(entry.next.map(n => get(n)))
  401. lt = nexts
  402. }
  403. if (lt != null && !Array.isArray(lt)) throw new Error('lt must be a string or an array of Entries')
  404. if (lte != null && !Array.isArray(lte)) throw new Error('lte must be a string or an array of Entries')
  405. const start = (lt || (lte || await heads())).filter(i => i != null)
  406. const end = (gt || gte) ? await get(gt || gte) : null
  407. const amountToIterate = (end || amount === -1) ? -1 : amount
  408. let count = 0
  409. const shouldStopTraversal = async (entry) => {
  410. count++
  411. if (!entry) {
  412. return false
  413. }
  414. if (count >= amountToIterate && amountToIterate !== -1) {
  415. return true
  416. }
  417. if (end && Entry.isEqual(entry, end)) {
  418. return true
  419. }
  420. return false
  421. }
  422. const useBuffer = end && amount !== -1 && !lt && !lte
  423. const buffer = useBuffer ? new LRU(amount + 2) : null
  424. let index = 0
  425. const it = traverse(start, shouldStopTraversal)
  426. for await (const entry of it) {
  427. const skipFirst = (lt && Entry.isEqual(entry, start))
  428. const skipLast = (gt && Entry.isEqual(entry, end))
  429. const skip = skipFirst || skipLast
  430. if (!skip) {
  431. if (useBuffer) {
  432. buffer.set(index++, entry.hash)
  433. } else {
  434. yield entry
  435. }
  436. }
  437. }
  438. if (useBuffer) {
  439. const endIndex = buffer.keys.length
  440. const startIndex = endIndex > amount ? endIndex - amount : 0
  441. const keys = buffer.keys.slice(startIndex, endIndex)
  442. for (const key of keys) {
  443. const hash = buffer.get(key)
  444. const entry = await get(hash)
  445. yield entry
  446. }
  447. }
  448. }
  449. /**
  450. * Clear all entries from the log and the underlying storages
  451. * @memberof module:Log~Log
  452. * @instance
  453. */
  454. const clear = async () => {
  455. await _index.clear()
  456. await _heads.clear()
  457. await _entries.clear()
  458. }
  459. /**
  460. * Close the log and underlying storages
  461. * @memberof module:Log~Log
  462. * @instance
  463. */
  464. const close = async () => {
  465. await _index.close()
  466. await _heads.close()
  467. await _entries.close()
  468. }
  469. /**
  470. * Check if an object is a Log.
  471. * @param {Log} obj
  472. * @return {boolean}
  473. * @memberof module:Log~Log
  474. * @instance
  475. */
  476. const isLog = (obj) => {
  477. return obj && obj.id !== undefined &&
  478. obj.clock !== undefined &&
  479. obj.heads !== undefined &&
  480. obj.values !== undefined &&
  481. obj.access !== undefined &&
  482. obj.identity !== undefined &&
  483. obj.storage !== undefined
  484. }
  485. /**
  486. * Get an array of references to multiple entries in the past.
  487. * @param {Array<Entry>} heads An array of Log heads starting rom which the references are collected from.
  488. * @param {number} amount The number of references to return.
  489. * @return {Array<string>}
  490. * @private
  491. */
  492. const getReferences = async (heads, amount = 0) => {
  493. let refs = []
  494. const shouldStopTraversal = async (entry) => {
  495. return refs.length >= amount && amount !== -1
  496. }
  497. for await (const { hash } of traverse(heads, shouldStopTraversal)) {
  498. refs.push(hash)
  499. }
  500. refs = refs.slice(heads.length + 1, amount)
  501. return refs
  502. }
  503. return {
  504. id,
  505. clock,
  506. heads,
  507. values,
  508. all: values, // Alias for values()
  509. get,
  510. has,
  511. append,
  512. join,
  513. joinEntry,
  514. traverse,
  515. iterator,
  516. clear,
  517. close,
  518. access,
  519. identity,
  520. storage: _entries
  521. }
  522. }
  523. export { Log as default, DefaultAccessController, Clock }