Luanti 5.11.0-dev
 
Loading...
Searching...
No Matches
s_async.h
Go to the documentation of this file.
1// Luanti
2// SPDX-License-Identifier: LGPL-2.1-or-later
3// Copyright (C) 2013 sapier, <sapier AT gmx DOT net>
4
5#pragma once
6
7#include <vector>
8#include <deque>
9#include <unordered_set>
10#include <memory>
11
12#include <lua.h>
13#include "threading/semaphore.h"
14#include "threading/thread.h"
15#include "common/c_packer.h"
16#include "cpp_api/s_base.h"
17#include "cpp_api/s_security.h"
18
19// Forward declarations
20class AsyncEngine;
21
22
23// Declarations
24
25// Data required to queue a job
27{
28 LuaJobInfo() = default;
29
30 // Function to be called in async environment (from string.dump)
31 std::string function;
32 // Parameter to be passed to function (serialized)
33 std::string params;
34 // Alternative parameters
35 std::unique_ptr<PackedValue> params_ext;
36 // Result of function call (serialized)
37 std::string result;
38 // Alternative result
39 std::unique_ptr<PackedValue> result_ext;
40 // Name of the mod who invoked this call
41 std::string mod_origin;
42 // JobID used to identify a job and match it to callback
43 u32 id;
44};
45
46// Asynchronous working environment
48 virtual public ScriptApiBase, public ScriptApiSecurity {
49 friend class AsyncEngine;
50public:
51 virtual ~AsyncWorkerThread();
52
53 void *run() override;
54
55protected:
56 AsyncWorkerThread(AsyncEngine* jobDispatcher, const std::string &name);
57
58 bool checkPathInternal(const std::string &abs_path, bool write_required,
59 bool *write_allowed) override;
60
61private:
63 bool isErrored = false;
64};
65
66// Asynchornous thread and job management
68 friend class AsyncWorkerThread;
69 typedef void (*StateInitializer)(lua_State *L, int top);
70public:
71 AsyncEngine() = default;
74
80
85 void initialize(unsigned int numEngines);
86
93 u32 queueAsyncJob(std::string &&func, std::string &&params,
94 const std::string &mod_origin = "");
95
102 u32 queueAsyncJob(std::string &&func, PackedValue *params,
103 const std::string &mod_origin = "");
104
109 void step(lua_State *L);
110
111protected:
118 bool getJob(LuaJobInfo *job);
119
124 void putJobResult(LuaJobInfo &&result);
125
129 void addWorkerThread();
130
134 void stepJobResults(lua_State *L);
135
139 void stepAutoscale();
140
149 bool prepareEnvironment(lua_State* L, int top);
150
151private:
152 // Variable locking the engine against further modification
153 bool initDone = false;
154
155 // Maximum number of worker threads for automatic scaling
156 // 0 if disabled
157 unsigned int autoscaleMaxWorkers = 0;
159 std::unordered_set<u32> autoscaleSeenJobs;
160
161 // Only set for the server async environment (duh)
162 Server *server = nullptr;
163
164 // Internal store for registred state initializers
165 std::vector<StateInitializer> stateInitializers;
166
167 // Internal counter to create job IDs
169
170 // Mutex to protect job queue
171 std::mutex jobQueueMutex;
172 // Job queue
173 std::deque<LuaJobInfo> jobQueue;
174
175 // Mutex to protect result queue
177 // Result queue
178 std::deque<LuaJobInfo> resultQueue;
179
180 // List of current worker threads
181 std::vector<AsyncWorkerThread*> workerThreads;
182
183 // Counter semaphore for job dispatching
185};
Definition s_async.h:67
unsigned int autoscaleMaxWorkers
Definition s_async.h:157
~AsyncEngine()
Definition s_async.cpp:28
std::unordered_set< u32 > autoscaleSeenJobs
Definition s_async.h:159
u32 queueAsyncJob(std::string &&func, std::string &&params, const std::string &mod_origin="")
Queue an async job.
Definition s_async.cpp:94
u64 autoscaleTimer
Definition s_async.h:158
std::vector< AsyncWorkerThread * > workerThreads
Definition s_async.h:181
std::deque< LuaJobInfo > jobQueue
Definition s_async.h:173
void stepJobResults(lua_State *L)
Process finished jobs callbacks.
Definition s_async.cpp:161
Semaphore jobQueueCounter
Definition s_async.h:184
std::deque< LuaJobInfo > resultQueue
Definition s_async.h:178
AsyncEngine(Server *server)
Definition s_async.h:72
void registerStateInitializer(StateInitializer func)
Register function to be called on new states.
Definition s_async.cpp:59
void putJobResult(LuaJobInfo &&result)
Put a Job result back to result queue.
Definition s_async.cpp:147
void stepAutoscale()
Handle automatic scaling of worker threads.
Definition s_async.cpp:195
bool prepareEnvironment(lua_State *L, int top)
Initialize environment with current registred functions this function adds all functions registred by...
Definition s_async.cpp:230
bool getJob(LuaJobInfo *job)
Get a Job from queue to be processed this function blocks until a job is ready.
Definition s_async.cpp:129
AsyncEngine()=default
void step(lua_State *L)
Engine step to process finished jobs.
Definition s_async.cpp:155
void(* StateInitializer)(lua_State *L, int top)
Definition s_async.h:69
std::vector< StateInitializer > stateInitializers
Definition s_async.h:165
u32 jobIdCounter
Definition s_async.h:168
void initialize(unsigned int numEngines)
Create async engine tasks and lock function registration.
Definition s_async.cpp:66
std::mutex jobQueueMutex
Definition s_async.h:171
bool initDone
Definition s_async.h:153
void addWorkerThread()
Start an additional worker thread.
Definition s_async.cpp:85
std::mutex resultQueueMutex
Definition s_async.h:176
Definition s_async.h:48
bool isErrored
Definition s_async.h:63
virtual ~AsyncWorkerThread()
Definition s_async.cpp:295
bool checkPathInternal(const std::string &abs_path, bool write_required, bool *write_allowed) override
Should check if the given path may be accessed.
Definition s_async.cpp:300
AsyncWorkerThread(AsyncEngine *jobDispatcher, const std::string &name)
Definition s_async.cpp:263
void * run() override
Definition s_async.cpp:316
AsyncEngine * jobDispatcher
Definition s_async.h:62
Definition s_base.h:64
Definition s_security.h:28
Definition semaphore.h:18
Definition server.h:166
Definition thread.h:57
Definition activeobjectmgr.cpp:11
Definition s_async.h:27
std::unique_ptr< PackedValue > params_ext
Definition s_async.h:35
std::string mod_origin
Definition s_async.h:41
std::string function
Definition s_async.h:31
std::string result
Definition s_async.h:37
std::string params
Definition s_async.h:33
std::unique_ptr< PackedValue > result_ext
Definition s_async.h:39
LuaJobInfo()=default
u32 id
Definition s_async.h:43
A packed value can be a primitive like a string or number but also a table including all of its conte...
Definition c_packer.h:72