[07:20:27] errand
[10:05:18] lunch
[13:08:46] o/
[15:04:06] \o
[15:08:31] o/
[15:18:45] feels like if I do .filter(...).coalesce(1), filtering is actually done after the repartitioning :/
[15:21:02] yes, that's the problem with coalesce, it goes backwards in the graph to the last thing that repartitioned
[15:21:52] also avro data is now available, at ebernhardson.cirrus2hive_v4. Testing it against your notebook now. It almost works with the default memory settings (but not quite, trying now with 768M of memory overhead)
[15:22:09] well, the first major aggregation anyways, haven't gotten far :P
[15:22:14] ebernhardson: oh thanks!
[15:22:33] I can try to use it now
[15:23:51] also have a list of things we might want to fix in cirrus. like file_text can be false or an empty array. cirrus records empty labels/descriptions as [] instead of {}, defaultsort can be false. and descriptions is map on wikidata, but map> on commons. should probably standardize?
[15:24:21] yes these are not great indeed :/
[15:27:26] will poke cirrus a bit, i imagine those will also make the streaming updater more difficult since it's going to want to load the json into some plain objects and not super-generic structures
[15:28:39] curiously my run of your first-stage parquet output has finished all the partitions, but took a long time to return to the command line. But it finished using all default options except 768M memory overhead without killing anything, and the UI doesn't show excessive GC anywhere
[15:28:48] yes only the "core" fields will be promoted as first-class citizens, the rest has to be in a kind of Map ...
[15:29:25] ebernhardson: how many executors?
[15:29:49] dcausse: wasn't limited, so it used 3k :P
[15:29:53] ah :)
[15:29:57] well, the top executor id is 3k
[15:30:12] 3.5hr of task time for the first output step
[15:30:17] I started to have issues once I limited the number of executors tho
[15:30:34] 3.5hr with parquet or avro?
[15:30:37] avro
[15:30:43] :/
[15:31:04] but split across many executors, it had to read in ~3TB of data from disk
[15:31:55] you mean to run the first cell that writes "size_stats.parquet" ?
[15:32:00] yea
[15:32:33] hm... I'm running it and it's already at 7381/27127
[15:32:34] hmm, curiously the executors page of the ui says 3.5h of task time, but the stage ui says 81.2h across all tasks
[15:33:03] yea it only took 15min or so of real time
[15:33:16] ah sorry
[15:33:42] I'm running with 30 executors
[15:33:53] oh wow, and it's still going quickly! good sign :)
[15:34:42] avro does seem to be working a bit better for this use case, should move forward with that
[15:35:00] sure
[15:36:27] i just realized the executors ui is simply wrong, it says 3.5hr but it only aggregated the first 100 used executors and not all of them, so 3.5hr was probably totally wrong. The 81h from the stages page is probably correct.
[15:37:28] although yours is halfway done with only 13h used. shrug mysterious :P
[15:41:38] hm... wondering if the data is skewed: https://yarn.wikimedia.org/proxy/application_1663082229270_203300/executors/
[15:41:57] only 5 tasks running from the same executor
[15:43:18] it is skewed, the partitions are using string ranges of id's, but the largest partition should only be ~280M and 500k rows
[15:45:08] it's curious you're at 14k complete out of 27k, but it's only running 5. I suspect the problem is it's waiting to be able to run things node_local?
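(To illustrate the .filter(...).coalesce(1) behaviour discussed at 15:18/15:21 above, here is a minimal PySpark sketch; the table reference and the wiki column filter are placeholders, not the actual notebook code. coalesce is a narrow dependency, so Spark folds it into the same stage as the filter and the filter runs with the reduced parallelism, whereas repartition(1) adds a shuffle boundary and keeps the filter at full parallelism.)

```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
# placeholder input; any large table shows the same behaviour
df = spark.read.table("ebernhardson.cirrus2hive_v4")

# coalesce(1) is a narrow dependency: it is folded into the same stage as
# the filter, so the filter itself ends up running in a single task.
slow = df.filter(F.col("wiki") == "enwiki").coalesce(1)

# repartition(1) inserts a shuffle boundary: the filter runs at the input's
# full parallelism and only the filtered rows are shuffled to one partition.
fast = df.filter(F.col("wiki") == "enwiki").repartition(1)
```

(The trade-off is that repartition performs a full shuffle of the filtered rows, which is usually cheap once the filter has discarded most of the data.)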
[15:45:28] now it's unblocked
[15:45:31] that's where it sees it has the data and an executor on the same machine, so it prefers to run it there instead of transferring the data around the network
[15:47:24] yes not sure I fully understand all this :/
[15:49:07] I thought that this first part would be pretty simple: a single stage reading all the data (the only thing it does is, on a per row basis, counting the number of bytes) and then writing to parquet
[15:51:10] there are perhaps other limiting factors, yarn not allowing spark to take the 4 cores I asked for on a particular executor?
[15:52:14] i don't fully understand it either :P But spark has a few locality levels, and it will delay tasks hoping to run them at the preferred locality. It will wait around so that an executor can read the data from its own hard drives (hdfs blocks on the instance), once that times out it will wait to read the data from the same rack as the hdfs blocks, and finally it will run it from anywhere in the cluster.
[15:52:16] The docs aren't particularly clear, the wait is supposed to only be 3 seconds per level, but it seems like that wait gets reset each time a new task is started on a node
[15:53:28] and your 75th percentile task is only 5s, and running multiple tasks per executor so it would get reset often (but the docs don't clearly say it reests...that's just what it looks like to me from past investigation)
[15:53:36] s/reests/resets/
[15:54:49] cpu's shouldn't be a concern, when yarn starts the instance it allocates the cpu's to your instance. I suppose in the past it didn't look like yarn actually monitors cpu usage though, simply trusting things to behave (i accidentally ran mjolnir things without limiting xgboost parallelization and it never killed things for that)
[15:55:08] there, allocate only means "how many cpus it thinks are available to be allocated elsewhere"
[15:55:55] ok
[15:56:46] datastax docs for spark.locality.wait mention one problem scenario, sounds the same: "Despite jobs processing small amounts of data, it is still taking too long since only 1 or 2 tasks are running on data-local nodes with the rest of the cluster mostly idle."
[15:57:26] seems like what I'm seeing
[15:58:36] can set spark.locality.wait=1s, or use more executors with fewer cpu's per executor to get the job spread around the cluster. with only 30 executors and 91 nodes in the cluster you don't have great odds of having local data with multiple options
[16:00:02] if it's reset per executor (not per task) that's hard to tune
[16:00:42] ok
[16:01:23] will try with 90execs*1core next
[16:03:28] it seems from a quick review of spark that it has a set of pending tasks for each node, and as long as something is popped from that set of tasks the rest of them will not consider switching to a more generic locality level
[16:03:40] within the wait time
[16:07:42] i suppose i can also see about creating fewer partitions to start with so it doesn't have such small things to work through, the median partition is only 100MB/13k rows. I suppose i divided them up so much because i was expecting the row count to be pretty skewed, with id's prefixed 100-109 getting many more rows than 990-999
[16:09:36] similar issue here, no one seems to know why exactly it happens :P https://stackoverflow.com/questions/55245841/why-isnt-a-very-big-spark-stage-using-all-available-executors/55246153#55246153
[16:10:27] but turning off locality works to fix it. mjolnir's query explorer, which reads a ton of small partitions, turns it off completely
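(A minimal sketch of how the locality wait discussed above could be relaxed or disabled; spark.locality.wait and its per-level variants are standard Spark properties, but the values and app name here are only illustrative. Note these are application-level settings, which is why setting them per stage isn't possible.)

```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("cirrus-size-stats")          # illustrative name
    # 0s disables delay scheduling entirely; 1s is a milder compromise
    .config("spark.locality.wait", "0s")
    # the individual locality levels can also be tuned separately:
    # .config("spark.locality.wait.node", "1s")
    # .config("spark.locality.wait.rack", "1s")
    .getOrCreate()
)
```

(The same setting can be passed on the command line, e.g. spark-submit --conf spark.locality.wait=0s.)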
[16:11:00] interesting
[16:11:13] spark.locality.wait: 0s is how to turn it off?
[16:11:27] yea, mjolnir sets it to 0
[16:12:04] err, i guess the task says query explorer, it's the mjolnir stage that does feature selection
[16:12:05] it'd be nice to be able to set it per stage
[16:12:13] I mean per "call"
[16:12:25] there are lots of times i wish i could set spark things on a particular stage or transformation. sadly spark disagrees :(
[16:13:04] that's partly how mjolnir got split into so many individual jobs, so i could set particular things
[16:13:15] I see
[17:11:45] lunch/errands, back in ~1h
[17:14:39] dinner
[17:59:01] back
[18:08:50] inflatador: ebernhardson: can’t make pairing today, have a bit of a headache so gonna go lie down for a bit
[18:10:10] ryankemper ACK, hope ya feel better soon!
[18:22:11] ebernhardson gehel did y'all have anything planned for the pairing? We can skip if not, I'm just reading flink stuff ATM
[18:22:47] i don't have anything in particular, although in sprint planning yesterday we were wondering about https://phabricator.wikimedia.org/T316236
[18:22:57] inflatador: my plan is being on vacation ;)
[18:23:07] oops, sorry to bug gehel ;(
[18:23:44] No problem!
[18:23:58] ebernhardson ACK, will look at this ticket and get back to you
[19:41:36] inflatador: wrt https://phabricator.wikimedia.org/T316236, looks like https://gerrit.wikimedia.org/r/c/operations/puppet/+/835596/ is mostly ready but needs the changes in `modules/profile/manifests/dumps/distribution/nfs.pp` to instead be made in `modules/profile/manifests/wmcs/nfs/ferm.pp`
[19:42:07] as for actually merging it once that change is made, probably a good time would be thursday's puppet window since we'll probably want ebernhardson around
[19:52:32] thanks ryankemper, was just about to write an email about that. Have we settled on NFS? https://gerrit.wikimedia.org/r/c/operations/puppet/+/832543/11#message-2156107122bb55be98ee3c5486fd4e4cebd717a1 mentioned using YARN/swift although it's an older comment than the PRs
[19:53:38] we also got an email saying that labstore will be decomm'd soon, guessing we want to use clouddumps (which is also part of the puppet config) instead?
[20:08:29] inflatador: yeah I think we're pretty settled on NFS. wrt labstore I haven't seen that email, what's its title?
[20:09:45] ryankemper it's titled "Stray entries in /etc/fstab on a couple of hosts", went to wdqs admins disto list
[20:09:49] err...distro list
[20:11:52] just fwded
[20:12:47] inflatador: I found it, thanks
[20:13:28] Looks like `labstore100[45]` are the active ones. A bit confused why `labstore100[67]` are the decom'd ones though since those are sequentially later
[20:15:26] Ah I see, 45 are `eqiad.wmnet` whereas 67 are `.wikimedia.org`
[20:15:38] Yeah we'll need to figure out what to do about that. For now I changed the `Hosts` in our patch to `labstore1004.eqiad.wmnet` instead of `labstore1007.wikimedia.org` and will see what pcc says
[20:15:58] And yeah we'll want to do the manual cleanup on wcqs at some point this week, maybe thurs pairing
[20:16:17] yeah, that's fine. I can review the current PR now
[20:48:45] meh, turns out in python mocking, when it records an invocation it just keeps a reference. Which means if you mutate the object after the call, the recorded call no longer has the right call args :(
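(A small self-contained sketch of the mock behaviour described at 20:48, plus one workaround adapted from the "coping with mutable arguments" recipe in the unittest.mock docs; consume, sink and batch are made-up names purely for demonstration.)

```python
import copy
from unittest import mock


def consume(sink, batch):
    # hypothetical code under test: hands a mutable list to a collaborator
    sink.send(batch)


# A plain Mock stores a reference to `batch`, so mutating it after the call
# also mutates the recorded call args.
plain = mock.Mock()
batch = [1, 2, 3]
consume(plain, batch)
batch.append(4)
assert plain.send.call_args == mock.call([1, 2, 3, 4])  # not what was actually sent


# Workaround: deep-copy the arguments at call time before recording them.
# Child mocks created from a subclass share its type, so `send` copies too.
class CopyingMock(mock.MagicMock):
    def __call__(self, *args, **kwargs):
        args = copy.deepcopy(args)
        kwargs = copy.deepcopy(kwargs)
        return super().__call__(*args, **kwargs)


copying = CopyingMock()
batch = [1, 2, 3]
consume(copying, batch)
batch.append(4)
assert copying.send.call_args == mock.call([1, 2, 3])  # snapshot preserved
```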