Category: Parallel Execution

Pitfalls of Using Parallel Execution with SQL Developer

[This post was originally published on 2012/02/29 and was hidden shortly thereafter. I’m un-hiding it as of 2012/05/30 with some minor edits.]

Many Oracle Database users like tools with GUI interfaces because they add features and functionality that are not easily available from command line interfaces like SQL*Plus. In my experience, one of the more popular tools is Oracle SQL Developer, in part because it’s a free tool from Oracle. Given SQL Developer’s current design (as of version 3.1.07.42), some issues frequently show up when it is used against Oracle Databases that leverage Parallel Execution. SQL Developer also contains a bug that exacerbates the problem.

The Issue

The crux of the issue with SQL Developer (and possibly other similar tools) and Parallel Execution comes down to how the application uses cursors. By default, SQL Developer has the array fetch size set to 50. This means that for any cursor SQL Developer opens for scrolling, it fetches the first 50 rows, and when you scroll to the bottom of those rows in a grid, it fetches the next 50 rows, and so on. The array size can be controlled by going into Properties and changing Database -> Advanced -> SQL Array Fetch Size, which allows a maximum setting of 500 rows. This is good in the sense that the JDBC application can fetch an array of rows with a single JDBC database call; however, with Parallel Execution, the PX servers used for such a cursor are not released until the cursor is canceled or the last row is fetched. Currently the only way to force reading until the end of cursor in SQL Developer is to issue a Control+End in the data grid. As a result, any action that uses Parallel Execution and has not fetched all the rows or been canceled/closed will squat those Parallel Execution resources and prevent them from being used by other users. If enough users have open cursors backed by Parallel Execution, it is possible to consume all of the Parallel Execution servers, forcing Parallel Execution requests down to Serial Execution because resources are not available, even if the system is completely idle.
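One quick way to see whether PX servers are being squatted is to check the system-wide PX server counts, for example:

-- Compare busy vs. available PX servers; a high "Servers In Use" value on an
-- otherwise idle system suggests open cursors are holding PX resources.
select statistic, value
from   v$px_process_sysstat
where  statistic like 'Servers%';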

The SQL Developer Bug

When experimenting with SQL Developer for this blog post I also found and filed a bug (bug 13706186) because it leaks cursors when a user browses data in a table by expanding Tables (in the left pane), clicking on a table name and then the Data tab. Unfortunately this bug adds insult to injury if the table is decorated with a parallel degree attribute because the leaked cursors do not release the Parallel Execution servers until the session is closed, thus preventing other sessions from using them.

This bug is easily demonstrated using the SCOTT schema, but any schema or table will do as long as the table has more rows than the array fetch size. For my example, I’m using a copy of the EMP table, called EMP2, which contains 896 rows and was created using the following SQL:
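The original script builds EMP2 from EMP; a minimal sketch that produces an equivalent table (assuming the standard 14-row SCOTT.EMP, and not necessarily the exact script) is:

-- Build EMP2 as 64 copies of EMP (14 x 64 = 896 rows) and decorate it with a
-- parallel degree so that browsing it in SQL Developer engages PX servers.
create table emp2 as select * from emp;

begin
  for i in 1 .. 6 loop
    insert into emp2 select * from emp2;  -- doubles the row count each pass
  end loop;
  commit;
end;
/

alter table emp2 parallel 2;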

The steps to demonstrate this issue are as follows:

  1. Set up the EMP2 table using the above script or equivalent.
  2. Use SQL Developer to connect to the SCOTT schema or equivalent.
  3. Expand Tables in the Browser in the left pane.
  4. Click EMP2.
  5. Click on the Data tab of EMP2.
  6. Check the open cursors.
  7. Close the EMP2 tab in the right pane.
  8. Check the open cursors.
  9. Go to Step #4 and repeat.

I’m going to repeat this process two times for a total of three open and close operations. I’ll use this query to show the open cursors for the Data grid query for the EMP2 table (adjust as necessary if you are not using my example):
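If you want to roll your own check, a query against V$SQL_MONITOR along these lines shows the same information (a sketch, not the exact query used for the output below):

-- List monitored executions of the EMP2 data grid statement and the PX servers
-- each one is holding; the PX_SERVER# filter keeps only the QC row, giving one
-- row per execution.  Adjust the filters for your schema/table.
select sid,
       sql_id,
       sql_exec_id,
       status,
       px_servers_requested,
       px_servers_allocated
from   v$sql_monitor
where  username = 'SCOTT'
and    sql_text like '%EMP2%'
and    px_server# is null
order  by sql_exec_id;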

If we look at the output (scott_emp2_cursors.txt below the EM graphic) from the query we’ll see that the first time the EMP2 Data tab is opened, it opens two identical cursors (sql_exec_id 16777216 & 16777217). After closing the EMP2 Data tab, 16777216 is still open. The second time the EMP2 Data tab is opened, two more identical cursors are opened (sql_exec_id 16777218 & 16777219). The third time two more cursors are opened (sql_exec_id 16777220 & 16777221). After closing the tabs we still see three cursors open (sql_exec_id 16777216, 16777218 & 16777220), each of which is squatting two PX servers.

This behavior can also be seen in 11g Enterprise Manager (or dbconsole) on the SQL Monitoring page by sorting the statements by time — notice the (leaked cursor) statements with the green spinning pinwheels after all tabs have been closed (all parallel statements are monitored by default).

Sqlmon Cursor Leak

By the way, the cursor leak applies to tables without a parallel degree setting as well, but it has a more significant impact if the table is parallel because PX servers are a shared resource.

(scott_emp2_cursors.txt below)

My Thoughts

Obviously the cursor leak is a SQL Developer bug and needs fixing, but in the interim, DBAs should be aware that this behavior can have a global impact because Parallel Execution servers are shared by all database users. Also, if SQL Developer users run Parallel Queries and keep the results grid open but do not fetch all the rows using the Control+End functionality, those Parallel Execution servers will be unavailable to other users and could negatively impact other users’ queries that leverage Parallel Execution.

Personally I’d like to see a few enhancements to SQL Developer to avoid these pitfalls:

  1. Disable Parallel Execution for Table Data browsing.

    Browsing data in a table via a scrollable grid is a “small data” problem and does not require the “big data” resources of Parallel Execution. This could easily be done by adding a NOPARALLEL hint when SQL Developer builds the query string.

  2. Add a property with functionality to read all rows without requiring the Control+End command (always fetch until end of cursor), or until a max number of rows is read (or a max amount of memory is used for the result set), then close the cursor.

    By fetching until end of cursor or fetching a max number of rows and closing the cursor, the client will release any Parallel Execution resources it may have used. Obviously fetching all rows could be a problem with large result sets and cause SQL Developer to run out of memory and crash, which would be a bad user experience, but not releasing PX resources can easily lead to many bad user experiences.

    I’ve seen the issue of potentially large result sets dealt with in other JDBC based GUI tools that connect to parallel databases by the tool appending a “LIMIT X” clause to queries, where the user can control the value for “X” in a property setting. To the best of my knowledge, no other parallel databases support cursors in the way that Oracle does (being able to fetch rows, pause, then resume fetching), so there is no issue with squatting resources with them (once they start fetching they must continue until the last row is fetched or the statement is canceled). As of release 11.2, Oracle does not support the LIMIT clause, but this functionality could be implemented in the client by using some upper limit on “array fetch size” * “number of fetches”, or by wrapping queries with a “select * from ([query text]) where rownum <= X” or similar, as sketched below.
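A sketch of the kind of wrapped statement such a client-side cap could produce (the inner query text and the 500-row cap are placeholders, not actual SQL Developer behavior):

-- Hypothetical client-side rewrite: wrap the original query text and cap the
-- rows returned so the cursor reaches end-of-fetch and releases its PX servers;
-- the 500-row cap stands in for the "X" from a property setting.
select *
from   (select * from emp2) q   -- [original query text goes here]
where  rownum <= 500;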

There are some “clever” server-side ways to deal with this as well, such as adding a logon trigger that disables parallel query when the V$SESSION.PROGRAM name matches “SQL Developer” (sketched below), but I, and likely other DBAs, would prefer a robust, clean client-side solution. It’s really just a simple matter of programming.
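Such a trigger might look like the following sketch (an illustration only: it assumes the owner has been granted select on V$SESSION directly, and it should be tested carefully before being used anywhere near production):

-- Hypothetical server-side workaround: disable parallel query for sessions
-- whose program name identifies SQL Developer.
create or replace trigger no_px_for_sqldev
after logon on database
declare
  v_program v$session.program%type;
begin
  select program
  into   v_program
  from   v$session
  where  sid = sys_context('userenv', 'sid');

  if v_program like '%SQL Developer%' then
    execute immediate 'alter session disable parallel query';
  end if;
end;
/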

Summary

When using SQL Developer or similar tools, be aware of the potential to squat Parallel Execution resources if the client tool has open cursors. Educate your SQL Developer users on how they can play well with others in an Oracle Database using Parallel Execution by closing unneeded tabs with open cursors. Be aware of the impact of the cursor leak bug in SQL Developer 3.1.07.42 (and possibly previous releases) until it is fixed.

Personally I’d like to see an enhancement to deal with this behavior and I don’t think it would require much programming. It certainly would allow DBAs to feel more confident that SQL Developer is a tool that can be used on production systems and not result in any issues. What are your thoughts? Do my enhancement requests seem reasonable and warranted?

Counting Triangles Faster

A few weeks back one of the Vertica developers put up a blog post on counting triangles in an undirected graph with reciprocal edges. The author was comparing the size of the data and the elapsed times to run this calculation on Hadoop and Vertica and put up the work on github and encouraged others: “do try this at home.” So I did.

Compression

Vertica draws attention to the fact that their compression brought the 86,220,856 tuples down to 560MB in size, from a flat file size of 1,263,234,543 bytes, resulting in around a 2.25X compression ratio. My first task was to load the data and see how Oracle’s Hybrid Columnar Compression would compare. Below is a graph of the sizes.

As you can see, Oracle’s default HCC query compression (query high) compresses the data over 2X more than Vertica and even HCC query low compression beats out Vertica’s compression number.
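For those who want to reproduce the comparison, the compressed copies can be created along these lines (a sketch of the general approach, not the exact load script; HCC requires Exadata or other supported storage):

-- Create HCC-compressed copies of the EDGES table and compare segment sizes.
create table edges_query_low  compress for query low  as select * from edges;
create table edges_query_high compress for query high as select * from edges;

select segment_name, round(bytes/1024/1024) size_mb
from   user_segments
where  segment_name in ('EDGES', 'EDGES_QUERY_LOW', 'EDGES_QUERY_HIGH');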

Query Elapsed Times

The closest gear I had to Vertica’s hardware was an Exadata X2-2 system — both use 2 socket, 12 core Westmere-EP nodes. While one may try to argue that Exadata may somehow influence the execution times, I’ll point out that I was using In-Memory Parallel Execution, so no table data was even read from spinning disk or Exadata Flash Cache — it’s all memory resident in the database nodes’ buffer cache. This seems to be in line with how Vertica executed their tests, though it is not explicitly stated (it’s a reasonable assertion).
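In 11.2, In-Memory Parallel Execution is driven by Auto DOP, so enabling it generally comes down to the setting below (stated as the general mechanism, not the exact parameter configuration of this particular test system):

-- Enables Auto DOP, parallel statement queuing and In-Memory Parallel Execution.
alter session set parallel_degree_policy = auto;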

After I loaded the data and gathered table stats, I fired off the exact same SQL query that Vertica used to count triangles to see how Oracle would compare. I ran the query on 1, 2 and 4 nodes just like Vertica. Below is a graph of the results.

As you can see, the elapsed times are reasonably close but overall in Oracle’s favor, winning 2 of the 3 scale points as well as having a lower sum across the three executions: Vertica 519 seconds, Oracle 487 seconds — an advantage of 32 seconds to Oracle.

It Should Go Faster!

As a database performance engineer I was thinking to myself, “it really should go faster!” I took a few minutes to look over things to see what could make this perform better. You might think I was looking at parameters or something like that, but you would be wrong. After a few minutes of looking at the query and the execution plan it became obvious to me — it could go faster! I made a rather subtle change to the SQL query and reran my experiments. With the modified SQL query, Oracle was now executing twice as fast on 1 node as Vertica was on 4 nodes. Also, on 4 nodes, the elapsed time came in at just 14 seconds, compared to the 97 seconds Vertica reported — a difference of almost 7X! Below are the combined results.

What’s The Go Fast Trick?

I was thinking a bit more about the problem at hand — we need to count each triangle exactly once, and since the edges are reciprocal, the duplicates have to be eliminated. Given that any edge exists in both directions, the query can be structured the way Vertica wrote it, doing the filtering with a join predicate like e1.source < e2.source to eliminate the duplicates, or we can simply use a single-table filter predicate like source < dest before the join takes place. One of the first things they taught me in query planning and optimization class was to filter early! That lesson pays off big here because the early filter cuts the rows going into the first join, as well as the output of the first join, by a factor of 2: 1.8 billion rows output vs. 3.6 billion. That’s a huge savings not only in the first join, but also in the second join as well.

Here is what my revised query looks like:

with
  e1 as (select * from edges where source < dest),
  e2 as (select * from edges where source < dest),
  e3 as (select * from edges where source > dest)
select count(*)
from e1
join e2 on (e1.dest = e2.source)
join e3 on (e2.dest = e3.source)
where e3.dest = e1.source

Summary

First, I’d like to thank the Vertica team for throwing the challenge out there and being kind enough to provide the data, code and their elapsed times. I always enjoy a challenge — especially one that I can improve upon. Now, I’m not going to throw any product marketing nonsense out there as that is certainly not my style (and there certainly is more than enough of that already), but rather I’ll just let the numbers do the talking. I’d also like to point out that this experiment was done without any structure other than the table. And in full disclosure, all of my SQL commands are available as well.

The other comment I would make is that the new and improved execution times really make a mockery of the exercise when compared to Hadoop MapReduce or Pig, but I would also mention that this test case is extremely favorable for parallel pipelined databases that can perform all operations in memory, and given that the data set is so small, that is obviously the case. Overall, in my opinion, this was a poor problem choice for comparing the three technologies as it (over) highlights the “right tool for the job” cliche.

Experiments performed on Oracle Database 11.2.0.2.

Github source code: https://gist.github.com/grahn/1289188

Reading Parallel Execution Plans With Bloom Pruning And Composite Partitioning

You’ve probably heard sayings like “sometimes things aren’t always what they seem” and “people lie”. Well, sometimes execution plans lie. It’s not really by intent, but it is sometimes difficult (or impossible) to represent everything in a query execution tree in the nice tabular format that dbms_xplan gives.

One of the optimizations that was introduced back in 10gR2 was the use of bloom filters. Bloom filters can be used in two ways: 1) for filtering or 2) for partition pruning (bloom pruning) starting with 11g. Frequently the data models used in data warehousing are dimensional models (star or snowflake) and most Oracle warehouses use simple range (or interval) partitioning on the fact table date key column as that is the filter that yields the largest I/O reduction from partition pruning (most queries in a time series star schema include a time window, right!). As a result, it is imperative that the join between the date dimension and the fact table results in partition pruning.

Let’s consider a basic two table join between a date dimension and a fact table. For these examples I’m using STORE_SALES and DATE_DIM which are TPC-DS tables (I frequently use TPC-DS for experiments as it uses a dimensional (star) model and has a data generator.) STORE_SALES contains a 5 year window of data ranging from 1998-01-02 to 2003-01-02.

Range Partitioned STORE_SALES

For this example I used range partitioning on STORE_SALES.SS_SOLD_DATE_SK using 60 one month partitions (plus 1 partition for NULL SS_SOLD_DATE_SK values) that align with the date dimension (DATE_DIM) on calendar month boundaries. STORE_SALES has the parallel attribute (PARALLEL 16 in this case) set on the table to enable Oracle’s Parallel Execution (PX).
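For reference, that layout can be sketched with DDL along these lines (partition names, boundary values and the staging source are illustrative assumptions, not the actual test schema):

-- Monthly range partitions on SS_SOLD_DATE_SK (TPC-DS date surrogate keys),
-- with a top MAXVALUE partition that also catches NULL keys, and the PARALLEL
-- attribute set on the table.  STORE_SALES_EXT is a hypothetical staging source.
create table store_sales
partition by range (ss_sold_date_sk)
( partition p_199801 values less than (2450846),
  partition p_199802 values less than (2450874),
  /* ... one partition per month through the end of the 5 year window ... */
  partition p_max    values less than (maxvalue) )
parallel 16
as select * from store_sales_ext;

Now let’s look at the execution time and plan for our test query: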

SQL> select
  2    max(ss_sales_price)
  3  from
  4    store_sales ss,
  5    date_dim d
  6  where
  7    ss_sold_date_sk = d_date_sk and
  8    d_year = 2000
  9  ;

MAX(SS_SALES_PRICE)
-------------------
                200

Elapsed: 00:00:41.67

SQL> select * from table(dbms_xplan.display_cursor(format=>'basic +parallel +partition +predicate'));

PLAN_TABLE_OUTPUT
--------------------------------------------------------------------------------------------------
EXPLAINED SQL STATEMENT:
------------------------
select   max(ss_sales_price) from   store_sales ss,   date_dim d where
 ss_sold_date_sk=d_date_sk and   d_year = 2000

Plan hash value: 934332680

---------------------------------------------------------------------------------------------------
| Id  | Operation                     | Name         | Pstart| Pstop |    TQ  |IN-OUT| PQ Distrib |
--------------------------------------------------------------------------------------------------
|   0 | SELECT STATEMENT              |             |       |       |        |      |            |
|   1 |  SORT AGGREGATE               |             |       |       |        |      |            |
|   2 |   PX COORDINATOR              |             |       |       |        |      |            |
|   3 |    PX SEND QC (RANDOM)        | :TQ10001    |       |       |  Q1,01 | P->S | QC (RAND)  |
|   4 |     SORT AGGREGATE            |             |       |       |  Q1,01 | PCWP |            |
|*  5 |      HASH JOIN                |             |       |       |  Q1,01 | PCWP |            |
|   6 |       BUFFER SORT             |             |       |       |  Q1,01 | PCWC |            |
|   7 |        PART JOIN FILTER CREATE| :BF0000     |       |       |  Q1,01 | PCWP |            |
|   8 |         PX RECEIVE            |             |       |       |  Q1,01 | PCWP |            |
|   9 |          PX SEND BROADCAST    | :TQ10000    |       |       |        | S->P | BROADCAST  |
|* 10 |           TABLE ACCESS FULL   | DATE_DIM    |       |       |        |      |            |
|  11 |       PX BLOCK ITERATOR       |             |:BF0000|:BF0000|  Q1,01 | PCWC |            |
|* 12 |        TABLE ACCESS FULL      | STORE_SALES |:BF0000|:BF0000|  Q1,01 | PCWP |            |
--------------------------------------------------------------------------------------------------

Predicate Information (identified by operation id):
---------------------------------------------------

   5 - access("SS_SOLD_DATE_SK"="D_DATE_SK")
  10 - filter("D_YEAR"=2000)
  12 - access(:Z>=:Z AND :Z<=:Z)

In this execution plan you can see the creation of the bloom filter on line 7 which is populated from the values of D_DATE_SK from DATE_DIM. That bloom filter is then used to partition prune on the STORE_SALES table. This is why we see :BF0000 in the Pstart/Pstop columns.

Range-Hash Composite Partitioned STORE_SALES

For the next experiment, I kept the same range partitioning scheme but also added hash subpartitioning using the SS_ITEM_SK column (using 4 hash subpartitions per range partition). STORE_SALES2 has 61 range partitions x 4 hash subpartitions for a total of 244 aggregate partitions.
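Again as a sketch (names and boundary values illustrative, not the actual DDL), the composite scheme looks like this:

-- The same monthly range partitions, each hash subpartitioned 4 ways on
-- SS_ITEM_SK, for 61 x 4 = 244 segments in total.
create table store_sales2
partition by range (ss_sold_date_sk)
subpartition by hash (ss_item_sk) subpartitions 4
( partition p_199801 values less than (2450846),
  /* ... one partition per month ... */
  partition p_max    values less than (maxvalue) )
parallel 16
as select * from store_sales;

Let’s look at the execution plan for our test query: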

SQL> select
  2    max(ss_sales_price)
  3  from
  4    store_sales2 ss,
  5    date_dim d
  6  where
  7    ss_sold_date_sk = d_date_sk and
  8    d_year = 2000
  9  ;

MAX(SS_SALES_PRICE)
-------------------
                200

Elapsed: 00:00:41.06

SQL> select * from table(dbms_xplan.display_cursor(format=>'basic +parallel +partition +predicate'));

PLAN_TABLE_OUTPUT
--------------------------------------------------------------------------------------------------
EXPLAINED SQL STATEMENT:
------------------------
select   max(ss_sales_price) from   store_sales2 ss,   date_dim d where
  ss_sold_date_sk=d_date_sk and   d_year = 2000

Plan hash value: 2496395846

---------------------------------------------------------------------------------------------------
| Id  | Operation                     | Name         | Pstart| Pstop |    TQ  |IN-OUT| PQ Distrib |
---------------------------------------------------------------------------------------------------
|   0 | SELECT STATEMENT              |              |       |       |        |      |            |
|   1 |  SORT AGGREGATE               |              |       |       |        |      |            |
|   2 |   PX COORDINATOR              |              |       |       |        |      |            |
|   3 |    PX SEND QC (RANDOM)        | :TQ10001     |       |       |  Q1,01 | P->S | QC (RAND)  |
|   4 |     SORT AGGREGATE            |              |       |       |  Q1,01 | PCWP |            |
|*  5 |      HASH JOIN                |              |       |       |  Q1,01 | PCWP |            |
|   6 |       BUFFER SORT             |              |       |       |  Q1,01 | PCWC |            |
|   7 |        PART JOIN FILTER CREATE| :BF0000      |       |       |  Q1,01 | PCWP |            |
|   8 |         PX RECEIVE            |              |       |       |  Q1,01 | PCWP |            |
|   9 |          PX SEND BROADCAST    | :TQ10000     |       |       |        | S->P | BROADCAST  |
|* 10 |           TABLE ACCESS FULL   | DATE_DIM     |       |       |        |      |            |
|  11 |       PX BLOCK ITERATOR       |              |     1 |     4 |  Q1,01 | PCWC |            |
|* 12 |        TABLE ACCESS FULL      | STORE_SALES2 |     1 |   244 |  Q1,01 | PCWP |            |
---------------------------------------------------------------------------------------------------

Predicate Information (identified by operation id):
---------------------------------------------------

   5 - access("SS_SOLD_DATE_SK"="D_DATE_SK")
  10 - filter("D_YEAR"=2000)
  12 - access(:Z>=:Z AND :Z<=:Z)

Once again you can see the creation of the bloom filter from DATE_DIM on line 7; however, you will notice that we no longer see :BF0000 as our Pstart and Pstop values. In fact, it may appear that partition pruning is not taking place at all, as we see 1/244 as our Pstart/Pstop values. However, if we compare the execution times between the range and range/hash queries, you will note they are identical to the nearest second, so there really is no way that partition (bloom) pruning is not taking place. After all, if this plan read all 5 years of data it would take 5 times as long as reading just 1 year, and that certainly is not the case. Would you have guessed that partition pruning is taking place had we not worked through the range-only experiment first? Hmmm…

So What Is Going On?

Before we dive in, let’s quickly look at what the execution plans would look like if PX was not used (using serial execution).

--
-- Range Partitioned, Serial Execution
--

---------------------------------------------------------------------
| Id  | Operation                     | Name        | Pstart| Pstop |
---------------------------------------------------------------------
|   0 | SELECT STATEMENT              |             |       |       |
|   1 |  SORT AGGREGATE               |             |       |       |
|*  2 |   HASH JOIN                   |             |       |       |
|   3 |    PART JOIN FILTER CREATE    | :BF0000     |       |       |
|*  4 |     TABLE ACCESS FULL         | DATE_DIM    |       |       |
|   5 |    PARTITION RANGE JOIN-FILTER|             |:BF0000|:BF0000|
|   6 |     TABLE ACCESS FULL         | STORE_SALES |:BF0000|:BF0000|
---------------------------------------------------------------------
              
--
-- Range-Hash Composite Partitioned, Serial Execution
--
                                       
----------------------------------------------------------------------
| Id  | Operation                     | Name         | Pstart| Pstop |
----------------------------------------------------------------------
|   0 | SELECT STATEMENT              |              |       |       |
|   1 |  SORT AGGREGATE               |              |       |       |
|*  2 |   HASH JOIN                   |              |       |       |
|   3 |    PART JOIN FILTER CREATE    | :BF0000      |       |       |
|*  4 |     TABLE ACCESS FULL         | DATE_DIM     |       |       |
|   5 |    PARTITION RANGE JOIN-FILTER|              |:BF0000|:BF0000|
|   6 |     PARTITION HASH ALL        |              |     1 |     4 |
|   7 |      TABLE ACCESS FULL        | STORE_SALES2 |     1 |   244 |
----------------------------------------------------------------------

When using composite partitioning, pruning is placed on one of the partition iterators. When the two nested partition iterators (range/hash in this case) are changed into a block iterator (line 11 – PX BLOCK ITERATOR), we have to pick a “victim” in the query plan tree since only one node in the plan now needs to carry the pruning information (with PX the pruning is really done by the QC, not the row source like in serial plans). As a result, the information associated with the victimized partition iterator is lost in the explain plan. This is why there is no :BF0000 for Pstart/Pstop in the plan in this case. It is probably more accurate to have the parallel plans for both range and range/hash look like this:

---------------------------------------------------------------------------------------------------
| Id  | Operation                     | Name         | Pstart| Pstop |    TQ  |IN-OUT| PQ Distrib |
---------------------------------------------------------------------------------------------------
|   0 | SELECT STATEMENT              |              |       |       |        |      |            |
|   1 |  SORT AGGREGATE               |              |       |       |        |      |            |
|   2 |   PX COORDINATOR              |              |       |       |        |      |            |
|   3 |    PX SEND QC (RANDOM)        | :TQ10001     |       |       |  Q1,01 | P->S | QC (RAND)  |
|   4 |     SORT AGGREGATE            |              |       |       |  Q1,01 | PCWP |            |
|*  5 |      HASH JOIN                |              |       |       |  Q1,01 | PCWP |            |
|   6 |       BUFFER SORT             |              |       |       |  Q1,01 | PCWC |            |
|   7 |        PART JOIN FILTER CREATE| :BF0000      |       |       |  Q1,01 | PCWP |            |
|   8 |         PX RECEIVE            |              |       |       |  Q1,01 | PCWP |            |
|   9 |          PX SEND BROADCAST    | :TQ10000     |       |       |        | S->P | BROADCAST  |
|* 10 |           TABLE ACCESS FULL   | DATE_DIM     |       |       |        |      |            |
|  11 |       PX BLOCK ITERATOR       |              |       |       |  Q1,01 | PCWC |            |
|* 12 |        TABLE ACCESS FULL      | STORE_SALES  |:BF0000|:BF0000|  Q1,01 | PCWP |            |
---------------------------------------------------------------------------------------------------

Here the bloom pruning is on the TABLE ACCESS FULL row source. This is because there is no Pstart/Pstop for a PX BLOCK ITERATOR row source (it works on block ranges, so partition information is lost – it had been contained in the level above this).

Hopefully this helps you understand and correctly identify execution plans that contain bloom pruning, even though at first glance you may not think they do. If you are uncertain, use the execution stats for the query, looking at metrics like the amount of data read and the execution times, to provide some empirical insight.
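The SQL Monitor report is an easy way to get that empirical data, for example:

-- Generate a text SQL Monitor report for the statement in question and compare
-- bytes read and elapsed time against what a pruned vs. non-pruned scan should do.
select dbms_sqltune.report_sql_monitor(sql_id => '&sql_id', type => 'TEXT') report
from   dual;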

The Core Performance Fundamentals Of Oracle Data Warehousing – Parallel Execution

[back to Introduction]

Leveraging Oracle’s Parallel Execution (PX) in your Oracle data warehouse is probably the most important feature/technology one can use to speed up operations on large data sets.  PX is not, however, “go fast” magic pixie dust for any old operation (if that’s what you think, you probably don’t understand the parallel computing paradigm). With Oracle PX, a large task is broken up into smaller parts, sub-tasks if you will, and each sub-task is then worked on in parallel.  The goal of Oracle PX: divide and conquer.  This allows a significant amount of hardware resources to be engaged in solving a single problem and is what allows the Oracle database to scale up and out when working with large data sets.

I thought I’d touch on some basics and add my observations, but this is by no means an exhaustive write-up on Oracle’s Parallel Execution.  There is an entire chapter in the Oracle Database documentation on PX as well as several white papers.  I’ve listed all these in the Resources section at the bottom of this post.  Read them, but as always, feel free to post questions/comments here.  Discussion adds great value.

A Basic Example of Parallel Execution

Consider a simple one table query like the one below.
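Something along these lines works as an illustration (an assumed SALES table, standing in for the original example):

-- A single-table parallel aggregation; the hint requests a DOP of 8.
select /*+ parallel(s, 8) */
       count(*),
       sum(s.amount_sold)
from   sales s;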

You can see that the PX Coordinator (also known as the Query Coordinator or QC) breaks up the “work” into several chunks and those chunks are worked on by the PX Server Processes.  The technical term for the chunk a PX Server Process works on is called a granule.  Granules can either be block-based or partition-based.

When To Use Parallel Execution

PX is a key component in data warehousing as that is where large data sets usually exist.  The most common operations that use PX are queries (SELECTs) and data loads (INSERTs or CTAS).  PX is most commonly controlled by using the PARALLEL attribute on the object, although it can be controlled by hints or even Oracle’s Database Resource Manager.  If you are not using PX in your Oracle data warehouse then you are probably missing out on a shedload of performance opportunity.

When an object has its PARALLEL attribute set or the PARALLEL hint is used, queries will leverage PX, but to leverage PX for DML operations (INSERT/DELETE/UPDATE) remember to alter your session by using the command:

alter session [enable|force] parallel dml;
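For example (a sketch with assumed object names):

-- Enable parallel DML for the session, then perform a direct-path parallel insert.
alter session enable parallel dml;

insert /*+ append parallel(t, 8) */ into sales_history t
select /*+ parallel(s, 8) */ * from sales s;

commit;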

Do Not Fear Parallel Execution

Since Oracle’s PX is designed to take advantage of multiple CPUs (or CPU cores) at a time, it can leverage significant hardware resources, if available.  From my experiences in talking with Oracle DBAs, the ability for PX to do this scares them. This results in DBAs implementing a relatively small degree of parallelism (DOP) for a system that could possibly support a much higher level (based on #CPUs).  Oftentimes, though, the system that PX is being run on is not a balanced system and frequently has much more CPU power than disk and channel bandwidth, so data movement from disk becomes the bottleneck well before the CPUs are busy.  This results in many statements like “Parallel Execution doesn’t work” or similar because the user/DBA isn’t observing a decrease in execution time with more parallelism.  Bottom line: if the hardware resources are not available, the software certainly cannot scale.

Just for giggles (and education), here is a snippet from top(1) from a node from an Oracle Database Machine running a single query (across all 8 database nodes) at DOP 256.

top - 20:46:44 up 5 days,  3:48,  1 user,  load average: 36.27, 37.41, 35.75
Tasks: 417 total,  43 running, 373 sleeping,   0 stopped,   1 zombie
Cpu(s): 95.6%us,  1.6%sy,  0.0%ni,  2.2%id,  0.0%wa,  0.2%hi,  0.4%si,  0.0%st
Mem:  74027752k total, 21876824k used, 52150928k free,   440692k buffers
Swap: 16771852k total,        0k used, 16771852k free, 13770844k cached

USER       PID  PR  NI  VIRT  SHR  RES S %CPU %MEM    TIME+  COMMAND
oracle   16132  16   0 16.4g 5.2g 5.4g R 63.8  7.6 709:33.02 ora_p011_orcl
oracle   16116  16   0 16.4g 4.9g 5.1g R 60.9  7.2 698:35.63 ora_p003_orcl
oracle   16226  15   0 16.4g 4.9g 5.1g R 59.9  7.2 702:01.01 ora_p028_orcl
oracle   16110  16   0 16.4g 4.9g 5.1g R 58.9  7.2 697:20.51 ora_p000_orcl
oracle   16122  15   0 16.3g 4.9g 5.0g R 56.9  7.0 694:54.61 ora_p006_orcl

(Quite the TIME+ column there, huh!)

Summary

In this post I’ve been a bit light on the technicals of PX, but that is mostly because 1) this is a fundamentals post and 2) there is a ton more detail in the referenced documentation and I really don’t feel like republishing what already exists. Bottom line, Oracle Parallel Execution is a must for scaling performance in your Oracle data warehouse.  Take the time to understand how to leverage it to maximize performance in your environment and feel free to start a discussion here if you have questions.

References

Oracle Parallel Execution: Interconnect Myths And Misunderstandings

A number of weeks back I came across a paper/presentation by Riyaj Shamsudeen entitled Battle of the Nodes: RAC Performance Myths (available here). As I was looking through it I saw one example that struck me as very odd (Myth #3 – Interconnect Performance) and I contacted him about it. After further review Riyaj commented that he had made a mistake in his analysis and offered up a new example. I thought I’d take the time to discuss this as parallel execution seems to be one of those areas where many misconceptions and misunderstandings exist.

The Original Example

I thought I’d quickly discuss why I questioned the initial example. The original query Riyaj cited is this one:

select /*+ full(tl) parallel (tl,4) */
       avg (n1),
       max (n1),
       avg (n2),
       max (n2),
       max (v1)
from   t_large tl;

As you can see this is a very simple single table aggregation without a group by. The reason that I questioned the validity of this example in the context of interconnect performance is that the parallel execution servers (parallel query slaves) will each return exactly one row from the aggregation and then send that single row to the query coordinator (QC) which will then perform the final aggregation. Given that, it would seem impossible that this query could cause any interconnect issues at all.

Riyaj’s Test Case #1

Recognizing the original example was somehow flawed, Riyaj came up with a new example (I’ll reference as TC#1) which consisted of the following query:

select /*+ parallel (t1, 8,2) parallel (t2, 8, 2)  */
       min (t1.customer_trx_line_id + t2.customer_trx_line_id),
       max (t1.set_of_books_id + t2.set_of_books_id),
       avg (t1.set_of_books_id + t2.set_of_books_id),
       avg (t1.quantity_ordered + t2.quantity_ordered),
       max (t1.attribute_category),
       max (t2.attribute1),
       max (t1.attribute2)
from   (select *
        from   big_table
        where  rownum <= 100000000) t1,
       (select *
        from   big_table
        where  rownum <= 100000000) t2
where  t1.customer_trx_line_id = t2.customer_trx_line_id;

The execution plan for this query is:

----------------------------------------------------------------------------------------------------------------------------
| Id  | Operation                 | Name      | Rows  | Bytes |TempSpc| Cost (%CPU)| Time     |    TQ  |IN-OUT| PQ Distrib |
----------------------------------------------------------------------------------------------------------------------------
|   0 | SELECT STATEMENT          |           |     1 |   249 |       |  2846K  (4)| 01:59:01 |        |      |            |
|   1 |  SORT AGGREGATE           |           |     1 |   249 |       |            |          |        |      |            |
|*  2 |   HASH JOIN               |           |   100M|    23G|   762M|  2846K  (4)| 01:59:01 |        |      |            |
|   3 |    VIEW                   |           |   100M|    10G|       |  1214K  (5)| 00:50:46 |        |      |            |
|*  4 |     COUNT STOPKEY         |           |       |       |       |            |          |        |      |            |
|   5 |      PX COORDINATOR       |           |       |       |       |            |          |        |      |            |
|   6 |       PX SEND QC (RANDOM) | :TQ10000  |   416M|  6749M|       |  1214K  (5)| 00:50:46 |  Q1,00 | P->S | QC (RAND)  |
|*  7 |        COUNT STOPKEY      |           |       |       |       |            |          |  Q1,00 | PCWC |            |
|   8 |         PX BLOCK ITERATOR |           |   416M|  6749M|       |  1214K  (5)| 00:50:46 |  Q1,00 | PCWC |            |
|   9 |          TABLE ACCESS FULL| BIG_TABLE |   416M|  6749M|       |  1214K  (5)| 00:50:46 |  Q1,00 | PCWP |            |
|  10 |    VIEW                   |           |   100M|    12G|       |  1214K  (5)| 00:50:46 |        |      |            |
|* 11 |     COUNT STOPKEY         |           |       |       |       |            |          |        |      |            |
|  12 |      PX COORDINATOR       |           |       |       |       |            |          |        |      |            |
|  13 |       PX SEND QC (RANDOM) | :TQ20000  |   416M|    10G|       |  1214K  (5)| 00:50:46 |  Q2,00 | P->S | QC (RAND)  |
|* 14 |        COUNT STOPKEY      |           |       |       |       |            |          |  Q2,00 | PCWC |            |
|  15 |         PX BLOCK ITERATOR |           |   416M|    10G|       |  1214K  (5)| 00:50:46 |  Q2,00 | PCWC |            |
|  16 |          TABLE ACCESS FULL| BIG_TABLE |   416M|    10G|       |  1214K  (5)| 00:50:46 |  Q2,00 | PCWP |            |
----------------------------------------------------------------------------------------------------------------------------
Predicate Information (identified by operation id):
---------------------------------------------------

   2 - access("T1"."n1"="T2"."n1")
   4 - filter(ROWNUM<=100000000)
   7 - filter(ROWNUM<=100000000)
  11 - filter(ROWNUM<=100000000)
  14 - filter(ROWNUM<=100000000)

This is a rather synthetic query, but there are a few things that I would like to point out. First, this query uses a parallel hint with 3 values, representing table/degree/instances; however, instances has been deprecated (see the 10.2 parallel hint documentation). In this case the DOP is calculated as degree * instances, or 16, not DOP=8 involving 2 instances. Note that the rownum filter is causing all the rows from the tables to be sent back to the QC for the COUNT STOPKEY operation, thus causing the execution plan to serialize, denoted by the P->S in the IN-OUT column.

Riyaj had enabled sql trace for the QC and the TKProf output is such:

Rows     Row Source Operation
-------  ---------------------------------------------------
      1  SORT AGGREGATE (cr=152 pr=701158 pw=701127 time=1510221226 us)
98976295   HASH JOIN  (cr=152 pr=701158 pw=701127 time=1244490336 us)
100000000    VIEW  (cr=76 pr=0 pw=0 time=200279054 us)
100000000     COUNT STOPKEY (cr=76 pr=0 pw=0 time=200279023 us)
100000000      PX COORDINATOR  (cr=76 pr=0 pw=0 time=100270084 us)
      0       PX SEND QC (RANDOM) :TQ10000 (cr=0 pr=0 pw=0 time=0 us)
      0        COUNT STOPKEY (cr=0 pr=0 pw=0 time=0 us)
      0         PX BLOCK ITERATOR (cr=0 pr=0 pw=0 time=0 us)
      0          TABLE ACCESS FULL BIG_TABLE_NAME_CHANGED_12 (cr=0 pr=0 pw=0 time=0 us)
100000000    VIEW  (cr=76 pr=0 pw=0 time=300298770 us)
100000000     COUNT STOPKEY (cr=76 pr=0 pw=0 time=200298726 us)
100000000      PX COORDINATOR  (cr=76 pr=0 pw=0 time=200279954 us)
      0       PX SEND QC (RANDOM) :TQ20000 (cr=0 pr=0 pw=0 time=0 us)
      0        COUNT STOPKEY (cr=0 pr=0 pw=0 time=0 us)
      0         PX BLOCK ITERATOR (cr=0 pr=0 pw=0 time=0 us)
      0          TABLE ACCESS FULL BIG_TABLE_NAME_CHANGED_12 (cr=0 pr=0 pw=0 time=0 us)

Note that the Rows column contains zeros for many of the row sources because this trace is only for the QC, not the slaves, and thus only QC rows will show up in the trace file. Something to be aware of if you decide to use sql trace with parallel execution.
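If you do want the PX server row counts as well, one option (a general technique, not something done for this trace) is to enable the trace in the QC session so the slaves inherit it, then merge the resulting trace files with trcsess before running TKProf:

-- The PX servers spawned by this session inherit the 10046 event and write
-- their own trace files; consolidate them with trcsess, then format with tkprof.
alter session set events '10046 trace name context forever, level 8';

-- run the parallel query here

alter session set events '10046 trace name context off';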

Off To The Lab: Myth Busting Or Bust!

I wanted to take a query like TC#1 and run it in my own environment so I could do more monitoring of it. Given the alleged myth had to do with interconnect traffic of cross-instance (inter-node) parallel execution, I wanted to be certain to gather the appropriate data. I ran several tests using a similar query on a similar sized data set (by row count) as the initial example. I ran all my experiments on an Oracle Real Application Clusters database, version 11.1.0.7, consisting of eight nodes, each with two quad-core CPUs. The interconnect is InfiniBand and the protocol used is RDS (Reliable Datagram Sockets).

Before I get into the experiments I think it is worth mentioning that Oracle’s parallel execution (PX), which includes Parallel Query (PQ), PDML & PDDL, can consume vast amounts of resources. This is by design. You see, the idea of Oracle PX is to dedicate a large amount of resources (processes) to a problem by breaking it up into many smaller pieces and then operate on those pieces in parallel. Thus the more parallelism that is used to solve a problem, the more resources it will consume, assuming those resources are available. That should be fairly obvious, but I think it is worth stating.

For my experiments I used a table that contains just over 468M rows.

Below is my version of TC#1. The query is a self-join on a unique key and the table is range partitioned by DAY_KEY into 31 partitions. Note that I create an AWR snapshot immediately before and after the query.

exec dbms_workload_repository.create_snapshot

select /* &&1 */
       /*+ parallel (t1, 16) parallel (t2, 16) */
       min (t1.bsns_unit_key + t2.bsns_unit_key),
       max (t1.day_key + t2.day_key),
       avg (t1.day_key + t2.day_key),
       max (t1.bsns_unit_typ_cd),
       max (t2.curr_ind),
       max (t1.load_dt)
from   dwb_rtl_trx t1,
       dwb_rtl_trx t2
where  t1.trx_nbr = t2.trx_nbr;

exec dbms_workload_repository.create_snapshot

Experiment Results Using Fixed DOP=16

I ran my version of TC#1 across a varying number of nodes by using Oracle services (instance_groups and parallel_instance_group have been deprecated in 11g), but kept the DOP constant at 16 for all the tests. Below is a table of the experiment results.

Nodes   Elapsed Time
-----   ------------
1       00:04:54.12
2       00:03:55.35
4       00:02:59.24
8       00:02:14.39

Seemingly contrary to what many people would probably guess, the execution times got better the more nodes that participated in the query, even though the DOP was held constant at 16 throughout the tests.

Measuring The Interconnect Traffic

One of the new additions to the AWR report in 11g was the inclusion of interconnect traffic by client. This section, “Interconnect Throughput By Client”, is near the bottom of the report and allows the PQ message traffic to be tracked, whereas in prior releases it was not.

Even though AWR contains the throughput numbers (as in megabytes per second) I thought it would be interesting to see how much data was being transferred, so I used the following query directly against the AWR data. I put on a filter predicate to return only rows where DIFF_RECEIVED_MB >= 10MB so that the instances that were not part of the execution are filtered out, as well as the single-instance execution.

break on snap_id skip 1
compute sum of DIFF_RECEIVED_MB on SNAP_ID
compute sum of DIFF_SENT_MB on SNAP_ID

select *
from   (select   snap_id,
                 instance_number,
                 round ((bytes_sent - lag (bytes_sent, 1) over
                   (order by instance_number, snap_id)) / 1024 / 1024) diff_sent_mb,
                 round ((bytes_received - lag (bytes_received, 1) over
                   (order by instance_number, snap_id)) / 1024 / 1024) diff_received_mb
        from     dba_hist_ic_client_stats
        where    name = 'ipq' and
                 snap_id between 910 and 917
        order by snap_id,
                 instance_number)
where  snap_id in (911, 913, 915, 917) and
       diff_received_mb >= 10
/

SNAP_ID    INSTANCE_NUMBER DIFF_SENT_MB DIFF_RECEIVED_MB
---------- --------------- ------------ ----------------
       913               1        11604            10688
                         2        10690            11584
**********                 ------------ ----------------
sum                               22294            22272

       915               1         8353             8350
                         2         8133             8418
                         3         8396             8336
                         4         8514             8299
**********                 ------------ ----------------
sum                               33396            33403

       917               1         5033             4853
                         2         4758             4888
                         3         4956             4863
                         4         5029             4852
                         5         4892             4871
                         6         4745             4890
                         7         4753             4889
                         8         4821             4881
**********                 ------------ ----------------
sum                               38987            38987

As you can see from the data, the more nodes that were involved in the execution, the more interconnect traffic there was, however, the execution times were best with 8 nodes.

Further Explanation Of Riyaj’s Issue

If you read Riyaj’s post, you noticed that he observed worse elapsed times (not better, as I did) when running on two nodes versus one. How could this be? It was noted in the comment thread of that post that the configuration was using Gig-E as the interconnect in a Solaris IPMP active-passive configuration. This means the interconnect speed would be capped at 128MB/s (1000Mbps), the wire speed of Gig-E. This is by all means an inadequate configuration to use with cross-instance parallel execution.

There is a whitepaper entitled Oracle SQL Parallel Execution that discusses many of the aspects of Oracle’s parallel execution. I would highly recommend reading it. This paper specifically mentions:

If you use a relatively weak interconnect, relative to the I/O bandwidth from the server to the storage configuration, then you may be better off restricting parallel execution to a single node or to a limited number of nodes; inter-node parallel execution will not scale with an undersized interconnect.

I would assert that this is precisely the root cause (insufficient interconnect bandwidth for cross-instance PX) behind the issues that Riyaj observed, thus making his execution slower on two nodes than one node.

The Advantage Of Hash Partitioning/Subpartitioning And Full Partition-Wise Joins

At the end of my comment on Riyaj’s blog, I mentioned:

If a DW frequently uses large table to large table joins, then hash partitioning or subpartitioning would yield added gains as partition-wise joins will be used.

I thought that it would be both beneficial and educational to extend TC#1 and implement hash subpartitioning so that the impact could be measured on both query elapsed time and interconnect traffic. In order for a full partition-wise join to take place, the table must be partitioned/subpartitioned on the join key column, so in this case I’ve hash subpartitioned on TRX_NBR. See the Oracle Documentation on Partition-Wise Joins for a more detailed discussion on PWJ.
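As a sketch (partition boundaries and the subpartition count are illustrative, not the exact DDL), the range/hash version of the table looks like this:

-- Keep the daily range partitioning on DAY_KEY, but hash subpartition each
-- partition on the join key TRX_NBR so that a full partition-wise join is possible.
create table dwb_rtl_trx_rh
partition by range (day_key)
subpartition by hash (trx_nbr) subpartitions 16
( partition p_day01 values less than (2),
  /* ... one partition per day, 31 in total ... */
  partition p_max   values less than (maxvalue) )
parallel 16
as select * from dwb_rtl_trx;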

Off To The Lab: Partition-Wise Joins

I’ve run through the exact same test matrix with the new range/hash partitioning model and below are the results.

Nodes   Elapsed Time
-----   ------------
1       00:02:42.41
2       00:01:37.29
4       00:01:12.82
8       00:01:05.04

As you can see by the elapsed times, the range/hash partitioning model with the full partition-wise join has decreased the overall execution time by around a factor of 2X compared to the range only partitioned version.

Now let’s take a look at the interconnect traffic for the PX messages:

break on snap_id skip 1
compute sum of DIFF_RECEIVED_MB on SNAP_ID
compute sum of DIFF_SENT_MB on SNAP_ID

select *
from   (select   snap_id,
                 instance_number,
                 round ((bytes_sent - lag (bytes_sent, 1) over
                   (order by instance_number, snap_id)) / 1024 / 1024) diff_sent_mb,
                 round ((bytes_received - lag (bytes_received, 1) over
                   (order by instance_number, snap_id)) / 1024 / 1024) diff_received_mb
        from     dba_hist_ic_client_stats
        where    name = 'ipq' and
                 snap_id between 1041 and 1048
        order by snap_id,
                 instance_number)
where  snap_id in (1042,1044,1046,1048) and
       diff_received_mb >= 10
/
no rows selected

Hmm. No rows selected?!? I had previously put in the predicate “DIFF_RECEIVED_MB >= 10MB” to filter out the nodes that were not participating in the parallel execution. Let me remove that predicate and rerun the query.

   SNAP_ID INSTANCE_NUMBER DIFF_SENT_MB DIFF_RECEIVED_MB
---------- --------------- ------------ ----------------
      1042               1            8                6
                         2            2                3
                         3            2                3
                         4            2                3
                         5            2                3
                         6            2                3
                         7            2                3
                         8            2                3
**********                 ------------ ----------------
sum                                  22               27

      1044               1            7                7
                         2            3                2
                         3            2                2
                         4            2                2
                         5            2                2
                         6            2                2
                         7            2                2
                         8            2                2
**********                 ------------ ----------------
sum                                  22               21

      1046               1            1                2
                         2            1                2
                         3            1                2
                         4            3                1
                         5            1                1
                         6            1                1
                         7            1                1
                         8            1                1
**********                 ------------ ----------------
sum                                  10               11

      1048               1            6                5
                         2            1                2
                         3            3                2
                         4            1                2
                         5            1                2
                         6            1                2
                         7            1                2
                         8            1                2
**********                 ------------ ----------------
sum                                  15               19

Wow, there is almost no interconnect traffic at all. Let me verify with the AWR report from the 8 node execution.

The AWR report confirms that there is next to no interconnect traffic for the PWJ version of TC#1. The reason for this is that since the table is hash subpartitioned on the join column, each subpartition can be joined directly to its matching subpartition, minimizing the data sent between parallel execution servers. If you look at the execution plan (see the AWR SQL Report) from the first set of experiments, you will notice that the PQ distribution method for each of the tables is HASH, but in the range/hash version of TC#1 there is no redistribution at all for either of the two tables. The full partition-wise join behaves logically the same way that a shared-nothing database would; each of the parallel execution servers works on its partition, which does not require data from any other partition because of the hash partitioning on the join column. The main difference is that in a shared-nothing database the data is physically hash distributed amongst the nodes (each node contains a subset of all the data), whereas all nodes in an Oracle RAC database have access to all the data.

Parting Thoughts

Personally I see no myth about cross-instance (inter-node) parallel execution and interconnect traffic, but frequently I see misunderstandings and misconceptions. As shown by the data in my experiment, TC#1 (w/o hash subpartitioning) running on eight nodes is more than 2X faster than running on one node using exactly the same DOP. Interconnect traffic is not a bad thing as long as the interconnect is designed to support the workload. Sizing the interconnect is really no different than sizing any other component of your cluster (memory/CPU/disk space/storage bandwidth). If it is undersized, performance will suffer. Depending on the number and speed of the host CPUs and the speed and bandwidth of the interconnect, your results may vary.

By hash subpartitioning the table, the interconnect traffic was all but eliminated and the query execution times were around 2X faster than the non-hash-subpartitioned version of TC#1. This is obviously a much more scalable solution and one of the main reasons to leverage hash (sub)partitioning in a data warehouse.