Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 56 additions & 19 deletions phlex/app/load_module.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@
#include "phlex/source.hpp"

#include "boost/algorithm/string.hpp"
#include "boost/dll/import.hpp"
#include "boost/dll/shared_library.hpp"
#include "boost/json.hpp"

#include <cstdlib>
#include <functional>
#include <optional>
#include <string>
#include <string_view>

Expand All @@ -21,27 +21,61 @@ namespace phlex::experimental {
namespace {
constexpr std::string_view pymodule_name{"pymodule"};

// If factory function goes out of scope, then the library is unloaded...and that's
// bad.
// The shared_library member in each wrapper struct keeps the loaded .so
// alive for the lifetime of the wrapper. If it goes out of scope, the
// library is unloaded and the stored function pointer becomes invalid.
struct module_plugin {
boost::dll::shared_library lib;
detail::module_creator_t* fn{};

void operator()(module_graph_proxy<void_tag> proxy,
configuration const& config) const
{
fn(std::move(proxy), config);
}
};

struct source_plugin {
boost::dll::shared_library lib;
detail::source_creator_t* fn{};

void operator()(source_bundle bundle, configuration const& config) const
{
fn(bundle, config);
}
};

struct driver_plugin {
boost::dll::shared_library lib;
detail::driver_shim_t* fn{};

void operator()(driver_proxy proxy,
configuration const& config,
driver_bundle* out) const
{
fn(proxy, config, out);
}
};

// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
std::vector<std::function<detail::module_creator_t>> create_module;
std::vector<module_plugin> create_module;
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
std::vector<std::function<detail::source_creator_t>> create_source;
std::vector<source_plugin> create_source;
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
std::function<detail::driver_shim_t> create_driver;
std::optional<driver_plugin> create_driver;

template <typename creator_t>
std::function<creator_t> plugin_loader(std::string const& spec, std::string const& symbol_name)
std::pair<boost::dll::shared_library, creator_t*>
plugin_loader(std::string const& spec, std::string const& symbol_name)
{
// Called during single-threaded graph construction
char const* plugin_path_ptr =
std::getenv("PHLEX_PLUGIN_PATH"); // NOLINT(concurrency-mt-unsafe)
if (!plugin_path_ptr)
throw std::runtime_error("PHLEX_PLUGIN_PATH has not been set.");

using namespace boost;
std::vector<std::string> subdirs;
split(subdirs, plugin_path_ptr, is_any_of(":"));
boost::split(subdirs, plugin_path_ptr, boost::is_any_of(":"));

// FIXME: Need to test to ensure that first match wins.
for (auto const& subdir : subdirs) {
Expand All @@ -50,9 +84,11 @@ namespace phlex::experimental {
if (exists(shared_library_path)) {
// Load pymodule with rtld_global to make Python symbols available to extension modules
// (e.g., NumPy). Load all other plugins with rtld_local (default) to avoid symbol collisions.
auto const load_mode =
(spec == pymodule_name) ? dll::load_mode::rtld_global : dll::load_mode::default_mode;
return dll::import_symbol<creator_t>(shared_library_path, symbol_name, load_mode);
auto const load_mode = (spec == pymodule_name) ? boost::dll::load_mode::rtld_global
: boost::dll::load_mode::default_mode;
boost::dll::shared_library lib{shared_library_path, load_mode};
auto* fn = &lib.get<creator_t>(symbol_name);
return {std::move(lib), fn};
}
}
throw std::runtime_error("Could not locate library with specification '"s + spec +
Expand Down Expand Up @@ -89,8 +125,8 @@ namespace phlex::experimental {
auto const adjusted_config = detail::adjust_config(label, std::move(raw_config));

auto const& spec = value_to<std::string>(adjusted_config.at("cpp"));
auto& creator =
create_module.emplace_back(plugin_loader<detail::module_creator_t>(spec, "create_module"));
auto [lib, fn] = plugin_loader<detail::module_creator_t>(spec, "create_module");
auto& creator = create_module.emplace_back(module_plugin{std::move(lib), fn});

configuration const config{adjusted_config};
creator(g.module_proxy(config), config);
Expand All @@ -101,8 +137,8 @@ namespace phlex::experimental {
auto const adjusted_config = detail::adjust_config(label, std::move(raw_config));

auto const& spec = value_to<std::string>(adjusted_config.at("cpp"));
auto& creator =
create_source.emplace_back(plugin_loader<detail::source_creator_t>(spec, "create_source"));
auto [lib, fn] = plugin_loader<detail::source_creator_t>(spec, "create_source");
auto& creator = create_source.emplace_back(source_plugin{std::move(lib), fn});

// FIXME: Should probably use the parameter name (e.g.) 'plugin_label' instead of
// 'module_label', but that requires adjusting other parts of the system
Expand All @@ -120,9 +156,10 @@ namespace phlex::experimental {
// False positive: clang-analyzer cannot trace ownership through Boost's is_any_of<char>
// internal reference counting in classification.hpp.
// NOLINTNEXTLINE(clang-analyzer-cplusplus.NewDeleteLeaks,clang-analyzer-cplusplus.NewDelete)
create_driver = plugin_loader<detail::driver_shim_t>(spec, "create_driver");
auto [lib, fn] = plugin_loader<detail::driver_shim_t>(spec, "create_driver");
create_driver.emplace(driver_plugin{std::move(lib), fn});
driver_bundle result;
create_driver(driver_proxy{}, config, &result);
(*create_driver)(driver_proxy{}, config, &result);
return result;
}
}
67 changes: 67 additions & 0 deletions phlex/core/node_catalog.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,74 @@
#include <vector>

namespace phlex::experimental {
// A node_catalog is the framework_graph's registry of all the algorithm nodes
// that make up a Phlex data-processing application. It holds every kind of
// node the framework knows about (predicates, observers, outputs, folds,
// unfolds, transforms, providers, and sources), grouped by type, and serves
// as the single source of node information used to build and run the flow
// graph.

// The externally visible entry point for plugin modules, with the signature:
// extern "C" void create_module(
// phlex::experimental::module_graph_proxy<phlex::experimental::void_tag> m,
// phlex::configuration const& config)
//
// User plugins define this function via the PHLEX_REGISTER_ALGORITHMS macro,
// which expands to the extern "C" entry point. The body invokes module methods
// like transform(), predicate(), fold(), unfold(), observe(), and output() on
// the 'm' argument, which add nodes to this node_catalog.
//
// During application initialization, the framework loads each configured
// plugin shared library (PHLEX_PLUGIN_PATH), resolves the create_module entry
// point via Boost.DLL, and invokes it to populate this node_catalog before
// building the flow graph.

struct PHLEX_CORE_EXPORT node_catalog {
// How nodes are added to the node_catalog
// ---------------------------------------
// The node_catalog is populated as a side effect of *registration*, i.e.
// when user algorithms are declared to be part of the framework_graph.
// There are three insertion paths, all of which ultimately insert a node
// into one of the simple_ptr_map members:
//
// 1. Algorithm registration (transform, predicate, observe, fold,
// unfold, provide, output): Each declaration creates a short-lived
// *_api builder holding a registrar bound to the appropriate map (see
// registrar_for()). For all but output nodes, the registrar's
// destructor creates the node and inserts in at the end of the
// registration statement. For output nodes, an explicit call does
// this.
// 2. Sources: glue::source() inserts directly into `sources`, bypassing
// the registrar.
// 3. Implicit provider_node objects: make_computational_edges() creates
// and inserts into `providers` later, during graph finalization, to
// satisfy otherwise unrequited inputs.
//
// Module (plugin) loading time
// ----------------------------
// Most registrations originate from dynamically loaded *modules*. At graph
// construction time, load_module() (phlex/app/load_module.cpp) loads each
// configured plugin shared library found on PHLEX_PLUGIN_PATH. For each
// plugin, the `create_module` entry point is found (usually defined via the
// macro PHLEX_REGISTER_ALGORITHMS).
//
// The framework then invokes `create_module`, passing a module_graph_proxy
// that forwards fold/observe/predicate/transform/unfold/output calls into
// this catalog. The plugin's registration code therefore runs during
// single-threaded graph construction and adds its nodes here via the paths
// described above.

// Note: The loaded libraries' factory functions are kept alive for the
// lifetime of the job so the libraries are not unloaded out from under the
// registered nodes.

// Only the framework_graph for an application is intended to have a
// node_catalog; it should not get copied or assigned, so we disable copying
// and moving to prevent accidental use.
node_catalog() = default;
node_catalog(node_catalog const&) = delete;
node_catalog& operator=(node_catalog const&) = delete;

template <typename Ptr>
auto registrar_for(std::vector<std::string>& errors)
{
Expand Down
2 changes: 1 addition & 1 deletion phlex/module.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace phlex::experimental {
/// @brief Proxy for registering module algorithm nodes.
///
/// Passed to @c PHLEX_REGISTER_ALGORITHMS plugin entry points. Provides
/// access to fold, observe, predicate, transform, and unfold registration.
/// access to fold, observe, output, predicate, transform, and unfold registration.
/// Users never construct this type directly.
template <typename T>
class module_graph_proxy : graph_proxy<T> {
Expand Down
Loading