PG Phriday: Converting to Horizontal Distribution

May 27th, 2016 | Published in Database, Tech Talk | No Comments


Now that we’ve decided to really start embracing horizontal scaling builds, there is a critically important engine-agnostic element we need to examine. Given an existing table, how exactly should we split up the contents across our various nodes during the conversion process? Generally this is done by selecting a specific column and applying some kind of hash or custom distribution mechanism to ensure all node contents are reasonably balanced. But how do we go about figuring that out?

This question is usually answered with “use the primary key!” But this gets a bit more complex in cases where tables rely on composite keys. This doesn’t happen often, but can really throw a wrench into the works. Imagine for example, we’re using Postgres-XL and have four nodes numbered data0000 through data0003. Then we find this table:

CREATE TABLE comp_event
(
  group_code   TEXT NOT NULL,
  event_id     BIGINT NOT NULL,
  entry_tm     TIMETZ NOT NULL,
  some_data    TEXT NOT NULL
);
 
INSERT INTO comp_event
SELECT a.id % 10, a.id % 100000,
       '08:00'::TIMETZ + (a.id % 43200 || 's')::INTERVAL,
       repeat('a', a.id%10)
  FROM generate_series(1, 1000000) a(id);
 
ALTER TABLE comp_event ADD CONSTRAINT pk_comp_event
      PRIMARY KEY (group_code, event_id, entry_tm);
 
ANALYZE comp_event;

The default for Postgres-XL is to simply use the first column for distribution. This tends to fit most cases, as the first column is usually either the primary key, or a reasonable facsimile of it. We can even use a system view to confirm this is the case:

SELECT pcrelid::regclass AS TABLE_NAME, a.attname AS column_name
  FROM pgxc_class c
  JOIN pg_attribute a ON (a.attrelid = c.pcrelid)
 WHERE a.attnum = c.pcattnum;
 
 TABLE_NAME | column_name 
------------+-------------
 comp_event | group_code

But is this what we want? What would happen if we naively went ahead with the default value and converted the database? Well, the major problem is that we don’t know the hash algorithm Postgres-XL is using. It’s entirely possible that the resulting data distribution will be anywhere from “slightly off” to “completely awful,” and we need a way to verify uniform distribution before moving forward.

In the case of Postgres-XL, we can actually poll each node directly with EXECUTE DIRECT. Repeatedly executing the same query and just substituting the node name is both inefficient and cumbersome, especially if we have dozens or hundreds of nodes. Thankfully Postgres makes it easy to create functions that return sets, so let’s leverage that power in our favor:

CREATE TYPE pgxl_row_dist AS (node_name TEXT, total BIGINT);
 
CREATE OR REPLACE FUNCTION check_row_counts(tab_name REGCLASS)
RETURNS SETOF pgxl_row_dist AS
$BODY$
DECLARE
  r pgxl_row_dist;
  query TEXT;
