In this chapter, we're going to make our nodes real-time. Basically, that means we're going to emit a proper status for each of our nodes so that the end user sees exactly what's happening with the workflow. Those statuses will include loading, error, and success states. We're going to achieve this using the Inngest Realtime package, and I would highly suggest that you find the Inngest Realtime documentation page.
The reason for that is that Realtime, at the time of me making this tutorial, is currently in developer preview. So what does that mean? It basically means that the feature is widely available for all Inngest accounts, but depending on user feedback, some APIs and SDKs might change in the future. That is why I suggest that you also visit that page, simply so you see which version you will be working with, and so you can check whether any instructions here have changed since I recorded this tutorial. But as always, I will show you exactly the version I am working with.
So let's go ahead and do npm install @inngest/realtime. I'm going to go here, the same place I installed handlebars, and add @inngest/realtime. I am immediately going to go inside of my package.json so that those of you who want to follow the exact same thing will be able to do so. As you can see, I'm using version 0.4.4. Just as a reminder, let me show you the rest of my versions.
My inngest package is 3.44.1, and my Inngest CLI is 1.12.1. We are yet to see if these versions are compatible. Sometimes a newer version of Realtime can cause problems with an older core SDK and vice versa, so you may have to match the versions. But we're going to go through this step by step, and we are going to fix any issues like that if they even happen.
So just make sure you have installed @inngest/realtime. Now we're going to go back inside of our source, into our inngest folder, client.ts. Besides defining the id, which is node-base, let's also add middleware. This middleware will be an array, and it will accept realtimeMiddleware(), like this. You can import realtimeMiddleware from our newly added package; just make sure to add /middleware to the import path right here. Perfect.
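Put together, client.ts might look roughly like this. This is a sketch based on the developer-preview API, so exact names could differ by the time you watch this, and the id here is just this project's:

```typescript
// src/inngest/client.ts (sketch, developer-preview API, names may change)
import { Inngest } from "inngest";
import { realtimeMiddleware } from "@inngest/realtime/middleware";

export const inngest = new Inngest({
  id: "node-base",
  // Registering the middleware is what later exposes `publish`
  // inside createFunction handlers.
  middleware: [realtimeMiddleware()],
});
```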
Now that we have added that, let's go ahead and define our first channel. Inside of the inngest folder, I'm going to create a new folder called channels, and inside of here I'm going to create a new file, http-request.ts. Then I'm going to import channel and topic from @inngest/realtime, and finally I'm going to export const httpRequestChannel. We're going to call the channel function we just imported and name the channel http-request-execution, and then I'm going to chain addTopic. Let me go ahead and expand this just a bit more, or maybe not, but this is what it looks like.
You can also chain it onto a new line, however you prefer. Inside of addTopic, let's go ahead and call topic, like this, and the topic will be called status. Let's define the type of that topic: each status message will have a nodeId, which it refers to, which will be a type of string, and a status, which can be loading, success, or error, like this. And let's go ahead and execute this and add a comma.
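Assembled, the channel file might look roughly like this sketch. Again, this is the developer-preview API, so verify channel, topic, and addTopic against the docs for your version:

```typescript
// src/inngest/channels/http-request.ts (sketch, developer-preview API)
import { channel, topic } from "@inngest/realtime";

export const httpRequestChannel = channel("http-request-execution").addTopic(
  topic("status").type<{
    nodeId: string; // which node this status update refers to
    status: "loading" | "success" | "error";
  }>(),
);
```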
There we go. That is our first channel. Now, of course, there are a lot of magic strings going on around here, and we can surely reuse some of the enums and node types that we have all over. For example, let me go ahead and find some status. The node status indicator has a type, NodeStatus.
Loading, success, error, and initial. So no, not exactly the same. I will see if there is a way to reuse some of this later, but for now just be careful that you didn't misspell any of these. Now that we have the HTTP request channel, we have to go back inside of our inngest folder, functions.ts, and in here go to the top where you call createFunction. After you name your event, go ahead and add channels, and in the channels array add httpRequestChannel and execute it, like this. Make sure to import it from our newly created folder.
Once you add the channels here, and if you've correctly added the middleware, another prop should appear besides event and step, and that is publish. You can see that right now there is no error if I try to destructure it, but if I comment the middleware out, you can see that I immediately get an error here. So make sure that you have the middleware, save the file, and then you should be able to destructure publish from here. If it is still not working, you can always restart Visual Studio Code, or you can restart the TypeScript server individually, which should make it work. Now that we have the publish method, what we can do is pass it to each of our executors here.
So context: await executor, which accepts the data, node id, context, and step, and will now also have publish. Now, obviously, we have an error for this because the executors are currently not typed for that. So I'm going to go inside of getExecutor here, and then inside of the HTTP request executor right here, and besides having step we will now also have publish. Let's fix this error by going inside of NodeExecutor right here, where we have our publish to-do, "add realtime later", so we can now finally do that. So I'm going to go ahead and uncomment this.
publish will be a type of Realtime.PublishFn; you can import Realtime as a type from @inngest/realtime. Now in here we have an error, and I think I know exactly why. Yes, it is because Record<string, unknown> is missing the following properties. In the previous chapter we modified these to be required, because that's how we expect them to be. But then our NodeExecutor here...
...doesn't match that requirement. So, okay, at least we are aware of it. I'm going to go ahead and add a TODO: fix types. I will see how to improve that, because even though it makes sense for these to be required, perhaps we should make all of them optional and then simply do a runtime validation here.
I'm going to see what the best solution for that is. For now, yes, TODO: fix types, because we do have a problem here. But now, inside of your HTTP request executor, you should have access to the publish function, because we defined it right here. Let me show you where this is; I don't think I showed you that.
So inside of src/features/executions/types.ts, you should have NodeExecutorParams, which now has publish. Previously, this was commented out. So let me go ahead and quickly recap everything we've added so far. We added the @inngest/realtime package. After that, we went into client.ts, imported the realtimeMiddleware, and added it to our Inngest instance.
After we did that, we created the HTTP request channel with nodeId and the statuses loading, success, and error. After we added that, we went into functions.ts in src/inngest and simply added that new channel to the channels array, in the same object where the event is defined. After that, we were able to extract a new field, publish, and we simply pass it along to our executor function. The way we fixed the error for the executor function is by going inside of types, inside of src/features/executions: in here we found the NodeExecutorParams interface, removed our to-do to add realtime later, because we are doing it right now, and simply gave publish a type of Realtime.PublishFn.
Perfect. And finally, in the executor itself, we were able to destructure publish, because now it is properly typed and available in here. So let's go ahead and remove this to-do right here and actually do it: await publish. And let me go ahead and switch to the actual code. There we go.
Await publish. So we are calling this method right here. Make sure to await it, and call httpRequestChannel, like this; make sure to import it from inngest/channels/http-request. Execute it, then call status with a nodeId and a status of loading. Let's go ahead and just fix this again. I think I messed something up with my imports.
Yes, this happens often, actually. Let me try again: httpRequestChannel. Make sure that you imported it from here, and then let's do, my apologies, execute it and call .status with nodeId. This is basically doing this, right? But since the prop is named the same as the key, we can use the shorthand syntax, and pass in the status to be loading. There we go.
So now we are successfully emitting that this node is loading. And now we have to emit errors when it fails. So, for example, right here, let's go ahead and give it an error, like this. Then we can copy this and do the same thing here. Same thing here.
Perfect. So when should we emit the success? Right here, before we return the result. So let's go ahead and simply emit the status success. Whoops, success.
There is potentially room to reduce the amount of code we are writing by maybe wrapping this entire thing inside of a try/catch, and then if these errors get thrown, we could simply emit this, which is pretty much the exact same line of code everywhere. But I do have to verify that that's what will happen, because this is an Inngest-specific error, so I'm not sure exactly what happens here. But if you've noticed that we could probably reuse this code, you're probably right.
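To make that reuse idea concrete, here is a hedged sketch of what such a wrapper could look like. This is not code we write in this chapter, just an illustration, and the name withStatusPublishing is made up:

```typescript
// Hypothetical helper, not part of the tutorial's actual code:
// wraps a unit of work so loading/success/error are always emitted.
type NodeRunStatus = "loading" | "success" | "error";

async function withStatusPublishing<T>(
  publishStatus: (status: NodeRunStatus) => Promise<void>,
  work: () => Promise<T>,
): Promise<T> {
  await publishStatus("loading");
  try {
    const result = await work();
    await publishStatus("success");
    return result;
  } catch (error) {
    // Emit the error state, then rethrow so Inngest still sees the failure.
    await publishStatus("error");
    throw error;
  }
}
```

The open question raised above still applies: Inngest's own step errors may behave differently inside a wrapper like this, so treat it as an idea to verify rather than a drop-in.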
There probably is a way to make this better, but let's just be super explicit with our states right now, simply so we know exactly what they're doing and exactly what's happening with our code. So far you shouldn't have errors anywhere except in the executor registry. And that error is not happening because of our new real-time code; it's because variableName, endpoint, and method are no longer optional, so our executor registry is confused, because that does not match the TData type which we gave it right here.
One fix could maybe be giving each of these their own type? We will see. But there is a way to fix this in a nice way, and I will try my hardest to do that.
Perfect. Now that we have that, these statuses are being emitted, and I think we can even try it out immediately. So make sure you have all of your apps running. I would recommend restarting your Inngest server and maybe even your Next.js app, simply because you just added a new package.
So maybe some caching is happening or something. Make sure you have your Inngest development server on and one of your workflows open, and go ahead and click execute workflow. There we go, this is what you should be seeing. The first thing we do is prepare the workflow.
That is basically where we do a topological sort. After that, we call the manual trigger. Nothing is happening there yet, but it will be happening later once we add the channel for manual triggers, so that we can also emit the loading and success states for the manual trigger. But you can see that we have this. Let me try and expand it.
I'm not sure how I can expand this. Maybe if I hover over it. There we go: publish, http-request-execution. So we are successfully publishing some events, right?
So you should be seeing these success events and these loading events right here. I mean, you can't really see which one is which, but you should now be seeing these publish entries. Now, here's an important note: if for whatever reason you found this complicated, or maybe the Realtime package has changed significantly for you, just know that this is simply a cool addition to our N8n clone. It doesn't change the functionality itself.
So even if you can't get it working, you will be able to continue the entire tutorial without realtime. I just wanted to let you know that because this is a developer preview. So in case it drastically changes and you just can't work your way around it, don't worry, you can just go to the next chapter. But I would suggest still going through this chapter, just in case I do some other things here. Perfect.
So now we have to find a way to reflect that status in our editor right here, and the way we're going to do that is by implementing a hook. So let's go inside of src/features/executions. In here, I'm going to create a hooks folder, and I'm going to add use-node-status.ts.
I'm going to import type Realtime from @inngest/realtime, import useInngestSubscription from @inngest/realtime/hooks, import useEffect and useState, and borrow NodeStatus from components. I think we call it the React Flow node status indicator, so you should have this component; it's basically the one where we added NodeStatus with loading, success, error, or initial. Perfect. Now let's go ahead and create the interface for this hook.
So interface UseNodeStatusOptions will accept a required nodeId, channel, and topic, as well as a refreshToken method, which basically returns a promise with a token of type Realtime.Subscribe.Token inside. Let's do export function useNodeStatus, let me open it properly and fix the typo. Let's grab nodeId, channel, topic, and refreshToken, and bind the UseNodeStatusOptions type. Perfect. Now in here, let's start by defining the status state.
So status, setStatus is a useState. We are using the type NodeStatus, with the initial value of initial. Perfect. Let's get the data by using useInngestSubscription, passing the refreshToken and setting enabled to true. And now let's create a useEffect that is going to listen to messages coming from our Inngest subscription, and we're specifically going to be looking for the newest one with our channel, our topic, our node ID, and our status.
So I'm going to first check if there is no data.length, or in other words, if there is no data coming from that useInngestSubscription hook above, and just return early; there is nothing for us to do in this useEffect. Otherwise, we have to find the latest message for this node. Now, I'm sure there are a bunch of ways you can do that, but this is the way I managed to do it very consistently and safely for my project. You are of course free to tinker with this if you feel it can be done in a simpler way.
So latestMessage is going to be data.filter. Let's go ahead and get that message. In here, I'm first going to check if message.kind equals "data", if message.channel is exactly the same as our channel (which we define when we call this hook), if message.topic is exactly the same as our topic, and if message.data.nodeId is the same as our nodeId.
This way we know exactly which node this event is referring to. Once we have filtered, we have to sort by latest, so let's sort with a and b values here: if a.kind equals "data" and b.kind equals "data", return new Date(b.createdAt).getTime() minus new Date(a.createdAt).getTime(), like this. And let me just see: we should not have a comma here, I believe, and outside of the if clause, my apologies, just return zero, like this. This basically gives us an array sorted newest first, so we can immediately access the first index of this latestMessage array.
Perfect. And now let's do a final check here: if latestMessage?.kind equals "data", setStatus(latestMessage.data.status as NodeStatus). So yes, I'm not too happy about having to cast this, but data can literally be anything. That's why it's important that you make sure you don't misspell it, because you could type anything here.
I think you can try this and it's not going to give you any errors, so just be careful when typing these things. The same goes for data.nodeId here; make sure you are not misspelling that. And then let's add the dependency array here: data, whoops, nodeId, channel, and topic.
Let me see, do I ever use data? Well, yes, I use data throughout this effect, so I have to pass it right here as well. And let's return the final status here. That's it.
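The filter-sort-pick logic at the heart of the hook can be sketched as a pure function. Note that the message shape below is a simplified, illustrative model, not the exact type the Realtime hook returns:

```typescript
// Simplified model of a realtime message (illustrative, not the real API type).
interface StatusMessage {
  kind: string;
  channel: string;
  topic: string;
  createdAt: string;
  data: { nodeId: string; status: string };
}

// Pick the newest status for one node on one channel/topic, or null if none.
function pickLatestStatus(
  messages: StatusMessage[],
  nodeId: string,
  channelName: string,
  topicName: string,
): string | null {
  const latest = messages
    .filter(
      (m) =>
        m.kind === "data" &&
        m.channel === channelName &&
        m.topic === topicName &&
        m.data.nodeId === nodeId,
    )
    // Newest first, so index 0 is the latest message for this node.
    .sort(
      (a, b) =>
        new Date(b.createdAt).getTime() - new Date(a.createdAt).getTime(),
    )[0];
  return latest ? latest.data.status : null;
}
```

Inside the hook, the result would feed setStatus; extracting it like this also makes the selection logic trivially unit-testable.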
That is our useNodeStatus hook. Once we have defined this hook, we have to go ahead and use it. But just before we can use it, we have to create our refreshToken method. There are many ways we can do this, but the quickest is by using server actions. So inside of executions, components, http-request, I'm going to create the actions.ts file.
Inside of it, I'm going to mark this with "use server". Then I'm going to add a couple of imports. I'm going to import getSubscriptionToken and type Realtime from @inngest/realtime. I'm going to import our httpRequestChannel from inngest/channels/http-request. And finally, I'm going to import inngest from our inngest folder's client, where we recently added the middleware.
So make sure you have all three. Then let's define the type: type HttpRequestToken will be Realtime.Token, open the angle brackets, and inside define two things, typeof httpRequestChannel and an array with the string "status" inside. Let's export async function fetchHttpRequestRealtimeToken, returning a Promise of HttpRequestToken. Define the token: await getSubscriptionToken, pass in inngest, define the channel to be httpRequestChannel (make sure it's an executed function), and the topics to listen to will be status. And return the token.
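Assembled, the server action might look roughly like this. It's a sketch against the developer-preview API, so verify names like getSubscriptionToken and Realtime.Token against the docs for your version, and note that the "@/" import alias is an assumption about this project's setup:

```typescript
// src/features/executions/components/http-request/actions.ts (sketch)
"use server";

import { getSubscriptionToken, type Realtime } from "@inngest/realtime";
import { httpRequestChannel } from "@/inngest/channels/http-request";
import { inngest } from "@/inngest/client";

export type HttpRequestToken = Realtime.Token<
  typeof httpRequestChannel,
  ["status"]
>;

export async function fetchHttpRequestRealtimeToken(): Promise<HttpRequestToken> {
  // The token authorizes the browser to subscribe to this channel's topics.
  const token = await getSubscriptionToken(inngest, {
    channel: httpRequestChannel(),
    topics: ["status"],
  });
  return token;
}
```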
And if you've done it properly, there shouldn't be any type errors here. Let me quickly zoom out so you can see what it looks like without any collapsed lines. Perfect. Now that we have that action defined, let's go inside of http-request/node.tsx, and in here I'm going to finally change this node status.
I don't know if you remember, but if you manually change this node status to loading, you will see that both nodes magically become loading; if you change it to success, they both become success. So now we're going to actually make it listen to the event. Let's call useNodeStatus here, from our hooks. Yes, it's a reusable one across all executions; that's why it lives all the way up there.
Perfect. And once we have it here, let's give it a nodeId of props.id, and a channel, and now this is where it gets kind of tricky. This is the part I don't like: "http-request-execution" as a raw string. You need to be super careful that you didn't accidentally misspell it (let me find the channel), so it would be a better idea to copy it from httpRequestChannel and just paste it here.
I think there is potential to fix this. Technically, we could call httpRequestChannel itself, execute it, and then read its name. I think this should work, but I'm not 100% sure. Let's try it like this.
I actually haven't done this in my original source code, but it looks like a very interesting solution. I'm not sure what really happens when you execute it, and whether you can just execute it like that. But let's move on: for topic, use status, and for refreshToken, add fetchHttpRequestRealtimeToken. Do not execute it; we pass the function itself, and it returns a promise. So let's try it out. Oh yes, I think this should be enough. Let's refresh for good luck.
Let me collapse the sidebar and click execute workflow. And now: loading, success; loading, success. Absolutely amazing job. Again, if for whatever reason you were not able to complete this, do not worry; it is not crucial to completing this tutorial.
It's obviously a super cool effect, but it does not really change whether you will be able to finish this tutorial. And it looks like reading the name off the executed channel is working as well, which is honestly a better solution than just copying the string, pasting it here, and always having to be super careful that you did it correctly. Another alternative might be to simply use a constant we define in the channel file and then import it here, because I'm just not sure about the implications of executing the channel like that. I'm not sure what it does.
Okay. Inside of HttpRequestToken, for example, we reference the channel without executing it, while here we executed it. So that's why I have a feeling that it could fail. Maybe.
Yeah, I'm not too confident with this. I think I will resort to using a string, simply because that is what I did in my original source code, and I just want to stay consistent with what I know works 100%.
Later, we can change this by fixing all the weird magic strings that we have around. So now what I'm going to do is purposely make this an invalid JSON, like this, and click save. Save this entire thing, and let's see a node fail. So: execute workflow. Loading, success; loading... And yes, it will actually take a while for this to fail.
It's obvious that it's failing, right? We know that. But it will make, I think, three attempts before it reaches its actual failure status. So if you want to speed that up, you can go inside of inngest/functions.ts, find this individual createFunction, and in the first object, where you define the id, you can also add retries: 0. Now, I would highly suggest that you add a little comment here, TODO: change for production, or maybe: remove in production, simply because it's a shame to fail immediately; fetch requests can fail, and that's normal.
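The retry override sits in the function's config object, something like this sketch (the id and event name here are illustrative; keep whatever your function already uses):

```typescript
// src/inngest/functions.ts (sketch)
export const executeWorkflow = inngest.createFunction(
  {
    id: "execute-workflow", // illustrative, keep your existing id
    retries: 0, // TODO: remove in production, retries are normally desirable
  },
  { event: "workflow/execute" }, // illustrative event name
  async ({ event, step, publish }) => {
    // ...existing handler...
  },
);
```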
Since it's already at two requests, I'm just going to wait it out now. We could actually have a bug here: I can see the finalization happened, but this never actually emits the error. I think that's because we forgot to do it. So let's go back inside of features/executions/components, into the HTTP request executor.ts.
Yes, there is definitely a bug here. This entire step.run should somehow be within a try/catch, or we should at least look at the response and what happens inside of it. So let me see what the best way of doing that is. I think what we can do is simply wrap the entire await step here inside of a try/catch. So I'm going to try to do that here, like this: catch.
Then inside of this catch, I'm just going to copy this await publish, like this, and emit error. And let's make sure to keep throwing that error, like this. I'm not a hundred percent sure this is the best solution, but it's the first thing that came to mind: basically skipping the entire return result and all of those other things. So let's see that now. Make sure you've set retries to zero, and make sure you wrap that.
Let me try and refresh this now. Perfect. And let me try executing it again. Loading success, loading error. Perfect.
Absolutely amazing. Exactly what we wanted to do. And you can see how this time there were no retries. Perfect. Now, if I'm correct (where am I in the executor?), I think we might not even need to emit these errors right here, because once we throw this error it's going to... actually, no, it will not go inside of the catch, because the try/catch is only around the request right here. So we still need them right here. We'll see.
Maybe CodeRabbit will have some interesting solutions for this. Maybe we should just wrap the entire thing inside of a try/catch; I'm not sure. One thing that I do want to do before we move on is go inside of the HTTP request channel and export const httpRequestChannelName, and just make it this string. There we go.
And then let's use it here: httpRequestChannelName. I'm going to search through my code to see where else I'm using it. It's only in its equivalent node, so I'm going to change it here.
httpRequestChannelName. There we go. And we can remove this then; we don't need it. And can we just import this as a type?
We cannot. OK. There we go. So that is now working. No more magic strings here.
Perfect. And let's go ahead and just try it one more time. And yes, if you execute your workflow two times in a row, it's simply going to reset all the statuses or at least it should. Let's try. There we go.
So it will show each status again. Perfect. Amazing job. Now let's go ahead and do the same thing for the manual trigger. That's actually kind of the only thing that will be happening in these super simple triggers, which don't require anything to load.
They can only emit the loading state and then the success state; I don't think there's any way an error can even happen in those triggers. So let's start by creating the manual trigger channel. So inside of features... no, where is it? Inside of inngest. Yes, they're kind of everywhere.
We should improve that too. So, copying http-request, let's change this to manual-trigger, like this. Let's change this to be manualTriggerChannelName, this will be called manual-trigger-execution, and this will be called manualTriggerChannel. So even though they are all almost exactly the same, I would highly suggest having them separate.
I think you have to have them separate. You could create some magical abstraction that would generate all of them, but sometimes I think abstractions are not that good. Okay, so make sure you have an identical setup, but for the manual trigger. Now let's go inside of inngest/functions.ts and add manualTriggerChannel.
There we go: channels, manual-trigger. Perfect. Now that we have that, I think we have to go inside of the manual trigger executor. So inside of the triggers folder, components, manual-trigger-executor.ts. And yes, I agree that the folder structure is a bit complex as of now.
I'll have to see if maybe I should rethink how my executions and triggers work. But yes, let's set up manual-trigger-executor.ts right here. And now we have publish; we shouldn't have any errors here, because we are using NodeExecutor here and we already defined the publish function inside of it.
And now we can very simply do the same thing we did before. So await publish, import manualTriggerChannel from inngest/channels/manual-trigger, and use the loading status. And then down here, the moment we run this completely unfailable workflow step, change this to success. There we go. Now, in order to make this actually work, we have to create the action to refresh the token.
So I'm going to go inside of features/executions/components/http-request, copy the actions file, and paste it in the triggers' manual-trigger folder. Inside of here, I'm going to rename the instance of HttpRequestToken to ManualTriggerToken. It's going to be using a typeof and an instance of manualTriggerChannel, which means I have to fix this import to be manual-trigger.
There we go. Everything else should stay exactly the same; we are just using the new channel. And of course we are renaming the type, which also means we should rename this: fetchHttpRequest... no, fetchManualTriggerRealtimeToken. Perfect. Once we have that working, let's open the node from http-request so that we can copy the node status, and then let's go inside of triggers/manual-trigger/node.tsx and change the hardcoded node status to our hook, useNodeStatus.
Make sure to import useNodeStatus from features/executions/hooks/use-node-status. And by now, I fully agree it's weird that we are in the folder called triggers, working on a manual trigger which is technically the node execution of that trigger; it's like I'm confusing node execution with the node type. So yes, I fully agree it's a bit confusing how things are spread everywhere right now. I will try to think of a better folder structure, but just bear with me, at least for this chapter. So: import useNodeStatus from where we created it, change the channel to be manualTriggerChannelName from inngest/channels/manual-trigger, and finally use fetchManualTriggerRealtimeToken from ./actions. I'm trying to think if I forgot to do something. I think this should work.
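Inside the manual trigger's node component, the wiring described above would look roughly like this fragment (a sketch; the prop names follow this project's hook, and the "@/" alias is an assumption):

```typescript
// src/features/triggers/components/manual-trigger/node.tsx (sketch)
import { useNodeStatus } from "@/features/executions/hooks/use-node-status";
import { manualTriggerChannelName } from "@/inngest/channels/manual-trigger";
import { fetchManualTriggerRealtimeToken } from "./actions";

// Inside the component body:
const nodeStatus = useNodeStatus({
  nodeId: props.id, // the React Flow node this status belongs to
  channel: manualTriggerChannelName,
  topic: "status",
  // Pass the server action itself; the hook calls it when it needs a token.
  refreshToken: fetchManualTriggerRealtimeToken,
});
```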
Let me go ahead and refresh. Make sure you have saved all of your files. Let's click execute workflow right here. There we go. Loading success, loading success, loading fail.
Amazing. Now everything has its own channel. Perfect. Amazing, amazing job. Now that we have this working, let's go ahead and merge this.
So, 21-node-realtime. Let's see: we added @inngest/realtime, we created the channels, we are publishing events, and we are capturing events using our useNodeStatus hook.
So, 21-node-realtime. I'm going to create that branch, create new branch, 21-node-realtime, and then commit these 15 files right here. So: stage all changes, "21 node real time", commit, and publish the branch. Once this has been published, as always, let's open a pull request. And since this is a pretty significant pull request, I want to make sure CodeRabbit reviews this one.
So let's see that in a second. And here we have the summary by CodeRabbit. New features. We added real-time status tracking for HTTP request executions, displaying dynamic updates, loading success and error states, and we added real-time status tracking for the manual trigger. We did all of this by introducing real-time capabilities to replace the static status indicator across execution nodes.
So how did we do that? As always, here we have a file-by-file summary which basically goes over all the files that we added. But here we have the sequence diagram explaining exactly what's going on: every node component of ours now has a hook called useNodeStatus with all the fields it needs, and after that it subscribes using useInngestSubscription and the actions.ts file which we created. Once the connection is established with the real-time channel that we define per node, we go ahead and emit events.
So during execution, we publish the loading event. And then after completion, we publish the success event. And finally, That state updates on the front end and it re-renders with the new status. So here is what CodeRabbit suggests. In the use node status hook logic, verify data filtering, sorting, and state update.
Executor publish integration: confirm status events are published at the correct lifecycle points in both the HTTP request and manual trigger flows. This was one of the questions I did have for the Inngest Realtime team. I confirmed myself that this works; I pretty consistently managed to get sequential states, so I can only conclude that if you await publish, they arrive at the right time.
So let's actually take a look at the requested changes here. So in here, in our use node status hook, it tells us to address race condition with status initialization. I think the problem is that we could technically miss out on the loading state if the success or error comes too fast. In our specific example, I think this is okay. It gives us an option to do optimistic updates by setting the status to loading, but I think this is fine as it is right now.
As always, CodeRabbit is not a big fan of the node status, my apologies, of the typecasting, and it shouldn't be; it is our reviewer, after all. So if you want to be strict, you can implement this isValidStatus check, which will basically allow you to verify at runtime that what you received from the useInngestSubscription data is what you intend to show to your user. And in here, it tells us that we are not handling errors for the manual trigger, which is a good point. I just don't see how it can fail, but yes, step.run could technically fail for some reason. So we could be consistent and wrap that inside of a try/catch and just publish the error in the catch.
And in here, it's basically telling us to actually handle that instead of leaving a to-do. What it doesn't know is that this is a YouTube tutorial, which is why I'm showing it inside of a comment here. But yes, completely valid comments. Let's go ahead and merge for now, since we got exactly the result we wanted at this stage of our tutorial. Once you've merged it, go back inside of your main branch right here, and as always, make sure that you synchronize your changes.
So click OK right here. And then what I like to do is click on my graph here and just double-check that 21 is the latest one, which I have just merged. I believe that marks the end of this chapter. What we've done: we implemented realtime, pushed to GitHub, created a new branch, created a new PR, and reviewed and merged it. And now we should be ready to start developing some other nodes, because we can easily copy and paste from these two nodes which we have, which are completely finished and have all the important features in them.
Otherwise it would have been very hard to update a bunch of nodes which we created. Amazing job and see you in the next chapter.