Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Questa guida illustra un modo rapido per configurare e usare Orleans Streams. Per altre informazioni sui dettagli delle funzionalità di streaming, leggere altre parti di questa documentazione.
Configurazioni necessarie
In questa guida si usa un flusso basato sulla memoria che usa la messaggistica granulare per inviare dati di flusso ai sottoscrittori. Il provider di archiviazione in memoria viene usato per archiviare elenchi di sottoscrizioni. L'uso di meccanismi basati sulla memoria per lo streaming e l'archiviazione è destinato solo allo sviluppo e ai test locali, non per gli ambienti di produzione.
Orleans streaming richiede il pacchetto NuGet Microsoft.Orleans.Streaming. Questo pacchetto fornisce la funzionalità di streaming sia per il client che per il server, incluso il AddMemoryStreams metodo di estensione usato in questa guida.
Nel silo, dove silo è un ISiloBuilder, chiamare AddMemoryStreams:
silo.AddMemoryStreams("StreamProvider")
.AddMemoryGrainStorage("PubSubStore");
Sul client del cluster, dove client è un IClientBuilder, chiama AddMemoryStreams.
client.AddMemoryStreams("StreamProvider");
In questa guida usare un flusso semplice basato su messaggi che usa la messaggistica granulare per inviare dati di flusso ai sottoscrittori. Usare il provider di archiviazione in memoria per archiviare elenchi di sottoscrizioni; questa non è una scelta saggia per le applicazioni di produzione reali.
Nel silo, dove hostBuilder è un ISiloHostBuilder, chiamare AddSimpleMessageStreamProvider:
hostBuilder.AddSimpleMessageStreamProvider("SMSProvider")
.AddMemoryGrainStorage("PubSubStore");
Sul client del cluster, dove clientBuilder è un IClientBuilder, chiama AddSimpleMessageStreamProvider.
clientBuilder.AddSimpleMessageStreamProvider("SMSProvider");
Annotazioni
Per impostazione predefinita, i messaggi passati sul flusso di messaggi semplici sono considerati non modificabili e possono essere passati per riferimento ad altri grani. Per disattivare questo comportamento, configurare il provider SMS per disattivare SimpleMessageStreamProviderOptions.OptimizeForImmutableData.
siloBuilder
.AddSimpleMessageStreamProvider(
"SMSProvider",
options => options.OptimizeForImmutableData = false);
È possibile creare flussi, inviare dati usandoli come producer e ricevere i dati come sottoscrittori.
Generare eventi
È relativamente facile produrre eventi per i flussi di dati. In primo luogo, accedere al provider di flusso definito in precedenza nella configurazione ("StreamProvider"), quindi scegliere un flusso e trasmettere i dati.
// Pick a GUID for a chat room grain and chat room stream
var guid = new Guid("some guid identifying the chat room");
// Get one of the providers which we defined in our config
var streamProvider = GetStreamProvider("StreamProvider");
// Get the reference to a stream
var streamId = StreamId.Create("RANDOMDATA", guid);
var stream = streamProvider.GetStream<int>(streamId);
È relativamente facile produrre eventi per i flussi di dati. In primo luogo, accedere al provider di flusso definito in precedenza nella configurazione ("SMSProvider"), quindi scegliere un flusso e trasmettere i dati.
// Pick a GUID for a chat room grain and chat room stream
var guid = new Guid("some guid identifying the chat room");
// Get one of the providers which we defined in our config
var streamProvider = GetStreamProvider("SMSProvider");
// Get the reference to a stream
var stream = streamProvider.GetStream<int>(guid, "RANDOMDATA");
Come si può notare, il flusso ha un GUID e uno spazio dei nomi. In questo modo è facile identificare flussi univoci. Ad esempio, il namespace per una chat room potrebbe essere "Rooms" e il GUID potrebbe essere quello del proprietario RoomGrain.
In questo caso, usare il GUID di una chat room nota. Usando il metodo del flusso, eseguire il OnNextAsync push dei dati. Eseguire questa operazione all'interno di un timer usando numeri casuali. È anche possibile usare qualsiasi altro tipo di dati per il flusso.
RegisterTimer(_ =>
{
return stream.OnNextAsync(Random.Shared.Next());
},
null,
TimeSpan.FromMilliseconds(1_000),
TimeSpan.FromMilliseconds(1_000));
Sottoscrivere e ricevere dati di streaming
Per la ricezione di dati, è possibile usare sottoscrizioni implicite ed esplicite, descritte in dettaglio in Sottoscrizioni esplicite e implicite. In questo esempio vengono usate sottoscrizioni implicite, che sono più semplici. Quando un tipo di grana vuole sottoscrivere in modo implicito un flusso, utilizza l'attributo [ImplicitStreamSubscription(namespace)].
Per il tuo caso, definisci un esempio ReceiverGrain simile al seguente:
[ImplicitStreamSubscription("RANDOMDATA")]
public class ReceiverGrain : Grain, IRandomReceiver
Ogni volta che i dati vengono inviati ai flussi nel namespace RANDOMDATA (come nell'esempio del timer), un grain di tipo ReceiverGrain con lo stesso flusso Guid riceve il messaggio. Anche se non esistono attivazioni di grain al momento, il runtime ne crea automaticamente uno nuovo e gli invia il messaggio.
Per eseguire questa operazione, completare il processo di sottoscrizione impostando il OnNextAsync metodo per la ricezione dei dati. A tale scopo, il ReceiverGrain dovrebbe chiamare qualcosa di simile a questo nel suo OnActivateAsync:
// Create a GUID based on our GUID as a grain
var guid = this.GetPrimaryKey();
// Get one of the providers which we defined in config
var streamProvider = GetStreamProvider("StreamProvider");
// Get the reference to a stream
var streamId = StreamId.Create("RANDOMDATA", guid);
var stream = streamProvider.GetStream<int>(streamId);
// Set our OnNext method to the lambda which simply prints the data.
// This doesn't make new subscriptions, because we are using implicit
// subscriptions via [ImplicitStreamSubscription].
await stream.SubscribeAsync<int>(
async (data, token) =>
{
Console.WriteLine(data);
await Task.CompletedTask;
});
// Create a GUID based on our GUID as a grain
var guid = this.GetPrimaryKey();
// Get one of the providers which we defined in config
var streamProvider = GetStreamProvider("SMSProvider");
// Get the reference to a stream
var stream = streamProvider.GetStream<int>(guid, "RANDOMDATA");
// Set our OnNext method to the lambda which simply prints the data.
// This doesn't make new subscriptions, because we are using implicit
// subscriptions via [ImplicitStreamSubscription].
await stream.SubscribeAsync<int>(
async (data, token) =>
{
Console.WriteLine(data);
await Task.CompletedTask;
});
Tutto pronto! Ora, l'unico requisito è che qualcosa attiva la creazione del grano del produttore. Registra quindi il timer e inizia a inviare numeri interi casuali a tutte le parti interessate.
Anche in questo caso, questa guida ignora molti dettagli e offre solo una panoramica generale. Leggere altre parti di questo manuale e altre risorse su Rx per ottenere una buona comprensione delle funzionalità disponibili e del relativo funzionamento.
La programmazione reattiva può essere un approccio efficace per risolvere molti problemi. Ad esempio, è possibile usare LINQ nel sottoscrittore per filtrare i numeri ed eseguire varie operazioni interessanti.