Broadcast delegate from async task

I have a UActorComponent class which has a delegate. I verified that if I broadcast the delegate from the on tick function in the class that it is broadcast OK.

I need to be able to broadcast the delegate from an async task I created. I have the async task set up and running, but the broadcast doesn’t do anything. I think it’s an issue with what object I am referencing to do the broadcast, but I don’t know what it should be instead.

I’m trying to pass the current instance of the class (`this`) when the ConnectToServer function runs, and use that object to broadcast from the task, but the broadcast doesn’t actually happen. The code compiles OK.

What should I be doing so I can actually broadcast the delegate from my async task?

.h file → Has actor component class definition and async task

#pragma once

#define WIN32_LEAN_AND_MEAN

#include <windows.h>
#include <winsock2.h>
#include <ws2tcpip.h>

#pragma comment (lib, "Ws2_32.lib")
#pragma comment (lib, "Mswsock.lib")
#pragma comment (lib, "AdvApi32.lib")


#include "CoreMinimal.h"
#include "Async/Async.h"
#include "Components/ActorComponent.h"
#include "NewActorComponent.generated.h"

// Size, in bytes, of the buffer the TCP receive loop reads into.
#define DEFAULT_BUFLEN 128

// Dynamic multicast delegate type carrying one received TCP message as an FString (rxData).
DECLARE_DYNAMIC_MULTICAST_DELEGATE_OneParam(FOnTcpRxData, FString, rxData);

/**
 * Actor component that connects to a TCP server and exposes the received
 * messages through the OnTcpDataRx delegate.
 */
UCLASS( ClassGroup=(Custom), meta=(BlueprintSpawnableComponent) )
class BLOCKS_API UNewActorComponent : public UActorComponent
{
	GENERATED_BODY()

public:	
	// Sets default values for this component's properties
	UNewActorComponent();

	/**
	 * Connects to the TCP server and, on success, starts the background receive task.
	 * @param error  Output: set to true when the connection could not be established, false otherwise.
	 */
	UFUNCTION(BlueprintCallable, meta = (DisplayName = "Connect To Server", ScriptName = "Connect To Server"), Category = "Utilities|TCP")
	void ConnectToServer(bool& error);

	// Event that Blueprints can bind to; broadcast with the text of each message received from the server.
	UPROPERTY(BlueprintAssignable, Category = "Utilities|TCP")
	FOnTcpRxData OnTcpDataRx;

protected:
	// Called when the game starts
	virtual void BeginPlay() override;

public:	
	// Called every frame
	virtual void TickComponent(float DeltaTime, ELevelTick TickType, FActorComponentTickFunction* ThisTickFunction) override;	
};

namespace ThreadingTest
{
	/**
	 * Blocking receive loop; meant to run on a background thread.
	 *
	 * Reads text from the socket, lower-cases it, strips a trailing line ending and
	 * hands each message to the component's OnTcpDataRx delegate. The broadcast is
	 * marshalled to the game thread because dynamic multicast delegates (and the
	 * Blueprint handlers bound to them) must not be invoked from a worker thread.
	 *
	 * The loop ends when the peer closes the connection or recv fails with anything
	 * other than a receive timeout.
	 *
	 * @param ConnectSocket  Connected socket to read from.
	 * @param myActor        Component whose OnTcpDataRx delegate receives the messages.
	 */
	static void OnTcpRxData(SOCKET ConnectSocket, UNewActorComponent *myActor)
	{
		// Weak handle so the game-thread callback can detect that the component
		// was destroyed in the meantime instead of touching a dangling pointer.
		// NOTE(review): this assumes myActor is still alive when the task starts.
		const TWeakObjectPtr<UNewActorComponent> WeakComponent(myActor);

		// One byte is held back so the received data can always be null-terminated.
		char recvbuf[DEFAULT_BUFLEN];
		const int recvbuflen = DEFAULT_BUFLEN - 1;

		for (;;)
		{
			const int iResult = recv(ConnectSocket, recvbuf, recvbuflen, 0);

			if (iResult == 0)
			{
				// Peer closed the connection gracefully: nothing more to read.
				break;
			}

			if (iResult < 0)
			{
				// With SO_RCVTIMEO set on the socket a timeout just means
				// "no data yet"; any other error is treated as fatal.
				if (WSAGetLastError() == WSAETIMEDOUT)
				{
					continue;
				}
				break;
			}

			// recv does not terminate the data; without this the string
			// conversion would read stale bytes past the received payload.
			recvbuf[iResult] = '\0';

			FString rxMsg = FString(ANSI_TO_TCHAR(recvbuf));
			rxMsg = rxMsg.ToLower();

			rxMsg.RemoveFromEnd(FString("\n\r"));
			rxMsg.RemoveFromEnd(FString("\r\n"));

			// Broadcast on the game thread; the lambda owns its own copy of the message.
			AsyncTask(ENamedThreads::GameThread, [WeakComponent, rxMsg]()
			{
				if (UNewActorComponent* Component = WeakComponent.Get())
				{
					Component->OnTcpDataRx.Broadcast(rxMsg);
				}
			});
		}
	}

	}