BEGIN
  FOR r.node_name IN
      SELECT node_name
        FROM pgxc_node WHERE node_type = 'D'
  LOOP
    query = 'EXECUTE DIRECT ON (' || r.node_name || ') 
      ''SELECT count(*) FROM ' || tab_name::TEXT || '''';
    EXECUTE query INTO r.total;
    RETURN NEXT r;
  END LOOP;
END;
$BODY$ LANGUAGE plpgsql;

This function should exist in some form with the standard Postgres-XL distribution. Unfortunately if it does, I couldn’t find any equivalent. Regardless, with this in hand, we can provide a table name and see how many rows exist on each node no matter our cluster size. For our four node cluster, each node should have about 250,000 rows, give or take some variance caused by the hashing algorithm. Let’s see what the distribution actually resembles:

SELECT * FROM check_row_counts('comp_event');
 
 node_name | total  
-----------+--------
 data0000  | 600000
 data0001  | 200000
 data0002  | 200000
 data0003  |      0

That’s… unfortunate. The table doesn’t list its columns in order of cardinality since that’s never been a concern before now. Beyond that, the first column is part of our primary key, so it makes sense to be listed near the top anyway. Position is hardly a reliable criteria beyond a first approximation, so how do we fix this?

Let’s examine the Postgres statistics catalog for the comp_event table, and see how cardinality is actually represented:

SELECT attname, n_distinct
  FROM pg_stats
 WHERE tablename = 'comp_event';
 
  attname   | n_distinct 
------------+------------
 group_code |         10
 event_id   |    12471.5
 entry_tm   |  -0.158365
 some_data  |         10

The sample insert statement we used to fill comp_event should have already made this clear, but not everything is an example. If we assume the table already existed, or we loaded it with from multiple sources or scripts, the statistics would be our primary guide.

In this particular case, the event_id or entry_tm columns would be much better candidates to achieve balanced distribution. For now, let’s just keep things simple and use the event_id column since the primary difference is the cardinality. There’s no reason to introduce multiple variables such as column type quite yet.

Let’s check our row totals after telling Postgres-XL we want to use event_id for hashing:

TRUNCATE TABLE comp_event;
ALTER TABLE comp_event DISTRIBUTE BY HASH (event_id);
 
INSERT INTO comp_event
SELECT a.id % 10, a.id % 100000,
       '08:00'::TIMETZ + (a.id % 43200 || 's')::INTERVAL,
       repeat('a', a.id%10)
  FROM generate_series(1, 1000000) a(id);
 
SELECT * FROM check_row_counts('comp_event');
 
 node_name | total  
-----------+--------
 data0000  | 250050
 data0001  | 249020
 data0002  | 249730
 data0003  | 251200

Much better! Now our queries will retrieve data from all four nodes, and the first node isn’t working three times harder than the others. If we had gone into production using the previous distribution, our cluster would be unbalanced and we’d be chasing performance problems. Or if we figured this out too late, we’d have to rebalance all of the data, which can take hours or even days depending on row count. No thanks!

It’s important to do this kind of analysis before moving data into a horizontally capable cluster. The Postgres pg_stats table makes that easy to accomplish. And if repeating this process for every table is too irritating, we can even do it in bulk. Let’s construct an unholy abomination that returns the primary key column with the highest cardinality for all tables:

SELECT DISTINCT ON (schemaname, tablename)
       schemaname, tablename, attname
  FROM (
    SELECT s.schemaname, c.relname AS tablename,
           a.attname, i.indisprimary, i.indisunique,
           SUM(s.n_distinct) AS total_values
      FROM pg_index i
      JOIN pg_attribute a ON (
               a.attrelid = i.indrelid AND
               a.attnum = ANY(i.indkey)
           )
      JOIN pg_class c ON (c.oid = i.indrelid)
      JOIN pg_namespace n ON (n.oid = c.relnamespace)
      JOIN pg_stats s ON (
               s.schemaname = n.nspname AND
               s.tablename = c.relname AND
               s.attname = a.attname
           )
     WHERE i.indisunique
       AND s.schemaname NOT IN ('pg_catalog', 'information_schema')
     GROUP BY 1, 2, 3, 4, 5
) cols
ORDER BY schemaname, tablename, 
      CASE WHEN total_values < 0 THEN -total_values * 9e20
           ELSE total_values END DESC,
      indisprimary DESC, indisunique DESC;

Gross! But at least we only have to do that once or twice before restoring all of our data in the new horizontally scaled cluster. We could even make the query uglier and have it generate our ALTER TABLE statements so we don’t need to manually correct the distribution of every table. And don’t forget that this process applies to nearly all distribution mechanisms which depend on column contents, not just Postgres-XL. Just do your due diligence, and everything should work out.

Happy scaling!


Tags: , , ,

PG Phriday: Trusty Table Tiers

May 20th, 2016 | Published in Database, Tech Talk | 7 Comments


I always advocate breaking up large Postgres tables for a few reasons. Beyond query performance concerns, maintaining one monolithic structure is always more time consuming and consequentially more dangerous. The time required to create a dozen small indexes may be slightly longer than a single larger one, but we can treat the smaller indexes as incremental. If we want to rebuild, add more indexes, or fix any corruption, why advocate an all-or-nothing proposition? Deleting from one large table will be positively glacial compared to simply dropping an entire expired partition. The list just goes on and on.

On the other hand, partitioning in Postgres can be pretty intimidating. There are so many manual steps involved, that it’s easy to just kick the can down the road and tackle the problem later, or not at all. Extensions like the excellent pg_partman remove much of the pain involved in wrangling an army of partitions, and we strongly suggest using some kind of tool-kit instead of reinventing the wheel.

The main limitation with most existing partition management libraries is that they never deviate from the examples listed in the Postgres documentation. It’s always: create inherited tables, add redirection triggers, automate, rinse, repeat. In most cases, this is exactly the right approach. Unfortunately triggers are slow, and especially in an OLTP context, this can introduce sufficient overhead that partitions are avoided entirely.

Well, there is another way to do partitioning that’s almost never mentioned. The idea is to actually utilize the base table as a storage target, and in lieu of triggers, schedule data movement during low-volume time periods. The primary benefit to this is that there’s no more trigger overhead. It also means we can poll the base table itself for recent data with the ONLY clause. This is a massive win for extremely active tables, and the reason tab_tier was born.

Let’s create some data for testing this out:

CREATE TABLE sensor_log (
  id            INT PRIMARY KEY,
  location      VARCHAR NOT NULL,
  reading       BIGINT NOT NULL,
  reading_date  TIMESTAMP NOT NULL
);
 
INSERT INTO sensor_log (id, location, reading, reading_date)
SELECT s.id, s.id % 1000, s.id % 100,
       CURRENT_DATE - ((s.id * 10) || 's')::INTERVAL
  FROM generate_series(1, 5000000) s(id);
 
CREATE INDEX idx_sensor_log_location ON sensor_log (location);
CREATE INDEX idx_sensor_log_date ON sensor_log (reading_date);
 
ANALYZE sensor_log;

Now we have 5-million rows in a table with a defined date column that’s a perfect candidate for partitioning. The way this data is currently distributed, we have content going back to late 2014. Imagine in this scenario we don’t need this much live information at all times. So we decide to keep one week of logs for active use, and relegate everything else into some kind of monthly partition.

This is how all of that would look in tab_tier:

CREATE EXTENSION tab_tier;
 
SELECT tab_tier.register_tier_root('public', 'sensor_log', 'reading_date');
 
UPDATE tab_tier.tier_root
   SET root_retain = '1 week'::INTERVAL,
       part_period = '1 month'::INTERVAL
 WHERE root_schema = 'public'
   AND root_table = 'sensor_log';
 
SELECT tab_tier.bootstrap_tier_parts('public', 'sensor_log');
 
\dt
 
                 List OF relations
 Schema |          Name          | TYPE  |  Owner   
--------+------------------------+-------+----------
 public | sensor_log             | TABLE | postgres
 public | sensor_log_part_201410 | TABLE | postgres
 public | sensor_log_part_201411 | TABLE | postgres
 public | sensor_log_part_201412 | TABLE | postgres
 public | sensor_log_part_201501 | TABLE | postgres
 public | sensor_log_part_201502 | TABLE | postgres
 public | sensor_log_part_201503 | TABLE | postgres
 public | sensor_log_part_201504 | TABLE | postgres
 public | sensor_log_part_201505 | TABLE | postgres
 public | sensor_log_part_201506 | TABLE | postgres
 public | sensor_log_part_201507 | TABLE | postgres
 public | sensor_log_part_201508 | TABLE | postgres
 public | sensor_log_part_201509 | TABLE | postgres
 public | sensor_log_part_201510 | TABLE | postgres
 public | sensor_log_part_201511 | TABLE | postgres
 public | sensor_log_part_201512 | TABLE | postgres
 public | sensor_log_part_201601 | TABLE | postgres
 public | sensor_log_part_201602 | TABLE | postgres
 public | sensor_log_part_201603 | TABLE | postgres
 public | sensor_log_part_201604 | TABLE | postgres
 public | sensor_log_part_201605 | TABLE | postgres

Taking this piece by piece, the first thing we did after creating the extension itself, was to call the register_tier_root function. This officially tells tab_tier about the table, and creates a record with configuration elements we can tweak. And that’s exactly what we do by setting the primary retention window and the partition size. Creating all of the partitions manually is pointless, so we also invoke bootstrap_tier_parts. Its job is to check the range of dates currently represented in the table, and create all of the partitions necessary to store it.

What did not happen here, is any data movement. This goes back to our original concern regarding maintenance. Some tables may be several GB or even TB in size, and moving all of that data as one gargantuan operation would be a really bad idea. Instead, tab_tier provides the migrate_tier_data function to relocate data for a specific partition.

With a bit of clever SQL, we can even generate a script for it:

COPY (
  SELECT 'SELECT tab_tier.migrate_tier_data(''public'', ''sensor_log'', ''' || 
         REPLACE(part_table, 'sensor_log_part_', '') || ''');' AS part_name
    FROM tab_tier.tier_part
    JOIN tab_tier.tier_root USING (tier_root_id)
   WHERE root_schema = 'public'
     AND root_table = 'sensor_log'
   ORDER BY part_table
) TO '/tmp/move_parts.sql';
 
\i /tmp/move_parts.SQL
 
SELECT COUNT(*) FROM ONLY sensor_log;
 
 COUNT 
-------
 60480
 
SELECT COUNT(*) FROM sensor_log_part_201504;
 
 COUNT  
--------
 259200

Following some debugging notices, all of our data has moved to the appropriate partition. We verified that by checking the base table and a randomly chosen partition for record counts. At this point, the table is now ready for regular maintenance. In this case “maintenance” means regularly calling the cap_tier_partitions and migrate_all_tiers functions. The first ensures target partitions always exist, and the second moves any pending data to a waiting partition for all tables we’ve registered.

And that’s it. We’re completely done with this table. If we stopped here, we could be secure in the knowledge we no longer have to worry about some gigantic monolith ruining our day some time in the future. But that’s not how tab_tier got its name. One or two levels does not a tier make; the real “secret sauce” is its support for long term storage.

One thing we didn’t really cover, and most partition systems never even consider, is that partitioning is only half of the story. On an extremely active system, having months or years of data just sitting around is relatively frowned upon. The mere presence of older data might encourage using it, transforming our finely tuned OLTP engine into a mixed workload wreck. One or two queries against those archives, and suddenly our cache is tainted and everything is considerably slower.

We need to move that data off of the system, and there are quite a few ways to do that. Some might use ETL scripts or systems like talend to accomplish that goal. Or we can just use tab_tier and a Postgres foreign table. Let’s now dictate that only six months of archives should ever exist on the primary server. Given that constraint, this is how we could proceed:

-- Do this on some kind of archive server
 
CREATE USER arc_user PASSWORD 'PasswordsAreLame';
 
CREATE TABLE sensor_log (
  id            INT PRIMARY KEY,
  location      VARCHAR NOT NULL,
  reading       BIGINT NOT NULL,
  reading_date  TIMESTAMP NOT NULL,
  snapshot_dt   TIMESTAMP WITHOUT TIME ZONE
);
 
GRANT ALL ON sensor_log TO arc_user;
 
-- Back on the data source..,
 
UPDATE tab_tier.tier_root
   SET lts_threshold = '6 months'::INTERVAL,
       lts_target = 'public.sensor_log_archive'
 WHERE root_schema = 'public'
   AND root_table = 'sensor_log';
 
CREATE EXTENSION postgres_fdw;
 
CREATE USER arc_user PASSWORD 'PasswordsAreLame';
GRANT tab_tier_role TO arc_user;
GRANT ALL ON ALL TABLES IN SCHEMA PUBLIC TO tab_tier_role;
 
CREATE SERVER arc_srv 
  FOREIGN DATA WRAPPER postgres_fdw 
  OPTIONS (dbname 'postgres', host 'archive-host');
 
CREATE USER MAPPING FOR arc_user 
  SERVER arc_srv 
  OPTIONS (USER 'arc_user', password 'PasswordsAreLame');
 
CREATE FOREIGN TABLE sensor_log_archive (
  id            INT,
  location      VARCHAR NOT NULL,
  reading       BIGINT NOT NULL,
  reading_date  TIMESTAMP NOT NULL,
  snapshot_dt   TIMESTAMP WITHOUT TIME ZONE
 
) SERVER arc_srv OPTIONS (TABLE_NAME 'sensor_log');
 
GRANT INSERT ON sensor_log_archive TO tab_tier_role;
 
-- Connect as arc_user, then run this:
 
SELECT tab_tier.archive_tier('public', 'sensor_log');
 
SELECT COUNT(*) FROM sensor_log_archive;
 
  COUNT  
---------
 3263360

Whew! That was a lot of work. Maybe a future version of tab_tier should provide a wrapper for that. In any case, all we did was set up a foreign table on a remote server, create a separate user to handle the data movement, and tell tab_tier about our six month threshold for long term storage, and the target table itself.

Using a foreign table isn’t required here, since the target can be any kind of table, but isn’t that the whole point of this exercise? The cool thing about Postgres foreign data wrappers is that we could have used any of them. In this case we’re just moving data to another remote Postgres instance, but we could have dumped everything into Cassandra or Hadoop instead. Take that, subspace!

For those who noticed all of the ridiculous GRANT statements, please remember this is only for demonstration purposes. A real system would probably use ALTER DEFAULT PRIVILEGES to give tab_tier_role more limited control over a specific schema and tables specifically designed for archival. The extension doesn’t add its own privileges—even to tables it creates—in case controls are tightly locked down. We don’t want to hijack any carefully laid down security. Instead tab_tier just propagates any ACLs it finds on root tables to new partitions.

This is the same reason we ran the archive_tier (or archive_all_tiers) routine as a different user. Since we’re using a foreign user mapping, we want to limit data leak potential by isolating the movement process from the table owner or a superuser. We recommend using this approach for any foreign table usage whenever possible.

With all of that out of the way, we still need to clean up. We archived all of the partition content, but the partitions themselves are still sitting around and gathering dust. Let’s fix that by running one final step as the owner of sensor_log or any superuser:

SELECT part_table
  FROM tab_tier.tier_part
 WHERE is_archived;
 
       part_table       
------------------------
 sensor_log_part_201410
 sensor_log_part_201411
 sensor_log_part_201412
 sensor_log_part_201501
 sensor_log_part_201502
 sensor_log_part_201503
 sensor_log_part_201504
 sensor_log_part_201505
 sensor_log_part_201506
 sensor_log_part_201507
 sensor_log_part_201508
 sensor_log_part_201509
 sensor_log_part_201510
 
SELECT tab_tier.drop_archived_tiers();
 
SELECT COUNT(*) FROM sensor_log_archive;
 
  COUNT  
---------
 1736640

During the archival process itself, tab_tier marks the related metadata so archived tables will no longer be used in any of the data movement functions. It also makes them an easy target for removal with a maintenance function. We can see that everything worked as a large portion of our data is no longer part of the sensor_log inheritance tree. Now the archived data is securely located on another system that’s probably geared more toward OLAP use, or some incomprehensible Hive we don’t have to worry about.

I for one, welcome our incomprehensible Hive overlords.


Tags: , , ,

PG Phriday: Bountiful Beta Benefits

May 13th, 2016 | Published in Database, Tech Talk | No Comments


The Postgres developers recently announced the availability of the first public beta for Postgres 9.6. I would be highly remiss to ignore such an opportunity to dig into any interesting functionality listed in the 9.6 release notes. All in all, it’s a pretty exciting series of advancements, and assuming this is a glimpse of what we see when 9.6 drops, I’d say we’re on the right track.

Plentiful Parallelism

Probably the most high-profile addition for 9.6 is parallel operation on certain query plans. I already examined this in depth, but how did the feature stack up over several months of development? Let’s use the same test and see:

CREATE TABLE para_test AS
SELECT a.id, repeat(' ', 20) AS junk
  FROM generate_series(1, 20000000) a(id);
 
ANALYZE para_test;
 
SET max_parallel_degree TO 1; -- The planner worked up to 5.
 
EXPLAIN ANALYZE
SELECT *
  FROM para_test
 WHERE junk LIKE '%s%';

Give or take a bit of variance, it looks pretty similar based on the best case of several runs:

Workers Avg Time (s)
0 3.7
1 1.9
2 1.4
3 1.1
4 0.9
5 0.8

This is really good to see. We still get the most benefit from the initial activation; even one parallel worker drastically improves performance. After that—at least for sequence scans—returns diminish quite a bit. Perhaps more interesting however, is the new support for other operations, like nested loops. Check this out:

EXPLAIN ANALYZE
SELECT p1.id
  FROM para_test p1
  JOIN para_test p2 USING (id)
 WHERE id BETWEEN 1 AND 100000;
 
 Gather  (cost=5500.25..217096.45 ROWS=93502 width=4)
         (actual TIME=67.443..1651.985 ROWS=100000 loops=1)
   Workers Planned: 5
   Workers Launched: 5
   ->  Hash JOIN  (cost=4500.25..207681.27 ROWS=93502 width=4)
                  (actual TIME=77.348..1621.807 ROWS=16667 loops=6)
         Hash Cond: (p2.id = p1.id)
         ->  Parallel Seq Scan ON para_test p2
              (cost=0.00..187059.00 ROWS=4000000 width=4)
              (actual TIME=0.014..498.137 ROWS=3333333 loops=6)
         ->  Hash  (cost=3331.48..3331.48 ROWS=93502 width=4)
                   (actual TIME=76.616..76.616 ROWS=100000 loops=6)
               Buckets: 131072  Batches: 1  Memory Usage: 4540kB
               ->  INDEX ONLY Scan USING idx_test_id ON para_test p1
                     (cost=0.44..3331.48 ROWS=93502 width=4)
                     (actual TIME=0.069..43.965 ROWS=100000 loops=6)
                     INDEX Cond: ((id >= 1) AND (id <= 100000))
                     Heap Fetches: 100000
 Planning TIME: 0.371 ms
 Execution TIME: 1662.068 ms

Now, it turns out this is actually a major performance hit for this contrived example. Without parallelism enabled, the nested loop operates in about 350ms. There’s clearly still some work needed under the hood, but the fact that this works at all is an encouraging step forward.

Spinlock Showdown

Several spinlocks have been removed and replaced with atomic locks to avoid costs associated with tight lock loops. Results of these modifications look encouraging. Our own tests reflect those glowing datapoints, with a bit of a caveat.

We began the comparison with 9.4 using pgbench on an 8-CPU VM with 32GB of RAM under various conditions and weren’t encouraged by the output.

Clients 9.4 9.6
1 6100 5500
2 12350 11000
4 23850 23500
8 38450 34500
16 47800 46000
32 43800 42750

Pgbench performance is worse and inconsistent in our VM environment across the board. Postgres 9.4 regularly outperforms it on both standard and prepared queries, and there’s some very strange judder between tests when the amount of clients matches the amount of system CPUs. After that, everything seems to even out within variance, but something strange is going on. Is it because the tests are on a VM instead of bare hardware? Is it because the VM was on kernel 3.13? Something clearly smells fishy here.

Why can this kind of variance be a problem? Because these are the numbers on a piece of bare metal with 32 CPUs and 64GB of RAM:

Clients 9.4 9.6
1 10800 10500
2 21500 21000
4 41000 40600
8 69000 73000
16 97000 118000
32 100000 167000

Note that the scaling improvements are far more obvious in this scenario than on our VM. It seems 9.6 is currently more environment sensitive than 9.4. Even if it’s the fault of the VM software, the fact that it affects 9.6 disproportionately is distressing. Hopefully that’ll go away by the time it’s officially released.

Foreign Federation

In Postgres versions older than 9.6, foreign tables are treated as individual entities and the external server isn’t really taken into consideration. If two foreign tables exist on the same server, all results will be fetched locally and joined as a third step. This not only fetches far more rows than necessary, but forces the local system to work harder joining everything after building temporary structures. It’s incredibly inefficient.

An easy way to visualize this is to create a couple garbage tables:

-- On external server (trust auth for testing)
 
CREATE TABLE foo AS
SELECT a.id FROM generate_series(1, 10) a(id);
 
CREATE TABLE bar AS
SELECT a.id FROM generate_series(1, 10) a(id);
 
-- On another system
 
CREATE EXTENSION postgres_fdw;
 
CREATE SERVER ext_srv 
  FOREIGN DATA WRAPPER postgres_fdw 
  OPTIONS (dbname 'postgres', host 'localhost', port '5434');
 
CREATE USER MAPPING FOR postgres 
  SERVER ext_srv 
  OPTIONS (USER 'postgres');
 
CREATE FOREIGN TABLE foo (
  id INT
) SERVER ext_srv OPTIONS (TABLE_NAME 'foo');
 
CREATE FOREIGN TABLE bar (
  id INT
) SERVER ext_srv OPTIONS (TABLE_NAME 'bar');

These tables are exceedingly simple. Yet here’s what happens if we try a basic JOIN with a where clause in 9.4 or 9.5:

-- In 9.4
 
EXPLAIN
SELECT foo.*
  FROM foo
  JOIN bar ON (foo.id = bar.id)
 WHERE foo.id = 5;
 
                               QUERY PLAN                               
------------------------------------------------------------------------
 Nested Loop  (cost=200.00..296.58 ROWS=225 width=4)
   ->  FOREIGN Scan ON foo  (cost=100.00..146.86 ROWS=15 width=4)
   ->  Materialize  (cost=100.00..146.94 ROWS=15 width=4)
         ->  FOREIGN Scan ON bar  (cost=100.00..146.86 ROWS=15 width=4)
 
-- In 9.5
 
                           QUERY PLAN                            
-----------------------------------------------------------------
 Nested Loop  (cost=200.00..202.30 ROWS=1 width=4)
   ->  FOREIGN Scan ON foo  (cost=100.00..101.15 ROWS=1 width=4)
   ->  FOREIGN Scan ON bar  (cost=100.00..101.15 ROWS=1 width=4)

The 9.4 plan means Postgres will obtain the results of foo, and then loop through each row and obtain a result from bar. That’s potentially really awful. The 9.5 plan is better, but still not ideal. In that case, it’s pushing down the WHERE clause and pulling the matches from both tables, then joining them locally.

But 9.6 uses a much different approach:

                     QUERY PLAN                     
----------------------------------------------------
 FOREIGN Scan  (cost=100.00..102.28 ROWS=1 width=4)
   Relations: (public.foo) INNER JOIN (public.bar)

That’s right, it’s just letting the foreign system perform the entire JOIN, since both tables are local in that context. That’s a massive game-changing improvement. Pushing work to the remote server where appropriate saves bandwidth, allocation, and work on both ends of the equation. The potential returns for heavy users of foreign tables are multiple orders of magnitude.

Updating and deleting data from foreign tables is equally affected by this kind of pushdown logic. Look at the subtle difference between 9.5 and 9.6:

EXPLAIN
DELETE FROM foo
 WHERE id BETWEEN 5 AND 7;
 
-- On 9.5
 
                           QUERY PLAN                            
-----------------------------------------------------------------
 DELETE ON foo  (cost=100.00..101.15 ROWS=2 width=6)
   ->  FOREIGN Scan ON foo  (cost=100.00..101.15 ROWS=1 width=6)
 
-- On 9.6
 
                            QUERY PLAN                             
-------------------------------------------------------------------
 DELETE ON foo  (cost=100.00..101.15 ROWS=2 width=6)
   ->  FOREIGN DELETE ON foo  (cost=100.00..101.15 ROWS=2 width=6)

So what is the difference between a “Foreign Scan” and a “Foreign Delete”? Believe it or not, the scan means Postgres 9.5 and older fetch the rows, and then issue separate DELETE statements for each. They use the same process for UPDATE statements. That’s… not great, especially as the amount of matches increases.

Once again, 9.6 saves the day. It allows the foreign system to take care of the whole process, and send back the results. The foreign system acts as if the whole DELETE statement were issued locally without any of that back-and-forth nonsense.

Vacuuming the Warehouse

The VACUUM process is vastly improved. Because Postgres uses MVCC, it requires regular maintenance to ensure the transaction counter does not wrap around and cause data loss. This makes sense, but in the context of a warehouse or any large system with several TB of data, it becomes more and more difficult to perform this maintenance simply due to performance constraints of storage systems.

In Postgres 9.5 and below, a VACUUM process must occasionally visit every single page of every single table. If a data warehouse is just sitting around accumulating data, why repeatedly vacuum all of the old data that hasn’t changed? Well, that all changes with 9.6. Now Postgres keeps track of pages that contain only unchanged values, and can skip them outright. This should drastically reduce storage IO during maintenance, and complete much faster as a bonus. For large idle warehouses, the difference can be an order of magnitude or more.

Persnickety Planner

There’s a veritably endless list of improvements in the release notes. The last I found noteworthy beyond the exciting stuff everyone drools over was a somewhat obscure planner tweak.

Do you have tables that use composite foreign keys? The query planner would normally multiply the statistical probability of each column individually, even though this usually indicates a strong correlation. This in turn causes row underestimates and bad query plans based on those artificially low values. If the planner expects 10 rows, when there are really 8000, that’s the difference between a nested loop biting off more than it can chew, and a relatively faster merge or hash operation on the larger result set.

This isn’t a general case for other implicit or explicit correlations, but it’s a move in the right direction. There is a multivariate statistics patch, but it didn’t survive the latest code freeze. It’s looking like we won’t see this kind of improvement on a general basis in 9.6, but this foreign key improvement is an example of how things could operate if a version of the patch is accepted.

In the end, 9.6 is looking like another great advancement over previous versions. I can’t wait to use it for realsies.


Tags: , , , , ,

PG Phriday: Big Data is Hard

May 6th, 2016 | Published in Database, Tech Talk | 6 Comments


Let’s just get the obvious out of the way early: dealing with multiple Terabytes or Petabytes in a database context is something of a nightmare. Distributing it, retrieving it, processing it, aggregating and reporting on it, are all complicated—and perhaps worst of all—non-intuitive. Everything from tooling and maintenance, to usage and input, are either ad-hoc or obfuscated by several special-purpose APIs and wrappers.

One of the reasons a self-scaling database is such a killer app, derives from the failure rate from having so many moving parts. A proxy here, a shard API there, a few design modifications to accommodate implementation quirks, and suddenly we have a fragile, janky golem. A lumbering monstrosity that our applications and data depend on, where modifying or replacing any part of it will mean redistributing all the data at the very least.

And that’s only one reason why big data is hard. Another stems from something a little more subtle: deciding where everything goes. I hope experienced readers just groaned a bit. Done improperly, data pulled from such a system is not only inefficient, but it’s often wrong. The groan is caused by the knowledge that it’s exceptionally easy to miss accounting for some critical detail related to the data, and end up with 40TB of useless garbage by the time someone notices.

Here’s a way that might happen. Consider an application that sells products to cartoon characters. For illustrative purposes, it’s pretty basic and not fully normalized. That in itself isn’t a problem, and it’s more fun to query anyway. The structure consists of two simple tables in a container schema, and we are building it with sharding in mind:

CREATE SCHEMA toon_store;
SET search_path TO toon_store;
 
CREATE TABLE toon_account 
(
  account_id   INT NOT NULL PRIMARY KEY,
  first_name   VARCHAR NOT NULL,
  last_name    VARCHAR NOT NULL,
  email        VARCHAR NOT NULL,
  created_dt   TIMESTAMPTZ NOT NULL DEFAULT now()
);
 
CREATE TABLE toon_order
(
  order_id     INT NOT NULL PRIMARY KEY,
  account_id   INT NOT NULL,
  product      TEXT NOT NULL,
  quantity     INT NOT NULL,
  order_dt     TIMESTAMPTZ NOT NULL DEFAULT now()
);

Notice that we didn’t use SERIAL to handle autonumbering the ID fields. That’s one of the first concessions we’ve had to make in order to shard our data in the future. If we allowed the tables to assign surrogate IDs in independent shards, we would inevitably encounter conflicts. There are a few ways around this, because we still want IDs.

  1. Postgres sequences can start on an arbitrary number, and increment by arbitrary values. We could specifically create sequences that were tailored to each shard, such that there would be no conflicting assignments. This is a very simple approach, but is extremely inelastic. The incremental value must exceed the amount of potential shards, or we end up with wraparound problems. Implementation is also somewhat annoying, requiring a separate custom application to create and manage shards.
  2. Use UUID. UUIDs are generated in such a way that they can’t conflict. The main complication here is that UUIDs are rarely utilized in existing architectures. This means converting all existing data to implement them where distribution is necessary. They’re also much larger than an INT, and have a cascading effect of making all foreign keys, indexes, and network transfers of the underlying data larger and slower. These caveats don’t matter in most cases, but should be considered.
  3. We could use a function to generate the ID as the DEFAULT. In the function, we would define our shard distribution and ID algorithm. This is how shard_manager and similar extensions work. Instagram used this process in their large Postgres database, and it worked fine for them. Like UUIDs, this kind of custom solution must happen before any data is loaded into the system for best effect. Further, we actually have to make decisions regarding how many shards we could potentially create, and relying on 64-bit integers means we’ll eventually run out of IDs. We might think our application has moved on in 50-100 years, but y2k problems in the year 2000 suggest making those kind of assumptions is ultimately destructive.
  4. We let the application itself handle ID generation. Often in the case of sharding, the application has some kind of API that intercepts all queries, and makes shard-aware decisions like where to insert data, or what values need to be provided to prevent duplicate keys. This is basically just a lazier and more dangerous version of the previous option. But what happens when another ad-hoc application or data-source appears? Well, we either have to hack together some way of utilizing the existing key distribution system into incorporating these components, or duplicate the algorithm in every new system that wants to use our data. Developers hate duplicating efforts, because it’s a vector for bugs and code drift. It also means everything is ideally written in the same language, which is limiting in many circumstances.

That’s a lot to think about, so we’ll ignore it for now. Let’s just imagine we’ve fixed that problem, and IDs are magically solved. The next step is to decide on where to put the shards. A Postgres instance can have multiple databases, so let’s just isolate each shard in its own database, so we don’t have to worry about renaming our schemas or tables. We can just run the same creation script in each database and call it a day:

createdb shard1
createdb shard2
 
psql -f schema.sql shard1
psql -f schema.sql shard2

Federating data this way isn’t exactly efficient. Postgres databases can’t join between each other unless it’s through some kind of access layer like a Foreign Data Wrapper. That means an external connection to each database is necessary to obtain data. But isn’t the idea of Shared Nothing structures built upon no interaction between the shards? And suppose we did use schemas instead of databases to segment the data; we’d want separate connections anyway to prevent needlessly complicating the physical/logical shard map. We may need to move shards to re-balance the cluster after all, so the assumption should be that shards are physically distinct entities.

With that in mind, let’s fill our cluster with some arbitrary data. We need to distribute everything, so let’s just use some naive modulus math on the primary key of each table:

-- Run this on shard1
SET search_path TO toon_store;
 
INSERT INTO toon_account (account_id, first_name, last_name, email)
VALUES (1, 'Fred', 'Flintstone', 'fred@quarry.com');
 
INSERT INTO toon_order (order_id, account_id, product, quantity)
VALUES (1, 1, 'Apple Tablet', 2);
 
INSERT INTO toon_order (order_id, account_id, product, quantity)
VALUES (3, 2, 'Bolt #7', 5);
 
-- Run this on shard2
SET search_path TO toon_store;
 
INSERT INTO toon_account (account_id, first_name, last_name, email)
VALUES (2, 'Gadget', 'Hackwrench', 'gadget@rangers.org');
 
INSERT INTO toon_order (order_id, account_id, product, quantity)
VALUES (2, 1, 'TromBONE', 1);
 
INSERT INTO toon_order (order_id, account_id, product, quantity)
VALUES (4, 2, 'Coo-Coo Cola', 12);

This is fairly straight-forward; Fred wanted some entertainment options, and Gadget has some work to do, and grabbed a case of refreshment.

At least one of you started screaming and executed an epic facepalm at what we just did. For everyone else, this is where we return to the importance of choosing distribution methods. In this case, we made an extremely elementary mistake and didn’t account for keeping associated data together. In this case, we distributed data based on the primary key of each individual table, hoping that would evenly distribute the data itself. This only works for tables that are completely independent of each other. If we ever want to execute a JOIN, the arbitrary row distribution means some rows that would normally return are simply missing.

To make this more obvious, let’s use some magic similar to PMPP, and broadcast the same query to both of our shards and examine the results:

SELECT a.first_name, a.last_name, o.product, o.quantity
  FROM toon_store.toon_account a
  JOIN toon_store.toon_order o USING (account_id);
 
-- From shard1
 
 first_name | last_name  |   product    | quantity 
------------+------------+--------------+----------
 Fred       | Flintstone | Apple Tablet |        2
 
-- From shard2
 
 first_name | last_name  |   product    | quantity 
------------+------------+--------------+----------
 Gadget     | Hackwrench | Coo-Coo Cola |       12

Well, we’re obviously missing two rows from those results. But why? Because Fred is only on shard1, and Gadget is only on shard2. If one or the other has orders on both shards, we’ll only retrieve those that happen to reside on the same shard as they do. This is one of the simplest mistakes to make when building a distributed data system, and the easiest to fix. There are two mainstream approaches to addressing this:

  1. Key hashing must be consistent across the system. This means taking one concept that runs through the application and distributing based on that. In our case, customers only interact with their own data. Since both tables share the account_id field, we could organize the data so that field determines the appropriate shard. Consequently, the application can make that same assumption. Combine this with a physical to logical map, and an application could instantly transform the ID to connect to, and retrieve from, the appropriate shard. If we cache the mapping itself, we could wrap the database driver with an intermediate conversion layer to obfuscate much of this.
  2. Replicate critical tables to all shards as metadata. In some ways, this is a variant of the previous approach. Compared to tables with several hundred million or billions of rows, some tables are simply inconsequential and do not require shard distribution. In these cases, it’s common to simply duplicate the table to every shard in its complete form. The toon_account table is a perfect candidate for this, since the number of accounts is minuscule compared to the volume of orders.

More often than not, the two approaches are combined. With extensions such as pg_logical taking the place of older trigger-based table replication mechanisms, we don’t have to worry so much about performance concerns, either. This may encourage DBAs to replicate tables more often than before, instead of suppressing an inward cringe at the prospect of dealing with a brittle trigger parasite slurping a changelog from every table. Properly leveraged, our broken joins work exactly as expected, with few to no concessions at the user level.

Unfortunately as with icebergs, the bulky expanse of further concerns exists below the surface. If we made incorrect assumptions about how data is distributed or related, we may need to start over. If we want a use case where new interactions are not supported by the current distribution model, we may need to start over. If our hashing algorithm is flawed in some way, we could have uneven data distribution, and may need to start over.

Consider our example. Say we fixed it by distributing based on account_id. If Gadget buys orders of magnitude more product from our store than Fred, her shard would contain far more data and be much larger. Now we’re stuck with one giant shard in relation to the others. To fix it, we’d want to replicate the toon_account table to every shard, and hash on order_id instead. Now we need to designate some shard as the primary source for the account table, such that all data modification commands only target that location. Or we could use some type of bi-directional replcation to merge all insert vectors, and deal with the inherent complexity that approach implies. Oh yeah, and we have to re-balance the toon_order table.

The rabbit hole only gets deeper. This example has two very simple tables. In a real production context with dozens or even hundreds of tables, we must be even more careful. In the end, that’s part of the reason there’s no single solution, or even a feature-complete working model. For every circumstance where Postgres-XL’s approach is perfect, there are others where depending on a coordinator isn’t as efficient as a shard-aware driver wrapper. For every query that can simply be broadcast to every known node and aggregated, there are others where all associated data is best when strongly tied to the same location in an independent silo.

I like to pretend I’m some kind of expert in this realm, but the reality is that there’s no such thing. The more I learn about the full implications of horizontal scaling, the more overwhelming it becomes. I suspect this is why so many of the Postgres scaling solutions are either proprietary (Redshift), abandoned (Stado), unstable (Postgres-XL), or a shambling heap of caveats and implementation quirks (Citus, Greenplum).

In the end, we’re still restricted to the realm of custom solutions to get the job done. Whether that’s adapting loading and querying around Citus and its many limitations, or wrapping PMPP with a few functions and building a driver wrapper to abstract away the complexity, we’re firmly in ad-hoc land. This still works, but the expertise required to build, maintain, and service the end result is the kind of job security I could do without. 😉


Tags: , ,

PG Phriday: Derivation Deluge

April 29th, 2016 | Published in Database, Tech Talk | 13 Comments


Having run into a bit of a snag with Postgres-XL, and not wanting to be dead in the water with our project, I went on a bit of a knowledge quest. Database scaling is hard, so I expected a bunch of either abandoned or proprietary approaches. In addition, as a huge fans of Postgres, compatibility or outright use of the Postgres core was a strict prerequisite.

So, what options are out there? Is there even anything worth further investigation? Maybe more importantly, what do you do when you’re under a bit of a scheduling constraint? Projects need to move forward after all, and regardless of preferences, sometimes concessions are necessary. The first step was obviously the list of databases derived from Postgres.

At first glance, that’s a pretty big list. If we look carefully though, we can see that quite a few of those projects were abandoned years ago. Others are commercial, not based on scalability, or both. Being commercial isn’t automatically a disqualification, but most of the commercial options were forked from practically ancient versions of Postgres and never kept up compatibility, or don’t mention the version at all. Amazon Redshift fits that profile, being based on Postgres 8.0, which few would want to use these days. Fujitsu Enterprise is another, which doesn’t even list which version they’re based on, nor do they provide a download for testing purposes.

What’s left? It’s hard to tell from the Wiki page, so I just started with the projects that include some kind of data scaling not based on replication. These candidates present a longer list than I’d initially anticipated, which is always a good problem to have!

Let’s scrutinize the nominees.

CitusDB

It’s not really a secret that CitusDB and Postgres-XL are both tackling the same problem, and are currently the top two contenders. Unlike Postgres-XL and its approach of extending SQL syntax to directly embrace data distribution, CitusDB is actually just a Postgres extension.

As a result, it’s a bit more janky. There’s no CREATE TABLE ... DISTRIBUTE BY magic. Instead, and like other Postgres extensions, we call functions to populate metadata and control CitusDB operation. Fortunately, the CitusDB documentation is amazingly thorough. Not only are there downloads with cut-and-paste tutorials for evaluation, they also provide Docker containers with fully operational test clusters.

I also have to admit that I originally wrote this off entirely, based on my experience with pg_shard last year. I opened a ticket complaining about missing transaction support, and they never closed it. I didn’t realize that was because pg_shard was merged into the new Citus extension, along with a veritable raft of other modifications and fixes.

My easy tutorial install didn’t exhibit the problems I had with pg_shard, so this warrants deeper testing. I’ve got VMs galore, and itchy scripting fingers.

Greenplum

I’ve known about Greenplum for a long time. There was a lot of excitement when Pivotal announced that they were opening the source. A parallel scaling Postgres? No way!

Well, this comes with a giant caveat. If we look at the dates listed in the Wiki, Greenplum is listed as starting in 2005. They’re not kidding, and unfortunately it seems Pivotal executed a “fork it and forget it” maneuver. The documentation admits Greenplum is based on Postgres 8.2, with elements of functionality from 8.3.

Like Amazon’s Redshift, this immediately disqualifies Greenplum from consideration for anyone using a newer version. Our own databases are on 9.4 pending an upgrade plan; there’s no way we could justify such a massive downgrade, even for horizontal scaling improvements. EnterpriseDB had a similar problem when they started selling their version of 8.3; they were far behind for years before they managed to reduce their version lag by only a few months. Greenplum never even bothered.

This may be an amazing product, but we can’t use it to replace existing Postgres 9.4 databases that need scaling. Will Greenplum catch up now that it’s been open-sourced? I can’t say. It would definitely be cool, but I’m not holding my breath. Incidentally, this is one of the reasons all of those projects on the Wiki have definitive end dates. Keeping up with Postgres after forking is extremely difficult if you don’t merge your enhancements back into core. It’s all too easy to fall hopelessly behind and become nothing but an academic concern.

HadoopDB

Hadoop is the… uh, “new” kid on the block regarding big data. It’s designed to leverage multiple systems or VMs to spread storage mining, which would be a great way to address a VLDB system. So in 2009, a university project spawned HadoopDB to turn Postgres into a SQL interface and aggregator for Hadoop.

Sadly, that’s where the story ends. The Postgres Wiki says it’s still active, but for all intents and purposes, it has been abandoned. The quick start guide hasn’t been updated since 2009, and Thomas Koch did a presentation as late as 2011 denoting it as an effective proof of concept, but not much else.

In the end, it’s a really cool “glue” between Hadoop and Postgres. But without updates to enhance the interlinks, speed, efficiency, and bugs, it’s not suitable for a production environment. The project lived on in Hadapt before being acquired by Teradata and renamed to presto. That means there’s some potential to contact Teradata and make an existing Hadoop datastore more user friendly. The job of converting an existing Postgres cluster to a Hadoop equivalent is left as an (onerous) exercise for the user.

Postgres-X2

The Postgres-X2 project is a bit of a conundrum. Unlike Postgres-XL which is active and backed by 2ndQuadrant, Postgres-X2 seems to be a direct continuation of the abandoned Postgres-XC codebase. As a result, they’re still stuck on Postgres 9.3. Further, they likely have similar issues as we encountered with Postgres-XL, or worse due to the project’s stagnancy. After exploring the github repository, it turns out the last update to any code was two years ago.

Maybe there’s another repository elsewhere, but this project should be considered dead unless they pull a Holy Grail and claim otherwise.

Stado

I first encountered Stado back when it was GridSQL, one of many EnterpriseDB projects. It works by abstracting several database instances through a port proxy, distributing writes and reads arbitrarily based on its own internal magic. It uses Postgres as a filesystem of sorts, and connecting to Postgres directly reveals this in greater detail. Object names are the result of hash functions, and even databases are only tangentially linked to the desired given nomenclature.

Stado is all about metadata, and Postgres is its chosen host. Because I had experience with a previous incarnation, I made an exception and considered it undead for testing purposes, even though the Wiki says it died in 2011. It’s just a Java proxy after all, so what could it hurt to see if it still works with recent Postgres releases?

As it turns out, it can hurt a lot. It seems my memory of GridSQL was a little hazy, as what’s going on here isn’t merely a distribution proxy. It’s transformative and extremely restrictive, throwing up errors for “unknown” keywords such as SCHEMA. No schema support means there’s no way we can use it, which is something of a shame. The performance metrics were encouraging back in the day, and the concept it represents is sound.

Consider the PMPP extension, for example. When I looked at it late last year, I loved the simplicity. Take a query, broadcast it to every known Postgres node, and present the results. Wrap the function in another query, and it can be re-aggregated to merge everything together. I was hoping Stado did this automatically, and that was all. Nope. Oh well.

The Winner?

If someone could write something that worked like I thought Stado did, I’d probably kiss them. It would require manipulating the Postgres query planner or a proxy of some kind, but that’s all I really want. Send a query to multiple nodes, let them execute it independently in parallel, keep track of aggregate functions used, and apply them to the appropriate columns in the final result. It seems so simple, but the underlying complexity is clearly more involved.

The thing is, large warehouse databases usually contain data that’s already been in another source. Primary key collisions are unlikely, as some kind of engine (Postgres-XL, ETL, etc.) has already distributed data according to some kind of hash function. I just want a query that can invoke the cluster in parallel. That’s all. Extensions like PMPP do half of the job, but short of rewriting existing applications to leverage it properly, it’s only really usable for new projects.

So I’ll probably be looking into CitusDB a bit more. It seems to work the way I want, and adds shard redundancy as an extra benefit. I’ll put it on some VMs and unceremoniously thrash it after dumping hundreds of GB into its lap and record the ensuing chaos. Hopefully these tests go better than when I subjected pg_shard to the same treatment.

Otherwise, the state of Postgres scaling is incomplete, and there are no projects I know of that will suit our requirements. As a Postgres DBA, I probably try too hard to use it as a hammer on every nail, but it’s just so close as to be immensely frustrating.

Wish me luck!


Tags: , , , , , ,

« Older Posts