
It’s getting harder and harder to come up with relevant subtitles for this particular topic. I should probably just start using numbers.

Totally nailed this one though.

To the topic at hand though: after my last post you might be forgiven for thinking that we had our AWS Lambda ELB Logs Processor well under control. It was processing files happily all the way through and everything seemed to be arriving in our log stack in an acceptable timeframe.

To give a complete picture, you can see the source for the Lambda function here.

Inside the source, the #{something} notation is an Octopus Deploy variable substitution. Basically, when this code is deployed into AWS using Octopus Deploy, those strings are replaced with real values appropriate for the environment being deployed to.
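To make that concrete, a hypothetical configuration block might look something like the following before deployment. The variable names inside the #{} tokens are invented for this example and may not match the actual source:

// Hypothetical example of Octopus Deploy substitution targets. At deployment time
// each #{...} token is replaced with the value configured for the target environment,
// so the deployed file ends up containing plain literals.
var config = {
    logstashHost: "#{LogstashHost}",
    logstashPort: "#{LogstashPort}",
    connectionCountLimit: "#{ConnectionCountLimit}",
    connectionWaitMilliseconds: "#{ConnectionWaitMilliseconds}"
};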

It’s long enough that I didn’t just want to paste it into this post, but still short enough to fit comfortably in a single file. It streams a file from S3 line by line, parses each line into its constituent parts and then posts each line to a logstash TCP endpoint. It has some basic (homegrown) connection pooling and some basic logging.
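As a rough sketch of the overall shape (simplified, and not the actual source; parse and post stand in for the real parsing and sending logic):

// Simplified sketch of the overall flow, assuming the standard aws-sdk and readline modules.
var aws = require('aws-sdk');
var readline = require('readline');

exports.handler = function(event, context) {
    var s3 = new aws.S3();

    // The initiating S3 event identifies the ELB log file to process.
    var bucket = event.Records[0].s3.bucket.name;
    var key = event.Records[0].s3.object.key;

    // Stream the file straight out of S3 and read it line by line.
    var stream = s3.getObject({ Bucket: bucket, Key: key }).createReadStream();
    var reader = readline.createInterface({ input: stream });

    // parse and post are placeholders for the real per-line logic described above.
    reader.on('line', function(line) {
        var entry = parse(line);
        post(entry);
    });

    reader.on('close', handleReaderClose);
};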

Unfortunately, the logging turned out to be pretty painful to use in production. This was primarily because there were multiple places inside the function that wrote a log message for each line encountered. With our sample data this was fine, because there were only tens of lines. In production, we get over 60,000 lines every run of the function, which means the CloudWatch logs are basically unusable. Not only that, but the information that comes out in the logs is pretty useless at scale, stating things like “starting posting to logstash”, “processing file”, “posted to logstash” and so on.

The other issue was the connection pooling. It seemed to be working (in that it reduced the pace of the function such that it no longer ran out of TCP connections), but I wasn’t entirely sure it was doing what we thought it was. To be honest, the code was pretty hard to understand, mostly as a result of the use of asynchronous callbacks. To me, it seemed like a connection pool should be something that other people have needed, so surely there was a package available that would meet our needs.

Taking these two points into consideration, I set about improving the function by:

  1. Finding and using a library for the connection pool, hopefully making it easier to read and reason about.
  2. Making the logging output actually useful when the function was executed in production.

Car Pool

Looking around NPM, there are a few packages available that enable pooling of various resources. As seems to be the pattern though, they are of varying quality and completeness. In the end I settled on a package called “generic-pool” (Github here), which seemed pretty good. A major point in its favour was that it had a version number greater than 0, which seems rare for an NPM package.

Unfortunately for me, it was promise based, and I had no idea what promises were or how to use them. I am, however, familiar with the Task Parallel Library in C#, which seems to be similar conceptually, so I didn’t have to learn a whole bunch of concepts entirely from scratch.
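For anyone in the same boat, a promise is roughly analogous to a Task: it represents a value (or error) that will turn up later, and you chain continuations onto it. A trivial example, unrelated to the function itself:

// Minimal promise example, purely for illustration.
// The executor receives resolve/reject callbacks; calling resolve completes
// the promise successfully, calling reject fails it.
var promise = new Promise(function(resolve, reject) {
    setTimeout(function() { resolve(42); }, 100);
});

promise
    .then(function(value) { console.log('resolved with %s', value); })  // roughly Task.ContinueWith
    .catch(function(error) { console.log('failed with %s', error); });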

Using the library to set up a pool of TCP connections was relatively straightforward, as the snippet below shows.

// Assumed requires for this snippet; config and summary are defined elsewhere in the function.
const net = require('net');
const gPool = require('generic-pool');

const poolFactory = {
    // Called by the pool whenever it needs a new connection to logstash.
    create: function() {
        return new Promise(function(resolve, reject) {
            const socket = net.createConnection(config.logstashPort, config.logstashHost);
            summary.connections.created += 1;
            resolve(socket);
        })
    },
    // Called by the pool when a connection is being discarded.
    destroy: function(socket) {
        return new Promise(function(resolve, reject) {
            socket.end();
            socket.destroy();
            summary.connections.destroyed += 1;
            resolve();
        })
    },
    // Called before a connection is handed out (testOnBorrow below),
    // so sockets that have already been destroyed get weeded out.
    validate: function(socket) {
        return new Promise(function(resolve, reject){
            resolve(!socket.destroyed);
        })
    }
};

var poolOptions = {
    max: config.connectionCountLimit,
    min: 0,
    acquireTimeoutMillis: config.connectionWaitMilliseconds,
    testOnBorrow: true
};

var pool = gPool.createPool(poolFactory, poolOptions);

With a pool in place, all that was left to do was to incorporate it into the actual line parsing/processing engine, which meant a shift away from asynchronous callbacks to promises.

summary.lines.parsed += 1;

// Acquire a pooled connection, post the parsed entry to logstash over it, then
// hand the connection back to the pool. post is expected to resolve with the
// socket it was given so the release step can return it to the pool.
var promise = pool.acquire()
    .then((socket) => { return post(socket, entry); })
    .then((socket) => { pool.release(socket); })
    .catch((error) => {
        // Count the failure and keep only the last few errors for diagnosis.
        summary.failures.sending.total += 1;
        if (summary.failures.sending.lastFew.length >= 5) {
            summary.failures.sending.lastFew.shift();
        }
        summary.failures.sending.lastFew.push(error);
    });

promises.push(promise);

The promises array is used to determine when the function has finished all of its work, which is the opportunity to do anything that needs to happen right at the end.

Like logging.

Log Ride

Speaking of logging, the first improvement was to remove all instances where the function wrote a message for each line. Easy. Now at least the production logs wouldn’t be an unreadable mess.

The next step was to add some metrics to the logging, so we could track what the function was doing. This needed to include statistics like lines read, lines parsed and lines sent to logstash.

This wasn’t so bad (simply introduce a function scoped variable for the metrics, and then increment the various counters whenever the relevant event occurred), but it did start to cause issues when I tried to refactor the function into smaller components that were easier to reason about. I couldn’t easily move any of the inner functions around, because then they wouldn’t have the metrics object in scope. I might have been able to solve this by passing the metrics object as a parameter to each of the functions that needed to edit it, but that would have made using those functions in the various asynchronous callbacks much harder.
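The metrics object itself is nothing special; it’s a plain structure declared at function scope and incremented as things happen, roughly like this (reconstructed from the summary output at the end of this post, so treat the exact shape as an approximation):

// Function scoped metrics accumulator, incremented throughout the run
// and written to the log exactly once at the very end.
var summary = {
    lines: { encountered: 0, parsed: 0, sent: 0 },
    connections: { created: 0, destroyed: 0 },
    failures: {
        parsing: { total: 0, lastFew: [] },
        sending: { total: 0, lastFew: [] }
    }
};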

With the line based metrics sorted, it was a relatively easy matter to add metrics for connection creation and destruction via the pool construct outlined above, so I don’t need to go into too much detail about that.

Finally, I also wanted to include some information about errors that occur in the process, while making sure that if every single line failed for some reason, the CloudWatch logs would still remain readable. The easiest way to do this was to accumulate a small queue of the most recent errors, pushing older ones out in favour of newer ones, with the understanding that the last 5 errors are probably enough information to diagnose issues. You can see the code for doing this in the promise catch handler above.

The only thing left to do was to actually log the metrics object that contained all of this juicy information, and the only place where this was possible was in the handler for when the readline module had finished reading the entire file.

function handleReaderClose() {
    console.log('File reader for ELB log file is closing because all lines have been read. Waiting for all promises (for sending parsed lines to logstash) to resolve');
    // Wait for every per-line send to settle (each one has its own catch handler, so
    // Promise.all will not short-circuit), then drain and clear the pool before
    // logging the summary as the very last thing the function does.
    Promise
        .all(promises)
        .then(() => { console.log("Cleaning up the connection pool, which has [%s/%s] (current/max) connections", pool.size, pool.max); return pool.drain(); })
        .then(() => pool.clear())
        .then(() => { 
            console.log("All processing complete. Summary follows"); 
            console.log("%s", JSON.stringify(summary, fixStringifyError, 4)); 
        });
}

There are a few other things in there that I’m not going into too much detail about, like cleaning up the pool and a custom function to stringify errors, but the important part for this discussion is the logging.
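One quick aside on the error stringification though: JSON.stringify on an Error produces an empty object by default (its message and stack are not enumerable properties), so the summary needs a replacer to expand them. A sketch of the idea, which may not be exactly what fixStringifyError does in the repo, looks like this:

// JSON.stringify replacer that expands Error instances into plain objects,
// so their name, message and stack actually appear in the logged summary.
function fixStringifyError(key, value) {
    if (value instanceof Error) {
        return { name: value.name, message: value.message, stack: value.stack };
    }
    return value;
}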

Conclusion

After doing the work above, I’m a bit happier with the current state of the AWS Lambda ELB Logs Processor. I feel like it’s better structured and easier to reason about, mostly due to the introduction of promises instead of callbacks. In addition to the quality of the code itself, the improved log output makes it easier to see how the function is going on a day-to-day basis as it chews through thousands and thousands of lines every run.

In particular, the summary that gets output right at the end is extremely useful for getting a high level picture of the output of a single run without overloading the reader.

An example of the sort of output we see from a normal (successful) run is below:

START RequestId: c0e94f4d-051f-11e7-8fbb-2b8356f39769 Version: $LATEST
2017-03-09T23:25:59.785Z    c0e94f4d-051f-11e7-8fbb-2b8356f39769    Retrieving ELK log file from S3 bucket/key specified in the initiating event. Bucket: [REDACTED], Key: [REDACTED]
2017-03-09T23:27:31.344Z    c0e94f4d-051f-11e7-8fbb-2b8356f39769    File reader for ELB log file is closing because all lines have been read. Waiting for all promises (for sending parsed lines to logstash) to resolve
2017-03-09T23:27:34.924Z    c0e94f4d-051f-11e7-8fbb-2b8356f39769    Cleaning up the connection pool, which has [48/100] (current/max) connections
2017-03-09T23:27:34.967Z    c0e94f4d-051f-11e7-8fbb-2b8356f39769    All processing complete. Summary follows
2017-03-09T23:27:35.003Z    c0e94f4d-051f-11e7-8fbb-2b8356f39769
{
    "lines": {
        "encountered": 76464,
        "parsed": 76464,
        "sent": 76464
    },
    "connections": {
        "created": 48,
        "destroyed": 48
    },
    "failures": {
        "parsing": {
            "total": 0,
            "lastFew": []
        },
        "sending": {
            "total": 0,
            "lastFew": []
        }
    }
}
END RequestId: c0e94f4d-051f-11e7-8fbb-2b8356f39769
REPORT RequestId: c0e94f4d-051f-11e7-8fbb-2b8356f39769    Duration: 95227.56 ms    Billed Duration: 95300 ms Memory Size: 192 MB    Max Memory Used: 165 MB

There are still many improvements to be made (especially regarding the structure of the repo and its complete lack of automated tests), but it’s definitely more betterer than it was.