class TcpRxAsyncTask : public FNonAbandonableTask
{
	SOCKET MyConnectSocket;
	UNewActorComponent *davesactor;

public:
	TcpRxAsyncTask(SOCKET ConnectSocket, UNewActorComponent *myActor)
	{
		MyConnectSocket = ConnectSocket;
		davesactor = myActor;

	}

	FORCEINLINE TStatId GetStatId() const
	{
		RETURN_QUICK_DECLARE_CYCLE_STAT(TcpRxAsyncTask, STATGROUP_ThreadPoolAsyncTasks);
	}

	void DoWork()
	{
		ThreadingTest::OnTcpRxData(MyConnectSocket, davesactor);
	}
};

.cpp file

#include "NewActorComponent.h"
#define WIN32_LEAN_AND_MEAN

#include <windows.h>
#include <winsock2.h>
#include <ws2tcpip.h>

#pragma comment (lib, "Ws2_32.lib")
#pragma comment (lib, "Mswsock.lib")
#pragma comment (lib, "AdvApi32.lib")

 #define DEFAULT_PORT "27015"

// Socket handle created by ConnectToServer; INVALID_SOCKET when no connection has been established.
SOCKET ConnectSocket = INVALID_SOCKET;
// Receive timeout applied to the socket through SO_RCVTIMEO (milliseconds, per the name).
uint32 SOCKET_READ_TIMEOUT_MSEC = 1;

// Sets default values for this component's properties
UNewActorComponent::UNewActorComponent()
{
	// Allow this component to tick every frame. TickComponent currently only forwards to
	// the base class, so this can be turned off to improve performance if ticking is not needed.
	PrimaryComponentTick.bCanEverTick = true;

	// ...
}


// Called when the game starts. Only forwards to the base class; no extra setup is done here.
void UNewActorComponent::BeginPlay()
{
	Super::BeginPlay();
}

// Called every frame. Only forwards to the base class; no per-frame work is done here.
void UNewActorComponent::TickComponent(float DeltaTime, ELevelTick TickType, FActorComponentTickFunction* ThisTickFunction)
{
	Super::TickComponent(DeltaTime, TickType, ThisTickFunction);
}

/**
 * Connects to the TCP server at 127.0.0.1:DEFAULT_PORT and, on success, starts a
 * background task that receives data for this component.
 *
 * Every failure path returns immediately with error == true, so a stale socket
 * from an earlier call is never reused and WSACleanup runs at most once per
 * failed attempt.
 *
 * @param error  Output: true when the connection could not be established, false on success.
 */
