At Gridium, we have lots and lots of smart meter data. We have meter readings every 15 minutes for hundreds of thousands of meters, for thousands of days. We also have meter characteristics such as latitude and longitude, building square footage, and utility tariff information. We have years of hourly weather data from hundreds of weather stations. We know how to spin up hundreds of Amazon EC2 instances to run R code using this data; the results power our products.
But what about developing new analytic ideas? Greg, our data scientist, writes code on his desktop machine and tests it against a small data set. Wouldn’t it be nice if he could access our infrastructure and quickly try his code with thousands of meter data sets, on hundreds of machines at the same time? He certainly thought so, and that’s why we developed Gridium Labs.
We already had a platform to spin up AWS machines and run product-related analytics jobs as needed. What we needed was a way for Greg to select interesting meters and date ranges, upload his R code, and then take advantage of our platform scaling capabilities to run the code on lots of machines at once. Unlike our regular batch analytics jobs, Labs needs an end user interface for choosing meters, running jobs, and retrieving results.
Gridium Labs makes our backend platform accessible to our internal end user. He uses a front end written with Ember that talks to a Python Pyramid web API. The webapp saves project data in MongoDB, launches and monitors asynchronous Celery tasks, and talks to the Java and Postgres backend platform system. The platform system spins up a dynamic pool of EC2 spot instances, sets them up with the data they need, launches the R code to churn through the data, and uploads the results to an S3 bucket.
The first thing Greg needs to do is narrow down the list of meters he wants to use. The meter selection screen has dropdowns for utility, product, and tariff; a date picker for latest available interval data; and a text box for the minimum days of interval data. Changing a field queries the webapp and updates the number of matching meters.
The webapp usually talks to the backend platform using a REST service, but our custom ORM doesn’t support complex SQL queries. The ORM is great for getting data out of the database and into Java objects for the basic queries we do all the time, but sometimes it gets in the way. Labs needs to combine results from multiple tables, which isn’t something the ORM does well. Instead, the webapp talks directly to the database; I wrote queries to select for products, utilities, and tariffs, and combine them with intersect to get all matching meters.
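As a rough illustration of the intersect approach, here is a minimal sketch. It uses an in-memory SQLite database as a stand-in for Postgres so that it runs anywhere, and the table and column names (meter_utility, meter_product, meter_tariff, meter_id) are made up for the example rather than taken from our schema.

```python
import sqlite3


def matching_meter_ids(conn, utility, product, tariff):
    """Return ids of meters that match all three filters.

    Each SELECT finds the meters matching one filter; INTERSECT keeps
    only the ids that appear in every result set.
    """
    query = """
        SELECT meter_id FROM meter_utility WHERE utility = ?
        INTERSECT
        SELECT meter_id FROM meter_product WHERE product = ?
        INTERSECT
        SELECT meter_id FROM meter_tariff WHERE tariff = ?
        ORDER BY meter_id
    """
    return [row[0] for row in conn.execute(query, (utility, product, tariff))]


def demo_connection():
    """Build a tiny in-memory database with hypothetical tables and rows."""
    conn = sqlite3.connect(":memory:")
    conn.executescript("""
        CREATE TABLE meter_utility (meter_id INTEGER, utility TEXT);
        CREATE TABLE meter_product (meter_id INTEGER, product TEXT);
        CREATE TABLE meter_tariff (meter_id INTEGER, tariff TEXT);
        INSERT INTO meter_utility VALUES (1, 'utility_a'), (2, 'utility_a'), (3, 'utility_b');
        INSERT INTO meter_product VALUES (1, 'product_a'), (2, 'product_b'), (3, 'product_a');
        INSERT INTO meter_tariff VALUES (1, 'tariff_x'), (2, 'tariff_x'), (3, 'tariff_x');
    """)
    return conn
```

With the sample rows above, only meter 1 matches all three filters. The count shown in the UI would then just be the length of the returned list.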
Getting meter dates
Once Greg has a list of potentially interesting meters, he needs to find out what data we have available for each meter: date ranges for interval, weather, and billing data, and start and end dates for the next couple of billing periods. Greg wants this in CSV format so that he can load it into R, explore it, and generate the list of analytic jobs he wants to run.
Usually, a front end application requests data from the Pyramid webapp. We store application-specific data in a MongoDB database, but our meter data is in a backend Postgres database. To get at this data, the Pyramid webapp makes requests to the platform REST API, gets the results, and packages them for Ember Data and the front end app.
Greg usually wants to run analytics on thousands of meters, but sending a POST request with thousands of meter ids to webapps, sending another big POST request to platform from there, and waiting for it to get the results is too slow (more than a few seconds). Instead of waiting for all results to be ready, I wrote a Celery task that requests the data from platform and uploads the results to S3. The webapp starts the task, then immediately returns the task id. The browser polls the webapp every few seconds to check on the status of the task, and updates a progress bar so the end user can see that something is happening.
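The start-then-poll pattern can be sketched without Celery at all. The example below is a simplified stand-in built on the standard library: TaskRegistry, export_meter_dates, and the state names are all invented for illustration, and a real Celery setup would keep task state in its result backend instead of a dictionary.

```python
import threading
import time
import uuid
from concurrent.futures import ThreadPoolExecutor


class TaskRegistry:
    """Toy stand-in for a task queue: start work, return an id, poll later."""

    def __init__(self, max_workers=2):
        self._executor = ThreadPoolExecutor(max_workers=max_workers)
        self._lock = threading.Lock()
        self._status = {}

    def start(self, func, *args):
        """Kick off func in the background and return its task id right away."""
        task_id = uuid.uuid4().hex
        with self._lock:
            self._status[task_id] = {"state": "PENDING", "done": 0, "total": 0, "result": None}
        self._executor.submit(self._run, task_id, func, args)
        return task_id

    def _run(self, task_id, func, args):
        def report(done, total):
            # Called by the task to publish progress for the progress bar.
            with self._lock:
                self._status[task_id].update(state="PROGRESS", done=done, total=total)

        try:
            result = func(report, *args)
            with self._lock:
                self._status[task_id].update(state="SUCCESS", result=result)
        except Exception as exc:
            with self._lock:
                self._status[task_id].update(state="FAILURE", result=repr(exc))

    def status(self, task_id):
        """What a polling request would return: a snapshot of the task state."""
        with self._lock:
            return dict(self._status[task_id])


def export_meter_dates(report, meter_ids):
    """Hypothetical task body; the real one calls platform and uploads to S3."""
    rows = []
    for done, meter_id in enumerate(meter_ids, start=1):
        rows.append({"meter_id": meter_id})
        report(done, len(meter_ids))
    return rows


def wait_for(registry, task_id, interval=0.01, attempts=500):
    """Poll until the task finishes, the way the browser polls the webapp."""
    for _ in range(attempts):
        snapshot = registry.status(task_id)
        if snapshot["state"] in ("SUCCESS", "FAILURE"):
            return snapshot
        time.sleep(interval)
    return registry.status(task_id)
```

The important property is that start returns in well under a second no matter how many meters are involved; everything slow happens off the request path.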
Even in a headless task, a request for thousands of meters at once to the platform API may time out before it gets the results. I also don’t want to stress the platform server by making thousands of requests for individual meters. Instead, I wrote a new platform API method that takes a batch of meter ids and returns the results. The Python task uses a ThreadPoolExecutor to submit up to 10 batches of up to 100 meter ids at a time, and updates the task status as they return data. Once all the data is ready, the task uploads it to an S3 bucket. When the UI poll request finds that the task has finished, it presents the end user with a link to download the file.
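The batching step might look something like the sketch below. The batch size of 100 and the limit of 10 concurrent requests come from the description above; fetch_batch is a placeholder for the call to the batch platform API method, and the shape of its return value (a dict keyed by meter id) is an assumption.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

BATCH_SIZE = 100      # meter ids per platform request
MAX_IN_FLIGHT = 10    # concurrent requests to platform


def chunked(items, size):
    """Yield consecutive slices of items, each with at most size elements."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def fetch_all_meter_dates(meter_ids, fetch_batch, on_progress=None):
    """Fetch data for every meter id in parallel batches.

    fetch_batch(batch) is a hypothetical callable that asks platform for
    one batch and returns {meter_id: data}. on_progress(done, total) is
    called as each batch returns so the task status can be updated.
    """
    results = {}
    batches = list(chunked(meter_ids, BATCH_SIZE))
    with ThreadPoolExecutor(max_workers=MAX_IN_FLIGHT) as pool:
        futures = [pool.submit(fetch_batch, batch) for batch in batches]
        for done, future in enumerate(as_completed(futures), start=1):
            results.update(future.result())
            if on_progress is not None:
                on_progress(done, len(batches))
    return results
```

Capping the worker count keeps the load on the platform server bounded, while the batch size keeps each individual request short enough to avoid timeouts.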
Uploading and running analytic jobs
Greg uses the meter dates file to generate a file containing a list of analytic jobs he wants to run. The file contains one line for each run: the project id, meter id, a unique run id and some other fields passed on to the R code. He uploads the run file to the webapp, which stores it as a field in the MongoDB project document.
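For illustration, here is a sketch of how an uploaded run file could be parsed and sanity-checked before being stored. It assumes the file is CSV with a header row and columns named project_id, meter_id, and run_id; the actual file layout and column names are not specified here, so treat these as placeholders.

```python
import csv
import io

# Hypothetical column names; any extra columns are passed through untouched.
REQUIRED_COLUMNS = ("project_id", "meter_id", "run_id")


def parse_run_file(text):
    """Parse run file contents into a list of dicts, one per run.

    Raises ValueError if a required column is missing or a run id repeats.
    """
    reader = csv.DictReader(io.StringIO(text))
    missing = [c for c in REQUIRED_COLUMNS if c not in (reader.fieldnames or [])]
    if missing:
        raise ValueError(f"run file is missing columns: {missing}")

    runs = []
    seen_run_ids = set()
    # Data starts on line 2 because line 1 is the header.
    for line_no, row in enumerate(reader, start=2):
        if row["run_id"] in seen_run_ids:
            raise ValueError(f"duplicate run id {row['run_id']!r} on line {line_no}")
        seen_run_ids.add(row["run_id"])
        runs.append(dict(row))
    return runs
```

Checking for duplicate run ids at upload time is cheap, and it catches mistakes before thousands of jobs are submitted.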
He likely wants to try multiple variants of R code with the same set of meters. The next step is to create a variant. A variant includes a name, description, R code and supporting files, and a pattern for which output files to save. He fills out a form, and hits Save to create a variant. The webapp saves the variant metadata to MongoDB, and uploads the files to the project’s subdirectory in an S3 bucket.
Once the variant is set up, Greg can submit the job to the backend platform and start running analytics. Submitting a job to the backend platform takes about 100ms. That doesn’t sound too bad for a few jobs, but like the meter dates step, it’s too slow for the UI when submitting thousands of jobs.
Clicking Run starts a Celery task that returns immediately. The task uses the run data from MongoDB to generate analytic messages, which it submits to the backend platform in batches. The platform API returns an analytic run ID for each run; the task saves this in a MongoDB field so that it can check run status later.
After submitting all of the jobs, the task schedules another task for a minute in the future to check the status of the runs. This task asks platform for the status of 1,000 jobs at a time, and updates the run status field in MongoDB. If there are still jobs that haven’t completed, it reschedules itself. The UI also starts polling the webapp to get the run status, and updates a progress bar so that the end user can monitor the progress of the run.
Submitting jobs to platform creates messages on an SQS queue, which then get picked up and run by analytics instances. The platform server takes care of spinning up new instances as needed (up to 200 at a time), and dispatching analytic jobs to them. The message on the queue contains a field which tells the analytic machine which Java code to run to complete the job. The Labs launcher first sets up the environment. Like our product analytics, it queries the database to get interval and meter data and writes it to local CSV files. A product analytic run uses code checked out from git to the filesystem, but a Labs run gets its code from an S3 bucket, with the path specified in the queue message. Next, the Labs launcher runs the R code and uploads the results to S3.
Some of our product analytics put a message into an SQS queue when they complete, so that other processes can pick up the messages and do something else, so I started down that path for Labs jobs as well. I wrote a Celery task that checked for results every few minutes. But the Amazon SQS API only allows getting 10 messages at a time. For Labs jobs that often have thousands of analytic runs, the task spent way too much time talking to the queue.
Instead of going through an external queue, I added an API method to platform that, given a list of analytic job ids, returns their status (one of initializing, queued, running, success, failed). An initializing job has been submitted to platform; a job is only in this state until platform generates a message and puts it on the SQS queue. A queued job has been placed on the queue and is awaiting its turn with an analytics instance. A running job is, of course, actually running. How long it runs varies a lot, since Greg can upload arbitrary R code, but it’s usually between one and ten minutes per job. A job will eventually succeed, fail, or if not heard from for a while, be marked lost (most likely because the spot instance went away).
When webapp starts a Labs run, it schedules a task that makes a request to the platform server to get the status of individual jobs and saves the results to MongoDB. Most of the jobs won’t change status in any one minute, so the task doesn’t request updates for all jobs. On each run, the task asks the platform API for the status of up to 1,000 jobs: first the running and initializing jobs (they’re most likely to change state), then queued jobs. The Ember application polls the webapp API regularly, and updates a progress bar so it’s easy to see how a Labs run is progressing.
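The selection logic for each status check can be sketched as a small pure function. The state names and the 1,000-job limit come from the description above; the function names and the dict-of-statuses input are assumptions made for the example.

```python
MAX_JOBS_PER_POLL = 1000

ACTIVE_STATES = ("running", "initializing")   # most likely to change soon
TERMINAL_STATES = ("success", "failed", "lost")


def jobs_to_refresh(status_by_job, limit=MAX_JOBS_PER_POLL):
    """Pick which job ids to ask platform about on this poll.

    Running and initializing jobs come first, then queued jobs. Jobs in a
    terminal state are never requested again.
    """
    active = [job for job, state in status_by_job.items() if state in ACTIVE_STATES]
    queued = [job for job, state in status_by_job.items() if state == "queued"]
    return (active + queued)[:limit]


def run_is_complete(status_by_job):
    """True once every job has succeeded, failed, or been marked lost."""
    return all(state in TERMINAL_STATES for state in status_by_job.values())
```

The polling task would call jobs_to_refresh, send those ids to platform, write the returned statuses back, and reschedule itself for as long as run_is_complete returns False.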
The analytics servers upload their results to an S3 bucket when complete. These buckets have permissions set so they’re only visible to authorized users. The UI makes it easy to retrieve these results, even before the run is complete.
At first, the webapp downloaded the files for each run from S3 using the boto library, zipped them up, and returned the zipped file as an attachment. This worked fine when testing, but with real Labs runs containing thousands of results, it took too long to download large output twice (once from S3 to the webapp, and again from the webapp to the browser), and the webapps instance ran out of disk space.
In this flow, the webapp was really just providing a permissions layer, but AWS has a much simpler way to do that. Since our end user is an engineer, I took a step sideways from the pretty Ember UI and set him up with AWS CLI on his desktop machine. Now, the Ember app just generates a command line for easy cut and paste, and Greg downloads the results directly from S3 to his laptop.
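Generating the command is mostly string formatting. The sketch below assumes the results are fetched with aws s3 sync and that they live under a project and variant prefix; the bucket name and key layout are invented for the example, not our real ones.

```python
import shlex


def download_command(bucket, project_id, variant, dest="."):
    """Build a copy-and-paste command that downloads one variant's results.

    Assumes results are stored under s3://<bucket>/<project_id>/<variant>/,
    which is a hypothetical layout. Arguments are shell-quoted so paths
    with spaces survive being pasted into a terminal.
    """
    source = f"s3://{bucket}/{project_id}/{variant}/"
    return " ".join(["aws", "s3", "sync", shlex.quote(source), shlex.quote(dest)])


print(download_command("example-labs-results", "proj42", "baseline"))
```

Because sync only copies files that are missing or changed locally, rerunning the same command while a Labs run is still in progress picks up just the new results.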
Labs in use
The Labs project has made it easy for Greg, our data scientist, to take advantage of our infrastructure and run more and larger experiments than he ever could on one desktop machine. Soon after starting to use it, Greg described how he was using it:
1410 runs queued (each one fitting a model about 40 times and making 40 one-day ahead predictions for a performance assessment), 100 running at a time. Should be done in a few hours. Doing this on an 8 core PC … might be done in a week.
More than 100,000 runs later, he’s still finding new things to do with it. For me, it was interesting to work in all parts of our stack, from Ember to Celery tasks to SQL. I got to try out the (then-new) Ember CLI on the front end, and now we’re using it for all of our Ember projects. Writing new platform code helped me get a deeper understanding of our systems; this helped with a later project where I wanted to take advantage of our pool of EC2 instances. Thinking about how to get all the pieces to talk to each other is always fun, and being able to do something we couldn’t yesterday — that’s what I love about software engineering.