Minetest 5.9.0-dev
 
Loading...
Searching...
No Matches
s_async.h
Go to the documentation of this file.
1/*
2Minetest
3Copyright (C) 2013 sapier, <sapier AT gmx DOT net>
4
5This program is free software; you can redistribute it and/or modify
6it under the terms of the GNU Lesser General Public License as published by
7the Free Software Foundation; either version 2.1 of the License, or
8(at your option) any later version.
9
10This program is distributed in the hope that it will be useful,
11but WITHOUT ANY WARRANTY; without even the implied warranty of
12MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13GNU Lesser General Public License for more details.
14
15You should have received a copy of the GNU Lesser General Public License along
16with this program; if not, write to the Free Software Foundation, Inc.,
1751 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18*/
19
20#pragma once
21
22#include <vector>
23#include <deque>
24#include <unordered_set>
25#include <memory>
26
27#include <lua.h>
28#include "threading/semaphore.h"
29#include "threading/thread.h"
30#include "common/c_packer.h"
31#include "cpp_api/s_base.h"
32#include "cpp_api/s_security.h"
33
34// Forward declarations
35class AsyncEngine;
36
37
38// Declarations
39
40// Data required to queue a job
42{
43 LuaJobInfo() = default;
44
45 // Function to be called in async environment (from string.dump)
46 std::string function;
47 // Parameter to be passed to function (serialized)
48 std::string params;
49 // Alternative parameters
50 std::unique_ptr<PackedValue> params_ext;
51 // Result of function call (serialized)
52 std::string result;
53 // Alternative result
54 std::unique_ptr<PackedValue> result_ext;
55 // Name of the mod who invoked this call
56 std::string mod_origin;
57 // JobID used to identify a job and match it to callback
58 u32 id;
59};
60
61// Asynchronous working environment
63 virtual public ScriptApiBase, public ScriptApiSecurity {
64 friend class AsyncEngine;
65public:
66 virtual ~AsyncWorkerThread();
67
68 void *run();
69
70protected:
71 AsyncWorkerThread(AsyncEngine* jobDispatcher, const std::string &name);
72
73private:
75 bool isErrored = false;
76};
77
78// Asynchronous thread and job management
80 friend class AsyncWorkerThread;
81 typedef void (*StateInitializer)(lua_State *L, int top);
82public:
83 AsyncEngine() = default;
86
92
97 void initialize(unsigned int numEngines);
98
105 u32 queueAsyncJob(std::string &&func, std::string &&params,
106 const std::string &mod_origin = "");
107
114 u32 queueAsyncJob(std::string &&func, PackedValue *params,
115 const std::string &mod_origin = "");
116
121 void step(lua_State *L);
122
123protected:
130 bool getJob(LuaJobInfo *job);
131
136 void putJobResult(LuaJobInfo &&result);
137
141 void addWorkerThread();
142
146 void stepJobResults(lua_State *L);
147
151 void stepAutoscale();
152
161 bool prepareEnvironment(lua_State* L, int top);
162
163private:
164 // Variable locking the engine against further modification
165 bool initDone = false;
166
167 // Maximum number of worker threads for automatic scaling
168 // 0 if disabled
169 unsigned int autoscaleMaxWorkers = 0;
171 std::unordered_set<u32> autoscaleSeenJobs;
172
173 // Only set for the server async environment (duh)
174 Server *server = nullptr;
175
176 // Internal store for registered state initializers
177 std::vector<StateInitializer> stateInitializers;
178
179 // Internal counter to create job IDs
181
182 // Mutex to protect job queue
183 std::mutex jobQueueMutex;
184 // Job queue
185 std::deque<LuaJobInfo> jobQueue;
186
187 // Mutex to protect result queue
189 // Result queue
190 std::deque<LuaJobInfo> resultQueue;
191
192 // List of current worker threads
193 std::vector<AsyncWorkerThread*> workerThreads;
194
195 // Counter semaphore for job dispatching
197};
Definition: s_async.h:79
unsigned int autoscaleMaxWorkers
Definition: s_async.h:169
~AsyncEngine()
Definition: s_async.cpp:39
std::unordered_set< u32 > autoscaleSeenJobs
Definition: s_async.h:171
u32 queueAsyncJob(std::string &&func, std::string &&params, const std::string &mod_origin="")
Queue an async job.
Definition: s_async.cpp:104
u64 autoscaleTimer
Definition: s_async.h:170
std::vector< AsyncWorkerThread * > workerThreads
Definition: s_async.h:193
std::deque< LuaJobInfo > jobQueue
Definition: s_async.h:185
void stepJobResults(lua_State *L)
Process finished jobs callbacks.
Definition: s_async.cpp:171
Semaphore jobQueueCounter
Definition: s_async.h:196
std::deque< LuaJobInfo > resultQueue
Definition: s_async.h:190
AsyncEngine(Server *server)
Definition: s_async.h:84
void registerStateInitializer(StateInitializer func)
Register function to be called on new states.
Definition: s_async.cpp:69
void putJobResult(LuaJobInfo &&result)
Put a Job result back to result queue.
Definition: s_async.cpp:157
void stepAutoscale()
Handle automatic scaling of worker threads.
Definition: s_async.cpp:205
bool prepareEnvironment(lua_State *L, int top)
Initialize environment with the currently registered functions; this function adds all functions registered by...
Definition: s_async.cpp:240
bool getJob(LuaJobInfo *job)
Get a job from the queue to be processed; this function blocks until a job is ready.
Definition: s_async.cpp:139
AsyncEngine()=default
void step(lua_State *L)
Engine step to process finished jobs.
Definition: s_async.cpp:165
void(* StateInitializer)(lua_State *L, int top)
Definition: s_async.h:81
std::vector< StateInitializer > stateInitializers
Definition: s_async.h:177
u32 jobIdCounter
Definition: s_async.h:180
void initialize(unsigned int numEngines)
Create async engine tasks and lock function registration.
Definition: s_async.cpp:76
std::mutex jobQueueMutex
Definition: s_async.h:183
bool initDone
Definition: s_async.h:165
void addWorkerThread()
Start an additional worker thread.
Definition: s_async.cpp:95
std::mutex resultQueueMutex
Definition: s_async.h:188
Definition: s_async.h:63
bool isErrored
Definition: s_async.h:75
virtual ~AsyncWorkerThread()
Definition: s_async.cpp:305
void * run()
Definition: s_async.cpp:311
AsyncEngine * jobDispatcher
Definition: s_async.h:74
Definition: s_base.h:79
Definition: s_security.h:41
Definition: semaphore.h:33
Definition: server.h:146
Definition: thread.h:57
static LightingParams params
Definition: light.cpp:40
Definition: activeobjectmgr.cpp:26
Definition: s_async.h:42
std::unique_ptr< PackedValue > params_ext
Definition: s_async.h:50
std::string mod_origin
Definition: s_async.h:56
std::string function
Definition: s_async.h:46
std::string result
Definition: s_async.h:52
std::string params
Definition: s_async.h:48
std::unique_ptr< PackedValue > result_ext
Definition: s_async.h:54
LuaJobInfo()=default
u32 id
Definition: s_async.h:58
A packed value can be a primitive like a string or number but also a table including all of its conte...
Definition: c_packer.h:87