Advanced Kapacitor Training: User Defined Functions (UDFs)
User-defined functions (UDFs) are a powerful feature of Kapacitor. In this session you will learn how to build your own and plug them into your InfluxData solution.
Watch the Webinar
Watch the webinar “Advanced Kapacitor Training: User Defined Functions (UDFs)” by clicking on the download button on the right. This will open the recording.
Here is an unedited transcript of the webinar “Advanced Kapacitor Training: User Defined Functions (UDFs).” This is provided for those who prefer to read rather than watch the webinar. Please note that the transcript is raw. We apologize for any transcription errors.
• Michael DeSa: Software Engineer, InfluxData
Michael DeSa 00:00.593 Perfect. Hopefully, everybody can see the Advanced Topics: Kapacitor, or the top-level slide there. And so what we’re going to be doing today is talking about user-defined functions. Thank you, Chris, for the introduction. It’s great being put on such a high pedestal. Hopefully, I can live up to that. So by the end of this section, participants should be able to describe what a user-defined function is or what user-defined functions are and how to use them. We’re going to go through the process of configuring a UDF in Kapacitor, so a little bit of configuration that goes on in the process of that. We’re going to also show what using that UDF that you just created looks like, and then we’re going to talk about the Kapacitor to UDF interface. So there’s a number of functions that you have to implement in order to really get your UDF to function correctly with Kapacitor, and so we’re going to go through and show kind of examples of that. That being said, the examples we’ll be showing will be in Python. So hopefully, you have at least some familiarity with Python or another programming language. It’ll sort of make the process a little bit easier.
Michael DeSa 01:21.454 So user-defined functions. A user-defined function is a function that you can plug in to Kapacitor, using your own algorithm or whatever logic you want, and it runs in its own process and communicates with Kapacitor through a defined protocol. At the moment, this communication happens either over standard in/standard out or on a UNIX socket. And for more details about that, I’ll have some notes at the end that have detailed examples of doing each one. So off to the right here, we have an example of a TICKscript, and essentially what this TICKscript is doing is streaming data from the measurement cpu and pulling data where a particular tag is equal to cpu-total. The next step there is actually a UDF. That UDF is called moving average. So this is a function that does not exist in Kapacitor. This is something that we’ve written externally that Kapacitor is aware of and knows how to work with. And the property methods that we see there, predominantly field, size, and as, all get encompassed in this UDF definition. And so for a long period of time, moving average wasn’t a part of the InfluxDB implementation. It is now part of the implementation, but possibly you slightly disagree with that moving average. You’d like your moving average to be left-handed versus right-handed. That sort of situation is exactly where you would want to use a UDF. And the @ symbol in front of it there denotes that it is indeed a UDF.
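A minimal reconstruction of the TICKscript being described on the slide might look like this (the property names field, size, and as come from later in the talk; the field and values shown here are illustrative):

```
stream
    |from()
        .measurement('cpu')
        .where(lambda: "cpu" == 'cpu-total')
    // the @ prefix marks movingAverage as a UDF, not a built-in node
    @movingAverage()
        .field('usage_idle')
        .size(10)
        .as('avg')
```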
Michael DeSa 03:14.544 So one of the questions that I get asked fairly frequently is, “Well, how do I know that I need a UDF?” And there are, in my mind, three ideal use cases, three cases where you need a UDF. So the first one there is: I need some external information that I want to add on to my alerts that isn’t actually available in the data that Kapacitor is processing. So maybe I’m doing something like sending IDs down in my points. I tag the data that’s coming through with an ID, but I actually want to go look up the value of that ID and, say, get somebody’s name or somebody’s phone number or any number of things. And I want to attach that to the alert that comes out. So you can do this using Kapacitor, using a UDF in particular. You would capture the data, make a request off to some external service, incorporate that data back into the data that’s going through Kapacitor, pass the results back on to Kapacitor, and do any kind of alerting or anything of that sort that you may want to.
Michael DeSa 04:33.150 Another use case is if I have some kind of custom function: I want to do some kind of custom anomaly detection, or I want to run my own custom statistics function, or I want to do kurtosis or any number of things. And Kapacitor can’t do that currently. So I need a way to implement that logic in my Kapacitor script, because it’s crucial to what I’m doing, and then pass the results onward to the rest of Kapacitor. So this is another super common use case that we see. Another way that we see UDFs being used is for things that shouldn’t necessarily belong inside of Kapacitor. Maybe it’s a little bit too specific, kind of like plugins, so to speak. And so I’ll have an example of an anomaly detection plugin that we have that is external to Kapacitor but is still very useful. And the final point here, which is similar to the first two but I would argue slightly different, is if I want to write Kapacitor data to an external service that Kapacitor doesn’t currently support. So I want to open up a TCP connection and write some data somewhere. Or I want to write the results to my Postgres database or any number of things. These are things that Kapacitor doesn’t have built in. And so I can take the data that is coming through my TICKscript or my task and then write the results on to that external service. So these are the verticals that I think about whenever I’m trying to determine if somebody needs a UDF. And this is the process that you can go through yourself.
Michael DeSa 07:40.968 So we’re going to go through each of these steps and I’m going to talk about the various methods. And we’ll see what implementing a UDF actually looks like. That being said, in a whole UDF there’s often more logic than we’ll cover, and so, at the end, we’ll provide a number of examples that you can go through yourself with a bit more time and a little bit more detail to really drill down into what’s going on. So, as I mentioned briefly, a UDF agent is needed in order for Kapacitor to interact with a language. Any programming language that supports encoding and decoding protocol buffers, which is basically all of them, can be used to create a UDF agent. There’s more detail on this, and I’ll have a final slide, again, that describes the process for creating an agent. It’s not too much work, but it does require a little bit of effort, and currently the two InfluxData-supported UDF agents are Go and Python. We’re always happy to add more and happily accept any PRs from the community. Go and Python just happen to be places where we’re a little bit more comfortable than in other languages. In the examples coming after here, we’ll be using Python just because I feel like it’s a good base language for most people.
Michael DeSa 09:10.312 So to implement the UDF handler interface, there’s a number of methods. For the most part, implementing the interface should be very, very easy. You just need to define the methods. The only thing that should actually be difficult is doing what you want to do with the UDF. And that’s the whole point here: we have a simple interface that you implement, so we take all the complexity out of having to implement the interface and put it into the logic of actually implementing what you want to do. So the first method that needs to be implemented in the interface is the info method. This method is called when Kapacitor is started, so when the Kapacitor process or Kapacitor daemon is brought online, it calls this UDF info method. And this info method gives Kapacitor the information on how it should parse the dot methods, or the property methods, associated with the UDF. So when I go to define my task, Kapacitor knows exactly which properties to expect, and if those aren’t there, or if there are too many of them, it can do the proper panicking or throw the proper error. You also specify whether the type of data it accepts is a stream or a batch. The terminology that we use is you specify the type of data that it wants and that it provides. So provides is what it outputs; wants is what it consumes.
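As a rough sketch, the handler interface he walks through amounts to a class with the following methods. This is plain Python with the protobuf request and response types from Kapacitor’s agent package omitted; the method names follow the interface as described in the talk, but treat the details as illustrative:

```python
class Handler(object):
    """Sketch of the Kapacitor UDF handler interface."""

    def info(self):
        # Called once when kapacitord starts: declare wants/provides
        # (stream or batch) and the property methods the UDF accepts.
        raise NotImplementedError

    def init(self, init_req):
        # Called when a task using the UDF starts: receives the option
        # values pulled from the TICKscript.
        raise NotImplementedError

    def begin_batch(self, begin_req):
        # Delimits the start of a batch (batch-consuming UDFs only).
        raise NotImplementedError

    def point(self, point):
        # Called for every point, in a stream or within a batch.
        raise NotImplementedError

    def end_batch(self, end_req):
        # Delimits the end of a batch (batch-consuming UDFs only).
        raise NotImplementedError

    def snapshot(self):
        # Rarely used: return state for Kapacitor to persist.
        raise NotImplementedError

    def restore(self, restore_req):
        # Rarely used: restore previously snapshotted state.
        raise NotImplementedError
```

A concrete UDF subclasses this and fills in only the methods it needs; as noted below, a stream-only UDF can simply raise on the batch methods.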
Michael DeSa 10:44.283 The next method beneath that is the init method. So the init method is run when the task executing the UDF actually starts. It receives a list of the options that were pulled from the TICKscript. So in the info method, we specify which property methods, or which properties, there are, and with the init method, we actually pull those properties from the TICKscript and place them into the context of our UDF. These are all a little bit abstract; we’ll go through and see examples of each of them as we continue. Beneath that are the begin batch and end batch methods. They’re used when a UDF wants a batch of data, that is, when it consumes batches of data, and they’re used to delimit the beginning and the end of a batch. And so you can think of it as, “I receive a new batch. I’ve got to set a bunch of context up. I’ve got to clear the current state and get ready to start consuming points.” That’s the way to think about begin versus end batch, and we’ll see in more detail what that looks like. In a lot of cases, if I’m just writing a streaming UDF, meaning my task both wants and provides a stream, I actually will not need to implement these methods at all. I’ll simply raise an exception if the UDF ever receives anything like this.
Michael DeSa 12:23.404 Beneath that, we have the point method. So the point method is used for receiving points, including batch points. So if it’s a stream, I’ll just be constantly receiving a stream of points. If it’s a batch, I’ll be dealing with the data between the begin and end batch delimiters. And so you can think of it as an iterator, so to speak, where begin batch denotes the beginning of a sequence and end batch denotes the end of a sequence. But the point method will be called for every single point that comes through in that sequence. And for streaming, there’s no begin and end; it just constantly flows through. And then the final things down beneath here are snapshot and restore. These are very, very rarely used. The main use case is if your UDF has some state associated with it, and you want Kapacitor to store that state. So you can take a snapshot or restore that state, so that if the UDF dies or the process dies or anything happens, you don’t necessarily lose that data, or if you shut down Kapacitor, that state will still be saved. Typically, whenever I’m implementing a UDF, I try to make it as stateless as possible. But every now and then, that’s not achievable. And we’ll see what these various methods look like coming up soon.
Michael DeSa 14:03.002 So the first thing that we talked about was that info method. So off to the right over here, we have movingAverage, which is the name of the UDF that we’ve created. And as we can see, it has three property methods: it has field, size, and as. So we want to express in our info method that these are the types of properties that this UDF will accept. And to do that, we go to the code down below, and we can see where we’re setting these options. So on response.info.options we say we want field to be able to accept a string, we want size to be able to accept an int, and then we want as to be able to accept a string. And so we specify the types of values that each property can accept, as well as their names. On top of that, there are also two additional things here, which are the wants and provides. In this case, if we think about how moving averages work, moving averages consume a stream of data and they also output a stream of data. They output individual points, not batches of points. So we know that in this case, we want and we provide a stream. So this is just setting the context again for the UDF itself. It just lets Kapacitor know, “Hey, keep an eye out: the things that hook into this UDF are streams, it outputs a stream, and the three property methods it has are field, size, and as.”
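What that info response expresses can be sketched in plain Python. The real implementation builds a protobuf Response using Kapacitor’s udf_pb2 module, but the shape is roughly this, with dicts standing in for the protobuf messages:

```python
def info():
    # The movingAverage UDF consumes a stream and produces a stream,
    # and accepts three property methods with these value types.
    return {
        "wants": "STREAM",
        "provides": "STREAM",
        "options": {
            "field": ["STRING"],
            "size": ["INT"],
            "as": ["STRING"],
        },
    }
```

Kapacitor calls this once at startup and uses the result to type-check the property methods it later finds in your TICKscript.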
Michael DeSa 16:08.659 The next thing here that we have is that init method. So as mentioned, that init method is called whenever the task containing the UDF starts executing. And so if we take a look down to the left over here, the movingAverage UDF that we have has the property methods field, size, and as. And you’ll note that their values, usage_idle, 10, and avg, are values that we want to pull from this task and put into our UDF handler. And so to do that, we have our init method in typical Python style. This is actually defined in a class; it takes in self and it consumes what is called an init request. And then what we’re doing here is we iterate through all of the various options and we set the associated values. So we grab the field, we grab the size, and we grab the as for each of those values. And then the next step here is, if any of those are empty and we expected them not to be, or if they’re required, we can raise any errors that may be necessary. So again, the high-level idea here is: the task starts, we call this init method, the init method takes the data from the task, pulls the actual values off of our UDF’s property methods, and sets them on the object where our handler interface is being implemented.
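A sketch of that init step in plain Python. The real init request is a protobuf message; here the options arrive as (name, value) pairs, which is an illustrative simplification:

```python
class MovingAverageHandler(object):
    def __init__(self):
        self._field = None
        self._size = 0
        self._as = None

    def init(self, options):
        # Pull the property values that were set in the TICKscript, e.g.
        # [("field", "usage_idle"), ("size", 10), ("as", "avg")].
        for name, value in options:
            if name == "field":
                self._field = value
            elif name == "size":
                self._size = int(value)
            elif name == "as":
                self._as = value

        # Raise if a required option is missing or invalid.
        errors = []
        if self._field is None:
            errors.append("must supply a field")
        if self._size <= 0:
            errors.append("size must be > 0")
        if self._as is None:
            errors.append("must supply an 'as' name")
        if errors:
            raise ValueError("; ".join(errors))
```

After init runs, the handler object carries the task-specific configuration for the rest of that task’s lifetime.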
Michael DeSa 18:00.308 So the next example here is for begin batch. I should mention that we’re using a different UDF in this case, just because moving average both wants and provides a stream, and so there is no batching required there. And so I’ve pulled up the outliers UDF to show what the begin batch method looks like. So again, we can see off to the right here, we say def begin_batch. It’s defined in a class above it, so it takes in self, and it takes in a begin request. And the first thing it does is reset the state. So if you remember, I was saying earlier that this begin batch delimits that the points to come are going to be a batch of points rather than a stream of points. And batches of points must be treated differently from streams of points, so we want to reset the state whenever a new batch is coming through. And what we do is we generate a response and then we copy all of the associated things from the begin request. This is mostly just bookkeeping to make sure that we keep all of the data around. And then we set the begin response as the response, just keeping some extra bookkeeping around that we’ll use later in the method.
Michael DeSa 19:36.213 Similarly, we have the outliers end batch method. So as begin batch denotes the beginning delimiter, end batch denotes the ending delimiter. And so in this case, once we know that we’ve received all of the points, which is signified by hitting an end batch, we call the method self._state.outliers, which will actually compute what the outliers are. And so, as you can imagine, for the point method, which I think is coming up just next, that point method is really going to be there consuming all of the points and bundling and caching all the points together, waiting for the receipt of that end batch so that it can actually compute which values were the outliers. And then the rest of the things here are just bookkeeping for communicating data back to Kapacitor. So we do things like get the length of the outliers and start writing the response, just to let Kapacitor know how large the response will be.
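The point/end batch pairing he describes can be sketched in plain Python: points are buffered as they arrive, and the outlier computation only runs when the end of the batch is seen. The sigma test below is an assumption for illustration; the real outliers UDF may use a different algorithm, and the protobuf plumbing is omitted:

```python
import math

class OutliersHandler(object):
    def __init__(self, sigma=3.0):
        self._sigma = sigma
        self._points = []   # buffered until end_batch

    def begin_batch(self):
        # New batch: clear any state left over from the previous one.
        self._points = []

    def point(self, value):
        # Within a batch, just gather the points.
        self._points.append(value)

    def end_batch(self):
        # All points received: now compute which ones are outliers,
        # i.e. more than sigma standard deviations from the mean.
        n = len(self._points)
        if n < 2:
            return []
        mean = sum(self._points) / n
        var = sum((v - mean) ** 2 for v in self._points) / n
        std = math.sqrt(var)
        return [v for v in self._points
                if std > 0 and abs(v - mean) > self._sigma * std]
```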
Michael DeSa 20:48.427 So as mentioned, the point method is really where the logic for streaming UDFs takes place, and usually where the gathering of points takes place for batch UDFs. So I’m going to first talk about the moving average, what that looks like and how you think about it. In this initial section, we have def point, taking self and point, which is a protocol buffer point. And the first thing we do is some bookkeeping. We generate a response, we copy all the data, and we clear any associated fields. And then what we do is look at point.group. So higher up in your task, there’s a concept of a group, and this is the result of, say, grouping by a tag, or by a particular time bucket. Each of these things will have a group associated with it. And we want to track those groups in the UDF so that the correct data is processed together. So if I group by host, and I have host A and host B, host A and host B will have different moving averages. And so we want to make sure that those are respected correctly in the UDF.
Michael DeSa 22:36.402 So that’s what that check is: if point.group not in self._state, we’re checking whether the group is in the state. If it isn’t, then we want to add it to the state. Otherwise, what we’re going to do is compute whatever the current moving average is, add the point to the state, set the resulting field on the data as it goes through, and then write the response back to Kapacitor. And so as you can see, this is really all the handler logic for interacting with or updating the Kapacitor data as it goes through. An important thing to note here is that there’s this underscore state, this private state, that we’re updating in the background. If you think about this in terms of something like [inaudible], the handler methods here would be thought of as controller methods. They usually have some kind of model in the background that they’re interacting with, and the handlers here are responsible for moving things between Kapacitor and that model. I think that’s what I’m trying to say.
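The per-group bookkeeping he is describing can be sketched in plain Python: a fixed-size window per group, keyed by the point’s group. This is the model logic only; the real handler wraps it in the protobuf request/response plumbing, and the class and method names here are illustrative:

```python
from collections import defaultdict, deque

class MovingAverageState(object):
    """Per-group moving-average state for a stream UDF."""

    def __init__(self, size):
        # One fixed-size window per group, so e.g. host=A and host=B
        # get independent moving averages.
        self._groups = defaultdict(lambda: deque(maxlen=size))

    def update(self, group, value):
        # Add the point's field value to its group's window and
        # return the current moving average for that group.
        window = self._groups[group]
        window.append(value)
        return sum(window) / float(len(window))
```

For example, with size=3, updating group "hostA" with 1.0 then 2.0 yields averages 1.0 and 1.5, while a first update of "hostB" with 10.0 yields 10.0, untouched by hostA’s window; once hostA’s window is full, the oldest value falls out.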
Michael DeSa 24:15.634 And so the next point here is the point method for outliers. We can see that the outliers batch method is actually drastically simpler than the streaming method. And the reason for that is, as I was saying, the point method in the case of outliers is really just consuming and bundling together points, and waiting until it receives that end batch call before it actually goes off and does any processing. And so there’s actually a question in chat right now: where do you find the list of available functions in the request, response, and begin/end batch? So I think the question is related to things like the udf_pb2 response and whatnot, and copying the various things from there. All of that is really coming from the agent. And so if you go look in the Kapacitor repo under udf/agent, there’s a list of all the methods that are required to implement an agent. We should probably have another piece about what implementing an agent looks like. And again, at the end of all this, I have some links that will give you a little bit better of an idea of what functions are available. So they’re actually not constrained. I should mention that the begin/end batch and the copy-from calls are not constrained to those particular methods; they’re actually methods in a package, the udf_pb2 package predominantly.
Michael DeSa 26:09.163 So I’ve passed over snapshot for the most part, since I really don’t use it too frequently. And we’re going to move on to actually using the UDF. So how do I actually do that? We’re going to go through the process of configuring the UDF, we’re going to create a TICKscript that uses the UDF, then we’re going to create a task from that TICKscript, and then we’re going to enable that task. And so to configure the UDF, we have to go into the Kapacitor configuration file. It’s on the lower section of this slide here. I go to the udf section, then into the udf.functions section, and then to the udf.functions.movingAverage section. And that movingAverage section is something that I’m defining. The reason why I’ve made this movingAverage bold here, and made the matching name bold over there as well, is that the name I put in my configuration file is the name that I will use in my TICKscript to specify this specific UDF. And so on top of that, you just need to give it a program. In the case of an interpreted language like Python, that program will be the path to the Python 2 interpreter on your machine. And then the args will specify the path to the Python code, the moving average code that was actually written. The timeout here is just how long Kapacitor should wait for that Python code to respond. And the environment here sets environment variables for the agent, to make sure that the Python agent is properly scoped.
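Put together, the configuration section he is walking through looks roughly like this. The paths and the PYTHONPATH value are illustrative and need to be adjusted for your machine:

```toml
[udf]
[udf.functions]
    [udf.functions.movingAverage]
        # Program to run: for an interpreted language, the interpreter.
        prog = "/usr/bin/python2"
        # Arguments: the path to the UDF code itself.
        args = ["/path/to/moving_average.py"]
        # How long Kapacitor waits for the UDF to respond.
        timeout = "10s"
        # Environment for the child process, e.g. so the Python
        # agent package can be found.
        [udf.functions.movingAverage.env]
            PYTHONPATH = "/path/to/kapacitor/udf/agent/py"
```

The section name, movingAverage, is what the TICKscript refers to as @movingAverage.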
Michael DeSa 28:07.296 Once we’ve done that, we simply close out that config file and then start Kapacitor by running kapacitord -config with the Kapacitor TOML file. And then everything should work from there. So one question that I have in the Q&A currently is: what happens if a stream-consuming UDF is defined in a batch TICKscript? Kapacitor will actually reject that task from being created and just say that these two types of things are incompatible. I should mention that there’s the top-level node, which makes it either a batch task or a stream task, but within any particular task, there are certain methods that will either consume batches and output streams, or consume streams and output batches. A good example of this is the window node. The window node consumes a window of stream data, groups all of that together, and then yields, or provides, a batch of data. So long as I’ve hooked together all of my nodes and my UDF in my TICKscript so the types line up, everything should flow through. And if you have not, then Kapacitor will reject it before it even accepts the task, when it’s constructing the UDF.
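The stream-to-batch handoff he describes with the window node looks roughly like this in a TICKscript (the measurement, the timings, and the outliers UDF name are illustrative):

```
stream
    |from()
        .measurement('cpu')
    |window()
        .period(30s)
        .every(30s)
    // window() provides a batch, so a batch-wanting UDF can follow it
    @outliers()
```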
Michael DeSa 29:45.537 So another question is: you can register a UDF as a child process or as a separate process. When a separate process, it can only register as a UNIX domain socket. Are you planning to offer support for TCP sockets instead? Yes. There are at least tentative plans to offer it as a TCP socket instead. So continuing on with the examples here. Once we’ve configured the UDF, we can then create a TICKscript that actually uses that UDF. So here I just have stream, from the measurement cpu, where lambda of cpu equals cpu-total. So grab the data tagged with cpu-total, and then compute the moving average on the field usage_idle. Make that moving average a size of 10, so there are 10 points in the moving average, as avg, and then write that resulting data back out to my database, mydb, the retention policy autogen, and the measurement cpu_average. And throw a tag on there that says that this was processed by Kapacitor. So once I’ve made the TICKscript, I want to actually create the task. So I say kapacitor define cpu_average: it’s a stream task, it takes the TICKscript cpu_average.tick, and it operates on the database and retention policy telegraf.autogen. And then I say kapacitor enable cpu_average, and then that task should indeed be running. And then we can verify that data is flowing by opening influx, issuing use mydb, and select star from cpu_average. And then we should see the data as it came through in that stream.
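Written out, the TICKscript he is describing looks roughly like this. The UDF name movingAverage must match the name in the config file; the tag key and exact syntax here are reconstructed from the description and should be treated as illustrative:

```
// cpu_average.tick
stream
    |from()
        .measurement('cpu')
        .where(lambda: "cpu" == 'cpu-total')
    @movingAverage()
        .field('usage_idle')
        .size(10)
        .as('avg')
    |influxDBOut()
        .database('mydb')
        .retentionPolicy('autogen')
        .measurement('cpu_average')
        .tag('processed_by', 'kapacitor')
```

Then, roughly: kapacitor define cpu_average -type stream -tick cpu_average.tick -dbrp telegraf.autogen, followed by kapacitor enable cpu_average, and verify with influx, use mydb, select * from cpu_average.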
Michael DeSa 31:36.631 So as I mentioned throughout this whole sequence, there are a number of additional resources to learn more about user-defined functions, and I would recommend the following things. In our documentation, we have a section on custom anomaly detection, where you take your own custom algorithm and plug it into Kapacitor. So we’ve got some documentation there. There’s also a guide on writing a socket-based UDF, so if you want to do things based off of sockets instead of a child process, you can see how to do that there as well. Beneath that is an example of a very complex UDF that is maintained by Nathaniel. He’s one of my coworkers who works on the Kapacitor project, and he wrote a very flexible anomaly detection package, an algorithms package, that can be used with Kapacitor as a UDF. It’s called Morgoth. If you want to do any kind of anomaly detection, I would highly recommend checking that out. And then beneath that, we have a couple of examples of UDF agents: the documentation covering all that you need to implement an agent, as well as examples of some other UDFs. So I would recommend giving those resources a look.
Michael DeSa 33:01.615 And so I’ll sort of start answering some questions off in the chat to the side over there, and I’m going to leave these additional resources up. And so I’m going to start looking through chat. So to Tunde, I could actually not see—only thing I saw was, can you see my chat? That’s all I saw. I didn’t see anything else. Next question I have in chat is, “I have a UDF with a batch input with a TICKscript defining a period of 30 seconds and an every of 30 seconds. The output is a stream. What is happening when I write the response to an InfluxDB? It skips records when you define the input in TICKscript as created in every but normal output works fine.” I’d have to see specifically what you were doing. So feel free to either send me an email or point me at a GitHub repository. I’d have to do a bit more debugging. Or if you do believe that this is genuinely an issue, open up an issue on the Kapacitor repository and we can get that properly prioritized.
Michael DeSa 34:20.427 So the next question that I have here is, “Suppose my UDF determines which Slack ID or email ID to send the alert to. How can I pass this info to an alert node?” So this is a great question. There are some complications that come up with that in an alert node. We’ve got an open issue for that in our, what’s the word that I’m looking for, it’s a Kanban board, and once it’s addressed you’ll be able to evaluate that data to ping a specific person. But yeah, so that’s coming. “Can you see my chat in question?” I could not see it. “Are you sending out this video presentation?” Yes. This video and presentation will be sent out at the end.
Michael DeSa 35:29.257 Happy to answer any additional questions that come up.
Michael DeSa 35:50.962 “Regarding subscriptions, is there any plan to support WHERE clauses for subscriptions?” At the moment, no. But that is something that has been requested. And it is something that I have also wanted. The way that subscriptions currently work is that whatever write comes into InfluxDB, as it’s structured, literally gets proxied or duplicated onto Kapacitor. So implementing a WHERE clause in there would be a little bit more work. I’ve heard from a number of people that have wanted something like that, myself included, and it will probably end up happening; just the specific time as to when, I’m not sure.
Michael DeSa 36:55.289 So the next question is, “What does Kapacitor do if a UDF is overwhelmed with the number of alerts that come in? What if it cannot keep up?” So it kind of depends, but more likely than not, it will start dropping data, or it’ll just process it whenever it gets to it. It’s a common thing that does happen: the UDF comes under a lot more load. So if you do know that your UDF is going to be under high load, you want to design it in such a way that the UDF being overwhelmed will not cause issues for Kapacitor; otherwise, it’ll just back up the rest of the Kapacitor pipeline. The next question is, “There’s no plugin for Splunk in the list of input plugins. Why?” No particular reason. It just hasn’t been a request that has come across. If you open up a feature request or contact our product management team, we can get that sort of thing prioritized. If this is something that’s especially important to you, we can maybe work something out on a professional services basis where we would implement it for you.
Michael DeSa 38:26.471 So we’ve got another question here: “New to Kapacitor. Suppose I want to write a UDF that needs to update the current point or add an extra field. Can that be done, or should it be streaming to a new measurement?” This is actually spot on. That can be done, and that’s a perfectly reasonable, perfectly common use case for a UDF: I want to add some additional information to a point that’s coming through. That is something that we’ve designed it to do. So there is another question in the chat currently, which is, “Can Kapacitor be distributed to distribute the load introduced by my UDF?” So there is a clustered version of Kapacitor that is currently being worked on, which is a closed-source offering, and if you are interested in being a part of the development of that, I would recommend that you reach out to our sales team and we can get you into a private alpha or a private beta for clustered Kapacitor.
Michael DeSa 39:44.272 There’s an additional question: “I have one more question. When I use the HTTP API and make a change to some alert threshold in a TICKscript task, it does not take effect immediately.” So I think that that’s probably due to the previous implementation of the way that alert handlers worked. In the 1.3 release of Kapacitor, we reworked the way that alert handlers work, and so modifying thresholds in a TICKscript should take immediate effect. Previously, you had to reload the entire TICKscript, and now we’ve pulled alert handlers off into their own thing. So that should be addressed. “Do you recommend UDFs as a separate process? We had a scenario wherein we had multiple tasks and one task brought down the whole Kapacitor.” Yeah, for that reason I’d probably consider running it as a separate process, just because it is possible that it could bring down the entire Kapacitor. You may actually consider opening up an issue on Kapacitor for that, just so we can do a little more graceful handling when a specific task running a UDF fails, so that it won’t take down an entire Kapacitor instance. That probably would be a good thing to do. So if you could please open up an issue for that, it would be greatly appreciated.
Michael DeSa 41:34.240 So next question is, “If Kapacitor UDF is in a separate container, how do I make sure that the tasks are enabled after the UDF container starts? Because otherwise the task will error out.” I’d just increase the timeout that it has. That would probably be the best way to do it. So give it enough time that the TICKscript will be able—or Kapacitor will be able to contact the UDF and pull the appropriate methods. That would be my recommendation there. “Where do we open such issues?” That’s a great question and I will direct you to that right now.
Michael DeSa 42:27.947 So I’ve posted a response to “Where do we open such issues?” in the chat, or in the Q&A: opening an issue on our Kapacitor repo is definitely the best way. “I missed part of the discussion, but would the UDF function call one process many times, or open and close the process many times?” It calls it one time. So the way it works is there are a number of methods in the handler interface. Kapacitor will call out to those methods when the process first starts up. And then for each task that gets created, there’s an init method that will get called, and it’ll pull the associated options. And so it’ll run a single UDF for multiple tasks. Each time a point passes, it goes through the same UDF; that’s the point I’m trying to make here.