Performance: Stream Controllers

In this section, you will learn about the following:

  • Why stream controllers can give better performance

  • Quirks and tricks for using stream controllers properly

Overview

Most of the previous examples used nested control structures whose behavior is largely deterministic: the triggering of each stage in a pipeline is gated in some way by the stages that precede it. The instrumentation guide explains in detail how to identify hotspots in these kinds of apps.

However, there are cases where stream controllers can provide large speedups if used properly, because they allow data to be processed on an “as-available” basis. This keeps the DRAM controllers as busy as possible throughout the application and lets the parallel bodies of unrolled controllers drift out of phase with one another, which minimizes congestion. One drawback is that these applications are more difficult to debug, especially with the --sim backend, where parallel datapaths are not fully simulated. Another is that the user must be aware of some subtle considerations to avoid indefinite hangs.

 

Inner Product with Streaming

@spatial object InnerProductStream extends SpatialApp {
  def main(args: Array[String]): Unit = {
    // Size of vector
    val N = args(0).to[Int]
    val n = ArgIn[Int]
    // Size of DRAM requests
    val dynB = args(1).to[Int]
    val dynBlockSize = ArgIn[Int]
    // Tile Size
    val BS = 64

    // Parallelization factors: P1 for the loads, P2 for the reduction
    val P1 = 1
    val P2 = 1

    // Input data
    val A_data = Array.fill(N){ random[Int](4) }
    val B_data = Array.fill(N){ random[Int](4) }
    val a_dram = DRAM[Int](N)
    val b_dram = DRAM[Int](N)

    // Output
    val out = ArgOut[Int]

    // Set up memories and args
    setArg(dynBlockSize, dynB)
    setArg(n, N)
    setMem(a_dram, A_data)
    setMem(b_dram, B_data)

    Accel {
      // Set up accumulator (the FIFOs are created inside the Stream controller below)
      val acc = Reg[Int](0)

      // Create stream controller to bind load and compute stages
      Stream{
        // Create on-chip memories
        val aBlk = FIFO[Int](BS)
        val bBlk = FIFO[Int](BS)

        // Load data
        Foreach(n by dynBlockSize){blk => 
          // Handle edge case
          val thisblk = min(dynBlockSize.value, n-blk)
          Parallel {
            aBlk load a_dram(blk::blk + thisblk par P1)
            bBlk load b_dram(blk::blk + thisblk par P1)
          }
        }

        // Greedily consume data
        Reduce(acc)(n by 1 par P2){i => aBlk.deq() * bBlk.deq()}{_+_}
      }
      // Copy result out
      out := acc

    }

    val result = getArg(out)
    val gold = A_data.zip(B_data){_*_}.reduce{_+_}

    println("expected: " + gold)
    println("result: " + result)

    val cksum = gold == result
    println("PASS: " + cksum + " (InnerProductStream)")
    assert(cksum)
  }
}

Most of these tutorials demonstrate coarse-grained pipelining by expressing controller hierarchies in a particular way. We introduce many algorithms this way because it is easier to understand and debug. In Spatial, outer Foreach, Reduce, and MemReduce controllers default to pipeline scheduling. Each child controller is activated individually by the parent, and the parent’s counter can only increment to the next iteration once all of its children that were active on the current iteration have returned their done flag. Performance drops drastically when two adjacent sibling controllers each have a long datapath or latency: the first sibling must drain its entire datapath before the next sibling can begin consuming that element. If the app cannot be expressed so that the two siblings are merged into one controller, a Stream controller is the next best way to allow greedy consumption as soon as data becomes available. Be careful, however: Stream controllers can be tricky and difficult to debug, even when there is an overall performance benefit.

For this tutorial, we use InnerProduct, which was introduced earlier in a straightforward but sub-optimal implementation, to demonstrate how Stream controllers can achieve better performance in certain cases. The code above shows one way to implement the algorithm as a streaming app. The key idea is that the DRAM controller stays busy for as long as there is data left to process, and the commands sent over the channels are not synchronized and scheduled together at the coarse-grain level. As long as the FIFOs are not full, data will be loaded into them from DRAM. As long as the FIFOs are not empty, data will be drained from them.
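The fill-and-drain behavior described here can be mimicked in plain Scala, without Spatial. The following is only a software sketch under simplifying assumptions: one loop iteration stands for one "cycle", the loader and the consumer each move at most one element pair per cycle, and the name StreamingInnerProductSketch and the fifoDepth parameter are made up for illustration. It is not how Spatial schedules hardware.

```scala
import scala.collection.mutable

object StreamingInnerProductSketch {
  /** Software model of a streaming inner product over two bounded FIFOs.
    * fifoDepth plays the role of BS in the app (an assumption of this sketch).
    */
  def run(a: Array[Int], b: Array[Int], fifoDepth: Int): Int = {
    require(a.length == b.length, "vectors must have equal length")
    require(fifoDepth > 0, "FIFO depth must be positive")
    val aBlk = mutable.Queue.empty[Int]
    val bBlk = mutable.Queue.empty[Int]
    var loaded = 0    // element pairs pushed into the FIFOs so far
    var consumed = 0  // element pairs popped and accumulated so far
    var acc = 0
    while (consumed < a.length) {
      // Consumer: drain as soon as both FIFOs hold data
      if (aBlk.nonEmpty && bBlk.nonEmpty) {
        acc += aBlk.dequeue() * bBlk.dequeue()
        consumed += 1
      }
      // Loader: keep filling while data remains and both FIFOs have room
      if (loaded < a.length && aBlk.size < fifoDepth && bBlk.size < fifoDepth) {
        aBlk.enqueue(a(loaded))
        bBlk.enqueue(b(loaded))
        loaded += 1
      }
    }
    acc
  }
}
```

Calling StreamingInnerProductSketch.run(Array(1, 2, 3), Array(4, 5, 6), 2) returns 32, the same value as a direct dot product, regardless of the FIFO depth chosen.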

The tricky part of expressing the application this way is that it can deadlock if you are not careful about limiting the sizes of the DRAM commands. In this app, we had to make two tiling decisions. One is the fixed size of the on-chip memories (BS), and the other is how much data to request in a single command (dynB). Since dynB does not describe any physical hardware, we can leave it as an ArgIn and experiment with it at run-time. Data drains from the FIFOs as soon as it is available, so setting dynB to a value greater than BS is not necessarily bad. In fact, setting dynB large in order to avoid “short” DRAM transfers (as mentioned in a previous section) helps amortize the overhead of DRAM transfers and improves performance. It is up to the user to understand the memory interface on the target device well enough to know how large dynB can actually be. If dynB is too large and both requests share the same DRAM channel, one FIFO can receive so much data that it fills up while the other FIFO sits empty. The consumer then cannot drain the full FIFO because it needs an element from both, and the data caught in the interface cannot move because its destination FIFO is full, so the app deadlocks. It is useful to experiment with these numbers to understand the limits.
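The deadlock scenario can be reproduced in a toy plain-Scala model. This is a sketch under strong assumptions that do not come from the source: the shared channel returns data strictly in command order (all of A's elements for a tile, then all of B's), it has no buffering of its own, and the names SharedChannelDeadlockSketch, completes, and bs are hypothetical. A real memory interface has its own buffering, so the threshold on a device will differ from the one in this model.

```scala
import scala.collection.mutable

object SharedChannelDeadlockSketch {
  /** Returns true if the toy model finishes and false if it hangs. */
  def completes(n: Int, dynB: Int, bs: Int): Boolean = {
    require(n > 0 && dynB > 0 && bs > 0, "all sizes must be positive")
    // Order in which the shared channel returns data: per tile, A's block then B's block
    val channel = mutable.Queue.empty[Char]
    for (blk <- 0 until n by dynB) {
      val thisBlk = math.min(dynB, n - blk) // same edge-case handling as the app
      channel ++= Seq.fill(thisBlk)('a')
      channel ++= Seq.fill(thisBlk)('b')
    }
    var aFifo = 0    // occupancy of the FIFO fed by a_dram
    var bFifo = 0    // occupancy of the FIFO fed by b_dram
    var consumed = 0 // element pairs consumed by the reduction
    while (consumed < n) {
      var progress = false
      // Consumer needs one element from each FIFO
      if (aFifo > 0 && bFifo > 0) {
        aFifo -= 1; bFifo -= 1; consumed += 1; progress = true
      }
      // The head of the channel can only be delivered if its destination FIFO has room
      if (channel.nonEmpty) {
        if (channel.head == 'a' && aFifo < bs) { channel.dequeue(); aFifo += 1; progress = true }
        else if (channel.head == 'b' && bFifo < bs) { channel.dequeue(); bFifo += 1; progress = true }
      }
      if (!progress) return false // nothing can move: deadlock
    }
    true
  }
}
```

In this model, completes(128, 64, 64) returns true and completes(128, 65, 64) returns false, because A's 65th element blocks the channel while the consumer waits for B. The caption in this section reports hangs only from a dynB of 256 upward, which suggests the real interface tolerates more in-flight data than this zero-slack model does.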

The plots below show this app’s runtime for N=12800 for different values of dynB. These results are with all parallelization factors set to 1, and the baseline is the naive implementation of InnerProduct from the previous tutorial with its parallelization factors also set to 1. Note that the tradeoffs vary greatly with choices for BS and parallelization factors.

Characterization of Streaming InnerProduct over baseline implementation, as a function of dynB tile size. A value of 256 or greater results in a hang, as described in the text


Note that using Stream controllers to hide coarse-grain pipelining latencies has two major resource-utilization disadvantages. First, the counter of the parent controller is duplicated for each of its children, since each child of a Stream controller can fire independently of the others. Each child must maintain its own shadow of the parent’s counter, which increases the amount of logic required. Second, Stream controllers have variable latency, so many of the assumptions used for banking and lockstep execution are broken, and the compiler will most likely duplicate memories rather than bank or broadcast the accesses.

 

Convolution with Streaming

 
@spatial object ConvolutionStream extends SpatialApp {
  type T = FixPt[TRUE,_32,_0]

  def main(args: Array[String]): Unit = {

    // Set properties to be known at compile-time
    val window = 5
    val IN_CHANS_MAX = 256
    val OUT_CHANS_MAX = 256

    // Create ArgIns/Outs
    val IN_POINTS = ArgIn[Int]
    val OUT_POINTS = ArgIn[Int]
    val IN_CHANS = ArgIn[Int]
    val OUT_CHANS = ArgIn[Int]
    val STRIDE = ArgIn[Int]
    val donttouch = ArgOut[Bit]
    val n_points_in = args(0).to[Int]
    val n_points_out = args(1).to[Int]
    val n_chans_in = args(2).to[Int]
    val n_chans_out = args(3).to[Int]
    val stride = args(4).to[Int]
    setArg(IN_POINTS, n_points_in)
    setArg(OUT_POINTS, n_points_out)
    setArg(IN_CHANS, n_chans_in)
    setArg(OUT_CHANS, n_chans_out)
    setArg(STRIDE, stride)

    // Create data structures
    val in_data = (0::n_points_in,0::n_chans_in){(i,j) => (i+j).to[T]}
    val kernel = (0::window, 0::n_chans_in, 0::n_chans_out){(i,j,k) => ((i+j+k)%3).to[T]}
    val DATA = DRAM[T](IN_POINTS,IN_CHANS)
    val KERNEL = DRAM[T](window, IN_CHANS, OUT_CHANS)
    val RESULT = DRAM[T](OUT_POINTS,OUT_CHANS)
    setMem(DATA, in_data)
    setMem(KERNEL, kernel)
    printMatrix(in_data.t, "Data: (n_points is leading dimension)")
    printTensor3(kernel, "Kernel: (each matrix is in_chans x out_chans)")

    Accel {
      // Create local memory for kernel values
      val kernel_sram = SRAM[T](window, IN_CHANS_MAX, OUT_CHANS_MAX)
      kernel_sram load KERNEL

      // Create stream controller to run once per output point (which includes all output channels per point)
      Stream.Foreach(OUT_POINTS by 1){ pt =>

        // Create FIFOs to hold input data (declare here so parallelizing stream results in duplication of these)
        val in_fifos = List.tabulate(window){_ => FIFO[T](IN_CHANS_MAX)}
        // Create FIFO to buffer output data
        val line_out = FIFO[T](OUT_CHANS_MAX)
        // Create signalling FIFO to mediate control
        val store_ready = FIFO[Bit](8)

        // Fetch data
        in_fifos.zipWithIndex.map{case (f, i) => 
          // Quirk of compiler's Pipe insertion, guarantee that pt * STRIDE is computed locally
          'FETCH.Pipe{ 
            val C = pt * STRIDE
            f load DATA(max(0,min(C - (window/2) + i, IN_POINTS-1)), 0::IN_CHANS)
          }
        }

        // Greedily consume data
        // Quirk of compiler's Pipe insertion, guarantee that pt * STRIDE is computed locally
        'COMPUTE.Pipe{ 
          val C = pt * STRIDE
          // Allocate temp accumulator
          val line_out_sram = SRAM[T](OUT_CHANS_MAX)
          // Compute partial result for each IN_CHAN
          MemReduce(line_out_sram(0::OUT_CHANS))(IN_CHANS by 1){ic => 
            val local_acc = SRAM[T](OUT_CHANS_MAX)
            val data_raw = in_fifos.map{f => f.deq()}
            // While we have input data, run it against each output channel's kernel
            Foreach(OUT_CHANS by 1){oc => 
              val filter = List.tabulate(window){lane => kernel_sram(lane, ic, oc)}
              val data = List.tabulate(window){lane => mux(C - (window/2) + lane >= 0 && C - (window/2) + lane < IN_POINTS-1, data_raw(lane), 0.to[T])}
              val acc = filter.zip(data).map{case (a,b) => a*b}.reduceTree{_+_}
              local_acc(oc) = acc
            }
            local_acc
          }{_+_}
          // Quickly copy results to output FIFO and indicate data is ready
          Foreach(OUT_CHANS by 1){oc => line_out.enq(line_out_sram(oc)); if (oc == 0) {store_ready.enq(true)}}
        }

        // Store data out
        'STORE.Pipe{
          donttouch := store_ready.deq() // Do not want to begin issuing store commands too soon
          RESULT(pt, 0::OUT_CHANS) store line_out  
        }
        
      }

    }

    println(r"donttouch: $donttouch") // Guarantee compiler will not DCE

    // Compute gold
    val gold = (0::n_points_out, 0::n_chans_out){(pt, oc) => 
      val lane_prods = List.tabulate(window){lane => 
        val C = pt * stride
        Array.tabulate(n_chans_in){ic => 
          val data = if (C - (window/2) + lane >= 0 && C - (window/2) + lane < IN_POINTS-1) in_data(C + lane - (window/2), ic) else 0.to[T]
          data * kernel(lane, ic, oc)
        }.reduce{_+_}
      }
      lane_prods.reduce{_+_}
    }
    val got = getMatrix(RESULT)
    printMatrix(got.t, "Got")
    printMatrix(gold.t, "Gold")

    val cksum = gold == got
    println("PASS: " + cksum + " (ConvolutionStream)")
    assert(cksum)
  }
}

The animation below shows the general idea of convolution. It can be easily extended to handle more dimensions, as is common in a convolutional neural network. It does not attempt to show any “cleverness” in how the memory is structured or used; instead, it shows a naive implementation that clearly demonstrates the concept.

[Animation: vokoscreen-2018-12-07_14-25-12.gif]

In a previous tutorial, we demonstrated one way to express convolution. Here we use stream controllers to implement the specific convolution introduced above and show how this implementation is much faster. The next animation shows how we reorder the memory accesses and map the algorithm to hardware. It does not necessarily show how the stream controller mediates the interactions between FIFOs, but the annotated code should make that concept clear.

[Animation: vokoscreen-2018-12-07_15-10-21.gif]

The code above shows how to write this application. Example input arguments would be “128 64 4 3 2” for a problem in which a 4-channel, 128-point input is processed into a 3-channel, 64-point output using a stride of 2.
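To make the example arguments concrete, the following plain-Scala sketch computes which input rows the five FETCH lanes request for a given output point. It mirrors the max/min clamping expression in the listing; the object name ConvIndexSketch and the function fetchedRows are hypothetical helpers, not part of Spatial.

```scala
object ConvIndexSketch {
  val window = 5 // same compile-time window size as the app

  /** Input rows loaded for output point pt, one per FIFO lane.
    * Out-of-range rows are clamped into [0, nPointsIn - 1], as in the FETCH stage.
    */
  def fetchedRows(pt: Int, stride: Int, nPointsIn: Int): Seq[Int] = {
    val c = pt * stride // center of the window in input coordinates
    (0 until window).map { lane =>
      math.max(0, math.min(c - window / 2 + lane, nPointsIn - 1))
    }
  }
}
```

With the arguments “128 64 4 3 2”, fetchedRows(0, 2, 128) gives rows 0, 0, 0, 1, 2 and fetchedRows(63, 2, 128) gives rows 124, 125, 126, 127, 127. The clamping only keeps the DRAM addresses legal; the COMPUTE stage in the listing separately masks lanes it treats as out of range with a mux.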

We assume that the kernel is small enough to fit on-chip, but tiling strategies can be used if this assumption is not true.

We want the controller that mediates the prefetch-compute-store pipeline to be a stream controller. In this example, we have 5 FIFOs to hold input data, 1 FIFO to hold output data, and an extra control-helper FIFO that we will discuss later.

In the FETCH stages, each Pipe loads data from DRAM into one FIFO. One quirk of current Spatial is that, due to unit pipe insertion and control flow, you must manually bind primitives floating in outer controllers to their consuming controllers. Here, the calculation of C sits in front of the load controller sub-tree, so the compiler will wrap it in its own inner Pipe. The zipWithIndex creates 5 copies of this structure. If each copy were not wrapped in an explicit Pipe, and C were instead computed once above all of them and shared by every load, the compiler would crash at some point. This is because the Stream controller implements a contract in which data production and consumption are destructive. If the outer controller were changed to a regular Pipe.Foreach, the pipe inserter would be fine, as there would be a 5-buffered register feeding each of the 5 load stages. Buffering has no semantic meaning for Stream controllers, however. A later update to the compiler is expected to handle these cases correctly, making the explicit Pipe{} unnecessary.

We compute the convolution inside the COMPUTE stage as normal. One easy mistake is to deq() from the FIFOs inside the innermost loop. Because the read is destructive, this would give an incorrect answer in the best case and cause a hang in the worst case, as the stage would attempt to consume far more data than the FETCH stages produce.
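The difference between the two dequeue placements can be seen in a small plain-Scala sketch that uses a mutable.Queue as a stand-in for a hardware FIFO. The names DestructiveDeqSketch, deqOncePerInputChannel, and deqInInnerLoop are hypothetical, and (oc + 1) is a placeholder kernel weight chosen only to keep the example short. Where hardware would hang on an empty FIFO, this model returns None.

```scala
import scala.collection.mutable

object DestructiveDeqSketch {
  /** Intended pattern: dequeue once per input channel and reuse the value for every output channel. */
  def deqOncePerInputChannel(fifo: mutable.Queue[Int], inChans: Int, outChans: Int): Seq[Int] = {
    val acc = Array.fill(outChans)(0)
    for (ic <- 0 until inChans) {
      val x = fifo.dequeue() // destructive read, exactly one per input channel
      for (oc <- 0 until outChans) acc(oc) += x * (oc + 1)
    }
    acc.toSeq
  }

  /** Mistaken pattern: dequeue inside the innermost loop.
    * Needs inChans * outChans elements; returns None if the FIFO runs dry first.
    */
  def deqInInnerLoop(fifo: mutable.Queue[Int], inChans: Int, outChans: Int): Option[Seq[Int]] = {
    val acc = Array.fill(outChans)(0)
    var starved = false
    var ic = 0
    while (ic < inChans && !starved) {
      var oc = 0
      while (oc < outChans && !starved) {
        if (fifo.isEmpty) starved = true // hardware would stall here indefinitely
        else acc(oc) += fifo.dequeue() * (oc + 1)
        oc += 1
      }
      ic += 1
    }
    if (starved) None else Some(acc.toSeq)
  }
}
```

For a FIFO holding 1, 2, 3 with 3 input channels and 2 output channels, deqOncePerInputChannel returns Seq(6, 12), while deqInInnerLoop consumes all three elements partway through the second input channel and returns None.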

At the very end of this stage, we copy data from the accumulating SRAM to a FIFO. This loop is much quicker than the MemReduce stage, so it should not add much overhead. Notice that we enqueue a value to the store_ready FIFO when the first element is written to the line_out FIFO. The purpose of this FIFO becomes clear in the STORE stage.

In Spatial, the immediate children of stream controllers are always enabled as long as they have not exhausted the counter of their parent and their inbound and outbound memories are valid/ready. The store API produces a controller sub-tree whose first stage issues a command to DRAM to open a data streaming sequence, and whose next stage dequeues from the line_out FIFO. Without any gating, this command would therefore be issued long before data is ready to be stored. This could cause a hang, as the controller waits to store data before servicing other requests. The Bit FIFO ensures that the command is not issued until at least one element is waiting in the data FIFO.
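The effect of the signalling FIFO can be illustrated with a very small plain-Scala timing model. It assumes, for illustration only, that an ungated stage is enabled at cycle 0 and that a gated stage is enabled on the cycle its token arrives; the names StoreGateSketch, commandWaitCycles, and firstDataCycle are hypothetical, and real hardware timing will differ.

```scala
object StoreGateSketch {
  /** Cycles the store command spends waiting for its first data element.
    *
    * firstDataCycle: hypothetical cycle at which COMPUTE enqueues its first
    *                 output element (and, in the gated case, the token).
    * gated:          true if STORE dequeues a token before issuing the command.
    */
  def commandWaitCycles(firstDataCycle: Int, gated: Boolean): Int = {
    require(firstDataCycle >= 0, "cycle must be non-negative")
    var cycle = 0
    var tokenAvailable = false
    var issuedAt = -1
    while (issuedAt < 0) {
      if (cycle == firstDataCycle) tokenAvailable = true
      // An ungated stage has no inbound FIFO to wait on, so it is enabled immediately
      if (!gated || tokenAvailable) issuedAt = cycle
      else cycle += 1
    }
    firstDataCycle - issuedAt
  }
}
```

With a first output element at cycle 500, the ungated command waits 500 cycles for its data, while the gated command waits 0 because it is only issued once the token (and therefore the first element) exists.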

We write the data from the Bit communication FIFO to an ArgOut and print its value to guarantee that neither Spatial nor the compilers that run later will dead-code eliminate this FIFO.