Make read and ingest asynchronous
This commit is contained in:
parent
ccb0e96d71
commit
a315eb645a
|
@ -1,3 +1,4 @@
|
||||||
|
import std/asyncdispatch
|
||||||
import std/hashes, std/streams, std/strutils, std/bitops, std/unicode, std/endians
|
import std/hashes, std/streams, std/strutils, std/bitops, std/unicode, std/endians
|
||||||
import cbor, siphash
|
import cbor, siphash
|
||||||
import ./blobsets/priv/hex
|
import ./blobsets/priv/hex
|
||||||
|
@ -406,12 +407,12 @@ type
|
||||||
sizeImpl*: proc (s: BlobStream): BiggestInt {.nimcall, gcsafe.}
|
sizeImpl*: proc (s: BlobStream): BiggestInt {.nimcall, gcsafe.}
|
||||||
setPosImpl*: proc (s: BlobStream; pos: BiggestInt) {.nimcall, gcsafe.}
|
setPosImpl*: proc (s: BlobStream; pos: BiggestInt) {.nimcall, gcsafe.}
|
||||||
getPosImpl*: proc (s: BlobStream): BiggestInt {.nimcall, gcsafe.}
|
getPosImpl*: proc (s: BlobStream): BiggestInt {.nimcall, gcsafe.}
|
||||||
readImpl*: proc (s: BlobStream; buffer: pointer; bufLen: int): int {.nimcall, gcsafe.}
|
readImpl*: proc (s: BlobStream; buffer: pointer; bufLen: int): Future[int] {.nimcall, gcsafe.}
|
||||||
IngestStream* = ref IngestStreamObj
|
IngestStream* = ref IngestStreamObj
|
||||||
IngestStreamObj* = object of RootObj
|
IngestStreamObj* = object of RootObj
|
||||||
cancelImpl*: proc (s: IngestStream) {.nimcall, gcsafe.}
|
cancelImpl*: proc (s: IngestStream) {.nimcall, gcsafe.}
|
||||||
finishImpl*: proc (s: IngestStream): tuple[id: BlobId, size: BiggestInt] {.nimcall, gcsafe.}
|
finishImpl*: proc (s: IngestStream): tuple[id: BlobId, size: BiggestInt] {.nimcall, gcsafe.}
|
||||||
ingestImpl*: proc (s: IngestStream; buf: pointer; size: int) {.nimcall, gcsafe.}
|
ingestImpl*: proc (s: IngestStream; buf: pointer; size: int): Future[void] {.nimcall, gcsafe.}
|
||||||
|
|
||||||
proc close*(s: BlobStream) =
|
proc close*(s: BlobStream) =
|
||||||
assert(not s.closeImpl.isNil)
|
assert(not s.closeImpl.isNil)
|
||||||
|
@ -431,7 +432,7 @@ proc pos*(s: BlobStream): BiggestInt =
|
||||||
|
|
||||||
proc read*(s: BlobStream; buf: pointer; len: Natural): int =
|
proc read*(s: BlobStream; buf: pointer; len: Natural): int =
|
||||||
assert(not s.readImpl.isNil)
|
assert(not s.readImpl.isNil)
|
||||||
result = s.readImpl(s, buf, len)
|
waitFor s.readImpl(s, buf, len)
|
||||||
|
|
||||||
proc cancle*(s: IngestStream): tuple[id: BlobId, size: BiggestInt] =
|
proc cancle*(s: IngestStream): tuple[id: BlobId, size: BiggestInt] =
|
||||||
## Cancel and close ingest stream
|
## Cancel and close ingest stream
|
||||||
|
@ -446,19 +447,12 @@ proc finish*(s: IngestStream): tuple[id: BlobId, size: BiggestInt] =
|
||||||
proc ingest*(s: IngestStream; buf: pointer; size: Natural) =
|
proc ingest*(s: IngestStream; buf: pointer; size: Natural) =
|
||||||
## Ingest stream
|
## Ingest stream
|
||||||
assert(not s.ingestImpl.isNil)
|
assert(not s.ingestImpl.isNil)
|
||||||
s.ingestImpl(s, buf, size)
|
waitFor s.ingestImpl(s, buf, size)
|
||||||
|
|
||||||
proc ingest*(s: IngestStream; buf: var string) =
|
proc ingest*(s: IngestStream; buf: string) =
|
||||||
## Ingest stream
|
## Ingest stream
|
||||||
assert(not s.ingestImpl.isNil)
|
assert(not s.ingestImpl.isNil)
|
||||||
s.ingestImpl(s, buf[0].addr, buf.len)
|
waitFor s.ingestImpl(s, buf[0].unsafeAddr, buf.len)
|
||||||
|
|
||||||
proc ingest*(s: IngestStream; buf: string) {.deprecated.} =
|
|
||||||
## Ingest stream
|
|
||||||
# TODO: zero-copy
|
|
||||||
assert(not s.ingestImpl.isNil)
|
|
||||||
var buf = buf
|
|
||||||
s.ingestImpl(s, buf[0].addr, buf.len)
|
|
||||||
|
|
||||||
type
|
type
|
||||||
BlobStore* = ref BlobStoreObj
|
BlobStore* = ref BlobStoreObj
|
||||||
|
@ -483,7 +477,9 @@ proc nullBlobClose(s: BlobStream) = discard
|
||||||
proc setPosNull(s: BlobStream; pos: BiggestInt) = discard
|
proc setPosNull(s: BlobStream; pos: BiggestInt) = discard
|
||||||
proc getPosNull(s: BlobStream): BiggestInt = discard
|
proc getPosNull(s: BlobStream): BiggestInt = discard
|
||||||
|
|
||||||
proc nullBlobRead(s: BlobStream; buffer: pointer; len: Natural): int = 0
|
proc nullBlobRead(s: BlobStream; buffer: pointer; len: Natural): Future[int] =
|
||||||
|
result = newFuture[int]()
|
||||||
|
complete result, 0
|
||||||
|
|
||||||
proc nullOpenBlobStream(s: BlobStore; id: BlobId; size: BiggestInt; kind: BlobKind): BlobStream =
|
proc nullOpenBlobStream(s: BlobStore; id: BlobId; size: BiggestInt; kind: BlobKind): BlobStream =
|
||||||
BlobStream(
|
BlobStream(
|
||||||
|
@ -499,7 +495,7 @@ proc nullFinish(s: IngestStream): tuple[id: BlobId, size: BiggestInt] =
|
||||||
result.id = s.leaves[0]
|
result.id = s.leaves[0]
|
||||||
result.size = s.pos
|
result.size = s.pos
|
||||||
|
|
||||||
proc nullIngest(s: IngestStream; buf: pointer; len: Natural) =
|
proc nullIngest(s: IngestStream; buf: pointer; len: Natural): Future[void] =
|
||||||
var
|
var
|
||||||
s = NullIngestStream(s)
|
s = NullIngestStream(s)
|
||||||
off = 0
|
off = 0
|
||||||
|
@ -521,6 +517,8 @@ proc nullIngest(s: IngestStream; buf: pointer; len: Natural) =
|
||||||
s.ctx.update(buf[off].addr, n)
|
s.ctx.update(buf[off].addr, n)
|
||||||
off.inc n
|
off.inc n
|
||||||
s.pos.inc n
|
s.pos.inc n
|
||||||
|
result = newFuture[void]()
|
||||||
|
complete result
|
||||||
|
|
||||||
proc nullOpenIngestStream(s: BlobStore; size: BiggestInt; kind: BlobKind): IngestStream =
|
proc nullOpenIngestStream(s: BlobStore; size: BiggestInt; kind: BlobKind): IngestStream =
|
||||||
NullIngestStream(
|
NullIngestStream(
|
||||||
|
|
|
@ -51,9 +51,9 @@ proc getPosFs(s: BlobStream): BiggestInt =
|
||||||
var s = FsBlobStream(s)
|
var s = FsBlobStream(s)
|
||||||
(BiggestInt)s.file.getFilePos
|
(BiggestInt)s.file.getFilePos
|
||||||
|
|
||||||
proc fsBlobRead(s: BlobStream; buffer: pointer; len: Natural): int =
|
proc fsBlobRead(s: BlobStream; buffer: pointer; len: Natural): Future[int] =
|
||||||
var s = FsBlobStream(s)
|
var s = FsBlobStream(s)
|
||||||
result = waitFor s.file.readBuffer(buffer, len)
|
s.file.readBuffer(buffer, len)
|
||||||
|
|
||||||
proc fsOpenBlobStream(s: BlobStore; id: BlobId; size: BiggestInt; kind: BlobKind): BlobStream =
|
proc fsOpenBlobStream(s: BlobStore; id: BlobId; size: BiggestInt; kind: BlobKind): BlobStream =
|
||||||
var fs = FileStore(s)
|
var fs = FileStore(s)
|
||||||
|
@ -80,7 +80,7 @@ proc fsFinish(s: IngestStream): tuple[id: BlobId, size: BiggestInt] =
|
||||||
result.size = s.pos
|
result.size = s.pos
|
||||||
moveFile(s.path, s.path.parentDir / result.id.toHex)
|
moveFile(s.path, s.path.parentDir / result.id.toHex)
|
||||||
|
|
||||||
proc fsIngest(s: IngestStream; buf: pointer; len: Natural) =
|
proc fsIngest(s: IngestStream; buf: pointer; len: Natural) {.async.} =
|
||||||
var
|
var
|
||||||
s = FsIngestStream(s)
|
s = FsIngestStream(s)
|
||||||
off = 0
|
off = 0
|
||||||
|
@ -100,7 +100,7 @@ proc fsIngest(s: IngestStream; buf: pointer; len: Natural) =
|
||||||
else:
|
else:
|
||||||
n = min(n, blobLeafSize-leafOff)
|
n = min(n, blobLeafSize-leafOff)
|
||||||
s.ctx.update(buf[off].addr, n)
|
s.ctx.update(buf[off].addr, n)
|
||||||
waitFor s.file.writeBuffer(buf[off].addr, n)
|
await s.file.writeBuffer(buf[off].addr, n)
|
||||||
off.inc n
|
off.inc n
|
||||||
s.pos.inc n
|
s.pos.inc n
|
||||||
|
|
||||||
|
|
|
@ -105,9 +105,9 @@ proc postIngest(hss: HttpStoreServer; req: Request): Future[void] =
|
||||||
|
|
||||||
proc serve*(hss: HttpStoreServer; port = Port(80)): Future[void] =
|
proc serve*(hss: HttpStoreServer; port = Port(80)): Future[void] =
|
||||||
## Serve requests to HTTP store.
|
## Serve requests to HTTP store.
|
||||||
proc handleRequest(req: Request): Future[void] =
|
proc handleRequest(req: Request) {.async.} =
|
||||||
try:
|
try:
|
||||||
case req.reqMethod
|
let fut = case req.reqMethod
|
||||||
of HttpGET:
|
of HttpGET:
|
||||||
get(hss, req)
|
get(hss, req)
|
||||||
of HttpHEAD:
|
of HttpHEAD:
|
||||||
|
@ -118,10 +118,13 @@ proc serve*(hss: HttpStoreServer; port = Port(80)): Future[void] =
|
||||||
postIngest(hss, req)
|
postIngest(hss, req)
|
||||||
else:
|
else:
|
||||||
req.respond(Http501, "method not implemented")
|
req.respond(Http501, "method not implemented")
|
||||||
|
await fut
|
||||||
except KeyError:
|
except KeyError:
|
||||||
req.respond(Http404, "blob not found")
|
await req.respond(Http404, "blob not found")
|
||||||
except ValueError:
|
except ValueError:
|
||||||
req.respond(Http400, getCurrentExceptionMsg())
|
await req.respond(Http400, getCurrentExceptionMsg())
|
||||||
except:
|
except:
|
||||||
req.respond(Http500, getCurrentExceptionMsg())
|
try:
|
||||||
|
await req.respond(Http500, getCurrentExceptionMsg())
|
||||||
|
except: discard
|
||||||
hss.server.serve(port, handleRequest)
|
hss.server.serve(port, handleRequest)
|
||||||
|
|
|
@ -35,13 +35,13 @@ proc getPosHttp(s: BlobStream): BiggestInt =
|
||||||
var s = (HttpBlobStream)s
|
var s = (HttpBlobStream)s
|
||||||
s.rangePos
|
s.rangePos
|
||||||
|
|
||||||
proc httpBlobRead(s: BlobStream; buffer: pointer; len: Natural): int =
|
proc httpBlobRead(s: BlobStream; buffer: pointer; len: Natural): Future[int] {.async.} =
|
||||||
assert(not buffer.isNil)
|
assert(not buffer.isNil)
|
||||||
var s = (HttpBlobStream)s
|
var s = (HttpBlobStream)s
|
||||||
let
|
let
|
||||||
headers = newHttpHeaders({"range": "bytes=$1-$2" % [ $s.rangePos, $(s.rangePos+len) ]})
|
headers = newHttpHeaders({"range": "bytes=$1-$2" % [ $s.rangePos, $(s.rangePos+len) ]})
|
||||||
resp = waitFor s.client.request(s.url, HttpGET, headers=headers)
|
resp = waitFor s.client.request(s.url, HttpGET, headers=headers)
|
||||||
var body = waitFor resp.body
|
var body = await resp.body
|
||||||
result = (int)min(body.len, len)
|
result = (int)min(body.len, len)
|
||||||
if result > 0:
|
if result > 0:
|
||||||
copyMem(buffer, body[0].addr, result)
|
copyMem(buffer, body[0].addr, result)
|
||||||
|
@ -75,13 +75,13 @@ proc httpFinish(s: IngestStream): tuple[id: BlobId, size: BiggestInt] =
|
||||||
result.id = toBlobId resp.headers["blob-id"]
|
result.id = toBlobId resp.headers["blob-id"]
|
||||||
result.size = parseBiggestInt resp.headers["blob-size"]
|
result.size = parseBiggestInt resp.headers["blob-size"]
|
||||||
|
|
||||||
proc httpIngest(x: IngestStream; buf: pointer; len: Natural) =
|
proc httpIngest(x: IngestStream; buf: pointer; len: Natural) {.async.} =
|
||||||
var
|
var
|
||||||
s = HttpIngestStream(x)
|
s = HttpIngestStream(x)
|
||||||
body = newString(len)
|
body = newString(len)
|
||||||
copyMem(body[0].addr, buf, body.len)
|
copyMem(body[0].addr, buf, body.len)
|
||||||
# TODO: zero-copy
|
# TODO: zero-copy
|
||||||
let resp = waitFor s.client.request($s.url, HttpPUT, body)
|
let resp = await s.client.request($s.url, HttpPUT, body)
|
||||||
if resp.code != Http204:
|
if resp.code != Http204:
|
||||||
raiseAssert(resp.status)
|
raiseAssert(resp.status)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue
Block a user