PG Phriday: Converting to Horizontal Distribution

Now that we’ve decided to really start embracing horizontal scaling builds, there is a critically important engine-agnostic element we need to examine. Given an existing table, how exactly should we split up the contents across our various nodes during the conversion process? Generally this is done by selecting a specific column and applying some kind of hash or custom distribution mechanism to ensure all node contents are reasonably balanced. But how do we go about figuring that out?

This question is usually answered with “use the primary key!” But that gets a bit more complex in cases where tables rely on composite keys. This doesn’t happen often, but it can really throw a wrench into the works. Imagine, for example, that we’re using Postgres-XL and have four nodes numbered data0000 through data0003. Then we find this table:

CREATE TABLE comp_event
(
  group_code   TEXT NOT NULL,
  event_id     BIGINT NOT NULL,
  entry_tm     TIMETZ NOT NULL,
  some_data    TEXT NOT NULL
);

INSERT INTO comp_event
SELECT a.id % 10, a.id % 100000,
       '08:00'::TIMETZ + (a.id % 43200 || 's')::INTERVAL,
       repeat('a', a.id%10)
  FROM generate_series(1, 1000000) a(id);

ALTER TABLE comp_event ADD CONSTRAINT pk_comp_event
      PRIMARY KEY (group_code, event_id, entry_tm);

ANALYZE comp_event;

The default for Postgres-XL is to simply use the first column for distribution. This tends to fit most cases, as the first column is usually either the primary key, or a reasonable facsimile of it. We can even use a system view to confirm this is the case:

SELECT pcrelid::regclass AS table_name, a.attname AS column_name
  FROM pgxc_class c
  JOIN pg_attribute a ON (a.attrelid = c.pcrelid)
 WHERE a.attnum = c.pcattnum;

 table_name | column_name 
------------+-------------
 comp_event | group_code

But is this what we want? What would happen if we naively went ahead with the default value and converted the database? Well, the major problem is that we don’t know the hash algorithm Postgres-XL is using. It’s entirely possible that the resulting data distribution will be anywhere from “slightly off” to “completely awful,” and we need a way to verify uniform distribution before moving forward.
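
In the case of Postgres-XL, we can ask an individual data node directly with EXECUTE DIRECT. A one-off check against our first node looks something like this:

-- Ask a single data node how many comp_event rows it physically stores.
EXECUTE DIRECT ON (data0000)
  'SELECT count(*) FROM comp_event';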

That works, but repeatedly executing the same query and just substituting each node name is both inefficient and cumbersome, especially if we have dozens or hundreds of nodes. Thankfully Postgres makes it easy to create functions that return sets, so let’s leverage that power in our favor:

CREATE TYPE pgxl_row_dist AS (node_name TEXT, total BIGINT);

CREATE OR REPLACE FUNCTION check_row_counts(tab_name REGCLASS)
RETURNS SETOF pgxl_row_dist AS
$BODY$
DECLARE
  r pgxl_row_dist;
  query TEXT;
BEGIN
  -- Loop over every data node ('D') registered in the cluster.
  FOR r.node_name IN
      SELECT node_name
        FROM pgxc_node WHERE node_type = 'D'
  LOOP
    -- Run a count directly on that node and emit the result row.
    query = 'EXECUTE DIRECT ON (' || r.node_name || ') 
      ''SELECT count(*) FROM ' || tab_name::TEXT || '''';
    EXECUTE query INTO r.total;
    RETURN NEXT r;
  END LOOP;
END;
$BODY$ LANGUAGE plpgsql;

A function like this really should exist in some form in the standard Postgres-XL distribution; if it does, I couldn’t find it. Regardless, with this in hand, we can supply a table name and see how many rows exist on each node, no matter our cluster size. For our four-node cluster, each node should hold roughly 250,000 rows, give or take some variance caused by the hashing algorithm. Let’s see what the distribution actually looks like:

SELECT * FROM check_row_counts('comp_event');

 node_name | total  
-----------+--------
 data0000  | 600000
 data0001  | 200000
 data0002  | 200000
 data0003  |      0

That’s… unfortunate. The table doesn’t list its columns in order of cardinality, since that was never a concern before now. Beyond that, the first column is part of our primary key, so it makes sense for it to appear near the top anyway. Column position is hardly a reliable criterion beyond a first approximation, so how do we fix this?
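
Part of the problem is cardinality itself: group_code only has ten distinct values, so any hash can produce at most ten buckets of roughly 100,000 rows each, and four nodes have no way to share those evenly. We can simulate the effect locally; hashtext() here is just an illustrative stand-in, not necessarily the hash Postgres-XL applies internally:

-- Spread the ten group_code values across four simulated nodes.
-- hashtext() is only a stand-in for whatever hash Postgres-XL really uses.
SELECT abs(hashtext(group_code)) % 4 AS simulated_node,
       count(*) AS total
  FROM comp_event
 GROUP BY 1
 ORDER BY 1;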

Let’s examine the Postgres statistics catalog for the comp_event table, and see how cardinality is actually represented:

SELECT attname, n_distinct
  FROM pg_stats
 WHERE tablename = 'comp_event';

  attname   | n_distinct 
------------+------------
 group_code |         10
 event_id   |    12471.5
 entry_tm   |  -0.158365
 some_data  |         10

The sample INSERT statement we used to fill comp_event should have already made this clear, but real tables rarely come with such a convenient blueprint. If the table already existed, or we had loaded it from multiple sources or scripts, the statistics would be our primary guide. Keep in mind that a negative n_distinct is a ratio rather than a count: the -0.158365 for entry_tm means the estimated number of distinct values is about 15.8% of the row count, or roughly 158,000 in this table.
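
If we’d rather compare columns as approximate absolute counts, we can translate the ratios using pg_class.reltuples. A minimal sketch:

-- Convert negative (ratio) n_distinct values into approximate counts so all
-- columns can be compared directly.
SELECT s.attname,
       CASE WHEN s.n_distinct < 0
            THEN round(-s.n_distinct * c.reltuples)
            ELSE s.n_distinct
       END AS approx_distinct
  FROM pg_stats s
  JOIN pg_namespace n ON (n.nspname = s.schemaname)
  JOIN pg_class c ON (c.relnamespace = n.oid AND c.relname = s.tablename)
 WHERE s.tablename = 'comp_event';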

In this particular case, either event_id or entry_tm would be a much better candidate for balanced distribution. For now, let’s keep things simple and use event_id, so that cardinality is the only variable we’re changing; there’s no reason to muddy the waters with differences in column type quite yet.
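
Before reloading anything, we can also preview how evenly the new candidate would spread, using the same stand-in hash as before:

-- Preview of four simulated buckets for event_id. Again, hashtext() is only
-- an approximation of whatever Postgres-XL does internally.
SELECT abs(hashtext(event_id::TEXT)) % 4 AS simulated_node,
       count(*) AS total
  FROM comp_event
 GROUP BY 1
 ORDER BY 1;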

Let’s check our row totals after telling Postgres-XL we want to use event_id for hashing:

TRUNCATE TABLE comp_event;
ALTER TABLE comp_event DISTRIBUTE BY HASH (event_id);

INSERT INTO comp_event
SELECT a.id % 10, a.id % 100000,
       '08:00'::TIMETZ + (a.id % 43200 || 's')::INTERVAL,
       repeat('a', a.id%10)
  FROM generate_series(1, 1000000) a(id);

SELECT * FROM check_row_counts('comp_event');

 node_name | total  
-----------+--------
 data0000  | 250050
 data0001  | 249020
 data0002  | 249730
 data0003  | 251200

Much better! Now our queries will retrieve data from all four nodes, and the first node isn’t working three times harder than the others. If we had gone into production using the previous distribution, our cluster would be unbalanced and we’d be chasing performance problems. Or if we figured this out too late, we’d have to rebalance all of the data, which can take hours or even days depending on row count. No thanks!

It’s important to do this kind of analysis before moving data into a horizontally capable cluster. The Postgres pg_stats view makes that easy to accomplish. And if repeating this process for every table is too irritating, we can even do it in bulk. Let’s construct an unholy abomination that returns the highest-cardinality column from each table’s primary key or unique indexes:

-- For every table, keep the unique/primary key column with the most distinct
-- values, preferring primary keys when cardinalities tie.
SELECT DISTINCT ON (schemaname, tablename)
       schemaname, tablename, attname
  FROM (
    SELECT s.schemaname, c.relname AS tablename,
           a.attname, i.indisprimary, i.indisunique,
           sum(s.n_distinct) AS total_values
      FROM pg_index i
      JOIN pg_attribute a ON (
               a.attrelid = i.indrelid AND
               a.attnum = ANY(i.indkey)
           )
      JOIN pg_class c ON (c.oid = i.indrelid)
      JOIN pg_namespace n ON (n.oid = c.relnamespace)
      JOIN pg_stats s ON (
               s.schemaname = n.nspname AND
               s.tablename = c.relname AND
               s.attname = a.attname
           )
     WHERE i.indisunique
       AND s.schemaname NOT IN ('pg_catalog', 'information_schema')
     GROUP BY 1, 2, 3, 4, 5
) cols
ORDER BY schemaname, tablename,
      -- Negative n_distinct values are row-count ratios, so rank them as
      -- very high cardinality.
      CASE WHEN total_values < 0 THEN -total_values * 9e20
           ELSE total_values END DESC,
      indisprimary DESC, indisunique DESC;

Gross! But at least we only have to do that once or twice before restoring all of our data into the new horizontally scaled cluster. We could even make the query uglier and have it generate our ALTER TABLE statements, as sketched below, so we don’t need to manually correct the distribution of every table. And don’t forget that this process applies to nearly any distribution mechanism that depends on column contents, not just Postgres-XL. Just do your due diligence, and everything should work out.
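
Here’s roughly what that generator could look like. It simply wraps the cardinality query from above in a CTE and formats its output into DDL, assuming hash distribution is what we want everywhere; review the statements before actually running them:

-- Emit an ALTER TABLE statement for the best candidate column of each table.
-- The best_cols CTE is the same cardinality query shown above.
WITH best_cols AS (
    SELECT DISTINCT ON (schemaname, tablename)
           schemaname, tablename, attname
      FROM (
        SELECT s.schemaname, c.relname AS tablename,
               a.attname, i.indisprimary, i.indisunique,
               sum(s.n_distinct) AS total_values
          FROM pg_index i
          JOIN pg_attribute a ON (
                   a.attrelid = i.indrelid AND
                   a.attnum = ANY(i.indkey)
               )
          JOIN pg_class c ON (c.oid = i.indrelid)
          JOIN pg_namespace n ON (n.oid = c.relnamespace)
          JOIN pg_stats s ON (
                   s.schemaname = n.nspname AND
                   s.tablename = c.relname AND
                   s.attname = a.attname
               )
         WHERE i.indisunique
           AND s.schemaname NOT IN ('pg_catalog', 'information_schema')
         GROUP BY 1, 2, 3, 4, 5
      ) cols
     ORDER BY schemaname, tablename,
           CASE WHEN total_values < 0 THEN -total_values * 9e20
                ELSE total_values END DESC,
           indisprimary DESC, indisunique DESC
)
SELECT format(
         'ALTER TABLE %I.%I DISTRIBUTE BY HASH (%I);',
         schemaname, tablename, attname
       ) AS alter_statement
  FROM best_cols;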

Happy scaling!