Category: VLDB

Reading Parallel Execution Plans With Bloom Pruning And Composite Partitioning

You’ve probably heard sayings like “sometimes things aren’t always what they seem” and “people lie”. Well, sometimes execution plans lie. It’s not really by intent, but it is sometimes difficult (or impossible) to represent everything in a query execution tree in a nice tabular format like the one dbms_xplan gives.

One of the optimizations that was introduced back in 10gR2 was the use of bloom filters. Bloom filters can be used in two ways: 1) for filtering or 2) starting with 11g, for partition pruning (bloom pruning). Frequently the data models used in data warehousing are dimensional models (star or snowflake), and most Oracle warehouses use simple range (or interval) partitioning on the fact table date key column, as that is the filter that yields the largest I/O reduction from partition pruning (most queries in a time series star schema include a time window, right?). As a result, it is imperative that the join between the date dimension and the fact table results in partition pruning.

Let’s consider a basic two table join between a date dimension and a fact table. For these examples I’m using STORE_SALES and DATE_DIM which are TPC-DS tables (I frequently use TPC-DS for experiments as it uses a dimensional (star) model and has a data generator.) STORE_SALES contains a 5 year window of data ranging from 1998-01-02 to 2003-01-02.

Range Partitioned STORE_SALES

For this example I used range partitioning on STORE_SALES.SS_SOLD_DATE_SK using 60 one month partitions (plus 1 partition for NULL SS_SOLD_DATE_SK values) that align with the date dimension (DATE_DIM) on calendar month boundaries. STORE_SALES has the parallel attribute (PARALLEL 16 in this case) set on the table to enable Oracle’s Parallel Execution (PX). Let’s look at the execution time and plan for our test query:

SQL> select
  2    max(ss_sales_price)
  3  from
  4    store_sales ss,
  5    date_dim d
  6  where
  7    ss_sold_date_sk = d_date_sk and
  8    d_year = 2000
  9  ;

MAX(SS_SALES_PRICE)
-------------------
                200

Elapsed: 00:00:41.67

SQL> select * from table(dbms_xplan.display_cursor(format=>'basic +parallel +partition +predicate'));

PLAN_TABLE_OUTPUT
--------------------------------------------------------------------------------------------------
EXPLAINED SQL STATEMENT:
------------------------
select   max(ss_sales_price) from   store_sales ss,   date_dim d where
 ss_sold_date_sk=d_date_sk and   d_year = 2000

Plan hash value: 934332680

--------------------------------------------------------------------------------------------------
| Id  | Operation                     | Name        | Pstart| Pstop |    TQ  |IN-OUT| PQ Distrib |
--------------------------------------------------------------------------------------------------
|   0 | SELECT STATEMENT              |             |       |       |        |      |            |
|   1 |  SORT AGGREGATE               |             |       |       |        |      |            |
|   2 |   PX COORDINATOR              |             |       |       |        |      |            |
|   3 |    PX SEND QC (RANDOM)        | :TQ10001    |       |       |  Q1,01 | P->S | QC (RAND)  |
|   4 |     SORT AGGREGATE            |             |       |       |  Q1,01 | PCWP |            |
|*  5 |      HASH JOIN                |             |       |       |  Q1,01 | PCWP |            |
|   6 |       BUFFER SORT             |             |       |       |  Q1,01 | PCWC |            |
|   7 |        PART JOIN FILTER CREATE| :BF0000     |       |       |  Q1,01 | PCWP |            |
|   8 |         PX RECEIVE            |             |       |       |  Q1,01 | PCWP |            |
|   9 |          PX SEND BROADCAST    | :TQ10000    |       |       |        | S->P | BROADCAST  |
|* 10 |           TABLE ACCESS FULL   | DATE_DIM    |       |       |        |      |            |
|  11 |       PX BLOCK ITERATOR       |             |:BF0000|:BF0000|  Q1,01 | PCWC |            |
|* 12 |        TABLE ACCESS FULL      | STORE_SALES |:BF0000|:BF0000|  Q1,01 | PCWP |            |
--------------------------------------------------------------------------------------------------

Predicate Information (identified by operation id):
---------------------------------------------------

   5 - access("SS_SOLD_DATE_SK"="D_DATE_SK")
  10 - filter("D_YEAR"=2000)
  12 - access(:Z>=:Z AND :Z<=:Z)

In this execution plan you can see the creation of the bloom filter on line 7 which is populated from the values of D_DATE_SK from DATE_DIM. That bloom filter is then used to partition prune on the STORE_SALES table. This is why we see :BF0000 in the Pstart/Pstop columns.
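For reference, a table like the range-partitioned STORE_SALES described above could be declared roughly as follows. This is only a sketch, not the DDL used for these tests: the column list is trimmed to three columns, only two of the 60 monthly partitions are shown, and the boundary values assume that D_DATE_SK is a Julian day number (so 2450846 would correspond to 1998-02-01).

```sql
-- Sketch only: trimmed column list, illustrative partition names and bounds.
create table store_sales (
  ss_sold_date_sk  number,
  ss_item_sk       number,
  ss_sales_price   number(7,2)
)
partition by range (ss_sold_date_sk)
(
  partition p_1998_01 values less than (2450846),   -- assumed key for 1998-02-01
  partition p_1998_02 values less than (2450874),   -- assumed key for 1998-03-01
  -- a real table would continue with one partition per calendar month
  partition p_null    values less than (maxvalue)   -- NULL keys map to the MAXVALUE partition
)
parallel 16;
```

Because the partition bounds line up with calendar months in DATE_DIM, a filter such as d_year = 2000 maps to a contiguous set of 12 partitions.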

Range-Hash Composite Partitioned STORE_SALES

For the next experiment, I kept the same range partitioning scheme but also added hash subpartitioning using the SS_ITEM_SK column (using 4 hash subpartitions per range partition). STORE_SALES2 has 61 range partitions x 4 hash subpartitions for a total of 244 aggregate partitions. Let’s look at the execution plan for our test query:

SQL> select
  2    max(ss_sales_price)
  3  from
  4    store_sales2 ss,
  5    date_dim d
  6  where
  7    ss_sold_date_sk = d_date_sk and
  8    d_year = 2000
  9  ;

MAX(SS_SALES_PRICE)
-------------------
                200

Elapsed: 00:00:41.06

SQL> select * from table(dbms_xplan.display_cursor(format=>'basic +parallel +partition +predicate'));

PLAN_TABLE_OUTPUT
--------------------------------------------------------------------------------------------------
EXPLAINED SQL STATEMENT:
------------------------
select   max(ss_sales_price) from   store_sales2 ss,   date_dim d where
  ss_sold_date_sk=d_date_sk and   d_year = 2000

Plan hash value: 2496395846

---------------------------------------------------------------------------------------------------
| Id  | Operation                     | Name         | Pstart| Pstop |    TQ  |IN-OUT| PQ Distrib |
---------------------------------------------------------------------------------------------------
|   0 | SELECT STATEMENT              |              |       |       |        |      |            |
|   1 |  SORT AGGREGATE               |              |       |       |        |      |            |
|   2 |   PX COORDINATOR              |              |       |       |        |      |            |
|   3 |    PX SEND QC (RANDOM)        | :TQ10001     |       |       |  Q1,01 | P->S | QC (RAND)  |
|   4 |     SORT AGGREGATE            |              |       |       |  Q1,01 | PCWP |            |
|*  5 |      HASH JOIN                |              |       |       |  Q1,01 | PCWP |            |
|   6 |       BUFFER SORT             |              |       |       |  Q1,01 | PCWC |            |
|   7 |        PART JOIN FILTER CREATE| :BF0000      |       |       |  Q1,01 | PCWP |            |
|   8 |         PX RECEIVE            |              |       |       |  Q1,01 | PCWP |            |
|   9 |          PX SEND BROADCAST    | :TQ10000     |       |       |        | S->P | BROADCAST  |
|* 10 |           TABLE ACCESS FULL   | DATE_DIM     |       |       |        |      |            |
|  11 |       PX BLOCK ITERATOR       |              |     1 |     4 |  Q1,01 | PCWC |            |
|* 12 |        TABLE ACCESS FULL      | STORE_SALES2 |     1 |   244 |  Q1,01 | PCWP |            |
---------------------------------------------------------------------------------------------------

Predicate Information (identified by operation id):
---------------------------------------------------

   5 - access("SS_SOLD_DATE_SK"="D_DATE_SK")
  10 - filter("D_YEAR"=2000)
  12 - access(:Z>=:Z AND :Z<=:Z)

Once again you can see the creation of the bloom filter from DATE_DIM on line 7; however, you will notice that we no longer see :BF0000 as our Pstart and Pstop values. In fact, it may appear that partition pruning is not taking place at all, as we see 1/244 as our Pstart/Pstop values. However, if we compare the execution times of the range and range/hash queries, they are within a second of each other (41.67 vs. 41.06 seconds), so partition (bloom) pruning must be taking place. After all, if this plan read all 5 years of data it would take roughly 5 times as long as reading just 1 year, and that certainly is not the case. Would you have guessed that partition pruning is taking place had we not worked through the range only experiment first? Hmmm…

So What Is Going On?

Before we dive in, let’s quickly look at what the execution plans would look like if PX was not used (using serial execution).

--
-- Range Partitioned, Serial Execution
--

---------------------------------------------------------------------
| Id  | Operation                     | Name        | Pstart| Pstop |
---------------------------------------------------------------------
|   0 | SELECT STATEMENT              |             |       |       |
|   1 |  SORT AGGREGATE               |             |       |       |
|*  2 |   HASH JOIN                   |             |       |       |
|   3 |    PART JOIN FILTER CREATE    | :BF0000     |       |       |
|*  4 |     TABLE ACCESS FULL         | DATE_DIM    |       |       |
|   5 |    PARTITION RANGE JOIN-FILTER|             |:BF0000|:BF0000|
|   6 |     TABLE ACCESS FULL         | STORE_SALES |:BF0000|:BF0000|
---------------------------------------------------------------------
              
--
-- Range-Hash Composite Partitioned, Serial Execution
--
                                       
----------------------------------------------------------------------
| Id  | Operation                     | Name         | Pstart| Pstop |
----------------------------------------------------------------------
|   0 | SELECT STATEMENT              |              |       |       |
|   1 |  SORT AGGREGATE               |              |       |       |
|*  2 |   HASH JOIN                   |              |       |       |
|   3 |    PART JOIN FILTER CREATE    | :BF0000      |       |       |
|*  4 |     TABLE ACCESS FULL         | DATE_DIM     |       |       |
|   5 |    PARTITION RANGE JOIN-FILTER|              |:BF0000|:BF0000|
|   6 |     PARTITION HASH ALL        |              |     1 |     4 |
|   7 |      TABLE ACCESS FULL        | STORE_SALES2 |     1 |   244 |
----------------------------------------------------------------------

When using composite partitioning, pruning is placed on one of the partition iterators. When the two nested partition iterators (range/hash in this case) are changed into a block iterator (line 11, PX BLOCK ITERATOR), we have to pick a “victim” in the query plan tree since only one node in the plan now needs to carry the pruning information (with PX the pruning is really done by the QC, not the row source like in serial plans). As a result, the information associated with the victimized partition iterator is lost in the explain plan. This is why there is no :BF0000 for Pstart/Pstop in the plan in this case. It is probably more accurate to have the parallel plans for both range and range/hash look like this:

---------------------------------------------------------------------------------------------------
| Id  | Operation                     | Name         | Pstart| Pstop |    TQ  |IN-OUT| PQ Distrib |
---------------------------------------------------------------------------------------------------
|   0 | SELECT STATEMENT              |              |       |       |        |      |            |
|   1 |  SORT AGGREGATE               |              |       |       |        |      |            |
|   2 |   PX COORDINATOR              |              |       |       |        |      |            |
|   3 |    PX SEND QC (RANDOM)        | :TQ10001     |       |       |  Q1,01 | P->S | QC (RAND)  |
|   4 |     SORT AGGREGATE            |              |       |       |  Q1,01 | PCWP |            |
|*  5 |      HASH JOIN                |              |       |       |  Q1,01 | PCWP |            |
|   6 |       BUFFER SORT             |              |       |       |  Q1,01 | PCWC |            |
|   7 |        PART JOIN FILTER CREATE| :BF0000      |       |       |  Q1,01 | PCWP |            |
|   8 |         PX RECEIVE            |              |       |       |  Q1,01 | PCWP |            |
|   9 |          PX SEND BROADCAST    | :TQ10000     |       |       |        | S->P | BROADCAST  |
|* 10 |           TABLE ACCESS FULL   | DATE_DIM     |       |       |        |      |            |
|  11 |       PX BLOCK ITERATOR       |              |       |       |  Q1,01 | PCWC |            |
|* 12 |        TABLE ACCESS FULL      | STORE_SALES  |:BF0000|:BF0000|  Q1,01 | PCWP |            |
---------------------------------------------------------------------------------------------------

Here the bloom pruning is shown on the TABLE ACCESS FULL row source. This is because there is no Pstart/Pstop for a PX BLOCK ITERATOR row source (it works on block ranges, so the partition information, which had been carried by the level above it, is lost).

Hopefully this helps you understand and correctly identify execution plans that contain bloom pruning even though at first glance you may not think they do. If you are uncertain, use the execution stats for the query, looking at metrics like the amount of data read and execution times, to provide some empirical insight.
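One way to gather that empirical evidence is to compare cursor statistics for the two test queries. The query below is a minimal sketch, assuming Oracle 11.2 or later (where V$SQL exposes PHYSICAL_READ_BYTES) and that you look up the SQL_ID of each statement yourself; &sql_id is a SQL*Plus substitution variable, not something from the tests above.

```sql
-- Sketch: how much data did each test query actually read?
-- Run once per query, supplying its SQL_ID when prompted.
select sql_id,
       child_number,
       executions,
       px_servers_executions,
       round(elapsed_time / 1e6, 2)              as elapsed_secs,
       round(physical_read_bytes / 1024 / 1024)  as physical_read_mb
from   v$sql
where  sql_id = '&sql_id';
```

If bloom pruning is working for both tables, the physical read volume should be about the same for the two queries and far below the total segment size. Note that for parallel statements ELAPSED_TIME is cumulative across the QC and the PX servers, so it will be larger than the wall-clock time.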

The Core Performance Fundamentals Of Oracle Data Warehousing – Set Processing vs Row Processing

[back to Introduction]

In over six years of doing data warehouse POCs and benchmarks for clients there is one area that I frequently see as problematic: “batch jobs”.  Most of the time these “batch jobs” take the form of some PL/SQL procedures and packages that generally perform some data load, transformation, processing or something similar.  The reason these are so problematic is that developers have hard-coded “slow” into them.  I’m fairly certain these developers didn’t know they had done this when they coded their PL/SQL, but nonetheless it happened.

So How Did “Slow” Get Hard-Coded Into My PL/SQL?

Generally “slow” gets hard-coded into PL/SQL because the PL/SQL developer(s) took the business requirements and did a “literal translation” of each rule/requirement one at a time instead of looking at the “before picture” and the “after picture” and determining the most efficient way to make those data changes.  Many times this can surface as cursor based row-by-row processing, but it also can appear as PL/SQL just running a series of often poorly thought out SQL commands.

Hard-Coded Slow Case Study

The following is based on a true story. Only the names have been changed to protect the innocent.

Here is a pseudo code snippet based on a portion of some data processing I saw in a POC:

{truncate all intermediate tables}
insert into temp1 select * from t1 where create_date = yesterday;
insert into temp1 select * from t2 where create_date = yesterday;
insert into temp1 select * from t3 where create_date = yesterday;
insert into temp2 select * from temp1 where {some conditions};
insert into target_table select * from temp2;
for each of 20 columns
loop
  update target_table t
    set t.column_name =
      (select column_name
       from t4
       where t.id=t4.id )
    where t.column_name is null;
end loop
update target_table t set ({list of 50 columns}) = (select {50 columns} from t5 where t.id=t5.id);

I’m going to stop there as any more of this will likely make you cry more than you already should be.

I almost hesitate to ask the question, but isn’t it quite obvious what is broken about this processing?  Here are the major inefficiencies as I see them:

  • What is the point of inserting all the data into temp1, only then to filter some of it out when temp2 is populated?  If you haven’t heard the phrase “filter early” you have some homework to do.
  • Why publish into the target_table and then perform 20 single column updates, followed by a single 50 column update?  Better question yet: Why perform any bulk updates at all?  Bulk updates (and deletes) are simply evil – avoid them at all costs.

So, as with many clients that come in and do an Exadata Database Machine POC, they really weren’t motivated to make any changes to their existing code; they just wanted to see how much performance the Exadata platform would give them.  Much to their happiness, this reduced their processing time from over 2.5 days (a weekend job that started Friday PM but didn’t finish by Monday AM) down to 10 hours, a savings of over 2 days (roughly 50 hours).  Now, it could fail and they would have time to re-run it before the business opened on Monday morning.  Heck, I guess if I got back roughly 50 hours out of 60 I’d be excited too, if I were not a database performance engineer who knew there was even more performance left on the table, waiting to be exploited.

Feeling unsatisfied, I took it upon myself to demonstrate the significant payback that re-engineering can yield on the Exadata platform and I coded up an entirely new set-based data flow in just a handful of SQL statements (no PL/SQL).  The result: processing an entire week’s worth of data (several hundred million rows) now took just 12 minutes.  That’s right: 7 days’ worth of events scrubbed, transformed, enriched and published in just 12 minutes.
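To make the idea concrete, here is a rough sketch of what a set-based version of the pseudo code above might look like. It is not the code from the POC: the column names (col_a, col_b, col_x) and the status predicate are hypothetical stand-ins for the real columns and for {some conditions}, and it assumes t4.id and t5.id are unique and that "yesterday" means the previous calendar day.

```sql
-- Hypothetical set-based rewrite: one direct-path insert, no intermediate
-- tables and no bulk updates.
alter session enable parallel dml;

insert /*+ append */ into target_table (id, col_a, col_b, col_x)
select s.id,
       nvl(s.col_a, t4.col_a),   -- replaces a single-column "fill if null" update
       nvl(s.col_b, t4.col_b),
       t5.col_x                  -- replaces the wide update from t5
from  (
        -- filter early: every predicate is applied while reading the sources
        select id, col_a, col_b from t1
        where  create_date >= trunc(sysdate) - 1 and create_date < trunc(sysdate)
        and    status = 'VALID'
        union all
        select id, col_a, col_b from t2
        where  create_date >= trunc(sysdate) - 1 and create_date < trunc(sysdate)
        and    status = 'VALID'
        union all
        select id, col_a, col_b from t3
        where  create_date >= trunc(sysdate) - 1 and create_date < trunc(sysdate)
        and    status = 'VALID'
      ) s
      left outer join t4 on t4.id = s.id
      left outer join t5 on t5.id = s.id;

commit;
```

The outer joins compute the final column values while the rows are in flight, so each row is written exactly once instead of being inserted and then updated 21 times.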

When I gently broke the news to this client that it was possible to load the week’s events in just 12 minutes they were quite excited (to say the least).  In fact, one person even said (a bit out of turn), “well, that would mean that a single day’s events could be loaded in just a couple minutes and that would give a new level of freshness to the data which would allow the business to make faster, better decisions due to the timeliness of the data.”  My response: “BINGO!”  This client now had the epiphany of what is now possible with Exadata where previously it was impossible.

It’s Not a Need, It’s a Want

I’m not going to give away my database engineer hat for a product marketing hat just yet (or probably ever), but this is the reality that exists.  IT shops started with small data sets and used small data set programming logic on their data, and that worked for some time.  The reason: inefficient processing on a small data set is only a little inefficient, but the same processing logic on a big data set is very inefficient.  This is why I have said before: In order to fully exploit the Oracle Exadata platform (or any current day platform) some re-engineering may be required. Do not be mistaken: I am not saying you need to re-engineer your applications for Exadata.  I am saying you will want to re-engineer your applications for Exadata as those applications simply were not designed to leverage the massively parallel processing that Exadata allows one to do.  It’s time to base design decisions on today’s technology, not what was available when your application was designed.  Fast forward to now.

The Core Performance Fundamentals Of Oracle Data Warehousing – Data Loading

Getting flat file data into your Oracle data warehouse is likely a daily (or possibly more frequent) task, but it certainly does not have to be a difficult one.  Bulk loading data rates are governed by the following operations and hardware resources:

  1. How fast can the data be read
  2. How fast can data be written out
  3. How much CPU power is available

I’m always a bit amazed (and depressed) when I hear people complain that their data loading rates are slow and they proceed to tell me things like:

  • The source files reside on a shared NFS filer (or similar) and it has just a single GbE (1 Gigabit Ethernet) network path to the Oracle database host(s).
  • The source files reside on this internal disk volume which consists of a two disk mirror (or a volume with very few spindles).

Maybe it’s not entirely obvious so let me spell it out (as I did in this tweet):

One can not load data into a database faster than it can be delivered from the source. Database systems must obey the laws of physics!

Or putting it another way: Don’t fall victim to slow data loading because of a slow performing data source.

Given a system that can provide data at fast enough rates, the loading rate becomes a function of points #2 and #3. The database operations can be simplified to:

  1. Read lines from flat files
  2. Process lines into columns, internal data types and optionally compress
  3. Write rows/columns out to disk

In most cases a reasonably sized system becomes CPU bound, not write bound, on data loads as almost all Oracle data warehouses use compression which increases CPU consumption but reduces the IO requirement for the writes.  Or putting it another way:  Bulk loading into a compressed table should be a CPU bound operation, not a disk (write) bound operation.

Data Loading Best Practices (What To Do and Why To Do It)

Oracle offers two methods to load data from flat files: 1) SQL*Loader and 2) External Tables.  I would highly recommend that bulk loads (especially PDML loads) be done via External Tables and that SQL*Loader only be used for non-parallel loads (PARALLEL=false) with small amounts of data (not bulk loads).  The high level reason for this recommendation is that External Tables have nearly all the SQL functionality of a heap table and allow many more optimizations than SQL*Loader does, and there are some undesirable side effects (mostly in the space management layer) from using PARALLEL=true with SQL*Loader.

In order to avoid the reading of the flat files being the bottleneck, use a filesystem that is backed by numerous spindles (more than enough to provide the desired loading rate) and consider using compressed files in conjunction with the external table preprocessor.  Using the preprocessor is especially useful if there are proportionally more CPU resources on the database system than network or disk bandwidth because the use of compression on the source files allows for a larger logical row “delivery” for a given file size.  Something that may not be obvious either is to put the flat files on a filesystem that is mounted using directio mount options.  This will eliminate the file system cache being flooded with data that will (likely) never be read again (how many times do you load the same files?).  Another option that becomes available with Oracle 11.2 is DBFS (database filesystem), a fast and scalable solution for staging flat files that is frequently used with the Oracle Database Machine and Exadata.

In order to achieve the best loading speeds be sure to:

  • Use External Tables
  • Use a staging filesystem (and network) fast enough to meet your loading speed requirements (and consider directio mount options)
  • Use Parallel Execution (parallel CTAS or PDML INSERT)
  • Use direct-path loads (nologging CTAS or INSERT /*+ APPEND */)
  • Use a large enough initial/next extent size (8MB is usually enough)

If you follow these basic recommendations you should be able to achieve loading speeds that easily meet your requirements (otherwise you likely just need more hardware).
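Put together, the recommendations above might look something like the following. This is a sketch under stated assumptions, not a tested recipe: the directory paths, file names, table names and column layout are all hypothetical, the flat files are assumed to be pipe-delimited and gzip-compressed, and zcat is assumed to live in the directory that exec_dir points to.

```sql
-- Hypothetical staging and executable directories.
create or replace directory stage_dir as '/stage/flatfiles';
create or replace directory exec_dir  as '/bin';

-- External table over compressed flat files, decompressed on the fly
-- by the preprocessor.
create table et_sales (
  sale_id    number,
  sale_date  date,
  amount     number(12,2)
)
organization external (
  type oracle_loader
  default directory stage_dir
  access parameters (
    records delimited by newline
    preprocessor exec_dir:'zcat'
    fields terminated by '|'
    missing field values are null
    (
      sale_id,
      sale_date char(10) date_format date mask "YYYY-MM-DD",
      amount
    )
  )
  location ('sales_1.dat.gz', 'sales_2.dat.gz')
)
reject limit unlimited;

-- Parallel, direct-path, nologging load with 8MB extents.
create table sales_load
  parallel
  nologging
  compress
  storage (initial 8m next 8m)
as
select * from et_sales;
```

With preprocessed (compressed) input a file cannot be split into smaller pieces, so providing many files rather than one large file gives the PX servers more units of work to share.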

Loading Data At Ludicrous Speed

I’ve yet to come across a reasonably designed system that is capable of becoming write bound as systems simply either 1) do not have enough CPU to do so or 2) are unable to read the source flat files anywhere near fast enough to do so.  I have, however, conducted experiments to test write throughput of a Sun Oracle Database Machine (Exadata V2) by using flat files cached completely in the filesystem cache and referencing them numerous times in the External Table DDL. The results should be quite eye opening for many, especially those who think the Oracle database can not load data fast.  Loading into an uncompressed table, I was able to load just over 1TB of flat file data (over 7.8 billion rows) in a mere 4.6 minutes (275 seconds).  This experiment does not represent typical loading speed rates as it’s unlikely the source files are on a filesystem as fast as main memory, but it does demonstrate that if the flat file data could be delivered at such rates, the Oracle software and hardware can easily load it at close to physics speed (the max speed the hardware is capable of).

SQL> create table fastload
  2  pctfree 0
  3  parallel
  4  nologging
  5  nocompress
  6  storage(initial 8m next 8m)
  7  tablespace ts_smallfile
  8  as
  9  select * from et_fastload;

Table created.

Elapsed: 00:04:35.49

SQL> select count(*) from fastload;

     COUNT(*)
-------------
7,874,466,950

Elapsed: 00:01:06.54

SQL> select ceil(sum(bytes)/1024/1024) mb from user_segments where segment_name='FASTLOAD';

       MB
---------
1,058,750

SQL> exec dbms_stats.gather_table_stats(user,'FASTLOAD');

PL/SQL procedure successfully completed.

SQL> select num_rows,blocks,avg_row_len from user_tables where table_name='FASTLOAD';

  NUM_ROWS     BLOCKS AVG_ROW_LEN
---------- ---------- -----------
7874466950  135520008         133

Just so you don’t think I’m making this stuff up, check out the SQL Monitor Report for the execution, noting the IO throughput graph from the Metrics tab (10GB/s write throughput isn’t half bad).

So as you can see, flat file data loading has really become more of a data delivery problem rather than a data loading problem.  If the Oracle Database, specifically the Exadata powered Oracle Database Machine, can bulk load data from an external table whose files reside in the filesystem cache at a rate of 13TB per hour (give or take), you probably don’t have to worry too much about meeting your data loading rate business requirements (wink).

Note: Loading rates will vary slightly depending on table definition, number of columns, data types, compression type, etc.
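The 13TB per hour figure can be sanity-checked from the numbers in the SQL*Plus output above. The query below is just arithmetic on those figures; it uses the loaded segment size (1,058,750 MB) as a stand-in for the flat file volume, which is an approximation.

```sql
-- 1,058,750 MB and 7,874,466,950 rows loaded in 00:04:35.49 (275.49 seconds).
select round(1058750 / 275.49)                          as mb_per_sec,
       round(1058750 / 275.49 * 3600 / 1024 / 1024, 1)  as tb_per_hour,
       round(7874466950 / 275.49 / 1e6, 1)              as million_rows_per_sec
from   dual;
```

That works out to a little under 4 GB per second of loaded data, or roughly 13 TB per hour.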

The Core Performance Fundamentals Of Oracle Data Warehousing – Parallel Execution

Leveraging Oracle’s Parallel Execution (PX) in your Oracle data warehouse is probably the most important feature/technology one can use to speed up operations on large data sets.  PX is not, however, “go fast” magic pixie dust for any old operation (if that’s what you think, you probably don’t understand the parallel computing paradigm). With Oracle PX, a large task is broken up into smaller parts, sub-tasks if you will, and each sub-task is then worked on in parallel.  The goal of Oracle PX: divide and conquer.  This allows a significant amount of hardware resources to be engaged in solving a single problem and is what allows the Oracle database to scale up and out when working with large data sets.

I thought I’d touch on some basics and add my observations, but this is by no means an exhaustive write up on Oracle’s Parallel Execution.  There is an entire chapter in the Oracle Database documentation on PX as well as several white papers.  I’ve listed all these in the References section at the bottom of this post.  Read them, but as always, feel free to post questions/comments here.  Discussion adds great value.

A Basic Example of Parallel Execution

Consider a simple one table query like the one below.

You can see that the PX Coordinator (also known as the Query Coordinator or QC) breaks up the “work” into several chunks and those chunks are worked on by the PX Server Processes.  The technical term for the chunk a PX Server Process works on is a granule.  Granules can either be block-based or partition-based.

When To Use Parallel Execution

PX is a key component in data warehousing as that is where large data sets usually exist.  The most common operations that use PX are queries (SELECTs) and data loads (INSERTs or CTAS).  PX is most commonly controlled by using the PARALLEL attribute on the object, although it can be controlled by hints or even Oracle’s Database Resource Manager.  If you are not using PX in your Oracle data warehouse then you are probably missing out on a shedload of performance opportunity.
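As a brief illustration of the first two controls, the PARALLEL attribute and the PARALLEL hint look like this. The table name and the degree of 16 are only examples.

```sql
-- Set the PARALLEL attribute on the object (a DOP of 16 is just an example).
alter table store_sales parallel 16;

-- Check the attribute in the data dictionary.
select table_name, degree
from   user_tables
where  table_name = 'STORE_SALES';

-- Or request PX for a single statement with a hint.
select /*+ parallel(ss, 16) */ max(ss_sales_price)
from   store_sales ss;
```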

When an object has its PARALLEL attribute set or the PARALLEL hint is used, queries will leverage PX, but to leverage PX for DML operations (INSERT/DELETE/UPDATE) remember to alter your session by using the command:

alter session [enable|force] parallel dml;
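A typical parallel DML load could then look like the sketch below. The table names (sales_target, sales_source) and the degree of 16 are hypothetical.

```sql
-- Parallel DML must be enabled in the session before the statement runs.
alter session enable parallel dml;

insert /*+ append parallel(t, 16) */ into sales_target t
select /*+ parallel(s, 16) */ *
from   sales_source s;

-- A direct-path insert must be committed (or rolled back) before the same
-- session can query or modify the target table again.
commit;
```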

Do Not Fear Parallel Execution

Since Oracle’s PX is designed to take advantage of multiple CPUs (or CPU cores) at a time, it can leverage significant hardware resources, if available.  From my experiences in talking with Oracle DBAs, the ability for PX to do this scares them. This results in DBAs implementing a relatively small degree of parallelism (DOP) for a system that could possibly support a much higher level (based on #CPUs).  Often times though, the system that PX is being run on is not a balanced system and frequently has much more CPU power than disk and channel bandwidth, so data movement from disk becomes the bottleneck well before the CPUs are busy.  This results in many statements like “Parallel Execution doesn’t work” or similar because the user/DBA isn’t observing a decrease in execution time with more parallelism.  Bottom line:  if the hardware resources are not available, the software certainly can not scale.

Just for giggles (and education), here is a snippet from top(1) from a node from an Oracle Database Machine running a single query (across all 8 database nodes) at DOP 256.

top - 20:46:44 up 5 days,  3:48,  1 user,  load average: 36.27, 37.41, 35.75
Tasks: 417 total,  43 running, 373 sleeping,   0 stopped,   1 zombie
Cpu(s): 95.6%us,  1.6%sy,  0.0%ni,  2.2%id,  0.0%wa,  0.2%hi,  0.4%si,  0.0%st
Mem:  74027752k total, 21876824k used, 52150928k free,   440692k buffers
Swap: 16771852k total,        0k used, 16771852k free, 13770844k cached

USER       PID  PR  NI  VIRT  SHR  RES S %CPU %MEM    TIME+  COMMAND
oracle   16132  16   0 16.4g 5.2g 5.4g R 63.8  7.6 709:33.02 ora_p011_orcl
oracle   16116  16   0 16.4g 4.9g 5.1g R 60.9  7.2 698:35.63 ora_p003_orcl
oracle   16226  15   0 16.4g 4.9g 5.1g R 59.9  7.2 702:01.01 ora_p028_orcl
oracle   16110  16   0 16.4g 4.9g 5.1g R 58.9  7.2 697:20.51 ora_p000_orcl
oracle   16122  15   0 16.3g 4.9g 5.0g R 56.9  7.0 694:54.61 ora_p006_orcl

(Quite the TIME+ column there, huh!)

Summary

In this post I’ve been a bit light on the technicals of PX, but that is mostly because 1) this is a fundamentals post and 2) there is a ton more detail in the referenced documentation and I really don’t feel like republishing what already exists. Bottom line, Oracle Parallel Execution is a must for scaling performance in your Oracle data warehouse.  Take the time to understand how to leverage it to maximize performance in your environment and feel free to start a discussion here if you have questions.

References

The Core Performance Fundamentals Of Oracle Data Warehousing – Partitioning

Partitioning is an essential performance feature for an Oracle data warehouse because partition elimination (or partition pruning) generally results in the elimination of a significant amount of table data to be scanned. This results in a need for less system resources and improved query performance. Someone once told me “the fastest I/O is the one that never happens.” This is precisely the reason that partitioning is a must for Oracle data warehouses: it’s a huge I/O eliminator. I frequently refer to partition elimination as the anti-index. An index is used to find a small amount of data that is required; partitioning is used to eliminate vast amounts of data that are not required.

Main Uses For Partitioning

I would classify the main reasons to use partitioning in your Oracle data warehouse into these four areas:

  • Data Elimination
  • Partition-Wise Joins
  • Manageability (Partition Exchange Load, Local Indexes, etc.)
  • Information Lifecycle Management (ILM)

Partitioning Basics

The most common partitioning design pattern found in Oracle data warehouses is to partition the fact tables by range (or interval) on the event date/time column. This allows for partition elimination of all the data not in the desired time window in queries. For example: If I have a fact table that contains point of sale (POS) data, each line item for a given transaction has a time stamp of when the item was scanned. Let’s say this value is stored in column EVENT_TS which is a DATE or TIMESTAMP data type. In most cases it would make sense to partition by range on EVENT_TS using one day partitions. This means every query that uses a predicate filter on EVENT_TS (which should be nearly every one) can eliminate significant amounts of data that is not required to satisfy the query predicate.  If you want to look at yesterday’s sales numbers, there is no need to bring back rows from last week or last month!
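A minimal sketch of that design pattern, using interval partitioning (available starting with 11g) so that one-day partitions are created automatically as data arrives. The table name, the trimmed column list and the starting boundary are hypothetical.

```sql
-- One-day partitions on the event time column.
create table pos_sales (
  txn_id    number,
  item_id   number,
  event_ts  date,
  amount    number(10,2)
)
partition by range (event_ts)
interval (numtodsinterval(1, 'DAY'))
( partition p_first values less than (date '2010-01-01') )
parallel;

-- Yesterday's sales: only yesterday's partition needs to be scanned.
select sum(amount)
from   pos_sales
where  event_ts >= trunc(sysdate) - 1
and    event_ts <  trunc(sysdate);
```

Because the predicate values are only known at execution time, the plan for a query like this would typically show KEY rather than literal partition numbers in the Pstart/Pstop columns.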

Subpartitioning Options

Depending on the schema design of your data warehouse you may also choose to subpartition a table. This allows one to further segment a table for even more data elimination, or it can enable partition-wise joins, which reduce CPU and memory usage by minimizing the amount of data exchanged between parallel execution server processes. In third normal form (3NF) schemas it is very beneficial to use hash partitioning or subpartitioning to allow for partition-wise joins (see Oracle Parallel Execution: Interconnect Myths And Misunderstandings) for this exact reason. Dimensional models (star schemas) may also benefit from hash subpartitioning and partition-wise joins. Generally it is best to hash subpartition on a join key column to a very large dimension, like CUSTOMER, so that a partition-wise join will be used between the fact table and the large dimension table.
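
A sketch of what that could look like follows. The table names, column names, data types, partition bounds, and the choice of 16 hash (sub)partitions are all assumptions for illustration; the point is only that the fact table's hash subpartitioning key and the dimension's hash partitioning key are the same join column, with the same number of hash buckets.

```sql
-- Hypothetical large dimension, hash partitioned on its key.
create table customer (
  customer_id  number         not null,
  cust_name    varchar2(100)
)
partition by hash (customer_id) partitions 16;

-- Hypothetical fact table: range partitioned by date for elimination,
-- hash subpartitioned on the join key for partition-wise joins.
create table sales_fact (
  customer_id  number         not null,
  event_ts     date           not null,
  sale_amt     number(10,2)
)
partition by range (event_ts)
subpartition by hash (customer_id) subpartitions 16
(
  partition p_2010_01 values less than (date '2010-02-01'),
  partition p_2010_02 values less than (date '2010-03-01')
);

-- A join on CUSTOMER_ID is now a candidate for a full partition-wise join.
select c.cust_name, sum(f.sale_amt)
from   sales_fact f,
       customer   c
where  f.customer_id = c.customer_id
group by c.cust_name;
```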

Manageability

Managing large objects can be challenging for a variety of reasons which is why Oracle Partitioning allows for many operations to be done at a global or partition (or subpartition) level.  This makes it much easier to deal with tables or indexes of large sizes.  It also is transparent to applications so the SQL that runs against a non-partitioned object will run as-is against a partitioned object.  Some of the key features include:

  • Partition Exchange Load – Data can be loaded “out of line” and exchanged into a partitioned table.
  • Local Indexes – It takes much less time to build local indexes than global indexes.
  • Compression – Can be applied at the segment level so it’s possible to have a mix of compressed and non-compressed partitions.
  • Segment Moves/Rebuilds/Truncates/Drops – Each partition (or subpartition) is a segment and can be operated on individually and independently of the other partitions in the table.
  • Information Lifecycle Management (ILM) – Partitioning allows implementation of an ILM strategy.
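
To make the first and fourth items concrete, here is a hedged sketch of a partition exchange load followed by partition level maintenance. It assumes a hypothetical table POS_SALES that is range partitioned by day with partitions named like P_20100101, and a hypothetical external table POS_SALES_EXT over the incoming flat file; none of these names come from a real system.

```sql
-- 1. Build an empty standalone table with the same column layout.
create table pos_sales_stage as
select * from pos_sales where 1 = 0;

-- 2. Load it "out of line" using a direct path insert.
insert /*+ append */ into pos_sales_stage
select * from pos_sales_ext;
commit;

-- 3. Swap the loaded segment into the partitioned table. This is a data
--    dictionary operation; the rows themselves are not copied.
alter table pos_sales
  exchange partition p_20100101
  with table pos_sales_stage
  without validation;

-- Each partition is its own segment, so old data can be removed
-- without touching the rest of the table.
alter table pos_sales truncate partition p_20090101;
alter table pos_sales drop partition p_20090101;
```

WITHOUT VALIDATION skips the check that every row belongs in the target partition, so it is only appropriate when the load process already guarantees that.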

Summary

I’d classify partitioning as a “must have” for Oracle data warehouses for both the performance and manageability reasons described above. Partitioning should lower query response time as well as resource utilization due to “smart” data access (only go after the data the query needs). There are additional partitioning design patterns as well, and the Oracle documentation contains descriptions of them along with examples.

The Core Performance Fundamentals Of Oracle Data Warehousing – Table Compression

Editor’s note: This blog post does not cover Exadata Hybrid Columnar Compression.

The first thing that comes to most people’s mind when database table compression is mentioned is the savings it yields in terms of disk space. While reducing the footprint of data on disk is relevant, I would argue it is the lesser of the benefits for data warehouses. Disk capacity is very cheap and generally plentiful. Disk bandwidth (scan speed), however, is proportional to the number of spindles, no matter what the disk capacity, and thus is more expensive. Table compression reduces the footprint on the disk drives that a given data set occupies, so the amount of physical data that must be read off the disk platters is reduced when compared to the uncompressed version. For example, if 4000 GB of raw data can compress to 1000 GB, it can be read off the same disk drives 4X as fast because it is reading and transferring 1/4 of the data off the spindles (relative to the uncompressed size). Likewise, table compression allows the database buffer cache to contain more data without having to increase the memory allocation, because more rows can be stored in a compressed block/page compared to an uncompressed block/page.

Row major table compression comes in two flavors with the Oracle database: BASIC and OLTP.  In 11.1 these were also known by the key phrases COMPRESS or COMPRESS FOR DIRECT_LOAD OPERATIONS and COMPRESS FOR ALL OPERATIONS.  The BASIC/DIRECT_LOAD compression has been part of the Oracle database since version 9 and ALL OPERATIONS/OLTP compression was introduced in 11.1 with the Advanced Compression option.

Oracle row major table compression works by storing the column values for a given block in a symbol table at the beginning of the block. The more repeated values per block, even across columns, the better the compression ratio. Sorting data can increase the compression ratio as ordering the data will generally allow more repeat values per block. Specific compression ratios and gains from sorting data are very data dependent but compression ratios are generally between 2x and 4x.

Compression does add some CPU overhead when direct path loading data, but there is no measurable performance overhead when reading data as the Oracle database can operate on compressed blocks directly without having to first uncompress the block. The additional CPU required when bulk loading data is generally well worth the downstream gains for data warehouses. This is because most data in a well-designed data warehouse is write once, read many times. Insert only and infrequently modified tables are ideal candidates for BASIC compression.  If the tables have significant DML performed against them, then OLTP compression would be advised (or no compression).
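
As a rough sketch of how BASIC compression might be applied, the statements below use direct path operations on hypothetical objects. SALES_STAGE, SALES_FACT_COMP, SALES_FACT, the CUSTOMER_ID sort column, and the partition name are all assumptions for illustration.

```sql
-- BASIC compression takes effect during direct path operations such as CTAS.
-- Sorting on a column with many repeated values may improve the ratio.
create table sales_fact_comp
compress
as
select *
from   sales_stage
order by customer_id;

-- An existing partition of a (hypothetical) partitioned table can be
-- compressed by rebuilding its segment.
alter table sales_fact move partition p_2009_12 compress;

-- Confirm the compression attribute on the new table.
select table_name, compression
from   user_tables
where  table_name = 'SALES_FACT_COMP';
```

For tables with significant DML, the COMPRESS FOR ALL OPERATIONS clause mentioned above would take the place of the plain COMPRESS keyword. Keep in mind that moving a partition leaves its index partitions unusable until they are rebuilt.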

Given that most Oracle data warehouses that I have seen are constrained by I/O bandwidth (see Balanced Hardware Configuration) it is highly recommended to leverage compression so the logical table scan rate can increase proportionally to the compression ratio.  This will result in faster table and partition scans on the same hardware.

The Core Performance Fundamentals Of Oracle Data Warehousing – Balanced Hardware Configuration

If you want to build a house that will stand the test of time, you need to build on a solid foundation. The same goes for architecting computer systems that run databases. If the underlying hardware is not sized appropriately it will likely lead to people blaming software. All too often I see data warehouse systems that are poorly architected for the given workload requirements. I frequently tell people, “you can’t squeeze blood from a turnip”, meaning if the hardware resources are not there for the software to use, how can you expect the software to scale?

Undersizing data warehouse systems has become an epidemic with open platforms – platforms that let you run on any brand and configuration of hardware. This problem has been magnified over time as the size of databases has grown significantly, generally outpacing the experience of those managing them. This has caused the “big three” database vendors to come up with suggested or recommended hardware configurations for their database platforms:

Simply put, the reasoning behind those initiatives was to help customers architect systems that are well balanced and sized appropriately for the size of their data warehouse.

Balanced Hardware Configurations

The foundation for a well performing data warehouse (or any system for that matter) is the hardware that it runs on. There are three main hardware resources to consider when sizing your data warehouse hardware. Those are:

  1. Number of CPUs
  2. Number of storage devices (HDDs or SSDs)
  3. I/O bandwidth between CPUs and storage devices

NB: I’ve purposely left off memory (RAM) as most systems are pretty well sized at 2GB or 4GB per CPU core these days.

A balanced system has the following characteristics:

[Figure: balanced hardware configuration]

As you can see, each of the three components is sized proportionally to the others. This allows for the maximum system throughput capacity as no single resource will become the bottleneck before any other. This was one of the critical design decisions that went into the Oracle Database Machine.

Most DBAs and System Admins know what the disk capacity numbers are for their systems, but when it comes to I/O bandwidth or scan rates, most are unaware of what the system is capable of in theory, let alone in practice. Perhaps I/O bandwidth utilization should be included in the system metrics that are collected for your databases. You do collect system metrics, right?

There are several “exchanges” that data must flow through from storage devices to host CPUs, many of which could become bottlenecks. Those include:

  • Back-end Fibre Channel loops (the fibre between the drive shelves and the storage array server processor)
  • Front-end Fibre Channel ports
  • Storage array server processors (SP)
  • Host HBAs

One should understand the throughput capacity of each of these components to ensure that none of them restricts the flow of data to the CPUs prematurely.

Unbalanced Hardware Configurations

All too frequently systems are not architected as balanced systems and the system ends up being constrained in one of the following three scenarios:

[Figure: unbalanced hardware configurations]

From the production systems that I have seen, the main deficiency is in I/O bandwidth (both I/O Channel and HDD). I believe there are several reasons for this. First, too many companies capacity plan for their data warehouse based on the size the data occupies on disk alone. That is, they purchase the number of HDDs for the system based on the drive capacity, not on the I/O bandwidth requirement. Think of it like this: If you were to purchase 2 TB of mirrored disk capacity (4 TB total) would you rather purchase 28 x 146 GB drives or 14 x 300 GB drives (or even 4 x 1 TB drives)? You may ask: Well, what is the difference (other than price)? In each case you have the same net capacity, correct? Indeed, all of these configurations have roughly the same capacity, but I/O bandwidth (how fast you can read data off the HDDs) is proportional to the number of HDDs, not the capacity. It should be fairly obvious, then, that 28 HDDs can deliver 2X the disk I/O bandwidth that 14 HDDs can. This means that it will take 2X as long to read the same amount of data off of 14 HDDs as 28 HDDs.
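
The arithmetic can be sketched with a quick query against DUAL. The 50 MB/s sustained scan rate per drive is purely an assumed figure for illustration (real drives vary widely), and the calculation ignores every other component in the I/O path.

```sql
-- Assumption: each HDD sustains 50 MB/s of sequential reads (hypothetical).
-- Aggregate scan rate scales with the number of drives, not their capacity.
select drives,
       drives * 50                         as scan_mb_per_sec,
       round(1024 * 1024 / (drives * 50))  as secs_to_scan_1tb
from   (select 28 as drives from dual
        union all
        select 14 from dual
        union all
        select 4 from dual)
order by drives desc;
```

Whatever per-drive rate is plugged in, the ratio between the rows stays the same: half the drives means twice the scan time for the same data.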

Unfortunately what tends to happen is that the bean counter types will see only two things:

  1. The disk capacity (space) you want to purchase (or the capacity that is required)
  2. The price per MB/GB/TB

This is where someone worthy of the title systems architect needs to stand up and explain the concept of I/O bandwidth and the impact it has on data warehouse performance (your systems architect does know this, correct?). This is generally a difficult discussion because I/O bandwidth is not a line item on a purchase order; it is a derived metric that requires both thought and engineering (which means someone had to do some thinking about the requirements for this system!).

Summary

When sizing the hardware for your data warehouse, consider your workload and understand the following (and calculate numbers for them!):

  1. What rate (in MB/GB per second) can the CPUs consume data?
  2. What rate can storage devices produce data (scan rate)?
  3. What rate can the data be delivered from the storage array(s) to the host HBAs?

If you are unable to answer these questions in theory then you need to sit down and do some calculations. Then you need to use some micro benchmarks (like Oracle ORION) and prove out those calculations. This will give you the “speed limit” and a metric against which you can measure your database workload. All computer systems must obey the laws of physics! There is no way around that.

Additional Readings

Kevin Closson has several good blog posts on related topics including:

as well as numerous others.

The Core Performance Fundamentals Of Oracle Data Warehousing – Introduction

At the 2009 Oracle OpenWorld Unconference back in October I led a chalk and talk session entitled The Core Performance Fundamentals Of Oracle Data Warehousing. Since this was a chalk and talk I spared the audience any PowerPoint slides, but I had several people request that I make it into a presentation so they could share it with others. After some thought, I decided that a series of blog posts would probably be a better way to share this information, especially since I tend to use slides as a speaking outline, not a condensed version of a white paper. This will be the first of a series of posts discussing what I consider to be the key features and technologies behind well performing Oracle data warehouses.

Introduction

As an Oracle database performance engineer who has done numerous customer data warehouse benchmarks and POCs over the past 5+ years, I’ve seen many data warehouse systems that have been plagued with problems on nearly every DBMS commonly used in data warehousing. Interestingly enough, many of these systems were facing many of the same problems. I’ve compiled a list of topics that I consider to be key features and/or technologies for Oracle data warehouses:

Core Performance Fundamental Topics

In the upcoming posts, I’ll deep dive into each one of these topics discussing why these areas are key for a well performing Oracle data warehouse. Stay tuned…

Oracle OpenWorld 2009: The Real-World Performance Group

Even though Oracle OpenWorld 2009 is a few months away, I thought I would take a moment to mention that the Oracle Real-World Performance Group will again be hosting three sessions. Hopefully you are no stranger to our Oracle database performance sessions and this year we have what I think will be a very exciting and enlightening session: The Terabyte Hour with the Real-World Performance Group. If you are the slightest bit interested in seeing just how fast the Oracle Database Machine really is and how it can devour flat files in no time, rip through and bend data at amazing speeds, this is the session for you. All the operations will be done live for you to observe. No smoke. No mirrors. Pure Exadata performance revealed.

Session ID: S311237
Session Title: Real-World Database Performance Roundtable
Session Abstract: This session is a panel discussion including Oracle’s Real-World Performance Group and other invited performance experts. To make the hour productive, attendees need to write their questions on postcards and hand them to the panel at the beginning of the session. The questions should stick to the subject matter of real-world database performance. The panel members look forward to meeting you and answering your questions.
   
Session ID: S311239
Session Title: The Terabyte Hour with the Real-World Performance Group
Session Abstract: Last year at Oracle OpenWorld, Oracle launched the Oracle Database Machine, a complete package of software, servers, and storage with the power to tackle large-scale business intelligence problems immediately and scale linearly as your data warehouse grows. In this session, Oracle’s Real-World Performance Group demonstrates how to use an Oracle Database Machine to load, transform, and query a 1-terabyte database in less than an hour. The demonstration shows techniques for exploiting full database parallelism in a simple but optimal manner.
   
Session ID: S311238
Session Title: Current Trends in Real-World Database Performance
Session Abstract: The year 2009 has been an exciting one for Oracle’s Real-World Performance Group. The group has been challenged by bigger databases, new performance challenges, and now the Oracle Database Machine with Oracle Exadata Storage Server. This session focuses on some of the real-world performance ideas and solutions that have worked over the last year, including performance design philosophies, best practices, and a few tricks and tips.

Oracle Parallel Execution: Interconnect Myths And Misunderstandings

A number of weeks back I came across a paper/presentation by Riyaj Shamsudeen entitled Battle of the Nodes: RAC Performance Myths (available here). As I was looking through it I saw one example that struck me as very odd (Myth #3 – Interconnect Performance) and I contacted him about it. After further review Riyaj commented that he had made a mistake in his analysis and offered up a new example. I thought I’d take the time to discuss this as parallel execution seems to be one of those areas where many misconceptions and misunderstandings exist.

The Original Example

I thought I’d quickly discuss why I questioned the initial example. The original query Riyaj cited is this one:

select /*+ full(tl) parallel (tl,4) */
       avg (n1),
       max (n1),
       avg (n2),
       max (n2),
       max (v1)
from   t_large tl;

As you can see this is a very simple single table aggregation without a group by. The reason that I questioned the validity of this example in the context of interconnect performance is that the parallel execution servers (parallel query slaves) will each return exactly one row from the aggregation and then send that single row to the query coordinator (QC) which will then perform the final aggregation. Given that, it would seem impossible that this query could cause any interconnect issues at all.

Riyaj’s Test Case #1

Recognizing the original example was somehow flawed, Riyaj came up with a new example (I’ll reference as TC#1) which consisted of the following query:

select /*+ parallel (t1, 8,2) parallel (t2, 8, 2)  */
       min (t1.customer_trx_line_id + t2.customer_trx_line_id),
       max (t1.set_of_books_id + t2.set_of_books_id),
       avg (t1.set_of_books_id + t2.set_of_books_id),
       avg (t1.quantity_ordered + t2.quantity_ordered),
       max (t1.attribute_category),
       max (t2.attribute1),
       max (t1.attribute2)
from   (select *
        from   big_table
        where  rownum <= 100000000) t1,
       (select *
        from   big_table
        where  rownum <= 100000000) t2
where  t1.customer_trx_line_id = t2.customer_trx_line_id;

The execution plan for this query is:

----------------------------------------------------------------------------------------------------------------------------
| Id  | Operation                 | Name      | Rows  | Bytes |TempSpc| Cost (%CPU)| Time     |    TQ  |IN-OUT| PQ Distrib |
----------------------------------------------------------------------------------------------------------------------------
|   0 | SELECT STATEMENT          |           |     1 |   249 |       |  2846K  (4)| 01:59:01 |        |      |            |
|   1 |  SORT AGGREGATE           |           |     1 |   249 |       |            |          |        |      |            |
|*  2 |   HASH JOIN               |           |   100M|    23G|   762M|  2846K  (4)| 01:59:01 |        |      |            |
|   3 |    VIEW                   |           |   100M|    10G|       |  1214K  (5)| 00:50:46 |        |      |            |
|*  4 |     COUNT STOPKEY         |           |       |       |       |            |          |        |      |            |
|   5 |      PX COORDINATOR       |           |       |       |       |            |          |        |      |            |
|   6 |       PX SEND QC (RANDOM) | :TQ10000  |   416M|  6749M|       |  1214K  (5)| 00:50:46 |  Q1,00 | P->S | QC (RAND)  |
|*  7 |        COUNT STOPKEY      |           |       |       |       |            |          |  Q1,00 | PCWC |            |
|   8 |         PX BLOCK ITERATOR |           |   416M|  6749M|       |  1214K  (5)| 00:50:46 |  Q1,00 | PCWC |            |
|   9 |          TABLE ACCESS FULL| BIG_TABLE |   416M|  6749M|       |  1214K  (5)| 00:50:46 |  Q1,00 | PCWP |            |
|  10 |    VIEW                   |           |   100M|    12G|       |  1214K  (5)| 00:50:46 |        |      |            |
|* 11 |     COUNT STOPKEY         |           |       |       |       |            |          |        |      |            |
|  12 |      PX COORDINATOR       |           |       |       |       |            |          |        |      |            |
|  13 |       PX SEND QC (RANDOM) | :TQ20000  |   416M|    10G|       |  1214K  (5)| 00:50:46 |  Q2,00 | P->S | QC (RAND)  |
|* 14 |        COUNT STOPKEY      |           |       |       |       |            |          |  Q2,00 | PCWC |            |
|  15 |         PX BLOCK ITERATOR |           |   416M|    10G|       |  1214K  (5)| 00:50:46 |  Q2,00 | PCWC |            |
|  16 |          TABLE ACCESS FULL| BIG_TABLE |   416M|    10G|       |  1214K  (5)| 00:50:46 |  Q2,00 | PCWP |            |
----------------------------------------------------------------------------------------------------------------------------
Predicate Information (identified by operation id):
---------------------------------------------------

   2 - access("T1"."n1"="T2"."n1")
   4 - filter(ROWNUM<=100000000)
   7 - filter(ROWNUM<=100000000)
  11 - filter(ROWNUM<=100000000)
  14 - filter(ROWNUM<=100000000)

This is a rather synthetic query but there are a few things that I would like to point out. First, this query uses a parallel hint with 3 values representing table/degree/instances, however instances has been deprecated (see 10.2 parallel hint documentation). In this case the DOP is calculated by degree * instances or 16, not DOP=8 involving 2 instances. Note that the rownum filter is causing all the rows from the tables to be sent back to the QC for the COUNT STOPKEY operation thus causing the execution plan to serialize, denoted by the P->S in the IN-OUT column.

Riyaj had enabled sql trace for the QC and the TKProf output is such:

Rows     Row Source Operation
-------  ---------------------------------------------------
      1  SORT AGGREGATE (cr=152 pr=701158 pw=701127 time=1510221226 us)
98976295   HASH JOIN  (cr=152 pr=701158 pw=701127 time=1244490336 us)
100000000    VIEW  (cr=76 pr=0 pw=0 time=200279054 us)
100000000     COUNT STOPKEY (cr=76 pr=0 pw=0 time=200279023 us)
100000000      PX COORDINATOR  (cr=76 pr=0 pw=0 time=100270084 us)
      0       PX SEND QC (RANDOM) :TQ10000 (cr=0 pr=0 pw=0 time=0 us)
      0        COUNT STOPKEY (cr=0 pr=0 pw=0 time=0 us)
      0         PX BLOCK ITERATOR (cr=0 pr=0 pw=0 time=0 us)
      0          TABLE ACCESS FULL BIG_TABLE_NAME_CHANGED_12 (cr=0 pr=0 pw=0 time=0 us)
100000000    VIEW  (cr=76 pr=0 pw=0 time=300298770 us)
100000000     COUNT STOPKEY (cr=76 pr=0 pw=0 time=200298726 us)
100000000      PX COORDINATOR  (cr=76 pr=0 pw=0 time=200279954 us)
      0       PX SEND QC (RANDOM) :TQ20000 (cr=0 pr=0 pw=0 time=0 us)
      0        COUNT STOPKEY (cr=0 pr=0 pw=0 time=0 us)
      0         PX BLOCK ITERATOR (cr=0 pr=0 pw=0 time=0 us)
      0          TABLE ACCESS FULL BIG_TABLE_NAME_CHANGED_12 (cr=0 pr=0 pw=0 time=0 us)

Note that the Rows column contains zeros for many of the row sources because this trace is only for the QC, not the slaves, and thus only QC rows will show up in the trace file. Something to be aware of if you decide to use sql trace with parallel execution.

Off To The Lab: Myth Busting Or Bust!

I wanted to take a query like TC#1 and run it in my own environment so I could do more monitoring of it. Given the alleged myth had to do with interconnect traffic of cross-instance (inter-node) parallel execution, I wanted to be certain to gather the appropriate data. I ran several tests using a similar query on a similar sized data set (by row count) as the initial example. I ran all my experiments on an Oracle Real Application Clusters 11.1.0.7 cluster consisting of eight nodes, each with two quad-core CPUs. The interconnect is InfiniBand and the protocol used is RDS (Reliable Datagram Sockets).

Before I get into the experiments I think it is worth mentioning that Oracle’s parallel execution (PX), which includes Parallel Query (PQ), PDML & PDDL, can consume vast amounts of resources. This is by design. You see, the idea of Oracle PX is to dedicate a large amount of resources (processes) to a problem by breaking it up into many smaller pieces and then operate on those pieces in parallel. Thus the more parallelism that is used to solve a problem, the more resources it will consume, assuming those resources are available. That should be fairly obvious, but I think it is worth stating.

For my experiments I used a table that contains just over 468M rows.

Below is my version of TC#1. The query is a self-join on a unique key and the table is range partitioned by DAY_KEY into 31 partitions. Note that I create an AWR snapshot immediately before and after the query.

exec dbms_workload_repository.create_snapshot

select /* &&1 */
       /*+ parallel (t1, 16) parallel (t2, 16) */
       min (t1.bsns_unit_key + t2.bsns_unit_key),
       max (t1.day_key + t2.day_key),
       avg (t1.day_key + t2.day_key),
       max (t1.bsns_unit_typ_cd),
       max (t2.curr_ind),
       max (t1.load_dt)
from   dwb_rtl_trx t1,
       dwb_rtl_trx t2
where  t1.trx_nbr = t2.trx_nbr;

exec dbms_workload_repository.create_snapshot

Experiment Results Using Fixed DOP=16

I ran my version of TC#1 across a varying number of nodes by using Oracle services (instance_groups and parallel_instance_group have been deprecated in 11g), but kept the DOP constant at 16 for all the tests. Below is a table of the experiment results.

Nodes  Elapsed Time
-----  ------------
    1  00:04:54.12
    2  00:03:55.35
    4  00:02:59.24
    8  00:02:14.39

Seemingly contrary to what many people would probably guess, the execution times got better as more nodes participated in the query, even though the DOP was held constant throughout each of the tests.

Measuring The Interconnect Traffic

One of the new additions to the AWR report in 11g was the inclusion of interconnect traffic by client. This section is near the bottom of the report and looks like such:
[Figure: Interconnect Throughput By Client section of the AWR report]
This allows the PQ message traffic to be tracked, whereas in prior releases it was not.

Even though AWR contains the throughput numbers (as in megabytes per second) I thought it would be interesting to see how much data was being transferred, so I used the following query directly against the AWR data. I added a filter predicate to return only the rows where DIFF_RECEIVED_MB >= 10, so the instances that were not part of the execution are filtered out, as well as the single instance execution.

break on snap_id skip 1
compute sum of DIFF_RECEIVED_MB on SNAP_ID
compute sum of DIFF_SENT_MB on SNAP_ID

select *
from   (select   snap_id,
                 instance_number,
                 round ((bytes_sent - lag (bytes_sent, 1) over
                   (order by instance_number, snap_id)) / 1024 / 1024) diff_sent_mb,
                 round ((bytes_received - lag (bytes_received, 1) over
                   (order by instance_number, snap_id)) / 1024 / 1024) diff_received_mb
        from     dba_hist_ic_client_stats
        where    name = 'ipq' and
                 snap_id between 910 and 917
        order by snap_id,
                 instance_number)
where  snap_id in (911, 913, 915, 917) and
       diff_received_mb >= 10
/

SNAP_ID    INSTANCE_NUMBER DIFF_SENT_MB DIFF_RECEIVED_MB
---------- --------------- ------------ ----------------
       913               1        11604            10688
                         2        10690            11584
**********                 ------------ ----------------
sum                               22294            22272

       915               1         8353             8350
                         2         8133             8418
                         3         8396             8336
                         4         8514             8299
**********                 ------------ ----------------
sum                               33396            33403

       917               1         5033             4853
                         2         4758             4888
                         3         4956             4863
                         4         5029             4852
                         5         4892             4871
                         6         4745             4890
                         7         4753             4889
                         8         4821             4881
**********                 ------------ ----------------
sum                               38987            38987

As you can see from the data, the more nodes that were involved in the execution, the more interconnect traffic there was, yet the execution times were best with 8 nodes.

Further Explanation Of Riyaj’s Issue

If you read Riyaj’s post, you noticed that he observed worse elapsed times, not better ones as I did, when running on two nodes versus one. How could this be? It was noted in the comment thread of that post that the configuration was using Gig-E as the interconnect in a Solaris IPMP active-passive configuration. This means the interconnect speeds would be capped at roughly 125MB/s (1000Mbps), the wire speed of Gig-E. This is by all means an inadequate configuration for cross-instance parallel execution.

There is a whitepaper entitled Oracle SQL Parallel Execution that discusses many of the aspects of Oracle’s parallel execution. I would highly recommend reading it. This paper specifically mentions:

If you use a relatively weak interconnect, relative to the I/O bandwidth from the server to the storage configuration, then you may be better of restricting parallel execution to a single node or to a limited number of nodes; inter-node parallel execution will not scale with an undersized interconnect.

I would assert that this is precisely the root cause (insufficient interconnect bandwidth for cross-instance PX) behind the issues that Riyaj observed, thus making his execution slower on two nodes than one node.

The Advantage Of Hash Partitioning/Subpartitioning And Full Partition-Wise Joins

At the end of my comment on Riyaj’s blog, I mentioned:

If a DW frequently uses large table to large table joins, then hash partitioning or subpartitioning would yield added gains as partition-wise joins will be used.

I thought that it would be both beneficial and educational to extend TC#1 and implement hash subpartitioning so that the impact could be measured on both query elapsed time and interconnect traffic. In order for a full partition-wise join to take place, the table must be partitioned/subpartitioned on the join key column, so in this case I’ve hash subpartitioned on TRX_NBR. See the Oracle Documentation on Partition-Wise Joins for a more detailed discussion on PWJ.
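
For readers who want to try something similar, here is a minimal sketch of a range/hash layout on a copy of the table. Only the column names come from the test query above; the table name DWB_RTL_TRX_RH, the data types, the partition bounds, and the subpartition count of 16 are assumptions, not the actual definition used in these experiments.

```sql
-- Hypothetical range/hash version of the test table.
create table dwb_rtl_trx_rh (
  trx_nbr           number        not null,
  day_key           number        not null,
  bsns_unit_key     number,
  bsns_unit_typ_cd  varchar2(10),
  curr_ind          varchar2(1),
  load_dt           date
)
partition by range (day_key)
subpartition by hash (trx_nbr) subpartitions 16   -- hash on the join key
(
  partition p01  values less than (2),
  partition p02  values less than (3),
  partition pmax values less than (maxvalue)
);

-- Check the plan for the self-join on TRX_NBR.
explain plan for
select /*+ parallel (t1, 16) parallel (t2, 16) */
       max (t1.day_key + t2.day_key)
from   dwb_rtl_trx_rh t1,
       dwb_rtl_trx_rh t2
where  t1.trx_nbr = t2.trx_nbr;

select * from table(dbms_xplan.display);
```

When a full partition-wise join is chosen, the plan typically shows a PX PARTITION HASH row source above the HASH JOIN and no redistribution (PX SEND HASH) of either input between the parallel execution servers.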

Off To The Lab: Partition-Wise Joins

I’ve run through the exact same test matrix with the new range/hash partitioning model and below are the results.

Nodes  Elapsed Time
-----  ------------
    1  00:02:42.41
    2  00:01:37.29
    4  00:01:12.82
    8  00:01:05.04

As you can see by the elapsed times, the range/hash partitioning model with the full partition-wise join has decreased the overall execution time by around a factor of 2X compared to the range only partitioned version.

Now let’s take a look at the interconnect traffic for the PX messages:

break on snap_id skip 1
compute sum of DIFF_RECEIVED_MB on SNAP_ID
compute sum of DIFF_SENT_MB on SNAP_ID

select *
from   (select   snap_id,
                 instance_number,
                 round ((bytes_sent - lag (bytes_sent, 1) over
                   (order by instance_number, snap_id)) / 1024 / 1024) diff_sent_mb,
                 round ((bytes_received - lag (bytes_received, 1) over
                   (order by instance_number, snap_id)) / 1024 / 1024) diff_received_mb
        from     dba_hist_ic_client_stats
        where    name = 'ipq' and
                 snap_id between 1041 and 1048
        order by snap_id,
                 instance_number)
where  snap_id in (1042,1044,1046,1048) and
       diff_received_mb >= 10
/
no rows selected

Hmm. No rows selected?!? I had previously put in the predicate “DIFF_RECEIVED_MB >= 10” to filter out the nodes that were not participating in the parallel execution. Let me remove that predicate and rerun the query.

   SNAP_ID INSTANCE_NUMBER DIFF_SENT_MB DIFF_RECEIVED_MB
---------- --------------- ------------ ----------------
      1042               1            8                6
                         2            2                3
                         3            2                3
                         4            2                3
                         5            2                3
                         6            2                3
                         7            2                3
                         8            2                3
**********                 ------------ ----------------
sum                                  22               27

      1044               1            7                7
                         2            3                2
                         3            2                2
                         4            2                2
                         5            2                2
                         6            2                2
                         7            2                2
                         8            2                2
**********                 ------------ ----------------
sum                                  22               21

      1046               1            1                2
                         2            1                2
                         3            1                2
                         4            3                1
                         5            1                1
                         6            1                1
                         7            1                1
                         8            1                1
**********                 ------------ ----------------
sum                                  10               11

      1048               1            6                5
                         2            1                2
                         3            3                2
                         4            1                2
                         5            1                2
                         6            1                2
                         7            1                2
                         8            1                2
**********                 ------------ ----------------
sum                                  15               19

Wow, there is almost no interconnect traffic at all. Let me verify with the AWR report from the 8 node execution.

The AWR report confirms that there is next to no interconnect traffic for the PWJ version of TC#1. The reason for this is that since the table is hash subpartitioned on the join column, matching subpartitions can be joined directly to each other, minimizing the data sent between parallel execution servers. If you look at the execution plan (see the AWR SQL Report) from the first set of experiments you will notice that the distribution method (the PQ Distrib column) for each of the tables is HASH, but in the range/hash version of TC#1 there is no redistribution at all for either of the two tables. The full partition-wise join behaves logically the same way that a shared-nothing database would; each of the parallel execution servers works on its partition, which does not require data from any other partition because of the hash partitioning on the join column. The main difference is that in a shared-nothing database the data is physically hash distributed amongst the nodes (each node contains a subset of all the data), whereas all nodes in an Oracle RAC database have access to all the data.
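One way to check for this behavior yourself is to look at the plan directly. The sketch below uses two hypothetical tables (TRX_HEADER_DEMO and TRX_LINE_DEMO, which are not the TC#1 tables) that are assumed to be hash (sub)partitioned on TRX_NBR with the same number of subpartitions. With a full partition-wise join, one would expect to see a partition iterator such as PX PARTITION HASH ALL above the HASH JOIN and no PX SEND HASH rows feeding it.

```sql
-- Hypothetical tables: trx_header_demo and trx_line_demo are assumed to
-- exist and to be equipartitioned by hash on trx_nbr.
explain plan for
select /*+ parallel(h 16) parallel(l 16) */
       count(*)
from   trx_header_demo h,
       trx_line_demo   l
where  h.trx_nbr = l.trx_nbr;

-- Display the plan just explained, including the PQ Distrib column.
select * from table(dbms_xplan.display);
```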

Parting Thoughts

Personally I see no myth about cross-instance (inter-node) parallel execution and interconnect traffic, but frequently I see misunderstandings and misconceptions. As shown by the data in my experiment, TC#1 (w/o hash subpartitioning) running on eight nodes is more than 2X faster than running on one node using exactly the same DOP. Interconnect traffic is not a bad thing as long as the interconnect is designed to support the workload. Sizing the interconnect is really no different than sizing any other component of your cluster (memory/CPU/disk space/storage bandwidth). If it is undersized, performance will suffer. Depending on the number and speed of the host CPUs and the speed and bandwidth of the interconnect, your results may vary.

By hash subpartitioning the table, the interconnect traffic was all but eliminated and the query execution times were around 2X faster than the non-hash subpartitioned version of TC#1. This is obviously a much more scalable solution and one of the main reasons to leverage hash (sub)partitioning in a data warehouse.