Platform Events With Async Apex
Posted: July 12, 2021

Table of Contents:

  • Database.RaisesPlatformEvents And Batch Apex
  • Transaction Finalizers & Queueable Apex
  • Platform Events With Async Apex Closing Thoughts
  • Postscript

You’ve heard about the joys of using platform events from Jonathan in Advanced Logging Using Nebula Logger — their versatility and interoperability with exception handling being a primary example of where the two features dovetail nicely together.

Something interesting starts to occur when your platform events start firing other platform events, especially when those platform events originate from Apex that is already running asynchronously. You might want to fire a platform event from Batch or Queueable Apex in order to inform an LWC or Flow that a process has finished; you might be looking to add logging to something similar. There are countless ways in which this sort of situation might come up! Starting with a test-driven experiment helps us learn what to expect when a platform event is fired from Batch Apex (for example). Let’s dive in.

Database.RaisesPlatformEvents And Batch Apex

So what would firing a platform event from a Batch class look like, in practice? For Batch Apex, we can make use of the Database.RaisesPlatformEvents interface on our Batch class; this interface opts our Batchable into firing a BatchApexErrorEvent on any uncaught exception in the code. Funnily enough, test classes themselves can be used as Batchables/Queueables:

@IsTest
private class BatchApexErrorEventExample implements Database.Batchable<SObject>, Database.RaisesPlatformEvents {

  // prior to writing the test, I'll create the minimum possible boilerplate
  // so that this class conforms to the Database.Batchable interface:

  private final Boolean shouldThrow;
  public BatchApexErrorEventExample(Boolean shouldThrow) {
    this.shouldThrow = shouldThrow;
  }

  public Database.QueryLocator start(Database.BatchableContext bc) {
    if (this.shouldThrow) {
      throw new IllegalArgumentException('Start');
    }
    return Database.getQueryLocator([SELECT Id FROM Account]);
  }

  public void execute(Database.BatchableContext bc, List<SObject> records) {
    // no-op for now
  }

  public void finish(Database.BatchableContext bc) {
    // another no-op
  }

  // this is the test I'd *LIKE* to write
  @IsTest
  static void shouldFireBatchApexErrorEvent() {
    Test.startTest();
    Database.executeBatch(new BatchApexErrorEventExample(true));
    Test.stopTest();

    // sort of a stab in the dark at the moment
    System.assertEquals(1, Limits.getPublishImmediateDML());
  }
}

Specifically, what I’m hoping to demonstrate here is how somebody unfamiliar with Database.RaisesPlatformEvents can use tests to drive out their understanding of how this particular interface works. For example: do we have to try/catch the thrown exception, or will the firing of the BatchApexErrorEvent take care of that for us?

Note that passing Booleans is a terrible practice; in keeping with the “red, green, refactor” mantra for TDD, we’ll try to get a passing test prior to cleaning things up. For now, we need some kind of conditional to prevent the compiler from complaining about an unreachable code statement, and the shouldThrow argument keeps things pretty explicit as to our expectations.

Running the test, we get:

System.IllegalArgumentException: Start

OK — so there’s no error handling built in; we’ll need to provide that in our test method prior to being able to validate that a platform event has been fired:

@IsTest
static void shouldFireBatchApexErrorEvent() {
  Test.startTest();
  try {
    Database.executeBatch(new BatchApexErrorEventExample(true));
  } catch(Exception ex) {
    System.assertEquals('Start', ex.getMessage());
  }
  Test.stopTest();

  System.assertEquals(1, Limits.getPublishImmediateDML());
}

Interestingly, this produces the same error — and it highlights an interesting platform quirk; namely, that I’ll need to invert my Test.startTest() and Test.stopTest() nesting order with the try/catch block. Turns out that because the async cache is only “flushed” when Test.stopTest() is called, the exception can’t be caught unless the try/catch is outside the start/stop blocks:

@IsTest
static void shouldFireBatchApexErrorEvent() {
  try {
    Test.startTest();
    Database.executeBatch(new BatchApexErrorEventExample(true));
    // the batch only runs (and throws) once Test.stopTest() is called
    Test.stopTest();
    // assert exceptions can't be caught, so if nothing was thrown
    // above, the test fails here instead of passing silently
    System.assert(false, 'Should not make it here, this exception is uncatchable');
  } catch(Exception ex) {
    System.assertEquals('Start', ex.getMessage());
  }

  System.assertEquals(1, Limits.getPublishImmediateDML());
}

One hurdle cleared — now the test is failing because our final assertion doesn’t pass:

System.AssertException: Assertion Failed: Expected: 1, Actual: 0

Again, interesting. Remember that “async cache flushing” due to Test.stopTest() I just mentioned? Guess what? It also resets the counters in the Limits class (or at least that’s true when testing callouts made asynchronously — will the same theory hold water for platform events?). That would mean if we move the assert to right before the stopTest() call, we’ll be able to test the platform events have been fired before the Limits class resets. Alas, that ends up not working either.
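A minimal sketch of that attempt, written as a replacement for the test method inside BatchApexErrorEventExample; the only change is where the Limits assertion sits. It shows the shape of the experiment rather than a working test, since the assertion still fails:

```apex
@IsTest
static void shouldFireBatchApexErrorEvent() {
  try {
    Test.startTest();
    Database.executeBatch(new BatchApexErrorEventExample(true));
    // checked before Test.stopTest() in the hope that the counter
    // hasn't been reset yet; the assertion still fails in this spot
    System.assertEquals(1, Limits.getPublishImmediateDML());
    Test.stopTest();
  } catch(Exception ex) {
    System.assertEquals('Start', ex.getMessage());
  }
}
```

One plausible reading, and it is only an assumption: the batch job hasn’t actually run before Test.stopTest() is called, so nothing has been published yet at the point where the counter is read.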

This is a bit awkward, but not entirely unexpected. Ultimately, we’ve exceeded the scope of what it’s possible to test in isolation and exposed an awkward corner of the platform — we’ll need to actually create a BatchApexErrorEvent handler in order to validate that Database.RaisesPlatformEvents is working correctly:

trigger BatchApexErrorEventTrigger on BatchApexErrorEvent (after insert) {
  // ideally you're using a trigger handler framework
  // but we'll use Trigger.new for this simple example
  new BatchApexErrorEventHandler().handle(Trigger.new);
}

public class BatchApexErrorEventHandler {
  public void handle(List<BatchApexErrorEvent> batchApexErrorEvents) {
    System.debug('Made it here');
  }
}

Now here’s the really interesting part: if I just run the test again, as is, I won’t even see the “Made it here” debug statement in the logs. This is due to something that ends up making actual sense; if we think about our batch class, it’s already an async process. The BatchApexErrorEvent is now layering another async process on top of the one we’re already forcing to run by executing our Batchable. How can we continue to flush stacked async contexts in Apex after we’ve called Test.stopTest()?

Enter the Test.getEventBus().deliver() command:

@IsTest
static void shouldFireBatchApexErrorEvent() {
  try {
    Test.startTest();
    Database.executeBatch(new BatchApexErrorEventExample(true));
    // the batch only runs (and throws) once Test.stopTest() is called
    Test.stopTest();
    // assert exceptions can't be caught, so if nothing was thrown
    // above, the test fails here instead of passing silently
    System.assert(false, 'Should not make it here, this exception is uncatchable');
  } catch(Exception ex) {
    System.assertEquals('Start', ex.getMessage());
  }
  Test.getEventBus().deliver();

  // we'll come back to an actual assert for this
  // in a second
}

Now the debug statement shows in our logs and we can sew up the test (in the short term) by adding a @TestVisible static variable:

public class BatchApexErrorEventHandler {
  @TestVisible
  private static Boolean wasCalled = false;
  public void handle(List<BatchApexErrorEvent> batchApexErrorEvents) {
    wasCalled = true;
  }
}

@IsTest
static void shouldFireBatchApexErrorEvent() {
  try {
    Test.startTest();
    Database.executeBatch(new BatchApexErrorEventExample(true));
    // the batch only runs (and throws) once Test.stopTest() is called
    Test.stopTest();
    // assert exceptions can't be caught, so if nothing was thrown
    // above, the test fails here instead of passing silently
    System.assert(false, 'Should not make it here, this exception is uncatchable');
  } catch(Exception ex) {
    System.assertEquals('Start', ex.getMessage());
  }
  Test.getEventBus().deliver();

  System.assertEquals(true, BatchApexErrorEventHandler.wasCalled);
}

Et voilà!! We’ve now established that Database.RaisesPlatformEvents works exactly the way that we’d like it to. Something curious on the subject of Test.getEventBus().deliver() calls: you need to add additional calls to that method as the number of async contexts you’re operating in increases.

As an example, if you use Nebula Logger, adding log handling into your BatchApexErrorEventHandler is a great way to start tracking job failures across Batch classes:

public class BatchApexErrorEventHandler {
  private static final String LOG_TEMPLATE = 'Error occurred during {0} at phase: {1}.';

  public void handle(List<BatchApexErrorEvent> batchApexErrorEvents) {
    Set<Id> batchJobIds = new Set<Id>();
    // we only get the Id of the job; this logic assumes we may want
    // to only log/handle events for specific failures
    for (BatchApexErrorEvent batchErrorEvent : batchApexErrorEvents) {
      batchJobIds.add(batchErrorEvent.AsyncApexJobId);
    }
    Map<Id, AsyncApexJob> matchingJobs = new Map<Id, AsyncApexJob>([
      SELECT Id, ApexClass.Name
      FROM AsyncApexJob
      WHERE Id = :batchJobIds
    ]);

    for (BatchApexErrorEvent batchErrorEvent : batchApexErrorEvents) {
      String className = matchingJobs.get(batchErrorEvent.AsyncApexJobId).ApexClass.Name;
      switch on className {
        // you may not control all batch jobs that fail;
        // you might also only want to handle certain jobs
        when 'NameOfYourClass' {
          this.fillLog(batchErrorEvent, className);
        }
      }
    }
    Logger.saveLog();
  }

  private void fillLog(BatchApexErrorEvent errorEvent, String className) {
    List<String> logTemplateArgs = new List<String>{ className, errorEvent.Phase };
    for (String recordId : errorEvent.JobScope.split(',')) {
      Logger.error(String.format(LOG_TEMPLATE, logTemplateArgs), (Id) recordId);
    }
  }
}

In order for our test to validate that the calls to Logger.saveLog() are working properly, our test has to call Test.getEventBus().deliver() a second time:

@IsTest
static void shouldFireBatchApexErrorEvent() {
  try {
    Test.startTest();
    Database.executeBatch(new BatchApexErrorEventExample(true));
    // the batch only runs (and throws) once Test.stopTest() is called
    Test.stopTest();
    // assert exceptions can't be caught, so if nothing was thrown
    // above, the test fails here instead of passing silently
    System.assert(false, 'Should not make it here, this exception is uncatchable');
  } catch(Exception ex) {
    System.assertEquals('Start', ex.getMessage());
  }
  // flushes the platform event from Database.RaisesPlatformEvents
  Test.getEventBus().deliver();
  // flushes the Logger's platform event
  Test.getEventBus().deliver();


  List<Log__c> insertedLogs = [SELECT Id , ... FROM Log__c];
  System.assertEquals(1, insertedLogs.size());
  // additional asserts validating that the individual log entries
  // match the logs being created by BatchApexErrorEventHandler
}

Note that those extra calls to Test.getEventBus().deliver() apply regardless of what sort of async Apex is being performed:

  • future methods
  • Queueables
  • Platform Events
  • Batchables

So be sure to keep that in mind when testing asynchronous Apex combined with Platform Events! The tests for validating that the BatchApexErrorEvent gets fired for the execute and finish methods of our test batch class mean we’ll need to tweak the overall class a little bit as well:

@IsTest
private class BatchApexErrorEventExample implements Database.Batchable<SObject>, Database.RaisesPlatformEvents {

  // the BatchApexErrorEvent uses a text field
  // that aligns to the values for this enum
  enum Phase { START, EXECUTE, FINISH }

  private final Phase throwPoint;
  public BatchApexErrorEventExample(Phase throwPoint) {
    this.throwPoint = throwPoint;
  }

  public Database.QueryLocator start(Database.BatchableContext bc) {
    throwForMatchingPhase(Phase.START);
    return Database.getQueryLocator([SELECT Id FROM Account]);
  }

  public void execute(Database.BatchableContext bc, List<SObject> records) {
    throwForMatchingPhase(Phase.EXECUTE);
  }

  public void finish(Database.BatchableContext bc) {
    throwForMatchingPhase(Phase.FINISH);
  }

  private void throwForMatchingPhase(Phase matchingPhase) {
    if (this.throwPoint == matchingPhase) {
      throw new IllegalArgumentException(matchingPhase.name());
    }
  }

  @IsTest
  static void shouldFireBatchApexErrorEventOnStart() {
    Test.startTest();
    Database.executeBatch(new BatchApexErrorEventExample(Phase.START));
    Test.stopTest();

    // assertions for logs having been created
  }

  @IsTest
  static void shouldFireBatchApexErrorEventOnExecute() {
    Test.startTest();
    Database.executeBatch(new BatchApexErrorEventExample(Phase.EXECUTE));
    Test.stopTest();

    // assertions for logs having been created
  }

  @IsTest
  static void shouldFireBatchApexErrorEventOnFinish() {
    Test.startTest();
    Database.executeBatch(new BatchApexErrorEventExample(Phase.FINISH));
    Test.stopTest();

    // assertions for logs having been created
  }
}

Now we’ve tested and validated that no matter which section of a batch job fails, we can report on errors simply by implementing the Database.RaisesPlatformEvents interface. Amazing.
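The handler so far only reads Phase and JobScope, but BatchApexErrorEvent also carries ExceptionType, Message and StackTrace alongside AsyncApexJobId. Below is a rough sketch of a helper that pulls those together. The class name BatchApexErrorEventDescriber and both method names are made up for illustration, and the blank check on JobScope is an assumption: I’d expect that field to be empty when the failure isn’t tied to a specific set of records.

```apex
// hypothetical helper; not part of the handler shown above
public class BatchApexErrorEventDescriber {

  // builds a one-line summary from the standard event fields
  public static String describe(BatchApexErrorEvent errorEvent) {
    List<String> parts = new List<String>{
      'Job: ' + errorEvent.AsyncApexJobId,
      'Phase: ' + errorEvent.Phase,
      'Exception: ' + errorEvent.ExceptionType,
      'Message: ' + errorEvent.Message
    };
    return String.join(parts, ' | ');
  }

  // returns the record Ids in scope, or an empty list when
  // JobScope is blank (assumed possible outside of execute)
  public static List<String> getScopedRecordIds(BatchApexErrorEvent errorEvent) {
    if (String.isBlank(errorEvent.JobScope)) {
      return new List<String>();
    }
    return errorEvent.JobScope.split(',');
  }
}
```

Looping over getScopedRecordIds(errorEvent) instead of calling split directly on JobScope would keep a blank scope from throwing a null pointer exception inside the handler.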

To close the Batchable section — keep in mind that the switch statement using hard-coded Apex class names is also a code smell. There are a few better options for how to handle this in a more sustainable way:

  • log all batch class exceptions
  • use CMDT to fetch a list of Apex classes where you want to enable logging

Neither approach is necessarily better; it depends (as always) on what the expectations are for your org, how many Batch classes you have, the degree of control you have over making changes like this, etc …
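As a rough sketch of the CMDT route: the custom metadata type LoggableBatchClass__mdt below is hypothetical (you would need to create it yourself), and the sketch assumes each record’s DeveloperName matches the name of an Apex class you want logged. The helper class name is likewise made up.

```apex
// hypothetical helper backed by a hypothetical custom metadata type
public class LoggableBatchClasses {
  // cached per transaction so the metadata is only read once
  private static Set<String> classNames;

  public static Boolean isLoggable(String className) {
    if (classNames == null) {
      classNames = new Set<String>();
      // getAll() returns every record for the type, keyed by DeveloperName
      for (LoggableBatchClass__mdt config : LoggableBatchClass__mdt.getAll().values()) {
        classNames.add(config.DeveloperName);
      }
    }
    // note: Set<String>.contains is case-sensitive
    return classNames.contains(className);
  }
}
```

In the handler, the switch statement could then shrink to an if (LoggableBatchClasses.isLoggable(className)) check, and enabling logging for another job becomes a metadata change rather than a code change.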

Transaction Finalizers & Queueable Apex

Turning to the Queueable and the new Finalizer interface, we’ll need to make some subtle adjustments to the existing BatchApexErrorEventHandler in order to straighten things out between the two async Apex implementations:

// previously BatchApexErrorEventHandler
public class AsyncApexErrorEventHandler {
  private static final String LOG_TEMPLATE = 'Error occurred during {0} with message/position: {1}.';

  public void handle(List<SObject> errorEvents) {
    if (errorEvents.isEmpty()) {
      return;
    }
    SObject firstAsyncErrorEvent = errorEvents[0];
    switch on firstAsyncErrorEvent {
      when BatchApexErrorEvent batchEvent {
        this.handleBatchErrors((List<BatchApexErrorEvent>) errorEvents);
      }
      when QueueableApexErrorEvent__e queueableError {
        this.handleQueueableErrors((List<QueueableApexErrorEvent__e>) errorEvents);
      }
    }
    Logger.saveLog();
  }

  private void handleBatchErrors(List<BatchApexErrorEvent> events) {
    Set<Id> jobIds = new Set<Id>();
    for (BatchApexErrorEvent ev : events) {
      jobIds.add(ev.AsyncApexJobId);
    }
    Map<Id, AsyncApexJob> matchingJobs = new Map<Id, AsyncApexJob>([
      SELECT Id, ApexClass.Name
      FROM AsyncApexJob
      WHERE Id = :jobIds
    ]);

    for (BatchApexErrorEvent batchErrorEvent : events) {
      String className = matchingJobs.get(batchErrorEvent.AsyncApexJobId).ApexClass.Name;
      switch on className {
        // you may not control all batch jobs that fail;
        // you might also only want to handle certain jobs
        when 'NameOfYourClass' {
          List<String> logTemplateArgs = new List<String>{ className, batchErrorEvent.Phase };
          for (String recordId : batchErrorEvent.JobScope.split(',')) {
            this.fillLog(logTemplateArgs, recordId);
          }
        }
      }
    }
  }

  private void fillLog(List<String> logTemplateArgs, String recordId) {
    Logger.error(String.format(LOG_TEMPLATE, logTemplateArgs), (Id) recordId);
  }

  private void handleQueueableErrors(List<QueueableApexErrorEvent__e> errorEvents) {
    System.assert(false, 'Not implemented!');
  }
}

Note that we’re absolutely allowed to refactor the existing code while using TDD. The only thing we can’t do is write net-new production-level code without first having written a failing test; hence the uncatchable assertion being thrown in our new method for Queueables. Let’s write that failing test now.

// this was previously BatchApexErrorEventExample
@IsTest
private class AsyncApexErrorEventHandlerTests
  implements Database.Batchable<SObject>, Database.RaisesPlatformEvents {

  @IsTest
  static void shouldFireBatchApexErrorEvent() {
    // ...
  }

  @IsTest
  static void shouldLogQueueableErrors() {
    try {
      Test.startTest();
      System.enqueueJob(new FailedQueueable());
      Test.stopTest();
    } catch (Exception ex) {
      // for now, let's ignore the exception
      // and come back to it later - we just
      // want a failing test
      throw ex;
    }

    Test.getEventBus().deliver(); // QueueableApexErrorEvent__e
    Test.getEventBus().deliver(); // Logger

    System.assertEquals(1, [SELECT Count() FROM Log__c]);
  }

  private class FailedQueueable implements System.Queueable {

    public void execute (System.QueueableContext qc) {
      throw new IllegalArgumentException('Fail!');
    }
  }
}

So of course, the test fails, but the failure doesn’t actually trigger what we’d like; it doesn’t queue up the QueueableApexErrorEvent__e and we don’t get to our uncatchable exception back in the newly modified AsyncApexErrorEventHandler. We now have the grounds to modify the handler itself to conform to the Finalizer interface so that we can get on with testing!

trigger QueueableErrorEventTrigger on QueueableApexErrorEvent__e (after insert) {
  new AsyncApexErrorEventHandler().handle(Trigger.new);
}

public class AsyncApexErrorEventHandler implements Finalizer {
  private final Set<String> loggableRecordIds;
  private final String jobName;

  public AsyncApexErrorEventHandler() {
    // allow zero-arg constructor for batch route
  }

  public AsyncApexErrorEventHandler(Set<String> loggableRecordIds, String jobName) {
    this.loggableRecordIds = loggableRecordIds;
    this.jobName = jobName;
  }

  // ... rest of the code

  public void execute(FinalizerContext fc) {
    if (fc.getResult() == ParentJobResult.UNHANDLED_EXCEPTION) {
      QueueableApexErrorEvent__e queueableError = new QueueableApexErrorEvent__e(
        JobName__c = this.jobName,
        ConcatenatedRecordIds__c = String.join(new List<String>(this.loggableRecordIds), ','),
        Message__c = fc.getException().getMessage()
      );
      EventBus.publish(queueableError);
    }
  }
}

// then in AsyncApexErrorEventHandlerTests

@IsTest
static void shouldLogQueueableErrors() {
  try {
    Test.startTest();
    System.enqueueJob(new FailedQueueable());
    Test.stopTest();
  } catch (Exception ex) {
    System.assertEquals('Fail!', ex.getMessage());
  }

  Test.getEventBus().deliver(); // QueueableApexErrorEvent__e
  Test.getEventBus().deliver(); // Logger

  System.assertEquals(1, [SELECT Count() FROM Log__c]);
}

private class FailedQueueable implements System.Queueable {

  public void execute (System.QueueableContext qc) {
    System.attachFinalizer(new AsyncApexErrorEventHandler(new Set<String>(), 'FailedQueueable'));
    throw new IllegalArgumentException('Fail!');
  }
}

And just like that, by commenting out the exception re-throw in our test, we’ve reached our uncatchable assert exception. The implementation for handling the Finalizer-driven code isn’t that much more code:

// in AsyncApexErrorEventHandler
private void handleQueueableErrors(List<QueueableApexErrorEvent__e> errorEvents) {
  for (QueueableApexErrorEvent__e errorEvent : errorEvents) {
    List<String> logFormatArgs = new List<String>{ errorEvent.JobName__c, errorEvent.Message__c };
    if (String.isBlank(errorEvent.ConcatenatedRecordIds__c)) {
      // not all Queueables will necessarily have a discrete
      // set of recordIds to pass to this field - and that's ok!
      Logger.error(
        String.format(
          LOG_TEMPLATE,
          logFormatArgs
        )
      );
    } else {
      for (String recordId : errorEvent.ConcatenatedRecordIds__c.split(',')) {
        this.fillLog(logFormatArgs, recordId);
      }
    }
  }
}
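Since ConcatenatedRecordIds__c is just a comma-separated string, the publishing side and the handling side have to agree on how the Ids are joined and split. Here’s a small sketch of that round trip, written so it could be run as anonymous Apex; the Id values are made-up placeholder strings rather than real records.

```apex
// placeholder values, not real record Ids
Set<String> loggableRecordIds = new Set<String>{ '001000000000001AAA', '001000000000002AAA' };

// publish side: flatten the set into one comma-separated string
String concatenated = String.join(new List<String>(loggableRecordIds), ',');

// handler side: split back into the individual Ids
List<String> splitIds = concatenated.split(',');
System.assertEquals(loggableRecordIds.size(), splitIds.size());
System.assert(loggableRecordIds.containsAll(splitIds));

// an empty set produces a blank string, which is the case
// the String.isBlank() check in the handler accounts for
String emptyConcatenated = String.join(new List<String>(), ',');
System.assert(String.isBlank(emptyConcatenated));
```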

And that’s mostly it! Of course, for actual tests, you’d want to assert meaningfully on the logs created to ensure that they had the correct count of errors; that the log entries created contained the formatted error strings for both the Batch and Queueable routes. You would also want to test the “passing record Ids to the Finalizer” route.
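For that last route, a test might look something like the sketch below. The test class and inner Queueable names are hypothetical, it assumes an Account can be inserted with only a Name in your org, and it assumes the AsyncApexErrorEventHandler, QueueableApexErrorEvent__e trigger and Nebula Logger setup from earlier are all in place. Treat it as a starting point rather than something I’ve verified end to end.

```apex
// hypothetical test class for the "passing record Ids" route
@IsTest
private class AsyncApexErrorEventHandlerIdRouteTests {

  @IsTest
  static void shouldLogQueueableErrorsForPassedRecordIds() {
    // assumption: no extra required fields or validation rules on Account
    Account acc = new Account(Name = 'Finalizer route');
    insert acc;

    try {
      Test.startTest();
      System.enqueueJob(new FailedQueueableWithIds(new Set<String>{ String.valueOf(acc.Id) }));
      Test.stopTest();
    } catch (Exception ex) {
      System.assertEquals('Fail!', ex.getMessage());
    }

    Test.getEventBus().deliver(); // QueueableApexErrorEvent__e
    Test.getEventBus().deliver(); // Logger

    System.assertEquals(1, [SELECT COUNT() FROM Log__c]);
    // asserts on the individual log entries (one per record Id) would go here
  }

  private class FailedQueueableWithIds implements System.Queueable {
    private final Set<String> recordIds;

    public FailedQueueableWithIds(Set<String> recordIds) {
      this.recordIds = recordIds;
    }

    public void execute(System.QueueableContext qc) {
      // the finalizer carries the record Ids so they survive the failure
      System.attachFinalizer(new AsyncApexErrorEventHandler(this.recordIds, 'FailedQueueableWithIds'));
      throw new IllegalArgumentException('Fail!');
    }
  }
}
```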

Platform Events With Async Apex Closing Thoughts

Using Test Driven Development to drive out the experience of experimenting with technology you haven’t used before on the platform is a fun way to quickly iterate on and challenge your understanding of the unknown. It also helps to create testing boundaries that align with the code you want to write; this allows you to spend less time reasoning about the way that things should work, and more time safely creating functionality while building up a network of tests. As well, it gives you the chance to start recognizing higher-order similarities between platform functions so that those functions can be grouped when it’s convenient — in our example, the newly rolled out Finalizer interface, in conjunction with the Database.RaisesPlatformEvents interface, helps to logically group the logging of async Apex errors into one handy class.

I “started” writing this post back in September of 2020 while playing around with Platform Events and async Apex while consulting, and knew that I’d need to finish it someday after integrating Nebula Logger into a flaky batch job a few months back. I worked with the Platform Events team to clarify some of their documentation on Firing Platform Events from Batch Apex, and I think this article dovetails nicely with the content updates there by showing exactly how all of the pieces work together with Database.RaisesPlatformEvents.

As always — thank you for reading, and I hope you enjoyed!


Postscript

The last time I was writing Joys Of Apex on a plane, it was two back-to-back flights from Boston - Portland, OR while writing Batchable & Queueable Apex. That was in January 2020. A lot of things were about to change in the world & in my life, with COVID already popping up in the news abroad and my own impending exodus from the East Coast on the horizon. I’m now returning to Colorado after having flown back East to see friends. I always enjoy writing while flying; the challenge of trying to form a narrative that’s cohesive while not having access to the internet (for the most part) is something that I really enjoy.

More recently, I wrote almost the entirety of Creating A Round Robin Assignment while offline, which I thoroughly enjoyed — there wasn’t any code (aside from samples) in A Year In Open Source, but that’s also another fun “offline” read.

In the past three years, hundreds of thousands of you have come to read & enjoy the Joys Of Apex. Over that time period, I've remained staunchly opposed to advertising on the site, but I've made a Patreon account in the event that you'd like to show your support there. Know that the content here will always remain free. Thanks again for reading — see you next time!