Luanti 5.10.0-dev
 
Loading...
Searching...
No Matches
s_async.h
Go to the documentation of this file.
1// Luanti
2// SPDX-License-Identifier: LGPL-2.1-or-later
3// Copyright (C) 2013 sapier, <sapier AT gmx DOT net>
4
5#pragma once
6
7#include <vector>
8#include <deque>
9#include <unordered_set>
10#include <memory>
11
12#include <lua.h>
13#include "threading/semaphore.h"
14#include "threading/thread.h"
15#include "common/c_packer.h"
16#include "cpp_api/s_base.h"
17#include "cpp_api/s_security.h"
18
19// Forward declarations
20class AsyncEngine;
21
22
23// Declarations
24
25// Data required to queue a job
27{
28 LuaJobInfo() = default;
29
30 // Function to be called in async environment (from string.dump)
31 std::string function;
32 // Parameter to be passed to function (serialized)
33 std::string params;
34 // Alternative parameters
35 std::unique_ptr<PackedValue> params_ext;
36 // Result of function call (serialized)
37 std::string result;
38 // Alternative result
39 std::unique_ptr<PackedValue> result_ext;
40 // Name of the mod that invoked this call
41 std::string mod_origin;
42 // JobID used to identify a job and match it to callback
43 u32 id;
44};
45
46// Asynchronous working environment
48 virtual public ScriptApiBase, public ScriptApiSecurity {
49 friend class AsyncEngine;
50public:
51 virtual ~AsyncWorkerThread();
52
53 void *run();
54
55protected:
56 AsyncWorkerThread(AsyncEngine* jobDispatcher, const std::string &name);
57
58private:
60 bool isErrored = false;
61};
62
63// Asynchronous thread and job management
65 friend class AsyncWorkerThread;
66 typedef void (*StateInitializer)(lua_State *L, int top);
67public:
68 AsyncEngine() = default;
71
77
82 void initialize(unsigned int numEngines);
83
90 u32 queueAsyncJob(std::string &&func, std::string &&params,
91 const std::string &mod_origin = "");
92
99 u32 queueAsyncJob(std::string &&func, PackedValue *params,
100 const std::string &mod_origin = "");
101
106 void step(lua_State *L);
107
108protected:
115 bool getJob(LuaJobInfo *job);
116
121 void putJobResult(LuaJobInfo &&result);
122
126 void addWorkerThread();
127
131 void stepJobResults(lua_State *L);
132
136 void stepAutoscale();
137
146 bool prepareEnvironment(lua_State* L, int top);
147
148private:
149 // Variable locking the engine against further modification
150 bool initDone = false;
151
152 // Maximum number of worker threads for automatic scaling
153 // 0 if disabled
154 unsigned int autoscaleMaxWorkers = 0;
156 std::unordered_set<u32> autoscaleSeenJobs;
157
158 // Only set for the server async environment (duh)
159 Server *server = nullptr;
160
161 // Internal store for registered state initializers
162 std::vector<StateInitializer> stateInitializers;
163
164 // Internal counter to create job IDs
166
167 // Mutex to protect job queue
168 std::mutex jobQueueMutex;
169 // Job queue
170 std::deque<LuaJobInfo> jobQueue;
171
172 // Mutex to protect result queue
174 // Result queue
175 std::deque<LuaJobInfo> resultQueue;
176
177 // List of current worker threads
178 std::vector<AsyncWorkerThread*> workerThreads;
179
180 // Counter semaphore for job dispatching
182};
Definition s_async.h:64
unsigned int autoscaleMaxWorkers
Definition s_async.h:154
~AsyncEngine()
Definition s_async.cpp:24
std::unordered_set< u32 > autoscaleSeenJobs
Definition s_async.h:156
u32 queueAsyncJob(std::string &&func, std::string &&params, const std::string &mod_origin="")
Queue an async job.
Definition s_async.cpp:90
u64 autoscaleTimer
Definition s_async.h:155
std::vector< AsyncWorkerThread * > workerThreads
Definition s_async.h:178
std::deque< LuaJobInfo > jobQueue
Definition s_async.h:170
void stepJobResults(lua_State *L)
Process finished jobs callbacks.
Definition s_async.cpp:157
Semaphore jobQueueCounter
Definition s_async.h:181
std::deque< LuaJobInfo > resultQueue
Definition s_async.h:175
AsyncEngine(Server *server)
Definition s_async.h:69
void registerStateInitializer(StateInitializer func)
Register function to be called on new states.
Definition s_async.cpp:55
void putJobResult(LuaJobInfo &&result)
Put a Job result back to result queue.
Definition s_async.cpp:143
void stepAutoscale()
Handle automatic scaling of worker threads.
Definition s_async.cpp:191
bool prepareEnvironment(lua_State *L, int top)
Initialize environment with currently registered functions. This function adds all functions registered by...
Definition s_async.cpp:226
bool getJob(LuaJobInfo *job)
Get a job from the queue to be processed. This function blocks until a job is ready.
Definition s_async.cpp:125
AsyncEngine()=default
void step(lua_State *L)
Engine step to process finished jobs.
Definition s_async.cpp:151
void(* StateInitializer)(lua_State *L, int top)
Definition s_async.h:66
std::vector< StateInitializer > stateInitializers
Definition s_async.h:162
u32 jobIdCounter
Definition s_async.h:165
void initialize(unsigned int numEngines)
Create async engine tasks and lock function registration.
Definition s_async.cpp:62
std::mutex jobQueueMutex
Definition s_async.h:168
bool initDone
Definition s_async.h:150
void addWorkerThread()
Start an additional worker thread.
Definition s_async.cpp:81
std::mutex resultQueueMutex
Definition s_async.h:173
Definition s_async.h:48
bool isErrored
Definition s_async.h:60
virtual ~AsyncWorkerThread()
Definition s_async.cpp:291
AsyncWorkerThread(AsyncEngine *jobDispatcher, const std::string &name)
Definition s_async.cpp:260
void * run()
Definition s_async.cpp:297
AsyncEngine * jobDispatcher
Definition s_async.h:59
Definition s_base.h:64
Definition s_security.h:26
Definition semaphore.h:18
Definition server.h:167
Definition thread.h:57
Definition activeobjectmgr.cpp:11
Definition s_async.h:27
std::unique_ptr< PackedValue > params_ext
Definition s_async.h:35
std::string mod_origin
Definition s_async.h:41
std::string function
Definition s_async.h:31
std::string result
Definition s_async.h:37
std::string params
Definition s_async.h:33
std::unique_ptr< PackedValue > result_ext
Definition s_async.h:39
LuaJobInfo()=default
u32 id
Definition s_async.h:43
A packed value can be a primitive like a string or number but also a table including all of its conte...
Definition c_packer.h:72