Jonasfj.dk/Blog
A blog by Jonas Finnemann Jensen


December 30, 2019
Running acyclic steps in Dart
Filed under: Computer,English,Google by jonasfj at 10:42 pm

When writing servers I often find myself having an acyclic graph of minor setup tasks. These tasks often include steps such as:

  • Set up credentials from instance metadata,
  • Load and validate configuration,
  • Connect to database and/or shared cache (redis, memcache),
  • Subscribe to a message broker (pub/sub, RabbitMQ, etc),
  • Start background tasks,
  • Load templates from disk,
  • Set up request routers and handlers,
  • Listen for incoming HTTP requests.

Depending on the service, many of these steps have to be done before the server starts accepting traffic. As many of the steps depend upon the output of previous steps, this often becomes a large sequential method that does one step at a time.

It usually happens that some steps are completely unrelated and could easily run concurrently. For example, there is no reason my server application can’t load templates from disk while also initiating a connection to the database.
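As a rough illustration of running two unrelated steps concurrently by hand, the sketch below uses only dart:async. The functions loadTemplates and connectDatabase are hypothetical stand-ins that simulate I/O with a delay; they are not taken from any real server.

```dart
import 'dart:async';

// Hypothetical stand-ins for real setup steps; each simulates I/O with a delay.
Future<String> loadTemplates() =>
    Future.delayed(const Duration(milliseconds: 100), () => 'templates');

Future<String> connectDatabase() =>
    Future.delayed(const Duration(milliseconds: 100), () => 'database');

/// Runs both unrelated steps concurrently and returns their results in order.
Future<List<String>> setUp() {
  // Both functions are called before anything is awaited, so the two
  // simulated I/O operations overlap instead of running back to back.
  return Future.wait([loadTemplates(), connectDatabase()]);
}

Future<void> main() async {
  final watch = Stopwatch()..start();
  final results = await setUp();
  print('$results ready after ${watch.elapsedMilliseconds} ms');
}
```

This works well for two steps, but every additional dependency between steps has to be threaded through by hand.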

However, a method that concurrently executes unrelated steps, while ensuring that steps with inter-dependencies are executed sequentially, can quickly become complex and hard to maintain. To solve this problem I’ve published package:acyclic_steps. I’m sure there are other ways of solving this problem, so feel free to show how you do this in the comment section.

Disclaimer: package:acyclic_steps is not an officially supported Google product. See the public release process documentation for details. Essentially, I enjoy publishing neat re-usable bits and patterns through the dart-neats project, which I started in order to publish neat things 🙂

At a high level, package:acyclic_steps facilitates the definition of steps, where a step may produce a value and may depend upon other steps.

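import 'dart:convert' show json;
import 'dart:io' show File;

import 'package:acyclic_steps/acyclic_steps.dart';

/// A Step that loads configuration.
// NOTE: Typing Step<Config> is necessary for inference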
final Step<Config> configStep = Step
    .define('load-config')
    .build(() async {
  // Load configuration somehow
  return Config.fromJson(json.decode(
    await File('config.json').readAsString(),
  ));
});

/// A step that connects to a database.
final Step<DBConnection> databaseStep = Step
  .define('connect-database')
  .dep(configStep) // Add dependency upon the configStep
  .build((
  // Result from the configStep is injected as cfg.
  // Typing not needed, as Config will be inferred from configStep.
  cfg,
) async {
  return await DBConnection.connect(cfg.database);
});

On its own a step isn’t a very interesting object. You can get its name and direct dependencies, which is useful for printing the graph (and debugging). But one cannot simply execute a step.
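To make the point about printing the graph concrete, here is a minimal sketch that walks a step's dependencies. It assumes the step exposes its name as name and its direct dependencies as directDependencies, which is my reading of the package API and worth checking against the package documentation. The steps return plain strings only to keep the sketch self-contained, and describe is a hypothetical helper.

```dart
import 'package:acyclic_steps/acyclic_steps.dart';

// Hypothetical steps that return plain strings, to keep the sketch small.
final Step<String> configStep =
    Step.define('load-config').build(() async => 'config');

final Step<String> databaseStep = Step.define('connect-database')
    .dep(configStep)
    .build((cfg) async => 'database using $cfg');

/// Returns one "step <- dependency" line per edge reachable from [step].
/// Assumes Step has `name` and `directDependencies` properties.
List<String> describe(Step<Object?> step, [Set<Step<Object?>>? seen]) {
  seen ??= <Step<Object?>>{};
  final lines = <String>[];
  if (!seen.add(step)) return lines; // Already described this step.
  for (final dep in step.directDependencies) {
    lines.add('${step.name} <- ${dep.name}');
    lines.addAll(describe(dep, seen));
  }
  return lines;
}

void main() {
  describe(databaseStep).forEach(print);
}
```

Nothing is executed here; the helper only inspects the graph.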

Instead, a step is executed using a Runner. The Runner takes care of executing a step’s dependencies first, with maximum concurrency. The Runner object also caches the result of a step, such that a step is only executed once (by a given Runner), even if multiple other steps depend upon it.

Future<void> main() async {
  // Create Runner object
  final runner = Runner();

  // Run the databaseStep, which will run the configStep first.
  final dbconn = await runner.run(databaseStep);
  // NOTE: The type of the result is inferred from databaseStep.
  assert(dbconn is DBConnection);
}
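The caching behaviour can be checked with a small diamond-shaped graph. In the sketch below, two steps depend on the same configStep and a counter records how often it actually runs. All steps return plain strings and the counter configRuns is a hypothetical addition for illustration.

```dart
import 'package:acyclic_steps/acyclic_steps.dart';

// Counts how many times the shared step actually executes.
var configRuns = 0;

final Step<String> configStep = Step.define('load-config').build(() async {
  configRuns++;
  return 'config';
});

final Step<String> databaseStep = Step.define('connect-database')
    .dep(configStep)
    .build((cfg) async => 'database($cfg)');

final Step<String> cacheStep = Step.define('connect-cache')
    .dep(configStep)
    .build((cfg) async => 'cache($cfg)');

// Depends on both steps above, which in turn share configStep.
final Step<String> serverStep = Step.define('start-server')
    .dep(databaseStep)
    .dep(cacheStep)
    .build((db, cache) async => 'server($db, $cache)');

Future<void> main() async {
  final runner = Runner();
  final server = await runner.run(serverStep);
  print(server);
  print('configStep ran $configRuns time(s)');
}
```

With a single Runner the counter should end up at one, since the second dependent step reuses the cached result.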

Having to define a graph of steps from scratch whenever the initial input changes is not very convenient. To facilitate injection of some initial input for a graph of steps, the Runner object allows for steps to be overridden.

When a step is overridden, a value is injected into the Runner object’s internal cache, so that any step that depends on the overridden step gets the injected value. This mechanism can be used to inject an initial value into a graph of steps by defining virtual steps that must be overridden with a value.

/// A virtual step for injecting initial configuration.
final Step<Config> configStep = Step
    .define('config')
    .build(() async {
  throw UnsupportedError('configStep must be overridden');
});

Future<void> main() async {
  // Create Runner object
  final runner = Runner();

  runner.override(configStep, Config(/* ... */));

  // Run the databaseStep, which will use the result of the overridden configStep
  final dbconn = await runner.run(databaseStep);
}

The ability to override a step can also be used for dependency injection. When writing tests it might be desirable to override the step that provides a shared cache (like redis or memcached) with a fake in-memory implementation suitable for local testing.
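A minimal sketch of this kind of dependency injection might look as follows. The Cache interface, the InMemoryCache fake and greetingStep are all hypothetical names invented for this illustration; only Step, Runner, run and override come from the package.

```dart
import 'package:acyclic_steps/acyclic_steps.dart';

/// Hypothetical cache interface; not part of package:acyclic_steps.
abstract class Cache {
  Future<String?> get(String key);
  Future<void> set(String key, String value);
}

/// Fake in-memory implementation suitable for local tests.
class InMemoryCache implements Cache {
  final _values = <String, String>{};

  @override
  Future<String?> get(String key) async => _values[key];

  @override
  Future<void> set(String key, String value) async {
    _values[key] = value;
  }
}

/// In production this step would connect to redis or memcached.
final Step<Cache> cacheStep = Step.define('connect-cache').build(() async {
  throw UnimplementedError('real cache connection omitted from this sketch');
});

/// A step that uses whatever cache the Runner provides.
final Step<String?> greetingStep =
    Step.define('greeting').dep(cacheStep).build((cache) async {
  await cache.set('greeting', 'hello');
  return await cache.get('greeting');
});

Future<void> main() async {
  final runner = Runner();
  // Inject the fake before running anything that depends on cacheStep.
  runner.override(cacheStep, InMemoryCache());
  print(await runner.run(greetingStep));
}
```

Because the override is per Runner, production code can create its own Runner and run the same steps against the real cache.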

Beyond overriding steps, the Runner object also allows the execution of steps to be wrapped. This can be useful for measuring and reporting the execution time of steps, handling or wrapping certain exceptions, or, as illustrated below, retrying steps that fail due to I/O exceptions.

import 'package:acyclic_steps/acyclic_steps.dart';
import 'package:retry/retry.dart';

Future<void> main() async {
  Future<T> wrapStepWithRetry<T>(
    Step<T> step,
    Future<T> Function() runStep,
  ) async {
    return await retry(() async => await runStep());
  }

  // Create Runner object
  final runner = Runner(wrapRunStep: wrapStepWithRetry);

  // Run the databaseStep, with a runner that will retry step execution.
  final dbconn = await runner.run(databaseStep);
}
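The same wrapping hook can be used for the timing use case mentioned above. The following is a sketch rather than a recommended setup: slowStep, wrapStepWithTiming and the finished list are hypothetical names, and the wrapper has the same shape as the retry wrapper above.

```dart
import 'package:acyclic_steps/acyclic_steps.dart';

// Hypothetical step that simulates slow work.
final Step<String> slowStep = Step.define('slow-step').build(() async {
  await Future<void>.delayed(const Duration(milliseconds: 50));
  return 'done';
});

/// Names of steps that have finished, recorded by the wrapper.
final finished = <String>[];

/// Measures how long each step takes and prints the result.
Future<T> wrapStepWithTiming<T>(
  Step<T> step,
  Future<T> Function() runStep,
) async {
  final watch = Stopwatch()..start();
  try {
    return await runStep();
  } finally {
    // Runs whether the step succeeded or threw.
    finished.add(step.name);
    print('${step.name} took ${watch.elapsedMilliseconds} ms');
  }
}

Future<void> main() async {
  final runner = Runner(wrapRunStep: wrapStepWithTiming);
  print(await runner.run(slowStep));
}
```

Since the wrapper receives the step itself, it can use the step name for logging without the steps having to know about it.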
