Handling Complex Data Flows

Written By Jesse Schutt
Posted on

At its core, programming is nothing more than stacking little blocks of logic on each other, building up a structure comprised of detailed organization, intentional design, and functional beauty. Unless those goals are held front and center it is only a matter of time before the system devolves into a convoluted mess of overlapping, overstepping, messy code.

One place this happens frequently is in the area of a complex flow. Data starts in one form and is modified by the time it reaches the end. An example of this would be something like a sign-up form:

<?php

class FictitousRegisterController {

  public function register(Request $request): JsonResponse
  {
    if ($request->invitationCode = 'knock-knock') {
      $user = User::create([
        'email' => $request->email,
        'password' => $request->password,
      ]);
      
      $user->roles()->attach('member');

      if ($request->subscribe) {
        MailingService::subscribe($user->email);

        $user->update([
          'subscriber' => true,
        ]);
      }

      if ($request->has('group_uuids')) {
        collect($request->group_uuids)->each(function($uuid) use ($user) {
          if($group = Group::whereUuid($uuid)->first()) {
            $user->groups()->attach($group);
          } else {
            throw new InvalidGroupException($uuid . ' is an invalid group uuid!');
          }
        });
      } else {
        $user->groups()->attach(Group::whereDefault()->first());
      }

      Mailer::sendWelcomeEmail($user->email);
      
      return response()->json([
        'message' => 'Welcome!',
      ]);
    }
    
    return response()->json([
      'message' => 'Error!'
    ]);
  }
}

This method has a high cyclomatic complexity, or it has a high number of potential paths through, and would benefit from some refactoring. A common approach would be to break it into several private methods to isolate logic into more manageable portions:

<?php

class FictitiousRegisterController
{

  public function register(Request $request): JsonResponse
  {
    if ($this->hasCorrectCode($request->invitationCode)) {
      $user = $this->createUser($request);

      $this->handleMailingList($request->subscribe, $user);

      $this->handleGroupAssignment($request, $user);

      Mailer::sendWelcomeEmail($user->email);

      return response()->json([
        'message' => 'Welcome!',
      ]);
    }

    return response()->json([
      'message' => 'Error!'
    ]);
  }
  
  private function createUser($request) {
    $user = User::create([
      'email' => $request->email,
      'password' => $request->password,
    ]);

    $user->roles()->attach('member');
    
    return $user;
  }
  
  private function hasCorrectCode($code) {
    return $code = 'knock-knock';
  }
  
  private function handleMailingList($subscribe, $user) {
    if ($subscribe) {
      MailingService::subscribe($user->email);

      $user->update([
        'subscriber' => true,
      ]);
    }
  }
  
  private function handleGroupAssignment($request, $user) {
    if ($request->has('group_uuids')) {
      collect($request->group_uuids)->each(function ($uuid) use ($user) {
        if ($group = Group::whereUuid($uuid)->first()) {
          $user->groups()->attach($group);
        } else {
          throw new InvalidGroupException($uuid . ' is an invalid group uuid!');
        }
      });
    } else {
      $user->groups()->attach(Group::whereDefault()->first());
    }
  }
}

Before we move on let me say that the above may be a perfectly valid solution for many use cases. Not every scenario will require extensive refactoring. That said, let’s see why we might want to consider something else.

There are a few things that bother me about this approach. For example, testing private methods is less than ideal. We would have to write an overarching feature test that has little visibility into the code aside from the input and output. Adding any more logic in the future means we have to modify the class and hope that our feature test factors in the new functionality.

Let’s look at another way

If we step up a level from the implementation and think about the elements at play we can see there are basically three different elements:

  1. The request/response (the controller)
  2. The data itself (username/password/errors)
  3. The logic that acts upon the data

With those three players identified we can start breaking down the large method and relocate code into one of these three locations.

Items related to the gathering of input and the return of data in the response can all stay within the controller as that’s what it is there for.

<?php

class FictitiousRegisterController {

