PG Phriday: Massively Distributed Operation

Postgres has been lacking something for quite a while, and more than a few people have attempted to fill the gap over the years. I'm speaking, of course, about parallel queries. There are several reasons to want them, and among these are the various distribution and sharding needs of large data sets. When tables reach hundreds of millions or even billions of rows, even high-cardinality indexes produce results very slowly.

I recently ran across an extension called pmpp, short for Poor Man's Parallel Processing, and decided to give it a try. It uses Postgres' dblink system to invoke queries asynchronously and then collates the results locally, allowing further queries on that result set as if it were a temporary table.
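To illustrate the mechanism pmpp automates, here's a rough sketch of what raw asynchronous dblink usage looks like. The connection strings and queries are placeholders:

-- Requires: CREATE EXTENSION dblink;
-- Open two named connections (connection strings are placeholders).
SELECT dblink_connect('conn_1', 'dbname=postgres');
SELECT dblink_connect('conn_2', 'dbname=postgres');

-- Send both queries without waiting for either to finish.
SELECT dblink_send_query('conn_1', 'SELECT count(*) FROM pg_class');
SELECT dblink_send_query('conn_2', 'SELECT count(*) FROM pg_attribute');

-- Collect each result as it completes; pmpp handles this bookkeeping.
SELECT * FROM dblink_get_result('conn_1') AS r(total BIGINT);
SELECT * FROM dblink_get_result('conn_2') AS r(total BIGINT);

SELECT dblink_disconnect('conn_1');
SELECT dblink_disconnect('conn_2');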

Theoretically this should be ideal for a distributed cluster of shards, so let’s see what happens if we try this with our sensor_log table in that configuration:

CREATE TABLE sensor_log (
  sensor_log_id  SERIAL PRIMARY KEY,
  location       VARCHAR NOT NULL,
  reading        BIGINT NOT NULL,
  reading_date   TIMESTAMP NOT NULL
);

CREATE INDEX idx_sensor_log_location ON sensor_log (location);
CREATE INDEX idx_sensor_log_date ON sensor_log (reading_date);
CREATE INDEX idx_sensor_log_time ON sensor_log ((reading_date::TIME));

CREATE SCHEMA shard_1;
SET search_path TO shard_1;
CREATE TABLE sensor_log (LIKE public.sensor_log INCLUDING ALL)
INHERITS (public.sensor_log);

CREATE SCHEMA shard_2;
SET search_path TO shard_2;
CREATE TABLE sensor_log (LIKE public.sensor_log INCLUDING ALL)
INHERITS (public.sensor_log);

CREATE SCHEMA shard_3;
SET search_path TO shard_3;
CREATE TABLE sensor_log (LIKE public.sensor_log INCLUDING ALL)
INHERITS (public.sensor_log);

CREATE SCHEMA shard_4;
SET search_path TO shard_4;
CREATE TABLE sensor_log (LIKE public.sensor_log INCLUDING ALL)
INHERITS (public.sensor_log);

The top-level sensor_log table in the public schema exists merely so we can query all of the shards as a whole without resorting to a pile of UNION statements. This should allow us to simulate how such a query would run without the benefit of parallel execution on each shard.
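For instance, thanks to inheritance, a count against the parent transparently includes every shard, while the ONLY keyword restricts it to the parent itself, which will remain empty:

-- Counts rows across the parent and all inheriting shard tables.
SELECT count(*) FROM public.sensor_log;

-- Counts only the parent table, which holds no rows of its own.
SELECT count(*) FROM ONLY public.sensor_log;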

Now we need to fill the shards with data. Fortunately, the generate_series function can increment by arbitrary amounts, so simulating a hash function for distribution is pretty easy. Here's what that looks like:

INSERT INTO shard_1.sensor_log (location, reading, reading_date)
SELECT s.id % 1000, s.id % 100, now() - (s.id || 's')::INTERVAL
  FROM generate_series(1, 4000000, 4) s(id);

INSERT INTO shard_2.sensor_log (location, reading, reading_date)
SELECT s.id % 1000, s.id % 100, now() - (s.id || 's')::INTERVAL
  FROM generate_series(2, 4000000, 4) s(id);

INSERT INTO shard_3.sensor_log (location, reading, reading_date)
SELECT s.id % 1000, s.id % 100, now() - (s.id || 's')::INTERVAL
  FROM generate_series(3, 4000000, 4) s(id);

INSERT INTO shard_4.sensor_log (location, reading, reading_date)
SELECT s.id % 1000, s.id % 100, now() - (s.id || 's')::INTERVAL
  FROM generate_series(4, 4000000, 4) s(id);
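As a quick sanity check, the tableoid system column can verify that each shard received exactly one million rows:

-- Group the inherited rows by the table they actually live in.
SELECT tableoid::regclass AS shard, count(*)
  FROM public.sensor_log
 GROUP BY 1
 ORDER BY 1;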

Clearly a real sharding scenario would involve much more sophisticated data distribution. But this is poor man's parallelism, so it's only appropriate to have a bunch of lazy shards to go with it.

In any case, we're ready to query these tables. The way we generated the data, each shard contains a million rows representing about six weeks of entries. A not infrequent use case for this kind of structure is examining the same time window across multiple days. That's why we created the index on the TIME portion of our reading_date column.

If, for example, we wanted to examine how 2PM looked across all of our data, we would do something like this:

\timing on

SELECT count(*)
  FROM public.sensor_log
 WHERE reading_date::TIME >= '14:00'
   AND reading_date::TIME < '15:00';

Time: 1215.589 ms

SELECT count(*)
  FROM shard_1.sensor_log
 WHERE reading_date::TIME >= '14:00'
   AND reading_date::TIME < '15:00';

Time: 265.620 ms

The second run against just one shard is included to give some insight into how fast the query could be if all four partitions were checked at once. Here's where the pmpp extension comes into play. It lets us send as many queries as we want in parallel and pulls in the results as they complete. Each query can be directed to a different connection, too.

For the sake of simplicity, we’ll just simulate the remote connections with a local loopback to the database where we created all of the shards. In a more advanced scenario, we would be using at least two Postgres instances on potentially separate servers.

Prepare to be amazed!

CREATE EXTENSION postgres_fdw;
CREATE EXTENSION pmpp;

CREATE SERVER loopback
FOREIGN DATA WRAPPER postgres_fdw 
OPTIONS (host 'localhost', dbname 'postgres', port '5433');

CREATE USER MAPPING
   FOR postgres 
SERVER loopback
OPTIONS (user 'postgres', password 'test');

\timing on

CREATE TEMP TABLE tmp_foo (total INT);

SELECT sum(total) FROM pmpp.distribute( NULL::tmp_foo, 'loopback',
  array[
    'SELECT count(*) FROM shard_1.sensor_log
      WHERE reading_date::TIME >= ''14:00''
        AND reading_date::TIME < ''15:00''',
    'SELECT count(*) FROM shard_2.sensor_log
      WHERE reading_date::TIME >= ''14:00''
        AND reading_date::TIME < ''15:00''',
    'SELECT count(*) FROM shard_3.sensor_log
      WHERE reading_date::TIME >= ''14:00''
        AND reading_date::TIME < ''15:00''',
    'SELECT count(*) FROM shard_4.sensor_log
      WHERE reading_date::TIME >= ''14:00''
        AND reading_date::TIME < ''15:00'''
  ]
);

Time: 349.503 ms

Nice, eh? With a bit more "wrapping" to hide the ugliness of broadcasting a query to multiple servers, this has some major potential! Since we separated the shards by schema, we could even bootstrap the connections so the same query could be sent to each without any further modification. Of course, the Postgres foreign data wrapper doesn't let us set a schema for the servers it defines, so we'd need another workaround such as pg_service.conf, but the components are there.
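As a purely hypothetical sketch of that workaround, a pg_service.conf entry can pin a search_path per connection through the options keyword. The service names here are invented, and this assumes pmpp passes raw libpq connection strings through to dblink:

# In ~/.pg_service.conf: one service per shard, each forcing its
# own search_path at connection time.
[shard1]
host=localhost
port=5433
dbname=postgres
options=-c search_path=shard_1

[shard2]
host=localhost
port=5433
dbname=postgres
options=-c search_path=shard_2

With that in place, the same query text could be reused verbatim against any shard simply by switching the service name:

SELECT sum(total) FROM pmpp.distribute( NULL::tmp_foo, 'service=shard1',
  array['SELECT count(*) FROM sensor_log
          WHERE reading_date::TIME >= ''14:00''
            AND reading_date::TIME < ''15:00''']
);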

The primary caveat is that this approach works best for queries that drastically reduce the result set through aggregation. The problem is that Postgres needs to run the query on each system, fetch the results into a temporary structure, and then return them again from the distribute() function. As a consequence, speed is inversely proportional to row count; there's a lot of overhead involved.

Take a look at what happens when we try to run the aggregate locally:

\timing on

SELECT count(*) FROM pmpp.distribute( NULL::sensor_log, 'loopback',
  array[
    'SELECT * FROM shard_1.sensor_log
      WHERE reading_date::TIME >= ''14:00''
        AND reading_date::TIME < ''15:00''',
    'SELECT * FROM shard_2.sensor_log
      WHERE reading_date::TIME >= ''14:00''
        AND reading_date::TIME < ''15:00''',
    'SELECT * FROM shard_3.sensor_log
      WHERE reading_date::TIME >= ''14:00''
        AND reading_date::TIME < ''15:00''',
    'SELECT * FROM shard_4.sensor_log
      WHERE reading_date::TIME >= ''14:00''
        AND reading_date::TIME < ''15:00'''
  ]
);

Time: 4059.308 ms

Basically, do anything possible to reduce the result set. Perform aggregation remotely whenever you can, and don't be surprised if pulling back tens of thousands of rows takes a bit longer than expected.
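The same principle extends to grouped aggregates: push the GROUP BY down to each shard and re-aggregate the partial counts locally. Here's a sketch reusing the distribute() pattern from above, with a hypothetical tmp_counts row type:

CREATE TEMP TABLE tmp_counts (location VARCHAR, total BIGINT);

-- Each shard returns at most 1,000 partial counts instead of raw
-- rows, and the local GROUP BY merges them into final totals.
SELECT location, sum(total)
  FROM pmpp.distribute( NULL::tmp_counts, 'loopback',
    array[
      'SELECT location, count(*) FROM shard_1.sensor_log GROUP BY location',
      'SELECT location, count(*) FROM shard_2.sensor_log GROUP BY location',
      'SELECT location, count(*) FROM shard_3.sensor_log GROUP BY location',
      'SELECT location, count(*) FROM shard_4.sensor_log GROUP BY location'
    ]
  )
 GROUP BY location;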

Given that caveat, this is a very powerful extension. It’s still very early in its development cycle, and could use some more functionality, but the core is definitely there. Now please excuse me while I contemplate ways to glue it to my shard_manager extension and methods of wrapping it to simplify invocation.