From 13cb901a63b26064ecca571a66e5bd18dcf22f9c Mon Sep 17 00:00:00 2001 From: Emery Hemingway Date: Fri, 8 Feb 2019 14:30:08 +0100 Subject: [PATCH] WiP! Replicator --- src/blobsets/replicator.nim | 121 ++++++++++++++++++++++-------------- 1 file changed, 73 insertions(+), 48 deletions(-) diff --git a/src/blobsets/replicator.nim b/src/blobsets/replicator.nim index f68af85..15b888d 100644 --- a/src/blobsets/replicator.nim +++ b/src/blobsets/replicator.nim @@ -1,54 +1,79 @@ -import std/streams, std/strutils, std/os, cbor -import ../blobsets, ./stores +type + RepBlobStream = ref RepBlobStreamObj + RepBlobStreamObj = object of BlobStreamObj + src: BlobStream + dst: IngestStream + id: BlobId + size: BiggestInt + ingestPos: BiggestInt + +proc repClose(s: BlobStream) = + var s = (RepBlobStream)s + close s.src + cancel s.dst + +proc finish(s: RepBlobStream) = + var buf = "" + while s.ingestPos < s.size: + buf.setLen(min(blobLeafSize, s.size - s.ingestPos)) + let n = read(s.src, buf, buf.len) + ingest(s.dst, buf, n) + inc s.ingestPos, n + +proc repSetPos(s: BlobStream; pos: BiggestInt) = + var s = (RepBlobStream)s + if pos != ingestPos and s.ingestPos < s.size: + finish s + s.src.pos = pos + +proc repGetPos(s: BlobStream): BiggestInt = + var s = (RepBlobStream)s + s.src.pos + +proc repRead(s: BlobStream; buffer: pointer; bufLen: int): int = + var s = (RepBlobStream)s + result = read(s.src, buffer, bufLen) + if s.ingestPos < s.size: + if bufLen < blobLeafSize: + finish s + else: + ingest(s.dst, buffer, result) type - DagfsReplicator* = ref DagfsReplicatorObj - DagfsReplicatorObj* = object of DagfsStoreObj - toStore, fromStore: DagfsStore - cache: string - cacheCid: Cid + RepStore = ref RepStoreObj + RepStoreObj = object of BlobStoreObj + src, dst: BlobStore -proc replicatedPut(s: DagfsStore; blk: string): Cid = - var r = DagfsReplicator(s) - r.toStore.put blk +proc replicatorOpenBlobStream(store: BlobStore; id: BlobId; size: BiggestInt; kind: BlobKind) = + var + store = (RepStore)store + try: + result = openBlobStream(store.dst, id, size, kind) + except KeyError: + let + src = openBlobStream(store.src, id, size, kind) + dst = openIngestStream(store.dst, size, kind) + size = if 0 < size: size else: src.size + result = RepStore( + closeImpl: repClose, + setPosImpl: repSetPos, + getPosImpl: repGetPos, + readImpl: repRead, + dst: src, dst: dst, id: id, size: size + ) -proc replicatedGetBuffer(s: DagfsStore; cid: Cid; buf: pointer; len: Natural): int = - var r = DagfsReplicator(s) - if r.cacheCid == cid: - assert(cid.verify(r.cache), "cached block is invalid from previous get") - if r.cache.len > len: - raise newException(BufferTooSmall, "") - result = r.cache.len - copyMem(buf, r.cache[0].addr, result) - else: - try: - result = r.toStore.getBuffer(cid, buf, len) - r.cacheCid = cid - r.cache.setLen result - copyMem(r.cache[0].addr, buf, result) - assert(cid.verify(r.cache), "cached block is invalid after copy from To store") - except MissingObject: - result = r.fromStore.getBuffer(cid, buf, len) - r.cacheCid = cid - r.cache.setLen result - copyMem(r.cache[0].addr, buf, result) - assert(cid.verify(r.cache), "replicate cache is invalid after copy from From store") - discard r.toStore.put r.cache +proc replicatorOpenIngestStream(s: BlobStore; size: BiggestInt; kind: BlobKind) = + openIngestStream(RepStore(store).dst) -proc replicatedGet(s: DagfsStore; cid: Cid; result: var string) = - var r = DagfsReplicator(s) - try: r.toStore.get(cid, result) - except MissingObject: - r.fromStore.get(cid, result) - discard r.toStore.put result +proc replicatorCloseStore(s: BlobStore) = + var s = (RepStore)s + close s.src + close s.dst -proc newDagfsReplicator*(toStore, fromStore: DagfsStore): DagfsReplicator = - ## Blocks retrieved by `get` are not verified. - DagfsReplicator( - putImpl: replicatedPut, - getBufferImpl: replicatedGetBuffer, - getImpl: replicatedGet, - toStore: toStore, - fromStore: fromStore, - cache: "", - cacheCid: initCid()) +proc newReplicatorStore*(src, dst: Store): BlobStore = + RepStore( + closeImpl: replicatorCloseStore, + openBlobStreamImpl: replicatorOpenBlobStream, + openIngestStreamImpl: replicatorOpenIngestStream, + src: src, dst: dst, + )