Genode components seem to work

This commit is contained in:
Ehmry - 2018-09-07 18:54:30 +02:00
parent e08ea11600
commit 7af8971a0c
17 changed files with 680 additions and 356 deletions

.gitignore
View File

@ -4,3 +4,9 @@ ipldrepl
/genode/dagfs_genode/dagfs_fs
/genode/dagfs_genode/dagfs_fs_store
/genode/dagfs_genode/dagfs_rom
/genode/dagfs_genode/dagfs_tcp_store
/genode/dagfs_genode/bin/dagfs_fs
/genode/dagfs_genode/bin/dagfs_fs_store
/genode/dagfs_genode/bin/dagfs_rom
/genode/dagfs_genode/bin/dagfs_server
/genode/dagfs_genode/bin/dagfs_tcp_store

View File

@ -1,12 +1,12 @@
# Package
version = "0.1.1"
version = "0.1.2"
author = "Emery Hemingway"
description = "A simple content addressed file-system"
license = "GPLv3"
srcDir = "src"
requires "nim >= 0.18.0", "base58", "cbor >= 0.2.0"
requires "nim >= 0.18.0", "base58", "cbor >= 0.5.1"
bin = @["dagfs_repl.nim"]
bin = @["dagfs_repl"]
skipFiles = @["dagfs_repl.nim"]

View File

@ -1,19 +1,23 @@
# Package
version = "0.1.0"
version = "0.1.2"
author = "Emery Hemingway"
description = "Dagfs TCP server"
license = "GPLv3"
srcDir = "src"
binDir = "bin"
bin = @[
"dagfs_fs",
"dagfs_fs_store",
"dagfs_rom",
"dagfs_tcp_client",
"dagfs_tcp_server"
"dagfs_server",
"dagfs_tcp_store"
]
backend = "cpp"
# Dependencies
requires "nim >= 0.18.1", "dagfs", "genode"
task genode, "Build for Genode":
exec "nimble build --os:genode -d:posix -d:tcpdebug"

View File

@ -1,7 +1,9 @@
when not defined(genode):
{.error: "Genode only Dagfs client".}
import cbor, genode, std/tables, std/strutils
import cbor, std/tables, std/strutils
import genode, genode/signals
import dagfs, dagfs/stores, ./dagfs_session
@ -14,123 +16,199 @@ type
DagfsClientBase {.importcpp, header: dagfsClientH.} = object
DagfsClientCpp = Constructible[DagfsClientBase]
proc sigh_ack_avail(cpp: DagfsClientCpp; sig: SignalContextCapability) {.
importcpp: "#->conn.channel().sigh_ack_avail(@)", tags: [RpcEffect].}
proc construct(cpp: DagfsClientCpp; env: GenodeEnv; label: cstring; txBufSize: int) {.
importcpp.}
proc readyToSubmit(cpp: DagfsClientCpp): bool {.
proc bulk_buffer_size(cpp: DagfsClientCpp): csize {.
importcpp: "#->conn.source().bulk_buffer_size()".}
proc sigh_ack_avail(cpp: DagfsClientCpp; sig: SignalContextCapability) {.
importcpp: "#->conn.channel().sigh_ack_avail(@)".}
proc ready_to_submit(cpp: DagfsClientCpp): bool {.
importcpp: "#->conn.source().ready_to_submit()".}
proc readyToAck(cpp: DagfsClientCpp): bool {.
proc ready_to_ack(cpp: DagfsClientCpp): bool {.
importcpp: "#->conn.source().ready_to_ack()".}
proc ackAvail(cpp: DagfsClientCpp): bool {.
proc ack_avail(cpp: DagfsClientCpp): bool {.
importcpp: "#->conn.source().ack_avail()".}
proc allocPacket(cpp: DagfsClientCpp; size = MaxPacketSize): DagfsPacket {.
proc alloc_packet(cpp: DagfsClientCpp; size = MaxPacketSize): DagfsPacket {.
importcpp: "#->conn.source().alloc_packet(@)".}
proc packetContent(cpp: DagfsClientCpp; pkt: DagfsPacket): pointer {.
proc packet_content(cpp: DagfsClientCpp; pkt: DagfsPacket): pointer {.
importcpp: "#->conn.source().packet_content(@)".}
proc submitPacket(cpp: DagfsClientCpp; pkt: DagfsPacket; cid: cstring; op: DagfsOpcode) {.
proc submit_packet(cpp: DagfsClientCpp; pkt: DagfsPacket) {.
importcpp: "#->conn.source().submit_packet(@)".}
proc submit_packet(cpp: DagfsClientCpp; pkt: DagfsPacket; cid: cstring; op: DagfsOpcode) {.
importcpp: "#->conn.source().submit_packet(Dagfs::Packet(#, (char const *)#, #))".}
proc getAckedPacket(cpp: DagfsClientCpp): DagfsPacket {.
proc get_acked_packet(cpp: DagfsClientCpp): DagfsPacket {.
importcpp: "#->conn.source().get_acked_packet()".}
proc releasePacket(cpp: DagfsClientCpp; pkt: DagfsPacket) {.
proc release_packet(cpp: DagfsClientCpp; pkt: DagfsPacket) {.
importcpp: "#->conn.source().release_packet(@)".}
type
DagfsClient* = ref DagfsClientObj
DagfsClientObj = object of DagfsStoreObj
## Dagfs session client
DagfsFrontend* = ref DagfsFrontendObj
DagfsFrontendObj = object of DagfsStoreObj
## Dagfs session client consuming a store.
cpp: DagfsClientCpp
proc icClose(s: DagfsStore) =
var ic = DagfsClient(s)
destruct ic.cpp
proc fendClose(s: DagfsStore) =
var fend = DagfsFrontend(s)
destruct fend.cpp
proc icPut(s: DagfsStore; blk: string): Cid =
proc fendPutBuffer(s: DagfsStore; buf: pointer; len: Natural): Cid =
## Put a chunk to the Dagfs server; blocks for a two-packet round trip.
let ic = DagfsClient(s)
var fend = DagfsFrontend(s)
var
blk = blk
pktCid = dagHash blk
if pktCid == zeroBlock:
pktCid = dagHash(buf, len)
if pktCid == zeroChunk:
return pktCid
assert(ic.cpp.readyToSubmit, "Dagfs client packet queue congested")
var pkt = ic.cpp.allocPacket(blk.len)
let pktBuf = ic.cpp.packetContent pkt
defer: ic.cpp.releasePacket pkt
assert(fend.cpp.readyToSubmit, "Dagfs client packet queue congested")
var pkt = fend.cpp.allocPacket(len)
let pktBuf = fend.cpp.packetContent pkt
defer: fend.cpp.releasePacket pkt
assert(not pktBuf.isNil, "allocated packet has nil content")
assert(pkt.size >= blk.len)
pkt.setLen blk.len
copyMem(pktBuf, blk[0].addr, blk.len)
assert(ic.cpp.readyToSubmit, "Dagfs client packet queue congested")
ic.cpp.submitPacket(pkt, pktCid.toHex, PUT)
let ack = ic.cpp.getAckedPacket()
assert(len <= pkt.size)
pkt.setLen len
copyMem(pktBuf, buf, len)
assert(fend.cpp.readyToSubmit, "Dagfs client packet queue congested")
fend.cpp.submitPacket(pkt, pktCid.toHex, PUT)
let ack = fend.cpp.getAckedPacket()
doAssert(ack.error == OK)
result = ack.cid()
assert(result.isValid, "server returned a packet with an invalid CID")
proc icGetBuffer(s: DagfsStore; cid: Cid; buf: pointer; len: Natural): int =
proc fendPut(s: DagfsStore; blk: string): Cid =
## Put a chunk to the Dagfs server; blocks for a two-packet round trip.
let fend = DagfsFrontend(s)
var
blk = blk
pktCid = dagHash blk
if pktCid == zeroChunk:
return pktCid
assert(fend.cpp.readyToSubmit, "Dagfs client packet queue congested")
var pkt = fend.cpp.allocPacket(blk.len)
let pktBuf = fend.cpp.packetContent pkt
defer: fend.cpp.releasePacket pkt
assert(not pktBuf.isNil, "allocated packet has nil content")
assert(blk.len <= pkt.size)
pkt.setLen blk.len
copyMem(pktBuf, blk[0].addr, blk.len)
assert(fend.cpp.readyToSubmit, "Dagfs client packet queue congested")
fend.cpp.submitPacket(pkt, pktCid.toHex, PUT)
let ack = fend.cpp.getAckedPacket()
doAssert(ack.error == OK)
result = ack.cid()
assert(result.isValid, "server returned a packet with an invalid CID")
proc fendGetBuffer(s: DagfsStore; cid: Cid; buf: pointer; len: Natural): int =
## Get a chunk from the Dagfs server; blocks for a packet round trip.
let ic = DagfsClient(s)
assert(ic.cpp.readyToSubmit, "Dagfs client packet queue congested")
let pkt = ic.cpp.allocPacket len
ic.cpp.submitPacket(pkt, cid.toHex, GET)
let ack = ic.cpp.getAckedPacket
let fend = DagfsFrontend(s)
assert(fend.cpp.readyToSubmit, "Dagfs client packet queue congested")
let pkt = fend.cpp.allocPacket len
fend.cpp.submitPacket(pkt, cid.toHex, GET)
let ack = fend.cpp.getAckedPacket
doAssert(ack.cid == cid)
if ack.error == OK:
let pktBuf = ic.cpp.packetContent ack
let pktBuf = fend.cpp.packetContent ack
assert(not pktBuf.isNil, "ack packet has nil content")
assert(ack.len <= len)
assert(ack.len > 0)
result = ack.len
copyMem(buf, pktBuf, result)
if pkt.size > 0:
ic.cpp.releasePacket pkt
fend.cpp.releasePacket pkt
# free the original packet that was allocated
case ack.error:
of OK: discard
of MISSING:
raise cid.newMissingObject
raiseMissing cid
else:
raise newException(CatchableError, "Dagfs packet error " & $ack.error)
proc icGet(s: DagfsStore; cid: Cid; result: var string) =
proc fendGet(s: DagfsStore; cid: Cid; result: var string) =
## Get a chunk from the Dagfs server; blocks for a packet round trip.
let ic = DagfsClient(s)
assert(ic.cpp.readyToSubmit, "Dagfs client packet queue congested")
let pkt = ic.cpp.allocPacket()
defer: ic.cpp.releasePacket pkt
ic.cpp.submitPacket(pkt, cid.toHex, GET)
let ack = ic.cpp.getAckedPacket()
let fend = DagfsFrontend(s)
assert(fend.cpp.readyToSubmit, "Dagfs client packet queue congested")
let pkt = fend.cpp.allocPacket()
defer: fend.cpp.releasePacket pkt
fend.cpp.submitPacket(pkt, cid.toHex, GET)
let ack = fend.cpp.getAckedPacket()
doAssert(ack.cid == cid)
case ack.error:
of OK:
let ackBuf = ic.cpp.packetContent ack
let ackBuf = fend.cpp.packetContent ack
assert(not ackBuf.isNil)
assert(ack.len > 0)
assert(0 < ack.len, "server returned a zero-length packet")
result.setLen ack.len
copyMem(result[0].addr, ackBuf, result.len)
assert(cid.verify(result), "Dagfs client packet failed verification")
of MISSING:
raise cid.newMissingObject
raiseMissing cid
else:
raise newException(CatchableError, "Dagfs packet error " & $ack.error)
const
DefaultDagfsBufferSize* = 1 shl 20
defaultBufferSize* = maxChunkSize * 4
proc newDagfsClient*(env: GenodeEnv; label = ""; bufferSize = DefaultDagfsBufferSize): DagfsClient =
proc newDagfsFrontend*(env: GenodeEnv; label = ""; bufferSize = defaultBufferSize): DagfsFrontend =
## Open a new frontend client connection.
## Blocks retrieved by `get` are not verified.
proc construct(cpp: DagfsClientCpp; env: GenodeEnv; label: cstring; txBufSize: int) {.
importcpp.}
new result
construct(result.cpp, env, label, bufferSize)
result.closeImpl = icClose
result.putImpl = icPut
result.getBufferImpl = icGetBuffer
result.getImpl = icGet
result.closeImpl = fendClose
result.putBufferImpl = fendPutBuffer
result.putImpl = fendPut
result.getBufferImpl = fendGetBuffer
result.getImpl = fendGet
type
DagfsBackend* = ref DagfsBackendObj
DagfsBackendObj = object
## Dagfs session client providing a store.
cpp: DagfsClientCpp
store: DagfsStore
sigh: SignalHandler
const zeroHex = zeroChunk.toHex
proc newDagfsBackend*(env: GenodeEnv; store: DagfsStore; label = ""; bufferSize = defaultBufferSize): DagfsBackend =
## Open a new backend client connection.
doAssert(bufferSize > maxChunkSize, "Dagfs backend session buffer is too small")
let bend = DagfsBackend(store: store)
construct(bend.cpp, env, label, bufferSize)
bend.sigh = env.ep.newSignalHandler do ():
while bend.cpp.ackAvail:
var pkt = bend.cpp.getAckedPacket()
let
buf = bend.cpp.packetContent(pkt)
cid = pkt.cid
case pkt.operation
of GET:
try:
let n = store.getBuffer(cid, buf, pkt.size)
pkt.setLen(n)
bend.cpp.submitPacket(pkt, cid.toHex, PUT)
except MissingChunk:
pkt.setError(MISSING)
bend.cpp.submitPacket(pkt)
of PUT:
let putCid = store.putBuffer(buf, pkt.len)
doAssert(putCid == cid, $putCid & " PUT CID mismatch with server")
bend.cpp.submitPacket(pkt, putCid.toHex, Idle)
else:
echo "unhandled packet from server"
bend.cpp.submitPacket(pkt, zeroHex, Idle)
bend.cpp.sighAckAvail(bend.sigh.cap)
for _ in 1..(bend.cpp.bulkBufferSize div maxChunkSize):
let pkt = bend.cpp.allocPacket(maxChunkSize)
assert(bend.cpp.readyToSubmit)
bend.cpp.submitPacket(pkt, zeroHex, IDLE)
bend
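
A minimal usage sketch of the new frontend client (not part of this commit); componentConstructHook, newDagfsFrontend, put and get are taken from this diff, the label and payload are made up:

# hypothetical consumer component, for illustration only
import genode, dagfs, dagfs/stores, ./dagfs_client

componentConstructHook = proc (env: GenodeEnv) =
  let store = env.newDagfsFrontend(label = "example")
  # put blocks for a two-packet round trip and returns the chunk CID
  let cid = store.put("hello dagfs")
  var chunk = ""
  # get copies the chunk back and verifies it against the CID
  store.get(cid, chunk)
  echo "stored ", cid.toHex, " (", chunk.len, " bytes)"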

View File

@ -58,7 +58,7 @@ type
SessionObj = object
sig: SignalHandler
cpp: FsSessionComponent
store: DagfsClient
store: DagfsFrontend
label: string
rootDir: FsNode
next: Handle
@ -112,8 +112,8 @@ proc inode(cid: Cid): culong = hash(cid).culong
template fsRpc(session: SessionPtr; body: untyped) =
try: body
except MissingObject:
let e = (MissingObject)getCurrentException()
except MissingChunk:
let e = (MissingChunk)getCurrentException()
echo "Synchronous RPC failure, missing object ", e.cid
raiseLookupFailed()
except:
@ -231,7 +231,7 @@ proc processPacket(session: SessionRef; pkt: var FsPacket) =
if pkt.operation == READ:
let
node = session.nodes[pkt.handle]
pktBuf = cast[ptr array[maxBlockSize, char]](session.cpp.packetContent pkt)
pktBuf = cast[ptr array[maxChunkSize, char]](session.cpp.packetContent pkt)
# cast the pointer to an array pointer for indexing
case node.kind
of fileNode:
@ -297,7 +297,7 @@ proc processPacket(session: SessionRef; pkt: var FsPacket) =
else:
echo "ignoring ", pkt.operation, " packet from ", session.label
proc newSession(env: GenodeEnv; store: DagfsClient; label: string; root: FsNode; txBufSize: int): SessionRef =
proc newSession(env: GenodeEnv; store: DagfsFrontend; label: string; root: FsNode; txBufSize: int): SessionRef =
proc construct(cpp: FsSessionComponent; env: GenodeEnv; txBufSize: int; state: SessionPtr; cap: SignalContextCapability) {.
importcpp.}
let session = new SessionRef
@ -323,10 +323,10 @@ componentConstructHook = proc(env: GenodeEnv) =
var
policies = newSeq[XmlNode](8)
sessions = initTable[ServerId, SessionRef]()
let store = env.newDagfsClient()
let store = env.newDagfsFrontend()
## The Dagfs session client backing File_system sessions.
proc createSession(env: GenodeEnv; store: DagfsClient; id: ServerId; label, rootPath: string; rootCid: Cid; txBufSize: int) =
proc createSession(env: GenodeEnv; store: DagfsFrontend; id: ServerId; label, rootPath: string; rootCid: Cid; txBufSize: int) =
var ufsRoot: FsNode
try: ufsRoot = store.openDir(rootCid)
except: ufsRoot = nil
@ -420,4 +420,5 @@ componentConstructHook = proc(env: GenodeEnv) =
configRom = env.newRomHandler("config", processConfig)
process configRom
process sessionsRom
echo "initial rom contents processed"
env.parent.announce "File_system"

View File

@ -5,36 +5,16 @@
#
#
# Copyright (C) 2017 Genode Labs GmbH
# Copyright (C) 2017 - 2018 Genode Labs GmbH
#
# This file is part of the Genode OS framework, which is distributed
# under the terms of the GNU Affero General Public License version 3.
#
import std/streams, std/strutils,
genode, genode/servers, genode/roms, genode/parents,
dagfs, dagfs/stores, ./dagfs_server
import std/streams, std/strutils, genode,
dagfs, dagfs/stores, ./dagfs_client
componentConstructHook = proc (env: GenodeEnv) =
let
store = newFileStore("/") ## Storage backend for sessions
server = env.newDagfsServer(store) ## Server to the store
proc processSessions(sessionsRom: RomClient) =
## ROM signal handling procedure
## Create and close 'Dagfs' sessions from the
## 'sessions_requests' ROM.
update sessionsRom
let rs = sessionsRom.newStream
var requests = initSessionRequestsParser sessionsRom
for id in requests.close:
server.close id
for id, label, args in requests.create "dagfs":
server.create id, label, args
close rs
let sessionsHandler = env.newRomHandler("session_requests", processSessions)
## Session requests routed to us from the parent
env.parent.announce("dagfs") # Announce service to parent.
process sessionsHandler # Process the initial request backlog.
backend = env.newDagfsBackend(store)

View File

@ -68,7 +68,7 @@ proc readFile(store: DagfsStore; s: Stream; file: FsNode) =
componentConstructHook = proc(env: GenodeEnv) =
var
store = env.newDagfsClient()
store = env.newDagfsFrontend()
policies = newSeq[XmlNode](8)
sessions = initTable[ServerId, Session]()
@ -104,8 +104,8 @@ componentConstructHook = proc(env: GenodeEnv) =
proc createSession(env: GenodeEnv; id: ServerId; label: string; rootCid: Cid) =
var cap = RomSessionCapability()
try: cap = createSessionNoTry(id, label, rootCid)
except MissingObject:
let e = (MissingObject)getCurrentException()
except MissingChunk:
let e = (MissingChunk)getCurrentException()
echo "cannot resolve '", label, "', ", e.cid, " is missing"
except:
echo "unhandled exception while resolving '", label, "', ",

View File

@ -1,20 +1,20 @@
#
# \brief Dagfs server factory
# \brief Dagfs routing server
# \author Emery Hemingway
# \date 2017-11-11
#
#
# Copyright (C) 2017 Genode Labs GmbH
# Copyright (C) 2017-2018 Genode Labs GmbH
#
# This file is part of the Genode OS framework, which is distributed
# under the terms of the GNU Affero General Public License version 3.
#
import std/strtabs, std/tables, std/xmltree, std/strutils
import std/strtabs, std/tables, std/xmltree, std/strutils, std/deques
import cbor, genode, genode/signals, genode/servers, genode/parents,
dagfs, dagfs/stores, ./dagfs_session
import dagfs, dagfs/stores, ./dagfs_session,
genode, genode/signals, genode/servers, genode/parents, genode/roms
const
currentPath = currentSourcePath.rsplit("/", 1)[0]
@ -24,74 +24,156 @@ const
type
DagfsSessionComponentBase {.importcpp, header: dagfsserverH.} = object
SessionCpp = Constructible[DagfsSessionComponentBase]
Session = ref object
proc construct(cpp: SessionCpp; env: GenodeEnv; args: cstring) {.importcpp.}
proc packetHandler(cpp: SessionCpp; cap: SignalContextCapability) {.
importcpp: "#->packetHandler(@)".}
proc packetContent(cpp: SessionCpp; pkt: DagfsPacket): pointer {.
importcpp: "#->sink().packet_content(@)".}
proc packetAvail(cpp: SessionCpp): bool {.
importcpp: "#->sink().packet_avail()".}
proc readyToAck(cpp: SessionCpp): bool {.
importcpp: "#->sink().ready_to_ack()".}
proc peekPacket(cpp: SessionCpp): DagfsPacket {.
importcpp: "#->sink().peek_packet()".}
proc getPacket(cpp: SessionCpp): DagfsPacket {.
importcpp: "#->sink().get_packet()".}
proc acknowledgePacket(cpp: SessionCpp; pkt: DagfsPacket) {.
importcpp: "#->sink().acknowledge_packet(@)".}
proc acknowledgePacket(cpp: SessionCpp; pkt: DagfsPacket; cid: cstring; op: DagfsOpcode) {.
importcpp: "#->sink().acknowledge_packet(Dagfs::Packet(#, (char const *)#, #))".}
template acknowledgePacket(cpp: SessionCpp; pkt: DagfsPacket; cid: Cid; op: DagfsOpcode) =
acknowledgePacket(cpp, pkt, cid.toHex, op)
type
Session = ref SessionObj
SessionObj = object of RootObj
cpp: SessionCpp
sig: SignalHandler
store: DagfsStore
id: ServerId
label: string
proc processPacket(session: Session; pkt: var DagfsPacket) =
proc packetContent(cpp: SessionCpp; pkt: DagfsPacket): pointer {.
importcpp: "#->sink().packet_content(@)".}
let cid = pkt.cid
case pkt.operation
of PUT:
try:
var
pktBuf = session.cpp.packetContent pkt
heapBuf = newString pkt.len
copyMem(heapBuf[0].addr, pktBuf, heapBuf.len)
let putCid = session.store.put(heapBuf)
assert(putCid.isValid, "server packet returned invalid CID from put")
pkt.setCid putCid
except:
echo "unhandled PUT error ", getCurrentExceptionMsg()
pkt.setError ERROR
Frontend = ref object of SessionObj
discard
Backend = ref object of SessionObj
idle: Deque[DagfsPacket]
prio: int
Frontends = OrderedTableRef[ServerId, Frontend]
Backends = OrderedTableRef[ServerId, Backend]
proc `$`(s: Session): string = s.label
proc submitGet*(bend: Backend; cid: Cid): bool =
if 0 < bend.idle.len:
let pkt = bend.idle.popFirst()
bend.cpp.acknowledgePacket(pkt, cid, GET)
result = true
proc submitPut*(bend: Backend; cid: Cid; buf: pointer; len: int): bool =
if 0 < bend.idle.len:
var pkt = bend.idle.popFirst()
copyMem(bend.cpp.packetContent(pkt), buf, len)
pkt.setLen(len)
bend.cpp.acknowledgePacket(pkt, cid, PUT)
result = true
proc isPending(fend: Frontend; cid: Cid): bool =
if fend.cpp.packetAvail and fend.cpp.readyToAck:
result = (cid == fend.cpp.peekPacket.cid)
proc isPending(fend: Session; cid: Cid; op: DagfsOpcode): bool =
if fend.cpp.packetAvail and fend.cpp.readyToAck:
let pkt = fend.cpp.peekPacket()
result = (pkt.operation == op and cid == pkt.cid)
proc processPacket(backends: Backends; fend: Frontend): bool =
if backends.len < 1:
echo "cannot service frontend client, no backends connected"
var pkt = fend.cpp.getPacket
pkt.setError MISSING
fend.cpp.acknowledgePacket(pkt)
return true
let
pkt = fend.cpp.peekPacket
cid = pkt.cid
op = pkt.operation
case op
of GET:
try:
let
pktBuf = session.cpp.packetContent pkt
n = session.store.getBuffer(cid, pktBuf, pkt.size)
pkt.setLen n
except BufferTooSmall:
pkt.setError OVERSIZE
except MissingObject:
pkt.setError MISSING
except:
echo "unhandled GET error ", getCurrentExceptionMsg()
pkt.setError ERROR
for bend in backends.values:
if bend.submitGet(cid):
break
of PUT:
let
buf = fend.cpp.packetContent(pkt)
len = pkt.len
for bend in backends.values:
if bend.submitPut(cid, buf, len):
break
else:
echo "invalid packet operation"
pkt.setError ERROR
var ack = fend.cpp.getPacket()
ack.setError ERROR
fend.cpp.acknowledgePacket(ack)
result = true
proc newSession(env: GenodeEnv; store: DagfsStore; id: ServerId; label, args: string): Session =
## Create a new session and packet handling procedure
let session = new Session
assert(not session.isNil)
proc construct(cpp: SessionCpp; env: GenodeEnv; args: cstring) {.importcpp.}
session.cpp.construct(env, args)
session.store = store
session.id = id
session.label = label
session.sig = env.ep.newSignalHandler do ():
proc packetAvail(cpp: SessionCpp): bool {.
importcpp: "#->sink().packet_avail()".}
proc readyToAck(cpp: SessionCpp): bool {.
importcpp: "#->sink().ready_to_ack()".}
while session.cpp.packetAvail and session.cpp.readyToAck:
proc getPacket(cpp: SessionCpp): DagfsPacket {.
importcpp: "#->sink().get_packet()".}
var pkt = session.cpp.getPacket()
session.processPacket pkt
proc acknowledgePacket(cpp: SessionCpp; pkt: DagfsPacket) {.
importcpp: "#->sink().acknowledge_packet(@)".}
session.cpp.acknowledgePacket(pkt)
proc processPacket(frontends: Frontends; bend: Backend): bool =
let
pkt = bend.cpp.getPacket
cid = pkt.cid
op = pkt.operation
case op
of PUT:
assert(0 < pkt.len)
for fend in frontends.values:
if fend.isPending(cid, GET):
var ack = fend.cpp.getPacket
if ack.size < pkt.len:
ack.setError(OVERSIZE)
fend.cpp.acknowledgePacket(ack)
else:
ack.setLen(pkt.len)
copyMem(fend.cpp.packetContent(ack), bend.cpp.packetContent(pkt), ack.len)
fend.cpp.acknowledgePacket(ack, cid, PUT)
of IDLE:
for fend in frontends.values:
if fend.isPending(cid, PUT):
fend.cpp.acknowledgePacket(fend.cpp.getPacket, cid, IDLE)
else:
echo "invalid backend packet operation from ", bend.label
bend.idle.addLast pkt
true
proc packetHandler(cpp: SessionCpp; cap: SignalContextCapability) {.
importcpp: "#->packetHandler(@)".}
session.cpp.packetHandler(session.sig.cap)
result = session
proc newFrontend(env: GenodeEnv; backends: Backends; args, label: string): Frontend =
let fend = Frontend(label: label)
fend.cpp.construct(env, args)
fend.sig = env.ep.newSignalHandler do ():
while fend.cpp.packetAvail and fend.cpp.readyToAck:
if not backends.processPacket(fend): break
fend.cpp.packetHandler(fend.sig.cap)
fend
proc newBackend(env: GenodeEnv; frontends: Frontends; args: string; prio: int; label: string): Backend =
let bend = Backend(
label: label,
idle: initDeque[DagfsPacket](),
prio: prio)
bend.cpp.construct(env, args)
bend.sig = env.ep.newSignalHandler do ():
assert(bend.cpp.packetAvail, $bend & " signaled but no packet avail")
assert(bend.cpp.readyToAck, $bend & " signaled but not ready to ack")
while bend.cpp.packetAvail and bend.cpp.readyToAck:
if not frontends.processPacket(bend): break
bend.cpp.packetHandler(bend.sig.cap)
bend
proc manage(ep: Entrypoint; s: Session): DagfsSessionCapability =
## Manage a session from the default entrypoint.
@ -109,36 +191,71 @@ proc dissolve(ep: Entrypoint; s: Session) =
dissolve(s.sig)
GC_unref s
type
DagfsServer* = ref object
env: GenodeEnv
store*: DagfsStore
sessions*: Table[ServerId, Session]
componentConstructHook = proc(env: GenodeEnv) =
var
policies = newSeq[XmlNode]()
backends = newOrderedTable[ServerId, Backend]()
frontends = newOrderedTable[ServerId, Frontend]()
proc newDagfsServer*(env: GenodeEnv; store: DagfsStore): DagfsServer =
DagfsServer(
env: env, store: store,
sessions: initTable[ServerId, Session]())
proc processConfig(rom: RomClient) {.gcsafe.} =
update rom
policies.setLen 0
let configXml = rom.xml
configXml.findAll("default-policy", policies)
if policies.len > 1:
echo "more than one '<default-policy/>' found, ignoring all"
policies.setLen 0
configXml.findAll("policy", policies)
proc create*(server: DagfsServer; id: ServerId; label, args: string) =
if not server.sessions.contains id:
try:
let
session = newSession(server.env, server.store, id, label, args)
cap = server.env.ep.manage(session)
server.sessions[id] = session
proc deliverSession(env: GenodeEnv; id: ServerId; cap: DagfsSessionCapability) {.
importcpp: "#->parent().deliver_session_cap(Genode::Parent::Server::Id{#}, #)".}
server.env.deliverSession(id, cap)
echo "session opened for ", label
except:
echo "failed to create session for '", label, "', ", getCurrentExceptionMsg()
server.env.parent.sessionResponseDeny id
proc processSessions(rom: RomClient) =
update rom
var requests = initSessionRequestsParser(rom)
proc close*(server: DagfsServer; id: ServerId) =
## Close a session at the Dagfs server.
if server.sessions.contains id:
let session = server.sessions[id]
server.env.ep.dissolve(session)
server.sessions.del id
server.env.parent.sessionResponseClose id
for id in requests.close:
var s: Session
if frontends.contains id:
s = frontends[id]
frontends.del id
elif backends.contains id:
s = backends[id]
backends.del id
env.ep.dissolve s
env.parent.sessionResponseClose(id)
for id, label, args in requests.create "Dagfs":
let policy = policies.lookupPolicy label
if policy.isNil:
echo "no policy matched '", label, "'"
env.parent.sessionResponseDeny(id)
else:
var session: Session
let role = policy.attr("role")
case role
of "frontend":
let fend = newFrontend(env, backends, args, label)
frontends[id] = fend
session = fend
of "backend":
var prio = 1
try: prio = policy.attr("prio").parseInt
except: discard
let bend = newBackend(env, frontends, args, prio, label)
backends[id] = bend
backends.sort(proc (x, y: (ServerId, Backend)): int =
x[1].prio - y[1].prio)
session = bend
else:
echo "invalid role for policy ", policy
env.parent.sessionResponseDeny(id)
continue
let cap = env.ep.manage(session)
proc deliverSession(env: GenodeEnv; id: ServerId; cap: DagfsSessionCapability) {.
importcpp: "#->parent().deliver_session_cap(Genode::Parent::Server::Id{#}, #)".}
env.deliverSession(id, cap)
echo "session opened for ", label
let
sessionsRom = env.newRomHandler("session_requests", processSessions)
configRom = env.newRomHandler("config", processConfig)
process configRom
process sessionsRom
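
A sketch of the policy configuration this routing server expects; the role and prio attributes are read by the code above, while the element names and label matching follow the usual Genode config conventions and are assumptions here:

<config>
  <!-- sessions without a more specific policy act as consumers -->
  <default-policy role="frontend"/>
  <!-- store providers, tried in ascending prio order -->
  <policy label_prefix="dagfs_fs_store" role="backend" prio="1"/>
  <policy label_prefix="dagfs_tcp_store" role="backend" prio="2"/>
</config>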

View File

@ -25,7 +25,7 @@ type
header: "<dagfs_session/dagfs_session.h>".} = object
DagfsOpcode* {.importcpp: "Dagfs::Packet::Opcode".} = enum
PUT, GET, INVALID
PUT, GET, IDLE, INVALID
DagfsError* {.importcpp: "Dagfs::Packet::Error".} = enum
OK, MISSING, OVERSIZE, FULL, ERROR

View File

@ -2,7 +2,7 @@ when not defined(genode):
{.error: "Genode only module".}
import std/asyncdispatch
import dagfs/stores, dagfs/tcp
import ./dagfs_client, dagfs/tcp
when not defined(genode):
{.error: "Genode only server".}
@ -10,6 +10,7 @@ when not defined(genode):
componentConstructHook = proc (env: GenodeEnv) =
echo "--- Dagfs TCP server ---"
let
store = newFileStore "/"
store = env.newDagfsFrontend()
server = newTcpServer store
waitFor server.serve()
quit "--- Dagfs TCP server died ---"

View File

@ -39,7 +39,7 @@ namespace Dagfs {
*/
struct Dagfs::Packet final : Genode::Packet_descriptor
{
enum Opcode { PUT, GET, INVALID };
enum Opcode { PUT, GET, IDLE, INVALID };
enum Error {
OK, /* put or get success */
@ -60,7 +60,7 @@ struct Dagfs::Packet final : Genode::Packet_descriptor
Packet(Packet p, Cid cid, Opcode op)
:
Genode::Packet_descriptor(p.offset(), p.size()),
_cid(cid), _length(p.size()), _op(op), _err(OK)
_cid(cid), _length(p.length()), _op(op), _err(OK)
{ }
Packet(Cid cid, Genode::size_t length,

View File

@ -3,13 +3,15 @@ import base58/bitcoin, cbor
import ./dagfs/priv/hex, ./dagfs/priv/blake2
const
maxBlockSize* = 1 shl 18
## Maximum supported block size.
maxChunkSize* = 1 shl 18
## Maximum supported chunk size.
digestLen* = 32
## Length of a block digest.
## Length of a chunk digest.
cidSize* = digestLen
## Size of CID object in memory
type Cid* = object
## Content IDentifier, used to identify blocks.
## Chunk IDentifier
digest*: array[digestLen, uint8]
proc initCid*(): Cid = Cid()
@ -102,17 +104,26 @@ proc parseCid*(s: string): Cid =
result.digest[i] = raw[i].byte
const
zeroBlock* = parseCid "8ddb61928ec76e4ee904cd79ed977ab6f5d9187f1102975060a6ba6ce10e5481"
## CID of zero block of maximum size.
zeroChunk* = parseCid "8ddb61928ec76e4ee904cd79ed977ab6f5d9187f1102975060a6ba6ce10e5481"
## CID of zero chunk of maximum size.
proc take*(cid: var Cid; buf: var string) =
## Take a raw digest from a string buffer.
doAssert(buf.len == digestLen)
copyMem(cid.digest[0].addr, buf[0].addr, digestLen)
proc dagHash*(buf: pointer; len: Natural): Cid =
## Generate a CID for a raw buffer of data using the BLAKE2b hash algorithm.
assert(len <= maxChunkSize)
var b: Blake2b
blake2b_init(b, digestLen, nil, 0)
blake2b_update(b, buf, len)
var s = blake2b_final(b)
copyMem(result.digest[0].addr, s[0].addr, digestLen)
proc dagHash*(data: string): Cid =
## Generate a CID for a string of data using the BLAKE2b hash algorithm.
assert(data.len <= maxBlockSize)
assert(data.len <= maxChunkSize)
var b: Blake2b
blake2b_init(b, digestLen, nil, 0)
blake2b_update(b, data, data.len)
@ -130,9 +141,9 @@ proc verify*(cid: Cid; data: string): bool =
return false
true
iterator simpleChunks*(s: Stream; size = maxBlockSize): string =
iterator simpleChunks*(s: Stream; size = maxChunkSize): string =
## Iterator that breaks a stream into simple chunks.
doAssert(size <= maxBlockSize)
doAssert(size <= maxChunkSize)
var tmp = newString(size)
while not s.atEnd:
tmp.setLen(size)
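
A self-contained sketch of the renamed chunk-hashing helpers; dagHash, verify, toHex, parseCid and zeroChunk are all defined in this file, only the sample data is made up:

import dagfs

let
  data = "hello dagfs"
  cid = dagHash data                   # BLAKE2b digest wrapped in a Cid
doAssert cid.verify(data)              # recomputes the digest and compares
doAssert cid != zeroChunk              # zeroChunk names the zero chunk of maximum size
doAssert parseCid(cid.toHex) == cid    # hex round trip, as used in session packet CIDs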

View File

@ -1,4 +1,4 @@
import strutils, streams, tables, cbor, os, math
import strutils, streams, tables, cbor, os, math, asyncfile, asyncdispatch
import ../dagfs, ./stores
@ -244,13 +244,27 @@ proc lookupFile*(dir: FsNode; name: string): tuple[cid: Cid, size: BiggestInt] =
proc addFile*(store: DagfsStore; path: string): FsNode =
## Add a file to the store and return an FsNode.
let
fStream = newFileStream(path, fmRead)
file = openAsync(path, fmRead)
fileSize = file.getFileSize
u = newUnixfsFile()
u.links = newSeqOfCap[FileLink](1)
for chunk in fStream.simpleChunks:
let cid = store.put(chunk)
u.links.add FileLink(cid: cid, size: chunk.len)
u.size.inc chunk.len
var
buf = newString(min(maxChunkSize, fileSize))
pos = 0
let shortLen = fileSize mod maxChunkSize
if 0 < shortLen:
buf.setLen shortLen
# put the short chunk first
while true:
let n = waitFor file.readBuffer(buf[0].addr, buf.len)
buf.setLen n
let cid = store.put(buf)
u.links.add FileLink(cid: cid, size: buf.len)
u.size.inc buf.len
pos.inc n
if pos >= fileSize: break
buf.setLen maxChunkSize
close file
if u.size == 0:
# return the CID for a raw nothing
u.cid = dagHash("")
@ -261,7 +275,6 @@ proc addFile*(store: DagfsStore; path: string): FsNode =
else:
u.cid = store.putDag(u.toCbor)
result = u
close fStream
proc addDir*(store: DagfsStore; dirPath: string): FsNode =
var dRoot = newFsRoot()
@ -288,7 +301,7 @@ proc openDir*(store: DagfsStore; cid: Cid): FsNode =
assert cid.isValid
var raw = ""
try: store.get(cid, raw)
except MissingObject: raise cid.newMissingObject
except MissingChunk: raiseMissing cid
# this sucks
result = parseFs(raw, cid)
assert(result.kind == dirNode)
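
A worked sketch of the chunk layout the rewritten addFile produces: the remainder chunk (file size modulo the maximum chunk size) is put first, followed by full-size chunks; the 600_000-byte file size is only an illustration:

let
  maxChunkSize = 1 shl 18                  # 262_144, as defined in dagfs.nim
  fileSize = 600_000
  shortLen = fileSize mod maxChunkSize     # a 75_712-byte chunk is put first
doAssert shortLen + 2 * maxChunkSize == fileSize   # followed by two maximum-size chunks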

View File

@ -71,6 +71,17 @@ proc compress(c: var Blake2b, last: int = 0) =
c.hash[i] = c.hash[i] xor v[i] xor v[i+8]
c.buffer_idx = 0
{.push boundChecks: off.}
proc blake2b_update*(c: var Blake2b, buf: pointer, data_size: int) =
var data = cast[ptr array[0, uint8]](buf)
for i in 0..<data_size:
if c.buffer_idx == 128:
inc(c.offset, c.buffer_idx)
compress(c)
c.buffer[c.buffer_idx] = data[i]
inc(c.buffer_idx)
{.pop.}
proc blake2b_update*(c: var Blake2b, data: cstring|string|seq|uint8, data_size: int) =
for i in 0..<data_size:
if c.buffer_idx == 128:

View File

@ -1,21 +1,22 @@
import std/streams, std/strutils, std/os
import std/asyncfile, std/asyncdispatch
import cbor
import ../dagfs, ./priv/hex
type
MissingObject* = ref object of CatchableError
cid*: Cid ## Missing object identifier
MissingChunk* = ref object of CatchableError
cid*: Cid ## Missing chunk identifier
BufferTooSmall* = object of CatchableError
proc newMissingObject*(cid: Cid): MissingObject =
MissingObject(msg: "object missing from store", cid: cid)
template raiseMissing*(cid: Cid) =
raise MissingChunk(msg: "chunk missing from store", cid: cid)
type
DagfsStore* = ref DagfsStoreObj
DagfsStoreObj* = object of RootObj
closeImpl*: proc (s: DagfsStore) {.nimcall, gcsafe.}
putImpl*: proc (s: DagfsStore; blk: string): Cid {.nimcall, gcsafe.}
putBufferImpl*: proc (s: DagfsStore; buf: pointer; len: Natural): Cid {.nimcall, gcsafe.}
putImpl*: proc (s: DagfsStore; chunk: string): Cid {.nimcall, gcsafe.}
getBufferImpl*: proc (s: DagfsStore; cid: Cid; buf: pointer; len: Natural): int {.nimcall, gcsafe.}
getImpl*: proc (s: DagfsStore; cid: Cid; result: var string) {.nimcall, gcsafe.}
@ -23,17 +24,24 @@ proc close*(s: DagfsStore) =
## Close active store resources.
if not s.closeImpl.isNil: s.closeImpl(s)
proc put*(s: DagfsStore; blk: string): Cid =
proc putBuffer*(s: DagfsStore; buf: pointer; len: Natural): Cid =
## Put a chunk into the store.
assert(0 < len and len <= maxChunkSize)
assert(not s.putBufferImpl.isNil)
s.putBufferImpl(s, buf, len)
proc put*(s: DagfsStore; chunk: string): Cid =
## Place a raw chunk into the store and return its CID, computed
## with the hash algorithm chosen by the store implementation.
assert(0 < chunk.len and chunk.len <= maxChunkSize)
assert(not s.putImpl.isNil)
assert(blk.len > 0)
s.putImpl(s, blk)
s.putImpl(s, chunk)
proc getBuffer*(s: DagfsStore; cid: Cid; buf: pointer; len: Natural): int =
## Copy a raw block from the store into a buffer pointer.
assert cid.isValid
assert(cid.isValid)
assert(0 < len)
assert(not s.getBufferImpl.isNil)
result = s.getBufferImpl(s, cid, buf, len)
assert(result > 0)
@ -65,7 +73,7 @@ type
FileStore* = ref FileStoreObj
## A store that writes nodes and leafs as files.
FileStoreObj = object of DagfsStoreObj
root: string
root, buf: string
proc parentAndFile(fs: FileStore; cid: Cid): (string, string) =
## Generate the parent path and file path of CID within the store.
@ -73,17 +81,32 @@ proc parentAndFile(fs: FileStore; cid: Cid): (string, string) =
result[0] = fs.root / digest[0..1]
result[1] = result[0] / digest[2..digest.high]
proc fsPut(s: DagfsStore; blk: string): Cid =
proc fsPutBuffer(s: DagfsStore; buf: pointer; len: Natural): Cid =
var fs = FileStore(s)
result = dagHash blk
if result != zeroBlock:
result = dagHash(buf, len)
if result != zeroChunk:
let (dir, path) = fs.parentAndFile(result)
if not existsDir dir:
createDir dir
if not existsFile path:
fs.buf.setLen(len)
copyMem(addr fs.buf[0], buf, fs.buf.len)
let
tmp = fs.root / "tmp"
writeFile(tmp, fs.buf)
moveFile(tmp, path)
proc fsPut(s: DagfsStore; chunk: string): Cid =
var fs = FileStore(s)
result = dagHash chunk
if result != zeroChunk:
let (dir, path) = fs.parentAndFile(result)
if not existsDir dir:
createDir dir
if not existsFile path:
let
tmp = fs.root / "tmp"
writeFile(tmp, blk)
writeFile(tmp, chunk)
moveFile(tmp, path)
proc fsGetBuffer(s: DagfsStore; cid: Cid; buf: pointer; len: Natural): int =
@ -91,25 +114,25 @@ proc fsGetBuffer(s: DagfsStore; cid: Cid; buf: pointer; len: Natural): int =
let (_, path) = fs.parentAndFile cid
if existsFile path:
let fSize = path.getFileSize
if fSize > maxBlockSize:
if maxChunkSize < fSize:
discard tryRemoveFile path
raise cid.newMissingObject
if fSize > len.int64:
raise newException(BufferTooSmall, "")
raiseMissing cid
if len.int64 < fSize:
raise newException(BufferTooSmall, "file is $1 bytes, buffer is $2" % [$fSize, $len])
let file = open(path, fmRead)
result = file.readBuffer(buf, len)
close file
if result == 0:
raise cid.newMissingObject
raiseMissing cid
proc fsGet(s: DagfsStore; cid: Cid; result: var string) =
var fs = FileStore(s)
let (_, path) = fs.parentAndFile cid
if existsFile path:
let fSize = path.getFileSize
if fSize > maxBlockSize:
if fSize > maxChunkSize:
discard tryRemoveFile path
raise cid.newMissingObject
raiseMissing cid
result.setLen fSize.int
let
file = open(path, fmRead)
@ -117,14 +140,16 @@ proc fsGet(s: DagfsStore; cid: Cid; result: var string) =
close file
doAssert(n == result.len)
else:
raise cid.newMissingObject
raiseMissing cid
proc newFileStore*(root: string): FileStore =
## Blocks retrieved by `get` are not hashed or verified.
if not existsDir(root):
createDir root
new result
result.putBufferImpl = fsPutBuffer
result.putImpl = fsPut
result.getBufferImpl = fsGetBuffer
result.getImpl = fsGet
result.root = root
result.buf = ""
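
A minimal sketch of the extended store interface against the on-disk FileStore; the /tmp/dagfs root and sample data are assumptions, the procs are the ones defined above:

import dagfs, dagfs/stores

let store = newFileStore("/tmp/dagfs")    # creates the root directory if needed
let chunk = "some chunk data"
let cid = store.put(chunk)                # hashes the chunk and files it under its digest
doAssert store.putBuffer(chunk[0].unsafeAddr, chunk.len) == cid   # pointer variant, same CID
var buf = newString(maxChunkSize)
buf.setLen store.getBuffer(cid, buf[0].addr, buf.len)   # raw copy into a caller buffer
doAssert buf == chunk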

View File

@ -1,15 +1,55 @@
import std/asyncnet, std/asyncdispatch, std/streams, cbor
import std/asyncnet, std/asyncdispatch, std/streams
import ../dagfs, ./stores
proc toInt(chars: openArray[char]): BiggestInt =
const
defaultPort = Port(1023)
proc toInt(chars: openArray[char]): int32 =
for c in chars.items:
result = (result shl 8) or c.BiggestInt
result = (result shl 8) or c.int32
const maxErrorLen = 128
type
Tag = enum
errTag = 0'i16
getTag = 1
putTag = 2
MessageBody = object {.union.}
error: array[maxErrorLen, char]
putLen: int32
Message = object {.packed.}
len: int32
cid: Cid
tag: Tag
body: MessageBody
const
defaultPort = Port(1024)
errTag = toInt "err"
getTag = toInt "get"
putTag = toInt "put"
errMsgBaseSize = 4 + cidSize + 2
getMsgSize = 4 + cidSize + 2
putMsgSize = 4 + cidSize + 2 + 4
minMsgSize = getMsgSize
maxMsgSize = 4 + cidSize + maxErrorLen
when isMainModule:
doAssert(maxMsgSize == sizeof(Message))
proc `$`(msg: Message): string =
result = "[" & $msg.cid & "]["
case msg.tag
of errTag:
result.add "err]["
let n = clamp(msg.len - errMsgBaseSize, 0, maxErrorLen)
for i in 0..<n:
result.add msg.body.error[i]
result.add "]"
of getTag:
result.add "get]"
of putTag:
result.add "put]["
result.add $msg.body.putLen
result.add "]"
type
TcpServer* = ref TcpServerObj
@ -19,80 +59,93 @@ type
proc newTcpServer*(store: DagfsStore; port = defaultPort): TcpServer =
## Create a new TCP server that serves `store`.
result = TcpServer(sock: newAsyncSocket(buffered=false), store: store)
result.sock.bindAddr(port, "127.0.0.1")
result.sock.setSockOpt(OptReuseAddr, true)
# some braindead unix cruft
result = TcpServer(sock: newAsyncSocket(buffered=true), store: store)
result.sock.bindAddr(port)
echo "listening on port ", port.int
proc process(server: TcpServer; client: AsyncSocket) {.async.} =
## Process messages from a TCP client.
proc send(sock: AsyncSocket; msg: ptr Message): Future[void] =
sock.send(msg, msg.len.int)
proc recv(sock: AsyncSocket; msg: ptr Message) {.async.} =
msg.len = 0
var n = await sock.recvInto(msg, 4)
if minMsgSize <= msg.len and msg.len <= maxMsgSize:
n = await sock.recvInto(addr msg.cid, msg.len-4)
if n < minMsgSize-4:
close sock
#zeroMem(msg, errMsgBaseSize)
proc sendError(sock: AsyncSocket; cid: Cid; str: string): Future[void] =
var
tmpBuf = ""
blkBuf = ""
block loop:
while not client.isClosed:
block:
tmpBuf.setLen(256)
let n = await client.recvInto(addr tmpBuf[0], tmpBuf.len)
if n < 40: break loop
tmpBuf.setLen n
let
tmpStream = newStringStream(tmpBuf)
cmd = parseCbor tmpStream
when defined(tcpDebug):
echo "C: ", cmd
if cmd.kind != cborArray or cmd.seq.len < 3: break loop
case cmd[0].getInt
of errTag:
break loop
of getTag:
let
cid = cmd[1].toCid
resp = newCborArray()
try:
server.store.get(cid, blkBuf)
resp.add(putTag)
resp.add(cmd[1])
resp.add(blkBuf.len)
when defined(tcpDebug):
echo "S: ", resp
await client.send(encode resp)
await client.send(blkBuf)
except:
resp.add(errTag)
resp.add(cmd[1])
resp.add(getCurrentExceptionMsg())
when defined(tcpDebug):
echo "S: ", resp
await client.send(encode resp)
of putTag:
# TODO: check if the block is already in the store
let resp = newCborArray()
resp.add(newCborInt getTag)
resp.add(cmd[1])
resp.add(cmd[2])
msg = Message(tag: errTag)
str = str
strLen = min(msg.body.error.len, str.len)
msgLen = errMsgBaseSize + strLen
msg.len = msgLen.int32
copyMem(msg.body.error[0].addr, str[0].addr, strLen)
when defined(tcpDebug):
debugEcho "S: ", msg
sock.send(addr msg)
proc process(server: TcpServer; host: string; client: AsyncSocket) {.async.} =
## Process messages from a TCP client.
echo host, " connected"
var
msg: Message
chunk = ""
try:
block loop:
while not client.isClosed:
await client.recv(addr msg)
when defined(tcpDebug):
echo "S: ", resp
await client.send(encode resp)
doAssert(cmd[2].getInt <= maxBlockSize)
tmpBuf.setLen cmd[2].getInt
blkBuf.setLen 0
while blkBuf.len < cmd[2].getInt:
let n = await client.recvInto(tmpBuf[0].addr, tmpBuf.len)
if n == 0: break loop
tmpBuf.setLen n
blkBuf.add tmpBuf
let cid = server.store.put(blkBuf)
doAssert(cid == cmd[1].toCid)
else: break loop
close client
debugEcho "C: ", msg
case msg.tag
of errTag:
echo host, ": ", $msg
break loop
of getTag:
try:
server.store.get(msg.cid, chunk)
msg.len = putMsgSize
msg.tag = putTag
msg.body.putLen = chunk.len.int32
when defined(tcpDebug):
debugEcho "S: ", msg
await client.send(addr msg)
await client.send(chunk)
except:
msg.tag = errTag
await client.sendError(msg.cid, getCurrentExceptionMsg())
of putTag:
# TODO: check if the block is already in the store
if maxChunkSize < msg.body.putLen:
await client.sendError(msg.cid, "maximum chunk size is " & $maxChunkSize)
break
chunk.setLen msg.body.putLen
msg.len = getMsgSize
msg.tag = getTag
when defined(tcpDebug):
debugEcho "S: ", msg
await client.send(addr msg)
let n = await client.recvInto(chunk[0].addr, chunk.len)
if n != chunk.len:
break loop
let cid = server.store.put(chunk)
if cid != msg.cid:
await client.sendError(msg.cid, "put CID mismatch")
else:
break loop
except: discard
if not client.isClosed:
close client
echo host, " closed"
proc serve*(server: TcpServer) {.async.} =
## Service client connections to server.
listen server.sock
while not server.sock.isClosed:
let (host, sock) = await server.sock.acceptAddr()
asyncCheck server.process(sock)
asyncCheck server.process(host, sock)
proc close*(server: TcpServer) =
## Close a TCP server.
@ -104,78 +157,102 @@ type
sock: AsyncSocket
buf: string
proc tcpClientPut(s: DagfsStore; blk: string): Cid =
proc tcpClientPutBuffer(s: DagfsStore; buf: pointer; len: Natural): Cid =
var client = TcpClient(s)
result = dagHash blk
if result != zeroBlock:
result = dagHash(buf, len)
if result != zeroChunk:
var msg: Message
block put:
let cmd = newCborArray()
cmd.add(newCborInt putTag)
cmd.add(toCbor result)
cmd.add(newCborInt blk.len)
msg.len = putMsgSize
msg.cid = result
msg.tag = putTag
msg.body.putLen = len.int32
when defined(tcpDebug):
echo "C: ", cmd
waitFor client.sock.send(encode cmd)
debugEcho "C: ", msg
waitFor client.sock.send(addr msg)
block get:
let
respBuf = waitFor client.sock.recv(256)
s = newStringStream(respBuf)
resp = parseCbor s
waitFor client.sock.recv(addr msg)
when defined(tcpDebug):
echo "S: ", resp
case resp[0].getInt
debugEcho "S: ", msg
case msg.tag
of getTag:
if resp[1] == result:
waitFor client.sock.send(blk)
if msg.cid == result:
waitFor client.sock.send(buf, len)
else:
close client.sock
raiseAssert "server sent out-of-order \"get\" message"
of errTag:
raiseAssert resp[2].getText
raiseAssert $msg
else:
raiseAssert "invalid server message"
proc tcpClientPut(s: DagfsStore; chunk: string): Cid =
var client = TcpClient(s)
result = dagHash chunk
if result != zeroChunk:
var msg: Message
block put:
msg.len = putMsgSize
msg.cid = result
msg.tag = putTag
msg.body.putLen = chunk.len.int32
when defined(tcpDebug):
debugEcho "C: ", msg
waitFor client.sock.send(addr msg)
block get:
waitFor client.sock.recv(addr msg)
when defined(tcpDebug):
debugEcho "S: ", msg
case msg.tag
of getTag:
if msg.cid == result:
waitFor client.sock.send(chunk)
else:
close client.sock
raiseAssert "server sent out-of-order \"get\" message"
of errTag:
raiseAssert $msg
else:
raiseAssert "invalid server message"
proc tcpClientGetBuffer(s: DagfsStore; cid: Cid; buf: pointer; len: Natural): int =
assert(getTag != 0)
var client = TcpClient(s)
var
client = TcpClient(s)
msg: Message
block get:
let cmd = newCborArray()
cmd.add(newCborInt getTag)
cmd.add(toCbor cid)
cmd.add(newCborInt len)
msg.len = getMsgSize
msg.cid = cid
msg.tag = getTag
when defined(tcpDebug):
echo "C: ", cmd
waitFor client.sock.send(encode cmd)
debugEcho "C: ", msg
waitFor client.sock.send(addr msg)
block put:
let
respBuf = waitFor client.sock.recv(256, {Peek})
s = newStringStream(respBuf)
resp = parseCbor s
skip = s.getPosition
waitFor client.sock.recv(addr msg)
when defined(tcpDebug):
echo "S: ", resp
case resp[0].getInt
debugEcho "S: ", msg
case msg.tag
of putTag:
doAssert(resp[1] == cid)
result = resp[2].getInt.int
doAssert(skip <= len and result <= len)
discard waitFor client.sock.recvInto(buf, skip)
result = waitFor client.sock.recvInto(buf, result)
doAssert(msg.cid == cid)
result = msg.body.putLen.int
doAssert(result <= len)
let n = waitFor client.sock.recvInto(buf, result)
doAssert(n == result)
of errTag:
raise MissingObject(msg: resp[2].getText, cid: cid)
raise MissingChunk(msg: $msg, cid: cid)
else:
raise cid.newMissingObject
raiseMissing cid
proc tcpClientGet(s: DagfsStore; cid: Cid; result: var string) =
result.setLen maxBlockSize
result.setLen maxChunkSize
let n = s.getBuffer(cid, result[0].addr, result.len)
result.setLen n
assert(result.dagHash == cid)
proc newTcpClient*(host: string; port = defaultPort): TcpClient =
new result
result.sock = waitFor asyncnet.dial(host, port, buffered=false)
result.sock = waitFor asyncnet.dial(host, port, buffered=true)
result.buf = ""
result.putBufferImpl = tcpClientPutBuffer
result.putImpl = tcpClientPut
result.getBufferImpl = tcpClientGetBuffer
result.getImpl = tcpClientGet
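
A client-side sketch of the new fixed-layout TCP protocol; the host address is hypothetical, newTcpClient, put, get and dagHash are defined above:

import dagfs, dagfs/stores, dagfs/tcp

let client = newTcpClient("10.0.1.1")    # hypothetical server address, defaultPort is 1024
let cid = client.put("hello over tcp")   # put message, chunk streamed after the server's get reply
var chunk = ""
client.get(cid, chunk)                   # get message, answered by a put carrying the chunk
doAssert chunk.dagHash == cid            # tcpClientGet asserts the same after the transfer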

View File

@ -524,8 +524,8 @@ proc eval(ast: NodeRef; env: Env): NodeRef =
newNodeError(getCurrentExceptionMsg(), input)
except FieldError:
newNodeError("invalid argument", input)
except MissingObject:
newNodeError("object not in store", input)
except MissingChunk:
newNodeError("chunk not in store", input)
except OSError:
newNodeError(getCurrentExceptionMsg(), input)