  public function register(Request $request): JsonResponse
  {
    $traveler = (new RegisterTraveler())->setRequest($request);
    
    $pipes = [
      ValidateInvitationCode::class,
      CreateUser::class,
      AssignPermissions::class,
      HandleMailingList::class,
      AssignToGroups::class,
      SendWelcomeEmail::class,
    ];
    
    return app(Pipeline::class)
      ->send($traveler)
      ->through($pipes)
      ->then(function ($traveler) {
        return response()->json([
          'message' => 'Success',
        ]);
      });

The actual data itself (username, password, groups) can be centralized into a simple data transfer object, or DTO.

<?php

class RegisterTraveler {

  private $request;

  private $user;

  public function setRequest($request)
  {
    $this->request = $request;
    return $this;
  }

  public function getRequest()
  {
    return $this->request;
  }

  public function setUser($user)
  {
    $this->user = $user;
    return $this;
  }

  public function getUser()
  {
    return $this->user;
  }
}

Lastly, the individual steps that do the acting can be moved into a group of “steps”, each located within their own class.

<?php

class CreateUser implements PipeInterface {
  public function handle($traveler, $next)
  {
    $traveler->setUser(
      User::create([
        'email' => $traveler->getRequest()->email,
        'password' => $traveler()->getRequest()->password,
      ])
    );

    return $next($traveler);
  }
}
<?php

class HandleMailingList implements PipeInterface
{
  public function handle($traveler, $next)
  {
    if ($traveler->getRequest()->subscribe) {
      MailingService::subscribe($traveler->getUser()->email);

      $traveler->getUser()->update([
        'subscriber' => true,
      ]);
    }

    return $next($traveler);
  }
}

Notice how the high-level theory has driven out a solution that closely reflects the primary players?

The controller receives the input, assembles it into a DTO, and a Laravel Pipeline churns through the individual steps, returning the modified output. It’s a beautiful thing.

We’ve used this pipeline approach to break down complex flows at Zaengle a number of times with good success but there were a few issues to resolve as we established our pattern:

  1. Exiting the pipeline
  2. What to do with partial completion
  3. Breaking down pipes into sub-pipelines

Exiting the Pipeline

How can we abort if something goes wrong in one of the steps? The simplest way we’ve found is to throw an exception from a pipe. Wrapping the pipeline in a try/catch has allowed us to handle potential errors from the pipes.

<?php

class FictitiousRegisterController {

  public function register(Request $request): JsonResponse
  {
    $traveler = (new RegisterTraveler())->setRequest($request);

    $pipes = [
      ValidateInvitationCode::class,
      CreateUser::class,
      AssignPermissions::class,
      HandleMailingList::class,
      AssignToGroups::class,
      SendWelcomeEmail::class,
    ];
    
    try {
      return app(Pipeline::class)
        ->send($traveler)
        ->through($pipes)
        ->then(function ($traveler) {
          return response()->json([
            'message' => 'Success',
          ]);
        });
    } catch (Exception $e) {
      return response()->json([
        'message' => $e->getMessage(),
      ]);
    }
<?php

class AssignToGroups implements PipeInterface
{
  public function handle($traveler, $next)
  {
    if ($traveler->getRequest()->has('group_uuids')) {
      collect($traveler->getRequest()->group_uuids)->each(function($uuid) use ($traveler) {
        if($group = Group::whereUuid($uuid)->first()) {
          $traveler->getUser()->groups()->attach($group);
        } else {
          throw new InvalidGroupException($uuid . ' is an invalid group uuid!');
        }
      });
    } else {
      $traveler->getUser()->groups()->attach(Group::whereDefault()->first());
    }
    
    return $next($traveler);
  }
}

Partial Completion

Since we’ve set up the try/catch already, adding in a database transaction helps clean up database state if an anomoly occurs. Start the transaction before the pipeline kicks off, commit it if the process completes successfully, and rollback in the exception catcher if there’s a problem.

<?php

class FictitiousRegisterController {

  public function register(Request $request): JsonResponse
  {
    $traveler = (new RegisterTraveler())->setRequest($request);

    $pipes = [
      ValidateInvitationCode::class,
      CreateUser::class,
      AssignPermissions::class,
      HandleMailingList::class,
      AssignToGroups::class,
      SendWelcomeEmail::class,
    ];

    try {
      DB::beginTransaction();
      return app(Pipeline::class)
        ->send($traveler)
        ->through($pipes)
        ->then(function ($traveler) {
          DB::commit();
          return response()->json([
            'message' => 'Success',
          ]);
        });
    } catch (Exception $e) {
      DB::rollback();
      return response()->json([
        'message' => $e->getMessage(),
      ]);
    }

Sub Pipelines

Infrequently we will have some branching logic that makes sense to be within a single pipe. If that logic becomes too complex we will break it apart into a sub-pipeline. All the same principles apply to a sub-pipeline as a normal pipeline.

Benefits of Pipelines

Earlier I noted that it can be difficult to test a single method that calls a number of private methods. With a pipeline approach we have the freedom of testing individual pipes in isolation, as well as having higher-level feature tests that ensure the given input produces the expected output.

<?php

class CreateUserTest extends TestCase {
    /** @test */
    public function it_creates_a_user()
    {
      $traveler = (new RegisterTraveler)->setRequest(new Request(['email' => 'test', 'password' => 'password']));
      
      (new CreateUser)->handle($traveler, function () {});
      
      $this->assertInstanceOf(User::class, $traveler->getUser());
    }
}

Conclusion

A pipeline is one solution we’ve used to handle complex data flows. There are other options that may work better for you. The biggest takeaway from this experience was to think one step above the actual implementation and identify what the main players were, define their responsibilities, and implement a solution that maintained their integrity.

  1. Hero image