Term OOC covariance TSMM#2529
Conversation
7f235ad to
0c121eb
Compare
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #2529 +/- ##
============================================
+ Coverage 71.66% 71.67% +0.01%
- Complexity 49338 49357 +19
============================================
Files 1580 1581 +1
Lines 190516 190597 +81
Branches 37364 37373 +9
============================================
+ Hits 136525 136608 +83
+ Misses 43464 43460 -4
- Partials 10527 10529 +2 ☔ View full report in Codecov by Harness. 🚀 New features to boost your workflow:
|
|
@janniklinde we solved conflicts and failing checks and created a fresh clean pr. |
There was a problem hiding this comment.
Thank you for the PR. Overall, these changes look very good to me. I left some minor comments in the code @tuluyhansozen @122Astha @AdityaPandey2612
| if(w2 == 0) | ||
| return cov1; |
There was a problem hiding this comment.
Why is this change necessary?
There was a problem hiding this comment.
its a reliability case where during the weighted mean computation, if its a sparse matrix, we run into the divide by zero error but a zero weight has no impact on the original aggregation so I just returned the original covariance itself.
| public void processInstruction( ExecutionContext ec ) { | ||
| public void processInstruction(ExecutionContext ec) { | ||
| MatrixObject min = ec.getMatrixObject(input1); | ||
| int nRows = (int) min.getDataCharacteristics().getRows(); | ||
| int nCols = (int) min.getDataCharacteristics().getCols(); | ||
| int bLen = min.getDataCharacteristics().getBlocksize(); | ||
|
|
||
| OOCStream<IndexedMatrixValue> qIn = min.getStreamHandle(); | ||
| int numRowBlocks = Math.toIntExact(min.getDataCharacteristics().getNumRowBlocks()); | ||
| int numColBlocks = Math.toIntExact(min.getDataCharacteristics().getNumColBlocks()); | ||
| int blocksPerJoinGroup = _type.isLeft() ? numColBlocks : numRowBlocks; | ||
| int partialsPerOutput = _type.isLeft() ? numRowBlocks : numColBlocks; | ||
|
|
||
| OOCStreamable<IndexedMatrixValue> inputStreamable = min.getStreamable(); | ||
| final boolean createdCache = !inputStreamable.hasStreamCache(); | ||
| final CachingStream inputCache = createdCache ? new CachingStream(min.getStreamHandle()) | ||
| : inputStreamable.getStreamCache(); | ||
|
|
||
| OOCStream<List<IndexedMatrixValue>> groupedPartials = createWritableStream(); | ||
| OOCStream<IndexedMatrixValue> partials = createWritableStream(); | ||
| OOCStream<IndexedMatrixValue> out = createWritableStream(); | ||
| addOutStream(out); | ||
| ec.getMatrixObject(output).setStreamHandle(out); | ||
|
|
||
| CompletableFuture<Void> joinFuture = joinManyOOC(inputCache.getReadStream(), inputCache.getReadStream(), groupedPartials, | ||
| this::createPartialOutputTiles, this::getJoinIndex, this::getJoinIndex, | ||
| blocksPerJoinGroup, blocksPerJoinGroup); | ||
| CompletableFuture<Void> expandFuture = expandOOC(groupedPartials, partials, values -> values); | ||
|
|
||
| BinaryOperator plus = InstructionUtils.parseBinaryOperator(Opcodes.PLUS.toString()); | ||
| CompletableFuture<Void> outFuture = groupedReduceOOC(partials, out, (left, right) -> { | ||
| MatrixBlock result = ((MatrixBlock) left.getValue()).binaryOperations(plus, right.getValue()); | ||
| left.setValue(result); | ||
| return left; | ||
| }, partialsPerOutput); | ||
|
|
||
| propagateFailuresToOutput(out, List.of(joinFuture, expandFuture, outFuture)); | ||
|
|
||
| outFuture.whenComplete((result, error) -> { | ||
| if(createdCache) | ||
| inputCache.scheduleDeletion(); | ||
| }); | ||
| } | ||
|
|
||
| private long getJoinIndex(IndexedMatrixValue value) { | ||
| return _type.isLeft() ? value.getIndexes().getRowIndex() : value.getIndexes().getColumnIndex(); | ||
| } | ||
|
|
||
| //validation check TODO extend compiler to not create OOC otherwise | ||
| if( (_type.isLeft() && nCols > bLen) | ||
| || (_type.isRight() && nRows > bLen) ) | ||
| { | ||
| throw new UnsupportedOperationException(); | ||
| private long getOutputIndex(IndexedMatrixValue value) { | ||
| return _type.isLeft() ? value.getIndexes().getColumnIndex() : value.getIndexes().getRowIndex(); | ||
| } | ||
|
|
||
| private List<IndexedMatrixValue> createPartialOutputTiles(IndexedMatrixValue left, IndexedMatrixValue right) { | ||
| long leftIndex = getOutputIndex(left); | ||
| long rightIndex = getOutputIndex(right); | ||
| if(leftIndex > rightIndex) | ||
| return List.of(); | ||
|
|
||
| MatrixBlock leftBlock = (MatrixBlock) left.getValue(); | ||
| MatrixBlock rightBlock = (MatrixBlock) right.getValue(); | ||
| if(leftIndex == rightIndex) { | ||
| MatrixBlock diagonal = leftBlock.transposeSelfMatrixMultOperations(new MatrixBlock(), _type); | ||
| return List.of(new IndexedMatrixValue(new MatrixIndexes(leftIndex, rightIndex), diagonal)); | ||
| } | ||
|
|
||
| //int dim = _type.isLeft() ? nCols : nRows; | ||
| MatrixBlock resultBlock = null; | ||
|
|
||
| OOCStream<MatrixBlock> tmpStream = createWritableStream(); | ||
|
|
||
| mapOOC(qIn, tmpStream, | ||
| tmp -> ((MatrixBlock) tmp.getValue()) | ||
| .transposeSelfMatrixMultOperations(new MatrixBlock(), _type)); | ||
|
|
||
| MatrixBlock tmp; | ||
| while ((tmp = tmpStream.dequeue()) != LocalTaskQueue.NO_MORE_TASKS) { | ||
| if (resultBlock == null) | ||
| resultBlock = tmp; | ||
| else | ||
| resultBlock.binaryOperationsInPlace(plus, tmp); |
There was a problem hiding this comment.
These changes look good for the general case. However, the more general execution path is more expensive. I'd like you to keep the special case where we can avoid creating a CachingStream and using the heavier primitives (when only a single output tile is produced).
There was a problem hiding this comment.
Please assert that OOC operators were used (similar to the tsmm test)
There was a problem hiding this comment.
Assert that OOC op was used
This PR adds out-of-core coverage for covariance and improves OOC TSMM support.
Changes include:
CovarianceOOCInstructionfor unweighted and weighted covariance.OOCInstructionParserandOOCType.COV.cov(A, B)cov(A, B, W)