[09:14:53] gehel: in case you missed it: https://www.wikidata.org/wiki/Wikidata_talk:Events/Data_Modelling_Days_2023#c-Lea_Lacroix_%28WMDE%29-20231121070100-Data_Modelling_Days%3A_important_information_for_speakers
[09:15:05] dcausse: thanks!
[10:14:02] I wonder what "snapshot" is in https://schema.wikimedia.org/repositories//primary/jsonschema/development/cirrussearch/update_pipeline/update/current.yaml
[10:14:35] might be something we copied over from the cirrus_dump hive table
[14:21:33] ebernhardson: deployed your patch, ran for a while but hit another problem https://logstash.wikimedia.org/app/discover#/doc/0fade920-6712-11eb-8327-370b46f9e7a5/ecs-k8s-1-1.11.0-6-2023.47?id=Z_E68osBaR9sTI-qriZP
[14:25:46] failure on mw this time with "ParserOutput cannot be obtained" https://logstash.wikimedia.org/app/discover#/doc/logstash-*/logstash-deploy-1-7.0.0-1-2023.11.21?id=VElB8osBW1B7XSkmmCib
[14:33:53] seems like a hidden revision https://it.wikipedia.org/wiki/Speciale:ApiSandbox#action=query&format=json&prop=revisions&revids=136505069&formatversion=2&rvprop=ids%7Ctimestamp%7Cflags%7Ccomment%7Cuser%7Ccontent&rvslots=main
[14:34:25] cirrus should probably check this before blindly requesting the parseroutput?
[15:59:15] \o
[15:59:30] o/
[16:00:18] huh, don't think i've seen that before
[16:00:30] ebernhardson: I have https://gitlab.wikimedia.org/repos/search-platform/cirrus-streaming-updater/-/merge_requests/65 waiting for CI, but hopefully this should allow the pipeline to continue
[16:01:04] nice!
[16:07:06] ci ran out of disk, re-triggered
[16:09:30] for checking about the missing revision, i suppose it should give a badrevision output; have certainly seen those before when requesting rev_ids that have been archived
[16:28:39] ryankemper: I'll skip our pairing session once more...
[16:28:52] ok
[17:06:42] hmm, wonder if we track the request rate out of the cirrus updater anywhere
[17:06:52] i guess envoy should have it somewhere
[17:08:32] ahh, we should have known to handle these: [itwiki_content_1680965136/j7s6UtqNSrODa8-wYznadA][[itwiki_content_1680965136][4]] ElasticsearchException[Elasticsearch exception [type=document_missing_exception, reason=[_doc][9952017]: document missing]]
[17:09:19] historically we ignored all of those; it has to be a non-upsert request to fail like that, so likely a redirect or weighted tags
[17:12:17] hmm, it's a redirect to a new page
[17:16:51] * ebernhardson is not yet seeing anywhere we do anything with the elasticsearch response
[17:18:25] curiously the flink docs for the elasticsearch sink call out that requests may fail due to malformed documents being indexed, but then only have examples for setting a retry backoff/limit
[17:24:22] maybe a custom BulkProcessorBuilderFactory could achieve the desired effect, but seems awkward
[17:30:48] curiously, in old versions of flink "The Flink Elasticsearch Sink allows the user to specify how request failures are handled, by simply implementing an ActionRequestFailureHandler and providing it to the constructor". But in the current version "The Flink Elasticsearch Sink allows the user to retry requests by specifying a backoff-policy."
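For context, a minimal sketch of what the deprecated, pre-FLIP-143 API quoted above allowed, modeled on the example in the old Flink docs. Whether a document_missing_exception actually surfaces through the REST client as the concrete DocumentMissingException class (rather than a wrapped ElasticsearchException) is an assumption here.

```java
import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.util.ExceptionUtils;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.index.engine.DocumentMissingException;

// Per the old docs, the handler could retry (indexer.add(action)), drop,
// or rethrow to fail the sink, on a per-request basis.
public class DropMissingDocsHandler implements ActionRequestFailureHandler {
    @Override
    public void onFailure(ActionRequest action, Throwable failure,
                          int restStatusCode, RequestIndexer indexer) throws Throwable {
        // Assumption: the missing-doc failure is findable in the cause chain
        // as DocumentMissingException; over REST it may arrive wrapped instead.
        if (ExceptionUtils.findThrowable(failure, DocumentMissingException.class).isPresent()) {
            return; // partial update against a doc not in the index: drop it
        }
        throw failure; // everything else still fails the sink
    }
}
```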
[17:32:22] * ebernhardson apparently needs to go spelunking through flink git history
[17:33:41] ebernhardson: o/ i did not realize you all were considering using the flink elasticsearch sink
[17:34:16] ottomata: yea, letting it handle the integration with flink's checkpointing, which (in theory) gives at-least-once guarantees
[17:34:33] i thought there was some reason you all didn't like it
[17:34:55] i'm asking... because we have really wanted to generalize event platform -> sink connectors if we can. https://phabricator.wikimedia.org/T214430
[17:35:15] hmm, not that I remember :) but i wasn't helping write the initial bits here so i could have forgotten
[17:36:27] tchin was trying some stuff here
[17:36:28] https://gitlab.wikimedia.org/tchin/generic-flink-source-to-sink
[17:37:17] certainly a general (perhaps registry?) set of solutions could be useful. I always worry a bit about how special cases get handled, like the error responses i'm currently trying to solve. Usually the more general something gets, the harder it is to fix when things are wrong
[17:37:23] yeah
[17:37:51] we never got into the weeds, but so much of the sink part of pipelines is repetitive.
[17:37:59] we want it to work kind of how it works for the hive tables (but streaming).
[17:38:30] we've got the schemas, we can map that (with config) to the sink schema, and we've got a generic connector framework with flink...
[17:39:02] certainly there are most of the bits available at least :) On our side setting up read/write from kafka doesn't seem that bulky (w/ the wmf-flink stuff)
[17:39:20] at least, compared to the bulkiness of other things in flink like setting up our async fetch operator
[17:39:33] right, cuz we did that work. we never prioritized the connector stuff beyond the prototype for cassandra.
[17:39:42] we'd love to abstract as much of the input/output boilerplate as we can
[17:39:48] the inside of your pipeline is all yours ;)
[17:40:03] ebernhardson: do you have a task?
[17:40:22] i'd like to make sure this use case is understood by event platform folks.
[17:41:19] ottomata: not specifically, i've been calling it "integration hell" under the umbrella of a "run cirrus updater in staging" ticket. It seems each day i deploy a fix for an error, then spend 5-8 hours investigating and developing a fix for the next error it spits out
[17:41:32] nasty
[17:41:40] this is for the new search update pipeline, ya?
[17:41:42] ya
[17:42:28] it's getting further along at least, the errors are now in the sink, which is at the end :)
[17:42:47] which is the part ideally event platform would own :/
[17:43:27] i don't know that you would handle these kinds of cases though; in this case we receive an event that says to update some page, but that page doesn't exist, and we specifically told elasticsearch to not upsert the request, so it fails
[17:44:03] hm, oh you are using the event as a notification to tell ES to index something external?
[17:44:09] not as the state of the update?
[17:44:57] well, the event says `page A was redirected to page B`, so we have a tiny update to page B to inject the redirect. We only allow upserts for full updates, not partials
[17:45:24] oh, the page doesn't exist in es.. i see
[17:45:35] i thought you meant it didn't exist in MW, so was wondering what that had to do with the sink
[17:45:53] can be for a variety of reasons: maybe the page was created around the same time and they aren't in sequence, maybe the page was deleted, etc.
[17:46:03] or maybe the index is out of sync, can happen too
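To illustrate the upsert distinction from 17:43-17:44, a hedged sketch using the Elasticsearch 7 high-level client; the index name, document id, and field names are made-up placeholders, not the updater's real schema.

```java
import org.elasticsearch.action.update.UpdateRequest;
import java.util.Map;

public class UpsertExample {
    // Full update: docAsUpsert(true) creates the document if it is absent,
    // so it can never hit document_missing_exception.
    static UpdateRequest fullUpdate() {
        return new UpdateRequest("itwiki_content", "9952017")
            .doc(Map.of("title", "Page B", "text", "full page body..."))
            .docAsUpsert(true);
    }

    // Partial update, e.g. injecting a redirect into page B: upsert stays off
    // on purpose, since materializing a stub document from this fragment would
    // leave a bogus doc in the index. If the target is missing, Elasticsearch
    // answers with document_missing_exception instead.
    static UpdateRequest partialUpdate() {
        return new UpdateRequest("itwiki_content", "9952017")
            .doc(Map.of("redirect", Map.of("namespace", 0, "title", "Page A")));
    }
}
```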
[17:47:56] we also get failures for things that don't exist in mediawiki, those go into a failures output topic :)
[17:48:30] right, but that's not in the sink :)
[17:48:38] indeed
[17:54:33] ebernhardson: the Elasticsearch table connector has a custom class name option for failure handling with an ActionRequestFailureHandler subclass.
[17:54:33] failure-handler option?
[17:54:37] https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/elasticsearch/
[17:54:54] ottomata: ActionRequestFailureHandler is deprecated, right now i'm trying to find the commit that deprecated it to understand what the replacement is
[17:55:01] !
[17:55:42] right, you said that, i see, that makes more sense; i'd expect the table connector to have similar functionality as the datastream one
[17:55:47] weird
[17:55:57] curious to know what you find... :)
[17:56:00] yea, i'm guessing there is some more generic functionality that isn't specific to the elasticsearch sink perhaps? But not clear what yet
[17:59:52] commits not particularly helpful :P the entire commit message is "Deprecate old (pre FLIP-143) Elasticsearch connector"
[18:00:11] which is the unified sink api, so probably something there
[18:26:43] since the deprecation patch had nothing useful, dug through the patch implementing the new sink; sadly, while CR talked about numerous things, there was no mention of handling error responses. https://github.com/apache/flink/pull/17363
[18:27:28] pretty sure i can wedge something in there, but doesn't look like there is a blessed way
[18:41:35] seems like you can pass a BulkProcessorBuilderFactory but yeah... not ideal
[18:47:18] dcausse: indeed, that's where i'm at: writing a custom Elasticsearch7SinkBuilder and wrapping the call to BulkProcessor.Builder.apply so that we can wrap the BulkProcessor.Listener
[18:47:47] yes :(
[19:02:13] * ebernhardson is not finding how to send these things to a side output though... might have to settle for log messages
[19:04:07] it is possible, there is a Context object somewhere i could inject them into, but not sure how to get it :P
[19:11:36] yea, probably not worth it; the context is specialized to the event, so we would have to maintain some sort of mapping or way to get from the error back to the original event and find its context
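A rough sketch of the listener-wrapping idea from 18:47. The class and method names here are invented for illustration; the awkward part, injecting the wrapper through a custom Elasticsearch7SinkBuilder / BulkProcessorBuilderFactory, is exactly what has no blessed way and is not shown.

```java
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Decorates the BulkProcessor.Listener the Flink sink installs, so that
// per-item failures in the BulkResponse become visible (logged here, since
// routing them to a side output was judged not worth it above).
public class FailureLoggingListener implements BulkProcessor.Listener {
    private static final Logger LOG = LoggerFactory.getLogger(FailureLoggingListener.class);
    private final BulkProcessor.Listener delegate;

    public FailureLoggingListener(BulkProcessor.Listener delegate) {
        this.delegate = delegate;
    }

    @Override
    public void beforeBulk(long executionId, BulkRequest request) {
        delegate.beforeBulk(executionId, request);
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
        for (BulkItemResponse item : response.getItems()) {
            if (item.isFailed()) {
                // e.g. document_missing_exception for a partial update whose
                // target doc isn't in the index; surface it without failing the job
                LOG.warn("Bulk item failed: {}", item.getFailureMessage());
            }
        }
        delegate.afterBulk(executionId, request, response);
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
        delegate.afterBulk(executionId, request, failure);
    }
}
```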
[19:30:19] ebernhardson: gonna go hit the gym so won't be at pairing. Anyway, I'm circling back to looking at merging https://gerrit.wikimedia.org/r/c/operations/puppet/+/830240; when you get a chance, can I get a +1 if all looks good?
[19:32:47] kk
[20:21:17] Merged https://gerrit.wikimedia.org/r/c/operations/puppet/+/830240
[21:16:02] meh, noticed that the test i added for document missing doesn't work. We weren't actually consuming all the events we put into kafka, so when i added an event to the end and incremented the end position, it didn't read in my new event
[21:16:48] now that it properly reads that in, somehow wiremock only replaces one of the two ${{request.query.revids}} templates in the response body, which is surprising
[21:37:09] oh, so that's fun. the wiremock HandlebarsOptimizedTemplate optimizes the template by extracting everything before the first '{{' and after the last '}}', only passing the middle bit to the templating engine. But it doesn't ask handlebars what the delimiters are, it just uses the defaults. And in our wiremock integration we set a custom start delimiter :P
[21:44:05] * ebernhardson sets the delimiter to '{{$' instead of '${{' so it works within these constraints :P
[23:21:40] how weird... in the request/response objects passed to BulkProcessor.Listener, request.requests has a size of 7, but response.responses only has 5 items. Where did the two failures go :P
[23:21:50] * ebernhardson suspects captured by retries? unclear
[23:32:45] * ebernhardson is dumb and didn't actually manage to get it to talk to elasticsearch, those were fixtures :P
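A minimal, self-contained reproduction of the HandlebarsOptimizedTemplate behaviour described at 21:37 (not WireMock's actual source): the optimizer splits on the default '{{' / '}}' delimiters regardless of what the Handlebars engine was configured with, which is why with a '${{' start delimiter only one of the two placeholders expands, and why '{{$' at 21:44 dodges the problem.

```java
// Sketch assuming only what the log describes: everything before the first
// '{{' and after the last '}}' is stripped, and only the middle segment is
// ever handed to the templating engine.
public class OptimizedTemplateRepro {
    static String[] split(String template) {
        int first = template.indexOf("{{");
        int last = template.lastIndexOf("}}");
        if (first < 0 || last < first) {
            return new String[] {template, "", ""};
        }
        String head = template.substring(0, first);        // never templated
        String body = template.substring(first, last + 2); // passed to Handlebars
        String tail = template.substring(last + 2);        // never templated
        return new String[] {head, body, tail};
    }

    public static void main(String[] args) {
        // With start delimiter '${{': the '$' of the first placeholder lands in
        // the untemplated head, so Handlebars sees '{{...}}...${{...}}' and only
        // recognizes (and expands) the second placeholder.
        String broken = "rev=${{request.query.revids}}&again=${{request.query.revids}}";
        System.out.println(String.join("|", split(broken)));
        // With start delimiter '{{$': both placeholders survive intact inside
        // the middle segment, so both get expanded.
        String fixed = "rev={{$request.query.revids}}&again={{$request.query.revids}}";
        System.out.println(String.join("|", split(fixed)));
    }
}
```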