diff --git a/concore.jl b/concore.jl index 5fb2da9..1b23ff0 100644 --- a/concore.jl +++ b/concore.jl @@ -13,6 +13,8 @@ module Concore +using Mmap + # ----------------------------------------------------------------------------- # Backend selection # ----------------------------------------------------------------------------- @@ -23,8 +25,19 @@ abstract type AbstractBackend end """Default local file transport backend.""" struct FileBackend <: AbstractBackend end +"""Memory-mapped local file transport backend.""" +struct MmapBackend <: AbstractBackend + segment_size::Int + + function MmapBackend(segment_size::Int = 4096) + segment_size > 1 || throw(ArgumentError("mmap segment size must be greater than 1")) + new(segment_size) + end +end + # Compatibility name for proposal/docs wording. const FileTransport = FileBackend +const MmapTransport = MmapBackend # ----------------------------------------------------------------------------- # Path configuration @@ -82,10 +95,146 @@ const _S_MAX_LEN = 65_536 _backend_inpath(::FileBackend) = inpath _backend_outpath(::FileBackend) = outpath +_backend_inpath(::MmapBackend) = inpath +_backend_outpath(::MmapBackend) = outpath _input_dir(port::Int) = _backend_inpath(_backend) * string(port) _output_dir(port::Int) = _backend_outpath(_backend) * string(port) +const _mmap_segments = Dict{String, Tuple{IOStream, Vector{UInt8}}}() +const _mmap_cleanup_registered = Ref(false) + +function _register_mmap_cleanup() + if !_mmap_cleanup_registered[] + atexit(mmap_cleanup) + _mmap_cleanup_registered[] = true + end +end + +function _close_mmap_segment(path::AbstractString) + segment = pop!(_mmap_segments, String(path), nothing) + segment === nothing && return nothing + + io, buf = segment + try + Mmap.sync!(buf) + catch + end + try + finalize(buf) + catch + end + try + isopen(io) && close(io) + catch + end + GC.gc() + return nothing +end + +function _mmap_segment(path::AbstractString, size::Int)::Vector{UInt8} + segment = get(_mmap_segments, String(path), nothing) + if segment !== nothing && isopen(segment[1]) && length(segment[2]) >= size + return segment[2] + end + if segment !== nothing + _close_mmap_segment(path) + end + + _register_mmap_cleanup() + mkpath(dirname(path)) + + io = open(path, isfile(path) ? "r+" : "w+") + if filesize(path) < size + seek(io, size - 1) + write(io, UInt8(0)) + seekstart(io) + end + + buf = try + Mmap.mmap(io, Vector{UInt8}, size) + catch + close(io) + rethrow() + end + _mmap_segments[String(path)] = (io, buf) + return buf +end + +function _mmap_content(buf::Vector{UInt8})::String + nullpos = findfirst(iszero, buf) + last = nullpos === nothing ? length(buf) : nullpos - 1 + last <= 0 && return "" + return String(copy(buf[1:last])) +end + +function _mmap_read(path::AbstractString, initstr::AbstractString, size::Int)::String + isfile(path) || return String(initstr) + buf = _mmap_segment(path, size) + ins = _mmap_content(buf) + return isempty(ins) ? String(initstr) : ins +end + +function _mmap_write(path::AbstractString, wire::AbstractString, size::Int) + bytes = Vector{UInt8}(wire) + if length(bytes) >= size + throw(ArgumentError("mmap payload exceeds segment size")) + end + + buf = _mmap_segment(path, size) + n = length(bytes) + buf[1:n] .= bytes + buf[n + 1] = 0x00 + if n + 2 <= size + buf[n + 2:size] .= 0x00 + end + Mmap.sync!(buf) + return nothing +end + +function _write_wire_file(path::AbstractString, wire::AbstractString) + segment = get(_mmap_segments, String(path), nothing) + if segment !== nothing && isopen(segment[1]) + return _mmap_write(path, wire, length(segment[2])) + end + + open(path, "w") do f + write(f, wire) + end + return nothing +end + +"""Close open memory-mapped segment handles.""" +function mmap_cleanup() + segments = collect(values(_mmap_segments)) + empty!(_mmap_segments) + + for (io, buf) in segments + try + Mmap.sync!(buf) + catch + end + try + finalize(buf) + catch + end + try + isopen(io) && close(io) + catch + end + end + + segments = nothing + GC.gc() + return nothing +end + +function _wire_content(raw::AbstractString)::String + nullpos = findfirst(==('\0'), raw) + nullpos === nothing && return String(raw) + return String(raw[1:nullpos - 1]) +end + function _port_file_path(base::AbstractString, port::Int, name::AbstractString)::String port_dir = abspath(base * string(port)) filepath = abspath(joinpath(port_dir, String(name))) @@ -312,7 +461,11 @@ function concore_read( ins = "" try - ins = read(filepath, String) + if _backend isa MmapBackend + ins = _mmap_read(filepath, initstr, _backend.segment_size) + else + ins = _wire_content(read(filepath, String)) + end catch ins = initstr end @@ -322,7 +475,11 @@ function concore_read( while isempty(ins) && attempts < 5 sleep(delay) try - ins = read(filepath, String) + if _backend isa MmapBackend + ins = _mmap_read(filepath, initstr, _backend.segment_size) + else + ins = _wire_content(read(filepath, String)) + end catch end attempts += 1 @@ -359,8 +516,10 @@ function concore_write( outval = Float64[simtime + Float64(delta); Float64.(val)] wire = _format_wire(outval) - open(filepath, "w") do f - write(f, wire) + if _backend isa MmapBackend + _mmap_write(filepath, wire, _backend.segment_size) + else + _write_wire_file(filepath, wire) end return nothing @@ -380,8 +539,10 @@ function concore_write( sleep(2 * delay) filepath = _port_file_path(_backend_outpath(_backend), port, name) mkpath(dirname(filepath)) - open(filepath, "w") do f - write(f, val) + if _backend isa MmapBackend + _mmap_write(filepath, val, _backend.segment_size) + else + _write_wire_file(filepath, val) end return nothing end @@ -432,6 +593,13 @@ function concore_init!() return nothing end +function concore_init!(backend::AbstractBackend) + global _backend + _backend = backend + concore_init!() + return nothing +end + # Backward-compatible aliases (without !) const load_iport = load_iport! const load_oport = load_oport! @@ -448,6 +616,7 @@ export tryparam, default_maxtime!, safe_parse_list export load_iport!, load_oport!, load_params!, concore_init! export load_iport, load_oport, load_params, default_maxtime, concore_init export AbstractBackend, FileBackend, FileTransport +export MmapBackend, MmapTransport, mmap_cleanup # ----------------------------------------------------------------------------- # Auto-initialize on load diff --git a/tests/julia/runtests.jl b/tests/julia/runtests.jl index df11e24..0a56935 100644 --- a/tests/julia/runtests.jl +++ b/tests/julia/runtests.jl @@ -10,4 +10,5 @@ using .Concore include("test_protocol.jl") include("test_wire_compat.jl") include("test_interop.jl") + include("test_mmap.jl") end diff --git a/tests/julia/test_mmap.jl b/tests/julia/test_mmap.jl new file mode 100644 index 0000000..6992f20 --- /dev/null +++ b/tests/julia/test_mmap.jl @@ -0,0 +1,203 @@ +@testset "Mmap backend" begin + + function reset_mmap_state!() + Concore.mmap_cleanup() + Concore._backend = Concore.FileBackend() + Concore.simtime = 0.0 + Concore.delay = 0.0 + Concore.s = "" + Concore.olds = "" + Concore.retrycount = 0 + end + + function with_mmap_tempdir(f) + dir = mktempdir(; cleanup=false) + try + f(dir) + finally + Concore.mmap_cleanup() + GC.gc() + rm(dir; recursive=true, force=true) + end + end + + @testset "backend type" begin + @test Concore.MmapBackend <: Concore.AbstractBackend + @test Concore.MmapBackend().segment_size == 4096 + @test Concore.MmapBackend(512).segment_size == 512 + @test Concore.MmapTransport === Concore.MmapBackend + @test_throws ArgumentError Concore.MmapBackend(1) + end + + @testset "opt in at init" begin + old_backend = Concore._backend + try + Concore.concore_init!(Concore.MmapBackend(512)) + @test Concore._backend isa Concore.MmapBackend + @test Concore._backend.segment_size == 512 + finally + Concore._backend = old_backend + Concore.mmap_cleanup() + end + end + + @testset "write uses existing wire format" begin + with_mmap_tempdir() do dir + reset_mmap_state!() + old_outpath = Concore.outpath + try + Concore._backend = Concore.MmapBackend(512) + Concore.outpath = joinpath(dir, "out") + Concore.simtime = 5.0 + + Concore.concore_write(1, "signal", [42.0, 3.14]) + + filepath = joinpath(Concore.outpath * "1", "signal") + @test isfile(filepath) + @test Concore._mmap_content(Concore._mmap_segments[filepath][2]) == "[5.0, 42.0, 3.14]" + finally + Concore.outpath = old_outpath + reset_mmap_state!() + end + end + end + + @testset "reads normal file backend output" begin + with_mmap_tempdir() do dir + reset_mmap_state!() + old_inpath = Concore.inpath + try + Concore._backend = Concore.MmapBackend(512) + Concore.inpath = joinpath(dir, "in") + mkpath(Concore.inpath * "1") + write(joinpath(Concore.inpath * "1", "signal"), "[7.0, 11.0, 12.5]") + + result = Concore.concore_read(1, "signal", "[0.0, 0.0, 0.0]") + + @test result == [11.0, 12.5] + @test Concore.simtime == 7.0 + finally + Concore.inpath = old_inpath + reset_mmap_state!() + end + end + end + + @testset "mmap round trip matches file backend behavior" begin + with_mmap_tempdir() do dir + reset_mmap_state!() + old_inpath = Concore.inpath + old_outpath = Concore.outpath + try + Concore._backend = Concore.MmapBackend(512) + Concore.inpath = joinpath(dir, "io") + Concore.outpath = joinpath(dir, "io") + Concore.simtime = 9.0 + + Concore.concore_write(1, "roundtrip", [1.5, 2.5, 3.5]) + + Concore.s = "" + Concore.olds = "" + Concore.simtime = 0.0 + result = Concore.concore_read(1, "roundtrip", "[0.0, 0.0, 0.0, 0.0]") + + @test result == [1.5, 2.5, 3.5] + @test Concore.simtime == 9.0 + finally + Concore.inpath = old_inpath + Concore.outpath = old_outpath + reset_mmap_state!() + end + end + end + + @testset "file backend reads mmap output" begin + with_mmap_tempdir() do dir + reset_mmap_state!() + old_inpath = Concore.inpath + old_outpath = Concore.outpath + try + Concore.inpath = joinpath(dir, "io") + Concore.outpath = joinpath(dir, "io") + Concore._backend = Concore.MmapBackend(512) + Concore.simtime = 4.0 + + Concore.concore_write(1, "signal", [8.0, 9.0]) + + Concore._backend = Concore.FileBackend() + Concore.simtime = 0.0 + result = Concore.concore_read(1, "signal", "[0.0, 0.0, 0.0]") + + @test result == [8.0, 9.0] + @test Concore.simtime == 4.0 + finally + Concore.inpath = old_inpath + Concore.outpath = old_outpath + reset_mmap_state!() + end + end + end + + @testset "file backend can overwrite mapped path" begin + with_mmap_tempdir() do dir + reset_mmap_state!() + old_inpath = Concore.inpath + old_outpath = Concore.outpath + try + Concore.inpath = joinpath(dir, "io") + Concore.outpath = joinpath(dir, "io") + Concore._backend = Concore.MmapBackend(512) + Concore.concore_write(1, "signal", [1.0]) + + Concore._backend = Concore.FileBackend() + Concore.simtime = 2.0 + Concore.concore_write(1, "signal", [20.0]) + result = Concore.concore_read(1, "signal", "[0.0, 0.0]") + + @test result == [20.0] + @test Concore.simtime == 2.0 + finally + Concore.inpath = old_inpath + Concore.outpath = old_outpath + reset_mmap_state!() + end + end + end + + @testset "shorter writes clear stale bytes" begin + with_mmap_tempdir() do dir + reset_mmap_state!() + old_outpath = Concore.outpath + try + Concore._backend = Concore.MmapBackend(512) + Concore.outpath = joinpath(dir, "out") + + Concore.concore_write(1, "signal", [100.0, 200.0, 300.0]) + Concore.concore_write(1, "signal", [1.0]) + + filepath = joinpath(Concore.outpath * "1", "signal") + @test Concore._mmap_content(Concore._mmap_segments[filepath][2]) == "[0.0, 1.0]" + finally + Concore.outpath = old_outpath + reset_mmap_state!() + end + end + end + + @testset "payload must fit segment" begin + with_mmap_tempdir() do dir + reset_mmap_state!() + old_outpath = Concore.outpath + try + Concore._backend = Concore.MmapBackend(16) + Concore.outpath = joinpath(dir, "out") + + @test_throws ArgumentError Concore.concore_write(1, "signal", [1.0, 2.0, 3.0]) + finally + Concore.outpath = old_outpath + reset_mmap_state!() + end + end + end + +end