mapboxgl源码分析-2-worker和主线程
# web worker到底是什么?
- web worker: 它允许一段JavaScript程序运行在主线程之外通过new Worker()创建另外一个线程。
- 出现web worker的原因:Javascript是运行在单线程环境中,无法同时运行多个脚本,但为了应付各种复杂的场景,出现了web worker 使得js也能想其他如java一样 同时多线程运行
注意:传入 Worker 构造函数的参数 URI 必须遵循同源策略。Worker线程的创建的是异步的,主线程代码不会阻塞在这里等待worker线程去加载、执行指定的脚本文件,而是会立即向下继续执行后面代码。
# mapboxgl怎么创建的worker?
# 1. 通过rollup将需要在worker线程内运行的脚本打包成一个集合
//rollup.config.js文件里
input: ['src/index.js', 'src/source/worker.js'],
output: {
dir: 'rollup/build/mapboxgl',
format: 'amd',
sourcemap: 'inline',
indent: false,
chunkFileNames: 'shared.js'
},
//将所有需要在worker线程内运行的脚本打包到rollup/build/mapboxgl.js里整合起来 为第二个打包worker逻辑做准备
1
2
3
4
5
6
7
8
9
10
2
3
4
5
6
7
8
9
10
# 2. 处理主程序代码mapboxgl和生成子程序代码worker的URL对象
将上面整合的代码脚本通过rollup打包格式umd自带的define()方法
//rollup.config.js文件里
//当前打包的目的主要是为umd打包模式添加一个自定义define方法,从而能自主处理主程序代码和子程序代码
input: 'rollup/mapboxgl.js',
output: {
name: 'mapboxgl',
file: outputFile,
format: 'umd',
sourcemap: production ? true : 'inline',
indent: false,
intro: fs.readFileSync(fileURLToPath(new URL('./rollup/bundle_prelude.js', import.meta.url)), 'utf8'),
banner
},
1
2
3
4
5
6
7
8
9
10
11
12
2
3
4
5
6
7
8
9
10
11
12
# define方法
//rollup/bundle_prelude.js文件里
//其中通过amd打包格式的特点 将上面打包的合集的引入依赖分别赋值给 shared, worker, mapboxgl三个对象 其中mapboxgl代码是主程序逻辑 worker子线程逻辑代码 shared共用代码
function define(_, chunk) {
if (!shared) {
shared = chunk;
} else if (!worker) {
worker = chunk;
} else {
var workerBundleString = "self.onerror = function() { console.error('An error occurred while parsing the WebWorker bundle. This is most likely due to improper transpilation by Babel; please see https://docs.mapbox.com/mapbox-gl-js/guides/install/#transpiling'); }; var sharedChunk = {}; (" + shared + ")(sharedChunk); (" + worker + ")(sharedChunk); self.onerror = null;"
var sharedChunk = {}; //sharedChunk各种import依赖的对象集合 但这次只需要使用{}空对象表示 因为在此打包的时候 已经通过amd打包将import依赖整合过了
shared(sharedChunk); //将公用的代码赋值给对应的主线程和子线程的代码
mapboxgl = chunk(sharedChunk);//将主程序逻辑代码赋值给mapboxgl对象
if (typeof window !== 'undefined' && window && window.URL && window.URL.createObjectURL) {
//将worker代码逻辑通过URL.createObjectURL处理成同源链接赋值给workerUrl
mapboxgl.workerUrl = window.URL.createObjectURL(new Blob([workerBundleString], { type: 'text/javascript' }));
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
注意:主要是amd和umd打包模式的妙用 rollup的amd和umd打包模式想深入了解的可自己百度
# 3.创建worker
# 在主线程里创建worker
//src\util\browser\web_worker.js文件里
export default function (): WorkerInterface {
//在主线程中生成 Worker 线程
return (mapboxgl.workerClass != null) ? new mapboxgl.workerClass() : (new window.Worker(mapboxgl.workerUrl): any);
}
1
2
3
4
5
6
2
3
4
5
6
在主线程里创建worker代码
//生成环境: src\util\browser\web_worker.js文件里
export default function (): WorkerInterface {
//在主线程中生成 Worker 线程
return (mapboxgl.workerClass != null) ? new mapboxgl.workerClass() : (new window.Worker(mapboxgl.workerUrl): any); // eslint-disable-line new-cap
}
//开发环境: src\util\web_worker.js文件里
export default function WebWorker (): WorkerInterface {
const parentListeners = [],
workerListeners = [],
parentBus = new MessageBus(workerListeners, parentListeners),
workerBus = new MessageBus(parentListeners, workerListeners);
parentBus.target = workerBus;
workerBus.target = parentBus;
new WebWorker.Worker(workerBus);
return parentBus;
}
WebWorker.Worker = Worker;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 默认创建2个worker
//src\util\worker_pool.js文件里
acquire(mapId: number | string): Array<WorkerInterface> {
if (!this.workers) {
//WorkerPool.workerCount 默认为2
this.workers = [];
while (this.workers.length < WorkerPool.workerCount) {
this.workers.push(new WebWorker()); //此时的WebWorker就是上面的export default导出对象
}
}
this.active[mapId] = true;
return this.workers.slice();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
2
3
4
5
6
7
8
9
10
11
12
13
14
思考:此时我们的主线程里有了两个worker线程了 可以独立执行任务,但这怎么和主线程相互联系呢?
# 在worker肚子里创建了一个自定义的worker将self传入
//src\source\worker.js文件:
if (typeof WorkerGlobalScope !== 'undefined' &&
typeof self !== 'undefined' &&
self instanceof WorkerGlobalScope) {
//通过self WorkerGlobalScope是否存在来判断是在主线程还是子线程里 如果存在WorkerGlobalScope说明在子线程里
self.worker = new Worker(self);
}
1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
注意: self上可用的属性和方法是window上的子集 所以可以通过self来实现主程序和子线程之间的通信!
# 为每个Worker对象都创建一个对应的actor类
所以actor类的个数将是web worker的2倍 actor将负责主线程和子线程的发送和监听数据 实现双向绑定
//src\util\dispatcher.js文件:
constructor(workerPool: WorkerPool, parent: any) {
//工作池
this.workerPool = workerPool;
this.actors = [];
this.currentActor = 0;
this.id = uniqueId();//返回数量2
const workers = this.workerPool.acquire(this.id);
for (let i = 0; i < workers.length; i++) {
const worker = workers[i];
// 对应的主线程创建actor类
const actor = new Dispatcher.Actor(worker, parent, this.id);
actor.name = `Worker ${i}`;
this.actors.push(actor);
}
...
}
//src\source\worker.js文件:
// 此创建worker是自定义worker: self.worker = new Worker(self);
constructor (self: WorkerGlobalScopeInterface) {
PerformanceUtils.measure('workerEvaluateScript');
this.self = self;
// 对应的子线程创建actor类
this.actor = new Actor(self, this);
...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# Actor类
负责注册message方法和发送postMessage事件 主程序和子线程数据中转站
//src\util\actor.js文件:
//构造函数里注册message方法
constructor (target: any, parent: any, mapId: ?number) {
this.target = target;
...
//在worker这里接收主线程发来的消息
//注意 1. this.target为self时也会注册message 而这个message方法触发方式:当主线程postMessage发来消息时触发 2.this.target为worker对象时 而这个message方法触发方式 当子线程postMessage发来消息时触发
this.target.addEventListener('message', this.receive, false);
this.globalScope = isWorker() ? target : window; //self 与 window
this.scheduler = new Scheduler();
}
1
2
3
4
5
6
7
8
9
10
11
12
2
3
4
5
6
7
8
9
10
11
12
- send方法负责发送数据 postMessage
//src\util\actor.js文件:
send (type: string, data: mixed, callback: ?Function, targetMapId: ?string, mustQueue: boolean = false, callbackMetadata?: Object): ?Cancelable {
// 我们使用字符串ID而不是数字,因为它们被用作对象键,因此被隐式字符串化。我们使用随机ID,因为一个参与者可能从多个其他参与者接收消息,
// 这些参与者可能在不同的执行上下文中运行。线性增加的ID可能会产生冲突。
const id = Math.round((Math.random() * 1e18)).toString(36).substring(0, 10);
if (callback) {
callback.metadata = callbackMetadata;
this.callbacks[id] = callback;
} else {
// console.log('不存在callbock,此时执行的是tile瓦片服务加载,且已请求获得了数据将不在循环发送消息')
//此时有个好处就是send发送了事件后将不会再接收到反馈 将receive死循环掐断 终止了反复主子线程发送消息 这个只在raster_tile和vector_tile里有
}
const buffers: ?Array<Transferable> = isSafari(this.globalScope) ? undefined : [];
// 注意 这里的this.target可以是self或子线程worker 也可以是worker实例即在主线程中的实例对象 所以 this.target.postMessage能同时处理将消息从主线程映射发送到Worker或从Worker发送回主线程映射实例
this.target.postMessage({
id,
type,
hasCallback: !!callback,
targetMapId,
mustQueue,
sourceMapId: this.mapId,
data: serialize(data, buffers)
}, buffers);
//注意:只有执行相关tile瓦片事件的时候会用到return返回的对象 然后主动执行cancel方法,因为瓦片需要请求服务器数据
return {
//2.第二种终止互相循环通信的方式,但每次发送都会执行两次'<cancel>',只有两次才能停止,而且worker是2个,所以一共执行了4次
cancel: () => {
if (callback) {
//将回调设置为null,以便在请求中止后永不激发
// Set the callback to null so that it never fires after the request is aborted.
delete this.callbacks[id];
}
// console.log('执行取消发送请求');
this.target.postMessage({
id,
type: '<cancel>',
targetMapId,
sourceMapId: this.mapId
});
}
};
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
- receive message事件----接收----主程序或worker发来的消息
receive (message: Object) {
// if()
const data = message.data,
id = data.id;
...
if (data.type === '<cancel>') {
//2.第二种终止互相循环通信的方式,但每次发送都会执行两次'<cancel>',只有两次才能停止,而且worker是2个,所以一共执行了4次
//什么清空终止了请求瓦片服务呢?
//从队列中删除原始请求。这只有在还没有开始的情况下才有可能。id将保留在队列中,但由于没有关联的任务,一旦执行该任务,它将被删除。
const cancel = this.cancelCallbacks[id];
delete this.cancelCallbacks[id];
if (cancel) {
cancel.cancel();
}
} else {
//当事件是loadTile时 data.mustQueue=true
if (data.mustQueue || isWorker()) {
// 对于经常被取消的辅助任务,如loadTile,在实际处理它们之前存储它们。这是必要的,因为我们希望继续接收<cancel>消息。
// 某些任务在工作线程中可能需要一段时间,因此在执行队列中的下一个任务之前,postMessage会抢先执行该任务,并且可以处理<cancel>消息。
// 我们使用MessageChannel对象将process()流限制为一次一个。
const callback = this.callbacks[id];
const metadata = (callback && callback.metadata) || {type: "message"};
//scheduler类是为了让我们要执行的函数必定在下一帧渲染的时候执行!!!!!!!!!!!!
this.cancelCallbacks[id] = this.scheduler.add(() => this.processTask(id, data), metadata);
} else {
//在主线程中,会立即处理消息,这样其他worker就不会在从workers处获取部分数据之间发生冲突。
this.processTask(id, data);
}
}
}
//处理receive接收到的消息通过task.type来区分是主线程发来的还是子线程发来的 并分发给worker的各个方法执行 setReferrer syncRTLPluginState checkIfReady setProjection enableTerrain spriteLoaded enforceCacheSizeLimit setLayers loadTile getGlyphs abortTile removeTile
processTask (id: number, task: any) {
if (task.type === '<response>') {
//this.target=worker 子线程接收到主线程发来的数据后------然后处理的方法
//对应方法中的done()函数已被调用,如果有回调,我们现在将在始发参与者中触发回调。
const callback = this.callbacks[id];
delete this.callbacks[id];
if (callback) {
if (task.error) {
callback(deserialize(task.error));
} else {
callback(null, deserialize(task.data));
}
}
} else {
//this.target=== self 主线程向子线程发来的数据-----然后开始处理
// task.type == setReferrer syncRTLPluginState checkIfReady setProjection enableTerrain spriteLoaded enforceCacheSizeLimit setLayers loadTile getGlyphs abortTile removeTile
const buffers: ?Array<Transferable> = isSafari(this.globalScope) ? undefined : [];
//done函数接收主线程中传递的参数,以及子线程的回调函数
const done = task.hasCallback ? (err, data) => {
//task.hasCallback为true执行了done方法就会返回type: '<response>',类型
// console.log('done方法');
delete this.cancelCallbacks[id];
this.target.postMessage({
id,
type: '<response>',
sourceMapId: this.mapId,
error: err ? serialize(err) : null,
data: serialize(data, buffers)
}, buffers);
} : (_) => {
};
const params = (deserialize(task.data): any);
if (this.parent[task.type]) {
//执行worker.js类中的各个方法处理数据 如: worker.enableTerrain(task.sourceMapId, params, done)
this.parent[task.type](task.sourceMapId, params, done);
} else if (this.parent.getWorkerSource) {
// console.log('getWorkerSource',task.type)
//注意:!!!!!!此判断代码基本不会进入执行
// task.type == sourcetype.method
const keys = task.type.split('.');
const scope = (this.parent: any).getWorkerSource(task.sourceMapId, keys[0], params.source);
scope[keys[1]](params, done);
} else {
// No function was found.
done(new Error(`Could not find function ${task.type}`));
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# 此时主线程和worker之间的通信的大体流程就出来了
# 今天先到这!下一篇具体讲worker如何获取和处理数据
编辑 (opens new window)
上次更新: 2024/08/09, 10:55:31