[06:57:09] ryankemper: still waiting for the data to be there but yes, soon (probably today at the earliest) we could move forward with the reload
[06:58:41] for the pre-req, yes they're mostly SRE related, will check the state of the puppet repo
[07:02:21] well... I might have spoken too soon, not seeing the rdf dumps in https://dumps.wikimedia.org/other/wikibase/wikidatawiki/20240722/ ...
[07:28:39] dump from snapshot1017 seems to have started 3 days ago
[07:33:15] but it spawned /srv/mediawiki/multiversion/MWScript.php extensions/Wikibase/repo/maintenance/dumpRdf.php --wiki wikidatawiki --shard 5 just this morning
[07:33:22] so no reload today...
[08:23:55] dcausse: the federation guide is really good! Thanks! (https://www.wikidata.org/wiki/Wikidata:SPARQL_query_service/WDQS_graph_split/Internal_Federation_Guide)
[08:24:06] This already contains most of what I need in the communication.
[08:25:48] cool, the sole part we haven't discussed is the end of the transition period, I'd go for 6 months starting from September 1st
[08:27:53] I suspect that August is going to be very quiet and I'm afraid that users will actually start working on/caring about this only around September
[08:39:35] of course I forgot to update the checksum for refinery-drop-older-than when cleaning up some code...
[08:42:21] seeing numpy 1 vs 2 errors as well... (running drop_dated_directories.py)
[08:53:50] Status update: https://wikitech.wikimedia.org/wiki/Search_Platform/Weekly_Updates/2024-07-26
[08:54:21] seems like pyarrow disagrees with the numpy version installed...
[08:58:13] but not sure I understand what owns what...
[09:32:21] errand+lunch
[12:41:40] going to try moving the version constraint from the tox test env to setup.cfg
[12:52:43] draft of the graph split communication: https://docs.google.com/document/d/1OPnpP5g3c-8xNfJ_UIkuFAG8qaJBDTjkdrJvXB-s_O4/edit. Thanks for your feedback!
[12:53:25] It's fairly short, David's doc covers everything in more detail.
[12:55:36] gehel: added a small suggestion but otherwise lgtm, thanks!
[13:34:25] \o
[13:39:00] o/
[13:40:03] dcausse: sounds like prod is also having the numpy 2 problem? i had tried forcing it in setup.cfg but for some reason it didn't take in tox so i put it in tox.ini. Could put the constraint in both
[13:40:15] with `numpy < 2` basically
[13:40:44] ebernhardson: seems like it worked in the end (with https://gitlab.wikimedia.org/repos/search-platform/discolytics/-/commit/0041d117fef4f07baccdd710d47d4a226ea01459)
[13:41:10] ahh, i guess it didn't occur to me to try updating pyarrow. cool
[13:41:46] yes, tried to see what others are using and saw this one in conda analytics
[13:51:27] ebernhardson: if you have a sec, a quick one: https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/merge_requests/777 (attempting to reduce the noise of this dag a bit)
[13:51:44] sure, looking
[13:52:26] new options were added in https://gerrit.wikimedia.org/r/c/wikidata/query/rdf/+/961476/3/rdf-spark-tools/src/main/scala/org/wikidata/query/rdf/spark/transform/queries/sparql/QueryExtractor.scala
[13:53:34] dcausse: i'm a little surprised, this looks like it will go from 200 partitions before to 16 now for initial processing?
[13:58:15] oh, well i guess it was 200, then the scala patch made it 1, and now the airflow patch brings it up to 16?
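For context, the split being debated looks roughly like the sketch below: one knob drives the parallelism of the expensive per-query work, the other only shrinks the output for writing. Names, paths and numbers here are illustrative, not the actual QueryExtractor code.

```scala
import org.apache.spark.sql.SparkSession

// Illustrative only: one setting controls how many partitions the expensive
// per-query work runs on, the other only controls how many files are written.
object QueryExtractorShapeSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("query-extractor-shape-sketch").getOrCreate()

    val inputPartitions = 16   // parallelism for the heavy parsing work
    val outputPartitions = 1   // number of output files

    val raw = spark.read.parquet("hdfs:///path/to/sparql_queries")   // hypothetical input path
    val processed = raw
      .repartition(inputPartitions)        // spread the work across executors
      .selectExpr("id", "query")           // hypothetical columns

    processed
      .coalesce(outputPartitions)          // shrink only for the write
      .write.mode("overwrite").parquet("hdfs:///path/to/output")     // hypothetical output path
  }
}
```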
[14:00:52] i guess it was 4 in the intermediate, but my thought is the old numPartitions was the output partitions, and the input was using 200. Either way we can try it
[14:01:05] ebernhardson: yes I think 4 is what it's running now
[14:01:28] dcausse: --numPartitions only affected the output partition count, input was hardcoded to 200
[14:03:07] I think the 200 was a transitional patch to test?
[14:04:12] "--num-partitions" seems to be "BC option for --input-partitions"
[14:04:46] I think that a test run was made with 200 hardcoded, then 4 was set from the dag and it appeared to work so far until recently
[14:05:51] ok will ship to see if this works
[14:07:53] dcausse: i guess on closer look, i suspect the old numPartitions didn't do anything. Depends on whether the comments are correct
[14:08:10] i guess numPartitions was only used to set spark.sql.shuffle.partitions, but the comment just before .repartition(200) says this job has no internal shuffles
[14:08:39] but then i see a groupby/agg, which suggests a shuffle
[14:09:11] either way, switch some numbers around and see what happens, can't be all that bad
[14:09:29] usually i guess i would check by looking at the spark ui while it's running, which is very explicit about how many partitions are used where
[14:10:08] ok, running it now, will open the spark ui to see
[14:12:03] I think this is this one? https://yarn.wikimedia.org/proxy/application_1719935448343_342175/
[14:13:44] not sure I understand how to read this ui tho... I see a 0/16 somewhere but no clue if that's related, I should probably have looked at a previous run to see what changed
[14:14:09] it looks like spark reads from disk into 4 partitions, reshuffles into 16 to do the work, then down to 1 for the output
[14:14:17] the sort happens at 1 partition as well
[14:15:27] if the OOM happens when sorting that won't help then :/
[14:15:51] does the data on disk really need to be sorted?
[14:16:07] oh i guess it's a sort/limit
[14:16:44] err, sigh... i should think more and talk less :P I'm not sure where the sort comes from, the sort i see in the code is getTop100QueryClasses, but i don't see that invoked here
[14:16:55] but the spark graph clearly shows a final sort
[14:19:14] hmm... indeed getTop100QueryClasses is not called anywhere...
[14:20:22] ah it's getProcessedQueries
[14:20:29] no my bad
[14:20:57] i can't quite say what, but something about the execution vs the code feels surprising. Will have to wait for stage 0 to finish to get stats, but it's been running 9 minutes on what from the code looks like read from disk and shuffle
[14:21:16] the actual work should come after the first shuffle
[14:21:23] ok
[14:23:40] i also wonder if it makes any difference that all 4 tasks are running in a single executor
[14:26:00] it runs with --executor-cores 8 & spark.dynamicAllocation.maxExecutors=4, could that prevent spark from opening more executors?
[14:26:43] dcausse: it would be that each executor can run 8 tasks, and there are only 4 tasks to schedule, so it doesn't think it needs any more
[14:26:55] ok
[14:27:44] the mem limits are per executor so that might not help to run with --executor-cores 8
[14:33:07] the thread dumps clearly show this is doing sparql processing in stage 0, before the shuffle to 16 partitions. Very curious
[14:37:20] i wonder if it's hitting some optimization that only happens when the whole job runs in a single process
[14:37:45] :/
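A generic way to see what the optimizer actually did with the UDF relative to the repartition is to dump the query plans; a spark-shell style sketch with a stand-in UDF (not the real job) follows.

```scala
import org.apache.spark.sql.functions.{col, udf}

// Stand-in for the expensive jena-based parsing UDF (hypothetical).
val parseQueryUdf = udf((q: String) => if (q == null) 0 else q.length)

val df = spark.read.parquet("hdfs:///path/to/sparql_queries")   // hypothetical input
  .repartition(16)
  .withColumn("parsed", parseQueryUdf(col("query")))

// Prints the parsed, analyzed, optimized and physical plans; if the UDF shows
// up underneath the Exchange node (the shuffle), it is being evaluated before
// the repartition takes effect.
df.explain(true)

// The same plans are also reachable programmatically:
println(df.queryExecution.optimizedPlan)
println(df.queryExecution.executedPlan)
```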
[14:38:41] it's just that otherwise these thread dumps don't make sense, the code clearly goes directly from load from disk to repartition, does some work, and then applies the UDF. But the stack traces are clearly showing the UDF being applied while it claims to run stage 0
[14:39:08] with an exchange (shuffle) deeper in the stack
[14:39:37] perhaps it needs a checkpoint before adding the column with the udf
[14:41:15] maybe, i'm curious to see what (if anything) it decides to do during stages 1 and 2
[14:41:43] thinking it will skip stage 1, and write to disk in stage 2
[14:42:50] there's no real work indeed in this job, it's just a select with some decorated columns
[14:46:21] according to the thread dumps all the work is from org.apache.jena.query.QueryFactory.create
[14:46:33] well, just guessing because every time i've refreshed it's in the same bit
[14:46:53] poor man's flame graphs :P
[14:52:57] :)
[14:53:05] yes that's the sparql parser
[14:54:25] so yes most likely when you read.repartition().select().withColumn() it ends up moving the repartition to the very end?
[14:55:14] maybe? Which is kinda annoying, i stopped using .coalesce() because spark could decide to move it elsewhere. It seemed like .repartition() was able to explicitly happen exactly where you told it to happen
[14:56:20] another thing i'm randomly finding online is that if spark thinks a udf is cheap it might be that the repartition isn't moving, but the optimizer moved the udf execution
[14:56:40] some say to use first() to force the shuffle
[14:57:23] https://stackoverflow.com/questions/45427906/how-to-force-repartitioning-in-a-spark-dataframe
[14:59:22] dcausse: hmm, that's slightly different but maybe similar. Indeed if we force spark to materialize the dataframe we can ensure it does the work up to that point, but in my experience it doesn't always reuse the materialized dataframe unless we also .cache() it into memory, which is the opposite of what we want to solve the memory issues :P
[15:00:01] i wonder how we can see the optimized plan... it must be somewhere but maybe it has to be in code
[15:01:52] very curious. It's now onto stage 1, it started more executors, but the thread dumps still show basically the same traces (jena QueryFactory)
[15:02:05] too weird :P
[15:03:35] perhaps --executor-cores 1 to see what it does? might still be running only 4 and won't solve the mystery
[15:04:11] i mean, if it works maybe it doesn't matter. I just like to understand how things work :P
[15:04:24] sure...
[15:05:03] feels like withColumn("col", udf) might be something that it considers super cheap and decides to attach it close to the read
[15:05:43] it does seem possible, but what pushes back against that is that the query parsing appears to happen both in stage 0 and stage 1
[15:05:52] when it should clearly only happen in one place
[15:06:03] oh you mean it runs twice?
[15:06:26] stage-0 ran for 40 minutes, had jena in all the thread dumps, now it's been on stage-1 for 13 minutes, and jena again is in all the thread dumps
[15:06:45] basically: https://yarn.wikimedia.org/proxy/application_1719935448343_342175/executors/threadDump/?executorId=1
[15:06:54] click on any of the 'Executor task launch worker for task ...' lines
[15:07:37] before it started stage-1 i was thinking stage-1 would just get skipped, maybe it was rolled into stage-0 somehow since it's all the same executor, but now that it's running stage-1 i have no clue
[15:09:24] sigh, seems like it forgot the first stage and now wants to re-add this column?
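The two tricks floated above (a checkpoint before adding the UDF column, and the mailing list's df->rdd->df workaround) would look roughly like this; again a spark-shell style sketch with the same stand-in UDF, not the fix that was actually shipped.

```scala
import org.apache.spark.sql.functions.{col, udf}

val parseQueryUdf = udf((q: String) => if (q == null) 0 else q.length)  // stand-in for the jena parsing
val raw = spark.read.parquet("hdfs:///path/to/sparql_queries")          // hypothetical input

// Option 1: round-trip through an RDD, which cuts the logical plan in two so
// the repartition is already in place before the UDF column is attached.
val repartitioned = spark.createDataFrame(raw.repartition(16).rdd, raw.schema)
val viaRdd = repartitioned.withColumn("parsed", parseQueryUdf(col("query")))

// Option 2: localCheckpoint() truncates the lineage at that point, with a
// similar effect, at the cost of caching the intermediate partitions on the
// executors (which may be exactly the memory pressure we're trying to avoid).
val viaCheckpoint = raw.repartition(16).localCheckpoint()
  .withColumn("parsed", parseQueryUdf(col("query")))
```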
[15:10:16] why is it 2 stages btw?
[15:10:25] dcausse: each shuffle is a stage
[15:11:23] basically my understanding of spark stages is that a single stage is all the work it can do in a single pass without moving data between executors, any time data moves between executors that's a new stage
[15:12:23] In Blazegraph, is there a reason why the "extent size" is only 200 MB? What would happen if I set this to, I dunno, 5 GB?
[15:12:40] https://lists.apache.org/thread/mtpbmt1b3o3ot2o5ns8hk77xklpns87s
[15:14:43] dcausse: curious, that does seem potentially similar. Their df->rdd->df solution basically removes the optimizer's chance to do anything and avoids the problem, would be my guess
[15:14:43] hare: I have no clue about the consequences, will check if I see some ref to it in phab or other notes
[15:16:49] hare: at a glance I suspect that there'll be a bit more wasted space, not sure if you'll gain much of a write perf increase
[15:19:12] Ah. So I suppose I should tell you what my actual problem is. For certain very large queries, that are not necessarily computationally complex but have very large return sets, I've been getting out of memory errors.
[15:19:30] hmm, stage-1 finished in 24 minutes, but had 4x the tasks and 2x the executors of stage-0 that finished in 41 min. Suspiciously like it ran the whole UDF computation twice :S
[15:22:39] https://dpaste.org/BEnXB
[15:24:41] hare: if the result set is large I'm not sure you can do much
[15:24:44] ?
[15:24:51] you could use bd:slice
[15:24:59] and run multiple queries
[15:25:18] Oh interesting. Tell me more about bd:slice
[15:27:31] hare: for your query this would look like this: https://w.wiki/AkzE
[15:27:47] basically you scan in chunks instead of the whole thing
[15:28:57] then you loop over it, increasing the offset until you get nothing
[15:29:00] Oh that is so cool. So if I do 25000 at a time, the first offset is 0, then 25001, then 50001?
[15:29:12] yes
[15:29:37] it's like a limit/offset but close to the triple index
[15:29:44] And if I have a query that has a return set of less than 25000, having that in the query won't do anything but won't really hurt either
[15:29:58] no I don't think so
[15:30:17] the sole drawback is that it's not standard at all :(
[15:30:42] well, I deploy Blazegraph so that I can be 1:1 compatible with its quirks
[15:30:49] sure
[15:50:55] ok the job failed again, seems like the partitioning is clearly not doing what I expected...
[15:51:11] marking the dag task as failed so that it does not retry this 6 times
[15:52:00] going offline, have a nice weekend
[15:52:11] hmm, might poke at it a bit. Enjoy your weekend!
[16:08:45] heads up, i'll be adding search platform folk to a slack channel on the we.3.1 okr for recommendations. the idea will be to ensure that folks in web, search platform, apps, and research involved in this stuff are able to coordinate discussion
[16:32:44] hmm, superset is failing my dashboard update with: 422 Unprocessable Content
[16:39:13] even switching browsers still fails, going to assume superset just dislikes me :P
[18:54:46] oh that's kinda annoying, the default spark settings define the iceberg spark session, but they don't include the jar
[18:59:45] lol, and the links to the 1.2.1 docs referenced in the config don't work anymore. But that's not the funny part, the funny part is that trying to find the links on apache.org links you to the wayback machine :P
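For the missing iceberg jar, a common workaround is to pull the runtime jar in explicitly when building the session rather than relying on the cluster defaults; the artifact coordinates and catalog settings below are assumptions and would need to match the Spark/Scala/Iceberg versions actually deployed.

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch: ship the iceberg runtime jar along with the session
// extensions instead of assuming the cluster-wide spark defaults provide it.
val spark = SparkSession.builder()
  .appName("iceberg-sketch")
  // assumed coordinates; must match the deployed Spark/Scala/Iceberg versions
  .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:1.2.1")
  .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
  .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
  .config("spark.sql.catalog.spark_catalog.type", "hive")
  .getOrCreate()
```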