Skip to content
This repository has been archived by the owner on Jun 19, 2023. It is now read-only.

Commit

Permalink
Merge remote-tracking branch 'origin/develop' into feature/add_ci_and…
Browse files Browse the repository at this point in the history
…_linting
  • Loading branch information
ddimaria committed Sep 13, 2022
2 parents e31a42f + c54b121 commit e156adf
Show file tree
Hide file tree
Showing 12 changed files with 269 additions and 49 deletions.
44 changes: 44 additions & 0 deletions .eslintrc.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
module.exports = {
parser: '@typescript-eslint/parser',
plugins: [
'@typescript-eslint',
'eslint-plugin-prettier',
'autofix',
'import',
'compat',
'prettier',
'unused-imports',
'react-perf',
],
ignorePatterns: ['**/proto_ts/**/*'],
rules: {
'@typescript-eslint/explicit-module-boundary-types': 'off',
'@typescript-eslint/no-var-requires': 0,
'@typescript-eslint/ban-ts-comment': 0,
'@typescript-eslint/no-empty-interface': 0,
'@typescript-eslint/ban-types': 0,
'@typescript-eslint/no-explicit-any': 0,
'@typescript-eslint/no-non-null-assertion': 0,
'prettier/prettier': 'error',
'import/order': 'error',
'function-paren-newline': ['error', 'consistent'],
'array-callback-return': 0,
'@typescript-eslint/no-unused-vars': 1,
'function-paren-newline': 0,
'unused-imports/no-unused-imports-ts': 2,
camelcase: 0,
'react-hooks/exhaustive-deps': 1,
'no-use-before-define': 'off',
'@typescript-eslint/no-use-before-define': ['error'],
},
extends: [
'react-app',
'eslint:recommended',
'plugin:@typescript-eslint/recommended',
'prettier',
'plugin:import/errors',
'plugin:import/warnings',
'plugin:import/typescript',
'plugin:markdown/recommended',
],
};
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
- [API](#api)
- [Transport](#transport)
- [Connection](#connection)
- [Hacking](#hacking)
- [Contribute](#contribute)
- [Development](#development)
- [Build](#build)
Expand Down Expand Up @@ -56,13 +57,20 @@ const values = await pipe(socket, all);

[![](https://raw.githubusercontent.com/libp2p/js-libp2p-interfaces/master/packages/libp2p-interfaces/src/connection/img/badge.png)](https://github.com/libp2p/js-libp2p-interfaces/tree/master/packages/libp2p-interfaces/src/connection)

## Hacking

Besides the usual `npm install` to get dependencies, `npm run build` to invoke tsc, and `npm run test` to execute unit tests...

There is also `npm run autogen` which uses ProtoBuf's protoc to populate the generated code directory `proto_ts` based on `*.proto` files in src. Don't forget to run this step before `build` any time you make a change to any of the `*.proto` files.

## Contribute

Contributions are welcome! The libp2p implementation in JavaScript is a work in progress. As such, there's a few things you can do right now to help out:

- [Check out the existing issues](//github.com/little-bear-labs//js-libp2p-webrtc/issues).
- **Perform code reviews**.
- **Add tests**. There can never be enough tests.
- Go through the modules and **check out existing issues**. This is especially useful for modules in active development. Some knowledge of IPFS/libp2p may be required, as well as the infrastructure behind it - for instance, you may need to read up on p2p and more complex operations like muxing to be able to help technically.

Please be aware that all interactions related to libp2p are subject to the IPFS [Code of Conduct](https://github.com/ipfs/community/blob/master/code-of-conduct.md).

Expand Down
5 changes: 5 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
"!**/*.tsbuildinfo"
],
"scripts": {
"autogen": "npx protoc --ts_out proto_ts --proto_path src src/*.proto",
"othergen": "./node_modules/.bin/proto-loader-gen-types --longs=String --enums=String --defaults --oneofs --grpcLib=@grpc/grpc-js --outDir=proto_ts/ src/*.proto",
"build": "aegir build",
"test": "aegir test --target browser",
"lint": "aegir lint",
Expand Down Expand Up @@ -53,6 +55,9 @@
"@libp2p/multistream-select": "^3.0.0",
"@libp2p/peer-id": "^1.1.15",
"@multiformats/multiaddr": "^10.4.0",
"@protobuf-ts/plugin": "^2.8.0",
"@protobuf-ts/protoc": "^2.8.0",
"@protobuf-ts/runtime": "^2.8.0",
"abortable-iterator": "^4.0.2",
"err-code": "^3.0.1",
"it-merge": "^1.0.4",
Expand Down
105 changes: 105 additions & 0 deletions proto_ts/message.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// @generated by protobuf-ts 2.8.0
// @generated from protobuf file "message.proto" (package "webrtc.pb", syntax proto2)
// tslint:disable
import type { BinaryWriteOptions } from "@protobuf-ts/runtime";
import type { IBinaryWriter } from "@protobuf-ts/runtime";
import { WireType } from "@protobuf-ts/runtime";
import type { BinaryReadOptions } from "@protobuf-ts/runtime";
import type { IBinaryReader } from "@protobuf-ts/runtime";
import { UnknownFieldHandler } from "@protobuf-ts/runtime";
import type { PartialMessage } from "@protobuf-ts/runtime";
import { reflectionMergePartial } from "@protobuf-ts/runtime";
import { MESSAGE_TYPE } from "@protobuf-ts/runtime";
import { MessageType } from "@protobuf-ts/runtime";
/**
* @generated from protobuf message webrtc.pb.Message
*/
export interface Message {
/**
* @generated from protobuf field: optional webrtc.pb.Message.Flag flag = 1;
*/
flag?: Message_Flag;
/**
* @generated from protobuf field: optional bytes message = 2;
*/
message?: Uint8Array;
}
/**
* @generated from protobuf enum webrtc.pb.Message.Flag
*/
export enum Message_Flag {
/**
* The sender will no longer send messages on the stream.
*
* @generated from protobuf enum value: FIN = 0;
*/
FIN = 0,
/**
* The sender will no longer read messages on the stream. Incoming data is
* being discarded on receipt.
*
* @generated from protobuf enum value: STOP_SENDING = 1;
*/
STOP_SENDING = 1,
/**
* The sender abruptly terminates the sending part of the stream. The
* receiver can discard any data that it already received on that stream.
*
* @generated from protobuf enum value: RESET = 2;
*/
RESET = 2
}
// @generated message type with reflection information, may provide speed optimized methods
class Message$Type extends MessageType<Message> {
constructor() {
super("webrtc.pb.Message", [
{ no: 1, name: "flag", kind: "enum", opt: true, T: () => ["webrtc.pb.Message.Flag", Message_Flag] },
{ no: 2, name: "message", kind: "scalar", opt: true, T: 12 /*ScalarType.BYTES*/ }
]);
}
create(value?: PartialMessage<Message>): Message {
const message = {};
globalThis.Object.defineProperty(message, MESSAGE_TYPE, { enumerable: false, value: this });
if (value !== undefined)
reflectionMergePartial<Message>(this, message, value);
return message;
}
internalBinaryRead(reader: IBinaryReader, length: number, options: BinaryReadOptions, target?: Message): Message {
let message = target ?? this.create(), end = reader.pos + length;
while (reader.pos < end) {
let [fieldNo, wireType] = reader.tag();
switch (fieldNo) {
case /* optional webrtc.pb.Message.Flag flag */ 1:
message.flag = reader.int32();
break;
case /* optional bytes message */ 2:
message.message = reader.bytes();
break;
default:
let u = options.readUnknownField;
if (u === "throw")
throw new globalThis.Error(`Unknown field ${fieldNo} (wire type ${wireType}) for ${this.typeName}`);
let d = reader.skip(wireType);
if (u !== false)
(u === true ? UnknownFieldHandler.onRead : u)(this.typeName, message, fieldNo, wireType, d);
}
}
return message;
}
internalBinaryWrite(message: Message, writer: IBinaryWriter, options: BinaryWriteOptions): IBinaryWriter {
/* optional webrtc.pb.Message.Flag flag = 1; */
if (message.flag !== undefined)
writer.tag(1, WireType.Varint).int32(message.flag);
/* optional bytes message = 2; */
if (message.message !== undefined)
writer.tag(2, WireType.LengthDelimited).bytes(message.message);
let u = options.writeUnknownFields;
if (u !== false)
(u == true ? UnknownFieldHandler.onWrite : u)(this.typeName, message, writer);
return writer;
}
}
/**
* @generated MessageType for protobuf message webrtc.pb.Message
*/
export const Message = new Message$Type();
20 changes: 20 additions & 0 deletions src/message.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
syntax = "proto2";

package webrtc.pb;

message Message {
enum Flag {
// The sender will no longer send messages on the stream.
FIN = 0;
// The sender will no longer read messages on the stream. Incoming data is
// being discarded on receipt.
STOP_SENDING = 1;
// The sender abruptly terminates the sending part of the stream. The
// receiver can discard any data that it already received on that stream.
RESET = 2;
}

optional Flag flag = 1;

optional bytes message = 2;
}
53 changes: 47 additions & 6 deletions src/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import merge from 'it-merge';
import { Uint8ArrayList } from 'uint8arraylist';
import { fromString } from 'uint8arrays/from-string';
import { logger } from '@libp2p/logger';
import * as pb from '../proto_ts/message';

const log = logger('libp2p:webrtc:stream');

Expand Down Expand Up @@ -97,7 +98,27 @@ export class WebRTCStream implements Stream {
res = new Uint8Array(data as ArrayBuffer);
}
log.trace(`[stream:${this.id}][${this.stat.direction}] received message: length: ${res.length} ${res}`);
(this.source as Pushable<Uint8ArrayList>).push(new Uint8ArrayList(res));
let m = pb.Message.fromBinary(res);
log(`[stream:${this.id}][${this.stat.direction}] received pb.Message: ${Object.entries(m)}`);
switch (m.flag) {
case undefined:
break; //regular message only
case pb.Message_Flag.STOP_SENDING:
log.trace('Remote has indicated, with "STOP_SENDING" flag, that it will discard any messages we send.');
this.closeWrite();
break;
case pb.Message_Flag.FIN:
log.trace('Remote has indicated, with "FIN" flag, that it will not send any further messages.');
this.closeRead();
break;
case pb.Message_Flag.RESET:
log.trace('Remote abruptly stopped sending, indicated with "RESET" flag.');
this.closeRead();
}
if (m.message) {
log.trace('%s incoming message %s', this.id, m.message);
(this.source as Pushable<Uint8ArrayList>).push(new Uint8ArrayList(m.message));
}
};

this.channel.onclose = (_evt) => {
Expand Down Expand Up @@ -128,7 +149,10 @@ export class WebRTCStream implements Stream {
if (closed || this.writeClosed) {
break;
}
this.channel.send(buf.subarray());
let res = buf.subarray();
let send_buf = pb.Message.toBinary({ message: buf.subarray() });
log.trace(`[stream:${this.id}][${this.stat.direction}] sending message: length: ${res.length} ${res}, encoded through pb as ${send_buf}`);
this.channel.send(send_buf);
}
}

Expand All @@ -141,15 +165,16 @@ export class WebRTCStream implements Stream {
}
this.stat.timeline.close = new Date().getTime();
this.closed = true;
this.closeRead();
this.closeWrite();
this.readClosed = true;
this.writeClosed = true;
this.channel.close();
}

/**
* Close a stream for reading only
*/
closeRead(): void {
this._sendFlag(pb.Message_Flag.STOP_SENDING);
this.readClosed = true;
(this.source as Pushable<Uint8ArrayList>).end();
if (this.readClosed && this.writeClosed) {
Expand All @@ -161,6 +186,7 @@ export class WebRTCStream implements Stream {
* Close a stream for writing only
*/
closeWrite(): void {
this._sendFlag(pb.Message_Flag.FIN);
this.writeClosed = true;
this.closeWritePromise.resolve();
if (this.readClosed && this.writeClosed) {
Expand All @@ -176,10 +202,25 @@ export class WebRTCStream implements Stream {
}

/**
* Call when a remote error occurs, should close the stream for reading and writing
* Close the stream for writing, and indicate to the remote side this is being done 'abruptly'
* @see closeWrite
*/
reset(): void {
this.close();
this.stat = defaultStat(this.stat.direction);
this._sendFlag(pb.Message_Flag.RESET);
this.writeClosed = true;
this.closeWritePromise.resolve();
if (this.readClosed && this.writeClosed) {
this.close();
}
}

private _sendFlag(flag: pb.Message_Flag): void {
try {
log('Sending flag: %s', flag.toString());
this.channel.send(pb.Message.toBinary({ flag: flag }));
} catch (e) {
log.error(`Exception while sending flag ${flag}: ${e}`);
}
}
}
1 change: 0 additions & 1 deletion src/transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ export class WebRTCTransport implements Transport, Initializable {
dataChannelOpenPromise.reject();
}, HANDSHAKE_TIMEOUT_MS);

await dataChannelOpenPromise.promise;
await this.componentsPromise.promise;

let myPeerId = await this.getPeerId();
Expand Down
34 changes: 16 additions & 18 deletions test/connection.browser.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,46 +3,44 @@
import {createConnectionPair, echoHandler} from "../test/util.js";
import { expect } from 'aegir/chai';
import { pipe } from 'it-pipe';
import all from 'it-all';
import first from 'it-first';
import {fromString} from 'uint8arrays/from-string';
import {v4} from 'uuid';
import { v4 } from 'uuid';

const echoProtocol = "/echo/1.0.0"
const echoProtocol = '/echo/1.0.0';

describe('connection browser tests', () => {
it('can run the echo protocol (first)', async () => {
let [{ connection: client }, server] = await createConnectionPair();
let serverRegistrar = server.registrar;
await serverRegistrar.handle(echoProtocol, echoHandler, { maxInboundStreams: 10, maxOutboundStreams: 10 })
await serverRegistrar.handle(echoProtocol, echoHandler, { maxInboundStreams: 10, maxOutboundStreams: 10 });
let clientStream = await client.newStream([echoProtocol]);
let data = fromString(v4());
let response = await pipe(
[data],
clientStream,
async (source) => await first(source),
);
let response = await pipe([data], clientStream, async (source) => await first(source));

expect(response).to.not.be.undefined;
expect(response!.subarray()).to.equalBytes(data);
});

it('can run the echo protocol (all)', async () => {
//enableLogger('libp2p:webrtc:connection');
//enableLogger('libp2p:webrtc:stream');
let [{ connection: client }, server] = await createConnectionPair();
let serverRegistrar = server.registrar;
await serverRegistrar.handle(echoProtocol, echoHandler, { maxInboundStreams: 10, maxOutboundStreams: 10 })
await serverRegistrar.handle(echoProtocol, echoHandler, { maxInboundStreams: 10, maxOutboundStreams: 10 });
let clientStream = await client.newStream([echoProtocol]);
// close stream after 2 seconds
setTimeout(() => clientStream.close(), 2000);
let data = fromString(v4());
let response = await pipe(
[data],
clientStream,
async (source) => await all(source),
);

expect(response).to.not.be.undefined;
expect(response![0].subarray()).to.equalBytes(data);
clientStream.sink([data]);
let responsed = false;
for await (const response of clientStream.source) {
expect(response).to.not.be.undefined;
expect(response.subarray()).to.equalBytes(data);
responsed = true;
break;
}
expect(responsed).to.be.true();
});
});

Expand Down
Loading

0 comments on commit e156adf

Please sign in to comment.