PG Phriday: Parallel-O-Postgres

I wasn’t able to write an article last week due to an unexpected complication regarding tests I was running to verify its contents. So this week, it’s going to be extra special! Also long.

What’s the fastest way to load a Postgres table? If you believe the documentation, the COPY command is the best way to unceremoniously heave data into a table. Fortunately after all of our talk about partitions, our minds are primed and ready to think in chunks. Why restrict ourselves to one COPY, when we can summon an army?

Let’s start with our tried and true sensor_log table:

CREATE TABLE sensor_log (
  sensor_log_id  SERIAL PRIMARY KEY,
  location       VARCHAR NOT NULL,
  reading        BIGINT NOT NULL,
  reading_date   TIMESTAMP NOT NULL
);

CREATE INDEX idx_sensor_log_location ON sensor_log (location);
CREATE INDEX idx_sensor_log_date ON sensor_log (reading_date);

Now, there are numerous methods for filling this table. We’ve explored this in the past and determined that COPY is the fastest of Postgres’ built-in options. It gets there by circumventing a large portion of the overhead each individual INSERT would otherwise incur. As an example, a simple Python script can insert 100k records in about 25 seconds with INSERT statements, or 1.5 seconds with a COPY equivalent like psycopg2’s copy_from.
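To make that comparison concrete, here’s a minimal sketch of both approaches against the sensor_log table above. The connection settings mirror the loader further down, and the timings quoted came from a fuller test harness rather than this literal snippet, so treat it as illustrative:

import psycopg2
from StringIO import StringIO
from datetime import datetime

row_count = 100000
stamp = datetime.now()

db_conn = psycopg2.connect(database = 'postgres', user = 'postgres')
cur = db_conn.cursor()

# Method 1: one INSERT per row. Every statement pays its own parse,
# plan, and round-trip overhead.

for i in xrange(1, row_count + 1):
    cur.execute(
        "INSERT INTO sensor_log (location, reading, reading_date)"
        " VALUES (%s, %s, %s)",
        (i % 1000, i % 100, stamp)
    )

# Method 2: the same rows fed to a single COPY through copy_from.
# In a real comparison, time and commit each method separately.

raw_data = StringIO()

for i in xrange(1, row_count + 1):
    raw_data.write('%s\t%s\t%s\n' % (i % 1000, i % 100, stamp))

raw_data.seek(0)

cur.copy_from(
    raw_data, 'sensor_log', '\t',
    columns = ('location', 'reading', 'reading_date')
)

db_conn.commit()
db_conn.close()

The structural difference is the whole point: one statement per row, versus one statement for the entire 100k-row stream.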

And as previously mentioned, we can parallelize everything for even more benefit. Or can we? Take a look at this Python code for a very simple parallel loader:

# Set these to modify how large the COMMIT blocks will be,
# and how many days the data will represent.
# Total row count = chunk_size * date_range.

chunk_size = 100000
date_range = 8
threads = 1

# Includes

import psycopg2
from StringIO import StringIO
from datetime import datetime, timedelta, date, time
from multiprocessing import Pool

# Main program body

def load_part(part_date):

    # Create all of the fake data we'll be inserting. We'll use an
    # in-memory file so we can use a PostgreSQL COPY import.

    stamp = datetime.combine(part_date, time.max)
    raw_data = StringIO()

    for i in xrange(1, chunk_size + 1):
        stamp -= timedelta(seconds = 0.5)
        raw_data.write(
            '%s\t%s\t%s\n' % (i % 1000, i % 100, stamp)
        )

    raw_data.seek(0)

    # Connect to the database and insert everything we've generated.
    # This is really basic and lame, but works as a demo.

    db_conn = psycopg2.connect(database = 'postgres', user = 'postgres')
    cur = db_conn.cursor()

    cur.copy_from(
        raw_data, 'sensor_log', '\t',
        columns = ('location', 'reading', 'reading_date')
    )

    db_conn.commit()

    raw_data.close()
    db_conn.close()

# Generate a list of dates that we can send to the pool mapping function.
# This should keep our multiprocess stack full until all data is loaded.

date_list = [date.today() - timedelta(i) for i in xrange(0, date_range)]

pool = Pool(processes = threads)
res = pool.map(load_part, date_list)
pool.close()

There’s not really anything complicated going on in that script. We have a configurable worker pool, we’ve elected to group data in 100k-row batches, and we have eight batches so the work divides evenly among pool sizes of 1, 2, 4, and 8. The real fun is in the results. Here’s what they look like on an 8-CPU system with 32GB of RAM running Postgres 9.4, with a rough sketch of the timing harness following the table:

Pool Size    Avg Time (s)
---------    ------------
1            18.6
2            11.3
4            7.0
8            8.2
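A rough sketch of a harness for collecting these numbers, assuming it replaces the pool setup at the bottom of the loader script and that nothing else is writing to the table, might look like this:

# The loader already imports `time` from datetime, so alias the
# standard library module to avoid a name clash.

import time as timer

for workers in (1, 2, 4, 8):

    # Start each run from an empty table so the timings are comparable.

    db_conn = psycopg2.connect(database = 'postgres', user = 'postgres')
    db_conn.cursor().execute('TRUNCATE sensor_log')
    db_conn.commit()
    db_conn.close()

    start = timer.time()
    pool = Pool(processes = workers)
    pool.map(load_part, date_list)
    pool.close()
    pool.join()

    print '%s workers: %.1f seconds' % (workers, timer.time() - start)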

What happened at eight processes? Our Python workers do their own share of work generating the fake data, so with eight of them competing against eight Postgres backends for the same eight CPUs, context switching eats into the gains. For this particular approach, it’s best to restrict parallelism to half the number of CPUs on the system so that doesn’t happen. Why not a pool size of 6, then? Since the batches are all the same size, a pool of 6 would chew through 600k rows in its first pass and leave a final 200k for two workers in a second pass. That’s still two passes, just like the 4-process case handling 400k rows at a time, so our run time would remain essentially unchanged.
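The batch math is easy to sanity-check. With eight equal batches, elapsed time is roughly proportional to the number of passes the pool makes over the work, which is just the batch count divided by the pool size, rounded up:

import math

batches = 8

# Number of passes each pool size needs to work through all batches.
for workers in (1, 2, 4, 6, 8):
    passes = int(math.ceil(float(batches) / workers))
    print '%s workers -> %s passes' % (workers, passes)

# Output: 1 -> 8, 2 -> 4, 4 -> 2, 6 -> 2, 8 -> 1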

But this is all loading a single monolithic table. We’ve been covering various aspects of partitions for weeks now, so why not see what happens with partitions? As it turns out, quite a bit changes. Here are the same tests with a slightly altered Python script that assumes daily partitions for our simulated data (a sketch of that altered loader appears after the results):

Pool Size    Avg Time (s)
---------    ------------
1            16.2
2            9.6
4            6.0
8            6.8

The slightly lower times come mainly from how the primary keys are maintained. The unique indexes that back them only span a single partition, so each index insertion has far fewer existing entries to contend with. That also means the effect should persist, and likely grow, for larger, production-sized tables.
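The altered loader isn’t reproduced here, but the change is small: point each worker’s COPY at the child table for its date. This sketch assumes daily partitions named sensor_log_part_YYYYMMDD already exist; adjust the naming to however the partitions were actually created:

def load_part(part_date):

    # Same fake data generation as before.

    stamp = datetime.combine(part_date, time.max)
    raw_data = StringIO()

    for i in xrange(1, chunk_size + 1):
        stamp -= timedelta(seconds = 0.5)
        raw_data.write(
            '%s\t%s\t%s\n' % (i % 1000, i % 100, stamp)
        )

    raw_data.seek(0)

    # Aim the COPY directly at the daily child table instead of the
    # parent, skipping any trigger or rule redirection. The naming
    # scheme here is an assumption.

    target = 'sensor_log_part_%s' % part_date.strftime('%Y%m%d')

    db_conn = psycopg2.connect(database = 'postgres', user = 'postgres')
    cur = db_conn.cursor()

    cur.copy_from(
        raw_data, target, '\t',
        columns = ('location', 'reading', 'reading_date')
    )

    db_conn.commit()

    raw_data.close()
    db_conn.close()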

There’s a major reason I used Postgres 9.4 for these tests. COPY generates a lot of WAL traffic, and before 9.4, backends essentially took turns inserting into the WAL buffers behind a single insertion lock. Postgres 9.4 allows multiple backends to insert into WAL concurrently, which is what makes parallel COPY viable at all. Let’s see what happens to the parallel partition numbers on a 9.3 instance:

Pool Size    Avg Time (s)
---------    ------------
1            19.8
2            12.1
4            10.5
8            16.2

Take special note of that last row. What happened becomes clear if we examine the Postgres logs for the COPY commands themselves, with log_min_duration_statement set low enough to capture them:

LOG:  duration: 14688.654 ms  statement: COPY ...

Remember that there are eight of these executing in parallel, and each of them needed about 14.5 seconds to run. That’s 14.5 * 8, or nearly two full minutes of backend time to load 800k rows, roughly six times more work than the ~20 seconds the sequential case needed. Even if we drop to four processes, each of the eight loads takes about four seconds, so the full run still burns 32 seconds of backend time. By trying to parallelize this in 9.3, we actually made the load much more resource intensive.

It looks faster by wall-clock time, but that’s simply a side effect of increased concurrency. Remember, at four seconds per pass of four batches, two passes take about eight seconds of elapsed time; factor in the Python script’s own overhead and that lines up neatly with the 10.5 seconds we measured. This is why it’s important to consider the total work being done, and not just how quickly it finishes.
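Here’s that same work-versus-elapsed arithmetic spelled out for the 9.3 runs, using the rough per-COPY durations from the logs:

import math

batches = 8

# Rough per-COPY durations pulled from the 9.3 logs, by pool size.
per_copy = {4: 4.0, 8: 14.5}

for workers in sorted(per_copy):
    seconds = per_copy[workers]
    passes = int(math.ceil(float(batches) / workers))
    print '%s workers: ~%.1fs of backend time, ~%.1fs elapsed' % (
        workers, seconds * batches, seconds * passes
    )

# 4 workers: ~32.0s of backend time, ~8.0s elapsed
# 8 workers: ~116.0s of backend time, ~14.5s elapsed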

Knowing that, 9.4 isn’t exactly innocent here either. At four concurrent processes, each COPY takes about 2.5 seconds on this system, which works out to roughly 20 seconds of backend time for the eight batches. That’s still more total work than a sequential load in 9.4, though the effect is far less pronounced than in 9.3.

Not to leave 9.5 out now that it’s in beta, here are its numbers using the partitioning script:

Pool Size    Avg Time (s)
---------    ------------
1            15.7
2            7.7
4            5.1
8            6.2

That’s a solid improvement over 9.4 across the board, which is always nice to see between version bumps. At this point each COPY takes about two seconds, or roughly 16 seconds of backend time, bringing the 4-process case almost to parity with the 15.7-second sequential load. Making a process parallel always introduces some overhead, but this is a vast improvement over 9.3.

It’s also what I consider the final nail in the coffin when it comes to pushing upgrades. Postgres 9.3 is awesome, but 9.4 contains so many improvements it’s difficult to quantify them all. Most users and DBAs probably didn’t know COPY was effectively serialized across a server instance prior to 9.4, and judging by a quick search, that’s only one improvement that received very little fanfare. Imagine what else is in there!

There are probably plenty of things in 9.5 that equally defy simple analysis and could yield even more drastic results. But for now, check your Postgres version, and if you’re relying on parallel COPY on anything older than 9.4, you might want to upgrade soon.