void UNewActorComponent::ConnectToServer(bool& error)
{
	// Pessimistic default; only cleared once a connection is established.
	error = true;

	UE_LOG(LogTemp, Warning, TEXT("Your message"));

	// Initialize Winsock
	WSADATA wsaData;
	int iResult = WSAStartup(MAKEWORD(2, 2), &wsaData);
	if (iResult != 0) {
		UE_LOG(LogTemp, Error, TEXT("WSAStartup failed with error: %d"), iResult);
		return;
	}

	struct addrinfo hints;
	ZeroMemory(&hints, sizeof(hints));
	hints.ai_family = AF_UNSPEC;
	hints.ai_socktype = SOCK_STREAM;
	hints.ai_protocol = IPPROTO_TCP;

	// Resolve the server address and port
	struct addrinfo *result = nullptr;
	iResult = getaddrinfo("127.0.0.1", DEFAULT_PORT, &hints, &result);
	if (iResult != 0) {
		UE_LOG(LogTemp, Error, TEXT("getaddrinfo failed with error: %d"), iResult);
		WSACleanup();
		return;
	}

	// Attempt to connect to an address until one succeeds
	for (struct addrinfo *ptr = result; ptr != nullptr; ptr = ptr->ai_next) {

		// Create a SOCKET for connecting to server
		ConnectSocket = socket(ptr->ai_family, ptr->ai_socktype,
			ptr->ai_protocol);
		if (ConnectSocket == INVALID_SOCKET) {
			// No usable socket for this candidate; try the next address
			// rather than calling setsockopt/connect on an invalid handle.
			UE_LOG(LogTemp, Warning, TEXT("socket failed with error: %d"), WSAGetLastError());
			continue;
		}

		// Short receive timeout so recv in the receive task does not block indefinitely.
		DWORD timeout = SOCKET_READ_TIMEOUT_MSEC;
		if (setsockopt(ConnectSocket, SOL_SOCKET, SO_RCVTIMEO, (char*)&timeout, sizeof(timeout)) == SOCKET_ERROR) {
			UE_LOG(LogTemp, Warning, TEXT("setsockopt(SO_RCVTIMEO) failed with error: %d"), WSAGetLastError());
		}

		// Connect to server.
		iResult = connect(ConnectSocket, ptr->ai_addr, (int)ptr->ai_addrlen);
		if (iResult == SOCKET_ERROR) {
			closesocket(ConnectSocket);
			ConnectSocket = INVALID_SOCKET;
			continue;
		}
		break;
	}

	freeaddrinfo(result);

	if (ConnectSocket == INVALID_SOCKET) {
		UE_LOG(LogTemp, Error, TEXT("Unable to connect to server!"));
		WSACleanup();
		return;
	}

	error = false;

	// The task deletes itself when its work function returns.
	(new FAutoDeleteAsyncTask<TcpRxAsyncTask>(ConnectSocket, this))->StartBackgroundTask();
}

I’m maybe a bit late, but when I stumbled on this page I was hoping for a solution, so now that I have found one, here it is for others:

Make the delegate static

class MultithreadedTask : public FNonAbandonableTask
{
public:
	UObject* object;

	//gets defined in the .cpp
	static FOnThreadFinished OnThreadFinished;
...

Because it is static, you have to define it outside of any function, in the .cpp file:

// Out-of-class definition of the static delegate member declared in MultithreadedTask.
FOnThreadFinished MultithreadedTask::OnThreadFinished;

Now you have a delegate you can call without needing a reference to the object that broadcasts or executes it.

An alternative is to create a singleton of your class like this:

// Returns the process-wide UObjectClass instance, creating it on first use.
// The instance is added to the root set so the garbage collector never destroys it.
UObjectClass* UObjectClass::GetInstance()
{
	// Must be the same type that is created and returned below; declaring it
	// as MultithreadedTask* does not compile.
	static UObjectClass* Instance = nullptr;
	if (!Instance)
	{
		Instance = NewObject<UObjectClass>();
		Instance->AddToRoot();
	}
	return Instance;
}

The function has to be static, of course.

As a bonus, here is how to call a delegate from your new thread when the bound function needs to change UObjects or do similar things. Such changes are only allowed on the main game thread, but a delegate invoked from your new thread runs its bound functions on that new thread. To work around this, use AsyncTask to run the call on the game thread:

// Queue the lambda to run on the game thread. The captured values are copied into the lambda
// (they are not used in this shortened example).
AsyncTask(ENamedThreads::GameThread, [OutputName, LayerImageCount]() {
		// Code to run on the game thread goes here; in this case, executing the delegate.
        MultithreadedTask::OnThreadFinished.ExecuteIfBound();
	});
1 Like

I have since found an even easier method, but please use it with caution.

If you still want to use the UObject system and broadcast dynamic multicast delegates from an async task, dispatch the broadcast back to the game thread; there you may also need to disable the script guard (at least in the Editor) for the bound Blueprint functions to run.

void SomeClass::BeginDisconnection()
{
	OnDisconnectProcedureStarted.Broadcast();
	AsyncTask(ENamedThreads::AnyNormalThreadHiPriTask, [=]()
	{
		API::Logout();

		// Multicast dynamic delegates must be called from the game thread!
		AsyncTask(ENamedThreads::GameThread, [=]()
		{
#if WITH_EDITOR
			TGuardValue<bool> AutoRestore(GAllowActorScriptExecutionInEditor, true);
#endif
			OnDisconnectProcedureComplete.Broadcast(false);
		});
	});
}

Notice how in editor you can scope the script guard value that prevents delegates bound in UObjects in blueprints from being called. This at least worked for me in these cases.

It is a bit awkward running yet another async task back on the game thread, but I think it’s still a fairly elegant in-line solution.