Now that we’ve decided to embrace horizontally scaled builds, there is a critically important engine-agnostic element we need to examine. Given an existing table, how exactly should we split its contents across our various nodes during the conversion process? Generally this is done by selecting a specific column and applying some kind of hash or custom distribution mechanism so that each node holds a reasonably balanced share. But how do we go about choosing that column?
This question is usually answered with “use the primary key!” But this gets a bit more complex in cases where tables rely on composite keys. That doesn’t happen often, but it can really throw a wrench into the works. Imagine, for example, that we’re using Postgres-XL with four nodes numbered data0000 through data0003, and we find this table:
CREATE TABLE comp_event
(
  group_code  TEXT NOT NULL,
  event_id    BIGINT NOT NULL,
  entry_tm    TIMETZ NOT NULL,
  some_data   TEXT NOT NULL
);

INSERT INTO comp_event
SELECT a.id % 10, a.id % 100000,
       '08:00'::TIMETZ + (a.id % 43200 || 's')::INTERVAL,
       repeat('a', a.id % 10)
  FROM generate_series(1, 1000000) a(id);

ALTER TABLE comp_event ADD CONSTRAINT pk_comp_event
      PRIMARY KEY (group_code, event_id, entry_tm);

ANALYZE comp_event;
The default for Postgres-XL is to simply use the first column for distribution. This tends to fit most cases, as the first column is usually either the primary key, or a reasonable facsimile of it. We can even use a system view to confirm this is the case:
SELECT pcrelid::regclass AS table_name,
       a.attname AS column_name
  FROM pgxc_class c
  JOIN pg_attribute a ON (a.attrelid = c.pcrelid)
 WHERE a.attnum = c.pcattnum;

 table_name | column_name 
------------+-------------
 comp_event | group_code
But is this what we want? What would happen if we naively went ahead with the default value and converted the database? Well, the major problem is that we don’t know the hash algorithm Postgres-XL is using. It’s entirely possible that the resulting data distribution will be anywhere from “slightly off” to “completely awful,” and we need a way to verify uniform distribution before moving forward.
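We can’t replicate XL’s exact algorithm from the outside, but we can approximate the shape of the problem on the source database before converting anything. Here’s a sketch using the built-in hashtext function in vanilla Postgres as a stand-in for whatever hash Postgres-XL actually applies; the exact bucket assignments will differ, but the skew pattern is what matters:

-- Simulate hashing group_code across four buckets. hashtext is an
-- undocumented internal Postgres function, used here purely as a
-- stand-in for the real distribution hash.
SELECT abs(hashtext(group_code)) % 4 AS node_bucket,
       count(*) AS total
  FROM comp_event
 GROUP BY 1
 ORDER BY 1;

With only ten distinct group_code values feeding four buckets, a perfectly even split is mathematically impossible; any hash will strand some nodes with extra freight.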
In the case of Postgres-XL, we can actually poll each node directly with EXECUTE DIRECT. Repeatedly executing the same query and just substituting the node name is both inefficient and cumbersome, especially if we have dozens or hundreds of nodes. Thankfully Postgres makes it easy to create functions that return sets, so let’s leverage that power in our favor.
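For reference, the manual version we’re about to automate looks like this for a single node, using the node names from our hypothetical cluster:

EXECUTE DIRECT ON (data0000) 'SELECT count(*) FROM comp_event';

Our set-returning wrapper simply loops over every data node listed in pgxc_node and runs that same statement: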
CREATE TYPE pgxl_row_dist AS (node_name TEXT, total BIGINT);

CREATE OR REPLACE FUNCTION check_row_counts(tab_name REGCLASS)
RETURNS SETOF pgxl_row_dist AS
$BODY$
DECLARE
  r pgxl_row_dist;
  query TEXT;
BEGIN
  -- Loop over every data node (node_type 'D') registered in the cluster.
  FOR r.node_name IN
      SELECT node_name FROM pgxc_node WHERE node_type = 'D'
  LOOP
    -- Build and run an EXECUTE DIRECT row count against that node.
    query = 'EXECUTE DIRECT ON (' || r.node_name || ') ' ||
            '''SELECT count(*) FROM ' || tab_name::TEXT || '''';
    EXECUTE query INTO r.total;
    RETURN NEXT r;
  END LOOP;
END;
$BODY$ LANGUAGE plpgsql;
A function like this really should ship with the standard Postgres-XL distribution; if it does, I couldn’t find it. Regardless, with this in hand, we can provide a table name and see how many rows exist on each node, no matter our cluster size. For our four-node cluster, each node should have about 250,000 rows, give or take some variance caused by the hashing algorithm. Let’s see what the distribution actually resembles:
SELECT * FROM check_row_counts('comp_event');

 node_name | total  
-----------+--------
 data0000  | 600000
 data0001  | 200000
 data0002  | 200000
 data0003  |      0
That’s… unfortunate. The table doesn’t list its columns in order of cardinality, since that’s never been a concern before now. Beyond that, the first column is part of our primary key, so it makes sense for it to be listed near the top anyway. Position is hardly a reliable criterion beyond a first approximation, so how do we fix this?
Let’s examine the Postgres statistics catalog for the comp_event table and see how cardinality is actually represented:
SELECT attname, n_distinct
  FROM pg_stats
 WHERE tablename = 'comp_event';

  attname   | n_distinct 
------------+------------
 group_code |         10
 event_id   |    12471.5
 entry_tm   |  -0.158365
 some_data  |         10
The sample INSERT statement we used to fill comp_event should have already made this clear, but not every table is a contrived example. If the table already existed, or we had loaded it from multiple sources or scripts, the statistics would be our primary guide.
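A brief aside on reading n_distinct: positive values are an absolute count of distinct values, while negative values are a ratio of the total row count, used when Postgres believes the cardinality scales with table size. If we want a single comparable number per column, a quick sketch like this normalizes both forms using the row estimate in pg_class:

-- Normalize n_distinct into an estimated distinct count per column.
-- Negative values are a fraction of reltuples, so multiply them out.
SELECT s.attname,
       round(CASE WHEN s.n_distinct >= 0 THEN s.n_distinct
                  ELSE -s.n_distinct * c.reltuples
             END) AS est_distinct
  FROM pg_stats s
  JOIN pg_class c ON (c.relname = s.tablename)
  JOIN pg_namespace n ON (n.oid = c.relnamespace
                      AND n.nspname = s.schemaname)
 WHERE s.tablename = 'comp_event'
 ORDER BY est_distinct DESC;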
In this particular case, the event_id or entry_tm columns would be much better candidates to achieve balanced distribution. For now, let’s just keep things simple and use the event_id column, since the primary difference is the cardinality. There’s no reason to introduce multiple variables such as column type quite yet.
Let’s check our row totals after telling Postgres-XL we want to use event_id for hashing:
TRUNCATE TABLE comp_event;

ALTER TABLE comp_event DISTRIBUTE BY HASH (event_id);

INSERT INTO comp_event
SELECT a.id % 10, a.id % 100000,
       '08:00'::TIMETZ + (a.id % 43200 || 's')::INTERVAL,
       repeat('a', a.id % 10)
  FROM generate_series(1, 1000000) a(id);

SELECT * FROM check_row_counts('comp_event');

 node_name | total  
-----------+--------
 data0000  | 250050
 data0001  | 249020
 data0002  | 249730
 data0003  | 251200
Much better! Now our queries will retrieve data from all four nodes, and the first node isn’t working three times harder than the others. If we had gone into production using the previous distribution, our cluster would be unbalanced and we’d be chasing performance problems. Or if we figured this out too late, we’d have to rebalance all of the data, which can take hours or even days depending on row count. No thanks!
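If eyeballing the counts feels too informal, a window function over our helper gives each node’s share directly; again just a sketch built on the check_row_counts function above:

-- Express each node's row count as a percentage of the whole table.
SELECT node_name, total,
       round(100.0 * total / sum(total) OVER (), 2) AS pct_of_rows
  FROM check_row_counts('comp_event');

Anything hovering near 25% per node on a four-node cluster is about as good as hash distribution gets.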
It’s important to do this kind of analysis before moving data into a horizontally capable cluster. The Postgres pg_stats view makes that easy to accomplish. And if repeating this process for every table is too irritating, we can even do it in bulk. Let’s construct an unholy abomination that returns the unique or primary key column with the highest cardinality for every table:
SELECT DISTINCT ON (schemaname, tablename)
       schemaname, tablename, attname
  FROM (
        SELECT s.schemaname, c.relname AS tablename, a.attname,
               i.indisprimary, i.indisunique,
               SUM(s.n_distinct) AS total_values
          FROM pg_index i
          JOIN pg_attribute a ON (
                 a.attrelid = i.indrelid AND
                 a.attnum = ANY(i.indkey)
               )
          JOIN pg_class c ON (c.oid = i.indrelid)
          JOIN pg_namespace n ON (n.oid = c.relnamespace)
          JOIN pg_stats s ON (
                 s.schemaname = n.nspname AND
                 s.tablename = c.relname AND
                 s.attname = a.attname
               )
         WHERE i.indisunique
           AND s.schemaname NOT IN ('pg_catalog', 'information_schema')
         GROUP BY 1, 2, 3, 4, 5
       ) cols
 ORDER BY schemaname, tablename,
       CASE WHEN total_values < 0
            THEN -total_values * 9e20
            ELSE total_values
       END DESC,
       indisprimary DESC, indisunique DESC;
Gross! But at least we only have to do that once or twice before restoring all of our data in the new horizontally scaled cluster. We could even make the query uglier and have it generate our ALTER TABLE statements, so we don’t need to manually correct the distribution of every table; one possible shape for that appears below. And don’t forget that this process applies to nearly any distribution mechanism that depends on column contents, not just Postgres-XL. Just do your due diligence, and everything should work out.
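As promised, here’s what that generator might look like: the abomination above wrapped in a CTE, with its output formatted as DDL. A sketch only; review the statements before running anything:

-- Emit one DISTRIBUTE BY statement per table, using the
-- highest-cardinality unique/primary key column found above.
WITH best_cols AS (
    SELECT DISTINCT ON (schemaname, tablename)
           schemaname, tablename, attname
      FROM (
            SELECT s.schemaname, c.relname AS tablename, a.attname,
                   i.indisprimary, i.indisunique,
                   SUM(s.n_distinct) AS total_values
              FROM pg_index i
              JOIN pg_attribute a ON (a.attrelid = i.indrelid AND
                                      a.attnum = ANY(i.indkey))
              JOIN pg_class c ON (c.oid = i.indrelid)
              JOIN pg_namespace n ON (n.oid = c.relnamespace)
              JOIN pg_stats s ON (s.schemaname = n.nspname AND
                                  s.tablename = c.relname AND
                                  s.attname = a.attname)
             WHERE i.indisunique
               AND s.schemaname NOT IN ('pg_catalog', 'information_schema')
             GROUP BY 1, 2, 3, 4, 5
           ) cols
     ORDER BY schemaname, tablename,
           CASE WHEN total_values < 0
                THEN -total_values * 9e20
                ELSE total_values
           END DESC,
           indisprimary DESC, indisunique DESC
)
SELECT format('ALTER TABLE %I.%I DISTRIBUTE BY HASH (%I);',
              schemaname, tablename, attname) AS ddl
  FROM best_cols;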