From 219730dad8e6db66200cce75072fa62a2cad2a62 Mon Sep 17 00:00:00 2001 From: Emery Hemingway Date: Sun, 20 Jan 2019 17:14:32 +0100 Subject: [PATCH] Initial HTTP store --- .gitignore | 1 + src/blobset.nim | 24 +++++-- src/blobset.nim.cfg | 3 + src/blobsets.nim | 32 +++++++++- src/blobsets/filestores.nim | 27 +++++--- src/blobsets/httpservers.nim | 119 +++++++++++++++++++++++++++++++++++ src/blobsets/httpstores.nim | 108 +++++++++++++++++++++++++++++++ tests/test_http.nim | 26 ++++++++ 8 files changed, 323 insertions(+), 17 deletions(-) create mode 100644 src/blobset.nim.cfg create mode 100644 src/blobsets/httpservers.nim create mode 100644 src/blobsets/httpstores.nim create mode 100644 tests/test_http.nim diff --git a/.gitignore b/.gitignore index f36d27f..e5dea71 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ /blobset /tests/test_set +/tests/test_http diff --git a/src/blobset.nim b/src/blobset.nim index d16f07c..d28a3f9 100644 --- a/src/blobset.nim +++ b/src/blobset.nim @@ -1,14 +1,15 @@ when not isMainModule: {.error: "this module is not a library, import blobsets instead".} -import std/nre, std/os, std/strutils, std/tables, std/parseopt, std/streams, std/rdstdin +import std/asyncdispatch, std/nre, std/os, std/strutils, std/tables, std/parseopt, std/streams, std/rdstdin import cbor -import ./blobsets, ./blobsets/filestores +import ./blobsets, ./blobsets/filestores, + ./blobsets/httpservers, ./blobsets/httpstores # Basic Spry import spryvm/spryvm -# Spry extra modules, as much as possible! +# Spry extra modules import spryvm/sprycore, spryvm/spryextend, spryvm/sprymath, spryvm/spryos, spryvm/spryio, spryvm/spryoo, spryvm/sprystring, spryvm/sprymodules, spryvm/spryreflect, spryvm/sprymemfile, spryvm/spryblock, @@ -24,13 +25,23 @@ when defined(readLine): #else: # import ./blobsets/tcp +proc openStore(): BlobStore = + #newFileStore("/tmp/blobs") + newHttpStore("http://127.0.0.1:8080/") + +proc serverMain() = + let + store = newFileStore("/tmp/blobs") + server = newHttpStoreServer(store) + waitFor server.serve((Port)8080) + proc dumpMain() = var args = newSeq[string]() for kind, key, val in getopt(): if kind == cmdArgument: args.add key if args.len > 1: - let store = newFileStore("/tmp/blobs") + let store = openStore() for i in 1..args.high: try: for chunk in store.dumpBlob(args[i].toBlobId): @@ -64,7 +75,7 @@ proc ingestMain() = args.add key if args.len > 1: var set = newBlobSet() - let store = newFileStore("/tmp/blobs") + let store = openStore() for i in 1..args.high: let path = normalizedPath args[i] set = store.insertPath(set, path.getFileInfo.kind, path) @@ -674,7 +685,7 @@ proc replMain() = scripted = true let #store = openStore() - store = newFileStore("/tmp/blobs") + store = openStore() env = newEnv(store) outStream = stdout.newFileStream readLine = if scripted: readLineSimple else: readLineFromStdin @@ -784,6 +795,7 @@ proc main() = of "repl": replMain() of "dump": dumpMain() of "ingest": ingestMain() + of "server": serverMain() of "spry": spryMain() else: quit("no such subcommand ") diff --git a/src/blobset.nim.cfg b/src/blobset.nim.cfg new file mode 100644 index 0000000..9d3df17 --- /dev/null +++ b/src/blobset.nim.cfg @@ -0,0 +1,3 @@ +# Disable this to use only primitive stdin +-d:readLine +--nilseqs:on diff --git a/src/blobsets.nim b/src/blobsets.nim index ea7d30e..1dce480 100644 --- a/src/blobsets.nim +++ b/src/blobsets.nim @@ -394,9 +394,17 @@ type BlobKind* = enum dataBlob, metaBlob +proc `$`*(k: BlobKind): string = + case k + of dataBlob: "data" + of metaBlob: "meta" + +type BlobStream* = ref BlobStreamObj BlobStreamObj* = object of RootObj closeImpl*: proc (s: BlobStream) {.nimcall, gcsafe.} + setPosImpl*: proc (s: BlobStream; pos: BiggestInt) {.nimcall, gcsafe.} + getPosImpl*: proc (s: BlobStream): BiggestInt {.nimcall, gcsafe.} readImpl*: proc (s: BlobStream; buffer: pointer; bufLen: int): int {.nimcall, gcsafe.} IngestStream* = ref IngestStreamObj IngestStreamObj* = object of RootObj @@ -407,6 +415,14 @@ proc close*(s: BlobStream) = assert(not s.closeImpl.isNil) s.closeImpl(s) +proc `pos=`*(s: BlobStream; pos: BiggestInt) = + assert(not s.setPosImpl.isNil) + s.setPosImpl(s, pos) + +proc pos*(s: BlobStream): BiggestInt = + assert(not s.getPosImpl.isNil) + s.getPosImpl(s) + proc read*(s: BlobStream; buf: pointer; len: Natural): int = assert(not s.readImpl.isNil) result = s.readImpl(s, buf, len) @@ -426,6 +442,13 @@ proc ingest*(s: IngestStream; buf: var string) = assert(not s.ingestImpl.isNil) s.ingestImpl(s, buf[0].addr, buf.len) +proc ingest*(s: IngestStream; buf: string) {.deprecated.} = + ## Ingest stream + # TODO: zero-copy + assert(not s.ingestImpl.isNil) + var buf = buf + s.ingestImpl(s, buf[0].addr, buf.len) + type BlobStore* = ref BlobStoreObj BlobStoreObj* = object of RootObj @@ -446,10 +469,17 @@ type proc nullBlobClose(s: BlobStream) = discard +proc setPosNull(s: BlobStream; pos: BiggestInt) = discard +proc getPosNull(s: BlobStream): BiggestInt = discard + proc nullBlobRead(s: BlobStream; buffer: pointer; len: Natural): int = 0 proc nullOpenBlobStream(s: BlobStore; id: BlobId; size: BiggestInt; kind: BlobKind): BlobStream = - BlobStream(closeImpl: nullBlobClose, readImpl: nullBlobRead) + BlobStream( + closeImpl: nullBlobClose, + setPosImpl: setPosNull, + getPosImpl: getPosNull, + readImpl: nullBlobRead) proc nullFinish(s: IngestStream): tuple[id: BlobId, size: BiggestInt] = var s = NullIngestStream(s) diff --git a/src/blobsets/filestores.nim b/src/blobsets/filestores.nim index 18b8682..f2a35ec 100644 --- a/src/blobsets/filestores.nim +++ b/src/blobsets/filestores.nim @@ -43,22 +43,29 @@ proc fsBlobClose(s: BlobStream) = var s = FsBlobStream(s) close s.file +proc setPosFs(s: BlobStream; pos: BiggestInt) = + var s = FsBlobStream(s) + s.file.setFilePos (int64)pos + +proc getPosFs(s: BlobStream): BiggestInt = + var s = FsBlobStream(s) + (BiggestInt)s.file.getFilePos + proc fsBlobRead(s: BlobStream; buffer: pointer; len: Natural): int = var s = FsBlobStream(s) result = waitFor s.file.readBuffer(buffer, len) proc fsOpenBlobStream(s: BlobStore; id: BlobId; size: BiggestInt; kind: BlobKind): BlobStream = var fs = FileStore(s) - let stream = FsBlobStream() - result = stream - case kind - of dataBlob: - stream.path = fs.root / "data" / $id - of metaBlob: - stream.path = fs.root / "blob" / $id - stream.file = openAsync(stream.path, fmRead) - stream.closeImpl = fsBlobClose - stream.readImpl = fsBlobRead + let path = fs.root / $kind / $id + FsBlobStream( + closeImpl: fsBlobClose, + setPosImpl: setPosFs, + getPosImpl: getPosFs, + readImpl: fsBlobRead, + path: path, + file: openAsync(path, fmRead), + ) proc fsFinish(s: IngestStream): tuple[id: BlobId, size: BiggestInt] = var s = FsIngestStream(s) diff --git a/src/blobsets/httpservers.nim b/src/blobsets/httpservers.nim new file mode 100644 index 0000000..93fd313 --- /dev/null +++ b/src/blobsets/httpservers.nim @@ -0,0 +1,119 @@ +import std/asyncdispatch, std/asynchttpserver, std/parseutils, std/random, std/net, std/strutils, std/tables +import ../blobsets + +export net.Port + +type + HttpStoreServer = ref object + server: AsyncHttpServer + store: BlobStore + ingests: Table[string, IngestStream] + blobs: Table[BlobId, BlobStream] + # TODO: tables must be purged periodically + rng: Rand + +proc newHttpStoreServer*(backend: BlobStore): HttpStoreServer = + randomize() + HttpStoreServer( + server: newAsyncHttpServer(), + store: backend, + ingests: initTable[string, IngestStream](), + blobs: initTable[BlobId, BlobStream](), + rng: initRand(random(high int))) + +func parseRange(range: string): tuple[a: int, b: int] = + ## Parse an HTTP byte range string. + var start = skip(range, "bytes=") + if start > 0: + start.inc parseInt(range, result.a, start) + if skipWhile(range, {'-'}, start) == 1: + discard parseInt(range, result.b, start+1) + if result.b < result.a: + reset result + +proc head(hss: HttpStoreServer; req: Request): Future[void] = + var stream: BlobStream + let elems = req.url.path.split '/' + if elems.len == 3: + let + kind = case elems[1] + of "data": dataBlob + of "meta": metaBlob + else: raise newException(ValueError, "bad GET path " & req.url.path) + blob = toBlobId elems[2] + var stream = hss.blobs.getOrDefault blob + if stream.isNil: + stream = hss.store.openBlobStream(blob, kind=kind) + hss.blobs[blob] = stream + req.respond(Http200, "") + +proc get(hss: HttpStoreServer; req: Request): Future[void] = + var stream: BlobStream + let elems = req.url.path.split '/' + if elems.len == 3: + let + kind = case elems[1] + of "data": dataBlob + of "meta": metaBlob + else: raise newException(ValueError, "bad GET path " & req.url.path) + blob = toBlobId elems[2] + var stream = hss.blobs.getOrDefault blob + if stream.isNil: + stream = hss.store.openBlobStream(blob, kind=kind) + hss.blobs[blob] = stream + let + (pos, endPos) = parseRange req.headers["range"] + len = endPos - pos + stream.pos = pos + var body = newString(len) + let n = stream.read(body[0].addr, len) + body.setLen n + return req.respond(Http206, body) + req.respond(Http404, "invalid request") + +proc putIngest(hss: HttpStoreServer; req: Request): Future[void] = + let stream = hss.ingests[req.url.path] + stream.ingest(req.body) + req.respond(Http204, "") + +proc postIngest(hss: HttpStoreServer; req: Request): Future[void] = + case req.url.path + of "/ingest/data": + let + size = parseBiggestInt req.headers["ingest-size"] + key = "/ingest/" & $hss.rng.next + headers = newHttpHeaders({"Location": key}) + hss.ingests[key] = hss.store.openIngestStream(size, dataBlob) + req.respond(Http201, "", headers) + of "/ingest/meta": + let + size = parseBiggestInt req.headers["ingest-size"] + key = "/ingest/" & $hss.rng.next + headers = newHttpHeaders({"Location": key}) + hss.ingests[key] = hss.store.openIngestStream(size, metaBlob) + req.respond(Http201, "", headers) + else: + let + stream = hss.ingests[req.url.path] + (blob, size) = finish stream + headers = newHttpHeaders({ + "blob-id": blob.toHex, + "blob-size": $size, + }) + hss.ingests.del req.url.path + req.respond(Http204, "", headers) + +proc serve*(hss: HttpStoreServer; port = Port(80)): Future[void] = + proc handleRequest(req: Request): Future[void] = + case req.reqMethod + of HttpGET: + get(hss, req) + of HttpHEAD: + head(hss, req) + of HttpPUT: + putIngest(hss, req) + of HttpPOST: + postIngest(hss, req) + else: + req.respond(Http501, "method not implemented") + hss.server.serve(port, handleRequest) diff --git a/src/blobsets/httpstores.nim b/src/blobsets/httpstores.nim new file mode 100644 index 0000000..9e42203 --- /dev/null +++ b/src/blobsets/httpstores.nim @@ -0,0 +1,108 @@ +import std/asyncdispatch, std/httpclient, std/strutils, std/uri +import ../blobsets + +type + HttpBlobStream = ref HttpBlobStreamObj + HttpBlobStreamObj = object of BlobStreamObj + client: AsyncHttpClient + url: string + rangePos: BiggestInt + + HttpIngestStream = ref HttpIngestStreamObj + HttpIngestStreamObj = object of IngestStreamObj + client: AsyncHttpClient + url: string + ctx: Blake2b256 + leaves: seq[BlobId] + leaf: string + buffOff: int + pos, nodeOffset: BiggestInt + + HttpStore* = ref HttpStoreObj + HttpStoreObj = object of BlobStoreObj + url: Uri + +proc httpBlobClose(s: BlobStream) = + var s = (HttpBlobStream)s + close s.client + +proc setPosHttp(s: BlobStream; pos: BiggestInt) = + var s = (HttpBlobStream)s + s.rangePos = pos + +proc getPosHttp(s: BlobStream): BiggestInt = + var s = (HttpBlobStream)s + s.rangePos + +proc httpBlobRead(s: BlobStream; buffer: pointer; len: Natural): int = + assert(not buffer.isNil) + var s = (HttpBlobStream)s + let + headers = newHttpHeaders({"range": "bytes=$1-$2" % [ $s.rangePos, $(s.rangePos+len) ]}) + resp = waitFor s.client.request(s.url, HttpGET, headers=headers) + var body = waitFor resp.body + assert(not body.isNil) + result = (int)min(body.len, len) + if result > 0: + copyMem(buffer, body[0].addr, result) + s.rangePos.inc result + +proc httpOpenBlobStream(store: BlobStore; id: BlobId; size: BiggestInt; kind: BlobKind): BlobStream = + var store = HttpStore(store) + let stream = HttpBlobStream( + client: newAsyncHttpClient(), + closeImpl: httpBlobClose, + setPosImpl: setPosHttp, + getPosImpl: getPosHttp, + readImpl: httpBlobRead, + url: $((store.url / $kind) / id.toHex) + ) + let resp = waitFor stream.client.head(stream.url) + assert(resp.code in {Http200, Http204}, resp.status) + stream + +proc httpFinish(s: IngestStream): tuple[id: BlobId, size: BiggestInt] = + var s = HttpIngestStream(s) + let resp = waitFor s.client.request($s.url, HttpPOST) + if not resp.code.is2xx: + raiseAssert(resp.status) + result.id = toBlobId resp.headers["blob-id"] + result.size = parseBiggestInt resp.headers["blob-size"] + +proc httpIngest(x: IngestStream; buf: pointer; len: Natural) = + var + s = HttpIngestStream(x) + body = newString(len) + copyMem(body[0].addr, buf, body.len) + # TODO: zero-copy + let resp = waitFor s.client.request($s.url, HttpPUT, body) + if resp.code != Http204: + raiseAssert(resp.status) + +proc httpOpenIngestStream(s: BlobStore; size: BiggestInt; kind: BlobKind): IngestStream = + var store = HttpStore(s) + let + client = newAsyncHttpClient() + url = (store.url / "ingest") / $kind + headers = newHttpHeaders({"ingest-size": $size}) + resp = waitFor client.request($url, HttpPOST, headers=headers) + if resp.code == Http201: + var ingestUrl = parseUri resp.headers["Location"] + if not ingestUrl.isAbsolute: + ingestUrl = combine(store.url, ingestUrl) + result = HttpIngestStream( + finishImpl: httpFinish, + ingestImpl: httpIngest, + client: client, + url: $ingestUrl, + leaf: newString(min(size.int, blobLeafSize)), + ) + else: + raiseAssert(resp.status) + +proc newHttpStore*(url: string): HttpStore = + HttpStore( + url: parseUri url, + openBlobStreamImpl: httpOpenBlobStream, + openIngestStreamImpl: httpOpenIngestStream, + ) diff --git a/tests/test_http.nim b/tests/test_http.nim new file mode 100644 index 0000000..9cf0694 --- /dev/null +++ b/tests/test_http.nim @@ -0,0 +1,26 @@ +import std/asyncdispatch, std/net, std/random, std/strutils, std/unittest + +import ../src/blobsets, ../src/blobsets/filestores, ../src/blobsets/httpstores, ../src/blobsets/httpservers + +suite "Http store": + randomize() + + let + port = (Port)rand(1 shl 15) + store = newNullStore() + server = newHttpStoreServer(store) + asyncCheck server.serve(port) + + let + url = "http://127.0.0.1:$1/" % $port + client = newHttpStore url + + var + blob: BlobId + size: BiggestInt + + test "ingest": + (blob, size) = client.ingestFile("tests/test_http.nim") + test "dump": + for chunk in store.dumpBlob(blob): + echo chunk