• Type: Enhancement
    • Status: Open
    • Priority: Minor
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: Future
    • Component/s: REST
    • Labels:


      None of our GET endpoints stream the response. We first build the entire response body in memory before writing it to the client. When querying large data sets this can create heap pressure and even lead to OutOfMemoryErrors. Consider this simple REST integration test that I wrote:

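      import static org.joda.time.DateTime.now
      import static org.joda.time.Days.days
      import static org.junit.Assert.assertEquals

      import java.util.concurrent.CountDownLatch
      import java.util.concurrent.Executors
      import java.util.concurrent.Semaphore
      import java.util.concurrent.TimeUnit

      import com.datastax.driver.core.Cluster
      import org.joda.time.DateTime
      import org.junit.Test
      // Hawkular Metrics imports (RESTTest, DataAccess, DataAccessImpl, MetricId, Metric, DataPoint, GAUGE) omitted

      class StressITest extends RESTTest {
        @Test
        void stressReads() {
          // Contact point is empty in the original report
          Cluster cluster = new Cluster.Builder().addContactPoint("").build()
          def session = cluster.connect("hawkular_metrics_rest_tests")
          DataAccess dataAccess = new DataAccessImpl(session)
          DateTime start = now().minusYears(1).minusMonths(3)
          DateTime time = start
          DateTime end = now()
          def metricId = new MetricId<Double>("STRESS", GAUGE, "G1")
          def inflightRequests = new Semaphore(150)  // caps the number of in-flight async writes
          def dataPoints = 0
          def abort = false
          def exception = null
          def concurrentReads = 25
          def executors = Executors.newFixedThreadPool(concurrentReads)

          // Write phase: four data points per minute across the whole time range
          while (time.isBefore(end)) {
            if (abort) {
              throw exception
            }
            inflightRequests.acquire()
            def metric = new Metric<>(metricId, [
                new DataPoint(time.millis, 3.14 as double),
                new DataPoint(time.plusSeconds(15).millis, 3.14 as double),
                new DataPoint(time.plusSeconds(30).millis, 3.14 as double),
                new DataPoint(time.plusSeconds(45).millis, 3.14 as double)
            ])
            def result = dataAccess.insertGaugeData(metric, days(10).toStandardSeconds().seconds)
            time = time.plusMinutes(1)
            result.subscribe({ dataPoints++; inflightRequests.release() }, { t -> abort = true; exception = t })
          }
          println "DATA POINTS WRITTEN = $dataPoints"

          // Read phase: each worker reads the entire data set over REST
          def requests = new CountDownLatch(concurrentReads)
          concurrentReads.times {
            executors.execute {
              try {
                def response = hawkularMetrics.get(
                    path: "gauges/G1/raw",
                    headers: [(tenantHeaderName): 'STRESS'],
                    query: [start: start.millis, end: end.millis])
                assertEquals(200, response.status)
                println "FINISHED READ"
              } catch (Exception e) {
                e.printStackTrace()
              } finally {
                requests.countDown()
              }
            }
          }
          requests.await(3, TimeUnit.MINUTES)
          executors.shutdown()
        }
      }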

      It first generates 654,202 data points and then executes 25 concurrent reads over the whole data set. This test consistently crashes with an OutOfMemoryError. This is for raw data, and most of our read requests are likely to be for stats (i.e., bucketed data), which is part of the reason I am giving this a low priority; however, the fact remains that it is entirely possible to run out of memory. Even with bucketed data, it is still possible to generate an OutOfMemoryError. We should be streaming our responses to provide better scalability.

      Streaming the responses is a much more natural fit with RxJava. I did a quick prototype to verify that streaming responses prevents the OutOfMemoryErrors.

          public StreamingOutput streamRawData(
                  @Suspended AsyncResponse asyncResponse,
                  @PathParam("id") String id,
                  @ApiParam(value = "Defaults to now - 8 hours") @QueryParam("start") Long start,
                  @ApiParam(value = "Defaults to now") @QueryParam("end") Long end,
                  @ApiParam(value = "Use data from earliest received, subject to retention period")
                  @QueryParam("fromEarliest") Boolean fromEarliest,
                  @ApiParam(value = "Limit the number of data points returned") @QueryParam("limit") Integer limit,
                  @ApiParam(value = "Data point sort order, based on timestamp") @QueryParam("order") Order order
          ) {
              MetricId<Double> metricId = new MetricId<>(tenantId, GAUGE, id);
              TimeRange timeRange = new TimeRange(start, end);
              Observable<DataPoint<Double>> dataPoints = metricsService.findDataPoints(metricId, timeRange.getStart(),
                      timeRange.getEnd(), 0, Order.DESC);
              return new StreamingOutput() {
                  boolean isFirst = true;
                  @Override public void write(OutputStream output) throws IOException, WebApplicationException {
                      // Open the JSON array, then write each data point as it is emitted.
                      output.write("[".getBytes());
                      // Note: if findDataPoints emits asynchronously, write() has to block until
                      // onCompleted fires; that handling is not visible in the snippet as posted.
                      dataPoints.subscribe(
                              dataPoint -> {
                                  try {
                                      String json;
                                      if (isFirst) {
                                          json = "{\"timestamp\": " + dataPoint.getTimestamp() + ", \"value\": " +
                                                  dataPoint.getValue() + "}";
                                          isFirst = false;
                                      } else {
                                          // Every element after the first is preceded by a comma
                                          json = ",{\"timestamp\": " + dataPoint.getTimestamp() + ", \"value\": " +
                                                  dataPoint.getValue() + "}";
                                      }
                                      output.write(json.getBytes());
                                  } catch (IOException e) {
                                      // Rethrow unchecked so the error callback below sees it
                                      throw new RuntimeException(e);
                                  }
                              },
                              t -> t.printStackTrace(),
                              () -> {
                                  try {
                                      // Close the JSON array once all data points are written
                                      output.write("]".getBytes());
                                  } catch (IOException e) {
                                      e.printStackTrace();
                                  }
                              }
                      );
                  }
              };
          }
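
      To make the memory argument concrete without the JAX-RS and RxJava plumbing, here is a minimal, self-contained sketch of the same idea: write the JSON array one element at a time so that only the current data point is held in memory. It is not the prototype itself. The `Point` class and `writeJsonArray` method are hypothetical names, and a plain `Iterator` stands in for the `Observable` returned by `findDataPoints`.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.stream.LongStream;

public class StreamingJsonSketch {

    /** Hypothetical stand-in for DataPoint<Double>; not the Hawkular class. */
    static final class Point {
        final long timestamp;
        final double value;

        Point(long timestamp, double value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    /**
     * Writes the points as a JSON array, one element at a time.
     * Nothing is accumulated: each element is serialized and written
     * before the next one is pulled from the iterator.
     */
    static void writeJsonArray(Iterator<Point> points, OutputStream out) throws IOException {
        out.write('[');
        boolean first = true;
        while (points.hasNext()) {
            Point p = points.next();
            if (!first) {
                out.write(','); // separator before every element except the first
            }
            first = false;
            String json = "{\"timestamp\": " + p.timestamp + ", \"value\": " + p.value + "}";
            out.write(json.getBytes(StandardCharsets.UTF_8));
        }
        out.write(']');
        out.flush();
    }

    public static void main(String[] args) throws IOException {
        // Lazily generated points, 15 seconds apart, mirroring the test data shape
        Iterator<Point> points = LongStream.range(0, 3)
                .mapToObj(i -> new Point(i * 15_000L, 3.14))
                .iterator();
        // A real endpoint would write to the response stream; a buffer is used here only for the demo
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeJsonArray(points, out);
        System.out.println(out.toString("UTF-8"));
    }
}
```

      In a real endpoint the `OutputStream` would be the one the container hands to `StreamingOutput.write`, so heap usage per request stays roughly constant regardless of how many data points the query returns.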

      I do think this is somewhat low priority at the moment, but we should keep it in mind when adding or changing endpoints. With the way I implemented tag-based bucketing for HWKMETRICS-373, the entire data set has to be loaded into memory. I honestly do not anticipate this being a problem, but it could be, so we should at least give it some thought.

              • Assignee:
                john.sanda John Sanda
              • Votes:
                0
              • Watchers:
                3


                • Created: