Luanti 5.15.0-dev
 
Loading...
Searching...
No Matches
s_async.h
Go to the documentation of this file.
1// Luanti
2// SPDX-License-Identifier: LGPL-2.1-or-later
3// Copyright (C) 2013 sapier, <sapier AT gmx DOT net>
4
5#pragma once
6
7#include <vector>
8#include <deque>
9#include <unordered_set>
10#include <memory>
11
12#include <lua.h>
13#include "threading/semaphore.h"
14#include "threading/thread.h"
15#include "common/c_packer.h"
16#include "cpp_api/s_base.h"
17#include "cpp_api/s_security.h"
18
19// Forward declarations
20class AsyncEngine;
21
22
23// Declarations
24
25// Data required to queue a job
27{
28 LuaJobInfo() = default;
29 LuaJobInfo(std::string &&func, std::string &&params, const std::string &mod_origin = "") :
31 LuaJobInfo(std::string &&func, PackedValue *params, const std::string &mod_origin = "") :
33 params_ext.reset(params);
34 }
35
36 // Function to be called in async environment (from string.dump)
37 std::string function;
38 // Parameter to be passed to function (serialized)
39 std::string params;
40 // Alternative parameters
41 std::unique_ptr<PackedValue> params_ext;
42 // Result of function call (serialized)
43 std::string result;
44 // Alternative result
45 std::unique_ptr<PackedValue> result_ext;
46 // Name of the mod who invoked this call
47 std::string mod_origin;
48 // JobID used to identify a job and match it to callback
49 u32 id;
50};
51
52// Asynchronous working environment
54 virtual public ScriptApiBase, public ScriptApiSecurity {
55 friend class AsyncEngine;
56public:
57 virtual ~AsyncWorkerThread();
58
59 void *run() override;
60
61protected:
62 AsyncWorkerThread(AsyncEngine* jobDispatcher, const std::string &name);
63
64 bool checkPathInternal(const std::string &abs_path, bool write_required,
65 bool *write_allowed) override;
66
67private:
69 bool isErrored = false;
70};
71
72// Asynchronous thread and job management
74 friend class AsyncWorkerThread;
75 typedef void (*StateInitializer)(lua_State *L, int top);
76public:
77 AsyncEngine() = default;
80
86
91 void initialize(unsigned int numEngines);
92
99 u32 queueAsyncJob(std::string &&func, std::string &&params,
100 const std::string &mod_origin = "");
101
108 u32 queueAsyncJob(std::string &&func, PackedValue *params,
109 const std::string &mod_origin = "");
110
116 bool cancelAsyncJob(u32 id);
117
122 void step(lua_State *L);
123
127 unsigned int getThreadingCapacity() const {
129 }
130
131protected:
138 bool getJob(LuaJobInfo *job);
139
145 u32 queueAsyncJob(LuaJobInfo &&job);
146
151 void putJobResult(LuaJobInfo &&result);
152
156 void addWorkerThread();
157
161 void stepJobResults(lua_State *L);
162
166 void stepAutoscale();
167
171 void stepStuckWarning();
172
181 bool prepareEnvironment(lua_State* L, int top);
182
183private:
// Record the ID of every job currently held in jobQueue into `to`.
// `to` only needs to provide emplace() accepting a job ID (u32),
// e.g. a std::unordered_set<u32>.
// NOTE(review): no lock is taken here; presumably the caller already
// holds jobQueueMutex -- confirm against the call sites.
184 template <typename T>
185 inline void snapshotJobs(T &to)
186 {
187 for (const auto &it : jobQueue)
188 to.emplace(it.id);
189 }
// Sum from.count(id) over every job currently held in jobQueue and
// return the total. For a container with unique keys this is the number
// of queued jobs whose ID is also present in `from`.
// NOTE(review): no lock is taken here; presumably the caller already
// holds jobQueueMutex -- confirm against the call sites.
190 template <typename T>
191 inline size_t compareJobs(const T &from)
192 {
193 size_t overlap = 0;
194 for (const auto &it : jobQueue)
195 overlap += from.count(it.id);
196 return overlap;
197 }
198
199 // Variable locking the engine against further modification
200 bool initDone = false;
201
202 // Maximum number of worker threads for automatic scaling
203 // 0 if disabled
204 unsigned int autoscaleMaxWorkers = 0;
206 std::unordered_set<u32> autoscaleSeenJobs;
207
208 u64 stuckTimer = 0;
209 std::unordered_set<u32> stuckSeenJobs;
210
211 // Only set for the server async environment (duh)
212 Server *server = nullptr;
213
214 // Internal store for registered state initializers
215 std::vector<StateInitializer> stateInitializers;
216
217 // Internal counter to create job IDs
219
220 // Mutex to protect job queue
221 std::mutex jobQueueMutex;
222 // Job queue
223 std::deque<LuaJobInfo> jobQueue;
224
225 // Mutex to protect result queue
227 // Result queue
228 std::deque<LuaJobInfo> resultQueue;
229
230 // List of current worker threads
231 std::vector<AsyncWorkerThread*> workerThreads;
232
233 // Counter semaphore for job dispatching
235};
236
238 virtual public ScriptApiBase
239{
240public:
242
243 virtual void initAsync() = 0;
244 void stepAsync();
245
246 u32 queueAsync(std::string &&serialized_func,
247 PackedValue *param, const std::string &mod_origin);
248 bool cancelAsync(u32 id);
249 unsigned int getThreadingCapacity() const {
251 }
252
253protected:
255};
#define MYMAX(a, b)
Definition basic_macros.h:11
Definition s_async.h:73
void snapshotJobs(T &to)
Definition s_async.h:185
unsigned int autoscaleMaxWorkers
Definition s_async.h:204
~AsyncEngine()
Definition s_async.cpp:34
std::unordered_set< u32 > autoscaleSeenJobs
Definition s_async.h:206
u32 queueAsyncJob(std::string &&func, std::string &&params, const std::string &mod_origin="")
Queue an async job.
Definition s_async.cpp:114
u64 autoscaleTimer
Definition s_async.h:205
std::vector< AsyncWorkerThread * > workerThreads
Definition s_async.h:231
std::deque< LuaJobInfo > jobQueue
Definition s_async.h:223
void stepJobResults(lua_State *L)
Process finished jobs callbacks.
Definition s_async.cpp:174
Semaphore jobQueueCounter
Definition s_async.h:234
std::deque< LuaJobInfo > resultQueue
Definition s_async.h:228
unsigned int getThreadingCapacity() const
Get the maximum number of threads that can be used by the async environment.
Definition s_async.h:127
AsyncEngine(Server *server)
Definition s_async.h:78
void registerStateInitializer(StateInitializer func)
Register function to be called on new states.
Definition s_async.cpp:65
std::unordered_set< u32 > stuckSeenJobs
Definition s_async.h:209
void putJobResult(LuaJobInfo &&result)
Put a Job result back to result queue.
Definition s_async.cpp:159
void stepStuckWarning()
Print warning message if too many jobs are stuck.
Definition s_async.cpp:238
void stepAutoscale()
Handle automatic scaling of worker threads.
Definition s_async.cpp:208
bool prepareEnvironment(lua_State *L, int top)
Initialize the environment with the currently registered functions. This function adds all functions registered by...
Definition s_async.cpp:262
bool getJob(LuaJobInfo *job)
Get a job from the queue to be processed. This function blocks until a job is ready.
Definition s_async.cpp:141
AsyncEngine()=default
void step(lua_State *L)
Engine step to process finished jobs.
Definition s_async.cpp:167
void(* StateInitializer)(lua_State *L, int top)
Definition s_async.h:75
std::vector< StateInitializer > stateInitializers
Definition s_async.h:215
u64 stuckTimer
Definition s_async.h:208
u32 jobIdCounter
Definition s_async.h:218
void initialize(unsigned int numEngines)
Create async engine tasks and lock function registration.
Definition s_async.cpp:72
std::mutex jobQueueMutex
Definition s_async.h:221
bool initDone
Definition s_async.h:200
void addWorkerThread()
Start an additional worker thread.
Definition s_async.cpp:91
size_t compareJobs(const T &from)
Definition s_async.h:191
bool cancelAsyncJob(u32 id)
Try to cancel an async job.
Definition s_async.cpp:128
std::mutex resultQueueMutex
Definition s_async.h:226
Definition s_async.h:54
bool isErrored
Definition s_async.h:69
virtual ~AsyncWorkerThread()
Definition s_async.cpp:327
bool checkPathInternal(const std::string &abs_path, bool write_required, bool *write_allowed) override
Should check if the given path may be accessed.
Definition s_async.cpp:332
AsyncWorkerThread(AsyncEngine *jobDispatcher, const std::string &name)
Definition s_async.cpp:295
void * run() override
Definition s_async.cpp:348
AsyncEngine * jobDispatcher
Definition s_async.h:68
Definition s_async.h:239
unsigned int getThreadingCapacity() const
Definition s_async.h:249
virtual void initAsync()=0
ScriptApiAsync(Server *server)
Definition s_async.h:241
bool cancelAsync(u32 id)
Definition s_async.cpp:436
u32 queueAsync(std::string &&serialized_func, PackedValue *param, const std::string &mod_origin)
Definition s_async.cpp:429
void stepAsync()
Definition s_async.cpp:441
AsyncEngine asyncEngine
Definition s_async.h:254
Definition s_base.h:63
Definition s_security.h:28
Definition semaphore.h:18
Definition server.h:178
Definition thread.h:57
Definition activeobjectmgr.cpp:11
Definition s_async.h:27
std::unique_ptr< PackedValue > params_ext
Definition s_async.h:41
std::string mod_origin
Definition s_async.h:47
std::string function
Definition s_async.h:37
LuaJobInfo(std::string &&func, PackedValue *params, const std::string &mod_origin="")
Definition s_async.h:31
std::string result
Definition s_async.h:43
std::string params
Definition s_async.h:39
std::unique_ptr< PackedValue > result_ext
Definition s_async.h:45
LuaJobInfo(std::string &&func, std::string &&params, const std::string &mod_origin="")
Definition s_async.h:29
LuaJobInfo()=default
u32 id
Definition s_async.h:49
A packed value can be a primitive like a string or number but also a table including all of its conte...
Definition c_packer.h:72