Sorting Large Datasets Into Equal Buckets
I was dealing with data kind of like this:
pk_value employee_id SomeValue
------------ ----------- ----------
97875250314 1 SomeValue1
98001185450 1 SomeValue2
97926729191 1 SomeValue3
98028101823 2 SomeValue1
97997295004 2 SomeValue2
97875250315 3 SomeValue1
98001185451 3 SomeValue2
97926729192 4 SomeValue1
98028101824 5 SomeValue1
...
What we needed to accomplish was to evenly distribute records across a certain number of buckets while isolating each employee_id to ONLY one bucket, regardless of how many records that one employee had. For the data above, we needed it to break down like this for, let's say, 3 buckets.
bucket_id emp_count no_of_records
---------- ----------- -------------
1 1 (eid 1) 3
2 2 (eid 2,4) 3
3 2 (eid 3,5) 3
...
For the 9 records in the sample set, employee 1 had 3, employees 2 and 3 had 2 each, and employees 4 and 5 had 1 each. Distributing them above in that way allowed for each employee to belong to only 1 bucket, while still creating buckets of equal size.
In order to record this, we would have to add a column to the original dataset containing which bucket each record belongs to.
pk_value employee_id SomeValue bucket_id
------------ ----------- ---------- ----------
97875250314 1 SomeValue1 1
98001185450 1 SomeValue2 1
97926729191 1 SomeValue3 1
98028101823 2 SomeValue1 2
...
If the dataset were laid out like that, we could create the aggregated dataset by running the code below.
SELECT bucket_id
, COUNT(DISTINCT employee_id) AS emp_count
, COUNT(1) AS no_of_records
FROM sourcetable
GROUP BY bucket_id
ORDER BY bucket_id;
So let's say we have a table with the schema above. We could do all the aggregation on the source data table, but if you are working with production data or a very large dataset, I found it easier to work with an aggregated representation of the table, since the values of the records themselves don't really matter - it's the count per employee that we are interested in.
So we create an aggregated mapping table to do our work on -
CREATE TABLE #bucket_mapping
(
id INT NOT NULL IDENTITY(1, 1) PRIMARY KEY CLUSTERED
, emp_id INT NOT NULL
, records_for_emp INT NOT NULL
, bucket_id INT NOT NULL
DEFAULT 0
);
CREATE NONCLUSTERED INDEX idx
ON #bucket_mapping ( bucket_id
, records_for_emp);
This table contains the employee id and a count of how many records they have, with every row initially assigned to bucket_id 0. Since I was dealing with very large datasets, I put a primary key on the identity column and added a nonclustered index on bucket_id and records_for_emp, but for smaller datasets neither of those is really necessary and they could even slow things down.
Insert the aggregated source data into the mapping table -
INSERT INTO #bucket_mapping
(
emp_id
, records_for_emp
)
SELECT employee_id
, COUNT(1) AS records_for_emp
FROM sourcetable
GROUP BY employee_id
ORDER BY records_for_emp DESC;
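To make the shape of this table concrete, here is what it would hold for the small example dataset at the top of the article (illustrative only - employees with tied record counts could receive their ids in either order):
SELECT id
, emp_id
, records_for_emp
, bucket_id
FROM #bucket_mapping
ORDER BY id;
id emp_id records_for_emp bucket_id
----------- ----------- --------------- ----------
1 1 3 0
2 2 2 0
3 3 2 0
4 4 1 0
5 5 1 0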
Now, the overarching concept for this solution is a three-pass sorting algorithm.
The first pass loops across each bucket, distributing employees by record count, sorted largest to smallest, using Run Length Encoding to greatly reduce the number of loop iterations.
Since the first pass leaves the last bucket larger than the others, the second pass repeatedly takes the smallest employee in the final bucket with a record count > 1 and moves it to whichever bucket currently has the fewest total records, until the final bucket is under the intended size.
The final pass takes any remaining single-record employees, n of them, and distributes them across the n buckets with the fewest total records in one execution.
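As a rough outline before diving into the details, the passes hang together in a single WHILE loop, like this (the full script is at the end of the article):
WHILE (@currentBucket <= @totalBucketCount)
BEGIN
IF (@currentBucket = @totalBucketCount)
BEGIN
-- dump everything still unassigned into the final bucket,
-- then run passes 2 and 3 to redistribute the overflow
BREAK;
END;
-- pass 1: assign the next unassigned employee that fits the current bucket,
-- handling a whole run of equal record counts in a single UPDATE when possible
-- advance to the next bucket once it is full (or no more progress can be made)
END;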
Declare some variables - these will all be explained when they are used.
DECLARE @currentBucket INT = 1
, @totalBucketCount INT = 50
, @currentID INT = 0
, @lastID INT
, @numberOfRecords INT
, @bucketID INT
, @currentBucketSize INT = 0
, @maxBucketSize INT
, @batchThreshold INT
, @lengthOfRun INT
, @batchEndID INT
, @totalRecords INT;
To determine the total number of records being processed and the desired size of each bucket, the variables are set below -
SELECT @totalRecords = SUM(records_for_emp)
, @maxBucketSize = SUM(records_for_emp) / @totalBucketCount
FROM #bucket_mapping;
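For my dataset, that works out like this (note the division is integer division, so there is a remainder that never gets planned into any bucket):
-- @totalRecords = 214214960
-- @maxBucketSize = 214214960 / 50 = 4284299, with a remainder of 10 records
In my case, those 10 leftover records are exactly the overflow that shows up in the final bucket later on.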
The first thing to do is separate these out logically into 2 groups - employees with a record count no other employee has, and employees that share a record count with other employees. Think back to the example dataset above: employee 1 was the only employee with 3 records, while employees 2 and 3 shared a count of 2 records and employees 4 and 5 shared a count of 1.
When looked at through the lens of Run Length Encoding (RLE), the employees that have a unique record count are treated as single data items, and the employees that share record counts with other employees are treated as runs of data items.
To determine the record count and run length of the first run of data items, the variables are stored with the values below -
SELECT TOP 1
@batchThreshold = records_for_emp
, @lengthOfRun = COUNT(1)
FROM #bucket_mapping
WHERE bucket_id = 0
GROUP BY records_for_emp
HAVING COUNT(1) > 1
ORDER BY records_for_emp DESC;
For my dataset, the top 3 runs were these.
records_for_emp length_of_run
--------------- -------------
12252 2
10377 2
10268 2
...
Meaning the largest record count shared by more than one employee is 12252, and it is shared by 2 employees. Those values are stored and we continue.
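For the small example dataset at the top of the article, the same query would return a record count of 2 with a run length of 2, since 2 is the largest record count shared by more than one employee (employees 2 and 3):
records_for_emp length_of_run
--------------- -------------
2 2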
To perform the first pass on the first bucket, we use the code below; this is where the WHILE loop starts.
We find the lowest id that isn't assigned to a bucket yet and whose record count is no greater than the number of records still left to fill in the current bucket, and store that id along with its record count.
SELECT TOP 1
@currentID = id
, @numberOfRecords = records_for_emp
FROM #bucket_mapping
WHERE bucket_id = 0
AND id > @currentID
AND records_for_emp <= (@maxBucketSize - @currentBucketSize)
ORDER BY id;
In my dataset, the first 3 records were these -
id records_for_emp
----------- ------------
1 39980
2 39872
3 39774
...
Meaning the record with the id of 1 and record count of 39980 were stored.
The next bit of code looks to see if this record count is the start of a run of data items or a single data item. Since we know the first run of data items in my example dataset has a record count of 12252, this is a single data item.
We check to see if the current bucket can hold the record count stored for the current id. In my example dataset, I have 214,214,960 total records that I want to split across 50 buckets, so with a maximum bucket size of 4,284,299 records, 39980 can absolutely fit. We update the mapping table, changing the currently unassigned bucket_id of 0 to the current bucket's id, 1, and update @currentBucketSize to reflect the newly added data item: 0 + 39980.
IF @numberOfRecords = @batchThreshold
BEGIN
-- code processing runs of data items
END;
ELSE IF @currentBucketSize + @numberOfRecords <= @maxBucketSize
BEGIN
UPDATE #bucket_mapping
SET bucket_id = @currentBucket
WHERE id = @currentID;
SELECT @currentBucketSize += @numberOfRecords;
END;
This code will continue to loop, finding the next data item not assigned to a bucket that can fit into the space remaining in the current bucket, adding them one after another until the bucket is full. When that occurs, or when we've finished processing everything there is to process, the code below runs, advancing @currentBucket by 1 and resetting @currentID and @currentBucketSize back to 0. It is placed just below the code above in the WHILE loop.
IF @currentBucketSize >= @maxBucketSize
OR @currentID >= @totalRecords
OR @lastID = @currentID
BEGIN
SELECT @currentID = 0
, @currentBucket += 1
, @currentBucketSize = 0;
END;
SET @lastID = @currentID;
In the event that the current bucket is under @maxBucketSize by some number, n, and there are no remaining single data items or runs with record counts small enough to fill that space, this will spin into an infinite loop. To prevent this, at the end of each loop we store the id that was just processed in a placeholder variable, @lastID. In the next loop, if the id we just processed is the same as the last one we processed, we advance to the next bucket and reset the values as if the bucket were full.
My first bucket looks like this -
id emp_id records_for_emp bucket_id current_bucket_size
----------- ----------- --------------- ----------- -------------------
1 2241032 39980 1 39980
2 2464976 39872 1 79852
3 2377353 39774 1 119626
...
131 2191316 22912 1 4254752
132 2675322 22817 1 4277569
15114 1973944 6729 1 4284298
225224 2719573 1 1 4284299
(134 rows affected)
In this first bucket, it just so happens that all of the data items were single data items, so the bucket contains 134 employees and all of their records, assigned over 134 loops.
Let's take a second to talk about efficiency and performance
That comes out to 1 row processed per loop, which is really bad for performance on large datasets. My dataset in particular contains 284378 employees, and I'm not about to run that many loops to sort this data. Even looking at the last id inserted into the bucket above, 225224, my total number of employees, 284378, and my largest record count for data items in a run, 12252, it's clear that this dataset contains a lot of data items belonging to runs. So what happens when those record counts are next on our list to be processed? We'll see as we fill the next bucket.
As this loop starts and iterates through the single data items, it treks along until it reaches the first data item of the first run of data.
id emp_id records_for_emp bucket_id current_bucket_size
----------- -------- --------------- ----------- -------------------
133 2224239 22422 2 22422
134 2030382 22408 2 44830
135 2822187 22071 2 66901
...
292 2684064 12371 2 2719655
293 2771062 12365 2 2732020
294 325574 12252 ??? ???
If you'll recall, we initialized the @batchThreshold and @lengthOfRun variables with the record count of the run, 12252, and the number of data items in the run, 2. Instead of looping through both of these one by one, we can handle them as an entire run of data in a single loop. To achieve that, we add the code below just before the code that handles the single data items -
IF @numberOfRecords = @batchThreshold
BEGIN
IF (@maxBucketSize - @currentBucketSize) < (@batchThreshold * @lengthOfRun)
BEGIN
SELECT @lengthOfRun = (@maxBucketSize - @currentBucketSize) / @batchThreshold;
END;
SELECT @batchEndID = @currentID + (@lengthOfRun - 1);
UPDATE #bucket_mapping
SET bucket_id = @currentBucket
WHERE id
BETWEEN @currentID AND @batchEndID;
SELECT @currentBucketSize += (@batchThreshold * @lengthOfRun);
SELECT TOP 1
@batchThreshold = records_for_emp
, @lengthOfRun = COUNT(1)
FROM #bucket_mapping
WHERE bucket_id = 0
AND records_for_emp <= (@maxBucketSize - @currentBucketSize)
GROUP BY records_for_emp
HAVING COUNT(1) > 1
ORDER BY records_for_emp DESC;
END;
ELSE IF @currentBucketSize + @numberOfRecords <= @maxBucketSize
BEGIN
UPDATE #bucket_mapping
SET bucket_id = @currentBucket
WHERE id = @currentID;
SELECT @currentBucketSize += @numberOfRecords;
END;
Since the condition to execute the code that processes runs of data items is satisfied, we look to see if the entire size of the run is larger than what can fit into the remaining size of the current bucket. Since our current bucket size is 2,732,020 and our max bucket size is 4,284,299, we have plenty of room to fit the entire run of 2 data items, each having a record count of 12252.
If that were not the case, we would evaluate exactly how many data items in the run can fit into the remaining size of the current bucket and set the @lengthOfRun variable to that value.
Since the first data item in this run being processed is the current id, we simply add the @lengthOfRun - 1 to the current id to get the id of the record ending the run, since the BETWEEN operator is inclusive of the first and last parameters.
We update our run of data items to the current bucket id and add the total number of records across the run to the current bucket size. Finally, we find and store the next largest record count and run length of data items in a run where at least 1 of those data items can fit into the remaining size of the current bucket.
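To make the arithmetic concrete, here is how the run starting at id 294 plays out, using the numbers already shown above:
-- space remaining: 4284299 - 2732020 = 1552279, which is >= 12252 * 2, so the run is not truncated
-- @batchEndID = 294 + (2 - 1) = 295
-- a single UPDATE assigns ids 294 and 295 to bucket 2
-- new bucket size: 2732020 + (12252 * 2) = 2756524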
With the code that handles runs of data items in effect, this is my second bucket -
id emp_id records_for_emp bucket_id current_bucket_size
----------- ----------- --------------- ----------- -------------------
133 2224239 22422 2 22422
134 2030382 22408 2 44830
135 2822187 22071 2 66901
...
434 2731969 10230 2 4267126
435 2620227 10229 2 4277355
14875 1940363 6944 2 4284299
(304 rows affected)
For this bucket, 6 runs of 2 data items each were processed, eliminating 6 loops. We can count the loops with a variable and subtract that count from the number of data items processed to find the number of loops saved, or we can use this formula -
sum(number_of_runs * run_length) - sum(number_of_runs) = loops eliminated
With that, we can see we executed 298 loops for this bucket - a compression ratio of 304:298 or 1.02 rows per loop. That's still pretty bad, but my third bucket is where the runs really ramp up.
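Plugging this bucket's numbers into that formula:
loops eliminated = (6 runs * 2 data items) - 6 runs = 6
loops executed = 304 rows - 6 = 298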
My third bucket looks like this -
id emp_id records_for_emp bucket_id current_bucket_size
----------- ----------- --------------- ----------- -------------------
436 2654078 10228 3 10228
437 2724211 10226 3 20454
438 2637132 10225 3 30679
...
858 2642083 10039 3 4269902
859 423751 10039 3 4279941
18409 1816222 4358 3 4284299
(425 rows affected)
For the third bucket, there were 92 runs of data items -
runs run_length
----- -----------
37 2
11 3
10 4
13 5
6 6
7 7
3 8
2 9
1 10
2 11
Altogether, those 92 runs accounted for 371 rows, meaning that 146 loops (92 for the runs plus 54 for the remaining single data items) produced 425 total rows. This time the compression ratio is 425:146, or 2.91 rows per loop.
This algorithm does best when the dataset naturally contains lots of runs. In my datasets there are many runs of smaller record counts, so the RLE logic pays off more in the later buckets (and the fewer total buckets there are, the more each bucket benefits). Once there are no runs left, the algorithm falls back to processing one row per loop, so the worst case degrades to linear in the number of employees.
So now let's talk about the final bucket
When we begin filling the final bucket, we could loop through the remaining employees, adding them one at a time or calculating run lengths, but why? After this bucket there are no more buckets to put employees in. We can eliminate any unnecessary loops by simply dumping all remaining records into the final bucket, running this code before anything else in the WHILE loop.
IF (@currentBucket = @totalBucketCount)
BEGIN
UPDATE #bucket_mapping
SET bucket_id = @currentBucket
WHERE bucket_id = 0;
BREAK;
END;
The problem with this, though, is that the final bucket contains some overflow records and now looks like this -
bucket_id emp_count total_records
----------- ------------ -------------
1 134 4284299
2 304 4284299
3 425 4284299
...
48 2146 4284299
49 2986 4284299
50 248714 4284309
This doesn't look too bad, but it can diverge more when the employees with the fewest records have record counts > 1, such as in this other dataset I have -
bucket_id emp_count total_records
----------- ------------ -------------
1 5 29076
2 5 29075
3 5 29072
...
198 16 28994
199 19 28892
200 49 52558
To solve this and better smooth out our buckets, we introduce passes 2 and 3.
The second pass loops through any leftover data items with record counts > 1 and distributes them one at a time, each going to whichever bucket has the fewest total records at the start of that iteration. The code for this goes in the section after we dump all the remaining records into the final bucket.
DECLARE @spillover TABLE
(
id INT
, records_for_emp INT
);
SELECT @currentBucketSize = SUM(records_for_emp)
FROM #bucket_mapping
WHERE bucket_id = @currentBucket;
INSERT INTO @spillover
SELECT TOP (@currentBucketSize - @maxBucketSize)
id
, records_for_emp
FROM #bucket_mapping
WHERE bucket_id = @currentBucket
ORDER BY records_for_emp;
First we create a table variable to hold the overflow data items. Since the number of overflow data items won't necessarily match the number of overflow records, we have to assume this needs to work for whatever @totalBucketCount is chosen, or smaller. In the example where my final bucket was roughly 24,000 records too large, the total number of data items in that bucket is just 49, so using the record difference to decide how many data items to insert into the spillover table simply grabs all of them. This not only covers the case where the algorithm is working correctly, but also covers any edge cases in which the first pass doesn't distribute cleanly.
WHILE (@currentBucketSize - @maxBucketSize) > 0
AND EXISTS
(
SELECT 1 FROM @spillover WHERE records_for_emp > 1
)
BEGIN
SELECT @bucketID = sGrp.bucket_id
FROM
(
SELECT TOP 1
bucket_id
, SUM(records_for_emp) AS total_records
FROM #bucket_mapping
GROUP BY bucket_id
ORDER BY total_records
) AS sGrp;
SELECT TOP 1
@numberOfRecords = records_for_emp
, @currentID = id
FROM @spillover
WHERE records_for_emp > 1
ORDER BY records_for_emp;
UPDATE #bucket_mapping
SET bucket_id = @bucketID
WHERE id = @currentID;
SELECT @currentBucketSize -= @numberOfRecords;
DELETE FROM @spillover
WHERE id = @currentID;
END;
My first dataset had no leftover data items with a record count > 1, so we'll use my second dataset. Grabbing the difference (TOP 24,000-ish records) just grabbed all 49 data items in the bucket. We won't use them all, just enough to bring the final bucket within the appropriate size.
Below are the results after each iteration of the second pass on the dataset that was roughly 24,000 records over.
target_bucket records_for_emp target_bucket_total current_bucket_total
------------- --------------- ------------------- --------------------
169 825 29182 51733
62 838 29220 50895
61 840 29232 50055
60 851 29262 49204
59 920 29333 48284
...
42 1068 29658 33334
41 1070 29662 32264
40 1079 29675 31185
192 1083 29679 30102
38 1088 29709 29014
The third and final pass operates on data items with a record count of only 1, distributing them all to the buckets with the fewest total records in one execution.
IF (@currentBucketSize - @maxBucketSize) > 0
AND EXISTS
(
SELECT 1
FROM @spillover
WHERE records_for_emp = 1
)
BEGIN;
WITH rList
AS
(
SELECT ROW_NUMBER() OVER (ORDER BY s.records_for_emp) AS id
, g.bucket_id
FROM #bucket_mapping AS g WITH (NOLOCK)
INNER JOIN @spillover AS s
ON s.id = g.id
)
, nRecord_count
AS
(
SELECT TOP (@currentBucketSize - @maxBucketSize)
ROW_NUMBER() OVER (ORDER BY grid.TotalItems) AS id
, grid.bucket_id
FROM
(
SELECT bucket_id
, SUM(records_for_emp) AS TotalItems
FROM #bucket_mapping WITH (NOLOCK)
GROUP BY bucket_id
) AS grid
)
UPDATE r
SET r.bucket_id = nr.bucket_id
FROM rList AS r
INNER JOIN nRecord_count AS nr
ON nr.id = r.id;
END;
Looking back at my first dataset, if you recall, I had 10 data items with a record count of 1 left over in the final bucket. Grabbing the difference, TOP 10, grabs those leftover data items and assigns them to the 10 smallest buckets. The reason we don't loop like we did in the previous pass is that after this pass the buckets will be off by at most 1 record anyway: 10 buckets will hold 1 more record than the other 40.
Below are the source and destination buckets for the overflow data items.
source_bucket_id target_bucket_id
---------------- ----------------
50 49
50 48
50 47
50 46
50 45
50 44
50 43
50 42
50 41
50 40
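Assuming buckets 40 through 49 each held exactly 4,284,299 records before this pass (as the earlier buckets did), each of them now holds 4,284,300, while bucket 50 drops from 4,284,309 back to 4,284,299 - every bucket ends up within 1 record of the target size.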
When all is said and done, I can put a counter at the end of each loop (sketched just before the full code below) to determine just how many loops I eliminated for my initial dataset. If you recall, I had 284378 total employees to sort, and after all three passes I only executed 8356 loops, bringing the full operation to a compression ratio of 284378:8356, or 34.03 rows per loop.
Those are just the statistics for my particular dataset and my chosen bucket count. I'm sure there are better general optimizations, but loading and aggregating the 200+ million rows of source data took longer for me than all 3 passes of the sorting algorithm, with the whole thing totaling only 7 seconds (4 to load and aggregate, 3 to sort). Since this is going to be run once a week, that is good enough for my needs right now.
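For reference, the loop counter mentioned above isn't part of the script below, but adding one only takes a couple of lines - something along these lines:
DECLARE @loopCount INT = 0; -- hypothetical counter, not in the original script
-- ...as the last statement inside the WHILE loop:
SET @loopCount += 1;
-- ...after the loop completes:
SELECT @loopCount AS loops_executed;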
**CODE**
CREATE TABLE #bucket_mapping
(
id INT NOT NULL IDENTITY(1, 1) PRIMARY KEY CLUSTERED
, emp_id INT NOT NULL
, records_for_emp INT NOT NULL
, bucket_id INT NOT NULL
DEFAULT 0
);
CREATE NONCLUSTERED INDEX idx
ON #bucket_mapping (
bucket_id
, records_for_emp
);
INSERT INTO #bucket_mapping
(
emp_id
, records_for_emp
)
SELECT employee_id
, COUNT(1) AS records_for_emp
FROM sourcetable
GROUP BY employee_id
ORDER BY records_for_emp DESC;
DECLARE @currentBucket INT = 1
, @totalBucketCount INT = 50
, @currentID INT = 0
, @lastID INT
, @recordCount INT
, @bucketID INT
, @currentBucketSize INT = 0
, @maxBucketSize INT
, @batchThreshold INT
, @batchRange INT
, @batchEndID INT
, @totalRecordCount INT;
SELECT TOP 1
@batchThreshold = records_for_emp
, @batchRange = COUNT(1)
FROM #bucket_mapping WITH (NOLOCK)
WHERE bucket_id = 0
GROUP BY records_for_emp
HAVING COUNT(1) > 1
ORDER BY records_for_emp DESC;
SELECT @totalRecordCount = SUM(records_for_emp)
, @maxBucketSize = SUM(records_for_emp) / @totalBucketCount
FROM #bucket_mapping WITH (NOLOCK);
WHILE (@currentBucket <= @totalBucketCount)
BEGIN
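-- final bucket: dump everything still unassigned, then smooth out the overflow with passes 2 and 3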
IF (@currentBucket = @totalBucketCount)
BEGIN
UPDATE #bucket_mapping
SET bucket_id = @currentBucket
WHERE bucket_id = 0;
DECLARE @spillover TABLE
(
id INT
, records_for_emp INT
);
SELECT @currentBucketSize = SUM(records_for_emp)
FROM #bucket_mapping WITH (NOLOCK)
WHERE bucket_id = @currentBucket;
INSERT INTO @spillover
SELECT TOP (@currentBucketSize - @maxBucketSize)
id
, records_for_emp
FROM #bucket_mapping WITH (NOLOCK)
WHERE bucket_id = @currentBucket
ORDER BY records_for_emp;
WHILE (@currentBucketSize - @maxBucketSize) > 0
AND EXISTS
(
SELECT 1 FROM @spillover WHERE records_for_emp > 1
)
BEGIN
SELECT @bucketID = sGrp.bucket_id
FROM
(
SELECT TOP 1
bucket_id
, SUM(records_for_emp) AS total_records
FROM #bucket_mapping
GROUP BY bucket_id
ORDER BY total_records
) AS sGrp;
SELECT TOP 1
@recordCount = records_for_emp
, @currentID = id
FROM @spillover
WHERE records_for_emp > 1
ORDER BY records_for_emp;
UPDATE #bucket_mapping
SET bucket_id = @bucketID
WHERE id = @currentID;
SELECT @currentBucketSize -= @recordCount;
DELETE FROM @spillover
WHERE id = @currentID;
END;
IF (@currentBucketSize - @maxBucketSize) > 0
AND EXISTS
(
SELECT 1
FROM @spillover
WHERE records_for_emp = 1
)
BEGIN;
WITH rList
AS
(
SELECT ROW_NUMBER() OVER (ORDER BY s.records_for_emp) AS id
, g.bucket_id
FROM #bucket_mapping AS g WITH (NOLOCK)
INNER JOIN @spillover AS s
ON s.id = g.id
)
, nrecords_for_emp
AS
(
SELECT TOP (@currentBucketSize - @maxBucketSize)
ROW_NUMBER() OVER (ORDER BY grid.TotalItems) AS id
, grid.bucket_id
FROM
(
SELECT bucket_id
, SUM(records_for_emp) AS TotalItems
FROM #bucket_mapping WITH (NOLOCK)
GROUP BY bucket_id
) AS grid
)
UPDATE r
SET r.bucket_id = nr.bucket_id
FROM rList AS r
INNER JOIN nrecords_for_emp AS nr
ON nr.id = r.id;
END;
BREAK;
END;
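-- first pass: find the next unassigned employee whose record count still fits in the current bucket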
SELECT TOP 1
@currentID = id
, @recordCount = records_for_emp
FROM #bucket_mapping WITH (NOLOCK)
WHERE bucket_id = 0
AND id > @currentID
AND records_for_emp <= (@maxBucketSize - @currentBucketSize)
ORDER BY id;
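-- the record count starts a run of equal counts: assign as much of the run as fits in a single UPDATE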
IF @recordCount = @batchThreshold
BEGIN
IF (@maxBucketSize - @currentBucketSize) < (@batchThreshold * @batchRange)
BEGIN
SELECT @batchRange = (@maxBucketSize - @currentBucketSize) / @batchThreshold;
END;
SELECT @batchEndID = @currentID + (@batchRange - 1);
UPDATE #bucket_mapping
SET bucket_id = @currentBucket
WHERE id
BETWEEN @currentID AND @batchEndID;
SELECT @currentBucketSize += (@batchRange * @batchThreshold);
SELECT TOP 1
@batchThreshold = records_for_emp
, @batchRange = COUNT(1)
FROM #bucket_mapping WITH (NOLOCK)
WHERE bucket_id = 0
AND records_for_emp <= (@maxBucketSize - @currentBucketSize)
GROUP BY records_for_emp
HAVING COUNT(1) > 1
ORDER BY records_for_emp DESC;
END;
ELSE IF @currentBucketSize + @recordCount <= @maxBucketSize
BEGIN
UPDATE #bucket_mapping
SET bucket_id = @currentBucket
WHERE id = @currentID;
SELECT @currentBucketSize += @recordCount;
END;
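-- bucket is full, everything is processed, or no progress was made this iteration: advance to the next bucket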
IF @currentBucketSize >= @maxBucketSize
OR @currentID >= @totalRecordCount
OR @lastID = @currentID
BEGIN
SELECT @currentID = 0
, @currentBucket += 1
, @currentBucketSize = 0;
SELECT TOP 1
@batchThreshold = records_for_emp
, @batchRange = COUNT(1)
FROM #bucket_mapping WITH (NOLOCK)
WHERE bucket_id = 0
GROUP BY records_for_emp
HAVING COUNT(1) > 1
ORDER BY records_for_emp DESC;
END;
SET @lastID = @currentID;
END;
SELECT bucket_id
, COUNT(1) AS emp_count
, SUM(records_for_emp) AS total_records
FROM #bucket_mapping
GROUP BY bucket_id
ORDER BY bucket_id;