1 /*
2 * Copyright (c) 2017-2019 sel-project
3 *
4 * Permission is hereby granted, free of charge, to any person obtaining a copy
5 * of this software and associated documentation files (the "Software"), to deal
6 * in the Software without restriction, including without limitation the rights
7 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8 * copies of the Software, and to permit persons to whom the Software is
9 * furnished to do so, subject to the following conditions:
10 *
11 * The above copyright notice and this permission notice shall be included in all
12 * copies or substantial portions of the Software.
13 *
14 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20 * SOFTWARE.
21 *
22 */
23 /**
24 * Copyright: Copyright (c) 2017-2019 sel-project
25 * License: MIT
26 * Authors: Kripth
27 * Source: $(HTTP github.com/sel-project/selery/source/selery/hub/hncom.d, selery/hub/hncom.d)
28 */
29 module selery.hub.hncom;
30
31 import core.atomic : atomicOp;
32 import core.thread : Thread;
33
34 import std.algorithm : canFind;
35 import std.concurrency : spawn;
36 import std.conv : to;
37 import std.datetime : dur;
38 import std.json : JSONValue;
39 import std.math : round;
40 import std.regex : ctRegex, matchFirst;
41 import std.socket;
42 import std..string;
43 import std.system : Endian;
44 import std.zlib;
45
46 import sel.net.modifiers : LengthPrefixedStream;
47 import sel.net.stream : TcpStream;
48 import sel.server.query : Query;
49 import sel.server.util;
50
51 import selery.about;
52 import selery.hncom.about;
53 import selery.hncom.handler : Handler = HncomHandler;
54 import selery.hncom.io : HncomAddress, HncomUUID;
55 import selery.hub.player : WorldSession = World, PlayerSession, Skin;
56 import selery.hub.server : HubServer;
57 import selery.util.thread : SafeThread;
58 import selery.util.util : microseconds;
59
60 import Login = selery.hncom.login;
61 import Status = selery.hncom.status;
62 import Player = selery.hncom.player;
63
64 alias HncomStream = LengthPrefixedStream!(uint, Endian.littleEndian);
65
66 class HncomHandler {
67
68 private shared HubServer server;
69
70 private shared JSONValue* additionalJson;
71
72 private shared Address address;
73
74 public shared this(shared HubServer server, shared JSONValue* additionalJson) {
75 this.server = server;
76 this.additionalJson = additionalJson;
77 }
78
79 public shared void start(inout(string)[] accepted, ushort port) {
80 bool v4, v6, public_;
81 foreach(address ; accepted) {
82 switch(address) {
83 case "127.0.0.1":
84 v4 = true;
85 break;
86 case "::1":
87 v6 = true;
88 break;
89 default:
90 if(address.canFind(":")) v6 = true;
91 else v4 = true;
92 public_ = true;
93 break;
94 }
95 }
96 Address address = getAddress(public_ ? (v4 ? "0.0.0.0" : "::") : (v4 ? "127.0.0.1" : "::1"), port)[0];
97 Socket socket = new TcpSocket(v4 && v6 ? AddressFamily.INET | AddressFamily.INET6 : address.addressFamily);
98 socket.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, true);
99 socket.setOption(SocketOptionLevel.IPV6, SocketOption.IPV6_V6ONLY, !v4 || !v6);
100 socket.blocking = true;
101 socket.bind(address);
102 socket.listen(8);
103 this.address = cast(shared)address;
104 spawn(&this.acceptClients, cast(shared)socket);
105 }
106
107 private shared void acceptClients(shared Socket _socket) {
108 debug Thread.getThis().name = "hncom_server@" ~ (cast()_socket).localAddress.toString();
109 Socket socket = cast()_socket;
110 while(true) {
111 Socket client = socket.accept();
112 Address address;
113 try {
114 address = client.remoteAddress;
115 } catch(Exception) {
116 continue;
117 }
118 if(this.server.acceptNode(address)) {
119 new SafeThread(this.server.config.lang, {
120 shared ClassicNode node = new shared ClassicNode(this.server, client, this.additionalJson);
121 delete node;
122 }).start();
123 } else {
124 client.close();
125 }
126 }
127 }
128
129 public shared pure nothrow @property @safe @nogc shared(Address) localAddress() {
130 return this.address;
131 }
132
133 }
134
135 /**
136 * Session of a node. It's executed in a dedicated thread.
137 */
138 abstract class AbstractNode : Handler!serverbound {
139
140 private static shared uint _id;
141
142 public immutable uint id;
143
144 private shared HubServer server;
145 private shared JSONValue* additionalJson;
146
147 protected HncomStream stream;
148
149 private shared bool n_main;
150 private shared string n_name;
151
152 private shared uint[][ubyte] accepted;
153
154 private shared uint n_max;
155 public shared Login.NodeInfo.Plugin[] plugins;
156
157 private shared PlayerSession[uint] players;
158 private shared WorldSession[uint] _worlds;
159
160 private uint n_latency;
161
162 private shared float n_tps;
163 private shared ulong n_ram;
164 private shared float n_cpu;
165
166 public shared this(shared HubServer server, shared JSONValue* additionalJson) {
167 this.id = atomicOp!"+="(_id, 1);
168 this.server = server;
169 this.additionalJson = additionalJson;
170 }
171
172 protected shared void exchageInfo(HncomStream stream) {
173 with(cast()server.config.hub) {
174 Login.HubInfo.GameInfo[ubyte] games;
175 if(bedrock) games[__BEDROCK__] = Login.HubInfo.GameInfo(bedrock.motd, bedrock.protocols, bedrock.onlineMode, ushort(0));
176 if(java) games[__JAVA__] = Login.HubInfo.GameInfo(java.motd, java.protocols, java.onlineMode, ushort(0));
177 this.sendHubInfo(stream, new Login.HubInfo(server.id, server.nextPool, displayName, games, server.onlinePlayers, server.maxPlayers, server.config.lang.acceptedLanguages.dup, false, (cast()*this.additionalJson).toString()));
178 }
179 auto info = this.receiveNodeInfo(stream);
180 this.n_max = info.max;
181 this.accepted = cast(shared uint[][ubyte])info.acceptedGames;
182 this.plugins = cast(shared)info.plugins;
183 foreach(node ; server.nodesList) stream.send(node.addPacket.encode());
184 server.add(this);
185 this.loop(stream);
186 server.remove(this);
187 this.onClosed();
188 }
189
190 protected abstract shared void sendHubInfo(HncomStream stream, Login.HubInfo packet);
191
192 protected abstract shared Login.NodeInfo receiveNodeInfo(HncomStream stream);
193
194 protected abstract shared void loop(HncomStream stream);
195
196 protected abstract void send(ubyte[] buffer);
197
198 protected shared void send(ubyte[] buffer) {
199 return (cast()this).send(buffer);
200 }
201
202 /**
203 * Gets the name of the node. The name is different for every node
204 * connected to hub and it should be used other nodes with
205 * the transfer function.
206 */
207 public shared nothrow @property @safe @nogc const string name() {
208 return this.n_name;
209 }
210
211 /**
212 * Indicates whether or not this is a main node.
213 * A main node is able to receive players without the
214 * use of the transfer function.
215 * Every hub should have at least one main node, otherwise
216 * every player that tries to connect will be disconnected with
217 * the 'end of stream' message.
218 */
219 public shared nothrow @property @safe @nogc const bool main() {
220 return this.n_main;
221 }
222
223 /**
224 * Gets the highest number of players that can connect to the node.
225 */
226 public shared nothrow @property @safe @nogc const uint max() {
227 return this.n_max;
228 }
229
230 /**
231 * Gets the number of players connected to the node.
232 */
233 public shared nothrow @property @safe @nogc const uint online() {
234 version(X86_64) {
235 return cast(uint)this.players.length;
236 } else {
237 return this.players.length;
238 }
239 }
240
241 /**
242 * Indicates whether the node is full.
243 */
244 public shared nothrow @property @safe @nogc const bool full() {
245 return this.max != Login.NodeInfo.UNLIMITED && this.online >= this.max;
246 }
247
248 /**
249 * Gets the list of worlds loaded on the node.
250 */
251 public shared nothrow @property shared(WorldSession)[] worlds() {
252 return this._worlds.values;
253 }
254
255 /**
256 * Gets the node's latency (it may not be precise).
257 */
258 public shared nothrow @property @safe @nogc const uint latency() {
259 return this.n_latency;
260 }
261
262 /**
263 * Gets the node's usage, updated with the ResourcesUsage packet.
264 */
265 public shared nothrow @property @safe @nogc const float tps() {
266 return this.n_tps;
267 }
268
269 /// ditto
270 public shared nothrow @property @safe @nogc const ulong ram() {
271 return this.n_ram;
272 }
273
274 /// ditto
275 public shared nothrow @property @safe @nogc const float cpu() {
276 return this.n_cpu;
277 }
278
279 public shared nothrow @property @safe bool accepts(ubyte game, uint protocol) {
280 auto p = game in this.accepted;
281 return p && (*p).canFind(protocol);
282 }
283
284 public shared @property Status.AddNode addPacket() {
285 return new Status.AddNode(this.id, this.name, this.main, cast(uint[][ubyte])this.accepted);
286 }
287
288 protected override void handleStatusLatency(Status.Latency packet) {
289 this.send(packet.encode());
290 }
291
292 protected override void handleStatusLog(Status.Log packet) {
293 string name;
294 if(packet.worldId != -1) {
295 auto world = packet.worldId in this._worlds;
296 if(world) name = world.name;
297 }
298 this.server.handleLog((cast(shared)this).name, packet.message, packet.timestamp, packet.commandId, packet.worldId, name);
299 }
300
301 protected override void handleStatusSendMessage(Status.SendMessage packet) {
302 if(packet.addressees.length) {
303 foreach(addressee ; packet.addressees) {
304 auto node = this.server.nodeById(addressee);
305 if(node !is null) node.sendMessage(this.id, false, packet.payload);
306 }
307 } else {
308 foreach(node ; this.server.nodesList) {
309 if(node.id != this.id) node.sendMessage(this.id, true, packet.payload);
310 }
311 }
312 }
313
314 protected override void handleStatusUpdateMaxPlayers(Status.UpdateMaxPlayers packet) {
315 this.n_max = packet.max;
316 this.server.updateMaxPlayers();
317 }
318
319 protected override void handleStatusUpdateUsage(Status.UpdateUsage packet) {
320 this.n_ram = (cast(ulong)packet.ram) * 1024Lu;
321 this.n_cpu = packet.cpu;
322 }
323
324 protected override void handleStatusUpdateLanguageFiles(Status.UpdateLanguageFiles packet) {
325 this.server.config.lang.add(packet.language, packet.messages);
326 }
327
328 protected override void handleStatusAddWorld(Status.AddWorld packet) {
329 //TODO notify the panel
330 this._worlds[packet.worldId] = new shared WorldSession(packet.worldId, packet.groupId, packet.name, packet.dimension);
331 }
332
333 protected override void handleStatusRemoveWorld(Status.RemoveWorld packet) {
334 //TODO notify the panel
335 this._worlds.remove(packet.worldId);
336 }
337
338 protected override void handleStatusRemoveWorldGroup(Status.RemoveWorldGroup packet) {
339 //TODO notify the panel
340 foreach(world ; _worlds) {
341 if(world.groupId == packet.groupId) this._worlds.remove(world.id);
342 }
343 }
344
345 protected override void handleStatusAddPlugin(Status.AddPlugin packet) {}
346
347 protected override void handleStatusRemovePlugin(Status.RemovePlugin packet) {}
348
349 protected override void handlePlayerKick(Player.Kick packet) {
350 auto player = packet.hubId in this.players;
351 if(player) {
352 this.players.remove(packet.hubId);
353 (*player).kick(packet.reason, packet.translation, packet.parameters);
354 }
355 }
356
357 protected override void handlePlayerTransfer(Player.Transfer packet) {
358 auto player = packet.hubId in this.players;
359 if(player) {
360 this.players.remove(packet.hubId);
361 (*player).connect(Player.Add.TRANSFERRED, packet.node, packet.message, packet.onFail);
362 }
363 }
364
365 protected override void handlePlayerUpdateDisplayName(Player.UpdateDisplayName packet) {
366 auto player = packet.hubId in this.players;
367 if(player) {
368 (*player).displayName = packet.displayName;
369 }
370 }
371
372 protected override void handlePlayerUpdateWorld(Player.UpdateWorld packet) {
373 auto player = packet.hubId in this.players;
374 auto world = packet.worldId in this._worlds;
375 if(player && world) {
376 (*player).world = *world;
377 }
378 }
379
380 protected override void handlePlayerUpdatePermissionLevel(Player.UpdatePermissionLevel packet) {
381 auto player = packet.hubId in this.players;
382 if(player) {
383 (*player).permissionLevel = packet.permissionLevel;
384 }
385 }
386
387 protected override void handlePlayerGamePacket(Player.GamePacket packet) {
388 //TODO compress if needed and send
389 }
390
391 protected override void handlePlayerSerializedGamePacket(Player.SerializedGamePacket packet) {
392 auto player = packet.hubId in this.players;
393 if(player) {
394 (*player).sendFromNode(packet.payload);
395 }
396 }
397
398 protected override void handlePlayerOrderedGamePacket(Player.OrderedGamePacket packet) {
399 auto player = packet.hubId in this.players;
400 if(player) {
401 (*player).sendOrderedFromNode(packet.order, packet.payload);
402 }
403 }
404
405 /**
406 * Sends data to the node received from a player.
407 */
408 public shared void sendTo(shared PlayerSession player, ubyte[] data) {
409 this.send(new Player.GamePacket(player.id, data).encode());
410 }
411
412 /**
413 * Executes a remote command.
414 */
415 public shared void remoteCommand(string command, ubyte origin, Address address, int commandId) {
416 this.send(new Status.RemoteCommand(origin, address, command, commandId).encode());
417 }
418
419 /**
420 * Notifies the node that another node has connected
421 * to the hub.
422 */
423 public shared void addNode(shared AbstractNode node) {
424 this.send(node.addPacket.encode());
425 }
426
427 /**
428 * Notifies the node that another node has been
429 * disconnected from the hub.
430 */
431 public shared void removeNode(shared AbstractNode node) {
432 this.send(new Status.RemoveNode(node.id).encode());
433 }
434
435 /**
436 * Sends a message to the node.
437 */
438 public shared void sendMessage(uint sender, bool broadcasted, ubyte[] payload) {
439 this.send(new Status.ReceiveMessage(sender, broadcasted, payload).encode());
440 }
441
442 /**
443 * Sends the number of online players and maximum number of
444 * players to the node.
445 */
446 public shared void updatePlayers(inout uint online, inout uint max) {
447 this.send(new Status.UpdatePlayers(online, max).encode());
448 }
449
450 /**
451 * Adds a player to the node.
452 */
453 public shared void addPlayer(shared PlayerSession player, ubyte reason, ubyte[] transferMessage) {
454 this.players[player.id] = player;
455 this.send(new Player.Add(player.id, reason, transferMessage, player.type, player.protocol, player.uuid, player.username, player.displayName, player.gameName, player.gameVersion, player.permissionLevel, player.dimension, player.viewDistance, player.address, Player.Add.ServerAddress(player.serverIp, player.serverPort), player.skin is null ? Player.Add.Skin.init : Player.Add.Skin(player.skin.name, player.skin.data.dup, player.skin.cape.dup, player.skin.geometryName, player.skin.geometryData.dup), player.language, cast(ubyte)player.inputMode, player.hncomAddData().toString()).encode());
456 }
457
458 /**
459 * Called when a player is transferred by the hub (not by the node)
460 * to another node.
461 */
462 public shared void onPlayerTransferred(shared PlayerSession player) {
463 this.onPlayerGone(player, Player.Remove.TRANSFERRED);
464 }
465
466 /**
467 * Called when a player lefts the server using the disconnect
468 * button or closing the socket.
469 */
470 public shared void onPlayerLeft(shared PlayerSession player) {
471 this.onPlayerGone(player, Player.Remove.LEFT);
472 }
473
474 /**
475 * Called when a player times out.
476 */
477 public shared void onPlayerTimedOut(shared PlayerSession player) {
478 this.onPlayerGone(player, Player.Remove.TIMED_OUT);
479 }
480
481 /**
482 * Called when a player is kicked (not by the node).
483 */
484 public shared void onPlayerKicked(shared PlayerSession player) {
485 this.onPlayerGone(player, Player.Remove.KICKED);
486 }
487
488 /**
489 * Generic function that removes a player from the
490 * node's list and sends a PlayerDisconnected packet to
491 * notify the node of the disconnection.
492 */
493 protected shared void onPlayerGone(shared PlayerSession player, ubyte reason) {
494 if(this.players.remove(player.id)) {
495 this.send(new Player.Remove(player.id, reason).encode());
496 }
497 }
498
499 public shared void sendDisplayNameUpdate(shared PlayerSession player, string displayName) {
500 this.send(new Player.UpdateDisplayName(player.id, displayName).encode());
501 }
502
503 public shared void sendPermissionLevelUpdate(shared PlayerSession player, ubyte permissionLevel) {
504 this.send(new Player.UpdatePermissionLevel(player.id, permissionLevel).encode());
505 }
506
507 public shared void sendViewDistanceUpdate(shared PlayerSession player, uint viewDistance) {
508 this.send(new Player.UpdateViewDistance(player.id, viewDistance).encode());
509 }
510
511 public shared void sendLanguageUpdate(shared PlayerSession player, string language) {
512 this.send(new Player.UpdateLanguage(player.id, language).encode());
513 }
514
515 /**
516 * Updates a player's latency (usually sent every 30 seconds).
517 */
518 public shared void sendLatencyUpdate(shared PlayerSession player) {
519 this.send(new Player.UpdateLatency(player.id, player.latency).encode());
520 }
521
522 /**
523 * Updates a player's packet loss (usually sent every 30 seconds).
524 */
525 public shared void sendPacketLossUpdate(shared PlayerSession player) {
526 this.send(new Player.UpdatePacketLoss(player.id, player.packetLoss).encode());
527 }
528
529 /**
530 * Called when the client closes the connection.
531 * Tries to transfer every connected player to the main node.
532 */
533 public shared void onClosed(bool transfer=true) {
534 if(transfer) {
535 foreach(shared PlayerSession player ; this.players) {
536 player.connect(Player.Add.FORCIBLY_TRANSFERRED);
537 }
538 } else {
539 foreach(shared PlayerSession player ; this.players) {
540 player.kick("disconnect.close", true, []);
541 }
542 }
543 }
544
545 public abstract shared inout string toString();
546
547 }
548
549 class ClassicNode : AbstractNode {
550
551 private shared Socket socket;
552 private immutable string remoteAddress;
553
554 public shared this(shared HubServer server, Socket socket, shared JSONValue* additionalJson) {
555 super(server, additionalJson);
556 this.socket = cast(shared)socket;
557 this.remoteAddress = socket.remoteAddress.toString();
558 debug Thread.getThis().name = "hncom_client#" ~ to!string(this.id);
559 socket.setOption(SocketOptionLevel.SOCKET, SocketOption.RCVTIMEO, dur!"msecs"(2500));
560 socket.blocking = true;
561 auto stream = new HncomStream(new TcpStream(socket, 4096));
562 this.stream = cast(shared)stream;
563 auto payload = stream.receive();
564 if(payload.length && payload[0] == Login.ConnectionRequest.ID) {
565 immutable password = server.config.hub.hncomPassword;
566 auto request = Login.ConnectionRequest.fromBuffer(payload);
567 this.n_name = request.name.idup;
568 this.n_main = request.main;
569 Login.ConnectionResponse response = new Login.ConnectionResponse();
570 if(request.protocol > __PROTOCOL__) response.status = Login.ConnectionResponse.OUTDATED_HUB;
571 else if(request.protocol < __PROTOCOL__) response.status = Login.ConnectionResponse.OUTDATED_NODE;
572 else if(password.length && !password.length) response.status = Login.ConnectionResponse.PASSWORD_REQUIRED;
573 else if(password.length && password != request.password) response.status = Login.ConnectionResponse.WRONG_PASSWORD;
574 else if(!this.n_name.length || this.n_name.length > 32) response.status = Login.ConnectionResponse.INVALID_NAME_LENGTH;
575 else if(!this.n_name.matchFirst(ctRegex!r"[^a-zA-Z0-9_+-.,!?:@#$%\/]").empty) response.status = Login.ConnectionResponse.INVALID_NAME_CHARACTERS;
576 else if(server.nodeNames.canFind(this.n_name)) response.status = Login.ConnectionResponse.NAME_ALREADY_USED;
577 else if(["reload", "stop"].canFind(this.n_name.toLower)) response.status = Login.ConnectionResponse.NAME_RESERVED;
578 stream.send(response.encode());
579 if(response.status == Login.ConnectionResponse.OK) {
580 this.exchageInfo(stream);
581 }
582 }
583 socket.close();
584 }
585
586 protected override shared void sendHubInfo(HncomStream stream, Login.HubInfo packet) {
587 stream.send(packet.encode());
588 }
589
590 protected override shared Login.NodeInfo receiveNodeInfo(HncomStream stream) {
591 stream.stream.socket.setOption(SocketOptionLevel.SOCKET, SocketOption.RCVTIMEO, dur!"minutes"(5)); // giving it the time to load resorces and generate worlds
592 auto payload = stream.receive();
593 if(payload.length && payload[0] == Login.NodeInfo.ID) return Login.NodeInfo.fromBuffer(payload);
594 else return Login.NodeInfo.init;
595 }
596
597 protected override shared void loop(HncomStream stream) {
598 auto _this = cast()this;
599 stream.stream.socket.setOption(SocketOptionLevel.SOCKET, SocketOption.RCVTIMEO, dur!"msecs"(0)); // blocking without timeout
600 while(true) {
601 auto payload = stream.receive();
602 if(payload.length) _this.handleHncom(payload);
603 else break; // connection closed or error
604 }
605 }
606
607 protected override void send(ubyte[] payload) {
608 this.stream.send(payload);
609 }
610
611 public override shared inout string toString() {
612 return "Node(" ~ to!string(this.id) ~ ", " ~ this.name ~ ", " ~ this.remoteAddress ~ ", " ~ to!string(this.n_main) ~ ")";
613 }
614
615 }
616
617 class LiteNode : AbstractNode {
618
619 static import std.concurrency;
620
621 public shared static bool ready = false;
622 public shared static std.concurrency.Tid tid;
623
624 private std.concurrency.Tid node;
625
626 public shared this(shared HubServer server, shared JSONValue* additionalJson) {
627 super(server, additionalJson);
628 tid = cast(shared)std.concurrency.thisTid;
629 ready = true;
630 this.node = cast(shared)std.concurrency.receiveOnly!(std.concurrency.Tid)();
631 this.n_main = true;
632 this.exchageInfo(null);
633 }
634
635 protected override shared void sendHubInfo(HncomStream stream, Login.HubInfo packet) {
636 std.concurrency.send(cast()this.node, cast(shared)packet);
637 }
638
639 protected override shared Login.NodeInfo receiveNodeInfo(HncomStream stream) {
640 return cast()std.concurrency.receiveOnly!(shared Login.NodeInfo)();
641 }
642
643 protected override shared void loop(HncomStream stream) {
644 auto _this = cast()this;
645 while(true) {
646 ubyte[] payload = std.concurrency.receiveOnly!(immutable(ubyte)[])().dup;
647 if(payload.length) {
648 _this.handleHncom(payload);
649 } else {
650 break;
651 }
652 }
653 }
654
655 protected override void send(ubyte[] buffer) {
656 std.concurrency.send(this.node, buffer.idup);
657 }
658
659 protected override shared void send(ubyte[] buffer) {
660 std.concurrency.send(cast()this.node, buffer.idup);
661 }
662
663 public override shared inout string toString() {
664 return "LiteNode(" ~ to!string(this.id) ~ ")";
665 }
666
667 }