WiP! Replicator

This commit is contained in:
Ehmry - 2019-02-08 14:30:08 +01:00
parent b9733f0d19
commit 13cb901a63
1 changed files with 73 additions and 48 deletions

View File

@ -1,54 +1,79 @@
import std/streams, std/strutils, std/os, cbor
import ../blobsets, ./stores
type
RepBlobStream = ref RepBlobStreamObj
RepBlobStreamObj = object of BlobStreamObj
src: BlobStream
dst: IngestStream
id: BlobId
size: BiggestInt
ingestPos: BiggestInt
proc repClose(s: BlobStream) =
var s = (RepBlobStream)s
close s.src
cancel s.dst
proc finish(s: RepBlobStream) =
var buf = ""
while s.ingestPos < s.size:
buf.setLen(min(blobLeafSize, s.size - s.ingestPos))
let n = read(s.src, buf, buf.len)
ingest(s.dst, buf, n)
inc s.ingestPos, n
proc repSetPos(s: BlobStream; pos: BiggestInt) =
var s = (RepBlobStream)s
if pos != ingestPos and s.ingestPos < s.size:
finish s
s.src.pos = pos
proc repGetPos(s: BlobStream): BiggestInt =
var s = (RepBlobStream)s
s.src.pos
proc repRead(s: BlobStream; buffer: pointer; bufLen: int): int =
var s = (RepBlobStream)s
result = read(s.src, buffer, bufLen)
if s.ingestPos < s.size:
if bufLen < blobLeafSize:
finish s
else:
ingest(s.dst, buffer, result)
type
DagfsReplicator* = ref DagfsReplicatorObj
DagfsReplicatorObj* = object of DagfsStoreObj
toStore, fromStore: DagfsStore
cache: string
cacheCid: Cid
RepStore = ref RepStoreObj
RepStoreObj = object of BlobStoreObj
src, dst: BlobStore
proc replicatedPut(s: DagfsStore; blk: string): Cid =
var r = DagfsReplicator(s)
r.toStore.put blk
proc replicatorOpenBlobStream(store: BlobStore; id: BlobId; size: BiggestInt; kind: BlobKind) =
var
store = (RepStore)store
try:
result = openBlobStream(store.dst, id, size, kind)
except KeyError:
let
src = openBlobStream(store.src, id, size, kind)
dst = openIngestStream(store.dst, size, kind)
size = if 0 < size: size else: src.size
result = RepStore(
closeImpl: repClose,
setPosImpl: repSetPos,
getPosImpl: repGetPos,
readImpl: repRead,
dst: src, dst: dst, id: id, size: size
)
proc replicatedGetBuffer(s: DagfsStore; cid: Cid; buf: pointer; len: Natural): int =
var r = DagfsReplicator(s)
if r.cacheCid == cid:
assert(cid.verify(r.cache), "cached block is invalid from previous get")
if r.cache.len > len:
raise newException(BufferTooSmall, "")
result = r.cache.len
copyMem(buf, r.cache[0].addr, result)
else:
try:
result = r.toStore.getBuffer(cid, buf, len)
r.cacheCid = cid
r.cache.setLen result
copyMem(r.cache[0].addr, buf, result)
assert(cid.verify(r.cache), "cached block is invalid after copy from To store")
except MissingObject:
result = r.fromStore.getBuffer(cid, buf, len)
r.cacheCid = cid
r.cache.setLen result
copyMem(r.cache[0].addr, buf, result)
assert(cid.verify(r.cache), "replicate cache is invalid after copy from From store")
discard r.toStore.put r.cache
proc replicatorOpenIngestStream(s: BlobStore; size: BiggestInt; kind: BlobKind) =
openIngestStream(RepStore(store).dst)
proc replicatedGet(s: DagfsStore; cid: Cid; result: var string) =
var r = DagfsReplicator(s)
try: r.toStore.get(cid, result)
except MissingObject:
r.fromStore.get(cid, result)
discard r.toStore.put result
proc replicatorCloseStore(s: BlobStore) =
var s = (RepStore)s
close s.src
close s.dst
proc newDagfsReplicator*(toStore, fromStore: DagfsStore): DagfsReplicator =
## Blocks retrieved by `get` are not verified.
DagfsReplicator(
putImpl: replicatedPut,
getBufferImpl: replicatedGetBuffer,
getImpl: replicatedGet,
toStore: toStore,
fromStore: fromStore,
cache: "",
cacheCid: initCid())
proc newReplicatorStore*(src, dst: Store): BlobStore =
RepStore(
closeImpl: replicatorCloseStore,
openBlobStreamImpl: replicatorOpenBlobStream,
openIngestStreamImpl: replicatorOpenIngestStream,
src: src, dst: dst,
)