Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
181 changes: 175 additions & 6 deletions concore.jl
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

module Concore

using Mmap

# -----------------------------------------------------------------------------
# Backend selection
# -----------------------------------------------------------------------------
Expand All @@ -23,8 +25,19 @@ abstract type AbstractBackend end
"""Default local file transport backend."""
struct FileBackend <: AbstractBackend end

"""Memory-mapped local file transport backend."""
struct MmapBackend <: AbstractBackend
segment_size::Int

function MmapBackend(segment_size::Int = 4096)
segment_size > 1 || throw(ArgumentError("mmap segment size must be greater than 1"))
new(segment_size)
end
end

# Compatibility name for proposal/docs wording.
const FileTransport = FileBackend
const MmapTransport = MmapBackend

# -----------------------------------------------------------------------------
# Path configuration
Expand Down Expand Up @@ -82,10 +95,146 @@ const _S_MAX_LEN = 65_536

_backend_inpath(::FileBackend) = inpath
_backend_outpath(::FileBackend) = outpath
_backend_inpath(::MmapBackend) = inpath
_backend_outpath(::MmapBackend) = outpath

_input_dir(port::Int) = _backend_inpath(_backend) * string(port)
_output_dir(port::Int) = _backend_outpath(_backend) * string(port)

const _mmap_segments = Dict{String, Tuple{IOStream, Vector{UInt8}}}()
const _mmap_cleanup_registered = Ref(false)

function _register_mmap_cleanup()
if !_mmap_cleanup_registered[]
atexit(mmap_cleanup)
_mmap_cleanup_registered[] = true
end
end

function _close_mmap_segment(path::AbstractString)
segment = pop!(_mmap_segments, String(path), nothing)
segment === nothing && return nothing

io, buf = segment
try
Mmap.sync!(buf)
catch
end
try
finalize(buf)
catch
end
try
isopen(io) && close(io)
catch
end
GC.gc()
return nothing
end

function _mmap_segment(path::AbstractString, size::Int)::Vector{UInt8}
segment = get(_mmap_segments, String(path), nothing)
if segment !== nothing && isopen(segment[1]) && length(segment[2]) >= size
return segment[2]
end
if segment !== nothing
_close_mmap_segment(path)
end

_register_mmap_cleanup()
mkpath(dirname(path))

io = open(path, isfile(path) ? "r+" : "w+")
if filesize(path) < size
seek(io, size - 1)
write(io, UInt8(0))
seekstart(io)
end

buf = try
Mmap.mmap(io, Vector{UInt8}, size)
catch
close(io)
rethrow()
end
_mmap_segments[String(path)] = (io, buf)
return buf
end

function _mmap_content(buf::Vector{UInt8})::String
nullpos = findfirst(iszero, buf)
last = nullpos === nothing ? length(buf) : nullpos - 1
last <= 0 && return ""
return String(copy(buf[1:last]))
end

function _mmap_read(path::AbstractString, initstr::AbstractString, size::Int)::String
isfile(path) || return String(initstr)
buf = _mmap_segment(path, size)
ins = _mmap_content(buf)
return isempty(ins) ? String(initstr) : ins
end

function _mmap_write(path::AbstractString, wire::AbstractString, size::Int)
bytes = Vector{UInt8}(wire)
if length(bytes) >= size
throw(ArgumentError("mmap payload exceeds segment size"))
end

buf = _mmap_segment(path, size)
n = length(bytes)
buf[1:n] .= bytes
buf[n + 1] = 0x00
if n + 2 <= size
buf[n + 2:size] .= 0x00
end
Mmap.sync!(buf)
return nothing
end

function _write_wire_file(path::AbstractString, wire::AbstractString)
segment = get(_mmap_segments, String(path), nothing)
if segment !== nothing && isopen(segment[1])
return _mmap_write(path, wire, length(segment[2]))
end

open(path, "w") do f
write(f, wire)
end
return nothing
end

"""Close open memory-mapped segment handles."""
function mmap_cleanup()
segments = collect(values(_mmap_segments))
empty!(_mmap_segments)

for (io, buf) in segments
try
Mmap.sync!(buf)
catch
end
try
finalize(buf)
catch
end
try
isopen(io) && close(io)
catch
end
end

segments = nothing
GC.gc()
return nothing
end

function _wire_content(raw::AbstractString)::String
nullpos = findfirst(==('\0'), raw)
nullpos === nothing && return String(raw)
return String(raw[1:nullpos - 1])
end

function _port_file_path(base::AbstractString, port::Int, name::AbstractString)::String
port_dir = abspath(base * string(port))
filepath = abspath(joinpath(port_dir, String(name)))
Expand Down Expand Up @@ -312,7 +461,11 @@ function concore_read(

ins = ""
try
ins = read(filepath, String)
if _backend isa MmapBackend
ins = _mmap_read(filepath, initstr, _backend.segment_size)
else
ins = _wire_content(read(filepath, String))
end
catch
ins = initstr
end
Expand All @@ -322,7 +475,11 @@ function concore_read(
while isempty(ins) && attempts < 5
sleep(delay)
try
ins = read(filepath, String)
if _backend isa MmapBackend
ins = _mmap_read(filepath, initstr, _backend.segment_size)
else
ins = _wire_content(read(filepath, String))
end
catch
end
attempts += 1
Expand Down Expand Up @@ -359,8 +516,10 @@ function concore_write(
outval = Float64[simtime + Float64(delta); Float64.(val)]
wire = _format_wire(outval)

open(filepath, "w") do f
write(f, wire)
if _backend isa MmapBackend
_mmap_write(filepath, wire, _backend.segment_size)
else
_write_wire_file(filepath, wire)
end

return nothing
Expand All @@ -380,8 +539,10 @@ function concore_write(
sleep(2 * delay)
filepath = _port_file_path(_backend_outpath(_backend), port, name)
mkpath(dirname(filepath))
open(filepath, "w") do f
write(f, val)
if _backend isa MmapBackend
_mmap_write(filepath, val, _backend.segment_size)
else
_write_wire_file(filepath, val)
end
return nothing
end
Expand Down Expand Up @@ -432,6 +593,13 @@ function concore_init!()
return nothing
end

function concore_init!(backend::AbstractBackend)
global _backend
_backend = backend
concore_init!()
return nothing
end

# Backward-compatible aliases (without !)
const load_iport = load_iport!
const load_oport = load_oport!
Expand All @@ -448,6 +616,7 @@ export tryparam, default_maxtime!, safe_parse_list
export load_iport!, load_oport!, load_params!, concore_init!
export load_iport, load_oport, load_params, default_maxtime, concore_init
export AbstractBackend, FileBackend, FileTransport
export MmapBackend, MmapTransport, mmap_cleanup

# -----------------------------------------------------------------------------
# Auto-initialize on load
Expand Down
1 change: 1 addition & 0 deletions tests/julia/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@ using .Concore
include("test_protocol.jl")
include("test_wire_compat.jl")
include("test_interop.jl")
include("test_mmap.jl")
end
Loading
Loading